Add voice receiving again \o/

This commit is contained in:
Amish Shah
2016-10-25 19:41:54 +01:00
parent b50bec8a44
commit f77af72d71
2 changed files with 49 additions and 9 deletions

View File

@@ -2,6 +2,7 @@ const VoiceWebSocket = require('./VoiceWebSocket');
const VoiceUDP = require('./VoiceUDPClient'); const VoiceUDP = require('./VoiceUDPClient');
const Constants = require('../../util/Constants'); const Constants = require('../../util/Constants');
const AudioPlayer = require('./player/AudioPlayer'); const AudioPlayer = require('./player/AudioPlayer');
const VoiceReceiver = require('./receiver/VoiceReceiver');
const EventEmitter = require('events').EventEmitter; const EventEmitter = require('events').EventEmitter;
const fs = require('fs'); const fs = require('fs');
@@ -31,6 +32,8 @@ class VoiceConnection extends EventEmitter {
*/ */
this.channel = pendingConnection.channel; this.channel = pendingConnection.channel;
this.receivers = [];
/** /**
* The authentication data needed to connect to the voice server * The authentication data needed to connect to the voice server
* @type {object} * @type {object}
@@ -48,6 +51,8 @@ class VoiceConnection extends EventEmitter {
this.player.cleanup(); this.player.cleanup();
}); });
this.ssrcMap = new Map();
/** /**
* Object that wraps contains the `ws` and `udp` sockets of this voice connection * Object that wraps contains the `ws` and `udp` sockets of this voice connection
* @type {object} * @type {object}
@@ -110,6 +115,35 @@ class VoiceConnection extends EventEmitter {
this.authentication.secretKey = secret; this.authentication.secretKey = secret;
this.emit('ready'); this.emit('ready');
}); });
this.sockets.ws.on('speaking', data => {
const guild = this.channel.guild;
const user = this.voiceManager.client.users.get(data.user_id);
this.ssrcMap.set(+data.ssrc, user);
if (!data.speaking) {
for (const receiver of this.receivers) {
const opusStream = receiver.opusStreams.get(user.id);
const pcmStream = receiver.pcmStreams.get(user.id);
if (opusStream) {
opusStream.push(null);
opusStream.open = false;
receiver.opusStreams.delete(user.id);
}
if (pcmStream) {
pcmStream.push(null);
pcmStream.open = false;
receiver.pcmStreams.delete(user.id);
}
}
}
/**
* Emitted whenever a user starts/stops speaking
* @event VoiceConnection#speaking
* @param {User} user The user that has started/stopped speaking
* @param {boolean} speaking Whether or not the user is speaking
*/
if (this.ready) this.emit('speaking', user, data.speaking);
guild._memberSpeakUpdate(data.user_id, data.speaking);
});
} }
playFile(file, options) { playFile(file, options) {
@@ -126,6 +160,12 @@ class VoiceConnection extends EventEmitter {
return this.player.playPCMStream(stream, options); return this.player.playPCMStream(stream, options);
} }
createReceiver() {
const receiver = new VoiceReceiver(this);
this.receivers.push(receiver);
return receiver;
}
} }
module.exports = VoiceConnection; module.exports = VoiceConnection;

View File

@@ -34,10 +34,10 @@ class VoiceReceiver extends EventEmitter {
* The VoiceConnection that instantiated this * The VoiceConnection that instantiated this
* @type {VoiceConnection} * @type {VoiceConnection}
*/ */
this.connection = connection; this.voiceConnection = connection;
this._listener = (msg => { this._listener = (msg => {
const ssrc = +msg.readUInt32BE(8).toString(10); const ssrc = +msg.readUInt32BE(8).toString(10);
const user = this.connection.ssrcMap.get(ssrc); const user = this.voiceConnection.ssrcMap.get(ssrc);
if (!user) { if (!user) {
if (!this.queues.has(ssrc)) this.queues.set(ssrc, []); if (!this.queues.has(ssrc)) this.queues.set(ssrc, []);
this.queues.get(ssrc).push(msg); this.queues.get(ssrc).push(msg);
@@ -51,7 +51,7 @@ class VoiceReceiver extends EventEmitter {
this.handlePacket(msg, user); this.handlePacket(msg, user);
} }
}).bind(this); }).bind(this);
this.connection.udp.udpSocket.on('message', this._listener); this.voiceConnection.sockets.udp.socket.on('message', this._listener);
} }
/** /**
@@ -61,7 +61,7 @@ class VoiceReceiver extends EventEmitter {
*/ */
recreate() { recreate() {
if (!this.destroyed) return; if (!this.destroyed) return;
this.connection.udp.udpSocket.on('message', this._listener); this.voiceConnection.sockets.udp.socket.on('message', this._listener);
this.destroyed = false; this.destroyed = false;
return; return;
} }
@@ -70,7 +70,7 @@ class VoiceReceiver extends EventEmitter {
* Destroy this VoiceReceiver, also ending any streams that it may be controlling. * Destroy this VoiceReceiver, also ending any streams that it may be controlling.
*/ */
destroy() { destroy() {
this.connection.udp.udpSocket.removeListener('message', this._listener); this.voiceConnection.sockets.udp.socket.removeListener('message', this._listener);
for (const stream of this.pcmStreams) { for (const stream of this.pcmStreams) {
stream[1]._push(null); stream[1]._push(null);
this.pcmStreams.delete(stream[0]); this.pcmStreams.delete(stream[0]);
@@ -89,7 +89,7 @@ class VoiceReceiver extends EventEmitter {
* @returns {ReadableStream} * @returns {ReadableStream}
*/ */
createOpusStream(user) { createOpusStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user); user = this.voiceConnection.voiceManager.client.resolver.resolveUser(user);
if (!user) throw new Error('Couldn\'t resolve the user to create Opus stream.'); if (!user) throw new Error('Couldn\'t resolve the user to create Opus stream.');
if (this.opusStreams.get(user.id)) throw new Error('There is already an existing stream for that user.'); if (this.opusStreams.get(user.id)) throw new Error('There is already an existing stream for that user.');
const stream = new Readable(); const stream = new Readable();
@@ -104,7 +104,7 @@ class VoiceReceiver extends EventEmitter {
* @returns {ReadableStream} * @returns {ReadableStream}
*/ */
createPCMStream(user) { createPCMStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user); user = this.voiceConnection.voiceManager.client.resolver.resolveUser(user);
if (!user) throw new Error('Couldn\'t resolve the user to create PCM stream.'); if (!user) throw new Error('Couldn\'t resolve the user to create PCM stream.');
if (this.pcmStreams.get(user.id)) throw new Error('There is already an existing stream for that user.'); if (this.pcmStreams.get(user.id)) throw new Error('There is already an existing stream for that user.');
const stream = new Readable(); const stream = new Readable();
@@ -114,7 +114,7 @@ class VoiceReceiver extends EventEmitter {
handlePacket(msg, user) { handlePacket(msg, user) {
msg.copy(nonce, 0, 0, 12); msg.copy(nonce, 0, 0, 12);
let data = NaCl.secretbox.open(msg.slice(12), nonce, this.connection.data.secret); let data = NaCl.secretbox.open(msg.slice(12), nonce, this.voiceConnection.authentication.secretKey.key);
if (!data) { if (!data) {
/** /**
* Emitted whenever a voice packet cannot be decrypted * Emitted whenever a voice packet cannot be decrypted
@@ -141,7 +141,7 @@ class VoiceReceiver extends EventEmitter {
* @param {User} user The user that is sending the buffer (is speaking) * @param {User} user The user that is sending the buffer (is speaking)
* @param {Buffer} buffer The decoded buffer * @param {Buffer} buffer The decoded buffer
*/ */
const pcm = this.connection.player.opusEncoder.decode(data); const pcm = this.voiceConnection.player.opusEncoder.decode(data);
if (this.pcmStreams.get(user.id)) this.pcmStreams.get(user.id)._push(pcm); if (this.pcmStreams.get(user.id)) this.pcmStreams.get(user.id)._push(pcm);
this.emit('pcm', user, pcm); this.emit('pcm', user, pcm);
} }