diff --git a/packages/ws/src/utils/constants.ts b/packages/ws/src/utils/constants.ts index a99f817f9..c901c23b2 100644 --- a/packages/ws/src/utils/constants.ts +++ b/packages/ws/src/utils/constants.ts @@ -71,7 +71,7 @@ export const ImportantGatewayOpcodes = new Set([ export function getInitialSendRateLimitState(): SendRateLimitState { return { - remaining: 120, + sent: 0, resetAt: Date.now() + 60_000, }; } diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index c1e14cdbd..5af2d2597 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -75,8 +75,8 @@ export enum CloseCodes { } export interface SendRateLimitState { - remaining: number; resetAt: number; + sent: number; } const WebSocketConstructor: typeof WebSocket = shouldUseGlobalFetchAndWebSocket() @@ -203,12 +203,14 @@ export class WebSocketShard extends AsyncEventEmitter { void this.onClose(event.code); }; + connection.onopen = () => { + this.sendRateLimitState = getInitialSendRateLimitState(); + }; + this.connection = connection; this.#status = WebSocketShardStatus.Connecting; - this.sendRateLimitState = getInitialSendRateLimitState(); - const { ok } = await this.waitForEvent(WebSocketShardEvents.Hello, this.strategy.options.helloTimeout); if (!ok) { return; @@ -357,6 +359,15 @@ export class WebSocketShard extends AsyncEventEmitter { throw new Error("WebSocketShard wasn't connected"); } + // Generally, the way we treat payloads is 115/60 seconds. The actual limit is 120/60, so we have a bit of leeway. + // We use that leeway for those special payloads that we just fire with no checking, since there's no shot we ever + // send more than 5 of those in a 60 second interval. This way we can avoid more complex queueing logic. + + if (ImportantGatewayOpcodes.has(payload.op)) { + this.connection.send(JSON.stringify(payload)); + return; + } + if (this.#status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) { this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']); // This will throw if the shard throws an error event in the meantime, just requeue the payload @@ -369,34 +380,36 @@ export class WebSocketShard extends AsyncEventEmitter { await this.sendQueue.wait(); - if (--this.sendRateLimitState.remaining <= 0) { - const now = Date.now(); - - if (this.sendRateLimitState.resetAt > now) { - const sleepFor = this.sendRateLimitState.resetAt - now; - - this.debug([`Was about to hit the send rate limit, sleeping for ${sleepFor}ms`]); - const controller = new AbortController(); - - // Sleep for the remaining time, but if the connection closes in the meantime, we shouldn't wait the remainder to avoid blocking the new conn - const interrupted = await Promise.race([ - sleep(sleepFor).then(() => false), - once(this, WebSocketShardEvents.Closed, { signal: controller.signal }).then(() => true), - ]); - - if (interrupted) { - this.debug(['Connection closed while waiting for the send rate limit to reset, re-queueing payload']); - this.sendQueue.shift(); - return this.send(payload); - } - - // This is so the listener from the `once` call is removed - controller.abort(); - } - + const now = Date.now(); + if (now >= this.sendRateLimitState.resetAt) { this.sendRateLimitState = getInitialSendRateLimitState(); } + if (this.sendRateLimitState.sent + 1 >= 115) { + // Sprinkle in a little randomness just in case. + const sleepFor = this.sendRateLimitState.resetAt - now + Math.random() * 1_500; + + this.debug([`Was about to hit the send rate limit, sleeping for ${sleepFor}ms`]); + const controller = new AbortController(); + + // Sleep for the remaining time, but if the connection closes in the meantime, we shouldn't wait the remainder to avoid blocking the new conn + const interrupted = await Promise.race([ + sleep(sleepFor).then(() => false), + once(this, WebSocketShardEvents.Closed, { signal: controller.signal }).then(() => true), + ]); + + if (interrupted) { + this.debug(['Connection closed while waiting for the send rate limit to reset, re-queueing payload']); + this.sendQueue.shift(); + return this.send(payload); + } + + // This is so the listener from the `once` call is removed + controller.abort(); + } + + this.sendRateLimitState.sent++; + this.sendQueue.shift(); this.connection.send(JSON.stringify(payload)); }