From 32fe72f909f2e93bcfcf6c17e87a96c7984a76aa Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Antonio=20Rom=C3=A1n?= Date: Fri, 25 Sep 2020 23:42:24 +0200 Subject: [PATCH] feat(Rest): switch queue to AsyncQueue (#4835) Co-authored-by: Sugden <28943913+NotSugden@users.noreply.github.com> --- src/rest/APIRequest.js | 1 + src/rest/AsyncQueue.js | 95 ++++++++++++++++++++++++++++++++++++++ src/rest/RESTManager.js | 15 +----- src/rest/RequestHandler.js | 94 +++++++++++++++---------------------- 4 files changed, 135 insertions(+), 70 deletions(-) create mode 100644 src/rest/AsyncQueue.js diff --git a/src/rest/APIRequest.js b/src/rest/APIRequest.js index b466cd01e..837b38f1b 100644 --- a/src/rest/APIRequest.js +++ b/src/rest/APIRequest.js @@ -15,6 +15,7 @@ class APIRequest { this.method = method; this.route = options.route; this.options = options; + this.retries = 0; let queryString = ''; if (options.query) { diff --git a/src/rest/AsyncQueue.js b/src/rest/AsyncQueue.js new file mode 100644 index 000000000..b465f6aaf --- /dev/null +++ b/src/rest/AsyncQueue.js @@ -0,0 +1,95 @@ +/** + * MIT License + * + * Copyright (c) 2020 kyranet, discord.js + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. + */ + +'use strict'; + +// TODO(kyranet, vladfrangu): replace this with discord.js v13's core AsyncQueue. + +/** + * An async queue that preserves the stack and prevents lock-ups. + * @private + */ +class AsyncQueue { + constructor() { + /** + * The promises array. + * @type {Array<{promise: Promise, resolve: Function}>} + * @private + */ + this.promises = []; + } + + /** + * The remaining amount of queued promises + * @type {number} + */ + get remaining() { + return this.promises.length; + } + + /** + * Waits for last promise and queues a new one. + * @returns {Promise} + * @example + * const queue = new AsyncQueue(); + * async function request(url, options) { + * await queue.wait(); + * try { + * const result = await fetch(url, options); + * // Do some operations with 'result' + * } finally { + * // Remove first entry from the queue and resolve for the next entry + * queue.shift(); + * } + * } + * + * request(someUrl1, someOptions1); // Will call fetch() immediately + * request(someUrl2, someOptions2); // Will call fetch() after the first finished + * request(someUrl3, someOptions3); // Will call fetch() after the second finished + */ + wait() { + const next = this.promises.length ? this.promises[this.promises.length - 1].promise : Promise.resolve(); + let resolve; + const promise = new Promise(res => { + resolve = res; + }); + + this.promises.push({ + resolve, + promise, + }); + + return next; + } + + /** + * Frees the queue's lock for the next item to process. + */ + shift() { + const deferred = this.promises.shift(); + if (typeof deferred !== 'undefined') deferred.resolve(); + } +} + +module.exports = AsyncQueue; diff --git a/src/rest/RESTManager.js b/src/rest/RESTManager.js index b1d2f288b..799f60221 100644 --- a/src/rest/RESTManager.js +++ b/src/rest/RESTManager.js @@ -35,19 +35,6 @@ class RESTManager { return Endpoints.CDN(this.client.options.http.cdn); } - push(handler, apiRequest) { - return new Promise((resolve, reject) => { - handler - .push({ - request: apiRequest, - resolve, - reject, - retries: 0, - }) - .catch(reject); - }); - } - request(method, url, options = {}) { const apiRequest = new APIRequest(this, method, url, options); let handler = this.handlers.get(apiRequest.route); @@ -57,7 +44,7 @@ class RESTManager { this.handlers.set(apiRequest.route, handler); } - return this.push(handler, apiRequest); + return handler.push(apiRequest); } get endpoint() { diff --git a/src/rest/RequestHandler.js b/src/rest/RequestHandler.js index 27879f0bf..a52a5985b 100644 --- a/src/rest/RequestHandler.js +++ b/src/rest/RequestHandler.js @@ -1,5 +1,6 @@ 'use strict'; +const AsyncQueue = require('./AsyncQueue'); const DiscordAPIError = require('./DiscordAPIError'); const HTTPError = require('./HTTPError'); const { @@ -25,46 +26,31 @@ function calculateReset(reset, serverDate) { class RequestHandler { constructor(manager) { this.manager = manager; - this.busy = false; - this.queue = []; + this.queue = new AsyncQueue(); this.reset = -1; this.remaining = -1; this.limit = -1; this.retryAfter = -1; } - push(request) { - if (this.busy) { - this.queue.push(request); - return this.run(); - } else { - return this.execute(request); + async push(request) { + await this.queue.wait(); + try { + return await this.execute(request); + } finally { + this.queue.shift(); } } - run() { - if (this.queue.length === 0) return Promise.resolve(); - return this.execute(this.queue.shift()); - } - get limited() { return Boolean(this.manager.globalTimeout) || (this.remaining <= 0 && Date.now() < this.reset); } get _inactive() { - return this.queue.length === 0 && !this.limited && this.busy !== true; + return this.queue.remaining === 0 && !this.limited; } - async execute(item) { - // Insert item back to the beginning if currently busy - if (this.busy) { - this.queue.unshift(item); - return null; - } - - this.busy = true; - const { reject, request, resolve } = item; - + async execute(request) { // After calculations and requests have been done, pre-emptively stop further requests if (this.limited) { const timeout = this.reset + this.manager.client.options.restTimeOffset - Date.now(); @@ -103,8 +89,7 @@ class RequestHandler { res = await request.make(); } catch (error) { // NodeFetch error expected for all "operational" errors, such as 500 status code - this.busy = false; - return reject(new HTTPError(error.message, error.constructor.name, error.status, request.method, request.path)); + throw new HTTPError(error.message, error.constructor.name, error.status, request.method, request.path); } if (res && res.headers) { @@ -120,7 +105,7 @@ class RequestHandler { this.retryAfter = retryAfter ? Number(retryAfter) : -1; // https://github.com/discordapp/discord-api-docs/issues/182 - if (item.request.route.includes('reactions')) { + if (request.route.includes('reactions')) { this.reset = new Date(serverDate).getTime() - getAPIOffset(serverDate) + 250; } @@ -137,42 +122,39 @@ class RequestHandler { } } - // Finished handling headers, safe to unlock manager - this.busy = false; - if (res.ok) { - const success = await parseResponse(res); // Nothing wrong with the request, proceed with the next one - resolve(success); - return this.run(); - } else if (res.status === 429) { + return parseResponse(res); + } + + // Handle ratelimited requests + if (res.status === 429) { // A ratelimit was hit - this should never happen - this.queue.unshift(item); - this.manager.client.emit('debug', `429 hit on route ${item.request.route}`); + this.manager.client.emit('debug', `429 hit on route ${request.route}`); await Util.delayFor(this.retryAfter); - return this.run(); - } else if (res.status >= 500 && res.status < 600) { + return this.execute(request); + } + + // Handle server errors + if (res.status >= 500 && res.status < 600) { // Retry the specified number of times for possible serverside issues - if (item.retries === this.manager.client.options.retryLimit) { - return reject( - new HTTPError(res.statusText, res.constructor.name, res.status, item.request.method, request.path), - ); - } else { - item.retries++; - this.queue.unshift(item); - return this.run(); + if (request.retries === this.manager.client.options.retryLimit) { + throw new HTTPError(res.statusText, res.constructor.name, res.status, request.method, request.path); } - } else { - // Handle possible malformed requests - try { - const data = await parseResponse(res); - if (res.status >= 400 && res.status < 500) { - return reject(new DiscordAPIError(request.path, data, request.method, res.status)); - } - return null; - } catch (err) { - return reject(new HTTPError(err.message, err.constructor.name, err.status, request.method, request.path)); + + request.retries++; + return this.execute(request); + } + + // Handle possible malformed requests + try { + const data = await parseResponse(res); + if (res.status >= 400 && res.status < 500) { + throw new DiscordAPIError(request.path, data, request.method, res.status); } + return null; + } catch (err) { + throw new HTTPError(err.message, err.constructor.name, err.status, request.method, request.path); } } }