refactor(rest): switch api to fetch-like and provide strategies (#9416)

BREAKING CHANGE: NodeJS v18+ is required when using node due to the use of global `fetch`
BREAKING CHANGE: The raw method of REST now returns a web compatible `Respone` object.
BREAKING CHANGE: The `parseResponse` utility method has been updated to operate on a web compatible `Response` object.
BREAKING CHANGE: Many underlying internals have changed, some of which were exported.
BREAKING CHANGE: `DefaultRestOptions` used to contain a default `agent`, which is now set to `null` instead.
This commit is contained in:
ckohen
2023-05-06 12:09:19 -07:00
committed by GitHub
parent fc5b9c523b
commit cdaa0a36f5
28 changed files with 317 additions and 203 deletions

View File

@@ -1,6 +1,8 @@
import { EventEmitter } from 'node:events';
import type { Readable } from 'node:stream';
import type { ReadableStream } from 'node:stream/web';
import type { Collection } from '@discordjs/collection';
import type { request, Dispatcher } from 'undici';
import type { Dispatcher, RequestInit, Response } from 'undici';
import { CDN } from './CDN.js';
import {
RequestManager,
@@ -11,7 +13,7 @@ import {
type RequestData,
type RouteLike,
} from './RequestManager.js';
import type { IHandler } from './handlers/IHandler.js';
import type { IHandler } from './interfaces/Handler.js';
import { DefaultRestOptions, RESTEvents } from './utils/constants.js';
import { parseResponse } from './utils/utils.js';
@@ -22,7 +24,7 @@ export interface RESTOptions {
/**
* The agent to set globally
*/
agent: Dispatcher;
agent: Dispatcher | null;
/**
* The base api path, without version
*
@@ -79,6 +81,13 @@ export interface RESTOptions {
* @defaultValue `0`
*/
invalidRequestWarningInterval: number;
/**
* The method called to perform the actual HTTP request given a url and web `fetch` options
* For example, to use global fetch, simply provide `makeRequest: fetch`
*
* @defaultValue `undici.request`
*/
makeRequest(url: string, init: RequestInit): Promise<ResponseLike>;
/**
* The extra offset to add to rate limits in milliseconds
*
@@ -179,7 +188,7 @@ export interface APIRequest {
/**
* Additional HTTP options for this request
*/
options: RequestOptions;
options: RequestInit;
/**
* The full path used to make the request
*/
@@ -194,6 +203,11 @@ export interface APIRequest {
route: string;
}
export interface ResponseLike
extends Pick<Response, 'arrayBuffer' | 'bodyUsed' | 'headers' | 'json' | 'ok' | 'status' | 'text'> {
body: Readable | ReadableStream | null;
}
export interface InvalidRequestWarningData {
/**
* Number of invalid requests that have been made in the window
@@ -212,7 +226,7 @@ export interface RestEvents {
newListener: [name: string, listener: (...args: any) => void];
rateLimited: [rateLimitInfo: RateLimitData];
removeListener: [name: string, listener: (...args: any) => void];
response: [request: APIRequest, response: Dispatcher.ResponseData];
response: [request: APIRequest, response: ResponseLike];
restDebug: [info: string];
}
@@ -233,8 +247,6 @@ export interface REST {
(<S extends string | symbol>(event?: Exclude<S, keyof RestEvents>) => this);
}
export type RequestOptions = Exclude<Parameters<typeof request>[1], undefined>;
export class REST extends EventEmitter {
public readonly cdn: CDN;

View File

@@ -5,11 +5,11 @@ import type { URLSearchParams } from 'node:url';
import { Collection } from '@discordjs/collection';
import { lazy } from '@discordjs/util';
import { DiscordSnowflake } from '@sapphire/snowflake';
import { FormData, type RequestInit, type BodyInit, type Dispatcher, type Agent } from 'undici';
import type { RESTOptions, RestEvents, RequestOptions } from './REST.js';
import type { RequestInit, BodyInit, Dispatcher, Agent } from 'undici';
import type { RESTOptions, ResponseLike, RestEvents } from './REST.js';
import { BurstHandler } from './handlers/BurstHandler.js';
import type { IHandler } from './handlers/IHandler.js';
import { SequentialHandler } from './handlers/SequentialHandler.js';
import type { IHandler } from './interfaces/Handler.js';
import {
BurstHandlerMajorIdKey,
DefaultRestOptions,
@@ -17,7 +17,6 @@ import {
OverwrittenMimeTypes,
RESTEvents,
} from './utils/constants.js';
import { resolveBody } from './utils/utils.js';
// Make this a lazy dynamic import as file-type is a pure ESM package
const getFileType = lazy(async () => import('file-type'));
@@ -323,7 +322,7 @@ export class RequestManager extends EventEmitter {
* @param request - All the information needed to make a request
* @returns The response from the api request
*/
public async queueRequest(request: InternalRequest): Promise<Dispatcher.ResponseData> {
public async queueRequest(request: InternalRequest): Promise<ResponseLike> {
// Generalize the endpoint to its route data
const routeId = RequestManager.generateRouteData(request.fullRoute, request.method);
// Get the bucket hash for the generic route, or point to a global route otherwise
@@ -373,7 +372,7 @@ export class RequestManager extends EventEmitter {
*
* @param request - The request data
*/
private async resolveRequest(request: InternalRequest): Promise<{ fetchOptions: RequestOptions; url: string }> {
private async resolveRequest(request: InternalRequest): Promise<{ fetchOptions: RequestInit; url: string }> {
const { options } = this;
let query = '';
@@ -470,20 +469,18 @@ export class RequestManager extends EventEmitter {
}
}
finalBody = await resolveBody(finalBody);
const method = request.method.toUpperCase();
const fetchOptions: RequestOptions = {
// The non null assertions in the following block are due to exactOptionalPropertyTypes, they have been tested to work with undefined
const fetchOptions: RequestInit = {
// Set body to null on get / head requests. This does not follow fetch spec (likely because it causes subtle bugs) but is aligned with what request was doing
body: ['GET', 'HEAD'].includes(method) ? null : finalBody!,
headers: { ...request.headers, ...additionalHeaders, ...headers } as Record<string, string>,
method: request.method.toUpperCase() as Dispatcher.HttpMethod,
method,
// Prioritize setting an agent per request, use the agent for this instance otherwise.
dispatcher: request.dispatcher ?? this.agent ?? undefined!,
};
if (finalBody !== undefined) {
fetchOptions.body = finalBody as Exclude<RequestOptions['body'], undefined>;
}
// Prioritize setting an agent per request, use the agent for this instance otherwise.
fetchOptions.dispatcher = request.dispatcher ?? this.agent ?? undefined!;
return { url, fetchOptions };
}

View File

@@ -0,0 +1,5 @@
import type * as undici from 'undici';
declare global {
export const { fetch, FormData, Headers, Request, Response }: typeof undici;
}

View File

@@ -1,10 +1,10 @@
import { setTimeout as sleep } from 'node:timers/promises';
import type { Dispatcher } from 'undici';
import type { RequestOptions } from '../REST.js';
import type { RequestInit } from 'undici';
import type { ResponseLike } from '../REST.js';
import type { HandlerRequestData, RequestManager, RouteData } from '../RequestManager.js';
import type { IHandler } from '../interfaces/Handler.js';
import { RESTEvents } from '../utils/constants.js';
import { onRateLimit, parseHeader } from '../utils/utils.js';
import type { IHandler } from './IHandler.js';
import { onRateLimit } from '../utils/utils.js';
import { handleErrors, incrementInvalidCount, makeNetworkRequest } from './Shared.js';
/**
@@ -54,9 +54,9 @@ export class BurstHandler implements IHandler {
public async queueRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
options: RequestInit,
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData> {
): Promise<ResponseLike> {
return this.runRequest(routeId, url, options, requestData);
}
@@ -72,10 +72,10 @@ export class BurstHandler implements IHandler {
private async runRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
options: RequestInit,
requestData: HandlerRequestData,
retries = 0,
): Promise<Dispatcher.ResponseData> {
): Promise<ResponseLike> {
const method = options.method ?? 'get';
const res = await makeNetworkRequest(this.manager, routeId, url, options, requestData, retries);
@@ -86,9 +86,9 @@ export class BurstHandler implements IHandler {
return this.runRequest(routeId, url, options, requestData, ++retries);
}
const status = res.statusCode;
const status = res.status;
let retryAfter = 0;
const retry = parseHeader(res.headers['retry-after']);
const retry = res.headers.get('Retry-After');
// Amount of time in milliseconds until we should retry if rate limited (globally or otherwise)
if (retry) retryAfter = Number(retry) * 1_000 + this.manager.options.offset;
@@ -102,7 +102,7 @@ export class BurstHandler implements IHandler {
return res;
} else if (status === 429) {
// Unexpected ratelimit
const isGlobal = res.headers['x-ratelimit-global'] !== undefined;
const isGlobal = res.headers.has('X-RateLimit-Global');
await onRateLimit(this.manager, {
timeToReset: retryAfter,
limit: Number.POSITIVE_INFINITY,

View File

@@ -1,11 +1,11 @@
import { setTimeout as sleep } from 'node:timers/promises';
import { AsyncQueue } from '@sapphire/async-queue';
import type { Dispatcher } from 'undici';
import type { RateLimitData, RequestOptions } from '../REST.js';
import type { RequestInit } from 'undici';
import type { RateLimitData, ResponseLike } from '../REST.js';
import type { HandlerRequestData, RequestManager, RouteData } from '../RequestManager.js';
import type { IHandler } from '../interfaces/Handler.js';
import { RESTEvents } from '../utils/constants.js';
import { hasSublimit, onRateLimit, parseHeader } from '../utils/utils.js';
import type { IHandler } from './IHandler.js';
import { hasSublimit, onRateLimit } from '../utils/utils.js';
import { handleErrors, incrementInvalidCount, makeNetworkRequest } from './Shared.js';
const enum QueueType {
@@ -134,9 +134,9 @@ export class SequentialHandler implements IHandler {
public async queueRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
options: RequestInit,
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData> {
): Promise<ResponseLike> {
let queue = this.#asyncQueue;
let queueType = QueueType.Standard;
// Separate sublimited requests when already sublimited
@@ -195,10 +195,10 @@ export class SequentialHandler implements IHandler {
private async runRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
options: RequestInit,
requestData: HandlerRequestData,
retries = 0,
): Promise<Dispatcher.ResponseData> {
): Promise<ResponseLike> {
/*
* After calculations have been done, pre-emptively stop further requests
* Potentially loop until this task can run if e.g. the global rate limit is hit twice
@@ -270,14 +270,14 @@ export class SequentialHandler implements IHandler {
return this.runRequest(routeId, url, options, requestData, ++retries);
}
const status = res.statusCode;
const status = res.status;
let retryAfter = 0;
const limit = parseHeader(res.headers['x-ratelimit-limit']);
const remaining = parseHeader(res.headers['x-ratelimit-remaining']);
const reset = parseHeader(res.headers['x-ratelimit-reset-after']);
const hash = parseHeader(res.headers['x-ratelimit-bucket']);
const retry = parseHeader(res.headers['retry-after']);
const limit = res.headers.get('X-RateLimit-Limit');
const remaining = res.headers.get('X-RateLimit-Remaining');
const reset = res.headers.get('X-RateLimit-Reset-After');
const hash = res.headers.get('X-RateLimit-Bucket');
const retry = res.headers.get('Retry-After');
// Update the total number of requests that can be made before the rate limit resets
this.limit = limit ? Number(limit) : Number.POSITIVE_INFINITY;
@@ -309,7 +309,7 @@ export class SequentialHandler implements IHandler {
// Handle retryAfter, which means we have actually hit a rate limit
let sublimitTimeout: number | null = null;
if (retryAfter > 0) {
if (res.headers['x-ratelimit-global'] !== undefined) {
if (res.headers.has('X-RateLimit-Global')) {
this.manager.globalRemaining = 0;
this.manager.globalReset = Date.now() + retryAfter;
} else if (!this.localLimited) {
@@ -327,7 +327,7 @@ export class SequentialHandler implements IHandler {
incrementInvalidCount(this.manager);
}
if (status >= 200 && status < 300) {
if (res.ok) {
return res;
} else if (status === 429) {
// A rate limit was hit - this may happen if the route isn't associated with an official bucket hash yet, or when first globally rate limited

View File

@@ -1,13 +1,13 @@
import { setTimeout, clearTimeout } from 'node:timers';
import { request, type Dispatcher } from 'undici';
import type { RequestOptions } from '../REST.js';
import { Response } from 'undici';
import type { RequestInit } from 'undici';
import type { ResponseLike } from '../REST.js';
import type { HandlerRequestData, RequestManager, RouteData } from '../RequestManager.js';
import type { DiscordErrorData, OAuthErrorData } from '../errors/DiscordAPIError.js';
import { DiscordAPIError } from '../errors/DiscordAPIError.js';
import { HTTPError } from '../errors/HTTPError.js';
import { RESTEvents } from '../utils/constants.js';
import { parseResponse, shouldRetry } from '../utils/utils.js';
import type { PolyFillAbortSignal } from './IHandler.js';
/**
* Invalid request limiting is done on a per-IP basis, not a per-token basis.
@@ -60,25 +60,23 @@ export async function makeNetworkRequest(
manager: RequestManager,
routeId: RouteData,
url: string,
options: RequestOptions,
options: RequestInit,
requestData: HandlerRequestData,
retries: number,
) {
const controller = new AbortController();
const timeout = setTimeout(() => controller.abort(), manager.options.timeout).unref();
if (requestData.signal) {
// The type polyfill is required because Node.js's types are incomplete.
const signal = requestData.signal as unknown as PolyFillAbortSignal;
// If the user signal was aborted, abort the controller, else abort the local signal.
// The reason why we don't re-use the user's signal, is because users may use the same signal for multiple
// requests, and we do not want to cause unexpected side-effects.
if (signal.aborted) controller.abort();
else signal.addEventListener('abort', () => controller.abort());
if (requestData.signal.aborted) controller.abort();
else requestData.signal.addEventListener('abort', () => controller.abort());
}
let res: Dispatcher.ResponseData;
let res: ResponseLike;
try {
res = await request(url, { ...options, signal: controller.signal });
res = await manager.options.makeRequest(url, { ...options, signal: controller.signal });
} catch (error: unknown) {
if (!(error instanceof Error)) throw error;
// Retry the specified number of times if needed
@@ -103,7 +101,7 @@ export async function makeNetworkRequest(
data: requestData,
retries,
},
{ ...res },
res instanceof Response ? res.clone() : { ...res },
);
}
@@ -123,13 +121,13 @@ export async function makeNetworkRequest(
*/
export async function handleErrors(
manager: RequestManager,
res: Dispatcher.ResponseData,
res: ResponseLike,
method: string,
url: string,
requestData: HandlerRequestData,
retries: number,
) {
const status = res.statusCode;
const status = res.status;
if (status >= 500 && status < 600) {
// Retry the specified number of times for possible server side issues
if (retries !== manager.options.retries) {

View File

@@ -1,5 +1,5 @@
import type { Dispatcher } from 'undici';
import type { RequestOptions } from '../REST.js';
import type { RequestInit } from 'undici';
import type { ResponseLike } from '../REST.js';
import type { HandlerRequestData, RouteData } from '../RequestManager.js';
export interface IHandler {
@@ -22,13 +22,7 @@ export interface IHandler {
queueRequest(
routeId: RouteData,
url: string,
options: RequestOptions,
options: RequestInit,
requestData: HandlerRequestData,
): Promise<Dispatcher.ResponseData>;
}
export interface PolyFillAbortSignal {
readonly aborted: boolean;
addEventListener(type: 'abort', listener: () => void): void;
removeEventListener(type: 'abort', listener: () => void): void;
): Promise<ResponseLike>;
}

View File

@@ -1,8 +1,12 @@
import process from 'node:process';
import { lazy } from '@discordjs/util';
import { APIVersion } from 'discord-api-types/v10';
import { Agent } from 'undici';
import type { RESTOptions } from '../REST.js';
const getUndiciRequest = lazy(async () => {
return import('../../strategies/undiciRequest.js');
});
export const DefaultUserAgent =
`DiscordBot (https://discord.js.org, [VI]{{inject}}[/VI])` as `DiscordBot (https://discord.js.org, ${string})`;
@@ -12,13 +16,7 @@ export const DefaultUserAgent =
export const DefaultUserAgentAppendix = process.release?.name === 'node' ? `Node.js/${process.version}` : '';
export const DefaultRestOptions = {
get agent() {
return new Agent({
connect: {
timeout: 30_000,
},
});
},
agent: null,
api: 'https://discord.com/api',
authPrefix: 'Bot',
cdn: 'https://cdn.discordapp.com',
@@ -34,6 +32,10 @@ export const DefaultRestOptions = {
hashSweepInterval: 14_400_000, // 4 Hours
hashLifetime: 86_400_000, // 24 Hours
handlerSweepInterval: 3_600_000, // 1 Hour
async makeRequest(...args) {
const strategy = await getUndiciRequest();
return strategy.makeRequest(...args);
},
} as const satisfies Required<RESTOptions>;
/**

View File

@@ -1,20 +1,9 @@
import { Blob, Buffer } from 'node:buffer';
import { URLSearchParams } from 'node:url';
import { types } from 'node:util';
import type { RESTPatchAPIChannelJSONBody } from 'discord-api-types/v10';
import { FormData, type Dispatcher, type RequestInit } from 'undici';
import type { RateLimitData, RequestOptions } from '../REST.js';
import type { RateLimitData, ResponseLike } from '../REST.js';
import { type RequestManager, RequestMethod } from '../RequestManager.js';
import { RateLimitError } from '../errors/RateLimitError.js';
export function parseHeader(header: string[] | string | undefined): string | undefined {
if (header === undefined || typeof header === 'string') {
return header;
}
return header.join(';');
}
function serializeSearchParam(value: unknown): string | null {
switch (typeof value) {
case 'string':
@@ -61,13 +50,12 @@ export function makeURLSearchParams<T extends object>(options?: Readonly<T>) {
*
* @param res - The fetch response
*/
export async function parseResponse(res: Dispatcher.ResponseData): Promise<unknown> {
const header = parseHeader(res.headers['content-type']);
if (header?.startsWith('application/json')) {
return res.body.json();
export async function parseResponse(res: ResponseLike): Promise<unknown> {
if (res.headers.get('Content-Type')?.startsWith('application/json')) {
return res.json();
}
return res.body.arrayBuffer();
return res.arrayBuffer();
}
/**
@@ -94,49 +82,6 @@ export function hasSublimit(bucketRoute: string, body?: unknown, method?: string
return true;
}
export async function resolveBody(body: RequestInit['body']): Promise<RequestOptions['body']> {
// eslint-disable-next-line no-eq-null, eqeqeq
if (body == null) {
return null;
} else if (typeof body === 'string') {
return body;
} else if (types.isUint8Array(body)) {
return body;
} else if (types.isArrayBuffer(body)) {
return new Uint8Array(body);
} else if (body instanceof URLSearchParams) {
return body.toString();
} else if (body instanceof DataView) {
return new Uint8Array(body.buffer);
} else if (body instanceof Blob) {
return new Uint8Array(await body.arrayBuffer());
} else if (body instanceof FormData) {
return body;
} else if ((body as Iterable<Uint8Array>)[Symbol.iterator]) {
const chunks = [...(body as Iterable<Uint8Array>)];
const length = chunks.reduce((a, b) => a + b.length, 0);
const uint8 = new Uint8Array(length);
let lengthUsed = 0;
return chunks.reduce((a, b) => {
a.set(b, lengthUsed);
lengthUsed += b.length;
return a;
}, uint8);
} else if ((body as AsyncIterable<Uint8Array>)[Symbol.asyncIterator]) {
const chunks: Uint8Array[] = [];
for await (const chunk of body as AsyncIterable<Uint8Array>) {
chunks.push(chunk);
}
return Buffer.concat(chunks);
}
throw new TypeError(`Unable to resolve body.`);
}
/**
* Check whether an error indicates that a retry can be attempted
*

View File

@@ -0,0 +1,70 @@
import { Buffer } from 'node:buffer';
import { URLSearchParams } from 'node:url';
import { types } from 'node:util';
import { type RequestInit, request } from 'undici';
import type { ResponseLike } from '../index.js';
export type RequestOptions = Exclude<Parameters<typeof request>[1], undefined>;
export async function makeRequest(url: string, init: RequestInit): Promise<ResponseLike> {
// The cast is necessary because `headers` and `method` are narrower types in `undici.request`
// our request path guarantees they are of acceptable type for `undici.request`
const options = {
...init,
body: await resolveBody(init.body),
} as RequestOptions;
const res = await request(url, options);
return {
body: res.body,
async arrayBuffer() {
return res.body.arrayBuffer();
},
async json() {
return res.body.json();
},
async text() {
return res.body.text();
},
get bodyUsed() {
return res.body.bodyUsed;
},
headers: new Headers(res.headers as Record<string, string[] | string>),
status: res.statusCode,
ok: res.statusCode >= 200 && res.statusCode < 300,
};
}
export async function resolveBody(body: RequestInit['body']): Promise<Exclude<RequestOptions['body'], undefined>> {
// eslint-disable-next-line no-eq-null, eqeqeq
if (body == null) {
return null;
} else if (typeof body === 'string') {
return body;
} else if (types.isUint8Array(body)) {
return body;
} else if (types.isArrayBuffer(body)) {
return new Uint8Array(body);
} else if (body instanceof URLSearchParams) {
return body.toString();
} else if (body instanceof DataView) {
return new Uint8Array(body.buffer);
} else if (body instanceof Blob) {
return new Uint8Array(await body.arrayBuffer());
} else if (body instanceof FormData) {
return body;
} else if ((body as Iterable<Uint8Array>)[Symbol.iterator]) {
const chunks = [...(body as Iterable<Uint8Array>)];
return Buffer.concat(chunks);
} else if ((body as AsyncIterable<Uint8Array>)[Symbol.asyncIterator]) {
const chunks: Uint8Array[] = [];
for await (const chunk of body as AsyncIterable<Uint8Array>) {
chunks.push(chunk);
}
return Buffer.concat(chunks);
}
throw new TypeError(`Unable to resolve body.`);
}