行百里er 行百里er
首页
  • 分类
  • 标签
  • 归档
设计模式
  • JVM
  • Java基础
MySQL
Elastic Stack
Redis
  • Kafka
  • RocketMQ
分布式
Spring Cloud Alibaba
云原生
数据结构与算法
关于
GitHub (opens new window)

行百里er

Java程序员一枚
首页
  • 分类
  • 标签
  • 归档
设计模式
  • JVM
  • Java基础
MySQL
Elastic Stack
Redis
  • Kafka
  • RocketMQ
分布式
Spring Cloud Alibaba
云原生
数据结构与算法
关于
GitHub (opens new window)
  • Kafka

  • RocketMQ

    • 【RocketMQ】RocketMQ入门之闪电三连鞭:消息队列、RocketMQ介绍及安装使用
    • 【RocketMQ】近距离感受RocketMQ如何收发消息,有备而来!
    • 【RocketMQ】基于RocketMQ的分布式事务
      • 2.1 两阶段提交(2PC)
        • 2.1.1 阶段一:准备阶段
        • 2.1.2 阶段二:提交阶段
        • 2.1.3 两阶段提交存在的问题
      • 2.2 三阶段提交(3PC,TCC,补偿事务)
        • 2.2.1 三个阶段
        • 2.2.2 优缺点
      • 3.1 实现方式
        • Half Message(半消息,预处理消息)
        • 检查事务状态
        • 事务消息的三种状态
        • 消息回查
        • 超时
      • 3.2 Show you the code
        • 3.2.1 Producer发送事务性消息
        • 3.2.2 设置监听回查
        • 3.2.3 发送消息
        • 3.2.4 消费事务消息
    • 【RocketMQ】RocketMQ集群,RocketMQ-on-DLedger可容灾集群
  • 消息中间件
  • RocketMQ
行百里er
2020-11-16
目录

【RocketMQ】基于RocketMQ的分布式事务

作者:行百里er

博客:https://chendapeng.cn (opens new window)

提示

这里是 行百里er 的博客:行百里者半九十,凡事善始善终,吾将上下而求索!

RocketMQ系列第三篇。

前两篇介绍了消息队列及ROcketMQ的基本使用,本次来聊一下基于RocketMQ的分布式事务解决方案。

  • Why分布式事务
  • 分布式事务解决方案
  • 基于RocketMQ的分布式事务
  • 代码实现

# 0x01 为什么有分布式事务

现在很多大公司的项目都拆分为为服务器架构的了,通常每个服务只处理一件事情,部署在一个服务器节点上,不同的服务部署在不同的机器上,这就存在服务之间的相互通信问题。

比如订单服务和支付服务,这里举一个简单的业务流程,创建一个订单之后,向MQ发送消息,支付服务消费消息,调起支付,然后订单服务进行修改订单状态,发货。

如果用户已经支付完成了,但是在处理订单状态环节出现了问题,该怎么办?这个时候消费者方(支付服务)已经把消息消费了,无法回滚了。

所以这两个服务,从创建订单到支付到更新订单状态等一系列的操作必须是原子性的。

这就是分布式系统中涉及到的分布式事务问题。

# 0x02 实现最终一致性的解决思路

# 2.1 两阶段提交(2PC)

Two-phase Commit,简称2PC,两阶段提交。从字面意思就能想到,提交事务时分两个阶段来完成最终事务的提交。

该方案通过引入一个第三方协调者(Coordinator)来协调参与者的行为,并最终决定这些参与者是否要真正执行事务。

# 2.1.1 阶段一:准备阶段

协调者询问参与者事务是否执行成功,参与者发回事务执行结果。

# 2.1.2 阶段二:提交阶段

如果事务在每个参与者上都执行成功,事务协调者发送通知让参与者提交事务;否则,协调者发送通知让参与者回滚事务。

Tip:在准备阶段,参与者执行了事务,但是还未提交。只有在提交阶段接收到协调者发来的通知后,才进行提交或者回滚。

# 2.1.3 两阶段提交存在的问题

  • 同步阻塞 所有事务参与者在等待其它参与者响应的时候都处于同步阻塞状态,无法进行其它操作。
  • 单点问题 协调者在 2PC 中起到非常大的作用,发生故障将会造成很大影响。特别是在阶段二发生故障,所有参与者会一直等待状态,无法完成其它操作。
  • 数据不一致 在阶段二,如果协调者只发送了部分 Commit 消息,此时网络发生异常,那么只有部分参与者接收到 Commit 消息,也就是说只有部分参与者提交了事务,使得系统数据不一致。
  • 太过保守 任意一个节点失败就会导致整个事务失败,没有完善的容错机制。

# 2.2 三阶段提交(3PC,TCC,补偿事务)

Try-Confirm-Cancel,TCC,采用的是补偿机制。其核心思想是:针对每个操作,都要注册一个与其对应的确认和补偿(撤销)操作。

# 2.2.1 三个阶段

  • Try 主要是对业务系统做检测及资源预留
  • Confirm 主要是对业务系统做确认提交,Try阶段执行成功并开始执行Confirm阶段时,默认Confirm阶段是不会出错的。即:只要Try成功,Confirm一定成功。
  • Cancel 主要是在业务执行错误,需要回滚的状态下执行的业务取消,预留资源释放。

# 2.2.2 优缺点

TCC是对2PC的一个改进,try阶段通过预留资源的方式避免了同步阻塞资源的情况;

但是TCC编程需要业务自己实现try,confirm,cancel,对业务入侵太大,实现起来也比较复杂。

# 0x03 基于RocketMQ的分布式事务

RocketMQ支持分布式事务功能,通过RocketMQ事务消息能达到分布式事务的最终一致。

# 3.1 实现方式

# Half Message(半消息,预处理消息)

当Broker收到此类消息后,会存储到RMQ_SYS_TRANS_HALF_TOPIC的消息消费队列中,它暂时不会被Consumer消费。

# 检查事务状态

Broker会开启一个定时任务,消费RMQ_SYS_TRANS_HALF_TOPIC队列中的消息,每次执行任务会向Producer确认事务执行状态(提交、回滚、未知),如果是未知,等待下一次回调。

# 事务消息的三种状态

  • 提交状态:提交事务,它允许消费者消费此消息。
  • 回滚状态:回滚事务,它代表该消息将被删除,不允许被消费。
  • 未知状态:中间状态,它代表需要检查消息队列来确定状态。

# 消息回查

有一种场景,如果发送预备消息成功,执行本地事务成功,但发送确认消息失败;那么问题就来了,因为Producer的业务都已经处理完毕了,就剩下Consumer消费了,但是你commit失败了,Consumer消费不到,这里就出现了数据不一致。

RocketMQ采用消息状态回查来解决这种问题,RocketMQ会定时遍历commitlog中的预备消息。

因为预备消息最终肯定会变为Commit消息或Rollback消息,所以遍历预备消息去回查本地业务的执行状态,如果发现本地业务没有执行成功就Rollback,如果执行成功就发送Commit消息。

# 超时

如果超过回查次数,默认回滚消息。

# 3.2 Show you the code

# 3.2.1 Producer发送事务性消息

RocketMQ的分布式事务,需要生产者发送事务性消息,使用TransactionMQProducer类创建生产者,并指定唯一的ProducerGroup,就可以设置自定义线程池来处理这些检查请求。

执行本地事务后,需要根据执行结果对消息队列进行回复。

生成TransactionMQProducer实例:

TransactionMQProducer producer = new TransactionMQProducer("laopo");
producer.setNamesrvAddr("192.168.2.110:9876");

//处理检查请求的线程池
ExecutorService executorService = new ThreadPoolExecutor(2,
        5,
        100,
        TimeUnit.SECONDS,
        new ArrayBlockingQueue<Runnable>(2000),
        new ThreadFactory() {
            @Override
            public Thread newThread(Runnable r) {
                Thread thread = new Thread(r);
                thread.setName("client-transaction-msg-check-thread");
                return thread;
            }
        });

producer.setExecutorService(executorService);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19

# 3.2.2 设置监听回查

设置监听事务的接口TransactionListener: 当发送半消息成功时,使用executeLocalTransaction方法来执行本地事务,返回前文所述的三种状态之一:提交、回滚、未知。

checkLocalTransaction方法用于检查本地事务状态,并回应消息队列的检查请求,该方法也返回提交、回滚、未知三种状态之一。

//设置回查
producer.setTransactionListener(new TransactionListener() {

    private AtomicInteger transactionIndex = new AtomicInteger(0);
    //用来保存事务的状态
    private ConcurrentHashMap<String, Integer> localTrans = new ConcurrentHashMap<>();
    //半消息发送成功触发此方法来执行本地事务
    @Override
    public LocalTransactionState executeLocalTransaction(Message message, Object o) {
        int value = transactionIndex.getAndIncrement();
        int status = value % 3;
        localTrans.put(message.getTransactionId(), status);

        return LocalTransactionState.UNKNOW;
    }

    //broker将发送检查消息来检查事务状态,并将调用此方法来获取本地事务状态
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt messageExt) {
        Integer status = localTrans.get(messageExt.getTransactionId());
        if (null != status) {
            switch (status) {
                case 0:
                    return LocalTransactionState.UNKNOW;
                case 1:
                    return LocalTransactionState.COMMIT_MESSAGE;
                case 2:
                    return LocalTransactionState.ROLLBACK_MESSAGE;
            }
        }
        return LocalTransactionState.COMMIT_MESSAGE;
    }
});
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33

# 3.2.3 发送消息

调用sendMessageInTransaction来发送消息:

//生产并发送消息
producer.start();
String[] tags = new String[] {"TagA", "TagB", "TagC", "TagD", "TagE"};
for (int i = 0; i < 10; i++) {
    Message msg =
            new Message("girl", tags[i % tags.length], "KEY" + i,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
    //发送事务消息
    SendResult sendResult = producer.sendMessageInTransaction(msg, null);
    System.out.printf("%s%n", sendResult);
    Thread.sleep(10);
}
for (int i = 0; i < 100000; i++) {
    Thread.sleep(1000);
}
//关闭生产者实例
producer.shutdown();
System.out.printf("%s", "已关闭生产者实例");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

运行结果:

# 3.2.4 消费事务消息

之前生产消息生产了TagA、B到TagE的消息,我们这里顺便再验证一下TAG过滤消费,就消费TagB的吧:

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("laopo-consumer");
consumer.setNamesrvAddr("192.168.2.110:9876");

//订阅topic,消费TagB的消息
consumer.subscribe("girl", "TagB");

consumer.registerMessageListener(new MessageListenerConcurrently() {
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
        System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), list);
        // 标记该消息已经被成功消费
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
});
// 启动Consumer实例
consumer.start();
System.out.println("consumer started.");
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17

成功消费了TagB的消息。

本次导航结束,以上。


首发公众号 行百里er ,欢迎老铁们关注阅读指正。

#RocketMQ
上次更新: 2022/10/04, 18:14:30
【RocketMQ】近距离感受RocketMQ如何收发消息,有备而来!
【RocketMQ】RocketMQ集群,RocketMQ-on-DLedger可容灾集群

← 【RocketMQ】近距离感受RocketMQ如何收发消息,有备而来! 【RocketMQ】RocketMQ集群,RocketMQ-on-DLedger可容灾集群→

最近更新
01
重要数据不能丢!MySQL数据库定期备份保驾护航!
05-22
02
分布式事务解决方案之 Seata(二):Seata AT 模式
09-09
03
Seata 番外篇:使用 docker-compose 部署 Seata Server(TC)及 K8S 部署 Seata 高可用
09-05
更多文章>
Theme by Vdoing | Copyright © 2020-2023 行百里er | MIT License | 豫ICP备2022020385号-1
  • 跟随系统
  • 浅色模式
  • 深色模式
  • 阅读模式