From 0ff67d8e7adee43ff82bbf072dac9a4c7c9fe8c2 Mon Sep 17 00:00:00 2001 From: DD Date: Fri, 6 Jan 2023 14:00:47 +0200 Subject: [PATCH] feat(ws): metrics (#9005) * feat(WebSocketManager): fetch status * feat(WebSocketShard): heartbeat event * chore: ci Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../strategy/WorkerShardingStrategy.test.ts | 4 ++ .../strategies/sharding/IShardingStrategy.ts | 7 ++- .../sharding/SimpleShardingStrategy.ts | 7 +++ .../sharding/WorkerShardingStrategy.ts | 45 ++++++++++++++++--- packages/ws/src/strategies/sharding/worker.ts | 16 +++++++ packages/ws/src/ws/WebSocketManager.ts | 6 ++- packages/ws/src/ws/WebSocketShard.ts | 39 ++++++++++------ 7 files changed, 104 insertions(+), 20 deletions(-) diff --git a/packages/ws/__tests__/strategy/WorkerShardingStrategy.test.ts b/packages/ws/__tests__/strategy/WorkerShardingStrategy.test.ts index 141dfa4b4..18809afe7 100644 --- a/packages/ws/__tests__/strategy/WorkerShardingStrategy.test.ts +++ b/packages/ws/__tests__/strategy/WorkerShardingStrategy.test.ts @@ -114,6 +114,10 @@ vi.mock('node:worker_threads', async () => { case WorkerSendPayloadOp.ShardCanIdentify: { break; } + + case WorkerSendPayloadOp.FetchStatus: { + break; + } } } diff --git a/packages/ws/src/strategies/sharding/IShardingStrategy.ts b/packages/ws/src/strategies/sharding/IShardingStrategy.ts index 085981ef7..ec7ce133e 100644 --- a/packages/ws/src/strategies/sharding/IShardingStrategy.ts +++ b/packages/ws/src/strategies/sharding/IShardingStrategy.ts @@ -1,6 +1,7 @@ +import type { Collection } from '@discordjs/collection'; import type { Awaitable } from '@discordjs/util'; import type { GatewaySendPayload } from 'discord-api-types/v10'; -import type { WebSocketShardDestroyOptions } from '../../ws/WebSocketShard'; +import type { WebSocketShardDestroyOptions, WebSocketShardStatus } from '../../ws/WebSocketShard'; /** * Strategies responsible for spawning, initializing connections, destroying shards, and relaying events @@ -14,6 +15,10 @@ export interface IShardingStrategy { * Destroys all the shards */ destroy(options?: Omit): Awaitable; + /** + * Fetches the status of all the shards + */ + fetchStatus(): Awaitable>; /** * Sends a payload to a shard */ diff --git a/packages/ws/src/strategies/sharding/SimpleShardingStrategy.ts b/packages/ws/src/strategies/sharding/SimpleShardingStrategy.ts index d2592af69..2480f25df 100644 --- a/packages/ws/src/strategies/sharding/SimpleShardingStrategy.ts +++ b/packages/ws/src/strategies/sharding/SimpleShardingStrategy.ts @@ -70,4 +70,11 @@ export class SimpleShardingStrategy implements IShardingStrategy { if (!shard) throw new Error(`Shard ${shardId} not found`); return shard.send(payload); } + + /** + * {@inheritDoc IShardingStrategy.fetchStatus} + */ + public async fetchStatus() { + return this.shards.mapValues((shard) => shard.status); + } } diff --git a/packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts b/packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts index c2f0df500..aade79ece 100644 --- a/packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts +++ b/packages/ws/src/strategies/sharding/WorkerShardingStrategy.ts @@ -5,7 +5,7 @@ import { Collection } from '@discordjs/collection'; import type { GatewaySendPayload } from 'discord-api-types/v10'; import { IdentifyThrottler } from '../../utils/IdentifyThrottler.js'; import type { SessionInfo, WebSocketManager } from '../../ws/WebSocketManager'; -import type { WebSocketShardDestroyOptions, WebSocketShardEvents } from '../../ws/WebSocketShard'; +import type { WebSocketShardDestroyOptions, WebSocketShardEvents, WebSocketShardStatus } from '../../ws/WebSocketShard'; import { managerToFetchingStrategyOptions, type FetchingStrategyOptions } from '../context/IContextFetchingStrategy.js'; import type { IShardingStrategy } from './IShardingStrategy.js'; @@ -19,9 +19,11 @@ export enum WorkerSendPayloadOp { Send, SessionInfoResponse, ShardCanIdentify, + FetchStatus, } export type WorkerSendPayload = + | { nonce: number; op: WorkerSendPayloadOp.FetchStatus; shardId: number } | { nonce: number; op: WorkerSendPayloadOp.SessionInfoResponse; session: SessionInfo | null } | { nonce: number; op: WorkerSendPayloadOp.ShardCanIdentify } | { op: WorkerSendPayloadOp.Connect; shardId: number } @@ -35,11 +37,13 @@ export enum WorkerRecievePayloadOp { RetrieveSessionInfo, UpdateSessionInfo, WaitForIdentify, + FetchStatusResponse, } export type WorkerRecievePayload = // Can't seem to get a type-safe union based off of the event, so I'm sadly leaving data as any for now | { data: any; event: WebSocketShardEvents; op: WorkerRecievePayloadOp.Event; shardId: number } + | { nonce: number; op: WorkerRecievePayloadOp.FetchStatusResponse; status: WebSocketShardStatus } | { nonce: number; op: WorkerRecievePayloadOp.RetrieveSessionInfo; shardId: number } | { nonce: number; op: WorkerRecievePayloadOp.WaitForIdentify } | { op: WorkerRecievePayloadOp.Connected; shardId: number } @@ -72,6 +76,8 @@ export class WorkerShardingStrategy implements IShardingStrategy { private readonly destroyPromises = new Collection void>(); + private readonly fetchStatusPromises = new Collection void>(); + private readonly throttler: IdentifyThrottler; public constructor(manager: WebSocketManager, options: WorkerShardingStrategyOptions) { @@ -179,18 +185,41 @@ export class WorkerShardingStrategy implements IShardingStrategy { worker.postMessage(payload); } + /** + * {@inheritDoc IShardingStrategy.fetchStatus} + */ + public async fetchStatus() { + const statuses = new Collection(); + + for (const [shardId, worker] of this.#workerByShardId.entries()) { + const nonce = Math.random(); + const payload = { + op: WorkerSendPayloadOp.FetchStatus, + shardId, + nonce, + } satisfies WorkerSendPayload; + + // eslint-disable-next-line no-promise-executor-return + const promise = new Promise((resolve) => this.fetchStatusPromises.set(nonce, resolve)); + worker.postMessage(payload); + + const status = await promise; + statuses.set(shardId, status); + } + + return statuses; + } + private async onMessage(worker: Worker, payload: WorkerRecievePayload) { switch (payload.op) { case WorkerRecievePayloadOp.Connected: { - const resolve = this.connectPromises.get(payload.shardId)!; - resolve(); + this.connectPromises.get(payload.shardId)?.(); this.connectPromises.delete(payload.shardId); break; } case WorkerRecievePayloadOp.Destroyed: { - const resolve = this.destroyPromises.get(payload.shardId)!; - resolve(); + this.destroyPromises.get(payload.shardId)?.(); this.destroyPromises.delete(payload.shardId); break; } @@ -225,6 +254,12 @@ export class WorkerShardingStrategy implements IShardingStrategy { worker.postMessage(response); break; } + + case WorkerRecievePayloadOp.FetchStatusResponse: { + this.fetchStatusPromises.get(payload.nonce)?.(payload.status); + this.fetchStatusPromises.delete(payload.nonce); + break; + } } } } diff --git a/packages/ws/src/strategies/sharding/worker.ts b/packages/ws/src/strategies/sharding/worker.ts index 4bc3347b5..19345343c 100644 --- a/packages/ws/src/strategies/sharding/worker.ts +++ b/packages/ws/src/strategies/sharding/worker.ts @@ -97,5 +97,21 @@ parentPort! case WorkerSendPayloadOp.ShardCanIdentify: { break; } + + case WorkerSendPayloadOp.FetchStatus: { + const shard = shards.get(payload.shardId); + if (!shard) { + throw new Error(`Shard ${payload.shardId} does not exist`); + } + + const response = { + op: WorkerRecievePayloadOp.FetchStatusResponse, + status: shard.status, + nonce: payload.nonce, + } satisfies WorkerRecievePayload; + + parentPort!.postMessage(response); + break; + } } }); diff --git a/packages/ws/src/ws/WebSocketManager.ts b/packages/ws/src/ws/WebSocketManager.ts index 8b32cb57f..5321535c4 100644 --- a/packages/ws/src/ws/WebSocketManager.ts +++ b/packages/ws/src/ws/WebSocketManager.ts @@ -190,7 +190,7 @@ export class WebSocketManager extends AsyncEventEmitter { /** * Strategy used to manage shards * - * @defaultValue `SimpleManagerToShardStrategy` + * @defaultValue `SimpleShardingStrategy` */ private strategy: IShardingStrategy = new SimpleShardingStrategy(this); @@ -300,4 +300,8 @@ export class WebSocketManager extends AsyncEventEmitter { public send(shardId: number, payload: GatewaySendPayload) { return this.strategy.send(shardId, payload); } + + public fetchStatus() { + return this.strategy.fetchStatus(); + } } diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index 21f072fff..023d6182d 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -33,6 +33,7 @@ export enum WebSocketShardEvents { Closed = 'closed', Debug = 'debug', Dispatch = 'dispatch', + HeartbeatComplete = 'heartbeat', Hello = 'hello', Ready = 'ready', Resumed = 'resumed', @@ -54,10 +55,11 @@ export enum WebSocketShardDestroyRecovery { export type WebSocketShardEventsMap = { [WebSocketShardEvents.Closed]: [{ code: number }]; [WebSocketShardEvents.Debug]: [payload: { message: string }]; + [WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }]; [WebSocketShardEvents.Hello]: []; [WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }]; [WebSocketShardEvents.Resumed]: []; - [WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }]; + [WebSocketShardEvents.HeartbeatComplete]: [payload: { ackAt: number; heartbeatAt: number; latency: number }]; }; export interface WebSocketShardDestroyOptions { @@ -87,8 +89,6 @@ export class WebSocketShard extends AsyncEventEmitter { private readonly textDecoder = new TextDecoder(); - private status: WebSocketShardStatus = WebSocketShardStatus.Idle; - private replayedEvents = 0; private isAck = true; @@ -107,6 +107,12 @@ export class WebSocketShard extends AsyncEventEmitter { public readonly strategy: IContextFetchingStrategy; + #status: WebSocketShardStatus = WebSocketShardStatus.Idle; + + public get status(): WebSocketShardStatus { + return this.#status; + } + public constructor(strategy: IContextFetchingStrategy, id: number) { super(); this.strategy = strategy; @@ -114,7 +120,7 @@ export class WebSocketShard extends AsyncEventEmitter { } public async connect() { - if (this.status !== WebSocketShardStatus.Idle) { + if (this.#status !== WebSocketShardStatus.Idle) { throw new Error("Tried to connect a shard that wasn't idle"); } @@ -148,7 +154,7 @@ export class WebSocketShard extends AsyncEventEmitter { connection.binaryType = 'arraybuffer'; this.connection = connection; - this.status = WebSocketShardStatus.Connecting; + this.#status = WebSocketShardStatus.Connecting; this.sendRateLimitState = getInitialSendRateLimitState(); @@ -163,7 +169,7 @@ export class WebSocketShard extends AsyncEventEmitter { } public async destroy(options: WebSocketShardDestroyOptions = {}) { - if (this.status === WebSocketShardStatus.Idle) { + if (this.#status === WebSocketShardStatus.Idle) { this.debug(['Tried to destroy a shard that was idle']); return; } @@ -221,7 +227,7 @@ export class WebSocketShard extends AsyncEventEmitter { this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']); } - this.status = WebSocketShardStatus.Idle; + this.#status = WebSocketShardStatus.Idle; if (options.recover !== undefined) { return this.connect(); @@ -248,7 +254,7 @@ export class WebSocketShard extends AsyncEventEmitter { throw new Error("WebSocketShard wasn't connected"); } - if (this.status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) { + if (this.#status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) { this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']); await once(this, WebSocketShardEvents.Ready); } @@ -320,12 +326,12 @@ export class WebSocketShard extends AsyncEventEmitter { }); await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout); - this.status = WebSocketShardStatus.Ready; + this.#status = WebSocketShardStatus.Ready; } private async resume(session: SessionInfo) { this.debug(['Resuming session']); - this.status = WebSocketShardStatus.Resuming; + this.#status = WebSocketShardStatus.Resuming; this.replayedEvents = 0; return this.send({ op: GatewayOpcodes.Resume, @@ -420,7 +426,7 @@ export class WebSocketShard extends AsyncEventEmitter { switch (payload.op) { case GatewayOpcodes.Dispatch: { - if (this.status === WebSocketShardStatus.Resuming) { + if (this.#status === WebSocketShardStatus.Resuming) { this.replayedEvents++; } @@ -442,7 +448,7 @@ export class WebSocketShard extends AsyncEventEmitter { } case GatewayDispatchEvents.Resumed: { - this.status = WebSocketShardStatus.Ready; + this.#status = WebSocketShardStatus.Ready; this.debug([`Resumed and replayed ${this.replayedEvents} events`]); this.emit(WebSocketShardEvents.Resumed); break; @@ -502,7 +508,14 @@ export class WebSocketShard extends AsyncEventEmitter { case GatewayOpcodes.HeartbeatAck: { this.isAck = true; - this.debug([`Got heartbeat ack after ${Date.now() - this.lastHeartbeatAt}ms`]); + + const ackAt = Date.now(); + this.emit(WebSocketShardEvents.HeartbeatComplete, { + ackAt, + heartbeatAt: this.lastHeartbeatAt, + latency: ackAt - this.lastHeartbeatAt, + }); + break; } }