diff --git a/README.md b/README.md index b896814e0..870d7e432 100644 --- a/README.md +++ b/README.md @@ -41,6 +41,7 @@ For production bots, using node-opus should be considered a necessity, especiall ### Optional packages - [bufferutil](https://www.npmjs.com/package/bufferutil) to greatly speed up the WebSocket when *not* using uws (`npm install bufferutil --save`) - [erlpack](https://github.com/hammerandchisel/erlpack) for significantly faster WebSocket data (de)serialisation (`npm install hammerandchisel/erlpack --save`) +- [sodium](https://www.npmjs.com/package/sodium) for faster voice packet encryption/decryption (`npm install sodium --save`) - [uws](https://www.npmjs.com/package/uws) for a much faster WebSocket connection (`npm install uws --save`) **Note:** This package does not handle disconnects entirely correctly, which causes automatic reconnection to Discord to not function. If you use this package, it may be wise to destroy + recreate the client entirely or restart the process upon disconnect. diff --git a/docs/general/welcome.md b/docs/general/welcome.md index 4ca23b79e..7acc38121 100644 --- a/docs/general/welcome.md +++ b/docs/general/welcome.md @@ -45,6 +45,7 @@ For production bots, using node-opus should be considered a necessity, especiall ### Optional packages - [bufferutil](https://www.npmjs.com/package/bufferutil) to greatly speed up the WebSocket when *not* using uws (`npm install bufferutil --save`) - [erlpack](https://github.com/hammerandchisel/erlpack) for significantly faster WebSocket data (de)serialisation (`npm install hammerandchisel/erlpack --save`) +- [sodium](https://www.npmjs.com/package/sodium) for faster voice packet encryption/decryption (`npm install sodium --save`) - [uws](https://www.npmjs.com/package/uws) for a much faster WebSocket connection (`npm install uws --save`) **Note:** This package does not handle disconnects entirely correctly, which causes automatic reconnection to Discord to not function. If you use this package, it may be wise to destroy + recreate the client entirely or restart the process upon disconnect. diff --git a/docs/index.yml b/docs/index.yml index d35365af9..9aa9e7948 100644 --- a/docs/index.yml +++ b/docs/index.yml @@ -8,6 +8,8 @@ path: faq.md - name: Topics files: + - name: Voice + path: voice.md - name: Web builds path: web.md - name: Examples diff --git a/docs/topics/voice.md b/docs/topics/voice.md new file mode 100644 index 000000000..24fa405b0 --- /dev/null +++ b/docs/topics/voice.md @@ -0,0 +1,109 @@ +# Introduction to Voice +Voice in discord.js can be used for many things, such as music bots, recording or relaying audio. + +In discord.js, you can use voice by connecting to a `VoiceChannel` to obtain a `VoiceConnection`, where you can start streaming and receiving audio. + +To get started, make sure you have: +* ffmpeg - `npm install --global ffmpeg-binaries` +* an opus encoder, choose one from below: + * `npm install opusscript` + * `npm install node-opus` +* a good network connection + +## Joining a voice channel +The example below reacts to a message and joins the sender's voice channel, catching any errors. This is important +as it allows us to obtain a `VoiceConnection` that we can start to stream audio with + +```js +const Discord = require('discord.js'); +const client = new Discord.Client(); + +client.login('token here'); + +client.on('message', message => { + // voice only works in guilds, if the message does not come from a guild, + // we ignore it + if (!message.guild) return; + + if (message.content === '/join') { + // only try to join the sender's voice channel if they are in one themselves + if (message.member.voiceChannel) { + message.member.voiceChannel.join() + .then(connection => { // connection is an instance of VoiceConnection + message.reply('I have successfully connected to the channel!'); + }) + .catch(console.log); + } else { + message.reply('You need to join a voice channel first!'); + } + } +}); +``` + +## Streaming to a Voice Channel +In the previous example, we looked at how to join a voice channel in order to obtain a `VoiceConnection`. Now that we +have obtained a voice connection, we can start streaming audio to it. The following example shows how to stream an mp3 +file: + +**Playing a file:** +```js +// to play a file, we need to give an absolute path to it +const dispatcher = connection.playFile('C:/Users/Discord/Desktop/myfile.mp3'); +``` + +Your file doesn't have to be just an mp3; ffmpeg can convert videos and audios of many formats. + +The `dispatcher` variable is an instance of a `StreamDispatcher`, which manages streaming a specific resource to a voice +channel. We can do many things with the dispatcher, such as finding out when the stream ends or changing the volume: + +```js +dispatcher.on('end', () => { + // the song has finished +}); + +dispatcher.on('error', e => { + // catch any errors that may arise + console.log(e); +}); + +dispatcher.setVolume(0.5); // set the volume to 50% +dispatcher.setVolume(1); // set the volume back to 100% + +console.log(dispatcher.time); // the time in milliseconds that the stream dispatcher has been playing for + +dispatcher.pause(); // pause the stream +dispatcher.resume(); // carry on playing + +dispatcher.end(); // end the dispatcher, emits 'end' event +``` + +If you have an existing [ReadableStream](https://nodejs.org/api/stream.html#stream_readable_streams), +this can also be used: + +**Playing a ReadableStream:** +```js +connection.playStream(myReadableStream); + +// if you don't want to use absolute paths, you can use +// fs.createReadStream to circumvent it + +const fs = require('fs'); +const stream = fs.createReadStream('./test.mp3'); +connection.playStream(stream); +``` + +It's important to note that creating a readable stream to a file is less efficient than simply using `connection.playFile()`. + +**Playing anything else:** + +For anything else, such as a URL to a file, you can use `connection.playArbitraryInput()`. You should consult the [ffmpeg protocol documentation](https://ffmpeg.org/ffmpeg-protocols.html) to see what you can use this for. + +```js +// play an mp3 from a URL +connection.playArbitraryInput('http://mysite.com/sound.mp3'); +``` + +Again, playing a file from a URL like this is more performant than creating a ReadableStream to the file. + +## Advanced Topics +soon:tm: \ No newline at end of file diff --git a/package.json b/package.json index b5dadbe85..b6ee2c7e2 100644 --- a/package.json +++ b/package.json @@ -35,6 +35,7 @@ "@types/node": "^7.0.0", "long": "^3.2.0", "pako": "^1.0.0", + "prism-media": "hydrabolt/prism-media", "superagent": "^3.4.0", "tweetnacl": "^0.14.0", "ws": "^2.0.0" @@ -44,6 +45,7 @@ "erlpack": "hammerandchisel/erlpack", "node-opus": "^0.2.0", "opusscript": "^0.0.2", + "sodium": "^2.0.1", "uws": "^0.12.0" }, "devDependencies": { @@ -60,9 +62,11 @@ "ws": false, "uws": false, "erlpack": false, + "prism-media": false, "opusscript": false, "node-opus": false, "tweetnacl": false, + "sodium": false, "src/sharding/Shard.js": false, "src/sharding/ShardClientUtil.js": false, "src/sharding/ShardingManager.js": false, @@ -75,12 +79,13 @@ "src/client/voice/pcm/ConverterEngineList.js": false, "src/client/voice/pcm/FfmpegConverterEngine.js": false, "src/client/voice/player/AudioPlayer.js": false, - "src/client/voice/player/BasePlayer.js": false, - "src/client/voice/player/DefaultPlayer.js": false, "src/client/voice/receiver/VoiceReadable.js": false, "src/client/voice/receiver/VoiceReceiver.js": false, + "src/client/voice/util/Secretbox.js": false, "src/client/voice/util/SecretKey.js": false, + "src/client/voice/util/VolumeInterface.js": false, "src/client/voice/ClientVoiceManager.js": false, + "src/client/voice/VoiceBroadcast.js": false, "src/client/voice/VoiceConnection.js": false, "src/client/voice/VoiceUDPClient.js": false, "src/client/voice/VoiceWebSocket.js": false diff --git a/src/client/Client.js b/src/client/Client.js index d7ec95dfb..cc1cec634 100644 --- a/src/client/Client.js +++ b/src/client/Client.js @@ -12,6 +12,7 @@ const ActionsManager = require('./actions/ActionsManager'); const Collection = require('../util/Collection'); const Presence = require('../structures/Presence').Presence; const ShardClientUtil = require('../sharding/ShardClientUtil'); +const VoiceBroadcast = require('./voice/VoiceBroadcast'); /** * The main hub for interacting with the Discord API, and the starting point for any bot. @@ -142,6 +143,12 @@ class Client extends EventEmitter { */ this.readyAt = null; + /** + * Active voice broadcasts that have been created + * @type {VoiceBroadcast[]} + */ + this.broadcasts = []; + /** * Previous heartbeat pings of the websocket (most recent first, limited to three elements) * @type {number[]} @@ -242,6 +249,16 @@ class Client extends EventEmitter { return os.platform() === 'browser'; } + /** + * Creates a voice broadcast. + * @returns {VoiceBroadcast} + */ + createVoiceBroadcast() { + const broadcast = new VoiceBroadcast(this); + this.broadcasts.push(broadcast); + return broadcast; + } + /** * Logs the client in, establishing a websocket connection to Discord. * Both bot and regular user accounts are supported, but it is highly recommended to use a bot account whenever diff --git a/src/client/voice/ClientVoiceManager.js b/src/client/voice/ClientVoiceManager.js index e951891fd..e0d3879ff 100644 --- a/src/client/voice/ClientVoiceManager.js +++ b/src/client/voice/ClientVoiceManager.js @@ -1,8 +1,5 @@ const Collection = require('../../util/Collection'); -const Constants = require('../../util/Constants'); -const Util = require('../../util/Util'); const VoiceConnection = require('./VoiceConnection'); -const EventEmitter = require('events').EventEmitter; /** * Manages all the voice stuff for the Client @@ -22,53 +19,21 @@ class ClientVoiceManager { */ this.connections = new Collection(); - /** - * Pending connection attempts, maps guild ID to VoiceChannel - * @type {Collection} - */ - this.pending = new Collection(); - this.client.on('self.voiceServer', this.onVoiceServer.bind(this)); this.client.on('self.voiceStateUpdate', this.onVoiceStateUpdate.bind(this)); } - onVoiceServer(data) { - if (this.pending.has(data.guild_id)) this.pending.get(data.guild_id).setTokenAndEndpoint(data.token, data.endpoint); + onVoiceServer({ guild_id, token, endpoint }) { + const connection = this.connections.get(guild_id); + if (connection) connection.setTokenAndEndpoint(token, endpoint); } - onVoiceStateUpdate(data) { - if (this.pending.has(data.guild_id)) this.pending.get(data.guild_id).setSessionID(data.session_id); - } - - /** - * Sends a request to the main gateway to join a voice channel - * @param {VoiceChannel} channel The channel to join - * @param {Object} [options] The options to provide - */ - sendVoiceStateUpdate(channel, options = {}) { - if (!this.client.user) throw new Error('Unable to join because there is no client user.'); - if (!channel.permissionsFor) { - throw new Error('Channel does not support permissionsFor; is it really a voice channel?'); + onVoiceStateUpdate({ guild_id, session_id, channel_id }) { + const connection = this.connections.get(guild_id); + if (connection) { + connection.channel = this.client.channels.get(channel_id); + connection.setSessionID(session_id); } - const permissions = channel.permissionsFor(this.client.user); - if (!permissions) { - throw new Error('There is no permission set for the client user in this channel - are they part of the guild?'); - } - if (!permissions.hasPermission('CONNECT')) { - throw new Error('You do not have permission to join this voice channel.'); - } - - options = Util.mergeDefault({ - guild_id: channel.guild.id, - channel_id: channel.id, - self_mute: false, - self_deaf: false, - }, options); - - this.client.ws.send({ - op: Constants.OPCodes.VOICE_STATE_UPDATE, - d: options, - }); } /** @@ -78,7 +43,6 @@ class ClientVoiceManager { */ joinChannel(channel) { return new Promise((resolve, reject) => { - if (this.pending.get(channel.guild.id)) throw new Error('Already connecting to this guild\'s voice server.'); if (!channel.joinable) { if (channel.full) { throw new Error('You do not have permission to join this voice channel; it is full.'); @@ -87,165 +51,31 @@ class ClientVoiceManager { } } - const existingConnection = this.connections.get(channel.guild.id); - if (existingConnection) { - if (existingConnection.channel.id !== channel.id) { - this.sendVoiceStateUpdate(channel); - this.connections.get(channel.guild.id).channel = channel; + let connection = this.connections.get(channel.guild.id); + + if (connection) { + if (connection.channel.id !== channel.id) { + this.connections.get(channel.guild.id).updateChannel(channel); } - resolve(existingConnection); + resolve(connection); return; + } else { + connection = new VoiceConnection(this, channel); + this.connections.set(channel.guild.id, connection); } - const pendingConnection = new PendingVoiceConnection(this, channel); - this.pending.set(channel.guild.id, pendingConnection); - - pendingConnection.on('fail', reason => { - this.pending.delete(channel.guild.id); + connection.once('failed', reason => { + this.connections.delete(channel.guild.id); reject(reason); }); - pendingConnection.on('pass', voiceConnection => { - this.pending.delete(channel.guild.id); - this.connections.set(channel.guild.id, voiceConnection); - voiceConnection.once('ready', () => resolve(voiceConnection)); - voiceConnection.once('error', reject); - voiceConnection.once('disconnect', () => this.connections.delete(channel.guild.id)); + connection.once('authenticated', () => { + connection.once('ready', () => resolve(connection)); + connection.once('error', reject); + connection.once('disconnect', () => this.connections.delete(channel.guild.id)); }); }); } } -/** - * Represents a Pending Voice Connection - * @private - */ -class PendingVoiceConnection extends EventEmitter { - constructor(voiceManager, channel) { - super(); - - /** - * The ClientVoiceManager that instantiated this pending connection - * @type {ClientVoiceManager} - */ - this.voiceManager = voiceManager; - - /** - * The channel that this pending voice connection will attempt to join - * @type {VoiceChannel} - */ - this.channel = channel; - - /** - * The timeout that will be invoked after 15 seconds signifying a failure to connect - * @type {Timeout} - */ - this.deathTimer = this.voiceManager.client.setTimeout( - () => this.fail(new Error('Connection not established within 15 seconds.')), 15000); - - /** - * An object containing data required to connect to the voice servers with - * @type {Object} - */ - this.data = {}; - - this.sendVoiceStateUpdate(); - } - - checkReady() { - if (this.data.token && this.data.endpoint && this.data.session_id) { - this.pass(); - return true; - } else { - return false; - } - } - - /** - * Set the token and endpoint required to connect to the the voice servers - * @param {string} token the token - * @param {string} endpoint the endpoint - * @returns {void} - */ - setTokenAndEndpoint(token, endpoint) { - if (!token) { - this.fail(new Error('Token not provided from voice server packet.')); - return; - } - if (!endpoint) { - this.fail(new Error('Endpoint not provided from voice server packet.')); - return; - } - if (this.data.token) { - this.fail(new Error('There is already a registered token for this connection.')); - return; - } - if (this.data.endpoint) { - this.fail(new Error('There is already a registered endpoint for this connection.')); - return; - } - - endpoint = endpoint.match(/([^:]*)/)[0]; - - if (!endpoint) { - this.fail(new Error('Failed to find an endpoint.')); - return; - } - - this.data.token = token; - this.data.endpoint = endpoint; - - this.checkReady(); - } - - /** - * Sets the Session ID for the connection - * @param {string} sessionID the session ID - */ - setSessionID(sessionID) { - if (!sessionID) { - this.fail(new Error('Session ID not supplied.')); - return; - } - if (this.data.session_id) { - this.fail(new Error('There is already a registered session ID for this connection.')); - return; - } - this.data.session_id = sessionID; - - this.checkReady(); - } - - clean() { - clearInterval(this.deathTimer); - this.emit('fail', new Error('Clean-up triggered :fourTriggered:')); - } - - pass() { - clearInterval(this.deathTimer); - this.emit('pass', this.upgrade()); - } - - fail(reason) { - this.emit('fail', reason); - this.clean(); - } - - sendVoiceStateUpdate() { - try { - this.voiceManager.sendVoiceStateUpdate(this.channel); - } catch (error) { - this.fail(error); - } - } - - /** - * Upgrades this Pending Connection to a full Voice Connection - * @returns {VoiceConnection} - */ - upgrade() { - return new VoiceConnection(this); - } -} - module.exports = ClientVoiceManager; diff --git a/src/client/voice/VoiceBroadcast.js b/src/client/voice/VoiceBroadcast.js new file mode 100644 index 000000000..316c150f2 --- /dev/null +++ b/src/client/voice/VoiceBroadcast.js @@ -0,0 +1,364 @@ +const VolumeInterface = require('./util/VolumeInterface'); +const Prism = require('prism-media'); +const OpusEncoders = require('./opus/OpusEngineList'); +const Collection = require('../../util/Collection'); + +const ffmpegArguments = [ + '-analyzeduration', '0', + '-loglevel', '0', + '-f', 's16le', + '-ar', '48000', + '-ac', '2', +]; + +/** + * A voice broadcast can be played across multiple voice connections for improved shared-stream efficiency. + * @extends {EventEmitter} + */ +class VoiceBroadcast extends VolumeInterface { + constructor(client) { + super(); + /** + * The client that created the broadcast + * @type {Client} + */ + this.client = client; + this._dispatchers = new Collection(); + this._encoders = new Collection(); + /** + * The audio transcoder that this broadcast uses + * @type {Prism} + */ + this.prism = new Prism(); + /** + * The current audio transcoder that is being used + * @type {object} + */ + this.currentTranscoder = null; + this.tickInterval = null; + this._volume = 1; + } + + /** + * An array of subscribed dispatchers + * @type {StreamDispatcher[]} + */ + get dispatchers() { + let d = []; + for (const container of this._dispatchers.values()) { + d = d.concat(Array.from(container.values())); + } + return d; + } + + get _playableStream() { + const currentTranscoder = this.currentTranscoder; + if (!currentTranscoder) return null; + const transcoder = currentTranscoder.transcoder; + const options = currentTranscoder.options; + return (transcoder && transcoder.output) || options.stream; + } + + unregisterDispatcher(dispatcher, old) { + const volume = old || dispatcher.volume; + + /** + * Emitted whenever a Stream Dispatcher unsubscribes from the broadcast + * @event VoiceBroadcast#unsubscribe + * @param {dispatcher} the dispatcher that unsubscribed + */ + this.emit('unsubscribe', dispatcher); + for (const container of this._dispatchers.values()) { + container.delete(dispatcher); + + if (!container.size) { + this._encoders.get(volume).destroy(); + this._dispatchers.delete(volume); + this._encoders.delete(volume); + } + } + } + + registerDispatcher(dispatcher) { + if (!this._dispatchers.has(dispatcher.volume)) { + this._dispatchers.set(dispatcher.volume, new Set()); + this._encoders.set(dispatcher.volume, OpusEncoders.fetch()); + } + const container = this._dispatchers.get(dispatcher.volume); + if (!container.has(dispatcher)) { + container.add(dispatcher); + dispatcher.once('end', () => this.unregisterDispatcher(dispatcher)); + dispatcher.on('volumeChange', (o, n) => { + this.unregisterDispatcher(dispatcher, o); + if (!this._dispatchers.has(n)) { + this._dispatchers.set(n, new Set()); + this._encoders.set(n, OpusEncoders.fetch()); + } + this._dispatchers.get(n).add(dispatcher); + }); + /** + * Emitted whenever a stream dispatcher subscribes to the broadcast + * @event VoiceBroadcast#subscribe + * @param {StreamDispatcher} dispatcher the subscribed dispatcher + */ + this.emit('subscribe', dispatcher); + } + } + + killCurrentTranscoder() { + if (this.currentTranscoder) { + if (this.currentTranscoder.transcoder) this.currentTranscoder.transcoder.kill(); + this.currentTranscoder = null; + this.emit('end'); + } + } + + /** + * Plays any audio stream across the broadcast + * @param {ReadableStream} stream The audio stream to play + * @param {StreamOptions} [options] Options for playing the stream + * @returns {VoiceBroadcast} + * @example + * // play streams using ytdl-core + * const ytdl = require('ytdl-core'); + * const streamOptions = { seek: 0, volume: 1 }; + * const broadcast = client.createVoiceBroadcast(); + * + * voiceChannel.join() + * .then(connection => { + * const stream = ytdl('https://www.youtube.com/watch?v=XAWgeLF9EVQ', {filter : 'audioonly'}); + * broadcast.playStream(stream); + * const dispatcher = connection.playBroadcast(broadcast); + * }) + * .catch(console.error); + */ + playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + const options = { seek, volume, passes, stream }; + return this._playTranscodable(stream, options); + } + + /** + * Play the given file in the voice connection. + * @param {string} file The absolute path to the file + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + * @example + * // play files natively + * const broadcast = client.createVoiceBroadcast(); + * + * voiceChannel.join() + * .then(connection => { + * broadcast.playFile('C:/Users/Discord/Desktop/music.mp3'); + * const dispatcher = connection.playBroadcast(broadcast); + * }) + * .catch(console.error); + */ + playFile(file, { seek = 0, volume = 1, passes = 1 } = {}) { + const options = { seek, volume, passes }; + return this._playTranscodable(`file:${file}`, options); + } + + _playTranscodable(media, options) { + OpusEncoders.guaranteeOpusEngine(); + + this.killCurrentTranscoder(); + const transcoder = this.prism.transcode({ + type: 'ffmpeg', + media, + ffmpegArguments: ffmpegArguments.concat(['-ss', String(options.seek)]), + }); + /** + * Emitted whenever an error occurs + * @event VoiceBroadcast#error + * @param {Error} error the error that occurred + */ + transcoder.once('error', e => { + if (this.listenerCount('error') > 0) this.emit('error', e); + /** + * Emitted whenever the VoiceBroadcast has any warnings + * @event VoiceBroadcast#warn + * @param {string|Error} warning the warning that was raised + */ + else this.emit('warn', e); + }); + /** + * Emitted once the broadcast (the audio stream) ends + * @event VoiceBroadcast#end + */ + transcoder.once('end', () => this.killCurrentTranscoder()); + this.currentTranscoder = { + transcoder, + options, + }; + transcoder.output.once('readable', () => this._startPlaying()); + return this; + } + + /** + * Plays a stream of 16-bit signed stereo PCM at 48KHz. + * @param {ReadableStream} stream The audio stream to play. + * @param {StreamOptions} [options] Options for playing the stream + * @returns {VoiceBroadcast} + */ + playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + OpusEncoders.guaranteeOpusEngine(); + + this.killCurrentTranscoder(); + const options = { seek, volume, passes, stream }; + this.currentTranscoder = { options }; + stream.once('readable', () => this._startPlaying()); + return this; + } + + /** + * Plays an Opus encoded stream at 48KHz. + * Note that inline volume is not compatible with this method. + * @param {ReadableStream} stream The Opus audio stream to play + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + */ + playOpusStream(stream, { seek = 0, passes = 1 } = {}) { + const options = { seek, passes, stream }; + this.currentTranscoder = { options, opus: true }; + stream.once('readable', () => this._startPlaying()); + return this; + } + + /** + * Play an arbitrary input that can be [handled by ffmpeg](https://ffmpeg.org/ffmpeg-protocols.html#Description) + * @param {string} input the arbitrary input + * @param {StreamOptions} [options] Options for playing the stream + * @returns {VoiceBroadcast} + */ + playArbitraryInput(input, { seek = 0, volume = 1, passes = 1 } = {}) { + this.guaranteeOpusEngine(); + + const options = { seek, volume, passes, input }; + return this._playTranscodable(input, options); + } + + /** + * Pauses the entire broadcast - all dispatchers also pause + */ + pause() { + this.paused = true; + for (const container of this._dispatchers.values()) { + for (const dispatcher of container.values()) { + dispatcher.pause(); + } + } + } + + /** + * Resumes the entire broadcast - all dispatchers also resume + */ + resume() { + this.paused = false; + for (const container of this._dispatchers.values()) { + for (const dispatcher of container.values()) { + dispatcher.resume(); + } + } + } + + guaranteeOpusEngine() { + if (!this.opusEncoder) throw new Error('Couldn\'t find an Opus engine.'); + } + + _startPlaying() { + if (this.tickInterval) clearInterval(this.tickInterval); + // this.tickInterval = this.client.setInterval(this.tick.bind(this), 20); + this._startTime = Date.now(); + this._count = 0; + this._pausedTime = 0; + this._missed = 0; + this.tick(); + } + + tick() { + if (!this._playableStream) return; + if (this.paused) { + this._pausedTime += 20; + setTimeout(() => this.tick(), 20); + return; + } + + const opus = this.currentTranscoder.opus; + const buffer = this.readStreamBuffer(); + + if (!buffer) { + this._missed++; + if (this._missed < 5) { + this._pausedTime += 200; + setTimeout(() => this.tick(), 200); + } else { + this.killCurrentTranscoder(); + } + return; + } + + this._missed = 0; + + let packetMatrix = {}; + + const getOpusPacket = (volume) => { + if (packetMatrix[volume]) return packetMatrix[volume]; + + const opusEncoder = this._encoders.get(volume); + const opusPacket = opusEncoder.encode(this.applyVolume(buffer, this._volume * volume)); + packetMatrix[volume] = opusPacket; + return opusPacket; + }; + + for (const dispatcher of this.dispatchers) { + if (opus) { + dispatcher.processPacket(buffer); + continue; + } + + const volume = dispatcher.volume; + dispatcher.processPacket(getOpusPacket(volume)); + } + + const next = 20 + (this._startTime + this._pausedTime + (this._count * 20) - Date.now()); + this._count++; + setTimeout(() => this.tick(), next); + } + + readStreamBuffer() { + const opus = this.currentTranscoder.opus; + const bufferLength = (opus ? 80 : 1920) * 2; + let buffer = this._playableStream.read(bufferLength); + if (opus) return buffer; + if (!buffer) return null; + + if (buffer.length !== bufferLength) { + const newBuffer = Buffer.alloc(bufferLength).fill(0); + buffer.copy(newBuffer); + buffer = newBuffer; + } + + return buffer; + } + + /** + * Stop the current stream from playing without unsubscribing dispatchers. + */ + end() { + this.killCurrentTranscoder(); + } + + /** + * End the current broadcast, all subscribed dispatchers will also end + */ + destroy() { + this.end(); + for (const container of this._dispatchers.values()) { + for (const dispatcher of container.values()) { + dispatcher.destroy('end', 'broadcast ended'); + } + } + } +} + +module.exports = VoiceBroadcast; diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index ac44ff866..e89eaf3d6 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -1,10 +1,11 @@ const VoiceWebSocket = require('./VoiceWebSocket'); const VoiceUDP = require('./VoiceUDPClient'); +const Util = require('../../util/Util'); const Constants = require('../../util/Constants'); const AudioPlayer = require('./player/AudioPlayer'); const VoiceReceiver = require('./receiver/VoiceReceiver'); const EventEmitter = require('events').EventEmitter; -const fs = require('fs'); +const Prism = require('prism-media'); /** * Represents a connection to a voice channel in Discord. @@ -17,20 +18,43 @@ const fs = require('fs'); * @extends {EventEmitter} */ class VoiceConnection extends EventEmitter { - constructor(pendingConnection) { + constructor(voiceManager, channel) { super(); /** - * The Voice Manager that instantiated this connection + * The voice manager that instantiated this connection * @type {ClientVoiceManager} */ - this.voiceManager = pendingConnection.voiceManager; + this.voiceManager = voiceManager; + + /** + * The client that instantiated this connection + * @type {Client} + */ + this.client = voiceManager.client; + + /** + * @external Prism + * @see {@link https://github.com/hydrabolt/prism-media} + */ + + /** + * The audio transcoder for this connection + * @type {Prism} + */ + this.prism = new Prism(); /** * The voice channel this connection is currently serving * @type {VoiceChannel} */ - this.channel = pendingConnection.channel; + this.channel = channel; + + /** + * The current status of the voice connection + * @type {number} + */ + this.status = Constants.VoiceStatus.AUTHENTICATING; /** * Whether we're currently transmitting audio @@ -49,7 +73,7 @@ class VoiceConnection extends EventEmitter { * @type {Object} * @private */ - this.authentication = pendingConnection.data; + this.authentication = {}; /** * The audio player for this voice connection @@ -73,7 +97,6 @@ class VoiceConnection extends EventEmitter { * @param {string|Error} warning the warning */ this.emit('warn', e); - this.player.cleanup(); }); /** @@ -83,20 +106,14 @@ class VoiceConnection extends EventEmitter { */ this.ssrcMap = new Map(); - /** - * Whether this connection is ready - * @type {boolean} - * @private - */ - this.ready = false; - /** * Object that wraps contains the `ws` and `udp` sockets of this voice connection * @type {Object} * @private */ this.sockets = {}; - this.connect(); + + this.authenticate(); } /** @@ -118,20 +135,169 @@ class VoiceConnection extends EventEmitter { }); } + /** + * Sends a request to the main gateway to join a voice channel + * @param {Object} [options] The options to provide + */ + sendVoiceStateUpdate(options = {}) { + options = Util.mergeDefault({ + guild_id: this.channel.guild.id, + channel_id: this.channel.id, + self_mute: false, + self_deaf: false, + }, options); + + this.client.ws.send({ + op: Constants.OPCodes.VOICE_STATE_UPDATE, + d: options, + }); + } + + /** + * Set the token and endpoint required to connect to the the voice servers + * @param {string} token The voice token + * @param {string} endpoint The voice endpoint + * @returns {void} + */ + setTokenAndEndpoint(token, endpoint) { + if (!token) { + this.authenticateFailed('Token not provided from voice server packet.'); + return; + } + if (!endpoint) { + this.authenticateFailed('Endpoint not provided from voice server packet.'); + return; + } + + endpoint = endpoint.match(/([^:]*)/)[0]; + + if (!endpoint) { + this.authenticateFailed('Failed to find an endpoint.'); + return; + } + + if (this.status === Constants.VoiceStatus.AUTHENTICATING) { + this.authentication.token = token; + this.authentication.endpoint = endpoint; + this.checkAuthenticated(); + } else if (token !== this.authentication.token || endpoint !== this.authentication.endpoint) { + this.reconnect(token, endpoint); + } + } + + /** + * Sets the Session ID for the connection + * @param {string} sessionID The voice session ID + */ + setSessionID(sessionID) { + if (!sessionID) { + this.authenticateFailed('Session ID not supplied.'); + return; + } + + if (this.status === Constants.VoiceStatus.AUTHENTICATING) { + this.authentication.sessionID = sessionID; + this.checkAuthenticated(); + } else if (sessionID !== this.authentication.sessionID) { + this.authentication.sessionID = sessionID; + /** + * Emitted when a new session ID is received + * @event VoiceConnection#newSession + * @private + */ + this.emit('newSession', sessionID); + } + } + + /** + * Checks whether the voice connection is authenticated + * @private + */ + checkAuthenticated() { + const { token, endpoint, sessionID } = this.authentication; + + if (token && endpoint && sessionID) { + clearTimeout(this.connectTimeout); + this.status = Constants.VoiceStatus.CONNECTING; + /** + * Emitted when we successfully initiate a voice connection + * @event VoiceConnection#authenticated + */ + this.emit('authenticated'); + this.connect(); + } + } + + /** + * Invoked when we fail to initiate a voice connection + * @param {string} reason The reason for failure + * @private + */ + authenticateFailed(reason) { + clearTimeout(this.connectTimeout); + this.status = Constants.VoiceStatus.DISCONNECTED; + if (this.status === Constants.VoiceStatus.AUTHENTICATING) { + /** + * Emitted when we fail to initiate a voice connection + * @event VoiceConnection#failed + * @param {Error} error The encountered error + */ + this.emit('failed', new Error(reason)); + } else { + this.emit('error', new Error(reason)); + } + } + + /** + * Move to a different voice channel in the same guild + * @param {VoiceChannel} channel The channel to move to + * @private + */ + updateChannel(channel) { + this.channel = channel; + this.sendVoiceStateUpdate(); + } + + /** + * Attempts to authenticate to the voice server + * @private + */ + authenticate() { + this.sendVoiceStateUpdate(); + this.connectTimeout = this.client.setTimeout( + () => this.authenticateFailed(new Error('Connection not established within 15 seconds.')), 15000); + } + + /** + * Attempts to reconnect to the voice server (typically after a region change) + * @param {string} token The voice token + * @param {string} endpoint The voice endpoint + * @private + */ + reconnect(token, endpoint) { + this.authentication.token = token; + this.authentication.endpoint = endpoint; + + this.status = Constants.VoiceStatus.RECONNECTING; + /** + * Emitted when the voice connection is reconnecting (typically after a region change) + * @event VoiceConnection#reconnecting + */ + this.emit('reconnecting'); + this.connect(); + } + /** * Disconnect the voice connection, causing a disconnect and closing event to be emitted. */ disconnect() { this.emit('closing'); - this.voiceManager.client.ws.send({ - op: Constants.OPCodes.VOICE_STATE_UPDATE, - d: { - guild_id: this.channel.guild.id, - channel_id: null, - self_mute: false, - self_deaf: false, - }, + this.sendVoiceStateUpdate({ + channel_id: null, }); + this.player.destroy(); + this.cleanup(); + this.status = Constants.VoiceStatus.DISCONNECTED; /** * Emitted when the voice connection disconnects * @event VoiceConnection#disconnect @@ -139,70 +305,108 @@ class VoiceConnection extends EventEmitter { this.emit('disconnect'); } + /** + * Cleans up after disconnect + * @private + */ + cleanup() { + const { ws, udp } = this.sockets; + ws.removeAllListeners('error'); + udp.removeAllListeners('error'); + ws.removeAllListeners('ready'); + ws.removeAllListeners('sessionDescription'); + ws.removeAllListeners('speaking'); + this.sockets.ws = null; + this.sockets.udp = null; + } + /** * Connect the voice connection * @private */ connect() { - if (this.sockets.ws) throw new Error('There is already an existing WebSocket connection.'); - if (this.sockets.udp) throw new Error('There is already an existing UDP connection.'); + if (this.status !== Constants.VoiceStatus.RECONNECTING) { + if (this.sockets.ws) throw new Error('There is already an existing WebSocket connection.'); + if (this.sockets.udp) throw new Error('There is already an existing UDP connection.'); + } + + if (this.sockets.ws) this.sockets.ws.shutdown(); + if (this.sockets.udp) this.sockets.udp.shutdown(); + this.sockets.ws = new VoiceWebSocket(this); this.sockets.udp = new VoiceUDP(this); - this.sockets.ws.on('error', e => this.emit('error', e)); - this.sockets.udp.on('error', e => this.emit('error', e)); - this.sockets.ws.once('ready', d => { - this.authentication.port = d.port; - this.authentication.ssrc = d.ssrc; - /** - * Emitted whenever the connection encounters an error. - * @event VoiceConnection#error - * @param {Error} error the encountered error - */ - this.sockets.udp.findEndpointAddress() - .then(address => { - this.sockets.udp.createUDPSocket(address); - }, e => this.emit('error', e)); - }); - this.sockets.ws.once('sessionDescription', (mode, secret) => { - this.authentication.encryptionMode = mode; - this.authentication.secretKey = secret; - /** - * Emitted once the connection is ready, when a promise to join a voice channel resolves, - * the connection will already be ready. - * @event VoiceConnection#ready - */ - this.emit('ready'); - this.ready = true; - }); - this.sockets.ws.on('speaking', data => { - const guild = this.channel.guild; - const user = this.voiceManager.client.users.get(data.user_id); - this.ssrcMap.set(+data.ssrc, user); - if (!data.speaking) { - for (const receiver of this.receivers) { - const opusStream = receiver.opusStreams.get(user.id); - const pcmStream = receiver.pcmStreams.get(user.id); - if (opusStream) { - opusStream.push(null); - opusStream.open = false; - receiver.opusStreams.delete(user.id); - } - if (pcmStream) { - pcmStream.push(null); - pcmStream.open = false; - receiver.pcmStreams.delete(user.id); - } - } + + const { ws, udp } = this.sockets; + + ws.on('error', err => this.emit('error', err)); + udp.on('error', err => this.emit('error', err)); + ws.on('ready', this.onReady.bind(this)); + ws.on('sessionDescription', this.onSessionDescription.bind(this)); + ws.on('speaking', this.onSpeaking.bind(this)); + } + + /** + * Invoked when the voice websocket is ready + * @param {Object} data The received data + * @private + */ + onReady({ port, ssrc }) { + this.authentication.port = port; + this.authentication.ssrc = ssrc; + + const udp = this.sockets.udp; + /** + * Emitted whenever the connection encounters an error. + * @event VoiceConnection#error + * @param {Error} error The encountered error + */ + udp.findEndpointAddress() + .then(address => { + udp.createUDPSocket(address); + }, e => this.emit('error', e)); + } + + /** + * Invoked when a session description is received + * @param {string} mode The encryption mode + * @param {string} secret The secret key + * @private + */ + onSessionDescription(mode, secret) { + this.authentication.encryptionMode = mode; + this.authentication.secretKey = secret; + + this.status = Constants.VoiceStatus.CONNECTED; + /** + * Emitted once the connection is ready, when a promise to join a voice channel resolves, + * the connection will already be ready. + * @event VoiceConnection#ready + */ + this.emit('ready'); + } + + /** + * Invoked when a speaking event is received + * @param {Object} data The received data + * @private + */ + onSpeaking({ user_id, ssrc, speaking }) { + const guild = this.channel.guild; + const user = this.client.users.get(user_id); + this.ssrcMap.set(+ssrc, user); + if (!speaking) { + for (const receiver of this.receivers) { + receiver.stoppedSpeaking(user); } - /** - * Emitted whenever a user starts/stops speaking - * @event VoiceConnection#speaking - * @param {User} user The user that has started/stopped speaking - * @param {boolean} speaking Whether or not the user is speaking - */ - if (this.ready) this.emit('speaking', user, data.speaking); - guild._memberSpeakUpdate(data.user_id, data.speaking); - }); + } + /** + * Emitted whenever a user starts/stops speaking + * @event VoiceConnection#speaking + * @param {User} user The user that has started/stopped speaking + * @param {boolean} speaking Whether or not the user is speaking + */ + if (this.status === Constants.Status.CONNECTED) this.emit('speaking', user, speaking); + guild._memberSpeakUpdate(user_id, speaking); } /** @@ -215,7 +419,7 @@ class VoiceConnection extends EventEmitter { /** * Play the given file in the voice connection. - * @param {string} file The path to the file + * @param {string} file The absolute path to the file * @param {StreamOptions} [options] Options for playing the stream * @returns {StreamDispatcher} * @example @@ -227,7 +431,17 @@ class VoiceConnection extends EventEmitter { * .catch(console.error); */ playFile(file, options) { - return this.playStream(fs.createReadStream(file), options); + return this.player.playUnknownStream(`file:${file}`, options); + } + + /** + * Play an arbitrary input that can be [handled by ffmpeg](https://ffmpeg.org/ffmpeg-protocols.html#Description) + * @param {string} input the arbitrary input + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + */ + playArbitraryInput(input, options) { + return this.player.playUnknownStream(input, options); } /** @@ -246,20 +460,44 @@ class VoiceConnection extends EventEmitter { * }) * .catch(console.error); */ - playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; + playStream(stream, options) { return this.player.playUnknownStream(stream, options); } /** * Plays a stream of 16-bit signed stereo PCM at 48KHz. - * @param {ReadableStream} stream The audio stream to play. + * @param {ReadableStream} stream The audio stream to play * @param {StreamOptions} [options] Options for playing the stream * @returns {StreamDispatcher} */ - playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - return this.player.playPCMStream(stream, null, options); + playConvertedStream(stream, options) { + return this.player.playPCMStream(stream, options); + } + + /** + * Plays an Opus encoded stream at 48KHz. + * Note that inline volume is not compatible with this method. + * @param {ReadableStream} stream The Opus audio stream to play + * @param {StreamOptions} [options] Options for playing the stream + * @returns {StreamDispatcher} + */ + playOpusStream(stream, options) { + return this.player.playOpusStream(stream, options); + } + + /** + * Plays a voice broadcast + * @param {VoiceBroadcast} broadcast the broadcast to play + * @returns {StreamDispatcher} + * @example + * // play a broadcast + * const broadcast = client + * .createVoiceBroadcast() + * .playFile('./test.mp3'); + * const dispatcher = voiceConnection.playBroadcast(broadcast); + */ + playBroadcast(broadcast) { + return this.player.playBroadcast(broadcast); } /** diff --git a/src/client/voice/VoiceUDPClient.js b/src/client/voice/VoiceUDPClient.js index b7b0c0cfe..c8b32f126 100644 --- a/src/client/voice/VoiceUDPClient.js +++ b/src/client/voice/VoiceUDPClient.js @@ -47,12 +47,12 @@ class VoiceConnectionUDPClient extends EventEmitter { shutdown() { if (this.socket) { + this.socket.removeAllListeners('message'); try { this.socket.close(); - } catch (e) { - return; + } finally { + this.socket = null; } - this.socket = null; } } @@ -124,7 +124,7 @@ class VoiceConnectionUDPClient extends EventEmitter { }); }); - const blankMessage = new Buffer(70); + const blankMessage = Buffer.alloc(70); blankMessage.writeUIntBE(this.voiceConnection.authentication.ssrc, 0, 4); this.send(blankMessage); } @@ -132,7 +132,7 @@ class VoiceConnectionUDPClient extends EventEmitter { function parseLocalPacket(message) { try { - const packet = new Buffer(message); + const packet = Buffer.from(message); let address = ''; for (let i = 4; i < packet.indexOf(0, i); i++) address += String.fromCharCode(packet[i]); const port = parseInt(packet.readUIntLE(packet.length - 2, 2).toString(10), 10); diff --git a/src/client/voice/VoiceWebSocket.js b/src/client/voice/VoiceWebSocket.js index bafa5dde3..4edf11a7e 100644 --- a/src/client/voice/VoiceWebSocket.js +++ b/src/client/voice/VoiceWebSocket.js @@ -66,8 +66,8 @@ class VoiceWebSocket extends EventEmitter { connect() { if (this.dead) return; if (this.ws) this.reset(); - if (this.attempts > 5) { - this.emit('error', new Error(`Too many connection attempts (${this.attempts}).`)); + if (this.attempts >= 5) { + this.emit('debug', new Error(`Too many connection attempts (${this.attempts}).`)); return; } @@ -124,7 +124,7 @@ class VoiceWebSocket extends EventEmitter { server_id: this.voiceConnection.channel.guild.id, user_id: this.client.user.id, token: this.voiceConnection.authentication.token, - session_id: this.voiceConnection.authentication.session_id, + session_id: this.voiceConnection.authentication.sessionID, }, }).catch(() => { this.emit('error', new Error('Tried to send join packet, but the WebSocket is not open.')); diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index e08a36537..9c3b56303 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -1,7 +1,9 @@ -const EventEmitter = require('events').EventEmitter; -const NaCl = require('tweetnacl'); +const VolumeInterface = require('../util/VolumeInterface'); +const VoiceBroadcast = require('../VoiceBroadcast'); -const nonce = new Buffer(24); +const secretbox = require('../util/Secretbox'); + +const nonce = Buffer.alloc(24); nonce.fill(0); /** @@ -15,36 +17,55 @@ nonce.fill(0); * ``` * @extends {EventEmitter} */ -class StreamDispatcher extends EventEmitter { - constructor(player, stream, sd, streamOptions) { - super(); - this.player = player; - this.stream = stream; - this.streamingData = { - channels: 2, - count: 0, - sequence: sd.sequence, - timestamp: sd.timestamp, - pausedTime: 0, - }; - this._startStreaming(); - this._triggered = false; - this._volume = streamOptions.volume; - +class StreamDispatcher extends VolumeInterface { + constructor(player, stream, streamOptions) { + super(streamOptions); /** - * How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5 - * aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime. - * @type {number} + * The Audio Player that controls this dispatcher + * @type {AudioPlayer} */ - this.passes = streamOptions.passes || 1; + this.player = player; + /** + * The stream that the dispatcher plays + * @type {ReadableStream|VoiceBroadcast} + */ + this.stream = stream; + if (!(this.stream instanceof VoiceBroadcast)) this.startStreaming(); + this.streamOptions = streamOptions; + + const data = this.streamingData; + data.length = 20; + data.missed = 0; /** * Whether playing is paused * @type {boolean} */ this.paused = false; + /** + * Whether this dispatcher has been destroyed + * @type {boolean} + */ + this.destroyed = false; - this.setVolume(streamOptions.volume || 1); + this._opus = streamOptions.opus; + } + + /** + * How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5 + * aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime. + * @type {number} + */ + get passes() { + return this.streamOptions.passes || 1; + } + + set passes(n) { + this.streamOptions.passes = n; + } + + get streamingData() { + return this.player.streamingData; } /** @@ -65,62 +86,27 @@ class StreamDispatcher extends EventEmitter { return this.time + this.streamingData.pausedTime; } - /** - * The volume of the stream, relative to the stream's input volume - * @type {number} - * @readonly - */ - get volume() { - return this._volume; - } - - /** - * Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double. - * @param {number} volume The volume that you want to set - */ - setVolume(volume) { - this._volume = volume; - } - - /** - * Set the volume in decibels - * @param {number} db The decibels - */ - setVolumeDecibels(db) { - this._volume = Math.pow(10, db / 20); - } - - /** - * Set the volume so that a perceived value of 0.5 is half the perceived volume etc. - * @param {number} value The value for the volume - */ - setVolumeLogarithmic(value) { - this._volume = Math.pow(value, 1.660964); - } - /** * Stops sending voice packets to the voice connection (stream may still progress however) */ - pause() { - this._setPaused(true); - } + pause() { this.setPaused(true); } /** * Resumes sending voice packets to the voice connection (may be further on in the stream than when paused) */ - resume() { - this._setPaused(false); - } + resume() { this.setPaused(false); } + /** * Stops the current stream permanently and emits an `end` event. * @param {string} [reason='user'] An optional reason for stopping the dispatcher. */ end(reason = 'user') { - this._triggerTerminalState('end', reason); + this.destroy('end', reason); } - _setSpeaking(value) { + setSpeaking(value) { + if (this.speaking === value) return; this.speaking = value; /** * Emitted when the dispatcher starts/stops speaking @@ -130,17 +116,31 @@ class StreamDispatcher extends EventEmitter { this.emit('speaking', value); } - _sendBuffer(buffer, sequence, timestamp) { + sendBuffer(buffer, sequence, timestamp, opusPacket) { + opusPacket = opusPacket || this.player.opusEncoder.encode(buffer); + const packet = this.createPacket(sequence, timestamp, opusPacket); + this.sendPacket(packet); + } + + sendPacket(packet) { let repeats = this.passes; - const packet = this._createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer)); + /** + * Emitted whenever the dispatcher has debug information + * @event StreamDispatcher#debug + * @param {string} info the debug info + */ + this.setSpeaking(true); while (repeats--) { this.player.voiceConnection.sockets.udp.send(packet) - .catch(e => this.emit('debug', `Failed to send a packet ${e}`)); + .catch(e => { + this.setSpeaking(false); + this.emit('debug', `Failed to send a packet ${e}`); + }); } } - _createPacket(sequence, timestamp, buffer) { - const packetBuffer = new Buffer(buffer.length + 28); + createPacket(sequence, timestamp, buffer) { + const packetBuffer = Buffer.alloc(buffer.length + 28); packetBuffer.fill(0); packetBuffer[0] = 0x80; packetBuffer[1] = 0x78; @@ -150,158 +150,168 @@ class StreamDispatcher extends EventEmitter { packetBuffer.writeUIntBE(this.player.voiceConnection.authentication.ssrc, 8, 4); packetBuffer.copy(nonce, 0, 0, 12); - buffer = NaCl.secretbox(buffer, nonce, this.player.voiceConnection.authentication.secretKey.key); - + buffer = secretbox.close(buffer, nonce, this.player.voiceConnection.authentication.secretKey.key); for (let i = 0; i < buffer.length; i++) packetBuffer[i + 12] = buffer[i]; return packetBuffer; } - _applyVolume(buffer) { - if (this._volume === 1) return buffer; + processPacket(packet) { + try { + if (this.destroyed) { + this.setSpeaking(false); + return; + } - const out = new Buffer(buffer.length); - for (let i = 0; i < buffer.length; i += 2) { - if (i >= buffer.length - 1) break; - const uint = Math.min(32767, Math.max(-32767, Math.floor(this._volume * buffer.readInt16LE(i)))); - out.writeInt16LE(uint, i); + const data = this.streamingData; + + if (this.paused) { + this.setSpeaking(false); + data.pausedTime = data.length * 10; + return; + } + + if (!packet) { + data.missed++; + data.pausedTime += data.length * 10; + return; + } + + this.started(); + this.missed = 0; + + this.stepStreamingData(); + this.sendBuffer(null, data.sequence, data.timestamp, packet); + } catch (e) { + this.destroy('error', e); } - - return out; } - _send() { + process() { try { - if (this._triggered) { - this._setSpeaking(false); + if (this.destroyed) { + this.setSpeaking(false); return; } const data = this.streamingData; if (data.missed >= 5) { - this._triggerTerminalState('end', 'Stream is not generating quickly enough.'); + this.destroy('end', 'Stream is not generating quickly enough.'); return; } if (this.paused) { + this.setSpeaking(false); // data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); return; } - this._setSpeaking(true); + this.started(); - if (!data.startTime) { - /** - * Emitted once the dispatcher starts streaming - * @event StreamDispatcher#start - */ - this.emit('start'); - data.startTime = Date.now(); - } - - const bufferLength = 1920 * data.channels; - let buffer = this.stream.read(bufferLength); + const buffer = this.readStreamBuffer(); if (!buffer) { data.missed++; data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); return; } data.missed = 0; - if (buffer.length !== bufferLength) { - const newBuffer = new Buffer(bufferLength).fill(0); - buffer.copy(newBuffer); - buffer = newBuffer; + this.stepStreamingData(); + + if (this._opus) { + this.sendBuffer(null, data.sequence, data.timestamp, buffer); + } else { + this.sendBuffer(buffer, data.sequence, data.timestamp); } - buffer = this._applyVolume(buffer); - - data.count++; - data.sequence = (data.sequence + 1) < 65536 ? data.sequence + 1 : 0; - data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; - - this._sendBuffer(buffer, data.sequence, data.timestamp); - const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now()); - this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), nextTime); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime); } catch (e) { - this._triggerTerminalState('error', e); + this.destroy('error', e); } } - _triggerEnd(reason) { + readStreamBuffer() { + const data = this.streamingData; + const bufferLength = (this._opus ? 80 : 1920) * data.channels; + let buffer = this.stream.read(bufferLength); + if (this._opus) return buffer; + if (!buffer) return null; + + if (buffer.length !== bufferLength) { + const newBuffer = Buffer.alloc(bufferLength).fill(0); + buffer.copy(newBuffer); + buffer = newBuffer; + } + + buffer = this.applyVolume(buffer); + return buffer; + } + + started() { + const data = this.streamingData; + + if (!data.startTime) { + /** + * Emitted once the dispatcher starts streaming + * @event StreamDispatcher#start + */ + this.emit('start'); + data.startTime = Date.now(); + } + } + + stepStreamingData() { + const data = this.streamingData; + data.count++; + data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0; + data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; + } + + destroy(type, reason) { + if (this.destroyed) return; + this.destroyed = true; + this.setSpeaking(false); + this.emit(type, reason); /** - * Emitted once the stream has ended. Attach a `once` listener to this. + * Emitted once the dispatcher ends + * @param {string} [reason] the reason the dispatcher ended * @event StreamDispatcher#end - * @param {string} reason The reason for the end of the dispatcher. If it ended because it reached the end of the - * stream, this would be `stream`. If you invoke `.end()` without specifying a reason, this would be `user`. */ - this.emit('end', reason); + if (type !== 'end') this.emit('end', `destroyed due to ${type} - ${reason}`); } - _triggerError(err) { - this.emit('end'); - /** - * Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`. - * @event StreamDispatcher#error - * @param {Error} err The encountered error - */ - this.emit('error', err); - } - - _triggerTerminalState(state, err) { - if (this._triggered) return; - /** - * Emitted when the stream wants to give debug information. - * @event StreamDispatcher#debug - * @param {string} information The debug information - */ - this.emit('debug', `Triggered terminal state ${state} - stream is now dead`); - this._triggered = true; - this._setSpeaking(false); - switch (state) { - case 'end': - this._triggerEnd(err); - break; - case 'error': - this._triggerError(err); - break; - default: - this.emit('error', 'Unknown trigger state'); - break; - } - } - - _startStreaming() { + startStreaming() { if (!this.stream) { + /** + * Emitted if the dispatcher encounters an error + * @event StreamDispatcher#error + * @param {string} error the error message + */ this.emit('error', 'No stream'); return; } - this.stream.on('end', err => this._triggerTerminalState('end', err || 'stream')); - this.stream.on('error', err => this._triggerTerminalState('error', err)); + this.stream.on('end', err => this.destroy('end', err || 'stream')); + this.stream.on('error', err => this.destroy('error', err)); const data = this.streamingData; data.length = 20; data.missed = 0; - this.stream.once('readable', () => this._send()); + this.stream.once('readable', () => { + data.startTime = null; + data.count = 0; + this.process(); + }); } - _setPaused(paused) { - if (paused) { - this.paused = true; - this._setSpeaking(false); - } else { - this.paused = false; - this._setSpeaking(true); - } - } + setPaused(paused) { this.setSpeaking(!(this.paused = paused)); } } module.exports = StreamDispatcher; diff --git a/src/client/voice/opus/BaseOpusEngine.js b/src/client/voice/opus/BaseOpusEngine.js index 6c3ba6e34..47c88c7c6 100644 --- a/src/client/voice/opus/BaseOpusEngine.js +++ b/src/client/voice/opus/BaseOpusEngine.js @@ -10,6 +10,10 @@ class BaseOpus { decode(buffer) { return buffer; } + + destroy() { + return; + } } module.exports = BaseOpus; diff --git a/src/client/voice/opus/OpusEngineList.js b/src/client/voice/opus/OpusEngineList.js index ffd512a64..2aa7f17fc 100644 --- a/src/client/voice/opus/OpusEngineList.js +++ b/src/client/voice/opus/OpusEngineList.js @@ -3,6 +3,8 @@ const list = [ require('./OpusScriptEngine'), ]; +let opusEngineFound; + function fetch(Encoder) { try { return new Encoder(); @@ -20,5 +22,10 @@ exports.fetch = () => { const fetched = fetch(encoder); if (fetched) return fetched; } - throw new Error('Couldn\'t find an Opus engine.'); + return null; +}; + +exports.guaranteeOpusEngine = () => { + if (typeof opusEngineFound === 'undefined') opusEngineFound = Boolean(exports.fetch()); + if (!opusEngineFound) throw new Error('Couldn\'t find an Opus engine.'); }; diff --git a/src/client/voice/opus/OpusScriptEngine.js b/src/client/voice/opus/OpusScriptEngine.js index 33b4ff5a6..c902e790c 100644 --- a/src/client/voice/opus/OpusScriptEngine.js +++ b/src/client/voice/opus/OpusScriptEngine.js @@ -2,7 +2,7 @@ const OpusEngine = require('./BaseOpusEngine'); let OpusScript; -class NodeOpusEngine extends OpusEngine { +class OpusScriptEngine extends OpusEngine { constructor(player) { super(player); try { @@ -22,6 +22,11 @@ class NodeOpusEngine extends OpusEngine { super.decode(buffer); return this.encoder.decode(buffer); } + + destroy() { + super.destroy(); + this.encoder.delete(); + } } -module.exports = NodeOpusEngine; +module.exports = OpusScriptEngine; diff --git a/src/client/voice/pcm/ConverterEngine.js b/src/client/voice/pcm/ConverterEngine.js deleted file mode 100644 index 6b7502f90..000000000 --- a/src/client/voice/pcm/ConverterEngine.js +++ /dev/null @@ -1,14 +0,0 @@ -const EventEmitter = require('events').EventEmitter; - -class ConverterEngine extends EventEmitter { - constructor(player) { - super(); - this.player = player; - } - - createConvertStream() { - return; - } -} - -module.exports = ConverterEngine; diff --git a/src/client/voice/pcm/ConverterEngineList.js b/src/client/voice/pcm/ConverterEngineList.js deleted file mode 100644 index 56d430e48..000000000 --- a/src/client/voice/pcm/ConverterEngineList.js +++ /dev/null @@ -1 +0,0 @@ -exports.fetch = () => require('./FfmpegConverterEngine'); diff --git a/src/client/voice/pcm/FfmpegConverterEngine.js b/src/client/voice/pcm/FfmpegConverterEngine.js deleted file mode 100644 index 8fb725bda..000000000 --- a/src/client/voice/pcm/FfmpegConverterEngine.js +++ /dev/null @@ -1,86 +0,0 @@ -const ConverterEngine = require('./ConverterEngine'); -const ChildProcess = require('child_process'); -const EventEmitter = require('events').EventEmitter; - -class PCMConversionProcess extends EventEmitter { - constructor(process) { - super(); - this.process = process; - this.input = null; - this.process.on('error', e => this.emit('error', e)); - } - - setInput(stream) { - this.input = stream; - stream.pipe(this.process.stdin, { end: false }); - this.input.on('error', e => this.emit('error', e)); - this.process.stdin.on('error', e => this.emit('error', e)); - } - - destroy() { - this.emit('debug', 'destroying a ffmpeg process:'); - if (this.input && this.input.unpipe && this.process.stdin) { - this.input.unpipe(this.process.stdin); - this.emit('unpiped the user input stream from the process input stream'); - } - if (this.process.stdin) { - this.process.stdin.end(); - this.emit('ended the process stdin'); - } - if (this.process.stdin.destroy) { - this.process.stdin.destroy(); - this.emit('destroyed the process stdin'); - } - if (this.process.kill) { - this.process.kill(); - this.emit('killed the process'); - } - } - -} - -class FfmpegConverterEngine extends ConverterEngine { - constructor(player) { - super(player); - this.command = chooseCommand(); - } - - handleError(encoder, err) { - if (encoder.destroy) encoder.destroy(); - this.emit('error', err); - } - - createConvertStream(seek = 0) { - super.createConvertStream(); - const encoder = ChildProcess.spawn(this.command, [ - '-analyzeduration', '0', - '-loglevel', '0', - '-i', '-', - '-f', 's16le', - '-ar', '48000', - '-ac', '2', - '-ss', String(seek), - 'pipe:1', - ], { stdio: ['pipe', 'pipe', 'ignore'] }); - return new PCMConversionProcess(encoder); - } -} - -function chooseCommand() { - for (const cmd of [ - 'ffmpeg', - 'avconv', - './ffmpeg', - './avconv', - 'node_modules\\ffmpeg-binaries\\bin\\ffmpeg', - 'node_modules/ffmpeg-binaries/bin/ffmpeg', - ]) { - if (!ChildProcess.spawnSync(cmd, ['-h']).error) return cmd; - } - throw new Error( - 'FFMPEG was not found on your system, so audio cannot be played. ' + - 'Please make sure FFMPEG is installed and in your PATH.' - ); -} - -module.exports = FfmpegConverterEngine; diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index 96c6c24ae..11b611394 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -1,30 +1,41 @@ -const PCMConverters = require('../pcm/ConverterEngineList'); -const OpusEncoders = require('../opus/OpusEngineList'); const EventEmitter = require('events').EventEmitter; +const Prism = require('prism-media'); const StreamDispatcher = require('../dispatcher/StreamDispatcher'); +const Collection = require('../../../util/Collection'); +const OpusEncoders = require('../opus/OpusEngineList'); + +const ffmpegArguments = [ + '-analyzeduration', '0', + '-loglevel', '0', + '-f', 's16le', + '-ar', '48000', + '-ac', '2', +]; /** - * Represents the Audio Player of a Voice Connection - * @extends {EventEmitter} + * An Audio Player for a Voice Connection * @private + * @extends {EventEmitter} */ class AudioPlayer extends EventEmitter { constructor(voiceConnection) { super(); /** - * The voice connection the player belongs to + * The voice connection that the player serves * @type {VoiceConnection} */ this.voiceConnection = voiceConnection; - this.audioToPCM = new (PCMConverters.fetch())(); - this.opusEncoder = OpusEncoders.fetch(); - this.currentConverter = null; /** - * The current stream dispatcher, if a stream is being played - * @type {StreamDispatcher} + * The prism transcoder that the player uses + * @type {Prism} */ - this.dispatcher = null; - this.audioToPCM.on('error', e => this.emit('error', e)); + this.prism = new Prism(); + /** + * The opus encoder that the player uses + * @type {NodeOpusEngine|OpusScriptEngine} + */ + this.opusEncoder = OpusEncoders.fetch(); + this.streams = new Collection(); this.streamingData = { channels: 2, count: 0, @@ -32,49 +43,86 @@ class AudioPlayer extends EventEmitter { timestamp: 0, pausedTime: 0, }; - this.voiceConnection.on('closing', () => this.cleanup(null, 'voice connection closing')); + this.voiceConnection.once('closing', () => this.destroyAllStreams()); + } + + get currentTranscoder() { + return this.streams.last().transcoder; + } + + destroy() { + if (this.opusEncoder) this.opusEncoder.destroy(); + } + + destroyStream(stream) { + const data = this.streams.get(stream); + if (!data) return; + const transcoder = data.transcoder; + const dispatcher = data.dispatcher; + if (transcoder) transcoder.kill(); + if (dispatcher) dispatcher.destroy('end'); + this.streams.delete(stream); + } + + destroyAllStreams(except) { + for (const stream of this.streams.keys()) { + if (except === stream) continue; + if (except === true && this.streams.get(stream) === this.streams.last()) continue; + this.destroyStream(stream); + } } playUnknownStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + OpusEncoders.guaranteeOpusEngine(); const options = { seek, volume, passes }; - stream.on('end', () => { - this.emit('debug', 'Input stream to converter has ended'); + const transcoder = this.prism.transcode({ + type: 'ffmpeg', + media: stream, + ffmpegArguments: ffmpegArguments.concat(['-ss', String(seek)]), }); - stream.on('error', e => this.emit('error', e)); - const conversionProcess = this.audioToPCM.createConvertStream(options.seek); - conversionProcess.on('error', e => this.emit('error', e)); - conversionProcess.setInput(stream); - return this.playPCMStream(conversionProcess.process.stdout, conversionProcess, options); + this.streams.set(transcoder.output, { transcoder, input: stream }); + transcoder.on('error', e => { + this.destroyStream(stream); + if (this.listenerCount('error') > 0) this.emit('error', e); + this.emit('warn', `prism transcoder error - ${e}`); + }); + return this.playPCMStream(transcoder.output, options); } - cleanup(checkStream, reason) { - // cleanup is a lot less aggressive than v9 because it doesn't try to kill every single stream it is aware of - this.emit('debug', `Clean up triggered due to ${reason}`); - const filter = checkStream && this.dispatcher && this.dispatcher.stream === checkStream; - if (this.currentConverter && (checkStream ? filter : true)) { - this.currentConverter.destroy(); - this.currentConverter = null; - } - } - - playPCMStream(stream, converter, { seek = 0, volume = 1, passes = 1 } = {}) { + playPCMStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { + OpusEncoders.guaranteeOpusEngine(); const options = { seek, volume, passes }; - stream.on('end', () => this.emit('debug', 'PCM input stream ended')); - this.cleanup(null, 'outstanding play stream'); - this.currentConverter = converter; - if (this.dispatcher) { - this.streamingData = this.dispatcher.streamingData; - } - stream.on('error', e => this.emit('error', e)); - const dispatcher = new StreamDispatcher(this, stream, this.streamingData, options); - dispatcher.on('error', e => this.emit('error', e)); - dispatcher.on('end', () => this.cleanup(dispatcher.stream, 'dispatcher ended')); - dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); - this.dispatcher = dispatcher; - dispatcher.on('debug', m => this.emit('debug', `Stream dispatch - ${m}`)); + this.destroyAllStreams(stream); + const dispatcher = this.createDispatcher(stream, options); + if (!this.streams.has(stream)) this.streams.set(stream, { dispatcher, input: stream }); + this.streams.get(stream).dispatcher = dispatcher; return dispatcher; } + playOpusStream(stream, { seek = 0, passes = 1 } = {}) { + const options = { seek, passes, opus: true }; + this.destroyAllStreams(stream); + const dispatcher = this.createDispatcher(stream, options); + this.streams.set(stream, { dispatcher, input: stream }); + return dispatcher; + } + + playBroadcast(broadcast, { volume = 1, passes = 1 } = {}) { + const options = { volume, passes }; + this.destroyAllStreams(); + const dispatcher = this.createDispatcher(broadcast, options); + this.streams.set(broadcast, { dispatcher, input: broadcast }); + broadcast.registerDispatcher(dispatcher); + return dispatcher; + } + + createDispatcher(stream, options) { + const dispatcher = new StreamDispatcher(this, stream, options); + dispatcher.on('end', () => this.destroyStream(stream)); + dispatcher.on('error', () => this.destroyStream(stream)); + dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + return dispatcher; + } } module.exports = AudioPlayer; diff --git a/src/client/voice/player/BasePlayer.js b/src/client/voice/player/BasePlayer.js deleted file mode 100644 index d5285cd34..000000000 --- a/src/client/voice/player/BasePlayer.js +++ /dev/null @@ -1,121 +0,0 @@ -const OpusEngines = require('../opus/OpusEngineList'); -const ConverterEngines = require('../pcm/ConverterEngineList'); -const Constants = require('../../../util/Constants'); -const StreamDispatcher = require('../dispatcher/StreamDispatcher'); -const EventEmitter = require('events').EventEmitter; - -class VoiceConnectionPlayer extends EventEmitter { - constructor(connection) { - super(); - this.connection = connection; - this.opusEncoder = OpusEngines.fetch(); - const Engine = ConverterEngines.fetch(); - this.converterEngine = new Engine(this); - this.converterEngine.on('error', err => { - this._shutdown(); - this.emit('error', err); - }); - this.speaking = false; - this.processMap = new Map(); - this.dispatcher = null; - this._streamingData = { - sequence: 0, - timestamp: 0, - }; - } - - convertStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - const encoder = this.converterEngine.createConvertStream(options.seek); - const pipe = stream.pipe(encoder.stdin, { end: false }); - pipe.on('unpipe', () => { - this.killStream(encoder.stdout); - pipe.destroy(); - }); - this.processMap.set(encoder.stdout, { - pcmConverter: encoder, - inputStream: stream, - }); - return encoder.stdout; - } - - _shutdown() { - this.speaking = false; - if (this.dispatcher) this.dispatcher._triggerTerminalState('end', 'ended by parent player shutdown'); - for (const stream of this.processMap.keys()) this.killStream(stream); - } - - killStream(stream) { - const streams = this.processMap.get(stream); - this._streamingData = this.dispatcher.streamingData; - this.emit(Constants.Events.DEBUG, 'Cleaning up player after audio stream ended or encountered an error'); - - const dummyHandler = () => null; - - if (streams) { - this.processMap.delete(stream); - if (streams.inputStream && streams.pcmConverter) { - try { - streams.inputStream.once('error', dummyHandler); - streams.pcmConverter.once('error', dummyHandler); - streams.pcmConverter.stdin.once('error', dummyHandler); - streams.pcmConverter.stdout.once('error', dummyHandler); - if (streams.inputStream.unpipe) { - streams.inputStream.unpipe(streams.pcmConverter.stdin); - this.emit(Constants.Events.DEBUG, '- Unpiped input stream'); - } else if (streams.inputStream.destroy) { - streams.inputStream.destroy(); - this.emit(Constants.Events.DEBUG, '- Couldn\'t unpipe input stream, so destroyed input stream'); - } - if (streams.pcmConverter.stdin) { - streams.pcmConverter.stdin.end(); - this.emit(Constants.Events.DEBUG, '- Ended input stream to PCM converter'); - } - if (streams.pcmConverter && streams.pcmConverter.kill) { - streams.pcmConverter.kill('SIGINT'); - this.emit(Constants.Events.DEBUG, '- Killed the PCM converter'); - } - } catch (err) { - // if an error happened make sure the pcm converter is killed anyway - try { - if (streams.pcmConverter && streams.pcmConverter.kill) { - streams.pcmConverter.kill('SIGINT'); - this.emit(Constants.Events.DEBUG, '- Killed the PCM converter after previous error (abnormal)'); - } - } catch (e) { - return e; - } - return err; - } - } - } - return null; - } - - setSpeaking(value) { - if (this.speaking === value) return; - this.speaking = value; - this.connection.websocket.send({ - op: Constants.VoiceOPCodes.SPEAKING, - d: { - speaking: true, - delay: 0, - }, - }).catch(e => { - this.emit('debug', e); - }); - } - - playPCMStream(pcmStream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - const dispatcher = new StreamDispatcher(this, pcmStream, this._streamingData, options); - dispatcher.on('speaking', value => this.setSpeaking(value)); - dispatcher.on('end', () => this.killStream(pcmStream)); - dispatcher.on('error', () => this.killStream(pcmStream)); - dispatcher.setVolume(options.volume); - this.dispatcher = dispatcher; - return dispatcher; - } -} - -module.exports = VoiceConnectionPlayer; diff --git a/src/client/voice/player/DefaultPlayer.js b/src/client/voice/player/DefaultPlayer.js deleted file mode 100644 index b465e8cd6..000000000 --- a/src/client/voice/player/DefaultPlayer.js +++ /dev/null @@ -1,19 +0,0 @@ -const BasePlayer = require('./BasePlayer'); -const fs = require('fs'); - -class DefaultPlayer extends BasePlayer { - playFile(file, { seek = 0, volume = 1 } = {}) { - const options = { seek: seek, volume: volume }; - return this.playStream(fs.createReadStream(file), options); - } - - playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - this._shutdown(); - const options = { seek, volume, passes }; - const pcmStream = this.convertStream(stream, options); - const dispatcher = this.playPCMStream(pcmStream, options); - return dispatcher; - } -} - -module.exports = DefaultPlayer; diff --git a/src/client/voice/receiver/VoiceReceiver.js b/src/client/voice/receiver/VoiceReceiver.js index bc9156f2f..de78322eb 100644 --- a/src/client/voice/receiver/VoiceReceiver.js +++ b/src/client/voice/receiver/VoiceReceiver.js @@ -1,8 +1,9 @@ const EventEmitter = require('events').EventEmitter; -const NaCl = require('tweetnacl'); +const secretbox = require('../util/Secretbox'); const Readable = require('./VoiceReadable'); +const OpusEncoders = require('../opus/OpusEngineList'); -const nonce = new Buffer(24); +const nonce = Buffer.alloc(24); nonce.fill(0); /** @@ -25,6 +26,7 @@ class VoiceReceiver extends EventEmitter { this.queues = new Map(); this.pcmStreams = new Map(); this.opusStreams = new Map(); + this.opusEncoders = new Map(); /** * Whether or not this receiver has been destroyed. @@ -74,17 +76,45 @@ class VoiceReceiver extends EventEmitter { */ destroy() { this.voiceConnection.sockets.udp.socket.removeListener('message', this._listener); - for (const stream of this.pcmStreams) { - stream[1]._push(null); - this.pcmStreams.delete(stream[0]); + for (const [id, stream] of this.pcmStreams) { + stream._push(null); + this.pcmStreams.delete(id); } - for (const stream of this.opusStreams) { - stream[1]._push(null); - this.opusStreams.delete(stream[0]); + for (const [id, stream] of this.opusStreams) { + stream._push(null); + this.opusStreams.delete(id); + } + for (const [id, encoder] of this.opusEncoders) { + encoder.destroy(); + this.opusEncoders.delete(id); } this.destroyed = true; } + /** + * Invoked when a user stops speaking + * @param {User} user The user that stopped speaking + * @private + */ + stoppedSpeaking(user) { + const opusStream = this.opusStreams.get(user.id); + const pcmStream = this.pcmStreams.get(user.id); + const opusEncoder = this.opusEncoders.get(user.id); + if (opusStream) { + opusStream.push(null); + opusStream.open = false; + this.opusStreams.delete(user.id); + } + if (pcmStream) { + pcmStream.push(null); + pcmStream.open = false; + this.pcmStreams.delete(user.id); + } + if (opusEncoder) { + opusEncoder.destroy(); + } + } + /** * Creates a readable stream for a user that provides opus data while the user is speaking. When the user * stops speaking, the stream is destroyed. @@ -117,17 +147,20 @@ class VoiceReceiver extends EventEmitter { handlePacket(msg, user) { msg.copy(nonce, 0, 0, 12); - let data = NaCl.secretbox.open(msg.slice(12), nonce, this.voiceConnection.authentication.secretKey.key); + let data = secretbox.open(msg.slice(12), nonce, this.voiceConnection.authentication.secretKey.key); if (!data) { /** - * Emitted whenever a voice packet cannot be decrypted + * Emitted whenever a voice packet experiences a problem. * @event VoiceReceiver#warn + * @param {string} reason The reason for the warning. If it happened because the voice packet could not be + * decrypted, this would be `decrypt`. If it happened because the voice packet could not be decoded into + * PCM, this would be `decode`. * @param {string} message The warning message */ - this.emit('warn', 'Failed to decrypt voice packet'); + this.emit('warn', 'decrypt', 'Failed to decrypt voice packet'); return; } - data = new Buffer(data); + data = Buffer.from(data); if (this.opusStreams.get(user.id)) this.opusStreams.get(user.id)._push(data); /** * Emitted whenever voice data is received from the voice connection. This is _always_ emitted (unlike PCM). @@ -137,6 +170,13 @@ class VoiceReceiver extends EventEmitter { */ this.emit('opus', user, data); if (this.listenerCount('pcm') > 0 || this.pcmStreams.size > 0) { + if (!this.opusEncoders.get(user.id)) this.opusEncoders.set(user.id, OpusEncoders.fetch()); + const { pcm, error } = VoiceReceiver._tryDecode(this.opusEncoders.get(user.id), data); + if (error) { + this.emit('warn', 'decode', `Failed to decode packet voice to PCM because: ${error.message}`); + return; + } + if (this.pcmStreams.get(user.id)) this.pcmStreams.get(user.id)._push(pcm); /** * Emits decoded voice data when it's received. For performance reasons, the decoding will only * happen if there is at least one `pcm` listener on this receiver. @@ -144,11 +184,17 @@ class VoiceReceiver extends EventEmitter { * @param {User} user The user that is sending the buffer (is speaking) * @param {Buffer} buffer The decoded buffer */ - const pcm = this.voiceConnection.player.opusEncoder.decode(data); - if (this.pcmStreams.get(user.id)) this.pcmStreams.get(user.id)._push(pcm); this.emit('pcm', user, pcm); } } + + static _tryDecode(encoder, data) { + try { + return { pcm: encoder.decode(data) }; + } catch (error) { + return { error }; + } + } } module.exports = VoiceReceiver; diff --git a/src/client/voice/util/Secretbox.js b/src/client/voice/util/Secretbox.js new file mode 100644 index 000000000..5aa80b088 --- /dev/null +++ b/src/client/voice/util/Secretbox.js @@ -0,0 +1,13 @@ +try { + const sodium = require('sodium'); + module.exports = { + open: sodium.api.crypto_secretbox_open, + close: sodium.api.crypto_secretbox, + }; +} catch (err) { + const tweetnacl = require('tweetnacl'); + module.exports = { + open: tweetnacl.secretbox.open, + close: tweetnacl.secretbox, + }; +} diff --git a/src/client/voice/util/VolumeInterface.js b/src/client/voice/util/VolumeInterface.js new file mode 100644 index 000000000..b12f1e4bc --- /dev/null +++ b/src/client/voice/util/VolumeInterface.js @@ -0,0 +1,64 @@ +const EventEmitter = require('events'); + +class VolumeInterface extends EventEmitter { + constructor({ volume = 0 } = {}) { + super(); + this.setVolume(volume || 1); + } + + applyVolume(buffer, volume) { + volume = volume || this._volume; + if (volume === 1) return buffer; + + const out = new Buffer(buffer.length); + for (let i = 0; i < buffer.length; i += 2) { + if (i >= buffer.length - 1) break; + const uint = Math.min(32767, Math.max(-32767, Math.floor(volume * buffer.readInt16LE(i)))); + out.writeInt16LE(uint, i); + } + + return out; + } + + /** + * Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double. + * @param {number} volume The volume that you want to set + */ + setVolume(volume) { + /** + * Emitted when the volume of this interface changes + * @event VolumeInterface#volumeChange + * @param {number} oldVolume The old volume of this interface + * @param {number} newVolume The new volume of this interface + */ + this.emit('volumeChange', this._volume, volume); + this._volume = volume; + } + + /** + * Set the volume in decibels + * @param {number} db The decibels + */ + setVolumeDecibels(db) { + this.setVolume(Math.pow(10, db / 20)); + } + + /** + * Set the volume so that a perceived value of 0.5 is half the perceived volume etc. + * @param {number} value The value for the volume + */ + setVolumeLogarithmic(value) { + this.setVolume(Math.pow(value, 1.660964)); + } + + /** + * The current volume of the broadcast + * @readonly + * @type {number} + */ + get volume() { + return this._volume; + } +} + +module.exports = VolumeInterface; diff --git a/src/structures/DMChannel.js b/src/structures/DMChannel.js index 6af543dd6..93f277dcc 100644 --- a/src/structures/DMChannel.js +++ b/src/structures/DMChannel.js @@ -52,10 +52,10 @@ class DMChannel extends Channel { get typingCount() { return; } createCollector() { return; } awaitMessages() { return; } - bulkDelete() { return; } + // doesn't work on DM channels; bulkDelete() { return; } _cacheMessage() { return; } } -TextBasedChannel.applyToClass(DMChannel, true); +TextBasedChannel.applyToClass(DMChannel, true, ['bulkDelete']); module.exports = DMChannel; diff --git a/src/structures/GroupDMChannel.js b/src/structures/GroupDMChannel.js index ef5767db7..c5105c104 100644 --- a/src/structures/GroupDMChannel.js +++ b/src/structures/GroupDMChannel.js @@ -168,10 +168,10 @@ class GroupDMChannel extends Channel { get typingCount() { return; } createCollector() { return; } awaitMessages() { return; } - bulkDelete() { return; } + // doesn't work on group DMs; bulkDelete() { return; } _cacheMessage() { return; } } -TextBasedChannel.applyToClass(GroupDMChannel, true); +TextBasedChannel.applyToClass(GroupDMChannel, true, ['bulkDelete']); module.exports = GroupDMChannel; diff --git a/src/structures/interface/TextBasedChannel.js b/src/structures/interface/TextBasedChannel.js index 6d66e81c8..e25bafce7 100644 --- a/src/structures/interface/TextBasedChannel.js +++ b/src/structures/interface/TextBasedChannel.js @@ -412,7 +412,7 @@ class TextBasedChannel { } } -exports.applyToClass = (structure, full = false) => { +exports.applyToClass = (structure, full = false, ignore = []) => { const props = ['send', 'sendMessage', 'sendEmbed', 'sendFile', 'sendCode']; if (full) { props.push( @@ -431,6 +431,7 @@ exports.applyToClass = (structure, full = false) => { ); } for (const prop of props) { + if (ignore.includes(prop)) continue; Object.defineProperty(structure.prototype, prop, Object.getOwnPropertyDescriptor(TextBasedChannel.prototype, prop)); } }; diff --git a/src/util/Constants.js b/src/util/Constants.js index d0078d08c..e05668a81 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -168,6 +168,16 @@ const Endpoints = exports.Endpoints = { emoji: (emojiID) => `${Endpoints.CDN}/emojis/${emojiID}.png`, }; +/** + * The current status of the client. Here are the available statuses: + * - READY + * - CONNECTING + * - RECONNECTING + * - IDLE + * - NEARLY + * - DISCONNECTED + * @typedef {number} Status + */ exports.Status = { READY: 0, CONNECTING: 1, @@ -177,6 +187,23 @@ exports.Status = { DISCONNECTED: 5, }; +/** + * The current status of a voice connection. Here are the available statuses: + * - CONNECTED + * - CONNECTING + * - AUTHENTICATING + * - RECONNECTING + * - DISCONNECTED + * @typedef {number} VoiceStatus + */ +exports.VoiceStatus = { + CONNECTED: 0, + CONNECTING: 1, + AUTHENTICATING: 2, + RECONNECTING: 3, + DISCONNECTED: 4, +}; + exports.ChannelTypes = { TEXT: 0, DM: 1, diff --git a/test/random.js b/test/random.js index cc65fc759..0a4a2e071 100644 --- a/test/random.js +++ b/test/random.js @@ -10,8 +10,6 @@ const { email, password, token, usertoken, song } = require('./auth.json'); client.login(token).then(atoken => console.log('logged in with token ' + atoken)).catch(console.error); -client.ws.on('send', console.log); - client.on('ready', () => { console.log('ready'); }); @@ -171,7 +169,9 @@ client.on('message', msg => { if (msg.content.startsWith('/play')) { console.log('I am now going to play', msg.content); const chan = msg.content.split(' ').slice(1).join(' '); - con.playStream(ytdl(chan, {filter : 'audioonly'}), { passes : 4 }); + const s = ytdl(chan, {filter:'audioonly'}, { passes : 3 }); + s.on('error', e => console.log(`e w stream 1 ${e}`)); + con.playStream(s); } if (msg.content.startsWith('/join')) { const chan = msg.content.split(' ').slice(1).join(' '); @@ -179,7 +179,9 @@ client.on('message', msg => { .then(conn => { con = conn; msg.reply('done'); - disp = conn.playStream(ytdl(song, {filter:'audioonly'}), { passes : 3 }); + const s = ytdl(song, {filter:'audioonly'}, { passes : 3 }); + s.on('error', e => console.log(`e w stream 2 ${e}`)); + disp = conn.playStream(s); conn.player.on('debug', console.log); conn.player.on('error', err => console.log(123, err)); }) diff --git a/test/voice.js b/test/voice.js new file mode 100644 index 000000000..396bcc490 --- /dev/null +++ b/test/voice.js @@ -0,0 +1,78 @@ +/* eslint no-console: 0 */ +'use strict'; + +const Discord = require('../'); +const ytdl = require('ytdl-core'); + +const client = new Discord.Client({ fetchAllMembers: false, apiRequestMethod: 'sequential' }); + +const auth = require('./auth.json'); + +client.login(auth.token).then(() => console.log('logged')).catch(console.error); + +const connections = new Map(); + +let broadcast; + +client.on('message', m => { + if (!m.guild) return; + if (m.content.startsWith('/join')) { + const channel = m.guild.channels.get(m.content.split(' ')[1]) || m.member.voiceChannel; + if (channel && channel.type === 'voice') { + channel.join().then(conn => { + conn.player.on('error', (...e) => console.log('player', ...e)); + if (!connections.has(m.guild.id)) connections.set(m.guild.id, { conn, queue: [] }); + m.reply('ok!'); + }); + } else { + m.reply('Specify a voice channel!'); + } + } else if (m.content.startsWith('/play')) { + if (connections.has(m.guild.id)) { + const connData = connections.get(m.guild.id); + const queue = connData.queue; + const url = m.content.split(' ').slice(1).join(' ') + .replace(//g, ''); + queue.push({ url, m }); + if (queue.length > 1) { + m.reply(`OK, that's going to play after ${queue.length - 1} songs`); + return; + } + doQueue(connData); + } + } else if (m.content.startsWith('/skip')) { + if (connections.has(m.guild.id)) { + const connData = connections.get(m.guild.id); + if (connData.dispatcher) { + connData.dispatcher.end(); + } + } + } else if (m.content.startsWith('#eval') && m.author.id === '66564597481480192') { + try { + const com = eval(m.content.split(' ').slice(1).join(' ')); + m.channel.sendMessage(`\`\`\`\n${com}\`\`\``); + } catch (e) { + console.log(e); + m.channel.sendMessage(`\`\`\`\n${e}\`\`\``); + } + } +}); + +function doQueue(connData) { + const conn = connData.conn; + const queue = connData.queue; + const item = queue[0]; + if (!item) return; + const stream = ytdl(item.url, { filter: 'audioonly' }, { passes: 3 }); + const dispatcher = conn.playStream(stream); + stream.on('info', info => { + item.m.reply(`OK, playing **${info.title}**`); + }); + dispatcher.on('end', () => { + queue.shift(); + doQueue(connData); + }); + dispatcher.on('error', (...e) => console.log('dispatcher', ...e)); + connData.dispatcher = dispatcher; +} diff --git a/typings b/typings index 997abfd2d..3dbeb51fd 160000 --- a/typings +++ b/typings @@ -1 +1 @@ -Subproject commit 997abfd2d5b3fbc958f4cc11012a5fe41065aee8 +Subproject commit 3dbeb51fd2a0ec0ca87c4ddcf20c1c1498633762