diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index a56a4a98e..6e3cfd207 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -20,16 +20,9 @@ class StreamDispatcher extends EventEmitter { super(); this.player = player; this.stream = stream; - this._startStreaming(); - this._triggered = false; - this._volume = streamOptions.volume; - - /** - * How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5 - * aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime. - * @type {number} - */ - this.passes = streamOptions.passes || 1; + this.startStreaming(); + this.streamOptions = streamOptions; + this.streamOptions.volume = this.streamOptions.volume || 0; /** * Whether playing is paused @@ -37,9 +30,15 @@ class StreamDispatcher extends EventEmitter { */ this.paused = false; + this.destroyed = false; + this.setVolume(streamOptions.volume || 1); } + get passes() { + return this.streamOptions.passes || 1; + } + get streamingData() { return this.player.streamingData; } @@ -68,7 +67,7 @@ class StreamDispatcher extends EventEmitter { * @readonly */ get volume() { - return this._volume; + return this.streamOptions.volume; } /** @@ -76,7 +75,7 @@ class StreamDispatcher extends EventEmitter { * @param {number} volume The volume that you want to set */ setVolume(volume) { - this._volume = volume; + this.streamOptions.volume = volume; } /** @@ -84,7 +83,7 @@ class StreamDispatcher extends EventEmitter { * @param {number} db The decibels */ setVolumeDecibels(db) { - this._volume = Math.pow(10, db / 20); + this.streamOptions.volume = Math.pow(10, db / 20); } /** @@ -92,32 +91,29 @@ class StreamDispatcher extends EventEmitter { * @param {number} value The value for the volume */ setVolumeLogarithmic(value) { - this._volume = Math.pow(value, 1.660964); + this.streamOptions.volume = Math.pow(value, 1.660964); } /** * Stops sending voice packets to the voice connection (stream may still progress however) */ - pause() { - this._setPaused(true); - } + pause() { this.setPaused(true); } /** * Resumes sending voice packets to the voice connection (may be further on in the stream than when paused) */ - resume() { - this._setPaused(false); - } + resume() { this.setPaused(false); } + /** * Stops the current stream permanently and emits an `end` event. * @param {string} [reason='user'] An optional reason for stopping the dispatcher. */ end(reason = 'user') { - this._triggerTerminalState('end', reason); + this.destroy('end', reason); } - _setSpeaking(value) { + setSpeaking(value) { this.speaking = value; /** * Emitted when the dispatcher starts/stops speaking @@ -127,16 +123,16 @@ class StreamDispatcher extends EventEmitter { this.emit('speaking', value); } - _sendBuffer(buffer, sequence, timestamp) { + sendBuffer(buffer, sequence, timestamp) { let repeats = this.passes; - const packet = this._createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer)); + const packet = this.createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer)); while (repeats--) { this.player.voiceConnection.sockets.udp.send(packet) .catch(e => this.emit('debug', `Failed to send a packet ${e}`)); } } - _createPacket(sequence, timestamp, buffer) { + createPacket(sequence, timestamp, buffer) { const packetBuffer = new Buffer(buffer.length + 28); packetBuffer.fill(0); packetBuffer[0] = 0x80; @@ -154,41 +150,41 @@ class StreamDispatcher extends EventEmitter { return packetBuffer; } - _applyVolume(buffer) { - if (this._volume === 1) return buffer; + applyVolume(buffer) { + if (this.volume === 1) return buffer; const out = new Buffer(buffer.length); for (let i = 0; i < buffer.length; i += 2) { if (i >= buffer.length - 1) break; - const uint = Math.min(32767, Math.max(-32767, Math.floor(this._volume * buffer.readInt16LE(i)))); + const uint = Math.min(32767, Math.max(-32767, Math.floor(this.volume * buffer.readInt16LE(i)))); out.writeInt16LE(uint, i); } return out; } - _send() { + process() { try { - if (this._triggered) { - this._setSpeaking(false); + if (this.destroyed) { + this.setSpeaking(false); return; } const data = this.streamingData; if (data.missed >= 5) { - this._triggerTerminalState('end', 'Stream is not generating quickly enough.'); + this.destroy('end', 'Stream is not generating quickly enough.'); return; } if (this.paused) { // data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); return; } - this._setSpeaking(true); + this.setSpeaking(true); if (!data.startTime) { /** @@ -204,7 +200,7 @@ class StreamDispatcher extends EventEmitter { if (!buffer) { data.missed++; data.pausedTime += data.length * 10; - this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10); return; } @@ -216,89 +212,45 @@ class StreamDispatcher extends EventEmitter { buffer = newBuffer; } - buffer = this._applyVolume(buffer); + buffer = this.applyVolume(buffer); data.count++; data.sequence = (data.sequence + 1) < 65536 ? data.sequence + 1 : 0; data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; - this._sendBuffer(buffer, data.sequence, data.timestamp); + this.sendBuffer(buffer, data.sequence, data.timestamp); const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now()); - this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), nextTime); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime); } catch (e) { - this._triggerTerminalState('error', e); + this.destroy('error', e); } } - _triggerEnd(reason) { - /** - * Emitted once the stream has ended. Attach a `once` listener to this. - * @event StreamDispatcher#end - * @param {string} reason The reason for the end of the dispatcher. If it ended because it reached the end of the - * stream, this would be `stream`. If you invoke `.end()` without specifying a reason, this would be `user`. - */ - this.emit('end', reason); + destroy(type, reason) { + if (this.destroyed) return; + this.destroyed = true; + this.setSpeaking(false); + this.emit(type, reason); } - _triggerError(err) { - this.emit('end'); - /** - * Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`. - * @event StreamDispatcher#error - * @param {Error} err The encountered error - */ - this.emit('error', err); - } - - _triggerTerminalState(state, err) { - if (this._triggered) return; - /** - * Emitted when the stream wants to give debug information. - * @event StreamDispatcher#debug - * @param {string} information The debug information - */ - this.emit('debug', `Triggered terminal state ${state} - stream is now dead`); - this._triggered = true; - this._setSpeaking(false); - switch (state) { - case 'end': - this._triggerEnd(err); - break; - case 'error': - this._triggerError(err); - break; - default: - this.emit('error', 'Unknown trigger state'); - break; - } - } - - _startStreaming() { + startStreaming() { if (!this.stream) { this.emit('error', 'No stream'); return; } - this.stream.on('end', err => this._triggerTerminalState('end', err || 'stream')); - this.stream.on('error', err => this._triggerTerminalState('error', err)); + this.stream.on('end', err => this.destroy('end', err || 'stream')); + this.stream.on('error', err => this.destroy('error', err)); const data = this.streamingData; data.length = 20; data.missed = 0; - this.stream.once('readable', () => this._send()); + this.stream.once('readable', () => this.process()); } - _setPaused(paused) { - if (paused) { - this.paused = true; - this._setSpeaking(false); - } else { - this.paused = false; - this._setSpeaking(true); - } - } + setPaused(paused) { this.setSpeaking(!(this.paused = paused)); } } module.exports = StreamDispatcher; diff --git a/src/client/voice/dispatcher/StreamDispatcher.old.js b/src/client/voice/dispatcher/StreamDispatcher.old.js new file mode 100644 index 000000000..a56a4a98e --- /dev/null +++ b/src/client/voice/dispatcher/StreamDispatcher.old.js @@ -0,0 +1,304 @@ +const EventEmitter = require('events').EventEmitter; +const NaCl = require('tweetnacl'); + +const nonce = new Buffer(24); +nonce.fill(0); + +/** + * The class that sends voice packet data to the voice connection. + * ```js + * // obtained using: + * voiceChannel.join().then(connection => { + * // you can play a file or a stream here: + * const dispatcher = connection.playFile('./file.mp3'); + * }); + * ``` + * @extends {EventEmitter} + */ +class StreamDispatcher extends EventEmitter { + constructor(player, stream, streamOptions) { + super(); + this.player = player; + this.stream = stream; + this._startStreaming(); + this._triggered = false; + this._volume = streamOptions.volume; + + /** + * How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5 + * aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime. + * @type {number} + */ + this.passes = streamOptions.passes || 1; + + /** + * Whether playing is paused + * @type {boolean} + */ + this.paused = false; + + this.setVolume(streamOptions.volume || 1); + } + + get streamingData() { + return this.player.streamingData; + } + + /** + * How long the stream dispatcher has been "speaking" for + * @type {number} + * @readonly + */ + get time() { + return this.streamingData.count * (this.streamingData.length || 0); + } + + /** + * The total time, taking into account pauses and skips, that the dispatcher has been streaming for + * @type {number} + * @readonly + */ + get totalStreamTime() { + return this.time + this.streamingData.pausedTime; + } + + /** + * The volume of the stream, relative to the stream's input volume + * @type {number} + * @readonly + */ + get volume() { + return this._volume; + } + + /** + * Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double. + * @param {number} volume The volume that you want to set + */ + setVolume(volume) { + this._volume = volume; + } + + /** + * Set the volume in decibels + * @param {number} db The decibels + */ + setVolumeDecibels(db) { + this._volume = Math.pow(10, db / 20); + } + + /** + * Set the volume so that a perceived value of 0.5 is half the perceived volume etc. + * @param {number} value The value for the volume + */ + setVolumeLogarithmic(value) { + this._volume = Math.pow(value, 1.660964); + } + + /** + * Stops sending voice packets to the voice connection (stream may still progress however) + */ + pause() { + this._setPaused(true); + } + + /** + * Resumes sending voice packets to the voice connection (may be further on in the stream than when paused) + */ + resume() { + this._setPaused(false); + } + + /** + * Stops the current stream permanently and emits an `end` event. + * @param {string} [reason='user'] An optional reason for stopping the dispatcher. + */ + end(reason = 'user') { + this._triggerTerminalState('end', reason); + } + + _setSpeaking(value) { + this.speaking = value; + /** + * Emitted when the dispatcher starts/stops speaking + * @event StreamDispatcher#speaking + * @param {boolean} value Whether or not the dispatcher is speaking + */ + this.emit('speaking', value); + } + + _sendBuffer(buffer, sequence, timestamp) { + let repeats = this.passes; + const packet = this._createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer)); + while (repeats--) { + this.player.voiceConnection.sockets.udp.send(packet) + .catch(e => this.emit('debug', `Failed to send a packet ${e}`)); + } + } + + _createPacket(sequence, timestamp, buffer) { + const packetBuffer = new Buffer(buffer.length + 28); + packetBuffer.fill(0); + packetBuffer[0] = 0x80; + packetBuffer[1] = 0x78; + + packetBuffer.writeUIntBE(sequence, 2, 2); + packetBuffer.writeUIntBE(timestamp, 4, 4); + packetBuffer.writeUIntBE(this.player.voiceConnection.authentication.ssrc, 8, 4); + + packetBuffer.copy(nonce, 0, 0, 12); + buffer = NaCl.secretbox(buffer, nonce, this.player.voiceConnection.authentication.secretKey.key); + + for (let i = 0; i < buffer.length; i++) packetBuffer[i + 12] = buffer[i]; + + return packetBuffer; + } + + _applyVolume(buffer) { + if (this._volume === 1) return buffer; + + const out = new Buffer(buffer.length); + for (let i = 0; i < buffer.length; i += 2) { + if (i >= buffer.length - 1) break; + const uint = Math.min(32767, Math.max(-32767, Math.floor(this._volume * buffer.readInt16LE(i)))); + out.writeInt16LE(uint, i); + } + + return out; + } + + _send() { + try { + if (this._triggered) { + this._setSpeaking(false); + return; + } + + const data = this.streamingData; + + if (data.missed >= 5) { + this._triggerTerminalState('end', 'Stream is not generating quickly enough.'); + return; + } + + if (this.paused) { + // data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; + data.pausedTime += data.length * 10; + this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10); + return; + } + + this._setSpeaking(true); + + if (!data.startTime) { + /** + * Emitted once the dispatcher starts streaming + * @event StreamDispatcher#start + */ + this.emit('start'); + data.startTime = Date.now(); + } + + const bufferLength = 1920 * data.channels; + let buffer = this.stream.read(bufferLength); + if (!buffer) { + data.missed++; + data.pausedTime += data.length * 10; + this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10); + return; + } + + data.missed = 0; + + if (buffer.length !== bufferLength) { + const newBuffer = new Buffer(bufferLength).fill(0); + buffer.copy(newBuffer); + buffer = newBuffer; + } + + buffer = this._applyVolume(buffer); + + data.count++; + data.sequence = (data.sequence + 1) < 65536 ? data.sequence + 1 : 0; + data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; + + this._sendBuffer(buffer, data.sequence, data.timestamp); + + const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now()); + this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), nextTime); + } catch (e) { + this._triggerTerminalState('error', e); + } + } + + _triggerEnd(reason) { + /** + * Emitted once the stream has ended. Attach a `once` listener to this. + * @event StreamDispatcher#end + * @param {string} reason The reason for the end of the dispatcher. If it ended because it reached the end of the + * stream, this would be `stream`. If you invoke `.end()` without specifying a reason, this would be `user`. + */ + this.emit('end', reason); + } + + _triggerError(err) { + this.emit('end'); + /** + * Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`. + * @event StreamDispatcher#error + * @param {Error} err The encountered error + */ + this.emit('error', err); + } + + _triggerTerminalState(state, err) { + if (this._triggered) return; + /** + * Emitted when the stream wants to give debug information. + * @event StreamDispatcher#debug + * @param {string} information The debug information + */ + this.emit('debug', `Triggered terminal state ${state} - stream is now dead`); + this._triggered = true; + this._setSpeaking(false); + switch (state) { + case 'end': + this._triggerEnd(err); + break; + case 'error': + this._triggerError(err); + break; + default: + this.emit('error', 'Unknown trigger state'); + break; + } + } + + _startStreaming() { + if (!this.stream) { + this.emit('error', 'No stream'); + return; + } + + this.stream.on('end', err => this._triggerTerminalState('end', err || 'stream')); + this.stream.on('error', err => this._triggerTerminalState('error', err)); + + const data = this.streamingData; + data.length = 20; + data.missed = 0; + + this.stream.once('readable', () => this._send()); + } + + _setPaused(paused) { + if (paused) { + this.paused = true; + this._setSpeaking(false); + } else { + this.paused = false; + this._setSpeaking(true); + } + } +} + +module.exports = StreamDispatcher; diff --git a/src/client/voice/player/AudioPlayer.js b/src/client/voice/player/AudioPlayer.js index 64c54630f..d9f1144a2 100644 --- a/src/client/voice/player/AudioPlayer.js +++ b/src/client/voice/player/AudioPlayer.js @@ -18,7 +18,7 @@ class AudioPlayer extends EventEmitter { this.voiceConnection = voiceConnection; this.prism = new Prism(); this.opusEncoder = OpusEncoders.fetch(); - this.transcoders = new Collection(); + this.streams = new Collection(); this.streamingData = { channels: 2, count: 0, @@ -29,14 +29,17 @@ class AudioPlayer extends EventEmitter { } get currentTranscoder() { - return this.transcoders.last(); + return this.streams.last().transcoder; } - destroyAllTranscoders(exceptLatest) { - for (const stream of this.transcoders.keys()) { - const transcoder = this.transcoders.get(stream); + destroyAllStreams(exceptLatest) { + for (const stream of this.streams.keys()) { + const data = this.streams.get(stream); + const transcoder = data.transcoder; + const dispatcher = data.dispatcher; if (exceptLatest && transcoder === this.currentTranscoder) continue; - transcoder.kill(); + if (transcoder) transcoder.kill(); + if (dispatcher) dispatcher.destroy('end'); } } @@ -47,15 +50,17 @@ class AudioPlayer extends EventEmitter { media: stream, ffmpegArguments, }); - this.transcoders.set(stream, transcoder); + this.streams.set(stream, { transcoder }); this.playPCMStream(transcoder.output, options); } playPCMStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) { const options = { seek, volume, passes }; - this.destroyAllTranscoders(true); + this.destroyAllStreams(true); const dispatcher = new StreamDispatcher(this, stream, options); dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value)); + if (!this.streams.has(stream)) this.streams.set(stream, { dispatcher }); + this.streams.get(stream).dispatcher = dispatcher; return dispatcher; } }