feat(RedisBroker): ability to explicitly tell the library to pick a random group (#11002)

feat(RedisBroker): randomly pick group via symbol
This commit is contained in:
Denis-Adrian Cristea
2025-10-04 17:20:27 +03:00
committed by GitHub
parent cf89260c98
commit d251e065cd
2 changed files with 22 additions and 10 deletions

View File

@@ -1,4 +1,5 @@
import type { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { readFileSync } from 'node:fs';
import { resolve } from 'node:path';
import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
@@ -24,6 +25,8 @@ declare module 'ioredis' {
}
}
export const kUseRandomGroupName = Symbol.for('djs.brokers.useRandomGroupName');
/**
* Options specific for a Redis broker
*/
@@ -33,11 +36,11 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
*/
blockTimeout?: number;
/**
* Consumer group name to use for this broker
* Consumer group name to use for this broker. For fanning out events, use {@link kUseRandomGroupName}
*
* @see {@link https://redis.io/commands/xreadgroup/}
*/
group: string;
group: string | typeof kUseRandomGroupName;
/**
* Max number of messages to poll at once
*/
@@ -104,6 +107,14 @@ export abstract class BaseRedisBroker<
*/
protected readonly streamReadClient: Redis;
/**
* The group being used by this broker.
*
* @privateRemarks
* Stored as its own field to do the "use random group" resolution in the constructor.
*/
protected readonly group: string;
/**
* Whether this broker is currently polling events
*/
@@ -115,6 +126,7 @@ export abstract class BaseRedisBroker<
) {
super();
this.options = { ...DefaultRedisBrokerOptions, ...options };
this.group = this.options.group === kUseRandomGroupName ? randomBytes(16).toString('hex') : this.options.group;
redisClient.defineCommand('xcleangroup', {
numberOfKeys: 1,
lua: readFileSync(resolve(__dirname, '..', 'scripts', 'xcleangroup.lua'), 'utf8'),
@@ -131,7 +143,7 @@ export abstract class BaseRedisBroker<
events.map(async (event) => {
this.subscribedEvents.add(event as string);
try {
return await this.redisClient.xgroup('CREATE', event as string, this.options.group, 0, 'MKSTREAM');
return await this.redisClient.xgroup('CREATE', event as string, this.group, 0, 'MKSTREAM');
} catch (error) {
if (!(error instanceof ReplyError)) {
throw error;
@@ -201,7 +213,7 @@ export abstract class BaseRedisBroker<
private async readGroup(fromId: string, block: number): Promise<RedisReadGroupData> {
const data = await this.streamReadClient.xreadgroupBuffer(
'GROUP',
this.options.group,
this.group,
this.options.name,
'COUNT',
String(this.options.maxChunk),
@@ -226,7 +238,7 @@ export abstract class BaseRedisBroker<
const payload = packet[idx + 1];
if (!payload) continue;
this.emitEvent(id, this.options.group, eventName, this.options.decode(payload));
this.emitEvent(id, this.group, eventName, this.options.decode(payload));
}
}
}
@@ -236,7 +248,7 @@ export abstract class BaseRedisBroker<
// Get up to N oldest pending messages (note: a pending message is a message that has been read, but never ACKed)
const pending = (await this.streamReadClient.xpending(
stream,
this.options.group,
this.group,
'-',
'+',
this.options.maxChunk,
@@ -251,7 +263,7 @@ export abstract class BaseRedisBroker<
if (deliveredTimes > this.options.maxDeliveredTimes) {
// This message is dead. It has repeatedly failed being processed by a consumer.
await this.streamReadClient.xdel(stream, this.options.group, id);
await this.streamReadClient.xdel(stream, this.group, id);
continue;
}
@@ -259,7 +271,7 @@ export abstract class BaseRedisBroker<
if (consumer !== this.options.name) {
const claimed = await this.streamReadClient.xclaimBuffer(
stream,
this.options.group,
this.group,
this.options.name,
Math.max(this.options.messageIdleTime, 1),
id,
@@ -290,7 +302,7 @@ export abstract class BaseRedisBroker<
continue;
}
this.emitEvent(msgId, this.options.group, stream, this.options.decode(payload));
this.emitEvent(msgId, this.group, stream, this.options.decode(payload));
}
}
}

View File

@@ -121,7 +121,7 @@ export class RPCRedisBroker<TEvents extends Record<string, any[]>, TResponses ex
const payload: { ack(): Promise<void>; data: unknown; reply(data: unknown): Promise<void> } = {
data,
ack: async () => {
await this.redisClient.xack(event, this.options.group, id);
await this.redisClient.xack(event, this.group, id);
},
reply: async (data) => {
await this.redisClient.publish(`${event}:${id.toString()}`, this.options.encode(data));