RocketMQ事务消息的核心设计是通过”半消息机制+事务状态回查”实现分布式系统的事务最终一致性,以下是深度拆解:
RocketMQ事务消息(核心:消息驱动最终一致)
实现原理(两阶段提交)
1 | sequenceDiagram |
关键步骤详解
Half Message(半消息)
- 消息暂时存入
RMQ_SYS_TRANS_HALF_TOPIC(消费者不可见) - 规避:业务未执行先被消费的问题
- 消息暂时存入
本地事务执行
1
2
3
4
5
6
7
8
9
10
11
12
13// 事务监听器伪代码
public class OrderTransactionListener implements TransactionListener {
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
try {
// 执行数据库事务(如创建订单)
createOrder(arg);
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}事务状态回查(兜底机制)
触发场景:生产者宕机/网络超时未返回事务状态
Broker行为:
- 每隔30秒扫描半消息(可配置)
- 回调生产者的
checkLocalTransaction方法
生产者需实现:
1
2
3
4
5
6
7
8
9
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
// 根据msgKey查询本地事务状态
Order order = orderDao.findByMsgKey(msg.getKeys());
if(order != null && order.getStatus()== PAID)
return LocalTransactionState.COMMIT_MESSAGE;
else
return LocalTransactionState.ROLLBACK_MESSAGE;
}
异常场景处理
| 故障类型 | RocketMQ处理策略 | 业务注意事项 |
|---|---|---|
| 生产者宕机 | 依赖Broker回查机制 | 需保证事务状态查询接口幂等 |
| 网络分区 | 消息默认保留24小时 | 需监控堆积告警 |
| 重复投递 | 消息幂等Key(msgKey防重) | 消费者需实现幂等 |
| Broker崩溃 | 同步刷盘+主从复制 | 使用Dledger技术保证高可用 |
生产实践最佳方案
事务表设计(支撑回查)
1
2
3
4
5
6CREATE TABLE trans_log (
msg_key VARCHAR(64) PRIMARY KEY, -- RocketMQ消息Key
biz_id BIGINT NOT NULL, -- 业务主键
status TINYINT DEFAULT 0, -- 0:未知 1:成功 2:失败
create_time TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);消费者幂等
1
2
3
4
5
6
7
8
9consumer.registerMessageListener((MessageListenerOrderly) (msgs, context) -> {
for (MessageExt msg : msgs) {
String msgKey = msg.getKeys();
if(redis.setnx("consumed:"+msgKey, "1", 24h)) {
processMessage(msg); // 业务处理
}
}
return ConsumeOrderlyStatus.SUCCESS;
});监控指标
RMQ_SYS_TRANS_HALF_TOPIC堆积量- 事务回查失败次数(
TransactionCheckTimes) - 消息Commit/Rollback比例
在阿里云双十一中,该方案支撑每秒20万笔交易事务,端到端延迟<500ms。关键点:本地事务与消息必须共用同一个数据源连接(保证原子性)
代码流程详解
executeLocalTransaction 方法调用时机
当生产者调用 sendMessageInTransaction() 发送事务消息时,RocketMQ 会按以下顺序处理:
- 发送半消息:消息先发送到 Broker,但此时对消费者不可见
- 触发本地事务:Broker 确认半消息存储成功后,立即回调生产者的
executeLocalTransaction()方法 - 决定消息命运:根据该方法返回值决定后续操作:
COMMIT→ 提交消息(对消费者可见)ROLLBACK→ 删除消息UNKNOWN→ 进入回查流程
✅ 典型场景:创建订单时同步执行 DB 事务,根据 DB 事务结果返回
COMMIT/ROLLBACK
checkLocalTransaction 方法调用时机
当出现以下情况时,Broker 会主动发起事务状态回查:
- 半消息未决超时:
executeLocalTransaction()返回UNKNOWN后,默认 60秒 触发首次回查 - 生产者断连:生产者宕机/网络断开,导致未返回事务状态
- 定时回查机制:Broker 周期性扫描半消息(默认配置:首次回查间隔 60 秒,后续每 30 秒一次,最多 15 次)
✅ 典型场景:支付超时订单检查,根据订单最终状态决定提交/回滚消息
双方法协作流程
1 | sequenceDiagram |
关键设计解析
| 场景 | 执行方法 | 触发条件 | 业务要求 |
|---|---|---|---|
| 主事务流程 | executeLocalTransaction | 半消息存储成功后立即触发 | 快速执行本地事务 |
| 事务状态恢复 | checkLocalTransaction | 超时/断连/主动返回UNKNOWN | 幂等性设计 + 状态可查 |
| 消息最终一致性 | 双方法协作 | 通过回查机制覆盖异常场景 | 需持久化事务状态 |
生产实践建议
事务状态持久化
在 DB 事务中同步记录事务状态,供回查时使用:1
2
3
4
5
public void createOrder(String orderId) {
orderDao.insert(order); // 业务数据
txLogDao.insert(orderId, "PENDING"); // 事务状态记录
}回查幂等设计
1
2
3
4
5
6
7
8public RocketMQLocalTransactionState checkLocalTransaction(Message msg) {
String orderId = parseMsg(msg);
// 查询事务日志表(非业务表)
TxLog txLog = txLogDao.selectById(orderId);
if (txLog == null) return ROLLBACK;
return "SUCCESS".equals(txLog.getStatus()) ? COMMIT : ROLLBACK;
}超时配置优化(在 Broker 端调整)
1
2
3
4# broker.conf
transactionTimeout=20 # 超时阈值(秒)
transactionCheckMax=5 # 最大回查次数
transactionCheckInterval=30000 # 回查间隔(ms)
⚠️ 注意:避免在
executeLocalTransaction()中执行耗时操作,否则可能因超时触发回查导致重复执行!
