为什么需要 cloud_ack
PipeMonitor 的上行链路是 STM32 → DR154 (4G) → MQTT Broker → pipe-monitor-api → MySQL。中间任何一段抖动都会导致设备不知道云端到底有没有把这一帧 tele / alarm 真正写进库。
之前的实现里,mqtt-client 收到消息后是 “落库 promise 异步丢出去,立刻继续推 WebSocket”。这条路径在网络好的时候没问题,但只要 MySQL 偶发抖动一下,前端 App 已经看到了实时数据,库里却没有这条历史——后面去 /api/history 拉曲线就会出现空洞。
所以这一轮把语义改成了 “写库成功,云端才认账”:
tele/alarm 上行 → state.ingest → dbStore.record (await) → live.publish → cloud_ack 下行
任何一步失败都不发 cloud_ack。设备端 DR154 在超时收不到确认时按既有补传策略重发即可。
cloud_ack 帧形态
云端发回设备的确认帧固定走 device/{dev}/down:
{"t":"cloud_ack","ack_seq":3,"dev":"FM001","result":"ok"}
几个边界要点:
ack_seq必须能从上行tele/alarm的seq字段里Number.isFinite解出来;解不出来就跳过cloud_ack,避免给设备发歧义帧- 设备号通过白名单形态校验(不能含
#、+、/),防止被 payload 注入篡改下行 topic cloud_ack只走 MQTT 下行通道,不进入/ws/live推给 App——App 只消费实时tele/alarm/ 命令ack,混进来反而会误导前端
实现位置:server/services/pipe-monitor-api/src/mqtt-client.js 的 publishCloudAck / buildDownTopic 两个小函数。
下行 topic 模板配置
为了不把 device/{dev}/down 这个字符串硬编码到代码里,加了一个环境变量:
MQTT_DOWN_TOPIC_TEMPLATE=device/{dev}/down
默认值就是 device/{dev}/down,{dev} 会在运行时替换成上行帧里的设备号。后续如果接入了别的网关需要不同 topic 形态,只改 .env 不动代码。
历史表幂等:UNIQUE + ON DUPLICATE KEY
cloud_ack 解决了正常路径的可观测性,但还有一类必须正面接住的情况:DR154 因超时或断网做了补传,同一条 (device, seq) 在云端到达多次。
如果不加约束,measurements / alarms 历史表里会出现重复点,前端历史曲线就会画出锯齿。这一轮在 db.js 启动时做了两件事:
-- 先把历史脏数据按 (device_id, seq) 收敛,保留 id 最小的那条
DELETE dup
FROM measurements dup
JOIN measurements keep
ON dup.device_id = keep.device_id
AND dup.seq = keep.seq
AND dup.id > keep.id
WHERE dup.seq IS NOT NULL;
-- 然后补上唯一索引
ALTER TABLE measurements
ADD UNIQUE KEY uniq_measurements_device_seq (device_id, seq);
alarms 表同样处理。索引名分别是 uniq_measurements_device_seq 和 uniq_alarms_device_seq,启动时检查 INFORMATION_SCHEMA.STATISTICS,已经存在就跳过。
写入侧顺手把 INSERT 改成 INSERT … ON DUPLICATE KEY UPDATE:
INSERT INTO measurements
(device_id, topic, payload_ts, seq, flow, total_value, velocity, pressure,
temperature_json, valid_mask, payload, received_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
ON DUPLICATE KEY UPDATE
topic = VALUES(topic),
payload_ts = VALUES(payload_ts),
flow = VALUES(flow),
...
received_at = VALUES(received_at);
这样补传到达时不会报 1062,也不会插出新行;最后一次到达的内容会覆盖前面,历史曲线对应位置只剩一个点。ON DUPLICATE KEY 这条路径同样会触发 cloud_ack 下行,设备就能放心结束这一轮重试。
heart_count:上行帧加一个心跳计数
光靠 seq 还不够区分“真没采到” vs “采到了但没上来”。这一轮 tele 负载里加了一个字段:
{
"t": "tele",
"ts": 1776937335,
"seq": 3,
"dev": "FM001",
"flow": 0,
"total": 0,
"v": 0,
"pres": 0,
"temp": [23.1, null, null, null, null, 23.7, 23.8],
"heart_count": 10,
"valid": 63
}
heart_count 是 STM32 每轮采集轮询时单调递增的心跳计数,跟 seq 是两个维度:
seq:上行帧的序号,决定历史表里的去重位置heart_count:单片机内部的采集轮次,反映底层任务在没在跑
云端不需要为它单独建列——mapMeasurementRow 直接从 payload JSON 里取出来透传给前端:
function mapMeasurementRow(row) {
const payload = parseJson(row.payload);
return {
device: row.device_id,
...
temp: parseJson(row.temperature_json),
heart_count: payload?.heart_count ?? null,
valid: row.valid_mask,
payload,
receivedAt: toIsoString(row.received_at),
};
}
注意 temp 数组也从 6 路扩到 7 路:temp[0] 是温压一体传感器的温度(App 显示 T0),temp[6] 是新增的第 7 路,这块和上一篇 LCD 渲染 那边的测点表是一致的。
失败计数:让问题可观测
createRuntimeState 里给 stats 加了三个字段:
cloudAckSent: 0,
cloudAckFailed: 0,
lastCloudAckAt: null,
任何一条 tele / alarm 该回 ack 而没回——数据库被禁用、seq 缺失、设备号非法、MQTT publish 报错——都会进 cloudAckFailed。后续 /api/health 把这几个字段一起暴露出去,DR154 端的补传压力大不大、有没有云端漏 ack 一目了然。
验收清单
- 设备发
tele后,能在 MQTT 抓到device/FM001/down上的cloud_ack MySQL下线时cloud_ack不会被发出,cloudAckFailed计数 +1- DR154 故意补传同一个
seq,measurements表里只有一行,cloud_ack仍然回得上 - 前端
/api/history拉到的点里heart_count字段非空,且与采集端单调对齐 INFORMATION_SCHEMA.STATISTICS里能看到uniq_measurements_device_seq/uniq_alarms_device_seq