refactor(WebSocketShard): throttling error handling (#9701)

* refactor(WebSocketShard): handle unknown identify errors

* chore: use better abort check

---------

Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
DD
2023-07-12 20:19:11 +03:00
committed by GitHub
parent 7fb91c57f7
commit ceab07bec8
3 changed files with 27 additions and 15 deletions

View File

@@ -25,7 +25,9 @@ export interface IContextFetchingStrategy {
retrieveSessionInfo(shardId: number): Awaitable<SessionInfo | null>; retrieveSessionInfo(shardId: number): Awaitable<SessionInfo | null>;
updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Awaitable<void>; updateSessionInfo(shardId: number, sessionInfo: SessionInfo | null): Awaitable<void>;
/** /**
* Resolves once the given shard should be allowed to identify, or rejects if the operation was aborted * Resolves once the given shard should be allowed to identify
* This should correctly handle the signal and reject with an abort error if the operation is aborted.
* Other errors will cause the shard to reconnect.
*/ */
waitForIdentify(shardId: number, signal: AbortSignal): Promise<void>; waitForIdentify(shardId: number, signal: AbortSignal): Promise<void>;
} }

View File

@@ -9,17 +9,13 @@ import {
} from '../sharding/WorkerShardingStrategy.js'; } from '../sharding/WorkerShardingStrategy.js';
import type { FetchingStrategyOptions, IContextFetchingStrategy } from './IContextFetchingStrategy.js'; import type { FetchingStrategyOptions, IContextFetchingStrategy } from './IContextFetchingStrategy.js';
// Because the global types are incomplete for whatever reason
interface PolyFillAbortSignal {
readonly aborted: boolean;
addEventListener(type: 'abort', listener: () => void): void;
removeEventListener(type: 'abort', listener: () => void): void;
}
export class WorkerContextFetchingStrategy implements IContextFetchingStrategy { export class WorkerContextFetchingStrategy implements IContextFetchingStrategy {
private readonly sessionPromises = new Collection<number, (session: SessionInfo | null) => void>(); private readonly sessionPromises = new Collection<number, (session: SessionInfo | null) => void>();
private readonly waitForIdentifyPromises = new Collection<number, { reject(): void; resolve(): void }>(); private readonly waitForIdentifyPromises = new Collection<
number,
{ reject(error: unknown): void; resolve(): void; signal: AbortSignal }
>();
public constructor(public readonly options: FetchingStrategyOptions) { public constructor(public readonly options: FetchingStrategyOptions) {
if (isMainThread) { if (isMainThread) {
@@ -37,7 +33,8 @@ export class WorkerContextFetchingStrategy implements IContextFetchingStrategy {
if (payload.ok) { if (payload.ok) {
promise?.resolve(); promise?.resolve();
} else { } else {
promise?.reject(); // We need to make sure we reject with an abort error
promise?.reject(promise.signal.reason);
} }
this.waitForIdentifyPromises.delete(payload.nonce); this.waitForIdentifyPromises.delete(payload.nonce);
@@ -77,7 +74,7 @@ export class WorkerContextFetchingStrategy implements IContextFetchingStrategy {
}; };
const promise = new Promise<void>((resolve, reject) => const promise = new Promise<void>((resolve, reject) =>
// eslint-disable-next-line no-promise-executor-return // eslint-disable-next-line no-promise-executor-return
this.waitForIdentifyPromises.set(nonce, { resolve, reject }), this.waitForIdentifyPromises.set(nonce, { signal, resolve, reject }),
); );
parentPort!.postMessage(payload); parentPort!.postMessage(payload);
@@ -91,12 +88,12 @@ export class WorkerContextFetchingStrategy implements IContextFetchingStrategy {
parentPort!.postMessage(payload); parentPort!.postMessage(payload);
}; };
(signal as unknown as PolyFillAbortSignal).addEventListener('abort', listener); signal.addEventListener('abort', listener);
try { try {
await promise; await promise;
} finally { } finally {
(signal as unknown as PolyFillAbortSignal).removeEventListener('abort', listener); signal.removeEventListener('abort', listener);
} }
} }
} }

View File

@@ -387,8 +387,21 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
try { try {
await this.strategy.waitForIdentify(this.id, controller.signal); await this.strategy.waitForIdentify(this.id, controller.signal);
} catch { } catch {
this.debug(['Was waiting for an identify, but the shard closed in the meantime']); if (controller.signal.aborted) {
return; this.debug(['Was waiting for an identify, but the shard closed in the meantime']);
return;
}
this.debug([
'IContextFetchingStrategy#waitForIdentify threw an unknown error.',
"If you're using a custom strategy, this is probably nothing to worry about.",
"If you're not, please open an issue on GitHub.",
]);
await this.destroy({
reason: 'Identify throttling logic failed',
recover: WebSocketShardDestroyRecovery.Resume,
});
} finally { } finally {
this.off(WebSocketShardEvents.Closed, closeHandler); this.off(WebSocketShardEvents.Closed, closeHandler);
} }