feat(RedisBroker): poll for unacked events (#11004)

Co-authored-by: Noel <buechler.noel@outlook.com>
This commit is contained in:
Denis-Adrian Cristea
2025-10-04 11:59:48 +03:00
committed by GitHub
parent 2c750a4e00
commit cf89260c98

View File

@@ -7,10 +7,20 @@ import { ReplyError } from 'ioredis';
import type { BaseBrokerOptions, IBaseBroker, ToEventMap } from '../Broker.js';
import { DefaultBrokerOptions } from '../Broker.js';
// For some reason ioredis doesn't have those typed, but they exist
// Shape: [stream, [messageId, flattenedFieldValuePairs][]][]
type RedisReadGroupData = [Buffer, [Buffer, Buffer[]][]][];

declare module 'ioredis' {
	interface Redis {
		xclaimBuffer(
			key: Buffer | string,
			group: Buffer | string,
			consumer: Buffer | string,
			minIdleTime: number,
			id: Buffer | string,
			...args: (Buffer | string)[]
		): Promise<string[]>;
		xreadgroupBuffer(...args: (Buffer | string)[]): Promise<RedisReadGroupData | null>;
	}
}
@@ -32,6 +42,19 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
* Max number of messages to poll at once
*/
maxChunk?: number;
/**
* How many times a message can be delivered to a consumer before it is considered dead.
* This is used to prevent messages from being stuck in the queue forever if a consumer is
* unable to process them.
*/
maxDeliveredTimes?: number;
/**
* How long a message should be idle for before allowing it to be claimed by another consumer.
* Note that too high of a value can lead to a high delay in processing messages during a service downscale,
* while too low of a value can lead to messages being too eagerly claimed by other consumers during an instance
* restart (which is most likely not actually that problematic)
*/
messageIdleTime?: number;
/**
* Unique consumer name.
*
@@ -46,6 +69,8 @@ export interface RedisBrokerOptions extends BaseBrokerOptions {
/**
 * Default Redis broker options. `group` and `name` are deliberately excluded:
 * they have no sensible defaults and must always be supplied by the caller.
 */
export const DefaultRedisBrokerOptions = {
	...DefaultBrokerOptions,
	// Max number of messages pulled per XREADGROUP call
	maxChunk: 10,
	// A message delivered more than this many times is considered dead
	maxDeliveredTimes: 3,
	// Pending messages idle for at least this long (ms) may be claimed by another consumer
	messageIdleTime: 3_000,
	// XREADGROUP BLOCK timeout in milliseconds
	blockTimeout: 5_000,
} as const satisfies Required<Omit<RedisBrokerOptions, 'group' | 'name'>>;
@@ -136,7 +161,7 @@ export abstract class BaseRedisBroker<
}
/**
 * Begins polling for events, firing them to {@link BaseRedisBroker.emitEvent}
 */
protected async listen(): Promise<void> {
if (this.listening) {
@@ -145,40 +170,24 @@ export abstract class BaseRedisBroker<
this.listening = true;
// Enter regular polling
while (this.subscribedEvents.size > 0) {
try {
const data = await this.streamReadClient.xreadgroupBuffer(
'GROUP',
this.options.group,
this.options.name,
'COUNT',
String(this.options.maxChunk),
'BLOCK',
String(this.options.blockTimeout),
'STREAMS',
...this.subscribedEvents,
...Array.from({ length: this.subscribedEvents.size }, () => '>'),
);
await this.claimAndEmitDeadEvents();
} catch (error) {
// @ts-expect-error: Intended
this.emit('error', error);
// We don't break here to keep the loop running even if dead event processing fails
}
try {
// As per docs, '>' means "give me a new message"
const data = await this.readGroup('>', this.options.blockTimeout);
if (!data) {
continue;
}
for (const [event, info] of data) {
for (const [id, packet] of info) {
const idx = packet.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
if (idx < 0) {
continue;
}
const data = packet[idx + 1];
if (!data) {
continue;
}
this.emitEvent(id, this.options.group, event.toString('utf8'), this.options.decode(data));
}
}
await this.processMessages(data);
} catch (error) {
// @ts-expect-error: Intended
this.emit('error', error);
@@ -189,6 +198,103 @@ export abstract class BaseRedisBroker<
this.listening = false;
}
/**
 * Performs a single XREADGROUP across every subscribed stream on behalf of this consumer.
 *
 * @param fromId - Stream id to read from (`'>'` asks Redis for never-delivered messages)
 * @param block - How long, in milliseconds, the call may block waiting for data
 * @returns The raw grouped stream data, or an empty array when nothing was read
 */
private async readGroup(fromId: string, block: number): Promise<RedisReadGroupData> {
	const streams = [...this.subscribedEvents];
	// XREADGROUP requires one id per stream key, in matching order
	const ids = streams.map(() => fromId);
	const data = await this.streamReadClient.xreadgroupBuffer(
		'GROUP',
		this.options.group,
		this.options.name,
		'COUNT',
		String(this.options.maxChunk),
		'BLOCK',
		String(block),
		'STREAMS',
		...streams,
		...ids,
	);
	return data ?? [];
}
/**
 * Decodes and emits every message contained in raw XREADGROUP output.
 *
 * @param data - Raw grouped stream data as returned by {@link BaseRedisBroker.readGroup}
 */
private async processMessages(data: RedisReadGroupData): Promise<void> {
	for (const [event, messages] of data) {
		const eventName = event.toString('utf8');
		for (const [id, packet] of messages) {
			// The packet is a flat [field, value, field, value, ...] list;
			// scan the even (field-name) positions for the 'data' field.
			let dataFieldIdx = -1;
			for (let fieldIdx = 0; fieldIdx < packet.length; fieldIdx += 2) {
				if (packet[fieldIdx]?.toString('utf8') === 'data') {
					dataFieldIdx = fieldIdx;
					break;
				}
			}

			if (dataFieldIdx < 0) {
				continue;
			}

			const payload = packet[dataFieldIdx + 1];
			if (!payload) {
				continue;
			}

			this.emitEvent(id, this.options.group, eventName, this.options.decode(payload));
		}
	}
}
/**
 * Claims and re-emits messages that were delivered to some consumer but never ACKed.
 *
 * Pending messages idle for longer than `messageIdleTime` are claimed for this consumer
 * and re-emitted; messages delivered more than `maxDeliveredTimes` times are considered
 * dead and acknowledged so they are never redelivered.
 */
private async claimAndEmitDeadEvents(): Promise<void> {
	for (const stream of this.subscribedEvents) {
		// Get up to N oldest pending messages (note: a pending message is a message that has been read, but never ACKed)
		const pending = (await this.streamReadClient.xpending(
			stream,
			this.options.group,
			'-',
			'+',
			this.options.maxChunk,
			// See: https://redis.io/docs/latest/commands/xpending/#extended-form-of-xpending
		)) as [id: string, consumer: string, idleMs: number, deliveredTimes: number][];
		for (const [id, consumer, idleMs, deliveredTimes] of pending) {
			// Technically xclaim checks for us anyway, but why not avoid an extra call?
			if (idleMs < this.options.messageIdleTime) {
				continue;
			}
			if (deliveredTimes > this.options.maxDeliveredTimes) {
				// This message is dead. It has repeatedly failed being processed by a consumer.
				// ACK it so it's removed from the group's pending list and never redelivered.
				// (Was `xdel(stream, this.options.group, id)`: XDEL takes no group argument, so the
				// group name was sent as a stream id — Redis rejects that, and XDEL wouldn't clear
				// the PEL entry anyway, so the dead message would keep reappearing here.)
				await this.streamReadClient.xack(stream, this.options.group, id);
				continue;
			}
			// Try to claim the message if we don't already own it (this may fail if another consumer has already claimed it)
			if (consumer !== this.options.name) {
				const claimed = await this.streamReadClient.xclaimBuffer(
					stream,
					this.options.group,
					this.options.name,
					// min-idle-time guard; must be >= 1 for XCLAIM to be meaningful
					Math.max(this.options.messageIdleTime, 1),
					id,
					// JUSTID skips returning the message body (fetched below) and avoids
					// bumping the delivery counter
					'JUSTID',
				);
				// Another consumer got the message before us
				if (!claimed?.length) {
					continue;
				}
			}
			// Fetch message body
			const entries = await this.streamReadClient.xrangeBuffer(stream, id, id);
			// No idea how this could happen, frankly!
			if (!entries?.length) {
				continue;
			}
			const [msgId, fields] = entries[0]!;
			// Flat [field, value, ...] list; find the 'data' field name at an even index
			const idx = fields.findIndex((value, idx) => value.toString('utf8') === 'data' && idx % 2 === 0);
			if (idx < 0) {
				continue;
			}
			const payload = fields[idx + 1];
			if (!payload) {
				continue;
			}
			this.emitEvent(msgId, this.options.group, stream, this.options.decode(payload));
		}
	}
}
/**
* Destroys the broker, closing all connections
*/