feat: @discordjs/brokers (#8548)

This commit is contained in:
DD
2022-10-13 23:20:36 +03:00
committed by GitHub
parent 179392d6d7
commit bf9aa1858d
32 changed files with 1210 additions and 14 deletions

View File

@@ -0,0 +1,86 @@
import { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { encode, decode } from '@msgpack/msgpack';
import type { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
/**
* Base options for a broker implementation
*/
export interface BaseBrokerOptions {
/**
* How long to block for messages when polling
*/
blockTimeout?: number;
/**
* Function to use for decoding messages
*/
// eslint-disable-next-line @typescript-eslint/method-signature-style
decode?: (data: Buffer) => unknown;
/**
* Function to use for encoding messages
*/
// eslint-disable-next-line @typescript-eslint/method-signature-style
encode?: (data: unknown) => Buffer;
/**
* Max number of messages to poll at once
*/
maxChunk?: number;
/**
* Unique consumer name. See: https://redis.io/commands/xreadgroup/
*/
name?: string;
}
/**
* Default broker options
*/
export const DefaultBrokerOptions: Required<BaseBrokerOptions> = {
name: randomBytes(20).toString('hex'),
maxChunk: 10,
blockTimeout: 5_000,
encode: (data): Buffer => {
const encoded = encode(data);
return Buffer.from(encoded.buffer, encoded.byteOffset, encoded.byteLength);
},
decode: (data): unknown => decode(data),
};
export type ToEventMap<
TRecord extends Record<string, any>,
TResponses extends Record<keyof TRecord, any> | undefined = undefined,
> = {
[TKey in keyof TRecord]: [
event: TResponses extends Record<keyof TRecord, any>
? { ack(): Promise<void>; reply(data: TResponses[TKey]): Promise<void> }
: { ack(): Promise<void> } & { data: TRecord[TKey] },
];
} & { [K: string]: any };
export interface IBaseBroker<TEvents extends Record<string, any>> {
/**
* Subscribes to the given events, grouping them by the given group name
*/
subscribe(group: string, events: (keyof TEvents)[]): Promise<void>;
/**
* Unsubscribes from the given events - it's required to pass the same group name as when subscribing for proper cleanup
*/
unsubscribe(group: string, events: (keyof TEvents)[]): Promise<void>;
}
export interface IPubSubBroker<TEvents extends Record<string, any>>
extends IBaseBroker<TEvents>,
AsyncEventEmitter<ToEventMap<TEvents>> {
/**
* Publishes an event
*/
publish<T extends keyof TEvents>(event: T, data: TEvents[T]): Promise<void>;
}
export interface IRPCBroker<TEvents extends Record<string, any>, TResponses extends Record<keyof TEvents, any>>
extends IBaseBroker<TEvents>,
AsyncEventEmitter<ToEventMap<TEvents, TResponses>> {
/**
* Makes an RPC call
*/
call<T extends keyof TEvents>(event: T, data: TEvents[T], timeoutDuration?: number): Promise<TResponses[T]>;
}

View File

@@ -0,0 +1,172 @@
import type { Buffer } from 'node:buffer';
import { readFileSync } from 'node:fs';
import { resolve } from 'node:path';
import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
import type { Redis } from 'ioredis';
import { ReplyError } from 'ioredis';
import type { BaseBrokerOptions, IBaseBroker, ToEventMap } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';
// For some reason ioredis doesn't have this typed, but it exists
declare module 'ioredis' {
interface Redis {
xreadgroupBuffer(...args: (Buffer | string)[]): Promise<[Buffer, [Buffer, Buffer[]][]][] | null>;
}
}
/**
* Options specific for a Redis broker
*/
export interface RedisBrokerOptions extends BaseBrokerOptions {
/**
* The Redis client to use
*/
redisClient: Redis;
}
/**
* Helper class with shared Redis logic
*/
export abstract class BaseRedisBroker<TEvents extends Record<string, any>>
extends AsyncEventEmitter<ToEventMap<TEvents>>
implements IBaseBroker<TEvents>
{
/**
* Used for Redis queues, see the 3rd argument taken by {@link https://redis.io/commands/xadd | xadd }
*/
public static readonly STREAM_DATA_KEY = 'data';
/**
* Options this broker is using
*/
protected readonly options: Required<RedisBrokerOptions>;
/**
* Events this broker has subscribed to
*/
protected readonly subscribedEvents = new Set<string>();
/**
* Internal copy of the Redis client being used to read incoming payloads
*/
protected readonly streamReadClient: Redis;
/**
* Whether this broker is currently polling events
*/
protected listening = false;
public constructor(options: RedisBrokerOptions) {
super();
this.options = { ...DefaultBrokerOptions, ...options };
options.redisClient.defineCommand('xcleangroup', {
numberOfKeys: 1,
lua: readFileSync(resolve(__dirname, '..', '..', '..', 'scripts', 'xcleangroup.lua'), 'utf8'),
});
this.streamReadClient = options.redisClient.duplicate();
}
/**
* {@inheritDoc IBaseBroker.subscribe}
*/
public async subscribe(group: string, events: (keyof TEvents)[]): Promise<void> {
await Promise.all(
// eslint-disable-next-line consistent-return
events.map(async (event) => {
this.subscribedEvents.add(event as string);
try {
return await this.options.redisClient.xgroup('CREATE', event as string, group, 0, 'MKSTREAM');
} catch (error) {
if (!(error instanceof ReplyError)) {
throw error;
}
}
}),
);
void this.listen(group);
}
/**
* {@inheritDoc IBaseBroker.unsubscribe}
*/
public async unsubscribe(group: string, events: (keyof TEvents)[]): Promise<void> {
const commands: unknown[][] = Array.from({ length: events.length * 2 });
for (let idx = 0; idx < commands.length; idx += 2) {
const event = events[idx / 2];
commands[idx] = ['xgroup', 'delconsumer', event as string, group, this.options.name];
commands[idx + 1] = ['xcleangroup', event as string, group];
}
await this.options.redisClient.pipeline(commands).exec();
for (const event of events) {
this.subscribedEvents.delete(event as string);
}
}
/**
* Begins polling for events, firing them to {@link BaseRedisBroker.listen}
*/
protected async listen(group: string): Promise<void> {
if (this.listening) {
return;
}
this.listening = true;
// eslint-disable-next-line @typescript-eslint/no-unnecessary-condition
while (true) {
try {
const data = await this.streamReadClient.xreadgroupBuffer(
'GROUP',
group,
this.options.name,
'COUNT',
String(this.options.maxChunk),
'BLOCK',
String(this.options.blockTimeout),
'STREAMS',
...this.subscribedEvents,
...Array.from({ length: this.subscribedEvents.size }, () => '>'),
);
if (!data) {
continue;
}
for (const [event, info] of data) {
for (const [id, packet] of info) {
const idx = packet.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
if (idx < 0) {
continue;
}
const data = packet[idx + 1];
if (!data) {
continue;
}
this.emitEvent(id, group, event.toString('utf8'), this.options.decode(data));
}
}
} catch (error) {
this.emit('error', error);
break;
}
}
this.listening = false;
}
/**
* Destroys the broker, closing all connections
*/
public async destroy() {
this.streamReadClient.disconnect();
this.options.redisClient.disconnect();
}
/**
* Handles an incoming Redis event
*/
protected abstract emitEvent(id: Buffer, group: string, event: string, data: unknown): unknown;
}

View File

@@ -0,0 +1,58 @@
import type { Buffer } from 'node:buffer';
import type { IPubSubBroker } from '../Broker.js';
import { BaseRedisBroker } from './BaseRedis.js';
/**
* PubSub broker powered by Redis
*
* @example
* ```ts
* // publisher.js
* import { PubSubRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new PubSubRedisBroker({ redisClient: new Redis() });
*
* await broker.publish('test', 'Hello World!');
* await broker.destroy();
*
* // subscriber.js
* import { PubSubRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new PubSubRedisBroker({ redisClient: new Redis() });
* broker.on('test', ({ data, ack }) => {
* console.log(data);
* void ack();
* });
*
* await broker.subscribe('subscribers', ['test']);
* ```
*/
export class PubSubRedisBroker<TEvents extends Record<string, any>>
extends BaseRedisBroker<TEvents>
implements IPubSubBroker<TEvents>
{
/**
* {@inheritDoc IPubSubBroker.publish}
*/
public async publish<T extends keyof TEvents>(event: T, data: TEvents[T]): Promise<void> {
await this.options.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
this.options.encode(data),
);
}
protected emitEvent(id: Buffer, group: string, event: string, data: unknown) {
const payload: { ack(): Promise<void>; data: unknown } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
},
};
this.emit(event, payload);
}
}

View File

@@ -0,0 +1,130 @@
import type { Buffer } from 'node:buffer';
import { clearTimeout, setTimeout } from 'node:timers';
import type { IRPCBroker } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';
import type { RedisBrokerOptions } from './BaseRedis.js';
import { BaseRedisBroker } from './BaseRedis.js';
interface InternalPromise {
reject(error: any): void;
resolve(data: any): void;
timeout: NodeJS.Timeout;
}
/**
* Options specific for an RPC Redis broker
*/
export interface RPCRedisBrokerOptions extends RedisBrokerOptions {
timeout?: number;
}
/**
* Default values used for the {@link RPCRedisBrokerOptions}
*/
export const DefaultRPCRedisBrokerOptions: Required<Omit<RPCRedisBrokerOptions, 'redisClient'>> = {
...DefaultBrokerOptions,
timeout: 5_000,
};
/**
* RPC broker powered by Redis
*
* @example
* ```ts
* // caller.js
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
*
* console.log(await broker.call('testcall', 'Hello World!'));
* await broker.destroy();
*
* // responder.js
* import { RPCRedisBroker } from '@discordjs/brokers';
* import Redis from 'ioredis';
*
* const broker = new RPCRedisBroker({ redisClient: new Redis() });
* broker.on('testcall', ({ data, ack, reply }) => {
* console.log('responder', data);
* void ack();
* void reply(`Echo: ${data}`);
* });
*
* await broker.subscribe('responders', ['testcall']);
* ```
*/
export class RPCRedisBroker<TEvents extends Record<string, any>, TResponses extends Record<keyof TEvents, any>>
extends BaseRedisBroker<TEvents>
implements IRPCBroker<TEvents, TResponses>
{
/**
* Options this broker is using
*/
protected override readonly options: Required<RPCRedisBrokerOptions>;
protected readonly promises = new Map<string, InternalPromise>();
public constructor(options: RPCRedisBrokerOptions) {
super(options);
this.options = { ...DefaultRPCRedisBrokerOptions, ...options };
this.streamReadClient.on('messageBuffer', (channel: Buffer, message: Buffer) => {
const [, id] = channel.toString().split(':');
if (id && this.promises.has(id)) {
// eslint-disable-next-line @typescript-eslint/unbound-method
const { resolve, timeout } = this.promises.get(id)!;
resolve(this.options.decode(message));
clearTimeout(timeout);
}
});
}
/**
* {@inheritDoc IRPCBroker.call}
*/
public async call<T extends keyof TEvents>(
event: T,
data: TEvents[T],
timeoutDuration: number = this.options.timeout,
): Promise<TResponses[T]> {
const id = await this.options.redisClient.xadd(
event as string,
'*',
BaseRedisBroker.STREAM_DATA_KEY,
this.options.encode(data),
);
// This id! assertion is valid. From redis docs:
// "The command returns a Null reply when used with the NOMKSTREAM option and the key doesn't exist."
// See: https://redis.io/commands/xadd/
const rpcChannel = `${event as string}:${id!}`;
// Construct the error here for better stack traces
const timedOut = new Error(`timed out after ${timeoutDuration}ms`);
await this.streamReadClient.subscribe(rpcChannel);
return new Promise<TResponses[T]>((resolve, reject) => {
const timeout = setTimeout(() => reject(timedOut), timeoutDuration).unref();
this.promises.set(id!, { resolve, reject, timeout });
// eslint-disable-next-line promise/prefer-await-to-then
}).finally(() => {
void this.streamReadClient.unsubscribe(rpcChannel);
this.promises.delete(id!);
});
}
protected emitEvent(id: Buffer, group: string, event: string, data: unknown) {
const payload: { ack(): Promise<void>; data: unknown; reply(data: unknown): Promise<void> } = {
data,
ack: async () => {
await this.options.redisClient.xack(event, group, id);
},
reply: async (data) => {
await this.options.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));
},
};
this.emit(event, payload);
}
}

View File

@@ -0,0 +1,5 @@
export * from './brokers/redis/BaseRedis.js';
export * from './brokers/redis/PubSubRedis.js';
export * from './brokers/redis/RPCRedis.js';
export * from './brokers/Broker.js';