diff --git a/packages/ws/src/ws/WebSocketShard.ts b/packages/ws/src/ws/WebSocketShard.ts index 4c47f6751..3025a4cb1 100644 --- a/packages/ws/src/ws/WebSocketShard.ts +++ b/packages/ws/src/ws/WebSocketShard.ts @@ -98,6 +98,13 @@ export class WebSocketShard extends AsyncEventEmitter { private zLibSyncInflate: ZlibSync.Inflate | null = null; + /** + * @privateRemarks + * + * Used only for native zlib inflate, zlib-sync buffering is handled by the library itself. + */ + private inflateBuffer: Buffer[] = []; + private readonly textDecoder = new TextDecoder(); private replayedEvents = 0; @@ -198,11 +205,17 @@ export class WebSocketShard extends AsyncEventEmitter { case CompressionMethod.ZlibNative: { const zlib = await getNativeZlib(); if (zlib) { + this.inflateBuffer = []; + const inflate = zlib.createInflate({ chunkSize: 65_535, flush: zlib.constants.Z_SYNC_FLUSH, }); + inflate.on('data', (chunk) => { + this.inflateBuffer.push(chunk); + }); + inflate.on('error', (error) => { this.emit(WebSocketShardEvents.Error, error); }); @@ -627,14 +640,28 @@ export class WebSocketShard extends AsyncEventEmitter { decompressable.at(-1) === 0xff; if (this.nativeInflate) { - this.nativeInflate.write(decompressable, 'binary'); + const doneWriting = new Promise((resolve) => { + // eslint-disable-next-line promise/prefer-await-to-callbacks + this.nativeInflate!.write(decompressable, 'binary', (error) => { + if (error) { + this.emit(WebSocketShardEvents.Error, error); + } + + resolve(); + }); + }); if (!flush) { return null; } - const [result] = await once(this.nativeInflate, 'data'); - return this.parseInflateResult(result); + // This way we're ensuring the latest write has flushed and our buffer is ready + await doneWriting; + + const result = this.parseInflateResult(Buffer.concat(this.inflateBuffer)); + this.inflateBuffer = []; + + return result; } else if (this.zLibSyncInflate) { const zLibSync = (await getZlibSync())!; this.zLibSyncInflate.push(Buffer.from(decompressable), flush ? zLibSync.Z_SYNC_FLUSH : zLibSync.Z_NO_FLUSH);