mirror of
https://github.com/discordjs/discord.js.git
synced 2026-03-12 17:43:30 +01:00
src: sharding cleanup and checkReady rewrite (#3393)
* src: Step 1 of who knows how many * src: Remove accidentally committed test file * src: Remove useless added property in package.json * docs: Trailing spaces, come back >.> * src: Buhbye uws, we will miss you..not! * src: Move 'auto' shard selection from totalShardCount to shards * src: tweak * src: Filter out floats from shard IDs You want half of a shard or what? * src: Misc cleanup and bugfix for GUILD_BAN_ADD * src: Rewrite checkReady * src: Misse this while merging master into my branch * typings: Bring these up to date * typings: Forgot allReady event * src: Don't checkReady if the shard isn't waiting for guilds * src: Fix a possible bug for when the ws dies and the session becomes -1 * src: Hopefully fix last edge case that could case a shard to infinitely boot loop * src: Rename totalShardCount to shardCount * src: Small bugfix * src: Correct error message for shardCount being imvalid Co-Authored-By: bdistin <bdistin@gmail.com> * src: Small tweaks * src: If this doesn't fix the issues I'm gonna throw a brick at my PC * src: I swear, STOP BREAKING * src: *groans at a certain snake* * src: Use undefined instead of null on destroy in close event Setting it to null sets the close code to null, which causes a WebSocket error to be thrown. The error is thrown from WebSocket, although there is no connection alive. Fun times! * src: @SpaceEEC's requested changes * src: Remove zucc from discord.js Discord is removing support for it, sooo... Bye bye * src: Missed this * src: Apply @kyranet's suggestions Co-Authored-By: Antonio Román <kyradiscord@gmail.com> * src: @kyranet's suggestions * src: Remove pako, update debug messages - Pako is officially gone from both enviroments Install zlib-sync on node.js if you want it - Improve a few debug messages some more - Discover that internal sharding works in browsers but please don't do that
This commit is contained in:
@@ -4,23 +4,15 @@ const EventEmitter = require('events');
|
||||
const WebSocket = require('../../WebSocket');
|
||||
const { browser, Status, Events, ShardEvents, OPCodes, WSEvents } = require('../../util/Constants');
|
||||
|
||||
let zstd;
|
||||
const STATUS_KEYS = Object.keys(Status);
|
||||
const CONNECTION_STATE = Object.keys(WebSocket.WebSocket);
|
||||
|
||||
let zlib;
|
||||
|
||||
if (browser) {
|
||||
zlib = require('pako');
|
||||
} else {
|
||||
if (!browser) {
|
||||
try {
|
||||
zstd = require('zucc');
|
||||
if (!zstd.DecompressStream) zstd = null;
|
||||
} catch (e) {
|
||||
try {
|
||||
zlib = require('zlib-sync');
|
||||
if (!zlib.Inflate) zlib = require('pako');
|
||||
} catch (err) {
|
||||
zlib = require('pako');
|
||||
}
|
||||
}
|
||||
zlib = require('zlib-sync');
|
||||
} catch {} // eslint-disable-line no-empty
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -70,10 +62,10 @@ class WebSocketShard extends EventEmitter {
|
||||
this.sessionID = undefined;
|
||||
|
||||
/**
|
||||
* The previous 3 heartbeat pings of the shard (most recent first)
|
||||
* @type {number[]}
|
||||
* The previous heartbeat ping of the shard
|
||||
* @type {number}
|
||||
*/
|
||||
this.pings = [];
|
||||
this.ping = -1;
|
||||
|
||||
/**
|
||||
* The last time a ping was sent (a timestamp)
|
||||
@@ -128,7 +120,7 @@ class WebSocketShard extends EventEmitter {
|
||||
* @type {?NodeJS.Timer}
|
||||
* @private
|
||||
*/
|
||||
Object.defineProperty(this, 'helloTimeout', { value: null, writable: true });
|
||||
Object.defineProperty(this, 'helloTimeout', { value: undefined, writable: true });
|
||||
|
||||
/**
|
||||
* If the manager attached its event handlers on the shard
|
||||
@@ -136,16 +128,27 @@ class WebSocketShard extends EventEmitter {
|
||||
* @private
|
||||
*/
|
||||
Object.defineProperty(this, 'eventsAttached', { value: false, writable: true });
|
||||
}
|
||||
|
||||
/**
|
||||
* Average heartbeat ping of the websocket, obtained by averaging the WebSocketShard#pings property
|
||||
* @type {number}
|
||||
* @readonly
|
||||
*/
|
||||
get ping() {
|
||||
const sum = this.pings.reduce((a, b) => a + b, 0);
|
||||
return sum / this.pings.length;
|
||||
/**
|
||||
* A set of guild IDs this shard expects to receive
|
||||
* @type {?Set<string>}
|
||||
* @private
|
||||
*/
|
||||
Object.defineProperty(this, 'expectedGuilds', { value: undefined, writable: true });
|
||||
|
||||
/**
|
||||
* The ready timeout
|
||||
* @type {?NodeJS.Timer}
|
||||
* @private
|
||||
*/
|
||||
Object.defineProperty(this, 'readyTimeout', { value: undefined, writable: true });
|
||||
|
||||
/**
|
||||
* Time when the WebSocket connection was opened
|
||||
* @type {number}
|
||||
* @private
|
||||
*/
|
||||
Object.defineProperty(this, 'connectedAt', { value: 0, writable: true });
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -166,36 +169,35 @@ class WebSocketShard extends EventEmitter {
|
||||
connect() {
|
||||
const { gateway, client } = this.manager;
|
||||
|
||||
if (this.status === Status.READY && this.connection && this.connection.readyState === WebSocket.OPEN) {
|
||||
if (this.connection && this.connection.readyState === WebSocket.OPEN && this.status === Status.READY) {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
return new Promise((resolve, reject) => {
|
||||
const onReady = () => {
|
||||
const cleanup = () => {
|
||||
this.off(ShardEvents.CLOSE, onClose);
|
||||
this.off(ShardEvents.READY, onReady);
|
||||
this.off(ShardEvents.RESUMED, onResumed);
|
||||
this.off(ShardEvents.INVALID_SESSION, onInvalid);
|
||||
};
|
||||
|
||||
const onReady = () => {
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
|
||||
const onResumed = () => {
|
||||
this.off(ShardEvents.CLOSE, onClose);
|
||||
this.off(ShardEvents.READY, onReady);
|
||||
this.off(ShardEvents.INVALID_SESSION, onInvalid);
|
||||
cleanup();
|
||||
resolve();
|
||||
};
|
||||
|
||||
const onClose = event => {
|
||||
this.off(ShardEvents.READY, onReady);
|
||||
this.off(ShardEvents.RESUMED, onResumed);
|
||||
this.off(ShardEvents.INVALID_SESSION, onInvalid);
|
||||
cleanup();
|
||||
reject(event);
|
||||
};
|
||||
|
||||
const onInvalid = () => {
|
||||
this.off(ShardEvents.READY, onReady);
|
||||
this.off(ShardEvents.RESUMED, onResumed);
|
||||
this.off(ShardEvents.CLOSE, onClose);
|
||||
cleanup();
|
||||
// eslint-disable-next-line prefer-promise-reject-errors
|
||||
reject();
|
||||
};
|
||||
@@ -206,29 +208,35 @@ class WebSocketShard extends EventEmitter {
|
||||
this.once(ShardEvents.INVALID_SESSION, onInvalid);
|
||||
|
||||
if (this.connection && this.connection.readyState === WebSocket.OPEN) {
|
||||
this.identifyNew();
|
||||
this.debug('Connection found, attempting an immediate identify.');
|
||||
this.identify();
|
||||
return;
|
||||
}
|
||||
|
||||
if (zstd) {
|
||||
this.inflate = new zstd.DecompressStream();
|
||||
} else {
|
||||
const wsQuery = { v: client.options.ws.version };
|
||||
|
||||
if (zlib) {
|
||||
this.inflate = new zlib.Inflate({
|
||||
chunkSize: 65535,
|
||||
flush: zlib.Z_SYNC_FLUSH,
|
||||
to: WebSocket.encoding === 'json' ? 'string' : '',
|
||||
});
|
||||
wsQuery.compress = 'zlib-stream';
|
||||
}
|
||||
|
||||
this.debug(`Trying to connect to ${gateway}, version ${client.options.ws.version}`);
|
||||
this.debug(
|
||||
`[CONNECT]
|
||||
Gateway: ${gateway}
|
||||
Version: ${client.options.ws.version}
|
||||
Encoding: ${WebSocket.encoding}
|
||||
Compression: ${zlib ? 'zlib-stream' : 'none'}`);
|
||||
|
||||
this.status = this.status === Status.DISCONNECTED ? Status.RECONNECTING : Status.CONNECTING;
|
||||
this.setHelloTimeout();
|
||||
|
||||
const ws = this.connection = WebSocket.create(gateway, {
|
||||
v: client.options.ws.version,
|
||||
compress: zstd ? 'zstd-stream' : 'zlib-stream',
|
||||
});
|
||||
this.connectedAt = Date.now();
|
||||
|
||||
const ws = this.connection = WebSocket.create(gateway, wsQuery);
|
||||
ws.onopen = this.onOpen.bind(this);
|
||||
ws.onmessage = this.onMessage.bind(this);
|
||||
ws.onerror = this.onError.bind(this);
|
||||
@@ -241,7 +249,7 @@ class WebSocketShard extends EventEmitter {
|
||||
* @private
|
||||
*/
|
||||
onOpen() {
|
||||
this.debug('Opened a connection to the gateway successfully.');
|
||||
this.debug(`[CONNECTED] ${this.connection.url} in ${Date.now() - this.connectedAt}ms`);
|
||||
this.status = Status.NEARLY;
|
||||
}
|
||||
|
||||
@@ -252,10 +260,8 @@ class WebSocketShard extends EventEmitter {
|
||||
*/
|
||||
onMessage({ data }) {
|
||||
let raw;
|
||||
if (zstd) {
|
||||
raw = this.inflate.decompress(new Uint8Array(data).buffer);
|
||||
} else {
|
||||
if (data instanceof ArrayBuffer) data = new Uint8Array(data);
|
||||
if (data instanceof ArrayBuffer) data = new Uint8Array(data);
|
||||
if (zlib) {
|
||||
const l = data.length;
|
||||
const flush = l >= 4 &&
|
||||
data[l - 4] === 0x00 &&
|
||||
@@ -266,6 +272,8 @@ class WebSocketShard extends EventEmitter {
|
||||
this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH);
|
||||
if (!flush) return;
|
||||
raw = this.inflate.result;
|
||||
} else {
|
||||
raw = data;
|
||||
}
|
||||
let packet;
|
||||
try {
|
||||
@@ -281,19 +289,13 @@ class WebSocketShard extends EventEmitter {
|
||||
|
||||
/**
|
||||
* Called whenever an error occurs with the WebSocket.
|
||||
* @param {ErrorEvent|Object} event The error that occurred
|
||||
* @param {ErrorEvent} event The error that occurred
|
||||
* @private
|
||||
*/
|
||||
onError(event) {
|
||||
const error = event && event.error ? event.error : event;
|
||||
if (!error) return;
|
||||
|
||||
if (error.message === 'uWs client connection error') {
|
||||
this.debug('Received a uWs error. Closing the connection and reconnecting...');
|
||||
this.connection.close(4000);
|
||||
return;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emitted whenever a shard's WebSocket encounters a connection error.
|
||||
* @event Client#shardError
|
||||
@@ -324,13 +326,13 @@ class WebSocketShard extends EventEmitter {
|
||||
* @private
|
||||
*/
|
||||
onClose(event) {
|
||||
this.closeSequence = this.sequence;
|
||||
if (this.sequence !== -1) this.closeSequence = this.sequence;
|
||||
this.sequence = -1;
|
||||
|
||||
this.debug(`WebSocket was closed.
|
||||
Event Code: ${event.code}
|
||||
Clean: ${event.wasClean}
|
||||
Reason: ${event.reason || 'No reason received'}`);
|
||||
this.debug(`[CLOSE]
|
||||
Event Code: ${event.code}
|
||||
Clean: ${event.wasClean}
|
||||
Reason: ${event.reason || 'No reason received'}`);
|
||||
|
||||
this.setHeartbeatTimer(-1);
|
||||
this.setHelloTimeout(-1);
|
||||
@@ -360,16 +362,18 @@ class WebSocketShard extends EventEmitter {
|
||||
switch (packet.t) {
|
||||
case WSEvents.READY:
|
||||
/**
|
||||
* Emitted when the shard becomes ready
|
||||
* Emitted when the shard receives the READY payload and is now waiting for guilds
|
||||
* @event WebSocketShard#ready
|
||||
*/
|
||||
this.emit(ShardEvents.READY);
|
||||
|
||||
this.sessionID = packet.d.session_id;
|
||||
this.status = Status.READY;
|
||||
this.debug(`READY | Session ${this.sessionID}.`);
|
||||
this.expectedGuilds = new Set(packet.d.guilds.map(d => d.id));
|
||||
this.status = Status.WAITING_FOR_GUILDS;
|
||||
this.debug(`[READY] Session ${this.sessionID}.`);
|
||||
this.lastHeartbeatAcked = true;
|
||||
this.sendHeartbeat();
|
||||
this.sendHeartbeat('ReadyHeartbeat');
|
||||
this.checkReady();
|
||||
break;
|
||||
case WSEvents.RESUMED: {
|
||||
/**
|
||||
@@ -380,9 +384,10 @@ class WebSocketShard extends EventEmitter {
|
||||
|
||||
this.status = Status.READY;
|
||||
const replayed = packet.s - this.closeSequence;
|
||||
this.debug(`RESUMED | Session ${this.sessionID} | Replayed ${replayed} events.`);
|
||||
this.debug(`[RESUMED] Session ${this.sessionID} | Replayed ${replayed} events.`);
|
||||
this.lastHeartbeatAcked = true;
|
||||
this.sendHeartbeat();
|
||||
this.sendHeartbeat('ResumeHeartbeat');
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -398,7 +403,7 @@ class WebSocketShard extends EventEmitter {
|
||||
this.connection.close(1001);
|
||||
break;
|
||||
case OPCodes.INVALID_SESSION:
|
||||
this.debug(`Session invalidated. Resumable: ${packet.d}.`);
|
||||
this.debug(`[INVALID SESSION] Resumable: ${packet.d}.`);
|
||||
// If we can resume the session, do so immediately
|
||||
if (packet.d) {
|
||||
this.identifyResume();
|
||||
@@ -417,13 +422,56 @@ class WebSocketShard extends EventEmitter {
|
||||
this.ackHeartbeat();
|
||||
break;
|
||||
case OPCodes.HEARTBEAT:
|
||||
this.sendHeartbeat();
|
||||
this.sendHeartbeat('HeartbeatRequest');
|
||||
break;
|
||||
default:
|
||||
this.manager.handlePacket(packet, this);
|
||||
if (this.status === Status.WAITING_FOR_GUILDS && packet.t === WSEvents.GUILD_CREATE) {
|
||||
this.expectedGuilds.delete(packet.d.id);
|
||||
this.checkReady();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks if the shard can be marked as ready
|
||||
* @private
|
||||
*/
|
||||
checkReady() {
|
||||
// Step 0. Clear the ready timeout, if it exists
|
||||
if (this.readyTimeout) {
|
||||
this.manager.client.clearTimeout(this.readyTimeout);
|
||||
this.readyTimeout = undefined;
|
||||
}
|
||||
// Step 1. If we don't have any other guilds pending, we are ready
|
||||
if (!this.expectedGuilds.size) {
|
||||
this.debug('Shard received all its guilds. Marking as fully ready.');
|
||||
this.status = Status.READY;
|
||||
|
||||
/**
|
||||
* Emitted when the shard is fully ready.
|
||||
* This event is emitted if:
|
||||
* * all guilds were received by this shard
|
||||
* * the ready timeout expired, and some guilds are unavailable
|
||||
* @event WebSocketShard#allReady
|
||||
* @param {?Set<string>} unavailableGuilds Set of unavailable guilds, if any
|
||||
*/
|
||||
this.emit(ShardEvents.ALL_READY);
|
||||
return;
|
||||
}
|
||||
// Step 2. Create a 15s timeout that will mark the shard as ready if there are still unavailable guilds
|
||||
this.readyTimeout = this.manager.client.setTimeout(() => {
|
||||
this.debug(`Shard did not receive any more guild packets in 15 seconds.
|
||||
Unavailable guild count: ${this.expectedGuilds.size}`);
|
||||
|
||||
this.readyTimeout = undefined;
|
||||
|
||||
this.status = Status.READY;
|
||||
|
||||
this.emit(ShardEvents.ALL_READY, this.expectedGuilds);
|
||||
}, 15000);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the HELLO packet timeout.
|
||||
* @param {number} [time] If set to -1, it will clear the hello timeout timeout
|
||||
@@ -434,7 +482,7 @@ class WebSocketShard extends EventEmitter {
|
||||
if (this.helloTimeout) {
|
||||
this.debug('Clearing the HELLO timeout.');
|
||||
this.manager.client.clearTimeout(this.helloTimeout);
|
||||
this.helloTimeout = null;
|
||||
this.helloTimeout = undefined;
|
||||
}
|
||||
return;
|
||||
}
|
||||
@@ -455,26 +503,39 @@ class WebSocketShard extends EventEmitter {
|
||||
if (this.heartbeatInterval) {
|
||||
this.debug('Clearing the heartbeat interval.');
|
||||
this.manager.client.clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = null;
|
||||
this.heartbeatInterval = undefined;
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.debug(`Setting a heartbeat interval for ${time}ms.`);
|
||||
// Sanity checks
|
||||
if (this.heartbeatInterval) this.manager.client.clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = this.manager.client.setInterval(() => this.sendHeartbeat(), time);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a heartbeat to the WebSocket.
|
||||
* If this shard didn't receive a heartbeat last time, it will destroy it and reconnect
|
||||
* @param {string} [tag='HeartbeatTimer'] What caused this heartbeat to be sent
|
||||
* @param {boolean} [ignoreHeartbeatAck] If we should send the heartbeat forcefully.
|
||||
* @private
|
||||
*/
|
||||
sendHeartbeat() {
|
||||
if (!this.lastHeartbeatAcked) {
|
||||
this.debug("Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting.");
|
||||
sendHeartbeat(tag = 'HeartbeatTimer',
|
||||
ignoreHeartbeatAck = [Status.WAITING_FOR_GUILDS, Status.IDENTIFYING, Status.RESUMING].includes(this.status)) {
|
||||
if (ignoreHeartbeatAck && !this.lastHeartbeatAcked) {
|
||||
this.debug(`[${tag}] Didn't process heartbeat ack yet but we are still connected. Sending one now.`);
|
||||
} else if (!this.lastHeartbeatAcked) {
|
||||
this.debug(
|
||||
`[${tag}] Didn't receive a heartbeat ack last time, assuming zombie connection. Destroying and reconnecting.
|
||||
Status : ${STATUS_KEYS[this.status]}
|
||||
Sequence : ${this.sequence}
|
||||
Connection State: ${this.connection ? CONNECTION_STATE[this.connection.readyState] : 'No Connection??'}`
|
||||
);
|
||||
this.destroy(4009);
|
||||
return;
|
||||
}
|
||||
this.debug('Sending a heartbeat.');
|
||||
|
||||
this.debug(`[${tag}] Sending a heartbeat.`);
|
||||
this.lastHeartbeatAcked = false;
|
||||
this.lastPingTimestamp = Date.now();
|
||||
this.send({ op: OPCodes.HEARTBEAT, d: this.sequence }, true);
|
||||
@@ -488,8 +549,7 @@ class WebSocketShard extends EventEmitter {
|
||||
this.lastHeartbeatAcked = true;
|
||||
const latency = Date.now() - this.lastPingTimestamp;
|
||||
this.debug(`Heartbeat acknowledged, latency of ${latency}ms.`);
|
||||
this.pings.unshift(latency);
|
||||
if (this.pings.length > 3) this.pings.length = 3;
|
||||
this.ping = latency;
|
||||
}
|
||||
|
||||
/**
|
||||
@@ -508,18 +568,20 @@ class WebSocketShard extends EventEmitter {
|
||||
identifyNew() {
|
||||
const { client } = this.manager;
|
||||
if (!client.token) {
|
||||
this.debug('No token available to identify a new session.');
|
||||
this.debug('[IDENTIFY] No token available to identify a new session.');
|
||||
return;
|
||||
}
|
||||
|
||||
this.status = Status.IDENTIFYING;
|
||||
|
||||
// Clone the identify payload and assign the token and shard info
|
||||
const d = {
|
||||
...client.options.ws,
|
||||
token: client.token,
|
||||
shard: [this.id, Number(client.options.totalShardCount)],
|
||||
shard: [this.id, Number(client.options.shardCount)],
|
||||
};
|
||||
|
||||
this.debug(`Identifying as a new session. Shard ${this.id}/${client.options.totalShardCount}`);
|
||||
this.debug(`[IDENTIFY] Shard ${this.id}/${client.options.shardCount}`);
|
||||
this.send({ op: OPCodes.IDENTIFY, d }, true);
|
||||
}
|
||||
|
||||
@@ -529,12 +591,14 @@ class WebSocketShard extends EventEmitter {
|
||||
*/
|
||||
identifyResume() {
|
||||
if (!this.sessionID) {
|
||||
this.debug('Warning: attempted to resume but no session ID was present; identifying as a new session.');
|
||||
this.debug('[RESUME] No session ID was present; identifying as a new session.');
|
||||
this.identifyNew();
|
||||
return;
|
||||
}
|
||||
|
||||
this.debug(`Attempting to resume session ${this.sessionID} at sequence ${this.closeSequence}`);
|
||||
this.status = Status.RESUMING;
|
||||
|
||||
this.debug(`[RESUME] Session ${this.sessionID}, sequence ${this.closeSequence}`);
|
||||
|
||||
const d = {
|
||||
token: this.manager.client.token,
|
||||
@@ -600,15 +664,17 @@ class WebSocketShard extends EventEmitter {
|
||||
/**
|
||||
* Destroys this shard and closes its WebSocket connection.
|
||||
* @param {number} [closeCode=1000] The close code to use
|
||||
* @param {boolean} [cleanup=false] If the shard should attempt a reconnect
|
||||
* @private
|
||||
*/
|
||||
destroy(closeCode = 1000) {
|
||||
destroy(closeCode = 1000, cleanup = false) {
|
||||
this.setHeartbeatTimer(-1);
|
||||
this.setHelloTimeout(-1);
|
||||
|
||||
// Close the WebSocket connection, if any
|
||||
if (this.connection && this.connection.readyState !== WebSocket.CLOSED) {
|
||||
if (this.connection && this.connection.readyState === WebSocket.OPEN) {
|
||||
this.connection.close(closeCode);
|
||||
} else {
|
||||
} else if (!cleanup) {
|
||||
/**
|
||||
* Emitted when a shard is destroyed, but no WebSocket connection was present.
|
||||
* @private
|
||||
@@ -616,9 +682,11 @@ class WebSocketShard extends EventEmitter {
|
||||
*/
|
||||
this.emit(ShardEvents.DESTROYED);
|
||||
}
|
||||
|
||||
this.connection = null;
|
||||
// Set the shard status
|
||||
this.status = Status.DISCONNECTED;
|
||||
if (this.sequence !== -1) this.closeSequence = this.sequence;
|
||||
// Reset the sequence
|
||||
this.sequence = -1;
|
||||
// Reset the ratelimit data
|
||||
|
||||
Reference in New Issue
Block a user