From 195fcfa15cfdfbeede545f22009006f3a34a0dfc Mon Sep 17 00:00:00 2001 From: Amish Shah Date: Fri, 28 Apr 2017 16:13:06 +0100 Subject: [PATCH] Rewrite WebSocket internals (#1410) * Start rewriting Manager and Connection * more stuff * stuff * Fix ready bug * some stuff i forgot * fix some stuff * add stupid heartbeat ack like seriously who cares * woo! * fix a bug * rate limit the dumb websocket * stuff * hdocs * Docs * Remove ClientManager#setupKeepAlive as it is now redundant * Change Client._pingTimestamp to a getter that fetches the timestamp from the WebSocketConnection * are you happy now eslint smh * make gus happy * Add CloseEvent external doc * Make sure to emit 'reconnecting' when actually reconnecting * ffs * Fix RESUME logic * Add heartbeat ack debug messages, including latency data * Dumb stuff for Gus * thx eslint * more dumb stuff * more dumb crap smh gus i h8 u * moar messages * fix for using wrong status, causing certain events not to be fired (#1422) --- src/client/Client.js | 16 +- src/client/ClientDataManager.js | 2 +- src/client/ClientManager.js | 18 +- src/client/rest/RESTManager.js | 6 + .../rest/RequestHandlers/RequestHandler.js | 4 + src/client/rest/RequestHandlers/Sequential.js | 59 +-- src/client/websocket/WebSocketConnection.js | 437 ++++++++++++++++-- src/client/websocket/WebSocketManager.js | 333 ++----------- .../packets/WebSocketPacketManager.js | 36 +- .../websocket/packets/handlers/Ready.js | 6 +- .../websocket/packets/handlers/Resumed.js | 10 +- src/util/Constants.js | 7 + test/random.js | 39 +- 13 files changed, 541 insertions(+), 432 deletions(-) diff --git a/src/client/Client.js b/src/client/Client.js index b36f00148..e48f3b1ae 100644 --- a/src/client/Client.js +++ b/src/client/Client.js @@ -156,13 +156,6 @@ class Client extends EventEmitter { */ this.pings = []; - /** - * Timestamp of the latest ping's start time - * @type {number} - * @private - */ - this._pingTimestamp = 0; - /** * Timeouts set by {@link Client#setTimeout} that are still active * @type {Set} @@ -182,6 +175,15 @@ class Client extends EventEmitter { } } + /** + * Timestamp of the latest ping's start time + * @type {number} + * @private + */ + get _pingTimestamp() { + return this.ws.connection ? this.ws.connection.lastPingTimestamp : 0; + } + /** * Current status of the client's connection to Discord * @type {?number} diff --git a/src/client/ClientDataManager.js b/src/client/ClientDataManager.js index 30992fb96..e8c5721eb 100644 --- a/src/client/ClientDataManager.js +++ b/src/client/ClientDataManager.js @@ -15,7 +15,7 @@ class ClientDataManager { } get pastReady() { - return this.client.ws.status === Constants.Status.READY; + return this.client.ws.connection.status === Constants.Status.READY; } newGuild(data) { diff --git a/src/client/ClientManager.js b/src/client/ClientManager.js index 7f7c5cffa..038667f3a 100644 --- a/src/client/ClientManager.js +++ b/src/client/ClientManager.js @@ -20,6 +20,14 @@ class ClientManager { this.heartbeatInterval = null; } + /** + * The status of the client + * @type {number} + */ + get status() { + return this.connection ? this.connection.status : Constants.Status.IDLE; + } + /** * Connects the Client to the WebSocket * @param {string} token The authorization token @@ -47,17 +55,9 @@ class ClientManager { }, reject); } - /** - * Sets up a keep-alive interval to keep the Client's connection valid - * @param {number} time The interval in milliseconds at which heartbeat packets should be sent - */ - setupKeepAlive(time) { - this.heartbeatInterval = time; - this.client.setInterval(() => this.client.ws.heartbeat(true), time); - } - destroy() { this.client.ws.destroy(); + this.client.rest.destroy(); if (!this.client.user) return Promise.resolve(); if (this.client.user.bot) { this.client.token = null; diff --git a/src/client/rest/RESTManager.js b/src/client/rest/RESTManager.js index 3be85d1f0..512b3063e 100644 --- a/src/client/rest/RESTManager.js +++ b/src/client/rest/RESTManager.js @@ -15,6 +15,12 @@ class RESTManager { this.globallyRateLimited = false; } + destroy() { + for (const handlerID in this.handlers) { + this.handlers[handlerID].destroy(); + } + } + push(handler, apiRequest) { return new Promise((resolve, reject) => { handler.push({ diff --git a/src/client/rest/RequestHandlers/RequestHandler.js b/src/client/rest/RequestHandlers/RequestHandler.js index a93e6c031..ea99ade6c 100644 --- a/src/client/rest/RequestHandlers/RequestHandler.js +++ b/src/client/rest/RequestHandlers/RequestHandler.js @@ -45,6 +45,10 @@ class RequestHandler { * Attempts to get this RequestHandler to process its current queue */ handle() {} // eslint-disable-line no-empty-function + + destroy() { + this.queue = []; + } } module.exports = RequestHandler; diff --git a/src/client/rest/RequestHandlers/Sequential.js b/src/client/rest/RequestHandlers/Sequential.js index 057dec7ea..d5a3a8c14 100644 --- a/src/client/rest/RequestHandlers/Sequential.js +++ b/src/client/rest/RequestHandlers/Sequential.js @@ -48,39 +48,42 @@ class SequentialRequestHandler extends RequestHandler { execute(item) { this.busy = true; return new Promise(resolve => { - item.request.gen().end((err, res) => { - if (res && res.headers) { - this.requestLimit = Number(res.headers['x-ratelimit-limit']); - this.requestResetTime = Number(res.headers['x-ratelimit-reset']) * 1000; - this.requestRemaining = Number(res.headers['x-ratelimit-remaining']); - this.timeDifference = Date.now() - new Date(res.headers.date).getTime(); - } - if (err) { - if (err.status === 429) { - this.queue.unshift(item); - this.restManager.client.setTimeout(() => { - this.globalLimit = false; - resolve(); - }, Number(res.headers['retry-after']) + this.restManager.client.options.restTimeOffset); - if (res.headers['x-ratelimit-global']) this.globalLimit = true; - } else { - item.reject(err); - resolve(err); + item.request + .gen() + .on('error', e => item.reject(e)) + .end((err, res) => { + if (res && res.headers) { + this.requestLimit = Number(res.headers['x-ratelimit-limit']); + this.requestResetTime = Number(res.headers['x-ratelimit-reset']) * 1000; + this.requestRemaining = Number(res.headers['x-ratelimit-remaining']); + this.timeDifference = Date.now() - new Date(res.headers.date).getTime(); } - } else { - this.globalLimit = false; - const data = res && res.body ? res.body : {}; - item.resolve(data); - if (this.requestRemaining === 0) { - this.restManager.client.setTimeout( + if (err) { + if (err.status === 429) { + this.queue.unshift(item); + this.restManager.client.setTimeout(() => { + this.globalLimit = false; + resolve(); + }, Number(res.headers['retry-after']) + this.restManager.client.options.restTimeOffset); + if (res.headers['x-ratelimit-global']) this.globalLimit = true; + } else { + item.reject(err); + resolve(err); + } + } else { + this.globalLimit = false; + const data = res && res.body ? res.body : {}; + item.resolve(data); + if (this.requestRemaining === 0) { + this.restManager.client.setTimeout( () => resolve(data), this.requestResetTime - Date.now() + this.timeDifference + this.restManager.client.options.restTimeOffset ); - } else { - resolve(data); + } else { + resolve(data); + } } - } - }); + }); }); } diff --git a/src/client/websocket/WebSocketConnection.js b/src/client/websocket/WebSocketConnection.js index c8ec692c0..ca58c447a 100644 --- a/src/client/websocket/WebSocketConnection.js +++ b/src/client/websocket/WebSocketConnection.js @@ -1,6 +1,8 @@ const browser = require('os').platform() === 'browser'; const EventEmitter = require('events'); +const Constants = require('../../util/Constants'); const zlib = require('zlib'); +const PacketManager = require('./packets/WebSocketPacketManager'); const erlpack = (function findErlpack() { try { const e = require('erlpack'); @@ -26,45 +28,137 @@ const WebSocket = (function findWebSocket() { */ class WebSocketConnection extends EventEmitter { /** + * @param {WebSocketManager} manager the WebSocket manager * @param {string} gateway Websocket gateway to connect to */ - constructor(gateway) { - super(gateway); - this.ws = new WebSocket(gateway); - if (browser) this.ws.binaryType = 'arraybuffer'; - this.ws.onmessage = this.eventMessage.bind(this); - this.ws.onopen = this.emit.bind(this, 'open'); - this.ws.onclose = this.emit.bind(this, 'close'); - this.ws.onerror = this.emit.bind(this, 'error'); + constructor(manager, gateway) { + super(); + /** + * WebSocket Manager of this connection + * @type {WebSocketManager} + */ + this.manager = manager; + /** + * Client this belongs to + * @type {Client} + */ + this.client = manager.client; + /** + * WebSocket connection itself + * @type {WebSocket} + */ + this.ws = null; + /** + * Current sequence of the WebSocket + * @type {number} + */ + this.sequence = -1; + /** + * Current status of the client + * @type {number} + */ + this.status = Constants.Status.IDLE; + /** + * Packet Manager of the connection + * @type {WebSocketPacketManager} + */ + this.packetManager = new PacketManager(this); + /** + * Last time a ping was sent (a timestamp) + * @type {number} + */ + this.lastPingTimestamp = 0; + /** + * Contains the rate limit queue and metadata + * @type {Object} + */ + this.ratelimit = { + queue: [], + remaining: 120, + resetTime: -1, + }; + this.connect(gateway); + /** + * Events that are disabled (will not be processed) + * @type {Object} + */ + this.disabledEvents = {}; + /** + * Sequence on WebSocket close + * @type {number} + */ + this.closeSequence = 0; + for (const event of this.client.options.disabledEvents) this.disabledEvents[event] = true; } /** - * Called when the websocket gets a message - * @param {Object} event Close event object - * @returns {Promise} + * Causes the client to be marked as ready and emits the ready event + * @returns {void} */ - eventMessage(event) { - try { - const data = this.unpack(event.data); - this.emit('packet', data); - return true; - } catch (err) { - if (this.listenerCount('decodeError')) this.emit('decodeError', err); - return false; + triggerReady() { + if (this.status === Constants.Status.READY) { + this.debug('Tried to mark self as ready, but already ready'); + return; } + this.status = Constants.Status.READY; + this.client.emit(Constants.Events.READY); + this.packetManager.handleQueue(); } /** - * Send data over the websocket - * @param {string|Buffer} data Data to send + * Checks whether the client is ready to be marked as ready + * @returns {void} */ - send(data) { - this.ws.send(this.pack(data)); + checkIfReady() { + if (this.status === Constants.Status.READY || this.status === Constants.Status.NEARLY) return false; + let unavailableGuilds = 0; + for (const guild of this.client.guilds.values()) { + if (!guild.available) unavailableGuilds++; + } + if (unavailableGuilds === 0) { + this.status = Constants.Status.NEARLY; + if (!this.client.options.fetchAllMembers) return this.triggerReady(); + // Fetch all members before marking self as ready + const promises = this.client.guilds.map(g => g.fetchMembers()); + Promise.all(promises) + .then(() => this.triggerReady()) + .catch(e => { + this.debug(`Failed to fetch all members before ready! ${e}`); + this.triggerReady(); + }); + } + return true; + } + + // Util + /** + * Emits a debug message + * @param {string} message Debug message + * @returns {void} + */ + debug(message) { + if (message instanceof Error) message = message.stack; + return this.manager.debug(`[connection] ${message}`); } /** - * Pack data using JSON or Erlpack - * @param {*} data Data to pack + * Attempts to serialise data from the WebSocket + * @param {string|Object} data Data to unpack + * @returns {Object} + */ + unpack(data) { + if (erlpack && typeof data !== 'string') { + if (data instanceof ArrayBuffer) data = Buffer.from(new Uint8Array(data)); + return erlpack.unpack(data); + } else if (data instanceof ArrayBuffer || data instanceof Buffer) { + data = zlib.inflateSync(data).toString(); + } + return JSON.parse(data); + } + + /** + * Packs an object ready to be sent + * @param {Object} data Data to pack * @returns {string|Buffer} */ pack(data) { @@ -72,45 +166,288 @@ class WebSocketConnection extends EventEmitter { } /** - * Unpack data using JSON or Erlpack - * @param {string|ArrayBuffer|Buffer} data Data to unpack - * @returns {string|Object} + * Processes the current WebSocket queue */ - unpack(data) { - if (erlpack && typeof data !== 'string') { - if (data instanceof ArrayBuffer) data = Buffer.from(new Uint8Array(data)); - return erlpack.unpack(data); - } else { - if (data instanceof ArrayBuffer || data instanceof Buffer) data = this.inflate(data); - return JSON.parse(data); + processQueue() { + if (this.ratelimit.remaining === 0) return; + if (this.ratelimit.queue.length === 0) return; + if (this.ratelimit.remaining === 120) { + this.ratelimit.resetTimer = setTimeout(() => { + this.ratelimit.remaining = 120; + this.processQueue(); + }, 120e3); // eslint-disable-line + } + while (this.ratelimit.remaining > 0) { + const item = this.ratelimit.queue.shift(); + if (!item) return; + this._send(item); + this.ratelimit.remaining--; } } /** - * Zlib inflate data - * @param {string|Buffer} data Data to inflate - * @returns {string|Buffer} + * Sends data, bypassing the queue + * @param {Object} data Packet to send + * @returns {void} */ - inflate(data) { - return erlpack ? data : zlib.inflateSync(data).toString(); + _send(data) { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + this.debug(`Tried to send packet ${data} but no WebSocket is available!`); + return; + } + this.ws.send(this.pack(data)); } /** - * State of the WebSocket - * @type {number} - * @readonly + * Adds data to the queue to be sent + * @param {Object} data Packet to send + * @returns {void} */ - get readyState() { - return this.ws.readyState; + send(data) { + if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { + this.debug(`Tried to send packet ${data} but no WebSocket is available!`); + return; + } + this.ratelimit.queue.push(data); + this.processQueue(); } /** - * Close the WebSocket - * @param {number} code Close code - * @param {string} [reason] Close reason + * Creates a connection to a gateway + * @param {string} gateway Gateway to connect to + * @param {number} [after=0] How long to wait before connecting + * @param {boolean} [force=false] Whether or not to force a new connection even if one already exists + * @returns {boolean} */ - close(code, reason) { - this.ws.close(code, reason); + connect(gateway = this.gateway, after = 0, force = false) { + if (after) return this.client.setTimeout(() => this.connect(gateway, 0, force), after); // eslint-disable-line + if (this.ws && !force) { + this.debug('WebSocket connection already exists'); + return false; + } else if (typeof gateway !== 'string') { + this.debug(`Tried to connect to an invalid gateway: ${gateway}`); + return false; + } + this.gateway = gateway; + this.debug(`Connecting to ${gateway}`); + const ws = this.ws = new WebSocket(gateway); + if (browser) ws.binaryType = 'arraybuffer'; + ws.onmessage = this.onMessage.bind(this); + ws.onopen = this.onOpen.bind(this); + ws.onerror = this.onError.bind(this); + ws.onclose = this.onClose.bind(this); + this.status = Constants.Status.CONNECTING; + return true; + } + + /** + * Destroys the connection + * @returns {boolean} + */ + destroy() { + const ws = this.ws; + if (!ws) { + this.debug('Attempted to destroy WebSocket but no connection exists!'); + return false; + } + this.heartbeat(-1); + ws.close(1000); + this.packetManager.handleQueue(); + this.ws = null; + this.status = Constants.Status.DISCONNECTED; + return true; + } + + /** + * Called whenever a message is received + * @param {Event} event Event received + * @returns {boolean} + */ + onMessage(event) { + try { + this.onPacket(this.unpack(event.data)); + return true; + } catch (err) { + this.debug(err); + return false; + } + } + + /** + * Sets the current sequence of the connection + * @param {number} s New sequence + */ + setSequence(s) { + this.sequence = s > this.sequence ? s : this.sequence; + } + + /** + * Called whenever a packet is received + * @param {Object} packet received packet + * @returns {boolean} + */ + onPacket(packet) { + if (!packet) { + this.debug('Received null packet'); + return false; + } + this.client.emit('raw', packet); + switch (packet.op) { + case Constants.OPCodes.HELLO: + return this.heartbeat(packet.d.heartbeat_interval); + case Constants.OPCodes.RECONNECT: + return this.reconnect(); + case Constants.OPCodes.INVALID_SESSION: + if (!packet.d) this.sessionID = null; + this.debug('Session invalidated -- will identify with a new session'); + return this.identify(packet.d ? 2500 : 0); + case Constants.OPCodes.HEARTBEAT_ACK: + return this.ackHeartbeat(); + case Constants.OPCodes.HEARTBEAT: + return this.heartbeat(); + default: + return this.packetManager.handle(packet); + } + } + + /** + * Called whenever a connection is opened to the gateway + * @param {Event} event Received open event + */ + onOpen(event) { + this.gateway = event.target.url; + this.debug(`Connected to gateway ${this.gateway}`); + this.identify(); + } + + /** + * Causes a reconnection to the gateway + */ + reconnect() { + this.debug('Attemping to reconnect in 5500ms...'); + this.client.emit(Constants.Events.RECONNECTING); + this.connect(this.gateway, 5500, true); + } + + /** + * Called whenever an error occurs with the WebSocket + * @param {Error} error Error that occurred + */ + onError(error) { + this.client.emit(Constants.Events.ERROR, error); + } + + /** + * @external CloseEvent + * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent} + */ + + /** + * Called whenever a connection to the gateway is closed + * @param {CloseEvent} event Close event that was received + */ + onClose(event) { + this.debug(`Closed: ${event.code}`); + this.closeSequence = this.sequence; + // Reset the state before trying to fix anything + this.emit('close', event); + this.heartbeat(-1); + // Should we reconnect? + if (Constants.WSCodes[event.code]) { + this.debug(Constants.WSCodes[event.code]); + this.destroy(); + return; + } + this.reconnect(); + } + + // Heartbeat + /** + * Acknowledges a heartbeat + */ + ackHeartbeat() { + this.debug(`Heartbeat acknowledged, latency of ${Date.now() - this.lastPingTimestamp}ms`); + this.client._pong(this.lastPingTimestamp); + } + + /** + * Sends a heartbeat or sets an interval for sending heartbeats. + * @param {number} [time] If -1, clears the interval, any other number sets an interval. + * If no value is given, a heartbeat will be sent instantly. + */ + heartbeat(time) { + if (!isNaN(time)) { + if (time === -1) { + this.debug('Clearing heartbeat interval'); + this.client.clearInterval(this.heartbeatInterval); + this.heartbeatInterval = null; + } else { + this.debug(`Setting a heartbeat interval for ${time}ms`); + this.heartbeatInterval = this.client.setInterval(() => this.heartbeat(), time); + } + return; + } + this.debug('Sending a heartbeat'); + this.lastPingTimestamp = Date.now(); + this.send({ + op: Constants.OPCodes.HEARTBEAT, + d: this.sequence, + }); + } + + // Identification + /** + * Identifies the client on a connection + * @param {number} [after] How long to wait before identifying + * @returns {void} + */ + identify(after) { + if (after) return this.client.setTimeout(this.identify.apply(this), after); + return this.sessionID ? this.identifyResume() : this.identifyNew(); + } + + /** + * Identifies as a new connection on the gateway + * @returns {void} + */ + identifyNew() { + if (!this.client.token) { + this.debug('No token available to identify a new session with'); + return; + } + // Clone the generic payload and assign the token + const d = Object.assign({ token: this.client.token }, this.client.options.ws); + + // Sharding stuff + const { shardId, shardCount } = this.client.options; + if (shardCount > 0) d.shard = [Number(shardId), Number(shardCount)]; + + // Send the payload + this.debug('Identifying as a new session'); + this.send({ op: Constants.OPCodes.IDENTIFY, d }); + } + + /** + * Resumes a session on the gateway + * @returns {void} + */ + identifyResume() { + if (!this.sessionID) { + this.debug('Warning: wanted to resume but session ID not available; identifying as a new session instead'); + return this.identifyNew(); + } + this.debug(`Attempting to resume session ${this.sessionID}`); + + const d = { + token: this.client.token, + session_id: this.sessionID, + seq: this.sequence, + }; + + return this.send({ + op: Constants.OPCodes.RESUME, + d, + }); } } diff --git a/src/client/websocket/WebSocketManager.js b/src/client/websocket/WebSocketManager.js index 5218710fe..dca05932d 100644 --- a/src/client/websocket/WebSocketManager.js +++ b/src/client/websocket/WebSocketManager.js @@ -1,328 +1,89 @@ const EventEmitter = require('events').EventEmitter; const Constants = require('../../util/Constants'); -const PacketManager = require('./packets/WebSocketPacketManager'); const WebSocketConnection = require('./WebSocketConnection'); /** - * The WebSocket Manager of the Client + * WebSocket Manager of the Client * @private */ class WebSocketManager extends EventEmitter { constructor(client) { super(); /** - * The Client that instantiated this WebSocketManager + * Client that instantiated this WebSocketManager * @type {Client} */ this.client = client; /** - * A WebSocket Packet manager, it handles all the messages - * @type {PacketManager} + * WebSocket connection of this manager + * @type {?WebSocketConnection} */ - this.packetManager = new PacketManager(this); - - /** - * The status of the WebSocketManager, a type of Constants.Status. It defaults to IDLE. - * @type {number} - */ - this.status = Constants.Status.IDLE; - - /** - * The session ID of the connection, null if not yet available. - * @type {?string} - */ - this.sessionID = null; - - /** - * The packet count of the client, null if not yet available. - * @type {?number} - */ - this.sequence = -1; - - /** - * The gateway address for this WebSocket connection, null if not yet available. - * @type {?string} - */ - this.gateway = null; - - /** - * Whether READY was emitted normally (all packets received) or not - * @type {boolean} - */ - this.normalReady = false; - - /** - * The WebSocket connection to the gateway - * @type {?WebSocket} - */ - this.ws = null; - - /** - * An object with keys that are websocket event names that should be ignored - * @type {Object} - */ - this.disabledEvents = {}; - for (const event of client.options.disabledEvents) this.disabledEvents[event] = true; - - this.first = true; - - this.lastHeartbeatAck = true; - - this._remainingReset = 0; - - this._trace = []; - this.resumeStart = -1; + this.connection = null; } /** - * Connects the client to a given gateway - * @param {string} gateway The gateway to connect to + * Sends a heartbeat on the available connection + * @returns {void} */ - _connect(gateway) { - this.client.emit('debug', `Connecting to gateway ${gateway}`); - this.normalReady = false; - if (this.status !== Constants.Status.RECONNECTING) this.status = Constants.Status.CONNECTING; - this.ws = new WebSocketConnection(gateway); - this.ws.on('open', this.eventOpen.bind(this)); - this.ws.on('packet', this.eventPacket.bind(this)); - this.ws.on('close', this.eventClose.bind(this)); - this.ws.on('error', this.eventError.bind(this)); - this._queue = []; - this._remaining = 120; - this.client.setInterval(() => { - this._remaining = 120; - this._remainingReset = Date.now(); - }, 60e3); - } - - connect(gateway = this.gateway) { - this.gateway = gateway; - if (this.first) { - this._connect(gateway); - this.first = false; - } else { - this.client.setTimeout(() => this._connect(gateway), 7500); - } - } - - heartbeat(normal) { - if (normal && !this.lastHeartbeatAck) { - this.tryReconnect(); - return; - } - - this.client.emit('debug', 'Sending heartbeat'); - this.client._pingTimestamp = Date.now(); - this.client.ws.send({ - op: Constants.OPCodes.HEARTBEAT, - d: this.sequence, - }, true); - - this.lastHeartbeatAck = false; + heartbeat() { + if (!this.connection) return this.debug('No connection to heartbeat'); + return this.connection.heartbeat(); } /** - * Sends a packet to the gateway - * @param {Object} data An object that can be JSON stringified - * @param {boolean} force Whether or not to send the packet immediately + * Emits a debug event + * @param {string} message Debug message + * @returns {void} */ - send(data, force = false) { - if (force) { - this._send(data); - return; - } - this._queue.push(data); - this.doQueue(); + debug(message) { + return this.client.emit('debug', `[ws] ${message}`); } + /** + * Destroy the client + * @returns {void} Whether or not destruction was successful + */ destroy() { - if (this.ws) this.ws.close(1000); - this._queue = []; - this.status = Constants.Status.IDLE; - } - - _send(data) { - if (this.ws.readyState === WebSocketConnection.WebSocket.OPEN) { - this.emit('send', data); - this.ws.send(data); - } - } - - doQueue() { - const item = this._queue[0]; - if (!(this.ws.readyState === WebSocketConnection.WebSocket.OPEN && item)) return; - if (this.remaining === 0) { - this.client.setTimeout(this.doQueue.bind(this), Date.now() + (this._remainingReset || 120e3)); - return; - } - this._remaining--; - this._send(item); - this._queue.shift(); - this.doQueue(); - } - - /** - * Run whenever the gateway connections opens up - */ - eventOpen() { - this.client.emit('debug', 'Connection to gateway opened'); - this.lastHeartbeatAck = true; - if (this.sessionID) this._sendResume(); - else this._sendNewIdentify(); - } - - /** - * Sends a gateway resume packet, in cases of unexpected disconnections. - */ - _sendResume() { - if (!this.sessionID) { - this._sendNewIdentify(); - return; - } - this.client.emit('debug', 'Identifying as resumed session'); - this.resumeStart = this.sequence; - const payload = { - token: this.client.token, - session_id: this.sessionID, - seq: this.sequence, - }; - - this.send({ - op: Constants.OPCodes.RESUME, - d: payload, - }); - } - - /** - * Sends a new identification packet, in cases of new connections or failed reconnections. - */ - _sendNewIdentify() { - this.reconnecting = false; - const payload = this.client.options.ws; - payload.token = this.client.token; - if (this.client.options.shardCount > 0) { - payload.shard = [Number(this.client.options.shardId), Number(this.client.options.shardCount)]; - } - this.client.emit('debug', 'Identifying as new session'); - this.send({ - op: Constants.OPCodes.IDENTIFY, - d: payload, - }); - this.sequence = -1; - } - - /** - * @external CloseEvent - * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent} - */ - - /** - * Run whenever the connection to the gateway is closed, it will try to reconnect the client. - * @param {CloseEvent} event The WebSocket close event - */ - eventClose(event) { - this.emit('close', event); - this.client.clearInterval(this.client.manager.heartbeatInterval); - this.status = Constants.Status.DISCONNECTED; - this._queue = []; - /** - * Emitted whenever the client websocket is disconnected - * @event Client#disconnect - * @param {CloseEvent} event The WebSocket close event - */ - if (!this.reconnecting) this.client.emit(Constants.Events.DISCONNECT, event); - if ([4004, 4010, 4011].includes(event.code)) return; - if (!this.reconnecting && event.code !== 1000) this.tryReconnect(); - } - - /** - * Run whenever a message is received from the WebSocket. Returns `true` if the message - * was handled properly. - * @param {Object} data The received websocket data - * @returns {boolean} - */ - eventPacket(data) { - if (data === null) { - this.eventError(new Error(Constants.Errors.BAD_WS_MESSAGE)); + if (!this.connection) { + this.debug('Attempted to destroy WebSocket but no connection exists!'); return false; } - - this.client.emit('raw', data); - - if (data.op === Constants.OPCodes.HELLO) this.client.manager.setupKeepAlive(data.d.heartbeat_interval); - return this.packetManager.handle(data); + return this.connection.destroy(); } /** - * Run whenever an error occurs with the WebSocket connection. Tries to reconnect - * @param {Error} err The encountered error + * Send a packet on the available WebSocket + * @param {Object} packet Packet to send + * @returns {void} */ - eventError(err) { - /** - * Emitted whenever the Client encounters a serious connection error - * @event Client#error - * @param {Error} error The encountered error - */ - if (this.client.listenerCount('error') > 0) this.client.emit('error', err); - this.tryReconnect(); - } - - _emitReady(normal = true) { - /** - * Emitted when the Client becomes ready to start working - * @event Client#ready - */ - this.status = Constants.Status.READY; - this.client.emit(Constants.Events.READY); - this.packetManager.handleQueue(); - this.normalReady = normal; - } - - /** - * Runs on new packets before `READY` to see if the Client is ready yet, if it is prepares - * the `READY` event. - */ - checkIfReady() { - if (this.status !== Constants.Status.READY && this.status !== Constants.Status.NEARLY) { - let unavailableCount = 0; - for (const guildID of this.client.guilds.keys()) { - unavailableCount += this.client.guilds.get(guildID).available ? 0 : 1; - } - if (unavailableCount === 0) { - this.status = Constants.Status.NEARLY; - if (this.client.options.fetchAllMembers) { - const promises = this.client.guilds.map(g => g.fetchMembers()); - Promise.all(promises).then(() => this._emitReady(), e => { - this.client.emit(Constants.Events.WARN, 'Error in pre-ready guild member fetching'); - this.client.emit(Constants.Events.ERROR, e); - this._emitReady(); - }); - return; - } - this._emitReady(); - } + send(packet) { + if (!this.connection) { + this.debug('No connection to websocket'); + return; } + this.connection.send(packet); } /** - * Tries to reconnect the client, changing the status to Constants.Status.RECONNECTING. + * Connects the client to a gateway + * @param {string} gateway Gateway to connect to + * @returns {boolean} */ - tryReconnect() { - if ( - this.status === Constants.Status.RECONNECTING || - this.status === Constants.Status.CONNECTING || - !this.client.token - ) return; - this.status = Constants.Status.RECONNECTING; - this.ws.close(); - this.packetManager.handleQueue(); - /** - * Emitted when the Client tries to reconnect after being disconnected - * @event Client#reconnecting - */ - this.client.emit(Constants.Events.RECONNECTING); - this.connect(this.client.ws.gateway); + connect(gateway) { + if (!this.connection) { + this.connection = new WebSocketConnection(this, gateway); + return true; + } + switch (this.connection.status) { + case Constants.Status.IDLE: + case Constants.Status.DISCONNECTED: + this.connection.connect(gateway, 5500); + return true; + default: + this.debug(`Couldn't connect to ${gateway} as the websocket is at state ${this.connection.status}`); + return false; + } } } diff --git a/src/client/websocket/packets/WebSocketPacketManager.js b/src/client/websocket/packets/WebSocketPacketManager.js index 112170d9b..9a6d5692c 100644 --- a/src/client/websocket/packets/WebSocketPacketManager.js +++ b/src/client/websocket/packets/WebSocketPacketManager.js @@ -2,6 +2,7 @@ const Constants = require('../../../util/Constants'); const BeforeReadyWhitelist = [ Constants.WSEvents.READY, + Constants.WSEvents.RESUMED, Constants.WSEvents.GUILD_CREATE, Constants.WSEvents.GUILD_DELETE, Constants.WSEvents.GUILD_MEMBERS_CHUNK, @@ -10,8 +11,8 @@ const BeforeReadyWhitelist = [ ]; class WebSocketPacketManager { - constructor(websocketManager) { - this.ws = websocketManager; + constructor(connection) { + this.ws = connection; this.handlers = {}; this.queue = []; @@ -63,35 +64,12 @@ class WebSocketPacketManager { handleQueue() { this.queue.forEach((element, index) => { - this.handle(this.queue[index]); + this.handle(this.queue[index], true); this.queue.splice(index, 1); }); } - setSequence(s) { - if (s && s > this.ws.sequence) this.ws.sequence = s; - } - - handle(packet) { - if (packet.op === Constants.OPCodes.RECONNECT) { - this.setSequence(packet.s); - this.ws.tryReconnect(); - return false; - } - - if (packet.op === Constants.OPCodes.INVALID_SESSION) { - this.client.emit('debug', `SESSION INVALID! Waiting to reconnect: ${packet.d}`); - if (packet.d) { - setTimeout(() => { - this.ws._sendResume(); - }, 2500); - } else { - this.ws.sessionID = null; - this.ws._sendNewIdentify(); - } - return false; - } - + handle(packet, queue = false) { if (packet.op === Constants.OPCodes.HEARTBEAT_ACK) { this.ws.client._pong(this.ws.client._pingTimestamp); this.ws.lastHeartbeatAck = true; @@ -109,7 +87,7 @@ class WebSocketPacketManager { this.ws.checkIfReady(); } - this.setSequence(packet.s); + this.ws.setSequence(packet.s); if (this.ws.disabledEvents[packet.t] !== undefined) return false; @@ -120,6 +98,8 @@ class WebSocketPacketManager { } } + if (!queue && this.queue.length > 0) this.handleQueue(); + if (this.handlers[packet.t]) return this.handlers[packet.t].handle(packet); return false; } diff --git a/src/client/websocket/packets/handlers/Ready.js b/src/client/websocket/packets/handlers/Ready.js index 81bf447e3..4c383dfe9 100644 --- a/src/client/websocket/packets/handlers/Ready.js +++ b/src/client/websocket/packets/handlers/Ready.js @@ -59,10 +59,12 @@ class ReadyHandler extends AbstractHandler { }); } - client.setTimeout(() => { - if (!client.ws.normalReady) client.ws._emitReady(false); + const t = client.setTimeout(() => { + client.ws.connection.triggerReady(); }, 1200 * data.guilds.length); + client.once('ready', () => client.clearTimeout(t)); + const ws = this.packetManager.ws; ws.sessionID = data.session_id; diff --git a/src/client/websocket/packets/handlers/Resumed.js b/src/client/websocket/packets/handlers/Resumed.js index a4cfd716e..54272e4e6 100644 --- a/src/client/websocket/packets/handlers/Resumed.js +++ b/src/client/websocket/packets/handlers/Resumed.js @@ -1,18 +1,20 @@ const AbstractHandler = require('./AbstractHandler'); +const Constants = require('../../../../util/Constants'); class ResumedHandler extends AbstractHandler { handle(packet) { const client = this.packetManager.client; - const ws = client.ws; + const ws = client.ws.connection; ws._trace = packet.d._trace; - const replayed = ws.sequence - ws.resumeStart; - ws.resumeStart = -1; + ws.status = Constants.Status.READY; + this.packetManager.handleQueue(); + + const replayed = ws.sequence - ws.closeSequence; client.emit('debug', `RESUMED ${ws._trace.join(' -> ')} | replayed ${replayed} events. `); client.emit('resume', replayed); - ws.heartbeat(); } } diff --git a/src/util/Constants.js b/src/util/Constants.js index 3230ea612..e79408877 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -70,6 +70,13 @@ exports.DefaultOptions = { }, }; +exports.WSCodes = { + 1000: 'Connection gracefully closed', + 4004: 'Tried to identify with an invalid token', + 4010: 'Sharding data provided was invalid', + 4011: 'Shard would be on too many guilds if connected', +}; + exports.Errors = { NO_TOKEN: 'Request to use token, but token was unavailable to the client.', NO_BOT_ACCOUNT: 'Only bot accounts are able to make use of this feature.', diff --git a/test/random.js b/test/random.js index 5ba989038..f177a2e9d 100644 --- a/test/random.js +++ b/test/random.js @@ -4,20 +4,25 @@ const Discord = require('../'); const request = require('superagent'); const fs = require('fs'); -const client = new Discord.Client({ fetchAllMembers: false, apiRequestMethod: 'sequential' }); +console.time('magic'); + +const client = new Discord.Client({ fetchAllMembers: true, apiRequestMethod: 'sequential' }); const { email, password, token, usertoken, song } = require('./auth.json'); client.login(token).then(atoken => console.log('logged in with token ' + atoken)).catch(console.error); client.on('ready', () => { - console.log('ready'); + console.log(`ready with ${client.users.size} users`); + console.timeEnd('magic'); }); client.on('userUpdate', (o, n) => { console.log(o.username, n.username); }); +client.on('debug', console.log); + client.on('emojiCreate', e => console.log('create!!', e.name)); client.on('emojiDelete', e => console.log('delete!!', e.name)); client.on('emojiUpdate', (o, n) => console.log('update!!', o.name, n.name)); @@ -43,7 +48,7 @@ client.on('message', message => { let count = 0; let ecount = 0; for(let x = 0; x < 4000; x++) { - message.channel.sendMessage(`this is message ${x} of 3999`) + message.channel.send(`this is message ${x} of 3999`) .then(m => { count++; console.log('reached', count, ecount); @@ -57,7 +62,7 @@ client.on('message', message => { } if (message.content === 'myperms?') { - message.channel.sendMessage('Your permissions are:\n' + + message.channel.send('Your permissions are:\n' + JSON.stringify(message.channel.permissionsFor(message.author).serialize(), null, 4)); } @@ -78,7 +83,7 @@ client.on('message', message => { .get('url') .end((err, res) => { client.user.setAvatar(res.body).catch(console.error) - .then(user => message.channel.sendMessage('Done!')); + .then(user => message.channel.send('Done!')); }); } @@ -99,11 +104,11 @@ client.on('message', message => { m += `I am aware of ${client.channels.size} channels overall\n`; m += `I am aware of ${client.guilds.size} guilds overall\n`; m += `I am aware of ${client.users.size} users overall\n`; - message.channel.sendMessage(m).then(msg => msg.edit('nah')).catch(console.error); + message.channel.send(m).then(msg => msg.edit('nah')).catch(console.error); } if (message.content === 'messageme!') { - message.author.sendMessage('oh, hi there!').catch(e => console.log(e.stack)); + message.author.send('oh, hi there!').catch(e => console.log(e.stack)); } if (message.content === 'don\'t dm me') { @@ -113,7 +118,7 @@ client.on('message', message => { if (message.content.startsWith('kick')) { message.guild.member(message.mentions[0]).kick().then(member => { console.log(member); - message.channel.sendMessage('Kicked!' + member.user.username); + message.channel.send('Kicked!' + member.user.username); }).catch(console.error); } @@ -121,10 +126,10 @@ client.on('message', message => { let i = 1; const start = Date.now(); while (i <= 20) { - message.channel.sendMessage(`Testing my rates, item ${i} of 20`); + message.channel.send(`Testing my rates, item ${i} of 20`); i++; } - message.channel.sendMessage('last one...').then(m => { + message.channel.send('last one...').then(m => { const diff = Date.now() - start; m.reply(`Each message took ${diff / 21}ms to send`); }); @@ -132,7 +137,7 @@ client.on('message', message => { if (message.content === 'makerole') { message.guild.createRole().then(role => { - message.channel.sendMessage(`Made role ${role.name}`); + message.channel.send(`Made role ${role.name}`); }).catch(console.error); } } @@ -148,15 +153,15 @@ function chanLoop(channel) { client.on('message', msg => { if (msg.content.startsWith('?raw')) { - msg.channel.sendMessage('```' + msg.content + '```'); + msg.channel.send('```' + msg.content + '```'); } if (msg.content.startsWith('#eval') && msg.author.id === '66564597481480192') { try { const com = eval(msg.content.split(" ").slice(1).join(" ")); - msg.channel.sendMessage('```\n' + com + '```'); + msg.channel.send('```\n' + com + '```'); } catch(e) { - msg.channel.sendMessage('```\n' + e + '```'); + msg.channel.send('```\n' + e + '```'); } } }); @@ -191,12 +196,12 @@ client.on('message', msg => { client.on('messageReactionAdd', (reaction, user) => { if (reaction.message.channel.id !== '222086648706498562') return; - reaction.message.channel.sendMessage(`${user.username} added reaction ${reaction.emoji}, count is now ${reaction.count}`); + reaction.message.channel.send(`${user.username} added reaction ${reaction.emoji}, count is now ${reaction.count}`); }); client.on('messageReactionRemove', (reaction, user) => { if (reaction.message.channel.id !== '222086648706498562') return; - reaction.message.channel.sendMessage(`${user.username} removed reaction ${reaction.emoji}, count is now ${reaction.count}`); + reaction.message.channel.send(`${user.username} removed reaction ${reaction.emoji}, count is now ${reaction.count}`); }); client.on('message', m => { @@ -205,7 +210,7 @@ client.on('message', m => { m.channel.fetchMessage(mID).then(rM => { for (const reaction of rM.reactions.values()) { reaction.fetchUsers().then(users => { - m.channel.sendMessage( + m.channel.send( `The following gave that message ${reaction.emoji}:\n` + `${users.map(u => u.username).map(t => `- ${t}`).join('\n')}` );