引言:为什么数据一致性这么难?
在单体应用时代,保证数据一致性很简单:一个数据库,一个事务,搞定!
1 2 3 4 5 6 7
| @Transactional public void transferMoney(Long fromAccount, Long toAccount, BigDecimal amount) { accountDao.deduct(fromAccount, amount); accountDao.add(toAccount, amount); }
|
但当我们拆分成微服务后,一切都变了:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public void createOrder(OrderRequest request) { Order order = orderService.create(request);
inventoryService.deduct(request.getItems());
paymentService.pay(order.getTotalAmount());
}
|
核心问题:
- 跨服务操作:每个服务有自己的数据库,无法使用传统的ACID事务
- 网络不可靠:服务调用可能失败、超时、重复
- 部分失败:某些操作成功,某些操作失败,如何回滚?
- 高并发场景:如何在保证一致性的同时,不牺牲太多性能?
本文将从理论到实战,深入讲解分布式微服务下的数据一致性解决方案。
一、理解分布式数据一致性
1.1 CAP定理:不可能三角
在分布式系统中,有一个著名的CAP定理:
- C (Consistency): 一致性 - 所有节点同一时间看到相同的数据
- A (Availability): 可用性 - 每个请求都能得到响应(成功或失败)
- P (Partition Tolerance): 分区容错性 - 网络分区时系统仍能运行
CAP定理核心:一个分布式系统最多只能同时满足其中两项。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| 一致性(C) /\ / \ / \ / \ / \ / CP \ / \ / \ / CAP? \ / \ / AP \ /________________________\ 可用性(A) 分区容错(P)
|
在网络分区(P)必然存在的现实下,我们只能在C和A之间权衡:
CP系统:牺牲可用性,保证一致性
- 例:传统关系型数据库(MySQL、PostgreSQL)
- 场景:金融交易、订单系统
AP系统:牺牲一致性,保证可用性
- 例:NoSQL数据库(Cassandra、DynamoDB)
- 场景:社交媒体的点赞数、浏览量统计
1.2 BASE理论:现实的妥协
由于CAP定理的限制,实际工程中我们采用BASE理论:
- BA (Basically Available): 基本可用 - 系统保证基本可用,允许损失部分可用性
- S (Soft state): 软状态 - 允许系统中的数据存在中间状态
- E (Eventually Consistent): 最终一致性 - 经过一段时间后,数据最终达到一致
BASE vs ACID:
| 特性 |
ACID(单体) |
BASE(分布式) |
| 一致性 |
强一致性 |
最终一致性 |
| 隔离性 |
严格隔离 |
允许中间状态 |
| 可用性 |
可能阻塞 |
高可用 |
| 适用场景 |
单一数据库 |
分布式系统 |
1.3 一致性模型分类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| 强一致性 ├── 线性一致性 (Linearizability) │ └── 最严格,任何读操作都能读到最新写入的数据 │ 示例:单机数据库的事务 │ └── 顺序一致性 (Sequential Consistency) └── 所有进程看到的操作顺序相同 示例:分布式锁
弱一致性 ├── 会话一致性 (Session Consistency) │ └── 同一会话内保证一致性 │ 示例:购物车服务 - 用户自己能看到刚加的商品 │ ├── 因果一致性 (Causal Consistency) │ └── 有因果关系的操作保证顺序 │ 示例:社交媒体评论 - 先发帖后评论 │ └── 最终一致性 (Eventual Consistency) └── 一段时间后达到一致 示例:DNS、缓存同步
|
二、分布式事务解决方案
2.1 两阶段提交(2PC)
2PC是最经典的分布式事务协议,分为准备阶段和提交阶段。
工作流程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| 协调者(Coordinator) 参与者(Participants) [订单服务] [库存服务] [支付服务] | | | | |------- 1.准备阶段(Prepare) ---->| | | | "你们能执行事务吗?" | | | | | | | |<------ 投票(Vote) --------------| | | | "我准备好了!" | | | |<--------------------------------| | | | "我也准备好了!" | | |<--------------------------------------------| | | "我也OK!" | |<--------------------------------------------------------| | | |------- 2.提交阶段(Commit) ------------------------>全部| | "所有人提交!" | | | | | |<------ 确认(ACK) ---------------| | | |<--------------------------------| | | |<--------------------------------------------| |
|
核心伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53
| public TransactionResult executeTransaction(TransactionRequest request) { String transactionId = generateId(); List<TransactionParticipant> participants = new ArrayList<>();
try { for (TransactionParticipant service : Arrays.asList(orderService, inventoryService, paymentService)) { if (!service.prepare(transactionId, request)) { throw new RuntimeException("准备失败"); } participants.add(service); }
for (TransactionParticipant participant : participants) { participant.commit(transactionId); }
return TransactionResult.SUCCESS;
} catch (Exception e) { for (TransactionParticipant participant : participants) { participant.rollback(transactionId); } return TransactionResult.FAILURE; } }
public boolean prepare(String transactionId, OrderData data) { if (!valid(data)) { return false; }
Order tempOrder = createTempOrder(data); cache.put(transactionId, tempOrder);
return true; }
public void commit(String transactionId) { Order tempOrder = cache.get(transactionId); database.save(tempOrder); cache.remove(transactionId); }
public void rollback(String transactionId) { cache.remove(transactionId); }
|
2PC的优缺点
优点:
缺点:
- 同步阻塞: 所有参与者在准备阶段后会阻塞等待协调者的指令
- 单点故障: 协调者宕机会导致所有参与者一直阻塞
- 数据不一致风险: 如果协调者在发送commit消息时宕机,部分参与者收到commit,部分没收到
- 性能差: 需要多次网络通信,且所有参与者需要等待最慢的那个
适用场景: 对一致性要求极高,但并发量不大的场景,如金融核心交易。
2.2 三阶段提交(3PC)
3PC是2PC的改进版本,增加了超时机制和PreCommit阶段。
工作流程
1 2 3 4 5 6 7 8 9 10 11 12
| 阶段1: CanCommit (询问阶段) 协调者询问: "你能执行事务吗?" 参与者回复: "可以" 或 "不可以"
阶段2: PreCommit (预提交阶段) 协调者发送: "准备提交" 参与者写redo/undo日志,但不提交 参与者回复: "准备完成"
阶段3: DoCommit (提交阶段) 协调者发送: "提交" 或 "回滚" 参与者执行提交或回滚
|
3PC vs 2PC:
- 3PC增加了超时机制,减少阻塞
- 参与者在超时后可以自动提交,降低单点故障影响
- 但仍然存在数据不一致的风险
2.3 TCC (Try-Confirm-Cancel)
TCC是一种补偿型事务,将业务操作拆分成三个阶段。
核心思想
1 2 3
| Try阶段: 尝试执行,预留资源 Confirm阶段: 确认执行,使用预留的资源 Cancel阶段: 取消执行,释放预留的资源
|
实战案例:电商下单
场景:用户下单,需要扣减库存、扣减余额、创建订单。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90
| public TransactionResult executeTransaction(OrderRequest request) { String transactionId = generateId();
try { Long orderId = orderService.tryCreateOrder(transactionId, request);
if (!inventoryService.tryReserveInventory(transactionId, request.getItems())) { throw new RuntimeException("库存不足"); }
if (!accountService.tryReserveBalance(transactionId, request.getUserId(), request.getAmount())) { throw new RuntimeException("余额不足"); }
orderService.confirmCreateOrder(transactionId); inventoryService.confirmReserveInventory(transactionId); accountService.confirmReserveBalance(transactionId);
return TransactionResult.SUCCESS;
} catch (Exception e) { orderService.cancelCreateOrder(transactionId); inventoryService.cancelReserveInventory(transactionId); accountService.cancelReserveBalance(transactionId);
return TransactionResult.FAILURE; } }
public boolean tryReserveInventory(String transactionId, List<OrderItem> items) { for (OrderItem item : items) { Inventory inventory = getInventory(item.getProductId());
if (inventory.getAvailable() < item.getQuantity()) { return false; }
inventory.setAvailable(inventory.getAvailable() - item.getQuantity()); inventory.setFrozen(inventory.getFrozen() + item.getQuantity()); updateInventory(inventory);
saveFrozenRecord(transactionId, item.getProductId(), item.getQuantity(), "FROZEN"); }
return true; }
public void confirmReserveInventory(String transactionId) { List<FrozenRecord> frozenRecords = getFrozenRecords(transactionId);
for (FrozenRecord record : frozenRecords) { Inventory inventory = getInventory(record.getProductId());
inventory.setTotal(inventory.getTotal() - record.getQuantity()); inventory.setFrozen(inventory.getFrozen() - record.getQuantity()); updateInventory(inventory);
updateRecordStatus(record, "CONFIRMED"); } }
public void cancelReserveInventory(String transactionId) { List<FrozenRecord> frozenRecords = getFrozenRecords(transactionId);
for (FrozenRecord record : frozenRecords) { if ("CANCELLED".equals(record.getStatus())) { continue; }
Inventory inventory = getInventory(record.getProductId());
inventory.setAvailable(inventory.getAvailable() + record.getQuantity()); inventory.setFrozen(inventory.getFrozen() - record.getQuantity()); updateInventory(inventory);
updateRecordStatus(record, "CANCELLED"); } }
|
数据库设计
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| CREATE TABLE inventory ( id BIGINT PRIMARY KEY AUTO_INCREMENT, product_id BIGINT NOT NULL, total_quantity INT NOT NULL COMMENT '总库存', available_quantity INT NOT NULL COMMENT '可用库存', frozen_quantity INT NOT NULL COMMENT '冻结库存', INDEX idx_product_id (product_id) );
CREATE TABLE frozen_inventory ( id BIGINT PRIMARY KEY AUTO_INCREMENT, transaction_id VARCHAR(64) NOT NULL COMMENT '事务ID', product_id BIGINT NOT NULL, quantity INT NOT NULL, status VARCHAR(20) NOT NULL COMMENT 'FROZEN/CONFIRMED/CANCELLED', create_time DATETIME NOT NULL, INDEX idx_transaction_id (transaction_id) );
|
TCC的关键要点
1. 资源预留
Try阶段必须预留资源(如冻结库存、冻结余额),而不是直接操作。
2. 幂等性
Confirm和Cancel操作必须支持幂等,因为可能会重复调用。
1 2 3 4 5 6 7 8 9 10 11
| public void confirmDeductInventory(String transactionId) { FrozenRecord record = getFrozenRecord(transactionId);
if ("CONFIRMED".equals(record.getStatus())) { return; }
}
|
3. 空回滚
如果Try阶段失败,没有执行成功,Cancel阶段也要能正常处理。
1 2 3 4 5 6 7 8 9 10
| public void cancelDeductInventory(String transactionId) { FrozenRecord record = getFrozenRecord(transactionId);
if (record == null) { return; }
}
|
4. 悬挂处理
Cancel先于Try执行的情况(网络延迟导致)。
1 2 3 4 5 6 7 8 9
| public boolean tryDeductInventory(String transactionId, List<OrderItem> items) { FrozenRecord record = getFrozenRecord(transactionId); if (record != null && "CANCELLED".equals(record.getStatus())) { return false; }
}
|
TCC的优缺点
优点:
- 不需要锁定资源,性能较好
- 没有阻塞,各服务可以并发执行
- 适合长事务场景
缺点:
- 业务侵入性强,需要实现Try、Confirm、Cancel三个接口
- 开发成本高,需要处理幂等、空回滚、悬挂等问题
- 一致性较弱,属于最终一致性
适用场景:
- 对性能要求较高的场景
- 需要跨多个服务的复杂业务
- 可以接受最终一致性的场景
开源框架:
- Seata(阿里开源)
- ByteTCC
- TCC-Transaction
2.4 Saga模式
Saga是一种长事务解决方案,将一个分布式事务拆分成多个本地事务,每个本地事务都有对应的补偿事务。
两种实现方式
1. 基于事件的编排(Choreography)
每个服务在完成本地事务后,发布事件触发下一个服务。
1 2 3 4 5
| 订单服务 --[订单创建事件]--> 库存服务 --[库存扣减事件]--> 支付服务 | | | |<----[支付失败事件]-------------|<----[支付失败事件]---------| | | | 回滚订单 回滚库存 X
|
2. 基于协调的编排(Orchestration)
由一个协调器(Orchestrator)负责调度各个服务。
1 2 3 4 5
| Saga 协调器 | /---------+---------\ | | | 订单服务 库存服务 支付服务
|
Saga核心伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public TransactionResult executeSaga(OrderRequest request) { String sagaId = generateId(); List<CompletedStep> completedSteps = new ArrayList<>();
try { Order order = orderService.createOrder(request); completedSteps.add(new CompletedStep("CREATE_ORDER", order.getId()));
inventoryService.deductInventory(request.getItems()); completedSteps.add(new CompletedStep("DEDUCT_INVENTORY", request.getItems()));
paymentService.pay(order.getUserId(), order.getAmount()); completedSteps.add(new CompletedStep("PAYMENT", order.getAmount()));
return TransactionResult.SUCCESS;
} catch (Exception e) { compensate(completedSteps); return TransactionResult.FAILURE; } }
public void compensate(List<CompletedStep> completedSteps) { Collections.reverse(completedSteps); for (CompletedStep step : completedSteps) { try { switch (step.getName()) { case "PAYMENT": paymentService.refund(step.getData()); break; case "DEDUCT_INVENTORY": inventoryService.restoreInventory(step.getData()); break; case "CREATE_ORDER": orderService.cancelOrder(step.getData()); break; } } catch (Exception e) { logError("补偿失败: " + step.getName()); } } }
|
基于事件的Saga实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
@EventListener public void onUserPlaceOrderEvent(UserPlaceOrderEvent event) { Order order = createOrder(event.getRequest()); eventPublisher.publish(new OrderCreatedEvent(order)); }
@EventListener public void onOrderCreatedEvent(OrderCreatedEvent event) { try { deductInventory(event.getOrder().getItems()); eventPublisher.publish(new InventoryDeductedEvent(event.getOrder())); } catch (Exception e) { eventPublisher.publish(new InventoryDeductionFailedEvent(event.getOrder())); } }
@EventListener public void onInventoryDeductedEvent(InventoryDeductedEvent event) { try { pay(event.getOrder()); eventPublisher.publish(new PaymentSucceededEvent(event.getOrder())); } catch (Exception e) { eventPublisher.publish(new PaymentFailedEvent(event.getOrder())); } }
@EventListener public void onPaymentFailedEvent(PaymentFailedEvent event) { restoreInventory(event.getOrder().getItems()); eventPublisher.publish(new InventoryRestoredEvent(event.getOrder())); }
@EventListener public void onInventoryRestoredEvent(InventoryRestoredEvent event) { cancelOrder(event.getOrder().getId()); }
|
Saga的优缺点
优点:
- 适合长事务场景
- 无需锁定资源,性能好
- 参与方可以异步执行
- 业务侵入性相对TCC较低
缺点:
- 不保证隔离性,可能出现中间状态
- 补偿逻辑复杂,需要仔细设计
- 事件编排模式下,服务间依赖关系复杂
Saga vs TCC:
| 特性 |
Saga |
TCC |
| 事务类型 |
正向事务+补偿事务 |
Try+Confirm+Cancel |
| 资源锁定 |
无需锁定 |
需要预留资源 |
| 隔离性 |
较弱 |
较强 |
| 性能 |
较好 |
一般 |
| 业务侵入 |
中等 |
较高 |
| 适用场景 |
长事务 |
短事务 |
适用场景:
- 业务流程较长的场景(如订单流程)
- 对性能要求高,可以接受短暂的中间状态
- 各个步骤可以定义明确的补偿操作
开源框架:
- Apache ServiceComb Pack
- Eventuate Tram Saga
2.5 本地消息表
本地消息表是一种基于消息的最终一致性方案。
核心思想
在本地事务中,同时更新业务数据和消息表,通过定时任务将消息发送到消息队列。
1 2 3 4 5 6 7 8 9 10 11 12
| 1. 本地事务中: - 执行业务操作(如创建订单) - 插入消息到本地消息表
2. 定时任务: - 扫描未发送的消息 - 发送到消息队列 - 标记消息为已发送
3. 消费者: - 消费消息 - 执行业务操作(如扣减库存)
|
核心伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62
| @Transactional public void createOrder(OrderRequest request) { Order order = new Order(request); database.save(order);
LocalMessage message = new LocalMessage(); message.setTopic("order.created"); message.setContent(toJSON(order)); message.setStatus("PENDING"); database.save(message);
}
@Scheduled(fixedDelay = 5000) public void sendPendingMessages() { List<LocalMessage> pendingMessages = database.query( "SELECT * FROM local_message WHERE status='PENDING' AND retry_count<3" );
for (LocalMessage message : pendingMessages) { try { kafka.send(message.getTopic(), message.getContent());
message.setStatus("SENT"); message.setSendTime(now()); database.update(message);
} catch (Exception e) { message.setRetryCount(message.getRetryCount() + 1); if (message.getRetryCount() >= 3) { message.setStatus("FAILED"); } database.update(message); } } }
@KafkaListener(topic = "order.created") public void handleOrderCreated(String message) { Order order = fromJSON(message);
if (hasProcessed(order.getId())) { return; }
inventoryService.deductInventory(order.getItems());
saveConsumeRecord(order.getId()); }
|
本地消息表的关键点
1. 本地事务保证
业务操作和消息插入在同一个本地事务中,保证要么都成功,要么都失败。
2. 定时扫描
通过定时任务扫描未发送的消息,确保消息一定会被发送。
3. 幂等性
消费者必须保证幂等性,因为可能会收到重复的消息。
4. 消息可靠性
- 定时任务保证消息最终一定会发送
- 重试机制处理发送失败
- 超过最大重试次数的消息需要人工介入
本地消息表的优缺点
优点:
- 实现简单,容易理解
- 不依赖第三方分布式事务框架
- 消息一定会被发送(定时扫描保证)
缺点:
- 对业务有侵入,需要创建消息表
- 需要定时任务,有一定延迟
- 需要消费者保证幂等性
适用场景:
- 对一致性要求不高,可以接受短暂延迟
- 希望实现简单,不依赖复杂框架
- 消息量不是特别大的场景
2.6 MQ事务消息
部分消息队列(如RocketMQ)提供了事务消息功能,可以保证本地事务和消息发送的一致性。
RocketMQ事务消息原理
1 2 3 4 5 6 7
| 1. 生产者发送半事务消息(Half Message)到MQ 2. MQ收到消息,暂不投递给消费者 3. 生产者执行本地事务 4. 生产者向MQ确认本地事务状态: - Commit: 消息可以被消费 - Rollback: 删除消息 5. 如果MQ没收到确认,会回查生产者的本地事务状态
|
核心伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| public TransactionSendResult createOrder(OrderRequest request) { Message message = new Message("order_topic", toJSON(request));
TransactionSendResult result = transactionMQProducer.sendMessageInTransaction(message, request);
return result; }
class TransactionListenerImpl implements TransactionListener { @Override public LocalTransactionState executeLocalTransaction(Message message, Object arg) { try { OrderRequest request = (OrderRequest) arg; Order order = new Order(request); database.save(order);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) { return LocalTransactionState.ROLLBACK_MESSAGE; } }
@Override public LocalTransactionState checkLocalTransaction(MessageExt message) { String orderId = message.getKeys();
Order order = database.findById(orderId);
if (order != null) { return LocalTransactionState.COMMIT_MESSAGE; } else { return LocalTransactionState.ROLLBACK_MESSAGE; } } }
@RocketMQMessageListener(topic = "order_topic") public void onMessage(String message) { OrderRequest request = fromJSON(message);
inventoryService.deductInventory(request.getItems()); }
|
MQ事务消息的优缺点
优点:
- 保证本地事务和消息发送的一致性
- 无需本地消息表,实现简洁
- 消息队列提供回查机制,可靠性高
缺点:
- 依赖特定的消息队列(RocketMQ)
- 需要实现事务回查接口
- 有一定的性能开销
适用场景:
- 使用RocketMQ的项目
- 对一致性要求高,希望实现简洁
- 需要可靠的消息投递保证
三、实战:电商下单场景的完整解决方案
3.1 业务场景分析
电商下单是典型的分布式事务场景:
1 2 3 4 5
| 用户下单 ──> 创建订单 ├──> 扣减库存 ├──> 扣减余额 ├──> 创建支付单 └──> 发送通知
|
挑战:
- 多个服务调用,任何一个失败都要回滚
- 高并发场景,性能要求高
- 用户体验要好,不能让用户等太久
3.2 方案选型
根据业务特点,我们采用TCC + 本地消息表的混合方案:
| 服务 |
方案 |
原因 |
| 订单服务 |
TCC |
需要快速创建订单 |
| 库存服务 |
TCC |
需要实时扣减库存 |
| 支付服务 |
TCC |
需要实时扣减余额 |
| 积分服务 |
本地消息表 |
可以异步处理 |
| 通知服务 |
本地消息表 |
可以异步处理 |
3.3 完整流程伪代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| public OrderResult placeOrder(OrderRequest request) { String transactionId = generateId(); List<String> completedSteps = new ArrayList<>();
try {
Long orderId = orderTccService.tryCreateOrder(transactionId, request); completedSteps.add("ORDER");
if (!inventoryTccService.tryReserveInventory(transactionId, request.getItems())) { throw new RuntimeException("库存不足"); } completedSteps.add("INVENTORY");
if (!paymentTccService.tryReserveBalance(transactionId, request.getUserId(), request.getAmount())) { throw new RuntimeException("余额不足"); } completedSteps.add("PAYMENT");
orderTccService.confirmCreateOrder(transactionId); inventoryTccService.confirmReserveInventory(transactionId); paymentTccService.confirmReserveBalance(transactionId);
localMessageService.sendMessage("points.add", new PointsAddEvent(request.getUserId(), points));
localMessageService.sendMessage("order.notification", new OrderNotificationEvent(orderId));
return OrderResult.success(orderId);
} catch (Exception e) {
if (completedSteps.contains("PAYMENT")) { paymentTccService.cancelReserveBalance(transactionId); } if (completedSteps.contains("INVENTORY")) { inventoryTccService.cancelReserveInventory(transactionId); } if (completedSteps.contains("ORDER")) { orderTccService.cancelCreateOrder(transactionId); }
return OrderResult.failure(e.getMessage()); } }
|
3.4 性能优化
1. 并行Try
将多个Try操作并行执行,减少总耗时:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public OrderResult placeOrder(OrderRequest request) { String transactionId = generateId();
CompletableFuture<Long> orderFuture = CompletableFuture.supplyAsync( () -> orderTccService.tryCreateOrder(transactionId, request) ); CompletableFuture<Boolean> inventoryFuture = CompletableFuture.supplyAsync( () -> inventoryTccService.tryReserveInventory(transactionId, request.getItems()) ); CompletableFuture<Boolean> paymentFuture = CompletableFuture.supplyAsync( () -> paymentTccService.tryReserveBalance(transactionId, request.getUserId(), request.getAmount()) );
CompletableFuture.allOf(orderFuture, inventoryFuture, paymentFuture).join();
}
|
2. 空Confirm优化
如果Confirm阶段没有额外操作,可以在Try阶段直接提交:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public boolean tryReserveInventory(String transactionId, List<OrderItem> items) { freezeInventory(items); return true; }
public void confirmReserveInventory(String transactionId) { deductInventory(transactionId); }
public boolean tryReserveInventory(String transactionId, List<OrderItem> items) { deductInventory(items); return true; }
public void confirmReserveInventory(String transactionId) { }
|
3. 缓存预热
对热点商品的库存信息进行缓存预热,减少数据库访问:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| public boolean tryDeductFromCache(Long productId, Integer quantity) { String key = "inventory:" + productId;
String script = "local current = redis.call('get', KEYS[1]) " + "if current and tonumber(current) >= tonumber(ARGV[1]) then " + " redis.call('decrby', KEYS[1], ARGV[1]) " + " return 1 " + "else " + " return 0 " + "end";
Long result = redis.execute(script, Collections.singletonList(key), Collections.singletonList(quantity)); return result == 1; }
|
3.5 监控和告警
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| @Around("@annotation(TccTransaction)") public Object monitor(ProceedingJoinPoint joinPoint) throws Throwable { String transactionId = extractTransactionId(joinPoint); long startTime = System.currentTimeMillis();
try { Object result = joinPoint.proceed(); long duration = System.currentTimeMillis() - startTime;
logger.info("事务耗时: {}, {}ms", transactionId, duration);
if (duration > 1000) { alertService.send("事务耗时过长: " + transactionId); }
return result; } catch (Exception e) { logger.error("事务失败: {}", transactionId, e); throw e; } }
@Scheduled(fixedDelay = 60000) public void checkFailedCompensation() { List<CompensationRecord> failedRecords = database.query( "SELECT * FROM compensation_record WHERE status='FAILED' AND retry_count>3" );
if (failedRecords.size() > 0) { alertService.send("有" + failedRecords.size() + "条补偿记录失败,需要人工介入"); } }
|
四、方案对比与选型建议
4.1 方案对比表
| 方案 |
一致性 |
性能 |
复杂度 |
业务侵入 |
适用场景 |
| 2PC |
强一致 |
低 |
中 |
低 |
金融核心交易 |
| 3PC |
强一致 |
低 |
高 |
低 |
对一致性要求极高 |
| TCC |
最终一致 |
高 |
高 |
高 |
高并发,强一致性要求 |
| Saga |
最终一致 |
高 |
中 |
中 |
长事务,可接受中间状态 |
| 本地消息表 |
最终一致 |
中 |
低 |
中 |
简单场景,延迟不敏感 |
| MQ事务消息 |
最终一致 |
中 |
低 |
低 |
使用RocketMQ的项目 |
4.2 选型建议
场景1:金融交易(如转账)
- 推荐方案: TCC
- 理由: 需要强一致性,TCC能保证资金安全,性能也可接受
场景2:电商下单
- 推荐方案: TCC(核心流程) + 本地消息表(非核心流程)
- 理由: 订单、库存、支付需要强一致性,积分、通知可以异步处理
场景3:物流跟踪
- 推荐方案: Saga
- 理由: 流程长,各个环节可以定义补偿操作,可接受中间状态
场景4:数据同步
- 推荐方案: 本地消息表 或 MQ事务消息
- 理由: 对实时性要求不高,实现简单,成本低
4.3 混合方案
在实际项目中,通常会组合多种方案:
1 2 3 4 5 6 7 8 9 10 11
| 电商系统: ├── 核心交易链路 │ ├── 订单服务 ──┐ │ ├── 库存服务 ──┼─── TCC事务 │ └── 支付服务 ──┘ │ └── 非核心链路 ├── 积分服务 ──┐ ├── 优惠券服务 ├─── 本地消息表 ├── 通知服务 ──┤ └── 数据分析 ──┘
|
五、最佳实践
5.1 设计原则
1. 尽量避免分布式事务
- 能用单体实现的,不要拆分成微服务
- 能用单个服务完成的,不要跨多个服务
2. 业务驱动,而非技术驱动
- 根据业务特点选择合适的方案
- 不要为了用技术而用技术
3. 优先考虑最终一致性
- 大部分业务场景都可以接受最终一致性
- 强一致性的代价很高
5.2 实现建议
1. 幂等性设计
方法1: 唯一业务ID
1 2 3 4 5 6 7 8
| public void processOrder(String orderId) { if (database.exists(orderId)) { return; }
}
|
方法2: 状态机
1 2 3 4 5 6 7 8 9 10 11
| public void confirmOrder(String orderId) { Order order = database.findById(orderId);
if (!"PENDING".equals(order.getStatus())) { return; }
order.setStatus("CONFIRMED"); database.save(order); }
|
方法3: 唯一约束
1 2 3 4 5 6 7 8
| CREATE TABLE order_process_record ( order_id VARCHAR(64) PRIMARY KEY, process_time DATETIME );
INSERT INTO order_process_record (order_id, process_time) VALUES (?, NOW());
|
2. 超时处理
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public boolean tryWithTimeout(Callable<Boolean> task, long timeoutMs) { ExecutorService executor = Executors.newSingleThreadExecutor(); Future<Boolean> future = executor.submit(task);
try { return future.get(timeoutMs, TimeUnit.MILLISECONDS); } catch (TimeoutException e) { future.cancel(true); return false; } catch (Exception e) { return false; } finally { executor.shutdown(); } }
|
3. 补偿重试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| public void compensateWithRetry(Runnable compensation, int maxRetries) { int retryCount = 0;
while (retryCount < maxRetries) { try { compensation.run(); return; } catch (Exception e) { retryCount++;
if (retryCount >= maxRetries) { saveToManualProcessQueue(compensation); } else { Thread.sleep(1000 * (long)Math.pow(2, retryCount)); } } } }
|
5.3 运维建议
1. 日志记录
关键节点记录日志:
- 事务ID
- 执行阶段(Try/Confirm/Cancel)
- 执行结果(成功/失败)
- 执行耗时
2. 监控指标
需要监控的指标:
- 事务成功率
- 事务平均耗时
- 补偿失败次数
- 待人工处理任务数
3. 人工介入机制
1 2 3 4 5 6 7 8 9 10 11
| CREATE TABLE manual_process_queue ( id BIGINT PRIMARY KEY AUTO_INCREMENT, transaction_id VARCHAR(64) NOT NULL, task_type VARCHAR(50) NOT NULL COMMENT 'COMPENSATION/TIMEOUT/ERROR', task_content TEXT COMMENT '任务详情(JSON)', status VARCHAR(20) NOT NULL COMMENT 'PENDING/PROCESSING/COMPLETED', operator VARCHAR(50) COMMENT '处理人', create_time DATETIME NOT NULL, process_time DATETIME );
|
六、总结
6.1 核心要点
- CAP定理: 分布式系统无法同时满足一致性、可用性和分区容错性
- BASE理论: 实际工程中通过牺牲强一致性,换取可用性和性能
- 方案选择: 根据业务场景选择合适的一致性方案
- 混合方案: 核心链路用强一致性方案,非核心链路用最终一致性方案
- 关键设计: 幂等性、超时处理、补偿重试、人工介入
6.2 实战经验
- 不要过度设计: 不是所有场景都需要分布式事务
- 优先异步: 能异步的就不要同步
- 做好监控: 完善的监控比完美的方案更重要
- 预留后门: 总会有意外情况,要有人工介入机制
6.3 进阶阅读
论文:
- 《Life beyond Distributed Transactions: an Apostate’s Opinion》
- 《Sagas》- Hector Garcia-Molina
开源框架:
书籍:
- 《数据密集型应用系统设计》(DDIA)
- 《微服务架构设计模式》
分布式数据一致性是一个复杂的话题,没有银弹,只有根据具体业务场景做出合适的权衡。