update streamDispatcher

This commit is contained in:
Amish Shah
2016-12-28 17:28:14 +00:00
parent be32bbc3a4
commit 8cf520d5af
3 changed files with 362 additions and 101 deletions

View File

@@ -20,16 +20,9 @@ class StreamDispatcher extends EventEmitter {
super();
this.player = player;
this.stream = stream;
this._startStreaming();
this._triggered = false;
this._volume = streamOptions.volume;
/**
* How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5
* aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime.
* @type {number}
*/
this.passes = streamOptions.passes || 1;
this.startStreaming();
this.streamOptions = streamOptions;
this.streamOptions.volume = this.streamOptions.volume || 0;
/**
* Whether playing is paused
@@ -37,9 +30,15 @@ class StreamDispatcher extends EventEmitter {
*/
this.paused = false;
this.destroyed = false;
this.setVolume(streamOptions.volume || 1);
}
get passes() {
return this.streamOptions.passes || 1;
}
get streamingData() {
return this.player.streamingData;
}
@@ -68,7 +67,7 @@ class StreamDispatcher extends EventEmitter {
* @readonly
*/
get volume() {
return this._volume;
return this.streamOptions.volume;
}
/**
@@ -76,7 +75,7 @@ class StreamDispatcher extends EventEmitter {
* @param {number} volume The volume that you want to set
*/
setVolume(volume) {
this._volume = volume;
this.streamOptions.volume = volume;
}
/**
@@ -84,7 +83,7 @@ class StreamDispatcher extends EventEmitter {
* @param {number} db The decibels
*/
setVolumeDecibels(db) {
this._volume = Math.pow(10, db / 20);
this.streamOptions.volume = Math.pow(10, db / 20);
}
/**
@@ -92,32 +91,29 @@ class StreamDispatcher extends EventEmitter {
* @param {number} value The value for the volume
*/
setVolumeLogarithmic(value) {
this._volume = Math.pow(value, 1.660964);
this.streamOptions.volume = Math.pow(value, 1.660964);
}
/**
* Stops sending voice packets to the voice connection (stream may still progress however)
*/
pause() {
this._setPaused(true);
}
pause() { this.setPaused(true); }
/**
* Resumes sending voice packets to the voice connection (may be further on in the stream than when paused)
*/
resume() {
this._setPaused(false);
}
resume() { this.setPaused(false); }
/**
* Stops the current stream permanently and emits an `end` event.
* @param {string} [reason='user'] An optional reason for stopping the dispatcher.
*/
end(reason = 'user') {
this._triggerTerminalState('end', reason);
this.destroy('end', reason);
}
_setSpeaking(value) {
setSpeaking(value) {
this.speaking = value;
/**
* Emitted when the dispatcher starts/stops speaking
@@ -127,16 +123,16 @@ class StreamDispatcher extends EventEmitter {
this.emit('speaking', value);
}
_sendBuffer(buffer, sequence, timestamp) {
sendBuffer(buffer, sequence, timestamp) {
let repeats = this.passes;
const packet = this._createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer));
const packet = this.createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer));
while (repeats--) {
this.player.voiceConnection.sockets.udp.send(packet)
.catch(e => this.emit('debug', `Failed to send a packet ${e}`));
}
}
_createPacket(sequence, timestamp, buffer) {
createPacket(sequence, timestamp, buffer) {
const packetBuffer = new Buffer(buffer.length + 28);
packetBuffer.fill(0);
packetBuffer[0] = 0x80;
@@ -154,41 +150,41 @@ class StreamDispatcher extends EventEmitter {
return packetBuffer;
}
_applyVolume(buffer) {
if (this._volume === 1) return buffer;
applyVolume(buffer) {
if (this.volume === 1) return buffer;
const out = new Buffer(buffer.length);
for (let i = 0; i < buffer.length; i += 2) {
if (i >= buffer.length - 1) break;
const uint = Math.min(32767, Math.max(-32767, Math.floor(this._volume * buffer.readInt16LE(i))));
const uint = Math.min(32767, Math.max(-32767, Math.floor(this.volume * buffer.readInt16LE(i))));
out.writeInt16LE(uint, i);
}
return out;
}
_send() {
process() {
try {
if (this._triggered) {
this._setSpeaking(false);
if (this.destroyed) {
this.setSpeaking(false);
return;
}
const data = this.streamingData;
if (data.missed >= 5) {
this._triggerTerminalState('end', 'Stream is not generating quickly enough.');
this.destroy('end', 'Stream is not generating quickly enough.');
return;
}
if (this.paused) {
// data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
data.pausedTime += data.length * 10;
this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10);
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10);
return;
}
this._setSpeaking(true);
this.setSpeaking(true);
if (!data.startTime) {
/**
@@ -204,7 +200,7 @@ class StreamDispatcher extends EventEmitter {
if (!buffer) {
data.missed++;
data.pausedTime += data.length * 10;
this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10);
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10);
return;
}
@@ -216,89 +212,45 @@ class StreamDispatcher extends EventEmitter {
buffer = newBuffer;
}
buffer = this._applyVolume(buffer);
buffer = this.applyVolume(buffer);
data.count++;
data.sequence = (data.sequence + 1) < 65536 ? data.sequence + 1 : 0;
data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
this._sendBuffer(buffer, data.sequence, data.timestamp);
this.sendBuffer(buffer, data.sequence, data.timestamp);
const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now());
this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), nextTime);
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime);
} catch (e) {
this._triggerTerminalState('error', e);
this.destroy('error', e);
}
}
_triggerEnd(reason) {
/**
* Emitted once the stream has ended. Attach a `once` listener to this.
* @event StreamDispatcher#end
* @param {string} reason The reason for the end of the dispatcher. If it ended because it reached the end of the
* stream, this would be `stream`. If you invoke `.end()` without specifying a reason, this would be `user`.
*/
this.emit('end', reason);
destroy(type, reason) {
if (this.destroyed) return;
this.destroyed = true;
this.setSpeaking(false);
this.emit(type, reason);
}
_triggerError(err) {
this.emit('end');
/**
* Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`.
* @event StreamDispatcher#error
* @param {Error} err The encountered error
*/
this.emit('error', err);
}
_triggerTerminalState(state, err) {
if (this._triggered) return;
/**
* Emitted when the stream wants to give debug information.
* @event StreamDispatcher#debug
* @param {string} information The debug information
*/
this.emit('debug', `Triggered terminal state ${state} - stream is now dead`);
this._triggered = true;
this._setSpeaking(false);
switch (state) {
case 'end':
this._triggerEnd(err);
break;
case 'error':
this._triggerError(err);
break;
default:
this.emit('error', 'Unknown trigger state');
break;
}
}
_startStreaming() {
startStreaming() {
if (!this.stream) {
this.emit('error', 'No stream');
return;
}
this.stream.on('end', err => this._triggerTerminalState('end', err || 'stream'));
this.stream.on('error', err => this._triggerTerminalState('error', err));
this.stream.on('end', err => this.destroy('end', err || 'stream'));
this.stream.on('error', err => this.destroy('error', err));
const data = this.streamingData;
data.length = 20;
data.missed = 0;
this.stream.once('readable', () => this._send());
this.stream.once('readable', () => this.process());
}
_setPaused(paused) {
if (paused) {
this.paused = true;
this._setSpeaking(false);
} else {
this.paused = false;
this._setSpeaking(true);
}
}
setPaused(paused) { this.setSpeaking(!(this.paused = paused)); }
}
module.exports = StreamDispatcher;

View File

@@ -0,0 +1,304 @@
const EventEmitter = require('events').EventEmitter;
const NaCl = require('tweetnacl');
const nonce = new Buffer(24);
nonce.fill(0);
/**
* The class that sends voice packet data to the voice connection.
* ```js
* // obtained using:
* voiceChannel.join().then(connection => {
* // you can play a file or a stream here:
* const dispatcher = connection.playFile('./file.mp3');
* });
* ```
* @extends {EventEmitter}
*/
class StreamDispatcher extends EventEmitter {
constructor(player, stream, streamOptions) {
super();
this.player = player;
this.stream = stream;
this._startStreaming();
this._triggered = false;
this._volume = streamOptions.volume;
/**
* How many passes the dispatcher should take when sending packets to reduce packet loss. Values over 5
* aren't recommended, as it means you are using 5x more bandwidth. You _can_ edit this at runtime.
* @type {number}
*/
this.passes = streamOptions.passes || 1;
/**
* Whether playing is paused
* @type {boolean}
*/
this.paused = false;
this.setVolume(streamOptions.volume || 1);
}
get streamingData() {
return this.player.streamingData;
}
/**
* How long the stream dispatcher has been "speaking" for
* @type {number}
* @readonly
*/
get time() {
return this.streamingData.count * (this.streamingData.length || 0);
}
/**
* The total time, taking into account pauses and skips, that the dispatcher has been streaming for
* @type {number}
* @readonly
*/
get totalStreamTime() {
return this.time + this.streamingData.pausedTime;
}
/**
* The volume of the stream, relative to the stream's input volume
* @type {number}
* @readonly
*/
get volume() {
return this._volume;
}
/**
* Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double.
* @param {number} volume The volume that you want to set
*/
setVolume(volume) {
this._volume = volume;
}
/**
* Set the volume in decibels
* @param {number} db The decibels
*/
setVolumeDecibels(db) {
this._volume = Math.pow(10, db / 20);
}
/**
* Set the volume so that a perceived value of 0.5 is half the perceived volume etc.
* @param {number} value The value for the volume
*/
setVolumeLogarithmic(value) {
this._volume = Math.pow(value, 1.660964);
}
/**
* Stops sending voice packets to the voice connection (stream may still progress however)
*/
pause() {
this._setPaused(true);
}
/**
* Resumes sending voice packets to the voice connection (may be further on in the stream than when paused)
*/
resume() {
this._setPaused(false);
}
/**
* Stops the current stream permanently and emits an `end` event.
* @param {string} [reason='user'] An optional reason for stopping the dispatcher.
*/
end(reason = 'user') {
this._triggerTerminalState('end', reason);
}
_setSpeaking(value) {
this.speaking = value;
/**
* Emitted when the dispatcher starts/stops speaking
* @event StreamDispatcher#speaking
* @param {boolean} value Whether or not the dispatcher is speaking
*/
this.emit('speaking', value);
}
_sendBuffer(buffer, sequence, timestamp) {
let repeats = this.passes;
const packet = this._createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer));
while (repeats--) {
this.player.voiceConnection.sockets.udp.send(packet)
.catch(e => this.emit('debug', `Failed to send a packet ${e}`));
}
}
_createPacket(sequence, timestamp, buffer) {
const packetBuffer = new Buffer(buffer.length + 28);
packetBuffer.fill(0);
packetBuffer[0] = 0x80;
packetBuffer[1] = 0x78;
packetBuffer.writeUIntBE(sequence, 2, 2);
packetBuffer.writeUIntBE(timestamp, 4, 4);
packetBuffer.writeUIntBE(this.player.voiceConnection.authentication.ssrc, 8, 4);
packetBuffer.copy(nonce, 0, 0, 12);
buffer = NaCl.secretbox(buffer, nonce, this.player.voiceConnection.authentication.secretKey.key);
for (let i = 0; i < buffer.length; i++) packetBuffer[i + 12] = buffer[i];
return packetBuffer;
}
_applyVolume(buffer) {
if (this._volume === 1) return buffer;
const out = new Buffer(buffer.length);
for (let i = 0; i < buffer.length; i += 2) {
if (i >= buffer.length - 1) break;
const uint = Math.min(32767, Math.max(-32767, Math.floor(this._volume * buffer.readInt16LE(i))));
out.writeInt16LE(uint, i);
}
return out;
}
_send() {
try {
if (this._triggered) {
this._setSpeaking(false);
return;
}
const data = this.streamingData;
if (data.missed >= 5) {
this._triggerTerminalState('end', 'Stream is not generating quickly enough.');
return;
}
if (this.paused) {
// data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
data.pausedTime += data.length * 10;
this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10);
return;
}
this._setSpeaking(true);
if (!data.startTime) {
/**
* Emitted once the dispatcher starts streaming
* @event StreamDispatcher#start
*/
this.emit('start');
data.startTime = Date.now();
}
const bufferLength = 1920 * data.channels;
let buffer = this.stream.read(bufferLength);
if (!buffer) {
data.missed++;
data.pausedTime += data.length * 10;
this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), data.length * 10);
return;
}
data.missed = 0;
if (buffer.length !== bufferLength) {
const newBuffer = new Buffer(bufferLength).fill(0);
buffer.copy(newBuffer);
buffer = newBuffer;
}
buffer = this._applyVolume(buffer);
data.count++;
data.sequence = (data.sequence + 1) < 65536 ? data.sequence + 1 : 0;
data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
this._sendBuffer(buffer, data.sequence, data.timestamp);
const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now());
this.player.voiceConnection.voiceManager.client.setTimeout(() => this._send(), nextTime);
} catch (e) {
this._triggerTerminalState('error', e);
}
}
_triggerEnd(reason) {
/**
* Emitted once the stream has ended. Attach a `once` listener to this.
* @event StreamDispatcher#end
* @param {string} reason The reason for the end of the dispatcher. If it ended because it reached the end of the
* stream, this would be `stream`. If you invoke `.end()` without specifying a reason, this would be `user`.
*/
this.emit('end', reason);
}
_triggerError(err) {
this.emit('end');
/**
* Emitted once the stream has encountered an error. Attach a `once` listener to this. Also emits `end`.
* @event StreamDispatcher#error
* @param {Error} err The encountered error
*/
this.emit('error', err);
}
_triggerTerminalState(state, err) {
if (this._triggered) return;
/**
* Emitted when the stream wants to give debug information.
* @event StreamDispatcher#debug
* @param {string} information The debug information
*/
this.emit('debug', `Triggered terminal state ${state} - stream is now dead`);
this._triggered = true;
this._setSpeaking(false);
switch (state) {
case 'end':
this._triggerEnd(err);
break;
case 'error':
this._triggerError(err);
break;
default:
this.emit('error', 'Unknown trigger state');
break;
}
}
_startStreaming() {
if (!this.stream) {
this.emit('error', 'No stream');
return;
}
this.stream.on('end', err => this._triggerTerminalState('end', err || 'stream'));
this.stream.on('error', err => this._triggerTerminalState('error', err));
const data = this.streamingData;
data.length = 20;
data.missed = 0;
this.stream.once('readable', () => this._send());
}
_setPaused(paused) {
if (paused) {
this.paused = true;
this._setSpeaking(false);
} else {
this.paused = false;
this._setSpeaking(true);
}
}
}
module.exports = StreamDispatcher;

View File

@@ -18,7 +18,7 @@ class AudioPlayer extends EventEmitter {
this.voiceConnection = voiceConnection;
this.prism = new Prism();
this.opusEncoder = OpusEncoders.fetch();
this.transcoders = new Collection();
this.streams = new Collection();
this.streamingData = {
channels: 2,
count: 0,
@@ -29,14 +29,17 @@ class AudioPlayer extends EventEmitter {
}
get currentTranscoder() {
return this.transcoders.last();
return this.streams.last().transcoder;
}
destroyAllTranscoders(exceptLatest) {
for (const stream of this.transcoders.keys()) {
const transcoder = this.transcoders.get(stream);
destroyAllStreams(exceptLatest) {
for (const stream of this.streams.keys()) {
const data = this.streams.get(stream);
const transcoder = data.transcoder;
const dispatcher = data.dispatcher;
if (exceptLatest && transcoder === this.currentTranscoder) continue;
transcoder.kill();
if (transcoder) transcoder.kill();
if (dispatcher) dispatcher.destroy('end');
}
}
@@ -47,15 +50,17 @@ class AudioPlayer extends EventEmitter {
media: stream,
ffmpegArguments,
});
this.transcoders.set(stream, transcoder);
this.streams.set(stream, { transcoder });
this.playPCMStream(transcoder.output, options);
}
playPCMStream(stream, { seek = 0, volume = 1, passes = 1 } = {}) {
const options = { seek, volume, passes };
this.destroyAllTranscoders(true);
this.destroyAllStreams(true);
const dispatcher = new StreamDispatcher(this, stream, options);
dispatcher.on('speaking', value => this.voiceConnection.setSpeaking(value));
if (!this.streams.has(stream)) this.streams.set(stream, { dispatcher });
this.streams.get(stream).dispatcher = dispatcher;
return dispatcher;
}
}