From 7fd94c29d8b427d8c35f4879242ceb7a3b98faf3 Mon Sep 17 00:00:00 2001 From: Programmix Date: Wed, 22 Feb 2017 12:13:52 -0800 Subject: [PATCH] VoiceConnection rework (#1183) * VoiceConnection rework - improves codebase - removes concept of pending connections - attempts to fix memory leaks by removing EventEmitter listeners - makes voice connections keep track of its own channel when it is moved by another user - allows voice connections to reconnect when Discord falls back to another voice server or a region change occurs - adds events for some of the aforementioned events * Removed unused code * More clean up / bugfixes * Added typedefs to Status and VoiceStatus constants --- src/client/voice/ClientVoiceManager.js | 216 ++----------- src/client/voice/VoiceConnection.js | 347 ++++++++++++++++----- src/client/voice/VoiceUDPClient.js | 6 +- src/client/voice/VoiceWebSocket.js | 4 +- src/client/voice/player/AudioPlayer.js | 2 +- src/client/voice/receiver/VoiceReceiver.js | 24 ++ src/util/Constants.js | 27 ++ 7 files changed, 350 insertions(+), 276 deletions(-) diff --git a/src/client/voice/ClientVoiceManager.js b/src/client/voice/ClientVoiceManager.js index 1abe30eb4..e0d3879ff 100644 --- a/src/client/voice/ClientVoiceManager.js +++ b/src/client/voice/ClientVoiceManager.js @@ -1,8 +1,5 @@ const Collection = require('../../util/Collection'); -const mergeDefault = require('../../util/MergeDefault'); -const Constants = require('../../util/Constants'); const VoiceConnection = require('./VoiceConnection'); -const EventEmitter = require('events').EventEmitter; /** * Manages all the voice stuff for the Client @@ -22,53 +19,21 @@ class ClientVoiceManager { */ this.connections = new Collection(); - /** - * Pending connection attempts, maps guild ID to VoiceChannel - * @type {Collection} - */ - this.pending = new Collection(); - this.client.on('self.voiceServer', this.onVoiceServer.bind(this)); this.client.on('self.voiceStateUpdate', this.onVoiceStateUpdate.bind(this)); } - onVoiceServer(data) { - if (this.pending.has(data.guild_id)) this.pending.get(data.guild_id).setTokenAndEndpoint(data.token, data.endpoint); + onVoiceServer({ guild_id, token, endpoint }) { + const connection = this.connections.get(guild_id); + if (connection) connection.setTokenAndEndpoint(token, endpoint); } - onVoiceStateUpdate(data) { - if (this.pending.has(data.guild_id)) this.pending.get(data.guild_id).setSessionID(data.session_id); - } - - /** - * Sends a request to the main gateway to join a voice channel - * @param {VoiceChannel} channel The channel to join - * @param {Object} [options] The options to provide - */ - sendVoiceStateUpdate(channel, options = {}) { - if (!this.client.user) throw new Error('Unable to join because there is no client user.'); - if (!channel.permissionsFor) { - throw new Error('Channel does not support permissionsFor; is it really a voice channel?'); + onVoiceStateUpdate({ guild_id, session_id, channel_id }) { + const connection = this.connections.get(guild_id); + if (connection) { + connection.channel = this.client.channels.get(channel_id); + connection.setSessionID(session_id); } - const permissions = channel.permissionsFor(this.client.user); - if (!permissions) { - throw new Error('There is no permission set for the client user in this channel - are they part of the guild?'); - } - if (!permissions.hasPermission('CONNECT')) { - throw new Error('You do not have permission to join this voice channel.'); - } - - options = mergeDefault({ - guild_id: channel.guild.id, - channel_id: channel.id, - self_mute: false, - self_deaf: false, - }, options); - - this.client.ws.send({ - op: Constants.OPCodes.VOICE_STATE_UPDATE, - d: options, - }); } /** @@ -78,7 +43,6 @@ class ClientVoiceManager { */ joinChannel(channel) { return new Promise((resolve, reject) => { - if (this.pending.get(channel.guild.id)) throw new Error('Already connecting to this guild\'s voice server.'); if (!channel.joinable) { if (channel.full) { throw new Error('You do not have permission to join this voice channel; it is full.'); @@ -87,165 +51,31 @@ class ClientVoiceManager { } } - const existingConnection = this.connections.get(channel.guild.id); - if (existingConnection) { - if (existingConnection.channel.id !== channel.id) { - this.sendVoiceStateUpdate(channel); - this.connections.get(channel.guild.id).channel = channel; + let connection = this.connections.get(channel.guild.id); + + if (connection) { + if (connection.channel.id !== channel.id) { + this.connections.get(channel.guild.id).updateChannel(channel); } - resolve(existingConnection); + resolve(connection); return; + } else { + connection = new VoiceConnection(this, channel); + this.connections.set(channel.guild.id, connection); } - const pendingConnection = new PendingVoiceConnection(this, channel); - this.pending.set(channel.guild.id, pendingConnection); - - pendingConnection.on('fail', reason => { - this.pending.delete(channel.guild.id); + connection.once('failed', reason => { + this.connections.delete(channel.guild.id); reject(reason); }); - pendingConnection.on('pass', voiceConnection => { - this.pending.delete(channel.guild.id); - this.connections.set(channel.guild.id, voiceConnection); - voiceConnection.once('ready', () => resolve(voiceConnection)); - voiceConnection.once('error', reject); - voiceConnection.once('disconnect', () => this.connections.delete(channel.guild.id)); + connection.once('authenticated', () => { + connection.once('ready', () => resolve(connection)); + connection.once('error', reject); + connection.once('disconnect', () => this.connections.delete(channel.guild.id)); }); }); } } -/** - * Represents a Pending Voice Connection - * @private - */ -class PendingVoiceConnection extends EventEmitter { - constructor(voiceManager, channel) { - super(); - - /** - * The ClientVoiceManager that instantiated this pending connection - * @type {ClientVoiceManager} - */ - this.voiceManager = voiceManager; - - /** - * The channel that this pending voice connection will attempt to join - * @type {VoiceChannel} - */ - this.channel = channel; - - /** - * The timeout that will be invoked after 15 seconds signifying a failure to connect - * @type {Timeout} - */ - this.deathTimer = this.voiceManager.client.setTimeout( - () => this.fail(new Error('Connection not established within 15 seconds.')), 15000); - - /** - * An object containing data required to connect to the voice servers with - * @type {Object} - */ - this.data = {}; - - this.sendVoiceStateUpdate(); - } - - checkReady() { - if (this.data.token && this.data.endpoint && this.data.session_id) { - this.pass(); - return true; - } else { - return false; - } - } - - /** - * Set the token and endpoint required to connect to the the voice servers - * @param {string} token the token - * @param {string} endpoint the endpoint - * @returns {void} - */ - setTokenAndEndpoint(token, endpoint) { - if (!token) { - this.fail(new Error('Token not provided from voice server packet.')); - return; - } - if (!endpoint) { - this.fail(new Error('Endpoint not provided from voice server packet.')); - return; - } - if (this.data.token) { - this.fail(new Error('There is already a registered token for this connection.')); - return; - } - if (this.data.endpoint) { - this.fail(new Error('There is already a registered endpoint for this connection.')); - return; - } - - endpoint = endpoint.match(/([^:]*)/)[0]; - - if (!endpoint) { - this.fail(new Error('Failed to find an endpoint.')); - return; - } - - this.data.token = token; - this.data.endpoint = endpoint; - - this.checkReady(); - } - - /** - * Sets the Session ID for the connection - * @param {string} sessionID the session ID - */ - setSessionID(sessionID) { - if (!sessionID) { - this.fail(new Error('Session ID not supplied.')); - return; - } - if (this.data.session_id) { - this.fail(new Error('There is already a registered session ID for this connection.')); - return; - } - this.data.session_id = sessionID; - - this.checkReady(); - } - - clean() { - clearInterval(this.deathTimer); - this.emit('fail', new Error('Clean-up triggered :fourTriggered:')); - } - - pass() { - clearInterval(this.deathTimer); - this.emit('pass', this.upgrade()); - } - - fail(reason) { - this.emit('fail', reason); - this.clean(); - } - - sendVoiceStateUpdate() { - try { - this.voiceManager.sendVoiceStateUpdate(this.channel); - } catch (error) { - this.fail(error); - } - } - - /** - * Upgrades this Pending Connection to a full Voice Connection - * @returns {VoiceConnection} - */ - upgrade() { - return new VoiceConnection(this); - } -} - module.exports = ClientVoiceManager; diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index 771312a81..ceb34c427 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -1,5 +1,6 @@ const VoiceWebSocket = require('./VoiceWebSocket'); const VoiceUDP = require('./VoiceUDPClient'); +const mergeDefault = require('../../util/MergeDefault'); const Constants = require('../../util/Constants'); const AudioPlayer = require('./player/AudioPlayer'); const VoiceReceiver = require('./receiver/VoiceReceiver'); @@ -17,14 +18,20 @@ const Prism = require('prism-media'); * @extends {EventEmitter} */ class VoiceConnection extends EventEmitter { - constructor(pendingConnection) { + constructor(voiceManager, channel) { super(); /** - * The Voice Manager that instantiated this connection + * The voice manager that instantiated this connection * @type {ClientVoiceManager} */ - this.voiceManager = pendingConnection.voiceManager; + this.voiceManager = voiceManager; + + /** + * The client that instantiated this connection + * @type {Client} + */ + this.client = voiceManager.client; /** * @external Prism @@ -41,7 +48,13 @@ class VoiceConnection extends EventEmitter { * The voice channel this connection is currently serving * @type {VoiceChannel} */ - this.channel = pendingConnection.channel; + this.channel = channel; + + /** + * The current status of the voice connection + * @type {number} + */ + this.status = Constants.VoiceStatus.AUTHENTICATING; /** * Whether we're currently transmitting audio @@ -60,7 +73,7 @@ class VoiceConnection extends EventEmitter { * @type {Object} * @private */ - this.authentication = pendingConnection.data; + this.authentication = {}; /** * The audio player for this voice connection @@ -93,20 +106,14 @@ class VoiceConnection extends EventEmitter { */ this.ssrcMap = new Map(); - /** - * Whether this connection is ready - * @type {boolean} - * @private - */ - this.ready = false; - /** * Object that wraps contains the `ws` and `udp` sockets of this voice connection * @type {Object} * @private */ this.sockets = {}; - this.connect(); + + this.authenticate(); } /** @@ -128,21 +135,169 @@ class VoiceConnection extends EventEmitter { }); } + /** + * Sends a request to the main gateway to join a voice channel + * @param {Object} [options] The options to provide + */ + sendVoiceStateUpdate(options = {}) { + options = mergeDefault({ + guild_id: this.channel.guild.id, + channel_id: this.channel.id, + self_mute: false, + self_deaf: false, + }, options); + + this.client.ws.send({ + op: Constants.OPCodes.VOICE_STATE_UPDATE, + d: options, + }); + } + + /** + * Set the token and endpoint required to connect to the the voice servers + * @param {string} token The voice token + * @param {string} endpoint The voice endpoint + * @returns {void} + */ + setTokenAndEndpoint(token, endpoint) { + if (!token) { + this.authenticateFailed('Token not provided from voice server packet.'); + return; + } + if (!endpoint) { + this.authenticateFailed('Endpoint not provided from voice server packet.'); + return; + } + + endpoint = endpoint.match(/([^:]*)/)[0]; + + if (!endpoint) { + this.authenticateFailed('Failed to find an endpoint.'); + return; + } + + if (this.status === Constants.VoiceStatus.AUTHENTICATING) { + this.authentication.token = token; + this.authentication.endpoint = endpoint; + this.checkAuthenticated(); + } else if (token !== this.authentication.token || endpoint !== this.authentication.endpoint) { + this.reconnect(token, endpoint); + } + } + + /** + * Sets the Session ID for the connection + * @param {string} sessionID The voice session ID + */ + setSessionID(sessionID) { + if (!sessionID) { + this.authenticateFailed('Session ID not supplied.'); + return; + } + + if (this.status === Constants.VoiceStatus.AUTHENTICATING) { + this.authentication.sessionID = sessionID; + this.checkAuthenticated(); + } else if (sessionID !== this.authentication.sessionID) { + this.authentication.sessionID = sessionID; + /** + * Emitted when a new session ID is received + * @event VoiceConnection#newSession + * @private + */ + this.emit('newSession', sessionID); + } + } + + /** + * Checks whether the voice connection is authenticated + * @private + */ + checkAuthenticated() { + const { token, endpoint, sessionID } = this.authentication; + + if (token && endpoint && sessionID) { + clearTimeout(this.connectTimeout); + this.status = Constants.VoiceStatus.CONNECTING; + /** + * Emitted when we successfully initiate a voice connection + * @event VoiceConnection#authenticated + */ + this.emit('authenticated'); + this.connect(); + } + } + + /** + * Invoked when we fail to initiate a voice connection + * @param {string} reason The reason for failure + * @private + */ + authenticateFailed(reason) { + clearTimeout(this.connectTimeout); + this.status = Constants.VoiceStatus.DISCONNECTED; + if (this.status === Constants.VoiceStatus.AUTHENTICATING) { + /** + * Emitted when we fail to initiate a voice connection + * @event VoiceConnection#failed + * @param {Error} error The encountered error + */ + this.emit('failed', new Error(reason)); + } else { + this.emit('error', new Error(reason)); + } + } + + /** + * Move to a different voice channel in the same guild + * @param {VoiceChannel} channel The channel to move to + * @private + */ + updateChannel(channel) { + this.channel = channel; + this.sendVoiceStateUpdate(); + } + + /** + * Attempts to authenticate to the voice server + * @private + */ + authenticate() { + this.sendVoiceStateUpdate(); + this.connectTimeout = this.client.setTimeout( + () => this.authenticateFailed(new Error('Connection not established within 15 seconds.')), 15000); + } + + /** + * Attempts to reconnect to the voice server (typically after a region change) + * @param {string} token The voice token + * @param {string} endpoint The voice endpoint + * @private + */ + reconnect(token, endpoint) { + this.authentication.token = token; + this.authentication.endpoint = endpoint; + + this.status = Constants.VoiceStatus.RECONNECTING; + /** + * Emitted when the voice connection is reconnecting (typically after a region change) + * @event VoiceConnection#reconnecting + */ + this.emit('reconnecting'); + this.connect(); + } + /** * Disconnect the voice connection, causing a disconnect and closing event to be emitted. */ disconnect() { this.emit('closing'); - this.voiceManager.client.ws.send({ - op: Constants.OPCodes.VOICE_STATE_UPDATE, - d: { - guild_id: this.channel.guild.id, - channel_id: null, - self_mute: false, - self_deaf: false, - }, + this.sendVoiceStateUpdate({ + channel_id: null, }); this.player.destroy(); + this.cleanup(); + this.status = Constants.VoiceStatus.DISCONNECTED; /** * Emitted when the voice connection disconnects * @event VoiceConnection#disconnect @@ -150,70 +305,108 @@ class VoiceConnection extends EventEmitter { this.emit('disconnect'); } + /** + * Cleans up after disconnect + * @private + */ + cleanup() { + const { ws, udp } = this.sockets; + ws.removeAllListeners('error'); + udp.removeAllListeners('error'); + ws.removeAllListeners('ready'); + ws.removeAllListeners('sessionDescription'); + ws.removeAllListeners('speaking'); + this.sockets.ws = null; + this.sockets.udp = null; + } + /** * Connect the voice connection * @private */ connect() { - if (this.sockets.ws) throw new Error('There is already an existing WebSocket connection.'); - if (this.sockets.udp) throw new Error('There is already an existing UDP connection.'); + if (this.status !== Constants.VoiceStatus.RECONNECTING) { + if (this.sockets.ws) throw new Error('There is already an existing WebSocket connection.'); + if (this.sockets.udp) throw new Error('There is already an existing UDP connection.'); + } + + if (this.sockets.ws) this.sockets.ws.shutdown(); + if (this.sockets.udp) this.sockets.udp.shutdown(); + this.sockets.ws = new VoiceWebSocket(this); this.sockets.udp = new VoiceUDP(this); - this.sockets.ws.on('error', e => this.emit('error', e)); - this.sockets.udp.on('error', e => this.emit('error', e)); - this.sockets.ws.on('ready', d => { - this.authentication.port = d.port; - this.authentication.ssrc = d.ssrc; - /** - * Emitted whenever the connection encounters an error. - * @event VoiceConnection#error - * @param {Error} error the encountered error - */ - this.sockets.udp.findEndpointAddress() - .then(address => { - this.sockets.udp.createUDPSocket(address); - }, e => this.emit('error', e)); - }); - this.sockets.ws.on('sessionDescription', (mode, secret) => { - this.authentication.encryptionMode = mode; - this.authentication.secretKey = secret; - /** - * Emitted once the connection is ready, when a promise to join a voice channel resolves, - * the connection will already be ready. - * @event VoiceConnection#ready - */ - this.emit('ready'); - this.ready = true; - }); - this.sockets.ws.on('speaking', data => { - const guild = this.channel.guild; - const user = this.voiceManager.client.users.get(data.user_id); - this.ssrcMap.set(+data.ssrc, user); - if (!data.speaking) { - for (const receiver of this.receivers) { - const opusStream = receiver.opusStreams.get(user.id); - const pcmStream = receiver.pcmStreams.get(user.id); - if (opusStream) { - opusStream.push(null); - opusStream.open = false; - receiver.opusStreams.delete(user.id); - } - if (pcmStream) { - pcmStream.push(null); - pcmStream.open = false; - receiver.pcmStreams.delete(user.id); - } - } + + const { ws, udp } = this.sockets; + + ws.on('error', err => this.emit('error', err)); + udp.on('error', err => this.emit('error', err)); + ws.on('ready', this.onReady.bind(this)); + ws.on('sessionDescription', this.onSessionDescription.bind(this)); + ws.on('speaking', this.onSpeaking.bind(this)); + } + + /** + * Invoked when the voice websocket is ready + * @param {Object} data The received data + * @private + */ + onReady({ port, ssrc }) { + this.authentication.port = port; + this.authentication.ssrc = ssrc; + + const udp = this.sockets.udp; + /** + * Emitted whenever the connection encounters an error. + * @event VoiceConnection#error + * @param {Error} error The encountered error + */ + udp.findEndpointAddress() + .then(address => { + udp.createUDPSocket(address); + }, e => this.emit('error', e)); + } + + /** + * Invoked when a session description is received + * @param {string} mode The encryption mode + * @param {string} secret The secret key + * @private + */ + onSessionDescription(mode, secret) { + this.authentication.encryptionMode = mode; + this.authentication.secretKey = secret; + + this.status = Constants.VoiceStatus.CONNECTED; + /** + * Emitted once the connection is ready, when a promise to join a voice channel resolves, + * the connection will already be ready. + * @event VoiceConnection#ready + */ + this.emit('ready'); + } + + /** + * Invoked when a speaking event is received + * @param {Object} data The received data + * @private + */ + onSpeaking({ user_id, ssrc, speaking }) { + const guild = this.channel.guild; + const user = this.client.users.get(user_id); + this.ssrcMap.set(+ssrc, user); + if (!speaking) { + for (const receiver of this.receivers) { + receiver.stoppedSpeaking(user); } - /** - * Emitted whenever a user starts/stops speaking - * @event VoiceConnection#speaking - * @param {User} user The user that has started/stopped speaking - * @param {boolean} speaking Whether or not the user is speaking - */ - if (this.ready) this.emit('speaking', user, data.speaking); - guild._memberSpeakUpdate(data.user_id, data.speaking); - }); + } + /** + * Emitted whenever a user starts/stops speaking + * @event VoiceConnection#speaking + * @param {User} user The user that has started/stopped speaking + * @param {boolean} speaking Whether or not the user is speaking + */ + if (this.status === Constants.Status.CONNECTED) this.emit('speaking', user, speaking); + guild._memberSpeakUpdate(user_id, speaking); } /** diff --git a/src/client/voice/VoiceUDPClient.js b/src/client/voice/VoiceUDPClient.js index 7fc2ca577..c8b32f126 100644 --- a/src/client/voice/VoiceUDPClient.js +++ b/src/client/voice/VoiceUDPClient.js @@ -47,12 +47,12 @@ class VoiceConnectionUDPClient extends EventEmitter { shutdown() { if (this.socket) { + this.socket.removeAllListeners('message'); try { this.socket.close(); - } catch (e) { - return; + } finally { + this.socket = null; } - this.socket = null; } } diff --git a/src/client/voice/VoiceWebSocket.js b/src/client/voice/VoiceWebSocket.js index d0996ff97..4edf11a7e 100644 --- a/src/client/voice/VoiceWebSocket.js +++ b/src/client/voice/VoiceWebSocket.js @@ -66,7 +66,7 @@ class VoiceWebSocket extends EventEmitter { connect() { if (this.dead) return; if (this.ws) this.reset(); - if (this.attempts > 5) { + if (this.attempts >= 5) { this.emit('debug', new Error(`Too many connection attempts (${this.attempts}).`)); return; } @@ -124,7 +124,7 @@ class VoiceWebSocket extends EventEmitter { server_id: this.voiceConnection.channel.guild.id, user_id: this.client.user.id, token: this.voiceConnection.authentication.token, - session_id: this.voiceConnection.authentication.session_id, + session_id: this.voiceConnection.authentication.sessionID, }, }).catch(() => { this.emit('error', new Error('Tried to send join packet, but the WebSocket is not open.')); diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index 585af040a..11b611394 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -51,7 +51,7 @@ class AudioPlayer extends EventEmitter { } destroy() { - this.opusEncoder.destroy(); + if (this.opusEncoder) this.opusEncoder.destroy(); } destroyStream(stream) { diff --git a/src/client/voice/receiver/VoiceReceiver.js b/src/client/voice/receiver/VoiceReceiver.js index 71a637bd6..de78322eb 100644 --- a/src/client/voice/receiver/VoiceReceiver.js +++ b/src/client/voice/receiver/VoiceReceiver.js @@ -91,6 +91,30 @@ class VoiceReceiver extends EventEmitter { this.destroyed = true; } + /** + * Invoked when a user stops speaking + * @param {User} user The user that stopped speaking + * @private + */ + stoppedSpeaking(user) { + const opusStream = this.opusStreams.get(user.id); + const pcmStream = this.pcmStreams.get(user.id); + const opusEncoder = this.opusEncoders.get(user.id); + if (opusStream) { + opusStream.push(null); + opusStream.open = false; + this.opusStreams.delete(user.id); + } + if (pcmStream) { + pcmStream.push(null); + pcmStream.open = false; + this.pcmStreams.delete(user.id); + } + if (opusEncoder) { + opusEncoder.destroy(); + } + } + /** * Creates a readable stream for a user that provides opus data while the user is speaking. When the user * stops speaking, the stream is destroyed. diff --git a/src/util/Constants.js b/src/util/Constants.js index df1c7b227..baea1da2a 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -163,6 +163,16 @@ const Endpoints = exports.Endpoints = { emoji: (emojiID) => `${Endpoints.CDN}/emojis/${emojiID}.png`, }; +/** + * The current status of the client. Here are the available statuses: + * - READY + * - CONNECTING + * - RECONNECTING + * - IDLE + * - NEARLY + * - DISCONNECTED + * @typedef {number} Status + */ exports.Status = { READY: 0, CONNECTING: 1, @@ -172,6 +182,23 @@ exports.Status = { DISCONNECTED: 5, }; +/** + * The current status of a voice connection. Here are the available statuses: + * - CONNECTED + * - CONNECTING + * - AUTHENTICATING + * - RECONNECTING + * - DISCONNECTED + * @typedef {number} VoiceStatus + */ +exports.VoiceStatus = { + CONNECTED: 0, + CONNECTING: 1, + AUTHENTICATING: 2, + RECONNECTING: 3, + DISCONNECTED: 4, +}; + exports.ChannelTypes = { text: 0, DM: 1,