From 72a99f9582f9596c318d6c3b3939cde8ec2fd06a Mon Sep 17 00:00:00 2001 From: Amish Shah Date: Thu, 29 Dec 2016 21:22:13 +0000 Subject: [PATCH] start work with broadcast streams --- src/client/Client.js | 13 +++ src/client/voice/VoiceBroadcast.js | 85 +++++++++++++++++++ src/client/voice/VoiceConnection.js | 6 +- .../voice/dispatcher/StreamDispatcher.js | 33 +++++-- src/client/voice/player/AudioPlayer.js | 17 ++++ test/voice.js | 9 ++ 6 files changed, 153 insertions(+), 10 deletions(-) create mode 100644 src/client/voice/VoiceBroadcast.js diff --git a/src/client/Client.js b/src/client/Client.js index 11cd5f403..2e7d18157 100644 --- a/src/client/Client.js +++ b/src/client/Client.js @@ -11,6 +11,7 @@ const ActionsManager = require('./actions/ActionsManager'); const Collection = require('../util/Collection'); const Presence = require('../structures/Presence').Presence; const ShardClientUtil = require('../sharding/ShardClientUtil'); +const VoiceBroadcast = require('./voice/VoiceBroadcast'); /** * The starting point for making a Discord Bot. @@ -136,6 +137,12 @@ class Client extends EventEmitter { */ this.readyAt = null; + /** + * An array of voice broadcasts + * @type {VoiceBroadcast[]} + */ + this.broadcasts = []; + /** * The previous heartbeat pings of the websocket (most recent first, limited to three elements) * @type {number[]} @@ -219,6 +226,12 @@ class Client extends EventEmitter { return typeof window !== 'undefined'; } + createVoiceBroadcast() { + const broadcast = new VoiceBroadcast(this); + this.broadcasts.push(broadcast); + return broadcast; + } + /** * Logs the client in. If successful, resolves with the account's token. If you're making a bot, it's * much better to use a bot account rather than a user account. diff --git a/src/client/voice/VoiceBroadcast.js b/src/client/voice/VoiceBroadcast.js new file mode 100644 index 000000000..3971264ed --- /dev/null +++ b/src/client/voice/VoiceBroadcast.js @@ -0,0 +1,85 @@ +const EventEmitter = require('events').EventEmitter; +const Prism = require('prism-media'); + +const ffmpegArguments = [ + '-analyzeduration', '0', + '-loglevel', '0', + '-f', 's16le', + '-ar', '48000', + '-ac', '2', +]; + +class VoiceBroadcast extends EventEmitter { + constructor(client) { + super(); + this.client = client; + this.dispatchers = []; + this.prism = new Prism(); + this.currentTranscoder = null; + } + + get _playableStream() { + if (!this.currentTranscoder) return null; + return this.currentTranscoder.transcoder.output || this.currentTranscoder.options.stream; + } + + registerDispatcher(dispatcher) { + if (!this.dispatchers.includes(dispatcher)) this.dispatchers.push(dispatcher); + } + + killCurrentTranscoder() { + if (this.currentTranscoder) { + if (this.currentTranscoder.transcoder) this.currentTranscoder.transcoder.kill(); + this.currentTranscoder = null; + } + } + + playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + const options = { seek, volume, passes }; + options.stream = stream; + return this._playTranscodable(stream, options); + } + + playFile(file, { seek = 0, volume = 1, passes = 1 } = {}) { + const options = { seek, volume, passes }; + return this._playTranscodable(file, options); + } + + _playTranscodable(media, options) { + this.killCurrentTranscoder(); + const transcoder = this.prism.transcode({ + type: 'ffmpeg', + media, + ffmpegArguments: ffmpegArguments.concat(['-ss', String(options.seek)]), + }); + transcoder.once('error', e => this.emit('error', e)); + transcoder.once('end', () => this.killCurrentTranscoder()); + this.currentTranscoder = { + transcoder, + options, + }; + transcoder.output.once('readable', () => this._startPlaying()); + return this; + } + + playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + this.killCurrentTranscoder(); + const options = { seek, volume, passes, stream }; + this.currentTranscoder = { options }; + stream.once('readable', () => this._startPlaying()); + return this; + } + + _startPlaying() { + if (!this._playableStream) return; + const stream = this._playableStream; + const buffer = stream.read(1920 * 2); + + for (const dispatcher of this.dispatchers) { + setImmediate(() => dispatcher.process(buffer, true)); + } + setTimeout(this._startPlaying.bind(this), 20); + } +} + +module.exports = VoiceBroadcast; diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index afcc5b737..5aefd6b58 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -265,7 +265,11 @@ class VoiceConnection extends EventEmitter { */ playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { const options = { seek, volume, passes }; - return this.player.playPCMStream(stream, null, options); + return this.player.playPCMStream(stream, options); + } + + playBroadcast(broadcast) { + return this.player.playBroadcast(broadcast); } /** diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index 64c393d98..04e74a17a 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -1,5 +1,6 @@ const EventEmitter = require('events').EventEmitter; const NaCl = require('tweetnacl'); +const VoiceBroadcast = require('../VoiceBroadcast'); const nonce = new Buffer(24); nonce.fill(0); @@ -20,10 +21,14 @@ class StreamDispatcher extends EventEmitter { super(); this.player = player; this.stream = stream; - this.startStreaming(); + if (!(this.stream instanceof VoiceBroadcast)) this.startStreaming(); this.streamOptions = streamOptions; this.streamOptions.volume = this.streamOptions.volume || 0; + const data = this.streamingData; + data.length = 20; + data.missed = 0; + /** * Whether playing is paused * @type {boolean} @@ -163,7 +168,7 @@ class StreamDispatcher extends EventEmitter { return out; } - process() { + process(buffer, controlled) { try { if (this.destroyed) { this.setSpeaking(false); @@ -180,7 +185,14 @@ class StreamDispatcher extends EventEmitter { if (this.paused) { // data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); + // if buffer is provided we are assuming a master process is controlling the dispatcher + if (!buffer) this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); + return; + } + + if (!buffer && controlled) { + data.missed++; + data.pausedTime += data.length * 10; return; } @@ -196,12 +208,14 @@ class StreamDispatcher extends EventEmitter { } const bufferLength = 1920 * data.channels; - let buffer = this.stream.read(bufferLength); - if (!buffer) { - data.missed++; - data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); - return; + if (!controlled) { + buffer = this.stream.read(bufferLength); + if (!buffer) { + data.missed++; + data.pausedTime += data.length * 10; + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); + return; + } } data.missed = 0; @@ -219,6 +233,7 @@ class StreamDispatcher extends EventEmitter { data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; this.sendBuffer(buffer, data.sequence, data.timestamp); + if (controlled) return; const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now()); this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime); } catch (e) { diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index 65067d79a..6a1ac90b3 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -3,6 +3,7 @@ const Prism = require('prism-media'); const StreamDispatcher = require('../dispatcher/StreamDispatcher'); const Collection = require('../../../util/Collection'); const OpusEncoders = require('../opus/OpusEngineList'); +const VoiceBroadcast = require('../VoiceBroadcast'); const ffmpegArguments = [ '-analyzeduration', '0', @@ -33,6 +34,10 @@ class AudioPlayer extends EventEmitter { } destroyStream(stream) { + if (stream instanceof VoiceBroadcast) { + this.streams.delete(stream); + return; + } const data = this.streams.get(stream); if (!data) return; const transcoder = data.transcoder; @@ -77,6 +82,18 @@ class AudioPlayer extends EventEmitter { dispatcher.on('error', () => this.destroyStream(stream)); return dispatcher; } + + playBroadcast(broadcast, { volume = 1, passes = 1 } = {}) { + const options = { volume, passes }; + this.destroyAllStreams(); + this.streams.set(broadcast, broadcast); + const dispatcher = new StreamDispatcher(this, broadcast, options); + dispatcher.on('end', () => this.destroyStream(broadcast)); + dispatcher.on('error', () => this.destroyStream(broadcast)); + dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + broadcast.registerDispatcher(dispatcher); + return dispatcher; + } } module.exports = AudioPlayer; diff --git a/test/voice.js b/test/voice.js index c7d3b04d6..e41fd53b2 100644 --- a/test/voice.js +++ b/test/voice.js @@ -12,6 +12,8 @@ client.login(auth.token).then(() => console.log('logged')).catch(console.error); const connections = new Map(); +let broadcast; + client.on('message', m => { if (!m.guild) return; if (m.content.startsWith('/join')) { @@ -39,6 +41,13 @@ client.on('message', m => { } doQueue(connData); } + } else if (m.content.startsWith('#eval') && m.author.id === '66564597481480192') { + try { + const com = eval(m.content.split(' ').slice(1).join(' ')); + m.channel.sendMessage(`\`\`\`\n${com}\`\`\``); + } catch (e) { + m.channel.sendMessage(`\`\`\`\n${e}\`\`\``); + } } });