feat: no-de-no-de, now with extra buns (#9683)

BREAKING CHANGE: The REST and RequestManager classes now extend AsyncEventEmitter
from `@vladfrangu/async_event_emitter`, which aids in cross-compatibility
between Node, Deno, Bun, CF Workers, Vercel Functions, etc.

BREAKING CHANGE: DefaultUserAgentAppendix has been adapted to support multiple
different platforms (previously mentioned Deno, Bun, CF Workers, etc)

BREAKING CHANGE: the entry point for `@discordjs/rest` will now differ
in non-node-like environments (CF Workers, etc.)

Co-authored-by: Suneet Tipirneni <77477100+suneettipirneni@users.noreply.github.com>
Co-authored-by: Jiralite <33201955+Jiralite@users.noreply.github.com>
Co-authored-by: suneettipirneni <suneettipirneni@icloud.com>
This commit is contained in:
Vlad Frangu
2023-07-17 09:27:57 +03:00
committed by GitHub
parent 351a18bc35
commit 386f206caf
25 changed files with 272 additions and 179 deletions

View File

@@ -20,7 +20,7 @@ import {
type GatewayReceivePayload,
type GatewaySendPayload,
} from 'discord-api-types/v10';
import { WebSocket, type RawData } from 'ws';
import { WebSocket, type Data } from 'ws';
import type { Inflate } from 'zlib-sync';
import type { IContextFetchingStrategy } from '../strategies/context/IContextFetchingStrategy.js';
import { ImportantGatewayOpcodes, getInitialSendRateLimitState } from '../utils/constants.js';
@@ -80,6 +80,12 @@ export interface SendRateLimitState {
resetAt: number;
}
// TODO(vladfrangu): enable this once https://github.com/oven-sh/bun/issues/3392 is solved
// const WebSocketConstructor: typeof WebSocket = shouldUseGlobalFetchAndWebSocket()
// ? (globalThis as any).WebSocket
// : WebSocket;
const WebSocketConstructor: typeof WebSocket = WebSocket;
export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
private connection: WebSocket | null = null;
@@ -179,13 +185,27 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
const session = await this.strategy.retrieveSessionInfo(this.id);
const url = `${session?.resumeURL ?? this.strategy.options.gatewayInformation.url}?${params.toString()}`;
this.debug([`Connecting to ${url}`]);
const connection = new WebSocket(url, { handshakeTimeout: this.strategy.options.handshakeTimeout ?? undefined })
.on('message', this.onMessage.bind(this))
.on('error', this.onError.bind(this))
.on('close', this.onClose.bind(this));
const connection = new WebSocketConstructor(url, {
handshakeTimeout: this.strategy.options.handshakeTimeout ?? undefined,
});
connection.binaryType = 'arraybuffer';
connection.onmessage = (event) => {
void this.onMessage(event.data, event.data instanceof ArrayBuffer);
};
connection.onerror = (event) => {
this.onError(event.error);
};
connection.onclose = (event) => {
void this.onClose(event.code);
};
this.connection = connection;
this.#status = WebSocketShardStatus.Connecting;
@@ -249,9 +269,9 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
if (this.connection) {
// No longer need to listen to messages
this.connection.removeAllListeners('message');
this.connection.onmessage = null;
// Prevent a reconnection loop by unbinding the main close event
this.connection.removeAllListeners('close');
this.connection.onclose = null;
const shouldClose = this.connection.readyState === WebSocket.OPEN;
@@ -262,14 +282,22 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
]);
if (shouldClose) {
let outerResolve: () => void;
const promise = new Promise<void>((resolve) => {
outerResolve = resolve;
});
this.connection.onclose = outerResolve!;
this.connection.close(options.code, options.reason);
await once(this.connection, 'close');
await promise;
this.emit(WebSocketShardEvents.Closed, { code: options.code });
}
// Lastly, remove the error event.
// Doing this earlier would cause a hard crash in case an error event fired on our `close` call
this.connection.removeAllListeners('error');
this.connection.onerror = null;
} else {
this.debug(['Destroying a shard that has no connection; please open an issue on GitHub']);
}
@@ -476,17 +504,23 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.isAck = false;
}
private async unpackMessage(data: ArrayBuffer | Buffer, isBinary: boolean): Promise<GatewayReceivePayload | null> {
const decompressable = new Uint8Array(data);
private async unpackMessage(data: Data, isBinary: boolean): Promise<GatewayReceivePayload | null> {
// Deal with no compression
if (!isBinary) {
return JSON.parse(this.textDecoder.decode(decompressable)) as GatewayReceivePayload;
try {
return JSON.parse(data as string) as GatewayReceivePayload;
} catch {
// This is a non-JSON payload / (at the time of writing this comment) emitted by bun wrongly interpreting custom close codes https://github.com/oven-sh/bun/issues/3392
return null;
}
}
const decompressable = new Uint8Array(data as ArrayBuffer);
// Deal with identify compress
if (this.useIdentifyCompress) {
return new Promise((resolve, reject) => {
// eslint-disable-next-line promise/prefer-await-to-callbacks
inflate(decompressable, { chunkSize: 65_535 }, (err, result) => {
if (err) {
reject(err);
@@ -539,8 +573,8 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
return null;
}
private async onMessage(data: RawData, isBinary: boolean) {
const payload = await this.unpackMessage(data as ArrayBuffer | Buffer, isBinary);
private async onMessage(data: Data, isBinary: boolean) {
const payload = await this.unpackMessage(data, isBinary);
if (!payload) {
return;
}