mirror of
https://github.com/discordjs/discord.js.git
synced 2026-03-10 08:33:30 +01:00
Rewrite WebSocket internals (#1410)
* Start rewriting Manager and Connection * more stuff * stuff * Fix ready bug * some stuff i forgot * fix some stuff * add stupid heartbeat ack like seriously who cares * woo! * fix a bug * rate limit the dumb websocket * stuff * hdocs * Docs * Remove ClientManager#setupKeepAlive as it is now redundant * Change Client._pingTimestamp to a getter that fetches the timestamp from the WebSocketConnection * are you happy now eslint smh * make gus happy * Add CloseEvent external doc * Make sure to emit 'reconnecting' when actually reconnecting * ffs * Fix RESUME logic * Add heartbeat ack debug messages, including latency data * Dumb stuff for Gus * thx eslint * more dumb stuff * more dumb crap smh gus i h8 u * moar messages * fix for using wrong status, causing certain events not to be fired (#1422)
This commit is contained in:
@@ -1,6 +1,8 @@
|
||||
const browser = require('os').platform() === 'browser';
|
||||
const EventEmitter = require('events');
|
||||
const Constants = require('../../util/Constants');
|
||||
const zlib = require('zlib');
|
||||
const PacketManager = require('./packets/WebSocketPacketManager');
|
||||
const erlpack = (function findErlpack() {
|
||||
try {
|
||||
const e = require('erlpack');
|
||||
@@ -26,45 +28,137 @@ const WebSocket = (function findWebSocket() {
|
||||
*/
|
||||
class WebSocketConnection extends EventEmitter {
|
||||
/**
|
||||
* @param {WebSocketManager} manager the WebSocket manager
|
||||
* @param {string} gateway Websocket gateway to connect to
|
||||
*/
|
||||
constructor(gateway) {
|
||||
super(gateway);
|
||||
this.ws = new WebSocket(gateway);
|
||||
if (browser) this.ws.binaryType = 'arraybuffer';
|
||||
this.ws.onmessage = this.eventMessage.bind(this);
|
||||
this.ws.onopen = this.emit.bind(this, 'open');
|
||||
this.ws.onclose = this.emit.bind(this, 'close');
|
||||
this.ws.onerror = this.emit.bind(this, 'error');
|
||||
constructor(manager, gateway) {
|
||||
super();
|
||||
/**
|
||||
* WebSocket Manager of this connection
|
||||
* @type {WebSocketManager}
|
||||
*/
|
||||
this.manager = manager;
|
||||
/**
|
||||
* Client this belongs to
|
||||
* @type {Client}
|
||||
*/
|
||||
this.client = manager.client;
|
||||
/**
|
||||
* WebSocket connection itself
|
||||
* @type {WebSocket}
|
||||
*/
|
||||
this.ws = null;
|
||||
/**
|
||||
* Current sequence of the WebSocket
|
||||
* @type {number}
|
||||
*/
|
||||
this.sequence = -1;
|
||||
/**
|
||||
* Current status of the client
|
||||
* @type {number}
|
||||
*/
|
||||
this.status = Constants.Status.IDLE;
|
||||
/**
|
||||
* Packet Manager of the connection
|
||||
* @type {WebSocketPacketManager}
|
||||
*/
|
||||
this.packetManager = new PacketManager(this);
|
||||
/**
|
||||
* Last time a ping was sent (a timestamp)
|
||||
* @type {number}
|
||||
*/
|
||||
this.lastPingTimestamp = 0;
|
||||
/**
|
||||
* Contains the rate limit queue and metadata
|
||||
* @type {Object}
|
||||
*/
|
||||
this.ratelimit = {
|
||||
queue: [],
|
||||
remaining: 120,
|
||||
resetTime: -1,
|
||||
};
|
||||
this.connect(gateway);
|
||||
/**
|
||||
* Events that are disabled (will not be processed)
|
||||
* @type {Object}
|
||||
*/
|
||||
this.disabledEvents = {};
|
||||
/**
|
||||
* Sequence on WebSocket close
|
||||
* @type {number}
|
||||
*/
|
||||
this.closeSequence = 0;
|
||||
for (const event of this.client.options.disabledEvents) this.disabledEvents[event] = true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called when the websocket gets a message
|
||||
* @param {Object} event Close event object
|
||||
* @returns {Promise<boolean>}
|
||||
* Causes the client to be marked as ready and emits the ready event
|
||||
* @returns {void}
|
||||
*/
|
||||
eventMessage(event) {
|
||||
try {
|
||||
const data = this.unpack(event.data);
|
||||
this.emit('packet', data);
|
||||
return true;
|
||||
} catch (err) {
|
||||
if (this.listenerCount('decodeError')) this.emit('decodeError', err);
|
||||
return false;
|
||||
triggerReady() {
|
||||
if (this.status === Constants.Status.READY) {
|
||||
this.debug('Tried to mark self as ready, but already ready');
|
||||
return;
|
||||
}
|
||||
this.status = Constants.Status.READY;
|
||||
this.client.emit(Constants.Events.READY);
|
||||
this.packetManager.handleQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Send data over the websocket
|
||||
* @param {string|Buffer} data Data to send
|
||||
* Checks whether the client is ready to be marked as ready
|
||||
* @returns {void}
|
||||
*/
|
||||
send(data) {
|
||||
this.ws.send(this.pack(data));
|
||||
checkIfReady() {
|
||||
if (this.status === Constants.Status.READY || this.status === Constants.Status.NEARLY) return false;
|
||||
let unavailableGuilds = 0;
|
||||
for (const guild of this.client.guilds.values()) {
|
||||
if (!guild.available) unavailableGuilds++;
|
||||
}
|
||||
if (unavailableGuilds === 0) {
|
||||
this.status = Constants.Status.NEARLY;
|
||||
if (!this.client.options.fetchAllMembers) return this.triggerReady();
|
||||
// Fetch all members before marking self as ready
|
||||
const promises = this.client.guilds.map(g => g.fetchMembers());
|
||||
Promise.all(promises)
|
||||
.then(() => this.triggerReady())
|
||||
.catch(e => {
|
||||
this.debug(`Failed to fetch all members before ready! ${e}`);
|
||||
this.triggerReady();
|
||||
});
|
||||
}
|
||||
return true;
|
||||
}
|
||||
|
||||
// Util
|
||||
/**
|
||||
* Emits a debug message
|
||||
* @param {string} message Debug message
|
||||
* @returns {void}
|
||||
*/
|
||||
debug(message) {
|
||||
if (message instanceof Error) message = message.stack;
|
||||
return this.manager.debug(`[connection] ${message}`);
|
||||
}
|
||||
|
||||
/**
|
||||
* Pack data using JSON or Erlpack
|
||||
* @param {*} data Data to pack
|
||||
* Attempts to serialise data from the WebSocket
|
||||
* @param {string|Object} data Data to unpack
|
||||
* @returns {Object}
|
||||
*/
|
||||
unpack(data) {
|
||||
if (erlpack && typeof data !== 'string') {
|
||||
if (data instanceof ArrayBuffer) data = Buffer.from(new Uint8Array(data));
|
||||
return erlpack.unpack(data);
|
||||
} else if (data instanceof ArrayBuffer || data instanceof Buffer) {
|
||||
data = zlib.inflateSync(data).toString();
|
||||
}
|
||||
return JSON.parse(data);
|
||||
}
|
||||
|
||||
/**
|
||||
* Packs an object ready to be sent
|
||||
* @param {Object} data Data to pack
|
||||
* @returns {string|Buffer}
|
||||
*/
|
||||
pack(data) {
|
||||
@@ -72,45 +166,288 @@ class WebSocketConnection extends EventEmitter {
|
||||
}
|
||||
|
||||
/**
|
||||
* Unpack data using JSON or Erlpack
|
||||
* @param {string|ArrayBuffer|Buffer} data Data to unpack
|
||||
* @returns {string|Object}
|
||||
* Processes the current WebSocket queue
|
||||
*/
|
||||
unpack(data) {
|
||||
if (erlpack && typeof data !== 'string') {
|
||||
if (data instanceof ArrayBuffer) data = Buffer.from(new Uint8Array(data));
|
||||
return erlpack.unpack(data);
|
||||
} else {
|
||||
if (data instanceof ArrayBuffer || data instanceof Buffer) data = this.inflate(data);
|
||||
return JSON.parse(data);
|
||||
processQueue() {
|
||||
if (this.ratelimit.remaining === 0) return;
|
||||
if (this.ratelimit.queue.length === 0) return;
|
||||
if (this.ratelimit.remaining === 120) {
|
||||
this.ratelimit.resetTimer = setTimeout(() => {
|
||||
this.ratelimit.remaining = 120;
|
||||
this.processQueue();
|
||||
}, 120e3); // eslint-disable-line
|
||||
}
|
||||
while (this.ratelimit.remaining > 0) {
|
||||
const item = this.ratelimit.queue.shift();
|
||||
if (!item) return;
|
||||
this._send(item);
|
||||
this.ratelimit.remaining--;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Zlib inflate data
|
||||
* @param {string|Buffer} data Data to inflate
|
||||
* @returns {string|Buffer}
|
||||
* Sends data, bypassing the queue
|
||||
* @param {Object} data Packet to send
|
||||
* @returns {void}
|
||||
*/
|
||||
inflate(data) {
|
||||
return erlpack ? data : zlib.inflateSync(data).toString();
|
||||
_send(data) {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
this.debug(`Tried to send packet ${data} but no WebSocket is available!`);
|
||||
return;
|
||||
}
|
||||
this.ws.send(this.pack(data));
|
||||
}
|
||||
|
||||
/**
|
||||
* State of the WebSocket
|
||||
* @type {number}
|
||||
* @readonly
|
||||
* Adds data to the queue to be sent
|
||||
* @param {Object} data Packet to send
|
||||
* @returns {void}
|
||||
*/
|
||||
get readyState() {
|
||||
return this.ws.readyState;
|
||||
send(data) {
|
||||
if (!this.ws || this.ws.readyState !== WebSocket.OPEN) {
|
||||
this.debug(`Tried to send packet ${data} but no WebSocket is available!`);
|
||||
return;
|
||||
}
|
||||
this.ratelimit.queue.push(data);
|
||||
this.processQueue();
|
||||
}
|
||||
|
||||
/**
|
||||
* Close the WebSocket
|
||||
* @param {number} code Close code
|
||||
* @param {string} [reason] Close reason
|
||||
* Creates a connection to a gateway
|
||||
* @param {string} gateway Gateway to connect to
|
||||
* @param {number} [after=0] How long to wait before connecting
|
||||
* @param {boolean} [force=false] Whether or not to force a new connection even if one already exists
|
||||
* @returns {boolean}
|
||||
*/
|
||||
close(code, reason) {
|
||||
this.ws.close(code, reason);
|
||||
connect(gateway = this.gateway, after = 0, force = false) {
|
||||
if (after) return this.client.setTimeout(() => this.connect(gateway, 0, force), after); // eslint-disable-line
|
||||
if (this.ws && !force) {
|
||||
this.debug('WebSocket connection already exists');
|
||||
return false;
|
||||
} else if (typeof gateway !== 'string') {
|
||||
this.debug(`Tried to connect to an invalid gateway: ${gateway}`);
|
||||
return false;
|
||||
}
|
||||
this.gateway = gateway;
|
||||
this.debug(`Connecting to ${gateway}`);
|
||||
const ws = this.ws = new WebSocket(gateway);
|
||||
if (browser) ws.binaryType = 'arraybuffer';
|
||||
ws.onmessage = this.onMessage.bind(this);
|
||||
ws.onopen = this.onOpen.bind(this);
|
||||
ws.onerror = this.onError.bind(this);
|
||||
ws.onclose = this.onClose.bind(this);
|
||||
this.status = Constants.Status.CONNECTING;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Destroys the connection
|
||||
* @returns {boolean}
|
||||
*/
|
||||
destroy() {
|
||||
const ws = this.ws;
|
||||
if (!ws) {
|
||||
this.debug('Attempted to destroy WebSocket but no connection exists!');
|
||||
return false;
|
||||
}
|
||||
this.heartbeat(-1);
|
||||
ws.close(1000);
|
||||
this.packetManager.handleQueue();
|
||||
this.ws = null;
|
||||
this.status = Constants.Status.DISCONNECTED;
|
||||
return true;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever a message is received
|
||||
* @param {Event} event Event received
|
||||
* @returns {boolean}
|
||||
*/
|
||||
onMessage(event) {
|
||||
try {
|
||||
this.onPacket(this.unpack(event.data));
|
||||
return true;
|
||||
} catch (err) {
|
||||
this.debug(err);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets the current sequence of the connection
|
||||
* @param {number} s New sequence
|
||||
*/
|
||||
setSequence(s) {
|
||||
this.sequence = s > this.sequence ? s : this.sequence;
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever a packet is received
|
||||
* @param {Object} packet received packet
|
||||
* @returns {boolean}
|
||||
*/
|
||||
onPacket(packet) {
|
||||
if (!packet) {
|
||||
this.debug('Received null packet');
|
||||
return false;
|
||||
}
|
||||
this.client.emit('raw', packet);
|
||||
switch (packet.op) {
|
||||
case Constants.OPCodes.HELLO:
|
||||
return this.heartbeat(packet.d.heartbeat_interval);
|
||||
case Constants.OPCodes.RECONNECT:
|
||||
return this.reconnect();
|
||||
case Constants.OPCodes.INVALID_SESSION:
|
||||
if (!packet.d) this.sessionID = null;
|
||||
this.debug('Session invalidated -- will identify with a new session');
|
||||
return this.identify(packet.d ? 2500 : 0);
|
||||
case Constants.OPCodes.HEARTBEAT_ACK:
|
||||
return this.ackHeartbeat();
|
||||
case Constants.OPCodes.HEARTBEAT:
|
||||
return this.heartbeat();
|
||||
default:
|
||||
return this.packetManager.handle(packet);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever a connection is opened to the gateway
|
||||
* @param {Event} event Received open event
|
||||
*/
|
||||
onOpen(event) {
|
||||
this.gateway = event.target.url;
|
||||
this.debug(`Connected to gateway ${this.gateway}`);
|
||||
this.identify();
|
||||
}
|
||||
|
||||
/**
|
||||
* Causes a reconnection to the gateway
|
||||
*/
|
||||
reconnect() {
|
||||
this.debug('Attemping to reconnect in 5500ms...');
|
||||
this.client.emit(Constants.Events.RECONNECTING);
|
||||
this.connect(this.gateway, 5500, true);
|
||||
}
|
||||
|
||||
/**
|
||||
* Called whenever an error occurs with the WebSocket
|
||||
* @param {Error} error Error that occurred
|
||||
*/
|
||||
onError(error) {
|
||||
this.client.emit(Constants.Events.ERROR, error);
|
||||
}
|
||||
|
||||
/**
|
||||
* @external CloseEvent
|
||||
* @see {@link https://developer.mozilla.org/en-US/docs/Web/API/CloseEvent}
|
||||
*/
|
||||
|
||||
/**
|
||||
* Called whenever a connection to the gateway is closed
|
||||
* @param {CloseEvent} event Close event that was received
|
||||
*/
|
||||
onClose(event) {
|
||||
this.debug(`Closed: ${event.code}`);
|
||||
this.closeSequence = this.sequence;
|
||||
// Reset the state before trying to fix anything
|
||||
this.emit('close', event);
|
||||
this.heartbeat(-1);
|
||||
// Should we reconnect?
|
||||
if (Constants.WSCodes[event.code]) {
|
||||
this.debug(Constants.WSCodes[event.code]);
|
||||
this.destroy();
|
||||
return;
|
||||
}
|
||||
this.reconnect();
|
||||
}
|
||||
|
||||
// Heartbeat
|
||||
/**
|
||||
* Acknowledges a heartbeat
|
||||
*/
|
||||
ackHeartbeat() {
|
||||
this.debug(`Heartbeat acknowledged, latency of ${Date.now() - this.lastPingTimestamp}ms`);
|
||||
this.client._pong(this.lastPingTimestamp);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends a heartbeat or sets an interval for sending heartbeats.
|
||||
* @param {number} [time] If -1, clears the interval, any other number sets an interval.
|
||||
* If no value is given, a heartbeat will be sent instantly.
|
||||
*/
|
||||
heartbeat(time) {
|
||||
if (!isNaN(time)) {
|
||||
if (time === -1) {
|
||||
this.debug('Clearing heartbeat interval');
|
||||
this.client.clearInterval(this.heartbeatInterval);
|
||||
this.heartbeatInterval = null;
|
||||
} else {
|
||||
this.debug(`Setting a heartbeat interval for ${time}ms`);
|
||||
this.heartbeatInterval = this.client.setInterval(() => this.heartbeat(), time);
|
||||
}
|
||||
return;
|
||||
}
|
||||
this.debug('Sending a heartbeat');
|
||||
this.lastPingTimestamp = Date.now();
|
||||
this.send({
|
||||
op: Constants.OPCodes.HEARTBEAT,
|
||||
d: this.sequence,
|
||||
});
|
||||
}
|
||||
|
||||
// Identification
|
||||
/**
|
||||
* Identifies the client on a connection
|
||||
* @param {number} [after] How long to wait before identifying
|
||||
* @returns {void}
|
||||
*/
|
||||
identify(after) {
|
||||
if (after) return this.client.setTimeout(this.identify.apply(this), after);
|
||||
return this.sessionID ? this.identifyResume() : this.identifyNew();
|
||||
}
|
||||
|
||||
/**
|
||||
* Identifies as a new connection on the gateway
|
||||
* @returns {void}
|
||||
*/
|
||||
identifyNew() {
|
||||
if (!this.client.token) {
|
||||
this.debug('No token available to identify a new session with');
|
||||
return;
|
||||
}
|
||||
// Clone the generic payload and assign the token
|
||||
const d = Object.assign({ token: this.client.token }, this.client.options.ws);
|
||||
|
||||
// Sharding stuff
|
||||
const { shardId, shardCount } = this.client.options;
|
||||
if (shardCount > 0) d.shard = [Number(shardId), Number(shardCount)];
|
||||
|
||||
// Send the payload
|
||||
this.debug('Identifying as a new session');
|
||||
this.send({ op: Constants.OPCodes.IDENTIFY, d });
|
||||
}
|
||||
|
||||
/**
|
||||
* Resumes a session on the gateway
|
||||
* @returns {void}
|
||||
*/
|
||||
identifyResume() {
|
||||
if (!this.sessionID) {
|
||||
this.debug('Warning: wanted to resume but session ID not available; identifying as a new session instead');
|
||||
return this.identifyNew();
|
||||
}
|
||||
this.debug(`Attempting to resume session ${this.sessionID}`);
|
||||
|
||||
const d = {
|
||||
token: this.client.token,
|
||||
session_id: this.sessionID,
|
||||
seq: this.sequence,
|
||||
};
|
||||
|
||||
return this.send({
|
||||
op: Constants.OPCodes.RESUME,
|
||||
d,
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
Reference in New Issue
Block a user