返回博客
BullMQNestJSShopifyMedusa消息队列

Shopify-to-Medusa:用 BullMQ 构建可靠的异步消费体系

介绍 Shopify Webhook 在进入 BullMQ 后的异步消费设计,包括队列隔离、Worker 并发控制、指数退避重试、死信队列、状态管理与消息重放,构建一套可靠、可恢复的生产级消费体系。

RaytonX
2 min read

为什么选择 BullMQ

在 Node.js 生态里,BullMQ 是生产级别任务队列的主流选择。它基于 Redis 构建,提供了完善的队列管理、Worker 并发控制、重试策略、以及事件钩子。

对于我们的场景,BullMQ 有几个关键特性:

  • Job 持久化:任务存储在 Redis 中,服务重启不丢失
  • 细粒度的重试控制:可以针对不同错误类型配置不同的重试策略
  • 内置的失败处理:原生支持 failed 队列,天然契合 DLQ 的设计
  • 事件钩子:Worker 的每个生命周期节点都有对应的事件,方便接入状态管理和监控

在 NestJS 中,@nestjs/bullmq 提供了与框架的深度集成,Queue 和 Worker(Processor)都可以作为 Injectable 注入,组织结构清晰。


队列隔离:按事件类型分离,互不影响

最直觉的做法是建一个统一的队列,所有 Shopify 事件都往里丢。这在早期可以工作,但会带来一个根本性的问题:一类事件的处理故障会拖累所有事件的处理进度。

比如,商品同步逻辑遇到了一个数据格式的边界情况,导致队列里积压了大量失败任务,不断重试。与此同时,订单事件和用户事件也在同一个队列里排队,被这批失败任务阻塞。

我们的选择是按事件类型建立独立队列

webhook-products   →  ProductsProcessor
webhook-orders     →  OrdersProcessor
webhook-customers  →  CustomersProcessor

三条队列相互独立,任何一条出现问题都不影响其他两条的正常消费。这个设计也带来了额外的好处:每条队列可以独立配置并发数、重试策略、优先级,针对不同数据类型的业务特点做差异化调整。

并发数的考量。 并发数不是越高越好,它取决于下游系统的承载能力。如果下游是 PostgreSQL,过高的并发会导致连接池耗尽;如果下游需要调用 Medusa 的 API,过高的并发会触发限流。一个合理的起点是根据数据库连接池大小和下游 API 的 Rate Limit 来估算,然后在压测中逐步调整。

订单数据的处理逻辑通常比商品和用户更复杂,涉及的数据库操作更多,并发数应该相对保守;商品数据的同步相对独立,可以适当提高并发。这种差异化配置,在统一队列的设计里是做不到的。


重试策略:指数退避,而不是固定间隔

Worker 处理失败后,BullMQ 支持自动重试。重试策略的设计,直接影响系统在故障时的行为。

为什么用指数退避,而不是固定间隔重试?

固定间隔重试的问题在于:当下游服务出现故障时,所有失败的任务会按照固定频率持续轰炸已经处于压力下的服务,雪上加霜。指数退避让每次重试之间的间隔越来越长,给下游服务留出恢复的时间。

一个典型的参数配置思路:

  • 最大重试次数:3 到 5 次。次数太少,瞬时故障没有机会自愈;次数太多,系统性问题会被掩盖,消息长时间卡在重试状态。
  • 初始等待时间:几秒到十几秒。足够短,不影响正常情况下的处理速度;足够长,给下游服务喘息的机会。
  • 退避倍数:通常是 2,每次等待时间翻倍。

哪些错误该重试,哪些不该。 这是重试策略里最容易被忽略的问题。

瞬时故障应该重试:数据库连接超时、下游 API 返回 503、网络抖动导致的请求失败。这类错误是环境问题,重试后通常能成功。

确定性错误不应该重试:数据格式解析失败、必填字段缺失、业务规则校验不通过。这类错误无论重试多少次结果都一样,重试只是在浪费资源,应该直接进入 DLQ,等待人工介入。

在 BullMQ 中,可以在 Worker 的错误处理逻辑里判断错误类型,对于确定性错误直接抛出一个特定类型的异常,配合 removeOnFail 和自定义的失败处理钩子,将其路由到 DLQ 而跳过重试。


死信队列:兜底,不是垃圾桶

超过最大重试次数的消息,会进入 BullMQ 的 failed 状态。我们在这个基础上建立了一套明确的 DLQ 机制。

失败消息的路由。 当 Worker 将任务标记为最终失败时,我们通过 BullMQ 的 failed 事件钩子,将消息的关键信息写入一张专用的 DLQ 表:原始事件 ID、失败原因、每次重试的错误详情、最终失败时间。这张表是排查问题的第一现场。

告警机制。 DLQ 有消息进入时,必须立即触发告警。沉默的 DLQ 是危险的——消息在里面静静堆积,没有人知道,也没有人处理。告警通过 Slack、邮件 推送,核心信息是:哪条消息失败了、事件类型是什么、失败原因的摘要。

人工介入与重新投递。 工程师收到告警后,可以通过管理界面或脚本查看 DLQ 中的消息详情,修复问题后将消息重新投递到对应的正常队列。重新投递时,消息的重试计数清零,作为一个新任务重新进入处理流程。


状态机:让每条消息都有迹可循

BullMQ 自身维护了任务的状态(waiting、active、completed、failed),但这个状态存在 Redis 里,任务完成后可能被清理,也不方便做复杂的业务查询。

我们在数据库里将 webhook_logs 作为唯一事实来源,这张表并不是 Redis 中任务状态的简单映射,而是业务层面的处理记录。即使 Redis 中的任务被清理,数据库仍然保留完整的处理历史,方便审计、统计和故障恢复。

状态的更新时机:

  • 投入队列时:状态为 pending
  • Worker 取出任务,开始处理时:更新为 processing
  • 处理成功时:更新为 done,记录完成时间
  • 超出重试上限,进入 DLQ 时:更新为 failed,记录错误详情

processing 状态有一个额外的价值:它可以用来检测"僵尸任务"。如果一条消息长时间停留在 processing 状态(例如超过 10 分钟),说明 Worker 可能在处理过程中崩溃了,任务没有被正常标记为成功或失败。可以通过定时任务扫描这类消息,及时发现并恢复异常任务。

BullMQ 管理队列的运行时状态,数据库管理业务层面的处理记录。两者结合,才能在生产环境中对每一条消息的去向做到完全可控。


消息重放:精确恢复,不是全量重跑

重放机制是整套可靠性设计的最后一块拼图。

什么时候需要重放。 最常见的场景是:修复了一个 Worker 处理逻辑的 bug,需要对历史上因为这个 bug 而失败的消息重新处理。另一个场景是:数据库出现了问题,需要对某个时间段内的数据重新同步。

实现上,重放的入口是数据库里的状态记录:按条件查询出状态为 failed 的消息,读取原始 Payload,重新投入对应的 BullMQ 队列,将状态重置为 pending。由于原始 Payload 在接收时已经完整保存,重放不依赖 Shopify 再次推送,也不依赖任何外部系统的状态。

重放的幂等性。 重放本质上是再次触发一次处理流程,因此 Worker 的业务逻辑必须是幂等的——处理同一条消息两次,结果应该和处理一次相同。这通常意味着在写入目标数据库时,使用 upsert 而不是 insert,以事件 ID 或业务唯一键作为幂等标识。


小结

这套 BullMQ 异步处理管道,把可靠性拆解成了几个可以独立验证的机制:

  • 队列隔离保证一类故障不传染给其他数据类型;
  • 指数退避重试给瞬时故障自愈的机会;
  • 确定性错误直接进 DLQ,不浪费重试资源;
  • DLQ 告警确保失败不被忽视;
  • 数据库状态机让每条消息有迹可循;
  • 消息重放让故障恢复精确可控。

这些机制组合在一起,解决的不是"消息能不能被处理"的问题,而是"消息出了问题之后能不能被恢复"的问题。在生产环境中,后者才是真正决定系统可信度的地方。