Add client shard eval/client property fetching, and improve existing shard stuff

This commit is contained in:
Schuyler Cebulskie
2016-09-27 02:19:53 -04:00
parent c26fc49caf
commit e5a987e0ef
7 changed files with 177 additions and 71 deletions

File diff suppressed because one or more lines are too long

View File

@@ -10,7 +10,7 @@ const WebSocketManager = require('./websocket/WebSocketManager');
const ActionsManager = require('./actions/ActionsManager');
const Collection = require('../util/Collection');
const Presence = require('../structures/Presence').Presence;
const ShardUtil = require('../sharding/ShardUtil');
const ShardClientUtil = require('../sharding/ShardClientUtil');
/**
* The starting point for making a Discord Bot.
@@ -90,7 +90,7 @@ class Client extends EventEmitter {
* The shard helpers for the client (only if the process was spawned as a child, such as from a ShardingManager)
* @type {?ShardUtil}
*/
this.shard = process.send ? ShardUtil.singleton(this) : null;
this.shard = process.send ? ShardClientUtil.singleton(this) : null;
if (this.shard) process.on('message', this.shard._handleMessage.bind(this.shard));
/**
@@ -339,6 +339,10 @@ class Client extends EventEmitter {
}
this.presences.set(id, new Presence(presence));
}
_eval(script) {
return eval(script);
}
}
module.exports = Client;

View File

@@ -1,5 +1,6 @@
const childProcess = require('child_process');
const path = require('path');
const makeError = require('../util/MakeError');
/**
* Represents a Shard spawned by the ShardingManager.
@@ -32,11 +33,7 @@ class Shard {
SHARD_COUNT: this.manager.totalShards,
},
});
this.process.on('message', message => {
this.manager.emit('message', this, message);
});
this.process.on('message', message => { this.manager.emit('message', this, message); });
this.process.once('exit', () => {
if (this.manager.respawn) this.manager.createShard(this.id);
});
@@ -69,20 +66,10 @@ class Shard {
const promise = new Promise((resolve, reject) => {
const listener = message => {
if (!message) return;
if (message._evalResult) {
this.process.removeListener('message', listener);
this._evals.delete(script);
resolve(message._evalResult);
} else if (message._evalError) {
this.process.removeListener('message', listener);
const err = new Error(message._evalError.message, message._evalError.fileName, message._evalError.lineNumber);
err.name = message._evalError.name;
err.columnNumber = message._evalError.columnNumber;
err.stack = message._evalError.stack;
this._evals.delete(script);
reject(err);
}
if (!message || message._eval !== script) return;
this.process.removeListener('message', listener);
this._evals.delete(script);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
this.process.on('message', listener);
@@ -111,10 +98,10 @@ class Shard {
const promise = new Promise((resolve, reject) => {
const listener = message => {
if (typeof message !== 'object' || message._fetchProp !== prop) return;
if (!message || message._fetchProp !== prop) return;
this.process.removeListener('message', listener);
this._fetches.delete(prop);
resolve(message._fetchPropValue);
resolve(message._result);
};
this.process.on('message', listener);

View File

@@ -0,0 +1,138 @@
const makeError = require('../util/MakeError');
/**
* Helper class for sharded clients spawned as a child process, such as from a ShardingManager
*/
class ShardClientUtil {
/**
* @param {Client} client Client of the current shard
*/
constructor(client) {
this.client = client;
}
/**
* ID of this shard
* @type {number}
*/
get id() {
return this.client.options.shard_id;
}
/**
* Total number of shards
* @type {number}
*/
get count() {
return this.client.options.shard_count;
}
/**
* Sends a message to the master process
* @param {*} message Message to send
* @returns {Promise<void>}
*/
send(message) {
return new Promise((resolve, reject) => {
const sent = process.send(message, err => {
if (err) reject(err); else resolve();
});
if (!sent) throw new Error('Failed to send message to master process.');
});
}
/**
* Evaluates a script on all shards, in the context of the Clients.
* @param {string} script JavaScript to run on each shard
* @returns {Promise<Array>} Results of the script execution
*/
broadcastEval(script) {
return new Promise((resolve, reject) => {
const listener = message => {
if (!message || message._sEval !== script) return;
process.removeListener('message', listener);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
process.on('message', listener);
this.send({ _sEval: script }).catch(err => {
process.removeListener('message', listener);
reject(err);
});
});
}
/**
* Fetches a Client property value of each shard.
* @param {string} prop Name of the Client property to get, using periods for nesting
* @returns {Promise<Array>}
* @example
* manager.fetchClientValues('guilds.size').then(results => {
* console.log(`${results.reduce((prev, val) => prev + val, 0)} total guilds`);
* }).catch(console.error);
*/
fetchClientValues(prop) {
return new Promise((resolve, reject) => {
const listener = message => {
if (!message || message._sFetchProp !== prop) return;
process.removeListener('message', listener);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
process.on('message', listener);
this.send({ _sFetchProp: prop }).catch(err => {
process.removeListener('message', listener);
reject(err);
});
});
}
/**
* Handles an IPC message
* @param {*} message Message received
* @private
*/
_handleMessage(message) {
if (!message) return;
if (message._eval) {
try {
this._respond('eval', { _eval: message._eval, _result: this.client._eval(message._eval) });
} catch (err) {
this._respond('eval', { _eval: message._eval, _error: err });
}
} else if (message._fetchProp) {
const props = message._fetchProp.split('.');
let value = this.client;
for (const prop of props) value = value[prop];
this._respond('fetchProp', { _fetchProp: message._fetchProp, _result: value });
}
}
/**
* Sends a message to the master process, emitting an error from the client upon failure
* @param {string} type Type of response to send
* @param {*} message Message to send
* @private
*/
_respond(type, message) {
this.send(message).catch(err =>
this.client.emit('error', `Error when sending ${type} response to master process: ${err}`)
);
}
/**
* Creates/gets the singleton of this class
* @param {Client} client Client to use
* @returns {ShardUtil}
*/
static singleton(client) {
if (!this._singleton) {
this._singleton = new this(client);
} else {
client.emit('error', 'Multiple clients created in child process; only the first will handle sharding helpers.');
}
return this._singleton;
}
}
module.exports = ShardClientUtil;

View File

@@ -1,47 +0,0 @@
/**
* Helper class for sharded clients
*/
class ShardUtil {
/**
* @param {Client} client Client of the current shard
*/
constructor(client) {
this.client = client;
}
/**
* Handles an IPC message
* @param {*} message Message received
*/
_handleMessage(message) {
if (!message) return;
if (message._eval) {
try {
process.send({ _evalResult: eval(message._eval) });
} catch (err) {
process.send({ _evalError: err });
}
} else if (message._fetchProp) {
const props = message._fetchProp.split('.');
let value = this.client;
for (const prop of props) value = value[prop];
process.send({ _fetchProp: message._fetchProp, _fetchPropValue: value });
}
}
/**
* Creates/gets the singleton of this class
* @param {Client} client Client to use
* @returns {ShardUtil}
*/
static singleton(client) {
if (!this._singleton) {
this._singleton = new this(client);
} else {
client.emit('error', 'Multiple clients created in child process; only the first will handle sharding helpers.');
}
return this._singleton;
}
}
module.exports = ShardUtil;

View File

@@ -41,6 +41,10 @@ class ShardingManager extends EventEmitter {
if (totalShards < 1) throw new RangeError('Amount of shards must be at least 1.');
if (totalShards !== Math.floor(totalShards)) throw new RangeError('Amount of shards must be an integer.');
/**
* Whether shards should automatically respawn upon exiting
* @type {boolean}
*/
this.respawn = respawn;
/**
@@ -48,6 +52,19 @@ class ShardingManager extends EventEmitter {
* @type {Collection<number, Shard>}
*/
this.shards = new Collection();
this.on('message', (shard, message) => {
if (!message) return;
if (message._sEval) {
this.broadcastEval(message._sEval)
.then(results => shard.send({ _sEval: message._sEval, _result: results }))
.catch(err => shard.send({ _sEval: message._sEval, _error: err }));
} else if (message._sFetchProp) {
this.fetchClientValues(message._sFetchProp)
.then(results => shard.send({ _sFetchProp: message._sFetchProp, _result: results }))
.catch(err => shard.send({ _sFetchProp: message._sFetchProp, _error: err }));
}
});
}
/**

7
src/util/MakeError.js Normal file
View File

@@ -0,0 +1,7 @@
module.exports = function makeError(obj) {
const err = new Error(obj.message, obj.fileName, obj.lineNumber);
err.name = obj.name;
err.columnNumber = obj.columnNumber;
err.stack = obj.stack;
return err;
};