From ab3a43919890f0a45812fd91788ea911a106f937 Mon Sep 17 00:00:00 2001 From: Schuyler Cebulskie Date: Mon, 29 Oct 2018 15:02:36 -0400 Subject: [PATCH] Add worker-based sharding to the ShardingManager (#2908) * Add worker-based sharding mode to ShardingManager * Fix ClientShardUtil mode * Fix worker not being cleared on shard death * Update docs and typings * Clean up Client sharding logic a bit * Add info about requirements for worker mode --- src/client/Client.js | 34 ++++++++-- src/sharding/Shard.js | 113 ++++++++++++++++++++------------ src/sharding/ShardClientUtil.js | 74 +++++++++++++++------ src/sharding/ShardingManager.js | 40 +++++++++-- typings/index.d.ts | 11 +++- 5 files changed, 196 insertions(+), 76 deletions(-) diff --git a/src/client/Client.js b/src/client/Client.js index b87d21c94..4b3160dfe 100644 --- a/src/client/Client.js +++ b/src/client/Client.js @@ -31,12 +31,30 @@ class Client extends BaseClient { constructor(options = {}) { super(Object.assign({ _tokenType: 'Bot' }, options)); - // Obtain shard details from environment - if (!browser && !this.options.shardId && 'SHARD_ID' in process.env) { - this.options.shardId = Number(process.env.SHARD_ID); - } - if (!browser && !this.options.shardCount && 'SHARD_COUNT' in process.env) { - this.options.shardCount = Number(process.env.SHARD_COUNT); + // Figure out the shard details + if (!browser && process.env.SHARDING_MANAGER) { + // Try loading workerData if it's present + let workerData; + try { + workerData = require('worker_threads').workerData; + } catch (err) { + // Do nothing + } + + if (!this.options.shardId) { + if (workerData && 'SHARD_ID' in workerData) { + this.options.shardId = workerData.SHARD_ID; + } else if ('SHARD_ID' in process.env) { + this.options.shardId = Number(process.env.SHARD_ID); + } + } + if (!this.options.shardCount) { + if (workerData && 'SHARD_COUNT' in workerData) { + this.options.shardCount = workerData.SHARD_COUNT; + } else if ('SHARD_COUNT' in process.env) { + this.options.shardCount = Number(process.env.SHARD_COUNT); + } + } } this._validateOptions(); @@ -73,7 +91,9 @@ class Client extends BaseClient { * Shard helpers for the client (only if the process was spawned from a {@link ShardingManager}) * @type {?ShardClientUtil} */ - this.shard = !browser && process.env.SHARDING_MANAGER ? ShardClientUtil.singleton(this) : null; + this.shard = !browser && process.env.SHARDING_MANAGER ? + ShardClientUtil.singleton(this, process.env.SHARDING_MANAGER_MODE) : + null; /** * All of the {@link User} objects that have been cached at any point, mapped by their IDs diff --git a/src/sharding/Shard.js b/src/sharding/Shard.js index 1fb479752..a43bb7e92 100644 --- a/src/sharding/Shard.js +++ b/src/sharding/Shard.js @@ -1,13 +1,14 @@ -const childProcess = require('child_process'); const EventEmitter = require('events'); const path = require('path'); const Util = require('../util/Util'); const { Error } = require('../errors'); +let childProcess = null; +let Worker = null; /** * A self-contained shard created by the {@link ShardingManager}. Each one has a {@link ChildProcess} that contains - * an instance of the bot and its {@link Client}. When its child process exits for any reason, the shard will spawn a - * new one to replace it as necessary. + * an instance of the bot and its {@link Client}. When its child process/worker exits for any reason, the shard will + * spawn a new one to replace it as necessary. * @extends EventEmitter */ class Shard extends EventEmitter { @@ -18,6 +19,9 @@ class Shard extends EventEmitter { constructor(manager, id) { super(); + if (manager.mode === 'process') childProcess = require('child_process'); + else if (manager.mode === 'worker') Worker = require('worker_threads').Worker; + /** * Manager that created the shard * @type {ShardingManager} @@ -31,26 +35,24 @@ class Shard extends EventEmitter { this.id = id; /** - * Arguments for the shard's process + * Arguments for the shard's process (only when {@link ShardingManager#mode} is `process`) * @type {string[]} */ this.args = manager.shardArgs || []; /** - * Arguments for the shard's process executable + * Arguments for the shard's process executable (only when {@link ShardingManager#mode} is `process`) * @type {?string[]} */ this.execArgv = manager.execArgv; /** - * Environment variables for the shard's process + * Environment variables for the shard's process, or workerData for the shard's worker * @type {Object} */ this.env = Object.assign({}, process.env, { - SHARDING_MANAGER: true, SHARD_ID: this.id, SHARD_COUNT: this.manager.totalShards, - DISCORD_TOKEN: this.manager.token, }); /** @@ -60,11 +62,17 @@ class Shard extends EventEmitter { this.ready = false; /** - * Process of the shard + * Process of the shard (if {@link ShardingManager#mode} is `process`) * @type {?ChildProcess} */ this.process = null; + /** + * Worker of the shard (if {@link ShardingManager#mode} is `worker`) + * @type {?Worker} + */ + this.worker = null; + /** * Ongoing promises for calls to {@link Shard#eval}, mapped by the `script` they were called with * @type {Map} @@ -88,49 +96,62 @@ class Shard extends EventEmitter { } /** - * Forks a child process for the shard. + * Forks a child process or creates a worker thread for the shard. * You should not need to call this manually. * @param {boolean} [waitForReady=true] Whether to wait until the {@link Client} has become ready before resolving * @returns {Promise} */ async spawn(waitForReady = true) { if (this.process) throw new Error('SHARDING_PROCESS_EXISTS', this.id); + if (this.worker) throw new Error('SHARDING_WORKER_EXISTS', this.id); - this.process = childProcess.fork(path.resolve(this.manager.file), this.args, { - env: this.env, execArgv: this.execArgv, - }) - .on('message', this._handleMessage.bind(this)) - .on('exit', this._exitListener); + if (this.manager.mode === 'process') { + this.process = childProcess.fork(path.resolve(this.manager.file), this.args, { + env: this.env, execArgv: this.execArgv, + }) + .on('message', this._handleMessage.bind(this)) + .on('exit', this._exitListener); + } else if (this.manager.mode === 'worker') { + this.worker = new Worker(path.resolve(this.manager.file), { workerData: this.env }) + .on('message', this._handleMessage.bind(this)) + .on('exit', this._exitListener); + } /** - * Emitted upon the creation of the shard's child process. + * Emitted upon the creation of the shard's child process/worker. * @event Shard#spawn - * @param {ChildProcess} process Child process that was created + * @param {ChildProcess|Worker} process Child process/worker that was created */ - this.emit('spawn', this.process); + this.emit('spawn', this.process || this.worker); - if (!waitForReady) return this.process; + if (!waitForReady) return this.process || this.worker; await new Promise((resolve, reject) => { this.once('ready', resolve); this.once('disconnect', () => reject(new Error('SHARDING_READY_DISCONNECTED', this.id))); this.once('death', () => reject(new Error('SHARDING_READY_DIED', this.id))); setTimeout(() => reject(new Error('SHARDING_READY_TIMEOUT', this.id)), 30000); }); - return this.process; + return this.process || this.worker; } /** - * Immediately kills the shard's process and does not restart it. + * Immediately kills the shard's process/worker and does not restart it. */ kill() { - this.process.removeListener('exit', this._exitListener); - this.process.kill(); + if (this.process) { + this.process.removeListener('exit', this._exitListener); + this.process.kill(); + } else { + this.worker.removeListener('exit', this._exitListener); + this.worker.terminate(); + } + this._handleExit(false); } /** - * Kills and restarts the shard's process. - * @param {number} [delay=500] How long to wait between killing the process and restarting it (in milliseconds) + * Kills and restarts the shard's process/worker. + * @param {number} [delay=500] How long to wait between killing the process/worker and restarting it (in milliseconds) * @param {boolean} [waitForReady=true] Whether to wait until the {@link Client} has become ready before resolving * @returns {Promise} */ @@ -141,15 +162,20 @@ class Shard extends EventEmitter { } /** - * Sends a message to the shard's process. + * Sends a message to the shard's process/worker. * @param {*} message Message to send to the shard * @returns {Promise} */ send(message) { return new Promise((resolve, reject) => { - this.process.send(message, err => { - if (err) reject(err); else resolve(this); - }); + if (this.process) { + this.process.send(message, err => { + if (err) reject(err); else resolve(this); + }); + } else { + this.worker.postMessage(message); + resolve(this); + } }); } @@ -166,16 +192,18 @@ class Shard extends EventEmitter { if (this._fetches.has(prop)) return this._fetches.get(prop); const promise = new Promise((resolve, reject) => { + const child = this.process || this.worker; + const listener = message => { if (!message || message._fetchProp !== prop) return; - this.process.removeListener('message', listener); + child.removeListener('message', listener); this._fetches.delete(prop); resolve(message._result); }; - this.process.on('message', listener); + child.on('message', listener); this.send({ _fetchProp: prop }).catch(err => { - this.process.removeListener('message', listener); + child.removeListener('message', listener); this._fetches.delete(prop); reject(err); }); @@ -194,17 +222,19 @@ class Shard extends EventEmitter { if (this._evals.has(script)) return this._evals.get(script); const promise = new Promise((resolve, reject) => { + const child = this.process || this.worker; + const listener = message => { if (!message || message._eval !== script) return; - this.process.removeListener('message', listener); + child.removeListener('message', listener); this._evals.delete(script); if (!message._error) resolve(message._result); else reject(Util.makeError(message._error)); }; - this.process.on('message', listener); + child.on('message', listener); const _eval = typeof script === 'function' ? `(${script})(this)` : script; this.send({ _eval }).catch(err => { - this.process.removeListener('message', listener); + child.removeListener('message', listener); this._evals.delete(script); reject(err); }); @@ -215,7 +245,7 @@ class Shard extends EventEmitter { } /** - * Handles an IPC message. + * Handles a message received from the child process/worker. * @param {*} message Message received * @private */ @@ -283,7 +313,7 @@ class Shard extends EventEmitter { } /** - * Emitted upon recieving a message from the child process. + * Emitted upon recieving a message from the child process/worker. * @event Shard#message * @param {*} message Message that was received */ @@ -291,20 +321,21 @@ class Shard extends EventEmitter { } /** - * Handles the shard's process exiting. + * Handles the shard's process/worker exiting. * @param {boolean} [respawn=this.manager.respawn] Whether to spawn the shard again * @private */ _handleExit(respawn = this.manager.respawn) { /** - * Emitted upon the shard's child process exiting. + * Emitted upon the shard's child process/worker exiting. * @event Shard#death - * @param {ChildProcess} process Child process that exited + * @param {ChildProcess|Worker} process Child process/worker that exited */ - this.emit('death', this.process); + this.emit('death', this.process || this.worker); this.ready = false; this.process = null; + this.worker = null; this._evals.clear(); this._fetches.clear(); diff --git a/src/sharding/ShardClientUtil.js b/src/sharding/ShardClientUtil.js index 07c282ebf..4aa35a3b2 100644 --- a/src/sharding/ShardClientUtil.js +++ b/src/sharding/ShardClientUtil.js @@ -2,19 +2,45 @@ const Util = require('../util/Util'); const { Events } = require('../util/Constants'); /** - * Helper class for sharded clients spawned as a child process, such as from a {@link ShardingManager}. + * Helper class for sharded clients spawned as a child process/worker, such as from a {@link ShardingManager}. * Utilises IPC to send and receive data to/from the master process and other shards. */ class ShardClientUtil { /** * @param {Client} client Client of the current shard + * @param {ShardingManagerMode} mode Mode the shard was spawned with */ - constructor(client) { + constructor(client, mode) { + /** + * Client for the shard + * @type {Client} + */ this.client = client; - process.on('message', this._handleMessage.bind(this)); - client.on('ready', () => { process.send({ _ready: true }); }); - client.on('disconnect', () => { process.send({ _disconnect: true }); }); - client.on('reconnecting', () => { process.send({ _reconnecting: true }); }); + + /** + * Mode the shard was spawned with + * @type {ShardingManagerMode} + */ + this.mode = mode; + + /** + * Message port for the master process (only when {@link ShardClientUtil#mode} is `worker`) + * @type {?MessagePort} + */ + this.parentPort = null; + + if (mode === 'process') { + process.on('message', this._handleMessage.bind(this)); + client.on('ready', () => { process.send({ _ready: true }); }); + client.on('disconnect', () => { process.send({ _disconnect: true }); }); + client.on('reconnecting', () => { process.send({ _reconnecting: true }); }); + } else if (mode === 'worker') { + this.parentPort = require('worker_threads').parentPort; + this.parentPort.on('message', this._handleMessage.bind(this)); + client.on('ready', () => { this.parentPort.postMessage({ _ready: true }); }); + client.on('disconnect', () => { this.parentPort.postMessage({ _disconnect: true }); }); + client.on('reconnecting', () => { this.parentPort.postMessage({ _reconnecting: true }); }); + } } /** @@ -42,9 +68,14 @@ class ShardClientUtil { */ send(message) { return new Promise((resolve, reject) => { - process.send(message, err => { - if (err) reject(err); else resolve(); - }); + if (this.mode === 'process') { + process.send(message, err => { + if (err) reject(err); else resolve(); + }); + } else if (this.mode === 'worker') { + this.parentPort.postMessage(message); + resolve(); + } }); } @@ -60,15 +91,17 @@ class ShardClientUtil { */ fetchClientValues(prop) { return new Promise((resolve, reject) => { + const parent = this.parentPort || process; + const listener = message => { if (!message || message._sFetchProp !== prop) return; - process.removeListener('message', listener); + parent.removeListener('message', listener); if (!message._error) resolve(message._result); else reject(Util.makeError(message._error)); }; - process.on('message', listener); + parent.on('message', listener); this.send({ _sFetchProp: prop }).catch(err => { - process.removeListener('message', listener); + parent.removeListener('message', listener); reject(err); }); }); @@ -86,16 +119,18 @@ class ShardClientUtil { */ broadcastEval(script) { return new Promise((resolve, reject) => { + const parent = this.parentPort || process; script = typeof script === 'function' ? `(${script})(this)` : script; + const listener = message => { if (!message || message._sEval !== script) return; - process.removeListener('message', listener); + parent.removeListener('message', listener); if (!message._error) resolve(message._result); else reject(Util.makeError(message._error)); }; - process.on('message', listener); + parent.on('message', listener); this.send({ _sEval: script }).catch(err => { - process.removeListener('message', listener); + parent.removeListener('message', listener); reject(err); }); }); @@ -104,7 +139,7 @@ class ShardClientUtil { /** * Requests a respawn of all shards. * @param {number} [shardDelay=5000] How long to wait between shards (in milliseconds) - * @param {number} [respawnDelay=500] How long to wait between killing a shard's process and restarting it + * @param {number} [respawnDelay=500] How long to wait between killing a shard's process/worker and restarting it * (in milliseconds) * @param {boolean} [waitForReady=true] Whether to wait for a shard to become ready before continuing to another * @returns {Promise} Resolves upon the message being sent @@ -151,14 +186,15 @@ class ShardClientUtil { /** * Creates/gets the singleton of this class. * @param {Client} client The client to use + * @param {ShardingManagerMode} mode Mode the shard was spawned with * @returns {ShardClientUtil} */ - static singleton(client) { + static singleton(client, mode) { if (!this._singleton) { - this._singleton = new this(client); + this._singleton = new this(client, mode); } else { client.emit(Events.WARN, - 'Multiple clients created in child process; only the first will handle sharding helpers.'); + 'Multiple clients created in child process/worker; only the first will handle sharding helpers.'); } return this._singleton; } diff --git a/src/sharding/ShardingManager.js b/src/sharding/ShardingManager.js index bbec15b67..7cedc37ef 100644 --- a/src/sharding/ShardingManager.js +++ b/src/sharding/ShardingManager.js @@ -8,29 +8,42 @@ const { Error, TypeError, RangeError } = require('../errors'); /** * This is a utility class that makes multi-process sharding of a bot an easy and painless experience. - * It works by spawning a self-contained {@link ChildProcess} for each individual shard, each containing its own - * instance of your bot's {@link Client}. They all have a line of communication with the master process, and there are - * several useful methods that utilise it in order to simplify tasks that are normally difficult with sharding. It can - * spawn a specific number of shards or the amount that Discord suggests for the bot, and takes a path to your main bot - * script to launch for each one. + * It works by spawning a self-contained {@link ChildProcess} or {@link Worker} for each individual shard, each + * containing its own instance of your bot's {@link Client}. They all have a line of communication with the master + * process, and there are several useful methods that utilise it in order to simplify tasks that are normally difficult + * with sharding. It can spawn a specific number of shards or the amount that Discord suggests for the bot, and takes a + * path to your main bot script to launch for each one. * @extends {EventEmitter} */ class ShardingManager extends EventEmitter { + /** + * The mode to spawn shards with for a {@link ShardingManager}: either "process" to use child processes, or + * "worker" to use workers. The "worker" mode relies on the experimental + * [Worker threads](https://nodejs.org/api/worker_threads.html) functionality that is present in Node v10.5.0 or + * newer. Node must be started with the `--experimental-worker` flag to expose it. + * @typedef {Object} ShardingManagerMode + */ + /** * @param {string} file Path to your shard script file * @param {Object} [options] Options for the sharding manager * @param {number|string} [options.totalShards='auto'] Number of shards to spawn, or "auto" + * @param {ShardingManagerMode} [options.mode='process'] Which mode to use for shards * @param {boolean} [options.respawn=true] Whether shards should automatically respawn upon exiting * @param {string[]} [options.shardArgs=[]] Arguments to pass to the shard script when spawning + * (only available when using the `process` mode) * @param {string[]} [options.execArgv=[]] Arguments to pass to the shard script executable when spawning + * (only available when using the `process` mode) * @param {string} [options.token] Token to use for automatic shard count and passing to shards */ constructor(file, options = {}) { super(); options = Util.mergeDefault({ totalShards: 'auto', + mode: 'process', respawn: true, shardArgs: [], + execArgv: [], token: process.env.DISCORD_TOKEN, }, options); @@ -59,6 +72,15 @@ class ShardingManager extends EventEmitter { } } + /** + * Mode for shards to spawn with + * @type {ShardingManagerMode} + */ + this.mode = options.mode; + if (this.mode !== 'process' && this.mode !== 'worker') { + throw new RangeError('CLIENT_INVALID_OPTION', 'Sharding mode', '"process" or "worker"'); + } + /** * Whether shards should automatically respawn upon exiting * @type {boolean} @@ -66,13 +88,13 @@ class ShardingManager extends EventEmitter { this.respawn = options.respawn; /** - * An array of arguments to pass to shards + * An array of arguments to pass to shards (only when {@link ShardingManager#mode} is `process`) * @type {string[]} */ this.shardArgs = options.shardArgs; /** - * An array of arguments to pass to the executable + * An array of arguments to pass to the executable (only when {@link ShardingManager#mode} is `process`) * @type {string[]} */ this.execArgv = options.execArgv; @@ -88,6 +110,10 @@ class ShardingManager extends EventEmitter { * @type {Collection} */ this.shards = new Collection(); + + process.env.SHARDING_MANAGER = true; + process.env.SHARDING_MANAGER_MODE = this.mode; + process.env.DISCORD_TOKEN = this.token; } /** diff --git a/typings/index.d.ts b/typings/index.d.ts index 70ff34b39..30fe364a9 100644 --- a/typings/index.d.ts +++ b/typings/index.d.ts @@ -921,6 +921,7 @@ declare module 'discord.js' { public manager: ShardingManager; public process: ChildProcess; public ready: boolean; + public worker: Worker; public eval(script: string): Promise; public eval(fn: (client: Client) => T): Promise; public fetchClientValue(prop: string): Promise; @@ -945,24 +946,28 @@ declare module 'discord.js' { } export class ShardClientUtil { - constructor(client: Client); + constructor(client: Client, mode: ShardingManagerMode); private _handleMessage(message: any): void; private _respond(type: string, message: any): void; + public client: Client; public readonly count: number; public readonly id: number; + public mode: ShardingManagerMode; + public parentPort: MessagePort; public broadcastEval(script: string): Promise; public broadcastEval(fn: (client: Client) => T): Promise; public fetchClientValues(prop: string): Promise; public respawnAll(shardDelay?: number, respawnDelay?: number, waitForReady?: boolean): Promise; public send(message: any): Promise; - public static singleton(client: Client): ShardClientUtil; + public static singleton(client: Client, mode: ShardingManagerMode): ShardClientUtil; } export class ShardingManager extends EventEmitter { constructor(file: string, options?: { totalShards?: number | 'auto'; + mode?: ShardingManagerMode; respawn?: boolean; shardArgs?: string[]; token?: string; @@ -2022,6 +2027,8 @@ declare module 'discord.js' { type RoleResolvable = Role | string; + type ShardingManagerMode = 'process' | 'worker'; + type Snowflake = string; type SplitOptions = {