核心结论
消息队列已经不能只按“引入一个 MQ 中间件”来讨论。对 AI 全栈系统来说,消息基础设施至少分成三层:第一层是任务队列,用来承接异步任务、削峰、重试和死信;第二层是事件流,用来保留可回放的事实日志、CDC、训练数据和审计流水;第三层是运行时事件总线,用来表达 Agent 的 token 流、工具调用、子代理状态、人工审批和失败恢复。
Kafka、RabbitMQ、RocketMQ、Pulsar、Redis Streams、NATS JetStream、Amazon SQS、Google Pub/Sub 与 Azure Service Bus 都可以进入候选集,但它们解决的不是同一个问题。选型时先判断“消息是否必须长期回放”“是否需要复杂业务路由”“是否需要事务消息或延迟消息”“团队是否愿意自建集群”,再决定产品,而不是反过来用产品特性定义架构。
站内已有内容如何整合
convee.cn 之前已经有两条与消息队列有关的线索。第一条是 Agent Runtime 事件总线,它关注模型输出、工具调用、handoff、子代理生命周期和 tracing 如何进入同一控制面。第二条是 AI 状态数据平面,它把 Kafka 与 PostgreSQL、Redis 一起放进状态层发布工程,强调版本治理、回放验证和回滚边界。
本页把这两条线收束成一个更底层的中间件选型手册:Agent 事件总线回答“运行时事件如何建模”,状态数据平面回答“状态组件如何治理”,消息队列选型回答“哪一类消息系统应该承载哪一段链路”。
三类能力先分清
任务队列的核心是把工作交给消费者处理。典型例子是文档解析、向量化、通知发送、订单超时关闭、图片处理和离线评测。它的关键指标不是永久保留历史,而是投递可靠性、重试退避、死信处理、可见性超时、消费者扩容和幂等。
事件流的核心是把事实写成可追加、可回放的日志。典型例子是用户行为、CDC、模型调用流水、工具调用审计、RAG 索引变更、训练样本采集和特征管道。它的关键指标是分区、偏移量、保留周期、消费者组、回放成本、schema 兼容和跨集群复制。
运行时事件总线的核心是把一个长任务的过程显式化。Agent 系统里,一次运行会持续产生 run_started、model_delta、tool_call_started、tool_call_completed、handoff_requested、human_approval_required、retryable_error、run_completed 等事件。这里的难点不是吞吐最大化,而是事件语义、顺序键、trace 关联、敏感字段治理和局部恢复。
主流选型边界
| 选型 | 更适合 | 关键优势 | 主要代价 |
|---|---|---|---|
| Kafka | 高吞吐事件流、CDC、日志、可回放数据管道 | 分区日志、生态成熟、Kafka 4.2 已把 Share Groups/Queues for Kafka 推到生产可用,4.3 继续主线演进 | 运维和容量治理重,传统业务路由不如 AMQP 队列直接 |
| RabbitMQ | 业务异步、AMQP、复杂路由、工作队列 | exchange/queue/binding 模型清晰,Quorum Queues 提供复制队列,Streams 补齐可回放 | 大规模日志流和长期保留不是它最强项 |
| RocketMQ | Java 业务系统、订单支付、延迟与事务消息 | 顺序、延迟、重试、事务、DLQ 等业务消息语义集中 | 海外生态和多语言成熟度通常弱于 Kafka/RabbitMQ |
| Pulsar | 多租户、海量 topic、跨地域、队列流一体 | broker 与 BookKeeper 存储分离,多订阅模式、分层存储和跨集群能力强 | 组件更多,平台运维门槛高 |
| Redis Streams | 轻量事件流、已有 Redis 体系内的消费者组 | 部署路径短,适合小规模实时流和内部任务 | 不应承担大型持久事件平台或复杂 MQ 治理 |
| NATS JetStream | 低延迟服务间事件、轻量消息平台 | 部署轻,支持 stream、consumer、work queue retention | 生态规模和企业数据平台集成不如 Kafka |
| 云队列 | 云上业务异步和托管消息 | SQS、Pub/Sub、Service Bus 免自建集群,和云 IAM/监控/函数集成好 | 跨云迁移、细粒度底层控制和本地开发一致性要额外设计 |
AI 系统里的典型落点
RAG 摄取链路适合拆成任务队列加事件流。文件上传后先进入任务队列,驱动解析、清洗、切分、embedding 和索引写入;每个阶段再写出事件流,记录文档版本、chunk 版本、索引版本和失败原因。这样既能控制工作并发,又能在召回质量下降时回放某一批文档。
Agent 长任务更适合用事件总线建模。前端看的是 SSE 或 WebSocket,后端内部应该看统一事件信封:run_id、trace_id、agent_id、step_id、sequence、event_type、payload_schema_version、recoverable。如果要持久回放,可把关键事件落 Kafka/Pulsar;如果只做在线协调,可先落 Redis Streams、NATS JetStream 或云队列。
推理与批处理链路要把“排队”与“调度”分开。MQ 负责接收请求、去重、超时和重试,GPU 调度器负责批量、优先级、模型副本和资源水位。不要让消费者直接无限拉取大任务,否则队列积压会变成 GPU 饥饿、OOM 或长尾延迟。
审计与合规链路更接近事件流。模型输入输出摘要、工具调用、权限决策、人工审批和策略版本应该写成不可轻易覆盖的事实日志。这里优先考虑 Kafka、Pulsar 或云 Pub/Sub 类系统,而不是只靠应用日志。
可靠性设计不要交给 MQ 自动完成
“至少一次投递”是大多数系统的现实边界,所以消费者必须幂等。常见做法包括业务唯一键、去重表、状态机版本号、数据库唯一约束和幂等 token。对于外部副作用,例如发邮件、扣款、调用第三方 API,消费者需要先写本地状态,再决定是否执行副作用,不能只靠 broker 的 ack。
事务一致性优先用 Outbox 模式解决。业务库事务里同时写业务状态和 outbox 表,再由后台进程投递消息;投递成功后标记发送状态。这样比“先写库再发 MQ”或“先发 MQ 再写库”更容易恢复。若采用 RocketMQ 这类事务消息能力,也仍然要定义本地事务检查、超时补偿和重复消费策略。
顺序要按业务键设计,而不是追求全局顺序。订单系统按 order_id,用户会话按 conversation_id,Agent 运行按 run_id,文档摄取按 document_id 或 tenant_id + document_id。全局严格有序会牺牲扩展性,并把热点键放大成系统瓶颈。
延迟重试要分层。短暂网络错误可以快速重试,外部系统限流要指数退避,业务前置条件未满足可以延迟到未来某个时间,永久错误进入 DLQ。DLQ 不是垃圾桶,而是人工修复、批量重放和故障复盘入口。
运行手册基线
生产前至少建立六张表或等价台账:topic/queue 清单、消息 schema 清单、消费者责任清单、重试与 DLQ 策略、容量与保留策略、权限与审计策略。每个队列都要能回答:谁生产、谁消费、消息保留多久、最大消息多大、是否可回放、失败后谁处理、是否包含敏感字段。
监控指标至少包括生产速率、消费速率、积压量、消费延迟、重试次数、DLQ 数量、消费者错误率、broker 磁盘水位、分区热点、leader 切换和端到端业务耗时。只看 broker 存活没有意义,真正影响用户的是任务何时完成、事件是否可回放、失败是否可恢复。
发布流程要包含消息契约兼容性。新增字段默认可选,删除字段要经过双写双读窗口,schema 版本要进入事件信封。消费者升级通常比生产者升级更敏感,因为旧消费者可能无法理解新事件。对多团队平台来说,CloudEvents 这类通用事件信封可以作为跨语言、跨系统的最小约束,但不要用它替代业务 schema 设计。
安全上要默认启用 TLS、认证、最小权限和租户隔离。Agent 事件尤其容易包含 prompt、工具参数、检索片段、用户数据和内部路径,不能默认全量长期保留。可观测系统可以采样,审计事件要脱敏,调试 payload 要有保留周期。
采用建议
如果团队已经在单一云厂商里,普通业务异步优先用托管队列,减少自建 broker 运维。如果业务在 Java/Spring 体系里且需要延迟、事务、顺序和重试语义,RocketMQ 或 RabbitMQ 更容易落地。如果目标是高吞吐、可回放、数据平台和 CDC,Kafka 仍然是默认基线;如果还需要强多租户、海量 topic 和存储计算分离,再评估 Pulsar。
AI 应用早期不要一上来就搭复杂消息平台。先把消息语义、幂等键、DLQ、事件信封和监控指标定清楚,再根据吞吐、保留和回放要求替换底层产品。消息队列真正的架构价值不是“异步化”四个字,而是把系统从不可恢复的一次性调用,变成可观察、可回放、可补偿的工程流程。