Added Opus stream support, added volume interface (#1102)

* Added opus stream support, added volume interface

* Remove setImmediate

* Fix weird syntax error

* Most useless commit ever

You're welcome, @PgBiel

* Fix potential memory leak with OpusScript

Emscripten has the tendency to not free resources even when the Opus engine instance has been garbage collected. Thanks to @abalabahaha for pointing this out.

* Typo

* VoiceReceiver.destroy: destroy opus encoder
This commit is contained in:
Programmix
2017-01-29 11:07:33 -08:00
committed by Amish Shah
parent 6fae17912e
commit 7ed58f5f7f
9 changed files with 262 additions and 190 deletions

View File

@@ -1,4 +1,4 @@
const EventEmitter = require('events').EventEmitter;
const VolumeInterface = require('../util/VolumeInterface');
const NaCl = require('tweetnacl');
const VoiceBroadcast = require('../VoiceBroadcast');
@@ -16,9 +16,9 @@ nonce.fill(0);
* ```
* @extends {EventEmitter}
*/
class StreamDispatcher extends EventEmitter {
class StreamDispatcher extends VolumeInterface {
constructor(player, stream, streamOptions) {
super();
super(streamOptions);
/**
* The Audio Player that controls this dispatcher
* @type {AudioPlayer}
@@ -31,7 +31,6 @@ class StreamDispatcher extends EventEmitter {
this.stream = stream;
if (!(this.stream instanceof VoiceBroadcast)) this.startStreaming();
this.streamOptions = streamOptions;
this.streamOptions.volume = this.streamOptions.volume || 0;
const data = this.streamingData;
data.length = 20;
@@ -48,7 +47,7 @@ class StreamDispatcher extends EventEmitter {
*/
this.destroyed = false;
this.setVolume(streamOptions.volume || 1);
this._opus = streamOptions.opus;
}
/**
@@ -86,46 +85,6 @@ class StreamDispatcher extends EventEmitter {
return this.time + this.streamingData.pausedTime;
}
/**
* The volume of the stream, relative to the stream's input volume
* @type {number}
* @readonly
*/
get volume() {
return this.streamOptions.volume;
}
/**
* Sets the volume relative to the input stream - i.e. 1 is normal, 0.5 is half, 2 is double.
* @param {number} volume The volume that you want to set
*/
setVolume(volume) {
/**
* Emitted when the volume of this dispatcher changes
* @param {number} oldVolume the old volume
* @param {number} newVolume the new volume
* @event StreamDispatcher#volumeChange
*/
this.emit('volumeChange', this.streamOptions.volume, volume);
this.streamOptions.volume = volume;
}
/**
* Set the volume in decibels
* @param {number} db The decibels
*/
setVolumeDecibels(db) {
this.setVolume(Math.pow(10, db / 20));
}
/**
* Set the volume so that a perceived value of 0.5 is half the perceived volume etc.
* @param {number} value The value for the volume
*/
setVolumeLogarithmic(value) {
this.setVolume(Math.pow(value, 1.660964));
}
/**
* Stops sending voice packets to the voice connection (stream may still progress however)
*/
@@ -196,20 +155,7 @@ class StreamDispatcher extends EventEmitter {
return packetBuffer;
}
applyVolume(buffer) {
if (this.volume === 1) return buffer;
const out = Buffer.alloc(buffer.length);
for (let i = 0; i < buffer.length; i += 2) {
if (i >= buffer.length - 1) break;
const uint = Math.min(32767, Math.max(-32767, Math.floor(this.volume * buffer.readInt16LE(i))));
out.writeInt16LE(uint, i);
}
return out;
}
process(buffer, controlled, packet) {
processPacket(packet) {
try {
if (this.destroyed) {
this.setSpeaking(false);
@@ -218,7 +164,38 @@ class StreamDispatcher extends EventEmitter {
const data = this.streamingData;
if (data.missed >= 5 && !controlled) {
if (this.paused) {
this.setSpeaking(false);
data.pausedTime = data.length * 10;
return;
}
if (!packet) {
data.missed++;
data.pausedTime += data.length * 10;
return;
}
this.started();
this.missed = 0;
this.stepStreamingData();
this.sendBuffer(null, data.sequence, data.timestamp, packet);
} catch (e) {
this.destroy('error', e);
}
}
process() {
try {
if (this.destroyed) {
this.setSpeaking(false);
return;
}
const data = this.streamingData;
if (data.missed >= 5) {
this.destroy('end', 'Stream is not generating quickly enough.');
return;
}
@@ -227,61 +204,30 @@ class StreamDispatcher extends EventEmitter {
this.setSpeaking(false);
// data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
data.pausedTime += data.length * 10;
// if buffer is provided we are assuming a master process is controlling the dispatcher
if (!buffer) this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10);
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10);
return;
}
if (!buffer && controlled) {
this.started();
const buffer = this.readStreamBuffer();
if (!buffer) {
data.missed++;
data.pausedTime += data.length * 10;
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10);
return;
}
if (!data.startTime) {
/**
* Emitted once the dispatcher starts streaming
* @event StreamDispatcher#start
*/
this.emit('start');
data.startTime = Date.now();
}
if (packet) {
data.count++;
data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0;
data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
this.sendBuffer(null, data.sequence, data.timestamp, packet);
return;
}
const bufferLength = 1920 * data.channels;
if (!controlled) {
buffer = this.stream.read(bufferLength);
if (!buffer) {
data.missed++;
data.pausedTime += data.length * 10;
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), data.length * 10);
return;
}
}
data.missed = 0;
if (buffer.length !== bufferLength) {
const newBuffer = Buffer.alloc(bufferLength).fill(0);
buffer.copy(newBuffer);
buffer = newBuffer;
this.stepStreamingData();
if (this._opus) {
this.sendBuffer(null, data.sequence, data.timestamp, buffer);
} else {
this.sendBuffer(buffer, data.sequence, data.timestamp);
}
buffer = this.applyVolume(buffer);
data.count++;
data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0;
data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
this.sendBuffer(buffer, data.sequence, data.timestamp);
if (controlled) return;
const nextTime = data.length + (data.startTime + data.pausedTime + (data.count * data.length) - Date.now());
this.player.voiceConnection.voiceManager.client.setTimeout(() => this.process(), nextTime);
} catch (e) {
@@ -289,6 +235,43 @@ class StreamDispatcher extends EventEmitter {
}
}
readStreamBuffer() {
const data = this.streamingData;
const bufferLength = (this._opus ? 80 : 1920) * data.channels;
let buffer = this.stream.read(bufferLength);
if (this._opus) return buffer;
if (!buffer) return null;
if (buffer.length !== bufferLength) {
const newBuffer = Buffer.alloc(bufferLength).fill(0);
buffer.copy(newBuffer);
buffer = newBuffer;
}
buffer = this.applyVolume(buffer);
return buffer;
}
started() {
const data = this.streamingData;
if (!data.startTime) {
/**
* Emitted once the dispatcher starts streaming
* @event StreamDispatcher#start
*/
this.emit('start');
data.startTime = Date.now();
}
}
stepStreamingData() {
const data = this.streamingData;
data.count++;
data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0;
data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0;
}
destroy(type, reason) {
if (this.destroyed) return;
this.destroyed = true;