diff --git a/lib/Voice/AudioEncoder.js b/lib/Voice/AudioEncoder.js index 8ae098286..1343bf448 100644 --- a/lib/Voice/AudioEncoder.js +++ b/lib/Voice/AudioEncoder.js @@ -25,6 +25,8 @@ var AudioEncoder = (function () { return new Promise(function (resolve, reject) { var enc = cpoc.spawn("ffmpeg", ["-i", file, "-f", "s16le", "-ar", "48000", "-ac", "1", "-af", "volume=1", "pipe:1"]); + var rcvd = 0; + enc.stdout.on("readable", function () { callback(null, enc.stdout); resolve(enc.stdout); diff --git a/lib/Voice/StreamIntent.js b/lib/Voice/StreamIntent.js new file mode 100644 index 000000000..97953194c --- /dev/null +++ b/lib/Voice/StreamIntent.js @@ -0,0 +1,22 @@ +"use strict"; +// represents an intent of streaming music + +function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } + +function _inherits(subClass, superClass) { if (typeof superClass !== "function" && superClass !== null) { throw new TypeError("Super expression must either be null or a function, not " + typeof superClass); } subClass.prototype = Object.create(superClass && superClass.prototype, { constructor: { value: subClass, enumerable: false, writable: true, configurable: true } }); if (superClass) Object.setPrototypeOf ? Object.setPrototypeOf(subClass, superClass) : subClass.__proto__ = superClass; } + +var EventEmitter = require("events"); + +var StreamIntent = (function (_EventEmitter) { + _inherits(StreamIntent, _EventEmitter); + + function StreamIntent() { + _classCallCheck(this, StreamIntent); + + _EventEmitter.call(this); + } + + return StreamIntent; +})(EventEmitter); + +module.exports = StreamIntent; \ No newline at end of file diff --git a/lib/Voice/VoiceConnection.js b/lib/Voice/VoiceConnection.js index 8c37a1e8d..5faf7fe12 100644 --- a/lib/Voice/VoiceConnection.js +++ b/lib/Voice/VoiceConnection.js @@ -12,6 +12,7 @@ var fs = require("fs"); var ffmpeg = require('fluent-ffmpeg'); var AudioEncoder = require("./AudioEncoder.js"); var VoicePacket = require("./VoicePacket.js"); +var StreamIntent = require("./StreamIntent.js"); var VoiceConnection = (function () { function VoiceConnection(channel, client, session, token, server, endpoint) { @@ -29,13 +30,18 @@ var VoiceConnection = (function () { this.opus = new Opus.OpusEncoder(48000, 1); this.encoder = new AudioEncoder(); this.udp = null; + this.playingIntent = null; + this.playing = false; this.init(); } + VoiceConnection.prototype.stopPlaying = function stopPlaying() { + this.playingIntent = null; + }; + VoiceConnection.prototype.playRawStream = function playRawStream(stream) { var self = this; - self.playing = true; var startTime = Date.now(); var sequence = 0; @@ -44,32 +50,64 @@ var VoiceConnection = (function () { var length = 20; - function send() { - try { - count++; - sequence + 10 < 65535 ? sequence += 1 : sequence = 0; - time + 9600 < 4294967295 ? time += 960 : time = 0; - - self.sendBuffer(stream.read(1920), sequence, time, function (e) { - console.log(e); - }); - - var nextTime = startTime + count * length; - - setTimeout(function () { - send(); - }, length + (nextTime - Date.now())); - } catch (e) {} + if (self.playingIntent) { + self.stopPlaying(); } - self.vWS.send(JSON.stringify({ + var retStream = new StreamIntent(); + self.playingIntent = retStream; + + function send() { + if (self.playingIntent && self.playingIntent !== retStream) { + console.log("ending it!"); + self.setSpeaking(false); + retStream.emit("end"); + return; + } + try { + + var buffer = stream.read(1920); + + if (!buffer) { + setTimeout(send, length * 10); // give chance for some data in 200ms to appear + } + + if (buffer && buffer.length === 1920) { + count++; + sequence + 10 < 65535 ? sequence += 1 : sequence = 0; + time + 9600 < 4294967295 ? time += 960 : time = 0; + + self.sendBuffer(buffer, sequence, time, function (e) {}); + + var nextTime = startTime + count * length; + + setTimeout(function () { + send(); + }, length + (nextTime - Date.now())); + if (!self.playing) self.setSpeaking(true); + } else { + retStream.emit("end"); + self.setSpeaking(false); + } + } catch (e) { + retStream.emit("error", e); + } + } + self.setSpeaking(true); + send(); + + return retStream; + }; + + VoiceConnection.prototype.setSpeaking = function setSpeaking(value) { + this.playing = value; + this.vWS.send(JSON.stringify({ op: 5, d: { - speaking: true, + speaking: value, delay: 0 } })); - send(); }; VoiceConnection.prototype.sendPacket = function sendPacket(packet) { @@ -86,14 +124,13 @@ var VoiceConnection = (function () { } }; - VoiceConnection.prototype.sendBuffer = function sendBuffer(buffer, sequence, timestamp, callback) { + VoiceConnection.prototype.sendBuffer = function sendBuffer(rawbuffer, sequence, timestamp, callback) { var self = this; self.playing = true; try { - var buffer = self.encoder.opusBuffer(buffer); + var buffer = self.encoder.opusBuffer(rawbuffer); var packet = new VoicePacket(buffer, sequence, timestamp, self.vWSData.ssrc); - return self.sendPacket(packet, callback); } catch (e) { self.playing = false; @@ -106,7 +143,11 @@ var VoiceConnection = (function () { var self = this; this.encoder.encodeFile("C:/users/amish/desktop/audio.mp3")["catch"](error).then(function (stream) { - self.playRawStream(stream); + var intent = self.playRawStream(stream); + + intent.on("end", function () { + console.log("stream ended"); + }); }); function error() { console.log("ERROR!"); diff --git a/lib/index.js b/lib/index.js index 745969e80..0d970b54e 100644 --- a/lib/index.js +++ b/lib/index.js @@ -15,7 +15,7 @@ a.on("warn", function (m) { }); var start = Date.now(); a.on("message", function (m) { - if (m.content === "$$$") { + if (m.content === "&init") { for (var _iterator = m.channel.server.channels, _isArray = Array.isArray(_iterator), _i = 0, _iterator = _isArray ? _iterator : _iterator[Symbol.iterator]();;) { var _ref; @@ -36,6 +36,32 @@ a.on("message", function (m) { } } } + if (m.content.startsWith("$$$")) { + var chan; + for (var _iterator2 = m.channel.server.channels, _isArray2 = Array.isArray(_iterator2), _i2 = 0, _iterator2 = _isArray2 ? _iterator2 : _iterator2[Symbol.iterator]();;) { + var _ref2; + + if (_isArray2) { + if (_i2 >= _iterator2.length) break; + _ref2 = _iterator2[_i2++]; + } else { + _i2 = _iterator2.next(); + if (_i2.done) break; + _ref2 = _i2.value; + } + + var channel = _ref2; + + if (channel instanceof VoiceChannel) { + chan = channel; + break; + } + } + if (a.internal.voiceConnections[chan]) { + connection = a.internal.voiceConnections[chan]; + connection; + } + } }); function error(e) { diff --git a/src/Voice/AudioEncoder.js b/src/Voice/AudioEncoder.js index ac7f21232..1857cdeb3 100644 --- a/src/Voice/AudioEncoder.js +++ b/src/Voice/AudioEncoder.js @@ -27,6 +27,8 @@ class AudioEncoder{ "pipe:1" ]); + var rcvd = 0; + enc.stdout.on("readable", function() { callback(null, enc.stdout); resolve(enc.stdout) diff --git a/src/Voice/StreamIntent.js b/src/Voice/StreamIntent.js new file mode 100644 index 000000000..e5340ecb7 --- /dev/null +++ b/src/Voice/StreamIntent.js @@ -0,0 +1,11 @@ +"use strict"; +// represents an intent of streaming music +var EventEmitter = require("events"); + +class StreamIntent extends EventEmitter{ + constructor(){ + super(); + } +} + +module.exports = StreamIntent; \ No newline at end of file diff --git a/src/Voice/VoiceConnection.js b/src/Voice/VoiceConnection.js index 1aa5e6607..be9cdb032 100644 --- a/src/Voice/VoiceConnection.js +++ b/src/Voice/VoiceConnection.js @@ -10,6 +10,7 @@ var fs = require("fs"); var ffmpeg = require('fluent-ffmpeg'); var AudioEncoder = require("./AudioEncoder.js"); var VoicePacket = require("./VoicePacket.js"); +var StreamIntent = require("./StreamIntent.js"); class VoiceConnection { constructor(channel, client, session, token, server, endpoint) { @@ -25,78 +26,111 @@ class VoiceConnection { this.opus = new Opus.OpusEncoder(48000, 1); this.encoder = new AudioEncoder(); this.udp = null; + this.playingIntent = null; + this.playing = false; this.init(); } - playRawStream(stream){ - + stopPlaying(){ + this.playingIntent = null; + } + + playRawStream(stream) { + var self = this; - self.playing = true; - + var startTime = Date.now(); var sequence = 0; var time = 0; var count = 0; - + var length = 20; - function send(){ - try{ - count++; - sequence + 10 < 65535 ? sequence += 1 : sequence = 0; - time + 9600 < 4294967295 ? time += 960 : time = 0; - - self.sendBuffer(stream.read(1920), sequence, time, (e) => { - console.log(e); - }); - - var nextTime = startTime + (count * length); - - setTimeout(function() { - send(); - }, length + (nextTime - Date.now())); - - }catch(e){ - - } + if(self.playingIntent){ + self.stopPlaying(); } - self.vWS.send(JSON.stringify({ - op : 5, - d : { - speaking : true, - delay : 0 + var retStream = new StreamIntent(); + self.playingIntent = retStream; + + function send() { + if(self.playingIntent && self.playingIntent !== retStream){ + console.log("ending it!"); + self.setSpeaking(false); + retStream.emit("end"); + return; } - })); + try { + + var buffer = stream.read(1920); + + if(!buffer){ + setTimeout(send, length * 10); // give chance for some data in 200ms to appear + } + + if (buffer && buffer.length === 1920) { + count++; + sequence + 10 < 65535 ? sequence += 1 : sequence = 0; + time + 9600 < 4294967295 ? time += 960 : time = 0; + + self.sendBuffer(buffer, sequence, time, (e) => { }); + + var nextTime = startTime + (count * length); + + setTimeout(function () { + send(); + }, length + (nextTime - Date.now())); + if(!self.playing) + self.setSpeaking(true); + }else{ + retStream.emit("end"); + self.setSpeaking(false); + } + + } catch (e) { + retStream.emit("error", e); + } + } + self.setSpeaking(true); send(); - + return retStream; } - sendPacket(packet, callback=function(err){}){ + setSpeaking(value){ + this.playing = value; + this.vWS.send(JSON.stringify({ + op: 5, + d: { + speaking: value, + delay: 0 + } + })); + } + + sendPacket(packet, callback = function (err) { }) { var self = this; self.playing = true; - try{ + try { self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); - - }catch(e){ + + } catch (e) { self.playing = false; callback(e); return false; } } - - sendBuffer(buffer, sequence, timestamp, callback){ + + sendBuffer(rawbuffer, sequence, timestamp, callback) { var self = this; self.playing = true; - try{ - - var buffer = self.encoder.opusBuffer(buffer); + try { + + var buffer = self.encoder.opusBuffer(rawbuffer); var packet = new VoicePacket(buffer, sequence, timestamp, self.vWSData.ssrc); - return self.sendPacket(packet, callback); - - }catch(e){ + + } catch (e) { self.playing = false; console.log("etype", e.stack); return false; @@ -108,12 +142,16 @@ class VoiceConnection { this.encoder .encodeFile("C:/users/amish/desktop/audio.mp3") .catch(error) - .then( stream => { + .then(stream => { - self.playRawStream(stream); - - } ); - function error(){ + var intent = self.playRawStream(stream); + + intent.on("end", ()=>{ + console.log("stream ended"); + }); + + }); + function error() { console.log("ERROR!"); } } diff --git a/src/index.js b/src/index.js index 87e136ea5..1f83fd9ec 100644 --- a/src/index.js +++ b/src/index.js @@ -9,7 +9,7 @@ a.on("debug", (m) => console.log("[debug]",m)); a.on("warn", (m) => console.log("[warn]", m)); var start = Date.now(); a.on("message", m => { - if(m.content === "$$$"){ + if(m.content === "&init"){ for(var channel of m.channel.server.channels){ if(channel instanceof VoiceChannel){ a.internal.joinVoiceChannel(channel).catch(error); @@ -17,6 +17,19 @@ a.on("message", m => { } } } + if(m.content.startsWith("$$$")){ + var chan; + for(var channel of m.channel.server.channels){ + if(channel instanceof VoiceChannel){ + chan = channel; + break; + } + } + if(a.internal.voiceConnections[chan]){ + connection = a.internal.voiceConnections[chan]; + connection + } + } }); function error(e){