From 2c750a4e009991d2904c1d7c91be124eb0a5ac22 Mon Sep 17 00:00:00 2001 From: Denis-Adrian Cristea Date: Sat, 4 Oct 2025 11:51:43 +0300 Subject: [PATCH] refactor!: make RedisBroker require consumer name (#11001) * refactor(RedisBroker): require consumer name * chore: spelling Co-authored-by: Noel --------- Co-authored-by: Noel --- packages/brokers/README.md | 26 ++++++++++++++----- .../brokers/src/brokers/redis/BaseRedis.ts | 9 ++----- .../brokers/src/brokers/redis/RPCRedis.ts | 2 +- 3 files changed, 23 insertions(+), 14 deletions(-) diff --git a/packages/brokers/README.md b/packages/brokers/README.md index 948949b4d..f84fea53d 100644 --- a/packages/brokers/README.md +++ b/packages/brokers/README.md @@ -43,7 +43,8 @@ These examples use [ES modules](https://nodejs.org/api/esm.html#enabling). import { PubSubRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new PubSubRedisBroker(new Redis()); +// Considering this only pushes events, the group and name are not important. +const broker = new PubSubRedisBroker(new Redis(), { group: 'noop', name: 'noop' }); await broker.publish('test', 'Hello World!'); await broker.destroy(); @@ -52,13 +53,22 @@ await broker.destroy(); import { PubSubRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new PubSubRedisBroker(new Redis()); +const broker = new PubSubRedisBroker(new Redis(), { + // This is the consumer group name. You should make sure to not re-use this + // across different applications in your stack, unless you absolutely know + // what you're doing. + group: 'subscribers', + // With the assumption that this service will scale to more than one instance, + // you MUST ensure `UNIQUE_CONSUMER_ID` is unique across all of them and + // also deterministic (i.e. if instance-1 restarts, it should still be instance-1) + name: `consumer-${UNIQUE_CONSUMER_ID}`, +}); broker.on('test', ({ data, ack }) => { console.log(data); void ack(); }); -await broker.subscribe('subscribers', ['test']); +await broker.subscribe(['test']); ``` ### RPC @@ -68,7 +78,7 @@ await broker.subscribe('subscribers', ['test']); import { RPCRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new RPCRedisBroker(new Redis()); +const broker = new RPCRedisBroker(new Redis(), { group: 'noop', name: 'noop' }); console.log(await broker.call('testcall', 'Hello World!')); await broker.destroy(); @@ -77,14 +87,18 @@ await broker.destroy(); import { RPCRedisBroker } from '@discordjs/brokers'; import Redis from 'ioredis'; -const broker = new RPCRedisBroker(new Redis()); +const broker = new RPCRedisBroker(new Redis(), { + // Equivalent to the group/name in pubsub, refer to the previous example. + group: 'responders', + name: `consumer-${UNIQUE_ID}`, +}); broker.on('testcall', ({ data, ack, reply }) => { console.log('responder', data); void ack(); void reply(`Echo: ${data}`); }); -await broker.subscribe('responders', ['testcall']); +await broker.subscribe(['testcall']); ``` ## Links diff --git a/packages/brokers/src/brokers/redis/BaseRedis.ts b/packages/brokers/src/brokers/redis/BaseRedis.ts index 9ef109159..db6e01911 100644 --- a/packages/brokers/src/brokers/redis/BaseRedis.ts +++ b/packages/brokers/src/brokers/redis/BaseRedis.ts @@ -1,5 +1,4 @@ import type { Buffer } from 'node:buffer'; -import { randomBytes } from 'node:crypto'; import { readFileSync } from 'node:fs'; import { resolve } from 'node:path'; import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter'; @@ -23,25 +22,22 @@ export interface RedisBrokerOptions extends BaseBrokerOptions { * How long to block for messages when polling */ blockTimeout?: number; - /** * Consumer group name to use for this broker * * @see {@link https://redis.io/commands/xreadgroup/} */ group: string; - /** * Max number of messages to poll at once */ maxChunk?: number; - /** * Unique consumer name. * * @see {@link https://redis.io/commands/xreadgroup/} */ - name?: string; + name: string; } /** @@ -49,10 +45,9 @@ export interface RedisBrokerOptions extends BaseBrokerOptions { */ export const DefaultRedisBrokerOptions = { ...DefaultBrokerOptions, - name: randomBytes(20).toString('hex'), maxChunk: 10, blockTimeout: 5_000, -} as const satisfies Required>; +} as const satisfies Required>; /** * Helper class with shared Redis logic diff --git a/packages/brokers/src/brokers/redis/RPCRedis.ts b/packages/brokers/src/brokers/redis/RPCRedis.ts index 578862767..11d11f769 100644 --- a/packages/brokers/src/brokers/redis/RPCRedis.ts +++ b/packages/brokers/src/brokers/redis/RPCRedis.ts @@ -24,7 +24,7 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions { export const DefaultRPCRedisBrokerOptions = { ...DefaultRedisBrokerOptions, timeout: 5_000, -} as const satisfies Required>; +} as const satisfies Required>; /** * RPC broker powered by Redis