什么是消息队列? 消息队列(Message Queue,简称MQ)是一种异步通信机制 ,用于在分布式系统中传递消息。它允许应用程序通过发送和接收消息来相互通信,而不需要知道彼此的位置,也不需要在同一时间都处于运行状态。
消息队列的核心概念 1 生产者(Producer) → [消息队列] → 消费者(Consumer)
生产者(Producer) : 发送消息的应用程序
消息队列(Message Queue) : 存储消息的缓冲区
消费者(Consumer) : 接收并处理消息的应用程序
Broker : 消息中间件服务器,负责接收、存储和转发消息
Topic/Exchange : 消息的分类或路由规则
为什么需要消息队列? 在没有消息队列的情况下,系统之间的调用通常是同步的 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 public void createOrder (Order order) { orderRepository.save(order); inventoryService.deductStock(order); smsService.sendNotification(order); emailService.sendEmail(order); }
问题 :
响应时间长:用户需要等待所有操作完成
耦合度高:订单服务依赖库存、短信、邮件等服务
可用性差:任何一个服务故障都会导致订单创建失败
使用消息队列后:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 public void createOrder (Order order) { orderRepository.save(order); messageQueue.send("order.created" , order); } @MessageListener("order.created") public void handleOrderCreated (Order order) { inventoryService.deductStock(order); smsService.sendNotification(order); emailService.sendEmail(order); }
消息队列的核心优势 1. 异步处理 场景 : 用户注册后需要发送邮件和短信通知。
1 2 3 4 5 6 7 同步方式: 用户注册 → 保存数据(50ms) → 发送邮件(1000ms) → 发送短信(1000ms) 总耗时: 2050ms 异步方式: 用户注册 → 保存数据(50ms) → 发送消息(5ms) 总耗时: 55ms (邮件和短信在后台异步处理)
代码示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Service public class UserService { @Autowired private EmailService emailService; @Autowired private SmsService smsService; public void register (User user) { userRepository.save(user); emailService.sendWelcomeEmail(user); smsService.sendVerificationCode(user); } } @Service public class UserService { @Autowired private RabbitTemplate rabbitTemplate; public void register (User user) { userRepository.save(user); rabbitTemplate.convertAndSend("user.register" , user); } } @Component public class UserRegisterListener { @RabbitListener(queues = "user.register") public void handleUserRegister (User user) { emailService.sendWelcomeEmail(user); smsService.sendVerificationCode(user); } }
2. 系统解耦 场景 : 订单系统需要通知多个下游系统。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 不使用MQ: 订单服务 → 库存服务 → 物流服务 → 积分服务 → 数据分析服务 问题: 订单服务需要知道所有下游服务,耦合度高 使用MQ: 订单服务 → [消息队列] ← 库存服务 ← 物流服务 ← 积分服务 ← 数据分析服务 优点: 订单服务只需要发消息,不需要知道有哪些消费者
代码示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 @Service public class OrderService { @Autowired private InventoryService inventoryService; @Autowired private LogisticsService logisticsService; @Autowired private PointsService pointsService; public void createOrder (Order order) { orderRepository.save(order); inventoryService.deductStock(order); logisticsService.createShipment(order); pointsService.addPoints(order); } } @Service public class OrderService { @Autowired private KafkaTemplate kafkaTemplate; public void createOrder (Order order) { orderRepository.save(order); kafkaTemplate.send("order.created" , order); } } @Component public class InventoryListener { @KafkaListener(topics = "order.created") public void handleOrderCreated (Order order) { inventoryService.deductStock(order); } } @Component public class LogisticsListener { @KafkaListener(topics = "order.created") public void handleOrderCreated (Order order) { logisticsService.createShipment(order); } }
3. 流量削峰 场景 : 秒杀活动瞬时流量巨大。
1 2 3 4 5 6 不使用MQ: 10000请求/秒 → 数据库(只能处理1000请求/秒) → 数据库崩溃 使用MQ: 10000请求/秒 → [消息队列] → 按照数据库承受能力消费(1000请求/秒) 消息暂存在队列中,慢慢处理
代码示例 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 @RestController public class SeckillController { @Autowired private RocketMQTemplate rocketMQTemplate; @PostMapping("/seckill") public Response seckill (@RequestBody SeckillRequest request) { if (!validateRequest(request)) { return Response.fail("参数错误" ); } rocketMQTemplate.convertAndSend("seckill-topic" , request); return Response.success("排队中,请稍后查看结果" ); } } @Component @RocketMQMessageListener( topic = "seckill-topic", consumerGroup = "seckill-consumer", consumeThreadMax = 20 // 限制并发数 ) public class SeckillConsumer implements RocketMQListener <SeckillRequest> { @Override public void onMessage (SeckillRequest request) { seckillService.processSeckill(request); } }
4. 可靠性保证 消息队列提供了多种机制保证消息不丢失:
持久化 : 消息写入磁盘,即使服务重启也不会丢失
确认机制 : 消费者处理完消息后才确认,否则消息会重新投递
死信队列 : 处理失败的消息会进入死信队列,避免消息丢失
消息队列的应用场景 1. 异步处理
用户注册后发送欢迎邮件
订单创建后发送短信通知
文件上传后异步处理(转码、压缩等)
2. 系统解耦
订单系统与库存系统解耦
用户服务与积分服务解耦
支付系统与会计系统解耦
3. 流量削峰
4. 日志收集
应用日志收集到中心日志系统
操作日志、审计日志收集
5. 数据同步
MySQL 数据同步到 Elasticsearch
订单数据同步到数据仓库
缓存失效通知
6. 事件驱动
主流消息队列对比 目前主流的消息队列中间件主要有 Kafka 、RabbitMQ 、RocketMQ 、ActiveMQ 、Pulsar 等。下面我们详细对比这些主流MQ的特点和适用场景。
核心对比表格
特性
Kafka
RabbitMQ
RocketMQ
ActiveMQ
Pulsar
开发语言
Scala/Java
Erlang
Java
Java
Java
开发公司
LinkedIn
Pivotal
阿里巴巴
Apache
Apache/Yahoo
开源时间
2011
2007
2012
2004
2016
协议支持
自定义协议
AMQP
自定义协议
JMS、AMQP
自定义协议
消息模型
发布/订阅
多种模式
发布/订阅
多种模式
发布/订阅
消息存储
磁盘(顺序写)
内存+磁盘
磁盘(随机写)
内存+磁盘
磁盘分层存储
消息顺序
分区内有序
不保证
全局有序/分区有序
不保证
分区内有序
吞吐量
极高(百万级/秒)
高(万级/秒)
极高(十万级/秒)
中等(万级/秒)
极高(百万级/秒)
延迟
毫秒级
微秒级
毫秒级
毫秒级
毫秒级
可用性
极高(副本机制)
高(镜像队列)
极高(主从架构)
高(主从架构)
极高(多副本)
消息回溯
支持
不支持
支持
不支持
支持
事务消息
支持(幂等)
支持
支持
支持
支持
延迟消息
不支持
支持(插件)
支持(18个级别)
支持
支持
死信队列
不支持
支持
支持
支持
支持
消息过滤
弱
支持
支持(Tag/SQL92)
支持
支持
消息追踪
支持
支持(插件)
支持
支持
支持
管理界面
第三方
官方(优秀)
官方(RocketMQ Console)
官方
官方
客户端语言
多种语言
多种语言
Java为主
Java为主
多种语言
社区活跃度
非常活跃
活跃
活跃
一般
活跃
学习成本
中等
低
中等
低
中等
运维成本
中等
低
中等
低
高
适用场景
大数据、日志收集、流处理
业务消息、任务队列
金融级业务、事务消息
中小型项目
统一消息平台
Kafka - 大数据场景的王者 架构特点 Kafka 是一个分布式流处理平台 ,最初由 LinkedIn 开发,专为高吞吐量的日志收集和流处理场景设计。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 Kafka 集群架构: Producer1 ──┐ Producer2 ──┼─→ [Broker1] ─┐ Producer3 ──┘ │ ├─→ [ZooKeeper 协调] Consumer1 ──┐ │ Consumer2 ──┼─← [Broker2] ─┤ Consumer3 ──┘ │ [Broker3] ─┘ Topic 分区机制: Topic: order-events ├─ Partition 0 (Leader: Broker1, Replica: Broker2) ├─ Partition 1 (Leader: Broker2, Replica: Broker3) └─ Partition 2 (Leader: Broker3, Replica: Broker1)
核心特性 1. 超高吞吐量 Kafka 的吞吐量可以达到百万级/秒 ,这得益于:
顺序写磁盘 : 比随机写内存还快
零拷贝(Zero Copy) : 减少数据在内核态和用户态之间的拷贝
批量发送 : 减少网络开销
压缩 : 支持 GZIP、Snappy、LZ4 等压缩算法
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 Properties props = new Properties ();props.put("bootstrap.servers" , "localhost:9092" ); props.put("key.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); props.put("value.serializer" , "org.apache.kafka.common.serialization.StringSerializer" ); props.put("batch.size" , 16384 ); props.put("linger.ms" , 10 ); props.put("buffer.memory" , 33554432 ); props.put("compression.type" , "lz4" ); KafkaProducer<String, String> producer = new KafkaProducer <>(props); for (int i = 0 ; i < 1000000 ; i++) { producer.send(new ProducerRecord <>("my-topic" , "key-" + i, "value-" + i)); } producer.close();
2. 消息持久化与回溯 Kafka 将所有消息持久化到磁盘 ,并且支持消息回溯 (重新消费历史消息)。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 Properties props = new Properties ();props.put("bootstrap.servers" , "localhost:9092" ); props.put("group.id" , "my-group" ); props.put("key.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("value.deserializer" , "org.apache.kafka.common.serialization.StringDeserializer" ); props.put("auto.offset.reset" , "earliest" ); KafkaConsumer<String, String> consumer = new KafkaConsumer <>(props); consumer.subscribe(Arrays.asList("my-topic" )); TopicPartition partition = new TopicPartition ("my-topic" , 0 );consumer.assign(Arrays.asList(partition)); consumer.seek(partition, 1000 ); while (true ) { ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100 )); for (ConsumerRecord<String, String> record : records) { System.out.printf("offset = %d, key = %s, value = %s%n" , record.offset(), record.key(), record.value()); } }
3. 分区机制保证顺序 Kafka 通过分区(Partition)机制,保证 同一分区内的消息是有序的 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 producer.send(new ProducerRecord <>("order-topic" , userId, orderData)); public class UserPartitioner implements Partitioner { @Override public int partition (String topic, Object key, byte [] keyBytes, Object value, byte [] valueBytes, Cluster cluster) { String userId = (String) key; int numPartitions = cluster.partitionCountForTopic(topic); return Math.abs(userId.hashCode()) % numPartitions; } }
4. 高可用性 Kafka 通过**副本(Replica)**机制保证高可用:
每个分区可以有多个副本
一个 Leader 副本,多个 Follower 副本
Leader 负责读写,Follower 同步数据
Leader 故障时,自动选举新 Leader
1 2 3 4 5 6 7 8 Partition 0 的副本分布: Broker1: Leader (读写) Broker2: Follower (同步) Broker3: Follower (同步) Broker1 宕机后: Broker2: Leader (自动选举,继续提供服务) Broker3: Follower (同步)
使用场景
日志收集 : ELK(Elasticsearch + Logstash + Kibana) 中的 L
大数据管道 : 作为数据源和数据汇的中间缓冲
实时流处理 : Kafka Streams、Flink、Spark Streaming
用户行为追踪 : 网站点击流、APP 埋点数据
运维监控 : 收集服务器、应用程序的监控指标
Spring Boot 集成示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 <dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> </dependency> spring: kafka: bootstrap-servers: localhost:9092 producer: key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: all # 确保消息不丢失 retries: 3 consumer: group-id: my-group key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer properties: spring.json.trusted.packages: com.example.model auto-offset-reset: earliest enable-auto-commit: false # 手动提交 offset @Service public class OrderProducer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; public void sendOrder (Order order) { kafkaTemplate.send("order-topic" , order.getUserId(), order) .addCallback( result -> log.info("消息发送成功: {}" , result), ex -> log.error("消息发送失败" , ex) ); } } @Component public class OrderConsumer { @KafkaListener(topics = "order-topic", groupId = "order-group") public void consume ( @Payload Order order, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition, @Header(KafkaHeaders.OFFSET) long offset, Acknowledgment acknowledgment ) { log.info("收到消息: partition={}, offset={}, order={}" , partition, offset, order); try { orderService.processOrder(order); acknowledgment.acknowledge(); } catch (Exception e) { log.error("消息处理失败" , e); } } }
优缺点总结 优点 :
超高吞吐量(百万级/秒)
高可用性和容错性
支持消息回溯
分区机制保证顺序
生态完善(Kafka Streams、Kafka Connect 等)
缺点 :
消息延迟相对较高(毫秒级)
不支持消息优先级
不支持延迟消息(需要额外实现)
原生不支持死信队列 (需要应用层手动实现,Kafka Streams 在未来版本计划支持)
依赖 ZooKeeper(新版本改用 KRaft,但还在过渡期)
学习曲线陡峭
RabbitMQ - 功能最全面的通用MQ 架构特点 RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol) 协议实现的消息中间件,使用 Erlang 语言开发,以高可靠性著称。
1 2 3 4 5 6 7 8 9 10 11 RabbitMQ 消息流转: Producer → Exchange (交换机) → Queue (队列) → Consumer ↓ Binding (绑定规则) Exchange 类型: 1. Direct: 精确匹配 routing key 2. Topic: 通配符匹配 routing key 3. Fanout: 广播到所有绑定的队列 4. Headers: 根据消息头匹配
核心特性 1. 灵活的路由机制 RabbitMQ 提供了 4 种 Exchange 类型,支持复杂的消息路由。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 @Configuration public class DirectConfig { @Bean public DirectExchange directExchange () { return new DirectExchange ("direct-exchange" ); } @Bean public Queue errorQueue () { return new Queue ("error-queue" ); } @Bean public Queue infoQueue () { return new Queue ("info-queue" ); } @Bean public Binding errorBinding () { return BindingBuilder.bind(errorQueue()) .to(directExchange()) .with("error" ); } @Bean public Binding infoBinding () { return BindingBuilder.bind(infoQueue()) .to(directExchange()) .with("info" ); } } @Configuration public class TopicConfig { @Bean public TopicExchange topicExchange () { return new TopicExchange ("topic-exchange" ); } @Bean public Queue orderQueue () { return new Queue ("order-queue" ); } @Bean public Binding orderBinding () { return BindingBuilder.bind(orderQueue()) .to(topicExchange()) .with("order.*" ); } } @Configuration public class FanoutConfig { @Bean public FanoutExchange fanoutExchange () { return new FanoutExchange ("fanout-exchange" ); } @Bean public Binding fanoutBinding1 () { return BindingBuilder.bind(new Queue ("queue1" )) .to(fanoutExchange()); } @Bean public Binding fanoutBinding2 () { return BindingBuilder.bind(new Queue ("queue2" )) .to(fanoutExchange()); } }
2. 消息可靠性保证 RabbitMQ 提供了多层可靠性保证 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 @Configuration public class ReliableConfig { @Bean public RabbitTemplate rabbitTemplate (ConnectionFactory connectionFactory) { RabbitTemplate template = new RabbitTemplate (connectionFactory); template.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { log.info("消息到达 Exchange" ); } else { log.error("消息未到达 Exchange: {}" , cause); } }); template.setReturnsCallback(returned -> { log.error("消息未到达 Queue: {}" , returned.getMessage()); }); template.setMandatory(true ); return template; } } @Bean public Queue durableQueue () { return QueueBuilder.durable("my-queue" ) .build(); } rabbitTemplate.convertAndSend("exchange" , "routing-key" , message, msg -> { msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); return msg; }); @RabbitListener(queues = "my-queue", ackMode = "MANUAL") public void consume (Message message, Channel channel) throws IOException { try { processMessage(message); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , true ); } }
3. 死信队列(DLX - Dead Letter Exchange) 什么是死信队列? 死信队列(Dead Letter Queue, DLQ) 是一种特殊的队列,用于存储无法被正常消费的消息 。当消息在业务队列中无法正常处理时,会被转发到死信队列,避免消息丢失,便于后续排查和处理。
什么消息会成为死信? 消息变成死信有以下三种情况 :
1 2 3 4 5 6 7 8 1. 消息被拒绝(basic.reject / basic.nack) 且 requeue = false └─ 消费者明确拒绝消息,并不重新入队 2. 消息 TTL 过期 └─ 消息在队列中存活时间超过设置的过期时间 3. 队列达到最大长度 └─ 队列满了,新消息会将旧消息挤出成为死信
死信队列的作用
避免消息丢失 : 处理失败的消息不会被丢弃,而是转移到死信队列
问题排查 : 可以查看死信队列中的消息,分析失败原因
延迟重试 : 可以从死信队列中重新投递消息进行重试
告警通知 : 监控死信队列,发现异常及时告警
实现延迟队列 : 利用 TTL + 死信机制实现延迟消息
死信队列工作原理 1 2 3 4 5 6 7 8 9 10 11 正常消息流转: Producer → Exchange → Business Queue → Consumer(处理成功) 死信消息流转: Producer → Exchange → Business Queue → Consumer(处理失败) ↓ (消息变成死信) ↓ DLX Exchange ← (x-dead-letter-exchange) ↓ Dead Letter Queue → Dead Letter Consumer(人工处理)
核心配置 :
业务队列需要配置 x-dead-letter-exchange(死信交换机)
可选配置 x-dead-letter-routing-key(死信路由键)
死信消息会携带额外的 headers 信息:
x-first-death-queue: 原始队列名称
x-first-death-reason: 成为死信的原因(rejected/expired/maxlen)
x-death: 详细的死信信息列表
完整代码示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 @Configuration public class DeadLetterConfig { @Bean public DirectExchange deadLetterExchange () { return new DirectExchange ("dlx-exchange" ); } @Bean public Queue deadLetterQueue () { return new Queue ("dlx-queue" ); } @Bean public Binding deadLetterBinding () { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with("dlx" ); } @Bean public Queue businessQueue () { return QueueBuilder.durable("business-queue" ) .withArgument("x-dead-letter-exchange" , "dlx-exchange" ) .withArgument("x-dead-letter-routing-key" , "dlx" ) .withArgument("x-message-ttl" , 60000 ) .withArgument("x-max-length" , 10000 ) .build(); } @Bean public DirectExchange businessExchange () { return new DirectExchange ("business-exchange" ); } @Bean public Binding businessBinding () { return BindingBuilder.bind(businessQueue()) .to(businessExchange()) .with("business" ); } } @Component public class BusinessConsumer { @RabbitListener(queues = "business-queue", ackMode = "MANUAL") public void consume (Order order, Channel channel, Message message) throws IOException { try { log.info("处理订单: {}" , order); if (order.getAmount() < 0 ) { throw new IllegalArgumentException ("订单金额不能为负数" ); } orderService.processOrder(order); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { log.error("订单处理失败: {}" , order, e); Map<String, Object> headers = message.getMessageProperties().getHeaders(); Integer retryCount = (Integer) headers.getOrDefault("retry-count" , 0 ); if (retryCount < 3 ) { log.info("重试第 {} 次" , retryCount + 1 ); headers.put("retry-count" , retryCount + 1 ); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , true ); } else { log.error("超过最大重试次数,消息进入死信队列" ); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } } } } @Component public class DeadLetterConsumer { @Autowired private AlarmService alarmService; @RabbitListener(queues = "dlx-queue") public void consumeDeadLetter (Message message, Channel channel) throws IOException { try { String body = new String (message.getBody()); Map<String, Object> headers = message.getMessageProperties().getHeaders(); String reason = (String) headers.get("x-first-death-reason" ); String originalQueue = (String) headers.get("x-first-death-queue" ); log.error("收到死信消息 - 原始队列: {}, 死信原因: {}, 消息内容: {}" , originalQueue, reason, body); switch (reason) { case "rejected" : log.error("消息被拒绝,可能是业务数据异常" ); saveToFailedMessageTable(message); alarmService.sendAlert("消息处理失败" , body); break ; case "expired" : log.error("消息过期,可能是消费速度过慢" ); retryMessage(message); break ; case "maxlen" : log.error("队列已满,消息被挤出" ); alarmService.sendAlert("队列积压严重" , "队列: " + originalQueue); break ; default : log.error("未知的死信原因: {}" , reason); } channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { log.error("死信消息处理失败" , e); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } } private void saveToFailedMessageTable (Message message) { FailedMessage failedMessage = new FailedMessage (); failedMessage.setBody(new String (message.getBody())); failedMessage.setHeaders(message.getMessageProperties().getHeaders()); failedMessage.setCreateTime(new Date ()); failedMessageRepository.save(failedMessage); } private void retryMessage (Message message) { rabbitTemplate.convertAndSend("business-exchange" , "business" , new String (message.getBody())); } }
死信队列的实际应用场景 1. 消息重试机制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 @RabbitListener(queues = "order-queue", ackMode = "MANUAL") public void consume (Order order, Channel channel, Message message) throws IOException { try { orderService.processOrder(order); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { Integer retryCount = getRetryCount(message); if (retryCount < 3 ) { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , true ); } else { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } } }
2. 异常消息监控
1 2 3 4 5 6 7 8 9 10 11 12 @Scheduled(fixedDelay = 60000) public void checkDeadLetterQueue () { Properties properties = rabbitAdmin.getQueueProperties("dlx-queue" ); Integer messageCount = (Integer) properties.get("QUEUE_MESSAGE_COUNT" ); if (messageCount > 100 ) { alarmService.sendAlert("死信队列消息过多" , "当前消息数: " + messageCount); } }
3. 实现延迟队列
死信队列与普通队列的区别
特性
普通队列
死信队列
消息来源
生产者直接发送
业务队列转发失败消息
消息类型
正常业务消息
无法正常处理的消息
处理方式
正常业务处理
告警、人工处理、重试
是否自动消费
是
通常不自动重试
最佳实践
必须监控死信队列 : 设置告警,及时发现问题
记录详细日志 : 死信消息要记录详细的失败原因和上下文
避免无限死信循环 : 死信队列的消费者不要再配置死信队列
合理设置重试次数 : 避免不可恢复的错误无限重试
定期清理 : 对于已处理的死信消息,定期清理或归档
其他 MQ 的死信队列实现 不同的 MQ 对死信队列的支持程度不同。RabbitMQ 和 RocketMQ 都提供了原生的死信队列支持 ,而 Kafka 不支持原生死信队列 ,需要通过应用层手动实现。
Kafka 的死信队列实现
Kafka 的设计哲学是 “dumb pipes, smart endpoints” (愚蠢的管道,聪明的端点):
Kafka Broker 只负责高效存储和转发消息
消息处理逻辑(包括错误处理)由消费者自己决定
这种设计保证了 Broker 的高性能和简洁性
方案一: 手动实现死信队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 @Component public class KafkaDLQConsumer { @Autowired private KafkaTemplate<String, Order> kafkaTemplate; private static final String DLQ_TOPIC = "order-dlq" ; private static final int MAX_RETRY = 3 ; @KafkaListener(topics = "order-topic", groupId = "order-group") public void consume ( ConsumerRecord<String, Order> record, Acknowledgment ack, @Header(KafkaHeaders.RECEIVED_TOPIC) String topic ) { Order order = record.value(); String messageId = order.getMessageId(); try { orderService.processOrder(order); ack.acknowledge(); } catch (Exception e) { log.error("消息处理失败: {}" , order, e); Integer retryCount = getRetryCount(messageId); if (retryCount < MAX_RETRY) { log.warn("消息处理失败,第 {} 次重试" , retryCount + 1 ); incrementRetryCount(messageId); } else { log.error("消息处理失败超过最大重试次数,发送到死信队列: {}" , order); sendToDLQ(record, e); ack.acknowledge(); } } } private void sendToDLQ (ConsumerRecord<String, Order> record, Exception e) { try { ProducerRecord<String, Order> dlqRecord = new ProducerRecord <>( DLQ_TOPIC, record.key(), record.value() ); dlqRecord.headers() .add("original-topic" , record.topic().getBytes()) .add("original-partition" , String.valueOf(record.partition()).getBytes()) .add("original-offset" , String.valueOf(record.offset()).getBytes()) .add("failure-reason" , e.getMessage().getBytes()) .add("failure-timestamp" , String.valueOf(System.currentTimeMillis()).getBytes()); kafkaTemplate.send(dlqRecord).get(); log.info("消息已发送到死信队列: topic={}, offset={}" , record.topic(), record.offset()); } catch (Exception ex) { log.error("发送到死信队列失败" , ex); saveToDatabase(record, e); } } } @Component public class KafkaDLQMessageConsumer { @KafkaListener(topics = "order-dlq", groupId = "dlq-group") public void consumeDLQ ( ConsumerRecord<String, Order> record, @Header("original-topic") String originalTopic, @Header("failure-reason") String failureReason ) { log.error("处理死信消息 - 原始Topic: {}, 失败原因: {}, 消息: {}" , originalTopic, failureReason, record.value()); } }
方案二: 使用重试 Topic(推荐)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 @Component public class KafkaRetryConsumer { private static final String MAIN_TOPIC = "order-topic" ; private static final String RETRY_TOPIC = "order-retry" ; private static final String DLQ_TOPIC = "order-dlq" ; private static final int MAX_RETRY = 3 ; @KafkaListener(topics = MAIN_TOPIC, groupId = "order-group") public void consumeMain (ConsumerRecord<String, Order> record, Acknowledgment ack) { processMessage(record, 0 , ack); } @KafkaListener(topics = RETRY_TOPIC, groupId = "order-retry-group") public void consumeRetry ( ConsumerRecord<String, Order> record, @Header(value = "retry-count", required = false) Integer retryCount, Acknowledgment ack ) { int currentRetry = retryCount == null ? 1 : retryCount; processMessage(record, currentRetry, ack); } private void processMessage (ConsumerRecord<String, Order> record, int retryCount, Acknowledgment ack) { try { orderService.processOrder(record.value()); ack.acknowledge(); } catch (Exception e) { log.error("消息处理失败,当前重试次数: {}" , retryCount, e); if (retryCount < MAX_RETRY) { sendToRetry(record, retryCount + 1 ); } else { sendToDLQ(record, e); } ack.acknowledge(); } } private void sendToRetry (ConsumerRecord<String, Order> record, int retryCount) { ProducerRecord<String, Order> retryRecord = new ProducerRecord <>( RETRY_TOPIC, record.key(), record.value() ); retryRecord.headers().add("retry-count" , String.valueOf(retryCount).getBytes()); kafkaTemplate.send(retryRecord); } }
Kafka Connect 的死信队列支持
如果使用 Kafka Connect ,它已经内置了死信队列功能:
1 2 3 4 errors.tolerance =all errors.deadletterqueue.topic.name =dlq-topic errors.deadletterqueue.context.headers.enable =true
未来展望: KIP-1034
Apache Kafka 社区提出了 KIP-1034 ,计划为 Kafka Streams 添加原生死信队列支持:
配置参数: errors.deadletterqueue.topic.name
自动将处理失败的记录发送到死信 Topic
携带失败原因、原始 Topic 等元数据
当前状态 (2025年1月):
KIP-1034 已被采纳(Adopted)
具体发布版本待定
Kafka Consumer API 暂无原生 DLQ 支持计划
4. 延迟队列 RabbitMQ 可以通过 TTL + 死信队列 实现延迟消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 @Configuration public class DelayQueueConfig { @Bean public Queue delayQueue () { return QueueBuilder.durable("delay-queue" ) .withArgument("x-dead-letter-exchange" , "business-exchange" ) .withArgument("x-dead-letter-routing-key" , "business" ) .build(); } @Bean public Queue businessQueue () { return new Queue ("business-queue" ); } @Bean public DirectExchange businessExchange () { return new DirectExchange ("business-exchange" ); } @Bean public Binding businessBinding () { return BindingBuilder.bind(businessQueue()) .to(businessExchange()) .with("business" ); } } @Service public class DelayMessageService { @Autowired private RabbitTemplate rabbitTemplate; public void sendDelayMessage (String message, long delayMillis) { rabbitTemplate.convertAndSend("delay-queue" , message, msg -> { msg.getMessageProperties().setExpiration(String.valueOf(delayMillis)); return msg; }); } } @RabbitListener(queues = "business-queue") public void consumeBusinessMessage (String message) { log.info("收到延迟消息: {}" , message); }
使用场景
业务消息 : 订单、支付、物流等核心业务
任务队列 : 异步任务处理
RPC 调用 : 同步请求-响应模式
延迟任务 : 订单超时自动取消、定时提醒
事件通知 : 系统间的事件驱动
Spring Boot 集成示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency> spring: rabbitmq: host: localhost port: 5672 username: guest password: guest virtual-host: / listener: simple: acknowledge-mode: manual # 手动确认 prefetch: 1 # 每次只取一条消息 @Service public class OrderProducer { @Autowired private RabbitTemplate rabbitTemplate; public void sendOrder (Order order) { rabbitTemplate.convertAndSend("order-exchange" , "order.created" , order); } } @Component public class OrderConsumer { @RabbitListener(queues = "order-queue") public void consume (Order order, Channel channel, Message message) throws IOException { try { log.info("收到订单: {}" , order); orderService.processOrder(order); channel.basicAck(message.getMessageProperties().getDeliveryTag(), false ); } catch (Exception e) { log.error("处理失败" , e); Integer retryCount = (Integer) message.getMessageProperties() .getHeaders().get("retry-count" ); if (retryCount == null ) { retryCount = 0 ; } if (retryCount < 3 ) { message.getMessageProperties().getHeaders().put("retry-count" , retryCount + 1 ); channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , true ); } else { channel.basicNack(message.getMessageProperties().getDeliveryTag(), false , false ); } } } }
优缺点总结 优点 :
功能非常全面(路由、延迟、死信等)
可靠性高(多层确认机制)
管理界面友好
社区活跃,文档完善
支持多种协议(AMQP、MQTT、STOMP 等)
客户端语言丰富
缺点 :
吞吐量相对较低(万级/秒)
Erlang 语言不易定制开发
集群部署相对复杂
不支持消息回溯
RocketMQ - 阿里巴巴的金融级MQ 架构特点 RocketMQ 是阿里巴巴开源的分布式消息中间件,专为金融级业务 设计,具有高可靠、高可用、高性能的特点。
1 2 3 4 5 6 7 8 9 10 RocketMQ 架构: Producer ─┐ Producer ─┼─→ [NameServer 集群] ←─ Broker Master1 (写) Producer ─┘ ↓ ↓ ↓ Broker Slave1 (读+备份) Consumer ─┐ ↓ Consumer ─┼─→ 路由信息 ←─ Broker Master2 (写) Consumer ─┘ ↓ Broker Slave2 (读+备份)
核心特性 1. 事务消息 RocketMQ 提供了分布式事务消息 支持,解决分布式事务一致性问题。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 @Service public class OrderTransactionService { @Autowired private RocketMQTemplate rocketMQTemplate; public void createOrderWithTransaction (Order order) { TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction( "order-topic" , MessageBuilder.withPayload(order).build(), order ); log.info("事务消息发送结果: {}" , result.getSendStatus()); } } @RocketMQTransactionListener public class OrderTransactionListener implements RocketMQLocalTransactionListener { @Autowired private OrderService orderService; @Override public RocketMQLocalTransactionState executeLocalTransaction (Message msg, Object arg) { try { Order order = (Order) arg; orderService.createOrder(order); return RocketMQLocalTransactionState.COMMIT; } catch (Exception e) { log.error("本地事务执行失败" , e); return RocketMQLocalTransactionState.ROLLBACK; } } @Override public RocketMQLocalTransactionState checkLocalTransaction (Message msg) { String orderId = msg.getHeaders().get("orderId" , String.class); boolean exists = orderService.exists(orderId); return exists ? RocketMQLocalTransactionState.COMMIT : RocketMQLocalTransactionState.ROLLBACK; } }
事务消息流程 :
1 2 3 4 5 6 1. 生产者发送 Half 消息(半消息,对消费者不可见) 2. RocketMQ Broker 存储 Half 消息 3. 生产者执行本地事务 4. 本地事务成功 → 发送 Commit → 消费者可以消费 本地事务失败 → 发送 Rollback → 消息被删除 超时或未知 → Broker 回查本地事务状态
2. 延迟消息 RocketMQ 支持 18 个固定的延迟级别 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 @Service public class DelayMessageService { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendDelayMessage (String message, int delayLevel) { rocketMQTemplate.syncSend("delay-topic" , MessageBuilder.withPayload(message).build(), 3000 , delayLevel ); } public void createOrder (Order order) { orderRepository.save(order); sendDelayMessage(order.getId(), 16 ); } } @Component @RocketMQMessageListener( topic = "delay-topic", consumerGroup = "delay-consumer" ) public class DelayMessageConsumer implements RocketMQListener <String> { @Override public void onMessage (String orderId) { Order order = orderService.getById(orderId); if (order.getStatus() == OrderStatus.UNPAID) { orderService.cancelOrder(orderId); } } }
3. 消息过滤 RocketMQ 支持 Tag 过滤 和 SQL92 过滤 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 @Service public class TagFilterProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendMessage (Order order) { String tag = order.getType(); rocketMQTemplate.syncSend( "order-topic:" + tag, order ); } } @RocketMQMessageListener( topic = "order-topic", selectorExpression = "VIP || URGENT", // 只消费 VIP 和 URGENT 订单 consumerGroup = "vip-consumer" ) public class VipOrderConsumer implements RocketMQListener <Order> { @Override public void onMessage (Order order) { log.info("处理优先级订单: {}" , order); } } @RocketMQMessageListener( topic = "order-topic", selectorType = SelectorType.SQL92, selectorExpression = "amount > 1000 AND region = 'Beijing'", consumerGroup = "high-value-consumer" ) public class HighValueOrderConsumer implements RocketMQListener <Order> { @Override public void onMessage (Order order) { log.info("处理高价值订单: {}" , order); } }
4. 顺序消息 RocketMQ 支持全局顺序 和分区顺序 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 @Service public class OrderProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendOrderEvent (String orderId, String event) { rocketMQTemplate.syncSendOrderly( "order-topic" , event, orderId ); } } @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-consumer", consumeMode = ConsumeMode.ORDERLY // 顺序消费 ) public class OrderConsumer implements RocketMQListener <String> { @Override public void onMessage (String event) { log.info("按顺序处理订单事件: {}" , event); } }
使用场景
金融交易 : 对可靠性要求极高的场景
分布式事务 : 需要保证最终一致性
订单系统 : 需要顺序消息
延迟任务 : 订单超时、定时提醒
核心业务 : 对消息不能丢失有严格要求
Spring Boot 集成示例 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 <dependency> <groupId>org.apache.rocketmq</groupId> <artifactId>rocketmq-spring-boot-starter</artifactId> <version>2.2 .3 </version> </dependency> rocketmq: name-server: localhost:9876 producer: group: order-producer-group send-message-timeout: 3000 retry-times-when-send-failed: 3 consumer: group: order-consumer-group @Service public class OrderProducer { @Autowired private RocketMQTemplate rocketMQTemplate; public void sendOrder (Order order) { SendResult result = rocketMQTemplate.syncSend("order-topic" , order); log.info("消息发送结果: {}" , result.getSendStatus()); rocketMQTemplate.asyncSend("order-topic" , order, new SendCallback () { @Override public void onSuccess (SendResult sendResult) { log.info("消息发送成功" ); } @Override public void onException (Throwable e) { log.error("消息发送失败" , e); } }); rocketMQTemplate.sendOneWay("order-topic" , order); } } @Component @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-consumer-group", messageModel = MessageModel.CLUSTERING // 集群模式(默认) ) public class OrderConsumer implements RocketMQListener <Order> { @Override public void onMessage (Order order) { log.info("收到订单: {}" , order); orderService.processOrder(order); } } @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-consumer-group", consumeThreadMax = 20 // 最大消费线程数 ) public class ConcurrentOrderConsumer implements RocketMQListener <Order> { @Override public void onMessage (Order order) { } }
优缺点总结 优点 :
高可靠性(事务消息、消息追踪)
高性能(十万级/秒)
功能丰富(延迟消息、顺序消息、过滤等)
运维友好(可视化控制台)
专为金融级业务设计
缺点 :
社区相对小(但在国内很活跃)
客户端主要支持 Java
学习成本较高
某些功能需要特定配置才能使用
其他知名MQ ActiveMQ - 老牌 JMS 实现 ActiveMQ 是 Apache 出品的老牌消息中间件,完全支持 JMS 1.1 规范。
特点 :
完全支持 JMS 规范
易于使用,上手快
支持多种协议(OpenWire、STOMP、AMQP、MQTT 等)
有官方管理界面
适用场景 :
中小型项目
对 JMS 规范有要求的项目
传统企业应用
不足 :
性能一般(万级/秒)
社区不够活跃
高可用方案不够完善
逐渐被新一代 MQ 替代
Pulsar - 云原生时代的新星 Pulsar 是 Apache 顶级项目,由 Yahoo 开发,定位为统一消息和流处理平台 。
核心特性 :
计算与存储分离 : Broker(计算层) + BookKeeper(存储层)
多租户 : 原生支持租户隔离
多层存储 : 支持热数据和冷数据分层存储
地理复制 : 跨数据中心复制
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 @Service public class PulsarService { @Autowired private PulsarTemplate<Order> pulsarTemplate; public void sendOrder (Order order) { pulsarTemplate.send("persistent://public/default/order-topic" , order); } @PulsarListener( topics = "persistent://public/default/order-topic", subscriptionName = "order-subscription" ) public void consume (Order order) { log.info("收到订单: {}" , order); } }
优势 :
超高吞吐量(百万级/秒)
计算存储分离,扩展性好
多租户支持
统一流批处理
不足 :
架构复杂,运维成本高
社区相对较小
国内生态不够完善
如何选择合适的MQ? 选型决策树 1 2 3 4 5 6 7 8 9 10 11 开始 ↓ 是否需要超高吞吐量(百万级/秒)? ├─ 是 → 日志收集/大数据场景? │ ├─ 是 → Kafka │ └─ 否 → RocketMQ 或 Pulsar └─ 否 → 是否需要复杂路由和延迟消息? ├─ 是 → RabbitMQ └─ 否 → 是否需要金融级可靠性? ├─ 是 → RocketMQ └─ 否 → RabbitMQ 或 ActiveMQ
场景推荐
场景
推荐MQ
理由
日志收集
Kafka
超高吞吐量,支持流处理
用户行为追踪
Kafka
海量数据,需要回溯
订单系统
RocketMQ
事务消息,顺序消息
支付系统
RocketMQ
金融级可靠性
任务队列
RabbitMQ
功能全面,易用性高
延迟任务
RabbitMQ/RocketMQ
都支持延迟消息
实时推荐
Kafka + Flink
流处理能力强
IoT
RabbitMQ/Pulsar
支持 MQTT 协议
中小型项目
RabbitMQ
简单易用,功能全面
传统企业
ActiveMQ
支持 JMS 规范
技术选型考量因素
业务需求 :
吞吐量要求(QPS)
延迟要求
消息顺序
可靠性要求
团队因素 :
生态因素 :
成本因素 :
消息队列的最佳实践 1. 消息幂等性 由于网络抖动、重试等原因,消息可能会被重复投递 ,消费者需要保证幂等性 。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 @Component public class IdempotentConsumer { @Autowired private RedisTemplate<String, String> redisTemplate; @KafkaListener(topics = "order-topic") public void consume (Order order) { String messageId = order.getMessageId(); String key = "consumed:" + messageId; Boolean success = redisTemplate.opsForValue() .setIfAbsent(key, "1" , Duration.ofDays(7 )); if (Boolean.TRUE.equals(success)) { orderService.processOrder(order); } else { log.warn("重复消息: {}" , messageId); } } }
2. 消息顺序性 如果需要保证顺序,应该:
Kafka: 相同 key 的消息发送到同一分区,消费者单线程消费
RabbitMQ: 使用单队列,消费者单线程消费
RocketMQ: 使用顺序消息 API
1 2 3 4 5 producer.send(new ProducerRecord <>("topic" , userId, message)); rocketMQTemplate.syncSendOrderly("topic" , message, userId);
3. 消息丢失防范 生产者端 :
同步发送或使用回调确认
开启事务或 acks=all(Kafka)
Broker 端 :
消费者端 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 kafkaTemplate.send("topic" , message).addCallback( result -> log.info("发送成功" ), ex -> { log.error("发送失败,存入数据库重试" , ex); saveToRetryTable(message); } ); @KafkaListener(topics = "topic") public void consume (Order order, Acknowledgment ack) { try { orderService.processOrder(order); ack.acknowledge(); } catch (Exception e) { log.error("处理失败,等待重试" , e); } }
4. 消息积压处理 消息积压通常由于消费速度 < 生产速度 导致。
解决方案 :
增加消费者数量 (最快):
1 2 3 4 5 6 7 @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-group", consumeThreadMax = 64 // 增加消费线程 )
批量消费 :
1 2 3 4 5 6 7 8 9 10 11 12 13 14 @RocketMQMessageListener( topic = "order-topic", consumerGroup = "order-group", consumeMode = ConsumeMode.CONCURRENTLY, messageModel = MessageModel.CLUSTERING, consumeThreadMax = 20 ) public class BatchConsumer implements RocketMQListener <List<Order>> { @Override public void onMessage (List<Order> orders) { orderService.batchProcess(orders); } }
优化消费逻辑 :
临时扩容队列 :
5. 监控告警 关键指标:
消息生产速率
消息消费速率
消息积压数量
消费延迟
消费失败率
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 @Component public class MqMetrics { private final MeterRegistry registry; public MqMetrics (MeterRegistry registry) { this .registry = registry; } public void recordMessageSent (String topic) { registry.counter("mq.message.sent" , "topic" , topic).increment(); } public void recordMessageConsumed (String topic, long duration) { registry.counter("mq.message.consumed" , "topic" , topic).increment(); registry.timer("mq.consume.duration" , "topic" , topic) .record(Duration.ofMillis(duration)); } }
总结 消息队列是分布式系统中的重要组件,通过异步解耦提升系统性能和可维护性。
核心要点
Kafka : 大数据、日志收集、流处理的首选,超高吞吐量
RabbitMQ : 功能最全面的通用MQ,易用性高,适合业务消息
RocketMQ : 金融级可靠性,事务消息,适合核心业务
ActiveMQ : 老牌 JMS 实现,适合中小型传统项目
Pulsar : 云原生新星,计算存储分离,适合统一消息平台
选型建议
日志收集、用户行为追踪 : Kafka
订单、支付等核心业务 : RocketMQ
任务队列、延迟任务 : RabbitMQ
中小型项目 : RabbitMQ 或 ActiveMQ
云原生架构 : Pulsar
最佳实践
保证消息幂等性(Redis + 消息ID)
合理使用顺序消息
多层确认防止消息丢失
监控告警及时发现问题
做好限流和降级预案
选择合适的消息队列,需要综合考虑业务需求、团队能力、成本等因素。在实际项目中,也可以混合使用多种MQ,发挥各自优势。
参考资料