From 04c3dbedac55c685219ff335732da1dda762751a Mon Sep 17 00:00:00 2001 From: hydrabolt Date: Sun, 8 Nov 2015 10:09:49 +0000 Subject: [PATCH] Fixed memory leaks --- lib/Client/InternalClient.js | 34 +++++++------- lib/Voice/AudioEncoder.js | 5 ++- lib/Voice/VoiceConnection.js | 22 +++++++-- src/Client/InternalClient.js | 86 +++++++++++++++++------------------- src/Voice/AudioEncoder.js | 2 +- src/Voice/VoiceConnection.js | 54 +++++++++++++--------- test/bot.1.js | 31 ++++--------- 7 files changed, 121 insertions(+), 113 deletions(-) diff --git a/lib/Client/InternalClient.js b/lib/Client/InternalClient.js index e2b46fff3..9356ad1c2 100644 --- a/lib/Client/InternalClient.js +++ b/lib/Client/InternalClient.js @@ -44,26 +44,21 @@ var InternalClient = (function () { this.channels = new Cache(); this.servers = new Cache(); this.private_channels = new Cache(); - this.voiceConnections = new Cache(); + this.voiceConnection = null; this.resolver = new Resolver(this); } //def leaveVoiceChannel - InternalClient.prototype.leaveVoiceChannel = function leaveVoiceChannel(chann) { + InternalClient.prototype.leaveVoiceChannel = function leaveVoiceChannel() { var self = this; return new Promise(function (resolve, reject) { - var channel = self.resolver.resolveVoiceChannel(chann); - - if (channel) { - if (self.voiceConnections[channel]) { - var chan = self.voiceConnections[channel]; - chan.stopPlaying(); - self.voiceConnections.remove(chan); - resolve(); - } + if (self.voiceConnection) { + self.voiceConnection.destroy(); + self.voiceConnection = null; + resolve(); } else { - reject(new Error("voice channel does not exist")); + resolve(); } }); }; @@ -77,8 +72,7 @@ var InternalClient = (function () { var channel = self.resolver.resolveVoiceChannel(chann); if (channel) { - if (!self.voiceConnections.get("id", channel.id)) { - + var next = function next() { var session, token, server = channel.server, @@ -90,16 +84,18 @@ var InternalClient = (function () { if (data.t === "VOICE_STATE_UPDATE") { session = data.d.session_id; + fired++; } else if (data.t === "VOICE_SERVER_UPDATE") { token = data.d.token; endpoint = data.d.endpoint; - - var chan = self.voiceConnections.add(new VoiceConnection(channel, self.client, session, token, server, endpoint)); + fired++; + var chan = self.voiceConnection = new VoiceConnection(channel, self.client, session, token, server, endpoint); chan.on("ready", resolve); chan.on("error", reject); } if (fired >= 2) { + self.client.emit("debug", "removed temporary voice websocket listeners"); self.websocket.removeListener('message', check); } }; @@ -114,9 +110,9 @@ var InternalClient = (function () { "self_deaf": false } }); - } else { - reject(new Error("voice channel connection exists")); - } + }; + + self.leaveVoiceChannel().then(next); } else { reject(new Error("voice channel does not exist")); } diff --git a/lib/Voice/AudioEncoder.js b/lib/Voice/AudioEncoder.js index a495c9991..2cc0b942f 100644 --- a/lib/Voice/AudioEncoder.js +++ b/lib/Voice/AudioEncoder.js @@ -23,9 +23,10 @@ var AudioEncoder = (function () { var self = this; return new Promise(function (resolve, reject) { - var enc = cpoc.spawn("ffmpeg", ["-i", file, "-f", "s16le", "-ar", "48000", "-ac", "1", "-af", "volume=1", "pipe:1"]); + var enc = cpoc.spawn("ffmpeg", ["-i", file, "-f", "s16le", "-ar", "48000", "-ac", "1", // this can be 2 but there's no point, discord makes it mono on playback, wasted bandwidth. + "-af", "volume=1", "pipe:1"]); - enc.stdout.on("readable", function () { + enc.stdout.once("readable", function () { callback(null, { proc: enc, stream: enc.stdout diff --git a/lib/Voice/VoiceConnection.js b/lib/Voice/VoiceConnection.js index ca74fd834..c63dc6dd0 100644 --- a/lib/Voice/VoiceConnection.js +++ b/lib/Voice/VoiceConnection.js @@ -1,4 +1,11 @@ "use strict"; +/* + Major credit to izy521 who is the creator of + https://github.com/izy521/discord.io, + + without his help voice chat in discord.js would not have + been possible! +*/ function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } @@ -42,9 +49,17 @@ var VoiceConnection = (function (_EventEmitter) { this.playing = false; this.streamTime = 0; this.streamProc = null; + this.KAI = null; this.init(); } + VoiceConnection.prototype.destroy = function destroy() { + this.stopPlaying(); + if (this.KAI) clearInterval(this.KAI); + this.vWS.close(); + this.udp.close(); + }; + VoiceConnection.prototype.stopPlaying = function stopPlaying() { this.playing = false; this.playingIntent = null; @@ -123,7 +138,7 @@ var VoiceConnection = (function (_EventEmitter) { VoiceConnection.prototype.setSpeaking = function setSpeaking(value) { this.playing = value; - this.vWS.send(JSON.stringify({ + if (this.vWS.readyState === WebSocket.OPEN) this.vWS.send(JSON.stringify({ op: 5, d: { speaking: value, @@ -138,7 +153,7 @@ var VoiceConnection = (function (_EventEmitter) { var self = this; self.playing = true; try { - self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); + if (self.vWS.readyState === WebSocket.OPEN) self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); } catch (e) { self.playing = false; callback(e); @@ -252,11 +267,12 @@ var VoiceConnection = (function (_EventEmitter) { self.vWSData = data.d; KAI = setInterval(function () { - vWS.send(JSON.stringify({ + if (vWS.readyState === WebSocket.OPEN) vWS.send(JSON.stringify({ op: 3, d: null })); }, data.d.heartbeat_interval); + self.KAI = KAI; var udpPacket = new Buffer(70); udpPacket.writeUIntBE(data.d.ssrc, 0, 4); diff --git a/src/Client/InternalClient.js b/src/Client/InternalClient.js index 2ee3fb8da..387787d72 100644 --- a/src/Client/InternalClient.js +++ b/src/Client/InternalClient.js @@ -40,25 +40,20 @@ class InternalClient { this.channels = new Cache(); this.servers = new Cache(); this.private_channels = new Cache(); - this.voiceConnections = new Cache(); + this.voiceConnection = null; this.resolver = new Resolver(this); } //def leaveVoiceChannel - leaveVoiceChannel(chann){ + leaveVoiceChannel(){ var self = this; return new Promise((resolve, reject) => { - var channel = self.resolver.resolveVoiceChannel(chann); - - if(channel){ - if(self.voiceConnections[channel]){ - var chan = self.voiceConnections[channel]; - chan.stopPlaying(); - self.voiceConnections.remove(chan); - resolve(); - } + if(self.voiceConnection){ + self.voiceConnection.destroy(); + self.voiceConnection = null; + resolve(); }else{ - reject(new Error("voice channel does not exist")); + resolve(); } }); } @@ -71,45 +66,46 @@ class InternalClient { var channel = self.resolver.resolveVoiceChannel(chann); if(channel){ - if(!self.voiceConnections.get("id", channel.id)){ - var session, token, server = channel.server, endpoint, fired = 0; + self.leaveVoiceChannel().then(next); - var check = (m) => { - var data = JSON.parse(m); + function next(){ + var session, token, server = channel.server, endpoint, fired = 0; - if(data.t === "VOICE_STATE_UPDATE"){ - session = data.d.session_id; - }else if(data.t === "VOICE_SERVER_UPDATE"){ - token = data.d.token; - endpoint = data.d.endpoint; + var check = (m) => { + var data = JSON.parse(m); - var chan = self.voiceConnections.add(new VoiceConnection(channel, self.client, session, token, server, endpoint)); + if(data.t === "VOICE_STATE_UPDATE"){ + session = data.d.session_id; + fired++; + }else if(data.t === "VOICE_SERVER_UPDATE"){ + token = data.d.token; + endpoint = data.d.endpoint; + fired++; + var chan = self.voiceConnection = new VoiceConnection(channel, self.client, session, token, server, endpoint); + + chan.on("ready", resolve); + chan.on("error", reject); + + } + if(fired >= 2){ + self.client.emit("debug", "removed temporary voice websocket listeners"); + self.websocket.removeListener('message', check); + } - chan.on("ready", resolve); - chan.on("error", reject); - - } - if(fired >= 2){ - self.websocket.removeListener('message', check); - } + }; - }; - - self.websocket.on("message", check); - self.sendWS({ - op : 4, - d : { - "guild_id" : server.id, - "channel_id" : channel.id, - "self_mute" : false, - "self_deaf" : false - } - }); - - }else{ - reject(new Error("voice channel connection exists")); - } + self.websocket.on("message", check); + self.sendWS({ + op : 4, + d : { + "guild_id" : server.id, + "channel_id" : channel.id, + "self_mute" : false, + "self_deaf" : false + } + }); + } }else{ reject(new Error("voice channel does not exist")); } diff --git a/src/Voice/AudioEncoder.js b/src/Voice/AudioEncoder.js index ad3a2e20d..4597fe803 100644 --- a/src/Voice/AudioEncoder.js +++ b/src/Voice/AudioEncoder.js @@ -27,7 +27,7 @@ class AudioEncoder{ "pipe:1" ]); - enc.stdout.on("readable", function() { + enc.stdout.once("readable", function() { callback(null, { proc : enc, stream : enc.stdout diff --git a/src/Voice/VoiceConnection.js b/src/Voice/VoiceConnection.js index 9770a384c..e2ca9f1ab 100644 --- a/src/Voice/VoiceConnection.js +++ b/src/Voice/VoiceConnection.js @@ -18,10 +18,10 @@ var VoicePacket = require("./VoicePacket.js"); var StreamIntent = require("./StreamIntent.js"); var EventEmitter = require("events"); -class VoiceConnection extends EventEmitter{ +class VoiceConnection extends EventEmitter { constructor(channel, client, session, token, server, endpoint) { super(); - if(!Opus){ + if (!Opus) { console.log("HEY! WATCH OUT\n\n discord.js needs node-opus, you don't have it installed."); } this.id = channel.id; @@ -41,13 +41,22 @@ class VoiceConnection extends EventEmitter{ this.playing = false; this.streamTime = 0; this.streamProc = null; + this.KAI = null; this.init(); } + destroy() { + this.stopPlaying(); + if(this.KAI) + clearInterval(this.KAI); + this.vWS.close(); + this.udp.close(); + } + stopPlaying() { - this.playing=false; + this.playing = false; this.playingIntent = null; - if(this.streamProc) + if (this.streamProc) this.streamProc.kill(); } @@ -78,12 +87,12 @@ class VoiceConnection extends EventEmitter{ } try { var buffer = stream.read(1920); - + if (!buffer) { setTimeout(send, length * 10); // give chance for some data in 200ms to appear return; } - + if (buffer.length !== 1920) { if (onWarning) { retStream.emit("end"); @@ -126,21 +135,22 @@ class VoiceConnection extends EventEmitter{ setSpeaking(value) { this.playing = value; - this.vWS.send(JSON.stringify({ - op: 5, - d: { - speaking: value, - delay: 0 - } - })); + if (this.vWS.readyState === WebSocket.OPEN) + this.vWS.send(JSON.stringify({ + op: 5, + d: { + speaking: value, + delay: 0 + } + })); } sendPacket(packet, callback = function (err) { }) { var self = this; self.playing = true; try { - self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); - + if (self.vWS.readyState === WebSocket.OPEN) + self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); } catch (e) { self.playing = false; callback(e); @@ -180,7 +190,7 @@ class VoiceConnection extends EventEmitter{ .encodeFile(stream) .catch(error) .then(data => { - + self.streamProc = data.proc; var intent = self.playRawStream(data.stream); resolve(intent); @@ -251,16 +261,18 @@ class VoiceConnection extends EventEmitter{ self.vWSData = data.d; KAI = setInterval(() => { - vWS.send(JSON.stringify({ - op: 3, - d: null - })); + if (vWS.readyState === WebSocket.OPEN) + vWS.send(JSON.stringify({ + op: 3, + d: null + })); }, data.d.heartbeat_interval); + self.KAI = KAI; var udpPacket = new Buffer(70); udpPacket.writeUIntBE(data.d.ssrc, 0, 4); udpClient.send(udpPacket, 0, udpPacket.length, data.d.port, self.endpoint, err => { - if(err) + if (err) self.emit("error", err) }); break; diff --git a/test/bot.1.js b/test/bot.1.js index 35fbc54d8..74530b68d 100644 --- a/test/bot.1.js +++ b/test/bot.1.js @@ -7,38 +7,25 @@ client.on("message", m => { if(m.content === "&init"){ for(var channel of m.channel.server.channels){ if(channel instanceof Discord.VoiceChannel){ - client.joinVoiceChannel(channel).catch(error) - .then(connection => { - connection.playFile("C:/users/amish/desktop/Developers.mp3"); - }); + client.joinVoiceChannel(channel).catch(error); break; } } } if(m.content.startsWith("$$$ stop")){ - for(var channel of m.channel.server.channels){ - if(channel instanceof Discord.VoiceChannel){ - chan = channel; - break; - } - } - if(client.internal.voiceConnections.get("id", chan.id)){ - var connection = client.internal.voiceConnections.get("id", chan.id); - connection.stopPlaying(); + if(client.internal.voiceConnection){ + client.internal.voiceConnection.stopPlaying(); } return; } if(m.content.startsWith("$$$")){ var chan; - for(var channel of m.channel.server.channels){ - if(channel instanceof Discord.VoiceChannel){ - chan = channel; - break; - } - } - if(client.internal.voiceConnections.get("id", chan.id)){ - var connection = client.internal.voiceConnections.get("id", chan.id); - connection.playFile("C:/users/amish/desktop/Developers.mp3"); + var rest = m.content.split(" "); + rest.splice(0, 1); + rest = rest.join(" "); + if(client.internal.voiceConnection){ + var connection = client.internal.voiceConnection; + connection.playFile("C:/users/amish/desktop/"+rest); } } });