From 65673197d4fbb10038ea4ab650177831a57fa577 Mon Sep 17 00:00:00 2001 From: Amish Shah Date: Wed, 25 Oct 2017 23:14:41 +0100 Subject: [PATCH] Start rewrite with new prism --- package.json | 4 +- src/client/voice/VoiceBroadcast.js | 5 - src/client/voice/VoiceConnection.js | 12 +- .../voice/dispatcher/StreamDispatcher.js | 323 +++--------------- src/client/voice/player/AudioPlayer.js | 128 ++----- test/voice.js | 17 +- 6 files changed, 91 insertions(+), 398 deletions(-) diff --git a/package.json b/package.json index 50fb6bb59..25688e774 100644 --- a/package.json +++ b/package.json @@ -34,7 +34,7 @@ "dependencies": { "long": "^3.0.0", "pako": "^1.0.0", - "prism-media": "^0.0.2", + "prism-media": "github:hydrabolt/prism-media#indev", "snekfetch": "^3.0.0", "tweetnacl": "^1.0.0", "ws": "^3.0.0" @@ -42,8 +42,6 @@ "peerDependencies": { "bufferutil": "^3.0.0", "erlpack": "discordapp/erlpack", - "node-opus": "^0.2.0", - "opusscript": "^0.0.4", "sodium": "^2.0.0", "libsodium-wrappers": "^0.7.0", "uws": "^8.14.0", diff --git a/src/client/voice/VoiceBroadcast.js b/src/client/voice/VoiceBroadcast.js index 678045dca..0cc8e07ae 100644 --- a/src/client/voice/VoiceBroadcast.js +++ b/src/client/voice/VoiceBroadcast.js @@ -40,11 +40,6 @@ class VoiceBroadcast extends VolumeInterface { * @type {boolean} */ this.paused = false; - /** - * The audio transcoder that this broadcast uses - * @type {Prism} - */ - this.prism = new Prism(); /** * The current audio transcoder that is being used * @type {Object} diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index 1953d6b07..99e69e66e 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -35,17 +35,6 @@ class VoiceConnection extends EventEmitter { */ this.client = voiceManager.client; - /** - * @external Prism - * @see {@link https://github.com/hydrabolt/prism-media} - */ - - /** - * The audio transcoder for this connection - * @type {Prism} - */ - this.prism = new Prism(); - /** * The voice channel this connection is currently serving * @type {VoiceChannel} @@ -494,6 +483,7 @@ class VoiceConnection extends EventEmitter { * .catch(console.error); */ playStream(stream, options) { + console.log('VC!'); return this.player.playUnknownStream(stream, options); } diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index d128039eb..2dc3a6aa6 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -1,9 +1,12 @@ const VolumeInterface = require('../util/VolumeInterface'); const VoiceBroadcast = require('../VoiceBroadcast'); const { VoiceStatus } = require('../../../util/Constants'); +const { Writable } = require('stream'); const secretbox = require('../util/Secretbox'); +const FRAME_LENGTH = 20; + const nonce = Buffer.alloc(24); nonce.fill(0); @@ -18,138 +21,38 @@ nonce.fill(0); * ``` * @implements {VolumeInterface} */ -class StreamDispatcher extends VolumeInterface { - constructor(player, stream, streamOptions) { +class StreamDispatcher extends Writable { + constructor(player, streamOptions) { super(streamOptions); /** * The Audio Player that controls this dispatcher * @type {AudioPlayer} */ this.player = player; - /** - * The stream that the dispatcher plays - * @type {ReadableStream|VoiceBroadcast} - */ - this.stream = stream; - if (!(this.stream instanceof VoiceBroadcast)) this.startStreaming(); this.streamOptions = streamOptions; - - const data = this.streamingData; - data.length = 20; - data.missed = 0; - - /** - * Whether playing is paused - * @type {boolean} - */ - this.paused = false; - /** - * Whether this dispatcher has been destroyed - * @type {boolean} - */ - this.destroyed = false; - - this._opus = streamOptions.opus; + this.startTime = null; + this.on('error', this.destroy.bind(this)); + this.on('finish', () => { + this.destroy.bind(this); + this.emit('end'); + }); } - /** - * How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5 - * aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime - * @type {number} - * @readonly - */ - get passes() { - return this.streamOptions.passes || 1; - } - - set passes(n) { - this.streamOptions.passes = n; - } - - get streamingData() { + get _sdata() { return this.player.streamingData; } - /** - * How long the stream dispatcher has been "speaking" for - * @type {number} - * @readonly - */ - get time() { - return this.streamingData.count * (this.streamingData.length || 0); - } - - /** - * The total time, taking into account pauses and skips, that the dispatcher has been streaming for - * @type {number} - * @readonly - */ - get totalStreamTime() { - return this.time + this.streamingData.pausedTime; - } - - /** - * Stops sending voice packets to the voice connection (stream may still progress however). - */ - pause() { this.setPaused(true); } - - /** - * Resumes sending voice packets to the voice connection (may be further on in the stream than when paused). - */ - resume() { this.setPaused(false); } - - - /** - * Stops the current stream permanently and emits an `end` event. - * @param {string} [reason='user'] An optional reason for stopping the dispatcher - */ - end(reason = 'user') { - this.destroy('end', reason); - } - - setSpeaking(value) { - if (this.speaking === value) return; - if (this.player.voiceConnection.status !== VoiceStatus.CONNECTED) return; - this.speaking = value; - /** - * Emitted when the dispatcher starts/stops speaking. - * @event StreamDispatcher#speaking - * @param {boolean} value Whether or not the dispatcher is speaking - */ - this.emit('speaking', value); - } - - - /** - * Sets the bitrate of the current Opus encoder. - * @param {number} bitrate New bitrate, in kbps. - * If set to 'auto', the voice channel's bitrate will be used - */ - setBitrate(bitrate) { - this.player.setBitrate(bitrate); - } - - sendBuffer(buffer, sequence, timestamp, opusPacket) { - opusPacket = opusPacket || this.player.opusEncoder.encode(buffer); - const packet = this.createPacket(sequence, timestamp, opusPacket); - this.sendPacket(packet); - } - - sendPacket(packet) { - let repeats = this.passes; - /** - * Emitted whenever the dispatcher has debug information. - * @event StreamDispatcher#debug - * @param {string} info The debug info - */ + _write(chunk, enc, done) { + if (!this.startTime) this.startTime = Date.now(); this.setSpeaking(true); - while (repeats--) { - this.player.voiceConnection.sockets.udp.send(packet) - .catch(e => { - this.setSpeaking(false); - this.emit('debug', `Failed to send a packet ${e}`); - }); - } + const packet = this.createPacket(this._sdata.sequence, this._sdata.timestamp, chunk); + this.sendPacket(packet); + const next = FRAME_LENGTH + (this.startTime + (this._sdata.count * FRAME_LENGTH) - Date.now()); + setTimeout(done.bind(this), next); + // Do overflow checks here! + this._sdata.sequence++; + this._sdata.timestamp += 960; + this._sdata.count++; } createPacket(sequence, timestamp, buffer) { @@ -169,163 +72,41 @@ class StreamDispatcher extends VolumeInterface { return packetBuffer; } - processPacket(packet) { - try { - if (this.destroyed) { - this.setSpeaking(false); - return; - } - - const data = this.streamingData; - - if (this.paused) { - this.setSpeaking(false); - data.pausedTime = data.length * 10; - return; - } - - if (!packet) { - data.missed++; - data.pausedTime += data.length * 10; - return; - } - - this.started(); - this.missed = 0; - - this.stepStreamingData(); - this.sendBuffer(null, data.sequence, data.timestamp, packet); - } catch (e) { - this.destroy('error', e); - } - } - - process() { - try { - if (this.destroyed) { - this.setSpeaking(false); - return; - } - - const data = this.streamingData; - - if (data.missed >= 5) { - this.destroy('end', 'Stream is not generating quickly enough.'); - return; - } - - if (this.paused) { - this.setSpeaking(false); - // Old code? - // data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; - data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); - return; - } - - this.started(); - - const buffer = this.readStreamBuffer(); - if (!buffer) { - data.missed++; - data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); - return; - } - - data.missed = 0; - - this.stepStreamingData(); - - if (this._opus) { - this.sendBuffer(null, data.sequence, data.timestamp, buffer); - } else { - this.sendBuffer(buffer, data.sequence, data.timestamp); - } - - const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now()); - this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime); - } catch (e) { - this.destroy('error', e); - } - } - - readStreamBuffer() { - const data = this.streamingData; - const bufferLength = (this._opus ? 80 : 1920) * data.channels; - let buffer = this.stream.read(bufferLength); - if (this._opus) return buffer; - if (!buffer) return null; - - if (buffer.length !== bufferLength) { - const newBuffer = Buffer.alloc(bufferLength).fill(0); - buffer.copy(newBuffer); - buffer = newBuffer; - } - - buffer = this.applyVolume(buffer); - return buffer; - } - - started() { - const data = this.streamingData; - - if (!data.startTime) { - /** - * Emitted once the dispatcher starts streaming. - * @event StreamDispatcher#start - */ - this.emit('start'); - data.startTime = Date.now(); - } - } - - stepStreamingData() { - const data = this.streamingData; - data.count++; - data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0; - data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; - } - - destroy(type, reason) { - if (this.destroyed) return; - this.destroyed = true; - this.setSpeaking(false); - this.emit(type, reason); + sendPacket(packet) { + let repeats = 1; /** - * Emitted once the dispatcher ends. - * @param {string} [reason] The reason the dispatcher ended - * @event StreamDispatcher#end + * Emitted whenever the dispatcher has debug information. + * @event StreamDispatcher#debug + * @param {string} info The debug info */ - if (type !== 'end') this.emit('end', `destroyed due to ${type} - ${reason}`); - } - - startStreaming() { - if (!this.stream) { - /** - * Emitted if the dispatcher encounters an error. - * @event StreamDispatcher#error - * @param {string} error The error message - */ - this.emit('error', 'No stream'); - return; + this.setSpeaking(true); + while (repeats--) { + this.player.voiceConnection.sockets.udp.send(packet) + .catch(e => { + this.setSpeaking(false); + this.emit('debug', `Failed to send a packet ${e}`); + }); } - - this.stream.on('end', err => this.destroy('end', err || 'stream')); - this.stream.on('error', err => this.destroy('error', err)); - - const data = this.streamingData; - data.length = 20; - data.missed = 0; - - this.stream.once('readable', () => { - data.startTime = null; - data.count = 0; - this.process(); - }); } - setPaused(paused) { this.setSpeaking(!(this.paused = paused)); } + setSpeaking(value) { + if (this.speaking === value) return; + if (this.player.voiceConnection.status !== VoiceStatus.CONNECTED) return; + this.speaking = value; + /** + * Emitted when the dispatcher starts/stops speaking. + * @event StreamDispatcher#speaking + * @param {boolean} value Whether or not the dispatcher is speaking + */ + this.emit('speaking', value); + } + + destroy() { + const streams = this.player.streams; + if (streams.opus) streams.opus.unpipe(this); + if (streams.ffmpeg) streams.ffmpeg.destroy(); + this.end(); + } } module.exports = StreamDispatcher; diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index 5380de3f8..acb17e6c8 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -1,10 +1,10 @@ const EventEmitter = require('events').EventEmitter; -const Prism = require('prism-media'); +const prism = require('prism-media'); const StreamDispatcher = require('../dispatcher/StreamDispatcher'); const Collection = require('../../../util/Collection'); const OpusEncoders = require('../opus/OpusEngineList'); -const ffmpegArguments = [ +const FFMPEG_ARGUMENTS = [ '-analyzeduration', '0', '-loglevel', '0', '-f', 's16le', @@ -25,13 +25,10 @@ class AudioPlayer extends EventEmitter { * @type {VoiceConnection} */ this.voiceConnection = voiceConnection; - /** - * The prism transcoder that the player uses - * @type {Prism} - */ - this.prism = new Prism(); - this.streams = new Collection(); - this.currentStream = {}; + + this.streams = {}; + this.dispatcher = null; + this.streamingData = { channels: 2, count: 0, @@ -39,45 +36,19 @@ class AudioPlayer extends EventEmitter { timestamp: 0, pausedTime: 0, }; + this.voiceConnection.once('closing', () => this.destroyCurrentStream()); } - /** - * The current transcoder - * @type {?Object} - * @readonly - */ - get transcoder() { - return this.currentStream.transcoder; - } - - /** - * The current dispatcher - * @type {?StreamDispatcher} - * @readonly - */ - get dispatcher() { - return this.currentStream.dispatcher; - } - destroy() { - if (this.opusEncoder) this.opusEncoder.destroy(); - this.opusEncoder = null; + this.destroyDispatcher(); } - destroyCurrentStream() { - const transcoder = this.transcoder; - const dispatcher = this.dispatcher; - if (transcoder) transcoder.kill(); - if (dispatcher) { - const end = dispatcher.listeners('end')[0]; - const error = dispatcher.listeners('error')[0]; - if (end) dispatcher.removeListener('end', end); - if (error) dispatcher.removeListener('error', error); - dispatcher.destroy('end'); + destroyDispatcher() { + if (this.dispatcher) { + this.dispatcher.destroy(); + this.dispatcher = null; } - this.currentStream = {}; - this.streamingData.pausedTime = 0; } /** @@ -93,76 +64,35 @@ class AudioPlayer extends EventEmitter { } playUnknownStream(stream, options = {}) { - this.destroy(); - this.opusEncoder = OpusEncoders.fetch(options); - const transcoder = this.prism.transcode({ - type: 'ffmpeg', - media: stream, - ffmpegArguments: ffmpegArguments.concat(['-ss', String(options.seek || 0)]), - }); - this.destroyCurrentStream(); - this.currentStream = { - transcoder: transcoder, - output: transcoder.output, - input: stream, - }; - transcoder.on('error', e => { - this.destroyCurrentStream(); - if (this.listenerCount('error') > 0) this.emit('error', e); - this.emit('warn', `prism transcoder error - ${e}`); - }); - return this.playPCMStream(transcoder.output, options, true); + this.destroyDispatcher(); + const ffmpeg = this.streams.ffmpeg = new prism.FFmpeg({ args: FFMPEG_ARGUMENTS }); + stream.pipe(ffmpeg); + return this.playPCMStream(ffmpeg, options); } - playPCMStream(stream, options = {}, fromUnknown = false) { - this.destroy(); - this.opusEncoder = OpusEncoders.fetch(options); - this.setBitrate(options.bitrate); - const dispatcher = this.createDispatcher(stream, options); - if (fromUnknown) { - this.currentStream.dispatcher = dispatcher; - } else { - this.destroyCurrentStream(); - this.currentStream = { - dispatcher, - input: stream, - output: stream, - }; - } - return dispatcher; + playPCMStream(stream, options = {}) { + this.destroyDispatcher(); + const opus = this.streams.opus = new prism.opus.Encoder({ channels: 2, rate: 48000, frameSize: 960 }); + stream.pipe(opus); + return this.playOpusStream(opus, options); } playOpusStream(stream, options = {}) { - options.opus = true; - this.destroyCurrentStream(); - const dispatcher = this.createDispatcher(stream, options); - this.currentStream = { - dispatcher, - input: stream, - output: stream, - }; + this.destroyDispatcher(); + const dispatcher = this.dispatcher = this.createDispatcher(options); + stream.pipe(dispatcher); return dispatcher; } playBroadcast(broadcast, options) { - this.destroyCurrentStream(); - const dispatcher = this.createDispatcher(broadcast, options); - this.currentStream = { - dispatcher, - broadcast, - input: broadcast, - output: broadcast, - }; - broadcast.registerDispatcher(dispatcher); - return dispatcher; + } - createDispatcher(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + createDispatcher({ seek = 0, volume = 1, passes = 1 } = {}) { + this.destroyDispatcher(); const options = { seek, volume, passes }; - - const dispatcher = new StreamDispatcher(this, stream, options); - dispatcher.on('end', () => this.destroyCurrentStream()); - dispatcher.on('error', () => this.destroyCurrentStream()); + const dispatcher = new StreamDispatcher(this, options); + this.streamingData.count = 0; dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); return dispatcher; } diff --git a/test/voice.js b/test/voice.js index 0b36636a3..b66a4b38a 100644 --- a/test/voice.js +++ b/test/voice.js @@ -6,7 +6,7 @@ const ytdl = require('ytdl-core'); const client = new Discord.Client({ fetchAllMembers: false, apiRequestMethod: 'sequential' }); -const auth = require('./auth.json'); +const auth = require('./auth.js'); client.login(auth.token).then(() => console.log('logged')).catch(console.error); @@ -14,8 +14,12 @@ const connections = new Map(); let broadcast; +client.on('debug', console.log); +client.on('error', console.log); + client.on('message', m => { if (!m.guild) return; + if (m.author.id !== '66564597481480192') return; if (m.content.startsWith('/join')) { const channel = m.guild.channels.get(m.content.split(' ')[1]) || m.member.voiceChannel; if (channel && channel.type === 'voice') { @@ -23,23 +27,18 @@ client.on('message', m => { conn.player.on('error', (...e) => console.log('player', ...e)); if (!connections.has(m.guild.id)) connections.set(m.guild.id, { conn, queue: [] }); m.reply('ok!'); + conn.playStream(ytdl('https://www.youtube.com/watch?v=i3Jv9fNPjgk')); }); } else { m.reply('Specify a voice channel!'); } } else if (m.content.startsWith('/play')) { if (connections.has(m.guild.id)) { - const connData = connections.get(m.guild.id); - const queue = connData.queue; const url = m.content.split(' ').slice(1).join(' ') .replace(//g, ''); - queue.push({ url, m }); - if (queue.length > 1) { - m.reply(`OK, that's going to play after ${queue.length - 1} songs`); - return; - } - doQueue(connData); + const stream = ytdl(item.url, { filter: 'audioonly' }, { passes: 3 }); + m.guild.voiceConnection.playStream(stream); } } else if (m.content.startsWith('/skip')) { if (connections.has(m.guild.id)) {