Seata 使用调研
有业务部门提出,希望调研下Seata的基本使用方式,为分布式事务选型提供多种选择;
本文主要阐述 Seata基本使用,和koca-dtc对比的差异;
[Spring Cloud Alibaba版本关系表](版本说明 · alibaba/spring-cloud-alibaba Wiki · GitHub)
[Seata 下载](Releases · seata/seata · GitHub)
Seata功能概述
功能名称 | 官方文档 | 特点 | 基本原理 | 使用场景 | 存在问题 |
---|---|---|---|---|---|
AT 模式 | AT模式官方文档 | 自动补偿事务,性能较高,开发简单 | 代理数据源,一阶段解析sql将数据写入undo_log表,二阶段根据undo_log表进行回滚 | 支持 ACID 事务的关系型数据库 | 开发时需注意:脏读 |
TCC 模式 | TCC模式官方文档 | 手动补偿事务,性能高,开发复杂 | 代理数据源,一阶段执行prepare方法,二阶段执行commit或rollback方法 | 不支持 ACID 事务的数据库 性能要求高场景 |
开发时需注意:空回滚、幂等、悬挂 |
Saga 模式 | SAGA模式官方文档 | 性能高,开发复杂 | 基于状态机引擎实现,用户设计状态图,开发rollback方法,状态机根据状态图 json 调用相关方法 | 业务流程长、业务流程多 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口 |
除了额外开发方法,需设计开发状态图 json |
XA 模式 | XA模式官方文档 | 满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性,开发简单,性能较低 | 由数据库XA协议完成提交、回滚 | 支持XA 事务的数据库 | 性能较低 |
Seata快速使用
服务端
1、到Seata Github网站上下载服务端
下载下图所示两个文件:
- seata-server-*.zip:Seata服务端程序
- Source code:Seata源码程序,里面的script很重要,内涵sql脚本,配置中心脚本等,后续有重要用途
2、解压
seata-server-*.zip
并启动 seata-server
Windows 启动 seata/bin/seata-server.bat
Mac or Linux 启动 seata/bin/seata-server.sh
客户端
1、
pom.xml
添加依赖、application.properties
添加配置、数据库导入源码包script/client/at/db/mysql.sql
pom.xml:
<dependency>
<groupId>com.alibaba.cloud</groupId>
<artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
application.properties:
seata.enabled=true
spring.cloud.alibaba.seata.tx-service-group=seata_sample_group
seata.service.vgroup-mapping.seata_sample_group=default
seata.service.grouplist.default=localhost:8091
2、AT 模式
- 全局事务接口入口添加
@GlobalTransactional
注解即可
3、TCC 模式
- 全局事务接口入口添加
@GlobalTransactional
注解; - 创建接口类,添加
@LocalTCC
注解,接口中含3个方法,其中1个方法添加@TwoPhaseBusinessAction
注解,示例如下:
@LocalTCC:RM 的接口上面必须要有@LocalTCC 注解,且必须在接口上面
@TwoPhaseBusinessAction:指明prepare方法,prepare方法执行成功后,Seata 调用的 commitMethod
, 失败后,Seata 调用的 rollbackMethod
@BusinessActionContextParameter:被修饰方法参数值,可以在 commitMethod
和 rollbackMethod
中。使用 BusinessActionContext context.getActionContext("paramName")
得到
@LocalTCC
public interface TccAccountService {
@TwoPhaseBusinessAction(name = "UpdateAccount", commitMethod = "commit", rollbackMethod = "rollback")
String updateAccount(@BusinessActionContextParameter(paramName = "userId") String userId,
@BusinessActionContextParameter(paramName = "money") int money);
boolean commit(BusinessActionContext context);
boolean rollback(BusinessActionContext context);
}
4、Saga模式
- 需学习Saga状态机
- 借助状态机设计器,开发Saga 状态图,生产状态图 json 文件,熟悉的可直接编写 json文件
- 项目初始化Saga相关配置并运行,示例如下:
Saga配置
@Configuration
public class SeataSagaConfiguration {
@Bean
public ThreadPoolExecutor threadPoolExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
executor.setQueueCapacity(100);
executor.setThreadNamePrefix("SAGA_THREAD_");
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
executor.initialize();
return executor.getThreadPoolExecutor();
}
@Bean
public DefaultStateMachineConfig defaultStateMachineConfig(ThreadPoolExecutor threadPoolExecutor) {
DefaultStateMachineConfig machineConfig = new DefaultStateMachineConfig();
Resource resource = new ClassPathResource("saga/PayOrder.json");
machineConfig.setResources(new Resource[]{resource});
machineConfig.setEnableAsync(true);
machineConfig.setThreadPoolExecutor(threadPoolExecutor);
return machineConfig;
}
@Bean
public ProcessCtrlStateMachineEngine stateMachineEngine(DefaultStateMachineConfig defaultStateMachineConfig) {
ProcessCtrlStateMachineEngine machineEngine = new ProcessCtrlStateMachineEngine();
machineEngine.setStateMachineConfig(defaultStateMachineConfig);
return machineEngine;
}
@Bean
public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) {
StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
engineHolder.setStateMachineEngine(stateMachineEngine);
return engineHolder;
}
}
状态图 json
{
"Name": "PayOrder",
"Comment": "PayOrder",
"StartState": "CreateStorage",
"Version": "0.0.2",
"States": {
"CreateStorage": {
"Type": "ServiceTask",
"ServiceName": "com.szkingdom.seata.sample.StorageService",
"ServiceMethod": "sagaStorage",
"CompensateState": "CompensateStorage",
"Next": "ChoiceState",
"Input": [
"$.[businessKey]",
"$.[commodityCode]",
"$.[count]",
{
"throwException": "$.[mockCreateStorageFail]"
}
],
"Output": {
"createStorageResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger1"
}
]
},
"ChoiceState": {
"Type": "Choice",
"Choices": [
{
"Expression": "[createStorageResult] == true",
"Next": "CreateOrder"
}
],
"Default": "CompensationTrigger1"
},
"CreateOrder": {
"Type": "ServiceTask",
"ServiceName": "com.szkingdom.seata.sample.OrderService",
"ServiceMethod": "sagaOrder",
"CompensateState": "CompensateOrder",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[orderCount]",
{
"throwException": "$.[mockCreateOrderFail]"
}
],
"Output": {
"createOrderResult": "$.#root"
},
"Status": {
"#root == true": "SU",
"#root == false": "FA",
"$Exception{java.lang.Throwable}": "UN"
},
"Catch": [
{
"Exceptions": [
"java.lang.Throwable"
],
"Next": "CompensationTrigger2"
}
],
"Next": "ChoiceState2"
},
"ChoiceState2": {
"Type": "Choice",
"Choices": [
{
"Expression": "[createOrderResult] == true",
"Next": "Succeed"
}
],
"Default": "CompensationTrigger2"
},
"CompensateStorage": {
"Type": "ServiceTask",
"ServiceName": "com.szkingdom.seata.sample.StorageService",
"ServiceMethod": "sagaUndoStorage",
"Input": [
"$.[businessKey]",
"$.[commodityCode]",
"$.[count]"
]
},
"CompensateOrder": {
"Type": "ServiceTask",
"ServiceName": "com.szkingdom.seata.sample.OrderService",
"ServiceMethod": "sagaUndoOrder",
"Input": [
"$.[businessKey]",
"$.[userId]",
"$.[commodityCode]",
"$.[orderCount]"
]
},
"CompensationTrigger1": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"CompensationTrigger2": {
"Type": "CompensationTrigger",
"Next": "Fail"
},
"Succeed": {
"Type": "Succeed"
},
"Fail": {
"Type": "Fail",
"ErrorCode": "PURCHASE_ORDER_FAILED",
"Message": "purchase order failed"
}
}
}
运行
@GetMapping(value = "/seata/sagafeign", produces = "application/json")
public String sagafeign() {
String businessKey = String.valueOf(System.currentTimeMillis());
HashMap<String, Object> parames = new HashMap<>();
parames.put("businessKey", businessKey);
parames.put("commodityCode", COMMODITY_CODE);
parames.put("orderCount", ORDER_COUNT);
parames.put("count", ORDER_COUNT);
parames.put("userId", USER_ID);
StateMachineInstance instance =
stateMachineEngine.startWithBusinessKey("PayOrder", null, businessKey, parames);
if (ExecutionStatus.SU.equals(instance.getStatus())) {
LOGGER.info("Saga PayOrder完成 Xid:{}", instance.getId());
return SUCCESS;
} else {
LOGGER.info("Saga PayOrder失败 Xid:{}", instance.getId());
return FAIL;
}
}
5、XA模式
- 需要支持XA模式的数据库
- 全局事务接口入口添加
@GlobalTransactional
注解 - 更换XA模式代理数据源,示例如下
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
// DataSourceProxy for AT mode
// return new DataSourceProxy(druidDataSource);
// DataSourceProxyXA for XA mode
return new DataSourceProxyXA(druidDataSource);
}
Seata高可用部署
基本步骤:
1、将 Seata 关联至注册中心;
2、将 Seata 依赖的配置,上传至配置中心;多个 seata-server 启动时,会共用相同的配置内容;
3、修改客户端配置文件,关联Seata;启动时,客户端可从注册中心获取到所有 seata-server 地址,并连接上,运行时,会从中选一个可用的,将数据传送过去;
服务端
1、修改Seata配置文件:
seata/conf/registry.conf
- 修改 registry.type = nacos , config.type = nacos
- 修改 registry、config 中 nacos 配置
注: namespace = “” 默认为public,此处笔者在 nacos 创建 seata 命名空间下图所示:
seata/conf/registry.conf
修改后的配置内容:
registry {
# file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
type = "nacos"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = "01d58c83-c43e-47f4-a226-6af44f54c4d0"
cluster = "default"
username = ""
password = ""
}
...省略其他内容
}
config {
# file、nacos 、apollo、zk、consul、etcd3
type = "nacos"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = "01d58c83-c43e-47f4-a226-6af44f54c4d0"
group = "SEATA_GROUP"
username = ""
password = ""
}
...省略其他内容
}
2、github下载对应版本seata源码包,修改
script/config-center/config.txt
文件
笔者修改的内容如下:
service.vgroupMapping.seata_sample_group=default #此处 seata_sample_group 需和客户端配置文件中的对应上
store.mode=db
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
执行脚本 script/config-center/nacos/nacos-config.sh
,将 config.txt 内容上传至 nacos 配置中心
sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -t 01d58c83-c43e-47f4-a226-6af44f54c4d0
进入 nacos 查看上传的配置内容,如下图所示:
单机多端口启动:
seata-server.bat -h 127.0.0.1 -p 8091
seata-server.bat -h 127.0.0.1 -p 8092
seata-server.bat -h 127.0.0.1 -p 8093
在 Nacos上可观察到 Seata集群中有3个健康实例,如下图所示:
3、创建数据库,导入
源码包/script/server/db/mysql.sql
客户端
修改 application.yml
,增加如下配置:
seata.enabled=true
seata.tx-service-group=seata_sample_group
seata.service.vgroup-mapping.seata_sample_group=default
seata.registry.type=nacos
seata.registry.nacos.cluster=default
seata.registry.nacos.server-addr=localhost:8848
seata.registry.nacos.namespace=01d58c83-c43e-47f4-a226-6af44f54c4d0
seata.config.type=nacos
seata.config.nacos.server-addr=localhost:8848
seata.config.nacos.namespace=01d58c83-c43e-47f4-a226-6af44f54c4d0
启动客户端,观察每个seata-server输出的日志,可观察到客户端发出的请求分散到了每台server上
Seata高可用原理分析
Seata结合注册中心、数据库完成高可用
注册中心主要就是为了让TM,RM动态感知协调者的变更。 在向协调者发送请求的时候,会从注册中心里的协调者列表用负载均衡算法选择一个协调者,然后发送请求。 从这里可以看出协调者必须是对等的, 每个请求发给哪一台协调者都是一样的。
Seata中各角色与注册中心交互图:
KOCA-DTC 与 Seata 对比
功能 | KOCA-DTC | SEATA | 说明 |
---|---|---|---|
自动补偿事务 | TSC模式 | AT模式 | TSC模式:代理数据源完成提交、回滚 存在的问题: 代理数据源假关闭连接,暂时不提交本地事务,但是容易造成数据的死锁。 AT模式:代理数据源,解析sql保存至undo_log表,成功清除undo_log相关数据,失败根据undo_log表数据回滚 存在的问题: 采用undo_log的形式逆向生成sql语句实现回滚,避免死锁现象但是容易出现脏读 |
手动补偿事务 | TCC模式 | TCC模式 | 大体一致,使用上有些区别 |
Saga模式 | 无 | Saga模式 | 适用于业务流程长、多的业务 或参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口 |
XA模式 | 无 | XA模式 | XA可以满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性 依赖数据库支持,性能低 |
高可用 | Redis数据库 | 注册中心+Redis或Mysql数据库 | 无 |
综合分析
自动补偿事务:
- 性能 :AT 模式应比 TSC 模式高;
- 编程复杂度:TSC 模式比 AT 模式简单;
手动补偿事务:
- 二者差不多,使用上有些区别,开发中会遇到相同的难题;
其他:
- Saga 模式,学习难度高,上手困难,除了要构造状态图 json,需要开发手动补偿方法,类似 TCC 模式,TCC 模式存在的问题,Saga 模式也有;
- XA 模式可以满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性,缺点是性能低,需要数据库支持;
附录
空回滚
在没用调用TCC资源Try方法的情况下,调用了第二阶段的Cancel方法,Cancel方法需要识别出这是一个空回滚,然后直接返回成功。
出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用第二阶段的Cacel方法,从而形成空回滚。
解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚。如果没有执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务ID和分支事务ID,第一阶段Try方法里会插入一条记录,表示一阶段执行了。Cancel接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。
幂等
通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求TCC的二阶段Try、Confirm和Cancel接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。
悬挂
悬挂就是对于一个分布式事务,其二阶段Cancel接口比Try接口先执行。
出现原因是在RPC调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时RPC调用的网络发生拥堵,通常RPC调用是有超时时间的,RPC超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC请求才到达参与者真正执行,而一个Try方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就成为悬挂,即业务资源预留后没法继续处理。
解决思路时候如果二阶段执行完成,那一阶段就不能再继续执行。再执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。
Sample说明
1、下载nacos-server-1.4.1、seata-server-1.3.0,运行;
2、创建数据库,导入db.sql文件,修改确认 account-service、business-service、order-service、storage-service 配置文件,运行;
3、调用 business-service 中 com.szkingdom.seata.sample.BusinessController
相关方法即可