Clean up a bunch of new voice stuff

This commit is contained in:
Schuyler Cebulskie
2016-10-25 20:26:57 -04:00
parent a04094f0ff
commit d1e9d15a1c
9 changed files with 57 additions and 76 deletions

View File

@@ -11,22 +11,26 @@ const EventEmitter = require('events').EventEmitter;
class PendingVoiceConnection extends EventEmitter {
constructor(voiceManager, channel) {
super();
/**
* The ClientVoiceManager that instantiated this pending connection
* @type {ClientVoiceManager}
*/
this.voiceManager = voiceManager;
/**
* The channel that this pending voice connection will attempt to join
* @type {VoiceChannel}
*/
this.channel = channel;
/**
* The timeout that will be invoked after 15 seconds signifying a failure to connect
* @type {Timeout}
*/
this.deathTimer = this.voiceManager.client.setTimeout(
() => this.fail(new Error('Automatic failure after 15 seconds')), 15000);
() => this.fail(new Error('Connection not established within 15 seconds.')), 15000);
/**
* An object containing data required to connect to the voice servers with
* @type {object}
@@ -53,26 +57,26 @@ class PendingVoiceConnection extends EventEmitter {
*/
setTokenAndEndpoint(token, endpoint) {
if (!token) {
this.fail(new Error('Token not provided from voice server packet'));
this.fail(new Error('Token not provided from voice server packet.'));
return;
}
if (!endpoint) {
this.fail(new Error('Endpoint not provided from voice server packet'));
this.fail(new Error('Endpoint not provided from voice server packet.'));
return;
}
if (this.data.token) {
this.fail(new Error('There is already a registered token for this connection'));
this.fail(new Error('There is already a registered token for this connection.'));
return;
}
if (this.data.endpoint) {
this.fail(new Error('There is already a registered endpoint for this connection'));
this.fail(new Error('There is already a registered endpoint for this connection.'));
return;
}
endpoint = endpoint.match(/([^:]*)/)[0];
if (!endpoint) {
this.fail(new Error('failed to find an endpoint'));
this.fail(new Error('Failed to find an endpoint.'));
return;
}
@@ -88,11 +92,11 @@ class PendingVoiceConnection extends EventEmitter {
*/
setSessionID(sessionID) {
if (!sessionID) {
this.fail(new Error('Session ID not supplied'));
this.fail(new Error('Session ID not supplied.'));
return;
}
if (this.data.session_id) {
this.fail(new Error('There is already a registered session ID for this connection'));
this.fail(new Error('There is already a registered session ID for this connection.'));
return;
}
this.data.session_id = sessionID;
@@ -161,15 +165,11 @@ class ClientVoiceManager {
}
onVoiceServer(data) {
if (this.pending.has(data.guild_id)) {
this.pending.get(data.guild_id).setTokenAndEndpoint(data.token, data.endpoint);
}
if (this.pending.has(data.guild_id)) this.pending.get(data.guild_id).setTokenAndEndpoint(data.token, data.endpoint);
}
onVoiceStateUpdate(data) {
if (this.pending.has(data.guild_id)) {
this.pending.get(data.guild_id).setSessionID(data.session_id);
}
if (this.pending.has(data.guild_id)) this.pending.get(data.guild_id).setSessionID(data.session_id);
}
/**
@@ -178,15 +178,13 @@ class ClientVoiceManager {
* @param {Object} [options] The options to provide
*/
sendVoiceStateUpdate(channel, options = {}) {
if (!this.client.user) {
throw new Error('You cannot join because there is no client user');
}
if (!this.client.user) throw new Error('Unable to join because there is no client user.');
if (channel.permissionsFor) {
const permissions = channel.permissionsFor(this.client.user);
if (permissions) {
if (!permissions.hasPermission('CONNECT')) {
throw new Error('You do not have permission to connect to this voice channel');
throw new Error('You do not have permission to connect to this voice channel.');
}
} else {
throw new Error('There is no permission set for the client user in this channel - are they part of the guild?');
@@ -215,10 +213,7 @@ class ClientVoiceManager {
*/
joinChannel(channel) {
return new Promise((resolve, reject) => {
// if already connecting to this voice server, error
if (this.pending.get(channel.guild.id)) {
throw new Error(`Already connecting to this guild's voice server.`);
}
if (this.pending.get(channel.guild.id)) throw new Error('Already connecting to this guild\'s voice server.');
const existingConnection = this.connections.get(channel.guild.id);
if (existingConnection) {

View File

@@ -129,12 +129,8 @@ class VoiceConnection extends EventEmitter {
* @private
*/
connect() {
if (this.sockets.ws) {
throw new Error('There is already an existing WebSocket connection!');
}
if (this.sockets.udp) {
throw new Error('There is already an existing UDP connection!');
}
if (this.sockets.ws) throw new Error('There is already an existing WebSocket connection.');
if (this.sockets.udp) throw new Error('There is already an existing UDP connection.');
this.sockets.ws = new VoiceWebSocket(this);
this.sockets.udp = new VoiceUDP(this);
this.sockets.ws.on('error', e => this.emit('error', e));
@@ -260,7 +256,6 @@ class VoiceConnection extends EventEmitter {
this.receivers.push(receiver);
return receiver;
}
}
module.exports = VoiceConnection;

View File

@@ -7,9 +7,7 @@ function parseLocalPacket(message) {
try {
const packet = new Buffer(message);
let address = '';
for (let i = 4; i < packet.indexOf(0, i); i++) {
address += String.fromCharCode(packet[i]);
}
for (let i = 4; i < packet.indexOf(0, i); i++) address += String.fromCharCode(packet[i]);
const port = parseInt(packet.readUIntLE(packet.length - 2, 2).toString(10), 10);
return { address, port };
} catch (error) {
@@ -24,33 +22,40 @@ function parseLocalPacket(message) {
class VoiceConnectionUDPClient extends EventEmitter {
constructor(voiceConnection) {
super();
/**
* The voice connection that this UDP client serves
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
/**
* The UDP socket
* @type {?Socket}
*/
this.socket = null;
/**
* The address of the discord voice server
* @type {?string}
*/
this.discordAddress = null;
/**
* The local IP address
* @type {?string}
*/
this.localAddress = null;
/**
* The local port
* @type {?string}
*/
this.localPort = null;
this.voiceConnection.on('closing', this.shutdown.bind(this));
}
shutdown() {
if (this.socket) {
try {
@@ -61,6 +66,7 @@ class VoiceConnectionUDPClient extends EventEmitter {
this.socket = null;
}
}
/**
* The port of the discord voice server
* @type {number}
@@ -69,6 +75,7 @@ class VoiceConnectionUDPClient extends EventEmitter {
get discordPort() {
return this.voiceConnection.authentication.port;
}
/**
* Tries to resolve the voice server endpoint to an address
* @returns {Promise<string>}
@@ -93,22 +100,11 @@ class VoiceConnectionUDPClient extends EventEmitter {
*/
send(packet) {
return new Promise((resolve, reject) => {
if (this.socket) {
if (!this.discordAddress || !this.discordPort) {
reject(new Error('malformed UDP address or port'));
return;
}
// console.log('sendin', packet);
this.socket.send(packet, 0, packet.length, this.discordPort, this.discordAddress, error => {
if (error) {
reject(error);
} else {
resolve(packet);
}
});
} else {
reject(new Error('tried to send a UDP packet but there is no socket available'));
}
if (!this.socket) throw new Error('Tried to send a UDP packet, but there is no socket available.');
if (!this.discordAddress || !this.discordPort) throw new Error('Malformed UDP address or port.');
this.socket.send(packet, 0, packet.length, this.discordPort, this.discordAddress, error => {
if (error) reject(error); else resolve(packet);
});
});
}

View File

@@ -11,20 +11,21 @@ const EventEmitter = require('events').EventEmitter;
class VoiceWebSocket extends EventEmitter {
constructor(voiceConnection) {
super();
/**
* The Voice Connection that this WebSocket serves
* @type {VoiceConnection}
*/
this.voiceConnection = voiceConnection;
/**
* How many connection attempts have been made
* @type {number}
*/
this.attempts = 0;
this.connect();
this.dead = false;
this.voiceConnection.on('closing', this.shutdown.bind(this));
}
@@ -47,9 +48,7 @@ class VoiceWebSocket extends EventEmitter {
*/
reset() {
if (this.ws) {
if (this.ws.readyState !== WebSocket.CLOSED) {
this.ws.close();
}
if (this.ws.readyState !== WebSocket.CLOSED) this.ws.close();
this.ws = null;
}
this.clearHeartbeat();
@@ -59,17 +58,15 @@ class VoiceWebSocket extends EventEmitter {
* Starts connecting to the Voice WebSocket Server.
*/
connect() {
if (this.dead) {
return;
}
if (this.ws) {
this.reset();
}
if (this.dead) return;
if (this.ws) this.reset();
if (this.attempts > 5) {
this.emit('error', new Error(`too many connection attempts (${this.attempts})`));
return;
}
this.attempts++;
/**
* The actual WebSocket used to connect to the Voice WebSocket Server.
* @type {WebSocket}
@@ -97,7 +94,7 @@ class VoiceWebSocket extends EventEmitter {
}
});
} else {
reject(new Error(`voice websocket not open to send ${data}`));
reject(new Error(`Voice websocket not open to send ${data}.`));
}
});
}
@@ -150,7 +147,7 @@ class VoiceWebSocket extends EventEmitter {
* Called whenever the connection to the WebSocket Server is lost
*/
onClose() {
// #todo see if the connection is open before reconnecting
// TODO see if the connection is open before reconnecting
if (!this.dead) this.client.setTimeout(this.connect.bind(this), this.attempts * 1000);
}

View File

@@ -3,7 +3,6 @@ const ChildProcess = require('child_process');
const EventEmitter = require('events').EventEmitter;
class PCMConversionProcess extends EventEmitter {
constructor(process) {
super();
this.process = process;

View File

@@ -4,7 +4,6 @@ const EventEmitter = require('events').EventEmitter;
const StreamDispatcher = require('../dispatcher/StreamDispatcher');
class AudioPlayer extends EventEmitter {
constructor(voiceConnection) {
super();
this.voiceConnection = voiceConnection;
@@ -20,13 +19,13 @@ class AudioPlayer extends EventEmitter {
timestamp: 0,
pausedTime: 0,
};
this.voiceConnection.on('closing', () => this.cleanup(null, 'voice connection is closing'));
this.voiceConnection.on('closing', () => this.cleanup(null, 'voice connection closing'));
}
playUnknownStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) {
const options = { seek, volume, passes };
stream.on('end', () => {
this.emit('debug', 'input stream to converter has ended');
this.emit('debug', 'Input stream to converter has ended');
});
stream.on('error', e => this.emit('error', e));
const conversionProcess = this.audioToPCM.createConvertStream(options.seek);
@@ -37,7 +36,7 @@ class AudioPlayer extends EventEmitter {
cleanup(checkStream, reason) {
// cleanup is a lot less aggressive than v9 because it doesn't try to kill every single stream it is aware of
this.emit('debug', `clean up triggered due to ${reason}`);
this.emit('debug', `Clean up triggered due to ${reason}`);
const filter = checkStream && this.dispatcher && this.dispatcher.stream === checkStream;
if (this.currentConverter && (checkStream ? filter : true)) {
this.currentConverter.destroy();
@@ -47,7 +46,7 @@ class AudioPlayer extends EventEmitter {
playPCMStream(stream, converter, { seek = 0, volume = 1, passes = 1 } = {}) {
const options = { seek, volume, passes };
stream.on('end', () => this.emit('debug', 'pcm input stream ended'));
stream.on('end', () => this.emit('debug', 'PCM input stream ended'));
this.cleanup(null, 'outstanding play stream');
this.currentConverter = converter;
if (this.dispatcher) {
@@ -56,10 +55,10 @@ class AudioPlayer extends EventEmitter {
stream.on('error', e => this.emit('error', e));
const dispatcher = new StreamDispatcher(this, stream, this.streamingData, options);
dispatcher.on('error', e => this.emit('error', e));
dispatcher.on('end', () => this.cleanup(dispatcher.stream, 'disp ended'));
dispatcher.on('end', () => this.cleanup(dispatcher.stream, 'dispatcher ended'));
dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value));
this.dispatcher = dispatcher;
dispatcher.on('debug', m => this.emit('debug', `stream dispatch - ${m}`));
dispatcher.on('debug', m => this.emit('debug', `Stream dispatch - ${m}`));
return dispatcher;
}

View File

@@ -101,8 +101,7 @@ class VoiceConnectionPlayer extends EventEmitter {
speaking: true,
delay: 0,
},
})
.catch(e => {
}).catch(e => {
this.emit('debug', e);
});
}

View File

@@ -25,17 +25,20 @@ class VoiceReceiver extends EventEmitter {
this.queues = new Map();
this.pcmStreams = new Map();
this.opusStreams = new Map();
/**
* Whether or not this receiver has been destroyed.
* @type {boolean}
*/
this.destroyed = false;
/**
* The VoiceConnection that instantiated this
* @type {VoiceConnection}
*/
this.voiceConnection = connection;
this._listener = (msg => {
this._listener = msg => {
const ssrc = +msg.readUInt32BE(8).toString(10);
const user = this.voiceConnection.ssrcMap.get(ssrc);
if (!user) {
@@ -50,7 +53,7 @@ class VoiceReceiver extends EventEmitter {
}
this.handlePacket(msg, user);
}
}).bind(this);
};
this.voiceConnection.sockets.udp.socket.on('message', this._listener);
}

View File

@@ -8,9 +8,7 @@ class SecretKey {
* @type {Uint8Array}
*/
this.key = new Uint8Array(new ArrayBuffer(key.length));
for (const index in key) {
this.key[index] = key[index];
}
for (const index in key) this.key[index] = key[index];
}
}