feat: zstd streaming support (#10758)

* feat(ws): zstd streaming support

* chore: naming of the mode

* fix: remove `@ts-expect-error`

---------

Co-authored-by: Almeida <github@almeidx.dev>
This commit is contained in:
Vlad Frangu
2025-12-12 15:01:25 +02:00
committed by GitHub
parent df0c28afc2
commit 5b4dbd541e
2 changed files with 38 additions and 6 deletions

View File

@@ -20,6 +20,7 @@ export enum Encoding {
export enum CompressionMethod { export enum CompressionMethod {
ZlibNative, ZlibNative,
ZlibSync, ZlibSync,
ZstdNative,
} }
export const DefaultDeviceProperty = `@discordjs/ws [VI]{{inject}}[/VI]` as `@discordjs/ws ${string}`; export const DefaultDeviceProperty = `@discordjs/ws [VI]{{inject}}[/VI]` as `@discordjs/ws ${string}`;
@@ -29,6 +30,7 @@ const getDefaultSessionStore = lazy(() => new Collection<number, SessionInfo | n
export const CompressionParameterMap = { export const CompressionParameterMap = {
[CompressionMethod.ZlibNative]: 'zlib-stream', [CompressionMethod.ZlibNative]: 'zlib-stream',
[CompressionMethod.ZlibSync]: 'zlib-stream', [CompressionMethod.ZlibSync]: 'zlib-stream',
[CompressionMethod.ZstdNative]: 'zstd-stream',
} as const satisfies Record<CompressionMethod, string>; } as const satisfies Record<CompressionMethod, string>;
/** /**

View File

@@ -217,7 +217,7 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
this.nativeInflate = inflate; this.nativeInflate = inflate;
} else { } else {
console.warn('WebSocketShard: Compression is set to native but node:zlib is not available.'); console.warn('WebSocketShard: Compression is set to native zlib but node:zlib is not available.');
params.delete('compress'); params.delete('compress');
} }
@@ -238,6 +238,34 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
break; break;
} }
case CompressionMethod.ZstdNative: {
const zlib = await getNativeZlib();
if (zlib && 'createZstdDecompress' in zlib) {
this.inflateBuffer = [];
const inflate = zlib.createZstdDecompress({
chunkSize: 65_535,
}) as nativeZlib.Inflate;
inflate.on('data', (chunk) => {
this.inflateBuffer.push(chunk);
});
inflate.on('error', (error) => {
this.emit(WebSocketShardEvents.Error, error);
});
this.nativeInflate = inflate;
} else {
console.warn(
'WebSocketShard: Compression is set to native zstd but node:zlib is not available or your node version does not support zstd decompression.',
);
params.delete('compress');
}
break;
}
} }
} }
@@ -628,12 +656,14 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
// Deal with transport compression // Deal with transport compression
if (this.transportCompressionEnabled) { if (this.transportCompressionEnabled) {
// Each WS message received is a full gateway message for zstd streaming, but for zlib it's chunked
const flush = const flush =
decompressable.length >= 4 && this.strategy.options.compression === CompressionMethod.ZstdNative ||
decompressable.at(-4) === 0x00 && (decompressable.length >= 4 &&
decompressable.at(-3) === 0x00 && decompressable.at(-4) === 0x00 &&
decompressable.at(-2) === 0xff && decompressable.at(-3) === 0x00 &&
decompressable.at(-1) === 0xff; decompressable.at(-2) === 0xff &&
decompressable.at(-1) === 0xff);
if (this.nativeInflate) { if (this.nativeInflate) {
const doneWriting = new Promise<void>((resolve) => { const doneWriting = new Promise<void>((resolve) => {