引言:为什么数据一致性这么难?

在单体应用时代,保证数据一致性很简单:一个数据库,一个事务,搞定!

1
2
3
4
5
6
7
// 单体应用的美好时光
@Transactional
public void transferMoney(Long fromAccount, Long toAccount, BigDecimal amount) {
accountDao.deduct(fromAccount, amount); // 扣款
accountDao.add(toAccount, amount); // 加款
// 要么全成功,要么全失败 - ACID保证!
}

但当我们拆分成微服务后,一切都变了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
// 微服务的噩梦开始了...
public void createOrder(OrderRequest request) {
// 调用订单服务 - 创建订单
Order order = orderService.create(request);

// 调用库存服务 - 扣减库存
inventoryService.deduct(request.getItems());

// 调用支付服务 - 扣减余额
paymentService.pay(order.getTotalAmount());

// 如果支付失败了,前面两步怎么办?
// 如果网络超时了,到底成功没有?
// 如果服务宕机了,数据会不会不一致?
}

核心问题:

  • 跨服务操作:每个服务有自己的数据库,无法使用传统的ACID事务
  • 网络不可靠:服务调用可能失败、超时、重复
  • 部分失败:某些操作成功,某些操作失败,如何回滚?
  • 高并发场景:如何在保证一致性的同时,不牺牲太多性能?

本文将从理论到实战,深入讲解分布式微服务下的数据一致性解决方案。

一、理解分布式数据一致性

1.1 CAP定理:不可能三角

在分布式系统中,有一个著名的CAP定理:

  • C (Consistency): 一致性 - 所有节点同一时间看到相同的数据
  • A (Availability): 可用性 - 每个请求都能得到响应(成功或失败)
  • P (Partition Tolerance): 分区容错性 - 网络分区时系统仍能运行

CAP定理核心:一个分布式系统最多只能同时满足其中两项

1
2
3
4
5
6
7
8
9
10
11
12
13
14
        一致性(C)
/\
/ \
/ \
/ \
/ \
/ CP \
/ \
/ \
/ CAP? \
/ \
/ AP \
/________________________\
可用性(A) 分区容错(P)

在网络分区(P)必然存在的现实下,我们只能在C和A之间权衡:

  • CP系统:牺牲可用性,保证一致性

    • 例:传统关系型数据库(MySQL、PostgreSQL)
    • 场景:金融交易、订单系统
  • AP系统:牺牲一致性,保证可用性

    • 例:NoSQL数据库(Cassandra、DynamoDB)
    • 场景:社交媒体的点赞数、浏览量统计

1.2 BASE理论:现实的妥协

由于CAP定理的限制,实际工程中我们采用BASE理论:

  • BA (Basically Available): 基本可用 - 系统保证基本可用,允许损失部分可用性
  • S (Soft state): 软状态 - 允许系统中的数据存在中间状态
  • E (Eventually Consistent): 最终一致性 - 经过一段时间后,数据最终达到一致

BASE vs ACID:

特性 ACID(单体) BASE(分布式)
一致性 强一致性 最终一致性
隔离性 严格隔离 允许中间状态
可用性 可能阻塞 高可用
适用场景 单一数据库 分布式系统

1.3 一致性模型分类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
强一致性
├── 线性一致性 (Linearizability)
│ └── 最严格,任何读操作都能读到最新写入的数据
│ 示例:单机数据库的事务

└── 顺序一致性 (Sequential Consistency)
└── 所有进程看到的操作顺序相同
示例:分布式锁

弱一致性
├── 会话一致性 (Session Consistency)
│ └── 同一会话内保证一致性
│ 示例:购物车服务 - 用户自己能看到刚加的商品

├── 因果一致性 (Causal Consistency)
│ └── 有因果关系的操作保证顺序
│ 示例:社交媒体评论 - 先发帖后评论

└── 最终一致性 (Eventual Consistency)
└── 一段时间后达到一致
示例:DNS、缓存同步

二、分布式事务解决方案

2.1 两阶段提交(2PC)

2PC是最经典的分布式事务协议,分为准备阶段和提交阶段。

工作流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
协调者(Coordinator)                     参与者(Participants)
[订单服务] [库存服务] [支付服务]
| | | |
|------- 1.准备阶段(Prepare) ---->| | |
| "你们能执行事务吗?" | | |
| | | |
|<------ 投票(Vote) --------------| | |
| "我准备好了!" | | |
|<--------------------------------| | |
| "我也准备好了!" | |
|<--------------------------------------------| |
| "我也OK!" |
|<--------------------------------------------------------|
| |
|------- 2.提交阶段(Commit) ------------------------>全部|
| "所有人提交!" |
| | | |
|<------ 确认(ACK) ---------------| | |
|<--------------------------------| | |
|<--------------------------------------------| |

核心伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// 协调者
public TransactionResult executeTransaction(TransactionRequest request) {
String transactionId = generateId();
List<TransactionParticipant> participants = new ArrayList<>();

try {
// ========== 阶段1: 准备阶段 ==========
for (TransactionParticipant service : Arrays.asList(orderService, inventoryService, paymentService)) {
if (!service.prepare(transactionId, request)) {
throw new RuntimeException("准备失败");
}
participants.add(service);
}

// ========== 阶段2: 提交阶段 ==========
for (TransactionParticipant participant : participants) {
participant.commit(transactionId);
}

return TransactionResult.SUCCESS;

} catch (Exception e) {
// 任何失败都要回滚
for (TransactionParticipant participant : participants) {
participant.rollback(transactionId);
}
return TransactionResult.FAILURE;
}
}

// 参与者(以订单服务为例)
public boolean prepare(String transactionId, OrderData data) {
// 1. 验证数据
if (!valid(data)) {
return false;
}

// 2. 预留资源(但不提交)
Order tempOrder = createTempOrder(data);
cache.put(transactionId, tempOrder);

return true;
}

public void commit(String transactionId) {
Order tempOrder = cache.get(transactionId);
database.save(tempOrder); // 持久化
cache.remove(transactionId);
}

public void rollback(String transactionId) {
cache.remove(transactionId); // 清理临时数据
}

2PC的优缺点

优点:

  • 强一致性保证
  • 逻辑清晰,易于理解

缺点:

  1. 同步阻塞: 所有参与者在准备阶段后会阻塞等待协调者的指令
  2. 单点故障: 协调者宕机会导致所有参与者一直阻塞
  3. 数据不一致风险: 如果协调者在发送commit消息时宕机,部分参与者收到commit,部分没收到
  4. 性能差: 需要多次网络通信,且所有参与者需要等待最慢的那个

适用场景: 对一致性要求极高,但并发量不大的场景,如金融核心交易。

2.2 三阶段提交(3PC)

3PC是2PC的改进版本,增加了超时机制和PreCommit阶段。

工作流程

1
2
3
4
5
6
7
8
9
10
11
12
阶段1: CanCommit (询问阶段)
协调者询问: "你能执行事务吗?"
参与者回复: "可以" 或 "不可以"

阶段2: PreCommit (预提交阶段)
协调者发送: "准备提交"
参与者写redo/undo日志,但不提交
参与者回复: "准备完成"

阶段3: DoCommit (提交阶段)
协调者发送: "提交" 或 "回滚"
参与者执行提交或回滚

3PC vs 2PC:

  • 3PC增加了超时机制,减少阻塞
  • 参与者在超时后可以自动提交,降低单点故障影响
  • 但仍然存在数据不一致的风险

2.3 TCC (Try-Confirm-Cancel)

TCC是一种补偿型事务,将业务操作拆分成三个阶段。

核心思想

1
2
3
Try阶段:     尝试执行,预留资源
Confirm阶段: 确认执行,使用预留的资源
Cancel阶段: 取消执行,释放预留的资源

实战案例:电商下单

场景:用户下单,需要扣减库存、扣减余额、创建订单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// TCC事务管理器(伪代码)
public TransactionResult executeTransaction(OrderRequest request) {
String transactionId = generateId();

try {
// ========== Try 阶段 ==========
Long orderId = orderService.tryCreateOrder(transactionId, request);

if (!inventoryService.tryReserveInventory(transactionId, request.getItems())) {
throw new RuntimeException("库存不足");
}

if (!accountService.tryReserveBalance(transactionId, request.getUserId(), request.getAmount())) {
throw new RuntimeException("余额不足");
}

// ========== Confirm 阶段 ==========
orderService.confirmCreateOrder(transactionId);
inventoryService.confirmReserveInventory(transactionId);
accountService.confirmReserveBalance(transactionId);

return TransactionResult.SUCCESS;

} catch (Exception e) {
// ========== Cancel 阶段 ==========
orderService.cancelCreateOrder(transactionId);
inventoryService.cancelReserveInventory(transactionId);
accountService.cancelReserveBalance(transactionId);

return TransactionResult.FAILURE;
}
}

// 库存服务TCC实现(伪代码)
public boolean tryReserveInventory(String transactionId, List<OrderItem> items) {
for (OrderItem item : items) {
Inventory inventory = getInventory(item.getProductId());

// 检查库存
if (inventory.getAvailable() < item.getQuantity()) {
return false;
}

// 冻结库存(减少可用,增加冻结)
inventory.setAvailable(inventory.getAvailable() - item.getQuantity());
inventory.setFrozen(inventory.getFrozen() + item.getQuantity());
updateInventory(inventory);

// 记录冻结记录
saveFrozenRecord(transactionId, item.getProductId(), item.getQuantity(), "FROZEN");
}

return true;
}

public void confirmReserveInventory(String transactionId) {
List<FrozenRecord> frozenRecords = getFrozenRecords(transactionId);

for (FrozenRecord record : frozenRecords) {
Inventory inventory = getInventory(record.getProductId());

// 真正扣减总库存
inventory.setTotal(inventory.getTotal() - record.getQuantity());
inventory.setFrozen(inventory.getFrozen() - record.getQuantity());
updateInventory(inventory);

// 更新记录状态
updateRecordStatus(record, "CONFIRMED");
}
}

public void cancelReserveInventory(String transactionId) {
List<FrozenRecord> frozenRecords = getFrozenRecords(transactionId);

for (FrozenRecord record : frozenRecords) {
if ("CANCELLED".equals(record.getStatus())) {
continue; // 防止重复取消
}

Inventory inventory = getInventory(record.getProductId());

// 恢复可用库存
inventory.setAvailable(inventory.getAvailable() + record.getQuantity());
inventory.setFrozen(inventory.getFrozen() - record.getQuantity());
updateInventory(inventory);

// 更新记录状态
updateRecordStatus(record, "CANCELLED");
}
}

数据库设计

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
-- 库存表
CREATE TABLE inventory (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
product_id BIGINT NOT NULL
total_quantity INT NOT NULL COMMENT '总库存'
available_quantity INT NOT NULL COMMENT '可用库存'
frozen_quantity INT NOT NULL COMMENT '冻结库存'
INDEX idx_product_id (product_id)
);

-- 冻结库存记录表
CREATE TABLE frozen_inventory (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
transaction_id VARCHAR(64) NOT NULL COMMENT '事务ID'
product_id BIGINT NOT NULL
quantity INT NOT NULL
status VARCHAR(20) NOT NULL COMMENT 'FROZEN/CONFIRMED/CANCELLED'
create_time DATETIME NOT NULL
INDEX idx_transaction_id (transaction_id)
);

TCC的关键要点

1. 资源预留
Try阶段必须预留资源(如冻结库存、冻结余额),而不是直接操作。

2. 幂等性
Confirm和Cancel操作必须支持幂等,因为可能会重复调用。

1
2
3
4
5
6
7
8
9
10
11
// 幂等性实现
public void confirmDeductInventory(String transactionId) {
FrozenRecord record = getFrozenRecord(transactionId);

// 检查状态,防止重复确认
if ("CONFIRMED".equals(record.getStatus())) {
return; // 已处理过,直接返回
}

// 执行确认操作...
}

3. 空回滚
如果Try阶段失败,没有执行成功,Cancel阶段也要能正常处理。

1
2
3
4
5
6
7
8
9
10
public void cancelDeductInventory(String transactionId) {
FrozenRecord record = getFrozenRecord(transactionId);

// 空回滚: Try阶段没有执行成功,没有冻结记录
if (record == null) {
return; // 直接返回
}

// 执行取消操作...
}

4. 悬挂处理
Cancel先于Try执行的情况(网络延迟导致)。

1
2
3
4
5
6
7
8
9
public boolean tryDeductInventory(String transactionId, List<OrderItem> items) {
// 检查是否已经执行过Cancel
FrozenRecord record = getFrozenRecord(transactionId);
if (record != null && "CANCELLED".equals(record.getStatus())) {
return false; // 拒绝Try
}

// 执行Try操作...
}

TCC的优缺点

优点:

  • 不需要锁定资源,性能较好
  • 没有阻塞,各服务可以并发执行
  • 适合长事务场景

缺点:

  • 业务侵入性强,需要实现Try、Confirm、Cancel三个接口
  • 开发成本高,需要处理幂等、空回滚、悬挂等问题
  • 一致性较弱,属于最终一致性

适用场景:

  • 对性能要求较高的场景
  • 需要跨多个服务的复杂业务
  • 可以接受最终一致性的场景

开源框架:

  • Seata(阿里开源)
  • ByteTCC
  • TCC-Transaction

2.4 Saga模式

Saga是一种长事务解决方案,将一个分布式事务拆分成多个本地事务,每个本地事务都有对应的补偿事务。

两种实现方式

1. 基于事件的编排(Choreography)
每个服务在完成本地事务后,发布事件触发下一个服务。

1
2
3
4
5
订单服务 --[订单创建事件]--> 库存服务 --[库存扣减事件]--> 支付服务
| | |
|<----[支付失败事件]-------------|<----[支付失败事件]---------|
| | |
回滚订单 回滚库存 X

2. 基于协调的编排(Orchestration)
由一个协调器(Orchestrator)负责调度各个服务。

1
2
3
4
5
          Saga 协调器
|
/---------+---------\
| | |
订单服务 库存服务 支付服务

Saga核心伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// Saga协调器(基于协调模式)
public TransactionResult executeSaga(OrderRequest request) {
String sagaId = generateId();
List<CompletedStep> completedSteps = new ArrayList<>();

try {
// 步骤1: 创建订单
Order order = orderService.createOrder(request);
completedSteps.add(new CompletedStep("CREATE_ORDER", order.getId()));

// 步骤2: 扣减库存
inventoryService.deductInventory(request.getItems());
completedSteps.add(new CompletedStep("DEDUCT_INVENTORY", request.getItems()));

// 步骤3: 执行支付
paymentService.pay(order.getUserId(), order.getAmount());
completedSteps.add(new CompletedStep("PAYMENT", order.getAmount()));

return TransactionResult.SUCCESS;

} catch (Exception e) {
// 补偿操作(按照相反的顺序回滚)
compensate(completedSteps);
return TransactionResult.FAILURE;
}
}

public void compensate(List<CompletedStep> completedSteps) {
// 反向遍历
Collections.reverse(completedSteps);
for (CompletedStep step : completedSteps) {
try {
switch (step.getName()) {
case "PAYMENT":
paymentService.refund(step.getData());
break;
case "DEDUCT_INVENTORY":
inventoryService.restoreInventory(step.getData());
break;
case "CREATE_ORDER":
orderService.cancelOrder(step.getData());
break;
}
} catch (Exception e) {
// 补偿失败,记录日志,人工介入
logError("补偿失败: " + step.getName());
}
}
}

基于事件的Saga实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// 事件驱动的Saga(伪代码)

// 订单服务监听 - 用户下单
@EventListener
public void onUserPlaceOrderEvent(UserPlaceOrderEvent event) {
Order order = createOrder(event.getRequest());
eventPublisher.publish(new OrderCreatedEvent(order));
}

// 库存服务监听 - 订单创建成功
@EventListener
public void onOrderCreatedEvent(OrderCreatedEvent event) {
try {
deductInventory(event.getOrder().getItems());
eventPublisher.publish(new InventoryDeductedEvent(event.getOrder()));
} catch (Exception e) {
eventPublisher.publish(new InventoryDeductionFailedEvent(event.getOrder()));
}
}

// 支付服务监听 - 库存扣减成功
@EventListener
public void onInventoryDeductedEvent(InventoryDeductedEvent event) {
try {
pay(event.getOrder());
eventPublisher.publish(new PaymentSucceededEvent(event.getOrder()));
} catch (Exception e) {
eventPublisher.publish(new PaymentFailedEvent(event.getOrder()));
}
}

// 库存服务监听 - 支付失败(补偿)
@EventListener
public void onPaymentFailedEvent(PaymentFailedEvent event) {
restoreInventory(event.getOrder().getItems());
eventPublisher.publish(new InventoryRestoredEvent(event.getOrder()));
}

// 订单服务监听 - 库存恢复成功(补偿)
@EventListener
public void onInventoryRestoredEvent(InventoryRestoredEvent event) {
cancelOrder(event.getOrder().getId());
}

Saga的优缺点

优点:

  • 适合长事务场景
  • 无需锁定资源,性能好
  • 参与方可以异步执行
  • 业务侵入性相对TCC较低

缺点:

  • 不保证隔离性,可能出现中间状态
  • 补偿逻辑复杂,需要仔细设计
  • 事件编排模式下,服务间依赖关系复杂

Saga vs TCC:

特性 Saga TCC
事务类型 正向事务+补偿事务 Try+Confirm+Cancel
资源锁定 无需锁定 需要预留资源
隔离性 较弱 较强
性能 较好 一般
业务侵入 中等 较高
适用场景 长事务 短事务

适用场景:

  • 业务流程较长的场景(如订单流程)
  • 对性能要求高,可以接受短暂的中间状态
  • 各个步骤可以定义明确的补偿操作

开源框架:

  • Apache ServiceComb Pack
  • Eventuate Tram Saga

2.5 本地消息表

本地消息表是一种基于消息的最终一致性方案。

核心思想

在本地事务中,同时更新业务数据和消息表,通过定时任务将消息发送到消息队列。

1
2
3
4
5
6
7
8
9
10
11
12
1. 本地事务中:
- 执行业务操作(如创建订单)
- 插入消息到本地消息表

2. 定时任务:
- 扫描未发送的消息
- 发送到消息队列
- 标记消息为已发送

3. 消费者:
- 消费消息
- 执行业务操作(如扣减库存)

核心伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// 订单服务 - 使用本地消息表
@Transactional
public void createOrder(OrderRequest request) {
// 1. 创建订单
Order order = new Order(request);
database.save(order);

// 2. 插入本地消息表(在同一个本地事务中)
LocalMessage message = new LocalMessage();
message.setTopic("order.created");
message.setContent(toJSON(order));
message.setStatus("PENDING"); // 未发送
database.save(message);

// 本地事务提交,保证订单和消息要么都成功,要么都失败
}

// 消息发送定时任务
@Scheduled(fixedDelay = 5000) // 每隔5秒执行一次
public void sendPendingMessages() {
// 1. 查询未发送的消息
List<LocalMessage> pendingMessages = database.query(
"SELECT * FROM local_message WHERE status='PENDING' AND retry_count<3"
);

for (LocalMessage message : pendingMessages) {
try {
// 2. 发送到Kafka
kafka.send(message.getTopic(), message.getContent());

// 3. 更新消息状态为已发送
message.setStatus("SENT");
message.setSendTime(now());
database.update(message);

} catch (Exception e) {
// 4. 发送失败,增加重试次数
message.setRetryCount(message.getRetryCount() + 1);
if (message.getRetryCount() >= 3) {
message.setStatus("FAILED"); // 标记为失败
}
database.update(message);
}
}
}

// 库存服务 - 消费订单创建消息
@KafkaListener(topic = "order.created")
public void handleOrderCreated(String message) {
Order order = fromJSON(message);

// 幂等性检查
if (hasProcessed(order.getId())) {
return; // 已处理过,直接返回
}

// 扣减库存
inventoryService.deductInventory(order.getItems());

// 记录消费记录
saveConsumeRecord(order.getId());
}

本地消息表的关键点

1. 本地事务保证
业务操作和消息插入在同一个本地事务中,保证要么都成功,要么都失败。

2. 定时扫描
通过定时任务扫描未发送的消息,确保消息一定会被发送。

3. 幂等性
消费者必须保证幂等性,因为可能会收到重复的消息。

4. 消息可靠性

  • 定时任务保证消息最终一定会发送
  • 重试机制处理发送失败
  • 超过最大重试次数的消息需要人工介入

本地消息表的优缺点

优点:

  • 实现简单,容易理解
  • 不依赖第三方分布式事务框架
  • 消息一定会被发送(定时扫描保证)

缺点:

  • 对业务有侵入,需要创建消息表
  • 需要定时任务,有一定延迟
  • 需要消费者保证幂等性

适用场景:

  • 对一致性要求不高,可以接受短暂延迟
  • 希望实现简单,不依赖复杂框架
  • 消息量不是特别大的场景

2.6 MQ事务消息

部分消息队列(如RocketMQ)提供了事务消息功能,可以保证本地事务和消息发送的一致性。

RocketMQ事务消息原理

1
2
3
4
5
6
7
1. 生产者发送半事务消息(Half Message)到MQ
2. MQ收到消息,暂不投递给消费者
3. 生产者执行本地事务
4. 生产者向MQ确认本地事务状态:
- Commit: 消息可以被消费
- Rollback: 删除消息
5. 如果MQ没收到确认,会回查生产者的本地事务状态

核心伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// 订单服务 - 使用RocketMQ事务消息
public TransactionSendResult createOrder(OrderRequest request) {
// 1. 发送半事务消息
Message message = new Message("order_topic", toJSON(request));

// 2. 发送事务消息,触发本地事务执行
TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(message, request);

return result;
}

// 事务监听器
class TransactionListenerImpl implements TransactionListener {
// 执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object arg) {
try {
// 执行本地事务:创建订单
OrderRequest request = (OrderRequest) arg;
Order order = new Order(request);
database.save(order);

// 本地事务成功,提交消息
return LocalTransactionState.COMMIT_MESSAGE;

} catch (Exception e) {
// 本地事务失败,回滚消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}

// 检查本地事务状态(MQ回查)
@Override
public LocalTransactionState checkLocalTransaction(MessageExt message) {
String orderId = message.getKeys();

// 检查订单是否存在
Order order = database.findById(orderId);

if (order != null) {
return LocalTransactionState.COMMIT_MESSAGE; // 订单存在,提交消息
} else {
return LocalTransactionState.ROLLBACK_MESSAGE; // 订单不存在,回滚消息
}
}
}

// 库存服务 - 消费订单创建消息
@RocketMQMessageListener(topic = "order_topic")
public void onMessage(String message) {
OrderRequest request = fromJSON(message);

// 扣减库存
inventoryService.deductInventory(request.getItems());
}

MQ事务消息的优缺点

优点:

  • 保证本地事务和消息发送的一致性
  • 无需本地消息表,实现简洁
  • 消息队列提供回查机制,可靠性高

缺点:

  • 依赖特定的消息队列(RocketMQ)
  • 需要实现事务回查接口
  • 有一定的性能开销

适用场景:

  • 使用RocketMQ的项目
  • 对一致性要求高,希望实现简洁
  • 需要可靠的消息投递保证

三、实战:电商下单场景的完整解决方案

3.1 业务场景分析

电商下单是典型的分布式事务场景:

1
2
3
4
5
用户下单 ──> 创建订单
├──> 扣减库存
├──> 扣减余额
├──> 创建支付单
└──> 发送通知

挑战:

  • 多个服务调用,任何一个失败都要回滚
  • 高并发场景,性能要求高
  • 用户体验要好,不能让用户等太久

3.2 方案选型

根据业务特点,我们采用TCC + 本地消息表的混合方案:

服务 方案 原因
订单服务 TCC 需要快速创建订单
库存服务 TCC 需要实时扣减库存
支付服务 TCC 需要实时扣减余额
积分服务 本地消息表 可以异步处理
通知服务 本地消息表 可以异步处理

3.3 完整流程伪代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
// 订单编排服务
public OrderResult placeOrder(OrderRequest request) {
String transactionId = generateId();
List<String> completedSteps = new ArrayList<>();

try {
// ========== TCC Try 阶段(核心服务) ==========

// 1. 订单服务 Try
Long orderId = orderTccService.tryCreateOrder(transactionId, request);
completedSteps.add("ORDER");

// 2. 库存服务 Try
if (!inventoryTccService.tryReserveInventory(transactionId, request.getItems())) {
throw new RuntimeException("库存不足");
}
completedSteps.add("INVENTORY");

// 3. 支付服务 Try
if (!paymentTccService.tryReserveBalance(transactionId, request.getUserId(), request.getAmount())) {
throw new RuntimeException("余额不足");
}
completedSteps.add("PAYMENT");

// ========== TCC Confirm 阶段 ==========

orderTccService.confirmCreateOrder(transactionId);
inventoryTccService.confirmReserveInventory(transactionId);
paymentTccService.confirmReserveBalance(transactionId);

// ========== 异步操作(本地消息表) ==========

// 4. 发送积分增加消息
localMessageService.sendMessage("points.add"new PointsAddEvent(request.getUserId(), points));

// 5. 发送订单通知消息
localMessageService.sendMessage("order.notification"new OrderNotificationEvent(orderId));

return OrderResult.success(orderId);

} catch (Exception e) {
// ========== TCC Cancel 阶段 ==========

// 回滚已完成的步骤
if (completedSteps.contains("PAYMENT")) {
paymentTccService.cancelReserveBalance(transactionId);
}
if (completedSteps.contains("INVENTORY")) {
inventoryTccService.cancelReserveInventory(transactionId);
}
if (completedSteps.contains("ORDER")) {
orderTccService.cancelCreateOrder(transactionId);
}

return OrderResult.failure(e.getMessage());
}
}

3.4 性能优化

1. 并行Try

将多个Try操作并行执行,减少总耗时:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public OrderResult placeOrder(OrderRequest request) {
String transactionId = generateId();

// 并行执行Try
CompletableFuture<Long> orderFuture = CompletableFuture.supplyAsync(
() -> orderTccService.tryCreateOrder(transactionId, request)
);
CompletableFuture<Boolean> inventoryFuture = CompletableFuture.supplyAsync(
() -> inventoryTccService.tryReserveInventory(transactionId, request.getItems())
);
CompletableFuture<Boolean> paymentFuture = CompletableFuture.supplyAsync(
() -> paymentTccService.tryReserveBalance(transactionId, request.getUserId(), request.getAmount())
);

// 等待所有Try完成
CompletableFuture.allOf(orderFuture, inventoryFuture, paymentFuture).join();

// 检查结果并继续...
}

2. 空Confirm优化

如果Confirm阶段没有额外操作,可以在Try阶段直接提交:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// 传统TCC: Try阶段只预留资源
public boolean tryReserveInventory(String transactionId, List<OrderItem> items) {
freezeInventory(items); // 冻结,但不真正扣减
return true;
}

public void confirmReserveInventory(String transactionId) {
deductInventory(transactionId); // Confirm阶段才真正扣减
}

// 空Confirm优化: Try阶段直接提交
public boolean tryReserveInventory(String transactionId, List<OrderItem> items) {
deductInventory(items); // 直接扣减库存
return true;
}

public void confirmReserveInventory(String transactionId) {
// 空实现
}

3. 缓存预热

对热点商品的库存信息进行缓存预热,减少数据库访问:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
public boolean tryDeductFromCache(Long productId, Integer quantity) {
String key = "inventory:" + productId;

// 使用Lua脚本保证原子性
String script =
"local current = redis.call('get', KEYS[1]) " +
"if current and tonumber(current) >= tonumber(ARGV[1]) then " +
" redis.call('decrby', KEYS[1], ARGV[1]) " +
" return 1 " +
"else " +
" return 0 " +
"end";

Long result = redis.execute(script, Collections.singletonList(key), Collections.singletonList(quantity));
return result == 1;
}

3.5 监控和告警

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// 1. 事务耗时监控(伪代码)
@Around("@annotation(TccTransaction)")
public Object monitor(ProceedingJoinPoint joinPoint) throws Throwable {
String transactionId = extractTransactionId(joinPoint);
long startTime = System.currentTimeMillis();

try {
Object result = joinPoint.proceed();
long duration = System.currentTimeMillis() - startTime;

// 记录耗时
logger.info("事务耗时: {}, {}ms", transactionId, duration);

// 如果耗时超过阈值,发送告警
if (duration > 1000) {
alertService.send("事务耗时过长: " + transactionId);
}

return result;
} catch (Exception e) {
logger.error("事务失败: {}", transactionId, e);
throw e;
}
}

// 2. 补偿失败监控(伪代码)
@Scheduled(fixedDelay = 60000) // 每分钟检查一次
public void checkFailedCompensation() {
List<CompensationRecord> failedRecords = database.query(
"SELECT * FROM compensation_record WHERE status='FAILED' AND retry_count>3"
);

if (failedRecords.size() > 0) {
alertService.send("有" + failedRecords.size() + "条补偿记录失败,需要人工介入");
}
}

四、方案对比与选型建议

4.1 方案对比表

方案 一致性 性能 复杂度 业务侵入 适用场景
2PC 强一致 金融核心交易
3PC 强一致 对一致性要求极高
TCC 最终一致 高并发,强一致性要求
Saga 最终一致 长事务,可接受中间状态
本地消息表 最终一致 简单场景,延迟不敏感
MQ事务消息 最终一致 使用RocketMQ的项目

4.2 选型建议

场景1:金融交易(如转账)

  • 推荐方案: TCC
  • 理由: 需要强一致性,TCC能保证资金安全,性能也可接受

场景2:电商下单

  • 推荐方案: TCC(核心流程) + 本地消息表(非核心流程)
  • 理由: 订单、库存、支付需要强一致性,积分、通知可以异步处理

场景3:物流跟踪

  • 推荐方案: Saga
  • 理由: 流程长,各个环节可以定义补偿操作,可接受中间状态

场景4:数据同步

  • 推荐方案: 本地消息表 或 MQ事务消息
  • 理由: 对实时性要求不高,实现简单,成本低

4.3 混合方案

在实际项目中,通常会组合多种方案:

1
2
3
4
5
6
7
8
9
10
11
电商系统:
├── 核心交易链路
│ ├── 订单服务 ──┐
│ ├── 库存服务 ──┼─── TCC事务
│ └── 支付服务 ──┘

└── 非核心链路
├── 积分服务 ──┐
├── 优惠券服务 ├─── 本地消息表
├── 通知服务 ──┤
└── 数据分析 ──┘

五、最佳实践

5.1 设计原则

1. 尽量避免分布式事务

  • 能用单体实现的,不要拆分成微服务
  • 能用单个服务完成的,不要跨多个服务

2. 业务驱动,而非技术驱动

  • 根据业务特点选择合适的方案
  • 不要为了用技术而用技术

3. 优先考虑最终一致性

  • 大部分业务场景都可以接受最终一致性
  • 强一致性的代价很高

5.2 实现建议

1. 幂等性设计

方法1: 唯一业务ID

1
2
3
4
5
6
7
8
public void processOrder(String orderId) {
// 检查是否已经处理过
if (database.exists(orderId)) {
return; // 已处理,直接返回
}

// 处理订单...
}

方法2: 状态机

1
2
3
4
5
6
7
8
9
10
11
public void confirmOrder(String orderId) {
Order order = database.findById(orderId);

// 只有待确认状态才能确认
if (!"PENDING".equals(order.getStatus())) {
return; // 已经确认过了
}

order.setStatus("CONFIRMED");
database.save(order);
}

方法3: 唯一约束

1
2
3
4
5
6
7
8
-- 在数据库层面保证幂等
CREATE TABLE order_process_record (
order_id VARCHAR(64) PRIMARY KEY, -- 唯一约束
process_time DATETIME
);

-- 插入时如果已存在会报错,说明已经处理过
INSERT INTO order_process_record (order_id, process_time) VALUES (?, NOW());

2. 超时处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
public boolean tryWithTimeout(Callable<Boolean> task, long timeoutMs) {
ExecutorService executor = Executors.newSingleThreadExecutor();
Future<Boolean> future = executor.submit(task);

try {
return future.get(timeoutMs, TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
future.cancel(true);
return false;
} catch (Exception e) {
return false;
} finally {
executor.shutdown();
}
}

3. 补偿重试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public void compensateWithRetry(Runnable compensation, int maxRetries) {
int retryCount = 0;

while (retryCount < maxRetries) {
try {
compensation.run();
return; // 成功
} catch (Exception e) {
retryCount++;

if (retryCount >= maxRetries) {
// 超过最大重试次数,人工介入
saveToManualProcessQueue(compensation);
} else {
// 等待一段时间后重试(指数退避)
Thread.sleep(1000 * (long)Math.pow(2, retryCount));
}
}
}
}

5.3 运维建议

1. 日志记录

关键节点记录日志:

  • 事务ID
  • 执行阶段(Try/Confirm/Cancel)
  • 执行结果(成功/失败)
  • 执行耗时

2. 监控指标

需要监控的指标:

  • 事务成功率
  • 事务平均耗时
  • 补偿失败次数
  • 待人工处理任务数

3. 人工介入机制

1
2
3
4
5
6
7
8
9
10
11
-- 人工处理队列表
CREATE TABLE manual_process_queue (
id BIGINT PRIMARY KEY AUTO_INCREMENT,
transaction_id VARCHAR(64) NOT NULL
task_type VARCHAR(50) NOT NULL COMMENT 'COMPENSATION/TIMEOUT/ERROR'
task_content TEXT COMMENT '任务详情(JSON)'
status VARCHAR(20) NOT NULL COMMENT 'PENDING/PROCESSING/COMPLETED'
operator VARCHAR(50) COMMENT '处理人'
create_time DATETIME NOT NULL
process_time DATETIME
);

六、总结

6.1 核心要点

  1. CAP定理: 分布式系统无法同时满足一致性、可用性和分区容错性
  2. BASE理论: 实际工程中通过牺牲强一致性,换取可用性和性能
  3. 方案选择: 根据业务场景选择合适的一致性方案
  4. 混合方案: 核心链路用强一致性方案,非核心链路用最终一致性方案
  5. 关键设计: 幂等性、超时处理、补偿重试、人工介入

6.2 实战经验

  • 不要过度设计: 不是所有场景都需要分布式事务
  • 优先异步: 能异步的就不要同步
  • 做好监控: 完善的监控比完美的方案更重要
  • 预留后门: 总会有意外情况,要有人工介入机制

6.3 进阶阅读

分布式数据一致性是一个复杂的话题,没有银弹,只有根据具体业务场景做出合适的权衡。