Files
discord.js/packages/ws/src/utils/WorkerBootstrapper.ts
Almeida e2e71b4d09 build: bump dependencies (#10457)
* build: bump `@vladfrangu/async_event_emitter`

* chore: bump again + fixes

* build: bump types/node and some dev deps

* build: bump discord-api-types again

* style: remove unused eslint-ignore comment

* build: sync dependencies and update templates

* build: bump turbo

* build: vercel + vitest

* build: bump undici

---------

Co-authored-by: Vlad Frangu <me@vladfrangu.dev>
2024-08-22 17:33:35 +02:00

177 lines
4.8 KiB
TypeScript

import { isMainThread, parentPort, workerData } from 'node:worker_threads';
import { Collection } from '@discordjs/collection';
import type { Awaitable } from '@discordjs/util';
import { WorkerContextFetchingStrategy } from '../strategies/context/WorkerContextFetchingStrategy.js';
import {
WorkerReceivePayloadOp,
WorkerSendPayloadOp,
type WorkerData,
type WorkerReceivePayload,
type WorkerSendPayload,
} from '../strategies/sharding/WorkerShardingStrategy.js';
import type { WebSocketShardDestroyOptions } from '../ws/WebSocketShard.js';
import { WebSocketShardEvents, WebSocketShard } from '../ws/WebSocketShard.js';
/**
* Options for bootstrapping the worker
*/
export interface BootstrapOptions {
/**
* Shard events to forward, as-is, to the parent thread for the manager to emit.
* Note: when this option is omitted, every value of `WebSocketShardEvents` is forwarded;
* you most likely want to handle dispatch within the worker itself and pass a narrower list here.
*/
forwardEvents?: WebSocketShardEvents[];
/**
* Function called once for each shard right after it is created, for any additional setup.
* The returned value is awaited before the shard is registered with the worker.
*/
shardCallback?(shard: WebSocketShard): Awaitable<void>;
}
/**
* Utility class for bootstrapping a worker thread to be used for sharding
*/
export class WorkerBootstrapper {
	/**
	 * The data passed to the worker thread
	 */
	protected readonly data = workerData as WorkerData;

	/**
	 * The shards that are managed by this worker, keyed by shard id
	 */
	protected readonly shards = new Collection<number, WebSocketShard>();

	/**
	 * @throws Error if instantiated on the main thread
	 */
	public constructor() {
		if (isMainThread) {
			throw new Error('Expected WorkerBootstrap to not be used within the main thread');
		}
	}

	/**
	 * Resolves a shard managed by this worker.
	 * Single source of truth for the "unknown shard" failure so every caller reports it the same way.
	 *
	 * @param shardId - The id of the shard to look up
	 * @returns The shard registered under `shardId`
	 * @throws RangeError if no shard is registered under `shardId`
	 */
	private getShardOrThrow(shardId: number): WebSocketShard {
		const shard = this.shards.get(shardId);
		if (!shard) {
			throw new RangeError(`Shard ${shardId} does not exist`);
		}

		return shard;
	}

	/**
	 * Helper method to initiate a shard's connection process
	 *
	 * @param shardId - The id of the shard to connect
	 * @throws RangeError (as a rejection) if the shard is not managed by this worker
	 */
	protected async connect(shardId: number): Promise<void> {
		await this.getShardOrThrow(shardId).connect();
	}

	/**
	 * Helper method to destroy a shard
	 *
	 * @param shardId - The id of the shard to destroy
	 * @param options - Options passed through to the shard's `destroy` call
	 * @throws RangeError (as a rejection) if the shard is not managed by this worker
	 */
	protected async destroy(shardId: number, options?: WebSocketShardDestroyOptions): Promise<void> {
		await this.getShardOrThrow(shardId).destroy(options);
	}

	/**
	 * Helper method to attach event listeners to the parentPort
	 *
	 * `parentPort` is asserted non-null here; the constructor refuses to run on the main thread.
	 * Errors thrown inside the async `message` listener reject that listener's promise and are not
	 * caught anywhere in this class.
	 */
	protected setupThreadEvents(): void {
		parentPort!
			.on('messageerror', (err) => {
				throw err;
			})
			.on('message', async (payload: WorkerSendPayload) => {
				switch (payload.op) {
					case WorkerSendPayloadOp.Connect: {
						await this.connect(payload.shardId);
						// Acknowledge only after the shard's connect call has resolved
						const response: WorkerReceivePayload = {
							op: WorkerReceivePayloadOp.Connected,
							shardId: payload.shardId,
						};
						parentPort!.postMessage(response);
						break;
					}

					case WorkerSendPayloadOp.Destroy: {
						await this.destroy(payload.shardId, payload.options);
						// Acknowledge only after the shard's destroy call has resolved
						const response: WorkerReceivePayload = {
							op: WorkerReceivePayloadOp.Destroyed,
							shardId: payload.shardId,
						};
						parentPort!.postMessage(response);
						break;
					}

					case WorkerSendPayloadOp.Send: {
						// No reply is posted for sends
						await this.getShardOrThrow(payload.shardId).send(payload.payload);
						break;
					}

					// The two response ops below are intentionally ignored by this listener.
					// NOTE(review): presumably they are consumed by another `message` listener
					// (e.g. the context fetching strategy) - confirm.
					case WorkerSendPayloadOp.SessionInfoResponse: {
						break;
					}

					case WorkerSendPayloadOp.ShardIdentifyResponse: {
						break;
					}

					case WorkerSendPayloadOp.FetchStatus: {
						const shard = this.getShardOrThrow(payload.shardId);
						// Echo the nonce back in the response
						const response: WorkerReceivePayload = {
							op: WorkerReceivePayloadOp.FetchStatusResponse,
							status: shard.status,
							nonce: payload.nonce,
						};
						parentPort!.postMessage(response);
						break;
					}
				}
			});
	}

	/**
	 * Bootstraps the worker thread with the provided options
	 *
	 * For every id in `data.shardIds` a shard is created, its events are wired to be forwarded to the
	 * parent thread, and `options.shardCallback` (if any) is awaited before the shard is registered.
	 * Once all shards exist, the parent-thread listeners are attached and a `WorkerReady` payload is posted.
	 *
	 * @param options - Bootstrapping options; `forwardEvents` defaults to every `WebSocketShardEvents` value
	 */
	public async bootstrap(options: Readonly<BootstrapOptions> = {}): Promise<void> {
		// Start by initializing the shards
		for (const shardId of this.data.shardIds) {
			const shard = new WebSocketShard(new WorkerContextFetchingStrategy(this.data), shardId);
			for (const event of options.forwardEvents ?? Object.values(WebSocketShardEvents)) {
				shard.on(event, (...args: unknown[]) => {
					// Relay the event, its arguments and the originating shard id to the parent thread
					const payload: WorkerReceivePayload = {
						op: WorkerReceivePayloadOp.Event,
						event,
						data: args,
						shardId,
					};
					parentPort!.postMessage(payload);
				});
			}

			// Any additional setup the user might want to do
			await options.shardCallback?.(shard);
			this.shards.set(shardId, shard);
		}

		// Lastly, start listening to messages from the parent thread
		this.setupThreadEvents();
		const message: WorkerReceivePayload = {
			op: WorkerReceivePayloadOp.WorkerReady,
		};
		parentPort!.postMessage(message);
	}
}