Reorganise VoiceBroadcast dispatchers and also add new events

This commit is contained in:
Amish Shah
2016-12-30 18:21:22 +00:00
parent d13c48bafa
commit e2753136a4

View File

@@ -23,7 +23,7 @@ class VoiceBroadcast extends EventEmitter {
* @type {Client} * @type {Client}
*/ */
this.client = client; this.client = client;
this.dispatchers = new Collection(); this._dispatchers = new Collection();
/** /**
* The audio transcoder that this broadcast uses * The audio transcoder that this broadcast uses
* @type {Prism} * @type {Prism}
@@ -43,6 +43,18 @@ class VoiceBroadcast extends EventEmitter {
this._volume = 1; this._volume = 1;
} }
/**
* An array of subscribed dispatchers
* @type {StreamDispatcher[]}
*/
get dispatchers() {
let d = [];
for (const container of this._dispatchers.values()) {
d = d.concat(Array.from(container.values()));
}
return d;
}
applyVolume(buffer, volume) { applyVolume(buffer, volume) {
volume = volume || this._volume; volume = volume || this._volume;
if (volume === 1) return buffer; if (volume === 1) return buffer;
@@ -96,30 +108,42 @@ class VoiceBroadcast extends EventEmitter {
} }
unregisterDispatcher(dispatcher, old) { unregisterDispatcher(dispatcher, old) {
let container = this.dispatchers.get(old || dispatcher.volume); /**
* Emitted whenever a Stream Dispatcher unsubscribes from the broadcast
* @event VoiceBroadcast#unsubscribe
* @param {dispatcher} the dispatcher that unsubscribed
*/
this.emit('unsubscribe', dispatcher);
let container = this._dispatchers.get(old || dispatcher.volume);
if (container) { if (container) {
if (container.delete(dispatcher)) return; if (container.delete(dispatcher)) return;
} }
for (container of this.dispatchers.values()) { for (container of this._dispatchers.values()) {
container.delete(dispatcher); container.delete(dispatcher);
} }
} }
registerDispatcher(dispatcher) { registerDispatcher(dispatcher) {
if (!this.dispatchers.has(dispatcher.volume)) { if (!this._dispatchers.has(dispatcher.volume)) {
this.dispatchers.set(dispatcher.volume, new Set()); this._dispatchers.set(dispatcher.volume, new Set());
} }
const container = this.dispatchers.get(dispatcher.volume); const container = this._dispatchers.get(dispatcher.volume);
if (!container.has(dispatcher)) { if (!container.has(dispatcher)) {
container.add(dispatcher); container.add(dispatcher);
dispatcher.once('end', () => this.unregisterDispatcher(dispatcher)); dispatcher.once('end', () => this.unregisterDispatcher(dispatcher));
dispatcher.on('volumeChange', (o, n) => { dispatcher.on('volumeChange', (o, n) => {
this.unregisterDispatcher(dispatcher, o); this.unregisterDispatcher(dispatcher, o);
if (!this.dispatchers.has(n)) { if (!this._dispatchers.has(n)) {
this.dispatchers.set(n, new Set()); this._dispatchers.set(n, new Set());
} }
this.dispatchers.get(n).add(dispatcher); this._dispatchers.get(n).add(dispatcher);
}); });
/**
* Emitted whenever a stream dispatcher subscribes to the broadcast
* @event VoiceBroadcast#subscribe
* @param {StreamDispatcher} dispatcher the subscribed dispatcher
*/
this.emit('subscribe', dispatcher);
} }
} }
@@ -197,7 +221,14 @@ class VoiceBroadcast extends EventEmitter {
*/ */
else this.emit('warn', e); else this.emit('warn', e);
}); });
transcoder.once('end', () => this.killCurrentTranscoder()); /**
* Emitted once the broadcast (the audio stream) ends
* @event VoiceBroadcast#end
*/
transcoder.once('end', () => {
this.emit('end');
this.killCurrentTranscoder();
});
this.currentTranscoder = { this.currentTranscoder = {
transcoder, transcoder,
options, options,
@@ -214,6 +245,7 @@ class VoiceBroadcast extends EventEmitter {
*/ */
playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) {
this.killCurrentTranscoder(); this.killCurrentTranscoder();
stream.once('end', () => this.emit('end'));
const options = { seek, volume, passes, stream }; const options = { seek, volume, passes, stream };
this.currentTranscoder = { options }; this.currentTranscoder = { options };
stream.once('readable', () => this._startPlaying()); stream.once('readable', () => this._startPlaying());
@@ -224,7 +256,7 @@ class VoiceBroadcast extends EventEmitter {
* Pauses the entire broadcast - all dispatchers also pause * Pauses the entire broadcast - all dispatchers also pause
*/ */
pause() { pause() {
for (const container of this.dispatchers.values()) { for (const container of this._dispatchers.values()) {
for (const dispatcher of container.values()) { for (const dispatcher of container.values()) {
dispatcher.pause(); dispatcher.pause();
} }
@@ -236,7 +268,7 @@ class VoiceBroadcast extends EventEmitter {
* Resumes the entire broadcast - all dispatchers also resume * Resumes the entire broadcast - all dispatchers also resume
*/ */
resume() { resume() {
for (const container of this.dispatchers.values()) { for (const container of this._dispatchers.values()) {
for (const dispatcher of container.values()) { for (const dispatcher of container.values()) {
dispatcher.resume(); dispatcher.resume();
} }
@@ -265,7 +297,7 @@ class VoiceBroadcast extends EventEmitter {
buffer = this.applyVolume(buffer); buffer = this.applyVolume(buffer);
for (const x of this.dispatchers.entries()) { for (const x of this._dispatchers.entries()) {
setImmediate(() => { setImmediate(() => {
const [volume, container] = x; const [volume, container] = x;
const opusPacket = this.opusEncoder.encode(this.applyVolume(buffer, volume)); const opusPacket = this.opusEncoder.encode(this.applyVolume(buffer, volume));
@@ -281,7 +313,7 @@ class VoiceBroadcast extends EventEmitter {
*/ */
end() { end() {
this.killCurrentTranscoder(); this.killCurrentTranscoder();
for (const container of this.dispatchers.values()) { for (const container of this._dispatchers.values()) {
for (const dispatcher of container.values()) { for (const dispatcher of container.values()) {
dispatcher.destroy('end', 'broadcast ended'); dispatcher.destroy('end', 'broadcast ended');
} }