-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
Copy pathali.ts
127 lines (105 loc) · 2.57 KB
/
ali.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
import { Buffer } from 'buffer'
import { Transform } from 'readable-stream'
import { StreamBuilder } from '../shared'
import MqttClient, { IClientOptions } from '../client'
import { BufferedDuplex } from '../BufferedDuplex'
// Mini-program API object; assigned from `opts.my` each time buildStream runs.
let my: any
// Transform created by buildProxy: outgoing chunks are sent through `my`,
// incoming socket messages are pushed into it by the socket event handlers.
let proxy: Transform
// BufferedDuplex constructed around `proxy`; this is what buildStream returns.
let stream: BufferedDuplex
// Set to true on the first bindEventHandler call so the socket listeners are
// registered only once, no matter how many streams are built.
let isInitialized = false
/**
 * Creates the Transform that bridges the MQTT stream to the `my` socket API.
 *
 * Every chunk written to the returned stream is handed to
 * `my.sendSocketMessage`; the write callback fires from that call's
 * `success`/`fail` callback. When the stream is flushed, the socket is closed
 * through `my.closeSocket`.
 *
 * @returns A fresh Transform with `_write` and `_flush` overridden.
 */
function buildProxy() {
	const _proxy = new Transform()
	_proxy._write = (chunk, encoding, next) => {
		// `chunk.buffer` is the ArrayBuffer backing the chunk, which may be
		// larger than the chunk itself (pooled allocations, slices of a bigger
		// buffer). Copy out exactly the bytes that belong to this chunk so no
		// unrelated memory is sent over the socket.
		const start = chunk.byteOffset
		const payload = chunk.buffer.slice(start, start + chunk.byteLength)
		my.sendSocketMessage({
			data: payload,
			success() {
				next()
			},
			fail() {
				next(new Error())
			},
		})
	}
	_proxy._flush = (done) => {
		// NOTE(review): only `success` is handled here; if closeSocket reports
		// a failure, `done` is never invoked — confirm this is intended.
		my.closeSocket({
			success() {
				done()
			},
		})
	}
	return _proxy
}
/**
 * Fills in connection defaults on `opts` in place.
 *
 * Any falsy `hostname`, `path` or `wsOptions` is replaced with
 * `'localhost'`, `'/'` and a new empty object respectively; values that are
 * already set are kept.
 *
 * @param opts Client options to complete (mutated).
 */
function setDefaultOpts(opts: IClientOptions) {
	opts.hostname = opts.hostname || 'localhost'
	opts.path = opts.path || '/'
	opts.wsOptions = opts.wsOptions || {}
}
/**
 * Builds the WebSocket URL for the connection.
 *
 * The scheme is `wss` when `opts.protocol` is `'alis'` and `ws` otherwise.
 * The port is left out only when it is unset or equals the default port of
 * the chosen scheme (443 for `wss`, 80 for `ws`); any other port is written
 * explicitly so that, for example, `wss` on port 80 is not silently
 * redirected to 443.
 *
 * @param opts Client options; reads `protocol`, `hostname`, `port`, `path`
 *   and the optional `transformWsUrl` hook.
 * @param client Client instance, forwarded to `transformWsUrl`.
 * @returns The URL, after `transformWsUrl` has been applied when it is a
 *   function.
 */
function buildUrl(opts: IClientOptions, client: MqttClient) {
	const secure = opts.protocol === 'alis'
	const protocol = secure ? 'wss' : 'ws'
	const defaultPort = secure ? 443 : 80
	const authority =
		opts.port && opts.port !== defaultPort
			? `${opts.hostname}:${opts.port}`
			: `${opts.hostname}`
	let url = `${protocol}://${authority}${opts.path}`
	if (typeof opts.transformWsUrl === 'function') {
		url = opts.transformWsUrl(url, opts, client)
	}
	return url
}
/**
 * Registers the socket listeners on `my` exactly once.
 *
 * The handlers read the module-level `stream` and `proxy` at the time an
 * event fires, so they always act on the most recently built stream.
 *
 * - open: marks the stream as ready via `stream.socketReady()`.
 * - message: converts the payload to a Buffer and pushes it into `proxy`.
 * - close: ends and destroys the stream.
 * - error: destroys the stream with the reported error.
 */
function bindEventHandler() {
	if (isInitialized) return
	isInitialized = true

	my.onSocketOpen(() => {
		stream.socketReady()
	})

	my.onSocketMessage((res) => {
		const payload = res.data

		// String payloads are treated as base64-encoded binary.
		if (typeof payload === 'string') {
			proxy.push(Buffer.from(payload, 'base64'))
			return
		}

		// An ArrayBuffer can be wrapped directly. FileReader.readAsArrayBuffer
		// only accepts Blob/File and would throw if given an ArrayBuffer.
		if (payload instanceof ArrayBuffer) {
			proxy.push(Buffer.from(payload))
			return
		}

		// Anything else is assumed to be Blob-like and is read asynchronously.
		const reader = new FileReader()
		reader.addEventListener('load', () => {
			const result = reader.result
			const data =
				result instanceof ArrayBuffer
					? Buffer.from(result)
					: Buffer.from(result as string, 'utf8')
			proxy.push(data)
		})
		reader.readAsArrayBuffer(payload)
	})

	my.onSocketClose(() => {
		stream.end()
		stream.destroy()
	})

	my.onSocketError((err) => {
		stream.destroy(err)
	})
}
/**
 * StreamBuilder for the Alipay mini-program socket API.
 *
 * Resolves the host, picks the WebSocket sub-protocol, applies default
 * options, opens the socket through `opts.my` and returns a BufferedDuplex
 * wired to a new proxy Transform.
 *
 * @param client MQTT client, forwarded to the URL builder.
 * @param opts Client options; `hostname` falls back to `host`, and `my` must
 *   carry the mini-program API object.
 * @returns The duplex stream used by the client.
 * @throws Error when neither `hostname` nor `host` is set.
 */
const buildStream: StreamBuilder = (client, opts) => {
	const resolvedHost = opts.hostname || opts.host
	opts.hostname = resolvedHost
	if (!resolvedHost) {
		throw new Error('Could not determine host. Specify host manually.')
	}

	// MQTT 3.1 (protocol id "MQIsdp", version 3) uses its own sub-protocol name.
	let subProtocol = 'mqtt'
	if (opts.protocolId === 'MQIsdp' && opts.protocolVersion === 3) {
		subProtocol = 'mqttv3.1'
	}

	setDefaultOpts(opts)
	const socketUrl = buildUrl(opts, client)

	my = opts.my
	// https://miniprogram.alipay.com/docs/miniprogram/mpdev/api_network_connectsocket
	my.connectSocket({ url: socketUrl, protocols: subProtocol })

	proxy = buildProxy()
	stream = new BufferedDuplex(opts, proxy, my)
	bindEventHandler()

	return stream
}
export default buildStream