Add worker-based sharding to the ShardingManager (#2908)

* Add worker-based sharding mode to ShardingManager

* Fix ClientShardUtil mode

* Fix worker not being cleared on shard death

* Update docs and typings

* Clean up Client sharding logic a bit

* Add info about requirements for worker mode
This commit is contained in:
Schuyler Cebulskie
2018-10-29 15:02:36 -04:00
committed by GitHub
parent b759fc415e
commit ab3a439198
5 changed files with 196 additions and 76 deletions

View File

@@ -2,19 +2,45 @@ const Util = require('../util/Util');
const { Events } = require('../util/Constants');
/**
* Helper class for sharded clients spawned as a child process, such as from a {@link ShardingManager}.
* Helper class for sharded clients spawned as a child process/worker, such as from a {@link ShardingManager}.
* Utilises IPC to send and receive data to/from the master process and other shards.
*/
class ShardClientUtil {
/**
* @param {Client} client Client of the current shard
* @param {ShardingManagerMode} mode Mode the shard was spawned with
*/
constructor(client) {
constructor(client, mode) {
/**
* Client for the shard
* @type {Client}
*/
this.client = client;
process.on('message', this._handleMessage.bind(this));
client.on('ready', () => { process.send({ _ready: true }); });
client.on('disconnect', () => { process.send({ _disconnect: true }); });
client.on('reconnecting', () => { process.send({ _reconnecting: true }); });
/**
* Mode the shard was spawned with
* @type {ShardingManagerMode}
*/
this.mode = mode;
/**
* Message port for the master process (only when {@link ShardClientUtil#mode} is `worker`)
* @type {?MessagePort}
*/
this.parentPort = null;
if (mode === 'process') {
process.on('message', this._handleMessage.bind(this));
client.on('ready', () => { process.send({ _ready: true }); });
client.on('disconnect', () => { process.send({ _disconnect: true }); });
client.on('reconnecting', () => { process.send({ _reconnecting: true }); });
} else if (mode === 'worker') {
this.parentPort = require('worker_threads').parentPort;
this.parentPort.on('message', this._handleMessage.bind(this));
client.on('ready', () => { this.parentPort.postMessage({ _ready: true }); });
client.on('disconnect', () => { this.parentPort.postMessage({ _disconnect: true }); });
client.on('reconnecting', () => { this.parentPort.postMessage({ _reconnecting: true }); });
}
}
/**
@@ -42,9 +68,14 @@ class ShardClientUtil {
*/
send(message) {
return new Promise((resolve, reject) => {
process.send(message, err => {
if (err) reject(err); else resolve();
});
if (this.mode === 'process') {
process.send(message, err => {
if (err) reject(err); else resolve();
});
} else if (this.mode === 'worker') {
this.parentPort.postMessage(message);
resolve();
}
});
}
@@ -60,15 +91,17 @@ class ShardClientUtil {
*/
fetchClientValues(prop) {
return new Promise((resolve, reject) => {
const parent = this.parentPort || process;
const listener = message => {
if (!message || message._sFetchProp !== prop) return;
process.removeListener('message', listener);
parent.removeListener('message', listener);
if (!message._error) resolve(message._result); else reject(Util.makeError(message._error));
};
process.on('message', listener);
parent.on('message', listener);
this.send({ _sFetchProp: prop }).catch(err => {
process.removeListener('message', listener);
parent.removeListener('message', listener);
reject(err);
});
});
@@ -86,16 +119,18 @@ class ShardClientUtil {
*/
broadcastEval(script) {
return new Promise((resolve, reject) => {
const parent = this.parentPort || process;
script = typeof script === 'function' ? `(${script})(this)` : script;
const listener = message => {
if (!message || message._sEval !== script) return;
process.removeListener('message', listener);
parent.removeListener('message', listener);
if (!message._error) resolve(message._result); else reject(Util.makeError(message._error));
};
process.on('message', listener);
parent.on('message', listener);
this.send({ _sEval: script }).catch(err => {
process.removeListener('message', listener);
parent.removeListener('message', listener);
reject(err);
});
});
@@ -104,7 +139,7 @@ class ShardClientUtil {
/**
* Requests a respawn of all shards.
* @param {number} [shardDelay=5000] How long to wait between shards (in milliseconds)
* @param {number} [respawnDelay=500] How long to wait between killing a shard's process and restarting it
* @param {number} [respawnDelay=500] How long to wait between killing a shard's process/worker and restarting it
* (in milliseconds)
* @param {boolean} [waitForReady=true] Whether to wait for a shard to become ready before continuing to another
* @returns {Promise<void>} Resolves upon the message being sent
@@ -151,14 +186,15 @@ class ShardClientUtil {
/**
* Creates/gets the singleton of this class.
* @param {Client} client The client to use
* @param {ShardingManagerMode} mode Mode the shard was spawned with
* @returns {ShardClientUtil}
*/
static singleton(client) {
static singleton(client, mode) {
if (!this._singleton) {
this._singleton = new this(client);
this._singleton = new this(client, mode);
} else {
client.emit(Events.WARN,
'Multiple clients created in child process; only the first will handle sharding helpers.');
'Multiple clients created in child process/worker; only the first will handle sharding helpers.');
}
return this._singleton;
}