-
-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathBinaryPack.ts
109 lines (91 loc) · 2.72 KB
/
BinaryPack.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
import { BinaryPackChunker, concatArrayBuffers } from "./binaryPackChunker";
import logger from "../../logger";
import type { Peer } from "../../peer";
import { BufferedConnection } from "./BufferedConnection";
import { SerializationType } from "../../enums";
import { pack, type Packable, unpack } from "peerjs-js-binarypack";
export class BinaryPack extends BufferedConnection {
private readonly chunker = new BinaryPackChunker();
readonly serialization = SerializationType.Binary;
private _chunkedData: {
[id: number]: {
data: Uint8Array[];
count: number;
total: number;
};
} = {};
public override close(options?: { flush?: boolean }) {
super.close(options);
this._chunkedData = {};
}
constructor(peerId: string, provider: Peer, options: any) {
super(peerId, provider, options);
}
// Handles a DataChannel message.
protected override _handleDataMessage({ data }: { data: Uint8Array }): void {
const deserializedData = unpack(data);
// PeerJS specific message
const peerData = deserializedData["__peerData"];
if (peerData) {
if (peerData.type === "close") {
this.close();
return;
}
// Chunked data -- piece things back together.
// @ts-ignore
this._handleChunk(deserializedData);
return;
}
this.emit("data", deserializedData);
}
private _handleChunk(data: {
__peerData: number;
n: number;
total: number;
data: ArrayBuffer;
}): void {
const id = data.__peerData;
const chunkInfo = this._chunkedData[id] || {
data: [],
count: 0,
total: data.total,
};
chunkInfo.data[data.n] = new Uint8Array(data.data);
chunkInfo.count++;
this._chunkedData[id] = chunkInfo;
if (chunkInfo.total === chunkInfo.count) {
// Clean up before making the recursive call to `_handleDataMessage`.
delete this._chunkedData[id];
// We've received all the chunks--time to construct the complete data.
// const data = new Blob(chunkInfo.data);
const data = concatArrayBuffers(chunkInfo.data);
this._handleDataMessage({ data });
}
}
protected override _send(data: Packable, chunked: boolean) {
const blob = pack(data);
if (blob instanceof Promise) {
return this._send_blob(blob);
}
if (!chunked && blob.byteLength > this.chunker.chunkedMTU) {
this._sendChunks(blob);
return;
}
this._bufferedSend(blob);
}
private async _send_blob(blobPromise: Promise<ArrayBufferLike>) {
const blob = await blobPromise;
if (blob.byteLength > this.chunker.chunkedMTU) {
this._sendChunks(blob);
return;
}
this._bufferedSend(blob);
}
private _sendChunks(blob: ArrayBuffer) {
const blobs = this.chunker.chunk(blob);
logger.log(`DC#${this.connectionId} Try to send ${blobs.length} chunks...`);
for (const blob of blobs) {
this.send(blob, true);
}
}
}