mirror of
https://github.com/discordjs/discord.js.git
synced 2026-03-14 18:43:31 +01:00
feat(ws): metrics (#9005)
* feat(WebSocketManager): fetch status * feat(WebSocketShard): heartbeat event * chore: ci Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
@@ -114,6 +114,10 @@ vi.mock('node:worker_threads', async () => {
|
|||||||
case WorkerSendPayloadOp.ShardCanIdentify: {
|
case WorkerSendPayloadOp.ShardCanIdentify: {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case WorkerSendPayloadOp.FetchStatus: {
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -1,6 +1,7 @@
|
|||||||
|
import type { Collection } from '@discordjs/collection';
|
||||||
import type { Awaitable } from '@discordjs/util';
|
import type { Awaitable } from '@discordjs/util';
|
||||||
import type { GatewaySendPayload } from 'discord-api-types/v10';
|
import type { GatewaySendPayload } from 'discord-api-types/v10';
|
||||||
import type { WebSocketShardDestroyOptions } from '../../ws/WebSocketShard';
|
import type { WebSocketShardDestroyOptions, WebSocketShardStatus } from '../../ws/WebSocketShard';
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Strategies responsible for spawning, initializing connections, destroying shards, and relaying events
|
* Strategies responsible for spawning, initializing connections, destroying shards, and relaying events
|
||||||
@@ -14,6 +15,10 @@ export interface IShardingStrategy {
|
|||||||
* Destroys all the shards
|
* Destroys all the shards
|
||||||
*/
|
*/
|
||||||
destroy(options?: Omit<WebSocketShardDestroyOptions, 'recover'>): Awaitable<void>;
|
destroy(options?: Omit<WebSocketShardDestroyOptions, 'recover'>): Awaitable<void>;
|
||||||
|
/**
|
||||||
|
* Fetches the status of all the shards
|
||||||
|
*/
|
||||||
|
fetchStatus(): Awaitable<Collection<number, WebSocketShardStatus>>;
|
||||||
/**
|
/**
|
||||||
* Sends a payload to a shard
|
* Sends a payload to a shard
|
||||||
*/
|
*/
|
||||||
|
|||||||
@@ -70,4 +70,11 @@ export class SimpleShardingStrategy implements IShardingStrategy {
|
|||||||
if (!shard) throw new Error(`Shard ${shardId} not found`);
|
if (!shard) throw new Error(`Shard ${shardId} not found`);
|
||||||
return shard.send(payload);
|
return shard.send(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc IShardingStrategy.fetchStatus}
|
||||||
|
*/
|
||||||
|
public async fetchStatus() {
|
||||||
|
return this.shards.mapValues((shard) => shard.status);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -5,7 +5,7 @@ import { Collection } from '@discordjs/collection';
|
|||||||
import type { GatewaySendPayload } from 'discord-api-types/v10';
|
import type { GatewaySendPayload } from 'discord-api-types/v10';
|
||||||
import { IdentifyThrottler } from '../../utils/IdentifyThrottler.js';
|
import { IdentifyThrottler } from '../../utils/IdentifyThrottler.js';
|
||||||
import type { SessionInfo, WebSocketManager } from '../../ws/WebSocketManager';
|
import type { SessionInfo, WebSocketManager } from '../../ws/WebSocketManager';
|
||||||
import type { WebSocketShardDestroyOptions, WebSocketShardEvents } from '../../ws/WebSocketShard';
|
import type { WebSocketShardDestroyOptions, WebSocketShardEvents, WebSocketShardStatus } from '../../ws/WebSocketShard';
|
||||||
import { managerToFetchingStrategyOptions, type FetchingStrategyOptions } from '../context/IContextFetchingStrategy.js';
|
import { managerToFetchingStrategyOptions, type FetchingStrategyOptions } from '../context/IContextFetchingStrategy.js';
|
||||||
import type { IShardingStrategy } from './IShardingStrategy.js';
|
import type { IShardingStrategy } from './IShardingStrategy.js';
|
||||||
|
|
||||||
@@ -19,9 +19,11 @@ export enum WorkerSendPayloadOp {
|
|||||||
Send,
|
Send,
|
||||||
SessionInfoResponse,
|
SessionInfoResponse,
|
||||||
ShardCanIdentify,
|
ShardCanIdentify,
|
||||||
|
FetchStatus,
|
||||||
}
|
}
|
||||||
|
|
||||||
export type WorkerSendPayload =
|
export type WorkerSendPayload =
|
||||||
|
| { nonce: number; op: WorkerSendPayloadOp.FetchStatus; shardId: number }
|
||||||
| { nonce: number; op: WorkerSendPayloadOp.SessionInfoResponse; session: SessionInfo | null }
|
| { nonce: number; op: WorkerSendPayloadOp.SessionInfoResponse; session: SessionInfo | null }
|
||||||
| { nonce: number; op: WorkerSendPayloadOp.ShardCanIdentify }
|
| { nonce: number; op: WorkerSendPayloadOp.ShardCanIdentify }
|
||||||
| { op: WorkerSendPayloadOp.Connect; shardId: number }
|
| { op: WorkerSendPayloadOp.Connect; shardId: number }
|
||||||
@@ -35,11 +37,13 @@ export enum WorkerRecievePayloadOp {
|
|||||||
RetrieveSessionInfo,
|
RetrieveSessionInfo,
|
||||||
UpdateSessionInfo,
|
UpdateSessionInfo,
|
||||||
WaitForIdentify,
|
WaitForIdentify,
|
||||||
|
FetchStatusResponse,
|
||||||
}
|
}
|
||||||
|
|
||||||
export type WorkerRecievePayload =
|
export type WorkerRecievePayload =
|
||||||
// Can't seem to get a type-safe union based off of the event, so I'm sadly leaving data as any for now
|
// Can't seem to get a type-safe union based off of the event, so I'm sadly leaving data as any for now
|
||||||
| { data: any; event: WebSocketShardEvents; op: WorkerRecievePayloadOp.Event; shardId: number }
|
| { data: any; event: WebSocketShardEvents; op: WorkerRecievePayloadOp.Event; shardId: number }
|
||||||
|
| { nonce: number; op: WorkerRecievePayloadOp.FetchStatusResponse; status: WebSocketShardStatus }
|
||||||
| { nonce: number; op: WorkerRecievePayloadOp.RetrieveSessionInfo; shardId: number }
|
| { nonce: number; op: WorkerRecievePayloadOp.RetrieveSessionInfo; shardId: number }
|
||||||
| { nonce: number; op: WorkerRecievePayloadOp.WaitForIdentify }
|
| { nonce: number; op: WorkerRecievePayloadOp.WaitForIdentify }
|
||||||
| { op: WorkerRecievePayloadOp.Connected; shardId: number }
|
| { op: WorkerRecievePayloadOp.Connected; shardId: number }
|
||||||
@@ -72,6 +76,8 @@ export class WorkerShardingStrategy implements IShardingStrategy {
|
|||||||
|
|
||||||
private readonly destroyPromises = new Collection<number, () => void>();
|
private readonly destroyPromises = new Collection<number, () => void>();
|
||||||
|
|
||||||
|
private readonly fetchStatusPromises = new Collection<number, (status: WebSocketShardStatus) => void>();
|
||||||
|
|
||||||
private readonly throttler: IdentifyThrottler;
|
private readonly throttler: IdentifyThrottler;
|
||||||
|
|
||||||
public constructor(manager: WebSocketManager, options: WorkerShardingStrategyOptions) {
|
public constructor(manager: WebSocketManager, options: WorkerShardingStrategyOptions) {
|
||||||
@@ -179,18 +185,41 @@ export class WorkerShardingStrategy implements IShardingStrategy {
|
|||||||
worker.postMessage(payload);
|
worker.postMessage(payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* {@inheritDoc IShardingStrategy.fetchStatus}
|
||||||
|
*/
|
||||||
|
public async fetchStatus() {
|
||||||
|
const statuses = new Collection<number, WebSocketShardStatus>();
|
||||||
|
|
||||||
|
for (const [shardId, worker] of this.#workerByShardId.entries()) {
|
||||||
|
const nonce = Math.random();
|
||||||
|
const payload = {
|
||||||
|
op: WorkerSendPayloadOp.FetchStatus,
|
||||||
|
shardId,
|
||||||
|
nonce,
|
||||||
|
} satisfies WorkerSendPayload;
|
||||||
|
|
||||||
|
// eslint-disable-next-line no-promise-executor-return
|
||||||
|
const promise = new Promise<WebSocketShardStatus>((resolve) => this.fetchStatusPromises.set(nonce, resolve));
|
||||||
|
worker.postMessage(payload);
|
||||||
|
|
||||||
|
const status = await promise;
|
||||||
|
statuses.set(shardId, status);
|
||||||
|
}
|
||||||
|
|
||||||
|
return statuses;
|
||||||
|
}
|
||||||
|
|
||||||
private async onMessage(worker: Worker, payload: WorkerRecievePayload) {
|
private async onMessage(worker: Worker, payload: WorkerRecievePayload) {
|
||||||
switch (payload.op) {
|
switch (payload.op) {
|
||||||
case WorkerRecievePayloadOp.Connected: {
|
case WorkerRecievePayloadOp.Connected: {
|
||||||
const resolve = this.connectPromises.get(payload.shardId)!;
|
this.connectPromises.get(payload.shardId)?.();
|
||||||
resolve();
|
|
||||||
this.connectPromises.delete(payload.shardId);
|
this.connectPromises.delete(payload.shardId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
case WorkerRecievePayloadOp.Destroyed: {
|
case WorkerRecievePayloadOp.Destroyed: {
|
||||||
const resolve = this.destroyPromises.get(payload.shardId)!;
|
this.destroyPromises.get(payload.shardId)?.();
|
||||||
resolve();
|
|
||||||
this.destroyPromises.delete(payload.shardId);
|
this.destroyPromises.delete(payload.shardId);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
@@ -225,6 +254,12 @@ export class WorkerShardingStrategy implements IShardingStrategy {
|
|||||||
worker.postMessage(response);
|
worker.postMessage(response);
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case WorkerRecievePayloadOp.FetchStatusResponse: {
|
||||||
|
this.fetchStatusPromises.get(payload.nonce)?.(payload.status);
|
||||||
|
this.fetchStatusPromises.delete(payload.nonce);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -97,5 +97,21 @@ parentPort!
|
|||||||
case WorkerSendPayloadOp.ShardCanIdentify: {
|
case WorkerSendPayloadOp.ShardCanIdentify: {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
case WorkerSendPayloadOp.FetchStatus: {
|
||||||
|
const shard = shards.get(payload.shardId);
|
||||||
|
if (!shard) {
|
||||||
|
throw new Error(`Shard ${payload.shardId} does not exist`);
|
||||||
|
}
|
||||||
|
|
||||||
|
const response = {
|
||||||
|
op: WorkerRecievePayloadOp.FetchStatusResponse,
|
||||||
|
status: shard.status,
|
||||||
|
nonce: payload.nonce,
|
||||||
|
} satisfies WorkerRecievePayload;
|
||||||
|
|
||||||
|
parentPort!.postMessage(response);
|
||||||
|
break;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
|
|||||||
@@ -190,7 +190,7 @@ export class WebSocketManager extends AsyncEventEmitter<ManagerShardEventsMap> {
|
|||||||
/**
|
/**
|
||||||
* Strategy used to manage shards
|
* Strategy used to manage shards
|
||||||
*
|
*
|
||||||
* @defaultValue `SimpleManagerToShardStrategy`
|
* @defaultValue `SimpleShardingStrategy`
|
||||||
*/
|
*/
|
||||||
private strategy: IShardingStrategy = new SimpleShardingStrategy(this);
|
private strategy: IShardingStrategy = new SimpleShardingStrategy(this);
|
||||||
|
|
||||||
@@ -300,4 +300,8 @@ export class WebSocketManager extends AsyncEventEmitter<ManagerShardEventsMap> {
|
|||||||
public send(shardId: number, payload: GatewaySendPayload) {
|
public send(shardId: number, payload: GatewaySendPayload) {
|
||||||
return this.strategy.send(shardId, payload);
|
return this.strategy.send(shardId, payload);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public fetchStatus() {
|
||||||
|
return this.strategy.fetchStatus();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -33,6 +33,7 @@ export enum WebSocketShardEvents {
|
|||||||
Closed = 'closed',
|
Closed = 'closed',
|
||||||
Debug = 'debug',
|
Debug = 'debug',
|
||||||
Dispatch = 'dispatch',
|
Dispatch = 'dispatch',
|
||||||
|
HeartbeatComplete = 'heartbeat',
|
||||||
Hello = 'hello',
|
Hello = 'hello',
|
||||||
Ready = 'ready',
|
Ready = 'ready',
|
||||||
Resumed = 'resumed',
|
Resumed = 'resumed',
|
||||||
@@ -54,10 +55,11 @@ export enum WebSocketShardDestroyRecovery {
|
|||||||
export type WebSocketShardEventsMap = {
|
export type WebSocketShardEventsMap = {
|
||||||
[WebSocketShardEvents.Closed]: [{ code: number }];
|
[WebSocketShardEvents.Closed]: [{ code: number }];
|
||||||
[WebSocketShardEvents.Debug]: [payload: { message: string }];
|
[WebSocketShardEvents.Debug]: [payload: { message: string }];
|
||||||
|
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }];
|
||||||
[WebSocketShardEvents.Hello]: [];
|
[WebSocketShardEvents.Hello]: [];
|
||||||
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
|
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
|
||||||
[WebSocketShardEvents.Resumed]: [];
|
[WebSocketShardEvents.Resumed]: [];
|
||||||
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }];
|
[WebSocketShardEvents.HeartbeatComplete]: [payload: { ackAt: number; heartbeatAt: number; latency: number }];
|
||||||
};
|
};
|
||||||
|
|
||||||
export interface WebSocketShardDestroyOptions {
|
export interface WebSocketShardDestroyOptions {
|
||||||
@@ -87,8 +89,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
|
|
||||||
private readonly textDecoder = new TextDecoder();
|
private readonly textDecoder = new TextDecoder();
|
||||||
|
|
||||||
private status: WebSocketShardStatus = WebSocketShardStatus.Idle;
|
|
||||||
|
|
||||||
private replayedEvents = 0;
|
private replayedEvents = 0;
|
||||||
|
|
||||||
private isAck = true;
|
private isAck = true;
|
||||||
@@ -107,6 +107,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
|
|
||||||
public readonly strategy: IContextFetchingStrategy;
|
public readonly strategy: IContextFetchingStrategy;
|
||||||
|
|
||||||
|
#status: WebSocketShardStatus = WebSocketShardStatus.Idle;
|
||||||
|
|
||||||
|
public get status(): WebSocketShardStatus {
|
||||||
|
return this.#status;
|
||||||
|
}
|
||||||
|
|
||||||
public constructor(strategy: IContextFetchingStrategy, id: number) {
|
public constructor(strategy: IContextFetchingStrategy, id: number) {
|
||||||
super();
|
super();
|
||||||
this.strategy = strategy;
|
this.strategy = strategy;
|
||||||
@@ -114,7 +120,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async connect() {
|
public async connect() {
|
||||||
if (this.status !== WebSocketShardStatus.Idle) {
|
if (this.#status !== WebSocketShardStatus.Idle) {
|
||||||
throw new Error("Tried to connect a shard that wasn't idle");
|
throw new Error("Tried to connect a shard that wasn't idle");
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -148,7 +154,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
connection.binaryType = 'arraybuffer';
|
connection.binaryType = 'arraybuffer';
|
||||||
this.connection = connection;
|
this.connection = connection;
|
||||||
|
|
||||||
this.status = WebSocketShardStatus.Connecting;
|
this.#status = WebSocketShardStatus.Connecting;
|
||||||
|
|
||||||
this.sendRateLimitState = getInitialSendRateLimitState();
|
this.sendRateLimitState = getInitialSendRateLimitState();
|
||||||
|
|
||||||
@@ -163,7 +169,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public async destroy(options: WebSocketShardDestroyOptions = {}) {
|
public async destroy(options: WebSocketShardDestroyOptions = {}) {
|
||||||
if (this.status === WebSocketShardStatus.Idle) {
|
if (this.#status === WebSocketShardStatus.Idle) {
|
||||||
this.debug(['Tried to destroy a shard that was idle']);
|
this.debug(['Tried to destroy a shard that was idle']);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@@ -221,7 +227,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']);
|
this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.status = WebSocketShardStatus.Idle;
|
this.#status = WebSocketShardStatus.Idle;
|
||||||
|
|
||||||
if (options.recover !== undefined) {
|
if (options.recover !== undefined) {
|
||||||
return this.connect();
|
return this.connect();
|
||||||
@@ -248,7 +254,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
throw new Error("WebSocketShard wasn't connected");
|
throw new Error("WebSocketShard wasn't connected");
|
||||||
}
|
}
|
||||||
|
|
||||||
if (this.status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
|
if (this.#status !== WebSocketShardStatus.Ready && !ImportantGatewayOpcodes.has(payload.op)) {
|
||||||
this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']);
|
this.debug(['Tried to send a non-crucial payload before the shard was ready, waiting']);
|
||||||
await once(this, WebSocketShardEvents.Ready);
|
await once(this, WebSocketShardEvents.Ready);
|
||||||
}
|
}
|
||||||
@@ -320,12 +326,12 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
});
|
});
|
||||||
|
|
||||||
await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout);
|
await this.waitForEvent(WebSocketShardEvents.Ready, this.strategy.options.readyTimeout);
|
||||||
this.status = WebSocketShardStatus.Ready;
|
this.#status = WebSocketShardStatus.Ready;
|
||||||
}
|
}
|
||||||
|
|
||||||
private async resume(session: SessionInfo) {
|
private async resume(session: SessionInfo) {
|
||||||
this.debug(['Resuming session']);
|
this.debug(['Resuming session']);
|
||||||
this.status = WebSocketShardStatus.Resuming;
|
this.#status = WebSocketShardStatus.Resuming;
|
||||||
this.replayedEvents = 0;
|
this.replayedEvents = 0;
|
||||||
return this.send({
|
return this.send({
|
||||||
op: GatewayOpcodes.Resume,
|
op: GatewayOpcodes.Resume,
|
||||||
@@ -420,7 +426,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
|
|
||||||
switch (payload.op) {
|
switch (payload.op) {
|
||||||
case GatewayOpcodes.Dispatch: {
|
case GatewayOpcodes.Dispatch: {
|
||||||
if (this.status === WebSocketShardStatus.Resuming) {
|
if (this.#status === WebSocketShardStatus.Resuming) {
|
||||||
this.replayedEvents++;
|
this.replayedEvents++;
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -442,7 +448,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
case GatewayDispatchEvents.Resumed: {
|
case GatewayDispatchEvents.Resumed: {
|
||||||
this.status = WebSocketShardStatus.Ready;
|
this.#status = WebSocketShardStatus.Ready;
|
||||||
this.debug([`Resumed and replayed ${this.replayedEvents} events`]);
|
this.debug([`Resumed and replayed ${this.replayedEvents} events`]);
|
||||||
this.emit(WebSocketShardEvents.Resumed);
|
this.emit(WebSocketShardEvents.Resumed);
|
||||||
break;
|
break;
|
||||||
@@ -502,7 +508,14 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
|
|||||||
|
|
||||||
case GatewayOpcodes.HeartbeatAck: {
|
case GatewayOpcodes.HeartbeatAck: {
|
||||||
this.isAck = true;
|
this.isAck = true;
|
||||||
this.debug([`Got heartbeat ack after ${Date.now() - this.lastHeartbeatAt}ms`]);
|
|
||||||
|
const ackAt = Date.now();
|
||||||
|
this.emit(WebSocketShardEvents.HeartbeatComplete, {
|
||||||
|
ackAt,
|
||||||
|
heartbeatAt: this.lastHeartbeatAt,
|
||||||
|
latency: ackAt - this.lastHeartbeatAt,
|
||||||
|
});
|
||||||
|
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user