From ef6aa2697e07277ae9f561f72558f38f358d2e08 Mon Sep 17 00:00:00 2001 From: Will Nelson Date: Wed, 13 Jun 2018 19:42:43 -0700 Subject: [PATCH] rework concurrency --- package.json | 1 + src/rest/APIRequest.js | 2 +- src/rest/RESTManager.js | 120 +++++++++------- src/rest/RequestBucket.js | 210 ++++++++++++++++++++++++++++ src/rest/handlers/RequestHandler.js | 114 --------------- src/rest/handlers/burst.js | 15 -- src/rest/handlers/index.js | 5 - src/rest/handlers/sequential.js | 18 --- src/util/Constants.js | 9 +- test/tester1000.js | 19 ++- 10 files changed, 303 insertions(+), 210 deletions(-) create mode 100644 src/rest/RequestBucket.js delete mode 100644 src/rest/handlers/RequestHandler.js delete mode 100644 src/rest/handlers/burst.js delete mode 100644 src/rest/handlers/index.js delete mode 100644 src/rest/handlers/sequential.js diff --git a/package.json b/package.json index eb6cbf33d..95cdcc70a 100644 --- a/package.json +++ b/package.json @@ -34,6 +34,7 @@ "runkitExampleFilename": "./docs/examples/ping.js", "unpkg": "./webpack/discord.min.js", "dependencies": { + "async": "^2.6.1", "form-data": "^2.3.2", "node-fetch": "^2.1.2", "pako": "^1.0.0", diff --git a/src/rest/APIRequest.js b/src/rest/APIRequest.js index ec8375dd1..456892db2 100644 --- a/src/rest/APIRequest.js +++ b/src/rest/APIRequest.js @@ -24,7 +24,7 @@ class APIRequest { const url = API + this.path; let headers = {}; - if (this.options.auth !== false) headers.Authorization = this.rest.getAuth(); + if (this.options.auth !== false) headers.Authorization = this.rest.auth; if (this.options.reason) headers['X-Audit-Log-Reason'] = encodeURIComponent(this.options.reason); if (!browser) headers['User-Agent'] = UserAgent; if (this.options.headers) headers = Object.assign(headers, this.options.headers); diff --git a/src/rest/RESTManager.js b/src/rest/RESTManager.js index 586125a54..b9e5b940e 100644 --- a/src/rest/RESTManager.js +++ b/src/rest/RESTManager.js @@ -1,82 +1,104 @@ -const handlers = require('./handlers'); const APIRequest = require('./APIRequest'); const routeBuilder = require('./APIRouter'); +const RequestBucket = require('./RequestBucket'); const { Error } = require('../errors'); const { Endpoints } = require('../util/Constants'); const Collection = require('../util/Collection'); +/** + * Rest manager. + * @private + */ class RESTManager { constructor(client, tokenPrefix = 'Bot') { + /** + * @type {Client} + */ this.client = client; - this.handlers = new Collection(); + + /** + * Request buckets, mapped by bucket + * @type {Collection} + */ + this.buckets = new Collection(); + + /** + * Whether we're globally limited + * @type {boolean} + */ this.globallyRateLimited = false; + + /** + * The token prefix to use when generating authorization headers + * @type {string} + */ this.tokenPrefix = tokenPrefix; + + /** + * Whether to use a versioned endpoint. Default to true. + * @type {boolean} + */ this.versioned = true; - this.timeDifferences = []; + if (client.options.restSweepInterval > 0) { client.setInterval(() => { - this.handlers.sweep(handler => handler._inactive); + this.buckets.sweep(handler => handler.queue.idle()); }, client.options.restSweepInterval * 1000); } } + + /** + * The authorization header to use + * @readonly + * @type {string} + */ + get auth() { + const token = this.client.token || this.client.accessToken; + const prefixed = !!this.client.application || (this.client.user && this.client.user.bot); + if (token) { + if (prefixed) return `${this.tokenPrefix} ${token}`; + return token; + } else { + throw new Error('TOKEN_MISSING'); + } + } + + /** + * Make a new API router + * @readonly + * @type {APIRouter} + */ get api() { return routeBuilder(this); } - get timeDifference() { - return Math.round(this.timeDifferences.reduce((a, b) => a + b, 0) / this.timeDifferences.length); - } - - set timeDifference(ms) { - this.timeDifferences.unshift(ms); - if (this.timeDifferences.length > 5) this.timeDifferences.length = 5; - } - - getAuth() { - const token = this.client.token || this.client.accessToken; - const prefixed = !!this.client.application || (this.client.user && this.client.user.bot); - if (token && prefixed) return `${this.tokenPrefix} ${token}`; - else if (token) return token; - throw new Error('TOKEN_MISSING'); - } - + /** + * The CDN endpoint + * @readonly + * @type {string} + */ get cdn() { return Endpoints.CDN(this.client.options.http.cdn); } - push(handler, apiRequest) { - return new Promise((resolve, reject) => { - handler.push({ - request: apiRequest, - resolve, - reject, - }); - }); - } - - getRequestHandler() { - const method = this.client.options.apiRequestMethod; - if (typeof method === 'function') return method; - const handler = handlers[method]; - if (!handler) throw new Error('RATELIMIT_INVALID_METHOD'); - return handler; - } - + /** + * Make a request + * @param {string} method The HTTP verb to use + * @param {string} url The Discord URL + * @param {*} options Options to send + * @returns {Promise} + */ request(method, url, options = {}) { - const apiRequest = new APIRequest(this, method, url, options); - let handler = this.handlers.get(apiRequest.route); + const req = new APIRequest(this, method, url, options); + let bucket = this.buckets.get(req.route); - if (!handler) { - handler = new handlers.RequestHandler(this, this.getRequestHandler()); - this.handlers.set(apiRequest.route, handler); + if (!bucket) { + bucket = new RequestBucket(this, req.route); + this.buckets.set(req.route, bucket); } - return this.push(handler, apiRequest); - } - - set endpoint(endpoint) { - this.client.options.http.api = endpoint; + return bucket.enqueue(req); } } diff --git a/src/rest/RequestBucket.js b/src/rest/RequestBucket.js new file mode 100644 index 000000000..a44d015e6 --- /dev/null +++ b/src/rest/RequestBucket.js @@ -0,0 +1,210 @@ +const queue = require('async/queue'); +const DiscordAPIError = require('./DiscordAPIError'); +const { Events: { RATE_LIMIT } } = require('../util/Constants'); + +/** + * A request bucket + * @private + */ +class RequestBucket { + constructor(manager, route) { + /** + * @type {RESTManager} + */ + this.manager = manager; + + /** + * The route that this bucket is controlling + * @type {string} + */ + this.route = route; + + /** + * @type {Client} + */ + this.client = this.manager.client; + + /** + * How many requests are allowed in this bucket + * @type {number} + */ + this.limit = Infinity; + + /** + * The number of requests remaining in this bucket + * @type {number} + */ + this.remaining = -1; + + /** + * The async queue + * @type {QueueObject} + */ + this.queue = queue(this.handle.bind(this), this.client.options.restConcurrency || 1); + + /** + * How long to wait before sending next request. + * @type {number} + */ + this.timeout = 0; + + /** + * The current ratelimiting resume timer + * @type {?Timer} + * @private + */ + this._timer = null; + + /** + * Time at which this bucket resets + * @type {number} + * @private + */ + this._resetTime = 0; + + /** + * Current request sequence + * @type {number} + * @private + */ + this._seq = 0; + + /** + * The seq of the most recent request, negative if unknown + * @type {number} + * @private + */ + this._latest = 0; + } + + /** + * Whether this bucket is currently ratelimited + * @type {boolean} + * @readonly + */ + get limited() { + return this.manager.globallyRateLimited || + (this.remaining - this.queue.running()) <= 0 || + this.resetTime > Date.now(); + } + + /** + * The timestamp at which this bucket's ratelimiting will reset + * @type {number} + */ + get resetTime() { + return this._reset; + } + + set resetTime(time) { + if (time > this._reset) this._reset = time; + } + + /** + * Make a request in this bucket + * @param {APIRequest} request The request to make + * @returns {Promise<*>} + */ + enqueue(request) { + return new Promise((resolve, reject) => { + if (request.seq === undefined) request.seq = this._seq++; + + this.queue.push(request, (err, res) => { + if (err) return reject(err); + + if (res.ok) return res.json().then(resolve, reject); + return res.json().then(data => { + reject(res.status >= 400 && res.status < 500 ? + new DiscordAPIError(request.path, data, request.method) : res); + }, reject); + }); + }); + } + + /* eslint-disable callback-return */ + /** + * Handle a request item + * @param {APIRequest} request The item to execute + * @param {Function} cb A callback to indicate when this item is processed + */ + handle(request, cb) { + if (this.limited && !this.queue.paused) this.queue.pause(); + + request.make().then(res => { + // Response handling + let timeout = 0; + if (res.status === 429) { + this.queue.unshift(request, cb); + this.client.emit('debug', `Exceeded ratelimit for bucket "${this.route}" (${request.method} ${res.url})`); + timeout = Number(res.headers.get('retry-after')); + } else if (res.status >= 500 && res.status < 600) { + if (request.retried) { + cb(res); + } else { + request.retried = true; + this.queue.unshift(request, cb); + timeout = 1e3; + } + } else { + cb(null, res); + } + + // Header parsing + const date = new Date(res.headers.get('date')).valueOf(); + this.manager.globallyRateLimited = Boolean(res.headers.get('x-ratelimit-global')); + this.limit = Number(res.headers.get('x-ratelimit-limit') || Infinity); + this.timeout = (Number(res.headers.get('x-ratelimit-reset')) * 1e3 || date) - date; + this.resetTime = Date.now() + this.timeout; + + const remaining = Number(res.headers.get('x-ratelimit-remaining')); + if (remaining < this.remaining || this.remaining < 0) this.remaining = remaining; + + // If this is the newest response, control queue flow based on ratelimiting + if (request.seq >= this._latest) { + if (this.limited && !this.queue.paused) this.queue.pause(); + else if (!this.limited && this.queue.paused) this.queue.resume(); + this._latest = request.seq; + } + + // Ratelimit handling + if (this.limited) { + if (!timeout) timeout = this.timeout; + timeout += this.client.options.restTimeOffset; + + /** + * Emitted when the client hits a rate limit while making a request + * @event Client#rateLimit + * @param {Object} rateLimitInfo Object containing the rate limit info + * @param {number} rateLimitInfo.timeout Timeout in ms + * @param {number} rateLimitInfo.limit Number of requests that can be made to this endpoint + * @param {string} rateLimitInfo.method HTTP method used for request that triggered this event + * @param {string} rateLimitInfo.path Path used for request that triggered this event + * @param {string} rateLimitInfo.route Route used for request that triggered this event + */ + this.client.emit(RATE_LIMIT, { + timeout, + limit: this.limit, + method: request.method, + path: request.path, + route: request.route, + }); + + // NOTE: Use Timer#refresh (if the timeout is the same) when targeting Node 10 + if (this._timer) clearTimeout(this._timer); + this._timer = this.client.setTimeout(() => { + this._timer = null; + this.reset(); + this.queue.resume(); + }, timeout); + } + }, cb); + } + /* eslint-enable callback-return */ + + reset() { + this.manager.globallyRateLimited = false; + this.remaining = this.limit; + } +} + +module.exports = RequestBucket; diff --git a/src/rest/handlers/RequestHandler.js b/src/rest/handlers/RequestHandler.js deleted file mode 100644 index 47abd9580..000000000 --- a/src/rest/handlers/RequestHandler.js +++ /dev/null @@ -1,114 +0,0 @@ -const DiscordAPIError = require('../DiscordAPIError'); -const { Events: { RATE_LIMIT } } = require('../../util/Constants'); - -class RequestHandler { - constructor(manager, handler) { - this.manager = manager; - this.client = this.manager.client; - this.handle = handler.bind(this); - this.limit = Infinity; - this.resetTime = null; - this.remaining = 1; - - this.queue = []; - } - - get limited() { - return (this.manager.globallyRateLimited || this.remaining <= 0) && Date.now() < this.resetTime; - } - - push(request) { - this.queue.push(request); - this.handle(); - } - - get _inactive() { - return this.queue.length === 0 && !this.limited && Date.now() > this.resetTime && this.busy !== true; - } - - /* eslint-disable prefer-promise-reject-errors */ - execute(item) { - return new Promise((resolve, reject) => { - const finish = timeout => { - if (timeout || this.limited) { - if (!timeout) { - timeout = this.resetTime - Date.now() + this.client.options.restTimeOffset; - } - if (!this.manager.globalTimeout && this.manager.globallyRateLimited) { - this.manager.globalTimeout = setTimeout(() => { - this.manager.globalTimeout = undefined; - this.manager.globallyRateLimited = false; - this.busy = false; - this.handle(); - }, timeout); - reject({ }); - } else { - reject({ timeout }); - } - if (this.client.listenerCount(RATE_LIMIT)) { - /** - * Emitted when the client hits a rate limit while making a request - * @event Client#rateLimit - * @param {Object} rateLimitInfo Object containing the rate limit info - * @param {number} rateLimitInfo.timeout Timeout in ms - * @param {number} rateLimitInfo.limit Number of requests that can be made to this endpoint - * @param {string} rateLimitInfo.method HTTP method used for request that triggered this event - * @param {string} rateLimitInfo.path Path used for request that triggered this event - * @param {string} rateLimitInfo.route Route used for request that triggered this event - */ - this.client.emit(RATE_LIMIT, { - timeout, - limit: this.limit, - method: item.request.method, - path: item.request.path, - route: item.request.route, - }); - } - } else { - resolve(); - } - }; - item.request.make().then(res => { - if (res && res.headers) { - if (res.headers['x-ratelimit-global']) this.manager.globallyRateLimited = true; - this.limit = Number(res.headers['x-ratelimit-limit']); - this.resetTime = Date.now() + Number(res.headers['retry-after']); - this.remaining = Number(res.headers['x-ratelimit-remaining']); - } - - if (res.ok) { - res.json().then(item.resolve, item.reject); - finish(); - return; - } - - if (res.status === 429) { - this.queue.unshift(item); - finish(Number(res.headers['retry-after']) + this.client.options.restTimeOffset); - } else if (res.status >= 500 && res.status < 600) { - if (item.retried) { - item.reject(res); - finish(); - } else { - item.retried = true; - this.queue.unshift(item); - finish(1e3 + this.client.options.restTimeOffset); - } - } else { - res.json().then(data => { - item.reject(res.status >= 400 && res.status < 500 ? - new DiscordAPIError(item.path, data, item.method) : res); - }, item.reject); - finish(); - } - }); - }); - } - - reset() { - this.manager.globallyRateLimited = false; - this.remaining = 1; - } -} - -module.exports = RequestHandler; diff --git a/src/rest/handlers/burst.js b/src/rest/handlers/burst.js deleted file mode 100644 index 54e4600ff..000000000 --- a/src/rest/handlers/burst.js +++ /dev/null @@ -1,15 +0,0 @@ -module.exports = function burst() { - if (this.limited || this.queue.length === 0) return; - this.execute(this.queue.shift()) - .then(this.handle.bind(this)) - .catch(({ timeout }) => { - if (timeout) { - this.client.setTimeout(() => { - this.reset(); - this.handle(); - }, timeout); - } - }); - this.remaining--; - this.handle(); -}; diff --git a/src/rest/handlers/index.js b/src/rest/handlers/index.js deleted file mode 100644 index 47792c46c..000000000 --- a/src/rest/handlers/index.js +++ /dev/null @@ -1,5 +0,0 @@ -module.exports = { - sequential: require('./sequential'), - burst: require('./burst'), - RequestHandler: require('./RequestHandler'), -}; diff --git a/src/rest/handlers/sequential.js b/src/rest/handlers/sequential.js deleted file mode 100644 index 71020282d..000000000 --- a/src/rest/handlers/sequential.js +++ /dev/null @@ -1,18 +0,0 @@ -module.exports = function sequential() { - if (this.busy || this.limited || this.queue.length === 0) return; - this.busy = true; - this.execute(this.queue.shift()) - .then(() => { - this.busy = false; - this.handle(); - }) - .catch(({ timeout }) => { - if (timeout) { - this.client.setTimeout(() => { - this.reset(); - this.busy = false; - this.handle(); - }, timeout); - } - }); -}; diff --git a/src/util/Constants.js b/src/util/Constants.js index 729448d7a..55e3597fb 100644 --- a/src/util/Constants.js +++ b/src/util/Constants.js @@ -5,10 +5,6 @@ const browser = exports.browser = typeof window !== 'undefined'; /** * Options for a client. * @typedef {Object} ClientOptions - * @property {string} [apiRequestMethod='sequential'] One of `sequential` or `burst`. The sequential handler executes - * all requests in the order they are triggered, whereas the burst handler runs multiple in parallel, and doesn't - * provide the guarantee of any particular order. Burst mode is more likely to hit a 429 ratelimit error by its nature, - * and is therefore slightly riskier to use. * @property {number} [shardId=0] ID of the shard to run * @property {number} [shardCount=0] Total number of shards * @property {number} [messageCacheMaxSize=200] Maximum number of messages to cache per channel @@ -28,6 +24,9 @@ const browser = exports.browser = typeof window !== 'undefined'; * requests (higher values will reduce rate-limiting errors on bad connections) * @property {number} [restSweepInterval=60] How frequently to delete inactive request buckets, in seconds * (or 0 for never) + * @property {number} [restConcurrency=1] The number REST calls to execute concurrently. Increasing this value above + * 1 will not guarantee delivery order but may result in increased performance. There is greater risk of hitting + * ratelimits when using concurrency above 1. * @property {PresenceData} [presence] Presence data to use upon login * @property {WSEventType[]} [disabledEvents] An array of disabled websocket events. Events in this array will not be * processed, potentially resulting in performance improvements for larger bots. Only disable events you are @@ -37,7 +36,6 @@ const browser = exports.browser = typeof window !== 'undefined'; * @property {HTTPOptions} [http] HTTP options */ exports.DefaultOptions = { - apiRequestMethod: 'sequential', shardId: 0, shardCount: 0, internalSharding: false, @@ -51,6 +49,7 @@ exports.DefaultOptions = { disabledEvents: [], restTimeOffset: 500, restSweepInterval: 60, + restConcurrency: 1, presence: {}, /** diff --git a/test/tester1000.js b/test/tester1000.js index 99209c71a..a378d2989 100644 --- a/test/tester1000.js +++ b/test/tester1000.js @@ -4,13 +4,14 @@ const { token, prefix, owner } = require('./auth.js'); // eslint-disable-next-line no-console const log = (...args) => console.log(process.uptime().toFixed(3), ...args); -const client = new Discord.Client(); +const client = new Discord.Client({ apiRequestConcurrency: Infinity, restTimeOffset: 0 }); client.on('debug', log); client.on('ready', () => { log('READY', client.user.tag, client.user.id); }); -client.on('rateLimit', log); +client.on('rateLimit', info => log(`ratelimited for ${info.timeout} ms`)); +client.on('error', log); const commands = { eval: message => { @@ -24,9 +25,21 @@ const commands = { console.error(err.stack); res = err.message; } - message.channel.send(res, { code: 'js' }); + if (res.length > 6000) { + message.channel.send('response too long; check console'); + // eslint-disable-next-line no-console + console.log(res); + } else { + message.channel.send(res, { code: 'js', split: true }); + } }, ping: message => message.reply('pong'), + spam: async message => { + const start = Date.now(); + await Promise.all(Array.from({ length: 10 }, (_, i) => message.channel.send(`spam${i}`))); + const diff = Date.now() - start; + message.channel.send(`total time: \`${diff}ms\``); + }, }; client.on('message', message => {