Fixed memory leaks

This commit is contained in:
hydrabolt
2015-11-08 10:09:49 +00:00
parent 99a63db142
commit 04c3dbedac
7 changed files with 121 additions and 113 deletions

View File

@@ -44,26 +44,21 @@ var InternalClient = (function () {
this.channels = new Cache(); this.channels = new Cache();
this.servers = new Cache(); this.servers = new Cache();
this.private_channels = new Cache(); this.private_channels = new Cache();
this.voiceConnections = new Cache(); this.voiceConnection = null;
this.resolver = new Resolver(this); this.resolver = new Resolver(this);
} }
//def leaveVoiceChannel //def leaveVoiceChannel
InternalClient.prototype.leaveVoiceChannel = function leaveVoiceChannel(chann) { InternalClient.prototype.leaveVoiceChannel = function leaveVoiceChannel() {
var self = this; var self = this;
return new Promise(function (resolve, reject) { return new Promise(function (resolve, reject) {
var channel = self.resolver.resolveVoiceChannel(chann); if (self.voiceConnection) {
self.voiceConnection.destroy();
if (channel) { self.voiceConnection = null;
if (self.voiceConnections[channel]) { resolve();
var chan = self.voiceConnections[channel];
chan.stopPlaying();
self.voiceConnections.remove(chan);
resolve();
}
} else { } else {
reject(new Error("voice channel does not exist")); resolve();
} }
}); });
}; };
@@ -77,8 +72,7 @@ var InternalClient = (function () {
var channel = self.resolver.resolveVoiceChannel(chann); var channel = self.resolver.resolveVoiceChannel(chann);
if (channel) { if (channel) {
if (!self.voiceConnections.get("id", channel.id)) { var next = function next() {
var session, var session,
token, token,
server = channel.server, server = channel.server,
@@ -90,16 +84,18 @@ var InternalClient = (function () {
if (data.t === "VOICE_STATE_UPDATE") { if (data.t === "VOICE_STATE_UPDATE") {
session = data.d.session_id; session = data.d.session_id;
fired++;
} else if (data.t === "VOICE_SERVER_UPDATE") { } else if (data.t === "VOICE_SERVER_UPDATE") {
token = data.d.token; token = data.d.token;
endpoint = data.d.endpoint; endpoint = data.d.endpoint;
fired++;
var chan = self.voiceConnections.add(new VoiceConnection(channel, self.client, session, token, server, endpoint)); var chan = self.voiceConnection = new VoiceConnection(channel, self.client, session, token, server, endpoint);
chan.on("ready", resolve); chan.on("ready", resolve);
chan.on("error", reject); chan.on("error", reject);
} }
if (fired >= 2) { if (fired >= 2) {
self.client.emit("debug", "removed temporary voice websocket listeners");
self.websocket.removeListener('message', check); self.websocket.removeListener('message', check);
} }
}; };
@@ -114,9 +110,9 @@ var InternalClient = (function () {
"self_deaf": false "self_deaf": false
} }
}); });
} else { };
reject(new Error("voice channel connection exists"));
} self.leaveVoiceChannel().then(next);
} else { } else {
reject(new Error("voice channel does not exist")); reject(new Error("voice channel does not exist"));
} }

View File

@@ -23,9 +23,10 @@ var AudioEncoder = (function () {
var self = this; var self = this;
return new Promise(function (resolve, reject) { return new Promise(function (resolve, reject) {
var enc = cpoc.spawn("ffmpeg", ["-i", file, "-f", "s16le", "-ar", "48000", "-ac", "1", "-af", "volume=1", "pipe:1"]); var enc = cpoc.spawn("ffmpeg", ["-i", file, "-f", "s16le", "-ar", "48000", "-ac", "1", // this can be 2 but there's no point, discord makes it mono on playback, wasted bandwidth.
"-af", "volume=1", "pipe:1"]);
enc.stdout.on("readable", function () { enc.stdout.once("readable", function () {
callback(null, { callback(null, {
proc: enc, proc: enc,
stream: enc.stdout stream: enc.stdout

View File

@@ -1,4 +1,11 @@
"use strict"; "use strict";
/*
Major credit to izy521 who is the creator of
https://github.com/izy521/discord.io,
without his help voice chat in discord.js would not have
been possible!
*/
function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } } function _classCallCheck(instance, Constructor) { if (!(instance instanceof Constructor)) { throw new TypeError("Cannot call a class as a function"); } }
@@ -42,9 +49,17 @@ var VoiceConnection = (function (_EventEmitter) {
this.playing = false; this.playing = false;
this.streamTime = 0; this.streamTime = 0;
this.streamProc = null; this.streamProc = null;
this.KAI = null;
this.init(); this.init();
} }
VoiceConnection.prototype.destroy = function destroy() {
this.stopPlaying();
if (this.KAI) clearInterval(this.KAI);
this.vWS.close();
this.udp.close();
};
VoiceConnection.prototype.stopPlaying = function stopPlaying() { VoiceConnection.prototype.stopPlaying = function stopPlaying() {
this.playing = false; this.playing = false;
this.playingIntent = null; this.playingIntent = null;
@@ -123,7 +138,7 @@ var VoiceConnection = (function (_EventEmitter) {
VoiceConnection.prototype.setSpeaking = function setSpeaking(value) { VoiceConnection.prototype.setSpeaking = function setSpeaking(value) {
this.playing = value; this.playing = value;
this.vWS.send(JSON.stringify({ if (this.vWS.readyState === WebSocket.OPEN) this.vWS.send(JSON.stringify({
op: 5, op: 5,
d: { d: {
speaking: value, speaking: value,
@@ -138,7 +153,7 @@ var VoiceConnection = (function (_EventEmitter) {
var self = this; var self = this;
self.playing = true; self.playing = true;
try { try {
self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); if (self.vWS.readyState === WebSocket.OPEN) self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback);
} catch (e) { } catch (e) {
self.playing = false; self.playing = false;
callback(e); callback(e);
@@ -252,11 +267,12 @@ var VoiceConnection = (function (_EventEmitter) {
self.vWSData = data.d; self.vWSData = data.d;
KAI = setInterval(function () { KAI = setInterval(function () {
vWS.send(JSON.stringify({ if (vWS.readyState === WebSocket.OPEN) vWS.send(JSON.stringify({
op: 3, op: 3,
d: null d: null
})); }));
}, data.d.heartbeat_interval); }, data.d.heartbeat_interval);
self.KAI = KAI;
var udpPacket = new Buffer(70); var udpPacket = new Buffer(70);
udpPacket.writeUIntBE(data.d.ssrc, 0, 4); udpPacket.writeUIntBE(data.d.ssrc, 0, 4);

View File

@@ -40,25 +40,20 @@ class InternalClient {
this.channels = new Cache(); this.channels = new Cache();
this.servers = new Cache(); this.servers = new Cache();
this.private_channels = new Cache(); this.private_channels = new Cache();
this.voiceConnections = new Cache(); this.voiceConnection = null;
this.resolver = new Resolver(this); this.resolver = new Resolver(this);
} }
//def leaveVoiceChannel //def leaveVoiceChannel
leaveVoiceChannel(chann){ leaveVoiceChannel(){
var self = this; var self = this;
return new Promise((resolve, reject) => { return new Promise((resolve, reject) => {
var channel = self.resolver.resolveVoiceChannel(chann); if(self.voiceConnection){
self.voiceConnection.destroy();
if(channel){ self.voiceConnection = null;
if(self.voiceConnections[channel]){ resolve();
var chan = self.voiceConnections[channel];
chan.stopPlaying();
self.voiceConnections.remove(chan);
resolve();
}
}else{ }else{
reject(new Error("voice channel does not exist")); resolve();
} }
}); });
} }
@@ -71,45 +66,46 @@ class InternalClient {
var channel = self.resolver.resolveVoiceChannel(chann); var channel = self.resolver.resolveVoiceChannel(chann);
if(channel){ if(channel){
if(!self.voiceConnections.get("id", channel.id)){
var session, token, server = channel.server, endpoint, fired = 0; self.leaveVoiceChannel().then(next);
var check = (m) => { function next(){
var data = JSON.parse(m); var session, token, server = channel.server, endpoint, fired = 0;
if(data.t === "VOICE_STATE_UPDATE"){ var check = (m) => {
session = data.d.session_id; var data = JSON.parse(m);
}else if(data.t === "VOICE_SERVER_UPDATE"){
token = data.d.token;
endpoint = data.d.endpoint;
var chan = self.voiceConnections.add(new VoiceConnection(channel, self.client, session, token, server, endpoint)); if(data.t === "VOICE_STATE_UPDATE"){
session = data.d.session_id;
fired++;
}else if(data.t === "VOICE_SERVER_UPDATE"){
token = data.d.token;
endpoint = data.d.endpoint;
fired++;
var chan = self.voiceConnection = new VoiceConnection(channel, self.client, session, token, server, endpoint);
chan.on("ready", resolve);
chan.on("error", reject);
}
if(fired >= 2){
self.client.emit("debug", "removed temporary voice websocket listeners");
self.websocket.removeListener('message', check);
}
chan.on("ready", resolve); };
chan.on("error", reject);
}
if(fired >= 2){
self.websocket.removeListener('message', check);
}
}; self.websocket.on("message", check);
self.sendWS({
self.websocket.on("message", check); op : 4,
self.sendWS({ d : {
op : 4, "guild_id" : server.id,
d : { "channel_id" : channel.id,
"guild_id" : server.id, "self_mute" : false,
"channel_id" : channel.id, "self_deaf" : false
"self_mute" : false, }
"self_deaf" : false });
} }
});
}else{
reject(new Error("voice channel connection exists"));
}
}else{ }else{
reject(new Error("voice channel does not exist")); reject(new Error("voice channel does not exist"));
} }

View File

@@ -27,7 +27,7 @@ class AudioEncoder{
"pipe:1" "pipe:1"
]); ]);
enc.stdout.on("readable", function() { enc.stdout.once("readable", function() {
callback(null, { callback(null, {
proc : enc, proc : enc,
stream : enc.stdout stream : enc.stdout

View File

@@ -18,10 +18,10 @@ var VoicePacket = require("./VoicePacket.js");
var StreamIntent = require("./StreamIntent.js"); var StreamIntent = require("./StreamIntent.js");
var EventEmitter = require("events"); var EventEmitter = require("events");
class VoiceConnection extends EventEmitter{ class VoiceConnection extends EventEmitter {
constructor(channel, client, session, token, server, endpoint) { constructor(channel, client, session, token, server, endpoint) {
super(); super();
if(!Opus){ if (!Opus) {
console.log("HEY! WATCH OUT\n\n discord.js needs node-opus, you don't have it installed."); console.log("HEY! WATCH OUT\n\n discord.js needs node-opus, you don't have it installed.");
} }
this.id = channel.id; this.id = channel.id;
@@ -41,13 +41,22 @@ class VoiceConnection extends EventEmitter{
this.playing = false; this.playing = false;
this.streamTime = 0; this.streamTime = 0;
this.streamProc = null; this.streamProc = null;
this.KAI = null;
this.init(); this.init();
} }
destroy() {
this.stopPlaying();
if(this.KAI)
clearInterval(this.KAI);
this.vWS.close();
this.udp.close();
}
stopPlaying() { stopPlaying() {
this.playing=false; this.playing = false;
this.playingIntent = null; this.playingIntent = null;
if(this.streamProc) if (this.streamProc)
this.streamProc.kill(); this.streamProc.kill();
} }
@@ -78,12 +87,12 @@ class VoiceConnection extends EventEmitter{
} }
try { try {
var buffer = stream.read(1920); var buffer = stream.read(1920);
if (!buffer) { if (!buffer) {
setTimeout(send, length * 10); // give chance for some data in 200ms to appear setTimeout(send, length * 10); // give chance for some data in 200ms to appear
return; return;
} }
if (buffer.length !== 1920) { if (buffer.length !== 1920) {
if (onWarning) { if (onWarning) {
retStream.emit("end"); retStream.emit("end");
@@ -126,21 +135,22 @@ class VoiceConnection extends EventEmitter{
setSpeaking(value) { setSpeaking(value) {
this.playing = value; this.playing = value;
this.vWS.send(JSON.stringify({ if (this.vWS.readyState === WebSocket.OPEN)
op: 5, this.vWS.send(JSON.stringify({
d: { op: 5,
speaking: value, d: {
delay: 0 speaking: value,
} delay: 0
})); }
}));
} }
sendPacket(packet, callback = function (err) { }) { sendPacket(packet, callback = function (err) { }) {
var self = this; var self = this;
self.playing = true; self.playing = true;
try { try {
self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback); if (self.vWS.readyState === WebSocket.OPEN)
self.udp.send(packet, 0, packet.length, self.vWSData.port, self.endpoint, callback);
} catch (e) { } catch (e) {
self.playing = false; self.playing = false;
callback(e); callback(e);
@@ -180,7 +190,7 @@ class VoiceConnection extends EventEmitter{
.encodeFile(stream) .encodeFile(stream)
.catch(error) .catch(error)
.then(data => { .then(data => {
self.streamProc = data.proc; self.streamProc = data.proc;
var intent = self.playRawStream(data.stream); var intent = self.playRawStream(data.stream);
resolve(intent); resolve(intent);
@@ -251,16 +261,18 @@ class VoiceConnection extends EventEmitter{
self.vWSData = data.d; self.vWSData = data.d;
KAI = setInterval(() => { KAI = setInterval(() => {
vWS.send(JSON.stringify({ if (vWS.readyState === WebSocket.OPEN)
op: 3, vWS.send(JSON.stringify({
d: null op: 3,
})); d: null
}));
}, data.d.heartbeat_interval); }, data.d.heartbeat_interval);
self.KAI = KAI;
var udpPacket = new Buffer(70); var udpPacket = new Buffer(70);
udpPacket.writeUIntBE(data.d.ssrc, 0, 4); udpPacket.writeUIntBE(data.d.ssrc, 0, 4);
udpClient.send(udpPacket, 0, udpPacket.length, data.d.port, self.endpoint, err => { udpClient.send(udpPacket, 0, udpPacket.length, data.d.port, self.endpoint, err => {
if(err) if (err)
self.emit("error", err) self.emit("error", err)
}); });
break; break;

View File

@@ -7,38 +7,25 @@ client.on("message", m => {
if(m.content === "&init"){ if(m.content === "&init"){
for(var channel of m.channel.server.channels){ for(var channel of m.channel.server.channels){
if(channel instanceof Discord.VoiceChannel){ if(channel instanceof Discord.VoiceChannel){
client.joinVoiceChannel(channel).catch(error) client.joinVoiceChannel(channel).catch(error);
.then(connection => {
connection.playFile("C:/users/amish/desktop/Developers.mp3");
});
break; break;
} }
} }
} }
if(m.content.startsWith("$$$ stop")){ if(m.content.startsWith("$$$ stop")){
for(var channel of m.channel.server.channels){ if(client.internal.voiceConnection){
if(channel instanceof Discord.VoiceChannel){ client.internal.voiceConnection.stopPlaying();
chan = channel;
break;
}
}
if(client.internal.voiceConnections.get("id", chan.id)){
var connection = client.internal.voiceConnections.get("id", chan.id);
connection.stopPlaying();
} }
return; return;
} }
if(m.content.startsWith("$$$")){ if(m.content.startsWith("$$$")){
var chan; var chan;
for(var channel of m.channel.server.channels){ var rest = m.content.split(" ");
if(channel instanceof Discord.VoiceChannel){ rest.splice(0, 1);
chan = channel; rest = rest.join(" ");
break; if(client.internal.voiceConnection){
} var connection = client.internal.voiceConnection;
} connection.playFile("C:/users/amish/desktop/"+rest);
if(client.internal.voiceConnections.get("id", chan.id)){
var connection = client.internal.voiceConnections.get("id", chan.id);
connection.playFile("C:/users/amish/desktop/Developers.mp3");
} }
} }
}); });