diff --git a/packages/brokers/package.json b/packages/brokers/package.json index 8132b8baa..e61d382a2 100644 --- a/packages/brokers/package.json +++ b/packages/brokers/package.json @@ -58,7 +58,7 @@ "homepage": "https://discord.js.org", "dependencies": { "@msgpack/msgpack": "^2.8.0", - "@vladfrangu/async_event_emitter": "^2.1.3", + "@vladfrangu/async_event_emitter": "^2.1.4", "ioredis": "^5.2.4" }, "devDependencies": { diff --git a/packages/core/package.json b/packages/core/package.json index 519b44f73..732809faf 100644 --- a/packages/core/package.json +++ b/packages/core/package.json @@ -50,7 +50,7 @@ "@discordjs/util": "workspace:^", "@discordjs/ws": "workspace:^", "@sapphire/snowflake": "^3.4.0", - "@vladfrangu/async_event_emitter": "^2.1.3", + "@vladfrangu/async_event_emitter": "^2.1.4", "discord-api-types": "^0.37.35" }, "devDependencies": { diff --git a/packages/ws/package.json b/packages/ws/package.json index 3a09d2790..d4709a99b 100644 --- a/packages/ws/package.json +++ b/packages/ws/package.json @@ -64,7 +64,7 @@ "@discordjs/util": "workspace:^", "@sapphire/async-queue": "^1.5.0", "@types/ws": "^8.5.4", - "@vladfrangu/async_event_emitter": "^2.1.3", + "@vladfrangu/async_event_emitter": "^2.1.4", "discord-api-types": "^0.37.35", "tslib": "^2.4.1", "ws": "^8.12.0" diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index 3861888f0..e868ad5ca 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -33,6 +33,7 @@ export enum WebSocketShardEvents { Closed = 'closed', Debug = 'debug', Dispatch = 'dispatch', + Error = 'error', HeartbeatComplete = 'heartbeat', Hello = 'hello', Ready = 'ready', @@ -56,6 +57,7 @@ export type WebSocketShardEventsMap = { [WebSocketShardEvents.Closed]: [{ code: number }]; [WebSocketShardEvents.Debug]: [payload: { message: string }]; [WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }]; + [WebSocketShardEvents.Error]: [payload: { error: Error }]; [WebSocketShardEvents.Hello]: []; [WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }]; [WebSocketShardEvents.Resumed]: []; @@ -99,6 +101,9 @@ export class WebSocketShard extends AsyncEventEmitter { private session: SessionInfo | null = null; + // Indicates whether the shard has already resolved its original connect() call + private initialConnectResolved = false; + private readonly sendQueue = new AsyncQueue(); private readonly timeouts = new Collection(); @@ -158,7 +163,12 @@ export class WebSocketShard extends AsyncEventEmitter { this.sendRateLimitState = getInitialSendRateLimitState(); - await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout); + const { ok } = await this.bubbleWaitForEventError( + this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout), + ); + if (!ok) { + return; + } if (session?.shardCount === this.strategy.options.shardCount) { this.session = session; @@ -166,6 +176,8 @@ export class WebSocketShard extends AsyncEventEmitter { } else { await this.identify(); } + + this.initialConnectResolved = true; } public async destroy(options: WebSocketShardDestroyOptions = {}) { @@ -234,18 +246,59 @@ export class WebSocketShard extends AsyncEventEmitter { } } - private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null) { - this.debug([`Waiting for event ${event} for ${timeoutDuration ? `${timeoutDuration}ms` : 'indefinitely'}`]); + private async waitForEvent(event: WebSocketShardEvents, timeoutDuration?: number | null): Promise { + this.debug([`Waiting for event ${event} ${timeoutDuration ? `for ${timeoutDuration}ms` : 'indefinitely'}`]); const controller = new AbortController(); const timeout = timeoutDuration ? setTimeout(() => controller.abort(), timeoutDuration).unref() : null; if (timeout) { this.timeouts.set(event, timeout); } - await once(this, event, { signal: controller.signal }); - if (timeout) { - clearTimeout(timeout); - this.timeouts.delete(event); + await once(this, event, { signal: controller.signal }).finally(() => { + if (timeout) { + clearTimeout(timeout); + this.timeouts.delete(event); + } + }); + } + + /** + * Does special error handling for waitForEvent calls, depending on the current state of the connection lifecycle + * (i.e. whether or not the original connect() call has resolved or if the user has an error listener) + */ + private async bubbleWaitForEventError( + promise: Promise, + ): Promise<{ error: unknown; ok: false } | { ok: true }> { + try { + await promise; + return { ok: true }; + } catch (error) { + // Any error that isn't an abort error would have been caused by us emitting an error event in the first place + // See https://nodejs.org/api/events.html#eventsonceemitter-name-options for `once()` behavior + if (error instanceof Error && error.name === 'AbortError') { + this.emit(WebSocketShardEvents.Error, { error }); + } + + // As stated previously, any other error would have been caused by us emitting the error event, which looks + // like { error: unknown } + // eslint-disable-next-line no-ex-assign + error = (error as { error: unknown }).error; + + // If the user has no handling on their end (error event) simply throw. + // We also want to throw if we're still in the initial `connect()` call, since that's the only time + // the user can catch the error "normally" + if (this.listenerCount(WebSocketShardEvents.Error) === 0 || !this.initialConnectResolved) { + throw error; + } + + // If the error is handled, we can just try to reconnect + await this.destroy({ + code: CloseCodes.Normal, + reason: 'Something timed out', + recover: WebSocketShardDestroyRecovery.Reconnect, + }); + + return { ok: false, error }; } } @@ -256,7 +309,12 @@ export class WebSocketShard extends AsyncEventEmitter { if (this.#status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) { this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']); - await once(this, WebSocketShardEvents.Ready); + // This will throw if the shard throws an error event in the meantime, just requeue the payload + try { + await once(this, WebSocketShardEvents.Ready); + } catch { + return this.send(payload); + } } await this.sendQueue.wait(); @@ -325,7 +383,13 @@ export class WebSocketShard extends AsyncEventEmitter { d, }); - await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout); + const { ok } = await this.bubbleWaitForEventError( + this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout), + ); + if (!ok) { + return; + } + this.#status = WebSocketShardStatus.Ready; } @@ -393,7 +457,9 @@ export class WebSocketShard extends AsyncEventEmitter { this.inflate.push(Buffer.from(decompressable), flush ? zlib.Z_SYNC_FLUSH : zlib.Z_NO_FLUSH); if (this.inflate.err) { - this.emit('error', `${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`); + this.emit(WebSocketShardEvents.Error, { + error: new Error(`${this.inflate.err}${this.inflate.msg ? `: ${this.inflate.msg}` : ''}`), + }); } if (!flush) { @@ -521,8 +587,8 @@ export class WebSocketShard extends AsyncEventEmitter { } } - private onError(err: Error) { - this.emit('error', err); + private onError(error: Error) { + this.emit(WebSocketShardEvents.Error, { error }); } private async onClose(code: number) { diff --git a/yarn.lock b/yarn.lock index 243683d7b..856a4f169 100644 --- a/yarn.lock +++ b/yarn.lock @@ -2039,7 +2039,7 @@ __metadata: "@msgpack/msgpack": ^2.8.0 "@types/node": 16.18.11 "@vitest/coverage-c8": ^0.27.1 - "@vladfrangu/async_event_emitter": ^2.1.3 + "@vladfrangu/async_event_emitter": ^2.1.4 cross-env: ^7.0.3 eslint: ^8.31.0 eslint-config-neon: ^0.1.40 @@ -2112,7 +2112,7 @@ __metadata: "@sapphire/snowflake": ^3.4.0 "@types/node": 16.18.11 "@vitest/coverage-c8": ^0.27.1 - "@vladfrangu/async_event_emitter": ^2.1.3 + "@vladfrangu/async_event_emitter": ^2.1.4 cross-env: ^7.0.3 discord-api-types: ^0.37.35 eslint: ^8.31.0 @@ -2516,7 +2516,7 @@ __metadata: "@types/node": 16.18.11 "@types/ws": ^8.5.4 "@vitest/coverage-c8": ^0.27.1 - "@vladfrangu/async_event_emitter": ^2.1.3 + "@vladfrangu/async_event_emitter": ^2.1.4 cross-env: ^7.0.3 discord-api-types: ^0.37.35 esbuild-plugin-version-injector: ^1.0.2 @@ -5346,10 +5346,10 @@ __metadata: languageName: node linkType: hard -"@vladfrangu/async_event_emitter@npm:^2.1.3": - version: 2.1.3 - resolution: "@vladfrangu/async_event_emitter@npm:2.1.3" - checksum: 1541b281550b39446f86ea9d4622be0d74c4d3924b42550db11164b409a82010f396b588a87ffe27f72a96a7f92af0190f4c3b57861249a4038515e0d474b3c6 +"@vladfrangu/async_event_emitter@npm:^2.1.4": + version: 2.1.4 + resolution: "@vladfrangu/async_event_emitter@npm:2.1.4" + checksum: 604d228a4fa46c0686d4377c2ca63035aa266382133f351f098d85782df4e451ebba2c528a7d54aa955c7fdb824a642a7ec63d5a85cf46f6cbaea46ea56a0959 languageName: node linkType: hard