diff --git a/src/sharding/Shard.js b/src/sharding/Shard.js index af71d23b5..57be609e1 100644 --- a/src/sharding/Shard.js +++ b/src/sharding/Shard.js @@ -7,21 +7,41 @@ class Shard { this.manager = manager; this.id = id; this.process = childProcess.fork(path.resolve(this.manager.file), [id, this.manager.shards.size], { silent: true }); + this.waitingForResponse = new Map(); this.process.on('message', m => { if (m && m.type && m.id) { - + if (this.waitingForResponse.get(m.id)) { + const resp = this.waitingForResponse.get(m.id); + resp.resolve(m.data); + this.waitingForResponse.delete(m.id); + } else { + const reply = data => { + this.send(m.id); + } + } } }); } send(type, timeout = 60000, data = {}) { + const id = crypto.randomBytes(16).toString('hex'); + this._send(id, type, timeout, data); + } + + _send(id, type, timeout, data) { return new Promise((resolve, reject) => { - const id = crypto.randomBytes(16).toString('hex'); this.process.send({ type, id, data, }); + this.waitingForResponse.set(id, { + resolve, + }); + setTimeout(() => { + reject(new Error('did not receive response')); + this.waitingForResponse.delete(id); + }, timeout); }); } } diff --git a/src/sharding/ShardListener.js b/src/sharding/ShardListener.js new file mode 100644 index 000000000..7ee79e6c1 --- /dev/null +++ b/src/sharding/ShardListener.js @@ -0,0 +1,28 @@ +const crypto = require('crypto'); + +class ShardListener { + constructor() { + this.waitingForResponse = new Map(); + this.process = process; + } + + send(type, timeout = 60000, data = {}) { + return new Promise((resolve, reject) => { + const id = crypto.randomBytes(16).toString('hex'); + this.process.send({ + type, + id, + data, + }); + this.waitingForResponse.set(id, { + resolve, + }); + setTimeout(() => { + reject(new Error('did not receive response')); + this.waitingForResponse.delete(id); + }, timeout); + }); + } +} + +module.exports = ShardListener;