diff --git a/src/client/voice/ClientVoiceManager.js b/src/client/voice/ClientVoiceManager.js index 09b23c637..da089c945 100644 --- a/src/client/voice/ClientVoiceManager.js +++ b/src/client/voice/ClientVoiceManager.js @@ -258,7 +258,8 @@ class ClientVoiceManager { }); pendingConnection.on('pass', voiceConnection => { - // do stuff + this.pending.delete(channel.guild.id); + this.connections.set(channel.guild.id, voiceConnection); }); }); } diff --git a/src/client/voice/VoiceConnection.js b/src/client/voice/VoiceConnection.js index bde83f060..cc1205d85 100644 --- a/src/client/voice/VoiceConnection.js +++ b/src/client/voice/VoiceConnection.js @@ -1,5 +1,5 @@ -const VoiceConnectionWebSocket = require('./VoiceConnectionWebSocket'); -const VoiceConnectionUDPClient = require('./VoiceConnectionUDPClient'); +const VoiceWebSocket = require('./VoiceConnectionWebSocket'); +const VoiceUDP = require('./VoiceConnectionUDPClient'); const VoiceReceiver = require('./receiver/VoiceReceiver'); const Constants = require('../../util/Constants'); const EventEmitter = require('events').EventEmitter; @@ -16,269 +16,45 @@ const DefaultPlayer = require('./player/DefaultPlayer'); * @extends {EventEmitter} */ class VoiceConnection extends EventEmitter { - constructor(manager, channel, token, sessionID, endpoint, resolve, reject) { + + constructor(pendingConnection) { super(); - /** - * The voice manager of this connection + * The Voice Manager that instantiated this connection * @type {ClientVoiceManager} - * @private */ - this.manager = manager; + this.voiceManager = pendingConnection.voiceManager; /** - * The player - * @type {BasePlayer} - */ - this.player = new DefaultPlayer(this); - - /** - * The endpoint of the connection - * @type {string} - */ - this.endpoint = endpoint; - - /** - * The VoiceChannel for this connection + * The voice channel this connection is currently serving * @type {VoiceChannel} */ - this.channel = channel; + this.channel = pendingConnection.channel; /** - * The WebSocket connection for this voice connection - * @type {VoiceConnectionWebSocket} - * @private + * The authentication data needed to connect to the voice server + * @type {object} */ - this.websocket = new VoiceConnectionWebSocket(this, channel.guild.id, token, sessionID, endpoint); + this.authentication = pendingConnection.data; /** - * Whether or not the connection is ready - * @type {boolean} + * Object that wraps contains the `ws` and `udp` sockets of this voice connection + * @type {object} */ - this.ready = false; - - /** - * The resolve function for the promise associated with creating this connection - * @type {function} - * @private - */ - this._resolve = resolve; - - /** - * The reject function for the promise associated with creating this connection - * @type {function} - * @private - */ - this._reject = reject; - - this.ssrcMap = new Map(); - this.queue = []; - this.receivers = []; - this.bindListeners(); + this.sockets = {}; } - /** - * Executed whenever an error occurs with the UDP/WebSocket sub-client. - * @private - * @param {Error} err The encountered error - */ - _onError(err) { - this._reject(err); - /** - * Emitted whenever the connection encounters a fatal error. - * @event VoiceConnection#error - * @param {Error} error The encountered error - */ - this.emit('error', err); - this._shutdown(err); + connect() { + if (this.sockets.ws) { + throw new Error('There is already an existing WebSocket connection!'); + } + if (this.sockets.udp) { + throw new Error('There is already an existing UDP connection!'); + } + this.sockets.ws = new VoiceWebSocket(this); + this.sockets.udp = new VoiceUDP(this); } - /** - * Disconnects the Client from the Voice Channel. - * @param {string} [reason='user requested'] The reason of the disconnection - */ - disconnect(reason = 'user requested') { - this.manager.client.ws.send({ - op: Constants.OPCodes.VOICE_STATE_UPDATE, - d: { - guild_id: this.channel.guild.id, - channel_id: null, - self_mute: false, - self_deaf: false, - }, - }); - this._shutdown(reason); - } - - _onClose(e) { - e = e && e.code === 1000 ? null : e; - return this._shutdown(e); - } - - _shutdown(e) { - if (!this.ready) return; - this.ready = false; - this.websocket._shutdown(); - this.player._shutdown(); - if (this.udp) this.udp._shutdown(); - if (this._vsUpdateListener) this.manager.client.removeListener('voiceStateUpdate', this._vsUpdateListener); - /** - * Emit once the voice connection has disconnected. - * @event VoiceConnection#disconnected - * @param {Error} error The encountered error, if any - */ - this.emit('disconnected', e); - } - - /** - * Binds listeners to the WebSocket and UDP sub-clients. - * @private - */ - bindListeners() { - this.websocket.on('error', err => this._onError(err)); - this.websocket.on('close', err => this._onClose(err)); - this.websocket.on('ready-for-udp', data => { - this.udp = new VoiceConnectionUDPClient(this, data); - this.data = data; - this.udp.on('error', err => this._onError(err)); - this.udp.on('close', err => this._onClose(err)); - }); - this.websocket.on('ready', secretKey => { - this.data.secret = secretKey; - this.ready = true; - /** - * Emitted once the connection is ready (joining voice channels resolves when the connection is ready anyway) - * @event VoiceConnection#ready - */ - this._resolve(this); - this.emit('ready'); - }); - this.once('ready', () => { - setImmediate(() => { - for (const item of this.queue) this.emit(...item); - this.queue = []; - }); - }); - this._vsUpdateListener = (oldM, newM) => { - if (oldM.voiceChannel && oldM.voiceChannel.guild.id === this.channel.guild.id && !newM.voiceChannel) { - const user = newM.user; - for (const receiver of this.receivers) { - const opusStream = receiver.opusStreams.get(user.id); - const pcmStream = receiver.pcmStreams.get(user.id); - if (opusStream) { - opusStream.push(null); - opusStream.open = false; - receiver.opusStreams.delete(user.id); - } - if (pcmStream) { - pcmStream.push(null); - pcmStream.open = false; - receiver.pcmStreams.delete(user.id); - } - } - } - }; - this.manager.client.on(Constants.Events.VOICE_STATE_UPDATE, this._vsUpdateListener); - this.websocket.on('speaking', data => { - const guild = this.channel.guild; - const user = this.manager.client.users.get(data.user_id); - this.ssrcMap.set(+data.ssrc, user); - if (!data.speaking) { - for (const receiver of this.receivers) { - const opusStream = receiver.opusStreams.get(user.id); - const pcmStream = receiver.pcmStreams.get(user.id); - if (opusStream) { - opusStream.push(null); - opusStream.open = false; - receiver.opusStreams.delete(user.id); - } - if (pcmStream) { - pcmStream.push(null); - pcmStream.open = false; - receiver.pcmStreams.delete(user.id); - } - } - } - /** - * Emitted whenever a user starts/stops speaking - * @event VoiceConnection#speaking - * @param {User} user The user that has started/stopped speaking - * @param {boolean} speaking Whether or not the user is speaking - */ - if (this.ready) this.emit('speaking', user, data.speaking); - else this.queue.push(['speaking', user, data.speaking]); - guild._memberSpeakUpdate(data.user_id, data.speaking); - }); - } - - /** - * Options that can be passed to stream-playing methods: - * @typedef {Object} StreamOptions - * @property {number} [seek=0] The time to seek to - * @property {number} [volume=1] The volume to play at - * @property {number} [passes=1] How many times to send the voice packet to reduce packet loss - */ - - /** - * Play the given file in the voice connection. - * @param {string} file The path to the file - * @param {StreamOptions} [options] Options for playing the stream - * @returns {StreamDispatcher} - * @example - * // play files natively - * voiceChannel.join() - * .then(connection => { - * const dispatcher = connection.playFile('C:/Users/Discord/Desktop/music.mp3'); - * }) - * .catch(console.error); - */ - playFile(file, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - return this.player.playFile(file, options); - } - - /** - * Plays and converts an audio stream in the voice connection. - * @param {ReadableStream} stream The audio stream to play - * @param {StreamOptions} [options] Options for playing the stream - * @returns {StreamDispatcher} - * @example - * // play streams using ytdl-core - * const ytdl = require('ytdl-core'); - * const streamOptions = { seek: 0, volume: 1 }; - * voiceChannel.join() - * .then(connection => { - * const stream = ytdl('https://www.youtube.com/watch?v=XAWgeLF9EVQ', {filter : 'audioonly'}); - * const dispatcher = connection.playStream(stream, streamOptions); - * }) - * .catch(console.error); - */ - playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - return this.player.playStream(stream, options); - } - - /** - * Plays a stream of 16-bit signed stereo PCM at 48KHz. - * @param {ReadableStream} stream The audio stream to play. - * @param {StreamOptions} [options] Options for playing the stream - * @returns {StreamDispatcher} - */ - playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { - const options = { seek, volume, passes }; - this.player._shutdown(); - return this.player.playPCMStream(stream, options); - } - - /** - * Creates a VoiceReceiver so you can start listening to voice data. It's recommended to only create one of these. - * @returns {VoiceReceiver} - */ - createReceiver() { - const rcv = new VoiceReceiver(this); - this.receivers.push(rcv); - return rcv; - } } module.exports = VoiceConnection; diff --git a/src/client/voice/VoiceConnectionUDPClient.js b/src/client/voice/VoiceConnectionUDPClient.js index add7b9c4f..118c55a9a 100644 --- a/src/client/voice/VoiceConnectionUDPClient.js +++ b/src/client/voice/VoiceConnectionUDPClient.js @@ -6,78 +6,6 @@ const EventEmitter = require('events').EventEmitter; class VoiceConnectionUDPClient extends EventEmitter { constructor(voiceConnection, data) { super(); - this.voiceConnection = voiceConnection; - this.count = 0; - this.data = data; - this.dnsLookup(); - } - - dnsLookup() { - dns.lookup(this.voiceConnection.endpoint, (err, address) => { - if (err) { - this.emit('error', err); - return; - } - this.connectUDP(address); - }); - } - - send(packet) { - if (this.udpSocket) { - try { - this.udpSocket.send(packet, 0, packet.length, this.data.port, this.udpIP); - } catch (err) { - this.emit('error', err); - } - } - } - - _shutdown() { - if (this.udpSocket) { - try { - this.udpSocket.close(); - } catch (err) { - if (err.message !== 'Not running') this.emit('error', err); - } - this.udpSocket = null; - } - } - - connectUDP(address) { - this.udpIP = address; - this.udpSocket = udp.createSocket('udp4'); - - // finding local IP - // https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery - this.udpSocket.once('message', message => { - const packet = new Buffer(message); - this.localIP = ''; - for (let i = 4; i < packet.indexOf(0, i); i++) this.localIP += String.fromCharCode(packet[i]); - this.localPort = parseInt(packet.readUIntLE(packet.length - 2, 2).toString(10), 10); - - this.voiceConnection.websocket.send({ - op: Constants.VoiceOPCodes.SELECT_PROTOCOL, - d: { - protocol: 'udp', - data: { - address: this.localIP, - port: this.localPort, - mode: 'xsalsa20_poly1305', - }, - }, - }); - }); - - this.udpSocket.on('error', (error, message) => { - this.emit('error', { error, message }); - }); - this.udpSocket.on('close', error => { - this.emit('close', error); - }); - - const blankMessage = new Buffer(70); - blankMessage.writeUIntBE(this.data.ssrc, 0, 4); - this.send(blankMessage); } } diff --git a/src/client/voice/VoiceConnectionWebSocket.js b/src/client/voice/VoiceConnectionWebSocket.js deleted file mode 100644 index fdb0fe0f7..000000000 --- a/src/client/voice/VoiceConnectionWebSocket.js +++ /dev/null @@ -1,113 +0,0 @@ -const WebSocket = require('ws'); -const Constants = require('../../util/Constants'); -const EventEmitter = require('events').EventEmitter; - -class VoiceConnectionWebSocket extends EventEmitter { - constructor(voiceConnection, serverID, token, sessionID, endpoint) { - super(); - this.voiceConnection = voiceConnection; - this.token = token; - this.sessionID = sessionID; - this.serverID = serverID; - this.heartbeat = null; - this.opened = false; - this.endpoint = endpoint; - this.attempts = 6; - this.setupWS(); - } - - setupWS() { - this.attempts--; - this.ws = new WebSocket(`wss://${this.endpoint}`, null, { rejectUnauthorized: false }); - this.ws.onopen = () => this._onOpen(); - this.ws.onmessage = e => this._onMessage(e); - this.ws.onclose = e => this._onClose(e); - this.ws.onerror = e => this._onError(e); - } - - send(data) { - if (this.ws.readyState === WebSocket.OPEN) this.ws.send(JSON.stringify(data)); - } - - _shutdown() { - if (this.ws) this.ws.close(); - this.voiceConnection.manager.client.clearInterval(this.heartbeat); - } - - _onOpen() { - this.opened = true; - this.send({ - op: Constants.OPCodes.DISPATCH, - d: { - server_id: this.serverID, - user_id: this.voiceConnection.manager.client.user.id, - session_id: this.sessionID, - token: this.token, - }, - }); - } - - _onClose(err) { - if (!this.opened && this.attempts >= 0) { - this.setupWS(); - return; - } - this.emit('close', err); - } - - _onError(e) { - if (!this.opened && this.attempts >= 0) { - this.setupWS(); - return; - } - this.emit('error', e); - } - - _setHeartbeat(interval) { - this.heartbeat = this.voiceConnection.manager.client.setInterval(() => { - this.send({ - op: Constants.VoiceOPCodes.HEARTBEAT, - d: null, - }); - }, interval); - this.send({ - op: Constants.VoiceOPCodes.HEARTBEAT, - d: null, - }); - } - - _onMessage(event) { - let packet; - try { - packet = JSON.parse(event.data); - } catch (error) { - this._onError(error); - return; - } - - switch (packet.op) { - case Constants.VoiceOPCodes.READY: - this._setHeartbeat(packet.d.heartbeat_interval); - this.emit('ready-for-udp', packet.d); - break; - case Constants.VoiceOPCodes.SESSION_DESCRIPTION: - this.encryptionMode = packet.d.mode; - this.secretKey = new Uint8Array(new ArrayBuffer(packet.d.secret_key.length)); - for (const index in packet.d.secret_key) this.secretKey[index] = packet.d.secret_key[index]; - this.emit('ready', this.secretKey); - break; - case Constants.VoiceOPCodes.SPEAKING: - /* - { op: 5, - d: { user_id: '123123', ssrc: 1, speaking: true } } - */ - this.emit('speaking', packet.d); - break; - default: - this.emit('unknown', packet); - break; - } - } -} - -module.exports = VoiceConnectionWebSocket; diff --git a/src/client/voice/VoiceWebSocket.js b/src/client/voice/VoiceWebSocket.js new file mode 100644 index 000000000..2af84a1b8 --- /dev/null +++ b/src/client/voice/VoiceWebSocket.js @@ -0,0 +1,169 @@ +const WebSocket = require('ws'); +const Constants = require('../../util/Constants'); +const EventEmitter = require('events').EventEmitter; + +/** + * Represents a Voice Connection's WebSocket + * @extends {EventEmitter} + * @private + */ +class VoiceWebSocket extends EventEmitter { + constructor(voiceConnection) { + super(); + /** + * The Voice Connection that this WebSocket serves + * @type {VoiceConnection} + */ + this.voiceConnection = voiceConnection; + } + + /** + * The client of this voice websocket + * @type {Client} + * @readonly + */ + get client() { + return this.voiceConnection.voiceManager.client; + } + + /** + * Starts connecting to the Voice WebSocket Server. + */ + connect() { + if (this.ws) { + throw new Error('there is already an existing websocket'); + } + /** + * The actual WebSocket used to connect to the Voice WebSocket Server. + * @type {WebSocket} + */ + this.ws = new WebSocket(`wss://${this.voiceConnection.authentication.endpoint}`); + this.ws.onopen = this.onOpen.bind(this); + this.ws.onmessage = this.onMessage.bind(this); + this.ws.onclose = this.onClose.bind(this); + this.ws.onerror = this.onError.bind(this); + } + + /** + * Sends data to the WebSocket if it is open. + * @param {string} data the data to send to the WebSocket + * @returns {Promise} + */ + send(data) { + return new Promise((resolve, reject) => { + if (this.ws.readyState === WebSocket.OPEN) { + this.ws.send(data, null, error => { + if (error) { + reject(error); + } else { + resolve(data); + } + }); + } else { + reject(new Error('websocket not open')); + } + }); + } + + /** + * JSON.stringify's a packet and then sends it to the WebSocket Server. + * @param {Object} packet the packet to send + * @returns {Promise} + */ + sendPacket(packet) { + try { + packet = JSON.stringify(packet); + } catch (error) { + return Promise.reject(error); + } + return this.send(packet); + } + + /** + * Called whenever the WebSocket opens + */ + onOpen() { + this.sendPacket({ + op: Constants.OPCodes.DISPATCH, + d: { + server_id: this.voiceConnection.channel.guild.id, + user_id: this.client.user.id, + token: this.voiceConnection.authentication.token, + session_id: this.voiceConnection.authentication.session_id, + }, + }).catch(() => { + this.emit('error', new Error('tried to send join packet but WebSocket not open')); + }); + } + + /** + * Called whenever a message is received from the WebSocket + * @param {MessageEvent} event the message event that was received + * @returns {void} + */ + onMessage(event) { + try { + return this.onPacket(JSON.stringify(event.data)); + } catch (error) { + return this.onError(error); + } + } + + /** + * Called whenever a valid packet is received from the WebSocket + * @param {Object} packet the received packet + */ + onPacket(packet) { + switch (packet.op) { + case Constants.VoiceOPCodes.READY: + this.setHeartbeat(packet.d.heartbeat_interval); + break; + } + } + + /** + * Sets an interval at which to send a heartbeat packet to the WebSocket + * @param {number} interval the interval at which to send a heartbeat packet + */ + setHeartbeat(interval) { + if (!interval || isNaN(interval)) { + this.onError(new Error('tried to set voice heartbeat but no valid interval was specified')); + return; + } + if (this.heartbeatInterval) { + /** + * Emitted whenver the voice websocket encounters a non-fatal error + * @param {string} warn the warning + * @event VoiceWebSocket#warn + */ + this.emit('warn', 'a voice heartbeat interval is being overwritten'); + clearInterval(this.heartbeatInterval); + } + this.heartbeatInterval = this.client.setInterval(this.sendHeartbeat.bind(this), interval); + } + + /** + * Clears a heartbeat interval, if one exists + */ + clearHeartbeat() { + if (!this.heartbeatInterval) { + this.emit('warn', 'tried to clear a heartbeat interval that does not exist'); + return; + } + clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } + + /** + * Sends a heartbeat packet + */ + sendHeartbeat() { + this.sendPacket({ op: Constants.VoiceOPCodes.HEARTBEAT }) + .catch(() => { + this.emit('warn', 'tried to send heartbeat, but connection is not open'); + this.clearHeartbeat(); + }); + } +} + +module.exports = VoiceWebSocket;