diff --git a/packages/voice/__tests__/AudioReceiveStream.test.ts b/packages/voice/__tests__/AudioReceiveStream.test.ts index 8823ff641..6e650f2b4 100644 --- a/packages/voice/__tests__/AudioReceiveStream.test.ts +++ b/packages/voice/__tests__/AudioReceiveStream.test.ts @@ -24,29 +24,27 @@ describe('AudioReceiveStream', () => { await wait(200); stream.push(DUMMY_BUFFER); expect(stream.readable).toEqual(true); + stream.push(null); + await wait(200); + expect(stream.readable).toEqual(false); }); - // TODO: Fix this test - // test('AfterSilence end behavior', async () => { - // const duration = 100; - // const increment = 20; + test('AfterSilence end behavior', async () => { + const duration = 100; + const increment = 20; - // const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration: 100 } }); - // stream.resume(); + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration } }); + stream.resume(); - // for (let i = increment; i < duration / 2; i += increment) { - // await stepSilence(stream, increment); - // } + for (let step = increment; step < duration / 2; step += increment) { + await stepSilence(stream, increment); + } - // stream.push(DUMMY_BUFFER); + stream.push(DUMMY_BUFFER); - // for (let i = increment; i < duration; i += increment) { - // await stepSilence(stream, increment); - // } - - // await wait(increment); - // expect(stream.readableEnded).toEqual(true); - // }); + await wait(duration); + expect(stream.readableEnded).toEqual(true); + }); test('AfterInactivity end behavior', async () => { const duration = 100; @@ -72,4 +70,22 @@ describe('AudioReceiveStream', () => { expect(stream.readableEnded).toEqual(true); }); + + test('Stream ends after pushing null', async () => { + const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } }); + stream.resume(); + + stream.push(DUMMY_BUFFER); + + expect(stream.readable).toEqual(true); + expect(stream.readableEnded).toEqual(false); + expect(stream.destroyed).toEqual(false); + + stream.push(null); + await wait(50); + + expect(stream.readable).toEqual(false); + expect(stream.readableEnded).toEqual(true); + expect(stream.destroyed).toEqual(true); + }); }); diff --git a/packages/voice/src/receive/AudioReceiveStream.ts b/packages/voice/src/receive/AudioReceiveStream.ts index 119903844..45b0ef3e8 100644 --- a/packages/voice/src/receive/AudioReceiveStream.ts +++ b/packages/voice/src/receive/AudioReceiveStream.ts @@ -1,4 +1,5 @@ import type { Buffer } from 'node:buffer'; +import { nextTick } from 'node:process'; import { Readable, type ReadableOptions } from 'node:stream'; import { SILENCE_FRAME } from '../audio/AudioPlayer'; @@ -74,6 +75,11 @@ export class AudioReceiveStream extends Readable { this.renewEndTimeout(this.end); } + if (buffer === null) { + // null marks EOF for stream + nextTick(() => this.destroy()); + } + return super.push(buffer); }