Add VoiceReceiver streams

This commit is contained in:
Amish Shah
2016-08-26 17:34:23 +01:00
parent 8507e34995
commit 6ddc7a813c
6 changed files with 136 additions and 18 deletions

View File

@@ -0,0 +1,20 @@
const Readable = require('stream').Readable;
class VoiceReadable extends Readable {
constructor() {
super();
this._packets = [];
this.open = true;
}
_read() {
}
$push(d) {
if (this.open) {
this.push(d);
}
}
}
module.exports = VoiceReadable;

View File

@@ -1,5 +1,6 @@
const EventEmitter = require('events').EventEmitter;
const NaCl = require('tweetnacl');
const Readable = require('./VoiceReadable');
const nonce = new Buffer(24);
nonce.fill(0);
@@ -16,6 +17,8 @@ class VoiceReceiver extends EventEmitter {
so we queue up unknown SSRCs until they become known, then empty the queue.
*/
this.queues = new Map();
this.pcmStreams = new Map();
this.opusStreams = new Map();
/**
* The VoiceConnection that instantiated this
* @type {VoiceConnection}
@@ -29,17 +32,56 @@ class VoiceReceiver extends EventEmitter {
this.queues.set(ssrc, []);
}
this.queues.get(ssrc).push(msg);
}
if (user) {
} else {
if (this.queues.get(ssrc)) {
this.queues.get(ssrc).push(msg);
this.queues.get(ssrc).map(m => this.handlePacket(m, user));
this.queues.delete(ssrc);
return;
}
this.handlePacket(msg, user);
}
});
}
/**
* Creates a readable stream for a user that provides opus data while the user is speaking. When the user
* stops speaking, the stream is destroyed.
* @param {UserResolvable} user the user to create the stream for
* @returns {ReadableStream}
*/
createOpusStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user);
if (!user) {
throw new Error('invalid user object supplied');
}
if (this.opusStreams.get(user.id)) {
throw new Error('there is already an existing stream for that user!');
}
const stream = new Readable();
this.opusStreams.set(user.id, stream);
return stream;
}
/**
* Creates a readable stream for a user that provides PCM data while the user is speaking. When the user
* stops speaking, the stream is destroyed. The stream is 32-bit signed PCM at 48KHz.
* @param {UserResolvable} user the user to create the stream for
* @returns {ReadableStream}
*/
createPCMStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user);
if (!user) {
throw new Error('invalid user object supplied');
}
if (this.pcmStreams.get(user.id)) {
throw new Error('there is already an existing stream for that user!');
}
const stream = new Readable();
this.pcmStreams.set(user.id, stream);
return stream;
}
handlePacket(msg, user) {
msg.copy(nonce, 0, 0, 12);
let data = NaCl.secretbox.open(msg.slice(12), nonce, this.connection.data.secret);
@@ -58,16 +100,23 @@ class VoiceReceiver extends EventEmitter {
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the opus buffer
*/
if (this.opusStreams.get(user.id)) {
this.opusStreams.get(user.id).$push(data);
}
this.emit('opus', user, data);
if (this.listenerCount('pcm') > 0) {
/**
* Emits decoded voice data when it's received. For performance reasons, the decoding will only
* happen if there is at least one `pcm` listener on this receiver.
* @event VoiceReceiver#pcm
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the decoded buffer
*/
this.emit('pcm', user, this.connection.player.opusEncoder.decode(data));
if (this.listenerCount('pcm') > 0 || this.pcmStreams.size > 0) {
/**
* Emits decoded voice data when it's received. For performance reasons, the decoding will only
* happen if there is at least one `pcm` listener on this receiver.
* @event VoiceReceiver#pcm
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the decoded buffer
*/
const pcm = this.connection.player.opusEncoder.decode(data);
if (this.pcmStreams.get(user.id)) {
this.pcmStreams.get(user.id).$push(pcm);
}
this.emit('pcm', user, pcm);
}
}
}