fix: Sharding issues, silent disconnects and code cleanup (#2976)

* fix: Sharding bugs, silent disconnects and cleanup code

* typings

* fix: Destroy connection with a close code different from 1000
Per the Discord gateway docs: `If a client does not receive a heartbeat ack between its attempts at sending heartbeats, it should immediately terminate the connection with a non-1000 close code, reconnect, and attempt to resume.`

* misc: Wait x ms before reconnecting
Per https://discordapp.com/developers/docs/topics/gateway#resuming

* docs

* nit: docs

* misc: Prevent multiple calls to WebSocketManager#destroy

* fix: Implement destroying if you reset the token

* misc: Clear the WS packet queue on WebSocketShard#destroy
You can't send those packets anywhere anymore, so no point in keeping them

* fix: Handle session limits when reconnecting a full shard, cleanup

* misc: No need to create a new shard instance

* fix: closeSequence being null, thus emitting null on Client#resumed

* misc: Remove GUILD_SYNC Gateway handler and add missing dot to string

* misc: Close WS with code 4000 if we didn't get a heartbeat in time

As said in the Discord API server

* fix: Handle ready emitting in onPacket
Doesn't allow broken packets

* misc: Close the connection if Discord asks for a reconnect
Prevents double triggers

* testing: Prevent multiple reconnect attempts on a shard

Should fix some issues some people have had.

* fix: Prevent multiple reconnect calls on the shard, re-use conn to identify, remove reconnect function
Note: Closing the WS with 1000 makes the session invalid

* misc: Forgot to remove 2 unneeded setters

* docs: Wrong param docstring for WebSocketShard#destroy

* misc: Set status to reconnecting after destroying

* misc: Close connection with code 1000 on session invalidated
Allows us to cleanup the shard and do a full reconnect
Also remove identify wait delay, not used anywhere

* fix: Fix zlib crash on node
And with that, the PR is done!

* misc: Implement a reconnect queue
And that is all there was to be done in this PR.
Shards now queue up for a reconnect

* nit: Debug the queue after destroying

* docs: Make the invalidated event clearer

* lint: I'm good at my job

* docs

* docs: Make description for isReconnectingShards accurate
*can I stop finding issues, this PR is meant to be done*

* misc: Remove shard from bind params

* misc: Code re-ordering and cleanup
Resumes do not need to be queued up, as they do not count towards the identify limit and, after some testing, they do not require the 5 second delay that identifying does

* fix: Issues with token regeneration and shards not properly handling them
We close the ws connection with code 1000 if we get an invalid session payload,
that way we can queue the reconnects and handle any issues

* misc: Remove useless delays on session invalidated
They get handled by the rest of the code already

* lint

* misc: reset the sequence on Shard#destroy
This is especially a problem if you need to re-identify, as the sequence doesn't get set to the current one,
causing the sequence to be wrong

* fix: GitHub rebase and minor tweak
* Implement a 15 second timeout for shards that don't connect within that time
This should prevent shards from never reconnecting

* revert: Make WebSocketShard#send and WebSocketManager#broadcast public

* typings: Set type to void instead of undefined

* docs: Requested Changes
This commit is contained in:
Vlad Frangu
2019-02-10 18:28:03 +02:00
committed by Amish Shah
parent 7324a993ed
commit 793341dbb4
4 changed files with 353 additions and 297 deletions

View File

@@ -4,6 +4,7 @@ const EventEmitter = require('events');
const WebSocket = require('../../WebSocket');
const { Status, Events, OPCodes, WSEvents, WSCodes } = require('../../util/Constants');
const Util = require('../../util/Util');
let zlib;
try {
zlib = require('zlib-sync');
@@ -13,10 +14,10 @@ try {
}
/**
* Represents a Shard's Websocket connection.
* Represents a Shard's WebSocket connection
*/
class WebSocketShard extends EventEmitter {
constructor(manager, id, oldShard) {
constructor(manager, id) {
super();
/**
@@ -26,7 +27,7 @@ class WebSocketShard extends EventEmitter {
this.manager = manager;
/**
* The id of the this shard.
* The ID of the this shard
* @type {number}
*/
this.id = id;
@@ -38,28 +39,28 @@ class WebSocketShard extends EventEmitter {
this.status = Status.IDLE;
/**
* The current sequence of the WebSocket
* The current sequence of the shard
* @type {number}
* @private
*/
this.sequence = oldShard ? oldShard.sequence : -1;
this.sequence = -1;
/**
* The sequence on WebSocket close
* The sequence of the shard after close
* @type {number}
* @private
*/
this.closeSequence = oldShard ? oldShard.closeSequence : 0;
this.closeSequence = 0;
/**
* The current session id of the WebSocket
* @type {?string}
* The current session ID of the shard
* @type {string}
* @private
*/
this.sessionID = oldShard && oldShard.sessionID;
this.sessionID = undefined;
/**
* Previous heartbeat pings of the websocket (most recent first, limited to three elements)
* The previous 3 heartbeat pings of the shard (most recent first)
* @type {number[]}
*/
this.pings = [];
@@ -71,6 +72,13 @@ class WebSocketShard extends EventEmitter {
*/
this.lastPingTimestamp = -1;
/**
* If we received a heartbeat ack back. Used to identify zombie connections
* @type {boolean}
* @private
*/
this.lastHeartbeatAcked = true;
/**
* List of servers the shard is connected to
* @type {string[]}
@@ -96,7 +104,7 @@ class WebSocketShard extends EventEmitter {
* @type {?WebSocket}
* @private
*/
this.ws = null;
this.connection = null;
/**
* @external Inflate
@@ -110,13 +118,7 @@ class WebSocketShard extends EventEmitter {
*/
this.inflate = null;
/**
* Whether or not the WebSocket is expected to be closed
* @type {boolean}
*/
this.expectingClose = false;
this.connect();
if (this.manager.gateway) this.connect();
}
/**
@@ -135,35 +137,42 @@ class WebSocketShard extends EventEmitter {
* @private
*/
debug(message) {
this.manager.debug(`[shard ${this.id}] ${message}`);
this.manager.debug(`[Shard ${this.id}] ${message}`);
}
/**
* Sends a heartbeat or sets an interval for sending heartbeats.
* @param {number} [time] If -1, clears the interval, any other number sets an interval
* If no value is given, a heartbeat will be sent instantly
* Sends a heartbeat to the WebSocket.
* If this shard didn't receive a heartbeat last time, it will destroy it and reconnect
* @private
*/
heartbeat(time) {
if (!isNaN(time)) {
if (time === -1) {
sendHeartbeat() {
if (!this.lastHeartbeatAcked) {
this.debug("Didn't receive a heartbeat ack last time, assuming zombie conenction. Destroying and reconnecting.");
this.connection.close(4000);
return;
}
this.debug('Sending a heartbeat');
this.lastHeartbeatAcked = false;
this.lastPingTimestamp = Date.now();
this.send({ op: OPCodes.HEARTBEAT, d: this.sequence });
}
/**
* Sets the heartbeat timer for this shard.
* @param {number} time If -1, clears the interval, any other number sets an interval
* @private
*/
setHeartbeatTimer(time) {
if (time === -1) {
if (this.heartbeatInterval) {
this.debug('Clearing heartbeat interval');
this.manager.client.clearInterval(this.heartbeatInterval);
this.heartbeatInterval = null;
} else {
this.debug(`Setting a heartbeat interval for ${time}ms`);
if (this.heartbeatInterval) this.manager.client.clearInterval(this.heartbeatInterval);
this.heartbeatInterval = this.manager.client.setInterval(() => this.heartbeat(), time);
}
return;
}
this.debug('Sending a heartbeat');
this.lastPingTimestamp = Date.now();
this.send({
op: OPCodes.HEARTBEAT,
d: this.sequence,
});
this.debug(`Setting a heartbeat interval for ${time}ms`);
this.heartbeatInterval = this.manager.client.setInterval(() => this.sendHeartbeat(), time);
}
/**
@@ -171,6 +180,7 @@ class WebSocketShard extends EventEmitter {
* @private
*/
ackHeartbeat() {
this.lastHeartbeatAcked = true;
const latency = Date.now() - this.lastPingTimestamp;
this.debug(`Heartbeat acknowledged, latency of ${latency}ms`);
this.pings.unshift(latency);
@@ -178,18 +188,19 @@ class WebSocketShard extends EventEmitter {
}
/**
* Connects the shard to a gateway.
* Connects this shard to the gateway.
* @private
*/
connect() {
const { expectingClose, gateway } = this.manager;
if (expectingClose) return;
this.inflate = new zlib.Inflate({
chunkSize: 65535,
flush: zlib.Z_SYNC_FLUSH,
to: WebSocket.encoding === 'json' ? 'string' : '',
});
const gateway = this.manager.gateway;
this.debug(`Connecting to ${gateway}`);
const ws = this.ws = WebSocket.create(gateway, {
const ws = this.connection = WebSocket.create(gateway, {
v: this.manager.client.options.ws.version,
compress: 'zlib-stream',
});
@@ -200,73 +211,12 @@ class WebSocketShard extends EventEmitter {
this.status = Status.CONNECTING;
}
/**
* Called whenever a packet is received
* @param {Object} packet Packet received
* @returns {any}
* @private
*/
onPacket(packet) {
if (!packet) {
this.debug('Received null packet');
return false;
}
switch (packet.t) {
case WSEvents.READY:
this.sessionID = packet.d.session_id;
this.trace = packet.d._trace;
this.status = Status.READY;
this.debug(`READY ${this.trace.join(' -> ')} ${this.sessionID}`);
this.heartbeat();
break;
case WSEvents.RESUMED: {
this.trace = packet.d._trace;
this.status = Status.READY;
const replayed = packet.s - this.closeSequence;
this.debug(`RESUMED ${this.trace.join(' -> ')} | replayed ${replayed} events.`);
this.heartbeat();
break;
}
}
if (packet.s > this.sequence) this.sequence = packet.s;
switch (packet.op) {
case OPCodes.HELLO:
this.identify();
return this.heartbeat(packet.d.heartbeat_interval);
case OPCodes.RECONNECT:
return this.reconnect();
case OPCodes.INVALID_SESSION:
this.sequence = -1;
this.debug('Session invalidated');
// If the session isn't resumable
if (!packet.d) {
// If we had a session ID before
if (this.sessionID) {
this.sessionID = null;
return this.identify(2500);
}
return this.identify(5000);
}
return this.identify();
case OPCodes.HEARTBEAT_ACK:
return this.ackHeartbeat();
case OPCodes.HEARTBEAT:
return this.heartbeat();
default:
return this.manager.handlePacket(packet, this);
}
}
/**
* Called whenever a connection is opened to the gateway.
* @param {Event} event Received open event
* @private
*/
onOpen() {
this.debug('Connection open');
this.debug('Connected to the gateway');
}
/**
@@ -293,87 +243,102 @@ class WebSocketShard extends EventEmitter {
this.manager.client.emit(Events.ERROR, err);
return;
}
if (packet.t === WSEvents.READY) {
/**
* Emitted when a shard becomes ready
* @event WebSocketShard#ready
*/
this.emit(Events.READY);
/**
* Emitted when a shard becomes ready
* @event Client#shardReady
* @param {number} shardID The id of the shard
*/
this.manager.client.emit(Events.SHARD_READY, this.id);
}
this.onPacket(packet);
}
/**
* Called whenever an error occurs with the WebSocket.
* @param {Error} error The error that occurred
* Called whenever a packet is received.
* @param {Object} packet Packet received
* @private
*/
onError(error) {
if (error && error.message === 'uWs client connection error') {
this.reconnect();
onPacket(packet) {
if (!packet) {
this.debug('Received null or broken packet');
return;
}
this.emit(Events.INVALIDATED);
/**
* Emitted whenever the client's WebSocket encounters a connection error.
* @event Client#error
* @param {Error} error The encountered error
*/
this.manager.client.emit(Events.ERROR, error);
}
switch (packet.t) {
case WSEvents.READY:
/**
* Emitted when a shard becomes ready.
* @event WebSocketShard#ready
*/
this.emit(Events.READY);
/**
* Emitted when a shard becomes ready.
* @event Client#shardReady
* @param {number} shardID The ID of the shard
*/
this.manager.client.emit(Events.SHARD_READY, this.id);
/**
* @external CloseEvent
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
*/
/**
* Called whenever a connection to the gateway is closed.
* @param {CloseEvent} event Close event that was received
* @returns {void}
* @private
*/
onClose(event) {
this.closeSequence = this.sequence;
this.emit('close', event);
if (event.code === 1000 ? this.expectingClose : WSCodes[event.code]) {
/**
* Emitted when the client's WebSocket disconnects and will no longer attempt to reconnect.
* @event Client#disconnect
* @param {CloseEvent} event The WebSocket close event
* @param {number} shardID The shard that disconnected
*/
this.manager.client.emit(Events.DISCONNECT, event, this.id);
this.debug(WSCodes[event.code]);
this.heartbeat(-1);
return;
this.sessionID = packet.d.session_id;
this.trace = packet.d._trace;
this.status = Status.READY;
this.debug(`READY ${this.trace.join(' -> ')} | Session ${this.sessionID}`);
this.lastHeartbeatAcked = true;
this.sendHeartbeat();
break;
case WSEvents.RESUMED: {
this.emit(Events.RESUMED);
this.trace = packet.d._trace;
this.status = Status.READY;
const replayed = packet.s - this.closeSequence;
this.debug(`RESUMED ${this.trace.join(' -> ')} | Replayed ${replayed} events.`);
this.lastHeartbeatAcked = true;
this.sendHeartbeat();
break;
}
}
if (packet.s > this.sequence) this.sequence = packet.s;
switch (packet.op) {
case OPCodes.HELLO:
this.setHeartbeatTimer(packet.d.heartbeat_interval);
this.identify();
break;
case OPCodes.RECONNECT:
this.connection.close(1001);
break;
case OPCodes.INVALID_SESSION:
this.debug(`Session was invalidated. Resumable: ${packet.d}.`);
// If the session isn't resumable
if (!packet.d) {
// Reset the sequence, since it isn't valid anymore
this.sequence = -1;
// If we had a session ID before
if (this.sessionID) {
this.sessionID = null;
this.connection.close(1000);
return;
}
this.connection.close(1000);
return;
}
this.identifyResume();
break;
case OPCodes.HEARTBEAT_ACK:
this.ackHeartbeat();
break;
case OPCodes.HEARTBEAT:
this.sendHeartbeat();
break;
default:
this.manager.handlePacket(packet, this);
}
this.expectingClose = false;
this.reconnect(Events.INVALIDATED, 5100);
}
/**
* Identifies the client on a connection.
* @param {?number} [wait=0] Amount of time to wait before identifying
* @returns {void}
* @private
*/
identify(wait = 0) {
if (wait) return this.manager.client.setTimeout(this.identify.bind(this), wait);
identify() {
return this.sessionID ? this.identifyResume() : this.identifyNew();
}
/**
* Identifies as a new connection on the gateway.
* @returns {void}
* @private
*/
identifyNew() {
@@ -382,10 +347,11 @@ class WebSocketShard extends EventEmitter {
return;
}
// Clone the generic payload and assign the token
const d = Object.assign({ token: this.manager.client.token }, this.manager.client.options.ws);
const { totalShardCount } = this.manager.client.options;
d.shard = [this.id, Number(totalShardCount)];
const d = {
...this.manager.client.options.ws,
token: this.manager.client.token,
shard: [this.id, Number(this.manager.client.options.totalShardCount)],
};
// Send the payload
this.debug('Identifying as a new session');
@@ -402,23 +368,90 @@ class WebSocketShard extends EventEmitter {
this.debug('Warning: wanted to resume but session ID not available; identifying as a new session instead');
return this.identifyNew();
}
this.debug(`Attempting to resume session ${this.sessionID}`);
this.debug(`Attempting to resume session ${this.sessionID} at sequence ${this.closeSequence}`);
const d = {
token: this.manager.client.token,
session_id: this.sessionID,
seq: this.sequence,
seq: this.closeSequence,
};
return this.send({
op: OPCodes.RESUME,
d,
});
return this.send({ op: OPCodes.RESUME, d });
}
/**
 * Called whenever an error occurs with the WebSocket.
 * A uWS client connection error closes the connection with code 4000 instead of being emitted;
 * every other error is emitted on the client as `Client#error`, together with this shard's ID.
 * @param {Error} error The error that occurred
 * @private
 */
onError(error) {
  if (error && error.message === 'uWs client connection error') {
    // destroy() sets the connection to null, so an error that arrives afterwards
    // must not try to close a connection that is no longer there
    if (this.connection) this.connection.close(4000);
    return;
  }

  /**
   * Emitted whenever the client's WebSocket encounters a connection error.
   * @event Client#error
   * @param {Error} error The encountered error
   * @param {number} shardID The shard that encountered this error
   */
  this.manager.client.emit(Events.ERROR, error, this.id);
}
/**
* @external CloseEvent
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
*/
/**
* Called whenever a connection to the gateway is closed.
* @param {CloseEvent} event Close event that was received
* @private
*/
onClose(event) {
this.closeSequence = this.sequence;
this.debug(`WebSocket was closed.
Event Code: ${event.code}
Reason: ${event.reason}`);
if (event.code === 1000 ? this.manager.expectingClose : WSCodes[event.code]) {
/**
* Emitted when the client's WebSocket disconnects and will no longer attempt to reconnect.
* @event Client#disconnect
* @param {CloseEvent} event The WebSocket close event
* @param {number} shardID The shard that disconnected
*/
this.manager.client.emit(Events.DISCONNECT, event, this.id);
this.debug(WSCodes[event.code]);
return;
}
this.destroy();
this.status = Status.RECONNECTING;
/**
* Emitted whenever a shard tries to reconnect to the WebSocket.
* @event Client#reconnecting
* @param {number} shardID The shard ID that is reconnecting
*/
this.manager.client.emit(Events.RECONNECTING, this.id);
this.debug(`${this.sessionID ? `Reconnecting in 3500ms` : 'Queueing a reconnect'} to the gateway...`);
if (this.sessionID) {
Util.delayFor(3500).then(() => this.connect());
} else {
this.manager.reconnect(this);
}
}
/**
* Adds data to the queue to be sent.
* @param {Object} data Packet to send
* @private
* @returns {void}
*/
send(data) {
@@ -433,11 +466,12 @@ class WebSocketShard extends EventEmitter {
* @private
*/
_send(data) {
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
this.debug(`Tried to send packet ${data} but no WebSocket is available!`);
if (!this.connection || this.connection.readyState !== WebSocket.OPEN) {
this.debug(`Tried to send packet ${JSON.stringify(data)} but no WebSocket is available!`);
return;
}
this.ws.send(WebSocket.pack(data), err => {
this.connection.send(WebSocket.pack(data), err => {
if (err) this.manager.client.emit(Events.ERROR, err);
});
}
@@ -465,44 +499,22 @@ class WebSocketShard extends EventEmitter {
}
/**
* Triggers a shard reconnect.
* @param {?string} [event] The event for the shard to emit
* @param {?number} [reconnectIn] Time to wait before reconnecting
* @returns {Promise<void>}
* @private
*/
async reconnect(event, reconnectIn) {
this.heartbeat(-1);
this.status = Status.RECONNECTING;
/**
* Emitted whenever a shard tries to reconnect to the WebSocket.
* @event Client#reconnecting
*/
this.manager.client.emit(Events.RECONNECTING, this.id);
if (event === Events.INVALIDATED) this.emit(event);
this.debug(reconnectIn ? `Reconnecting in ${reconnectIn}ms` : 'Reconnecting now');
if (reconnectIn) await Util.delayFor(reconnectIn);
this.manager.spawn(this.id);
}
/**
* Destroys the current shard and terminates its connection.
* @returns {void}
* Destroys this shard and closes its connection.
* @private
*/
destroy() {
this.heartbeat(-1);
this.expectingClose = true;
if (this.ws) this.ws.close(1000);
this.ws = null;
this.setHeartbeatTimer(-1);
if (this.connection) this.connection.close(1000);
this.connection = null;
this.status = Status.DISCONNECTED;
this.ratelimit.remaining = this.ratelimit.total;
this.ratelimit.queue.length = 0;
if (this.ratelimit.timer) {
this.manager.client.clearTimeout(this.ratelimit.timer);
this.ratelimit.timer = null;
}
this.sequence = -1;
}
}
module.exports = WebSocketShard;