分布式事务解决方案之 Seata(二):Seata AT 模式
作者:行百里er
博客:https://chendapeng.cn (opens new window)
提示
这里是 行百里er 的博客:行百里者半九十,凡事善始善终,吾将上下而求索!
# 前言
通过上一篇文章对分布式事务解决方案的介绍,我们已经对 两阶段提交 、TCC 及 基于MQ的最终一致性 有所了解了。
Seata 提供了 AT
、TCC
、SAGA
和 XA
事务模式,他是一站式的分布式解决方案。
本文将先介绍 Seata 的 AT
模式,他是基于 两阶段提交 的演变。
Seata AT 模式是一种 非侵入式
的分布式事务解决方案,在 AT 模式下,我们只需关注自己的 业务 SQL
,业务 SQL
作为一阶段,Seata 框架会自动生成事务的二阶段提交和回滚操作。
Seata 在内部做了对数据库操作的代理层,我们使用 Seata AT 模式时,实际上用的是 Seata 自带的数据源代理 DataSourceProxy
,Seata 在这层代理中加入了很多逻辑,比如插入回滚 undo_log
日志,检查全局锁
等。
# Seata AT 模式整体机制
前面说过,AT
模式是 两阶段提交协议 的演变,其实现机制为:
一阶段:业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。
二阶段:
- 提交异步化,非常快速地完成。
- 回滚通过一阶段的回滚日志进行反向补偿。
在 一阶段
中,Seata 会拦截业务 SQL,首先解析 SQL 语义,找到要更新的业务数据,在数据被更新前,保存下来放到 undo_log
表,然后执行业务SQL更新数据,更新之后再次保存数据 redo
,最后生成行锁,这些操作都在 本地数据库事务
内完成,这样保证了一阶段的 原子性。
相对 一阶段
,二阶段
比较简单,负责 整体的回滚和提交 :
- 如果在一阶段中的事务全部执行通过,那么执行全局提交;
- 如果之前的一阶段中有本地事务没有通过,那么就执行全局回滚,回滚用到的就是一阶段记录的
undo_log
,通过回滚记录生成反向更新SQL并执行,以完成分支的回滚。
Seata 术语:
TC (Transaction Coordinator) - 事务协调者:维护全局和分支事务的状态,驱动全局事务提交或回滚。
TM (Transaction Manager) - 事务管理器:定义全局事务的范围:开始全局事务、提交或回滚全局事务。
RM (Resource Manager) - 资源管理器:管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。
当然事务完成后会释放所有资源和删除所有日志。undo_log
表稍后我们会演示观察记录。
# 实战演示 Seata AT 模式解决分布式事务问题
案例提供两个服务 seata-order-service
和 seata-ware-service
,订单服务实现创建订单业务,业务包括扣减库存和新增订单。
扣减库存是通过 OpenFeign 进行远程调用仓库服务,通过操作数据库 seata-ware
的表 t_ware
进行库存量减一操作,执行 update
语句;而创建订单则是操作另一个数据库 seata-order
的表 t_order
,执行 insert
语句。
也就是说这两个服务操作了两个数据库,有可能会产生分布式事务的问题。
# 分布式事务问题的产生
先看两个服务分别执行 SQL 操作的代码。
仓库服务:
DAO:
@Mapper
public interface WareMapper extends BaseMapper<Ware> {
@Update("update t_ware set stock=stock-1 where sku_id=#{skuId}")
void deductStock(Long skuId);
}
2
3
4
5
Service:
@Service
@Slf4j
public class WareServiceImpl extends ServiceImpl<WareMapper, Ware> implements WareService {
@Autowired
private WareMapper wareMapper;
@Override
public void deductStock(Long skuId) {
log.info("开始扣减库存,skuId={}", skuId);
wareMapper.deductStock(skuId);
}
}
2
3
4
5
6
7
8
9
10
11
12
13
Controller:
@RestController
@RequestMapping("/ware")
public class WareController {
@Autowired
private WareService wareService;
@GetMapping("/deduct")
public void deductStock(@RequestParam Long skuId) {
wareService.deductStock(skuId);
}
}
2
3
4
5
6
7
8
9
10
11
12
订单服务:
DAO:
@Mapper
public interface OrderMapper extends BaseMapper<Order> {
}
2
3
4
新增订单的 insert
语句,直接使用 Mybatis-Plus 提供的默认实现:
FeignClient:
@FeignClient("seata-ware-service")
public interface WareFeignClient {
@GetMapping("/ware/deduct")
void deductStock(@RequestParam(value = "skuId") Long skuId);
}
2
3
4
5
6
Service:
@Service
@Slf4j
public class OrderServiceImpl extends ServiceImpl<OrderMapper, Order> implements OrderService {
@Autowired
private OrderMapper orderMapper;
@Autowired
private WareFeignClient wareFeignClient;
@Override
@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
log.info("开始扣减库存,skuId={}", order.getSkuId());
// 扣减库存
wareFeignClient.deductStock(order.getSkuId());
log.info("扣减库存完成,skuId={}", order.getSkuId());
// 订单号
String orderSn = IdWorker.getTimeId();
order.setOrderSn(orderSn);
order.setCreateTime(new Date());
log.info("开始创建订单:{}", order);
log.error("此处添加异常order.getId()此时为null,模拟分布式事务出现:{}", order.getId().toString());
// 创建订单
orderMapper.insert(order);
log.info("创建订单完成");
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
Service 中先远程调用执行减库存,然后在插入订单之前模拟一个异常出现:
order.getId().toString()
此时还未执行 insert
, order.getId()
为 null
,所以此处会出现异常,因此下面的 insert
语句就不会继续执行了,而前面的减库存操作却已经执行成功,库存减了,订单未增加,这样就出现了分布式事务的问题。
用 Spring 的 @Transactional 注解看一下能否解决此问题,即看一下数据库的数据是否一致。
数据库数据初始状态:
调用创建订单接口 http://localhost:8007/order/create
:
按照我们预先设置的异常,该接口出现异常了,我们来看一下数据库数据的变化:
从数据库中的数据可以看到,即使我们在业务接口上加了
@Transactional(rollbackFor = Exception.class)
注解,也对分布式事务没有办法解决,数据最终还是不一致,因为库存扣减了订单却没有相应的增加。
# 使用 Seata 的 AT 模式解决分布式事务问题
从前面的案例我们已经得知,Spring 的 @Transactional 并不能解决分布式事务的问题,我们就以 Seata 提供的方案来处理。Seata 解决分布式事务的默认模式就是 AT
模式。
1, 引入 Seata 依赖
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
2
3
4
2, 涉及到分布式事务的服务数据库均新建 undo_log
表:
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
2
3
4
5
6
7
8
9
10
11
12
3, 在两个微服务的 application.yml
配置文件分别加入 Seata 的配置:
seata:
tx-service-group: default_tx_group
service:
vgroup-mapping:
default_tx_group: default
registry:
type: nacos
nacos:
server-addr: 192.168.242.112:81
namespace: 5a4e4c1f-beda-4ae5-a3d7-428950e7473b
group: SEATA_GROUP
config:
type: nacos
nacos:
data-id: seataServer.properties
server-addr: 192.168.242.112:81
namespace: 5a4e4c1f-beda-4ae5-a3d7-428950e7473b
group: SEATA_GROUP
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
这里有几个配置:
seata.tx-service-group
:事务分组。该配置要和 Seata Server 中配置的一致,由于我这里是用 Nacos 作为配置中心,配置的dataId
为seataServer.properties
,所以可以直接在 Nacos 界面上查看该配置:seata.service.vgroupMapping.事务分组名称
:该配置项的值为 TC 集群名称,根据上图可以看到此处的值应为default
。seata.registry.xx
:注册中心,这里选择的是 Nacos 。seata.config.xx
:配置中心,这里也是 Nacos 。
4, 在 TM 端,使用 @GlobalTransactional 开启全局事务:
@Override
@GlobalTransactional
//@Transactional(rollbackFor = Exception.class)
public void createOrder(Order order) {
log.info("开始扣减库存,skuId={}", order.getSkuId());
// 扣减库存
wareFeignClient.deductStock(order.getSkuId());
log.info("扣减库存完成,skuId={}", order.getSkuId());
// 订单号
String orderSn = IdWorker.getTimeId();
order.setOrderSn(orderSn);
order.setCreateTime(new Date());
log.info("开始创建订单:{}", order);
log.error("此处添加异常order.getId()此时为null,模拟分布式事务出现:{}", order.getId().toString());
// 创建订单
orderMapper.insert(order);
log.info("创建订单完成");
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
好了,经过以上几步,我们先恢复数据库数据的值为初始值,然后再次测试。
数据已恢复至初始值:
再次执行接口,发现执行完成以后并没有达到想要的事务回滚的效果,通过服务日志看到一直再打印如下日志:
transaction [192.168.242.16:8091:18317606214187586] current status is [RollbackRetrying]
Seata Server 端也有日志:
此时看一下 undo_log
表:
种种迹象都在说该事务在尝试回滚,but,就是一直回滚不成功,再看一下微服务的日志,可以看到有这样一个提示:
reason:[Branch session rollback failed and try again later xid = 192.168.242.16:8091:18317606214181627 branchId = 18317606214181630 Class cannot be created (missing no-arg constructor): java.time.LocalDateTime
这是 Seata 的一个 Bug,详细的 Issue 见:
https://github.com/seata/seata/issues/3620 (opens new window)
该 bug 在 1.4.2
版本提供了 SPI 扩展接口,可以自定义一个序列化类,具体做法是:
1, 新建一个专门序列化 java.time.LocalDateTime
类型的类:
package cn.chendapeng.springcloud.seatawareservice.utils;
import com.fasterxml.jackson.databind.JsonDeserializer;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.datatype.jsr310.deser.LocalDateTimeDeserializer;
import com.fasterxml.jackson.datatype.jsr310.ser.LocalDateTimeSerializer;
import io.seata.rm.datasource.undo.parser.spi.JacksonSerializer;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
* seata LocalDateTime 序列化扩展点
*
* 博客:https://chendapeng.cn - 行百里者半九十,凡事善始善终,吾将上下而求索!
* 公众号:行百里er
*
* @author 行百里者
* @date 2022-09-02 21:17
*/
public class LocalDateTimeJacksonSerializer implements JacksonSerializer<LocalDateTime> {
public static final String NORM_DATETIME_MS_PATTERN = "yyyy-MM-dd HH:mm:ss.SSS";
@Override
public Class<LocalDateTime> type() {
return LocalDateTime.class;
}
@Override
public JsonSerializer<LocalDateTime> ser() {
return new LocalDateTimeSerializer(DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN));
}
@Override
public JsonDeserializer<? extends LocalDateTime> deser() {
return new LocalDateTimeDeserializer(DateTimeFormatter.ofPattern(NORM_DATETIME_MS_PATTERN));
}
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
2, 在 resources
目录下新建 META-INF/seata
文件夹,并在其下新增 io.seata.rm.datasource.undo.parser.spi.JacksonSerializer
文件,文件内容为:
cn.chendapeng.springcloud.seatawareservice.utils.LocalDateTimeJacksonSerializer
两个微服务均要如此做。
然后我们再来调用一下 http://localhost:8007/order/create
,调用完成后,
2022-09-08 14:28:56.551 INFO 3992 --- [nio-8008-exec-1] c.c.s.s.service.impl.WareServiceImpl : 开始扣减库存,skuId=10086
2022-09-08 14:28:56.576 INFO 3992 --- [nio-8008-exec-1] i.s.c.rpc.netty.RmNettyRemotingClient : will register resourceId:jdbc:mysql://192.168.242.112:3306/seata-ware
2022-09-08 14:28:56.584 INFO 3992 --- [ctor_RMROLE_1_1] io.seata.rm.AbstractRMHandler : the rm client received response msg [version=1.4.2,extraData=null,identified=true,resultCode=null,msg=null] from tc server.
2022-09-08 14:28:56.787 INFO 3992 --- [nio-8008-exec-1] i.s.r.d.u.parser.JacksonUndoLogParser : jackson undo log parser load [cn.chendapeng.springcloud.seatawareservice.utils.LocalDateTimeJacksonSerializer].
2022-09-08 14:29:57.071 INFO 3992 --- [h_RMROLE_1_1_16] i.s.c.r.p.c.RmBranchRollbackProcessor : rm handle branch rollback process:xid=192.168.242.16:8091:18318220201103576,branchId=18318220201103579,branchType=AT,resourceId=jdbc:mysql://192.168.242.112:3306/seata-ware,applicationData=null
2022-09-08 14:29:57.075 INFO 3992 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacking: 192.168.242.16:8091:18318220201103576 18318220201103579 jdbc:mysql://192.168.242.112:3306/seata-ware
2022-09-08 14:29:57.187 INFO 3992 --- [h_RMROLE_1_1_16] i.s.r.d.undo.AbstractUndoLogManager : xid 192.168.242.16:8091:18318220201103576 branch 18318220201103579, undo_log deleted with GlobalFinished
2022-09-08 14:29:57.189 INFO 3992 --- [h_RMROLE_1_1_16] io.seata.rm.AbstractRMHandler : Branch Rollbacked result: PhaseTwo_Rollbacked
2
3
4
5
6
7
8
再次查看数据:
数据一致,库存没有减,订单没有增。
# AT 模式工作机制分析
以上面的案例来分析 AT 模式的工作机制。
库存表 seata-ware.t_ware
:
mysql> describe t_ware;
+-------------+----------+------+-----+---------+----------------+
| Field | Type | Null | Key | Default | Extra |
+-------------+----------+------+-----+---------+----------------+
| id | bigint | NO | PRI | NULL | auto_increment |
| sku_id | bigint | YES | | NULL | |
| stock | int | YES | | NULL | |
| create_time | datetime | YES | | NULL | |
| update_time | datetime | YES | | NULL | |
+-------------+----------+------+-----+---------+----------------+
2
3
4
5
6
7
8
9
10
AT 分支事务的业务逻辑是:
@Update("update t_ware set stock=stock-1, update_time=now() where sku_id=#{skuId}")
void deductStock(Long skuId);
2
具体的 SQL 执行语句:
update t_ware set stock=stock-1,update_time=now() where sku_id=10086
# 执行一阶段
该阶段的执行过程:
1, 解析 SQL:得到 SQL 的类型(UPDATE),表(product),条件(where name = 'TXC')等相关的信息。
2, 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
select id,sku_id,stock,create_time,update_time from t_ware where sku_id=10086
得到执行前的镜像:
id | sku_id | stock | create_time | update_time |
---|---|---|---|---|
1 | 10086 | 1000 | 2022-09-01 17:14:16 | 2022-09-01 17:14:16 |
3, 执行业务 SQL:更新这条记录的 stock 为 999(stock=stock-1)。
4, 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
select id,sku_id,stock,create_time,update_time from t_ware where id=1
得到执行后的镜像:
id | sku_id | stock | create_time | update_time |
---|---|---|---|---|
id | 10086 | 999 | 2022-09-01 17:14:16 | 2022-09-08 14:28:49 |
5, 插入回滚日志表,把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 undo_log
表中。
{
"@class": "io.seata.rm.datasource.undo.BranchUndoLog",
"xid": "192.168.242.16:8091:18318220201103576",
"branchId": 18318220201103579,
"sqlUndoLogs": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.undo.SQLUndoLog",
"sqlType": "UPDATE",
"tableName": "t_ware",
"beforeImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_ware",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": -5,
"value": [
"java.lang.Long",
1
]
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "stock",
"keyType": "NULL",
"type": 4,
"value": 1000
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "update_time",
"keyType": "NULL",
"type": 93,
"value": [
"java.time.LocalDateTime",
"2022-09-01 17:14:16.000"
]
}
]
]
}
]
]
},
"afterImage": {
"@class": "io.seata.rm.datasource.sql.struct.TableRecords",
"tableName": "t_ware",
"rows": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Row",
"fields": [
"java.util.ArrayList",
[
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "id",
"keyType": "PRIMARY_KEY",
"type": -5,
"value": [
"java.lang.Long",
1
]
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "stock",
"keyType": "NULL",
"type": 4,
"value": 999
},
{
"@class": "io.seata.rm.datasource.sql.struct.Field",
"name": "update_time",
"keyType": "NULL",
"type": 93,
"value": [
"java.time.LocalDateTime",
"2022-09-08 14:28:49.000"
]
}
]
]
}
]
]
}
}
]
]
}
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
6, 提交前,向 TC 注册分支:申请 t_ware
表中,主键值等于 1 的记录的 全局锁 。
7, 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
8, 将本地事务提交的结果上报给 TC。
# 执行二阶段-回滚
1, 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作;
2, 通过 XID
和 Branch ID
查找到相应的 UNDO LOG 记录;
3, 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改;
4, 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句:
update t_ware set stock = 1000, update_time='2022-09-01 17:14:16' where id = 1;
5, 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
# 执行二阶段-提交
1, 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC;
2, 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。
执行完成后,undo_log
表相应的记录被删除:
# 小结
使用 Seata 解决分布式事务问题时,默认开启的就是 AT 模式,该模式是一种 无侵入的分布式事务解决方案
,具体实现机制为:
一阶段,Seata 会拦截
业务 SQL
,首先解析 SQL 语义,找到业务 SQL
要更新的业务数据,在业务数据被更新前,将其保存成before image
,然后执行业务 SQL
更新业务数据,在业务数据更新之后,再将其保存成after image
,最后生成行锁。以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。二阶段,分为 提交 和 回滚 两种情况:
- 提交的情况:因为
业务 SQL
在一阶段已经提交至数据库, 所以 Seata 只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可。 - 回滚的情况:Seata 需要回滚一阶段已经执行的
业务 SQL
,还原业务数据。回滚方式就是用before image
还原业务数据;但在还原前要首先要校验 脏写 ,对比数据库当前业务数据
和after image
,如果两份数据完全一致就说明没有脏写,可以还原业务数据,如果不一致就说明有脏写,出现脏写就需要转 人工处理 。
关于出现脏写的现象,可以模拟出来,比如当执行完业务 SQL 后,手动再去修改一次数据库中的值,这样 after image 中的值和数据库中的值就不一样了,这就出现了脏写的现象。
- 提交的情况:因为
从以上实现机制可以看出,不管是提交还是回滚,均有 Seata 完成,我们只需要安心写我们的业务SQL即可,这就是所谓的 无侵入
。
首发公众号 行百里er ,欢迎各位关注阅读指正。