【RocketMQ】基于RocketMQ的分布式事务
作者:行百里er
博客:https://chendapeng.cn (opens new window)
提示
这里是 行百里er 的博客:行百里者半九十,凡事善始善终,吾将上下而求索!
RocketMQ系列第三篇。
前两篇介绍了消息队列及ROcketMQ的基本使用,本次来聊一下基于RocketMQ的分布式事务解决方案。
- Why分布式事务
- 分布式事务解决方案
- 基于RocketMQ的分布式事务
- 代码实现
# 0x01 为什么有分布式事务
现在很多大公司的项目都拆分为为服务器架构的了,通常每个服务只处理一件事情,部署在一个服务器节点上,不同的服务部署在不同的机器上,这就存在服务之间的相互通信问题。
比如订单服务和支付服务,这里举一个简单的业务流程,创建一个订单之后,向MQ发送消息,支付服务消费消息,调起支付,然后订单服务进行修改订单状态,发货。
如果用户已经支付完成了,但是在处理订单状态环节出现了问题,该怎么办?这个时候消费者方(支付服务)已经把消息消费了,无法回滚了。
所以这两个服务,从创建订单到支付到更新订单状态等一系列的操作必须是原子性的。
这就是分布式系统中涉及到的分布式事务问题。
# 0x02 实现最终一致性的解决思路
# 2.1 两阶段提交(2PC)
Two-phase Commit,简称2PC,两阶段提交。从字面意思就能想到,提交事务时分两个阶段来完成最终事务的提交。
该方案通过引入一个第三方协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。
# 2.1.1 阶段一:准备阶段
协调者询问参与者事务是否执行成功,参与者发回事务执行结果。
# 2.1.2 阶段二:提交阶段
如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。
Tip:在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。
# 2.1.3 两阶段提交存在的问题
- 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
- 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。
- 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
- 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。
# 2.2 三阶段提交(3PC,TCC,补偿事务)
Try-Confirm-Cancel,TCC,采用的是补偿机制。其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。
# 2.2.1 三个阶段
- Try 主要是对业务系统做检测及资源预留
- Confirm 主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
- Cancel 主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。
# 2.2.2 优缺点
TCC是对2PC的一个改进,try阶段通过预留资源的方式避免了同步阻塞资源的情况;
但是TCC编程需要业务自己实现try,confirm,cancel,对业务入侵太大,实现起来也比较复杂。
# 0x03 基于RocketMQ的分布式事务
RocketMQ支持分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致。
# 3.1 实现方式
# Half Message(半消息,预处理消息)
当Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,它暂时不会被Consumer消费。
# 检查事务状态
Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向Producer确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。
# 事务消息的三种状态
- 提交状态:提交事务,它允许消费者消费此消息。
- 回滚状态:回滚事务,它代表该消息将被删除,不允许被消费。
- 未知状态:中间状态,它代表需要检查消息队列来确定状态。
# 消息回查
有一种场景,如果发送预备消息成功,执行本地事务成功,但发送确认消息失败;那么问题就来了,因为Producer的业务都已经处理完毕了,就剩下Consumer消费了,但是你commit失败了,Consumer消费不到,这里就出现了数据不一致。
RocketMQ采用消息状态回查来解决这种问题,RocketMQ会定时遍历commitlog中的预备消息。
因为预备消息最终肯定会变为Commit消息或Rollback消息,所以遍历预备消息去回查本地业务的执行状态,如果发现本地业务没有执行成功就Rollback,如果执行成功就发送Commit消息。
# 超时
如果超过回查次数,默认回滚消息。
# 3.2 Show you the code
# 3.2.1 Producer发送事务性消息
RocketMQ的分布式事务,需要生产者发送事务性消息,使用TransactionMQProducer
类创建生产者,并指定唯一的ProducerGroup
,就可以设置自定义线程池来处理这些检查请求。
执行本地事务后,需要根据执行结果对消息队列进行回复。
生成TransactionMQProducer实例
:
TransactionMQProducer producer = new TransactionMQProducer("laopo");
producer.setNamesrvAddr("192.168.2.110:9876");
//处理检查请求的线程池
ExecutorService executorService = new ThreadPoolExecutor(2,
5,
100,
TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(2000),
new ThreadFactory() {
@Override
public Thread newThread(Runnable r) {
Thread thread = new Thread(r);
thread.setName("client-transaction-msg-check-thread");
return thread;
}
});
producer.setExecutorService(executorService);
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
# 3.2.2 设置监听回查
设置监听事务的接口TransactionListener
:
当发送半消息成功时,使用executeLocalTransaction
方法来执行本地事务,返回前文所述的三种状态之一:提交、回滚、未知。
checkLocalTransaction
方法用于检查本地事务状态,并回应消息队列的检查请求,该方法也返回提交、回滚、未知三种状态之一。
//设置回查
producer.setTransactionListener(new TransactionListener() {
private AtomicInteger transactionIndex = new AtomicInteger(0);
//用来保存事务的状态
private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
//半消息发送成功触发此方法来执行本地事务
@Override
public LocalTransactionState executeLocalTransaction(Message message, Object o) {
int value = transactionIndex.getAndIncrement();
int status = value % 3;
localTrans.put(message.getTransactionId(), status);
return LocalTransactionState.UNKNOW;
}
//broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
@Override
public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
Integer status = localTrans.get(messageExt.getTransactionId());
if (null != status) {
switch (status) {
case 0:
return LocalTransactionState.UNKNOW;
case 1:
return LocalTransactionState.COMMIT_MESSAGE;
case 2:
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
return LocalTransactionState.COMMIT_MESSAGE;
}
});
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
# 3.2.3 发送消息
调用sendMessageInTransaction
来发送消息:
//生产并发送消息
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
Message msg =
new Message("girl", tags[i % tags.length], "KEY" + i,
("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
//发送事务消息
SendResult sendResult = producer.sendMessageInTransaction(msg, null);
System.out.printf("%s%n", sendResult);
Thread.sleep(10);
}
for (int i = 0; i < 100000; i++) {
Thread.sleep(1000);
}
//关闭生产者实例
producer.shutdown();
System.out.printf("%s", "已关闭生产者实例");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
运行结果:
# 3.2.4 消费事务消息
之前生产消息生产了TagA、B到TagE的消息,我们这里顺便再验证一下TAG过滤消费,就消费TagB的吧:
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("laopo-consumer");
consumer.setNamesrvAddr("192.168.2.110:9876");
//订阅topic,消费TagB的消息
consumer.subscribe("girl", "TagB");
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
// 标记该消息已经被成功消费
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 启动Consumer实例
consumer.start();
System.out.println("consumer started.");
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
成功消费了TagB的消息。
本次导航结束,以上。
首发公众号 行百里er ,欢迎老铁们关注阅读指正。