diff --git a/src/client/websocket/WebSocketManager.js b/src/client/websocket/WebSocketManager.js index 4e4eec1ee..891da3f45 100644 --- a/src/client/websocket/WebSocketManager.js +++ b/src/client/websocket/WebSocketManager.js @@ -1,3 +1,4 @@ +const Collection = require('../../util/Collection'); const WebSocketShard = require('./WebSocketShard'); const { Events, Status, WSEvents } = require('../../util/Constants'); const PacketHandlers = require('./handlers'); @@ -32,9 +33,9 @@ class WebSocketManager { /** * An array of shards spawned by this WebSocketManager. - * @type {WebSocketShard[]} + * @type {Collection} */ - this.shards = []; + this.shards = new Collection(); /** * An array of queued shards to be spawned by this WebSocketManager. @@ -80,7 +81,7 @@ class WebSocketManager { */ get ping() { const sum = this.shards.reduce((a, b) => a + b.ping, 0); - return sum / this.shards.length; + return sum / this.shards.size; } /** @@ -133,8 +134,8 @@ class WebSocketManager { if (typeof item === 'string' && !isNaN(item)) item = Number(item); if (typeof item === 'number') { - const shard = new WebSocketShard(this, item, this.shards[item]); - this.shards[item] = shard; + const shard = new WebSocketShard(this, item, this.shards.get(item)); + this.shards.set(item, shard); shard.once(Events.READY, () => { this.spawning = false; this.client.setTimeout(() => this._handleSessionLimit(shard), 5000); @@ -161,8 +162,8 @@ class WebSocketManager { this.spawn(this.client.options.shards); } else if (Array.isArray(this.client.options.shards)) { this.debug(`Spawning ${this.client.options.shards.length} shards`); - for (let i = 0; i < this.client.options.shards.length; i++) { - this.spawn(this.client.options.shards[i]); + for (const shard of this.client.options.shards) { + this.spawn(shard); } } else { this.debug(`Spawning ${this.client.options.shardCount} shards`); @@ -190,11 +191,11 @@ class WebSocketManager { if (this.packetQueue.length) { const item = this.packetQueue.shift(); this.client.setImmediate(() => { - this.handlePacket(item.packet, this.shards[item.shardID]); + this.handlePacket(item.packet, this.shards.get(item.shardID)); }); } - if (packet && PacketHandlers[packet.t]) { + if (packet && !this.client.options.disabledEvents.includes(packet.t) && PacketHandlers[packet.t]) { PacketHandlers[packet.t](this.client, packet, shard); } @@ -207,7 +208,7 @@ class WebSocketManager { * @private */ checkReady() { - if (this.shards.filter(s => s).length !== this.client.options.shardCount || + if (this.shards.size !== this.client.options.shardCount || this.shards.some(s => s && s.status !== Status.READY)) { return false; } @@ -257,8 +258,7 @@ class WebSocketManager { * @param {*} packet The packet to send */ broadcast(packet) { - for (const shard of this.shards) { - if (!shard) continue; + for (const shard of this.shards.values()) { shard.send(packet); } } @@ -273,8 +273,7 @@ class WebSocketManager { // Lock calls to spawn this.spawning = true; - for (const shard of this.shards) { - if (!shard) continue; + for (const shard of this.shards.values()) { shard.destroy(); } } diff --git a/src/client/websocket/WebSocketShard.js b/src/client/websocket/WebSocketShard.js index e9c01e0b0..7aec37e25 100644 --- a/src/client/websocket/WebSocketShard.js +++ b/src/client/websocket/WebSocketShard.js @@ -1,6 +1,7 @@ const EventEmitter = require('events'); const WebSocket = require('../../WebSocket'); const { Status, Events, OPCodes, WSEvents, WSCodes } = require('../../util/Constants'); +const Util = require('../../util/Util'); let zlib; try { zlib = require('zlib-sync'); @@ -107,6 +108,12 @@ class WebSocketShard extends EventEmitter { */ this.inflate = null; + /** + * Whether or not the WebSocket is expected to be closed + * @type {boolean} + */ + this.expectingClose = false; + this.connect(); } @@ -143,6 +150,7 @@ class WebSocketShard extends EventEmitter { this.heartbeatInterval = null; } else { this.debug(`Setting a heartbeat interval for ${time}ms`); + if (this.heartbeatInterval) this.manager.client.clearInterval(this.heartbeatInterval); this.heartbeatInterval = this.manager.client.setInterval(() => this.heartbeat(), time); } return; @@ -193,7 +201,7 @@ class WebSocketShard extends EventEmitter { /** * Called whenever a packet is received * @param {Object} packet Packet received - * @returns {boolean} + * @returns {any} * @private */ onPacket(packet) { @@ -229,10 +237,18 @@ class WebSocketShard extends EventEmitter { case OPCodes.RECONNECT: return this.reconnect(); case OPCodes.INVALID_SESSION: - if (!packet.d) this.sessionID = null; this.sequence = -1; this.debug('Session invalidated'); - return this.reconnect(Events.INVALIDATED); + // If the session isn't resumable + if (!packet.d) { + // If we had a session ID before + if (this.sessionID) { + this.sessionID = null; + return this.identify(2500); + } + return this.identify(5000); + } + return this.identify(); case OPCodes.HEARTBEAT_ACK: return this.ackHeartbeat(); case OPCodes.HEARTBEAT: @@ -275,7 +291,7 @@ class WebSocketShard extends EventEmitter { this.manager.client.emit(Events.ERROR, err); return; } - if (packet.t === 'READY') { + if (packet.t === WSEvents.READY) { /** * Emitted when a shard becomes ready * @event WebSocketShard#ready @@ -320,6 +336,7 @@ class WebSocketShard extends EventEmitter { /** * Called whenever a connection to the gateway is closed. * @param {CloseEvent} event Close event that was received + * @returns {void} * @private */ onClose(event) { @@ -333,19 +350,22 @@ class WebSocketShard extends EventEmitter { * @param {number} shardID The shard that disconnected */ this.manager.client.emit(Events.DISCONNECT, event, this.id); - this.debug(WSCodes[event.code]); + this.heartbeat(-1); return; } - this.reconnect(Events.INVALIDATED); + this.expectingClose = false; + this.reconnect(Events.INVALIDATED, 5100); } /** * Identifies the client on a connection. + * @param {?number} [wait=0] Amount of time to wait before identifying * @returns {void} * @private */ - identify() { + identify(wait = 0) { + if (wait) return this.manager.client.setTimeout(this.identify.bind(this), wait); return this.sessionID ? this.identifyResume() : this.identifyNew(); } @@ -427,7 +447,7 @@ class WebSocketShard extends EventEmitter { if (this.ratelimit.remaining === 0) return; if (this.ratelimit.queue.length === 0) return; if (this.ratelimit.remaining === this.ratelimit.total) { - this.ratelimit.resetTimer = this.manager.client.setTimeout(() => { + this.ratelimit.timer = this.manager.client.setTimeout(() => { this.ratelimit.remaining = this.ratelimit.total; this.processQueue(); }, this.ratelimit.time); @@ -443,10 +463,11 @@ class WebSocketShard extends EventEmitter { /** * Triggers a shard reconnect. * @param {?string} [event] The event for the shard to emit - * @returns {void} + * @param {?number} [reconnectIn] Time to wait before reconnecting + * @returns {Promise} * @private */ - reconnect(event) { + async reconnect(event, reconnectIn) { this.heartbeat(-1); this.status = Status.RECONNECTING; @@ -457,6 +478,8 @@ class WebSocketShard extends EventEmitter { this.manager.client.emit(Events.RECONNECTING, this.id); if (event === Events.INVALIDATED) this.emit(event); + this.debug(reconnectIn ? `Reconnecting in ${reconnectIn}ms` : 'Reconnecting now'); + if (reconnectIn) await Util.delayFor(reconnectIn); this.manager.spawn(this.id); } @@ -472,6 +495,10 @@ class WebSocketShard extends EventEmitter { this.ws = null; this.status = Status.DISCONNECTED; this.ratelimit.remaining = this.ratelimit.total; + if (this.ratelimit.timer) { + this.manager.client.clearTimeout(this.ratelimit.timer); + this.ratelimit.timer = null; + } } } module.exports = WebSocketShard; diff --git a/src/sharding/ShardingManager.js b/src/sharding/ShardingManager.js index 9bfce6fe2..b71962290 100644 --- a/src/sharding/ShardingManager.js +++ b/src/sharding/ShardingManager.js @@ -27,7 +27,7 @@ class ShardingManager extends EventEmitter { /** * @param {string} file Path to your shard script file * @param {Object} [options] Options for the sharding manager - * @param {string|number[]} [options.totalShards='auto'] Number of total shards of all shard managers or "auto" + * @param {string|number} [options.totalShards='auto'] Number of total shards of all shard managers or "auto" * @param {string|number[]} [options.shardList='auto'] List of shards to spawn or "auto" * @param {ShardingManagerMode} [options.mode='process'] Which mode to use for shards * @param {boolean} [options.respawn=true] Whether shards should automatically respawn upon exiting diff --git a/src/structures/ClientPresence.js b/src/structures/ClientPresence.js index 06924589d..68dda64c5 100644 --- a/src/structures/ClientPresence.js +++ b/src/structures/ClientPresence.js @@ -15,10 +15,10 @@ class ClientPresence extends Presence { this.client.ws.broadcast({ op: OPCodes.STATUS_UPDATE, d: packet }); } else if (Array.isArray(presence.shardID)) { for (const shardID of presence.shardID) { - this.client.ws.shards[shardID].send({ op: OPCodes.STATUS_UPDATE, d: packet }); + this.client.ws.shards.get(shardID).send({ op: OPCodes.STATUS_UPDATE, d: packet }); } } else { - this.client.ws.shards[presence.shardID].send({ op: OPCodes.STATUS_UPDATE, d: packet }); + this.client.ws.shards.get(presence.shardID).send({ op: OPCodes.STATUS_UPDATE, d: packet }); } return this; } diff --git a/src/structures/Guild.js b/src/structures/Guild.js index ffef8578b..a194ea15a 100644 --- a/src/structures/Guild.js +++ b/src/structures/Guild.js @@ -88,7 +88,7 @@ class Guild extends Base { * @readonly */ get shard() { - return this.client.ws.shards[this.shardID]; + return this.client.ws.shards.get(this.shardID); } /* eslint-disable complexity */ diff --git a/typings/index.d.ts b/typings/index.d.ts index faed57ef2..71366d56d 100644 --- a/typings/index.d.ts +++ b/typings/index.d.ts @@ -1291,7 +1291,7 @@ declare module 'discord.js' { public readonly client: Client; public gateway: string | undefined; public readonly ping: number; - public shards: WebSocketShard[]; + public shards: Collection; public sessionStartLimit: { total: number; remaining: number; reset_after: number; }; public status: Status; public broadcast(packet: any): void;