Cleanup Part 2: Electric Boogaloo (Reloaded) (#594)

* Cleanup Part 2: Electric Boogaloo (Reloaded)

* Moar cleanup

* Tweak NOT_A_PERMISSION error
This commit is contained in:
Schuyler Cebulskie
2016-09-04 05:08:09 -04:00
committed by Amish Shah
parent 5a9c42061f
commit 0b908f5bce
95 changed files with 946 additions and 1526 deletions

View File

@@ -33,9 +33,7 @@ class ClientVoiceManager {
*/
_checkPendingReady(guildID) {
const pendingRequest = this.pending.get(guildID);
if (!pendingRequest) {
throw new Error('Guild not pending');
}
if (!pendingRequest) throw new Error('Guild not pending');
if (pendingRequest.token && pendingRequest.sessionID && pendingRequest.endpoint) {
const { channel, token, sessionID, endpoint, resolve, reject } = pendingRequest;
const voiceConnection = new VoiceConnection(this, channel, token, sessionID, endpoint, resolve, reject);
@@ -49,15 +47,13 @@ class ClientVoiceManager {
/**
* Called when the Client receives information about this voice server update.
* @param {string} guildID the ID of the Guild
* @param {string} token the token to authorise with
* @param {string} endpoint the endpoint to connect to
* @param {string} guildID The ID of the Guild
* @param {string} token The token to authorise with
* @param {string} endpoint The endpoint to connect to
*/
_receivedVoiceServer(guildID, token, endpoint) {
const pendingRequest = this.pending.get(guildID);
if (!pendingRequest) {
throw new Error('Guild not pending');
}
if (!pendingRequest) throw new Error('Guild not pending');
pendingRequest.token = token;
// remove the port otherwise it errors ¯\_(ツ)_/¯
pendingRequest.endpoint = endpoint.match(/([^:]*)/)[0];
@@ -66,22 +62,20 @@ class ClientVoiceManager {
/**
* Called when the Client receives information about the voice state update.
* @param {string} guildID the ID of the Guild
* @param {string} sessionID the session id to authorise with
* @param {string} guildID The ID of the Guild
* @param {string} sessionID The session id to authorise with
*/
_receivedVoiceStateUpdate(guildID, sessionID) {
const pendingRequest = this.pending.get(guildID);
if (!pendingRequest) {
throw new Error('Guild not pending');
}
if (!pendingRequest) throw new Error('Guild not pending');
pendingRequest.sessionID = sessionID;
this._checkPendingReady(guildID);
}
/**
* Sends a request to the main gateway to join a voice channel
* @param {VoiceChannel} channel the channel to join
* @param {Object} [options] the options to provide
* @param {VoiceChannel} channel The channel to join
* @param {Object} [options] The options to provide
*/
_sendWSJoin(channel, options = {}) {
options = mergeDefault({
@@ -98,14 +92,12 @@ class ClientVoiceManager {
/**
* Sets up a request to join a voice channel
* @param {VoiceChannel} channel the voice channel to join
* @param {VoiceChannel} channel The voice channel to join
* @returns {Promise<VoiceConnection>}
*/
joinChannel(channel) {
return new Promise((resolve, reject) => {
if (this.pending.get(channel.guild.id)) {
throw new Error('already connecting to a channel in this guild');
}
if (this.pending.get(channel.guild.id)) throw new Error('already connecting to a channel in this guild');
const existingConn = this.connections.get(channel.guild.id);
if (existingConn) {
if (existingConn.channel.id !== channel.id) {

View File

@@ -71,14 +71,14 @@ class VoiceConnection extends EventEmitter {
/**
* Executed whenever an error occurs with the UDP/WebSocket sub-client
* @private
* @param {Error} err The error that occurred
* @param {Error} err The encountered error
*/
_onError(err) {
this._reject(err);
/**
* Emitted whenever the connection encounters a fatal error.
* @event VoiceConnection#error
* @param {Error} error the encountered error
* @param {Error} error The encountered error
*/
this.emit('error', err);
this._shutdown(err);
@@ -86,7 +86,7 @@ class VoiceConnection extends EventEmitter {
/**
* Disconnects the Client from the Voice Channel
* @param {string} [reason='user requested'] the reason of the disconnection
* @param {string} [reason='user requested'] The reason of the disconnection
*/
disconnect(reason = 'user requested') {
this.manager.client.ws.send({
@@ -107,19 +107,15 @@ class VoiceConnection extends EventEmitter {
}
_shutdown(e) {
if (!this.ready) {
return;
}
if (!this.ready) return;
this.ready = false;
this.websocket._shutdown();
this.player._shutdown();
if (this.udp) {
this.udp._shutdown();
}
if (this.udp) this.udp._shutdown();
/**
* Emit once the voice connection has disconnected.
* @event VoiceConnection#disconnected
* @param {Error} error the error, if any
* @param {Error} error The encountered error, if any
*/
this.emit('disconnected', e);
}
@@ -149,9 +145,7 @@ class VoiceConnection extends EventEmitter {
});
this.once('ready', () => {
setImmediate(() => {
for (const item of this.queue) {
this.emit(...item);
}
for (const item of this.queue) this.emit(...item);
this.queue = [];
});
});
@@ -197,21 +191,18 @@ class VoiceConnection extends EventEmitter {
/**
* Emitted whenever a user starts/stops speaking
* @event VoiceConnection#speaking
* @param {User} user the user that has started/stopped speaking
* @param {boolean} speaking whether or not the user is speaking
* @param {User} user The user that has started/stopped speaking
* @param {boolean} speaking Whether or not the user is speaking
*/
if (this.ready) {
this.emit('speaking', user, data.speaking);
} else {
this.queue.push(['speaking', user, data.speaking]);
}
if (this.ready) this.emit('speaking', user, data.speaking);
else this.queue.push(['speaking', user, data.speaking]);
guild._memberSpeakUpdate(data.user_id, data.speaking);
});
}
/**
* Play the given file in the voice connection
* @param {string} file the path to the file
* @param {string} file The path to the file
* @returns {StreamDispatcher}
* @example
* // play files natively
@@ -227,7 +218,7 @@ class VoiceConnection extends EventEmitter {
/**
* Plays and converts an audio stream in the voice connection
* @param {ReadableStream} stream the audio stream to play
* @param {ReadableStream} stream The audio stream to play
* @returns {StreamDispatcher}
* @example
* // play streams using ytdl-core
@@ -245,7 +236,7 @@ class VoiceConnection extends EventEmitter {
/**
* Plays a stream of 16-bit signed stereo PCM at 48KHz.
* @param {ReadableStream} stream the audio stream to play.
* @param {ReadableStream} stream The audio stream to play.
* @returns {StreamDispatcher}
*/
playConvertedStream(stream) {

View File

@@ -4,7 +4,6 @@ const Constants = require('../../util/Constants');
const EventEmitter = require('events').EventEmitter;
class VoiceConnectionUDPClient extends EventEmitter {
constructor(voiceConnection, data) {
super();
this.voiceConnection = voiceConnection;
@@ -38,9 +37,7 @@ class VoiceConnectionUDPClient extends EventEmitter {
try {
this.udpSocket.close();
} catch (err) {
if (err.message !== 'Not running') {
this.emit('error', err);
}
if (err.message !== 'Not running') this.emit('error', err);
}
this.udpSocket = null;
}
@@ -55,10 +52,7 @@ class VoiceConnectionUDPClient extends EventEmitter {
this.udpSocket.once('message', message => {
const packet = new Buffer(message);
this.localIP = '';
for (let i = 4; i < packet.indexOf(0, i); i++) {
this.localIP += String.fromCharCode(packet[i]);
}
for (let i = 4; i < packet.indexOf(0, i); i++) this.localIP += String.fromCharCode(packet[i]);
this.localPort = parseInt(packet.readUIntLE(packet.length - 2, 2).toString(10), 10);
this.voiceConnection.websocket.send({
@@ -77,7 +71,6 @@ class VoiceConnectionUDPClient extends EventEmitter {
this.udpSocket.on('error', (error, message) => {
this.emit('error', { error, message });
});
this.udpSocket.on('close', error => {
this.emit('close', error);
});
@@ -86,7 +79,6 @@ class VoiceConnectionUDPClient extends EventEmitter {
blankMessage.writeUIntBE(this.data.ssrc, 0, 4);
this.send(blankMessage);
}
}
module.exports = VoiceConnectionUDPClient;

View File

@@ -26,15 +26,11 @@ class VoiceConnectionWebSocket extends EventEmitter {
}
send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
this.ws.send(JSON.stringify(data));
}
if (this.ws.readyState === WebSocket.OPEN) this.ws.send(JSON.stringify(data));
}
_shutdown() {
if (this.ws) {
this.ws.close();
}
if (this.ws) this.ws.close();
clearInterval(this.heartbeat);
}
@@ -97,9 +93,7 @@ class VoiceConnectionWebSocket extends EventEmitter {
case Constants.VoiceOPCodes.SESSION_DESCRIPTION:
this.encryptionMode = packet.d.mode;
this.secretKey = new Uint8Array(new ArrayBuffer(packet.d.secret_key.length));
for (const index in packet.d.secret_key) {
this.secretKey[index] = packet.d.secret_key[index];
}
for (const index in packet.d.secret_key) this.secretKey[index] = packet.d.secret_key[index];
this.emit('ready', this.secretKey);
break;
case Constants.VoiceOPCodes.SPEAKING:

View File

@@ -34,10 +34,10 @@ class StreamDispatcher extends EventEmitter {
}
/**
* Emitted when the dispatcher starts/stops speaking
* @event StreamDispatcher#speaking
* @param {boolean} value whether or not the dispatcher is speaking
*/
* Emitted when the dispatcher starts/stops speaking
* @event StreamDispatcher#speaking
* @param {boolean} value Whether or not the dispatcher is speaking
*/
_setSpeaking(value) {
this.speaking = value;
this.emit('speaking', value);
@@ -62,27 +62,18 @@ class StreamDispatcher extends EventEmitter {
packetBuffer.copy(nonce, 0, 0, 12);
buffer = NaCl.secretbox(buffer, nonce, this.player.connection.data.secret);
for (let i = 0; i < buffer.length; i++) {
packetBuffer[i + 12] = buffer[i];
}
for (let i = 0; i < buffer.length; i++) packetBuffer[i + 12] = buffer[i];
return packetBuffer;
}
_applyVolume(buffer) {
if (this._volume === 1) {
return buffer;
}
if (this._volume === 1) return buffer;
const out = new Buffer(buffer.length);
for (let i = 0; i < buffer.length; i += 2) {
if (i >= buffer.length - 1) {
break;
}
if (i >= buffer.length - 1) break;
const uint = Math.min(32767, Math.max(-32767, Math.floor(this._volume * buffer.readInt16LE(i))));
out.writeInt16LE(uint, i);
}
@@ -95,20 +86,24 @@ class StreamDispatcher extends EventEmitter {
this._setSpeaking(false);
return;
}
const data = this.streamingData;
if (data.missed >= 5) {
this._triggerTerminalState('error', new Error('stream is not generating fast enough'));
return;
}
if (this.paused) {
data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
this.player.connection.manager.client.setTimeout(() => this._send(), data.length * 10);
return;
}
const bufferLength = 1920 * data.channels;
this._setSpeaking(true);
let buffer = this.stream.read(bufferLength);
this._setSpeaking(true);
const bufferLength = 1920 * data.channels;
let buffer = this.stream.read(bufferLength);
if (!buffer) {
data.missed++;
this.player.connection.manager.client.setTimeout(() => this._send(), data.length * 10);
@@ -132,7 +127,6 @@ class StreamDispatcher extends EventEmitter {
this._sendBuffer(buffer, data.sequence, data.timestamp);
const nextTime = data.startTime + (data.count * data.length);
this.player.connection.manager.client.setTimeout(() => this._send(), data.length + (nextTime - Date.now()));
} catch (e) {
this._triggerTerminalState('error', e);
@@ -140,33 +134,31 @@ class StreamDispatcher extends EventEmitter {
}
/**
* Emitted once the stream has ended. Attach a `once` listener to this.
* @event StreamDispatcher#end
*/
* Emitted once the stream has ended. Attach a `once` listener to this.
* @event StreamDispatcher#end
*/
_triggerEnd() {
this.emit('end');
}
/**
* Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`.
* @event StreamDispatcher#error
* @param {Error} err the error encountered
*/
* Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`.
* @event StreamDispatcher#error
* @param {Error} err The encountered error
*/
_triggerError(err) {
this.emit('end');
this.emit('error', err);
}
_triggerTerminalState(state, err) {
if (this._triggered) {
return;
}
if (this._triggered) return;
/**
* Emitted when the stream wants to give debug information.
* @event StreamDispatcher#debug
* @param {string} information the debug information
*/
* Emitted when the stream wants to give debug information.
* @event StreamDispatcher#debug
* @param {string} information The debug information
*/
this.emit('debug', `triggered terminal state ${state} - stream is now dead`);
this._triggered = true;
this._setSpeaking(false);
@@ -188,17 +180,20 @@ class StreamDispatcher extends EventEmitter {
this.emit('error', 'no stream');
return;
}
this.stream.on('end', err => this._triggerTerminalState('end', err));
this.stream.on('error', err => this._triggerTerminalState('error', err));
const data = this.streamingData;
data.length = 20;
data.missed = 0;
data.startTime = Date.now();
this.stream.once('readable', () => this._send());
}
_pause(value) {
if (value) {
_setPaused(paused) {
if (paused) {
this.paused = true;
this._setSpeaking(false);
} else {
@@ -225,7 +220,7 @@ class StreamDispatcher extends EventEmitter {
/**
* Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double.
* @param {number} volume the volume that you want to set
* @param {number} volume The volume that you want to set
*/
setVolume(volume) {
this._volume = volume;
@@ -233,7 +228,7 @@ class StreamDispatcher extends EventEmitter {
/**
* Set the volume in decibels
* @param {number} db the decibels
* @param {number} db The decibels
*/
setVolumeDecibels(db) {
this._volume = Math.pow(10, db / 20);
@@ -241,7 +236,7 @@ class StreamDispatcher extends EventEmitter {
/**
* Set the volume so that a perceived value of 0.5 is half the perceived volume etc.
* @param {number} value the value for the volume
* @param {number} value The value for the volume
*/
setVolumeLogarithmic(value) {
this._volume = Math.pow(value, 1.660964);
@@ -251,14 +246,14 @@ class StreamDispatcher extends EventEmitter {
* Stops sending voice packets to the voice connection (stream may still progress however)
*/
pause() {
this._pause(true);
this._setPaused(true);
}
/**
* Resumes sending voice packets to the voice connection (may be further on in the stream than when paused)
*/
resume() {
this._pause(false);
this._setPaused(false);
}
}

View File

@@ -6,7 +6,6 @@ class NodeOpusEngine extends OpusEngine {
constructor(player) {
super(player);
try {
// eslint-disable-next-line import/no-unresolved
opus = require('node-opus');
} catch (err) {
throw err;

View File

@@ -3,10 +3,6 @@ const list = [
require('./OpusScriptEngine'),
];
exports.add = encoder => {
list.push(encoder);
};
function fetch(Encoder) {
try {
return new Encoder();
@@ -15,12 +11,14 @@ function fetch(Encoder) {
}
}
exports.add = encoder => {
list.push(encoder);
};
exports.fetch = () => {
for (const encoder of list) {
const success = fetch(encoder);
if (success) {
return success;
}
const fetched = fetch(encoder);
if (fetched) return fetched;
}
throw new Error('could not find an opus engine');
};

View File

@@ -1,17 +1,16 @@
const OpusEngine = require('./BaseOpusEngine');
let Opusscript;
let OpusScript;
class NodeOpusEngine extends OpusEngine {
constructor(player) {
super(player);
try {
// eslint-disable-next-line import/no-unresolved
Opusscript = require('opusscript');
OpusScript = require('opusscript');
} catch (err) {
throw err;
}
this.encoder = new Opusscript(48000, 2);
this.encoder = new OpusScript(48000, 2);
}
encode(buffer) {

View File

@@ -1,7 +1,6 @@
const EventEmitter = require('events').EventEmitter;
class ConverterEngine extends EventEmitter {
constructor(player) {
super();
this.player = player;
@@ -10,7 +9,6 @@ class ConverterEngine extends EventEmitter {
createConvertStream() {
return;
}
}
module.exports = ConverterEngine;

View File

@@ -1,15 +1,6 @@
const ConverterEngine = require('./ConverterEngine');
const ChildProcess = require('child_process');
function chooseCommand() {
for (const cmd of ['ffmpeg', 'avconv', './ffmpeg', './avconv']) {
if (!ChildProcess.spawnSync(cmd, ['-h']).error) {
return cmd;
}
}
return null;
}
class FfmpegConverterEngine extends ConverterEngine {
constructor(player) {
super(player);
@@ -17,9 +8,7 @@ class FfmpegConverterEngine extends ConverterEngine {
}
handleError(encoder, err) {
if (encoder.destroy) {
encoder.destroy();
}
if (encoder.destroy) encoder.destroy();
this.emit('error', err);
}
@@ -41,4 +30,11 @@ class FfmpegConverterEngine extends ConverterEngine {
}
}
function chooseCommand() {
for (const cmd of ['ffmpeg', 'avconv', './ffmpeg', './avconv']) {
if (!ChildProcess.spawnSync(cmd, ['-h']).error) return cmd;
}
return null;
}
module.exports = FfmpegConverterEngine;

View File

@@ -5,7 +5,6 @@ const StreamDispatcher = require('../dispatcher/StreamDispatcher');
const EventEmitter = require('events').EventEmitter;
class VoiceConnectionPlayer extends EventEmitter {
constructor(connection) {
super();
this.connection = connection;
@@ -38,9 +37,7 @@ class VoiceConnectionPlayer extends EventEmitter {
_shutdown() {
this.speaking = false;
for (const stream of this.processMap.keys()) {
this.killStream(stream);
}
for (const stream of this.processMap.keys()) this.killStream(stream);
}
killStream(stream) {
@@ -79,9 +76,7 @@ class VoiceConnectionPlayer extends EventEmitter {
}
setSpeaking(value) {
if (this.speaking === value) {
return;
}
if (this.speaking === value) return;
this.speaking = value;
this.connection.websocket.send({
op: Constants.VoiceOPCodes.SPEAKING,
@@ -100,7 +95,6 @@ class VoiceConnectionPlayer extends EventEmitter {
this.dispatcher = dispatcher;
return dispatcher;
}
}
module.exports = VoiceConnectionPlayer;

View File

@@ -2,7 +2,6 @@ const BasePlayer = require('./BasePlayer');
const fs = require('fs');
class DefaultPlayer extends BasePlayer {
playFile(file) {
return this.playStream(fs.createReadStream(file));
}

View File

@@ -11,10 +11,8 @@ class VoiceReadable extends Readable {
return;
}
$push(d) {
if (this.open) {
this.push(d);
}
_push(d) {
if (this.open) this.push(d);
}
}

View File

@@ -53,7 +53,7 @@ class VoiceReceiver extends EventEmitter {
/**
* Creates a readable stream for a user that provides opus data while the user is speaking. When the user
* stops speaking, the stream is destroyed.
* @param {UserResolvable} user the user to create the stream for
* @param {UserResolvable} user The user to create the stream for
* @returns {ReadableStream}
*/
createOpusStream(user) {
@@ -72,17 +72,13 @@ class VoiceReceiver extends EventEmitter {
/**
* Creates a readable stream for a user that provides PCM data while the user is speaking. When the user
* stops speaking, the stream is destroyed. The stream is 16-bit signed stereo PCM at 48KHz.
* @param {UserResolvable} user the user to create the stream for
* @param {UserResolvable} user The user to create the stream for
* @returns {ReadableStream}
*/
createPCMStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user);
if (!user) {
throw new Error('invalid user object supplied');
}
if (this.pcmStreams.get(user.id)) {
throw new Error('there is already an existing stream for that user!');
}
if (!user) throw new Error('invalid user object supplied');
if (this.pcmStreams.get(user.id)) throw new Error('there is already an existing stream for that user!');
const stream = new Readable();
this.pcmStreams.set(user.id, stream);
return stream;
@@ -95,34 +91,30 @@ class VoiceReceiver extends EventEmitter {
/**
* Emitted whenever a voice packet cannot be decrypted
* @event VoiceReceiver#warn
* @param {string} message the warning message
* @param {string} message The warning message
*/
this.emit('warn', 'failed to decrypt voice packet');
return;
}
data = new Buffer(data);
/**
* Emitted whenever voice data is received from the voice connection. This is _always_ emitted (unlike PCM).
* @event VoiceReceiver#opus
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the opus buffer
*/
if (this.opusStreams.get(user.id)) {
this.opusStreams.get(user.id).$push(data);
}
if (this.opusStreams.get(user.id)) this.opusStreams.get(user.id)._push(data);
/**
* Emitted whenever voice data is received from the voice connection. This is _always_ emitted (unlike PCM).
* @event VoiceReceiver#opus
* @param {User} user The user that is sending the buffer (is speaking)
* @param {Buffer} buffer The opus buffer
*/
this.emit('opus', user, data);
if (this.listenerCount('pcm') > 0 || this.pcmStreams.size > 0) {
/**
* Emits decoded voice data when it's received. For performance reasons, the decoding will only
* happen if there is at least one `pcm` listener on this receiver.
* @event VoiceReceiver#pcm
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the decoded buffer
*/
* Emits decoded voice data when it's received. For performance reasons, the decoding will only
* happen if there is at least one `pcm` listener on this receiver.
* @event VoiceReceiver#pcm
* @param {User} user The user that is sending the buffer (is speaking)
* @param {Buffer} buffer The decoded buffer
*/
const pcm = this.connection.player.opusEncoder.decode(data);
if (this.pcmStreams.get(user.id)) {
this.pcmStreams.get(user.id).$push(pcm);
}
if (this.pcmStreams.get(user.id)) this.pcmStreams.get(user.id)._push(pcm);
this.emit('pcm', user, pcm);
}
}