mirror of
https://github.com/discordjs/discord.js.git
synced 2026-03-13 18:13:29 +01:00
feat: Internal sharding (#2902)
* internal sharding * ready event * the square deal * the new deal * the second new deal * add actual documentation * the new freedom * the great society * federal intervention * some of requested changes * i ran out of things to call these * destroy this * fix: Client#uptime went missing * fix(Client): destroy the client on login failure This may happen duo invalid sharding config / invalid token / user requested destroy * fix(Client): reject login promise when the client is destroyed before ready * fix(WebSocketManager): remove redundancy in destroy method (#2491) * typo(ErrorMessages): duo -> duo to * typo(ErrorMessages): duo -> due * fix: docs and options * docs(WebSocketManager): WebSockethard -> WebSocketShard (#2502) * fix(ClientUser): lazily load to account for extended user structure (#2501) * docs(WebSocketShard): document class to make it visible in documentation (#2504) * fix: WebSocketShard#reconnect * fix: presenceUpdate & userUpdate * presenceUpdate wasn't really being handled at all * userUpdate handled incorrectly because as of v7 in the Discord API, it comes inside presenceUpdate * re-add raw event * member is now part of message create payload * feat: Add functionality to support multiple servers with different shards (#2395) * Added functionallity to spawn multiple sharding managers due to adding start and end shards * Small fixes and limiting shard amount to max recommended * Forgot a check in spawn() * Fixed indentation * Removed optiosn object documentation for totalShards * More fixes and a check that the startShard + amount doesnt go over the recommended shard amount * fix getting max recommended * Removed async from constructor (my fault) * Changed start and end shard to a shardList or "auto" + fixed some brainfarts with isNaN * Changed the loop and totalShard count calculation * shards are actually 0 based * Fixed a problem with the gateway and handled some range errors and type errors * Changed Number.isNan to isNaN and changed a few Integer checks to use Number.isInteger * Added check if shardList contains smth greater than totalShards; made spawn use totalShards again; shardList will be ignored and rebuild if totalShards is 'auto'; fixed docs * ShardingManager#spawn now uses a for..of loop; fixed the if statement inside the new for..of loop to still work as intended; made the totalShards be set to a new amount if smth manual is put into ShardingManager#spawn just like before; Fixed some spelling * internal sharding * ready event * the square deal * the new deal * the second new deal * add actual documentation * the new freedom * the great society * federal intervention * some of requested changes * i ran out of things to call these * destroy this * fix: Client#uptime went missing * fix(Client): destroy the client on login failure This may happen duo invalid sharding config / invalid token / user requested destroy * fix(Client): reject login promise when the client is destroyed before ready * fix(WebSocketManager): remove redundancy in destroy method (#2491) * typo(ErrorMessages): duo -> duo to * typo(ErrorMessages): duo -> due * fix: docs and options * docs(WebSocketManager): WebSockethard -> WebSocketShard (#2502) * fix(ClientUser): lazily load to account for extended user structure (#2501) * docs(WebSocketShard): document class to make it visible in documentation (#2504) * fix: WebSocketShard#reconnect * fix: presenceUpdate & userUpdate * presenceUpdate wasn't really being handled at all * userUpdate handled incorrectly because as of v7 in the Discord API, it comes inside presenceUpdate * Internal Sharding adaptation Adapted to internal sharding Fixed a bug where non ready invalidated sessions wouldnt respawn * Fixed shardCount not retrieving * Fixing style removed unnecessary parenthesis * Fixing and rebasing lets hope i didnt dun hecklered it * Fixing my own retardation * Thanks git rebase * fix: assigning member in message create payload * fix: resumes * fix: IS wont give up reconnecting now * docs: add missing docs mostly * fix: found lost methods * fix: WebSocketManager#broadcast check if shard exists * fix: ShardClientUtil#id returning undefined * feat: handle new session rate limits (#2796) * feat: handle new session rate limits * i have no idea what i was doing last night * fix if statement weirdness * fix: re-add presence parsing from ClientOptions (#2893) * resolve conflicts * typings: missing typings * re-add missing linter rule * fix: replacing ClientUser wrongly * address unecessary performance waste * docs: missing disconnect event * fix(typings): Fix 2 issues with typings (#2909) * (Typings) Update typings to reflect current ClientOptions * fix(Typings) fixes a bug with Websockets and DOM Types * fix travis * feat: allow setting presence per shard * add WebSocketManager#shardX events * adjust typings, docs and performance issues * readjust shard events, now provide shardId parameter instead * fix: ready event should check shardCount, not actualShardCount * fix: re-add replayed parameter of Client#resume * fix(Sharding): fixes several things in Internal Sharding (#2914) * fix(Sharding) fixes several things in Internal Sharding * add default value for shards property * better implement checking for shards array * fix travis & some casing * split shard count into 2 words * update to latest Internal Sharding, fix requested changes * make sure totalShardCount is a number * fix comment * fix small typo * dynamically set totalShardCount if either shards or shardCount is provided * consistency: rename shardID to shardId * remove Client#shardIds * fix: typo in GuildIntegrationsUpdate handler * fix: incorrect packet data being passed in some events (#2919) * fix: edgecase of ShardingManager and totalShardCount (#2918) * fix: Client#userUpdate being passed wrong parameter and fix a potential edgecase of returning null in ClientUser#edit from this event * fix consistency and typings issues * consistency: shardId instances renamed to shardID * typings: fix typings regarding WebSocket * style(.eslintrc): remove additional whitespace * fix(Client): remove ondisconnect handler on timeout * docs(BaseClient): fix typo of Immediate * nitpick: typings, private fields and methods * typo: improve grammar a bit * fix: error assigning client in WebSocketManager * typo: actually spell milliseconds properly
This commit is contained in:
473
src/client/websocket/WebSocketShard.js
Normal file
473
src/client/websocket/WebSocketShard.js
Normal file
@@ -0,0 +1,473 @@
|
||||
const EventEmitter = require('events');
|
||||
const WebSocket = require('../../WebSocket');
|
||||
const { Status, Events, OPCodes, WSEvents, WSCodes } = require('../../util/Constants');
|
||||
let zlib;
|
||||
try {
|
||||
zlib = require('zlib-sync');
|
||||
if (!zlib.Inflate) zlib = require('pako');
|
||||
} catch (err) {
|
||||
zlib = require('pako');
|
||||
}
|
||||
|
||||
/**
|
||||
* Represents a Shard's Websocket connection.
|
||||
*/
|
||||
class WebSocketShard extends EventEmitter {
|
||||
constructor(manager, id, oldShard) {
|
||||
super();
|
||||
|
||||
/**
|
||||
* The WebSocket Manager of this connection
|
||||
* @type {WebSocketManager}
|
||||
*/
|
||||
this.manager = manager;
|
||||
|
||||
/**
|
||||
* The id of the this shard.
|
||||
* @type {number}
|
||||
*/
|
||||
this.id = id;
|
||||
|
||||
/**
|
||||
* The current status of the shard
|
||||
* @type {Status}
|
||||
*/
|
||||
this.status = Status.IDLE;
|
||||
|
||||
/**
|
||||
* The current sequence of the WebSocket
|
||||
* @type {number}
|
||||
* @private
|
||||
*/
|
||||
this.sequence = oldShard ? oldShard.sequence : -1;
|
||||
|
||||
/**
|
||||
* The sequence on WebSocket close
|
||||
* @type {number}
|
||||
* @private
|
||||
*/
|
||||
this.closeSequence = 0;
|
||||
|
||||
/**
|
||||
* The current session id of the WebSocket
|
||||
* @type {?string}
|
||||
* @private
|
||||
*/
|
||||
this.sessionID = oldShard && oldShard.sessionID;
|
||||
|
||||
/**
|
||||
* Previous heartbeat pings of the websocket (most recent first, limited to three elements)
|
||||
* @type {number[]}
|
||||
*/
|
||||
this.pings = [];
|
||||
|
||||
/**
|
||||
* The last time a ping was sent (a timestamp)
|
||||
* @type {number}
|
||||
* @private
|
||||
*/
|
||||
this.lastPingTimestamp = -1;
|
||||
|
||||
/**
|
||||
* List of servers the shard is connected to
|
||||
* @type {string[]}
|
||||
* @private
|
||||
*/
|
||||
this.trace = [];
|
||||
|
||||
/**
|
||||
* Contains the rate limit queue and metadata
|
||||
* @type {Object}
|
||||
* @private
|
||||
*/
|
||||
this.ratelimit = {
|
||||
queue: [],
|
||||
total: 120,
|
||||
remaining: 120,
|
||||
time: 60e3,
|
||||
timer: null,
|
||||
};
|
||||
|
||||
/**
|
||||
* The WebSocket connection for the current shard
|
||||
* @type {?WebSocket}
|
||||
* @private
|
||||
*/
|
||||
this.ws = null;
|
||||
|
||||
/**
|
||||
* @external Inflate
|
||||
* @see {@link https://www.npmjs.com/package/zlib-sync}
|
||||
*/
|
||||
|
||||
/**
|
||||
* The compression to use
|
||||
* @type {?Inflate}
|
||||
* @private
|
||||
*/
|
||||
this.inflate = null;
|
||||
|
||||
this.connect();
|
||||
}
|
||||
|
||||
/**
|
||||
* Average heartbeat ping of the websocket, obtained by averaging the WebSocketShard#pings property
|
||||
* @type {number}
|
||||
* @readonly
|
||||
*/
|
||||
get ping() {
|
||||
const sum = this.pings.reduce((a, b) => a + b, 0);
|
||||
return sum / this.pings.length;
|
||||
}
|
||||
|
||||
/**
|
||||
* Emits a debug event.
|
||||
* @param {string} message Debug message
|
||||
* @private
|
||||
*/
|
||||
debug(message) {
|
||||
this.manager.debug(`[shard ${this.id}] ${message}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a heartbeat or sets an interval for sending heartbeats.
|
||||
* @param {number} [time] If -1, clears the interval, any other number sets an interval
|
||||
* If no value is given, a heartbeat will be sent instantly
|
||||
* @private
|
||||
*/
|
||||
heartbeat(time) {
|
||||
if (!isNaN(time)) {
|
||||
if (time === -1) {
|
||||
this.debug('Clearing heartbeat interval');
|
||||
this.manager.client.clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = null;
|
||||
} else {
|
||||
this.debug(`Setting a heartbeat interval for ${time}ms`);
|
||||
this.heartbeatInterval = this.manager.client.setInterval(() => this.heartbeat(), time);
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
this.debug('Sending a heartbeat');
|
||||
this.lastPingTimestamp = Date.now();
|
||||
this.send({
|
||||
op: OPCodes.HEARTBEAT,
|
||||
d: this.sequence,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Acknowledges a heartbeat.
|
||||
* @private
|
||||
*/
|
||||
ackHeartbeat() {
|
||||
const latency = Date.now() - this.lastPingTimestamp;
|
||||
this.debug(`Heartbeat acknowledged, latency of ${latency}ms`);
|
||||
this.pings.unshift(latency);
|
||||
if (this.pings.length > 3) this.pings.length = 3;
|
||||
}
|
||||
|
||||
/**
|
||||
* Connects the shard to a gateway.
|
||||
* @private
|
||||
*/
|
||||
connect() {
|
||||
this.inflate = new zlib.Inflate({
|
||||
chunkSize: 65535,
|
||||
flush: zlib.Z_SYNC_FLUSH,
|
||||
to: WebSocket.encoding === 'json' ? 'string' : '',
|
||||
});
|
||||
const gateway = this.manager.gateway;
|
||||
this.debug(`Connecting to ${gateway}`);
|
||||
const ws = this.ws = WebSocket.create(gateway, {
|
||||
v: this.manager.client.options.ws.version,
|
||||
compress: 'zlib-stream',
|
||||
});
|
||||
ws.onopen = this.onOpen.bind(this);
|
||||
ws.onmessage = this.onMessage.bind(this);
|
||||
ws.onerror = this.onError.bind(this);
|
||||
ws.onclose = this.onClose.bind(this);
|
||||
this.status = Status.CONNECTING;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever a packet is received
|
||||
* @param {Object} packet Packet received
|
||||
* @returns {boolean}
|
||||
* @private
|
||||
*/
|
||||
onPacket(packet) {
|
||||
if (!packet) {
|
||||
this.debug('Received null packet');
|
||||
return false;
|
||||
}
|
||||
|
||||
this.manager.client.emit(Events.RAW, packet, this.id);
|
||||
|
||||
switch (packet.t) {
|
||||
case WSEvents.READY:
|
||||
this.sessionID = packet.d.session_id;
|
||||
this.trace = packet.d._trace;
|
||||
this.status = Status.READY;
|
||||
this.debug(`READY ${this.trace.join(' -> ')} ${this.sessionID}`);
|
||||
this.heartbeat();
|
||||
break;
|
||||
case WSEvents.RESUMED: {
|
||||
this.trace = packet.d._trace;
|
||||
this.status = Status.READY;
|
||||
const replayed = packet.s - this.sequence;
|
||||
this.debug(`RESUMED ${this.trace.join(' -> ')} | replayed ${replayed} events.`);
|
||||
this.heartbeat();
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (packet.s > this.sequence) this.sequence = packet.s;
|
||||
|
||||
switch (packet.op) {
|
||||
case OPCodes.HELLO:
|
||||
this.identify();
|
||||
return this.heartbeat(packet.d.heartbeat_interval);
|
||||
case OPCodes.RECONNECT:
|
||||
return this.reconnect();
|
||||
case OPCodes.INVALID_SESSION:
|
||||
if (!packet.d) this.sessionID = null;
|
||||
this.sequence = -1;
|
||||
this.debug('Session invalidated');
|
||||
return this.reconnect(Events.INVALIDATED);
|
||||
case OPCodes.HEARTBEAT_ACK:
|
||||
return this.ackHeartbeat();
|
||||
case OPCodes.HEARTBEAT:
|
||||
return this.heartbeat();
|
||||
default:
|
||||
return this.manager.handlePacket(packet, this);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever a connection is opened to the gateway.
|
||||
* @param {Event} event Received open event
|
||||
* @private
|
||||
*/
|
||||
onOpen() {
|
||||
this.debug('Connection open');
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever a message is received.
|
||||
* @param {Event} event Event received
|
||||
* @private
|
||||
*/
|
||||
onMessage({ data }) {
|
||||
if (data instanceof ArrayBuffer) data = new Uint8Array(data);
|
||||
const l = data.length;
|
||||
const flush = l >= 4 &&
|
||||
data[l - 4] === 0x00 &&
|
||||
data[l - 3] === 0x00 &&
|
||||
data[l - 2] === 0xFF &&
|
||||
data[l - 1] === 0xFF;
|
||||
|
||||
this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH);
|
||||
if (!flush) return;
|
||||
let packet;
|
||||
try {
|
||||
packet = WebSocket.unpack(this.inflate.result);
|
||||
this.manager.client.emit(Events.RAW, packet);
|
||||
} catch (err) {
|
||||
this.manager.client.emit(Events.ERROR, err);
|
||||
return;
|
||||
}
|
||||
if (packet.t === 'READY') {
|
||||
/**
|
||||
* Emitted when a shard becomes ready
|
||||
* @event WebSocketShard#ready
|
||||
*/
|
||||
this.emit(Events.READY);
|
||||
|
||||
/**
|
||||
* Emitted when a shard becomes ready
|
||||
* @event Client#shardReady
|
||||
* @param {number} shardID The id of the shard
|
||||
*/
|
||||
this.manager.client.emit(Events.SHARD_READY, this.id);
|
||||
}
|
||||
this.onPacket(packet);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever an error occurs with the WebSocket.
|
||||
* @param {Error} error The error that occurred
|
||||
* @private
|
||||
*/
|
||||
onError(error) {
|
||||
if (error && error.message === 'uWs client connection error') {
|
||||
this.reconnect();
|
||||
return;
|
||||
}
|
||||
this.emit(Events.INVALIDATED);
|
||||
this.manager.client.emit(Events.ERROR, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* @external CloseEvent
|
||||
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Called whenever a connection to the gateway is closed.
|
||||
* @param {CloseEvent} event Close event that was received
|
||||
* @private
|
||||
*/
|
||||
onClose(event) {
|
||||
this.closeSequence = this.sequence;
|
||||
this.emit('close', event);
|
||||
if (event.code === 1000 ? this.expectingClose : WSCodes[event.code]) {
|
||||
/**
|
||||
* Emitted when the client's WebSocket disconnects and will no longer attempt to reconnect.
|
||||
* @event Client#disconnect
|
||||
* @param {CloseEvent} event The WebSocket close event
|
||||
* @param {number} shardID The shard that disconnected
|
||||
*/
|
||||
this.manager.client.emit(Events.DISCONNECT, event, this.id);
|
||||
|
||||
this.debug(WSCodes[event.code]);
|
||||
return;
|
||||
}
|
||||
this.reconnect(Events.INVALIDATED);
|
||||
}
|
||||
|
||||
/**
|
||||
* Identifies the client on a connection.
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
identify() {
|
||||
return this.sessionID ? this.identifyResume() : this.identifyNew();
|
||||
}
|
||||
|
||||
/**
|
||||
* Identifies as a new connection on the gateway.
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
identifyNew() {
|
||||
if (!this.manager.client.token) {
|
||||
this.debug('No token available to identify a new session with');
|
||||
return;
|
||||
}
|
||||
// Clone the generic payload and assign the token
|
||||
const d = Object.assign({ token: this.manager.client.token }, this.manager.client.options.ws);
|
||||
|
||||
const { totalShardCount } = this.manager.client.options;
|
||||
d.shard = [this.id, Number(totalShardCount)];
|
||||
|
||||
// Send the payload
|
||||
this.debug('Identifying as a new session');
|
||||
this.send({ op: OPCodes.IDENTIFY, d });
|
||||
}
|
||||
|
||||
/**
|
||||
* Resumes a session on the gateway.
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
identifyResume() {
|
||||
if (!this.sessionID) {
|
||||
this.debug('Warning: wanted to resume but session ID not available; identifying as a new session instead');
|
||||
return this.identifyNew();
|
||||
}
|
||||
this.debug(`Attempting to resume session ${this.sessionID}`);
|
||||
|
||||
const d = {
|
||||
token: this.manager.client.token,
|
||||
session_id: this.sessionID,
|
||||
seq: this.sequence,
|
||||
};
|
||||
|
||||
return this.send({
|
||||
op: OPCodes.RESUME,
|
||||
d,
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* Adds data to the queue to be sent.
|
||||
* @param {Object} data Packet to send
|
||||
* @returns {void}
|
||||
*/
|
||||
send(data) {
|
||||
this.ratelimit.queue.push(data);
|
||||
this.processQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends data, bypassing the queue.
|
||||
* @param {Object} data Packet to send
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
_send(data) {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
this.debug(`Tried to send packet ${data} but no WebSocket is available!`);
|
||||
return;
|
||||
}
|
||||
this.ws.send(WebSocket.pack(data));
|
||||
}
|
||||
|
||||
/**
|
||||
* Processes the current WebSocket queue.
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
processQueue() {
|
||||
if (this.ratelimit.remaining === 0) return;
|
||||
if (this.ratelimit.queue.length === 0) return;
|
||||
if (this.ratelimit.remaining === this.ratelimit.total) {
|
||||
this.ratelimit.resetTimer = this.manager.client.setTimeout(() => {
|
||||
this.ratelimit.remaining = this.ratelimit.total;
|
||||
this.processQueue();
|
||||
}, this.ratelimit.time);
|
||||
}
|
||||
while (this.ratelimit.remaining > 0) {
|
||||
const item = this.ratelimit.queue.shift();
|
||||
if (!item) return;
|
||||
this._send(item);
|
||||
this.ratelimit.remaining--;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Triggers a shard reconnect.
|
||||
* @param {?string} [event] The event for the shard to emit
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
reconnect(event) {
|
||||
this.heartbeat(-1);
|
||||
this.status = Status.RECONNECTING;
|
||||
|
||||
/**
|
||||
* Emitted whenever a shard tries to reconnect to the WebSocket.
|
||||
* @event Client#reconnecting
|
||||
*/
|
||||
this.manager.client.emit(Events.RECONNECTING, this.id);
|
||||
|
||||
if (event === Events.INVALIDATED) this.emit(event);
|
||||
this.manager.spawn(this.id);
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the current shard and terminates its connection.
|
||||
* @returns {void}
|
||||
* @private
|
||||
*/
|
||||
destroy() {
|
||||
this.heartbeat(-1);
|
||||
this.expectingClose = true;
|
||||
if (this.ws) this.ws.close(1000);
|
||||
this.ws = null;
|
||||
this.status = Status.DISCONNECTED;
|
||||
this.ratelimit.remaining = this.ratelimit.total;
|
||||
}
|
||||
}
|
||||
module.exports = WebSocketShard;
|
||||
Reference in New Issue
Block a user