Pause/Unpause

This commit is contained in:
Amish Shah
2017-10-26 13:32:38 +01:00
parent 79c10f7084
commit f5d10a5f76
2 changed files with 48 additions and 19 deletions

View File

@@ -6,6 +6,8 @@ const { Writable } = require('stream');
const secretbox = require('../util/Secretbox'); const secretbox = require('../util/Secretbox');
const FRAME_LENGTH = 20; const FRAME_LENGTH = 20;
const CHANNELS = 2;
const TIMESTAMP_INC = (48000 / 100) * CHANNELS;
const nonce = Buffer.alloc(24); const nonce = Buffer.alloc(24);
nonce.fill(0); nonce.fill(0);
@@ -30,10 +32,17 @@ class StreamDispatcher extends Writable {
*/ */
this.player = player; this.player = player;
this.streamOptions = streamOptions; this.streamOptions = streamOptions;
this.startTime = null;
this.pausedSince = null;
this._writeCallback = null;
this.pausedTime = 0;
this.count = 0;
this.on('error', this.destroy.bind(this)); this.on('error', this.destroy.bind(this));
this.on('finish', () => { this.on('finish', () => {
this.destroy.bind(this); this.destroy.bind(this);
// Still emitting end for backwards compatibility, probably remove it in the future!
this.emit('end'); this.emit('end');
}); });
} }
@@ -44,16 +53,47 @@ class StreamDispatcher extends Writable {
_write(chunk, enc, done) { _write(chunk, enc, done) {
if (!this.startTime) this.startTime = Date.now(); if (!this.startTime) this.startTime = Date.now();
this.setSpeaking(true); this._playChunk(chunk);
const packet = this.createPacket(this._sdata.sequence, this._sdata.timestamp, chunk); this._step(done);
this.sendPacket(packet); }
const next = FRAME_LENGTH + (this.startTime + (this._sdata.count * FRAME_LENGTH) - Date.now());
_destroy(err, cb) {
if (this.player.dispatcher !== this) return;
this.player.dispatcher = null;
const streams = this.player.streams;
if (streams.opus) streams.opus.unpipe(this);
if (streams.ffmpeg) streams.ffmpeg.destroy();
super._destroy(err, cb);
}
pause() {
this.pausedSince = Date.now();
}
unpause() {
this.pausedTime += Date.now() - this.pausedSince;
this.pausedSince = null;
if (this._writeCallback) this._writeCallback();
}
_step(done) {
if (this.pausedSince) {
this._writeCallback = done;
return;
}
const next = FRAME_LENGTH + (this.count * FRAME_LENGTH) - (Date.now() - this.startTime - this.pausedTime);
setTimeout(done.bind(this), next); setTimeout(done.bind(this), next);
if (this._sdata.sequence === (2 ** 16) - 1) this._sdata.sequence = -1; if (this._sdata.sequence === (2 ** 16) - 1) this._sdata.sequence = -1;
if (this._sdata.timestamp === (2 ** 32) - 1) this._sdata.timestamp = -960; if (this._sdata.timestamp === (2 ** 32) - 1) this._sdata.timestamp = -TIMESTAMP_INC;
this._sdata.sequence++; this._sdata.sequence++;
this._sdata.timestamp += 960; this._sdata.timestamp += TIMESTAMP_INC;
this._sdata.count++; this.count++;
}
_playChunk(chunk) {
if (this.player.dispatcher !== this) return;
this.setSpeaking(true);
this.sendPacket(this.createPacket(this._sdata.sequence, this._sdata.timestamp, chunk));
} }
createPacket(sequence, timestamp, buffer) { createPacket(sequence, timestamp, buffer) {
@@ -101,15 +141,6 @@ class StreamDispatcher extends Writable {
*/ */
this.emit('speaking', value); this.emit('speaking', value);
} }
destroy() {
if (this.player.dispatcher !== this) return;
this.player.dispatcher = null;
const streams = this.player.streams;
this.end();
if (streams.opus) streams.opus.unpipe(this);
if (streams.ffmpeg) streams.ffmpeg.destroy();
}
} }
module.exports = StreamDispatcher; module.exports = StreamDispatcher;

View File

@@ -1,8 +1,6 @@
const EventEmitter = require('events').EventEmitter; const EventEmitter = require('events').EventEmitter;
const prism = require('prism-media'); const prism = require('prism-media');
const StreamDispatcher = require('../dispatcher/StreamDispatcher'); const StreamDispatcher = require('../dispatcher/StreamDispatcher');
const Collection = require('../../../util/Collection');
const OpusEncoders = require('../opus/OpusEngineList');
const FFMPEG_ARGUMENTS = [ const FFMPEG_ARGUMENTS = [
'-analyzeduration', '0', '-analyzeduration', '0',