From d251e065cd224556042d71b8db1d47e668bdde21 Mon Sep 17 00:00:00 2001 From: Denis-Adrian Cristea Date: Sat, 4 Oct 2025 17:20:27 +0300 Subject: [PATCH] feat(RedisBroker): ability to explicitly tell the library to pick a random group (#11002) feat(RedisBroker): randomly pick group via symbol --- .../brokers/src/brokers/redis/BaseRedis.ts | 30 +++++++++++++------ .../brokers/src/brokers/redis/RPCRedis.ts | 2 +- 2 files changed, 22 insertions(+), 10 deletions(-) diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index b1f92b97a..1e262afb1 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -1,4 +1,5 @@ import type { Buffer } from 'node:buffer'; +import { randomBytes } from 'node:crypto'; import { readFileSync } from 'node:fs'; import { resolve } from 'node:path'; import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter'; @@ -24,6 +25,8 @@ declare module 'ioredis' { } } +export const kUseRandomGroupName = Symbol.for('djs.brokers.useRandomGroupName'); + /** * Options specific for a Redis broker */ @@ -33,11 +36,11 @@ export interface RedisBrokerOptions extends BaseBrokerOptions { */ blockTimeout?: number; /** - * Consumer group name to use for this broker + * Consumer group name to use for this broker. For fanning out events, use {@link kUseRandomGroupName} * * @see {@link https://redis.io/commands/xreadgroup/} */ - group: string; + group: string | typeof kUseRandomGroupName; /** * Max number of messages to poll at once */ @@ -104,6 +107,14 @@ export abstract class BaseRedisBroker< */ protected readonly streamReadClient: Redis; + /** + * The group being used by this broker. + * + * @privateRemarks + * Stored as its own field to do the "use random group" resolution in the constructor. + */ + protected readonly group: string; + /** * Whether this broker is currently polling events */ @@ -115,6 +126,7 @@ export abstract class BaseRedisBroker< ) { super(); this.options = { ...DefaultRedisBrokerOptions, ...options }; + this.group = this.options.group === kUseRandomGroupName ? randomBytes(16).toString('hex') : this.options.group; redisClient.defineCommand('xcleangroup', { numberOfKeys: 1, lua: readFileSync(resolve(__dirname, '..', 'scripts', 'xcleangroup.lua'), 'utf8'), @@ -131,7 +143,7 @@ export abstract class BaseRedisBroker< events.map(async (event) => { this.subscribedEvents.add(event as string); try { - return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM'); + return await this.redisClient.xgroup('CREATE', event as string, this.group, 0, 'MKSTREAM'); } catch (error) { if (!(error instanceof ReplyError)) { throw error; @@ -201,7 +213,7 @@ export abstract class BaseRedisBroker< private async readGroup(fromId: string, block: number): Promise { const data = await this.streamReadClient.xreadgroupBuffer( 'GROUP', - this.options.group, + this.group, this.options.name, 'COUNT', String(this.options.maxChunk), @@ -226,7 +238,7 @@ export abstract class BaseRedisBroker< const payload = packet[idx + 1]; if (!payload) continue; - this.emitEvent(id, this.options.group, eventName, this.options.decode(payload)); + this.emitEvent(id, this.group, eventName, this.options.decode(payload)); } } } @@ -236,7 +248,7 @@ export abstract class BaseRedisBroker< // Get up to N oldest pending messages (note: a pending message is a message that has been read, but never ACKed) const pending = (await this.streamReadClient.xpending( stream, - this.options.group, + this.group, '-', '+', this.options.maxChunk, @@ -251,7 +263,7 @@ export abstract class BaseRedisBroker< if (deliveredTimes > this.options.maxDeliveredTimes) { // This message is dead. It has repeatedly failed being processed by a consumer. - await this.streamReadClient.xdel(stream, this.options.group, id); + await this.streamReadClient.xdel(stream, this.group, id); continue; } @@ -259,7 +271,7 @@ export abstract class BaseRedisBroker< if (consumer !== this.options.name) { const claimed = await this.streamReadClient.xclaimBuffer( stream, - this.options.group, + this.group, this.options.name, Math.max(this.options.messageIdleTime, 1), id, @@ -290,7 +302,7 @@ export abstract class BaseRedisBroker< continue; } - this.emitEvent(msgId, this.options.group, stream, this.options.decode(payload)); + this.emitEvent(msgId, this.group, stream, this.options.decode(payload)); } } } diff --git a/packages/brokers/src/brokers/redis/RPCRedis.ts b/packages/brokers/src/brokers/redis/RPCRedis.ts index 11d11f769..368652d3a 100644 --- a/packages/brokers/src/brokers/redis/RPCRedis.ts +++ b/packages/brokers/src/brokers/redis/RPCRedis.ts @@ -121,7 +121,7 @@ export class RPCRedisBroker, TResponses ex const payload: { ack(): Promise; data: unknown; reply(data: unknown): Promise } = { data, ack: async () => { - await this.redisClient.xack(event, this.options.group, id); + await this.redisClient.xack(event, this.group, id); }, reply: async (data) => { await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));