diff --git a/src/sharding/Shard.js b/src/sharding/Shard.js index 9bb157623..b6fddb420 100644 --- a/src/sharding/Shard.js +++ b/src/sharding/Shard.js @@ -1,17 +1,19 @@ const childProcess = require('child_process'); +const EventEmitter = require('events'); const path = require('path'); const Util = require('../util/Util'); /** * Represents a Shard spawned by the ShardingManager. */ -class Shard { +class Shard extends EventEmitter { /** * @param {ShardingManager} manager The sharding manager * @param {number} id The ID of this shard * @param {Array} [args=[]] Command line arguments to pass to the script */ constructor(manager, id, args = []) { + super(); /** * Manager that created the shard * @type {ShardingManager} @@ -35,19 +37,77 @@ class Shard { }); /** - * Process of the shard - * @type {ChildProcess} + * Whether the shard's {@link Client} is ready + * @type {boolean} */ - this.process = childProcess.fork(path.resolve(this.manager.file), args, { - env: this.env, - }); - this.process.on('message', this._handleMessage.bind(this)); - this.process.once('exit', () => { - if (this.manager.respawn) this.manager.createShard(this.id); - }); + this.ready = false; this._evals = new Map(); this._fetches = new Map(); + + /** + * Listener function for the {@link ChildProcess}' `exit` event + * @type {Function} + * @private + */ + this._exitListener = this._handleExit.bind(this, undefined); + + /** + * Process of the shard + * @type {ChildProcess} + */ + this.process = null; + + this.spawn(args); + } + + /** + * Forks a child process for the shard. + * You should not need to call this manually. + * @param {Array} [args=this.manager.args] Command line arguments to pass to the script + * @param {Array} [execArgv=this.manager.execArgv] Command line arguments to pass to the process executable + * @returns {ChildProcess} + */ + spawn(args = this.manager.args, execArgv = this.manager.execArgv) { + this.process = childProcess.fork(path.resolve(this.manager.file), args, { + env: this.env, execArgv, + }) + .on('exit', this._exitListener) + .on('message', this._handleMessage.bind(this)); + + /** + * Emitted upon the creation of the shard's child process. + * @event Shard#spawn + * @param {ChildProcess} process Child process that was created + */ + this.emit('spawn', this.process); + + return new Promise((resolve, reject) => { + this.once('ready', resolve); + this.once('disconnect', () => reject(new Error(`Shard ${this.id}'s Client disconnected before becoming ready.`))); + this.once('death', () => reject(new Error(`Shard ${this.id}'s process exited before its Client became ready.`))); + setTimeout(() => reject(new Error(`Shard ${this.id}'s Client took too long to become ready.`)), 30000); + }).then(() => this.process); + } + + /** + * Immediately kills the shard's process and does not restart it. + */ + kill() { + this.process.removeListener('exit', this._exitListener); + this.process.kill(); + this._handleExit(false); + } + + /** + * Kills and restarts the shard's process. + * @param {number} [delay=500] How long to wait between killing the process and restarting it (in milliseconds) + * @returns {Promise} + */ + respawn(delay = 500) { + this.kill(); + if (delay > 0) return Util.delayFor(delay).then(() => this.spawn()); + return this.spawn(); } /** @@ -130,6 +190,39 @@ class Shard { */ _handleMessage(message) { if (message) { + // Shard is ready + if (message._ready) { + this.ready = true; + /** + * Emitted upon the shard's {@link Client#ready} event. + * @event Shard#ready + */ + this.emit('ready'); + return; + } + + // Shard has disconnected + if (message._disconnect) { + this.ready = false; + /** + * Emitted upon the shard's {@link Client#disconnect} event. + * @event Shard#disconnect + */ + this.emit('disconnect'); + return; + } + + // Shard is attempting to reconnect + if (message._reconnecting) { + this.ready = false; + /** + * Emitted upon the shard's {@link Client#reconnecting} event. + * @event Shard#reconnecting + */ + this.emit('reconnecting'); + return; + } + // Shard is requesting a property fetch if (message._sFetchProp) { this.manager.fetchClientValues(message._sFetchProp).then( @@ -156,6 +249,33 @@ class Shard { * @param {*} message Message that was received */ this.manager.emit('message', this, message); + + /** + * Emitted upon recieving a message from the child process. + * @event Shard#message + * @param {*} message Message that was received + */ + this.emit('message', message); + } + + /** + * Handles the shard's process exiting. + * @param {boolean} [respawn=this.manager.respawn] Whether to spawn the shard again + * @private + */ + _handleExit(respawn = this.manager.respawn) { + /** + * Emitted upon the shard's child process exiting. + * @event Shard#death + * @param {ChildProcess} process Child process that exited + */ + this.emit('death', this.process); + + this.process = null; + this._evals.clear(); + this._fetches.clear(); + + if (respawn) this.manager.createShard(this.id); } } diff --git a/src/sharding/ShardClientUtil.js b/src/sharding/ShardClientUtil.js index 8541c3578..28571f1f9 100644 --- a/src/sharding/ShardClientUtil.js +++ b/src/sharding/ShardClientUtil.js @@ -10,6 +10,9 @@ class ShardClientUtil { constructor(client) { this.client = client; process.on('message', this._handleMessage.bind(this)); + client.on('ready', () => { process.send({ _ready: true }); }); + client.on('disconnect', () => { process.send({ _disconnect: true }); }); + client.on('reconnecting', () => { process.send({ _reconnecting: true }); }); } /** diff --git a/src/sharding/ShardingManager.js b/src/sharding/ShardingManager.js index aa40547bd..d5e31edad 100644 --- a/src/sharding/ShardingManager.js +++ b/src/sharding/ShardingManager.js @@ -66,6 +66,12 @@ class ShardingManager extends EventEmitter { */ this.shardArgs = options.shardArgs; + /** + * Arguments for the shard's process executable + * @type {?string[]} + */ + this.execArgv = options.execArgv; + /** * Token to use for obtaining the automatic shard count, and passing to shards * @type {?string} @@ -189,6 +195,26 @@ class ShardingManager extends EventEmitter { for (const shard of this.shards.values()) promises.push(shard.fetchClientValue(prop)); return Promise.all(promises); } + + /** + * Kills all running shards and respawns them. + * @param {number} [shardDelay=5000] How long to wait between shards (in milliseconds) + * @param {number} [respawnDelay=500] How long to wait between killing a shard's process and restarting it + * (in milliseconds) + * @param {boolean} [waitForReady=true] Whether to wait for a shard to become ready before continuing to another + * @param {number} [currentShardIndex=0] The shard index to start respawning at + * @returns {Promise>} + */ + respawnAll(shardDelay = 5000, respawnDelay = 500, waitForReady = true, currentShardIndex = 0) { + let s = 0; + const shard = this.shards.get(currentShardIndex); + const promises = [shard.respawn(respawnDelay, waitForReady)]; + if (++s < this.shards.size && shardDelay > 0) promises.push(Util.delayFor(shardDelay)); + return Promise.all(promises).then(() => { + if (++currentShardIndex === this.shards.size) return this.shards; + return this.respawnAll(shardDelay, respawnDelay, waitForReady, currentShardIndex); + }); + } } module.exports = ShardingManager; diff --git a/src/util/Util.js b/src/util/Util.js index 2be7c5658..7fa6d33fa 100644 --- a/src/util/Util.js +++ b/src/util/Util.js @@ -202,6 +202,18 @@ class Util { } return array.indexOf(element); } + + /** + * Creates a Promise that resolves after a specified duration. + * @param {number} ms How long to wait before resolving (in milliseconds) + * @returns {Promise} + * @private + */ + static delayFor(ms) { + return new Promise(resolve => { + setTimeout(resolve, ms); + }); + } } module.exports = Util;