From 40b504a2088effc6a467f40ac3cf2a6d736ab209 Mon Sep 17 00:00:00 2001 From: DD Date: Thu, 1 Dec 2022 12:58:00 +0200 Subject: [PATCH] fix(WebSocketShard): send ratelimit handling (#8887) * fix(WebSocketShard): send ratelimit handling * chore: remove unnecessary else Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- packages/ws/src/utils/constants.ts | 8 +++ packages/ws/src/ws/WebSocketShard.ts | 76 ++++++++++++++++++++-------- 2 files changed, 63 insertions(+), 21 deletions(-) diff --git a/packages/ws/src/utils/constants.ts b/packages/ws/src/utils/constants.ts index 72d20ec65..6dc72c490 100644 --- a/packages/ws/src/utils/constants.ts +++ b/packages/ws/src/utils/constants.ts @@ -3,6 +3,7 @@ import { Collection } from '@discordjs/collection'; import { lazy } from '@discordjs/util'; import { APIVersion, GatewayOpcodes } from 'discord-api-types/v10'; import type { OptionalWebSocketManagerOptions, SessionInfo } from '../ws/WebSocketManager.js'; +import type { SendRateLimitState } from '../ws/WebSocketShard.js'; /** * Valid encoding types @@ -60,3 +61,10 @@ export const ImportantGatewayOpcodes = new Set([ GatewayOpcodes.Identify, GatewayOpcodes.Resume, ]); + +export function getInitialSendRateLimitState(): SendRateLimitState { + return { + remaining: 120, + resetAt: Date.now() + 60_000, + }; +} diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index e78ec14e1..04624381f 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -23,13 +23,14 @@ import { import { WebSocket, type RawData } from 'ws'; import type { Inflate } from 'zlib-sync'; import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy'; -import { ImportantGatewayOpcodes } from '../utils/constants.js'; +import { getInitialSendRateLimitState, ImportantGatewayOpcodes } from '../utils/constants.js'; import type { SessionInfo } from './WebSocketManager.js'; // eslint-disable-next-line promise/prefer-await-to-then const getZlibSync = lazy(async () => import('zlib-sync').then((mod) => mod.default).catch(() => null)); export enum WebSocketShardEvents { + Closed = 'closed', Debug = 'debug', Dispatch = 'dispatch', Hello = 'hello', @@ -51,6 +52,7 @@ export enum WebSocketShardDestroyRecovery { // eslint-disable-next-line @typescript-eslint/consistent-type-definitions export type WebSocketShardEventsMap = { + [WebSocketShardEvents.Closed]: [{ code: number }]; [WebSocketShardEvents.Debug]: [payload: { message: string }]; [WebSocketShardEvents.Hello]: []; [WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }]; @@ -69,6 +71,11 @@ export enum CloseCodes { Resuming = 4_200, } +export interface SendRateLimitState { + remaining: number; + resetAt: number; +} + export class WebSocketShard extends AsyncEventEmitter { private connection: WebSocket | null = null; @@ -86,10 +93,7 @@ export class WebSocketShard extends AsyncEventEmitter { private isAck = true; - private sendRateLimitState = { - remaining: 120, - resetAt: Date.now(), - }; + private sendRateLimitState: SendRateLimitState = getInitialSendRateLimitState(); private heartbeatInterval: NodeJS.Timer | null = null; @@ -146,6 +150,8 @@ export class WebSocketShard extends AsyncEventEmitter { this.status = WebSocketShardStatus.Connecting; + this.sendRateLimitState = getInitialSendRateLimitState(); + await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout); if (session?.shardCount === this.strategy.options.shardCount) { @@ -187,22 +193,32 @@ export class WebSocketShard extends AsyncEventEmitter { await this.strategy.updateSessionInfo(this.id, null); } - if ( - this.connection && - (this.connection.readyState === WebSocket.OPEN || this.connection.readyState === WebSocket.CONNECTING) - ) { + if (this.connection) { // No longer need to listen to messages this.connection.removeAllListeners('message'); // Prevent a reconnection loop by unbinding the main close event this.connection.removeAllListeners('close'); - this.connection.close(options.code, options.reason); - // Actually wait for the connection to close - await once(this.connection, 'close'); + const shouldClose = + this.connection.readyState === WebSocket.OPEN || this.connection.readyState === WebSocket.CONNECTING; + + this.debug([ + 'Connection status during destroy', + `Needs closing: ${shouldClose}`, + `Ready state: ${this.connection.readyState}`, + ]); + + if (shouldClose) { + this.connection.close(options.code, options.reason); + await once(this.connection, 'close'); + this.emit(WebSocketShardEvents.Closed, { code: options.code }); + } // Lastly, remove the error event. // Doing this earlier would cause a hard crash in case an error event fired on our `close` call this.connection.removeAllListeners('error'); + } else { + this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']); } this.status = WebSocketShardStatus.Idle; @@ -227,26 +243,44 @@ export class WebSocketShard extends AsyncEventEmitter { } } - public async send(payload: GatewaySendPayload) { + public async send(payload: GatewaySendPayload): Promise { if (!this.connection) { throw new Error("WebSocketShard wasn't connected"); } if (this.status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) { + this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']); await once(this, WebSocketShardEvents.Ready); } await this.sendQueue.wait(); if (--this.sendRateLimitState.remaining <= 0) { - if (this.sendRateLimitState.resetAt < Date.now()) { - await sleep(Date.now() - this.sendRateLimitState.resetAt); + const now = Date.now(); + + if (this.sendRateLimitState.resetAt > now) { + const sleepFor = this.sendRateLimitState.resetAt - now; + + this.debug([`Was about to hit the send rate limit, sleeping for ${sleepFor}ms`]); + const controller = new AbortController(); + + // Sleep for the remaining time, but if the connection closes in the meantime, we shouldn't wait the remainder to avoid blocking the new conn + const interrupted = await Promise.race([ + sleep(sleepFor).then(() => false), + once(this, WebSocketShardEvents.Closed, { signal: controller.signal }).then(() => true), + ]); + + if (interrupted) { + this.debug(['Connection closed while waiting for the send rate limit to reset, re-queueing payload']); + this.sendQueue.shift(); + return this.send(payload); + } + + // This is so the listener from the `once` call is removed + controller.abort(); } - this.sendRateLimitState = { - remaining: 119, - resetAt: Date.now() + 60_000, - }; + this.sendRateLimitState = getInitialSendRateLimitState(); } this.sendQueue.shift(); @@ -476,9 +510,10 @@ export class WebSocketShard extends AsyncEventEmitter { } private async onClose(code: number) { + this.emit(WebSocketShardEvents.Closed, { code }); + switch (code) { case CloseCodes.Normal: { - this.debug([`Disconnected normally from code ${code}`]); return this.destroy({ code, reason: 'Got disconnected by Discord', @@ -487,7 +522,6 @@ export class WebSocketShard extends AsyncEventEmitter { } case CloseCodes.Resuming: { - this.debug([`Disconnected normally from code ${code}`]); break; }