diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index 70347d0d8..abf1f10ad 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -1,7 +1,7 @@ /* eslint-disable id-length */ import { Buffer } from 'node:buffer'; import { once } from 'node:events'; -import { setTimeout, clearInterval, clearTimeout, setInterval } from 'node:timers'; +import { clearInterval, clearTimeout, setInterval, setTimeout } from 'node:timers'; import { setTimeout as sleep } from 'node:timers/promises'; import { URLSearchParams } from 'node:url'; import { TextDecoder } from 'node:util'; @@ -16,14 +16,14 @@ import { GatewayOpcodes, type GatewayDispatchPayload, type GatewayIdentifyData, + type GatewayReadyDispatchData, type GatewayReceivePayload, type GatewaySendPayload, - type GatewayReadyDispatchData, } from 'discord-api-types/v10'; import { WebSocket, type RawData } from 'ws'; import type { Inflate } from 'zlib-sync'; import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy'; -import { getInitialSendRateLimitState, ImportantGatewayOpcodes } from '../utils/constants.js'; +import { ImportantGatewayOpcodes, getInitialSendRateLimitState } from '../utils/constants.js'; import type { SessionInfo } from './WebSocketManager.js'; // eslint-disable-next-line promise/prefer-await-to-then @@ -101,14 +101,15 @@ export class WebSocketShard extends AsyncEventEmitter { private lastHeartbeatAt = -1; - private session: SessionInfo | null = null; - // Indicates whether the shard has already resolved its original connect() call private initialConnectResolved = false; + // Indicates if we failed to connect to the ws url (ECONNREFUSED/ECONNRESET) + private failedToConnectDueToNetworkError = false; + private readonly sendQueue = new AsyncQueue(); - private readonly timeouts = new Collection(); + private readonly timeoutAbortControllers = new Collection(); private readonly strategy: IContextFetchingStrategy; @@ -127,6 +128,14 @@ export class WebSocketShard extends AsyncEventEmitter { } public async connect() { + const promise = this.initialConnectResolved ? Promise.resolve() : once(this, WebSocketShardEvents.Ready); + void this.internalConnect(); + + await promise; + this.initialConnectResolved = true; + } + + private async internalConnect() { if (this.#status !== WebSocketShardStatus.Idle) { throw new Error("Tried to connect a shard that wasn't idle"); } @@ -149,7 +158,7 @@ export class WebSocketShard extends AsyncEventEmitter { } } - const session = this.session ?? (await this.strategy.retrieveSessionInfo(this.id)); + const session = await this.strategy.retrieveSessionInfo(this.id); const url = `${session?.resumeURL ?? this.strategy.options.gatewayInformation.url}?${params.toString()}`; this.debug([`Connecting to ${url}`]); @@ -165,21 +174,16 @@ export class WebSocketShard extends AsyncEventEmitter { this.sendRateLimitState = getInitialSendRateLimitState(); - const { ok } = await this.bubbleWaitForEventError( - this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout), - ); + const { ok } = await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout); if (!ok) { return; } if (session?.shardCount === this.strategy.options.shardCount) { - this.session = session; await this.resume(session); } else { await this.identify(); } - - this.initialConnectResolved = true; } public async destroy(options: WebSocketShardDestroyOptions = {}) { @@ -212,9 +216,16 @@ export class WebSocketShard extends AsyncEventEmitter { this.lastHeartbeatAt = -1; + for (const controller of this.timeoutAbortControllers.values()) { + controller.abort(); + } + + this.timeoutAbortControllers.clear(); + + this.failedToConnectDueToNetworkError = false; + // Clear session state if applicable - if (options.recover !== WebSocketShardDestroyRecovery.Resume && this.session) { - this.session = null; + if (options.recover !== WebSocketShardDestroyRecovery.Resume) { await this.strategy.updateSessionInfo(this.id, null); } @@ -248,65 +259,50 @@ export class WebSocketShard extends AsyncEventEmitter { this.#status = WebSocketShardStatus.Idle; if (options.recover !== undefined) { - return this.connect(); + return this.internalConnect(); } } - private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null): Promise { + private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null): Promise<{ ok: boolean }> { this.debug([`Waiting for event ${event} ${timeoutDuration ? `for ${timeoutDuration}ms` : 'indefinitely'}`]); - const controller = new AbortController(); - const timeout = timeoutDuration ? setTimeout(() => controller.abort(), timeoutDuration).unref() : null; - if (timeout) { - this.timeouts.set(event, timeout); - } + const timeoutController = new AbortController(); + const timeout = timeoutDuration ? setTimeout(() => timeoutController.abort(), timeoutDuration).unref() : null; - await once(this, event, { signal: controller.signal }).finally(() => { - if (timeout) { - clearTimeout(timeout); - this.timeouts.delete(event); - } - }); - } + this.timeoutAbortControllers.set(event, timeoutController); + + const closeController = new AbortController(); - /** - * Does special error handling for waitForEvent calls, depending on the current state of the connection lifecycle - * (i.e. whether or not the original connect() call has resolved or if the user has an error listener) - */ - private async bubbleWaitForEventError( - promise: Promise, - ): Promise<{ error: unknown; ok: false } | { ok: true }> { try { - await promise; - return { ok: true }; - } catch (error) { - const isAbortError = error instanceof Error && error.name === 'AbortError'; + // If the first promise resolves, all is well. If the 2nd promise resolves, + // the shard has meanwhile closed. In that case, a destroy is already ongoing, so we just need to + // return false. Meanwhile, if something rejects (error event) or the first controller is aborted, + // we enter the catch block and trigger a destroy there. + const closed = await Promise.race([ + once(this, event, { signal: timeoutController.signal }).then(() => false), + once(this, WebSocketShardEvents.Closed, { signal: closeController.signal }).then(() => true), + ]); - // Any error that isn't an abort error would have been caused by us emitting an error event in the first place - // See https://nodejs.org/api/events.html#eventsonceemitter-name-options for `once()` behavior - if (isAbortError) { - this.emit(WebSocketShardEvents.Error, { error: error as Error }); - } - - // As stated previously, any other error would have been caused by us emitting the error event, which looks - // like { error: unknown } - // eslint-disable-next-line no-ex-assign - error = error instanceof Error ? error : (error as { error: unknown }).error; - - // If the user has no handling on their end (error event) simply throw. - // We also want to throw if we're still in the initial `connect()` call, since that's the only time - // the user can catch the error "normally" - if (this.listenerCount(WebSocketShardEvents.Error) === 0 || !this.initialConnectResolved) { - throw error; - } - - // If the error is handled, we can just try to reconnect + return { ok: !closed }; + } catch { + // If we're here because of other reasons, we need to destroy the shard void this.destroy({ code: CloseCodes.Normal, reason: 'Something timed out or went wrong while waiting for an event', recover: WebSocketShardDestroyRecovery.Reconnect, }); - return { ok: false, error }; + return { ok: false }; + } finally { + if (timeout) { + clearTimeout(timeout); + } + + this.timeoutAbortControllers.delete(event); + + // Clean up the close listener to not leak memory + if (!closeController.signal.aborted) { + closeController.abort(); + } } } @@ -393,13 +389,17 @@ export class WebSocketShard extends AsyncEventEmitter { d, }); - await this.bubbleWaitForEventError( - this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout), - ); + await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout); } private async resume(session: SessionInfo) { - this.debug(['Resuming session']); + this.debug([ + 'Resuming session', + `resume url: ${session.resumeURL}`, + `sequence: ${session.sequence}`, + `shard id: ${this.id.toString()}`, + ]); + this.#status = WebSocketShardStatus.Resuming; this.replayedEvents = 0; return this.send({ @@ -417,9 +417,11 @@ export class WebSocketShard extends AsyncEventEmitter { return this.destroy({ reason: 'Zombie connection', recover: WebSocketShardDestroyRecovery.Resume }); } + const session = await this.strategy.retrieveSessionInfo(this.id); + await this.send({ op: GatewayOpcodes.Heartbeat, - d: this.session?.sequence ?? null, + d: session?.sequence ?? null, }); this.lastHeartbeatAt = Date.now(); @@ -506,7 +508,7 @@ export class WebSocketShard extends AsyncEventEmitter { case GatewayDispatchEvents.Ready: { this.#status = WebSocketShardStatus.Ready; - this.session ??= { + const session = { sequence: payload.s, sessionId: payload.d.session_id, shardId: this.id, @@ -514,7 +516,7 @@ export class WebSocketShard extends AsyncEventEmitter { resumeURL: payload.d.resume_gateway_url, }; - await this.strategy.updateSessionInfo(this.id, this.session); + await this.strategy.updateSessionInfo(this.id, session); this.emit(WebSocketShardEvents.Ready, { data: payload.d }); break; @@ -532,9 +534,15 @@ export class WebSocketShard extends AsyncEventEmitter { } } - if (this.session && payload.s > this.session.sequence) { - this.session.sequence = payload.s; - await this.strategy.updateSessionInfo(this.id, this.session); + const session = await this.strategy.retrieveSessionInfo(this.id); + if (session) { + if (payload.s > session.sequence) { + await this.strategy.updateSessionInfo(this.id, { ...session, sequence: payload.s }); + } + } else { + this.debug([ + `Received a ${payload.t} event but no session is available. Session information cannot be re-constructed in this state without a full reconnect`, + ]); } this.emit(WebSocketShardEvents.Dispatch, { data: payload }); @@ -556,10 +564,8 @@ export class WebSocketShard extends AsyncEventEmitter { } case GatewayOpcodes.InvalidSession: { - const readyTimeout = this.timeouts.get(WebSocketShardEvents.Ready); - readyTimeout?.refresh(); this.debug([`Invalid session; will attempt to resume: ${payload.d.toString()}`]); - const session = this.session ?? (await this.strategy.retrieveSessionInfo(this.id)); + const session = await this.strategy.retrieveSessionInfo(this.id); if (payload.d && session) { await this.resume(session); } else { @@ -612,6 +618,12 @@ export class WebSocketShard extends AsyncEventEmitter { } private onError(error: Error) { + if ('code' in error && ['ECONNRESET', 'ECONNREFUSED'].includes(error.code as string)) { + this.debug(['Failed to connect to the gateway URL specified due to a network error']); + this.failedToConnectDueToNetworkError = true; + return; + } + this.emit(WebSocketShardEvents.Error, { error }); } @@ -696,8 +708,17 @@ export class WebSocketShard extends AsyncEventEmitter { } default: { - this.debug([`The gateway closed with an unexpected code ${code}, attempting to resume.`]); - return this.destroy({ code, recover: WebSocketShardDestroyRecovery.Resume }); + this.debug([ + `The gateway closed with an unexpected code ${code}, attempting to ${ + this.failedToConnectDueToNetworkError ? 'reconnect' : 'resume' + }.`, + ]); + return this.destroy({ + code, + recover: this.failedToConnectDueToNetworkError + ? WebSocketShardDestroyRecovery.Reconnect + : WebSocketShardDestroyRecovery.Resume, + }); } } }