跳过正文
Background Image

把 TCP 的可靠性搬到 WebSocket:ACK 确认与指数退避重连(附完整代码)

·1640 字·8 分钟
Hypoy
作者
Hypoy
写万行码,行万里路
目录

最近在回顾《计算机网络》,再回头看之前写的 WebSocket 的使用体验,会发现一个很现实的差距:WebSocket 提供的是一条高效的长连接通道,但它并不会替你保证“业务消息一定送达”。 send() 调用成功,最多只能说明数据被写进了本地发送缓冲区;一旦遇到网络抖动、切网、代理超时或服务端重启,消息就可能在路上丢失、重复,甚至出现连接“看起来还在、实际上已经断了”的半开状态。

而这些问题,其实正是《计算机网络》里反复强调的核心命题:想要稳定传输,离不开确认(ACK)超时重传退避重试保活(Keepalive) 。TCP 在传输层替我们封装了大量可靠性机制,但当我们把通信提升到 WebSocket 这种应用层通道时,“可靠消息”的语义仍然需要自己补齐——尤其是在需要可确认投递、断线可恢复的业务场景里。

所以接下来我会用 JavaScript 封装一个更“稳”的 WebSocket:为每条消息加上 ACK 确认与超时重传;断线后采用指数退避 + 抖动自动重连,避免重连风暴;并提供可选的心跳检测与断线重发。一步步实现一个更“生产可用”的 WebSocket 客户端封装,包含:

  • ACK 确认机制:每条消息带 id,服务端回 ack,发送端可确认投递完成
  • ACK 超时与重试
  • 指数退避+抖动自动重连,避免惊群
  • 断线重连后自动重发未 ACK 消息(可选)
  • 心跳 ping/pong(可选)

1. 为什么需要 ACK?
#

很多人以为 WebSocket 基于 TCP,消息就“可靠”。但 TCP 的可靠指的是连接层字节流,不是业务层的“消息已送达/已处理”。

常见场景:

  • send() 只是写入缓冲区,连接随后断开,对端可能没收到
  • 对端收到但处理失败(入库失败/校验失败/业务异常),发送方仍然不知道
  • 断线后重发可能导致重复处理(扣款、下单、状态变更等风险)

所以我们在应用层加一层很轻的协议:每条消息带唯一 id,对端回 ACK。发送方收到 ACK,才认为这条消息完成。


2. 最小协议约定(推荐)
#

2.1 客户端发送(需要 ACK)
#

1
{ "t": "msg", "id": "uuid", "data": { ... } }

2.2 服务端确认(ACK)
#

1
{ "t": "ack", "id": "uuid" }

2.3(可选)心跳
#

  • ping:
1
{ "t": "ping", "ts": 1700000000000 }
  • pong:
1
{ "t": "pong", "ts": 1700000000000 }

t 是 type 字段,id 用于匹配 pending 消息,data 是业务 payload。


3. 客户端封装的核心思路
#

封装的关键是维护一个 pending 表:

  • pending[id] = { frame, resolve, reject, timer, tries }

当你调用 sendWithAck(data)

  1. 生成 id
  2. 组装 frame:{t:"msg", id, data}
  3. 写入 pending
  4. 发送 frame
  5. 启动 ACK 超时定时器:超时 → 重发或失败

收到服务端 ack

  1. pending 里找到对应条目
  2. 清理超时定时器
  3. resolve Promise

连接断开时:

  • 按指数退避安排重连(带 jitter)
  • 重连成功后(可选)重发未 ACK 的 pending 消息

4. 完整代码(ES6,可直接用)
#

这份封装实现了:ACK、ACK 超时重试、指数退避自动重连、断线重发、可选心跳。

  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
class AckWebSocket {
  /**
   * @param {string} url WebSocket 地址
   * @param {{
   *  protocols?: string | string[],           // 子协议
   *  ackTimeoutMs?: number,                   // 等待 ACK 超时时间
   *  maxSendRetries?: number,                 // ACK 超时后最多重发次数
   *  resendOnReconnect?: boolean,             // 重连成功后是否重发未 ACK 的消息
   *  reconnect?: {
   *    enabled?: boolean,                     // 是否开启自动重连
   *    baseDelayMs?: number,                  // 重连基础延迟(指数退避起点)
   *    maxDelayMs?: number,                   // 重连最大延迟上限
   *    jitter?: number,                       // 抖动比例(0~1),避免惊群
   *    maxRetries?: number,                   // 最大重连次数(默认 Infinity)
   *  },
   *  heartbeat?: {
   *    enabled?: boolean,                     // 是否开启心跳
   *    intervalMs?: number,                   // 心跳间隔
   *    timeoutMs?: number,                    // 等待 pong 超时
   *  },
   *  makeId?: () => string                    // 自定义消息 id 生成器
   * }} options
   */
  constructor(url, options = {}) {
    this.url = url;
    this.protocols = options.protocols;

    // ====== ACK 相关配置 ======
    this.ackTimeoutMs = options.ackTimeoutMs ?? 8000;      // 等待 ACK 的超时时间
    this.maxSendRetries = options.maxSendRetries ?? 1;     // ACK 超时后重发次数
    this.resendOnReconnect = options.resendOnReconnect ?? true; // 重连后是否重发 pending

    // ====== 自动重连:指数退避 + 抖动 ======
    this.reconnect = {
      enabled: options.reconnect?.enabled ?? true,
      baseDelayMs: options.reconnect?.baseDelayMs ?? 500,
      maxDelayMs: options.reconnect?.maxDelayMs ?? 15000,
      jitter: options.reconnect?.jitter ?? 0.2,            // 0~1,建议 0.2~0.4
      maxRetries: options.reconnect?.maxRetries ?? Infinity,
    };

    // ====== 心跳(可选) ======
    this.heartbeat = {
      enabled: options.heartbeat?.enabled ?? false,
      intervalMs: options.heartbeat?.intervalMs ?? 15000,
      timeoutMs: options.heartbeat?.timeoutMs ?? 8000,
    };

    // ====== 消息 id 生成器 ======
    this.makeId =
      options.makeId ??
      (() =>
        (typeof crypto !== "undefined" && crypto.randomUUID)
          ? crypto.randomUUID()
          : `${Date.now()}-${Math.random().toString(16).slice(2)}`);

    // 当前 WebSocket 实例
    this.ws = null;

    // 是否为手动 close(手动关闭则不再重连)
    this.manualClose = false;

    // pending:保存“已发送但未收到 ACK”的消息
    // id -> { frame, resolve, reject, timer, tries }
    this.pending = new Map();

    // 重连状态
    this.retryCount = 0;
    this.reconnectTimer = null;

    // 心跳状态
    this.hbInterval = null; // 心跳定时器
    this.hbTimeout = null;  // 等待 pong 超时定时器

    // ====== 外部可挂载的回调 ======
    this.onOpen = null;
    this.onClose = null;
    this.onError = null;
    this.onPush = null;     // 处理 {t:"push", data} 的推送
    this.onMessage = null;  // 处理除 ack/pong 外的所有 JSON 消息
  }

  /** 建立连接 */
  connect() {
    this.manualClose = false;
    this._openSocket();
  }

  /**
   * 手动关闭连接
   * 注意:这里默认把 pending 全部 reject,你也可以改成保留并在下次 connect 后继续重发
   */
  close(code = 1000, reason = "manual close") {
    this.manualClose = true;
    this._clearReconnect();
    this._stopHeartbeat();

    // 关闭时把所有未 ACK 的消息标记失败
    for (const [id, p] of this.pending) {
      clearTimeout(p.timer);
      p.reject(new Error(`WebSocket closed before ack (id=${id})`));
      this.pending.delete(id);
    }

    // 关闭底层 socket
    if (this.ws && (this.ws.readyState === WebSocket.OPEN || this.ws.readyState === WebSocket.CONNECTING)) {
      this.ws.close(code, reason);
    }
  }

  /** 当前是否已连接(OPEN) */
  isOpen() {
    return this.ws?.readyState === WebSocket.OPEN;
  }

  /**
   * 发送一条“需要 ACK”的消息
   * @param {any} data 业务数据
   * @returns {Promise<{id: string}>} 收到 ACK 后 resolve
   */
  sendWithAck(data) {
    const id = this.makeId();
    const frame = { t: "msg", id, data };

    return new Promise((resolve, reject) => {
      // 放入 pending,等待 ACK
      this.pending.set(id, { frame, resolve, reject, timer: null, tries: 0 });
      // 尝试发送
      this._trySend(id);
    });
  }

  /**
   * 发送一条“不需要 ACK”的消息(例如日志、统计、通知)
   * @param {any} obj
   */
  sendRaw(obj) {
    this._sendFrame(obj);
  }

  // ===================== 内部方法 =====================

  /** 创建 WebSocket 并绑定事件 */
  _openSocket() {
    this._clearReconnect();

    try {
      this.ws = this.protocols
        ? new WebSocket(this.url, this.protocols)
        : new WebSocket(this.url);
    } catch (e) {
      // 创建失败也走重连
      this._scheduleReconnect(e);
      return;
    }

    // 连接成功
    this.ws.onopen = () => {
      // 成功后重连计数清零
      this.retryCount = 0;

      // 开启心跳
      if (this.heartbeat.enabled) this._startHeartbeat();

      // 重连成功后:重发所有 pending(未 ACK)消息
      if (this.resendOnReconnect && this.pending.size > 0) {
        for (const id of this.pending.keys()) this._trySend(id, true);
      }

      this.onOpen?.();
    };

    // 收到消息
    this.ws.onmessage = (evt) => {
      let msg = evt.data;

      // 尝试解析 JSON
      try {
        msg = typeof msg === "string" ? JSON.parse(msg) : msg;
      } catch {
        // 非 JSON 消息交给上层(如果需要)
        this.onMessage?.(evt.data);
        return;
      }

      // ====== 心跳 pong ======
      if (msg?.t === "pong") {
        // 收到 pong,清除等待 pong 的超时
        this._clearHeartbeatTimeout();
        return;
      }

      // ====== ACK ======
      if (msg?.t === "ack" && msg.id) {
        const p = this.pending.get(msg.id);
        if (p) {
          clearTimeout(p.timer);
          this.pending.delete(msg.id);
          // resolve 告诉业务侧:已确认
          p.resolve({ id: msg.id });
        }
        return;
      }

      // ====== 其它消息 ======
      this.onMessage?.(msg);
      // 如果约定了 push 类型,可直接回调
      if (msg?.t === "push") this.onPush?.(msg.data);
    };

    // 错误事件(多数浏览器信息有限)
    this.ws.onerror = (err) => {
      this.onError?.(err);
    };

    // 连接关闭
    this.ws.onclose = (evt) => {
      this._stopHeartbeat();
      this.onClose?.(evt);

      // 非手动关闭才自动重连
      if (!this.manualClose && this.reconnect.enabled) {
        this._scheduleReconnect(evt);
      }
    };
  }

  /**
   * 实际发送(底层 send)
   * @returns {boolean} 是否发送成功(仅代表 OPEN 并已调用 send)
   */
  _sendFrame(obj) {
    if (!this.isOpen()) return false;
    this.ws.send(typeof obj === "string" ? obj : JSON.stringify(obj));
    return true;
  }

  /**
   * 尝试发送某条 pending 消息,并设置 ACK 超时重发
   * @param {string} id
   * @param {boolean} force 是否强制(重连成功后调用)
   */
  _trySend(id, force = false) {
    const p = this.pending.get(id);
    if (!p) return;

    // 未连接:非 force 则等重连;force 也发不了就继续等
    if (!this.isOpen()) {
      if (!force) return;
      return;
    }

    // 超过最大重试次数:认为失败
    if (p.tries > this.maxSendRetries) {
      this.pending.delete(id);
      p.reject(new Error(`ACK timeout retries exceeded (id=${id})`));
      return;
    }

    // 记录本次尝试次数
    p.tries += 1;

    // 发送消息
    this._sendFrame(p.frame);

    // 设置 ACK 超时:超时则重发
    clearTimeout(p.timer);
    p.timer = setTimeout(() => {
      this._trySend(id);
    }, this.ackTimeoutMs);
  }

  /**
   * 安排一次自动重连(指数退避 + 抖动)
   * delay = min(baseDelay * 2^n, maxDelay) * (1 ± jitter)
   */
  _scheduleReconnect(_reason) {
    if (this.retryCount >= this.reconnect.maxRetries) return;

    const n = this.retryCount++;
    const base = this.reconnect.baseDelayMs * Math.pow(2, n);
    const capped = Math.min(base, this.reconnect.maxDelayMs);

    // 抖动:让延迟随机波动,避免大量客户端同一时刻重连(惊群)
    const j = this.reconnect.jitter;
    const rand = (Math.random() * 2 - 1) * j; // [-j, +j]
    const delay = Math.max(0, Math.floor(capped * (1 + rand)));

    this._clearReconnect();
    this.reconnectTimer = setTimeout(() => this._openSocket(), delay);
  }

  /** 清除重连定时器 */
  _clearReconnect() {
    if (this.reconnectTimer) {
      clearTimeout(this.reconnectTimer);
      this.reconnectTimer = null;
    }
  }

  // ===================== 心跳 =====================

  /** 启动心跳:定时 ping,超时未 pong 则主动 close 触发重连 */
  _startHeartbeat() {
    this._stopHeartbeat();

    this.hbInterval = setInterval(() => {
      if (!this.isOpen()) return;

      // 发送 ping
      this._sendFrame({ t: "ping", ts: Date.now() });

      // 等待 pong,超时则主动关闭,让 onclose 走重连
      this._clearHeartbeatTimeout();
      this.hbTimeout = setTimeout(() => {
        try {
          this.ws?.close(4000, "heartbeat timeout");
        } catch {}
      }, this.heartbeat.timeoutMs);
    }, this.heartbeat.intervalMs);
  }

  /** 清除等待 pong 的超时定时器 */
  _clearHeartbeatTimeout() {
    if (this.hbTimeout) {
      clearTimeout(this.hbTimeout);
      this.hbTimeout = null;
    }
  }

  /** 停止心跳 */
  _stopHeartbeat() {
    if (this.hbInterval) {
      clearInterval(this.hbInterval);
      this.hbInterval = null;
    }
    this._clearHeartbeatTimeout();
  }
}

5. 怎么用(业务侧最小示例)
#

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
const ws = new AckWebSocket("wss://example.com/ws", {
  ackTimeoutMs: 5000,
  maxSendRetries: 2,
  resendOnReconnect: true,
  reconnect: { baseDelayMs: 300, maxDelayMs: 10000, jitter: 0.3 },
  heartbeat: { enabled: true, intervalMs: 15000, timeoutMs: 6000 },
});

ws.onOpen = () => console.log("connected");
ws.onClose = (e) => console.log("closed:", e.code, e.reason);
ws.onPush = (data) => console.log("push:", data);

ws.connect();

async function sendChat(text) {
  try {
    const { id } = await ws.sendWithAck({ type: "chat", text });
    console.log("acked:", id);
  } catch (e) {
    console.error("send failed:", e);
  }
}

6. 服务端必须配合的点(非常关键)
#

6.1 收到 msg 后回 ACK
#

最简单的伪代码:

1
2
3
4
if (frame.t === "msg") {
  // 处理你的业务...
  send({ t: "ack", id: frame.id })
}

6.2 处理“重发导致重复”的问题:幂等/去重
#

因为客户端会重试、重连后也可能重发,同一个 id 可能多次出现。服务端至少要做到:

  • 如果 id 已处理过:直接回 ACK,不要重复执行副作用
  • 如果没处理过:执行一次,然后记录 id

实现上可用:

  • 内存 Map(简单,但重启会丢)
  • Redis set/hash(更稳)
  • 数据库唯一键(最强,但成本更高)

7. 调参建议(经验值)
#

  • ackTimeoutMs:5~10 秒(如果 ACK 表示“处理成功”,要看业务耗时)

  • maxSendRetries:1~3 次(避免无限重试)

  • 重连:

    • baseDelayMs:300~800ms
    • maxDelayMs:10~30s
    • jitter:0.2~0.4(强烈建议保留)
  • 心跳:

    • intervalMs:10~20s(移动端可适当放大)
    • timeoutMs:5~10s

8. 常见坑清单(上线前检查)
#

  1. ACK 语义:是“已收到”还是“已处理成功”?(决定 timeout 与用户体验)
  2. 服务端去重:没有去重,重试会造成重复副作用
  3. 乱序问题:重连后重发 pending 可能与新消息交错;若要求严格顺序,需要串行发送或加序列号
  4. 后台定时器被限频:移动端/后台页 setTimeout 精度会变差,属正常现象,去重能兜底

结语
#

WebSocket 本质上只是一个高效的长连接“通道”,它解决的是通信方式问题,却不直接保证“业务消息可靠送达”。想把它从“能跑起来”变成“线上稳得住”,就需要像《计算机网络》里那样把可靠性机制补到应用层:用 ACK 让交付可确认,用 超时重试 对抗偶发丢包与断链,用 指数退避 + jitter 控制重连节奏、避免惊群。把这些机制组合起来,你得到的不只是一个 WebSocket 连接,而是一套更接近“可靠传输”的工程化实时通信能力。 当然,这套实现更多是我最近学习《计算机网络》后的实践总结,仍然有很多取舍与边界(比如 ACK 语义、幂等去重、消息顺序、持久化策略等)值得打磨。如果你在实际项目里有更成熟的方案或踩坑经验,欢迎指正交流。