上行链路总览
PipeMonitor 通过 DR154 4G DTU 模块连接云端 MQTT Broker。整条链路是:
STM32 USART3 (115200bps) → DR154 UART → MQTT Publish → 云端
DR154 工作在透传模式:STM32 往串口丢什么,DR154 就原样 publish 到 MQTT topic device/FM001/up。云端下发的消息则从 device/FM001/down subscribe 回来,通过同一个串口送达 STM32。
上行帧格式
所有上行帧都是 JSON,以 \n 结尾,典型大小 ~200 字节:
遥测帧(每 10 秒一帧):
{"t":"tele","ts":1712345678,"seq":123,"dev":"FM001",
"flow":12.300,"total":1234.500,"v":0.850,"pres":0.520,
"temp":[23.1,23.2,22.9,23.0,22.8,23.1,23.4],
"heart_count":10,"valid":63}
报警帧(事件触发即时上报):
{"t":"alarm","ts":1712345678,"seq":124,"dev":"FM001",
"code":"OVER_FLOW","val":150.000,"severity":"warn"}
应答帧(下行命令执行后回复):
{"t":"ack","ts":1712345678,"seq":125,"dev":"FM001",
"cmd_seq":42,"cmd":"reboot","result":"ok"}
手写 JSON 编码器
为什么不拉 cJSON?两个原因:
- 体积:cJSON 的
cJSON_Print会动态分配内存,在 STM32 上 malloc 碎片化风险高 - 浮点:newlib-nano 的
printf默认不支持%f,cJSON 依赖它
所以直接用 rt_snprintf 手拼字符串。浮点数用定点格式输出:
/* 把 float 按 3 位小数输出,避免 %f 依赖 */
static int fmt_fixed3(char *buf, int buf_sz, rt_uint8_t valid, float value)
{
if (!valid) return rt_snprintf(buf, buf_sz, "null");
value += 0.0005f; /* 四舍五入 */
long whole = (long)value;
long frac = (long)((value - (float)whole) * 1000.0f);
return rt_snprintf(buf, buf_sz, "%ld.%03ld", whole, frac);
}
PT100 原始值是 *10 ℃,格式化时除以 10 输出一位小数:
static int fmt_pt100(char *buf, int buf_sz, int16_t raw)
{
if (raw == INT16_MIN) return rt_snprintf(buf, buf_sz, "null");
int whole = raw / 10;
int frac = raw % 10;
return rt_snprintf(buf, buf_sz, "%d.%d", whole, frac);
}
Valid 位图
遥测帧里的 valid 字段是一个位图,告诉云端哪些传感器数据是有效的:
static uint32_t build_valid_bitmap(const app_measurement_t *m)
{
uint32_t v = 0U;
if (m->flow_valid) v |= (1U << 0);
if (m->total_valid) v |= (1U << 1);
if (m->velocity_valid) v |= (1U << 2);
if (m->pres_valid) v |= (1U << 3);
if (m->temp_valid) v |= (1U << 4);
/* bit5: 任一 PT100 通道有效 */
...
return v;
}
Flutter 端解析时按位判断,无效通道显示为 -- 而不是误导性的 0。
云端 ACK 机制
串口写成功只代表字节交给了 DR154,不代表云端收到了。为此引入了 seq 序号 + 云端 ACK 机制:
- STM32 发送帧时携带递增的
seq - 云端收到后回复
{"t":"cloud_ack","ack_seq":123,"result":"ok"} - STM32 等待 8 秒,收到匹配的 ACK 才算成功
static uplink_ack_wait_result_t wait_cloud_ack(uint32_t seq, rt_uint32_t timeout_ms)
{
rt_tick_t deadline = rt_tick_get() + rt_tick_from_millisecond(timeout_ms);
while ((rt_int32_t)(deadline - rt_tick_get()) > 0)
{
char line[256];
int got = uplink_port_read_line(line, sizeof(line), 100);
if (got > 0)
{
uplink_ack_wait_result_t result = handle_downlink_line_for_ack(line, seq);
if (result != UPLINK_ACK_NONE) return result;
}
}
return UPLINK_ACK_NONE; /* 超时 */
}
等待期间不阻塞下行命令处理——reboot 等控制指令仍会被及时响应。
离线缓存与断点补传
当云端 ACK 超时或串口写入失败时,帧会被追加到 LittleFS 上的 pending.jsonl:
static void cache_frame_on_failure(const char *frame)
{
if (storage_pending_append(frame) == RT_EOK)
g_stat.pending_saved++;
}
一旦网络恢复(收到云端 ACK),上行线程每轮循环补传一条缓存帧:
static void replay_pending_if_online(void)
{
if (g_cloud_online == RT_FALSE) return;
for (rt_uint8_t i = 0; i < UPLINK_REPLAY_ONCE_PER_LOOP; i++)
{
int result = storage_pending_replay_one(replay_pending_frame_cb, RT_NULL);
if (result <= 0) break;
}
}
补传采用”读一条、发一条、删一条”的策略,用 LittleFS 的 rename 保证断电安全。如果补传再次失败,帧留在缓存里等下次机会。
上行事件循环
整个上行服务在一个线程里跑,状态机很简单:
for (;;)
{
rt_uint32_t rx = 0U;
rt_err_t err = app_event_wait_any(
APP_EVENT_MEAS_UPDATED | APP_EVENT_ALARM_TRIGGERED,
100ms, &rx);
if (rx & APP_EVENT_MEAS_UPDATED)
{
meas_count++;
if (meas_count >= 5) /* 5 * 2s = 10s */
{
meas_count = 0;
send_tele_frame();
}
}
if (rx & APP_EVENT_ALARM_TRIGGERED)
drain_alarms();
poll_downlink_nonblocking();
replay_pending_if_online();
}
遥测每 10 秒一帧(5 次采样事件触发一次),报警即时上报,下行命令非阻塞轮询。
msh 调试命令
串口控制台提供几个调试命令:
uplink_stat— 查看发送/接收/缓存统计uplink_dump— 打印最近一帧原始 JSONuplink_test_alarm— 注入一条测试报警storage_pending— 查看/清空离线缓存
这些命令在现场联调时非常有用,可以快速确认链路状态。
小结
上行服务的设计目标是 可靠送达:云端 ACK 确认、离线缓存、自动补传,三层保障。代价是逻辑稍微复杂了一些,但对工业场景来说,丢一条报警数据的后果远比多几行代码严重。