From 91fc6ccb5c938991739d76407c5e1e9be8f3b2f5 Mon Sep 17 00:00:00 2001 From: Amish Shah Date: Fri, 30 Dec 2016 13:57:09 +0000 Subject: [PATCH] VoiceBroadcasting much more efficient --- src/client/voice/VoiceBroadcast.js | 106 +++++++++++++++--- .../voice/dispatcher/StreamDispatcher.js | 20 +++- test/voice.js | 1 + 3 files changed, 107 insertions(+), 20 deletions(-) diff --git a/src/client/voice/VoiceBroadcast.js b/src/client/voice/VoiceBroadcast.js index 892e0330b..eac7b7180 100644 --- a/src/client/voice/VoiceBroadcast.js +++ b/src/client/voice/VoiceBroadcast.js @@ -1,5 +1,7 @@ const EventEmitter = require('events').EventEmitter; const Prism = require('prism-media'); +const OpusEncoders = require('./opus/OpusEngineList'); +const Collection = require('../../util/Collection'); const ffmpegArguments = [ '-analyzeduration', '0', @@ -13,10 +15,42 @@ class VoiceBroadcast extends EventEmitter { constructor(client) { super(); this.client = client; - this.dispatchers = []; + this.dispatchers = new Collection(); this.prism = new Prism(); + this.opusEncoder = OpusEncoders.fetch(); this.currentTranscoder = null; this.tickInterval = null; + this._volume = 1; + } + + applyVolume(buffer, volume) { + volume = volume || this._volume; + if (volume === 1) return buffer; + + const out = new Buffer(buffer.length); + for (let i = 0; i < buffer.length; i += 2) { + if (i >= buffer.length - 1) break; + const uint = Math.min(32767, Math.max(-32767, Math.floor(volume * buffer.readInt16LE(i)))); + out.writeInt16LE(uint, i); + } + + return out; + } + + setVolume(volume) { + this._volume = volume; + } + + setVolumeDecibels(db) { + this.setVolume(Math.pow(10, db / 20)); + } + + setVolumeLogarithmic(value) { + this.setVolume(Math.pow(value, 1.660964)); + } + + get volume() { + return this._volume; } get _playableStream() { @@ -24,12 +58,30 @@ class VoiceBroadcast extends EventEmitter { return this.currentTranscoder.transcoder.output || this.currentTranscoder.options.stream; } + unregisterDispatcher(dispatcher, old) { + let container = this.dispatchers.get(old || dispatcher.volume); + if (container) { + if (container.delete(dispatcher)) return; + } + for (container of this.dispatchers.values()) { + container.delete(dispatcher); + } + } + registerDispatcher(dispatcher) { - if (!this.dispatchers.includes(dispatcher)) { - this.dispatchers.push(dispatcher); - dispatcher.once('end', () => { - const ind = this.dispatchers.indexOf(dispatcher); - if (ind > -1) this.dispatchers.splice(ind, 1); + if (!this.dispatchers.has(dispatcher.volume)) { + this.dispatchers.set(dispatcher.volume, new Set()); + } + const container = this.dispatchers.get(dispatcher.volume); + if (!container.has(dispatcher)) { + container.add(dispatcher); + dispatcher.once('end', () => this.unregisterDispatcher(dispatcher)); + dispatcher.on('volumeChange', (o, n) => { + this.unregisterDispatcher(dispatcher, o); + if (!this.dispatchers.has(n)) { + this.dispatchers.set(n, new Set()); + } + this.dispatchers.get(n).add(dispatcher); }); } } @@ -78,15 +130,19 @@ class VoiceBroadcast extends EventEmitter { } pause() { - for (const dispatcher of this.dispatchers) { - dispatcher.pause(); + for (const container of this.dispatchers.values()) { + for (const dispatcher of container.values()) { + dispatcher.pause(); + } } clearInterval(this.tickInterval); } resume() { - for (const dispatcher of this.dispatchers) { - dispatcher.resume(); + for (const container of this.dispatchers.values()) { + for (const dispatcher of container.values()) { + dispatcher.resume(); + } } this._startPlaying(); } @@ -99,17 +155,37 @@ class VoiceBroadcast extends EventEmitter { tick() { if (!this._playableStream) return; const stream = this._playableStream; - const buffer = stream.read(1920 * 2); + const bufferLength = 1920 * 2; + let buffer = stream.read(bufferLength); - for (const dispatcher of this.dispatchers) { - setImmediate(() => dispatcher.process(buffer, true)); + if (!buffer) return; + + if (buffer.length !== bufferLength) { + const newBuffer = new Buffer(bufferLength).fill(0); + buffer.copy(newBuffer); + buffer = newBuffer; + } + + buffer = this.applyVolume(buffer); + + for (const x of this.dispatchers.entries()) { + const [volume, container] = x; + if (container.size === 0) continue; + setImmediate(() => { + const opusPacket = this.opusEncoder.encode(this.applyVolume(buffer, volume)); + for (const dispatcher of container.values()) { + dispatcher.process(buffer, true, opusPacket); + } + }); } } end() { this.killCurrentTranscoder(); - for (const dispatcher of this.dispatchers) { - dispatcher.destroy('end', 'broadcast ended'); + for (const container of this.dispatchers.values()) { + for (const dispatcher of container.values()) { + dispatcher.destroy('end', 'broadcast ended'); + } } } } diff --git a/src/client/voice/dispatcher/StreamDispatcher.js b/src/client/voice/dispatcher/StreamDispatcher.js index 7aed21ebc..e20d0545a 100644 --- a/src/client/voice/dispatcher/StreamDispatcher.js +++ b/src/client/voice/dispatcher/StreamDispatcher.js @@ -80,6 +80,7 @@ class StreamDispatcher extends EventEmitter { * @param {number} volume The volume that you want to set */ setVolume(volume) { + this.emit('volumeChange', this.streamOptions.volume, volume); this.streamOptions.volume = volume; } @@ -88,7 +89,7 @@ class StreamDispatcher extends EventEmitter { * @param {number} db The decibels */ setVolumeDecibels(db) { - this.streamOptions.volume = Math.pow(10, db / 20); + this.setVolume(Math.pow(10, db / 20)); } /** @@ -96,7 +97,7 @@ class StreamDispatcher extends EventEmitter { * @param {number} value The value for the volume */ setVolumeLogarithmic(value) { - this.streamOptions.volume = Math.pow(value, 1.660964); + this.setVolume(Math.pow(value, 1.660964)); } /** @@ -128,9 +129,10 @@ class StreamDispatcher extends EventEmitter { this.emit('speaking', value); } - sendBuffer(buffer, sequence, timestamp) { + sendBuffer(buffer, sequence, timestamp, opusPacket) { + opusPacket = opusPacket || this.player.opusEncoder.encode(buffer); let repeats = this.passes; - const packet = this.createPacket(sequence, timestamp, this.player.opusEncoder.encode(buffer)); + const packet = this.createPacket(sequence, timestamp, opusPacket); while (repeats--) { this.player.voiceConnection.sockets.udp.send(packet) .catch(e => this.emit('debug', `Failed to send a packet ${e}`)); @@ -168,7 +170,7 @@ class StreamDispatcher extends EventEmitter { return out; } - process(buffer, controlled) { + process(buffer, controlled, packet) { try { if (this.destroyed) { this.setSpeaking(false); @@ -208,6 +210,14 @@ class StreamDispatcher extends EventEmitter { data.startTime = Date.now(); } + if (packet) { + data.count++; + data.sequence = data.sequence < 65535 ? data.sequence + 1 : 0; + data.timestamp = data.timestamp + 4294967295 ? data.timestamp + 960 : 0; + this.sendBuffer(null, data.sequence, data.timestamp, packet); + return; + } + const bufferLength = 1920 * data.channels; if (!controlled) { buffer = this.stream.read(bufferLength); diff --git a/test/voice.js b/test/voice.js index e41fd53b2..5a6ad1657 100644 --- a/test/voice.js +++ b/test/voice.js @@ -46,6 +46,7 @@ client.on('message', m => { const com = eval(m.content.split(' ').slice(1).join(' ')); m.channel.sendMessage(`\`\`\`\n${com}\`\`\``); } catch (e) { + console.log(e); m.channel.sendMessage(`\`\`\`\n${e}\`\`\``); } }