在项目中, 我们需要将 MongoDB 中的业务数据同步到 ClickHouse, 以满足多维度, 低延迟的查询分析需求结合客户对消息中间件的稳定性与易运维的要求, 最终选择 RabbitMQ 构建数据链路, 并搭配自研工具完成从历史数据到实时数据的同步和处理
1. 数据链路目标
我们的目标包括:
- 支持 MongoDB 中的历史数据同步;
- 支持业务系统中新增数据的实时写入和处理;
- 保证高可用, 可追溯, 可恢复;
- 提供维度字段补全以支持多维分析
由于原有业务系统以 MongoDB 为主库, 且部分系统未便于改造为事件驱动架构, 我们在设计上兼容了「部分同步 + 实时补充」的混合式方案
2. 数据同步链路设计
1 数据读取策略
历史数据同步
- 使用
createdAt字段分段拉取 MongoDB 的历史数据 - 脚本支持分页批处理, 适配 MongoDB 的分页游标读取方式
增量数据同步(CDC)
- 对于支持消息异步的模块, 我们优先在业务侧发送 RabbitMQ 消息;
- 对于不可变数据或不方便异步写入的模块, 我们基于
updatedAt字段做周期增量同步; - 每个数据表都配套一个消费者服务, 监听对应的消息队列;
2 数据写入
- 为每个业务表定义 MongoDB 与 ClickHouse 的字段映射关系;
- 编写映射配置 JSON, 字段转换, 日期格式, 布尔值等统一处理;
- 写入前使用缓存服务
CacheService补齐维度字段, 如部门, 业务线名称; - 数据处理后写入 ClickHouse, 使用
INSERT INTO批量模式提升写入性能; - 对于错误记录转入死信队列
3. ClickHouse 表结构设计
1. 原始事实表设计(fact_xx)
- 使用脚本根据字段映射自动生成建表语句;
- 大多数表使用
ReplacingMergeTree, 使用updated_at去重; - 对于保留历史记录的数据, 使用
MergeTree结构
CREATE TABLE fact_xx (
xx_id String,
...
created_at DateTime64(3),
updated_at DateTime64(3)
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY toYYYYMM(updated_at)
ORDER BY
(xx_id);
2. 物化聚合表设计(mv_xx)
CREATE MATERIALIZED VIEW mv_xx
ENGINE = AggregatingMergeTree()
ORDER BY (xx_id)
AS
SELECT
xx_id,
...
argMaxState(created_at, created_at) AS created_at_state,
argMaxState(updated_at, created_at) AS updated_at_state
FROM
fact_xx
GROUP BY
xx_id;
3. 查询视图供业务使用 (latest_xx)
最终视图表通过update_at进行聚合, 确保查询结果的实时性
CREATE VIEW latest_xx AS
SELECT
xx_id,
...
argMaxMerge(created_at_state) AS created_at,
argMaxMerge(updated_at_state) AS updated_at
FROM
mv_xx
GROUP BY
xx_id;
结语
在不引入 Kafka 的前提下, 我们构建了一套基于 MongoDB + RabbitMQ + ClickHouse 的轻量实时分析系统它能够兼顾业务系统稳定运行, 数据链路灵活可控, 以及最终在 ClickHouse 上完成高性能分析与多维可视化, 满足了我们对数据敏捷与可靠性的双重诉求