fix(voice): mark stream as ended (#10455)

* fix: mark stream as ended

refactor: prefer destroying the stream

* refactor: callback for nextTick

test: wait duration ms to check end

chore: eslint

test: end before timeout

---------

Co-authored-by: Almeida <github@almeidx.dev>
Co-authored-by: Jiralite <33201955+Jiralite@users.noreply.github.com>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
This commit is contained in:
pat
2025-01-01 07:45:24 +11:00
committed by GitHub
parent a6685a319e
commit bc3a0c8389
2 changed files with 39 additions and 17 deletions

View File

@@ -24,29 +24,27 @@ describe('AudioReceiveStream', () => {
await wait(200); await wait(200);
stream.push(DUMMY_BUFFER); stream.push(DUMMY_BUFFER);
expect(stream.readable).toEqual(true); expect(stream.readable).toEqual(true);
stream.push(null);
await wait(200);
expect(stream.readable).toEqual(false);
}); });
// TODO: Fix this test test('AfterSilence end behavior', async () => {
// test('AfterSilence end behavior', async () => { const duration = 100;
// const duration = 100; const increment = 20;
// const increment = 20;
// const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration: 100 } }); const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterSilence, duration } });
// stream.resume(); stream.resume();
// for (let i = increment; i < duration / 2; i += increment) { for (let step = increment; step < duration / 2; step += increment) {
// await stepSilence(stream, increment); await stepSilence(stream, increment);
// } }
// stream.push(DUMMY_BUFFER); stream.push(DUMMY_BUFFER);
// for (let i = increment; i < duration; i += increment) { await wait(duration);
// await stepSilence(stream, increment); expect(stream.readableEnded).toEqual(true);
// } });
// await wait(increment);
// expect(stream.readableEnded).toEqual(true);
// });
test('AfterInactivity end behavior', async () => { test('AfterInactivity end behavior', async () => {
const duration = 100; const duration = 100;
@@ -72,4 +70,22 @@ describe('AudioReceiveStream', () => {
expect(stream.readableEnded).toEqual(true); expect(stream.readableEnded).toEqual(true);
}); });
test('Stream ends after pushing null', async () => {
const stream = new AudioReceiveStream({ end: { behavior: EndBehaviorType.AfterInactivity, duration: 100 } });
stream.resume();
stream.push(DUMMY_BUFFER);
expect(stream.readable).toEqual(true);
expect(stream.readableEnded).toEqual(false);
expect(stream.destroyed).toEqual(false);
stream.push(null);
await wait(50);
expect(stream.readable).toEqual(false);
expect(stream.readableEnded).toEqual(true);
expect(stream.destroyed).toEqual(true);
});
}); });

View File

@@ -1,4 +1,5 @@
import type { Buffer } from 'node:buffer'; import type { Buffer } from 'node:buffer';
import { nextTick } from 'node:process';
import { Readable, type ReadableOptions } from 'node:stream'; import { Readable, type ReadableOptions } from 'node:stream';
import { SILENCE_FRAME } from '../audio/AudioPlayer'; import { SILENCE_FRAME } from '../audio/AudioPlayer';
@@ -74,6 +75,11 @@ export class AudioReceiveStream extends Readable {
this.renewEndTimeout(this.end); this.renewEndTimeout(this.end);
} }
if (buffer === null) {
// null marks EOF for stream
nextTick(() => this.destroy());
}
return super.push(buffer); return super.push(buffer);
} }