上行链路总览

PipeMonitor 通过 DR154 4G DTU 模块连接云端 MQTT Broker。整条链路是:

STM32 USART3 (115200bps) → DR154 UART → MQTT Publish → 云端

DR154 工作在透传模式:STM32 往串口丢什么,DR154 就原样 publish 到 MQTT topic device/FM001/up。云端下发的消息则从 device/FM001/down subscribe 回来,通过同一个串口送达 STM32。

上行帧格式

所有上行帧都是 JSON,以 \n 结尾,典型大小 ~200 字节:

遥测帧(每 10 秒一帧):

{"t":"tele","ts":1712345678,"seq":123,"dev":"FM001",
 "flow":12.300,"total":1234.500,"v":0.850,"pres":0.520,
 "temp":[23.1,23.2,22.9,23.0,22.8,23.1,23.4],
 "heart_count":10,"valid":63}

报警帧(事件触发即时上报):

{"t":"alarm","ts":1712345678,"seq":124,"dev":"FM001",
 "code":"OVER_FLOW","val":150.000,"severity":"warn"}

应答帧(下行命令执行后回复):

{"t":"ack","ts":1712345678,"seq":125,"dev":"FM001",
 "cmd_seq":42,"cmd":"reboot","result":"ok"}

手写 JSON 编码器

为什么不拉 cJSON?两个原因:

  1. 体积:cJSON 的 cJSON_Print 会动态分配内存,在 STM32 上 malloc 碎片化风险高
  2. 浮点:newlib-nano 的 printf 默认不支持 %f,cJSON 依赖它

所以直接用 rt_snprintf 手拼字符串。浮点数用定点格式输出:

/* 把 float 按 3 位小数输出,避免 %f 依赖 */
static int fmt_fixed3(char *buf, int buf_sz, rt_uint8_t valid, float value)
{
    if (!valid) return rt_snprintf(buf, buf_sz, "null");

    value += 0.0005f;  /* 四舍五入 */
    long whole = (long)value;
    long frac  = (long)((value - (float)whole) * 1000.0f);

    return rt_snprintf(buf, buf_sz, "%ld.%03ld", whole, frac);
}

PT100 原始值是 *10 ℃,格式化时除以 10 输出一位小数:

static int fmt_pt100(char *buf, int buf_sz, int16_t raw)
{
    if (raw == INT16_MIN) return rt_snprintf(buf, buf_sz, "null");
    int whole = raw / 10;
    int frac  = raw % 10;
    return rt_snprintf(buf, buf_sz, "%d.%d", whole, frac);
}

Valid 位图

遥测帧里的 valid 字段是一个位图,告诉云端哪些传感器数据是有效的:

static uint32_t build_valid_bitmap(const app_measurement_t *m)
{
    uint32_t v = 0U;
    if (m->flow_valid)     v |= (1U << 0);
    if (m->total_valid)    v |= (1U << 1);
    if (m->velocity_valid) v |= (1U << 2);
    if (m->pres_valid)     v |= (1U << 3);
    if (m->temp_valid)     v |= (1U << 4);
    /* bit5: 任一 PT100 通道有效 */
    ...
    return v;
}

Flutter 端解析时按位判断,无效通道显示为 -- 而不是误导性的 0。

云端 ACK 机制

串口写成功只代表字节交给了 DR154,不代表云端收到了。为此引入了 seq 序号 + 云端 ACK 机制:

  1. STM32 发送帧时携带递增的 seq
  2. 云端收到后回复 {"t":"cloud_ack","ack_seq":123,"result":"ok"}
  3. STM32 等待 8 秒,收到匹配的 ACK 才算成功
static uplink_ack_wait_result_t wait_cloud_ack(uint32_t seq, rt_uint32_t timeout_ms)
{
    rt_tick_t deadline = rt_tick_get() + rt_tick_from_millisecond(timeout_ms);

    while ((rt_int32_t)(deadline - rt_tick_get()) > 0)
    {
        char line[256];
        int got = uplink_port_read_line(line, sizeof(line), 100);
        if (got > 0)
        {
            uplink_ack_wait_result_t result = handle_downlink_line_for_ack(line, seq);
            if (result != UPLINK_ACK_NONE) return result;
        }
    }
    return UPLINK_ACK_NONE;  /* 超时 */
}

等待期间不阻塞下行命令处理——reboot 等控制指令仍会被及时响应。

离线缓存与断点补传

当云端 ACK 超时或串口写入失败时,帧会被追加到 LittleFS 上的 pending.jsonl

static void cache_frame_on_failure(const char *frame)
{
    if (storage_pending_append(frame) == RT_EOK)
        g_stat.pending_saved++;
}

一旦网络恢复(收到云端 ACK),上行线程每轮循环补传一条缓存帧:

static void replay_pending_if_online(void)
{
    if (g_cloud_online == RT_FALSE) return;

    for (rt_uint8_t i = 0; i < UPLINK_REPLAY_ONCE_PER_LOOP; i++)
    {
        int result = storage_pending_replay_one(replay_pending_frame_cb, RT_NULL);
        if (result <= 0) break;
    }
}

补传采用”读一条、发一条、删一条”的策略,用 LittleFS 的 rename 保证断电安全。如果补传再次失败,帧留在缓存里等下次机会。

上行事件循环

整个上行服务在一个线程里跑,状态机很简单:

for (;;)
{
    rt_uint32_t rx = 0U;
    rt_err_t err = app_event_wait_any(
        APP_EVENT_MEAS_UPDATED | APP_EVENT_ALARM_TRIGGERED,
        100ms, &rx);

    if (rx & APP_EVENT_MEAS_UPDATED)
    {
        meas_count++;
        if (meas_count >= 5)  /* 5 * 2s = 10s */
        {
            meas_count = 0;
            send_tele_frame();
        }
    }
    if (rx & APP_EVENT_ALARM_TRIGGERED)
        drain_alarms();

    poll_downlink_nonblocking();
    replay_pending_if_online();
}

遥测每 10 秒一帧(5 次采样事件触发一次),报警即时上报,下行命令非阻塞轮询。

msh 调试命令

串口控制台提供几个调试命令:

  • uplink_stat — 查看发送/接收/缓存统计
  • uplink_dump — 打印最近一帧原始 JSON
  • uplink_test_alarm — 注入一条测试报警
  • storage_pending — 查看/清空离线缓存

这些命令在现场联调时非常有用,可以快速确认链路状态。

小结

上行服务的设计目标是 可靠送达:云端 ACK 确认、离线缓存、自动补传,三层保障。代价是逻辑稍微复杂了一些,但对工业场景来说,丢一条报警数据的后果远比多几行代码严重。

后续阅读