refactor(WebSocketShard): waitForEvent and its error handling (#9282)

* refactor(WebSocketShard): waitForEvent and its error handling

* chore: remove unnecessary error event

* chore: handle ECONNREFUSED/ECONNRESET

* fix: reset network error check

---------

Co-authored-by: Vlad Frangu <kingdgrizzle@gmail.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
DD
2023-04-11 14:10:32 +03:00
committed by GitHub
parent 676307ff5c
commit dcf58d8140

View File

@@ -1,7 +1,7 @@
/* eslint-disable id-length */
import { Buffer } from 'node:buffer';
import { once } from 'node:events';
import { setTimeout, clearInterval, clearTimeout, setInterval } from 'node:timers';
import { clearInterval, clearTimeout, setInterval, setTimeout } from 'node:timers';
import { setTimeout as sleep } from 'node:timers/promises';
import { URLSearchParams } from 'node:url';
import { TextDecoder } from 'node:util';
@@ -16,14 +16,14 @@ import {
GatewayOpcodes,
type GatewayDispatchPayload,
type GatewayIdentifyData,
type GatewayReadyDispatchData,
type GatewayReceivePayload,
type GatewaySendPayload,
type GatewayReadyDispatchData,
} from 'discord-api-types/v10';
import { WebSocket, type RawData } from 'ws';
import type { Inflate } from 'zlib-sync';
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy';
import { getInitialSendRateLimitState, ImportantGatewayOpcodes } from '../utils/constants.js';
import { ImportantGatewayOpcodes, getInitialSendRateLimitState } from '../utils/constants.js';
import type { SessionInfo } from './WebSocketManager.js';
// eslint-disable-next-line promise/prefer-await-to-then
@@ -101,14 +101,15 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
private lastHeartbeatAt = -1;
private session: SessionInfo | null = null;
// Indicates whether the shard has already resolved its original connect() call
private initialConnectResolved = false;
// Indicates if we failed to connect to the ws url (ECONNREFUSED/ECONNRESET)
private failedToConnectDueToNetworkError = false;
private readonly sendQueue = new AsyncQueue();
private readonly timeouts = new Collection<WebSocketShardEvents, NodeJS.Timeout>();
private readonly timeoutAbortControllers = new Collection<WebSocketShardEvents, AbortController>();
private readonly strategy: IContextFetchingStrategy;
@@ -127,6 +128,14 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
public async connect() {
const promise = this.initialConnectResolved ? Promise.resolve() : once(this, WebSocketShardEvents.Ready);
void this.internalConnect();
await promise;
this.initialConnectResolved = true;
}
private async internalConnect() {
if (this.#status !== WebSocketShardStatus.Idle) {
throw new Error("Tried to connect a shard that wasn't idle");
}
@@ -149,7 +158,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
}
const session = this.session ?? (await this.strategy.retrieveSessionInfo(this.id));
const session = await this.strategy.retrieveSessionInfo(this.id);
const url = `${session?.resumeURL ?? this.strategy.options.gatewayInformation.url}?${params.toString()}`;
this.debug([`Connecting to ${url}`]);
@@ -165,21 +174,16 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.sendRateLimitState = getInitialSendRateLimitState();
const { ok } = await this.bubbleWaitForEventError(
this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout),
);
const { ok } = await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout);
if (!ok) {
return;
}
if (session?.shardCount === this.strategy.options.shardCount) {
this.session = session;
await this.resume(session);
} else {
await this.identify();
}
this.initialConnectResolved = true;
}
public async destroy(options: WebSocketShardDestroyOptions = {}) {
@@ -212,9 +216,16 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.lastHeartbeatAt = -1;
for (const controller of this.timeoutAbortControllers.values()) {
controller.abort();
}
this.timeoutAbortControllers.clear();
this.failedToConnectDueToNetworkError = false;
// Clear session state if applicable
if (options.recover !== WebSocketShardDestroyRecovery.Resume && this.session) {
this.session = null;
if (options.recover !== WebSocketShardDestroyRecovery.Resume) {
await this.strategy.updateSessionInfo(this.id, null);
}
@@ -248,65 +259,50 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.#status = WebSocketShardStatus.Idle;
if (options.recover !== undefined) {
return this.connect();
return this.internalConnect();
}
}
private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null): Promise<void> {
private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null): Promise<{ ok: boolean }> {
this.debug([`Waiting for event ${event} ${timeoutDuration ? `for ${timeoutDuration}ms` : 'indefinitely'}`]);
const controller = new AbortController();
const timeout = timeoutDuration ? setTimeout(() => controller.abort(), timeoutDuration).unref() : null;
if (timeout) {
this.timeouts.set(event, timeout);
}
const timeoutController = new AbortController();
const timeout = timeoutDuration ? setTimeout(() => timeoutController.abort(), timeoutDuration).unref() : null;
await once(this, event, { signal: controller.signal }).finally(() => {
if (timeout) {
clearTimeout(timeout);
this.timeouts.delete(event);
}
});
}
this.timeoutAbortControllers.set(event, timeoutController);
const closeController = new AbortController();
/**
* Does special error handling for waitForEvent calls, depending on the current state of the connection lifecycle
* (i.e. whether or not the original connect() call has resolved or if the user has an error listener)
*/
private async bubbleWaitForEventError(
promise: Promise<unknown>,
): Promise<{ error: unknown; ok: false } | { ok: true }> {
try {
await promise;
return { ok: true };
} catch (error) {
const isAbortError = error instanceof Error && error.name === 'AbortError';
// If the first promise resolves, all is well. If the 2nd promise resolves,
// the shard has meanwhile closed. In that case, a destroy is already ongoing, so we just need to
// return false. Meanwhile, if something rejects (error event) or the first controller is aborted,
// we enter the catch block and trigger a destroy there.
const closed = await Promise.race<boolean>([
once(this, event, { signal: timeoutController.signal }).then(() => false),
once(this, WebSocketShardEvents.Closed, { signal: closeController.signal }).then(() => true),
]);
// Any error that isn't an abort error would have been caused by us emitting an error event in the first place
// See https://nodejs.org/api/events.html#eventsonceemitter-name-options for `once()` behavior
if (isAbortError) {
this.emit(WebSocketShardEvents.Error, { error: error as Error });
}
// As stated previously, any other error would have been caused by us emitting the error event, which looks
// like { error: unknown }
// eslint-disable-next-line no-ex-assign
error = error instanceof Error ? error : (error as { error: unknown }).error;
// If the user has no handling on their end (error event) simply throw.
// We also want to throw if we're still in the initial `connect()` call, since that's the only time
// the user can catch the error "normally"
if (this.listenerCount(WebSocketShardEvents.Error) === 0 || !this.initialConnectResolved) {
throw error;
}
// If the error is handled, we can just try to reconnect
return { ok: !closed };
} catch {
// If we're here because of other reasons, we need to destroy the shard
void this.destroy({
code: CloseCodes.Normal,
reason: 'Something timed out or went wrong while waiting for an event',
recover: WebSocketShardDestroyRecovery.Reconnect,
});
return { ok: false, error };
return { ok: false };
} finally {
if (timeout) {
clearTimeout(timeout);
}
this.timeoutAbortControllers.delete(event);
// Clean up the close listener to not leak memory
if (!closeController.signal.aborted) {
closeController.abort();
}
}
}
@@ -393,13 +389,17 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
d,
});
await this.bubbleWaitForEventError(
this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout),
);
await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout);
}
private async resume(session: SessionInfo) {
this.debug(['Resuming session']);
this.debug([
'Resuming session',
`resume url: ${session.resumeURL}`,
`sequence: ${session.sequence}`,
`shard id: ${this.id.toString()}`,
]);
this.#status = WebSocketShardStatus.Resuming;
this.replayedEvents = 0;
return this.send({
@@ -417,9 +417,11 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
return this.destroy({ reason: 'Zombie connection', recover: WebSocketShardDestroyRecovery.Resume });
}
const session = await this.strategy.retrieveSessionInfo(this.id);
await this.send({
op: GatewayOpcodes.Heartbeat,
d: this.session?.sequence ?? null,
d: session?.sequence ?? null,
});
this.lastHeartbeatAt = Date.now();
@@ -506,7 +508,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
case GatewayDispatchEvents.Ready: {
this.#status = WebSocketShardStatus.Ready;
this.session ??= {
const session = {
sequence: payload.s,
sessionId: payload.d.session_id,
shardId: this.id,
@@ -514,7 +516,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
resumeURL: payload.d.resume_gateway_url,
};
await this.strategy.updateSessionInfo(this.id, this.session);
await this.strategy.updateSessionInfo(this.id, session);
this.emit(WebSocketShardEvents.Ready, { data: payload.d });
break;
@@ -532,9 +534,15 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
}
if (this.session && payload.s > this.session.sequence) {
this.session.sequence = payload.s;
await this.strategy.updateSessionInfo(this.id, this.session);
const session = await this.strategy.retrieveSessionInfo(this.id);
if (session) {
if (payload.s > session.sequence) {
await this.strategy.updateSessionInfo(this.id, { ...session, sequence: payload.s });
}
} else {
this.debug([
`Received a ${payload.t} event but no session is available. Session information cannot be re-constructed in this state without a full reconnect`,
]);
}
this.emit(WebSocketShardEvents.Dispatch, { data: payload });
@@ -556,10 +564,8 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
case GatewayOpcodes.InvalidSession: {
const readyTimeout = this.timeouts.get(WebSocketShardEvents.Ready);
readyTimeout?.refresh();
this.debug([`Invalid session; will attempt to resume: ${payload.d.toString()}`]);
const session = this.session ?? (await this.strategy.retrieveSessionInfo(this.id));
const session = await this.strategy.retrieveSessionInfo(this.id);
if (payload.d && session) {
await this.resume(session);
} else {
@@ -612,6 +618,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
private onError(error: Error) {
if ('code' in error && ['ECONNRESET', 'ECONNREFUSED'].includes(error.code as string)) {
this.debug(['Failed to connect to the gateway URL specified due to a network error']);
this.failedToConnectDueToNetworkError = true;
return;
}
this.emit(WebSocketShardEvents.Error, { error });
}
@@ -696,8 +708,17 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
default: {
this.debug([`The gateway closed with an unexpected code ${code}, attempting to resume.`]);
return this.destroy({ code, recover: WebSocketShardDestroyRecovery.Resume });
this.debug([
`The gateway closed with an unexpected code ${code}, attempting to ${
this.failedToConnectDueToNetworkError ? 'reconnect' : 'resume'
}.`,
]);
return this.destroy({
code,
recover: this.failedToConnectDueToNetworkError
? WebSocketShardDestroyRecovery.Reconnect
: WebSocketShardDestroyRecovery.Resume,
});
}
}
}