rework concurrency

This commit is contained in:
Will Nelson
2018-06-13 19:42:43 -07:00
parent e66a798634
commit ef6aa2697e
10 changed files with 303 additions and 210 deletions

View File

@@ -34,6 +34,7 @@
"runkitExampleFilename": "./docs/examples/ping.js",
"unpkg": "./webpack/discord.min.js",
"dependencies": {
"async": "^2.6.1",
"form-data": "^2.3.2",
"node-fetch": "^2.1.2",
"pako": "^1.0.0",

View File

@@ -24,7 +24,7 @@ class APIRequest {
const url = API + this.path;
let headers = {};
if (this.options.auth !== false) headers.Authorization = this.rest.getAuth();
if (this.options.auth !== false) headers.Authorization = this.rest.auth;
if (this.options.reason) headers['X-Audit-Log-Reason'] = encodeURIComponent(this.options.reason);
if (!browser) headers['User-Agent'] = UserAgent;
if (this.options.headers) headers = Object.assign(headers, this.options.headers);

View File

@@ -1,82 +1,104 @@
const handlers = require('./handlers');
const APIRequest = require('./APIRequest');
const routeBuilder = require('./APIRouter');
const RequestBucket = require('./RequestBucket');
const { Error } = require('../errors');
const { Endpoints } = require('../util/Constants');
const Collection = require('../util/Collection');
/**
* Rest manager.
* @private
*/
class RESTManager {
constructor(client, tokenPrefix = 'Bot') {
/**
* @type {Client}
*/
this.client = client;
this.handlers = new Collection();
/**
* Request buckets, mapped by bucket
* @type {Collection<string, RequestHandler>}
*/
this.buckets = new Collection();
/**
* Whether we're globally limited
* @type {boolean}
*/
this.globallyRateLimited = false;
/**
* The token prefix to use when generating authorization headers
* @type {string}
*/
this.tokenPrefix = tokenPrefix;
/**
* Whether to use a versioned endpoint. Default to true.
* @type {boolean}
*/
this.versioned = true;
this.timeDifferences = [];
if (client.options.restSweepInterval > 0) {
client.setInterval(() => {
this.handlers.sweep(handler => handler._inactive);
this.buckets.sweep(handler => handler.queue.idle());
}, client.options.restSweepInterval * 1000);
}
}
/**
* The authorization header to use
* @readonly
* @type {string}
*/
get auth() {
const token = this.client.token || this.client.accessToken;
const prefixed = !!this.client.application || (this.client.user && this.client.user.bot);
if (token) {
if (prefixed) return `${this.tokenPrefix} ${token}`;
return token;
} else {
throw new Error('TOKEN_MISSING');
}
}
/**
* Make a new API router
* @readonly
* @type {APIRouter}
*/
get api() {
return routeBuilder(this);
}
get timeDifference() {
return Math.round(this.timeDifferences.reduce((a, b) => a + b, 0) / this.timeDifferences.length);
}
set timeDifference(ms) {
this.timeDifferences.unshift(ms);
if (this.timeDifferences.length > 5) this.timeDifferences.length = 5;
}
getAuth() {
const token = this.client.token || this.client.accessToken;
const prefixed = !!this.client.application || (this.client.user && this.client.user.bot);
if (token && prefixed) return `${this.tokenPrefix} ${token}`;
else if (token) return token;
throw new Error('TOKEN_MISSING');
}
/**
* The CDN endpoint
* @readonly
* @type {string}
*/
get cdn() {
return Endpoints.CDN(this.client.options.http.cdn);
}
push(handler, apiRequest) {
return new Promise((resolve, reject) => {
handler.push({
request: apiRequest,
resolve,
reject,
});
});
}
getRequestHandler() {
const method = this.client.options.apiRequestMethod;
if (typeof method === 'function') return method;
const handler = handlers[method];
if (!handler) throw new Error('RATELIMIT_INVALID_METHOD');
return handler;
}
/**
* Make a request
* @param {string} method The HTTP verb to use
* @param {string} url The Discord URL
* @param {*} options Options to send
* @returns {Promise<any>}
*/
request(method, url, options = {}) {
const apiRequest = new APIRequest(this, method, url, options);
let handler = this.handlers.get(apiRequest.route);
const req = new APIRequest(this, method, url, options);
let bucket = this.buckets.get(req.route);
if (!handler) {
handler = new handlers.RequestHandler(this, this.getRequestHandler());
this.handlers.set(apiRequest.route, handler);
if (!bucket) {
bucket = new RequestBucket(this, req.route);
this.buckets.set(req.route, bucket);
}
return this.push(handler, apiRequest);
}
set endpoint(endpoint) {
this.client.options.http.api = endpoint;
return bucket.enqueue(req);
}
}

210
src/rest/RequestBucket.js Normal file
View File

@@ -0,0 +1,210 @@
const queue = require('async/queue');
const DiscordAPIError = require('./DiscordAPIError');
const { Events: { RATE_LIMIT } } = require('../util/Constants');
/**
* A request bucket
* @private
*/
class RequestBucket {
constructor(manager, route) {
/**
* @type {RESTManager}
*/
this.manager = manager;
/**
* The route that this bucket is controlling
* @type {string}
*/
this.route = route;
/**
* @type {Client}
*/
this.client = this.manager.client;
/**
* How many requests are allowed in this bucket
* @type {number}
*/
this.limit = Infinity;
/**
* The number of requests remaining in this bucket
* @type {number}
*/
this.remaining = -1;
/**
* The async queue
* @type {QueueObject}
*/
this.queue = queue(this.handle.bind(this), this.client.options.restConcurrency || 1);
/**
* How long to wait before sending next request.
* @type {number}
*/
this.timeout = 0;
/**
* The current ratelimiting resume timer
* @type {?Timer}
* @private
*/
this._timer = null;
/**
* Time at which this bucket resets
* @type {number}
* @private
*/
this._resetTime = 0;
/**
* Current request sequence
* @type {number}
* @private
*/
this._seq = 0;
/**
* The seq of the most recent request, negative if unknown
* @type {number}
* @private
*/
this._latest = 0;
}
/**
* Whether this bucket is currently ratelimited
* @type {boolean}
* @readonly
*/
get limited() {
return this.manager.globallyRateLimited ||
(this.remaining - this.queue.running()) <= 0 ||
this.resetTime > Date.now();
}
/**
* The timestamp at which this bucket's ratelimiting will reset
* @type {number}
*/
get resetTime() {
return this._reset;
}
set resetTime(time) {
if (time > this._reset) this._reset = time;
}
/**
* Make a request in this bucket
* @param {APIRequest} request The request to make
* @returns {Promise<*>}
*/
enqueue(request) {
return new Promise((resolve, reject) => {
if (request.seq === undefined) request.seq = this._seq++;
this.queue.push(request, (err, res) => {
if (err) return reject(err);
if (res.ok) return res.json().then(resolve, reject);
return res.json().then(data => {
reject(res.status >= 400 && res.status < 500 ?
new DiscordAPIError(request.path, data, request.method) : res);
}, reject);
});
});
}
/* eslint-disable callback-return */
/**
* Handle a request item
* @param {APIRequest} request The item to execute
* @param {Function} cb A callback to indicate when this item is processed
*/
handle(request, cb) {
if (this.limited && !this.queue.paused) this.queue.pause();
request.make().then(res => {
// Response handling
let timeout = 0;
if (res.status === 429) {
this.queue.unshift(request, cb);
this.client.emit('debug', `Exceeded ratelimit for bucket "${this.route}" (${request.method} ${res.url})`);
timeout = Number(res.headers.get('retry-after'));
} else if (res.status >= 500 && res.status < 600) {
if (request.retried) {
cb(res);
} else {
request.retried = true;
this.queue.unshift(request, cb);
timeout = 1e3;
}
} else {
cb(null, res);
}
// Header parsing
const date = new Date(res.headers.get('date')).valueOf();
this.manager.globallyRateLimited = Boolean(res.headers.get('x-ratelimit-global'));
this.limit = Number(res.headers.get('x-ratelimit-limit') || Infinity);
this.timeout = (Number(res.headers.get('x-ratelimit-reset')) * 1e3 || date) - date;
this.resetTime = Date.now() + this.timeout;
const remaining = Number(res.headers.get('x-ratelimit-remaining'));
if (remaining < this.remaining || this.remaining < 0) this.remaining = remaining;
// If this is the newest response, control queue flow based on ratelimiting
if (request.seq >= this._latest) {
if (this.limited && !this.queue.paused) this.queue.pause();
else if (!this.limited && this.queue.paused) this.queue.resume();
this._latest = request.seq;
}
// Ratelimit handling
if (this.limited) {
if (!timeout) timeout = this.timeout;
timeout += this.client.options.restTimeOffset;
/**
* Emitted when the client hits a rate limit while making a request
* @event Client#rateLimit
* @param {Object} rateLimitInfo Object containing the rate limit info
* @param {number} rateLimitInfo.timeout Timeout in ms
* @param {number} rateLimitInfo.limit Number of requests that can be made to this endpoint
* @param {string} rateLimitInfo.method HTTP method used for request that triggered this event
* @param {string} rateLimitInfo.path Path used for request that triggered this event
* @param {string} rateLimitInfo.route Route used for request that triggered this event
*/
this.client.emit(RATE_LIMIT, {
timeout,
limit: this.limit,
method: request.method,
path: request.path,
route: request.route,
});
// NOTE: Use Timer#refresh (if the timeout is the same) when targeting Node 10
if (this._timer) clearTimeout(this._timer);
this._timer = this.client.setTimeout(() => {
this._timer = null;
this.reset();
this.queue.resume();
}, timeout);
}
}, cb);
}
/* eslint-enable callback-return */
reset() {
this.manager.globallyRateLimited = false;
this.remaining = this.limit;
}
}
module.exports = RequestBucket;

View File

@@ -1,114 +0,0 @@
const DiscordAPIError = require('../DiscordAPIError');
const { Events: { RATE_LIMIT } } = require('../../util/Constants');
class RequestHandler {
constructor(manager, handler) {
this.manager = manager;
this.client = this.manager.client;
this.handle = handler.bind(this);
this.limit = Infinity;
this.resetTime = null;
this.remaining = 1;
this.queue = [];
}
get limited() {
return (this.manager.globallyRateLimited || this.remaining <= 0) && Date.now() < this.resetTime;
}
push(request) {
this.queue.push(request);
this.handle();
}
get _inactive() {
return this.queue.length === 0 && !this.limited && Date.now() > this.resetTime && this.busy !== true;
}
/* eslint-disable prefer-promise-reject-errors */
execute(item) {
return new Promise((resolve, reject) => {
const finish = timeout => {
if (timeout || this.limited) {
if (!timeout) {
timeout = this.resetTime - Date.now() + this.client.options.restTimeOffset;
}
if (!this.manager.globalTimeout && this.manager.globallyRateLimited) {
this.manager.globalTimeout = setTimeout(() => {
this.manager.globalTimeout = undefined;
this.manager.globallyRateLimited = false;
this.busy = false;
this.handle();
}, timeout);
reject({ });
} else {
reject({ timeout });
}
if (this.client.listenerCount(RATE_LIMIT)) {
/**
* Emitted when the client hits a rate limit while making a request
* @event Client#rateLimit
* @param {Object} rateLimitInfo Object containing the rate limit info
* @param {number} rateLimitInfo.timeout Timeout in ms
* @param {number} rateLimitInfo.limit Number of requests that can be made to this endpoint
* @param {string} rateLimitInfo.method HTTP method used for request that triggered this event
* @param {string} rateLimitInfo.path Path used for request that triggered this event
* @param {string} rateLimitInfo.route Route used for request that triggered this event
*/
this.client.emit(RATE_LIMIT, {
timeout,
limit: this.limit,
method: item.request.method,
path: item.request.path,
route: item.request.route,
});
}
} else {
resolve();
}
};
item.request.make().then(res => {
if (res && res.headers) {
if (res.headers['x-ratelimit-global']) this.manager.globallyRateLimited = true;
this.limit = Number(res.headers['x-ratelimit-limit']);
this.resetTime = Date.now() + Number(res.headers['retry-after']);
this.remaining = Number(res.headers['x-ratelimit-remaining']);
}
if (res.ok) {
res.json().then(item.resolve, item.reject);
finish();
return;
}
if (res.status === 429) {
this.queue.unshift(item);
finish(Number(res.headers['retry-after']) + this.client.options.restTimeOffset);
} else if (res.status >= 500 && res.status < 600) {
if (item.retried) {
item.reject(res);
finish();
} else {
item.retried = true;
this.queue.unshift(item);
finish(1e3 + this.client.options.restTimeOffset);
}
} else {
res.json().then(data => {
item.reject(res.status >= 400 && res.status < 500 ?
new DiscordAPIError(item.path, data, item.method) : res);
}, item.reject);
finish();
}
});
});
}
reset() {
this.manager.globallyRateLimited = false;
this.remaining = 1;
}
}
module.exports = RequestHandler;

View File

@@ -1,15 +0,0 @@
module.exports = function burst() {
if (this.limited || this.queue.length === 0) return;
this.execute(this.queue.shift())
.then(this.handle.bind(this))
.catch(({ timeout }) => {
if (timeout) {
this.client.setTimeout(() => {
this.reset();
this.handle();
}, timeout);
}
});
this.remaining--;
this.handle();
};

View File

@@ -1,5 +0,0 @@
module.exports = {
sequential: require('./sequential'),
burst: require('./burst'),
RequestHandler: require('./RequestHandler'),
};

View File

@@ -1,18 +0,0 @@
module.exports = function sequential() {
if (this.busy || this.limited || this.queue.length === 0) return;
this.busy = true;
this.execute(this.queue.shift())
.then(() => {
this.busy = false;
this.handle();
})
.catch(({ timeout }) => {
if (timeout) {
this.client.setTimeout(() => {
this.reset();
this.busy = false;
this.handle();
}, timeout);
}
});
};

View File

@@ -5,10 +5,6 @@ const browser = exports.browser = typeof window !== 'undefined';
/**
* Options for a client.
* @typedef {Object} ClientOptions
* @property {string} [apiRequestMethod='sequential'] One of `sequential` or `burst`. The sequential handler executes
* all requests in the order they are triggered, whereas the burst handler runs multiple in parallel, and doesn't
* provide the guarantee of any particular order. Burst mode is more likely to hit a 429 ratelimit error by its nature,
* and is therefore slightly riskier to use.
* @property {number} [shardId=0] ID of the shard to run
* @property {number} [shardCount=0] Total number of shards
* @property {number} [messageCacheMaxSize=200] Maximum number of messages to cache per channel
@@ -28,6 +24,9 @@ const browser = exports.browser = typeof window !== 'undefined';
* requests (higher values will reduce rate-limiting errors on bad connections)
* @property {number} [restSweepInterval=60] How frequently to delete inactive request buckets, in seconds
* (or 0 for never)
* @property {number} [restConcurrency=1] The number REST calls to execute concurrently. Increasing this value above
* 1 will not guarantee delivery order but may result in increased performance. There is greater risk of hitting
* ratelimits when using concurrency above 1.
* @property {PresenceData} [presence] Presence data to use upon login
* @property {WSEventType[]} [disabledEvents] An array of disabled websocket events. Events in this array will not be
* processed, potentially resulting in performance improvements for larger bots. Only disable events you are
@@ -37,7 +36,6 @@ const browser = exports.browser = typeof window !== 'undefined';
* @property {HTTPOptions} [http] HTTP options
*/
exports.DefaultOptions = {
apiRequestMethod: 'sequential',
shardId: 0,
shardCount: 0,
internalSharding: false,
@@ -51,6 +49,7 @@ exports.DefaultOptions = {
disabledEvents: [],
restTimeOffset: 500,
restSweepInterval: 60,
restConcurrency: 1,
presence: {},
/**

View File

@@ -4,13 +4,14 @@ const { token, prefix, owner } = require('./auth.js');
// eslint-disable-next-line no-console
const log = (...args) => console.log(process.uptime().toFixed(3), ...args);
const client = new Discord.Client();
const client = new Discord.Client({ apiRequestConcurrency: Infinity, restTimeOffset: 0 });
client.on('debug', log);
client.on('ready', () => {
log('READY', client.user.tag, client.user.id);
});
client.on('rateLimit', log);
client.on('rateLimit', info => log(`ratelimited for ${info.timeout} ms`));
client.on('error', log);
const commands = {
eval: message => {
@@ -24,9 +25,21 @@ const commands = {
console.error(err.stack);
res = err.message;
}
message.channel.send(res, { code: 'js' });
if (res.length > 6000) {
message.channel.send('response too long; check console');
// eslint-disable-next-line no-console
console.log(res);
} else {
message.channel.send(res, { code: 'js', split: true });
}
},
ping: message => message.reply('pong'),
spam: async message => {
const start = Date.now();
await Promise.all(Array.from({ length: 10 }, (_, i) => message.channel.send(`spam${i}`)));
const diff = Date.now() - start;
message.channel.send(`total time: \`${diff}ms\``);
},
};
client.on('message', message => {