为什么需要 cloud_ack

PipeMonitor 的上行链路是 STM32 → DR154 (4G) → MQTT Broker → pipe-monitor-api → MySQL。中间任何一段抖动都会导致设备不知道云端到底有没有把这一帧 tele / alarm 真正写进库。

之前的实现里,mqtt-client 收到消息后是 “落库 promise 异步丢出去,立刻继续推 WebSocket”。这条路径在网络好的时候没问题,但只要 MySQL 偶发抖动一下,前端 App 已经看到了实时数据,库里却没有这条历史——后面去 /api/history 拉曲线就会出现空洞。

所以这一轮把语义改成了 “写库成功,云端才认账”

tele/alarm 上行 → state.ingest → dbStore.record (await) → live.publish → cloud_ack 下行

任何一步失败都不发 cloud_ack。设备端 DR154 在超时收不到确认时按既有补传策略重发即可。

cloud_ack 帧形态

云端发回设备的确认帧固定走 device/{dev}/down

{"t":"cloud_ack","ack_seq":3,"dev":"FM001","result":"ok"}

几个边界要点:

  • ack_seq 必须能从上行 tele / alarmseq 字段里 Number.isFinite 解出来;解不出来就跳过 cloud_ack,避免给设备发歧义帧
  • 设备号通过白名单形态校验(不能含 #+/),防止被 payload 注入篡改下行 topic
  • cloud_ack 只走 MQTT 下行通道,不进入 /ws/live 推给 App——App 只消费实时 tele / alarm / 命令 ack,混进来反而会误导前端

实现位置:server/services/pipe-monitor-api/src/mqtt-client.jspublishCloudAck / buildDownTopic 两个小函数。

下行 topic 模板配置

为了不把 device/{dev}/down 这个字符串硬编码到代码里,加了一个环境变量:

MQTT_DOWN_TOPIC_TEMPLATE=device/{dev}/down

默认值就是 device/{dev}/down{dev} 会在运行时替换成上行帧里的设备号。后续如果接入了别的网关需要不同 topic 形态,只改 .env 不动代码。

历史表幂等:UNIQUE + ON DUPLICATE KEY

cloud_ack 解决了正常路径的可观测性,但还有一类必须正面接住的情况:DR154 因超时或断网做了补传,同一条 (device, seq) 在云端到达多次。

如果不加约束,measurements / alarms 历史表里会出现重复点,前端历史曲线就会画出锯齿。这一轮在 db.js 启动时做了两件事:

-- 先把历史脏数据按 (device_id, seq) 收敛,保留 id 最小的那条
DELETE dup
FROM measurements dup
JOIN measurements keep
  ON dup.device_id = keep.device_id
 AND dup.seq = keep.seq
 AND dup.id > keep.id
WHERE dup.seq IS NOT NULL;

-- 然后补上唯一索引
ALTER TABLE measurements
  ADD UNIQUE KEY uniq_measurements_device_seq (device_id, seq);

alarms 表同样处理。索引名分别是 uniq_measurements_device_sequniq_alarms_device_seq,启动时检查 INFORMATION_SCHEMA.STATISTICS,已经存在就跳过。

写入侧顺手把 INSERT 改成 INSERT … ON DUPLICATE KEY UPDATE

INSERT INTO measurements
  (device_id, topic, payload_ts, seq, flow, total_value, velocity, pressure,
   temperature_json, valid_mask, payload, received_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
  topic = VALUES(topic),
  payload_ts = VALUES(payload_ts),
  flow = VALUES(flow),
  ...
  received_at = VALUES(received_at);

这样补传到达时不会报 1062,也不会插出新行;最后一次到达的内容会覆盖前面,历史曲线对应位置只剩一个点。ON DUPLICATE KEY 这条路径同样会触发 cloud_ack 下行,设备就能放心结束这一轮重试。

heart_count:上行帧加一个心跳计数

光靠 seq 还不够区分“真没采到” vs “采到了但没上来”。这一轮 tele 负载里加了一个字段:

{
  "t": "tele",
  "ts": 1776937335,
  "seq": 3,
  "dev": "FM001",
  "flow": 0,
  "total": 0,
  "v": 0,
  "pres": 0,
  "temp": [23.1, null, null, null, null, 23.7, 23.8],
  "heart_count": 10,
  "valid": 63
}

heart_count 是 STM32 每轮采集轮询时单调递增的心跳计数,跟 seq 是两个维度:

  • seq:上行帧的序号,决定历史表里的去重位置
  • heart_count:单片机内部的采集轮次,反映底层任务在没在跑

云端不需要为它单独建列——mapMeasurementRow 直接从 payload JSON 里取出来透传给前端:

function mapMeasurementRow(row) {
  const payload = parseJson(row.payload);
  return {
    device: row.device_id,
    ...
    temp: parseJson(row.temperature_json),
    heart_count: payload?.heart_count ?? null,
    valid: row.valid_mask,
    payload,
    receivedAt: toIsoString(row.received_at),
  };
}

注意 temp 数组也从 6 路扩到 7 路:temp[0] 是温压一体传感器的温度(App 显示 T0),temp[6] 是新增的第 7 路,这块和上一篇 LCD 渲染 那边的测点表是一致的。

失败计数:让问题可观测

createRuntimeState 里给 stats 加了三个字段:

cloudAckSent: 0,
cloudAckFailed: 0,
lastCloudAckAt: null,

任何一条 tele / alarm 该回 ack 而没回——数据库被禁用、seq 缺失、设备号非法、MQTT publish 报错——都会进 cloudAckFailed。后续 /api/health 把这几个字段一起暴露出去,DR154 端的补传压力大不大、有没有云端漏 ack 一目了然。

验收清单

  • 设备发 tele 后,能在 MQTT 抓到 device/FM001/down 上的 cloud_ack
  • MySQL 下线时 cloud_ack 不会被发出,cloudAckFailed 计数 +1
  • DR154 故意补传同一个 seqmeasurements 表里只有一行,cloud_ack 仍然回得上
  • 前端 /api/history 拉到的点里 heart_count 字段非空,且与采集端单调对齐
  • INFORMATION_SCHEMA.STATISTICS 里能看到 uniq_measurements_device_seq / uniq_alarms_device_seq

入口