From bdf8955098e203773896947c79c6afe0cb428b22 Mon Sep 17 00:00:00 2001 From: Amish Shah Date: Thu, 26 Oct 2017 21:00:53 +0100 Subject: [PATCH] "yeah we need voice broadcasts cause we make big big music bots" no stop --- src/client/voice/VoiceBroadcast.js | 348 +++--------------- src/client/voice/VoiceConnection.js | 2 + .../voice/dispatcher/BroadcastDispatcher.js | 38 ++ .../voice/dispatcher/StreamDispatcher.js | 13 +- src/client/voice/player/AudioPlayer.js | 67 +--- src/client/voice/player/BasePlayer.js | 76 ++++ .../voice/player/BroadcastAudioPlayer.js | 27 ++ 7 files changed, 200 insertions(+), 371 deletions(-) create mode 100644 src/client/voice/dispatcher/BroadcastDispatcher.js create mode 100644 src/client/voice/player/BasePlayer.js create mode 100644 src/client/voice/player/BroadcastAudioPlayer.js diff --git a/src/client/voice/VoiceBroadcast.js b/src/client/voice/VoiceBroadcast.js index b9a6f5117..dadbbc5ef 100644 --- a/src/client/voice/VoiceBroadcast.js +++ b/src/client/voice/VoiceBroadcast.js @@ -1,14 +1,5 @@ -class VolumeInterface {} -const OpusEncoders = require('./opus/OpusEngineList'); -const Collection = require('../../util/Collection'); - -const ffmpegArguments = [ - '-analyzeduration', '0', - '-loglevel', '0', - '-f', 's16le', - '-ar', '48000', - '-ac', '2', -]; +const EventEmitter = require('events'); +const BroadcastAudioPlayer = require('./player/BroadcastAudioPlayer'); /** * A voice broadcast can be played across multiple voice connections for improved shared-stream efficiency. @@ -24,7 +15,7 @@ const ffmpegArguments = [ * ``` * @implements {VolumeInterface} */ -class VoiceBroadcast extends VolumeInterface { +class VoiceBroadcast extends EventEmitter { constructor(client) { super(); /** @@ -32,119 +23,8 @@ class VoiceBroadcast extends VolumeInterface { * @type {Client} */ this.client = client; - this._dispatchers = new Collection(); - this._encoders = new Collection(); - /** - * Whether playing is paused - * @type {boolean} - */ - this.paused = false; - /** - * The current audio transcoder that is being used - * @type {Object} - */ - this.currentTranscoder = null; - this.tickInterval = null; - this._volume = 1; - } - - /** - * An array of subscribed dispatchers - * @type {StreamDispatcher[]} - * @readonly - */ - get dispatchers() { - let d = []; - for (const container of this._dispatchers.values()) { - d = d.concat(Array.from(container.values())); - } - return d; - } - - get _playableStream() { - const currentTranscoder = this.currentTranscoder; - if (!currentTranscoder) return null; - const transcoder = currentTranscoder.transcoder; - const options = currentTranscoder.options; - return (transcoder && transcoder.output) || options.stream; - } - - unregisterDispatcher(dispatcher, old) { - const volume = old || dispatcher.volume; - - /** - * Emitted whenever a stream dispatcher unsubscribes from the broadcast. - * @event VoiceBroadcast#unsubscribe - * @param {StreamDispatcher} dispatcher The unsubscribed dispatcher - */ - this.emit('unsubscribe', dispatcher); - for (const container of this._dispatchers.values()) { - container.delete(dispatcher); - - if (!container.size) { - this._encoders.get(volume).destroy(); - this._dispatchers.delete(volume); - this._encoders.delete(volume); - } - } - } - - registerDispatcher(dispatcher) { - if (!this._dispatchers.has(dispatcher.volume)) { - this._dispatchers.set(dispatcher.volume, new Set()); - this._encoders.set(dispatcher.volume, OpusEncoders.fetch()); - } - const container = this._dispatchers.get(dispatcher.volume); - if (!container.has(dispatcher)) { - container.add(dispatcher); - dispatcher.once('end', () => this.unregisterDispatcher(dispatcher)); - dispatcher.on('volumeChange', (o, n) => { - this.unregisterDispatcher(dispatcher, o); - if (!this._dispatchers.has(n)) { - this._dispatchers.set(n, new Set()); - this._encoders.set(n, OpusEncoders.fetch()); - } - this._dispatchers.get(n).add(dispatcher); - }); - /** - * Emitted whenever a stream dispatcher subscribes to the broadcast. - * @event VoiceBroadcast#subscribe - * @param {StreamDispatcher} dispatcher The subscribed dispatcher - */ - this.emit('subscribe', dispatcher); - } - } - - killCurrentTranscoder() { - if (this.currentTranscoder) { - if (this.currentTranscoder.transcoder) this.currentTranscoder.transcoder.kill(); - this.currentTranscoder = null; - this.emit('end'); - } - } - - /** - * Plays any audio stream across the broadcast. - * @param {ReadableStream} stream The audio stream to play - * @param {StreamOptions} [options] Options for playing the stream - * @returns {VoiceBroadcast} - * @example - * // Play streams using ytdl-core - * const ytdl = require('ytdl-core'); - * const streamOptions = { seek: 0, volume: 1 }; - * const broadcast = client.createVoiceBroadcast(); - * - * voiceChannel.join() - * .then(connection => { - * const stream = ytdl('https://www.youtube.com/watch?v=XAWgeLF9EVQ', { filter : 'audioonly' }); - * broadcast.playStream(stream); - * const dispatcher = connection.playBroadcast(broadcast); - * }) - * .catch(console.error); - */ - playStream(stream, options = {}) { - this.setVolume(options.volume || 1); - return this._playTranscodable(stream, options); + this.dispatchers = []; + this.player = new BroadcastAudioPlayer(this); } /** @@ -154,66 +34,54 @@ class VoiceBroadcast extends VolumeInterface { * @returns {StreamDispatcher} * @example * // Play files natively - * const broadcast = client.createVoiceBroadcast(); - * * voiceChannel.join() * .then(connection => { - * broadcast.playFile('C:/Users/Discord/Desktop/music.mp3'); - * const dispatcher = connection.playBroadcast(broadcast); + * const dispatcher = connection.playFile('C:/Users/Discord/Desktop/music.mp3'); * }) * .catch(console.error); */ - playFile(file, options = {}) { - this.setVolume(options.volume || 1); - return this._playTranscodable(`file:${file}`, options); + playFile(file, options) { + return this.player.playUnknownStream(`file:${file}`, options); } - _playTranscodable(media, options) { - this.killCurrentTranscoder(); - const transcoder = this.prism.transcode({ - type: 'ffmpeg', - media, - ffmpegArguments: ffmpegArguments.concat(['-ss', String(options.seek || 0)]), - }); - /** - * Emitted whenever an error occurs. - * @event VoiceBroadcast#error - * @param {Error} error The error that occurred - */ - transcoder.once('error', e => { - if (this.listenerCount('error') > 0) this.emit('error', e); - /** - * Emitted whenever the VoiceBroadcast has any warnings. - * @event VoiceBroadcast#warn - * @param {string|Error} warning The warning that was raised - */ - else this.emit('warn', e); - }); - /** - * Emitted once the broadcast (the audio stream) ends. - * @event VoiceBroadcast#end - */ - transcoder.once('end', () => this.killCurrentTranscoder()); - this.currentTranscoder = { - transcoder, - options, - }; - transcoder.output.once('readable', () => this._startPlaying()); - return this; + /** + * Plays an arbitrary input that can be [handled by ffmpeg](https://ffmpeg.org/ffmpeg-protocols.html#Description) + * @param {string} input the arbitrary input + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + */ + playArbitraryInput(input, options) { + return this.player.playUnknownStream(input, options); + } + + /** + * Plays and converts an audio stream in the voice connection. + * @param {ReadableStream} stream The audio stream to play + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + * @example + * // Play streams using ytdl-core + * const ytdl = require('ytdl-core'); + * const streamOptions = { seek: 0, volume: 1 }; + * voiceChannel.join() + * .then(connection => { + * const stream = ytdl('https://www.youtube.com/watch?v=XAWgeLF9EVQ', { filter : 'audioonly' }); + * const dispatcher = connection.playStream(stream, streamOptions); + * }) + * .catch(console.error); + */ + playStream(stream, options) { + return this.player.playUnknownStream(stream, options); } /** * Plays a stream of 16-bit signed stereo PCM. * @param {ReadableStream} stream The audio stream to play * @param {StreamOptions} [options] Options for playing the stream - * @returns {VoiceBroadcast} + * @returns {StreamDispatcher} */ - playConvertedStream(stream, options = {}) { - this.killCurrentTranscoder(); - this.setVolume(options.volume || 1); - this.currentTranscoder = { options: { stream } }; - stream.once('readable', () => this._startPlaying()); - return this; + playConvertedStream(stream, options) { + return this.player.playPCMStream(stream, options); } /** @@ -223,142 +91,8 @@ class VoiceBroadcast extends VolumeInterface { * @param {StreamOptions} [options] Options for playing the stream * @returns {StreamDispatcher} */ - playOpusStream(stream) { - this.currentTranscoder = { options: { stream }, opus: true }; - stream.once('readable', () => this._startPlaying()); - return this; - } - - /** - * Plays an arbitrary input that can be [handled by ffmpeg](https://ffmpeg.org/ffmpeg-protocols.html#Description) - * @param {string} input The arbitrary input - * @param {StreamOptions} [options] Options for playing the stream - * @returns {VoiceBroadcast} - */ - playArbitraryInput(input, options = {}) { - this.setVolume(options.volume || 1); - options.input = input; - return this._playTranscodable(input, options); - } - - /** - * Pauses the entire broadcast - all dispatchers are also paused. - */ - pause() { - this.paused = true; - for (const container of this._dispatchers.values()) { - for (const dispatcher of container.values()) { - dispatcher.pause(); - } - } - } - - /** - * Resumes the entire broadcast - all dispatchers are also resumed. - */ - resume() { - this.paused = false; - for (const container of this._dispatchers.values()) { - for (const dispatcher of container.values()) { - dispatcher.resume(); - } - } - } - - _startPlaying() { - if (this.tickInterval) clearInterval(this.tickInterval); - // Old code? - // this.tickInterval = this.client.setInterval(this.tick.bind(this), 20); - this._startTime = Date.now(); - this._count = 0; - this._pausedTime = 0; - this._missed = 0; - this.tick(); - } - - tick() { - if (!this._playableStream) return; - if (this.paused) { - this._pausedTime += 20; - setTimeout(() => this.tick(), 20); - return; - } - - const opus = this.currentTranscoder.opus; - const buffer = this.readStreamBuffer(); - - if (!buffer) { - this._missed++; - if (this._missed < 5) { - this._pausedTime += 200; - setTimeout(() => this.tick(), 200); - } else { - this.killCurrentTranscoder(); - } - return; - } - - this._missed = 0; - - let packetMatrix = {}; - - const getOpusPacket = volume => { - if (packetMatrix[volume]) return packetMatrix[volume]; - - const opusEncoder = this._encoders.get(volume); - const opusPacket = opusEncoder.encode(this.applyVolume(buffer, this._volume * volume)); - packetMatrix[volume] = opusPacket; - return opusPacket; - }; - - for (const dispatcher of this.dispatchers) { - if (opus) { - dispatcher.processPacket(buffer); - continue; - } - - const volume = dispatcher.volume; - dispatcher.processPacket(getOpusPacket(volume)); - } - - const next = 20 + (this._startTime + this._pausedTime + (this._count * 20) - Date.now()); - this._count++; - setTimeout(() => this.tick(), next); - } - - readStreamBuffer() { - const opus = this.currentTranscoder.opus; - const bufferLength = (opus ? 80 : 1920) * 2; - let buffer = this._playableStream.read(bufferLength); - if (opus) return buffer; - if (!buffer) return null; - - if (buffer.length !== bufferLength) { - const newBuffer = Buffer.alloc(bufferLength).fill(0); - buffer.copy(newBuffer); - buffer = newBuffer; - } - - return buffer; - } - - /** - * Stops the current stream from playing without unsubscribing dispatchers. - */ - end() { - this.killCurrentTranscoder(); - } - - /** - * Ends the current broadcast, all subscribed dispatchers will also end. - */ - destroy() { - this.end(); - for (const container of this._dispatchers.values()) { - for (const dispatcher of container.values()) { - dispatcher.destroy('end', 'broadcast ended'); - } - } + playOpusStream(stream, options) { + return this.player.playOpusStream(stream, options); } } diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index 4fca12952..8cea545f4 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -89,6 +89,8 @@ class VoiceConnection extends EventEmitter { this.emit('warn', e); }); + this.once('closing', () => this.player.destroy()); + /** * Map SSRC to speaking values * @type {Map} diff --git a/src/client/voice/dispatcher/BroadcastDispatcher.js b/src/client/voice/dispatcher/BroadcastDispatcher.js new file mode 100644 index 000000000..902935cc4 --- /dev/null +++ b/src/client/voice/dispatcher/BroadcastDispatcher.js @@ -0,0 +1,38 @@ +const Collection = require('../../../util/Collection'); +const StreamDispatcher = require('./StreamDispatcher'); + +/** + * The class that sends voice packet data to the voice connection. + * @implements {VolumeInterface} + */ +class BroadcastDispatcher extends StreamDispatcher { + constructor(player, options, streams) { + super(player, options, streams); + this.broadcast = player.broadcast; + } + + _write(chunk, enc, done) { + if (!this.startTime) this.startTime = Date.now(); + for (const dispatcher of this.broadcast.dispatchers) { + dispatcher._write(chunk, enc); + } + this._step(done); + } + + _destroy(err, cb) { + if (this.player.dispatcher === this) this.player.dispatcher = null; + const { streams } = this; + if (streams.opus) streams.opus.unpipe(this); + if (streams.ffmpeg) streams.ffmpeg.destroy(); + super._destroy(err, cb); + } + + setBitrate(value) { + if (!value || !this.streams.opus || !this.streams.opus.setBitrate) return false; + const bitrate = value === 'auto' ? 48 : value; + this.streams.opus.setBitrate(bitrate * 1000); + return true; + } +} + +module.exports = BroadcastDispatcher; diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index 82f30b4dc..8a6cb75df 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -47,6 +47,12 @@ class StreamDispatcher extends Writable { this.pausedSince = null; this._writeCallback = null; + /** + * The broadcast controlling this dispatcher, if any + * @type {?VoiceBroadcast} + */ + this.broadcast = this.streams.broadcast; + this._pausedTime = 0; this.count = 0; @@ -165,8 +171,10 @@ class StreamDispatcher extends Writable { this._writeCallback = done; return; } - const next = FRAME_LENGTH + (this.count * FRAME_LENGTH) - (Date.now() - this.startTime - this.pausedTime); - setTimeout(done.bind(this), next); + if (!this.streams.broadcast) { + const next = FRAME_LENGTH + (this.count * FRAME_LENGTH) - (Date.now() - this.startTime - this.pausedTime); + setTimeout(done.bind(this), next); + } if (this._sdata.sequence === (2 ** 16) - 1) this._sdata.sequence = -1; if (this._sdata.timestamp === (2 ** 32) - 1) this._sdata.timestamp = -TIMESTAMP_INC; this._sdata.sequence++; @@ -218,6 +226,7 @@ class StreamDispatcher extends Writable { if (this.speaking === value) return; if (this.player.voiceConnection.status !== VoiceStatus.CONNECTED) return; this.speaking = value; + this.player.voiceConnection.setSpeaking(value); /** * Emitted when the dispatcher starts/stops speaking. * @event StreamDispatcher#speaking diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index a681f3a95..954b0b914 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -1,21 +1,13 @@ -const EventEmitter = require('events').EventEmitter; const prism = require('prism-media'); const StreamDispatcher = require('../dispatcher/StreamDispatcher'); - -const FFMPEG_ARGUMENTS = [ - '-analyzeduration', '0', - '-loglevel', '0', - '-f', 's16le', - '-ar', '48000', - '-ac', '2', -]; +const BasePlayer = require('./BasePlayer'); /** * An Audio Player for a Voice Connection. * @private * @extends {EventEmitter} */ -class AudioPlayer extends EventEmitter { +class AudioPlayer extends BasePlayer { constructor(voiceConnection) { super(); /** @@ -23,60 +15,11 @@ class AudioPlayer extends EventEmitter { * @type {VoiceConnection} */ this.voiceConnection = voiceConnection; - - this.dispatcher = null; - - this.streamingData = { - channels: 2, - sequence: 0, - timestamp: 0, - }; - - this.voiceConnection.once('closing', () => this.destroy()); } - destroy() { - this.destroyDispatcher(); - } - - destroyDispatcher() { - if (this.dispatcher) { - this.dispatcher.destroy(); - this.dispatcher = null; - } - } - - playUnknownStream(stream, options) { - this.destroyDispatcher(); - const ffmpeg = new prism.FFmpeg({ args: FFMPEG_ARGUMENTS }); - stream.pipe(ffmpeg); - return this.playPCMStream(ffmpeg, options, { ffmpeg }); - } - - playPCMStream(stream, options, streams = {}) { - this.destroyDispatcher(); - const opus = streams.opus = new prism.opus.Encoder({ channels: 2, rate: 48000, frameSize: 960 }); - if (options && options.volume === false) { - stream.pipe(opus); - return this.playOpusStream(opus, options, streams); - } - const volume = streams.volume = new prism.VolumeTransformer16LE(null, { volume: options ? options.volume : 1 }); - stream.pipe(volume).pipe(opus); - return this.playOpusStream(opus, options, streams); - } - - playOpusStream(stream, options, streams = {}) { - this.destroyDispatcher(); - streams.opus = stream; - const dispatcher = this.dispatcher = this.createDispatcher(options, streams); - stream.pipe(dispatcher); - return dispatcher; - } - - createDispatcher(options, streams) { - this.destroyDispatcher(); - const dispatcher = new StreamDispatcher(this, options, streams); - dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + playBroadcast(broadcast, options) { + const dispatcher = this.createDispatcher(options, { broadcast }); + broadcast.dispatchers.push(dispatcher); return dispatcher; } } diff --git a/src/client/voice/player/BasePlayer.js b/src/client/voice/player/BasePlayer.js new file mode 100644 index 000000000..45009c20f --- /dev/null +++ b/src/client/voice/player/BasePlayer.js @@ -0,0 +1,76 @@ +const EventEmitter = require('events').EventEmitter; +const prism = require('prism-media'); +const StreamDispatcher = require('../dispatcher/StreamDispatcher'); + +const FFMPEG_ARGUMENTS = [ + '-analyzeduration', '0', + '-loglevel', '0', + '-f', 's16le', + '-ar', '48000', + '-ac', '2', +]; + +/** + * An Audio Player for a Voice Connection. + * @private + * @extends {EventEmitter} + */ +class BasePlayer extends EventEmitter { + constructor(voiceConnection) { + super(); + + this.dispatcher = null; + + this.streamingData = { + channels: 2, + sequence: 0, + timestamp: 0, + }; + } + + destroy() { + this.destroyDispatcher(); + } + + destroyDispatcher() { + if (this.dispatcher) { + this.dispatcher.destroy(); + this.dispatcher = null; + } + } + + playUnknownStream(stream, options) { + this.destroyDispatcher(); + const ffmpeg = new prism.FFmpeg({ args: FFMPEG_ARGUMENTS }); + stream.pipe(ffmpeg); + return this.playPCMStream(ffmpeg, options, { ffmpeg }); + } + + playPCMStream(stream, options, streams = {}) { + this.destroyDispatcher(); + const opus = streams.opus = new prism.opus.Encoder({ channels: 2, rate: 48000, frameSize: 960 }); + if (options && options.volume === false) { + stream.pipe(opus); + return this.playOpusStream(opus, options, streams); + } + const volume = streams.volume = new prism.VolumeTransformer16LE(null, { volume: options ? options.volume : 1 }); + stream.pipe(volume).pipe(opus); + return this.playOpusStream(opus, options, streams); + } + + playOpusStream(stream, options, streams = {}) { + this.destroyDispatcher(); + streams.opus = stream; + const dispatcher = this.createDispatcher(options, streams); + stream.pipe(dispatcher); + return dispatcher; + } + + createDispatcher(options, streams, broadcast) { + this.destroyDispatcher(); + const dispatcher = this.dispatcher = new StreamDispatcher(this, options, streams, broadcast); + return dispatcher; + } +} + +module.exports = BasePlayer; diff --git a/src/client/voice/player/BroadcastAudioPlayer.js b/src/client/voice/player/BroadcastAudioPlayer.js new file mode 100644 index 000000000..41441276d --- /dev/null +++ b/src/client/voice/player/BroadcastAudioPlayer.js @@ -0,0 +1,27 @@ +const prism = require('prism-media'); +const BroadcastDispatcher = require('../dispatcher/BroadcastDispatcher'); +const BasePlayer = require('./BasePlayer'); + +/** + * An Audio Player for a Voice Connection. + * @private + * @extends {EventEmitter} + */ +class AudioPlayer extends BasePlayer { + constructor(broadcast) { + super(); + /** + * The broadcast that the player serves + * @type {VoiceBroadcast} + */ + this.broadcast = broadcast; + } + + createDispatcher(options, streams) { + this.destroyDispatcher(); + const dispatcher = new BroadcastDispatcher(this, options, streams); + return dispatcher; + } +} + +module.exports = AudioPlayer;