Add optional zstd for faster WebSocket data inflation (#3223)

* zstd
This commit is contained in:
Gus Caplan
2019-05-07 08:30:34 -05:00
committed by Amish Shah
parent 8915bc1d37
commit 3d4513268d
3 changed files with 45 additions and 22 deletions

View File

@@ -40,7 +40,8 @@ Using opusscript is only recommended for development environments where node-opu
For production bots, using node-opus should be considered a necessity, especially if they're going to be running on multiple servers. For production bots, using node-opus should be considered a necessity, especially if they're going to be running on multiple servers.
### Optional packages ### Optional packages
- [zlib-sync](https://www.npmjs.com/package/zlib-sync) for significantly faster WebSocket data inflation (`npm install zlib-sync`) - [zlib-sync](https://www.npmjs.com/package/zlib-sync) for faster WebSocket data inflation (`npm install zlib-sync`)
- [zucc](https://www.npmjs.com/package/zucc) for significantly faster WebSocket data inflation (`npm install zucc`)
- [erlpack](https://github.com/discordapp/erlpack) for significantly faster WebSocket data (de)serialisation (`npm install discordapp/erlpack`) - [erlpack](https://github.com/discordapp/erlpack) for significantly faster WebSocket data (de)serialisation (`npm install discordapp/erlpack`)
- One of the following packages can be installed for faster voice packet encryption and decryption: - One of the following packages can be installed for faster voice packet encryption and decryption:
- [sodium](https://www.npmjs.com/package/sodium) (`npm install sodium`) - [sodium](https://www.npmjs.com/package/sodium) (`npm install sodium`)

View File

@@ -49,7 +49,8 @@
"erlpack": "discordapp/erlpack", "erlpack": "discordapp/erlpack",
"libsodium-wrappers": "^0.7.4", "libsodium-wrappers": "^0.7.4",
"sodium": "^3.0.2", "sodium": "^3.0.2",
"zlib-sync": "^0.1.4" "zlib-sync": "^0.1.4",
"zucc": "^0.1.0"
}, },
"devDependencies": { "devDependencies": {
"@types/node": "^10.12.24", "@types/node": "^10.12.24",
@@ -80,6 +81,7 @@
"sodium": false, "sodium": false,
"worker_threads": false, "worker_threads": false,
"zlib-sync": false, "zlib-sync": false,
"zucc": false,
"src/sharding/Shard.js": false, "src/sharding/Shard.js": false,
"src/sharding/ShardClientUtil.js": false, "src/sharding/ShardClientUtil.js": false,
"src/sharding/ShardingManager.js": false, "src/sharding/ShardingManager.js": false,

View File

@@ -4,12 +4,21 @@ const EventEmitter = require('events');
const WebSocket = require('../../WebSocket'); const WebSocket = require('../../WebSocket');
const { Status, Events, ShardEvents, OPCodes, WSEvents } = require('../../util/Constants'); const { Status, Events, ShardEvents, OPCodes, WSEvents } = require('../../util/Constants');
let zstd;
let decoder;
let zlib; let zlib;
try { try {
zlib = require('zlib-sync'); zstd = require('zucc');
if (!zlib.Inflate) zlib = require('pako'); decoder = new TextDecoder('utf8');
} catch (err) { } catch (e) {
zlib = require('pako'); try {
zlib = require('zlib-sync');
if (!zlib.Inflate) zlib = require('pako');
} catch (err) {
zlib = require('pako');
}
} }
/** /**
@@ -206,11 +215,15 @@ class WebSocketShard extends EventEmitter {
return; return;
} }
this.inflate = new zlib.Inflate({ if (zstd) {
chunkSize: 65535, this.inflate = new zstd.DecompressStream();
flush: zlib.Z_SYNC_FLUSH, } else {
to: WebSocket.encoding === 'json' ? 'string' : '', this.inflate = new zlib.Inflate({
}); chunkSize: 65535,
flush: zlib.Z_SYNC_FLUSH,
to: WebSocket.encoding === 'json' ? 'string' : '',
});
}
this.debug(`Trying to connect to ${gateway}, version ${client.options.ws.version}`); this.debug(`Trying to connect to ${gateway}, version ${client.options.ws.version}`);
@@ -219,7 +232,7 @@ class WebSocketShard extends EventEmitter {
const ws = this.connection = WebSocket.create(gateway, { const ws = this.connection = WebSocket.create(gateway, {
v: client.options.ws.version, v: client.options.ws.version,
compress: 'zlib-stream', compress: zstd ? 'zstd-stream' : 'zlib-stream',
}); });
ws.onopen = this.onOpen.bind(this); ws.onopen = this.onOpen.bind(this);
ws.onmessage = this.onMessage.bind(this); ws.onmessage = this.onMessage.bind(this);
@@ -243,19 +256,26 @@ class WebSocketShard extends EventEmitter {
* @private * @private
*/ */
onMessage({ data }) { onMessage({ data }) {
if (data instanceof ArrayBuffer) data = new Uint8Array(data); let raw;
const l = data.length; if (zstd) {
const flush = l >= 4 && const ab = this.inflate.decompress(new Uint8Array(data).buffer);
data[l - 4] === 0x00 && raw = decoder.decode(ab);
data[l - 3] === 0x00 && } else {
data[l - 2] === 0xFF && if (data instanceof ArrayBuffer) data = new Uint8Array(data);
data[l - 1] === 0xFF; const l = data.length;
const flush = l >= 4 &&
data[l - 4] === 0x00 &&
data[l - 3] === 0x00 &&
data[l - 2] === 0xFF &&
data[l - 1] === 0xFF;
this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH); this.inflate.push(data, flush && zlib.Z_SYNC_FLUSH);
if (!flush) return; if (!flush) return;
raw = this.inflate.result;
}
let packet; let packet;
try { try {
packet = WebSocket.unpack(this.inflate.result); packet = WebSocket.unpack(raw);
this.manager.client.emit(Events.RAW, packet, this.id); this.manager.client.emit(Events.RAW, packet, this.id);
if (packet.op === OPCodes.DISPATCH) this.manager.emit(packet.t, packet.d, this.id); if (packet.op === OPCodes.DISPATCH) this.manager.emit(packet.t, packet.d, this.id);
} catch (err) { } catch (err) {