fix(WebSocketShard): buffer native zlib decompression payload (#10416)

* fix(WebSocketShard): buffer native zlib decompression payload

* refactor: nit

Co-authored-by: Almeida <almeidx@pm.me>

---------

Co-authored-by: Almeida <almeidx@pm.me>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
DD
2024-08-15 19:15:08 +03:00
committed by GitHub
parent a6de2707fc
commit defb083528

View File

@@ -98,6 +98,13 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
private zLibSyncInflate: ZlibSync.Inflate | null = null;
/**
* @privateRemarks
*
* Used only for native zlib inflate, zlib-sync buffering is handled by the library itself.
*/
private inflateBuffer: Buffer[] = [];
private readonly textDecoder = new TextDecoder();
private replayedEvents = 0;
@@ -198,11 +205,17 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
case CompressionMethod.ZlibNative: {
const zlib = await getNativeZlib();
if (zlib) {
this.inflateBuffer = [];
const inflate = zlib.createInflate({
chunkSize: 65_535,
flush: zlib.constants.Z_SYNC_FLUSH,
});
inflate.on('data', (chunk) => {
this.inflateBuffer.push(chunk);
});
inflate.on('error', (error) => {
this.emit(WebSocketShardEvents.Error, error);
});
@@ -627,14 +640,28 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
decompressable.at(-1) === 0xff;
if (this.nativeInflate) {
this.nativeInflate.write(decompressable, 'binary');
const doneWriting = new Promise<void>((resolve) => {
// eslint-disable-next-line promise/prefer-await-to-callbacks
this.nativeInflate!.write(decompressable, 'binary', (error) => {
if (error) {
this.emit(WebSocketShardEvents.Error, error);
}
resolve();
});
});
if (!flush) {
return null;
}
const [result] = await once(this.nativeInflate, 'data');
return this.parseInflateResult(result);
// This way we're ensuring the latest write has flushed and our buffer is ready
await doneWriting;
const result = this.parseInflateResult(Buffer.concat(this.inflateBuffer));
this.inflateBuffer = [];
return result;
} else if (this.zLibSyncInflate) {
const zLibSync = (await getZlibSync())!;
this.zLibSyncInflate.push(Buffer.from(decompressable), flush ? zLibSync.Z_SYNC_FLUSH : zLibSync.Z_NO_FLUSH);