diff --git a/src/client/websocket/WebSocketManager.js b/src/client/websocket/WebSocketManager.js index fd7499682..bc34b84b3 100644 --- a/src/client/websocket/WebSocketManager.js +++ b/src/client/websocket/WebSocketManager.js @@ -1,6 +1,7 @@ 'use strict'; const Collection = require('../../util/Collection'); +const Util = require('../../util/Util'); const WebSocketShard = require('./WebSocketShard'); const { Events, Status, WSEvents } = require('../../util/Constants'); const PacketHandlers = require('./handlers'); @@ -16,7 +17,7 @@ const BeforeReadyWhitelist = [ ]; /** - * WebSocket Manager of the client. + * The WebSocket manager for this client. */ class WebSocketManager { constructor(client) { @@ -28,52 +29,60 @@ class WebSocketManager { Object.defineProperty(this, 'client', { value: client }); /** - * The gateway this WebSocketManager uses. + * The gateway this manager uses * @type {?string} */ this.gateway = undefined; /** - * An array of shards spawned by this WebSocketManager. + * A collection of all shards this manager handles * @type {Collection} */ this.shards = new Collection(); /** - * An array of queued shards to be spawned by this WebSocketManager. - * @type {Array} + * An array of shards to be spawned or reconnected + * @type {Array} * @private */ - this.spawnQueue = []; + this.shardQueue = []; /** - * Whether or not this WebSocketManager is currently spawning shards. - * @type {boolean} - * @private - */ - this.spawning = false; - - /** - * An array of queued events before this WebSocketManager became ready. + * An array of queued events before this WebSocketManager became ready * @type {object[]} * @private */ this.packetQueue = []; /** - * The current status of this WebSocketManager. + * The current status of this WebSocketManager * @type {number} */ this.status = Status.IDLE; /** - * The current session limit of the client. + * If this manager is expected to close + * @type {boolean} + * @private + */ + this.expectingClose = false; + + /** + * The current session limit of the client * @type {?Object} + * @private * @prop {number} total Total number of identifies available * @prop {number} remaining Number of identifies remaining * @prop {number} reset_after Number of milliseconds after which the limit resets */ this.sessionStartLimit = null; + + /** + * If the manager is currently reconnecting shards + * @type {boolean} + * @private + */ + this.isReconnectingShards = false; } /** @@ -89,103 +98,147 @@ class WebSocketManager { /** * Emits a debug event. * @param {string} message Debug message - * @returns {void} * @private */ debug(message) { - this.client.emit(Events.DEBUG, `[connection] ${message}`); + this.client.emit(Events.DEBUG, message); } /** - * Handles the session identify rate limit for a shard. - * @param {WebSocketShard} shard Shard to handle + * Checks if a new identify payload can be sent. * @private + * @returns {Promise} */ - async _handleSessionLimit(shard) { + async _checkSessionLimit() { this.sessionStartLimit = await this.client.api.gateway.bot.get().then(r => r.session_start_limit); const { remaining, reset_after } = this.sessionStartLimit; - if (remaining !== 0) { - this.spawn(); - } else { - shard.debug(`Exceeded identify threshold, setting a timeout for ${reset_after} ms`); - setTimeout(() => this.spawn(), this.sessionStartLimit.reset_after); - } + if (remaining !== 0) return true; + return reset_after; } /** - * Used to spawn WebSocketShards. - * @param {?WebSocketShard|WebSocketShard[]|number|string} query The WebSocketShards to be spawned - * @returns {void} + * Handles the session identify rate limit for creating a shard. * @private */ - spawn(query) { - if (query !== undefined) { - if (Array.isArray(query)) { - for (const item of query) { - if (!this.spawnQueue.includes(item)) this.spawnQueue.push(item); - } - } else if (!this.spawnQueue.includes(query)) { - this.spawnQueue.push(query); - } - } - - if (this.spawning || !this.spawnQueue.length) return; - - this.spawning = true; - let item = this.spawnQueue.shift(); - - if (typeof item === 'string' && !isNaN(item)) item = Number(item); - if (typeof item === 'number') { - const shard = new WebSocketShard(this, item, this.shards.get(item)); - this.shards.set(item, shard); - shard.once(Events.READY, () => { - this.spawning = false; - this.client.setTimeout(() => this._handleSessionLimit(shard), 5000); - }); - shard.once(Events.INVALIDATED, () => { - this.spawning = false; - }); - } else if (item instanceof WebSocketShard) { - item.reconnect(); + async _handleSessionLimit() { + const canSpawn = await this._checkSessionLimit(); + if (typeof canSpawn === 'number') { + this.debug(`Exceeded identify threshold, setting a timeout for ${canSpawn}ms`); + await Util.delayFor(canSpawn); } + this.create(); } /** * Creates a connection to a gateway. * @param {string} [gateway=this.gateway] The gateway to connect to - * @returns {void} * @private */ connect(gateway = this.gateway) { this.gateway = gateway; if (typeof this.client.options.shards === 'number') { - this.debug('Spawning 1 shard'); - this.spawn(this.client.options.shards); + this.debug(`Spawning shard with ID ${this.client.options.shards}`); + this.shardQueue.push(this.client.options.shards); } else if (Array.isArray(this.client.options.shards)) { this.debug(`Spawning ${this.client.options.shards.length} shards`); - for (const shard of this.client.options.shards) { - this.spawn(shard); - } + this.shardQueue.push(...this.client.options.shards); } else { this.debug(`Spawning ${this.client.options.shardCount} shards`); - for (let i = 0; i < this.client.options.shardCount; i++) { - this.spawn(i); + this.shardQueue.push(...Array.from({ length: this.client.options.shardCount }, (_, index) => index)); + } + this.create(); + } + + /** + * Creates or reconnects a shard. + * @private + */ + create() { + // Nothing to create + if (!this.shardQueue.length) return; + + let item = this.shardQueue.shift(); + if (typeof item === 'string' && !isNaN(item)) item = Number(item); + + if (item instanceof WebSocketShard) { + const timeout = setTimeout(() => { + this.debug(`[Shard ${item.id}] Failed to connect in 15s... Destroying and trying again`); + item.destroy(); + if (!this.shardQueue.includes(item)) this.shardQueue.push(item); + this.reconnect(true); + }, 15000); + item.once(Events.READY, this._shardReady.bind(this, timeout)); + item.once(Events.RESUMED, this._shardReady.bind(this, timeout)); + item.connect(); + return; + } + + const shard = new WebSocketShard(this, item); + this.shards.set(item, shard); + shard.once(Events.READY, this._shardReady.bind(this)); + } + + /** + * Shared handler for shards turning ready or resuming. + * @param {Timeout} [timeout=null] Optional timeout to clear if shard didn't turn ready in time + * @private + */ + _shardReady(timeout = null) { + if (timeout) clearTimeout(timeout); + if (this.shardQueue.length) { + this.client.setTimeout(this._handleSessionLimit.bind(this), 5000); + } else { + this.isReconnectingShards = false; + } + } + + /** + * Handles the reconnect of a shard. + * @param {WebSocketShard|boolean} shard The shard to reconnect, or a boolean to indicate an immediate reconnect + * @private + */ + async reconnect(shard) { + // If the item is a shard, add it to the queue + if (shard instanceof WebSocketShard) this.shardQueue.push(shard); + if (typeof shard === 'boolean') { + // If a boolean is passed, force a reconnect right now + } else if (this.isReconnectingShards) { + // If we're already reconnecting shards, and no boolean was provided, return + return; + } + this.isReconnectingShards = true; + try { + await this._handleSessionLimit(); + } catch (error) { + // If we get an error at this point, it means we cannot reconnect anymore + if (this.client.listenerCount(Events.INVALIDATED)) { + /** + * Emitted when the client's session becomes invalidated. + * You are expected to handle closing the process gracefully and preventing a boot loop + * if you are listening to this event. + * @event Client#invalidated + */ + this.client.emit(Events.INVALIDATED); + // Destroy just the shards. This means you have to handle the cleanup yourself + this.destroy(); + } else { + this.client.destroy(); } } } /** * Processes a packet and queues it if this WebSocketManager is not ready. - * @param {Object} packet The packet to be handled - * @param {WebSocketShard} shard The shard that will handle this packet + * @param {Object} [packet] The packet to be handled + * @param {WebSocketShard} [shard] The shard that will handle this packet * @returns {boolean} * @private */ handlePacket(packet, shard) { if (packet && this.status !== Status.READY) { if (!BeforeReadyWhitelist.includes(packet.t)) { - this.packetQueue.push({ packet, shardID: shard.id }); + this.packetQueue.push({ packet, shard }); return false; } } @@ -193,7 +246,7 @@ class WebSocketManager { if (this.packetQueue.length) { const item = this.packetQueue.shift(); this.client.setImmediate(() => { - this.handlePacket(item.packet, this.shards.get(item.shardID)); + this.handlePacket(item.packet, item.shard); }); } @@ -201,7 +254,7 @@ class WebSocketManager { PacketHandlers[packet.t](this.client, packet, shard); } - return false; + return true; } /** @@ -211,7 +264,7 @@ class WebSocketManager { */ checkReady() { if (this.shards.size !== this.client.options.shardCount || - this.shards.some(s => s && s.status !== Status.READY)) { + this.shards.some(s => s.status !== Status.READY)) { return false; } @@ -258,26 +311,22 @@ class WebSocketManager { /** * Broadcasts a message to every shard in this WebSocketManager. * @param {*} packet The packet to send + * @private */ broadcast(packet) { - for (const shard of this.shards.values()) { - shard.send(packet); - } + for (const shard of this.shards.values()) shard.send(packet); } /** * Destroys all shards. - * @returns {void} * @private */ destroy() { - this.gateway = undefined; - // Lock calls to spawn - this.spawning = true; - - for (const shard of this.shards.values()) { - shard.destroy(); - } + if (this.expectingClose) return; + this.expectingClose = true; + this.isReconnectingShards = false; + this.shardQueue.length = 0; + for (const shard of this.shards.values()) shard.destroy(); } } diff --git a/src/client/websocket/WebSocketShard.js b/src/client/websocket/WebSocketShard.js index 1cf0ef1c6..96ab06a44 100644 --- a/src/client/websocket/WebSocketShard.js +++ b/src/client/websocket/WebSocketShard.js @@ -4,6 +4,7 @@ const EventEmitter = require('events'); const WebSocket = require('../../WebSocket'); const { Status, Events, OPCodes, WSEvents, WSCodes } = require('../../util/Constants'); const Util = require('../../util/Util'); + let zlib; try { zlib = require('zlib-sync'); @@ -13,10 +14,10 @@ try { } /** - * Represents a Shard's Websocket connection. + * Represents a Shard's WebSocket connection */ class WebSocketShard extends EventEmitter { - constructor(manager, id, oldShard) { + constructor(manager, id) { super(); /** @@ -26,7 +27,7 @@ class WebSocketShard extends EventEmitter { this.manager = manager; /** - * The id of the this shard. + * The ID of the this shard * @type {number} */ this.id = id; @@ -38,28 +39,28 @@ class WebSocketShard extends EventEmitter { this.status = Status.IDLE; /** - * The current sequence of the WebSocket + * The current sequence of the shard * @type {number} * @private */ - this.sequence = oldShard ? oldShard.sequence : -1; + this.sequence = -1; /** - * The sequence on WebSocket close + * The sequence of the shard after close * @type {number} * @private */ - this.closeSequence = oldShard ? oldShard.closeSequence : 0; + this.closeSequence = 0; /** - * The current session id of the WebSocket - * @type {?string} + * The current session ID of the shard + * @type {string} * @private */ - this.sessionID = oldShard && oldShard.sessionID; + this.sessionID = undefined; /** - * Previous heartbeat pings of the websocket (most recent first, limited to three elements) + * The previous 3 heartbeat pings of the shard (most recent first) * @type {number[]} */ this.pings = []; @@ -71,6 +72,13 @@ class WebSocketShard extends EventEmitter { */ this.lastPingTimestamp = -1; + /** + * If we received a heartbeat ack back. Used to identify zombie connections + * @type {boolean} + * @private + */ + this.lastHeartbeatAcked = true; + /** * List of servers the shard is connected to * @type {string[]} @@ -96,7 +104,7 @@ class WebSocketShard extends EventEmitter { * @type {?WebSocket} * @private */ - this.ws = null; + this.connection = null; /** * @external Inflate @@ -110,13 +118,7 @@ class WebSocketShard extends EventEmitter { */ this.inflate = null; - /** - * Whether or not the WebSocket is expected to be closed - * @type {boolean} - */ - this.expectingClose = false; - - this.connect(); + if (this.manager.gateway) this.connect(); } /** @@ -135,35 +137,42 @@ class WebSocketShard extends EventEmitter { * @private */ debug(message) { - this.manager.debug(`[shard ${this.id}] ${message}`); + this.manager.debug(`[Shard ${this.id}] ${message}`); } /** - * Sends a heartbeat or sets an interval for sending heartbeats. - * @param {number} [time] If -1, clears the interval, any other number sets an interval - * If no value is given, a heartbeat will be sent instantly + * Sends a heartbeat to the WebSocket. + * If this shard didn't receive a heartbeat last time, it will destroy it and reconnect * @private */ - heartbeat(time) { - if (!isNaN(time)) { - if (time === -1) { + sendHeartbeat() { + if (!this.lastHeartbeatAcked) { + this.debug("Didn't receive a heartbeat ack last time, assuming zombie conenction. Destroying and reconnecting."); + this.connection.close(4000); + return; + } + this.debug('Sending a heartbeat'); + this.lastHeartbeatAcked = false; + this.lastPingTimestamp = Date.now(); + this.send({ op: OPCodes.HEARTBEAT, d: this.sequence }); + } + + /** + * Sets the heartbeat timer for this shard. + * @param {number} time If -1, clears the interval, any other number sets an interval + * @private + */ + setHeartbeatTimer(time) { + if (time === -1) { + if (this.heartbeatInterval) { this.debug('Clearing heartbeat interval'); this.manager.client.clearInterval(this.heartbeatInterval); this.heartbeatInterval = null; - } else { - this.debug(`Setting a heartbeat interval for ${time}ms`); - if (this.heartbeatInterval) this.manager.client.clearInterval(this.heartbeatInterval); - this.heartbeatInterval = this.manager.client.setInterval(() => this.heartbeat(), time); } return; } - - this.debug('Sending a heartbeat'); - this.lastPingTimestamp = Date.now(); - this.send({ - op: OPCodes.HEARTBEAT, - d: this.sequence, - }); + this.debug(`Setting a heartbeat interval for ${time}ms`); + this.heartbeatInterval = this.manager.client.setInterval(() => this.sendHeartbeat(), time); } /** @@ -171,6 +180,7 @@ class WebSocketShard extends EventEmitter { * @private */ ackHeartbeat() { + this.lastHeartbeatAcked = true; const latency = Date.now() - this.lastPingTimestamp; this.debug(`Heartbeat acknowledged, latency of ${latency}ms`); this.pings.unshift(latency); @@ -178,18 +188,19 @@ class WebSocketShard extends EventEmitter { } /** - * Connects the shard to a gateway. + * Connects this shard to the gateway. * @private */ connect() { + const { expectingClose, gateway } = this.manager; + if (expectingClose) return; this.inflate = new zlib.Inflate({ chunkSize: 65535, flush: zlib.Z_SYNC_FLUSH, to: WebSocket.encoding === 'json' ? 'string' : '', }); - const gateway = this.manager.gateway; this.debug(`Connecting to ${gateway}`); - const ws = this.ws = WebSocket.create(gateway, { + const ws = this.connection = WebSocket.create(gateway, { v: this.manager.client.options.ws.version, compress: 'zlib-stream', }); @@ -200,73 +211,12 @@ class WebSocketShard extends EventEmitter { this.status = Status.CONNECTING; } - /** - * Called whenever a packet is received - * @param {Object} packet Packet received - * @returns {any} - * @private - */ - onPacket(packet) { - if (!packet) { - this.debug('Received null packet'); - return false; - } - - switch (packet.t) { - case WSEvents.READY: - this.sessionID = packet.d.session_id; - this.trace = packet.d._trace; - this.status = Status.READY; - this.debug(`READY ${this.trace.join(' -> ')} ${this.sessionID}`); - this.heartbeat(); - break; - case WSEvents.RESUMED: { - this.trace = packet.d._trace; - this.status = Status.READY; - const replayed = packet.s - this.closeSequence; - this.debug(`RESUMED ${this.trace.join(' -> ')} | replayed ${replayed} events.`); - this.heartbeat(); - break; - } - } - - if (packet.s > this.sequence) this.sequence = packet.s; - - switch (packet.op) { - case OPCodes.HELLO: - this.identify(); - return this.heartbeat(packet.d.heartbeat_interval); - case OPCodes.RECONNECT: - return this.reconnect(); - case OPCodes.INVALID_SESSION: - this.sequence = -1; - this.debug('Session invalidated'); - // If the session isn't resumable - if (!packet.d) { - // If we had a session ID before - if (this.sessionID) { - this.sessionID = null; - return this.identify(2500); - } - return this.identify(5000); - } - return this.identify(); - case OPCodes.HEARTBEAT_ACK: - return this.ackHeartbeat(); - case OPCodes.HEARTBEAT: - return this.heartbeat(); - default: - return this.manager.handlePacket(packet, this); - } - } - /** * Called whenever a connection is opened to the gateway. - * @param {Event} event Received open event * @private */ onOpen() { - this.debug('Connection open'); + this.debug('Connected to the gateway'); } /** @@ -293,87 +243,102 @@ class WebSocketShard extends EventEmitter { this.manager.client.emit(Events.ERROR, err); return; } - if (packet.t === WSEvents.READY) { - /** - * Emitted when a shard becomes ready - * @event WebSocketShard#ready - */ - this.emit(Events.READY); - - /** - * Emitted when a shard becomes ready - * @event Client#shardReady - * @param {number} shardID The id of the shard - */ - this.manager.client.emit(Events.SHARD_READY, this.id); - } this.onPacket(packet); } /** - * Called whenever an error occurs with the WebSocket. - * @param {Error} error The error that occurred + * Called whenever a packet is received. + * @param {Object} packet Packet received * @private */ - onError(error) { - if (error && error.message === 'uWs client connection error') { - this.reconnect(); + onPacket(packet) { + if (!packet) { + this.debug('Received null or broken packet'); return; } - this.emit(Events.INVALIDATED); - /** - * Emitted whenever the client's WebSocket encounters a connection error. - * @event Client#error - * @param {Error} error The encountered error - */ - this.manager.client.emit(Events.ERROR, error); - } + switch (packet.t) { + case WSEvents.READY: + /** + * Emitted when a shard becomes ready. + * @event WebSocketShard#ready + */ + this.emit(Events.READY); + /** + * Emitted when a shard becomes ready. + * @event Client#shardReady + * @param {number} shardID The ID of the shard + */ + this.manager.client.emit(Events.SHARD_READY, this.id); - /** - * @external CloseEvent - * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent} - */ - - /** - * Called whenever a connection to the gateway is closed. - * @param {CloseEvent} event Close event that was received - * @returns {void} - * @private - */ - onClose(event) { - this.closeSequence = this.sequence; - this.emit('close', event); - if (event.code === 1000 ? this.expectingClose : WSCodes[event.code]) { - /** - * Emitted when the client's WebSocket disconnects and will no longer attempt to reconnect. - * @event Client#disconnect - * @param {CloseEvent} event The WebSocket close event - * @param {number} shardID The shard that disconnected - */ - this.manager.client.emit(Events.DISCONNECT, event, this.id); - this.debug(WSCodes[event.code]); - this.heartbeat(-1); - return; + this.sessionID = packet.d.session_id; + this.trace = packet.d._trace; + this.status = Status.READY; + this.debug(`READY ${this.trace.join(' -> ')} | Session ${this.sessionID}`); + this.lastHeartbeatAcked = true; + this.sendHeartbeat(); + break; + case WSEvents.RESUMED: { + this.emit(Events.RESUMED); + this.trace = packet.d._trace; + this.status = Status.READY; + const replayed = packet.s - this.closeSequence; + this.debug(`RESUMED ${this.trace.join(' -> ')} | Replayed ${replayed} events.`); + this.lastHeartbeatAcked = true; + this.sendHeartbeat(); + break; + } + } + + if (packet.s > this.sequence) this.sequence = packet.s; + + switch (packet.op) { + case OPCodes.HELLO: + this.setHeartbeatTimer(packet.d.heartbeat_interval); + this.identify(); + break; + case OPCodes.RECONNECT: + this.connection.close(1001); + break; + case OPCodes.INVALID_SESSION: + this.debug(`Session was invalidated. Resumable: ${packet.d}.`); + // If the session isn't resumable + if (!packet.d) { + // Reset the sequence, since it isn't valid anymore + this.sequence = -1; + // If we had a session ID before + if (this.sessionID) { + this.sessionID = null; + this.connection.close(1000); + return; + } + this.connection.close(1000); + return; + } + this.identifyResume(); + break; + case OPCodes.HEARTBEAT_ACK: + this.ackHeartbeat(); + break; + case OPCodes.HEARTBEAT: + this.sendHeartbeat(); + break; + default: + this.manager.handlePacket(packet, this); } - this.expectingClose = false; - this.reconnect(Events.INVALIDATED, 5100); } /** * Identifies the client on a connection. - * @param {?number} [wait=0] Amount of time to wait before identifying * @returns {void} * @private */ - identify(wait = 0) { - if (wait) return this.manager.client.setTimeout(this.identify.bind(this), wait); + identify() { return this.sessionID ? this.identifyResume() : this.identifyNew(); } /** * Identifies as a new connection on the gateway. - * @returns {void} * @private */ identifyNew() { @@ -382,10 +347,11 @@ class WebSocketShard extends EventEmitter { return; } // Clone the generic payload and assign the token - const d = Object.assign({ token: this.manager.client.token }, this.manager.client.options.ws); - - const { totalShardCount } = this.manager.client.options; - d.shard = [this.id, Number(totalShardCount)]; + const d = { + ...this.manager.client.options.ws, + token: this.manager.client.token, + shard: [this.id, Number(this.manager.client.options.totalShardCount)], + }; // Send the payload this.debug('Identifying as a new session'); @@ -402,23 +368,90 @@ class WebSocketShard extends EventEmitter { this.debug('Warning: wanted to resume but session ID not available; identifying as a new session instead'); return this.identifyNew(); } - this.debug(`Attempting to resume session ${this.sessionID}`); + + this.debug(`Attempting to resume session ${this.sessionID} at sequence ${this.closeSequence}`); const d = { token: this.manager.client.token, session_id: this.sessionID, - seq: this.sequence, + seq: this.closeSequence, }; - return this.send({ - op: OPCodes.RESUME, - d, - }); + return this.send({ op: OPCodes.RESUME, d }); + } + + /** + * Called whenever an error occurs with the WebSocket. + * @param {Error} error The error that occurred + * @private + */ + onError(error) { + if (error && error.message === 'uWs client connection error') { + this.connection.close(4000); + return; + } + + /** + * Emitted whenever the client's WebSocket encounters a connection error. + * @event Client#error + * @param {Error} error The encountered error + * @param {number} shardID The shard that encountered this error + */ + this.manager.client.emit(Events.ERROR, error, this.id); + } + + /** + * @external CloseEvent + * @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent} + */ + + /** + * Called whenever a connection to the gateway is closed. + * @param {CloseEvent} event Close event that was received + * @private + */ + onClose(event) { + this.closeSequence = this.sequence; + this.debug(`WebSocket was closed. + Event Code: ${event.code} + Reason: ${event.reason}`); + + if (event.code === 1000 ? this.manager.expectingClose : WSCodes[event.code]) { + /** + * Emitted when the client's WebSocket disconnects and will no longer attempt to reconnect. + * @event Client#disconnect + * @param {CloseEvent} event The WebSocket close event + * @param {number} shardID The shard that disconnected + */ + this.manager.client.emit(Events.DISCONNECT, event, this.id); + this.debug(WSCodes[event.code]); + return; + } + + this.destroy(); + + this.status = Status.RECONNECTING; + + /** + * Emitted whenever a shard tries to reconnect to the WebSocket. + * @event Client#reconnecting + * @param {number} shardID The shard ID that is reconnecting + */ + this.manager.client.emit(Events.RECONNECTING, this.id); + + this.debug(`${this.sessionID ? `Reconnecting in 3500ms` : 'Queueing a reconnect'} to the gateway...`); + + if (this.sessionID) { + Util.delayFor(3500).then(() => this.connect()); + } else { + this.manager.reconnect(this); + } } /** * Adds data to the queue to be sent. * @param {Object} data Packet to send + * @private * @returns {void} */ send(data) { @@ -433,11 +466,12 @@ class WebSocketShard extends EventEmitter { * @private */ _send(data) { - if (!this.ws || this.ws.readyState !== WebSocket.OPEN) { - this.debug(`Tried to send packet ${data} but no WebSocket is available!`); + if (!this.connection || this.connection.readyState !== WebSocket.OPEN) { + this.debug(`Tried to send packet ${JSON.stringify(data)} but no WebSocket is available!`); return; } - this.ws.send(WebSocket.pack(data), err => { + + this.connection.send(WebSocket.pack(data), err => { if (err) this.manager.client.emit(Events.ERROR, err); }); } @@ -465,44 +499,22 @@ class WebSocketShard extends EventEmitter { } /** - * Triggers a shard reconnect. - * @param {?string} [event] The event for the shard to emit - * @param {?number} [reconnectIn] Time to wait before reconnecting - * @returns {Promise} - * @private - */ - async reconnect(event, reconnectIn) { - this.heartbeat(-1); - this.status = Status.RECONNECTING; - - /** - * Emitted whenever a shard tries to reconnect to the WebSocket. - * @event Client#reconnecting - */ - this.manager.client.emit(Events.RECONNECTING, this.id); - - if (event === Events.INVALIDATED) this.emit(event); - this.debug(reconnectIn ? `Reconnecting in ${reconnectIn}ms` : 'Reconnecting now'); - if (reconnectIn) await Util.delayFor(reconnectIn); - this.manager.spawn(this.id); - } - - /** - * Destroys the current shard and terminates its connection. - * @returns {void} + * Destroys this shard and closes its connection. * @private */ destroy() { - this.heartbeat(-1); - this.expectingClose = true; - if (this.ws) this.ws.close(1000); - this.ws = null; + this.setHeartbeatTimer(-1); + if (this.connection) this.connection.close(1000); + this.connection = null; this.status = Status.DISCONNECTED; this.ratelimit.remaining = this.ratelimit.total; + this.ratelimit.queue.length = 0; if (this.ratelimit.timer) { this.manager.client.clearTimeout(this.ratelimit.timer); this.ratelimit.timer = null; } + this.sequence = -1; } } + module.exports = WebSocketShard; diff --git a/src/client/websocket/handlers/GUILD_SYNC.js b/src/client/websocket/handlers/GUILD_SYNC.js deleted file mode 100644 index b7e7d1b24..000000000 --- a/src/client/websocket/handlers/GUILD_SYNC.js +++ /dev/null @@ -1,5 +0,0 @@ -'use strict'; - -module.exports = (client, packet) => { - client.actions.GuildSync.handle(packet.d); -}; diff --git a/typings/index.d.ts b/typings/index.d.ts index dccd33049..1b1d17fc7 100644 --- a/typings/index.d.ts +++ b/typings/index.d.ts @@ -434,7 +434,7 @@ declare module 'discord.js' { public presences: PresenceStore; public region: string; public roles: RoleStore; - public shard: WebSocketShard; + public readonly shard: WebSocketShard; public shardID: number; public splash: string; public readonly systemChannel: TextChannel; @@ -1295,22 +1295,22 @@ declare module 'discord.js' { public gateway: string | undefined; public readonly ping: number; public shards: Collection; - public sessionStartLimit: { total: number; remaining: number; reset_after: number; }; public status: Status; - public broadcast(packet: any): void; + + public broadcast(packet: object): void; } export class WebSocketShard extends EventEmitter { - constructor(manager: WebSocketManager, id: number, oldShard?: WebSocketShard); + constructor(manager: WebSocketManager, id: number); public id: number; public readonly ping: number; public pings: number[]; public status: Status; public manager: WebSocketManager; - public send(data: object): void; + + public send(packet: object): void; public on(event: 'ready', listener: () => void): this; - public once(event: 'ready', listener: () => void): this; }