websocket cleanup (#1346)

* websocket cleanup

* resume event

* Update Resumed.js

* Update WebSocketManager.js
This commit is contained in:
Gus Caplan
2017-04-10 13:02:17 -05:00
committed by Crawl
parent 8436cbe8b1
commit 0a56fa0aae
8 changed files with 167 additions and 79 deletions

View File

@@ -1,4 +1,5 @@
const Constants = require('../util/Constants');
const WebSocketConnection = require('./websocket/WebSocketConnection');
/**
* Manages the State and Background Tasks of the Client
@@ -29,7 +30,9 @@ class ClientManager {
this.client.emit(Constants.Events.DEBUG, `Authenticated using token ${token}`);
this.client.token = token;
const timeout = this.client.setTimeout(() => reject(new Error(Constants.Errors.TOOK_TOO_LONG)), 1000 * 300);
this.client.rest.methods.getGateway().then(gateway => {
this.client.rest.methods.getGateway().then(res => {
const protocolVersion = Constants.DefaultOptions.ws.version;
const gateway = `${res.url}/?v=${protocolVersion}&encoding=${WebSocketConnection.ENCODING}`;
this.client.emit(Constants.Events.DEBUG, `Using gateway ${gateway}`);
this.client.ws.connect(gateway);
this.client.ws.once('close', event => {

View File

@@ -39,15 +39,8 @@ class RESTMethods {
return this.rest.makeRequest('post', Endpoints.logout, true, {});
}
getGateway() {
return this.rest.makeRequest('get', Endpoints.gateway, true).then(res => {
this.client.ws.gateway = `${res.url}/?v=${this.client.options.ws.version}`;
return this.client.ws.gateway;
});
}
getBotGateway() {
return this.rest.makeRequest('get', Endpoints.gateway.bot, true);
getGateway(bot = false) {
return this.rest.makeRequest('get', bot ? Endpoints.gateway.bot : Endpoints.gateway, true);
}
fetchVoiceRegions(guildID) {

View File

@@ -0,0 +1,105 @@
const browser = require('os').platform() === 'browser';
const EventEmitter = require('events');
const zlib = require('zlib');
const erlpack = (function findErlpack() {
try {
const e = require('erlpack');
if (!e.pack) return null;
return e;
} catch (e) {
return null;
}
}());
const WebSocket = (function findWebSocket() {
if (browser) return window.WebSocket; // eslint-disable-line no-undef
try {
return require('uws');
} catch (e) {
return require('ws');
}
}());
/**
* Abstracts a WebSocket connection with decoding/encoding for the discord gateway
* @private
*/
class WebSocketConnection extends WebSocket {
/**
* @param {string} gateway Websocket gateway to connect to
*/
constructor(gateway) {
super(gateway);
this.e = new EventEmitter();
if (browser) this.binaryType = 'arraybuffer';
this.onmessage = this.eventMessage.bind(this);
this.onopen = this.e.emit.bind(this.e, 'open');
this.onclose = this.e.emit.bind(this.e, 'close');
this.onerror = this.e.emit.bind(this.e, 'error');
}
/**
* Called when the websocket gets a message
* @param {Object} event Close event object
* @returns {Promise<boolean>}
*/
eventMessage(event) {
try {
const data = this.unpack(event.data);
this.e.emit('packet', data);
return true;
} catch (err) {
if (this.e.listenerCount('decodeError')) this.e.emit('decodeError', err);
return false;
}
}
/**
* Send data over the websocket
* @param {string|Buffer} data Data to send
*/
send(data) {
super.send(this.pack(data));
}
/**
* Pack data using JSON or Erlpack
* @param {*} data Data to pack
* @returns {string|Buffer}
*/
pack(data) {
return erlpack ? erlpack.pack(data) : JSON.stringify(data);
}
/**
* Unpack data using JSON or Erlpack
* @param {string|ArrayBuffer|Buffer} data Data to unpack
* @returns {string|Object}
*/
unpack(data) {
if (erlpack && typeof data !== 'string') {
if (data instanceof ArrayBuffer) data = Buffer.from(new Uint8Array(data));
return erlpack.unpack(data);
} else {
if (data instanceof ArrayBuffer || data instanceof Buffer) data = this.inflate(data);
return JSON.parse(data);
}
}
/**
* Zlib inflate data
* @param {string|Buffer} data Data to inflate
* @returns {string|Buffer}
*/
inflate(data) {
return erlpack ? data : zlib.inflateSync(data).toString();
}
}
/**
* Encoding the WebSocket connections will use
* @type {string}
*/
WebSocketConnection.ENCODING = erlpack ? 'etf' : 'json';
module.exports = WebSocketConnection;

View File

@@ -1,28 +1,7 @@
const browser = require('os').platform() === 'browser';
const EventEmitter = require('events').EventEmitter;
const Constants = require('../../util/Constants');
const convertToBuffer = require('../../util/Util').convertToBuffer;
const zlib = require('zlib');
const PacketManager = require('./packets/WebSocketPacketManager');
let WebSocket, erlpack;
let serialize = JSON.stringify;
if (browser) {
WebSocket = window.WebSocket; // eslint-disable-line no-undef
} else {
try {
WebSocket = require('uws');
} catch (err) {
WebSocket = require('ws');
}
try {
erlpack = require('erlpack');
serialize = erlpack.pack;
} catch (err) {
erlpack = null;
}
}
const WebSocketConnection = require('./WebSocketConnection');
/**
* The WebSocket Manager of the Client
@@ -89,6 +68,9 @@ class WebSocketManager extends EventEmitter {
this.first = true;
this.lastHeartbeatAck = true;
this._trace = [];
this.resumeStart = -1;
}
/**
@@ -99,12 +81,11 @@ class WebSocketManager extends EventEmitter {
this.client.emit('debug', `Connecting to gateway ${gateway}`);
this.normalReady = false;
if (this.status !== Constants.Status.RECONNECTING) this.status = Constants.Status.CONNECTING;
this.ws = new WebSocket(gateway);
if (browser) this.ws.binaryType = 'arraybuffer';
this.ws.onopen = this.eventOpen.bind(this);
this.ws.onmessage = this.eventMessage.bind(this);
this.ws.onclose = this.eventClose.bind(this);
this.ws.onerror = this.eventError.bind(this);
this.ws = new WebSocketConnection(gateway);
this.ws.e.on('open', this.eventOpen.bind(this));
this.ws.e.on('packet', this.eventPacket.bind(this));
this.ws.e.on('close', this.eventClose.bind(this));
this.ws.e.on('error', this.eventError.bind(this));
this._queue = [];
this._remaining = 120;
this.client.setInterval(() => {
@@ -113,8 +94,8 @@ class WebSocketManager extends EventEmitter {
}, 60e3);
}
connect(gateway) {
gateway = `${gateway}&encoding=${erlpack ? 'etf' : 'json'}`;
connect(gateway = this.gateway) {
this.gateway = gateway;
if (this.first) {
this._connect(gateway);
this.first = false;
@@ -125,7 +106,7 @@ class WebSocketManager extends EventEmitter {
heartbeat(normal) {
if (normal && !this.lastHeartbeatAck) {
this.ws.close(1007);
this.tryReconnect();
return;
}
@@ -146,10 +127,10 @@ class WebSocketManager extends EventEmitter {
*/
send(data, force = false) {
if (force) {
this._send(serialize(data));
this._send(data);
return;
}
this._queue.push(serialize(data));
this._queue.push(data);
this.doQueue();
}
@@ -160,7 +141,7 @@ class WebSocketManager extends EventEmitter {
}
_send(data) {
if (this.ws.readyState === WebSocket.OPEN) {
if (this.ws.readyState === WebSocketConnection.OPEN) {
this.emit('send', data);
this.ws.send(data);
}
@@ -168,7 +149,7 @@ class WebSocketManager extends EventEmitter {
doQueue() {
const item = this._queue[0];
if (!(this.ws.readyState === WebSocket.OPEN && item)) return;
if (!(this.ws.readyState === WebSocketConnection.OPEN && item)) return;
if (this.remaining === 0) {
this.client.setTimeout(this.doQueue.bind(this), Date.now() - this.remainingReset);
return;
@@ -185,7 +166,7 @@ class WebSocketManager extends EventEmitter {
eventOpen() {
this.client.emit('debug', 'Connection to gateway opened');
this.lastHeartbeatAck = true;
if (this.status === Constants.Status.RECONNECTING) this._sendResume();
if (this.sessionID) this._sendResume();
else this._sendNewIdentify();
}
@@ -198,6 +179,7 @@ class WebSocketManager extends EventEmitter {
return;
}
this.client.emit('debug', 'Identifying as resumed session');
this.resumeStart = this.sequence;
const payload = {
token: this.client.token,
session_id: this.sessionID,
@@ -255,11 +237,10 @@ class WebSocketManager extends EventEmitter {
/**
* Run whenever a message is received from the WebSocket. Returns `true` if the message
* was handled properly.
* @param {Object} event The received websocket data
* @param {Object} data The received websocket data
* @returns {boolean}
*/
eventMessage(event) {
const data = this.tryParseEventData(event.data);
eventPacket(data) {
if (data === null) {
this.eventError(new Error(Constants.Errors.BAD_WS_MESSAGE));
return false;
@@ -271,34 +252,6 @@ class WebSocketManager extends EventEmitter {
return this.packetManager.handle(data);
}
/**
* Parses the raw data from a websocket event, inflating it if necessary
* @param {*} data Event data
* @returns {Object}
*/
parseEventData(data) {
if (erlpack) {
if (data instanceof ArrayBuffer) data = convertToBuffer(data);
return erlpack.unpack(data);
} else {
if (data instanceof Buffer || data instanceof ArrayBuffer) data = zlib.inflateSync(data).toString();
return JSON.parse(data);
}
}
/**
* Tries to call `parseEventData()` and return its result, or returns `null` upon thrown errors.
* @param {*} data Event data
* @returns {?Object}
*/
tryParseEventData(data) {
try {
return this.parseEventData(data);
} catch (err) {
return null;
}
}
/**
* Run whenever an error occurs with the WebSocket connection. Tries to reconnect
* @param {Error} err The encountered error

View File

@@ -16,6 +16,7 @@ class WebSocketPacketManager {
this.queue = [];
this.register(Constants.WSEvents.READY, require('./handlers/Ready'));
this.register(Constants.WSEvents.RESUMED, require('./handlers/Resumed'));
this.register(Constants.WSEvents.GUILD_CREATE, require('./handlers/GuildCreate'));
this.register(Constants.WSEvents.GUILD_DELETE, require('./handlers/GuildDelete'));
this.register(Constants.WSEvents.GUILD_UPDATE, require('./handlers/GuildUpdate'));
@@ -79,6 +80,7 @@ class WebSocketPacketManager {
}
if (packet.op === Constants.OPCodes.INVALID_SESSION) {
this.client.emit('debug', `SESSION INVALID! Waiting to reconnect: ${packet.d}`);
if (packet.d) {
setTimeout(() => {
this.ws._sendResume();

View File

@@ -63,8 +63,12 @@ class ReadyHandler extends AbstractHandler {
if (!client.ws.normalReady) client.ws._emitReady(false);
}, 1200 * data.guilds.length);
this.packetManager.ws.sessionID = data.session_id;
this.packetManager.ws.checkIfReady();
const ws = this.packetManager.ws;
ws.sessionID = data.session_id;
ws._trace = data._trace;
client.emit('debug', `READY ${ws._trace.join(' -> ')} ${ws.sessionID}`);
ws.checkIfReady();
}
}

View File

@@ -0,0 +1,26 @@
const AbstractHandler = require('./AbstractHandler');
class ResumedHandler extends AbstractHandler {
handle(packet) {
const client = this.packetManager.client;
const ws = client.ws;
ws._trace = packet.d._trace;
const replayed = ws.sequence - ws.resumeStart;
ws.resumeStart = -1;
client.emit('debug', `RESUMED ${ws._trace.join(' -> ')} | replayed ${replayed} events. `);
client.emit('resume', replayed);
ws.heartbeat();
}
}
/**
* Emitted whenever a websocket resumes
* @event Client#resume
* @param {Number} replayed Number of events that were replayed
*/
module.exports = ResumedHandler;

View File

@@ -326,6 +326,7 @@ exports.Events = {
/**
* The type of a websocket message event, e.g. `MESSAGE_CREATE`. Here are the available events:
* - READY
* - RESUMED
* - GUILD_SYNC
* - GUILD_CREATE
* - GUILD_DELETE
@@ -363,6 +364,7 @@ exports.Events = {
*/
exports.WSEvents = {
READY: 'READY',
RESUMED: 'RESUMED',
GUILD_SYNC: 'GUILD_SYNC',
GUILD_CREATE: 'GUILD_CREATE',
GUILD_DELETE: 'GUILD_DELETE',