start rewriting voice websocket

This commit is contained in:
Amish Shah
2016-10-02 14:59:05 +01:00
parent c286c1443f
commit e201e9080f
5 changed files with 194 additions and 433 deletions

View File

@@ -258,7 +258,8 @@ class ClientVoiceManager {
});
pendingConnection.on('pass', voiceConnection => {
// do stuff
this.pending.delete(channel.guild.id);
this.connections.set(channel.guild.id, voiceConnection);
});
});
}

View File

@@ -1,5 +1,5 @@
const VoiceConnectionWebSocket = require('./VoiceConnectionWebSocket');
const VoiceConnectionUDPClient = require('./VoiceConnectionUDPClient');
const VoiceWebSocket = require('./VoiceConnectionWebSocket');
const VoiceUDP = require('./VoiceConnectionUDPClient');
const VoiceReceiver = require('./receiver/VoiceReceiver');
const Constants = require('../../util/Constants');
const EventEmitter = require('events').EventEmitter;
@@ -16,269 +16,45 @@ const DefaultPlayer = require('./player/DefaultPlayer');
* @extends {EventEmitter}
*/
class VoiceConnection extends EventEmitter {
constructor(manager, channel, token, sessionID, endpoint, resolve, reject) {
constructor(pendingConnection) {
super();
/**
* The voice manager of this connection
* The Voice Manager that instantiated this connection
* @type {ClientVoiceManager}
* @private
*/
this.manager = manager;
this.voiceManager = pendingConnection.voiceManager;
/**
* The player
* @type {BasePlayer}
*/
this.player = new DefaultPlayer(this);
/**
* The endpoint of the connection
* @type {string}
*/
this.endpoint = endpoint;
/**
* The VoiceChannel for this connection
* The voice channel this connection is currently serving
* @type {VoiceChannel}
*/
this.channel = channel;
this.channel = pendingConnection.channel;
/**
* The WebSocket connection for this voice connection
* @type {VoiceConnectionWebSocket}
* @private
* The authentication data needed to connect to the voice server
* @type {object}
*/
this.websocket = new VoiceConnectionWebSocket(this, channel.guild.id, token, sessionID, endpoint);
this.authentication = pendingConnection.data;
/**
* Whether or not the connection is ready
* @type {boolean}
* Object that wraps contains the `ws` and `udp` sockets of this voice connection
* @type {object}
*/
this.ready = false;
/**
* The resolve function for the promise associated with creating this connection
* @type {function}
* @private
*/
this._resolve = resolve;
/**
* The reject function for the promise associated with creating this connection
* @type {function}
* @private
*/
this._reject = reject;
this.ssrcMap = new Map();
this.queue = [];
this.receivers = [];
this.bindListeners();
this.sockets = {};
}
/**
* Executed whenever an error occurs with the UDP/WebSocket sub-client.
* @private
* @param {Error} err The encountered error
*/
_onError(err) {
this._reject(err);
/**
* Emitted whenever the connection encounters a fatal error.
* @event VoiceConnection#error
* @param {Error} error The encountered error
*/
this.emit('error', err);
this._shutdown(err);
connect() {
if (this.sockets.ws) {
throw new Error('There is already an existing WebSocket connection!');
}
if (this.sockets.udp) {
throw new Error('There is already an existing UDP connection!');
}
this.sockets.ws = new VoiceWebSocket(this);
this.sockets.udp = new VoiceUDP(this);
}
/**
* Disconnects the Client from the Voice Channel.
* @param {string} [reason='user requested'] The reason of the disconnection
*/
disconnect(reason = 'user requested') {
this.manager.client.ws.send({
op: Constants.OPCodes.VOICE_STATE_UPDATE,
d: {
guild_id: this.channel.guild.id,
channel_id: null,
self_mute: false,
self_deaf: false,
},
});
this._shutdown(reason);
}
_onClose(e) {
e = e && e.code === 1000 ? null : e;
return this._shutdown(e);
}
_shutdown(e) {
if (!this.ready) return;
this.ready = false;
this.websocket._shutdown();
this.player._shutdown();
if (this.udp) this.udp._shutdown();
if (this._vsUpdateListener) this.manager.client.removeListener('voiceStateUpdate', this._vsUpdateListener);
/**
* Emit once the voice connection has disconnected.
* @event VoiceConnection#disconnected
* @param {Error} error The encountered error, if any
*/
this.emit('disconnected', e);
}
/**
* Binds listeners to the WebSocket and UDP sub-clients.
* @private
*/
bindListeners() {
this.websocket.on('error', err => this._onError(err));
this.websocket.on('close', err => this._onClose(err));
this.websocket.on('ready-for-udp', data => {
this.udp = new VoiceConnectionUDPClient(this, data);
this.data = data;
this.udp.on('error', err => this._onError(err));
this.udp.on('close', err => this._onClose(err));
});
this.websocket.on('ready', secretKey => {
this.data.secret = secretKey;
this.ready = true;
/**
* Emitted once the connection is ready (joining voice channels resolves when the connection is ready anyway)
* @event VoiceConnection#ready
*/
this._resolve(this);
this.emit('ready');
});
this.once('ready', () => {
setImmediate(() => {
for (const item of this.queue) this.emit(...item);
this.queue = [];
});
});
this._vsUpdateListener = (oldM, newM) => {
if (oldM.voiceChannel && oldM.voiceChannel.guild.id === this.channel.guild.id && !newM.voiceChannel) {
const user = newM.user;
for (const receiver of this.receivers) {
const opusStream = receiver.opusStreams.get(user.id);
const pcmStream = receiver.pcmStreams.get(user.id);
if (opusStream) {
opusStream.push(null);
opusStream.open = false;
receiver.opusStreams.delete(user.id);
}
if (pcmStream) {
pcmStream.push(null);
pcmStream.open = false;
receiver.pcmStreams.delete(user.id);
}
}
}
};
this.manager.client.on(Constants.Events.VOICE_STATE_UPDATE, this._vsUpdateListener);
this.websocket.on('speaking', data => {
const guild = this.channel.guild;
const user = this.manager.client.users.get(data.user_id);
this.ssrcMap.set(+data.ssrc, user);
if (!data.speaking) {
for (const receiver of this.receivers) {
const opusStream = receiver.opusStreams.get(user.id);
const pcmStream = receiver.pcmStreams.get(user.id);
if (opusStream) {
opusStream.push(null);
opusStream.open = false;
receiver.opusStreams.delete(user.id);
}
if (pcmStream) {
pcmStream.push(null);
pcmStream.open = false;
receiver.pcmStreams.delete(user.id);
}
}
}
/**
* Emitted whenever a user starts/stops speaking
* @event VoiceConnection#speaking
* @param {User} user The user that has started/stopped speaking
* @param {boolean} speaking Whether or not the user is speaking
*/
if (this.ready) this.emit('speaking', user, data.speaking);
else this.queue.push(['speaking', user, data.speaking]);
guild._memberSpeakUpdate(data.user_id, data.speaking);
});
}
/**
* Options that can be passed to stream-playing methods:
* @typedef {Object} StreamOptions
* @property {number} [seek=0] The time to seek to
* @property {number} [volume=1] The volume to play at
* @property {number} [passes=1] How many times to send the voice packet to reduce packet loss
*/
/**
* Play the given file in the voice connection.
* @param {string} file The path to the file
* @param {StreamOptions} [options] Options for playing the stream
* @returns {StreamDispatcher}
* @example
* // play files natively
* voiceChannel.join()
* .then(connection => {
* const dispatcher = connection.playFile('C:/Users/Discord/Desktop/music.mp3');
* })
* .catch(console.error);
*/
playFile(file, { seek = 0, volume = 1, passes = 1 } = {}) {
const options = { seek, volume, passes };
return this.player.playFile(file, options);
}
/**
* Plays and converts an audio stream in the voice connection.
* @param {ReadableStream} stream The audio stream to play
* @param {StreamOptions} [options] Options for playing the stream
* @returns {StreamDispatcher}
* @example
* // play streams using ytdl-core
* const ytdl = require('ytdl-core');
* const streamOptions = { seek: 0, volume: 1 };
* voiceChannel.join()
* .then(connection => {
* const stream = ytdl('https://www.youtube.com/watch?v=XAWgeLF9EVQ', {filter : 'audioonly'});
* const dispatcher = connection.playStream(stream, streamOptions);
* })
* .catch(console.error);
*/
playStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) {
const options = { seek, volume, passes };
return this.player.playStream(stream, options);
}
/**
* Plays a stream of 16-bit signed stereo PCM at 48KHz.
* @param {ReadableStream} stream The audio stream to play.
* @param {StreamOptions} [options] Options for playing the stream
* @returns {StreamDispatcher}
*/
playConvertedStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) {
const options = { seek, volume, passes };
this.player._shutdown();
return this.player.playPCMStream(stream, options);
}
/**
* Creates a VoiceReceiver so you can start listening to voice data. It's recommended to only create one of these.
* @returns {VoiceReceiver}
*/
createReceiver() {
const rcv = new VoiceReceiver(this);
this.receivers.push(rcv);
return rcv;
}
}
module.exports = VoiceConnection;

View File

@@ -6,78 +6,6 @@ const EventEmitter = require('events').EventEmitter;
class VoiceConnectionUDPClient extends EventEmitter {
constructor(voiceConnection, data) {
super();
this.voiceConnection = voiceConnection;
this.count = 0;
this.data = data;
this.dnsLookup();
}
dnsLookup() {
dns.lookup(this.voiceConnection.endpoint, (err, address) => {
if (err) {
this.emit('error', err);
return;
}
this.connectUDP(address);
});
}
send(packet) {
if (this.udpSocket) {
try {
this.udpSocket.send(packet, 0, packet.length, this.data.port, this.udpIP);
} catch (err) {
this.emit('error', err);
}
}
}
_shutdown() {
if (this.udpSocket) {
try {
this.udpSocket.close();
} catch (err) {
if (err.message !== 'Not running') this.emit('error', err);
}
this.udpSocket = null;
}
}
connectUDP(address) {
this.udpIP = address;
this.udpSocket = udp.createSocket('udp4');
// finding local IP
// https://discordapp.com/developers/docs/topics/voice-connections#ip-discovery
this.udpSocket.once('message', message => {
const packet = new Buffer(message);
this.localIP = '';
for (let i = 4; i < packet.indexOf(0, i); i++) this.localIP += String.fromCharCode(packet[i]);
this.localPort = parseInt(packet.readUIntLE(packet.length - 2, 2).toString(10), 10);
this.voiceConnection.websocket.send({
op: Constants.VoiceOPCodes.SELECT_PROTOCOL,
d: {
protocol: 'udp',
data: {
address: this.localIP,
port: this.localPort,
mode: 'xsalsa20_poly1305',
},
},
});
});
this.udpSocket.on('error', (error, message) => {
this.emit('error', { error, message });
});
this.udpSocket.on('close', error => {
this.emit('close', error);
});
const blankMessage = new Buffer(70);
blankMessage.writeUIntBE(this.data.ssrc, 0, 4);
this.send(blankMessage);
}
}

View File

@@ -1,113 +0,0 @@
const WebSocket = require('ws');
const Constants = require('../../util/Constants');
const EventEmitter = require('events').EventEmitter;
class VoiceConnectionWebSocket extends EventEmitter {
constructor(voiceConnection, serverID, token, sessionID, endpoint) {
super();
this.voiceConnection = voiceConnection;
this.token = token;
this.sessionID = sessionID;
this.serverID = serverID;
this.heartbeat = null;
this.opened = false;
this.endpoint = endpoint;
this.attempts = 6;
this.setupWS();
}
setupWS() {
this.attempts--;
this.ws = new WebSocket(`wss://${this.endpoint}`, null, { rejectUnauthorized: false });
this.ws.onopen = () => this._onOpen();
this.ws.onmessage = e => this._onMessage(e);
this.ws.onclose = e => this._onClose(e);
this.ws.onerror = e => this._onError(e);
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) this.ws.send(JSON.stringify(data));
}
_shutdown() {
if (this.ws) this.ws.close();
this.voiceConnection.manager.client.clearInterval(this.heartbeat);
}
_onOpen() {
this.opened = true;
this.send({
op: Constants.OPCodes.DISPATCH,
d: {
server_id: this.serverID,
user_id: this.voiceConnection.manager.client.user.id,
session_id: this.sessionID,
token: this.token,
},
});
}
_onClose(err) {
if (!this.opened && this.attempts >= 0) {
this.setupWS();
return;
}
this.emit('close', err);
}
_onError(e) {
if (!this.opened && this.attempts >= 0) {
this.setupWS();
return;
}
this.emit('error', e);
}
_setHeartbeat(interval) {
this.heartbeat = this.voiceConnection.manager.client.setInterval(() => {
this.send({
op: Constants.VoiceOPCodes.HEARTBEAT,
d: null,
});
}, interval);
this.send({
op: Constants.VoiceOPCodes.HEARTBEAT,
d: null,
});
}
_onMessage(event) {
let packet;
try {
packet = JSON.parse(event.data);
} catch (error) {
this._onError(error);
return;
}
switch (packet.op) {
case Constants.VoiceOPCodes.READY:
this._setHeartbeat(packet.d.heartbeat_interval);
this.emit('ready-for-udp', packet.d);
break;
case Constants.VoiceOPCodes.SESSION_DESCRIPTION:
this.encryptionMode = packet.d.mode;
this.secretKey = new Uint8Array(new ArrayBuffer(packet.d.secret_key.length));
for (const index in packet.d.secret_key) this.secretKey[index] = packet.d.secret_key[index];
this.emit('ready', this.secretKey);
break;
case Constants.VoiceOPCodes.SPEAKING:
/*
{ op: 5,
d: { user_id: '123123', ssrc: 1, speaking: true } }
*/
this.emit('speaking', packet.d);
break;
default:
this.emit('unknown', packet);
break;
}
}
}
module.exports = VoiceConnectionWebSocket;

View File

@@ -0,0 +1,169 @@
const WebSocket = require('ws');
const Constants = require('../../util/Constants');
const EventEmitter = require('events').EventEmitter;
/**
* Represents a Voice Connection's WebSocket
* @extends {EventEmitter}
* @private
*/
class VoiceWebSocket extends EventEmitter {
constructor(voiceConnection) {
super();
/**
* The Voice Connection that this WebSocket serves
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
}
/**
* The client of this voice websocket
* @type {Client}
* @readonly
*/
get client() {
return this.voiceConnection.voiceManager.client;
}
/**
* Starts connecting to the Voice WebSocket Server.
*/
connect() {
if (this.ws) {
throw new Error('there is already an existing websocket');
}
/**
* The actual WebSocket used to connect to the Voice WebSocket Server.
* @type {WebSocket}
*/
this.ws = new WebSocket(`wss://${this.voiceConnection.authentication.endpoint}`);
this.ws.onopen = this.onOpen.bind(this);
this.ws.onmessage = this.onMessage.bind(this);
this.ws.onclose = this.onClose.bind(this);
this.ws.onerror = this.onError.bind(this);
}
/**
* Sends data to the WebSocket if it is open.
* @param {string} data the data to send to the WebSocket
* @returns {Promise<string>}
*/
send(data) {
return new Promise((resolve, reject) => {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(data, null, error => {
if (error) {
reject(error);
} else {
resolve(data);
}
});
} else {
reject(new Error('websocket not open'));
}
});
}
/**
* JSON.stringify's a packet and then sends it to the WebSocket Server.
* @param {Object} packet the packet to send
* @returns {Promise<string>}
*/
sendPacket(packet) {
try {
packet = JSON.stringify(packet);
} catch (error) {
return Promise.reject(error);
}
return this.send(packet);
}
/**
* Called whenever the WebSocket opens
*/
onOpen() {
this.sendPacket({
op: Constants.OPCodes.DISPATCH,
d: {
server_id: this.voiceConnection.channel.guild.id,
user_id: this.client.user.id,
token: this.voiceConnection.authentication.token,
session_id: this.voiceConnection.authentication.session_id,
},
}).catch(() => {
this.emit('error', new Error('tried to send join packet but WebSocket not open'));
});
}
/**
* Called whenever a message is received from the WebSocket
* @param {MessageEvent} event the message event that was received
* @returns {void}
*/
onMessage(event) {
try {
return this.onPacket(JSON.stringify(event.data));
} catch (error) {
return this.onError(error);
}
}
/**
* Called whenever a valid packet is received from the WebSocket
* @param {Object} packet the received packet
*/
onPacket(packet) {
switch (packet.op) {
case Constants.VoiceOPCodes.READY:
this.setHeartbeat(packet.d.heartbeat_interval);
break;
}
}
/**
* Sets an interval at which to send a heartbeat packet to the WebSocket
* @param {number} interval the interval at which to send a heartbeat packet
*/
setHeartbeat(interval) {
if (!interval || isNaN(interval)) {
this.onError(new Error('tried to set voice heartbeat but no valid interval was specified'));
return;
}
if (this.heartbeatInterval) {
/**
* Emitted whenver the voice websocket encounters a non-fatal error
* @param {string} warn the warning
* @event VoiceWebSocket#warn
*/
this.emit('warn', 'a voice heartbeat interval is being overwritten');
clearInterval(this.heartbeatInterval);
}
this.heartbeatInterval = this.client.setInterval(this.sendHeartbeat.bind(this), interval);
}
/**
* Clears a heartbeat interval, if one exists
*/
clearHeartbeat() {
if (!this.heartbeatInterval) {
this.emit('warn', 'tried to clear a heartbeat interval that does not exist');
return;
}
clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
}
/**
* Sends a heartbeat packet
*/
sendHeartbeat() {
this.sendPacket({ op: Constants.VoiceOPCodes.HEARTBEAT })
.catch(() => {
this.emit('warn', 'tried to send heartbeat, but connection is not open');
this.clearHeartbeat();
});
}
}
module.exports = VoiceWebSocket;