Seata 使用调研
有业务部门提出,希望调研下Seata的基本使用方式,为分布式事务选型提供多种选择;
本文主要阐述 Seata基本使用,和koca-dtc对比的差异;
[Spring Cloud Alibaba版本关系表](版本说明 · alibaba/spring-cloud-alibaba Wiki · GitHub)
[Seata 下载](Releases · seata/seata · GitHub)
Seata功能概述
| 功能名称 | 官方文档 | 特点 | 基本原理 | 使用场景 | 存在问题 | 
|---|---|---|---|---|---|
| AT 模式 | AT模式官方文档 | 自动补偿事务,性能较高,开发简单 | 代理数据源,一阶段解析sql将数据写入undo_log表,二阶段根据undo_log表进行回滚 | 支持 ACID 事务的关系型数据库 | 开发时需注意:脏读 | 
| TCC 模式 | TCC模式官方文档 | 手动补偿事务,性能高,开发复杂 | 代理数据源,一阶段执行prepare方法,二阶段执行commit或rollback方法 | 不支持 ACID 事务的数据库 性能要求高场景 | 开发时需注意:空回滚、幂等、悬挂 | 
| Saga 模式 | SAGA模式官方文档 | 性能高,开发复杂 | 基于状态机引擎实现,用户设计状态图,开发rollback方法,状态机根据状态图 json 调用相关方法 | 业务流程长、业务流程多 参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口 | 除了额外开发方法,需设计开发状态图 json | 
| XA 模式 | XA模式官方文档 | 满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性,开发简单,性能较低 | 由数据库XA协议完成提交、回滚 | 支持XA 事务的数据库 | 性能较低 | 
Seata快速使用
服务端
1、到Seata Github网站上下载服务端
下载下图所示两个文件:

- seata-server-*.zip:Seata服务端程序
- Source code:Seata源码程序,里面的script很重要,内涵sql脚本,配置中心脚本等,后续有重要用途
2、解压
seata-server-*.zip并启动 seata-server
Windows 启动 seata/bin/seata-server.bat
Mac or Linux 启动 seata/bin/seata-server.sh
客户端
1、
pom.xml添加依赖、application.properties添加配置、数据库导入源码包script/client/at/db/mysql.sql
pom.xml:
<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>
application.properties:
seata.enabled=true
spring.cloud.alibaba.seata.tx-service-group=seata_sample_group
seata.service.vgroup-mapping.seata_sample_group=default
seata.service.grouplist.default=localhost:8091
2、AT 模式
- 全局事务接口入口添加 @GlobalTransactional注解即可
3、TCC 模式
- 全局事务接口入口添加 @GlobalTransactional注解;
- 创建接口类,添加 @LocalTCC注解,接口中含3个方法,其中1个方法添加@TwoPhaseBusinessAction注解,示例如下:
@LocalTCC:RM 的接口上面必须要有@LocalTCC 注解,且必须在接口上面
@TwoPhaseBusinessAction:指明prepare方法,prepare方法执行成功后,Seata 调用的 commitMethod, 失败后,Seata 调用的 rollbackMethod
@BusinessActionContextParameter:被修饰方法参数值,可以在 commitMethod 和 rollbackMethod 中。使用 BusinessActionContext context.getActionContext("paramName") 得到
@LocalTCC
public interface TccAccountService {
    @TwoPhaseBusinessAction(name = "UpdateAccount", commitMethod = "commit", rollbackMethod = "rollback")
    String updateAccount(@BusinessActionContextParameter(paramName = "userId") String userId,
        @BusinessActionContextParameter(paramName = "money") int money);
    boolean commit(BusinessActionContext context);
    boolean rollback(BusinessActionContext context);
}
4、Saga模式
- 需学习Saga状态机
- 借助状态机设计器,开发Saga 状态图,生产状态图 json 文件,熟悉的可直接编写 json文件
- 项目初始化Saga相关配置并运行,示例如下:
Saga配置
@Configuration
public class SeataSagaConfiguration {
    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("SAGA_THREAD_");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }
    @Bean
    public DefaultStateMachineConfig defaultStateMachineConfig(ThreadPoolExecutor threadPoolExecutor) {
        DefaultStateMachineConfig machineConfig = new DefaultStateMachineConfig();
        Resource resource = new ClassPathResource("saga/PayOrder.json");
        machineConfig.setResources(new Resource[]{resource});
        machineConfig.setEnableAsync(true);
        machineConfig.setThreadPoolExecutor(threadPoolExecutor);
        return machineConfig;
    }
    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine(DefaultStateMachineConfig defaultStateMachineConfig) {
        ProcessCtrlStateMachineEngine machineEngine = new ProcessCtrlStateMachineEngine();
        machineEngine.setStateMachineConfig(defaultStateMachineConfig);
        return machineEngine;
    }
    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) {
        StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
        engineHolder.setStateMachineEngine(stateMachineEngine);
        return engineHolder;
    }
}
状态图 json
{
  "Name": "PayOrder",
  "Comment": "PayOrder",
  "StartState": "CreateStorage",
  "Version": "0.0.2",
  "States": {
    "CreateStorage": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.StorageService",
      "ServiceMethod": "sagaStorage",
      "CompensateState": "CompensateStorage",
      "Next": "ChoiceState",
      "Input": [
        "$.[businessKey]",
        "$.[commodityCode]",
        "$.[count]",
        {
          "throwException": "$.[mockCreateStorageFail]"
        }
      ],
      "Output": {
        "createStorageResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },"Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger1"
        }
      ]
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[createStorageResult] == true",
          "Next": "CreateOrder"
        }
      ],
      "Default": "CompensationTrigger1"
    },
    "CreateOrder": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.OrderService",
      "ServiceMethod": "sagaOrder",
      "CompensateState": "CompensateOrder",
      "Input": [
        "$.[businessKey]",
        "$.[userId]",
        "$.[commodityCode]",
        "$.[orderCount]",
        {
          "throwException": "$.[mockCreateOrderFail]"
        }
      ],
      "Output": {
        "createOrderResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger2"
        }
      ],
      "Next": "ChoiceState2"
    },
    "ChoiceState2": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[createOrderResult] == true",
          "Next": "Succeed"
        }
      ],
      "Default": "CompensationTrigger2"
    },
    "CompensateStorage": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.StorageService",
      "ServiceMethod": "sagaUndoStorage",
      "Input": [
        "$.[businessKey]",
        "$.[commodityCode]",
        "$.[count]"
      ]
    },
    "CompensateOrder": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.OrderService",
      "ServiceMethod": "sagaUndoOrder",
      "Input": [
        "$.[businessKey]",
        "$.[userId]",
        "$.[commodityCode]",
        "$.[orderCount]"
      ]
    },
    "CompensationTrigger1": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "CompensationTrigger2": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_ORDER_FAILED",
      "Message": "purchase order failed"
    }
  }
}
运行
@GetMapping(value = "/seata/sagafeign", produces = "application/json")
public String sagafeign() {
  String businessKey = String.valueOf(System.currentTimeMillis());
  HashMap<String, Object> parames = new HashMap<>();
  parames.put("businessKey", businessKey);
  parames.put("commodityCode", COMMODITY_CODE);
  parames.put("orderCount", ORDER_COUNT);
  parames.put("count", ORDER_COUNT);
  parames.put("userId", USER_ID);
  StateMachineInstance instance =
    stateMachineEngine.startWithBusinessKey("PayOrder", null, businessKey, parames);
  if (ExecutionStatus.SU.equals(instance.getStatus())) {
    LOGGER.info("Saga PayOrder完成 Xid:{}", instance.getId());
    return SUCCESS;
  } else {
    LOGGER.info("Saga PayOrder失败 Xid:{}", instance.getId());
    return FAIL;
  }
}
5、XA模式
- 需要支持XA模式的数据库
- 全局事务接口入口添加 @GlobalTransactional注解
- 更换XA模式代理数据源,示例如下
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
  // DataSourceProxy for AT mode
  // return new DataSourceProxy(druidDataSource);
  // DataSourceProxyXA for XA mode
  return new DataSourceProxyXA(druidDataSource);
}
Seata高可用部署
基本步骤:
1、将 Seata 关联至注册中心;
2、将 Seata 依赖的配置,上传至配置中心;多个 seata-server 启动时,会共用相同的配置内容;
3、修改客户端配置文件,关联Seata;启动时,客户端可从注册中心获取到所有 seata-server 地址,并连接上,运行时,会从中选一个可用的,将数据传送过去;
服务端
1、修改Seata配置文件:
seata/conf/registry.conf
- 修改 registry.type = nacos , config.type = nacos
- 修改 registry、config 中 nacos 配置
注: namespace = “” 默认为public,此处笔者在 nacos 创建 seata 命名空间下图所示:
seata/conf/registry.conf 修改后的配置内容:
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"
  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "01d58c83-c43e-47f4-a226-6af44f54c4d0"
    cluster = "default"
    username = ""
    password = ""
  }
  ...省略其他内容
}
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"
  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "01d58c83-c43e-47f4-a226-6af44f54c4d0"
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  ...省略其他内容
}
2、github下载对应版本seata源码包,修改
script/config-center/config.txt文件
笔者修改的内容如下:
service.vgroupMapping.seata_sample_group=default  #此处 seata_sample_group 需和客户端配置文件中的对应上
store.mode=db
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456
执行脚本 script/config-center/nacos/nacos-config.sh ,将 config.txt 内容上传至 nacos 配置中心
sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -t 01d58c83-c43e-47f4-a226-6af44f54c4d0
进入 nacos 查看上传的配置内容,如下图所示:
单机多端口启动:
seata-server.bat -h 127.0.0.1 -p 8091
seata-server.bat -h 127.0.0.1 -p 8092
seata-server.bat -h 127.0.0.1 -p 8093
在 Nacos上可观察到 Seata集群中有3个健康实例,如下图所示:
3、创建数据库,导入
源码包/script/server/db/mysql.sql
客户端
修改 application.yml ,增加如下配置:
seata.enabled=true
seata.tx-service-group=seata_sample_group
seata.service.vgroup-mapping.seata_sample_group=default
seata.registry.type=nacos
seata.registry.nacos.cluster=default
seata.registry.nacos.server-addr=localhost:8848
seata.registry.nacos.namespace=01d58c83-c43e-47f4-a226-6af44f54c4d0
seata.config.type=nacos
seata.config.nacos.server-addr=localhost:8848
seata.config.nacos.namespace=01d58c83-c43e-47f4-a226-6af44f54c4d0
启动客户端,观察每个seata-server输出的日志,可观察到客户端发出的请求分散到了每台server上
Seata高可用原理分析
Seata结合注册中心、数据库完成高可用
注册中心主要就是为了让TM,RM动态感知协调者的变更。 在向协调者发送请求的时候,会从注册中心里的协调者列表用负载均衡算法选择一个协调者,然后发送请求。 从这里可以看出协调者必须是对等的, 每个请求发给哪一台协调者都是一样的。
Seata中各角色与注册中心交互图:
KOCA-DTC 与 Seata 对比
| 功能 | KOCA-DTC | SEATA | 说明 | 
|---|---|---|---|
| 自动补偿事务 | TSC模式 | AT模式 | TSC模式:代理数据源完成提交、回滚 存在的问题: 代理数据源假关闭连接,暂时不提交本地事务,但是容易造成数据的死锁。 AT模式:代理数据源,解析sql保存至undo_log表,成功清除undo_log相关数据,失败根据undo_log表数据回滚 存在的问题: 采用undo_log的形式逆向生成sql语句实现回滚,避免死锁现象但是容易出现脏读 | 
| 手动补偿事务 | TCC模式 | TCC模式 | 大体一致,使用上有些区别 | 
| Saga模式 | 无 | Saga模式 | 适用于业务流程长、多的业务 或参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口 | 
| XA模式 | 无 | XA模式 | XA可以满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性 依赖数据库支持,性能低 | 
| 高可用 | Redis数据库 | 注册中心+Redis或Mysql数据库 | 无 | 
综合分析
自动补偿事务:
- 性能 :AT 模式应比 TSC 模式高;
- 编程复杂度:TSC 模式比 AT 模式简单;
手动补偿事务:
- 二者差不多,使用上有些区别,开发中会遇到相同的难题;
其他:
- Saga 模式,学习难度高,上手困难,除了要构造状态图 json,需要开发手动补偿方法,类似 TCC 模式,TCC 模式存在的问题,Saga 模式也有;
- XA 模式可以满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性,缺点是性能低,需要数据库支持;
附录
空回滚
在没用调用TCC资源Try方法的情况下,调用了第二阶段的Cancel方法,Cancel方法需要识别出这是一个空回滚,然后直接返回成功。
出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用第二阶段的Cacel方法,从而形成空回滚。
解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚。如果没有执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务ID和分支事务ID,第一阶段Try方法里会插入一条记录,表示一阶段执行了。Cancel接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。
幂等
通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求TCC的二阶段Try、Confirm和Cancel接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。
悬挂
悬挂就是对于一个分布式事务,其二阶段Cancel接口比Try接口先执行。
出现原因是在RPC调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时RPC调用的网络发生拥堵,通常RPC调用是有超时时间的,RPC超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC请求才到达参与者真正执行,而一个Try方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就成为悬挂,即业务资源预留后没法继续处理。
解决思路时候如果二阶段执行完成,那一阶段就不能再继续执行。再执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。
Sample说明
1、下载nacos-server-1.4.1、seata-server-1.3.0,运行;
2、创建数据库,导入db.sql文件,修改确认 account-service、business-service、order-service、storage-service 配置文件,运行;
3、调用 business-service 中 com.szkingdom.seata.sample.BusinessController 相关方法即可
 
      
    



