1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
| class AckWebSocket {
/**
* @param {string} url WebSocket 地址
* @param {{
* protocols?: string | string[], // 子协议
* ackTimeoutMs?: number, // 等待 ACK 超时时间
* maxSendRetries?: number, // ACK 超时后最多重发次数
* resendOnReconnect?: boolean, // 重连成功后是否重发未 ACK 的消息
* reconnect?: {
* enabled?: boolean, // 是否开启自动重连
* baseDelayMs?: number, // 重连基础延迟(指数退避起点)
* maxDelayMs?: number, // 重连最大延迟上限
* jitter?: number, // 抖动比例(0~1),避免惊群
* maxRetries?: number, // 最大重连次数(默认 Infinity)
* },
* heartbeat?: {
* enabled?: boolean, // 是否开启心跳
* intervalMs?: number, // 心跳间隔
* timeoutMs?: number, // 等待 pong 超时
* },
* makeId?: () => string // 自定义消息 id 生成器
* }} options
*/
constructor(url, options = {}) {
this.url = url;
this.protocols = options.protocols;
// ====== ACK 相关配置 ======
this.ackTimeoutMs = options.ackTimeoutMs ?? 8000; // 等待 ACK 的超时时间
this.maxSendRetries = options.maxSendRetries ?? 1; // ACK 超时后重发次数
this.resendOnReconnect = options.resendOnReconnect ?? true; // 重连后是否重发 pending
// ====== 自动重连:指数退避 + 抖动 ======
this.reconnect = {
enabled: options.reconnect?.enabled ?? true,
baseDelayMs: options.reconnect?.baseDelayMs ?? 500,
maxDelayMs: options.reconnect?.maxDelayMs ?? 15000,
jitter: options.reconnect?.jitter ?? 0.2, // 0~1,建议 0.2~0.4
maxRetries: options.reconnect?.maxRetries ?? Infinity,
};
// ====== 心跳(可选) ======
this.heartbeat = {
enabled: options.heartbeat?.enabled ?? false,
intervalMs: options.heartbeat?.intervalMs ?? 15000,
timeoutMs: options.heartbeat?.timeoutMs ?? 8000,
};
// ====== 消息 id 生成器 ======
this.makeId =
options.makeId ??
(() =>
(typeof crypto !== "undefined" && crypto.randomUUID)
? crypto.randomUUID()
: `${Date.now()}-${Math.random().toString(16).slice(2)}`);
// 当前 WebSocket 实例
this.ws = null;
// 是否为手动 close(手动关闭则不再重连)
this.manualClose = false;
// pending:保存“已发送但未收到 ACK”的消息
// id -> { frame, resolve, reject, timer, tries }
this.pending = new Map();
// 重连状态
this.retryCount = 0;
this.reconnectTimer = null;
// 心跳状态
this.hbInterval = null; // 心跳定时器
this.hbTimeout = null; // 等待 pong 超时定时器
// ====== 外部可挂载的回调 ======
this.onOpen = null;
this.onClose = null;
this.onError = null;
this.onPush = null; // 处理 {t:"push", data} 的推送
this.onMessage = null; // 处理除 ack/pong 外的所有 JSON 消息
}
/** 建立连接 */
connect() {
this.manualClose = false;
this._openSocket();
}
/**
* 手动关闭连接
* 注意:这里默认把 pending 全部 reject,你也可以改成保留并在下次 connect 后继续重发
*/
close(code = 1000, reason = "manual close") {
this.manualClose = true;
this._clearReconnect();
this._stopHeartbeat();
// 关闭时把所有未 ACK 的消息标记失败
for (const [id, p] of this.pending) {
clearTimeout(p.timer);
p.reject(new Error(`WebSocket closed before ack (id=${id})`));
this.pending.delete(id);
}
// 关闭底层 socket
if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) {
this.ws.close(code, reason);
}
}
/** 当前是否已连接(OPEN) */
isOpen() {
return this.ws?.readyState === WebSocket.OPEN;
}
/**
* 发送一条“需要 ACK”的消息
* @param {any} data 业务数据
* @returns {Promise<{id: string}>} 收到 ACK 后 resolve
*/
sendWithAck(data) {
const id = this.makeId();
const frame = { t: "msg", id, data };
return new Promise((resolve, reject) => {
// 放入 pending,等待 ACK
this.pending.set(id, { frame, resolve, reject, timer: null, tries: 0 });
// 尝试发送
this._trySend(id);
});
}
/**
* 发送一条“不需要 ACK”的消息(例如日志、统计、通知)
* @param {any} obj
*/
sendRaw(obj) {
this._sendFrame(obj);
}
// ===================== 内部方法 =====================
/** 创建 WebSocket 并绑定事件 */
_openSocket() {
this._clearReconnect();
try {
this.ws = this.protocols
? new WebSocket(this.url, this.protocols)
: new WebSocket(this.url);
} catch (e) {
// 创建失败也走重连
this._scheduleReconnect(e);
return;
}
// 连接成功
this.ws.onopen = () => {
// 成功后重连计数清零
this.retryCount = 0;
// 开启心跳
if (this.heartbeat.enabled) this._startHeartbeat();
// 重连成功后:重发所有 pending(未 ACK)消息
if (this.resendOnReconnect && this.pending.size > 0) {
for (const id of this.pending.keys()) this._trySend(id, true);
}
this.onOpen?.();
};
// 收到消息
this.ws.onmessage = (evt) => {
let msg = evt.data;
// 尝试解析 JSON
try {
msg = typeof msg === "string" ? JSON.parse(msg) : msg;
} catch {
// 非 JSON 消息交给上层(如果需要)
this.onMessage?.(evt.data);
return;
}
// ====== 心跳 pong ======
if (msg?.t === "pong") {
// 收到 pong,清除等待 pong 的超时
this._clearHeartbeatTimeout();
return;
}
// ====== ACK ======
if (msg?.t === "ack" && msg.id) {
const p = this.pending.get(msg.id);
if (p) {
clearTimeout(p.timer);
this.pending.delete(msg.id);
// resolve 告诉业务侧:已确认
p.resolve({ id: msg.id });
}
return;
}
// ====== 其它消息 ======
this.onMessage?.(msg);
// 如果约定了 push 类型,可直接回调
if (msg?.t === "push") this.onPush?.(msg.data);
};
// 错误事件(多数浏览器信息有限)
this.ws.onerror = (err) => {
this.onError?.(err);
};
// 连接关闭
this.ws.onclose = (evt) => {
this._stopHeartbeat();
this.onClose?.(evt);
// 非手动关闭才自动重连
if (!this.manualClose && this.reconnect.enabled) {
this._scheduleReconnect(evt);
}
};
}
/**
* 实际发送(底层 send)
* @returns {boolean} 是否发送成功(仅代表 OPEN 并已调用 send)
*/
_sendFrame(obj) {
if (!this.isOpen()) return false;
this.ws.send(typeof obj === "string" ? obj : JSON.stringify(obj));
return true;
}
/**
* 尝试发送某条 pending 消息,并设置 ACK 超时重发
* @param {string} id
* @param {boolean} force 是否强制(重连成功后调用)
*/
_trySend(id, force = false) {
const p = this.pending.get(id);
if (!p) return;
// 未连接:非 force 则等重连;force 也发不了就继续等
if (!this.isOpen()) {
if (!force) return;
return;
}
// 超过最大重试次数:认为失败
if (p.tries > this.maxSendRetries) {
this.pending.delete(id);
p.reject(new Error(`ACK timeout retries exceeded (id=${id})`));
return;
}
// 记录本次尝试次数
p.tries += 1;
// 发送消息
this._sendFrame(p.frame);
// 设置 ACK 超时:超时则重发
clearTimeout(p.timer);
p.timer = setTimeout(() => {
this._trySend(id);
}, this.ackTimeoutMs);
}
/**
* 安排一次自动重连(指数退避 + 抖动)
* delay = min(baseDelay * 2^n, maxDelay) * (1 ± jitter)
*/
_scheduleReconnect(_reason) {
if (this.retryCount >= this.reconnect.maxRetries) return;
const n = this.retryCount++;
const base = this.reconnect.baseDelayMs * Math.pow(2, n);
const capped = Math.min(base, this.reconnect.maxDelayMs);
// 抖动:让延迟随机波动,避免大量客户端同一时刻重连(惊群)
const j = this.reconnect.jitter;
const rand = (Math.random() * 2 - 1) * j; // [-j, +j]
const delay = Math.max(0, Math.floor(capped * (1 + rand)));
this._clearReconnect();
this.reconnectTimer = setTimeout(() => this._openSocket(), delay);
}
/** 清除重连定时器 */
_clearReconnect() {
if (this.reconnectTimer) {
clearTimeout(this.reconnectTimer);
this.reconnectTimer = null;
}
}
// ===================== 心跳 =====================
/** 启动心跳:定时 ping,超时未 pong 则主动 close 触发重连 */
_startHeartbeat() {
this._stopHeartbeat();
this.hbInterval = setInterval(() => {
if (!this.isOpen()) return;
// 发送 ping
this._sendFrame({ t: "ping", ts: Date.now() });
// 等待 pong,超时则主动关闭,让 onclose 走重连
this._clearHeartbeatTimeout();
this.hbTimeout = setTimeout(() => {
try {
this.ws?.close(4000, "heartbeat timeout");
} catch {}
}, this.heartbeat.timeoutMs);
}, this.heartbeat.intervalMs);
}
/** 清除等待 pong 的超时定时器 */
_clearHeartbeatTimeout() {
if (this.hbTimeout) {
clearTimeout(this.hbTimeout);
this.hbTimeout = null;
}
}
/** 停止心跳 */
_stopHeartbeat() {
if (this.hbInterval) {
clearInterval(this.hbInterval);
this.hbInterval = null;
}
this._clearHeartbeatTimeout();
}
}
|