diff --git a/src/client/voice/VoiceBroadcast.js b/src/client/voice/VoiceBroadcast.js index e7f74b48f..316c150f2 100644 --- a/src/client/voice/VoiceBroadcast.js +++ b/src/client/voice/VoiceBroadcast.js @@ -1,4 +1,4 @@ -const EventEmitter = require('events').EventEmitter; +const VolumeInterface = require('./util/VolumeInterface'); const Prism = require('prism-media'); const OpusEncoders = require('./opus/OpusEngineList'); const Collection = require('../../util/Collection'); @@ -15,7 +15,7 @@ const ffmpegArguments = [ * A voice broadcast can be played across multiple voice connections for improved shared-stream efficiency. * @extends {EventEmitter} */ -class VoiceBroadcast extends EventEmitter { +class VoiceBroadcast extends VolumeInterface { constructor(client) { super(); /** @@ -51,55 +51,12 @@ class VoiceBroadcast extends EventEmitter { return d; } - applyVolume(buffer, volume = this._volume) { - if (volume === 1) return buffer; - - const out = Buffer.alloc(buffer.length); - for (let i = 0; i < buffer.length; i += 2) { - if (i >= buffer.length - 1) break; - const uint = Math.min(32767, Math.max(-32767, Math.floor(volume * buffer.readInt16LE(i)))); - out.writeInt16LE(uint, i); - } - - return out; - } - - /** - * Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double. - * @param {number} volume The volume that you want to set - */ - setVolume(volume) { - this._volume = volume; - } - - /** - * Set the volume in decibels - * @param {number} db The decibels - */ - setVolumeDecibels(db) { - this.setVolume(Math.pow(10, db / 20)); - } - - /** - * Set the volume so that a perceived value of 0.5 is half the perceived volume etc. - * @param {number} value The value for the volume - */ - setVolumeLogarithmic(value) { - this.setVolume(Math.pow(value, 1.660964)); - } - - /** - * The current volume of the broadcast - * @readonly - * @type {number} - */ - get volume() { - return this._volume; - } - get _playableStream() { - if (!this.currentTranscoder) return null; - return this.currentTranscoder.transcoder.output || this.currentTranscoder.options.stream; + const currentTranscoder = this.currentTranscoder; + if (!currentTranscoder) return null; + const transcoder = currentTranscoder.transcoder; + const options = currentTranscoder.options; + return (transcoder && transcoder.output) || options.stream; } unregisterDispatcher(dispatcher, old) { @@ -115,6 +72,7 @@ class VoiceBroadcast extends EventEmitter { container.delete(dispatcher); if (!container.size) { + this._encoders.get(volume).destroy(); this._dispatchers.delete(volume); this._encoders.delete(volume); } @@ -175,8 +133,7 @@ class VoiceBroadcast extends EventEmitter { * .catch(console.error); */ playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - options.stream = stream; + const options = { seek, volume, passes, stream }; return this._playTranscodable(stream, options); } @@ -202,6 +159,8 @@ class VoiceBroadcast extends EventEmitter { } _playTranscodable(media, options) { + OpusEncoders.guaranteeOpusEngine(); + this.killCurrentTranscoder(); const transcoder = this.prism.transcode({ type: 'ffmpeg', @@ -242,6 +201,8 @@ class VoiceBroadcast extends EventEmitter { * @returns {VoiceBroadcast} */ playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + OpusEncoders.guaranteeOpusEngine(); + this.killCurrentTranscoder(); const options = { seek, volume, passes, stream }; this.currentTranscoder = { options }; @@ -249,6 +210,20 @@ class VoiceBroadcast extends EventEmitter { return this; } + /** + * Plays an Opus encoded stream at 48KHz. + * Note that inline volume is not compatible with this method. + * @param {ReadableStream} stream The Opus audio stream to play + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + */ + playOpusStream(stream, { seek = 0, passes = 1 } = {}) { + const options = { seek, passes, stream }; + this.currentTranscoder = { options, opus: true }; + stream.once('readable', () => this._startPlaying()); + return this; + } + /** * Play an arbitrary input that can be [handled by ffmpeg](https://ffmpeg.org/ffmpeg-protocols.html#Description) * @param {string} input the arbitrary input @@ -256,8 +231,10 @@ class VoiceBroadcast extends EventEmitter { * @returns {VoiceBroadcast} */ playArbitraryInput(input, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - return this.player.playUnknownStream(input, options); + this.guaranteeOpusEngine(); + + const options = { seek, volume, passes, input }; + return this._playTranscodable(input, options); } /** @@ -284,6 +261,10 @@ class VoiceBroadcast extends EventEmitter { } } + guaranteeOpusEngine() { + if (!this.opusEncoder) throw new Error('Couldn\'t find an Opus engine.'); + } + _startPlaying() { if (this.tickInterval) clearInterval(this.tickInterval); // this.tickInterval = this.client.setInterval(this.tick.bind(this), 20); @@ -301,9 +282,9 @@ class VoiceBroadcast extends EventEmitter { setTimeout(() => this.tick(), 20); return; } - const stream = this._playableStream; - const bufferLength = 1920 * 2; - let buffer = stream.read(bufferLength); + + const opus = this.currentTranscoder.opus; + const buffer = this.readStreamBuffer(); if (!buffer) { this._missed++; @@ -318,12 +299,6 @@ class VoiceBroadcast extends EventEmitter { this._missed = 0; - if (buffer.length !== bufferLength) { - const newBuffer = Buffer.alloc(bufferLength).fill(0); - buffer.copy(newBuffer); - buffer = newBuffer; - } - let packetMatrix = {}; const getOpusPacket = (volume) => { @@ -336,10 +311,13 @@ class VoiceBroadcast extends EventEmitter { }; for (const dispatcher of this.dispatchers) { + if (opus) { + dispatcher.processPacket(buffer); + continue; + } + const volume = dispatcher.volume; - setImmediate(() => { - dispatcher.process(buffer, true, getOpusPacket(volume)); - }); + dispatcher.processPacket(getOpusPacket(volume)); } const next = 20 + (this._startTime + this._pausedTime + (this._count * 20) - Date.now()); @@ -347,6 +325,22 @@ class VoiceBroadcast extends EventEmitter { setTimeout(() => this.tick(), next); } + readStreamBuffer() { + const opus = this.currentTranscoder.opus; + const bufferLength = (opus ? 80 : 1920) * 2; + let buffer = this._playableStream.read(bufferLength); + if (opus) return buffer; + if (!buffer) return null; + + if (buffer.length !== bufferLength) { + const newBuffer = Buffer.alloc(bufferLength).fill(0); + buffer.copy(newBuffer); + buffer = newBuffer; + } + + return buffer; + } + /** * Stop the current stream from playing without unsubscribing dispatchers. */ diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index f90fec911..771312a81 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -142,6 +142,7 @@ class VoiceConnection extends EventEmitter { self_deaf: false, }, }); + this.player.destroy(); /** * Emitted when the voice connection disconnects * @event VoiceConnection#disconnect @@ -236,8 +237,7 @@ class VoiceConnection extends EventEmitter { * }) * .catch(console.error); */ - playFile(file, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; + playFile(file, options) { return this.player.playUnknownStream(`file:${file}`, options); } @@ -247,8 +247,7 @@ class VoiceConnection extends EventEmitter { * @param {StreamOptions} [options] Options for playing the stream * @returns {StreamDispatcher} */ - playArbitraryInput(input, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; + playArbitraryInput(input, options) { return this.player.playUnknownStream(input, options); } @@ -268,22 +267,31 @@ class VoiceConnection extends EventEmitter { * }) * .catch(console.error); */ - playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; + playStream(stream, options) { return this.player.playUnknownStream(stream, options); } /** * Plays a stream of 16-bit signed stereo PCM at 48KHz. - * @param {ReadableStream} stream The audio stream to play. + * @param {ReadableStream} stream The audio stream to play * @param {StreamOptions} [options] Options for playing the stream * @returns {StreamDispatcher} */ - playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; + playConvertedStream(stream, options) { return this.player.playPCMStream(stream, options); } + /** + * Plays an Opus encoded stream at 48KHz. + * Note that inline volume is not compatible with this method. + * @param {ReadableStream} stream The Opus audio stream to play + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + */ + playOpusStream(stream, options) { + return this.player.playOpusStream(stream, options); + } + /** * Plays a voice broadcast * @param {VoiceBroadcast} broadcast the broadcast to play diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index 0fa0f2368..71af21bb0 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -1,4 +1,4 @@ -const EventEmitter = require('events').EventEmitter; +const VolumeInterface = require('../util/VolumeInterface'); const NaCl = require('tweetnacl'); const VoiceBroadcast = require('../VoiceBroadcast'); @@ -16,9 +16,9 @@ nonce.fill(0); * ``` * @extends {EventEmitter} */ -class StreamDispatcher extends EventEmitter { +class StreamDispatcher extends VolumeInterface { constructor(player, stream, streamOptions) { - super(); + super(streamOptions); /** * The Audio Player that controls this dispatcher * @type {AudioPlayer} @@ -31,7 +31,6 @@ class StreamDispatcher extends EventEmitter { this.stream = stream; if (!(this.stream instanceof VoiceBroadcast)) this.startStreaming(); this.streamOptions = streamOptions; - this.streamOptions.volume = this.streamOptions.volume || 0; const data = this.streamingData; data.length = 20; @@ -48,7 +47,7 @@ class StreamDispatcher extends EventEmitter { */ this.destroyed = false; - this.setVolume(streamOptions.volume || 1); + this._opus = streamOptions.opus; } /** @@ -86,46 +85,6 @@ class StreamDispatcher extends EventEmitter { return this.time + this.streamingData.pausedTime; } - /** - * The volume of the stream, relative to the stream's input volume - * @type {number} - * @readonly - */ - get volume() { - return this.streamOptions.volume; - } - - /** - * Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double. - * @param {number} volume The volume that you want to set - */ - setVolume(volume) { - /** - * Emitted when the volume of this dispatcher changes - * @param {number} oldVolume the old volume - * @param {number} newVolume the new volume - * @event StreamDispatcher#volumeChange - */ - this.emit('volumeChange', this.streamOptions.volume, volume); - this.streamOptions.volume = volume; - } - - /** - * Set the volume in decibels - * @param {number} db The decibels - */ - setVolumeDecibels(db) { - this.setVolume(Math.pow(10, db / 20)); - } - - /** - * Set the volume so that a perceived value of 0.5 is half the perceived volume etc. - * @param {number} value The value for the volume - */ - setVolumeLogarithmic(value) { - this.setVolume(Math.pow(value, 1.660964)); - } - /** * Stops sending voice packets to the voice connection (stream may still progress however) */ @@ -196,20 +155,7 @@ class StreamDispatcher extends EventEmitter { return packetBuffer; } - applyVolume(buffer) { - if (this.volume === 1) return buffer; - - const out = Buffer.alloc(buffer.length); - for (let i = 0; i < buffer.length; i += 2) { - if (i >= buffer.length - 1) break; - const uint = Math.min(32767, Math.max(-32767, Math.floor(this.volume * buffer.readInt16LE(i)))); - out.writeInt16LE(uint, i); - } - - return out; - } - - process(buffer, controlled, packet) { + processPacket(packet) { try { if (this.destroyed) { this.setSpeaking(false); @@ -218,7 +164,38 @@ class StreamDispatcher extends EventEmitter { const data = this.streamingData; - if (data.missed >= 5 && !controlled) { + if (this.paused) { + this.setSpeaking(false); + data.pausedTime = data.length * 10; + return; + } + + if (!packet) { + data.missed++; + data.pausedTime += data.length * 10; + return; + } + + this.started(); + this.missed = 0; + + this.stepStreamingData(); + this.sendBuffer(null, data.sequence, data.timestamp, packet); + } catch (e) { + this.destroy('error', e); + } + } + + process() { + try { + if (this.destroyed) { + this.setSpeaking(false); + return; + } + + const data = this.streamingData; + + if (data.missed >= 5) { this.destroy('end', 'Stream is not generating quickly enough.'); return; } @@ -227,61 +204,30 @@ class StreamDispatcher extends EventEmitter { this.setSpeaking(false); // data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; data.pausedTime += data.length * 10; - // if buffer is provided we are assuming a master process is controlling the dispatcher - if (!buffer) this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); return; } - if (!buffer && controlled) { + this.started(); + + const buffer = this.readStreamBuffer(); + if (!buffer) { data.missed++; data.pausedTime += data.length * 10; + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); return; } - if (!data.startTime) { - /** - * Emitted once the dispatcher starts streaming - * @event StreamDispatcher#start - */ - this.emit('start'); - data.startTime = Date.now(); - } - - if (packet) { - data.count++; - data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0; - data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; - this.sendBuffer(null, data.sequence, data.timestamp, packet); - return; - } - - const bufferLength = 1920 * data.channels; - if (!controlled) { - buffer = this.stream.read(bufferLength); - if (!buffer) { - data.missed++; - data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); - return; - } - } - data.missed = 0; - if (buffer.length !== bufferLength) { - const newBuffer = Buffer.alloc(bufferLength).fill(0); - buffer.copy(newBuffer); - buffer = newBuffer; + this.stepStreamingData(); + + if (this._opus) { + this.sendBuffer(null, data.sequence, data.timestamp, buffer); + } else { + this.sendBuffer(buffer, data.sequence, data.timestamp); } - buffer = this.applyVolume(buffer); - - data.count++; - data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0; - data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; - this.sendBuffer(buffer, data.sequence, data.timestamp); - - if (controlled) return; const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now()); this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime); } catch (e) { @@ -289,6 +235,43 @@ class StreamDispatcher extends EventEmitter { } } + readStreamBuffer() { + const data = this.streamingData; + const bufferLength = (this._opus ? 80 : 1920) * data.channels; + let buffer = this.stream.read(bufferLength); + if (this._opus) return buffer; + if (!buffer) return null; + + if (buffer.length !== bufferLength) { + const newBuffer = Buffer.alloc(bufferLength).fill(0); + buffer.copy(newBuffer); + buffer = newBuffer; + } + + buffer = this.applyVolume(buffer); + return buffer; + } + + started() { + const data = this.streamingData; + + if (!data.startTime) { + /** + * Emitted once the dispatcher starts streaming + * @event StreamDispatcher#start + */ + this.emit('start'); + data.startTime = Date.now(); + } + } + + stepStreamingData() { + const data = this.streamingData; + data.count++; + data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0; + data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; + } + destroy(type, reason) { if (this.destroyed) return; this.destroyed = true; diff --git a/src/client/voice/opus/BaseOpusEngine.js b/src/client/voice/opus/BaseOpusEngine.js index 6c3ba6e34..47c88c7c6 100644 --- a/src/client/voice/opus/BaseOpusEngine.js +++ b/src/client/voice/opus/BaseOpusEngine.js @@ -10,6 +10,10 @@ class BaseOpus { decode(buffer) { return buffer; } + + destroy() { + return; + } } module.exports = BaseOpus; diff --git a/src/client/voice/opus/OpusEngineList.js b/src/client/voice/opus/OpusEngineList.js index ffd512a64..bc9fe6102 100644 --- a/src/client/voice/opus/OpusEngineList.js +++ b/src/client/voice/opus/OpusEngineList.js @@ -20,5 +20,9 @@ exports.fetch = () => { const fetched = fetch(encoder); if (fetched) return fetched; } - throw new Error('Couldn\'t find an Opus engine.'); + return null; +}; + +exports.guaranteeOpusEngine = () => { + if (!this.opusEncoder) throw new Error('Couldn\'t find an Opus engine.'); }; diff --git a/src/client/voice/opus/OpusScriptEngine.js b/src/client/voice/opus/OpusScriptEngine.js index 33b4ff5a6..c902e790c 100644 --- a/src/client/voice/opus/OpusScriptEngine.js +++ b/src/client/voice/opus/OpusScriptEngine.js @@ -2,7 +2,7 @@ const OpusEngine = require('./BaseOpusEngine'); let OpusScript; -class NodeOpusEngine extends OpusEngine { +class OpusScriptEngine extends OpusEngine { constructor(player) { super(player); try { @@ -22,6 +22,11 @@ class NodeOpusEngine extends OpusEngine { super.decode(buffer); return this.encoder.decode(buffer); } + + destroy() { + super.destroy(); + this.encoder.delete(); + } } -module.exports = NodeOpusEngine; +module.exports = OpusScriptEngine; diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index f67969f95..585af040a 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -50,6 +50,10 @@ class AudioPlayer extends EventEmitter { return this.streams.last().transcoder; } + destroy() { + this.opusEncoder.destroy(); + } + destroyStream(stream) { const data = this.streams.get(stream); if (!data) return; @@ -69,6 +73,7 @@ class AudioPlayer extends EventEmitter { } playUnknownStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + OpusEncoders.guaranteeOpusEngine(); const options = { seek, volume, passes }; const transcoder = this.prism.transcode({ type: 'ffmpeg', @@ -85,28 +90,39 @@ class AudioPlayer extends EventEmitter { } playPCMStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + OpusEncoders.guaranteeOpusEngine(); const options = { seek, volume, passes }; this.destroyAllStreams(stream); - const dispatcher = new StreamDispatcher(this, stream, options); - dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + const dispatcher = this.createDispatcher(stream, options); if (!this.streams.has(stream)) this.streams.set(stream, { dispatcher, input: stream }); this.streams.get(stream).dispatcher = dispatcher; - dispatcher.on('end', () => this.destroyStream(stream)); - dispatcher.on('error', () => this.destroyStream(stream)); + return dispatcher; + } + + playOpusStream(stream, { seek = 0, passes = 1 } = {}) { + const options = { seek, passes, opus: true }; + this.destroyAllStreams(stream); + const dispatcher = this.createDispatcher(stream, options); + this.streams.set(stream, { dispatcher, input: stream }); return dispatcher; } playBroadcast(broadcast, { volume = 1, passes = 1 } = {}) { const options = { volume, passes }; this.destroyAllStreams(); - const dispatcher = new StreamDispatcher(this, broadcast, options); - dispatcher.on('end', () => this.destroyStream(broadcast)); - dispatcher.on('error', () => this.destroyStream(broadcast)); - dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + const dispatcher = this.createDispatcher(broadcast, options); this.streams.set(broadcast, { dispatcher, input: broadcast }); broadcast.registerDispatcher(dispatcher); return dispatcher; } + + createDispatcher(stream, options) { + const dispatcher = new StreamDispatcher(this, stream, options); + dispatcher.on('end', () => this.destroyStream(stream)); + dispatcher.on('error', () => this.destroyStream(stream)); + dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + return dispatcher; + } } module.exports = AudioPlayer; diff --git a/src/client/voice/receiver/VoiceReceiver.js b/src/client/voice/receiver/VoiceReceiver.js index 5aec6d35b..0f44d0850 100644 --- a/src/client/voice/receiver/VoiceReceiver.js +++ b/src/client/voice/receiver/VoiceReceiver.js @@ -84,7 +84,8 @@ class VoiceReceiver extends EventEmitter { stream._push(null); this.opusStreams.delete(id); } - for (const [id] of this.opusEncoders) { + for (const [id, encoder] of this.opusEncoders) { + encoder.destroy(); this.opusEncoders.delete(id); } this.destroyed = true; diff --git a/src/client/voice/util/VolumeInterface.js b/src/client/voice/util/VolumeInterface.js new file mode 100644 index 000000000..40b7bb339 --- /dev/null +++ b/src/client/voice/util/VolumeInterface.js @@ -0,0 +1,57 @@ +const EventEmitter = require('events'); + +class VolumeInterface extends EventEmitter { + constructor({ volume = 0 } = {}) { + super(); + this.setVolume(volume || 1); + } + + applyVolume(buffer, volume) { + volume = volume || this._volume; + if (volume === 1) return buffer; + + const out = new Buffer(buffer.length); + for (let i = 0; i < buffer.length; i += 2) { + if (i >= buffer.length - 1) break; + const uint = Math.min(32767, Math.max(-32767, Math.floor(volume * buffer.readInt16LE(i)))); + out.writeInt16LE(uint, i); + } + + return out; + } + + /** + * Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double. + * @param {number} volume The volume that you want to set + */ + setVolume(volume) { + this._volume = volume; + } + + /** + * Set the volume in decibels + * @param {number} db The decibels + */ + setVolumeDecibels(db) { + this.setVolume(Math.pow(10, db / 20)); + } + + /** + * Set the volume so that a perceived value of 0.5 is half the perceived volume etc. + * @param {number} value The value for the volume + */ + setVolumeLogarithmic(value) { + this.setVolume(Math.pow(value, 1.660964)); + } + + /** + * The current volume of the broadcast + * @readonly + * @type {number} + */ + get volume() { + return this._volume; + } +} + +module.exports = VolumeInterface;