要解决数据库事务提交与消息发送的原子性问题(避免数据与消息状态不一致),需通过事务消息方案实现两阶段操作的事务保障。以下是经过大规模生产验证的四大方案:
方案1:本地消息表(最经典方案)
核心思想:将消息作为数据库事务的一部分
1 | sequenceDiagram |
关键实现:
创建消息事务表
1
2
3
4
5
6
7
8
9CREATE TABLE message_transaction (
id BIGINT AUTO_INCREMENT PRIMARY KEY,
biz_id VARCHAR(64) NOT NULL, -- 业务ID(如订单号)
topic VARCHAR(255) NOT NULL,
payload TEXT NOT NULL,
status TINYINT NOT NULL DEFAULT 0, -- 0:待发送 1:已发送 2:失败
retry_count INT DEFAULT 0,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);使用Spring事务管理器保证原子性
1
2
3
4
5
6
7
8
public void executeBusiness(String orderId) {
// 1. 更新业务表
orderDao.updateStatus(orderId, PAID);
// 2. 写入本地消息表 (同一个事务)
messageDao.insertMessage(new Message(orderId, "payment_success", ...));
}独立线程池异步发送+重试
1
2
3
4
5
6
7
8
9
10
11
12// 独立线程池,轮询本地消息表,发送到MQ,如果失败则重试
public void retryFailedMessages() {
List<Message> messages = messageDao.findFailedMessages();
messages.forEach(msg -> {
if (mqProducer.send(msg)) {
messageDao.updateStatus(msg.id, SENT);
} else {
messageDao.increaseRetryCount(msg.id);
}
});
}
方案2:RocketMQ事务消息(无侵入方案)
架构优势:无需自建消息表,依赖MQ中间件能力
1 | // 事务消息生产方 |
防丢失设计:
- 半消息先存Broker(消费者不可见)
- 生产者确认本地事务状态(COMMIT/ROLLBACK)
- Broker未收到确认时主动回查
- 消息持久化到磁盘+同步复制到Slave
方案3:事务同步监听(MySQL Binlog方案)
适用场景:无法修改业务代码的遗留系统
1 | graph LR |
实现步骤:
部署Canal Server捕获MySQL变更
过滤关键业务表的UPDATE事件
1
2
3
4
5
6
7
8
9// Canal消息处理器
public void handleBinlogEvent(Event event) {
if (isOrderPaidEvent(event)) {
// 构造领域事件消息
PaymentEvent paymentEvent = buildEvent(event);
// 发送消息到MQ
mqProducer.send(new Message("ORDER_PAID", paymentEvent));
}
}消费方实现幂等处理
1
2
3
4
5consumer.subscribe("ORDER_PAID", (msg) -> {
String uniqueId = msg.getProperty("biz_id");
if(idempotentChecker.processed(uniqueId)) return;
paymentService.handlePaymentEvent(msg);
});
优势:业务代码零改造
时延:毫秒级(取决于Binlog解析速度)
方案4:最大努力通知(最终一致)
适用场景:可接受分钟级延迟的非核心业务
架构设计:
事务成功后先写Redis标记状态
1
SET order:1001:msg_pending "1" EX 600 # 10分钟有效期
定时任务轮询补偿
1
2
3
4
5
6
7
8
9
10
11
public void notifyDownstream() {
Set<String> pendingOrders = redis.keys("order:*:msg_pending");
pendingOrders.forEach(orderId -> {
Order order = orderService.getById(orderId);
if (order.isPaid()) {
boolean success = notifyService.sendPaymentMsg(order);
if(success) redis.del("order:"+orderId+":msg_pending");
}
});
}
方案选型建议
| 方案 | 一致性级别 | 复杂度 | 适用场景 |
|---|---|---|---|
| 本地消息表 | 最终一致 | ★★☆ | 所有业务(推荐首选) |
| RocketMQ事务消息 | 最终一致 | ★★★ | 新项目+使用RocketMQ |
| MySQL Binlog监听 | 最终一致 | ★★★☆ | 遗留系统改造 |
| 最大努力通知 | 最终一致 | ★☆ | 可延迟的非核心业务 |
避坑指南:
- 消费方必须实现幂等(通过业务唯一ID)
- 本地消息表方案要控制重试次数(避免无限循环)
- 事务消息方案需建立消息状态监控大盘
- 所有方案都需要设计消息TTL(避免积压)
某电商支付系统数据:采用方案1后,在订单量日均1000万的场景下,消息丢失率从0.1%降至0.0001%,事务补偿延迟<500ms。核心关键点:将消息存储与业务数据放在同一个数据库事务中。
