Cleanup and reorganise some shard stuff

This commit is contained in:
Schuyler Cebulskie
2016-09-27 02:25:38 -04:00
parent e5a987e0ef
commit 9d3b7b49dd
4 changed files with 65 additions and 65 deletions

File diff suppressed because one or more lines are too long

View File

@@ -12,19 +12,19 @@ class Shard {
*/ */
constructor(manager, id) { constructor(manager, id) {
/** /**
* The manager of the spawned shard * Manager that created the shard
* @type {ShardingManager} * @type {ShardingManager}
*/ */
this.manager = manager; this.manager = manager;
/** /**
* The shard ID * ID of the shard
* @type {number} * @type {number}
*/ */
this.id = id; this.id = id;
/** /**
* The process of the shard * Process of the shard
* @type {ChildProcess} * @type {ChildProcess}
*/ */
this.process = childProcess.fork(path.resolve(this.manager.file), [], { this.process = childProcess.fork(path.resolve(this.manager.file), [], {
@@ -56,34 +56,6 @@ class Shard {
}); });
} }
/**
* Evaluates a script on the shard, in the context of the Client.
* @param {string} script JavaScript to run on the shard
* @returns {Promise<*>} Result of the script execution
*/
eval(script) {
if (this._evals.has(script)) return this._evals.get(script);
const promise = new Promise((resolve, reject) => {
const listener = message => {
if (!message || message._eval !== script) return;
this.process.removeListener('message', listener);
this._evals.delete(script);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
this.process.on('message', listener);
this.send({ _eval: script }).catch(err => {
this.process.removeListener('message', listener);
this._evals.delete(script);
reject(err);
});
});
this._evals.set(script, promise);
return promise;
}
/** /**
* Fetches a Client property value of the shard. * Fetches a Client property value of the shard.
* @param {string} prop Name of the Client property to get, using periods for nesting * @param {string} prop Name of the Client property to get, using periods for nesting
@@ -115,6 +87,34 @@ class Shard {
this._fetches.set(prop, promise); this._fetches.set(prop, promise);
return promise; return promise;
} }
/**
* Evaluates a script on the shard, in the context of the Client.
* @param {string} script JavaScript to run on the shard
* @returns {Promise<*>} Result of the script execution
*/
eval(script) {
if (this._evals.has(script)) return this._evals.get(script);
const promise = new Promise((resolve, reject) => {
const listener = message => {
if (!message || message._eval !== script) return;
this.process.removeListener('message', listener);
this._evals.delete(script);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
this.process.on('message', listener);
this.send({ _eval: script }).catch(err => {
this.process.removeListener('message', listener);
this._evals.delete(script);
reject(err);
});
});
this._evals.set(script, promise);
return promise;
}
} }
module.exports = Shard; module.exports = Shard;

View File

@@ -41,27 +41,6 @@ class ShardClientUtil {
}); });
} }
/**
* Evaluates a script on all shards, in the context of the Clients.
* @param {string} script JavaScript to run on each shard
* @returns {Promise<Array>} Results of the script execution
*/
broadcastEval(script) {
return new Promise((resolve, reject) => {
const listener = message => {
if (!message || message._sEval !== script) return;
process.removeListener('message', listener);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
process.on('message', listener);
this.send({ _sEval: script }).catch(err => {
process.removeListener('message', listener);
reject(err);
});
});
}
/** /**
* Fetches a Client property value of each shard. * Fetches a Client property value of each shard.
* @param {string} prop Name of the Client property to get, using periods for nesting * @param {string} prop Name of the Client property to get, using periods for nesting
@@ -87,6 +66,27 @@ class ShardClientUtil {
}); });
} }
/**
* Evaluates a script on all shards, in the context of the Clients.
* @param {string} script JavaScript to run on each shard
* @returns {Promise<Array>} Results of the script execution
*/
broadcastEval(script) {
return new Promise((resolve, reject) => {
const listener = message => {
if (!message || message._sEval !== script) return;
process.removeListener('message', listener);
if (!message._error) resolve(message._result); else reject(makeError(message._error));
};
process.on('message', listener);
this.send({ _sEval: script }).catch(err => {
process.removeListener('message', listener);
reject(err);
});
});
}
/** /**
* Handles an IPC message * Handles an IPC message
* @param {*} message Message received * @param {*} message Message received
@@ -94,17 +94,17 @@ class ShardClientUtil {
*/ */
_handleMessage(message) { _handleMessage(message) {
if (!message) return; if (!message) return;
if (message._eval) { if (message._fetchProp) {
const props = message._fetchProp.split('.');
let value = this.client;
for (const prop of props) value = value[prop];
this._respond('fetchProp', { _fetchProp: message._fetchProp, _result: value });
} else if (message._eval) {
try { try {
this._respond('eval', { _eval: message._eval, _result: this.client._eval(message._eval) }); this._respond('eval', { _eval: message._eval, _result: this.client._eval(message._eval) });
} catch (err) { } catch (err) {
this._respond('eval', { _eval: message._eval, _error: err }); this._respond('eval', { _eval: message._eval, _error: err });
} }
} else if (message._fetchProp) {
const props = message._fetchProp.split('.');
let value = this.client;
for (const prop of props) value = value[prop];
this._respond('fetchProp', { _fetchProp: message._fetchProp, _result: value });
} }
} }

View File

@@ -55,21 +55,21 @@ class ShardingManager extends EventEmitter {
this.on('message', (shard, message) => { this.on('message', (shard, message) => {
if (!message) return; if (!message) return;
if (message._sEval) { if (message._sFetchProp) {
this.broadcastEval(message._sEval)
.then(results => shard.send({ _sEval: message._sEval, _result: results }))
.catch(err => shard.send({ _sEval: message._sEval, _error: err }));
} else if (message._sFetchProp) {
this.fetchClientValues(message._sFetchProp) this.fetchClientValues(message._sFetchProp)
.then(results => shard.send({ _sFetchProp: message._sFetchProp, _result: results })) .then(results => shard.send({ _sFetchProp: message._sFetchProp, _result: results }))
.catch(err => shard.send({ _sFetchProp: message._sFetchProp, _error: err })); .catch(err => shard.send({ _sFetchProp: message._sFetchProp, _error: err }));
} else if (message._sEval) {
this.broadcastEval(message._sEval)
.then(results => shard.send({ _sEval: message._sEval, _result: results }))
.catch(err => shard.send({ _sEval: message._sEval, _error: err }));
} }
}); });
} }
/** /**
* Spawns a single shard. * Spawns a single shard.
* @param {number} id The ID of the shard to spawn. THIS IS NOT NEEDED IN ANY NORMAL CASE! * @param {number} id The ID of the shard to spawn. **This is usually not necessary.**
* @returns {Promise<Shard>} * @returns {Promise<Shard>}
*/ */
createShard(id = this.shards.size) { createShard(id = this.shards.size) {