把 TCP 的可靠性搬到 WebSocket:ACK 确认与指数退避重连(附完整代码)
最近在回顾《计算机网络》,再回头看之前写的 WebSocket 的使用体验,会发现一个很现实的差距:WebSocket 提供的是一条高效的长连接通道,但它并不会替你保证“业务消息一定送达”。 send() 调用成功,最多只能说明数据被写进了本地发送缓冲区;一旦遇到网络抖动、切网、代理超时或服务端重启,消息就可能在路上丢失、重复,甚至出现连接“看起来还在、实际上已经断了”的半开状态。
而这些问题,其实正是《计算机网络》里反复强调的核心命题:想要稳定传输,离不开确认(ACK) 、超时重传、退避重试和保活(Keepalive) 。TCP 在传输层替我们封装了大量可靠性机制,但当我们把通信提升到 WebSocket 这种应用层通道时,“可靠消息”的语义仍然需要自己补齐——尤其是在需要可确认投递、断线可恢复的业务场景里。
所以接下来我会用 JavaScript 封装一个更“稳”的 WebSocket:为每条消息加上 ACK 确认与超时重传;断线后采用指数退避 + 抖动自动重连,避免重连风暴;并提供可选的心跳检测与断线重发。一步步实现一个更“生产可用”的 WebSocket 客户端封装,包含:
- ACK 确认机制:每条消息带
id,服务端回ack,发送端可确认投递完成 - ACK 超时与重试
- 指数退避+抖动自动重连,避免惊群
- 断线重连后自动重发未 ACK 消息(可选)
- 心跳 ping/pong(可选)
1. 为什么需要 ACK?
很多人以为 WebSocket 基于 TCP,消息就“可靠”。但 TCP 的可靠指的是连接层字节流,不是业务层的“消息已送达/已处理”。
常见场景:
send()只是写入缓冲区,连接随后断开,对端可能没收到- 对端收到但处理失败(入库失败/校验失败/业务异常),发送方仍然不知道
- 断线后重发可能导致重复处理(扣款、下单、状态变更等风险)
所以我们在应用层加一层很轻的协议:每条消息带唯一 id,对端回 ACK。发送方收到 ACK,才认为这条消息完成。
2. 最小协议约定(推荐)
2.1 客户端发送(需要 ACK)
{ "t": "msg", "id": "uuid", "data": { ... } }2.2 服务端确认(ACK)
{ "t": "ack", "id": "uuid" }2.3(可选)心跳
- ping:
{ "t": "ping", "ts": 1700000000000 }- pong:
{ "t": "pong", "ts": 1700000000000 }
t是 type 字段,id用于匹配 pending 消息,data是业务 payload。
3. 客户端封装的核心思路
封装的关键是维护一个 pending 表:
pending[id] = { frame, resolve, reject, timer, tries }
当你调用 sendWithAck(data):
- 生成
id - 组装 frame:
{t:"msg", id, data} - 写入
pending - 发送 frame
- 启动 ACK 超时定时器:超时 → 重发或失败
收到服务端 ack:
- 在
pending里找到对应条目 - 清理超时定时器
- resolve Promise
连接断开时:
- 按指数退避安排重连(带 jitter)
- 重连成功后(可选)重发未 ACK 的 pending 消息
4. 完整代码(ES6,可直接用)
这份封装实现了:ACK、ACK 超时重试、指数退避自动重连、断线重发、可选心跳。
class AckWebSocket { /** * @param {string} url WebSocket 地址 * @param {{ * protocols?: string | string[], // 子协议 * ackTimeoutMs?: number, // 等待 ACK 超时时间 * maxSendRetries?: number, // ACK 超时后最多重发次数 * resendOnReconnect?: boolean, // 重连成功后是否重发未 ACK 的消息 * reconnect?: { * enabled?: boolean, // 是否开启自动重连 * baseDelayMs?: number, // 重连基础延迟(指数退避起点) * maxDelayMs?: number, // 重连最大延迟上限 * jitter?: number, // 抖动比例(0~1),避免惊群 * maxRetries?: number, // 最大重连次数(默认 Infinity) * }, * heartbeat?: { * enabled?: boolean, // 是否开启心跳 * intervalMs?: number, // 心跳间隔 * timeoutMs?: number, // 等待 pong 超时 * }, * makeId?: () => string // 自定义消息 id 生成器 * }} options */ constructor(url, options = {}) { this.url = url; this.protocols = options.protocols;
// ====== ACK 相关配置 ====== this.ackTimeoutMs = options.ackTimeoutMs ?? 8000; // 等待 ACK 的超时时间 this.maxSendRetries = options.maxSendRetries ?? 1; // ACK 超时后重发次数 this.resendOnReconnect = options.resendOnReconnect ?? true; // 重连后是否重发 pending
// ====== 自动重连:指数退避 + 抖动 ====== this.reconnect = { enabled: options.reconnect?.enabled ?? true, baseDelayMs: options.reconnect?.baseDelayMs ?? 500, maxDelayMs: options.reconnect?.maxDelayMs ?? 15000, jitter: options.reconnect?.jitter ?? 0.2, // 0~1,建议 0.2~0.4 maxRetries: options.reconnect?.maxRetries ?? Infinity, };
// ====== 心跳(可选) ====== this.heartbeat = { enabled: options.heartbeat?.enabled ?? false, intervalMs: options.heartbeat?.intervalMs ?? 15000, timeoutMs: options.heartbeat?.timeoutMs ?? 8000, };
// ====== 消息 id 生成器 ====== this.makeId = options.makeId ?? (() => (typeof crypto !== "undefined" && crypto.randomUUID) ? crypto.randomUUID() : `${Date.now()}-${Math.random().toString(16).slice(2)}`);
// 当前 WebSocket 实例 this.ws = null;
// 是否为手动 close(手动关闭则不再重连) this.manualClose = false;
// pending:保存“已发送但未收到 ACK”的消息 // id -> { frame, resolve, reject, timer, tries } this.pending = new Map();
// 重连状态 this.retryCount = 0; this.reconnectTimer = null;
// 心跳状态 this.hbInterval = null; // 心跳定时器 this.hbTimeout = null; // 等待 pong 超时定时器
// ====== 外部可挂载的回调 ====== this.onOpen = null; this.onClose = null; this.onError = null; this.onPush = null; // 处理 {t:"push", data} 的推送 this.onMessage = null; // 处理除 ack/pong 外的所有 JSON 消息 }
/** 建立连接 */ connect() { this.manualClose = false; this._openSocket(); }
/** * 手动关闭连接 * 注意:这里默认把 pending 全部 reject,你也可以改成保留并在下次 connect 后继续重发 */ close(code = 1000, reason = "manual close") { this.manualClose = true; this._clearReconnect(); this._stopHeartbeat();
// 关闭时把所有未 ACK 的消息标记失败 for (const [id, p] of this.pending) { clearTimeout(p.timer); p.reject(new Error(`WebSocket closed before ack (id=${id})`)); this.pending.delete(id); }
// 关闭底层 socket if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) { this.ws.close(code, reason); } }
/** 当前是否已连接(OPEN) */ isOpen() { return this.ws?.readyState === WebSocket.OPEN; }
/** * 发送一条“需要 ACK”的消息 * @param {any} data 业务数据 * @returns {Promise<{id: string}>} 收到 ACK 后 resolve */ sendWithAck(data) { const id = this.makeId(); const frame = { t: "msg", id, data };
return new Promise((resolve, reject) => { // 放入 pending,等待 ACK this.pending.set(id, { frame, resolve, reject, timer: null, tries: 0 }); // 尝试发送 this._trySend(id); }); }
/** * 发送一条“不需要 ACK”的消息(例如日志、统计、通知) * @param {any} obj */ sendRaw(obj) { this._sendFrame(obj); }
// ===================== 内部方法 =====================
/** 创建 WebSocket 并绑定事件 */ _openSocket() { this._clearReconnect();
try { this.ws = this.protocols ? new WebSocket(this.url, this.protocols) : new WebSocket(this.url); } catch (e) { // 创建失败也走重连 this._scheduleReconnect(e); return; }
// 连接成功 this.ws.onopen = () => { // 成功后重连计数清零 this.retryCount = 0;
// 开启心跳 if (this.heartbeat.enabled) this._startHeartbeat();
// 重连成功后:重发所有 pending(未 ACK)消息 if (this.resendOnReconnect && this.pending.size > 0) { for (const id of this.pending.keys()) this._trySend(id, true); }
this.onOpen?.(); };
// 收到消息 this.ws.onmessage = (evt) => { let msg = evt.data;
// 尝试解析 JSON try { msg = typeof msg === "string" ? JSON.parse(msg) : msg; } catch { // 非 JSON 消息交给上层(如果需要) this.onMessage?.(evt.data); return; }
// ====== 心跳 pong ====== if (msg?.t === "pong") { // 收到 pong,清除等待 pong 的超时 this._clearHeartbeatTimeout(); return; }
// ====== ACK ====== if (msg?.t === "ack" && msg.id) { const p = this.pending.get(msg.id); if (p) { clearTimeout(p.timer); this.pending.delete(msg.id); // resolve 告诉业务侧:已确认 p.resolve({ id: msg.id }); } return; }
// ====== 其它消息 ====== this.onMessage?.(msg); // 如果约定了 push 类型,可直接回调 if (msg?.t === "push") this.onPush?.(msg.data); };
// 错误事件(多数浏览器信息有限) this.ws.onerror = (err) => { this.onError?.(err); };
// 连接关闭 this.ws.onclose = (evt) => { this._stopHeartbeat(); this.onClose?.(evt);
// 非手动关闭才自动重连 if (!this.manualClose && this.reconnect.enabled) { this._scheduleReconnect(evt); } }; }
/** * 实际发送(底层 send) * @returns {boolean} 是否发送成功(仅代表 OPEN 并已调用 send) */ _sendFrame(obj) { if (!this.isOpen()) return false; this.ws.send(typeof obj === "string" ? obj : JSON.stringify(obj)); return true; }
/** * 尝试发送某条 pending 消息,并设置 ACK 超时重发 * @param {string} id * @param {boolean} force 是否强制(重连成功后调用) */ _trySend(id, force = false) { const p = this.pending.get(id); if (!p) return;
// 未连接:非 force 则等重连;force 也发不了就继续等 if (!this.isOpen()) { if (!force) return; return; }
// 超过最大重试次数:认为失败 if (p.tries > this.maxSendRetries) { this.pending.delete(id); p.reject(new Error(`ACK timeout retries exceeded (id=${id})`)); return; }
// 记录本次尝试次数 p.tries += 1;
// 发送消息 this._sendFrame(p.frame);
// 设置 ACK 超时:超时则重发 clearTimeout(p.timer); p.timer = setTimeout(() => { this._trySend(id); }, this.ackTimeoutMs); }
/** * 安排一次自动重连(指数退避 + 抖动) * delay = min(baseDelay * 2^n, maxDelay) * (1 ± jitter) */ _scheduleReconnect(_reason) { if (this.retryCount >= this.reconnect.maxRetries) return;
const n = this.retryCount++; const base = this.reconnect.baseDelayMs * Math.pow(2, n); const capped = Math.min(base, this.reconnect.maxDelayMs);
// 抖动:让延迟随机波动,避免大量客户端同一时刻重连(惊群) const j = this.reconnect.jitter; const rand = (Math.random() * 2 - 1) * j; // [-j, +j] const delay = Math.max(0, Math.floor(capped * (1 + rand)));
this._clearReconnect(); this.reconnectTimer = setTimeout(() => this._openSocket(), delay); }
/** 清除重连定时器 */ _clearReconnect() { if (this.reconnectTimer) { clearTimeout(this.reconnectTimer); this.reconnectTimer = null; } }
// ===================== 心跳 =====================
/** 启动心跳:定时 ping,超时未 pong 则主动 close 触发重连 */ _startHeartbeat() { this._stopHeartbeat();
this.hbInterval = setInterval(() => { if (!this.isOpen()) return;
// 发送 ping this._sendFrame({ t: "ping", ts: Date.now() });
// 等待 pong,超时则主动关闭,让 onclose 走重连 this._clearHeartbeatTimeout(); this.hbTimeout = setTimeout(() => { try { this.ws?.close(4000, "heartbeat timeout"); } catch {} }, this.heartbeat.timeoutMs); }, this.heartbeat.intervalMs); }
/** 清除等待 pong 的超时定时器 */ _clearHeartbeatTimeout() { if (this.hbTimeout) { clearTimeout(this.hbTimeout); this.hbTimeout = null; } }
/** 停止心跳 */ _stopHeartbeat() { if (this.hbInterval) { clearInterval(this.hbInterval); this.hbInterval = null; } this._clearHeartbeatTimeout(); }}5. 怎么用(业务侧最小示例)
const ws = new AckWebSocket("wss://example.com/ws", { ackTimeoutMs: 5000, maxSendRetries: 2, resendOnReconnect: true, reconnect: { baseDelayMs: 300, maxDelayMs: 10000, jitter: 0.3 }, heartbeat: { enabled: true, intervalMs: 15000, timeoutMs: 6000 },});
ws.onOpen = () => console.log("connected");ws.onClose = (e) => console.log("closed:", e.code, e.reason);ws.onPush = (data) => console.log("push:", data);
ws.connect();
async function sendChat(text) { try { const { id } = await ws.sendWithAck({ type: "chat", text }); console.log("acked:", id); } catch (e) { console.error("send failed:", e); }}6. 服务端必须配合的点(非常关键)
6.1 收到 msg 后回 ACK
最简单的伪代码:
if (frame.t === "msg") { // 处理你的业务... send({ t: "ack", id: frame.id })}6.2 处理“重发导致重复”的问题:幂等/去重
因为客户端会重试、重连后也可能重发,同一个 id 可能多次出现。服务端至少要做到:
- 如果
id已处理过:直接回 ACK,不要重复执行副作用 - 如果没处理过:执行一次,然后记录
id
实现上可用:
- 内存 Map(简单,但重启会丢)
- Redis set/hash(更稳)
- 数据库唯一键(最强,但成本更高)
7. 调参建议(经验值)
-
ackTimeoutMs:5~10 秒(如果 ACK 表示“处理成功”,要看业务耗时) -
maxSendRetries:1~3 次(避免无限重试) -
重连:
baseDelayMs:300~800msmaxDelayMs:10~30sjitter:0.2~0.4(强烈建议保留)
-
心跳:
intervalMs:10~20s(移动端可适当放大)timeoutMs:5~10s
8. 常见坑清单(上线前检查)
- ACK 语义:是“已收到”还是“已处理成功”?(决定 timeout 与用户体验)
- 服务端去重:没有去重,重试会造成重复副作用
- 乱序问题:重连后重发 pending 可能与新消息交错;若要求严格顺序,需要串行发送或加序列号
- 后台定时器被限频:移动端/后台页 setTimeout 精度会变差,属正常现象,去重能兜底
结语
WebSocket 本质上只是一个高效的长连接“通道”,它解决的是通信方式问题,却不直接保证“业务消息可靠送达”。想把它从“能跑起来”变成“线上稳得住”,就需要像《计算机网络》里那样把可靠性机制补到应用层:用 ACK 让交付可确认,用 超时重试 对抗偶发丢包与断链,用 指数退避 + jitter 控制重连节奏、避免惊群。把这些机制组合起来,你得到的不只是一个 WebSocket 连接,而是一套更接近“可靠传输”的工程化实时通信能力。 当然,这套实现更多是我最近学习《计算机网络》后的实践总结,仍然有很多取舍与边界(比如 ACK 语义、幂等去重、消息顺序、持久化策略等)值得打磨。如果你在实际项目里有更成熟的方案或踩坑经验,欢迎指正交流。
文章分享
如果这篇文章对你有帮助,欢迎分享给更多人!
部分内容可能已过时