From 5519d6fbaa61abccc27b742f97ad725bf4259265 Mon Sep 17 00:00:00 2001 From: Vlad Frangu Date: Sun, 15 Dec 2019 21:45:27 +0200 Subject: [PATCH] src: sharding cleanup and checkReady rewrite (#3393) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * src: Step 1 of who knows how many * src: Remove accidentally committed test file * src: Remove useless added property in package.json * docs: Trailing spaces, come back >.> * src: Buhbye uws, we will miss you..not! * src: Move 'auto' shard selection from totalShardCount to shards * src: tweak * src: Filter out floats from shard IDs You want half of a shard or what? * src: Misc cleanup and bugfix for GUILD_BAN_ADD * src: Rewrite checkReady * src: Misse this while merging master into my branch * typings: Bring these up to date * typings: Forgot allReady event * src: Don't checkReady if the shard isn't waiting for guilds * src: Fix a possible bug for when the ws dies and the session becomes -1 * src: Hopefully fix last edge case that could case a shard to infinitely boot loop * src: Rename totalShardCount to shardCount * src: Small bugfix * src: Correct error message for shardCount being imvalid Co-Authored-By: bdistin * src: Small tweaks * src: If this doesn't fix the issues I'm gonna throw a brick at my PC * src: I swear, STOP BREAKING * src: *groans at a certain snake* * src: Use undefined instead of null on destroy in close event Setting it to null sets the close code to null, which causes a WebSocket error to be thrown. The error is thrown from WebSocket, although there is no connection alive. Fun times! * src: @SpaceEEC's requested changes * src: Remove zucc from discord.js Discord is removing support for it, sooo... Bye bye * src: Missed this * src: Apply @kyranet's suggestions Co-Authored-By: Antonio Román * src: @kyranet's suggestions * src: Remove pako, update debug messages - Pako is officially gone from both enviroments Install zlib-sync on node.js if you want it - Improve a few debug messages some more - Discover that internal sharding works in browsers but please don't do that --- .eslintrc.json | 2 +- README.md | 5 +- docs/general/welcome.md | 11 +- package.json | 11 +- src/WebSocket.js | 13 +- src/client/Client.js | 37 +-- src/client/websocket/WebSocketManager.js | 104 ++++---- src/client/websocket/WebSocketShard.js | 244 +++++++++++------- .../websocket/handlers/GUILD_BAN_ADD.js | 2 +- src/client/websocket/handlers/GUILD_CREATE.js | 16 +- src/client/websocket/handlers/READY.js | 2 - src/sharding/Shard.js | 2 +- src/sharding/ShardClientUtil.js | 2 +- src/util/Constants.js | 15 +- typings/index.d.ts | 22 +- 15 files changed, 280 insertions(+), 208 deletions(-) diff --git a/.eslintrc.json b/.eslintrc.json index a4f08d689..549500347 100644 --- a/.eslintrc.json +++ b/.eslintrc.json @@ -1,7 +1,7 @@ { "extends": "eslint:recommended", "parserOptions": { - "ecmaVersion": 2018 + "ecmaVersion": 2019 }, "env": { "es6": true, diff --git a/README.md b/README.md index d667ab1ff..2003f60d5 100644 --- a/README.md +++ b/README.md @@ -52,13 +52,12 @@ For production bots, using node-opus should be considered a necessity, especiall ### Optional packages - [zlib-sync](https://www.npmjs.com/package/zlib-sync) for faster WebSocket data inflation (`npm install zlib-sync`) -- [zucc](https://www.npmjs.com/package/zucc) for significantly faster WebSocket data inflation (`npm install zucc`) - [erlpack](https://github.com/discordapp/erlpack) for significantly faster WebSocket data (de)serialisation (`npm install discordapp/erlpack`) - One of the following packages can be installed for faster voice packet encryption and decryption: - [sodium](https://www.npmjs.com/package/sodium) (`npm install sodium`) - [libsodium.js](https://www.npmjs.com/package/libsodium-wrappers) (`npm install libsodium-wrappers`) -- [uws](https://www.npmjs.com/package/@discordjs/uws) for a much faster WebSocket connection (`npm install @discordjs/uws`) -- [bufferutil](https://www.npmjs.com/package/bufferutil) for a much faster WebSocket connection when *not* using uws (`npm install bufferutil`) +- [bufferutil](https://www.npmjs.com/package/bufferutil) for a much faster WebSocket connection (`npm install bufferutil`) +- [utf-8-validate](https://www.npmjs.com/package/utf-8-validate) in combination with `bufferutil` for much faster WebSocket processing (`npm install utf-8-validate`) ## Example usage ```js diff --git a/docs/general/welcome.md b/docs/general/welcome.md index d13e623b0..52cb3135f 100644 --- a/docs/general/welcome.md +++ b/docs/general/welcome.md @@ -24,8 +24,8 @@ v12 is still very much a work-in-progress, as we're aiming to make it the best i Only use it if you are fond of living life on the bleeding edge. ## About -discord.js is a powerful [Node.js](https://nodejs.org) module that allows you to interact with the -[Discord API](https://discordapp.com/developers/docs/intro) very easily. +discord.js is a powerful [Node.js](https://nodejs.org) module that allows you to easily interact with the +[Discord API](https://discordapp.com/developers/docs/intro). - Object-oriented - Predictable abstractions @@ -46,14 +46,13 @@ Using opusscript is only recommended for development environments where node-opu For production bots, using node-opus should be considered a necessity, especially if they're going to be running on multiple servers. ### Optional packages -- [zlib-sync](https://www.npmjs.com/package/zlib-sync) for significantly faster WebSocket data inflation (`npm install zlib-sync`) -- [zucc](https://www.npmjs.com/package/zucc) for significantly faster WebSocket data inflation (`npm install zucc`) +- [zlib-sync](https://www.npmjs.com/package/zlib-sync) for faster WebSocket data inflation (`npm install zlib-sync`) - [erlpack](https://github.com/discordapp/erlpack) for significantly faster WebSocket data (de)serialisation (`npm install discordapp/erlpack`) - One of the following packages can be installed for faster voice packet encryption and decryption: - [sodium](https://www.npmjs.com/package/sodium) (`npm install sodium`) - [libsodium.js](https://www.npmjs.com/package/libsodium-wrappers) (`npm install libsodium-wrappers`) -- [uws](https://www.npmjs.com/package/@discordjs/uws) for a much faster WebSocket connection (`npm install @discordjs/uws`) -- [bufferutil](https://www.npmjs.com/package/bufferutil) for a much faster WebSocket connection when *not* using uws (`npm install bufferutil`) +- [bufferutil](https://www.npmjs.com/package/bufferutil) for a much faster WebSocket connection (`npm install bufferutil`) +- [utf-8-validate](https://www.npmjs.com/package/utf-8-validate) in combination with `bufferutil` for much faster WebSocket processing (`npm install utf-8-validate`) ## Example usage ```js diff --git a/package.json b/package.json index cda32552b..c8decaa3b 100644 --- a/package.json +++ b/package.json @@ -39,20 +39,18 @@ "abort-controller": "^3.0.0", "form-data": "^2.3.3", "node-fetch": "^2.3.0", - "pako": "^1.0.8", "prism-media": "^1.0.0", "setimmediate": "^1.0.5", "tweetnacl": "^1.0.1", - "ws": "^6.1.3" + "ws": "^7.2.0" }, "peerDependencies": { - "@discordjs/uws": "^11.149.1", "bufferutil": "^4.0.1", "erlpack": "discordapp/erlpack", "libsodium-wrappers": "^0.7.4", "sodium": "^3.0.2", - "zlib-sync": "^0.1.4", - "zucc": "^0.1.0" + "utf-8-validate": "^5.0.2", + "zlib-sync": "^0.1.6" }, "peerDependenciesMeta": { "@discordjs/uws": { @@ -97,8 +95,6 @@ "browser": { "https": false, "ws": false, - "uws": false, - "@discordjs/uws": false, "erlpack": false, "prism-media": false, "opusscript": false, @@ -107,7 +103,6 @@ "sodium": false, "worker_threads": false, "zlib-sync": false, - "zucc": false, "src/sharding/Shard.js": false, "src/sharding/ShardClientUtil.js": false, "src/sharding/ShardingManager.js": false, diff --git a/src/WebSocket.js b/src/WebSocket.js index 8c61dfb49..90dd51bb5 100644 --- a/src/WebSocket.js +++ b/src/WebSocket.js @@ -1,10 +1,13 @@ 'use strict'; const { browser } = require('./util/Constants'); + +let erlpack; + try { - var erlpack = require('erlpack'); + erlpack = require('erlpack'); if (!erlpack.pack) erlpack = null; -} catch (err) {} // eslint-disable-line no-empty +} catch {} // eslint-disable-line no-empty let TextDecoder; @@ -13,11 +16,7 @@ if (browser) { exports.WebSocket = window.WebSocket; // eslint-disable-line no-undef } else { TextDecoder = require('util').TextDecoder; - try { - exports.WebSocket = require('@discordjs/uws'); - } catch (err) { - exports.WebSocket = require('ws'); - } + exports.WebSocket = require('ws'); } const ab = new TextDecoder(); diff --git a/src/client/Client.js b/src/client/Client.js index 3be8e3115..a1ab6672e 100644 --- a/src/client/Client.js +++ b/src/client/Client.js @@ -36,7 +36,7 @@ class Client extends BaseClient { try { // Test if worker threads module is present and used data = require('worker_threads').workerData || data; - } catch (_) { + } catch { // Do nothing } @@ -46,25 +46,25 @@ class Client extends BaseClient { } } - if (this.options.totalShardCount === DefaultOptions.totalShardCount) { - if ('TOTAL_SHARD_COUNT' in data) { - this.options.totalShardCount = Number(data.TOTAL_SHARD_COUNT); + if (this.options.shardCount === DefaultOptions.shardCount) { + if ('SHARD_COUNT' in data) { + this.options.shardCount = Number(data.SHARD_COUNT); } else if (Array.isArray(this.options.shards)) { - this.options.totalShardCount = this.options.shards.length; - } else { - this.options.totalShardCount = this.options.shardCount; + this.options.shardCount = this.options.shards.length; } } - if (typeof this.options.shards === 'undefined' && typeof this.options.shardCount === 'number') { + const typeofShards = typeof this.options.shards; + + if (typeofShards === 'undefined' && typeof this.options.shardCount === 'number') { this.options.shards = Array.from({ length: this.options.shardCount }, (_, i) => i); } - if (typeof this.options.shards === 'number') this.options.shards = [this.options.shards]; + if (typeofShards === 'number') this.options.shards = [this.options.shards]; - if (typeof this.options.shards !== 'undefined') { + if (Array.isArray(this.options.shards)) { this.options.shards = [...new Set( - this.options.shards.filter(item => !isNaN(item) && item >= 0 && item < Infinity) + this.options.shards.filter(item => !isNaN(item) && item >= 0 && item < Infinity && item === (item | 0)) )]; } @@ -198,7 +198,9 @@ class Client extends BaseClient { async login(token = this.token) { if (!token || typeof token !== 'string') throw new Error('TOKEN_INVALID'); this.token = token = token.replace(/^(Bot|Bearer)\s*/i, ''); - this.emit(Events.DEBUG, `Provided token: ${token}`); + this.emit(Events.DEBUG, + `Provided token: ${token.split('.').map((val, i) => i > 1 ? val.replace(/./g, '*') : val).join('.')}` + ); if (this.options.presence) { this.options.ws.presence = await this.presence._parse(this.options.presence); @@ -363,14 +365,15 @@ class Client extends BaseClient { * @private */ _validateOptions(options = this.options) { // eslint-disable-line complexity - if (options.shardCount !== 'auto' && (typeof options.shardCount !== 'number' || isNaN(options.shardCount))) { - throw new TypeError('CLIENT_INVALID_OPTION', 'shardCount', 'a number or "auto"'); + if (typeof options.shardCount !== 'number' || isNaN(options.shardCount) || options.shardCount < 1) { + throw new TypeError('CLIENT_INVALID_OPTION', 'shardCount', 'a number greater than or equal to 1'); } - if (options.shards && !Array.isArray(options.shards)) { - throw new TypeError('CLIENT_INVALID_OPTION', 'shards', 'a number or array'); + if (options.shards && + !(options.shards === 'auto' || Array.isArray(options.shards)) + ) { + throw new TypeError('CLIENT_INVALID_OPTION', 'shards', '\'auto\', a number or array of numbers'); } if (options.shards && !options.shards.length) throw new RangeError('CLIENT_INVALID_PROVIDED_SHARDS'); - if (options.shardCount < 1) throw new RangeError('CLIENT_INVALID_OPTION', 'shardCount', 'at least 1'); if (typeof options.messageCacheMaxSize !== 'number' || isNaN(options.messageCacheMaxSize)) { throw new TypeError('CLIENT_INVALID_OPTION', 'messageCacheMaxSize', 'a number'); } diff --git a/src/client/websocket/WebSocketManager.js b/src/client/websocket/WebSocketManager.js index 8668d2de5..71cbf93c0 100644 --- a/src/client/websocket/WebSocketManager.js +++ b/src/client/websocket/WebSocketManager.js @@ -19,6 +19,7 @@ const BeforeReadyWhitelist = [ ]; const UNRECOVERABLE_CLOSE_CODES = [4004, 4010, 4011]; +const UNRESUMABLE_CLOSE_CODES = [1000, 4006, 4007]; /** * The WebSocket manager for this client. @@ -47,9 +48,9 @@ class WebSocketManager extends EventEmitter { /** * The amount of shards this manager handles * @private - * @type {number|string} + * @type {number} */ - this.totalShards = this.client.options.shardCount; + this.totalShards = this.client.options.shards.length; /** * A collection of all shards this manager handles @@ -143,25 +144,25 @@ class WebSocketManager extends EventEmitter { const { total, remaining, reset_after } = sessionStartLimit; this.debug(`Fetched Gateway Information - URL: ${gatewayURL} - Recommended Shards: ${recommendedShards}`); + URL: ${gatewayURL} + Recommended Shards: ${recommendedShards}`); this.debug(`Session Limit Information - Total: ${total} - Remaining: ${remaining}`); + Total: ${total} + Remaining: ${remaining}`); this.gateway = `${gatewayURL}/`; - if (this.totalShards === 'auto') { + const { shards } = this.client.options; + + if (shards === 'auto') { this.debug(`Using the recommended shard count provided by Discord: ${recommendedShards}`); - this.totalShards = this.client.options.shardCount = this.client.options.totalShardCount = recommendedShards; - if (typeof this.client.options.shards === 'undefined' || !this.client.options.shards.length) { + this.totalShards = this.client.options.shardCount = recommendedShards; + if (shards === 'auto' || !this.client.options.shards.length) { this.client.options.shards = Array.from({ length: recommendedShards }, (_, i) => i); } } - const { shards } = this.client.options; - if (Array.isArray(shards)) { this.totalShards = shards.length; this.debug(`Spawning shards: ${shards.join(', ')}`); @@ -190,15 +191,17 @@ class WebSocketManager extends EventEmitter { this.shardQueue.delete(shard); if (!shard.eventsAttached) { - shard.on(ShardEvents.READY, () => { + shard.on(ShardEvents.ALL_READY, unavailableGuilds => { /** * Emitted when a shard turns ready. * @event Client#shardReady * @param {number} id The shard ID that turned ready + * @param {?Set} unavailableGuilds Set of unavailable guild IDs, if any */ - this.client.emit(Events.SHARD_READY, shard.id); + this.client.emit(Events.SHARD_READY, shard.id, unavailableGuilds); if (!this.shardQueue.size) this.reconnecting = false; + this.checkShardsReady(); }); shard.on(ShardEvents.CLOSE, event => { @@ -214,8 +217,8 @@ class WebSocketManager extends EventEmitter { return; } - if (event.code === 1000 || event.code === 4006) { - // Any event code in this range cannot be resumed. + if (UNRESUMABLE_CLOSE_CODES.includes(event.code)) { + // These event codes cannot be resumed shard.sessionID = undefined; } @@ -226,27 +229,23 @@ class WebSocketManager extends EventEmitter { */ this.client.emit(Events.SHARD_RECONNECTING, shard.id); + this.shardQueue.add(shard); + if (shard.sessionID) { this.debug(`Session ID is present, attempting an immediate reconnect...`, shard); - shard.connect().catch(() => null); - return; + this.reconnect(true); + } else { + shard.destroy(undefined, true); + this.reconnect(); } - - shard.destroy(); - - this.shardQueue.add(shard); - this.reconnect(); }); shard.on(ShardEvents.INVALID_SESSION, () => { this.client.emit(Events.SHARD_RECONNECTING, shard.id); - - this.shardQueue.add(shard); - this.reconnect(); }); shard.on(ShardEvents.DESTROYED, () => { - this.debug('Shard was destroyed but no WebSocket connection existed... Reconnecting...', shard); + this.debug('Shard was destroyed but no WebSocket connection was present! Reconnecting...', shard); this.client.emit(Events.SHARD_RECONNECTING, shard.id); @@ -264,7 +263,7 @@ class WebSocketManager extends EventEmitter { } catch (error) { if (error && error.code && UNRECOVERABLE_CLOSE_CODES.includes(error.code)) { throw new DJSError(WSCodes[error.code]); - // Undefined if session is invalid, error event (or uws' event mimicking it) for regular closes + // Undefined if session is invalid, error event for regular closes } else if (!error || error.code) { this.debug('Failed to connect to the gateway, requeueing...', shard); this.shardQueue.add(shard); @@ -285,14 +284,15 @@ class WebSocketManager extends EventEmitter { /** * Handles reconnects for this manager. + * @param {boolean} [skipLimit=false] IF this reconnect should skip checking the session limit * @private * @returns {Promise} */ - async reconnect() { + async reconnect(skipLimit = false) { if (this.reconnecting || this.status !== Status.READY) return false; this.reconnecting = true; try { - await this._handleSessionLimit(); + if (!skipLimit) await this._handleSessionLimit(); await this.createShards(); } catch (error) { this.debug(`Couldn't reconnect or fetch information about the gateway. ${error}`); @@ -340,7 +340,7 @@ class WebSocketManager extends EventEmitter { this.debug(`Manager was destroyed. Called by:\n${new Error('MANAGER_DESTROYED').stack}`); this.destroyed = true; this.shardQueue.clear(); - for (const shard of this.shards.values()) shard.destroy(); + for (const shard of this.shards.values()) shard.destroy(1000, true); } /** @@ -356,8 +356,8 @@ class WebSocketManager extends EventEmitter { remaining = session_start_limit.remaining; resetAfter = session_start_limit.reset_after; this.debug(`Session Limit Information - Total: ${session_start_limit.total} - Remaining: ${remaining}`); + Total: ${session_start_limit.total} + Remaining: ${remaining}`); } if (!remaining) { this.debug(`Exceeded identify threshold. Will attempt a connection in ${resetAfter}ms`); @@ -396,45 +396,37 @@ class WebSocketManager extends EventEmitter { /** * Checks whether the client is ready to be marked as ready. - * @returns {boolean} * @private */ - checkReady() { + async checkShardsReady() { + if (this.status === Status.READY) return; if (this.shards.size !== this.totalShards || this.shards.some(s => s.status !== Status.READY)) { - return false; + return; } - const unavailableGuilds = this.client.guilds.reduce((acc, guild) => guild.available ? acc : acc + 1, 0); + this.status = Status.NEARLY; - // TODO: Rethink implementation for this - if (unavailableGuilds === 0) { - this.status = Status.NEARLY; - if (!this.client.options.fetchAllMembers) return this.triggerReady(); - // Fetch all members before marking self as ready - const promises = this.client.guilds.map(g => g.members.fetch()); - Promise.all(promises) - .then(() => this.triggerReady()) - .catch(e => { - this.debug(`Failed to fetch all members before ready! ${e}\n${e.stack}`); - this.triggerReady(); + if (this.client.options.fetchAllMembers) { + try { + const promises = this.client.guilds.map(guild => { + if (guild.available) return guild.members.fetch(); + // Return empty promise if guild is unavailable + return Promise.resolve(); }); - } else { - this.debug(`There are ${unavailableGuilds} unavailable guilds. Waiting for their GUILD_CREATE packets`); + await Promise.all(promises); + } catch (err) { + this.debug(`Failed to fetch all members before ready! ${err}\n${err.stack}`); + } } - return true; + this.triggerClientReady(); } /** * Causes the client to be marked as ready and emits the ready event. * @private */ - triggerReady() { - if (this.status === Status.READY) { - this.debug('Tried to mark self as ready, but already ready'); - return; - } - + triggerClientReady() { this.status = Status.READY; this.client.readyAt = new Date(); diff --git a/src/client/websocket/WebSocketShard.js b/src/client/websocket/WebSocketShard.js index f77635b0f..8d9afd313 100644 --- a/src/client/websocket/WebSocketShard.js +++ b/src/client/websocket/WebSocketShard.js @@ -4,23 +4,15 @@ const EventEmitter = require('events'); const WebSocket = require('../../WebSocket'); const { browser, Status, Events, ShardEvents, OPCodes, WSEvents } = require('../../util/Constants'); -let zstd; +const STATUS_KEYS = Object.keys(Status); +const CONNECTION_STATE = Object.keys(WebSocket.WebSocket); + let zlib; -if (browser) { - zlib = require('pako'); -} else { +if (!browser) { try { - zstd = require('zucc'); - if (!zstd.DecompressStream) zstd = null; - } catch (e) { - try { - zlib = require('zlib-sync'); - if (!zlib.Inflate) zlib = require('pako'); - } catch (err) { - zlib = require('pako'); - } - } + zlib = require('zlib-sync'); + } catch {} // eslint-disable-line no-empty } /** @@ -70,10 +62,10 @@ class WebSocketShard extends EventEmitter { this.sessionID = undefined; /** - * The previous 3 heartbeat pings of the shard (most recent first) - * @type {number[]} + * The previous heartbeat ping of the shard + * @type {number} */ - this.pings = []; + this.ping = -1; /** * The last time a ping was sent (a timestamp) @@ -128,7 +120,7 @@ class WebSocketShard extends EventEmitter { * @type {?NodeJS.Timer} * @private */ - Object.defineProperty(this, 'helloTimeout', { value: null, writable: true }); + Object.defineProperty(this, 'helloTimeout', { value: undefined, writable: true }); /** * If the manager attached its event handlers on the shard @@ -136,16 +128,27 @@ class WebSocketShard extends EventEmitter { * @private */ Object.defineProperty(this, 'eventsAttached', { value: false, writable: true }); - } - /** - * Average heartbeat ping of the websocket, obtained by averaging the WebSocketShard#pings property - * @type {number} - * @readonly - */ - get ping() { - const sum = this.pings.reduce((a, b) => a + b, 0); - return sum / this.pings.length; + /** + * A set of guild IDs this shard expects to receive + * @type {?Set} + * @private + */ + Object.defineProperty(this, 'expectedGuilds', { value: undefined, writable: true }); + + /** + * The ready timeout + * @type {?NodeJS.Timer} + * @private + */ + Object.defineProperty(this, 'readyTimeout', { value: undefined, writable: true }); + + /** + * Time when the WebSocket connection was opened + * @type {number} + * @private + */ + Object.defineProperty(this, 'connectedAt', { value: 0, writable: true }); } /** @@ -166,36 +169,35 @@ class WebSocketShard extends EventEmitter { connect() { const { gateway, client } = this.manager; - if (this.status === Status.READY && this.connection && this.connection.readyState === WebSocket.OPEN) { + if (this.connection && this.connection.readyState === WebSocket.OPEN && this.status === Status.READY) { return Promise.resolve(); } return new Promise((resolve, reject) => { - const onReady = () => { + const cleanup = () => { this.off(ShardEvents.CLOSE, onClose); + this.off(ShardEvents.READY, onReady); this.off(ShardEvents.RESUMED, onResumed); this.off(ShardEvents.INVALID_SESSION, onInvalid); + }; + + const onReady = () => { + cleanup(); resolve(); }; const onResumed = () => { - this.off(ShardEvents.CLOSE, onClose); - this.off(ShardEvents.READY, onReady); - this.off(ShardEvents.INVALID_SESSION, onInvalid); + cleanup(); resolve(); }; const onClose = event => { - this.off(ShardEvents.READY, onReady); - this.off(ShardEvents.RESUMED, onResumed); - this.off(ShardEvents.INVALID_SESSION, onInvalid); + cleanup(); reject(event); }; const onInvalid = () => { - this.off(ShardEvents.READY, onReady); - this.off(ShardEvents.RESUMED, onResumed); - this.off(ShardEvents.CLOSE, onClose); + cleanup(); // eslint-disable-next-line prefer-promise-reject-errors reject(); }; @@ -206,29 +208,35 @@ class WebSocketShard extends EventEmitter { this.once(ShardEvents.INVALID_SESSION, onInvalid); if (this.connection && this.connection.readyState === WebSocket.OPEN) { - this.identifyNew(); + this.debug('Connection found, attempting an immediate identify.'); + this.identify(); return; } - if (zstd) { - this.inflate = new zstd.DecompressStream(); - } else { + const wsQuery = { v: client.options.ws.version }; + + if (zlib) { this.inflate = new zlib.Inflate({ chunkSize: 65535, flush: zlib.Z_SYNC_FLUSH, to: WebSocket.encoding === 'json' ? 'string' : '', }); + wsQuery.compress = 'zlib-stream'; } - this.debug(`Trying to connect to ${gateway}, version ${client.options.ws.version}`); + this.debug( + `[CONNECT] + Gateway: ${gateway} + Version: ${client.options.ws.version} + Encoding: ${WebSocket.encoding} + Compression: ${zlib ? 'zlib-stream' : 'none'}`); this.status = this.status === Status.DISCONNECTED ? Status.RECONNECTING : Status.CONNECTING; this.setHelloTimeout(); - const ws = this.connection = WebSocket.create(gateway, { - v: client.options.ws.version, - compress: zstd ? 'zstd-stream' : 'zlib-stream', - }); + this.connectedAt = Date.now(); + + const ws = this.connection = WebSocket.create(gateway, wsQuery); ws.onopen = this.onOpen.bind(this); ws.onmessage = this.onMessage.bind(this); ws.onerror = this.onError.bind(this); @@ -241,7 +249,7 @@ class WebSocketShard extends EventEmitter { * @private */ onOpen() { - this.debug('Opened a connection to the gateway successfully.'); + this.debug(`[CONNECTED] ${this.connection.url} in ${Date.now() - this.connectedAt}ms`); this.status = Status.NEARLY; } @@ -252,10 +260,8 @@ class WebSocketShard extends EventEmitter { */ onMessage({ data }) { let raw; - if (zstd) { - raw = this.inflate.decompress(new Uint8Array(data).buffer); - } else { - if (data instanceof ArrayBuffer) data = new Uint8Array(data); + if (data instanceof ArrayBuffer) data = new Uint8Array(data); + if (zlib) { const l = data.length; const flush = l >= 4 && data[l - 4] === 0x00 && @@ -266,6 +272,8 @@ class WebSocketShard extends EventEmitter { this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH); if (!flush) return; raw = this.inflate.result; + } else { + raw = data; } let packet; try { @@ -281,19 +289,13 @@ class WebSocketShard extends EventEmitter { /** * Called whenever an error occurs with the WebSocket. - * @param {ErrorEvent|Object} event The error that occurred + * @param {ErrorEvent} event The error that occurred * @private */ onError(event) { const error = event && event.error ? event.error : event; if (!error) return; - if (error.message === 'uWs client connection error') { - this.debug('Received a uWs error. Closing the connection and reconnecting...'); - this.connection.close(4000); - return; - } - /** * Emitted whenever a shard's WebSocket encounters a connection error. * @event Client#shardError @@ -324,13 +326,13 @@ class WebSocketShard extends EventEmitter { * @private */ onClose(event) { - this.closeSequence = this.sequence; + if (this.sequence !== -1) this.closeSequence = this.sequence; this.sequence = -1; - this.debug(`WebSocket was closed. - Event Code: ${event.code} - Clean: ${event.wasClean} - Reason: ${event.reason || 'No reason received'}`); + this.debug(`[CLOSE] + Event Code: ${event.code} + Clean: ${event.wasClean} + Reason: ${event.reason || 'No reason received'}`); this.setHeartbeatTimer(-1); this.setHelloTimeout(-1); @@ -360,16 +362,18 @@ class WebSocketShard extends EventEmitter { switch (packet.t) { case WSEvents.READY: /** - * Emitted when the shard becomes ready + * Emitted when the shard receives the READY payload and is now waiting for guilds * @event WebSocketShard#ready */ this.emit(ShardEvents.READY); this.sessionID = packet.d.session_id; - this.status = Status.READY; - this.debug(`READY | Session ${this.sessionID}.`); + this.expectedGuilds = new Set(packet.d.guilds.map(d => d.id)); + this.status = Status.WAITING_FOR_GUILDS; + this.debug(`[READY] Session ${this.sessionID}.`); this.lastHeartbeatAcked = true; - this.sendHeartbeat(); + this.sendHeartbeat('ReadyHeartbeat'); + this.checkReady(); break; case WSEvents.RESUMED: { /** @@ -380,9 +384,10 @@ class WebSocketShard extends EventEmitter { this.status = Status.READY; const replayed = packet.s - this.closeSequence; - this.debug(`RESUMED | Session ${this.sessionID} | Replayed ${replayed} events.`); + this.debug(`[RESUMED] Session ${this.sessionID} | Replayed ${replayed} events.`); this.lastHeartbeatAcked = true; - this.sendHeartbeat(); + this.sendHeartbeat('ResumeHeartbeat'); + break; } } @@ -398,7 +403,7 @@ class WebSocketShard extends EventEmitter { this.connection.close(1001); break; case OPCodes.INVALID_SESSION: - this.debug(`Session invalidated. Resumable: ${packet.d}.`); + this.debug(`[INVALID SESSION] Resumable: ${packet.d}.`); // If we can resume the session, do so immediately if (packet.d) { this.identifyResume(); @@ -417,13 +422,56 @@ class WebSocketShard extends EventEmitter { this.ackHeartbeat(); break; case OPCodes.HEARTBEAT: - this.sendHeartbeat(); + this.sendHeartbeat('HeartbeatRequest'); break; default: this.manager.handlePacket(packet, this); + if (this.status === Status.WAITING_FOR_GUILDS && packet.t === WSEvents.GUILD_CREATE) { + this.expectedGuilds.delete(packet.d.id); + this.checkReady(); + } } } + /** + * Checks if the shard can be marked as ready + * @private + */ + checkReady() { + // Step 0. Clear the ready timeout, if it exists + if (this.readyTimeout) { + this.manager.client.clearTimeout(this.readyTimeout); + this.readyTimeout = undefined; + } + // Step 1. If we don't have any other guilds pending, we are ready + if (!this.expectedGuilds.size) { + this.debug('Shard received all its guilds. Marking as fully ready.'); + this.status = Status.READY; + + /** + * Emitted when the shard is fully ready. + * This event is emitted if: + * * all guilds were received by this shard + * * the ready timeout expired, and some guilds are unavailable + * @event WebSocketShard#allReady + * @param {?Set} unavailableGuilds Set of unavailable guilds, if any + */ + this.emit(ShardEvents.ALL_READY); + return; + } + // Step 2. Create a 15s timeout that will mark the shard as ready if there are still unavailable guilds + this.readyTimeout = this.manager.client.setTimeout(() => { + this.debug(`Shard did not receive any more guild packets in 15 seconds. + Unavailable guild count: ${this.expectedGuilds.size}`); + + this.readyTimeout = undefined; + + this.status = Status.READY; + + this.emit(ShardEvents.ALL_READY, this.expectedGuilds); + }, 15000); + } + /** * Sets the HELLO packet timeout. * @param {number} [time] If set to -1, it will clear the hello timeout timeout @@ -434,7 +482,7 @@ class WebSocketShard extends EventEmitter { if (this.helloTimeout) { this.debug('Clearing the HELLO timeout.'); this.manager.client.clearTimeout(this.helloTimeout); - this.helloTimeout = null; + this.helloTimeout = undefined; } return; } @@ -455,26 +503,39 @@ class WebSocketShard extends EventEmitter { if (this.heartbeatInterval) { this.debug('Clearing the heartbeat interval.'); this.manager.client.clearInterval(this.heartbeatInterval); - this.heartbeatInterval = null; + this.heartbeatInterval = undefined; } return; } this.debug(`Setting a heartbeat interval for ${time}ms.`); + // Sanity checks + if (this.heartbeatInterval) this.manager.client.clearInterval(this.heartbeatInterval); this.heartbeatInterval = this.manager.client.setInterval(() => this.sendHeartbeat(), time); } /** * Sends a heartbeat to the WebSocket. * If this shard didn't receive a heartbeat last time, it will destroy it and reconnect + * @param {string} [tag='HeartbeatTimer'] What caused this heartbeat to be sent + * @param {boolean} [ignoreHeartbeatAck] If we should send the heartbeat forcefully. * @private */ - sendHeartbeat() { - if (!this.lastHeartbeatAcked) { - this.debug("Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting."); + sendHeartbeat(tag = 'HeartbeatTimer', + ignoreHeartbeatAck = [Status.WAITING_FOR_GUILDS, Status.IDENTIFYING, Status.RESUMING].includes(this.status)) { + if (ignoreHeartbeatAck && !this.lastHeartbeatAcked) { + this.debug(`[${tag}] Didn't process heartbeat ack yet but we are still connected. Sending one now.`); + } else if (!this.lastHeartbeatAcked) { + this.debug( + `[${tag}] Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting. + Status : ${STATUS_KEYS[this.status]} + Sequence : ${this.sequence} + Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}` + ); this.destroy(4009); return; } - this.debug('Sending a heartbeat.'); + + this.debug(`[${tag}] Sending a heartbeat.`); this.lastHeartbeatAcked = false; this.lastPingTimestamp = Date.now(); this.send({ op: OPCodes.HEARTBEAT, d: this.sequence }, true); @@ -488,8 +549,7 @@ class WebSocketShard extends EventEmitter { this.lastHeartbeatAcked = true; const latency = Date.now() - this.lastPingTimestamp; this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`); - this.pings.unshift(latency); - if (this.pings.length > 3) this.pings.length = 3; + this.ping = latency; } /** @@ -508,18 +568,20 @@ class WebSocketShard extends EventEmitter { identifyNew() { const { client } = this.manager; if (!client.token) { - this.debug('No token available to identify a new session.'); + this.debug('[IDENTIFY] No token available to identify a new session.'); return; } + this.status = Status.IDENTIFYING; + // Clone the identify payload and assign the token and shard info const d = { ...client.options.ws, token: client.token, - shard: [this.id, Number(client.options.totalShardCount)], + shard: [this.id, Number(client.options.shardCount)], }; - this.debug(`Identifying as a new session. Shard ${this.id}/${client.options.totalShardCount}`); + this.debug(`[IDENTIFY] Shard ${this.id}/${client.options.shardCount}`); this.send({ op: OPCodes.IDENTIFY, d }, true); } @@ -529,12 +591,14 @@ class WebSocketShard extends EventEmitter { */ identifyResume() { if (!this.sessionID) { - this.debug('Warning: attempted to resume but no session ID was present; identifying as a new session.'); + this.debug('[RESUME] No session ID was present; identifying as a new session.'); this.identifyNew(); return; } - this.debug(`Attempting to resume session ${this.sessionID} at sequence ${this.closeSequence}`); + this.status = Status.RESUMING; + + this.debug(`[RESUME] Session ${this.sessionID}, sequence ${this.closeSequence}`); const d = { token: this.manager.client.token, @@ -600,15 +664,17 @@ class WebSocketShard extends EventEmitter { /** * Destroys this shard and closes its WebSocket connection. * @param {number} [closeCode=1000] The close code to use + * @param {boolean} [cleanup=false] If the shard should attempt a reconnect * @private */ - destroy(closeCode = 1000) { + destroy(closeCode = 1000, cleanup = false) { this.setHeartbeatTimer(-1); this.setHelloTimeout(-1); + // Close the WebSocket connection, if any - if (this.connection && this.connection.readyState !== WebSocket.CLOSED) { + if (this.connection && this.connection.readyState === WebSocket.OPEN) { this.connection.close(closeCode); - } else { + } else if (!cleanup) { /** * Emitted when a shard is destroyed, but no WebSocket connection was present. * @private @@ -616,9 +682,11 @@ class WebSocketShard extends EventEmitter { */ this.emit(ShardEvents.DESTROYED); } + this.connection = null; // Set the shard status this.status = Status.DISCONNECTED; + if (this.sequence !== -1) this.closeSequence = this.sequence; // Reset the sequence this.sequence = -1; // Reset the ratelimit data diff --git a/src/client/websocket/handlers/GUILD_BAN_ADD.js b/src/client/websocket/handlers/GUILD_BAN_ADD.js index 4fa89edcf..cbb60e13c 100644 --- a/src/client/websocket/handlers/GUILD_BAN_ADD.js +++ b/src/client/websocket/handlers/GUILD_BAN_ADD.js @@ -4,7 +4,7 @@ const { Events } = require('../../../util/Constants'); module.exports = (client, { d: data }) => { const guild = client.guilds.get(data.guild_id); - const user = client.users.get(data.user.id); + const user = client.users.add(data.user); /** * Emitted whenever a member is banned from a guild. diff --git a/src/client/websocket/handlers/GUILD_CREATE.js b/src/client/websocket/handlers/GUILD_CREATE.js index 9f13c6581..33cc0c239 100644 --- a/src/client/websocket/handlers/GUILD_CREATE.js +++ b/src/client/websocket/handlers/GUILD_CREATE.js @@ -8,20 +8,28 @@ module.exports = async (client, { d: data }, shard) => { if (!guild.available && !data.unavailable) { // A newly available guild guild._patch(data); - client.ws.checkReady(); + // If the client was ready before and we had unavailable guilds, fetch them + if (client.ws.status === Status.READY && client.options.fetchAllMembers) { + await guild.members.fetch().catch(err => + client.emit(Events.DEBUG, `Failed to fetch all members: ${err}\n${err.stack}`) + ); + } } } else { // A new guild data.shardID = shard.id; guild = client.guilds.add(data); - const emitEvent = client.ws.status === Status.READY; - if (emitEvent) { + if (client.ws.status === Status.READY) { /** * Emitted whenever the client joins a guild. * @event Client#guildCreate * @param {Guild} guild The created guild */ - if (client.options.fetchAllMembers) await guild.members.fetch(); + if (client.options.fetchAllMembers) { + await guild.members.fetch().catch(err => + client.emit(Events.DEBUG, `Failed to fetch all members: ${err}\n${err.stack}`) + ); + } client.emit(Events.GUILD_CREATE, guild); } } diff --git a/src/client/websocket/handlers/READY.js b/src/client/websocket/handlers/READY.js index 039f8f237..1612cfa02 100644 --- a/src/client/websocket/handlers/READY.js +++ b/src/client/websocket/handlers/READY.js @@ -16,6 +16,4 @@ module.exports = (client, { d: data }, shard) => { guild.shardID = shard.id; client.guilds.add(guild); } - - client.ws.checkReady(); }; diff --git a/src/sharding/Shard.js b/src/sharding/Shard.js index 6533b4f83..04d224db4 100644 --- a/src/sharding/Shard.js +++ b/src/sharding/Shard.js @@ -55,7 +55,7 @@ class Shard extends EventEmitter { this.env = Object.assign({}, process.env, { SHARDING_MANAGER: true, SHARDS: this.id, - TOTAL_SHARD_COUNT: this.manager.totalShards, + SHARD_COUNT: this.manager.totalShards, DISCORD_TOKEN: this.manager.token, }); diff --git a/src/sharding/ShardClientUtil.js b/src/sharding/ShardClientUtil.js index 4d0b4a43d..8ed1b9787 100644 --- a/src/sharding/ShardClientUtil.js +++ b/src/sharding/ShardClientUtil.js @@ -60,7 +60,7 @@ class ShardClientUtil { * @readonly */ get count() { - return this.client.options.totalShardCount; + return this.client.options.shardCount; } /** diff --git a/src/util/Constants.js b/src/util/Constants.js index 12ddfbf8f..06f12f36c 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -7,9 +7,10 @@ const browser = exports.browser = typeof window !== 'undefined'; /** * Options for a client. * @typedef {Object} ClientOptions - * @property {number|number[]} [shards] ID of the shard to run, or an array of shard IDs - * @property {number} [shardCount=1] Total number of shards that will be spawned by this Client - * @property {number} [totalShardCount=1] The total amount of shards used by all processes of this bot + * @property {number|number[]|string} [shards] ID of the shard to run, or an array of shard IDs. If not specified, + * the client will spawn {@link ClientOptions#shardCount} shards. If set to `auto`, it will fetch the + * recommended amount of shards from Discord and spawn that amount + * @property {number} [shardCount=1] The total amount of shards used by all processes of this bot * (e.g. recommended shard count, shard count of the ShardingManager) * @property {number} [messageCacheMaxSize=200] Maximum number of messages to cache per channel * (-1 or Infinity for unlimited - don't do this without message sweeping, otherwise memory usage will climb @@ -42,7 +43,6 @@ const browser = exports.browser = typeof window !== 'undefined'; */ exports.DefaultOptions = { shardCount: 1, - totalShardCount: 1, messageCacheMaxSize: 200, messageCacheLifetime: 0, messageSweepInterval: 0, @@ -163,6 +163,9 @@ exports.Endpoints = { * * IDLE: 3 * * NEARLY: 4 * * DISCONNECTED: 5 + * * WAITING_FOR_GUILDS: 6 + * * IDENTIFYING: 7 + * * RESUMING: 8 * @typedef {number} Status */ exports.Status = { @@ -172,6 +175,9 @@ exports.Status = { IDLE: 3, NEARLY: 4, DISCONNECTED: 5, + WAITING_FOR_GUILDS: 6, + IDENTIFYING: 7, + RESUMING: 8, }; /** @@ -279,6 +285,7 @@ exports.ShardEvents = { INVALID_SESSION: 'invalidSession', READY: 'ready', RESUMED: 'resumed', + ALL_READY: 'allReady', }; /** diff --git a/typings/index.d.ts b/typings/index.d.ts index 9eb0b4532..27898c189 100644 --- a/typings/index.d.ts +++ b/typings/index.d.ts @@ -1650,6 +1650,7 @@ declare module 'discord.js' { public on(event: WSEventType, listener: (data: any, shardID: number) => void): this; public once(event: WSEventType, listener: (data: any, shardID: number) => void): this; + private debug(message: string, shard?: WebSocketShard): void; private connect(): Promise; private createShards(): Promise; @@ -1657,9 +1658,9 @@ declare module 'discord.js' { private broadcast(packet: object): void; private destroy(): void; private _handleSessionLimit(remaining?: number, resetAfter?: number): Promise; - private handlePacket(packet?: object, shard?: WebSocketShard): Promise; - private checkReady(): boolean; - private triggerReady(): void; + private handlePacket(packet?: object, shard?: WebSocketShard): boolean; + private checkShardsReady(): Promise; + private triggerClientReady(): void; } export class WebSocketShard extends EventEmitter { @@ -1671,14 +1672,15 @@ declare module 'discord.js' { private lastHeartbeatAcked: boolean; private ratelimit: { queue: object[]; total: number; remaining: number; time: 60e3; timer: NodeJS.Timeout | null; }; private connection: WebSocket | null; - private helloTimeout: NodeJS.Timeout | null; + private helloTimeout: NodeJS.Timeout | undefined; private eventsAttached: boolean; + private expectedGuilds: Set | undefined; + private readyTimeout: NodeJS.Timeout | undefined; public manager: WebSocketManager; public id: number; public status: Status; - public pings: [number, number, number]; - public readonly ping: number; + public ping: number; private debug(message: string): void; private connect(): Promise; @@ -1687,6 +1689,7 @@ declare module 'discord.js' { private onError(error: ErrorEvent | object): void; private onClose(event: CloseEvent): void; private onPacket(packet: object): void; + private checkReady(): void; private setHelloTimeout(time?: number): void; private setHeartbeatTimer(time: number): void; private sendHeartbeat(): void; @@ -1703,12 +1706,14 @@ declare module 'discord.js' { public on(event: 'resumed', listener: () => void): this; public on(event: 'close', listener: (event: CloseEvent) => void): this; public on(event: 'invalidSession', listener: () => void): this; + public on(event: 'allReady', listener: (unavailableGuilds?: Set) => void): this; public on(event: string, listener: Function): this; public once(event: 'ready', listener: () => void): this; public once(event: 'resumed', listener: () => void): this; public once(event: 'close', listener: (event: CloseEvent) => void): this; public once(event: 'invalidSession', listener: () => void): this; + public once(event: 'allReady', listener: (unavailableGuilds?: Set) => void): this; public once(event: string, listener: Function): this; } @@ -2054,9 +2059,8 @@ declare module 'discord.js' { } interface ClientOptions { - shards?: number | number[]; - shardCount?: number | 'auto'; - totalShardCount?: number; + shards?: number | number[] | 'auto'; + shardCount?: number; messageCacheMaxSize?: number; messageCacheLifetime?: number; messageSweepInterval?: number;