什么是消息队列?

消息队列(Message Queue,简称MQ)是一种异步通信机制,用于在分布式系统中传递消息。它允许应用程序通过发送和接收消息来相互通信,而不需要知道彼此的位置,也不需要在同一时间都处于运行状态。

消息队列的核心概念

1
生产者(Producer) → [消息队列] → 消费者(Consumer)
  • 生产者(Producer): 发送消息的应用程序
  • 消息队列(Message Queue): 存储消息的缓冲区
  • 消费者(Consumer): 接收并处理消息的应用程序
  • Broker: 消息中间件服务器,负责接收、存储和转发消息
  • Topic/Exchange: 消息的分类或路由规则

为什么需要消息队列?

在没有消息队列的情况下,系统之间的调用通常是同步的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 同步调用:订单服务必须等待所有操作完成
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);

// 2. 扣减库存(同步调用,可能很慢)
inventoryService.deductStock(order);

// 3. 发送短信通知(同步调用,可能很慢)
smsService.sendNotification(order);

// 4. 发送邮件(同步调用,可能很慢)
emailService.sendEmail(order);

// 用户需要等待所有这些操作完成才能得到响应
}

问题:

  • 响应时间长:用户需要等待所有操作完成
  • 耦合度高:订单服务依赖库存、短信、邮件等服务
  • 可用性差:任何一个服务故障都会导致订单创建失败

使用消息队列后:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
// 异步调用:订单服务只需要发送消息即可快速返回
public void createOrder(Order order) {
// 1. 保存订单
orderRepository.save(order);

// 2. 发送消息到消息队列(非常快)
messageQueue.send("order.created", order);

// 立即返回,用户体验好
}

// 消费者异步处理
@MessageListener("order.created")
public void handleOrderCreated(Order order) {
inventoryService.deductStock(order);
smsService.sendNotification(order);
emailService.sendEmail(order);
}

消息队列的核心优势

1. 异步处理

场景: 用户注册后需要发送邮件和短信通知。

1
2
3
4
5
6
7
同步方式:
用户注册 → 保存数据(50ms) → 发送邮件(1000ms) → 发送短信(1000ms)
总耗时: 2050ms

异步方式:
用户注册 → 保存数据(50ms) → 发送消息(5ms)
总耗时: 55ms (邮件和短信在后台异步处理)

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// 同步方式
@Service
public class UserService {
@Autowired
private EmailService emailService;

@Autowired
private SmsService smsService;

public void register(User user) {
userRepository.save(user); // 50ms
emailService.sendWelcomeEmail(user); // 1000ms
smsService.sendVerificationCode(user);// 1000ms
// 总耗时: 2050ms
}
}

// 异步方式(使用消息队列)
@Service
public class UserService {
@Autowired
private RabbitTemplate rabbitTemplate;

public void register(User user) {
userRepository.save(user); // 50ms

// 发送消息到队列
rabbitTemplate.convertAndSend("user.register", user); // 5ms

// 立即返回,总耗时: 55ms
}
}

// 消费者异步处理通知
@Component
public class UserRegisterListener {
@RabbitListener(queues = "user.register")
public void handleUserRegister(User user) {
emailService.sendWelcomeEmail(user);
smsService.sendVerificationCode(user);
}
}

2. 系统解耦

场景: 订单系统需要通知多个下游系统。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
不使用MQ:
订单服务 → 库存服务
→ 物流服务
→ 积分服务
→ 数据分析服务

问题: 订单服务需要知道所有下游服务,耦合度高

使用MQ:
订单服务 → [消息队列] ← 库存服务
← 物流服务
← 积分服务
← 数据分析服务

优点: 订单服务只需要发消息,不需要知道有哪些消费者

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
// 不使用MQ:强耦合
@Service
public class OrderService {
@Autowired
private InventoryService inventoryService;

@Autowired
private LogisticsService logisticsService;

@Autowired
private PointsService pointsService;

public void createOrder(Order order) {
orderRepository.save(order);

// 强依赖所有下游服务
inventoryService.deductStock(order);
logisticsService.createShipment(order);
pointsService.addPoints(order);

// 新增下游服务需要修改代码
}
}

// 使用MQ:解耦
@Service
public class OrderService {
@Autowired
private KafkaTemplate kafkaTemplate;

public void createOrder(Order order) {
orderRepository.save(order);

// 只需要发送一条消息
kafkaTemplate.send("order.created", order);

// 新增下游服务不需要修改代码,只需要订阅消息即可
}
}

// 各个下游服务独立消费
@Component
public class InventoryListener {
@KafkaListener(topics = "order.created")
public void handleOrderCreated(Order order) {
inventoryService.deductStock(order);
}
}

@Component
public class LogisticsListener {
@KafkaListener(topics = "order.created")
public void handleOrderCreated(Order order) {
logisticsService.createShipment(order);
}
}

3. 流量削峰

场景: 秒杀活动瞬时流量巨大。

1
2
3
4
5
6
不使用MQ:
10000请求/秒 → 数据库(只能处理1000请求/秒) → 数据库崩溃

使用MQ:
10000请求/秒 → [消息队列] → 按照数据库承受能力消费(1000请求/秒)
消息暂存在队列中,慢慢处理

代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
// 秒杀场景
@RestController
public class SeckillController {
@Autowired
private RocketMQTemplate rocketMQTemplate;

@PostMapping("/seckill")
public Response seckill(@RequestBody SeckillRequest request) {
// 简单校验
if (!validateRequest(request)) {
return Response.fail("参数错误");
}

// 将请求放入消息队列,立即返回
rocketMQTemplate.convertAndSend("seckill-topic", request);

return Response.success("排队中,请稍后查看结果");
}
}

// 消费者按照数据库承受能力慢慢处理
@Component
@RocketMQMessageListener(
topic = "seckill-topic",
consumerGroup = "seckill-consumer",
consumeThreadMax = 20 // 限制并发数
)
public class SeckillConsumer implements RocketMQListener<SeckillRequest> {
@Override
public void onMessage(SeckillRequest request) {
// 按照数据库承受能力处理
seckillService.processSeckill(request);
}
}

4. 可靠性保证

消息队列提供了多种机制保证消息不丢失:

  • 持久化: 消息写入磁盘,即使服务重启也不会丢失
  • 确认机制: 消费者处理完消息后才确认,否则消息会重新投递
  • 死信队列: 处理失败的消息会进入死信队列,避免消息丢失

消息队列的应用场景

1. 异步处理

  • 用户注册后发送欢迎邮件
  • 订单创建后发送短信通知
  • 文件上传后异步处理(转码、压缩等)

2. 系统解耦

  • 订单系统与库存系统解耦
  • 用户服务与积分服务解耦
  • 支付系统与会计系统解耦

3. 流量削峰

  • 秒杀活动
  • 抢购活动
  • 大促期间的订单处理

4. 日志收集

  • 应用日志收集到中心日志系统
  • 操作日志、审计日志收集

5. 数据同步

  • MySQL 数据同步到 Elasticsearch
  • 订单数据同步到数据仓库
  • 缓存失效通知

6. 事件驱动

  • 用户行为追踪
  • 业务事件通知
  • 状态变更通知

主流消息队列对比

目前主流的消息队列中间件主要有 KafkaRabbitMQRocketMQActiveMQPulsar 等。下面我们详细对比这些主流MQ的特点和适用场景。

核心对比表格

特性 Kafka RabbitMQ RocketMQ ActiveMQ Pulsar
开发语言 Scala/Java Erlang Java Java Java
开发公司 LinkedIn Pivotal 阿里巴巴 Apache Apache/Yahoo
开源时间 2011 2007 2012 2004 2016
协议支持 自定义协议 AMQP 自定义协议 JMS、AMQP 自定义协议
消息模型 发布/订阅 多种模式 发布/订阅 多种模式 发布/订阅
消息存储 磁盘(顺序写) 内存+磁盘 磁盘(随机写) 内存+磁盘 磁盘分层存储
消息顺序 分区内有序 不保证 全局有序/分区有序 不保证 分区内有序
吞吐量 极高(百万级/秒) 高(万级/秒) 极高(十万级/秒) 中等(万级/秒) 极高(百万级/秒)
延迟 毫秒级 微秒级 毫秒级 毫秒级 毫秒级
可用性 极高(副本机制) 高(镜像队列) 极高(主从架构) 高(主从架构) 极高(多副本)
消息回溯 支持 不支持 支持 不支持 支持
事务消息 支持(幂等) 支持 支持 支持 支持
延迟消息 不支持 支持(插件) 支持(18个级别) 支持 支持
死信队列 不支持 支持 支持 支持 支持
消息过滤 支持 支持(Tag/SQL92) 支持 支持
消息追踪 支持 支持(插件) 支持 支持 支持
管理界面 第三方 官方(优秀) 官方(RocketMQ Console) 官方 官方
客户端语言 多种语言 多种语言 Java为主 Java为主 多种语言
社区活跃度 非常活跃 活跃 活跃 一般 活跃
学习成本 中等 中等 中等
运维成本 中等 中等
适用场景 大数据、日志收集、流处理 业务消息、任务队列 金融级业务、事务消息 中小型项目 统一消息平台

Kafka - 大数据场景的王者

架构特点

Kafka 是一个分布式流处理平台,最初由 LinkedIn 开发,专为高吞吐量的日志收集和流处理场景设计。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
Kafka 集群架构:

Producer1 ──┐
Producer2 ──┼─→ [Broker1] ─┐
Producer3 ──┘ │
├─→ [ZooKeeper 协调]
Consumer1 ──┐ │
Consumer2 ──┼─← [Broker2] ─┤
Consumer3 ──┘ │
[Broker3] ─┘

Topic 分区机制:
Topic: order-events
├─ Partition 0 (Leader: Broker1, Replica: Broker2)
├─ Partition 1 (Leader: Broker2, Replica: Broker3)
└─ Partition 2 (Leader: Broker3, Replica: Broker1)

核心特性

1. 超高吞吐量

Kafka 的吞吐量可以达到百万级/秒,这得益于:

  • 顺序写磁盘: 比随机写内存还快
  • 零拷贝(Zero Copy): 减少数据在内核态和用户态之间的拷贝
  • 批量发送: 减少网络开销
  • 压缩: 支持 GZIP、Snappy、LZ4 等压缩算法
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Kafka 生产者配置(高吞吐量优化)
Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("key.serializer""org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer""org.apache.kafka.common.serialization.StringSerializer");

// 批量发送配置
props.put("batch.size"16384); // 批量大小 16KB
props.put("linger.ms"10); // 等待 10ms 凑够一批
props.put("buffer.memory"33554432); // 缓冲区 32MB
props.put("compression.type""lz4"); // 使用 LZ4 压缩

KafkaProducer<String, String> producer = new KafkaProducer<>(props);

// 发送消息
for (int i = 0; i < 1000000; i++) {
producer.send(new ProducerRecord<>("my-topic""key-" + i, "value-" + i));
}

producer.close();

2. 消息持久化与回溯

Kafka 将所有消息持久化到磁盘,并且支持消息回溯(重新消费历史消息)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
// 消费者从指定位置开始消费
Properties props = new Properties();
props.put("bootstrap.servers""localhost:9092");
props.put("group.id""my-group");
props.put("key.deserializer""org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer""org.apache.kafka.common.serialization.StringDeserializer");

// 手动指定从哪个位置开始消费
props.put("auto.offset.reset""earliest"); // 从最早的消息开始

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("my-topic"));

// 或者指定具体的 offset
TopicPartition partition = new TopicPartition("my-topic"0);
consumer.assign(Arrays.asList(partition));
consumer.seek(partition, 1000); // 从 offset=1000 开始消费

while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
System.out.printf("offset = %d, key = %s, value = %s%n"
record.offset(), record.key(), record.value());
}
}

3. 分区机制保证顺序

Kafka 通过分区(Partition)机制,保证同一分区内的消息是有序的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
// 发送消息时指定 key,相同 key 的消息会进入同一分区
producer.send(new ProducerRecord<>("order-topic", userId, orderData));

// 自定义分区器
public class UserPartitioner implements Partitioner {
@Override
public int partition(String topic, Object key, byte[] keyBytes,
Object value, byte[] valueBytes, Cluster cluster) {
// 根据用户 ID 分区,保证同一用户的订单顺序
String userId = (String) key;
int numPartitions = cluster.partitionCountForTopic(topic);
return Math.abs(userId.hashCode()) % numPartitions;
}
}

4. 高可用性

Kafka 通过**副本(Replica)**机制保证高可用:

  • 每个分区可以有多个副本
  • 一个 Leader 副本,多个 Follower 副本
  • Leader 负责读写,Follower 同步数据
  • Leader 故障时,自动选举新 Leader
1
2
3
4
5
6
7
8
Partition 0 的副本分布:
Broker1: Leader (读写)
Broker2: Follower (同步)
Broker3: Follower (同步)

Broker1 宕机后:
Broker2: Leader (自动选举,继续提供服务)
Broker3: Follower (同步)

使用场景

  1. 日志收集: ELK(Elasticsearch + Logstash + Kibana) 中的 L
  2. 大数据管道: 作为数据源和数据汇的中间缓冲
  3. 实时流处理: Kafka Streams、Flink、Spark Streaming
  4. 用户行为追踪: 网站点击流、APP 埋点数据
  5. 运维监控: 收集服务器、应用程序的监控指标

Spring Boot 集成示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// 1. 添加依赖
// pom.xml
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

// 2. 配置
// application.yml
spring:
kafka:
bootstrap-servers: localhost:9092
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all # 确保消息不丢失
retries: 3
consumer:
group-id: my-group
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
properties:
spring.json.trusted.packages: com.example.model
auto-offset-reset: earliest
enable-auto-commit: false # 手动提交 offset

// 3. 生产者
@Service
public class OrderProducer {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;

public void sendOrder(Order order) {
kafkaTemplate.send("order-topic", order.getUserId(), order)
.addCallback(
result -> log.info("消息发送成功: {}", result),
ex -> log.error("消息发送失败", ex)
);
}
}

// 4. 消费者
@Component
public class OrderConsumer {
@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consume(
@Payload Order order,
@Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
Acknowledgment acknowledgment
) {
log.info("收到消息: partition={}, offset={}, order={}", partition, offset, order);

try {
// 处理业务逻辑
orderService.processOrder(order);

// 手动提交 offset
acknowledgment.acknowledge();
} catch (Exception e) {
log.error("消息处理失败", e);
// 不提交 offset,消息会重新消费
}
}
}

优缺点总结

优点:

  • 超高吞吐量(百万级/秒)
  • 高可用性和容错性
  • 支持消息回溯
  • 分区机制保证顺序
  • 生态完善(Kafka Streams、Kafka Connect 等)

缺点:

  • 消息延迟相对较高(毫秒级)
  • 不支持消息优先级
  • 不支持延迟消息(需要额外实现)
  • 原生不支持死信队列(需要应用层手动实现,Kafka Streams 在未来版本计划支持)
  • 依赖 ZooKeeper(新版本改用 KRaft,但还在过渡期)
  • 学习曲线陡峭

RabbitMQ - 功能最全面的通用MQ

架构特点

RabbitMQ 是基于 AMQP(Advanced Message Queuing Protocol) 协议实现的消息中间件,使用 Erlang 语言开发,以高可靠性著称。

1
2
3
4
5
6
7
8
9
10
11
RabbitMQ 消息流转:

Producer → Exchange (交换机) → Queue (队列) → Consumer

Binding (绑定规则)

Exchange 类型:
1. Direct: 精确匹配 routing key
2. Topic: 通配符匹配 routing key
3. Fanout: 广播到所有绑定的队列
4. Headers: 根据消息头匹配

核心特性

1. 灵活的路由机制

RabbitMQ 提供了 4 种 Exchange 类型,支持复杂的消息路由。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
// Direct Exchange: 精确匹配
@Configuration
public class DirectConfig {
@Bean
public DirectExchange directExchange() {
return new DirectExchange("direct-exchange");
}

@Bean
public Queue errorQueue() {
return new Queue("error-queue");
}

@Bean
public Queue infoQueue() {
return new Queue("info-queue");
}

@Bean
public Binding errorBinding() {
return BindingBuilder.bind(errorQueue())
.to(directExchange())
.with("error"); // routing key
}

@Bean
public Binding infoBinding() {
return BindingBuilder.bind(infoQueue())
.to(directExchange())
.with("info");
}
}

// Topic Exchange: 通配符匹配
@Configuration
public class TopicConfig {
@Bean
public TopicExchange topicExchange() {
return new TopicExchange("topic-exchange");
}

@Bean
public Queue orderQueue() {
return new Queue("order-queue");
}

@Bean
public Binding orderBinding() {
return BindingBuilder.bind(orderQueue())
.to(topicExchange())
.with("order.*"); // 匹配 order.created、order.paid 等
}
}

// Fanout Exchange: 广播
@Configuration
public class FanoutConfig {
@Bean
public FanoutExchange fanoutExchange() {
return new FanoutExchange("fanout-exchange");
}

@Bean
public Binding fanoutBinding1() {
return BindingBuilder.bind(new Queue("queue1"))
.to(fanoutExchange());
}

@Bean
public Binding fanoutBinding2() {
return BindingBuilder.bind(new Queue("queue2"))
.to(fanoutExchange());
}
}

2. 消息可靠性保证

RabbitMQ 提供了多层可靠性保证:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Configuration
public class ReliableConfig {
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate template = new RabbitTemplate(connectionFactory);

// 1. 确认消息是否到达 Exchange
template.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
log.info("消息到达 Exchange");
} else {
log.error("消息未到达 Exchange: {}", cause);
}
});

// 2. 确认消息是否到达 Queue
template.setReturnsCallback(returned -> {
log.error("消息未到达 Queue: {}", returned.getMessage());
});

// 开启 mandatory 模式
template.setMandatory(true);

return template;
}
}

// 持久化配置
@Bean
public Queue durableQueue() {
return QueueBuilder.durable("my-queue") // 队列持久化
.build();
}

// 发送持久化消息
rabbitTemplate.convertAndSend("exchange""routing-key", message, msg -> {
msg.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return msg;
});

// 3. 消费者手动确认
@RabbitListener(queues = "my-queue", ackMode = "MANUAL")
public void consume(Message message, Channel channel) throws IOException {
try {
// 处理业务逻辑
processMessage(message);

// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
// 拒绝消息,重新入队
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
}
}

3. 死信队列(DLX - Dead Letter Exchange)

什么是死信队列?

死信队列(Dead Letter Queue, DLQ) 是一种特殊的队列,用于存储无法被正常消费的消息。当消息在业务队列中无法正常处理时,会被转发到死信队列,避免消息丢失,便于后续排查和处理。

什么消息会成为死信?

消息变成死信有以下三种情况:

1
2
3
4
5
6
7
8
1. 消息被拒绝(basic.reject / basic.nack) 且 requeue = false
└─ 消费者明确拒绝消息,并不重新入队

2. 消息 TTL 过期
└─ 消息在队列中存活时间超过设置的过期时间

3. 队列达到最大长度
└─ 队列满了,新消息会将旧消息挤出成为死信
死信队列的作用
  1. 避免消息丢失: 处理失败的消息不会被丢弃,而是转移到死信队列
  2. 问题排查: 可以查看死信队列中的消息,分析失败原因
  3. 延迟重试: 可以从死信队列中重新投递消息进行重试
  4. 告警通知: 监控死信队列,发现异常及时告警
  5. 实现延迟队列: 利用 TTL + 死信机制实现延迟消息
死信队列工作原理
1
2
3
4
5
6
7
8
9
10
11
正常消息流转:
Producer → Exchange → Business Queue → Consumer(处理成功)

死信消息流转:
Producer → Exchange → Business Queue → Consumer(处理失败)

(消息变成死信)

DLX Exchange ← (x-dead-letter-exchange)

Dead Letter Queue → Dead Letter Consumer(人工处理)

核心配置:

  • 业务队列需要配置 x-dead-letter-exchange(死信交换机)
  • 可选配置 x-dead-letter-routing-key(死信路由键)
  • 死信消息会携带额外的 headers 信息:
    • x-first-death-queue: 原始队列名称
    • x-first-death-reason: 成为死信的原因(rejected/expired/maxlen)
    • x-death: 详细的死信信息列表
完整代码示例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
@Configuration
public class DeadLetterConfig {
// 1. 死信交换机
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange("dlx-exchange");
}

// 2. 死信队列
@Bean
public Queue deadLetterQueue() {
return new Queue("dlx-queue");
}

@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with("dlx"); // 死信路由键
}

// 3. 业务队列(配置死信交换机)
@Bean
public Queue businessQueue() {
return QueueBuilder.durable("business-queue")
.withArgument("x-dead-letter-exchange""dlx-exchange") // 指定死信交换机
.withArgument("x-dead-letter-routing-key""dlx") // 指定死信路由键
.withArgument("x-message-ttl"60000) // 消息 TTL 60秒
.withArgument("x-max-length"10000) // 队列最大长度
.build();
}

@Bean
public DirectExchange businessExchange() {
return new DirectExchange("business-exchange");
}

@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with("business");
}
}

// 业务消费者(模拟处理失败的场景)
@Component
public class BusinessConsumer {
@RabbitListener(queues = "business-queue", ackMode = "MANUAL")
public void consume(Order order, Channel channel, Message message) throws IOException {
try {
log.info("处理订单: {}", order);

// 模拟业务处理
if (order.getAmount() < 0) {
throw new IllegalArgumentException("订单金额不能为负数");
}

orderService.processOrder(order);

// 处理成功,确认消息
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {
log.error("订单处理失败: {}", order, e);

// 获取重试次数
Map<String, Object> headers = message.getMessageProperties().getHeaders();
Integer retryCount = (Integer) headers.getOrDefault("retry-count"0);

if (retryCount < 3) {
// 未达到重试上限,拒绝消息并重新入队
log.info("重试第 {} 次", retryCount + 1);
headers.put("retry-count", retryCount + 1);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
} else {
// 超过重试次数,拒绝消息并不重新入队 → 进入死信队列
log.error("超过最大重试次数,消息进入死信队列");
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsefalse);
}
}
}
}

// 死信队列消费者(处理死信消息)
@Component
public class DeadLetterConsumer {
@Autowired
private AlarmService alarmService;

@RabbitListener(queues = "dlx-queue")
public void consumeDeadLetter(Message message, Channel channel) throws IOException {
try {
String body = new String(message.getBody());
Map<String, Object> headers = message.getMessageProperties().getHeaders();

// 获取死信原因
String reason = (String) headers.get("x-first-death-reason");
String originalQueue = (String) headers.get("x-first-death-queue");

log.error("收到死信消息 - 原始队列: {}, 死信原因: {}, 消息内容: {}"
originalQueue, reason, body);

// 根据不同的死信原因进行不同的处理
switch (reason) {
case "rejected":
// 消息被拒绝:通常是业务逻辑问题
log.error("消息被拒绝,可能是业务数据异常");
// 1. 记录到数据库,等待人工处理
saveToFailedMessageTable(message);
// 2. 发送告警通知
alarmService.sendAlert("消息处理失败", body);
break;

case "expired":
// 消息过期:可能是消费者处理太慢或消费者宕机
log.error("消息过期,可能是消费速度过慢");
// 根据业务决定是否重新投递
retryMessage(message);
break;

case "maxlen":
// 队列满:可能是消费者宕机,消息大量积压
log.error("队列已满,消息被挤出");
alarmService.sendAlert("队列积压严重""队列: " + originalQueue);
break;

default:
log.error("未知的死信原因: {}", reason);
}

// 确认死信消息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

} catch (Exception e) {
log.error("死信消息处理失败", e);
// 死信消息处理失败,可以选择:
// 1. 再次拒绝(谨慎使用,避免无限循环)
// 2. 直接确认,记录到数据库等待人工处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
}
}

// 保存失败消息到数据库
private void saveToFailedMessageTable(Message message) {
FailedMessage failedMessage = new FailedMessage();
failedMessage.setBody(new String(message.getBody()));
failedMessage.setHeaders(message.getMessageProperties().getHeaders());
failedMessage.setCreateTime(new Date());
failedMessageRepository.save(failedMessage);
}

// 重新投递消息
private void retryMessage(Message message) {
// 可以将消息重新发送到业务队列,或者等待一段时间后再发送
rabbitTemplate.convertAndSend("business-exchange""business"
new String(message.getBody()));
}
}
死信队列的实际应用场景

1. 消息重试机制

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// 配置最大重试次数,超过后进入死信队列
@RabbitListener(queues = "order-queue", ackMode = "MANUAL")
public void consume(Order order, Channel channel, Message message) throws IOException {
try {
orderService.processOrder(order);
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
Integer retryCount = getRetryCount(message);
if (retryCount < 3) {
// 重试
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
} else {
// 进入死信队列
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsefalse);
}
}
}

2. 异常消息监控

1
2
3
4
5
6
7
8
9
10
11
12
// 定期检查死信队列,发现异常及时处理
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void checkDeadLetterQueue() {
// 获取死信队列长度
Properties properties = rabbitAdmin.getQueueProperties("dlx-queue");
Integer messageCount = (Integer) properties.get("QUEUE_MESSAGE_COUNT");

if (messageCount > 100) {
// 死信消息过多,发送告警
alarmService.sendAlert("死信队列消息过多""当前消息数: " + messageCount);
}
}

3. 实现延迟队列

1
// 利用 TTL + 死信实现延迟消息(详见下一节)
死信队列与普通队列的区别
特性 普通队列 死信队列
消息来源 生产者直接发送 业务队列转发失败消息
消息类型 正常业务消息 无法正常处理的消息
处理方式 正常业务处理 告警、人工处理、重试
是否自动消费 通常不自动重试
最佳实践
  1. 必须监控死信队列: 设置告警,及时发现问题
  2. 记录详细日志: 死信消息要记录详细的失败原因和上下文
  3. 避免无限死信循环: 死信队列的消费者不要再配置死信队列
  4. 合理设置重试次数: 避免不可恢复的错误无限重试
  5. 定期清理: 对于已处理的死信消息,定期清理或归档
其他 MQ 的死信队列实现

不同的 MQ 对死信队列的支持程度不同。RabbitMQ 和 RocketMQ 都提供了原生的死信队列支持,而 Kafka 不支持原生死信队列,需要通过应用层手动实现。

Kafka 的死信队列实现

Kafka 的设计哲学是 “dumb pipes, smart endpoints”(愚蠢的管道,聪明的端点):

  • Kafka Broker 只负责高效存储和转发消息
  • 消息处理逻辑(包括错误处理)由消费者自己决定
  • 这种设计保证了 Broker 的高性能和简洁性

方案一: 手动实现死信队列

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Component
public class KafkaDLQConsumer {
@Autowired
private KafkaTemplate<String, Order> kafkaTemplate;

private static final String DLQ_TOPIC = "order-dlq"; // 死信 Topic
private static final int MAX_RETRY = 3;

@KafkaListener(topics = "order-topic", groupId = "order-group")
public void consume(
ConsumerRecord<String, Order> record,
Acknowledgment ack,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic
) {
Order order = record.value();
String messageId = order.getMessageId();

try {
// 处理业务逻辑
orderService.processOrder(order);
ack.acknowledge();

} catch (Exception e) {
log.error("消息处理失败: {}", order, e);

// 获取重试次数
Integer retryCount = getRetryCount(messageId);

if (retryCount < MAX_RETRY) {
log.warn("消息处理失败,第 {} 次重试", retryCount + 1);
incrementRetryCount(messageId);
// 不提交 offset,消息会重新消费

} else {
// 超过最大重试次数,发送到死信队列
log.error("消息处理失败超过最大重试次数,发送到死信队列: {}", order);
sendToDLQ(record, e);
ack.acknowledge();
}
}
}

// 发送到死信队列
private void sendToDLQ(ConsumerRecord<String, Order> record, Exception e) {
try {
ProducerRecord<String, Order> dlqRecord = new ProducerRecord<>(
DLQ_TOPIC,
record.key(),
record.value()
);

// 添加死信元数据
dlqRecord.headers()
.add("original-topic", record.topic().getBytes())
.add("original-partition", String.valueOf(record.partition()).getBytes())
.add("original-offset", String.valueOf(record.offset()).getBytes())
.add("failure-reason", e.getMessage().getBytes())
.add("failure-timestamp", String.valueOf(System.currentTimeMillis()).getBytes());

kafkaTemplate.send(dlqRecord).get();
log.info("消息已发送到死信队列: topic={}, offset={}", record.topic(), record.offset());

} catch (Exception ex) {
log.error("发送到死信队列失败", ex);
saveToDatabase(record, e); // 保存到数据库作为兜底方案
}
}
}

// 死信队列消费者
@Component
public class KafkaDLQMessageConsumer {
@KafkaListener(topics = "order-dlq", groupId = "dlq-group")
public void consumeDLQ(
ConsumerRecord<String, Order> record,
@Header("original-topic") String originalTopic,
@Header("failure-reason") String failureReason
) {
log.error("处理死信消息 - 原始Topic: {}, 失败原因: {}, 消息: {}"
originalTopic, failureReason, record.value());

// 1. 记录到数据库或监控系统
// 2. 发送告警
// 3. 人工处理或根据业务决定是否重新投递
}
}

方案二: 使用重试 Topic(推荐)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
@Component
public class KafkaRetryConsumer {
private static final String MAIN_TOPIC = "order-topic";
private static final String RETRY_TOPIC = "order-retry";
private static final String DLQ_TOPIC = "order-dlq";
private static final int MAX_RETRY = 3;

@KafkaListener(topics = MAIN_TOPIC, groupId = "order-group")
public void consumeMain(ConsumerRecord<String, Order> record, Acknowledgment ack) {
processMessage(record, 0, ack);
}

@KafkaListener(topics = RETRY_TOPIC, groupId = "order-retry-group")
public void consumeRetry(
ConsumerRecord<String, Order> record,
@Header(value = "retry-count", required = false) Integer retryCount,
Acknowledgment ack
) {
int currentRetry = retryCount == null ? 1 : retryCount;
processMessage(record, currentRetry, ack);
}

private void processMessage(ConsumerRecord<String, Order> record, int retryCount, Acknowledgment ack) {
try {
orderService.processOrder(record.value());
ack.acknowledge(); // 处理成功

} catch (Exception e) {
log.error("消息处理失败,当前重试次数: {}", retryCount, e);

if (retryCount < MAX_RETRY) {
sendToRetry(record, retryCount + 1); // 发送到重试 Topic
} else {
sendToDLQ(record, e); // 发送到死信队列
}

// 无论如何都提交 offset,避免阻塞
ack.acknowledge();
}
}

private void sendToRetry(ConsumerRecord<String, Order> record, int retryCount) {
ProducerRecord<String, Order> retryRecord = new ProducerRecord<>(
RETRY_TOPIC,
record.key(),
record.value()
);
retryRecord.headers().add("retry-count", String.valueOf(retryCount).getBytes());
kafkaTemplate.send(retryRecord);
}
}

Kafka Connect 的死信队列支持

如果使用 Kafka Connect,它已经内置了死信队列功能:

1
2
3
4
# Kafka Connect 配置
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-topic
errors.deadletterqueue.context.headers.enable=true

未来展望: KIP-1034

Apache Kafka 社区提出了 KIP-1034,计划为 Kafka Streams 添加原生死信队列支持:

  • 配置参数: errors.deadletterqueue.topic.name
  • 自动将处理失败的记录发送到死信 Topic
  • 携带失败原因、原始 Topic 等元数据

当前状态(2025年1月):

  • KIP-1034 已被采纳(Adopted)
  • 具体发布版本待定
  • Kafka Consumer API 暂无原生 DLQ 支持计划

4. 延迟队列

RabbitMQ 可以通过 TTL + 死信队列 实现延迟消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
@Configuration
public class DelayQueueConfig {
// 延迟队列(没有消费者)
@Bean
public Queue delayQueue() {
return QueueBuilder.durable("delay-queue")
.withArgument("x-dead-letter-exchange""business-exchange")
.withArgument("x-dead-letter-routing-key""business")
.build();
}

// 业务队列
@Bean
public Queue businessQueue() {
return new Queue("business-queue");
}

@Bean
public DirectExchange businessExchange() {
return new DirectExchange("business-exchange");
}

@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with("business");
}
}

// 发送延迟消息
@Service
public class DelayMessageService {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendDelayMessage(String message, long delayMillis) {
rabbitTemplate.convertAndSend("delay-queue", message, msg -> {
// 设置消息过期时间
msg.getMessageProperties().setExpiration(String.valueOf(delayMillis));
return msg;
});
}
}

// 消费延迟消息
@RabbitListener(queues = "business-queue")
public void consumeBusinessMessage(String message) {
log.info("收到延迟消息: {}", message);
}

使用场景

  1. 业务消息: 订单、支付、物流等核心业务
  2. 任务队列: 异步任务处理
  3. RPC 调用: 同步请求-响应模式
  4. 延迟任务: 订单超时自动取消、定时提醒
  5. 事件通知: 系统间的事件驱动

Spring Boot 集成示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
// 1. 添加依赖
// pom.xml
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

// 2. 配置
// application.yml
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
virtual-host: /
listener:
simple:
acknowledge-mode: manual # 手动确认
prefetch: 1 # 每次只取一条消息

// 3. 生产者
@Service
public class OrderProducer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendOrder(Order order) {
rabbitTemplate.convertAndSend("order-exchange""order.created", order);
}
}

// 4. 消费者
@Component
public class OrderConsumer {
@RabbitListener(queues = "order-queue")
public void consume(Order order, Channel channel, Message message) throws IOException {
try {
log.info("收到订单: {}", order);
orderService.processOrder(order);

// 手动确认
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
log.error("处理失败", e);

// 拒绝消息,重新入队(最多重试3次)
Integer retryCount = (Integer) message.getMessageProperties()
.getHeaders().get("retry-count");
if (retryCount == null) {
retryCount = 0;
}

if (retryCount < 3) {
message.getMessageProperties().getHeaders().put("retry-count", retryCount + 1);
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsetrue);
} else {
// 超过重试次数,拒绝并不重新入队(进入死信队列)
channel.basicNack(message.getMessageProperties().getDeliveryTag(), falsefalse);
}
}
}
}

优缺点总结

优点:

  • 功能非常全面(路由、延迟、死信等)
  • 可靠性高(多层确认机制)
  • 管理界面友好
  • 社区活跃,文档完善
  • 支持多种协议(AMQP、MQTT、STOMP 等)
  • 客户端语言丰富

缺点:

  • 吞吐量相对较低(万级/秒)
  • Erlang 语言不易定制开发
  • 集群部署相对复杂
  • 不支持消息回溯

RocketMQ - 阿里巴巴的金融级MQ

架构特点

RocketMQ 是阿里巴巴开源的分布式消息中间件,专为金融级业务设计,具有高可靠、高可用、高性能的特点。

1
2
3
4
5
6
7
8
9
10
RocketMQ 架构:

Producer ─┐
Producer ─┼─→ [NameServer 集群] ←─ Broker Master1 (写)
Producer ─┘ ↓ ↓
↓ Broker Slave1 (读+备份)
Consumer ─┐ ↓
Consumer ─┼─→ 路由信息 ←─ Broker Master2 (写)
Consumer ─┘ ↓
Broker Slave2 (读+备份)

核心特性

1. 事务消息

RocketMQ 提供了分布式事务消息支持,解决分布式事务一致性问题。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
// 事务消息实现最终一致性
@Service
public class OrderTransactionService {
@Autowired
private RocketMQTemplate rocketMQTemplate;

public void createOrderWithTransaction(Order order) {
// 发送事务消息
TransactionSendResult result = rocketMQTemplate.sendMessageInTransaction(
"order-topic"
MessageBuilder.withPayload(order).build(),
order // 传递给事务监听器的参数
);

log.info("事务消息发送结果: {}", result.getSendStatus());
}
}

// 事务监听器
@RocketMQTransactionListener
public class OrderTransactionListener implements RocketMQLocalTransactionListener {
@Autowired
private OrderService orderService;

/**
* 执行本地事务
*/
@Override
public RocketMQLocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
Order order = (Order) arg;

// 执行本地数据库操作
orderService.createOrder(order);

// 本地事务成功,返回 COMMIT,消息会被投递给消费者
return RocketMQLocalTransactionState.COMMIT;
} catch (Exception e) {
log.error("本地事务执行失败", e);

// 本地事务失败,返回 ROLLBACK,消息会被丢弃
return RocketMQLocalTransactionState.ROLLBACK;
}
}

/**
* 回查本地事务状态(如果 executeLocalTransaction 返回 UNKNOWN 或超时)
*/
@Override
public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
// 检查本地数据库,判断事务是否成功
String orderId = msg.getHeaders().get("orderId", String.class);
boolean exists = orderService.exists(orderId);

return exists ? RocketMQLocalTransactionState.COMMIT
: RocketMQLocalTransactionState.ROLLBACK;
}
}

事务消息流程:

1
2
3
4
5
6
1. 生产者发送 Half 消息(半消息,对消费者不可见)
2. RocketMQ Broker 存储 Half 消息
3. 生产者执行本地事务
4. 本地事务成功 → 发送 Commit → 消费者可以消费
本地事务失败 → 发送 Rollback → 消息被删除
超时或未知 → Broker 回查本地事务状态

2. 延迟消息

RocketMQ 支持 18 个固定的延迟级别

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
@Service
public class DelayMessageService {
@Autowired
private RocketMQTemplate rocketMQTemplate;

/**
* 发送延迟消息
* 延迟级别: 1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
* 对应级别: 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
*/
public void sendDelayMessage(String message, int delayLevel) {
rocketMQTemplate.syncSend("delay-topic"
MessageBuilder.withPayload(message).build(),
3000// 发送超时时间
delayLevel // 延迟级别
);
}

// 示例:订单超时自动取消
public void createOrder(Order order) {
// 保存订单
orderRepository.save(order);

// 发送30分钟延迟消息(级别16)
sendDelayMessage(order.getId(), 16);
}
}

@Component
@RocketMQMessageListener(
topic = "delay-topic",
consumerGroup = "delay-consumer"
)
public class DelayMessageConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String orderId) {
// 检查订单状态
Order order = orderService.getById(orderId);

if (order.getStatus() == OrderStatus.UNPAID) {
// 订单超时未支付,自动取消
orderService.cancelOrder(orderId);
}
}
}

3. 消息过滤

RocketMQ 支持 Tag 过滤SQL92 过滤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// Tag 过滤
@Service
public class TagFilterProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendMessage(Order order) {
String tag = order.getType(); // VIP、NORMAL、URGENT

rocketMQTemplate.syncSend(
"order-topic:" + tag, // topic:tag
order
);
}
}

// 消费者只消费特定 Tag 的消息
@RocketMQMessageListener(
topic = "order-topic",
selectorExpression = "VIP || URGENT", // 只消费 VIP 和 URGENT 订单
consumerGroup = "vip-consumer"
)
public class VipOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("处理优先级订单: {}", order);
}
}

// SQL92 过滤(需要在 Broker 配置中开启)
@RocketMQMessageListener(
topic = "order-topic",
selectorType = SelectorType.SQL92,
selectorExpression = "amount > 1000 AND region = 'Beijing'",
consumerGroup = "high-value-consumer"
)
public class HighValueOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("处理高价值订单: {}", order);
}
}

4. 顺序消息

RocketMQ 支持全局顺序分区顺序

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// 分区顺序消息(同一个订单的消息保证顺序)
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendOrderEvent(String orderId, String event) {
// 使用 syncSendOrderly,相同 orderId 的消息会发送到同一队列
rocketMQTemplate.syncSendOrderly(
"order-topic"
event,
orderId // hashKey,相同 hashKey 发送到同一队列
);
}
}

// 顺序消费
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer",
consumeMode = ConsumeMode.ORDERLY // 顺序消费
)
public class OrderConsumer implements RocketMQListener<String> {
@Override
public void onMessage(String event) {
log.info("按顺序处理订单事件: {}", event);
}
}

使用场景

  1. 金融交易: 对可靠性要求极高的场景
  2. 分布式事务: 需要保证最终一致性
  3. 订单系统: 需要顺序消息
  4. 延迟任务: 订单超时、定时提醒
  5. 核心业务: 对消息不能丢失有严格要求

Spring Boot 集成示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
// 1. 添加依赖
// pom.xml
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-spring-boot-starter</artifactId>
<version>2.2.3</version>
</dependency>

// 2. 配置
// application.yml
rocketmq:
name-server: localhost:9876
producer:
group: order-producer-group
send-message-timeout: 3000
retry-times-when-send-failed: 3
consumer:
group: order-consumer-group

// 3. 生产者
@Service
public class OrderProducer {
@Autowired
private RocketMQTemplate rocketMQTemplate;

public void sendOrder(Order order) {
// 同步发送
SendResult result = rocketMQTemplate.syncSend("order-topic", order);
log.info("消息发送结果: {}", result.getSendStatus());

// 异步发送
rocketMQTemplate.asyncSend("order-topic", order, new SendCallback() {
@Override
public void onSuccess(SendResult sendResult) {
log.info("消息发送成功");
}

@Override
public void onException(Throwable e) {
log.error("消息发送失败", e);
}
});

// 单向发送(不关心结果,最快)
rocketMQTemplate.sendOneWay("order-topic", order);
}
}

// 4. 消费者
@Component
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
messageModel = MessageModel.CLUSTERING // 集群模式(默认)
)
public class OrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
log.info("收到订单: {}", order);
orderService.processOrder(order);
}
}

// 并发消费配置
@RocketMQMessageListener(
topic = "order-topic",
consumerGroup = "order-consumer-group",
consumeThreadMax = 20 // 最大消费线程数
)
public class ConcurrentOrderConsumer implements RocketMQListener<Order> {
@Override
public void onMessage(Order order) {
// 处理消息
}
}

优缺点总结

优点:

  • 高可靠性(事务消息、消息追踪)
  • 高性能(十万级/秒)
  • 功能丰富(延迟消息、顺序消息、过滤等)
  • 运维友好(可视化控制台)
  • 专为金融级业务设计

缺点:

  • 社区相对小(但在国内很活跃)
  • 客户端主要支持 Java
  • 学习成本较高
  • 某些功能需要特定配置才能使用

其他知名MQ

ActiveMQ - 老牌 JMS 实现

ActiveMQ 是 Apache 出品的老牌消息中间件,完全支持 JMS 1.1 规范。

特点:

  • 完全支持 JMS 规范
  • 易于使用,上手快
  • 支持多种协议(OpenWire、STOMP、AMQP、MQTT 等)
  • 有官方管理界面

适用场景:

  • 中小型项目
  • 对 JMS 规范有要求的项目
  • 传统企业应用

不足:

  • 性能一般(万级/秒)
  • 社区不够活跃
  • 高可用方案不够完善
  • 逐渐被新一代 MQ 替代

Pulsar - 云原生时代的新星

Pulsar 是 Apache 顶级项目,由 Yahoo 开发,定位为统一消息和流处理平台

核心特性:

  1. 计算与存储分离: Broker(计算层) + BookKeeper(存储层)
  2. 多租户: 原生支持租户隔离
  3. 多层存储: 支持热数据和冷数据分层存储
  4. 地理复制: 跨数据中心复制
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// Pulsar 使用示例
@Service
public class PulsarService {
@Autowired
private PulsarTemplate<Order> pulsarTemplate;

// 发送消息
public void sendOrder(Order order) {
pulsarTemplate.send("persistent://public/default/order-topic", order);
}

// 消费消息
@PulsarListener(
topics = "persistent://public/default/order-topic",
subscriptionName = "order-subscription"
)
public void consume(Order order) {
log.info("收到订单: {}", order);
}
}

优势:

  • 超高吞吐量(百万级/秒)
  • 计算存储分离,扩展性好
  • 多租户支持
  • 统一流批处理

不足:

  • 架构复杂,运维成本高
  • 社区相对较小
  • 国内生态不够完善

如何选择合适的MQ?

选型决策树

1
2
3
4
5
6
7
8
9
10
11
开始

是否需要超高吞吐量(百万级/秒)?
├─ 是 → 日志收集/大数据场景?
│ ├─ 是 → Kafka
│ └─ 否 → RocketMQ 或 Pulsar
└─ 否 → 是否需要复杂路由和延迟消息?
├─ 是 → RabbitMQ
└─ 否 → 是否需要金融级可靠性?
├─ 是 → RocketMQ
└─ 否 → RabbitMQ 或 ActiveMQ

场景推荐

场景 推荐MQ 理由
日志收集 Kafka 超高吞吐量,支持流处理
用户行为追踪 Kafka 海量数据,需要回溯
订单系统 RocketMQ 事务消息,顺序消息
支付系统 RocketMQ 金融级可靠性
任务队列 RabbitMQ 功能全面,易用性高
延迟任务 RabbitMQ/RocketMQ 都支持延迟消息
实时推荐 Kafka + Flink 流处理能力强
IoT RabbitMQ/Pulsar 支持 MQTT 协议
中小型项目 RabbitMQ 简单易用,功能全面
传统企业 ActiveMQ 支持 JMS 规范

技术选型考量因素

  1. 业务需求:

    • 吞吐量要求(QPS)
    • 延迟要求
    • 消息顺序
    • 可靠性要求
  2. 团队因素:

    • 团队技术栈
    • 学习成本
    • 运维能力
  3. 生态因素:

    • 社区活跃度
    • 文档完善度
    • 客户端支持
    • 监控工具
  4. 成本因素:

    • 服务器资源
    • 开发成本
    • 运维成本

消息队列的最佳实践

1. 消息幂等性

由于网络抖动、重试等原因,消息可能会被重复投递,消费者需要保证幂等性

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
@Component
public class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;

@KafkaListener(topics = "order-topic")
public void consume(Order order) {
String messageId = order.getMessageId();
String key = "consumed:" + messageId;

// 使用 Redis SETNX 实现幂等
Boolean success = redisTemplate.opsForValue()
.setIfAbsent(key, "1", Duration.ofDays(7));

if (Boolean.TRUE.equals(success)) {
// 第一次消费,执行业务逻辑
orderService.processOrder(order);
} else {
// 重复消息,忽略
log.warn("重复消息: {}", messageId);
}
}
}

2. 消息顺序性

如果需要保证顺序,应该:

  • Kafka: 相同 key 的消息发送到同一分区,消费者单线程消费
  • RabbitMQ: 使用单队列,消费者单线程消费
  • RocketMQ: 使用顺序消息 API
1
2
3
4
5
// Kafka 保证顺序
producer.send(new ProducerRecord<>("topic", userId, message)); // 相同 userId 进入同一分区

// RocketMQ 保证顺序
rocketMQTemplate.syncSendOrderly("topic", message, userId); // 相同 userId 进入同一队列

3. 消息丢失防范

生产者端:

  • 同步发送或使用回调确认
  • 开启事务或 acks=all(Kafka)

Broker 端:

  • 消息持久化
  • 主从复制

消费者端:

  • 手动确认消费
  • 处理成功后再提交 offset
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 生产者确认
kafkaTemplate.send("topic", message).addCallback(
result -> log.info("发送成功"),
ex -> {
log.error("发送失败,存入数据库重试", ex);
saveToRetryTable(message);
}
);

// 消费者手动确认
@KafkaListener(topics = "topic")
public void consume(Order order, Acknowledgment ack) {
try {
orderService.processOrder(order);
ack.acknowledge(); // 处理成功后才确认
} catch (Exception e) {
log.error("处理失败,等待重试", e);
// 不确认,消息会重新投递
}
}

4. 消息积压处理

消息积压通常由于消费速度 < 生产速度导致。

解决方案:

  1. 增加消费者数量(最快):

    1
    2
    3
    4
    5
    6
    7
    // 增加消费者实例(部署多台服务器)
    // 或增加消费线程
    @RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-group",
    consumeThreadMax = 64 // 增加消费线程
    )
  2. 批量消费:

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @RocketMQMessageListener(
    topic = "order-topic",
    consumerGroup = "order-group",
    consumeMode = ConsumeMode.CONCURRENTLY,
    messageModel = MessageModel.CLUSTERING,
    consumeThreadMax = 20
    )
    public class BatchConsumer implements RocketMQListener<List<Order>> {
    @Override
    public void onMessage(List<Order> orders) {
    // 批量处理
    orderService.batchProcess(orders);
    }
    }
  3. 优化消费逻辑:

  • 异步化耗时操作
  • 缓存热点数据
  • 使用批量接口
  1. 临时扩容队列:
  • 创建新队列分流
  • 等积压处理完再恢复

5. 监控告警

关键指标:

  • 消息生产速率
  • 消息消费速率
  • 消息积压数量
  • 消费延迟
  • 消费失败率
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// Spring Boot Actuator + Prometheus 监控
@Component
public class MqMetrics {
private final MeterRegistry registry;

public MqMetrics(MeterRegistry registry) {
this.registry = registry;
}

public void recordMessageSent(String topic) {
registry.counter("mq.message.sent""topic", topic).increment();
}

public void recordMessageConsumed(String topic, long duration) {
registry.counter("mq.message.consumed""topic", topic).increment();
registry.timer("mq.consume.duration""topic", topic)
.record(Duration.ofMillis(duration));
}
}

总结

消息队列是分布式系统中的重要组件,通过异步解耦提升系统性能和可维护性。

核心要点

  1. Kafka: 大数据、日志收集、流处理的首选,超高吞吐量
  2. RabbitMQ: 功能最全面的通用MQ,易用性高,适合业务消息
  3. RocketMQ: 金融级可靠性,事务消息,适合核心业务
  4. ActiveMQ: 老牌 JMS 实现,适合中小型传统项目
  5. Pulsar: 云原生新星,计算存储分离,适合统一消息平台

选型建议

  • 日志收集、用户行为追踪: Kafka
  • 订单、支付等核心业务: RocketMQ
  • 任务队列、延迟任务: RabbitMQ
  • 中小型项目: RabbitMQ 或 ActiveMQ
  • 云原生架构: Pulsar

最佳实践

  1. 保证消息幂等性(Redis + 消息ID)
  2. 合理使用顺序消息
  3. 多层确认防止消息丢失
  4. 监控告警及时发现问题
  5. 做好限流和降级预案

选择合适的消息队列,需要综合考虑业务需求、团队能力、成本等因素。在实际项目中,也可以混合使用多种MQ,发挥各自优势。


参考资料