refactor(ws): event layout (#10376)

* refactor(ws): event layout

BREAKING CHANGE: All events now emit shard id as its own param

* fix: worker event forwarding

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
DD
2024-07-24 21:40:34 +03:00
committed by GitHub
parent fcd35ea2e7
commit bf6761a44a
7 changed files with 137 additions and 134 deletions

View File

@@ -28,8 +28,8 @@ export class SimpleShardingStrategy implements IShardingStrategy {
const strategy = new SimpleContextFetchingStrategy(this.manager, strategyOptions);
const shard = new WebSocketShard(strategy, shardId);
for (const event of Object.values(WebSocketShardEvents)) {
// @ts-expect-error: Intentional
shard.on(event, (payload) => this.manager.emit(event, { ...payload, shardId }));
// @ts-expect-error Event props can't be resolved properly, but they are correct
shard.on(event, (...args) => this.manager.emit(event, ...args, shardId));
}
this.shards.set(shardId, shard);

View File

@@ -48,7 +48,7 @@ export enum WorkerReceivePayloadOp {
export type WorkerReceivePayload =
// Can't seem to get a type-safe union based off of the event, so I'm sadly leaving data as any for now
| { data: any; event: WebSocketShardEvents; op: WorkerReceivePayloadOp.Event; shardId: number }
| { data: any[]; event: WebSocketShardEvents; op: WorkerReceivePayloadOp.Event; shardId: number }
| { nonce: number; op: WorkerReceivePayloadOp.CancelIdentify }
| { nonce: number; op: WorkerReceivePayloadOp.FetchStatusResponse; status: WebSocketShardStatus }
| { nonce: number; op: WorkerReceivePayloadOp.RetrieveSessionInfo; shardId: number }
@@ -293,7 +293,8 @@ export class WorkerShardingStrategy implements IShardingStrategy {
}
case WorkerReceivePayloadOp.Event: {
this.manager.emit(payload.event, { ...payload.data, shardId: payload.shardId });
// @ts-expect-error Event props can't be resolved properly, but they are correct
this.manager.emit(payload.event, ...payload.data, payload.shardId);
break;
}

View File

@@ -148,12 +148,11 @@ export class WorkerBootstrapper {
for (const shardId of this.data.shardIds) {
const shard = new WebSocketShard(new WorkerContextFetchingStrategy(this.data), shardId);
for (const event of options.forwardEvents ?? Object.values(WebSocketShardEvents)) {
// @ts-expect-error: Event types incompatible
shard.on(event, (data) => {
shard.on(event, (...args) => {
const payload: WorkerReceivePayload = {
op: WorkerReceivePayloadOp.Event,
event,
data,
data: args,
shardId,
};
parentPort!.postMessage(payload);

View File

@@ -197,15 +197,15 @@ export interface CreateWebSocketManagerOptions
RequiredWebSocketManagerOptions {}
export interface ManagerShardEventsMap {
[WebSocketShardEvents.Closed]: [{ code: number; shardId: number }];
[WebSocketShardEvents.Debug]: [payload: { message: string; shardId: number }];
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload; shardId: number }];
[WebSocketShardEvents.Error]: [payload: { error: Error; shardId: number }];
[WebSocketShardEvents.Hello]: [{ shardId: number }];
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData; shardId: number }];
[WebSocketShardEvents.Resumed]: [{ shardId: number }];
[WebSocketShardEvents.Closed]: [code: number, shardId: number];
[WebSocketShardEvents.Debug]: [message: string, shardId: number];
[WebSocketShardEvents.Dispatch]: [payload: GatewayDispatchPayload, shardId: number];
[WebSocketShardEvents.Error]: [error: Error, shardId: number];
[WebSocketShardEvents.Hello]: [shardId: number];
[WebSocketShardEvents.Ready]: [data: GatewayReadyDispatchData, shardId: number];
[WebSocketShardEvents.Resumed]: [shardId: number];
[WebSocketShardEvents.HeartbeatComplete]: [
payload: { ackAt: number; heartbeatAt: number; latency: number; shardId: number },
stats: { ackAt: number; heartbeatAt: number; latency: number; shardId: number },
];
}

View File

@@ -60,14 +60,14 @@ export enum WebSocketShardDestroyRecovery {
}
export interface WebSocketShardEventsMap {
[WebSocketShardEvents.Closed]: [{ code: number }];
[WebSocketShardEvents.Debug]: [payload: { message: string }];
[WebSocketShardEvents.Dispatch]: [payload: { data: GatewayDispatchPayload }];
[WebSocketShardEvents.Error]: [payload: { error: Error }];
[WebSocketShardEvents.Closed]: [code: number];
[WebSocketShardEvents.Debug]: [message: string];
[WebSocketShardEvents.Dispatch]: [payload: GatewayDispatchPayload];
[WebSocketShardEvents.Error]: [error: Error];
[WebSocketShardEvents.Hello]: [];
[WebSocketShardEvents.Ready]: [payload: { data: GatewayReadyDispatchData }];
[WebSocketShardEvents.Ready]: [payload: GatewayReadyDispatchData];
[WebSocketShardEvents.Resumed]: [];
[WebSocketShardEvents.HeartbeatComplete]: [payload: { ackAt: number; heartbeatAt: number; latency: number }];
[WebSocketShardEvents.HeartbeatComplete]: [stats: { ackAt: number; heartbeatAt: number; latency: number }];
}
export interface WebSocketShardDestroyOptions {
@@ -203,7 +203,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
});
inflate.on('error', (error) => {
this.emit(WebSocketShardEvents.Error, { error });
this.emit(WebSocketShardEvents.Error, error);
});
this.nativeInflate = inflate;
@@ -352,7 +352,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.connection.close(options.code, options.reason);
await promise;
this.emit(WebSocketShardEvents.Closed, { code: options.code });
this.emit(WebSocketShardEvents.Closed, options.code);
}
// Lastly, remove the error event.
@@ -639,11 +639,10 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.zLibSyncInflate.push(Buffer.from(decompressable), flush ? zLibSync.Z_SYNC_FLUSH : zLibSync.Z_NO_FLUSH);
if (this.zLibSyncInflate.err) {
this.emit(WebSocketShardEvents.Error, {
error: new Error(
`${this.zLibSyncInflate.err}${this.zLibSyncInflate.msg ? `: ${this.zLibSyncInflate.msg}` : ''}`,
),
});
this.emit(
WebSocketShardEvents.Error,
new Error(`${this.zLibSyncInflate.err}${this.zLibSyncInflate.msg ? `: ${this.zLibSyncInflate.msg}` : ''}`),
);
}
if (!flush) {
@@ -692,7 +691,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
await this.strategy.updateSessionInfo(this.id, session);
this.emit(WebSocketShardEvents.Ready, { data: payload.d });
this.emit(WebSocketShardEvents.Ready, payload.d);
break;
}
@@ -719,7 +718,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
]);
}
this.emit(WebSocketShardEvents.Dispatch, { data: payload });
this.emit(WebSocketShardEvents.Dispatch, payload);
break;
}
@@ -798,11 +797,11 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
return;
}
this.emit(WebSocketShardEvents.Error, { error });
this.emit(WebSocketShardEvents.Error, error);
}
private async onClose(code: number) {
this.emit(WebSocketShardEvents.Closed, { code });
this.emit(WebSocketShardEvents.Closed, code);
switch (code) {
case CloseCodes.Normal: {
@@ -838,9 +837,11 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
case GatewayCloseCodes.AuthenticationFailed: {
this.emit(WebSocketShardEvents.Error, {
error: new Error('Authentication failed'),
});
this.emit(
WebSocketShardEvents.Error,
new Error('Authentication failed'),
);
return this.destroy({ code });
}
@@ -865,37 +866,43 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
case GatewayCloseCodes.InvalidShard: {
this.emit(WebSocketShardEvents.Error, {
error: new Error('Invalid shard'),
});
this.emit(WebSocketShardEvents.Error, new Error('Invalid shard'));
return this.destroy({ code });
}
case GatewayCloseCodes.ShardingRequired: {
this.emit(WebSocketShardEvents.Error, {
error: new Error('Sharding is required'),
});
this.emit(
WebSocketShardEvents.Error,
new Error('Sharding is required'),
);
return this.destroy({ code });
}
case GatewayCloseCodes.InvalidAPIVersion: {
this.emit(WebSocketShardEvents.Error, {
error: new Error('Used an invalid API version'),
});
this.emit(
WebSocketShardEvents.Error,
new Error('Used an invalid API version'),
);
return this.destroy({ code });
}
case GatewayCloseCodes.InvalidIntents: {
this.emit(WebSocketShardEvents.Error, {
error: new Error('Used invalid intents'),
});
this.emit(
WebSocketShardEvents.Error,
new Error('Used invalid intents'),
);
return this.destroy({ code });
}
case GatewayCloseCodes.DisallowedIntents: {
this.emit(WebSocketShardEvents.Error, {
error: new Error('Used disallowed intents'),
});
this.emit(
WebSocketShardEvents.Error,
new Error('Used disallowed intents'),
);
return this.destroy({ code });
}
@@ -916,6 +923,6 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
}
private debug(messages: [string, ...string[]]) {
this.emit(WebSocketShardEvents.Debug, { message: messages.join('\n\t') });
this.emit(WebSocketShardEvents.Debug, messages.join('\n\t'));
}
}