整体架构

Mill 系统的服务端由三个核心组件组成,形成完整的数据链路:

STM32 设备 → G780s (4G DTU)


         Mosquitto Broker (mqtt.varka.cn:1883)


         Node.js API Server (mill-api.varka.cn)
           ┌──┴──┐
           ▼     ▼
         MySQL  WebSocket
           │     │
           ▼     ▼
         REST API  实时广播
           │     │
           └──┬──┘

         Flutter 客户端

数据流

  1. STM32 构建 JSON 遥测帧,通过 USART3 发送给 G780s
  2. G780s 作为透明桥将 JSON 发布到 device/FM002/up
  3. Node.js 服务订阅该主题,解析 JSON,持久化到 MySQL
  4. 服务发布 cloud_ackdevice/FM002/down
  5. 同时通过 WebSocket 广播给所有在线客户端

Mosquitto 配置

Mosquitto 作为 MQTT Broker 部署在 VPS 上。

关键配置项

配置说明
监听端口1883(明文),8883(TLS,规划中)
密码文件passwordfile — 用户认证
ACL 文件aclfile — 主题级别访问控制
LWT 遗嘱设备离线时自动通知

服务端通过环境变量配置 MQTT 连接:

// config.js
mqtt: {
  url: env.MQTT_URL || "mqtt://mqtt.varka.cn:1883",
  username: env.MQTT_USERNAME,
  password: env.MQTT_PASSWORD,
  upTopic: env.MQTT_UP_TOPIC || "device/FM002/up",
  downTopicTemplate: env.MQTT_DOWN_TOPIC_TEMPLATE || "device/{dev}/down",
}

多行合并处理

G780s 在某些情况下会将多条串口输出合并到一次 MQTT 发布中。服务端在收到消息后按换行符拆分,逐行解析:

// mqtt-client.js
const lines = message.toString().split('\n').filter(l => l.trim());
for (const line of lines) {
  state.ingest(line, topic);
}

MQTT 订阅与数据解析

主题结构

主题方向用途
device/{dev}/up设备 → 服务上行遥测、ACK、OTA 状态
device/{dev}/down服务 → 设备下行命令、cloud_ack

帧路由

state.js 根据 t 字段路由帧:

t处理逻辑
tele存储最新遥测数据,持久化到 MySQL,广播 WebSocket
ack记录命令确认,持久化到 command_acks
ota更新 OTA 进度

遥测存储

收到 tele 帧后,服务端:

  1. 更新内存中的 latestTeleByDevice Map
  2. 写入 MySQL measurements 表(唯一约束 device_id + seq 防重复)
  3. 写入成功后发布 cloud_ack(写入失败则不发 ACK,设备 5 秒超时后重试)
  4. 广播到 WebSocket 客户端

数据持久化

MySQL 数据库

使用 mysql2/promise 连接池,默认 5 个连接,启动时自动建表。

measurements 表

CREATE TABLE measurements (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  device_id     VARCHAR(64) NOT NULL,
  topic         VARCHAR(255),
  payload_ts    INT,
  seq           INT,
  flow          DECIMAL(10,3),
  total_value   DECIMAL(12,3),
  weight        INT,
  relay_do      INT,
  relay_di      INT,
  heart_count   INT,
  valid_mask    INT,
  status_bits   INT,
  temperature_json JSON,
  payload       JSON,
  received_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  UNIQUE KEY uk_dev_seq (device_id, seq),
  INDEX idx_dev_time (device_id, received_at),
  INDEX idx_dev_pts (device_id, payload_ts)
);

users 表

CREATE TABLE users (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  username      VARCHAR(64) NOT NULL UNIQUE,
  password_hash VARCHAR(255) NOT NULL,
  display_name  VARCHAR(128),
  created_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
  updated_at    TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

command_acks 表

CREATE TABLE command_acks (
  id            BIGINT AUTO_INCREMENT PRIMARY KEY,
  device_id     VARCHAR(64) NOT NULL,
  topic         VARCHAR(255),
  payload_ts    INT,
  seq           INT,
  cmd_seq       INT,
  cmd           VARCHAR(64),
  result        VARCHAR(32),
  relay_do      INT,
  relay_di      INT,
  payload       JSON,
  received_at   TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

查询特性

  • 游标分页:通过 before_id 参数实现,避免 OFFSET 在大数据量下的性能问题
  • 单页上限:5000 条
  • 时间戳:Unix 秒
  • 重试机制:连接失败时最多重试 30 次,每次间隔 2 秒

WebSocket 广播

连接

客户端连接到 /ws/live?token=<JWT>,通过 URL 参数传递 JWT Token 进行认证。

hello 帧

连接建立后,服务端立即推送初始状态:

{
  "event": "hello",
  "path": "/ws/live",
  "ts": "2026-05-06T12:00:00Z",
  "latest": [
    {
      "device_id": "FM002",
      "payload": { "t": "tele", "dev": "FM002", ... }
    }
  ],
  "stats": {
    "mqttConnected": true,
    "messagesTotal": 12345,
    "teleTotal": 10000,
    "ackTotal": 2000,
    "wsClients": 3
  }
}

message 帧

实时数据广播:

{
  "event": "message",
  "kind": "tele",
  "device": "FM002",
  "topic": "device/FM002/up",
  "payload": { "t": "tele", "dev": "FM002", ... },
  "receivedAt": "2026-05-06T12:00:05Z"
}

kind 值:tele(遥测)、ack(命令确认)、ota(升级进度)。

心跳

30 秒 ping/pong 心跳,超时未响应的客户端自动断开。

优雅关闭

服务关闭时主动关闭所有 WebSocket 连接。

用户认证

认证机制

  • 密码哈希:bcrypt,cost factor 10(可配置)
  • Token:JWT,可配置过期时间(默认 7 天)
  • 传递方式:REST 使用 Authorization: Bearer <token> 头,WebSocket 使用 ?token=<URL> 参数
  • 账号模型:固定 admin + user 账号,不开放注册

API 端点

端点方法认证说明
/healthGET健康检查 + MQTT 状态
/api/auth/loginPOST用户名密码登录
/api/latest?dev=GET设备最新遥测
/api/statusGETMQTT 连接状态、WS 客户端数、消息统计
/api/historyGET历史数据查询(游标分页)
/api/alarmsGET告警查询
/api/commands/relay-setPOST继电器位图控制
/api/commands/upload-periodPOST设置遥测上传周期(2/10/30/60 秒)
/api/commands/ota-preparePOST触发 OTA 准备
/api/firmware/uploadPOST上传固件文件(512MB 限制,.bin/.hex)
/api/ota/startPOST启动远程 OTA 会话
/api/ota/:sessionId/statusGET查询 OTA 进度
/api/ota/abortPOST中止 OTA 会话

安全特性

  • JWT 密钥STM32_MILL_JWT_SECRET 环境变量,长度 ≥ 32 字符
  • 速率限制/api/commands 每 IP 每分钟 30 次
  • 设备冷却:同一设备命令间隔 ≥ 500ms
  • OTA 锁:OTA 升级期间阻止设备命令
  • cmd_seq 计数器:进程内单调递增整数(不用 Date.now(),因为 uint32 溢出)