refactor!: make RedisBroker require consumer name (#11001)

* refactor(RedisBroker): require consumer name

* chore: spelling

Co-authored-by: Noel <buechler.noel@outlook.com>

---------

Co-authored-by: Noel <buechler.noel@outlook.com>
This commit is contained in:
Denis-Adrian Cristea
2025-10-04 11:51:43 +03:00
committed by GitHub
parent 6431cea24b
commit 2c750a4e00
3 changed files with 23 additions and 14 deletions

View File

@@ -43,7 +43,8 @@ These examples use [ES modules](https://nodejs.org/api/esm.html#enabling).
import { PubSubRedisBroker } from '@discordjs/brokers'; import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis'; import Redis from 'ioredis';
const broker = new PubSubRedisBroker(new Redis()); // Considering this only pushes events, the group and name are not important.
const broker = new PubSubRedisBroker(new Redis(), { group: 'noop', name: 'noop' });
await broker.publish('test', 'Hello World!'); await broker.publish('test', 'Hello World!');
await broker.destroy(); await broker.destroy();
@@ -52,13 +53,22 @@ await broker.destroy();
import { PubSubRedisBroker } from '@discordjs/brokers'; import { PubSubRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis'; import Redis from 'ioredis';
const broker = new PubSubRedisBroker(new Redis()); const broker = new PubSubRedisBroker(new Redis(), {
// This is the consumer group name. You should make sure to not re-use this
// across different applications in your stack, unless you absolutely know
// what you're doing.
group: 'subscribers',
// With the assumption that this service will scale to more than one instance,
// you MUST ensure `UNIQUE_CONSUMER_ID` is unique across all of them and
// also deterministic (i.e. if instance-1 restarts, it should still be instance-1)
name: `consumer-${UNIQUE_CONSUMER_ID}`,
});
broker.on('test', ({ data, ack }) => { broker.on('test', ({ data, ack }) => {
console.log(data); console.log(data);
void ack(); void ack();
}); });
await broker.subscribe('subscribers', ['test']); await broker.subscribe(['test']);
``` ```
### RPC ### RPC
@@ -68,7 +78,7 @@ await broker.subscribe('subscribers', ['test']);
import { RPCRedisBroker } from '@discordjs/brokers'; import { RPCRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis'; import Redis from 'ioredis';
const broker = new RPCRedisBroker(new Redis()); const broker = new RPCRedisBroker(new Redis(), { group: 'noop', name: 'noop' });
console.log(await broker.call('testcall', 'Hello World!')); console.log(await broker.call('testcall', 'Hello World!'));
await broker.destroy(); await broker.destroy();
@@ -77,14 +87,18 @@ await broker.destroy();
import { RPCRedisBroker } from '@discordjs/brokers'; import { RPCRedisBroker } from '@discordjs/brokers';
import Redis from 'ioredis'; import Redis from 'ioredis';
const broker = new RPCRedisBroker(new Redis()); const broker = new RPCRedisBroker(new Redis(), {
// Equivalent to the group/name in pubsub, refer to the previous example.
group: 'responders',
name: `consumer-${UNIQUE_ID}`,
});
broker.on('testcall', ({ data, ack, reply }) => { broker.on('testcall', ({ data, ack, reply }) => {
console.log('responder', data); console.log('responder', data);
void ack(); void ack();
void reply(`Echo: ${data}`); void reply(`Echo: ${data}`);
}); });
await broker.subscribe('responders', ['testcall']); await broker.subscribe(['testcall']);
``` ```
## Links ## Links

View File

@@ -1,5 +1,4 @@
import type { Buffer } from 'node:buffer'; import type { Buffer } from 'node:buffer';
import { randomBytes } from 'node:crypto';
import { readFileSync } from 'node:fs'; import { readFileSync } from 'node:fs';
import { resolve } from 'node:path'; import { resolve } from 'node:path';
import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter'; import { AsyncEventEmitter } from '@vladfrangu/async_event_emitter';
@@ -23,25 +22,22 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
* How long to block for messages when polling * How long to block for messages when polling
*/ */
blockTimeout?: number; blockTimeout?: number;
/** /**
* Consumer group name to use for this broker * Consumer group name to use for this broker
* *
* @see {@link https://redis.io/commands/xreadgroup/} * @see {@link https://redis.io/commands/xreadgroup/}
*/ */
group: string; group: string;
/** /**
* Max number of messages to poll at once * Max number of messages to poll at once
*/ */
maxChunk?: number; maxChunk?: number;
/** /**
* Unique consumer name. * Unique consumer name.
* *
* @see {@link https://redis.io/commands/xreadgroup/} * @see {@link https://redis.io/commands/xreadgroup/}
*/ */
name?: string; name: string;
} }
/** /**
@@ -49,10 +45,9 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
*/ */
export const DefaultRedisBrokerOptions = { export const DefaultRedisBrokerOptions = {
...DefaultBrokerOptions, ...DefaultBrokerOptions,
name: randomBytes(20).toString('hex'),
maxChunk: 10, maxChunk: 10,
blockTimeout: 5_000, blockTimeout: 5_000,
} as const satisfies Required<Omit<RedisBrokerOptions, 'group'>>; } as const satisfies Required<Omit<RedisBrokerOptions, 'group' | 'name'>>;
/** /**
* Helper class with shared Redis logic * Helper class with shared Redis logic

View File

@@ -24,7 +24,7 @@ export interface RPCRedisBrokerOptions extends RedisBrokerOptions {
export const DefaultRPCRedisBrokerOptions = { export const DefaultRPCRedisBrokerOptions = {
...DefaultRedisBrokerOptions, ...DefaultRedisBrokerOptions,
timeout: 5_000, timeout: 5_000,
} as const satisfies Required<Omit<RPCRedisBrokerOptions, 'group'>>; } as const satisfies Required<Omit<RPCRedisBrokerOptions, 'group' | 'name'>>;
/** /**
* RPC broker powered by Redis * RPC broker powered by Redis