Merge branch 'indev-rewrite-voice' into indev-rewrite

This commit is contained in:
Amish Shah
2016-08-26 17:35:10 +01:00
6 changed files with 211 additions and 3 deletions

File diff suppressed because one or more lines are too long

View File

@@ -1,5 +1,6 @@
const VoiceConnectionWebSocket = require('./VoiceConnectionWebSocket');
const VoiceConnectionUDPClient = require('./VoiceConnectionUDPClient');
const VoiceReceiver = require('./receiver/VoiceReceiver');
const Constants = require('../../util/Constants');
const EventEmitter = require('events').EventEmitter;
const DefaultPlayer = require('./player/DefaultPlayer');
@@ -55,6 +56,9 @@ class VoiceConnection extends EventEmitter {
* @private
*/
this._reject = reject;
this.ssrcMap = new Map();
this.queue = [];
this.receivers = [];
this.bindListeners();
}
@@ -137,11 +141,48 @@ class VoiceConnection extends EventEmitter {
* Emitted once the connection is ready (joining voice channels resolves when the connection is ready anyway)
* @event VoiceConnection#ready
*/
this.emit('ready');
this._resolve(this);
this.emit('ready');
});
this.once('ready', () => {
setImmediate(() => {
for (const item of this.queue) {
this.emit(...item);
}
this.queue = [];
});
});
this.websocket.on('speaking', data => {
const guild = this.channel.guild;
const user = this.manager.client.users.get(data.user_id);
this.ssrcMap.set(+data.ssrc, user);
if (!data.speaking) {
for (const receiver of this.receivers) {
const opusStream = receiver.opusStreams.get(user.id);
const pcmStream = receiver.pcmStreams.get(user.id);
if (opusStream) {
opusStream.push(null);
opusStream.open = false;
receiver.opusStreams.delete(user.id);
}
if (pcmStream) {
pcmStream.push(null);
pcmStream.open = false;
receiver.pcmStreams.delete(user.id);
}
}
}
/**
* Emitted whenever a user starts/stops speaking
* @event VoiceConnection#speaking
* @param {User} user the user that has started/stopped speaking
* @param {Boolean} speaking whether or not the user is speaking
*/
if (this.ready) {
this.emit('speaking', user, data.speaking);
} else {
this.queue.push(['speaking', user, data.speaking]);
}
guild._memberSpeakUpdate(data.user_id, data.speaking);
});
}
@@ -179,6 +220,16 @@ class VoiceConnection extends EventEmitter {
playStream(stream) {
return this.player.playStream(stream);
}
/**
* Creates a VoiceReceiver so you can start listening to voice data. It's recommended to only create one of these.
* @returns {VoiceReceiver}
*/
createReceiver() {
const rcv = new VoiceReceiver(this);
this.receivers.push(rcv);
return rcv;
}
}
module.exports = VoiceConnection;

View File

@@ -0,0 +1,20 @@
const Readable = require('stream').Readable;
class VoiceReadable extends Readable {
constructor() {
super();
this._packets = [];
this.open = true;
}
_read() {
}
$push(d) {
if (this.open) {
this.push(d);
}
}
}
module.exports = VoiceReadable;

View File

@@ -0,0 +1,124 @@
const EventEmitter = require('events').EventEmitter;
const NaCl = require('tweetnacl');
const Readable = require('./VoiceReadable');
const nonce = new Buffer(24);
nonce.fill(0);
/**
* Receives voice data from a voice connection.
* @extends {EventEmitter}
*/
class VoiceReceiver extends EventEmitter {
constructor(connection) {
super();
/*
need a queue because we don't get the ssrc of the user speaking until after the first few packets,
so we queue up unknown SSRCs until they become known, then empty the queue.
*/
this.queues = new Map();
this.pcmStreams = new Map();
this.opusStreams = new Map();
/**
* The VoiceConnection that instantiated this
* @type {VoiceConnection}
*/
this.connection = connection;
this.connection.udp.udpSocket.on('message', msg => {
const ssrc = +msg.readUInt32BE(8).toString(10);
const user = this.connection.ssrcMap.get(ssrc);
if (!user) {
if (!this.queues.has(ssrc)) {
this.queues.set(ssrc, []);
}
this.queues.get(ssrc).push(msg);
} else {
if (this.queues.get(ssrc)) {
this.queues.get(ssrc).push(msg);
this.queues.get(ssrc).map(m => this.handlePacket(m, user));
this.queues.delete(ssrc);
return;
}
this.handlePacket(msg, user);
}
});
}
/**
* Creates a readable stream for a user that provides opus data while the user is speaking. When the user
* stops speaking, the stream is destroyed.
* @param {UserResolvable} user the user to create the stream for
* @returns {ReadableStream}
*/
createOpusStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user);
if (!user) {
throw new Error('invalid user object supplied');
}
if (this.opusStreams.get(user.id)) {
throw new Error('there is already an existing stream for that user!');
}
const stream = new Readable();
this.opusStreams.set(user.id, stream);
return stream;
}
/**
* Creates a readable stream for a user that provides PCM data while the user is speaking. When the user
* stops speaking, the stream is destroyed. The stream is 32-bit signed PCM at 48KHz.
* @param {UserResolvable} user the user to create the stream for
* @returns {ReadableStream}
*/
createPCMStream(user) {
user = this.connection.manager.client.resolver.resolveUser(user);
if (!user) {
throw new Error('invalid user object supplied');
}
if (this.pcmStreams.get(user.id)) {
throw new Error('there is already an existing stream for that user!');
}
const stream = new Readable();
this.pcmStreams.set(user.id, stream);
return stream;
}
handlePacket(msg, user) {
msg.copy(nonce, 0, 0, 12);
let data = NaCl.secretbox.open(msg.slice(12), nonce, this.connection.data.secret);
if (!data) {
/**
* Emitted whenever a voice packet cannot be decrypted
* @event VoiceReceiver#warn
* @param {String} message the warning message
*/
return this.emit('warn', 'failed to decrypt voice packet');
}
data = new Buffer(data);
/**
* Emitted whenever voice data is received from the voice connection. This is _always_ emitted (unlike PCM).
* @event VoiceReceiver#opus
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the opus buffer
*/
if (this.opusStreams.get(user.id)) {
this.opusStreams.get(user.id).$push(data);
}
this.emit('opus', user, data);
if (this.listenerCount('pcm') > 0 || this.pcmStreams.size > 0) {
/**
* Emits decoded voice data when it's received. For performance reasons, the decoding will only
* happen if there is at least one `pcm` listener on this receiver.
* @event VoiceReceiver#pcm
* @param {User} user the user that is sending the buffer (is speaking)
* @param {Buffer} buffer the decoded buffer
*/
const pcm = this.connection.player.opusEncoder.decode(data);
if (this.pcmStreams.get(user.id)) {
this.pcmStreams.get(user.id).$push(pcm);
}
this.emit('pcm', user, pcm);
}
}
}
module.exports = VoiceReceiver;

View File

@@ -121,7 +121,9 @@ class Message {
const channMentionsRaw = data.content.match(/<#([0-9]{14,20})>/g) || [];
for (const raw of channMentionsRaw) {
const chan = this.channel.guild.channels.get(raw.match(/([0-9]{14,20})/g)[0]);
this.mentions.channels.set(chan.id, chan);
if (chan) {
this.mentions.channels.set(chan.id, chan);
}
}
}
}

View File

@@ -2,6 +2,7 @@
const Discord = require('../');
const request = require('superagent');
const fs = require('fs-extra');
const client = new Discord.Client();
@@ -144,6 +145,16 @@ client.on('message', msg => {
const disp = conn.player.playStream(ytdl('https://www.youtube.com/watch?v=nbXgHAzUWB0', {filter : 'audioonly'}));
conn.player.on('debug', console.log);
conn.player.on('error', err => console.log(123, err));
const receiver = conn.createReceiver();
const out = fs.createWriteStream('C:/Users/Amish/Desktop/output.pcm');
conn.once('speaking', (user, speaking) => {
if (speaking) {
msg.reply(`${user.username} start`);
const str = receiver.createPCMStream(user);
str.pipe(out);
str.on('end', () => msg.reply(`${user.username} end`));
}
});
disp.on('error', err => console.log(123, err));
})
.catch(console.log);