From bc3a0c83890cdc9b8ca6abd1fde59053d3c6d905 Mon Sep 17 00:00:00 2001 From: pat <73502164+nyapat@users.noreply.github.com> Date: Wed, 1 Jan 2025 07:45:24 +1100 Subject: [PATCH] fix(voice): mark stream as ended (#10455) * fix: mark stream as ended refactor: prefer destroying the stream * refactor: callback for nextTick test: wait duration ms to check end chore: eslint test: end before timeout --------- Co-authored-by: Almeida Co-authored-by: Jiralite <33201955+Jiralite@users.noreply.github.com> Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com> --- .../__tests__/AudioReceiveStream.test.ts | 50 ++++++++++++------- .../voice/src/receive/AudioReceiveStream.ts | 6 +++ 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/packages/voice/__tests__/AudioReceiveStream.test.ts b/packages/voice/__tests__/AudioReceiveStream.test.ts index 8823ff641..6e650f2b4 100644 --- a/packages/voice/__tests__/AudioReceiveStream.test.ts +++ b/packages/voice/__tests__/AudioReceiveStream.test.ts @@ -24,29 +24,27 @@ describe('AudioReceiveStream', () => { await wait(200); stream.push(DUMMY_BUFFER); expect(stream.readable).toEqual(true); + stream.push(null); + await wait(200); + expect(stream.readable).toEqual(false); }); - // TODO: Fix this test - // test('AfterSilence end behavior', async () => { - // const duration = 100; - // const increment = 20; + test('AfterSilence end behavior', async () => { + const duration = 100; + const increment = 20; - // const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration: 100 } }); - // stream.resume(); + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration } }); + stream.resume(); - // for (let i = increment; i < duration / 2; i += increment) { - // await stepSilence(stream, increment); - // } + for (let step = increment; step < duration / 2; step += increment) { + await stepSilence(stream, increment); + } - // stream.push(DUMMY_BUFFER); + stream.push(DUMMY_BUFFER); - // for (let i = increment; i < duration; i += increment) { - // await stepSilence(stream, increment); - // } - - // await wait(increment); - // expect(stream.readableEnded).toEqual(true); - // }); + await wait(duration); + expect(stream.readableEnded).toEqual(true); + }); test('AfterInactivity end behavior', async () => { const duration = 100; @@ -72,4 +70,22 @@ describe('AudioReceiveStream', () => { expect(stream.readableEnded).toEqual(true); }); + + test('Stream ends after pushing null', async () => { + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } }); + stream.resume(); + + stream.push(DUMMY_BUFFER); + + expect(stream.readable).toEqual(true); + expect(stream.readableEnded).toEqual(false); + expect(stream.destroyed).toEqual(false); + + stream.push(null); + await wait(50); + + expect(stream.readable).toEqual(false); + expect(stream.readableEnded).toEqual(true); + expect(stream.destroyed).toEqual(true); + }); }); diff --git a/packages/voice/src/receive/AudioReceiveStream.ts b/packages/voice/src/receive/AudioReceiveStream.ts index 119903844..45b0ef3e8 100644 --- a/packages/voice/src/receive/AudioReceiveStream.ts +++ b/packages/voice/src/receive/AudioReceiveStream.ts @@ -1,4 +1,5 @@ import type { Buffer } from 'node:buffer'; +import { nextTick } from 'node:process'; import { Readable, type ReadableOptions } from 'node:stream'; import { SILENCE_FRAME } from '../audio/AudioPlayer'; @@ -74,6 +75,11 @@ export class AudioReceiveStream extends Readable { this.renewEndTimeout(this.end); } + if (buffer === null) { + // null marks EOF for stream + nextTick(() => this.destroy()); + } + return super.push(buffer); }