整体架构
Mill 系统的服务端由三个核心组件组成,形成完整的数据链路:
STM32 设备 → G780s (4G DTU)
│
▼
Mosquitto Broker (mqtt.varka.cn:1883)
│
▼
Node.js API Server (mill-api.varka.cn)
┌──┴──┐
▼ ▼
MySQL WebSocket
│ │
▼ ▼
REST API 实时广播
│ │
└──┬──┘
▼
Flutter 客户端
数据流
- STM32 构建 JSON 遥测帧,通过 USART3 发送给 G780s
- G780s 作为透明桥将 JSON 发布到
device/FM002/up - Node.js 服务订阅该主题,解析 JSON,持久化到 MySQL
- 服务发布
cloud_ack到device/FM002/down - 同时通过 WebSocket 广播给所有在线客户端
Mosquitto 配置
Mosquitto 作为 MQTT Broker 部署在 VPS 上。
关键配置项
| 配置 | 说明 |
|---|---|
| 监听端口 | 1883(明文),8883(TLS,规划中) |
| 密码文件 | passwordfile — 用户认证 |
| ACL 文件 | aclfile — 主题级别访问控制 |
| LWT 遗嘱 | 设备离线时自动通知 |
服务端通过环境变量配置 MQTT 连接:
// config.js
mqtt: {
url: env.MQTT_URL || "mqtt://mqtt.varka.cn:1883",
username: env.MQTT_USERNAME,
password: env.MQTT_PASSWORD,
upTopic: env.MQTT_UP_TOPIC || "device/FM002/up",
downTopicTemplate: env.MQTT_DOWN_TOPIC_TEMPLATE || "device/{dev}/down",
}
多行合并处理
G780s 在某些情况下会将多条串口输出合并到一次 MQTT 发布中。服务端在收到消息后按换行符拆分,逐行解析:
// mqtt-client.js
const lines = message.toString().split('\n').filter(l => l.trim());
for (const line of lines) {
state.ingest(line, topic);
}
MQTT 订阅与数据解析
主题结构
| 主题 | 方向 | 用途 |
|---|---|---|
device/{dev}/up | 设备 → 服务 | 上行遥测、ACK、OTA 状态 |
device/{dev}/down | 服务 → 设备 | 下行命令、cloud_ack |
帧路由
state.js 根据 t 字段路由帧:
t 值 | 处理逻辑 |
|---|---|
tele | 存储最新遥测数据,持久化到 MySQL,广播 WebSocket |
ack | 记录命令确认,持久化到 command_acks 表 |
ota | 更新 OTA 进度 |
遥测存储
收到 tele 帧后,服务端:
- 更新内存中的
latestTeleByDeviceMap - 写入 MySQL
measurements表(唯一约束device_id + seq防重复) - 写入成功后发布
cloud_ack(写入失败则不发 ACK,设备 5 秒超时后重试) - 广播到 WebSocket 客户端
数据持久化
MySQL 数据库
使用 mysql2/promise 连接池,默认 5 个连接,启动时自动建表。
measurements 表
CREATE TABLE measurements (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(64) NOT NULL,
topic VARCHAR(255),
payload_ts INT,
seq INT,
flow DECIMAL(10,3),
total_value DECIMAL(12,3),
weight INT,
relay_do INT,
relay_di INT,
heart_count INT,
valid_mask INT,
status_bits INT,
temperature_json JSON,
payload JSON,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE KEY uk_dev_seq (device_id, seq),
INDEX idx_dev_time (device_id, received_at),
INDEX idx_dev_pts (device_id, payload_ts)
);
users 表
CREATE TABLE users (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
username VARCHAR(64) NOT NULL UNIQUE,
password_hash VARCHAR(255) NOT NULL,
display_name VARCHAR(128),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
command_acks 表
CREATE TABLE command_acks (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
device_id VARCHAR(64) NOT NULL,
topic VARCHAR(255),
payload_ts INT,
seq INT,
cmd_seq INT,
cmd VARCHAR(64),
result VARCHAR(32),
relay_do INT,
relay_di INT,
payload JSON,
received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
查询特性
- 游标分页:通过
before_id参数实现,避免 OFFSET 在大数据量下的性能问题 - 单页上限:5000 条
- 时间戳:Unix 秒
- 重试机制:连接失败时最多重试 30 次,每次间隔 2 秒
WebSocket 广播
连接
客户端连接到 /ws/live?token=<JWT>,通过 URL 参数传递 JWT Token 进行认证。
hello 帧
连接建立后,服务端立即推送初始状态:
{
"event": "hello",
"path": "/ws/live",
"ts": "2026-05-06T12:00:00Z",
"latest": [
{
"device_id": "FM002",
"payload": { "t": "tele", "dev": "FM002", ... }
}
],
"stats": {
"mqttConnected": true,
"messagesTotal": 12345,
"teleTotal": 10000,
"ackTotal": 2000,
"wsClients": 3
}
}
message 帧
实时数据广播:
{
"event": "message",
"kind": "tele",
"device": "FM002",
"topic": "device/FM002/up",
"payload": { "t": "tele", "dev": "FM002", ... },
"receivedAt": "2026-05-06T12:00:05Z"
}
kind 值:tele(遥测)、ack(命令确认)、ota(升级进度)。
心跳
30 秒 ping/pong 心跳,超时未响应的客户端自动断开。
优雅关闭
服务关闭时主动关闭所有 WebSocket 连接。
用户认证
认证机制
- 密码哈希:bcrypt,cost factor 10(可配置)
- Token:JWT,可配置过期时间(默认 7 天)
- 传递方式:REST 使用
Authorization: Bearer <token>头,WebSocket 使用?token=<URL>参数 - 账号模型:固定 admin + user 账号,不开放注册
API 端点
| 端点 | 方法 | 认证 | 说明 |
|---|---|---|---|
/health | GET | 否 | 健康检查 + MQTT 状态 |
/api/auth/login | POST | 否 | 用户名密码登录 |
/api/latest?dev= | GET | 是 | 设备最新遥测 |
/api/status | GET | 是 | MQTT 连接状态、WS 客户端数、消息统计 |
/api/history | GET | 是 | 历史数据查询(游标分页) |
/api/alarms | GET | 是 | 告警查询 |
/api/commands/relay-set | POST | 是 | 继电器位图控制 |
/api/commands/upload-period | POST | 是 | 设置遥测上传周期(2/10/30/60 秒) |
/api/commands/ota-prepare | POST | 是 | 触发 OTA 准备 |
/api/firmware/upload | POST | 是 | 上传固件文件(512MB 限制,.bin/.hex) |
/api/ota/start | POST | 是 | 启动远程 OTA 会话 |
/api/ota/:sessionId/status | GET | 是 | 查询 OTA 进度 |
/api/ota/abort | POST | 是 | 中止 OTA 会话 |
安全特性
- JWT 密钥:
STM32_MILL_JWT_SECRET环境变量,长度 ≥ 32 字符 - 速率限制:
/api/commands每 IP 每分钟 30 次 - 设备冷却:同一设备命令间隔 ≥ 500ms
- OTA 锁:OTA 升级期间阻止设备命令
- cmd_seq 计数器:进程内单调递增整数(不用 Date.now(),因为 uint32 溢出)