diff --git a/src/client/ClientManager.js b/src/client/ClientManager.js index 4959c7691..7f7c5cffa 100644 --- a/src/client/ClientManager.js +++ b/src/client/ClientManager.js @@ -1,4 +1,5 @@ const Constants = require('../util/Constants'); +const WebSocketConnection = require('./websocket/WebSocketConnection'); /** * Manages the State and Background Tasks of the Client @@ -29,7 +30,9 @@ class ClientManager { this.client.emit(Constants.Events.DEBUG, `Authenticated using token ${token}`); this.client.token = token; const timeout = this.client.setTimeout(() => reject(new Error(Constants.Errors.TOOK_TOO_LONG)), 1000 * 300); - this.client.rest.methods.getGateway().then(gateway => { + this.client.rest.methods.getGateway().then(res => { + const protocolVersion = Constants.DefaultOptions.ws.version; + const gateway = `${res.url}/?v=${protocolVersion}&encoding=${WebSocketConnection.ENCODING}`; this.client.emit(Constants.Events.DEBUG, `Using gateway ${gateway}`); this.client.ws.connect(gateway); this.client.ws.once('close', event => { diff --git a/src/client/rest/RESTMethods.js b/src/client/rest/RESTMethods.js index 6cba91018..28843b26e 100644 --- a/src/client/rest/RESTMethods.js +++ b/src/client/rest/RESTMethods.js @@ -39,15 +39,8 @@ class RESTMethods { return this.rest.makeRequest('post', Endpoints.logout, true, {}); } - getGateway() { - return this.rest.makeRequest('get', Endpoints.gateway, true).then(res => { - this.client.ws.gateway = `${res.url}/?v=${this.client.options.ws.version}`; - return this.client.ws.gateway; - }); - } - - getBotGateway() { - return this.rest.makeRequest('get', Endpoints.gateway.bot, true); + getGateway(bot = false) { + return this.rest.makeRequest('get', bot ? Endpoints.gateway.bot : Endpoints.gateway, true); } fetchVoiceRegions(guildID) { diff --git a/src/client/websocket/WebSocketConnection.js b/src/client/websocket/WebSocketConnection.js new file mode 100644 index 000000000..8ae2c8c40 --- /dev/null +++ b/src/client/websocket/WebSocketConnection.js @@ -0,0 +1,105 @@ +const browser = require('os').platform() === 'browser'; +const EventEmitter = require('events'); +const zlib = require('zlib'); +const erlpack = (function findErlpack() { + try { + const e = require('erlpack'); + if (!e.pack) return null; + return e; + } catch (e) { + return null; + } +}()); + +const WebSocket = (function findWebSocket() { + if (browser) return window.WebSocket; // eslint-disable-line no-undef + try { + return require('uws'); + } catch (e) { + return require('ws'); + } +}()); + +/** + * Abstracts a WebSocket connection with decoding/encoding for the discord gateway + * @private + */ +class WebSocketConnection extends WebSocket { + /** + * @param {string} gateway Websocket gateway to connect to + */ + constructor(gateway) { + super(gateway); + this.e = new EventEmitter(); + if (browser) this.binaryType = 'arraybuffer'; + this.onmessage = this.eventMessage.bind(this); + this.onopen = this.e.emit.bind(this.e, 'open'); + this.onclose = this.e.emit.bind(this.e, 'close'); + this.onerror = this.e.emit.bind(this.e, 'error'); + } + + /** + * Called when the websocket gets a message + * @param {Object} event Close event object + * @returns {Promise} + */ + eventMessage(event) { + try { + const data = this.unpack(event.data); + this.e.emit('packet', data); + return true; + } catch (err) { + if (this.e.listenerCount('decodeError')) this.e.emit('decodeError', err); + return false; + } + } + + /** + * Send data over the websocket + * @param {string|Buffer} data Data to send + */ + send(data) { + super.send(this.pack(data)); + } + + /** + * Pack data using JSON or Erlpack + * @param {*} data Data to pack + * @returns {string|Buffer} + */ + pack(data) { + return erlpack ? erlpack.pack(data) : JSON.stringify(data); + } + + /** + * Unpack data using JSON or Erlpack + * @param {string|ArrayBuffer|Buffer} data Data to unpack + * @returns {string|Object} + */ + unpack(data) { + if (erlpack && typeof data !== 'string') { + if (data instanceof ArrayBuffer) data = Buffer.from(new Uint8Array(data)); + return erlpack.unpack(data); + } else { + if (data instanceof ArrayBuffer || data instanceof Buffer) data = this.inflate(data); + return JSON.parse(data); + } + } + + /** + * Zlib inflate data + * @param {string|Buffer} data Data to inflate + * @returns {string|Buffer} + */ + inflate(data) { + return erlpack ? data : zlib.inflateSync(data).toString(); + } +} + +/** + * Encoding the WebSocket connections will use + * @type {string} + */ +WebSocketConnection.ENCODING = erlpack ? 'etf' : 'json'; + +module.exports = WebSocketConnection; diff --git a/src/client/websocket/WebSocketManager.js b/src/client/websocket/WebSocketManager.js index 92f069794..3c0f45edc 100644 --- a/src/client/websocket/WebSocketManager.js +++ b/src/client/websocket/WebSocketManager.js @@ -1,28 +1,7 @@ -const browser = require('os').platform() === 'browser'; const EventEmitter = require('events').EventEmitter; const Constants = require('../../util/Constants'); -const convertToBuffer = require('../../util/Util').convertToBuffer; -const zlib = require('zlib'); const PacketManager = require('./packets/WebSocketPacketManager'); - -let WebSocket, erlpack; -let serialize = JSON.stringify; -if (browser) { - WebSocket = window.WebSocket; // eslint-disable-line no-undef -} else { - try { - WebSocket = require('uws'); - } catch (err) { - WebSocket = require('ws'); - } - - try { - erlpack = require('erlpack'); - serialize = erlpack.pack; - } catch (err) { - erlpack = null; - } -} +const WebSocketConnection = require('./WebSocketConnection'); /** * The WebSocket Manager of the Client @@ -89,6 +68,9 @@ class WebSocketManager extends EventEmitter { this.first = true; this.lastHeartbeatAck = true; + + this._trace = []; + this.resumeStart = -1; } /** @@ -99,12 +81,11 @@ class WebSocketManager extends EventEmitter { this.client.emit('debug', `Connecting to gateway ${gateway}`); this.normalReady = false; if (this.status !== Constants.Status.RECONNECTING) this.status = Constants.Status.CONNECTING; - this.ws = new WebSocket(gateway); - if (browser) this.ws.binaryType = 'arraybuffer'; - this.ws.onopen = this.eventOpen.bind(this); - this.ws.onmessage = this.eventMessage.bind(this); - this.ws.onclose = this.eventClose.bind(this); - this.ws.onerror = this.eventError.bind(this); + this.ws = new WebSocketConnection(gateway); + this.ws.e.on('open', this.eventOpen.bind(this)); + this.ws.e.on('packet', this.eventPacket.bind(this)); + this.ws.e.on('close', this.eventClose.bind(this)); + this.ws.e.on('error', this.eventError.bind(this)); this._queue = []; this._remaining = 120; this.client.setInterval(() => { @@ -113,8 +94,8 @@ class WebSocketManager extends EventEmitter { }, 60e3); } - connect(gateway) { - gateway = `${gateway}&encoding=${erlpack ? 'etf' : 'json'}`; + connect(gateway = this.gateway) { + this.gateway = gateway; if (this.first) { this._connect(gateway); this.first = false; @@ -125,7 +106,7 @@ class WebSocketManager extends EventEmitter { heartbeat(normal) { if (normal && !this.lastHeartbeatAck) { - this.ws.close(1007); + this.tryReconnect(); return; } @@ -146,10 +127,10 @@ class WebSocketManager extends EventEmitter { */ send(data, force = false) { if (force) { - this._send(serialize(data)); + this._send(data); return; } - this._queue.push(serialize(data)); + this._queue.push(data); this.doQueue(); } @@ -160,7 +141,7 @@ class WebSocketManager extends EventEmitter { } _send(data) { - if (this.ws.readyState === WebSocket.OPEN) { + if (this.ws.readyState === WebSocketConnection.OPEN) { this.emit('send', data); this.ws.send(data); } @@ -168,7 +149,7 @@ class WebSocketManager extends EventEmitter { doQueue() { const item = this._queue[0]; - if (!(this.ws.readyState === WebSocket.OPEN && item)) return; + if (!(this.ws.readyState === WebSocketConnection.OPEN && item)) return; if (this.remaining === 0) { this.client.setTimeout(this.doQueue.bind(this), Date.now() - this.remainingReset); return; @@ -185,7 +166,7 @@ class WebSocketManager extends EventEmitter { eventOpen() { this.client.emit('debug', 'Connection to gateway opened'); this.lastHeartbeatAck = true; - if (this.status === Constants.Status.RECONNECTING) this._sendResume(); + if (this.sessionID) this._sendResume(); else this._sendNewIdentify(); } @@ -198,6 +179,7 @@ class WebSocketManager extends EventEmitter { return; } this.client.emit('debug', 'Identifying as resumed session'); + this.resumeStart = this.sequence; const payload = { token: this.client.token, session_id: this.sessionID, @@ -255,11 +237,10 @@ class WebSocketManager extends EventEmitter { /** * Run whenever a message is received from the WebSocket. Returns `true` if the message * was handled properly. - * @param {Object} event The received websocket data + * @param {Object} data The received websocket data * @returns {boolean} */ - eventMessage(event) { - const data = this.tryParseEventData(event.data); + eventPacket(data) { if (data === null) { this.eventError(new Error(Constants.Errors.BAD_WS_MESSAGE)); return false; @@ -271,34 +252,6 @@ class WebSocketManager extends EventEmitter { return this.packetManager.handle(data); } - /** - * Parses the raw data from a websocket event, inflating it if necessary - * @param {*} data Event data - * @returns {Object} - */ - parseEventData(data) { - if (erlpack) { - if (data instanceof ArrayBuffer) data = convertToBuffer(data); - return erlpack.unpack(data); - } else { - if (data instanceof Buffer || data instanceof ArrayBuffer) data = zlib.inflateSync(data).toString(); - return JSON.parse(data); - } - } - - /** - * Tries to call `parseEventData()` and return its result, or returns `null` upon thrown errors. - * @param {*} data Event data - * @returns {?Object} - */ - tryParseEventData(data) { - try { - return this.parseEventData(data); - } catch (err) { - return null; - } - } - /** * Run whenever an error occurs with the WebSocket connection. Tries to reconnect * @param {Error} err The encountered error diff --git a/src/client/websocket/packets/WebSocketPacketManager.js b/src/client/websocket/packets/WebSocketPacketManager.js index 315b7bbb6..112170d9b 100644 --- a/src/client/websocket/packets/WebSocketPacketManager.js +++ b/src/client/websocket/packets/WebSocketPacketManager.js @@ -16,6 +16,7 @@ class WebSocketPacketManager { this.queue = []; this.register(Constants.WSEvents.READY, require('./handlers/Ready')); + this.register(Constants.WSEvents.RESUMED, require('./handlers/Resumed')); this.register(Constants.WSEvents.GUILD_CREATE, require('./handlers/GuildCreate')); this.register(Constants.WSEvents.GUILD_DELETE, require('./handlers/GuildDelete')); this.register(Constants.WSEvents.GUILD_UPDATE, require('./handlers/GuildUpdate')); @@ -79,6 +80,7 @@ class WebSocketPacketManager { } if (packet.op === Constants.OPCodes.INVALID_SESSION) { + this.client.emit('debug', `SESSION INVALID! Waiting to reconnect: ${packet.d}`); if (packet.d) { setTimeout(() => { this.ws._sendResume(); diff --git a/src/client/websocket/packets/handlers/Ready.js b/src/client/websocket/packets/handlers/Ready.js index 077d17f29..81bf447e3 100644 --- a/src/client/websocket/packets/handlers/Ready.js +++ b/src/client/websocket/packets/handlers/Ready.js @@ -63,8 +63,12 @@ class ReadyHandler extends AbstractHandler { if (!client.ws.normalReady) client.ws._emitReady(false); }, 1200 * data.guilds.length); - this.packetManager.ws.sessionID = data.session_id; - this.packetManager.ws.checkIfReady(); + const ws = this.packetManager.ws; + + ws.sessionID = data.session_id; + ws._trace = data._trace; + client.emit('debug', `READY ${ws._trace.join(' -> ')} ${ws.sessionID}`); + ws.checkIfReady(); } } diff --git a/src/client/websocket/packets/handlers/Resumed.js b/src/client/websocket/packets/handlers/Resumed.js new file mode 100644 index 000000000..d2fa68389 --- /dev/null +++ b/src/client/websocket/packets/handlers/Resumed.js @@ -0,0 +1,26 @@ +const AbstractHandler = require('./AbstractHandler'); + +class ResumedHandler extends AbstractHandler { + handle(packet) { + const client = this.packetManager.client; + const ws = client.ws; + + ws._trace = packet.d._trace; + + const replayed = ws.sequence - ws.resumeStart; + ws.resumeStart = -1; + + client.emit('debug', `RESUMED ${ws._trace.join(' -> ')} | replayed ${replayed} events. `); + client.emit('resume', replayed); + + ws.heartbeat(); + } +} + +/** + * Emitted whenever a websocket resumes + * @event Client#resume + * @param {Number} replayed Number of events that were replayed + */ + +module.exports = ResumedHandler; diff --git a/src/util/Constants.js b/src/util/Constants.js index 17b679f8b..5a2f69e0b 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -326,6 +326,7 @@ exports.Events = { /** * The type of a websocket message event, e.g. `MESSAGE_CREATE`. Here are the available events: * - READY + * - RESUMED * - GUILD_SYNC * - GUILD_CREATE * - GUILD_DELETE @@ -363,6 +364,7 @@ exports.Events = { */ exports.WSEvents = { READY: 'READY', + RESUMED: 'RESUMED', GUILD_SYNC: 'GUILD_SYNC', GUILD_CREATE: 'GUILD_CREATE', GUILD_DELETE: 'GUILD_DELETE',