#技术
替代第一版评审稿。本稿目标是把"承诺了语义、没承诺实现"的所有空洞都补齐,给出一套生产可落地、能闭环、能承受 5x 流量压测的完整设计。
| # | 决策 | 替代了原设计中的什么 |
|---|---|---|
| D1 | 三层分离:device → slot(极稳定)、slot → worker(高频动态)、worker → 内部 mailbox(实现细节)。三层由三个独立版本号管理,互不耦合。 | 原本一套"槽点版本"既表达资源映射又表达节点归属,必然撕裂 |
| D2 | 扩缩容只迁 ownership,不动 slot 数量。TOTAL_SLOTS 在系统初始化时一次性确定,例如 256,永不随 Worker 数量变化。 | 原本"加节点 = 释放一批槽点"导致 IS/Worker 双对齐 |
| D3 | Worker fencing 落到行级:每条 upsert 都带 writer_epoch 谓词;slot_lease.epoch 在 worker 切换时单调自增。 | 原本只有"语义承诺",没有 SQL 防御,必产生静默脏写 |
| D4 | 告警与状态机分离:状态字段走"newest seq wins",告警走 alert_event append-only + alert_current 物化视图,不允许跨优先级队列覆盖告警。 | 原本优先级插队会导致告警被低优先级旧消息覆盖 |
| D5 | 派生状态写进 device_status_current.snapshot JSONB 列,slot 迁移即可恢复,禁止 Worker 内未持久化窗口。 | 原本切换 Worker 后防抖、速率窗口全部丢失 |
| D6 | IS 写盘 WAL(SQLite):核心设备消息全部先落本地盘再投 MQ,RabbitMQ 故障时 IS 不丢消息。 | 原本 RabbitMQ 故障时低优先级直接丢,核心设备没有承诺 |
| D7 | Web 控制面以 DB 为事实源,PG advisory lock 选主,Redis 仅做加速通知。Worker 周期性轮询 slot_lease,不依赖 push 可靠性。 | 原本 Redis pub/sub 丢一次推送就可能双 owner |
| D8 | 优先级简化为 2 档(high / normal),CRITICAL 合入 high,LOW 在 IS 源头降频不下沉到 MQ。 | 3 档队列数 ×1.5、运维复杂度边际收益低 |
| D9 | PostgreSQL 升级到 16 列入 P0,与第二阶段调度闭环并行做;不升级则 5x 压测必先垮在 DB。 | 原本只把 PG11 列为风险,未给路径 |
你提的核心痛点是:"版本机制有两套、排水时 IS 不知道该不该投递、扩缩容导致全量对齐"。这背后是三个本来应该独立的概念被强行用同一个"槽点版本"表达:
| 被混淆的概念 | 真实属性 | 真实变化频率 | 真实关心方 |
|---|---|---|---|
| A. 设备落在哪个分片单元(device → slot) | 极稳定 | 几乎不变 | IS、Worker、Web 全部 |
| B. 哪个 Worker 当前消费这个分片(slot → worker) | 高频变化 | 秒级到分钟级 | Worker、调度器 |
| C. 设备资源、巡检计划本身的内容 | 中频变化 | 业务变更触发 | IS、Worker、Web |
| 把这三件事压缩到一个"槽点版本"上,每次扩缩容都在改 A,逼得 IS 和 Worker 重新对齐 → IS 一边在排水(停产),一边还要按新映射产消息(投产)→ 排水永远排不完。 | |||
| 修正方向:把这三件事变成三套独立版本,A、C 极少变(且变更走专门协议),B 高频变(IS 完全不感知)。Worker 扩缩容只动 B,IS 永远只关心 A 和 C。 | |||
| 这一条已被原评审稿识别,本文档继续展开实现层。 |
| 版本号 | 含义 | 类型 | 写入方 | 消费方 | 进入 MQ 消息? | 变化频率 |
|---|---|---|---|---|---|---|
| resource_version | 设备资源(IP、型号、分组、优先级) | int64 单调递增 | Web | IS / Worker | ✓ | 中(业务变更) |
| plan_version | 巡检计划(频率、超时、检测项) | int64 单调递增 | Web | IS / Worker | ✓ | 中(业务变更) |
| slot_map_version | device → slot 映射 | int64 单调递增 | Web | IS / Worker | ✓ | 极低(重分片才动) |
| slot_lease_epoch | slot → worker 归属代次 | int64 per slot | Web | Worker / 调度器 | ✗(消息体里没有,只在 fencing 时校验) | 高(秒级到分钟级) |
| schema_version | 消息结构本身的版本 | int 小整数 | 协议 | IS / Worker | ✓ | 极低(升级才动) |
| 关键不变量: | ||||||
| INV-1: IS 投递的消息中携带 schema_version、resource_version、plan_version、slot_map_version。 | ||||||
| INV-2: IS 永远不感知 slot_lease_epoch。 | ||||||
| INV-3: Worker 扩缩容只改 slot_lease_epoch,不改其它任何版本。 | ||||||
| INV-4: device → slot 映射变更(slot_map_version 递增)走专门的"重分片协议"(§4.5),不与日常调度混用。 |
Worker 状态机: [*] → REGISTERED → WARMING → ACTIVE ↓ ┌─────────── DRAINING ─→ OFFLINE │ ↑ └─→ SUSPECTED → DEAD → QUARANTINE → WARMING (心跳超) (隔离观察)
| 状态 | 进入条件 | 允许动作 | 退出条件 |
|---|---|---|---|
| REGISTERED | Worker 启动完成注册 | 加载配置 | 自检通过 |
| WARMING | 自检通过 | 不接消息,不持有 lease,建立连接 | 心跳稳定 N 个周期 |
| ACTIVE | WARMING 期满 | 接收 slot 分配,正常消费 | 主动下线 / 心跳异常 |
| DRAINING | 主动下线请求 | 不接新 slot,处理本地 in-flight | 全部 slot 释放完成 |
| SUSPECTED | 心跳延迟超阈值(如 15 秒) | 暂不剥离 slot,观察 | 恢复 → ACTIVE / 失联 → DEAD |
| DEAD | SUSPECTED 持续 30 秒以上 | slot 强制剥离,等待接管 | (终态) |
| QUARANTINE | 一个 DEAD Worker 又恢复 | 不立即归还 slot,观察 5 分钟 | 稳定 → WARMING |
| OFFLINE | DRAINING 完成 | (终态) | — |
| Slot 状态机: | |||
| [*] → FREE → ACTIVE ─┬─→ MOVING → DRAINING_OLD → LOADING_NEW → ACTIVE |
│
└─→ ORPHANED → LOADING_NEW → ACTIVE
(旧主死)| 状态 | 含义 | fencing 行为 |
|---|---|---|
| FREE | 无 owner(仅初始化期出现) | 任何写入都不能通过 |
| ACTIVE | 有合法 owner | 该 owner 的 epoch 写入通过 |
| MOVING | 调度器决定迁移 | 旧 owner 仍可短暂写入直到 DRAINING_OLD |
| DRAINING_OLD | 旧 owner 处理本地 in-flight | 旧 owner 写入通过,新 owner 写入被拒(epoch 未升) |
| LOADING_NEW | 新 owner 加载快照 | 任何写入都不通过(epoch 已升但还没 ACTIVE) |
| ORPHANED | 旧主死亡,等新主接管 | 任何写入都不通过 |
| Worker 内部 SlotProcessor 状态机(精简版): | ||
| IDLE → DRAINING_INBOX → BATCH_BUILDING → COMMITTING → ACK_PENDING → IDLE |
↓ fencing 失败
ABORT_BATCH → STOP_CONSUMERCREATE TABLE slot_lease ( slot_id SMALLINT PRIMARY KEY, owner_worker_id VARCHAR(64), -- NULL 表示无主 lease_epoch BIGINT NOT NULL DEFAULT 0, -- 单调递增,永不回退 state VARCHAR(16) NOT NULL, -- ACTIVE / MOVING / DRAINING_OLD / LOADING_NEW / ORPHANED / FREE lease_until TIMESTAMPTZ, -- 当前租约到期时刻 last_renewed_at TIMESTAMPTZ, slot_map_version BIGINT NOT NULL, -- 该 slot 当前生效的 device 映射版本 -- 用于 grace window 期间识别 device 跨 slot weight INT NOT NULL DEFAULT 1 );
CREATE INDEX idx_slot_lease_owner ON slot_lease(owner_worker_id); CREATE INDEX idx_slot_lease_state ON slot_lease(state); lease_epoch 是核心 fencing 字段:每次 owner 切换 +1,永不回退。Web 调度器是唯一的 epoch 写入方,Worker 只读。
{ "schema_version": 2, "envelope": { "message_id": "<is_id>:<cycle_id>:<device_id>:", "device_id": "dev_00012345", "slot_id": 73, "slot_map_version": 1024, "resource_version": 17, "plan_version": 9, "is_node": "is-03", "cycle_id": 20260425103000, "seq": 42, "collected_at": "2026-04-25T10:30:01.123Z", "server_received_at": null, "priority": "high" }, "payload": { /* 巡检状态字段 */ } }
| 字段 | 用途 |
|---|---|
| message_id | 全局唯一,用于幂等去重,结构化便于排查 |
| slot_id | IS 计算 hash(device_id) % TOTAL_SLOTS 后投递路由 |
| slot_map_version | Worker 校验消息映射是否在合法窗口内 |
| resource_version | Worker 拒绝过老资源版本(宽限期外) |
| cycle_id + seq | 设备级单调,用于状态字段 newest-wins 比对 |
| collected_at | 探针端采集时刻(NTP 校准) |
| server_received_at | Worker 接收时刻(用于延迟监控、时钟偏差兜底) |
| priority | high / normal,决定 routing key |
| 字段不变量: | |
| INV-5: 同一 device 的 (cycle_id, seq) 在 IS 端单调递增,永不回退。 | |
| INV-6: message_id 全局唯一。在 IS 重启/重放后,相同消息必须保持同一个 message_id。 | |
| INV-7: schema_version 升级时,新旧 Worker 必须并行支持至少两个 schema_version 一段时间。 |
核心思想:每次 Worker 处理一批消息时,写入 device_status_current 必须带两个谓词:
⠀不依赖事务级 SELECT FOR UPDATE,因为那会让 slot 切换被 Worker 长事务阻塞。 device_status_current 表关键列: CREATE TABLE device_status_current ( device_id VARCHAR(64) PRIMARY KEY, slot_id SMALLINT NOT NULL, current_slot SMALLINT NOT NULL, -- grace window 期间区分新旧 slot seq BIGINT NOT NULL, cycle_id BIGINT NOT NULL, collected_at TIMESTAMPTZ NOT NULL, writer_epoch BIGINT NOT NULL, -- 上次写入者持有的 lease_epoch status_payload JSONB NOT NULL, derived_snapshot JSONB, -- 派生状态快照(防抖计数、滑动窗口、上次心跳等) updated_at TIMESTAMPTZ NOT NULL DEFAULT now() );
CREATE INDEX idx_dsc_slot ON device_status_current(slot_id); CREATE INDEX idx_dsc_updated ON device_status_current(updated_at); Fencing UPSERT 模板(这是 Worker 必须使用的固定模板,做进 SDK): -- 单条 upsert 模板(批量执行时用 UNNEST 多行同时写) INSERT INTO device_status_current (device_id, slot_id, current_slot, seq, cycle_id, collected_at, writer_epoch, status_payload, derived_snapshot, updated_at) VALUES (1,2, 2,3, 4,5, 6,7, $8, now()) ON CONFLICT (device_id) DO UPDATE SET seq = EXCLUDED.seq, cycle_id = EXCLUDED.cycle_id, collected_at = EXCLUDED.collected_at, writer_epoch = EXCLUDED.writer_epoch, current_slot = EXCLUDED.current_slot, status_payload = EXCLUDED.status_payload, derived_snapshot = EXCLUDED.derived_snapshot, updated_at = now() WHERE -- 旧消息防御 device_status_current.seq < EXCLUDED.seq -- Fencing:旧 owner 写不进来(epoch 已升) AND EXCLUDED.writer_epoch >= device_status_current.writer_epoch -- Grace window 期间:只有当前合法 slot 的 owner 才能写 AND (device_status_current.current_slot = EXCLUDED.slot_id OR device_status_current.current_slot IS NULL); 为什么不用事务级 fencing:
⠀Worker 批量提交伪代码: function commitBatch(slotId, messages): # 1. 应用侧去重:同 device 取 seq 最大的那条 deduped = dedupByDeviceLatestSeq(messages)
# 2. 读当前持有的 lease epoch(来自 Worker 内存,由心跳线程刷新)
myEpoch = currentLeases[slotId].epoch
if myEpoch is null:
abortBatch(messages) # 不 ack,等重投递
return
# 3. 批量 UPSERT current(行级 fencing)
BEGIN
UNNEST 所有 deduped 一次性发出
rowsAffected = 实际更新的行数
# 4. 批量 INSERT history(append-only,不需要 fencing)
INSERT INTO device_status_history (...) VALUES ...
# 5. 批量 INSERT alert_event(append-only)
INSERT INTO alert_event (...) VALUES ...
# 6. 批量更新 alert_current(基于 alert_event 物化)—— 见 §3.3
merge_alert_current_from_events(...)
# 7. 批量更新 rollup(基于 EXCLUDED 增量)
upsert_rollup(...)
COMMIT
# 8. 提交成功才 ack
if commit success:
for msg in messages:
channel.basicAck(msg.deliveryTag, multiple=false)
else:
# 不 ack,等待 RabbitMQ 重投递
log error
# 如果是 fencing 失败(不是 DB 故障),主动 cancel 该 slot 的 consumer
if fencingFailed:
cancelSlotConsumer(slotId)关键点:
⠀3.2 派生状态持久化(解决 D5) derived_snapshot JSONB 列存放 SlotProcessor 内的所有派生状态。设计原则: 原则:device 当前状态 = fold(历史所有事件) = fold(快照截至 T0 的状态, T0 之后的事件) 典型派生字段: { "consec_failure_count": 3, "last_n_states_ring": ["UP","UP","DOWN","UP","DOWN"], "ring_pos": 4, "last_heartbeat_at": "2026-04-25T10:29:50Z", "rate_window": { "window_start": "2026-04-25T10:25:00Z", "events_count": 142, "last_event_at": "2026-04-25T10:29:58Z" }, "debounce_pending": { "since": "2026-04-25T10:29:30Z", "trigger_threshold": 5 } } 约束: INV-8: SlotProcessor 内不允许有任何"未持久化的状态"。 INV-9: 每次 batch commit 必须把派生状态作为快照写入 derived_snapshot。 INV-10: Worker 接管 slot 时,加载所有受影响 device 的 derived_snapshot 到内存。 Slot 迁移加载流程: 1. 新 Worker 收到 slot 分配 2. SELECT device_id, status_payload, derived_snapshot, seq, cycle_id FROM device_status_current WHERE current_slot = ? 分批(如每批 1000 行)加载到 SlotProcessor 内存 3. 加载完成后,Worker 上报 ready 4. Web 把 slot 状态从 LOADING_NEW 切到 ACTIVE 5. Worker 开始 basic.consume 容量估算:单 device snapshot ≤ 2KB,1M device 总快照量约 2GB,单 slot(4000 device)约 8MB,加载时间 < 1 秒(DB 不忙时)。
原设计的"newest seq wins" 不能用于告警,因为: 高优先级告警 firing (cycle=10, seq=100) 投到 high queue 普通状态更新 status (cycle=11, seq=101) 投到 normal queue Worker 处理顺序:先 status,再 firing "newest seq wins" → status (seq=101) 覆盖 firing (seq=100) → 告警丢失 修正:告警事件流为事实源,alert_current 由事件流确定性派生。 alert_event 表(append-only,事实源): CREATE TABLE alert_event ( event_id BIGSERIAL PRIMARY KEY, device_id VARCHAR(64) NOT NULL, alert_key VARCHAR(128) NOT NULL, -- 同一类告警的稳定 key(如 "device.offline") event_type VARCHAR(16) NOT NULL, -- FIRING / RESOLVED / UPDATED severity SMALLINT NOT NULL, -- 0-4: INFO/WARN/MINOR/MAJOR/CRITICAL event_time TIMESTAMPTZ NOT NULL, -- 事件发生时刻(来自 collected_at) cycle_id BIGINT NOT NULL, seq BIGINT NOT NULL, payload JSONB NOT NULL, ingest_writer_epoch BIGINT NOT NULL, UNIQUE (device_id, alert_key, cycle_id, seq) );
CREATE INDEX idx_ae_device_key_time ON alert_event(device_id, alert_key, event_time DESC); UNIQUE 约束直接做幂等:重复消息 INSERT ON CONFLICT DO NOTHING 即可。 alert_current 表(物化视图,最后一次有效告警状态): CREATE TABLE alert_current ( device_id VARCHAR(64) NOT NULL, alert_key VARCHAR(128) NOT NULL, state VARCHAR(16) NOT NULL, -- ACTIVE / RESOLVED severity SMALLINT NOT NULL, first_fired_at TIMESTAMPTZ NOT NULL, last_event_at TIMESTAMPTZ NOT NULL, last_event_id BIGINT NOT NULL, -- 引用 alert_event.event_id payload JSONB NOT NULL, PRIMARY KEY (device_id, alert_key) ); 事件 → 当前态的派生规则: 对于同一 (device_id, alert_key),按 event_time 排序,规约:
最近的 FIRING/UPDATED 之后没有 RESOLVED → ACTIVE
最近的事件是 RESOLVED → RESOLVED
severity 取最近 ACTIVE 期间的最大值 Worker 处理告警的伪代码: function processAlertEvent(message, currentDeviceState):
INSERT INTO alert_event (...) ON CONFLICT DO NOTHING
latestActive = SELECT MAX(event_time), MAX(severity) FROM alert_event WHERE device_id=? AND alert_key=? AND event_time > now() - interval '7 days' AND event_id > (SELECT COALESCE(last_event_id,0) FROM alert_current ...)
UPSERT alert_current ... 性能保护:alert_event 按 (device_id, alert_key) hash 分区或按 event_time range 分区,查询永远命中索引,不退化。
device → slot 真正变更(slot_map_version 升)时,宽限期内同 device 可能存在新旧两个 slot 各自的 Worker 同时写入。 协议: T0: Web 发布 slot_map_version=N+1,effective_time=T0+5min grace window = [T0, T0+10min]
T0~T0+5min: IS 仍按 v=N 投递,新 IS 拉到 v=N+1 但 effective_time 未到, 依然按 v=N 投递。Worker 接受 v=N 消息正常。
T0+5min: IS 切到 v=N+1。开始按新 slot 投递。 Worker 进入"双版本接受"模式:v=N 和 v=N+1 都接收。
T0+5min ~ T0+10min: 对受影响 device,新旧两个 slot 都可能收到消息。 Web 在 effective_time 这一刻,对每个受影响 device 执行: UPDATE device_status_current SET current_slot = newSlot, slot_id = newSlot WHERE device_id = ?
旧 slot 的 Worker 此后 UPSERT 会因 current_slot != EXCLUDED.slot_id 失败 → 不更新。
新 slot 的 Worker UPSERT 会成功。
旧 slot 的 history 仍然 INSERT(append-only,无害)。T0+10min: Worker 拒绝任何 v=N 的消息(已超出 grace window)。 IS 端若仍有 v=N 的本地 buffer 消息,按设备级 message_id 幂等忽略。 关键 fencing 谓词(已包含在 §3.1 的 SQL 模板里): AND (device_status_current.current_slot = EXCLUDED.slot_id OR device_status_current.current_slot IS NULL); 重分片操作清单(一次性 DDL,由 Web 后台执行): BEGIN; -- 1. 标记新 slot_map_version INSERT INTO slot_map_version (version, effective_at) VALUES (N+1, '...');
-- 2. 计算受影响设备 INSERT INTO slot_remap_pending (device_id, old_slot, new_slot) SELECT device_id, old_slot_fn(device_id), new_slot_fn(device_id) FROM device_resource WHERE old_slot_fn(device_id) <> new_slot_fn(device_id);
-- 3. 在 effective_time 时点,原子地切换 current_slot -- (由后台调度器在 effective_at 时刻执行) UPDATE device_status_current dsc SET current_slot = srp.new_slot, slot_id = srp.new_slot FROM slot_remap_pending srp WHERE dsc.device_id = srp.device_id;
-- 4. 通知 IS 切换 slot_map_version COMMIT; 评审建议:日常运维永远不要触发 slot_map_version 变更。这条流程只在以下场景使用:
⠀3.5 IS WAL Buffer(解决 D6) 每个 IS 节点本地维护一个 SQLite/RocksDB WAL,所有产出消息先落盘再投 MQ。 SQLite 表: CREATE TABLE outbox ( seq INTEGER PRIMARY KEY AUTOINCREMENT, message_id TEXT NOT NULL UNIQUE, routing_key TEXT NOT NULL, body BLOB NOT NULL, created_at INTEGER NOT NULL, confirmed INTEGER NOT NULL DEFAULT 0 -- 0=未确认, 1=已 confirm );
CREATE INDEX idx_outbox_unconfirmed ON outbox(confirmed, seq) WHERE confirmed = 0; 生产侧伪代码: function produce(message): INSERT INTO outbox (message_id, routing_key, body, created_at) VALUES (...) sqlite.commit() # 落盘
# 异步投递
rabbitChannel.publish(routing_key, body, mandatory=true, with_confirm)function onPublishConfirm(deliveryTag): UPDATE outbox SET confirmed=1 WHERE seq=mapping[deliveryTag]
function onPublishNack(deliveryTag): # 不更新 confirmed=1,重发任务会重新投递
function periodicReplayer(): # 每 5 秒一次 for row in SELECT * FROM outbox WHERE confirmed=0 AND created_at < now()-3s: rabbitChannel.publish(row.routing_key, row.body)
function periodicCleaner(): # 每 1 分钟一次 DELETE FROM outbox WHERE confirmed=1 AND created_at < now() - retention_period 容量规划:
| 设备级别 | 单 IS 速率(变更后) | 单条均值 | 24h 累积 | 推荐磁盘配置 |
|---|---|---|---|---|
| 核心 | 60 msg/s | 500B | ~2.5GB | 至少 10GB 独立分区,retention=72h |
| 普通 | 60 msg/s | 500B | ~2.5GB | 同上 |
| 低优先级 | 在 IS 端降频,不进 buffer | — | — | — |
| 故障重启重放: | ||||
| IS 启动: |
message_id = "{is_node}:{cycle_id}:{device_id}:{seq}" message_dedup 表: CREATE TABLE message_dedup ( message_id VARCHAR(128) PRIMARY KEY, processed_at TIMESTAMPTZ NOT NULL DEFAULT now() ) PARTITION BY RANGE (processed_at);
-- 按天分区,保留 7 天 CREATE TABLE message_dedup_20260425 PARTITION OF message_dedup FOR VALUES FROM ('2026-04-25') TO ('2026-04-26'); Worker 处理时: 1. 批量 INSERT INTO message_dedup ... ON CONFLICT DO NOTHING 2. RETURNING 子句返回真正插入的 message_id 集合 3. 只对真正插入的消息执行业务处理 4. 无论是否首次见到,都 ack(重复消息已经被忽略) 保留策略:分区按天,保留 7 天即可(远长于 RabbitMQ 重投递窗口和 IS outbox retention)。
多实例 + Leader 选主: -- 在 PostgreSQL 上用 advisory lock 选主 SELECT pg_try_advisory_lock(hashtext('web_scheduler_leader')); -- 返回 true 即为 leader,定期续期(每 5 秒) 只有 leader 写入 slot_lease、做 rebalance 决策。Follower 只对外提供只读 API(前端查询、IS 拉资源)。 通知机制: 1. Leader 决策完成 → UPDATE slot_lease SET ... → COMMIT 2. Leader → Redis publish "slot_lease_updated" (slot_id 列表) 3. Worker 订阅 Redis → 收到通知 → 重新查询自己关心的 slot_lease 行 ↓ 兜底:Worker 每 5 秒主动 SELECT 一次自己持有的所有 lease,刷新本地缓存 4. Web Follower 也订阅 Redis → 刷新 dashboard 数据 关键不变量: INV-14: DB 中的 slot_lease 是事实源。Redis 仅为加速通知,丢消息不影响正确性。 INV-15: Worker 每次 batch commit 前必须确认本地 lease 缓存年龄 < 5 秒。 INV-16: Worker 收到 MQ delivery 时,若 slot 不在本地 lease 缓存中或 lease 已失效, 立即 basicCancel 该 channel 并 nack(requeue=true)。
CREATE TABLE worker_heartbeat ( worker_id VARCHAR(64) PRIMARY KEY, state VARCHAR(16) NOT NULL, last_heartbeat TIMESTAMPTZ NOT NULL, metadata JSONB, -- 负载、版本、IP 等 backlog_metrics JSONB -- 积压、lag、处理延迟 ); Worker 心跳逻辑: 每 3 秒:
触发时机:
⠀slot 权重计算: weight(slot) = α × device_count(slot) # 设备数 + β × msg_rate(slot, last_5min) # 消息速率 + γ × backlog_depth(slot) # MQ 积压 + δ × avg_process_ms(slot) # 处理耗时 + ε × high_priority_device_count(slot) # 高优先级设备数
α = 1, β = 5, γ = 10, δ = 2, ε = 3 rebalance 决策: function rebalance(): workers = ACTIVE Worker 列表(state=ACTIVE 且 WARMING 已过) if len(workers) == 0: return
totalWeight = sum(weight of all ACTIVE slots)
targetPerWorker = totalWeight / len(workers)
# 找出最重和最轻的 Worker
loads = {w: sum(weight(s) for s in w.slots) for w in workers}
maxW = max(loads), minW = min(loads)
# 阈值:差距 > 25% 才触发
if (loads[maxW] - loads[minW]) / targetPerWorker < 0.25:
return
# 单次最多迁移 N 个 slot(避免抖动)
movePlan = []
for slot in maxW.slots ordered by weight desc:
if loads[maxW] - weight(slot) >= targetPerWorker - 5%:
movePlan.append((slot, maxW, minW))
if len(movePlan) >= 5: break
for (slot, from, to) in movePlan:
executeMigration(slot, from, to)单 slot 迁移流程(与 §4.4 中 graceful drain 一致): function executeMigration(slot, oldOwner, newOwner): # 1. 标记 MOVING UPDATE slot_lease SET state=MOVING WHERE slot_id=?
# 2. 通知旧 owner 停止接收新消息
redis.publish("slot_command", {type: "DRAIN", slot: slot})
# 3. 旧 owner 收到命令:
# - basicCancel 该 slot 的 consumer
# - 处理本地 mailbox 内的消息
# - commit 完毕后 → UPDATE slot_lease SET state=DRAINING_OLD
# - 等待 ack 全部完成 → 通知 Web 释放
# 4. Web 收到 release 信号:
UPDATE slot_lease
SET owner_worker_id=newOwner, lease_epoch=lease_epoch+1,
state=LOADING_NEW, lease_until=now()+30s
WHERE slot_id=?
# 5. 通知新 owner
redis.publish("slot_command", {type: "ASSIGN", slot: slot, epoch: newEpoch})
# 6. 新 owner 加载 derived_snapshot → 上报 ready
UPDATE slot_lease SET state=ACTIVE WHERE slot_id=?
# 7. 新 owner basic.consume冷却约束:
场景 A:冷启动(系统首次安装) 1. DBA 初始化 TOTAL_SLOTS=256,slot_lease 全部 owner=NULL, state=FREE 2. Web 启动,竞选 leader 3. 第一个 Worker 启动 → 注册 → WARMING 4. WARMING 超期(60s 心跳稳定)→ ACTIVE 5. Web rebalance → 把所有 256 slot 分配给唯一的 Worker 注意:Worker 可能撑不住全部,按"线性逐步开启"策略 6. IS 启动 → 拉 resource_version、plan_version、slot_map_version 7. IS 按"渐进开闸"策略:先以 1/4 巡检频率运行,分批升频,避免新 Worker 被冲垮 8. 第二个 Worker 加入 → ACTIVE → rebalance 把 128 slot 迁过去 9. 第三个、第四个 Worker 陆续加入,slot 分布逐渐均匀 冷启动渐进开闸: 新 Worker 数 < 期望最小数(如 3):
仅当以下场景触发:
⠀流程:见 §3.4。
优先级简化为 2 档(D8): exchange: inspection.status (direct) routing_key 格式: slot.{slotId}.{priority} priority ∈ {high, normal}
queue: inspection.status.s{slotId}.{priority} 示例: inspection.status.s073.high inspection.status.s073.normal 总队列数:
| TOTAL_SLOTS | 队列数(high+normal) | RabbitMQ 节点配置建议 |
|---|---|---|
| 128 | 256 | 3 节点集群,单节点 4C/8GB |
| 256 | 512 | 3 节点集群,单节点 8C/16GB |
| 512 | 1024 | 5 节点集群,单节点 8C/16GB |
| 第一版选 256 slot = 512 queue,在合理负载内。 |
Quorum Queue: Arguments: x-queue-type: quorum x-single-active-consumer: true x-max-length-bytes: 50000000 # 单队列限 50MB,背压保护 x-overflow: reject-publish-dlx x-dead-letter-exchange: inspection.dlx
队列声明(Java): Map<String, Object> args = new HashMap<>(); args.put("x-queue-type", "quorum"); args.put("x-single-active-consumer", true); args.put("x-max-length-bytes", 50_000_000); args.put("x-overflow", "reject-publish-dlx"); args.put("x-dead-letter-exchange", "inspection.dlx"); channel.queueDeclare(queueName, true, false, false, args); 为什么 Quorum 而不是 Classic:
⠀512 队列 × 3 节点 Raft 副本:每节点保存 ~1.5GB Raft log(按队列 50MB cap),完全可承受。
第一版:每个活跃 slot consumer 一个独立 channel。 Worker 持有 50 个 slot × 2 优先级 = 100 channel 单 connection 默认上限 2047,完全够用 ack 路径清晰:delivery_tag 在自己的 channel 上 ack,无 frame interleaving 风险 Connection 配置: channelMax: 2048 heartbeat: 30 秒 automaticRecovery: true networkRecoveryInterval: 5 秒 topologyRecovery: true # 自动恢复 queue/binding
high queue: prefetch = 100 normal queue: prefetch = 300 原因:
⠀总 in-flight 限制:单 Worker (100 + 300) × 50 slot = 20000 条 in-flight。按单条 500B = 10MB 内存,可接受。
mailbox 容量: high mailbox: 上限 = prefetch_high = 100 normal mailbox: 上限 = prefetch_normal = 300 满了之后:consumer 不再 ack,prefetch 自动卡住,背压传到 RabbitMQ。 SlotProcessor 调度: Worker 工作线程数 = max(物理核 × 1.5, 16)
调度策略:
每个 drain 周期: 从 high 取最多 50 条 若 high 不足 50,从 normal 补到 200 条 执行 batch commit
当前 PG11 → PG16:
| 维度 | PG11 | PG16 | 对系统影响 |
|---|---|---|---|
| MERGE 语句 | 无 | 有 | 批量 upsert 减少 30% 耗时 |
| 执行期分区裁剪 | 无 | 有 | history 查询性能数倍提升 |
| 并行 VACUUM | 无 | 有 | autovacuum 稳定不滞后 |
| Quorum Replication | 弱 | 强 | DR 与 HA 配置更简单 |
| EOL | 已 EOL | 长期支持 | 安全合规 |
| 实施路径: | |||
| 1. 在测试环境部署 PG16,做完整功能回归测试(2 周) | |||
| 2. 生产环境部署 PG16 standby,逻辑复制从 PG11 同步 | |||
| 3. 业务低峰期切换主从,rollback 路径保留 7 天 | |||
| 4. 切换后第二阶段开发同步进行,但 rebalance、fencing 等新功能必须在 PG16 上验收 |
| 表 | 类型 | 关键策略 |
|---|---|---|
| device_resource | 配置表 | 普通 |
| device_slot | 映射表 | 仅在 slot_map_version 升级时变更 |
| inspection_plan | 配置表 | 普通 |
| slot_lease | 调度表 | 高频读,中频写,索引覆盖 owner、state |
| worker_heartbeat | 调度表 | 高频写(每 3s),TTL=24h |
| device_status_current | 热数据 | 高频 upsert,单设备一行,需要 fillfactor=70 |
| device_status_history | 时序大表 | RANGE PARTITION BY ts,每天一分区 |
| device_status_rollup_1m | 聚合表 | 每分钟一行,可重算 |
| alert_event | 时序大表 | RANGE PARTITION BY event_time,每天一分区 |
| alert_current | 热数据 | 物化视图模式,单 (device, key) 一行 |
| message_dedup | 时序大表 | RANGE PARTITION BY processed_at,每天一分区,保留 7 天 |
| slot_map_version | 配置表 | 历史版本保留 |
| slot_remap_pending | 临时表 | 重分片操作时使用 |
-- fillfactor 留 30% 空间给 HOT update,避免索引膨胀 CREATE TABLE device_status_current (...) WITH (fillfactor=70);
-- autovacuum 加速 ALTER TABLE device_status_current SET ( autovacuum_vacuum_scale_factor = 0.05, autovacuum_vacuum_cost_limit = 2000 ); 批量写入策略: Worker 累积一批 (max 200 条 或 max 200ms) 后:
分区: CREATE TABLE device_status_history ( ts TIMESTAMPTZ NOT NULL, device_id VARCHAR(64) NOT NULL, seq BIGINT NOT NULL, payload JSONB NOT NULL, PRIMARY KEY (ts, device_id, seq) ) PARTITION BY RANGE (ts);
-- 每天分区,保留策略由 device priority 决定 保留策略:
| 设备优先级 | 完整明细保留 | 仅保留聚合(rollup_1m) | 完全清理 |
|---|---|---|---|
| 核心 | 90 天 | 1 年 | > 1 年 |
| 普通 | 30 天 | 180 天 | > 180 天 |
| 低优先级 | 7 天(仅变化点) | 90 天 | > 90 天 |
| 清理脚本(每天凌晨): | |||
| -- 直接 DETACH + DROP,不走 DELETE,秒级完成 | |||
| ALTER TABLE device_status_history DETACH PARTITION history_20260101; | |||
| DROP TABLE history_20260101; |
CREATE TABLE device_status_rollup_1m ( minute_bucket TIMESTAMPTZ NOT NULL, device_id VARCHAR(64) NOT NULL, sample_count INT NOT NULL, state_changes INT NOT NULL, last_state VARCHAR(32), last_payload JSONB, PRIMARY KEY (minute_bucket, device_id) ) PARTITION BY RANGE (minute_bucket); 由 Worker 在每个 batch commit 时增量更新(同一事务内)。
ALTER TABLE device_resource ADD COLUMN priority SMALLINT NOT NULL DEFAULT 2; -- 1: CRITICAL 2: NORMAL 3: LOW CRITICAL/HIGH 简化合并为 priority=1,对应 MQ high 队列。
function inspectionLoop(): for device in deviceList: skipReason = checkBackpressure(device) if skipReason == "BACKPRESSURE_CRITICAL": if device.priority != 1: continue # 仅核心设备 elif skipReason == "BACKPRESSURE_HIGH": if device.priority == 3 and currentSlot % 4 != 0: continue # 低优先级设备 1/4 频率 # ... else 正常巡检
msg = inspect(device)
outbox.append(msg)slot.{slotId}.high 与 slot.{slotId}.normal,按 device.priority 路由。
function pickNextBatch(slotProcessor): high = slotProcessor.highMailbox.size() normal = slotProcessor.normalMailbox.size()
# 防饥饿:normal mailbox 最旧消息超过 60 秒,强制处理 normal
if normal > 0 and normalOldestAge() > 60s:
return drain(normalMailbox, 200)
# 默认权重 high : normal = 3 : 1
if high > 0:
return drain(highMailbox, min(150, high)) + drain(normalMailbox, 50)
else:
return drain(normalMailbox, 200)DB 写入异常率 > 30%(3 分钟窗口)→ 进入降级模式:
| SLO | 推荐值(取决于业务) |
|---|---|
| 巡检周期 | 60 秒(一般实时监控场景) |
| 故障检测最大延迟 | 90 秒 |
| 高优先级告警 RTO | < 10 秒 |
| 普通设备状态查询延迟 | P99 < 200ms |
| 7 天历史查询延迟 | P99 < 2 秒 |
| SLO 反推: | |
| 1M 设备 / 60 秒巡检周期 = 16,667 msg/s 入流 | |
| × 1.5 余量 = 25,000 msg/s 设计容量 | |
| ⚠️ 这是原始评审稿的 70K/min(1.2K/s)的 20 倍。原稿的 70K/min 只是"每探针 3500/min × 20 探针"的 IS 出口速率,实际全量巡检远高于此。 |
档位 A(POC / 小规模):
| 维度 | 配置 |
|---|---|
| 设备规模 | 5 万 |
| 巡检周期 | 60 秒 |
| 入流速率 | 833 msg/s |
| TOTAL_SLOTS | 64 |
| Worker 数量 | 2 |
| Web | 单实例 4C/8GB |
| Worker | 单实例 4C/8GB JVM heap |
| RabbitMQ | 3 节点 Quorum,单节点 4C/8GB |
| PostgreSQL | 主备 16,主 8C/32GB,磁盘 NVMe 500GB |
| 档位 B(中型 / 当前目标): | |
| 维度 | 配置 |
| :-: | :-: |
| 设备规模 | 50 万 |
| 巡检周期 | 60 秒 |
| 入流速率 | 8,300 msg/s |
| TOTAL_SLOTS | 256 |
| Worker 数量 | 6 ~ 8 |
| Web | 双实例 leader-follower 各 4C/8GB |
| Worker | 单实例 8C/16GB JVM heap=10GB |
| RabbitMQ | 3 节点 Quorum,单节点 8C/16GB,独立 NVMe |
| PostgreSQL | 主 + 同步 standby + 异步 standby,主 16C/64GB,磁盘 NVMe 2TB |
| 档位 C(生产 / 1M 设备): | |
| 维度 | 配置 |
| :-: | :-: |
| 设备规模 | 100 万 |
| 巡检周期 | 60 秒 |
| 入流速率 | 16,667 msg/s |
| TOTAL_SLOTS | 512 |
| Worker 数量 | 12 ~ 16 |
| Web | 三实例(leader + 2 followers),单实例 8C/16GB |
| Worker | 单实例 16C/32GB JVM heap=24GB |
| RabbitMQ | 5 节点 Quorum,单节点 16C/32GB,独立 NVMe |
| PostgreSQL | 主 32C/128GB + 2 备库,磁盘 NVMe 8TB,含 PITR 备份 |
| Redis | 3 节点哨兵,单节点 4C/8GB(仅 pub/sub) |
| 所有档位都建议拆分部署:Web、Worker、RabbitMQ、PostgreSQL 至少分别独立机器/容器。 |
| 探针数 | 单探针设备数 | 出口速率 | IS 节点配置 |
|---|---|---|---|
| 20 探针 (B) | 25,000 | 417 msg/s | 4C/8GB,SQLite WAL 50GB |
| 20 探针 (C) | 50,000 | 833 msg/s | 8C/16GB,SQLite WAL 100GB |
1. PostgreSQL 11 → 16 升级路径验证 2. RabbitMQ Quorum Queue 选型与部署 3. 监控系统(Prometheus + Grafana + OpenTelemetry)部署
1. 固定 TOTAL_SLOTS=256,建立 device → slot 稳定映射 2. 实现五版本号体系(resource / plan / slot_map / lease_epoch / schema) 3. IS 端去除对 lease_epoch 的感知 4. message_id 幂等键 + message_dedup 表 5. device_status_current 加 writer_epoch 列与 fencing UPSERT 模板(SDK 化) 6. Worker 改为按 slot_lease 消费 7. SlotProcessor 内部 mailbox 实现 8. IS 端 SQLite WAL outbox
1. slot_lease 表 + worker_heartbeat 表 2. Web Leader 选主(PG advisory lock) 3. Worker 状态机(REGISTERED → WARMING → ACTIVE → DRAINING → DEAD → QUARANTINE) 4. Slot 状态机(FREE → ACTIVE → MOVING → DRAINING_OLD → LOADING_NEW) 5. rebalance 算法(权重计算、阈值触发、冷却) 6. 完整生命周期场景闭环测试(A/B/C/D/E/F/G 七场景) 7. derived_snapshot 持久化与加载
1. 设备 priority 字段 2. IS 端按优先级降频策略 3. slot.high / slot.normal 队列 4. Worker 加权消费 + 防饥饿 5. DB 写入降级模式 6. RabbitMQ / DB 故障演练
1. alert_event / alert_current 重构 2. history 时间分区 3. rollup_1m 聚合 4. history 保留策略(按 priority) 5. 慢 SQL 治理与索引优化
1. 1x / 2x / 5x 流量压测 2. 全部 §10 验收场景跑通 3. 灰度发布
| 编号 | 场景 | 验证点 |
|---|---|---|
| V1 | 1 个 Worker 冷启动 | 全部 256 slot 可被消费,IS 不感知 Worker 数 |
| V2 | 新增 Worker | rebalance 在 60 秒内完成,迁移 5~10 slot;IS 完全无感知 |
| V3 | Worker 优雅下线 | 旧 Worker basicCancel;in-flight 处理完后释放;不丢消息 |
| V4 | Worker kill -9 | 40 秒内接管完成;fencing 防止脏写;message_dedup 防止重复处理 |
| V5 | Worker 假死 | epoch fencing 拦截旧 Worker 写入(rowsAffected=0);自动 quarantine |
| V6 | DB 故障 5 分钟 | Worker 不 ack;MQ backlog 可恢复;DB 恢复后追赶 |
| V7 | RabbitMQ 故障 5 分钟 | IS outbox 累积;恢复后按 seq 重放;message_dedup 防重复 |
| V8 | 高优先级告警延迟 | P99 < 10 秒(端到端 device → alert_current) |
| V9 | 重复消息 | current 不倒退;history append-only 但有 UNIQUE 防重复 |
| V10 | 旧 collected_at 消息 | seq 单调单调谓词拦截;不影响 current |
| V11 | resource_version / plan_version 切换 | grace window 内并存;切换不丢消息 |
| V12 | slot_map_version 切换(重分片) | grace window 内 current_slot 切换;新旧 slot 无双写 |
| V13 | 单 slot 热点 | rebalance 自动识别;管理员可触发 slot 拆分(重分片) |
| V14 | history 分区清理 | DETACH + DROP 秒级完成;不影响 current 查询 |
| V15 | 派生状态丢失测试 | Worker 切换后防抖、速率窗口正确恢复(用 derived_snapshot) |
| V16 | Web Leader 切换 | 旧 leader 断网;新 leader 60 秒内接管;无双调度 |
| V17 | 并发 Worker 心跳 | 8 Worker 每 3 秒心跳,DB 负载 < 5% CPU |
| V18 | 5x 流量压测 | 80,000 msg/s 持续 1 小时;Worker / DB / RabbitMQ 无故障 |
INV-1: IS 投递的消息中携带 schema_version、resource_version、plan_version、slot_map_version INV-2: IS 永远不感知 slot_lease_epoch INV-3: Worker 扩缩容只改 slot_lease_epoch,不改其它任何版本 INV-4: device → slot 映射变更(slot_map_version 递增)走专门的"重分片协议" INV-5: 同一 device 的 (cycle_id, seq) 在 IS 端单调递增,永不回退 INV-6: message_id 全局唯一,IS 重启/重放后保持同值 INV-7: schema_version 升级时新旧 Worker 必须并行支持至少两个版本 INV-8: SlotProcessor 内不允许有任何"未持久化的状态" INV-9: 每次 batch commit 必须把派生状态作为快照写入 derived_snapshot INV-10: Worker 接管 slot 时加载所有受影响 device 的 derived_snapshot INV-11: IS 不允许"先发 MQ 后写 outbox",必须先持久化 INV-12: outbox 的 message_id 必须能在 IS 重启后被重新构造 INV-13: outbox retention ≥ MQ 消息 TTL + 安全余量 INV-14: DB 中的 slot_lease 是事实源,Redis 仅为加速通知 INV-15: Worker 每次 batch commit 前必须确认本地 lease 缓存年龄 < 5 秒 INV-16: Worker 收到不属于自己的 slot 的 delivery 时立即 cancel + nack INV-17: 写入 device_status_current 必须使用标准 fencing UPSERT 模板 INV-18: alert_current 是 alert_event 的物化视图,不允许独立更新 INV-19: ack 必须在 DB commit 成功之后 INV-20: history 表写入是 append-only,依赖 PARTITION 治理而非 DELETE
| 原设计的痛点 | 正解 |
|---|---|
| 槽点版本一套表达三件事 | 拆成五个版本号,各管各的 |
| 扩缩容 = 改槽点数 = IS/Worker 全对齐 | 扩缩容 = 改 ownership = lease_epoch+1 = IS 无感知 |
| 排水时 IS 不知该不该投递 | IS 永远按 device→slot 投递;排水只排 Worker 本地 in-flight |
| Worker 内存状态丢失 | 派生状态写 device_status_current.derived_snapshot |
| 优先级覆盖告警 | 告警用 alert_event 流 + 物化 alert_current |
| Web 单点 | Leader 选主 + DB 事实源 |
| RabbitMQ 故障数据丢失 | IS SQLite WAL outbox,核心设备永不丢 |
| Worker 假死无法防御 | 行级 fencing UPSERT,绕不过 DB |
| PG11 风险 | 升级到 PG16 列入 P0 |
| 最关键的一条:这套设计把"调度"和"消费"完全解耦。IS 永远只关心 device→slot 这个稳定映射,Worker 永远只关心自己持有的 slot lease,Web 永远只协调 slot→worker 这个动态映射。三方各管各的,没有任何一方需要为另两方"对齐版本"。 | |
| 这就是为什么原设计调度永远混乱:它强迫三方共享一个版本号;正解的核心思想是让该稳定的彻底稳定,让该变化的只在一个维度变化。 |
加载评论中...