Seata 使用调研

Seata 使用调研

有业务部门提出,希望调研下Seata的基本使用方式,为分布式事务选型提供多种选择;

本文主要阐述 Seata基本使用,和koca-dtc对比的差异;

seata官网

[Spring Cloud Alibaba版本关系表](版本说明 · alibaba/spring-cloud-alibaba Wiki · GitHub)

[Seata 下载](Releases · seata/seata · GitHub)

Seata功能概述

功能名称 官方文档 特点 基本原理 使用场景 存在问题
AT 模式 AT模式官方文档 自动补偿事务,性能较高,开发简单 代理数据源,一阶段解析sql将数据写入undo_log表,二阶段根据undo_log表进行回滚 支持 ACID 事务的关系型数据库 开发时需注意:脏读
TCC 模式 TCC模式官方文档 手动补偿事务,性能高,开发复杂 代理数据源,一阶段执行prepare方法,二阶段执行commit或rollback方法 不支持 ACID 事务的数据库
性能要求高场景
开发时需注意:空回滚、幂等、悬挂
Saga 模式 SAGA模式官方文档 性能高,开发复杂 基于状态机引擎实现,用户设计状态图,开发rollback方法,状态机根据状态图 json 调用相关方法 业务流程长、业务流程多
参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
除了额外开发方法,需设计开发状态图 json
XA 模式 XA模式官方文档 满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性,开发简单,性能较低 由数据库XA协议完成提交、回滚 支持XA 事务的数据库 性能较低

Seata快速使用

服务端

1、到Seata Github网站上下载服务端

下载下图所示两个文件:

seata_download

  • seata-server-*.zip:Seata服务端程序
  • Source code:Seata源码程序,里面的script很重要,内涵sql脚本,配置中心脚本等,后续有重要用途

2、解压 seata-server-*.zip 并启动 seata-server

Windows 启动 seata/bin/seata-server.bat

Mac or Linux 启动 seata/bin/seata-server.sh

客户端

1、pom.xml 添加依赖、application.properties 添加配置、数据库导入源码包script/client/at/db/mysql.sql

pom.xml:

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-alibaba-seata</artifactId>
</dependency>

application.properties:

seata.enabled=true
spring.cloud.alibaba.seata.tx-service-group=seata_sample_group
seata.service.vgroup-mapping.seata_sample_group=default
seata.service.grouplist.default=localhost:8091

2、AT 模式

  • 全局事务接口入口添加 @GlobalTransactional 注解即可

3、TCC 模式

  • 全局事务接口入口添加 @GlobalTransactional 注解;
  • 创建接口类,添加 @LocalTCC 注解,接口中含3个方法,其中1个方法添加 @TwoPhaseBusinessAction 注解,示例如下:

@LocalTCC:RM 的接口上面必须要有@LocalTCC 注解,且必须在接口上面

@TwoPhaseBusinessAction:指明prepare方法,prepare方法执行成功后,Seata 调用的 commitMethod, 失败后,Seata 调用的 rollbackMethod

@BusinessActionContextParameter:被修饰方法参数值,可以在 commitMethodrollbackMethod 中。使用 BusinessActionContext context.getActionContext("paramName") 得到

@LocalTCC
public interface TccAccountService {
    @TwoPhaseBusinessAction(name = "UpdateAccount", commitMethod = "commit", rollbackMethod = "rollback")
    String updateAccount(@BusinessActionContextParameter(paramName = "userId") String userId,
        @BusinessActionContextParameter(paramName = "money") int money);

    boolean commit(BusinessActionContext context);

    boolean rollback(BusinessActionContext context);
}

4、Saga模式

  • 需学习Saga状态机
  • 借助状态机设计器,开发Saga 状态图,生产状态图 json 文件,熟悉的可直接编写 json文件
  • 项目初始化Saga相关配置并运行,示例如下:

Saga配置

@Configuration
public class SeataSagaConfiguration {

    @Bean
    public ThreadPoolExecutor threadPoolExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() * 2);
        executor.setQueueCapacity(100);
        executor.setThreadNamePrefix("SAGA_THREAD_");
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
        executor.initialize();
        return executor.getThreadPoolExecutor();
    }

    @Bean
    public DefaultStateMachineConfig defaultStateMachineConfig(ThreadPoolExecutor threadPoolExecutor) {
        DefaultStateMachineConfig machineConfig = new DefaultStateMachineConfig();
        Resource resource = new ClassPathResource("saga/PayOrder.json");
        machineConfig.setResources(new Resource[]{resource});
        machineConfig.setEnableAsync(true);
        machineConfig.setThreadPoolExecutor(threadPoolExecutor);
        return machineConfig;
    }

    @Bean
    public ProcessCtrlStateMachineEngine stateMachineEngine(DefaultStateMachineConfig defaultStateMachineConfig) {
        ProcessCtrlStateMachineEngine machineEngine = new ProcessCtrlStateMachineEngine();
        machineEngine.setStateMachineConfig(defaultStateMachineConfig);
        return machineEngine;
    }

    @Bean
    public StateMachineEngineHolder stateMachineEngineHolder(ProcessCtrlStateMachineEngine stateMachineEngine) {
        StateMachineEngineHolder engineHolder = new StateMachineEngineHolder();
        engineHolder.setStateMachineEngine(stateMachineEngine);
        return engineHolder;
    }
}

状态图 json

{
  "Name": "PayOrder",
  "Comment": "PayOrder",
  "StartState": "CreateStorage",
  "Version": "0.0.2",
  "States": {
    "CreateStorage": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.StorageService",
      "ServiceMethod": "sagaStorage",
      "CompensateState": "CompensateStorage",
      "Next": "ChoiceState",
      "Input": [
        "$.[businessKey]",
        "$.[commodityCode]",
        "$.[count]",
        {
          "throwException": "$.[mockCreateStorageFail]"
        }
      ],
      "Output": {
        "createStorageResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },"Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger1"
        }
      ]
    },
    "ChoiceState": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[createStorageResult] == true",
          "Next": "CreateOrder"
        }
      ],
      "Default": "CompensationTrigger1"
    },
    "CreateOrder": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.OrderService",
      "ServiceMethod": "sagaOrder",
      "CompensateState": "CompensateOrder",
      "Input": [
        "$.[businessKey]",
        "$.[userId]",
        "$.[commodityCode]",
        "$.[orderCount]",
        {
          "throwException": "$.[mockCreateOrderFail]"
        }
      ],
      "Output": {
        "createOrderResult": "$.#root"
      },
      "Status": {
        "#root == true": "SU",
        "#root == false": "FA",
        "$Exception{java.lang.Throwable}": "UN"
      },
      "Catch": [
        {
          "Exceptions": [
            "java.lang.Throwable"
          ],
          "Next": "CompensationTrigger2"
        }
      ],
      "Next": "ChoiceState2"
    },
    "ChoiceState2": {
      "Type": "Choice",
      "Choices": [
        {
          "Expression": "[createOrderResult] == true",
          "Next": "Succeed"
        }
      ],
      "Default": "CompensationTrigger2"
    },
    "CompensateStorage": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.StorageService",
      "ServiceMethod": "sagaUndoStorage",
      "Input": [
        "$.[businessKey]",
        "$.[commodityCode]",
        "$.[count]"
      ]
    },
    "CompensateOrder": {
      "Type": "ServiceTask",
      "ServiceName": "com.szkingdom.seata.sample.OrderService",
      "ServiceMethod": "sagaUndoOrder",
      "Input": [
        "$.[businessKey]",
        "$.[userId]",
        "$.[commodityCode]",
        "$.[orderCount]"
      ]
    },
    "CompensationTrigger1": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "CompensationTrigger2": {
      "Type": "CompensationTrigger",
      "Next": "Fail"
    },
    "Succeed": {
      "Type": "Succeed"
    },
    "Fail": {
      "Type": "Fail",
      "ErrorCode": "PURCHASE_ORDER_FAILED",
      "Message": "purchase order failed"
    }
  }
}

运行

@GetMapping(value = "/seata/sagafeign", produces = "application/json")
public String sagafeign() {
  String businessKey = String.valueOf(System.currentTimeMillis());
  HashMap<String, Object> parames = new HashMap<>();
  parames.put("businessKey", businessKey);
  parames.put("commodityCode", COMMODITY_CODE);
  parames.put("orderCount", ORDER_COUNT);
  parames.put("count", ORDER_COUNT);
  parames.put("userId", USER_ID);
  StateMachineInstance instance =
    stateMachineEngine.startWithBusinessKey("PayOrder", null, businessKey, parames);
  if (ExecutionStatus.SU.equals(instance.getStatus())) {
    LOGGER.info("Saga PayOrder完成 Xid:{}", instance.getId());
    return SUCCESS;
  } else {
    LOGGER.info("Saga PayOrder失败 Xid:{}", instance.getId());
    return FAIL;
  }
}

5、XA模式

  • 需要支持XA模式的数据库
  • 全局事务接口入口添加 @GlobalTransactional 注解
  • 更换XA模式代理数据源,示例如下
@Bean("dataSource")
public DataSource dataSource(DruidDataSource druidDataSource) {
  // DataSourceProxy for AT mode
  // return new DataSourceProxy(druidDataSource);

  // DataSourceProxyXA for XA mode
  return new DataSourceProxyXA(druidDataSource);
}

Seata高可用部署

基本步骤:

1、将 Seata 关联至注册中心;

2、将 Seata 依赖的配置,上传至配置中心;多个 seata-server 启动时,会共用相同的配置内容;

3、修改客户端配置文件,关联Seata;启动时,客户端可从注册中心获取到所有 seata-server 地址,并连接上,运行时,会从中选一个可用的,将数据传送过去;

服务端

1、修改Seata配置文件:seata/conf/registry.conf

  • 修改 registry.type = nacos , config.type = nacos
  • 修改 registry、config 中 nacos 配置

注: namespace = “” 默认为public,此处笔者在 nacos 创建 seata 命名空间下图所示:

seata/conf/registry.conf 修改后的配置内容:

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "01d58c83-c43e-47f4-a226-6af44f54c4d0"
    cluster = "default"
    username = ""
    password = ""
  }
  ...省略其他内容
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "01d58c83-c43e-47f4-a226-6af44f54c4d0"
    group = "SEATA_GROUP"
    username = ""
    password = ""
  }
  ...省略其他内容
}

2、github下载对应版本seata源码包,修改 script/config-center/config.txt 文件

笔者修改的内容如下:

service.vgroupMapping.seata_sample_group=default  #此处 seata_sample_group 需和客户端配置文件中的对应上
store.mode=db
store.db.url=jdbc:mysql://127.0.0.1:3306/seata?useUnicode=true
store.db.user=root
store.db.password=123456

执行脚本 script/config-center/nacos/nacos-config.sh ,将 config.txt 内容上传至 nacos 配置中心

sh nacos-config.sh -h 127.0.0.1 -p 8848 -g SEATA_GROUP -t 01d58c83-c43e-47f4-a226-6af44f54c4d0

进入 nacos 查看上传的配置内容,如下图所示:

单机多端口启动:

seata-server.bat -h 127.0.0.1 -p 8091
seata-server.bat -h 127.0.0.1 -p 8092
seata-server.bat -h 127.0.0.1 -p 8093

在 Nacos上可观察到 Seata集群中有3个健康实例,如下图所示:

3、创建数据库,导入 源码包/script/server/db/mysql.sql

客户端

修改 application.yml ,增加如下配置:

seata.enabled=true
seata.tx-service-group=seata_sample_group
seata.service.vgroup-mapping.seata_sample_group=default
seata.registry.type=nacos
seata.registry.nacos.cluster=default
seata.registry.nacos.server-addr=localhost:8848
seata.registry.nacos.namespace=01d58c83-c43e-47f4-a226-6af44f54c4d0
seata.config.type=nacos
seata.config.nacos.server-addr=localhost:8848
seata.config.nacos.namespace=01d58c83-c43e-47f4-a226-6af44f54c4d0

启动客户端,观察每个seata-server输出的日志,可观察到客户端发出的请求分散到了每台server上

Seata高可用原理分析

Seata结合注册中心、数据库完成高可用

注册中心主要就是为了让TM,RM动态感知协调者的变更。 在向协调者发送请求的时候,会从注册中心里的协调者列表用负载均衡算法选择一个协调者,然后发送请求。 从这里可以看出协调者必须是对等的, 每个请求发给哪一台协调者都是一样的。

Seata中各角色与注册中心交互图:

KOCA-DTC 与 Seata 对比

功能 KOCA-DTC SEATA 说明
自动补偿事务 TSC模式 AT模式 TSC模式:代理数据源完成提交、回滚
存在的问题: 代理数据源假关闭连接,暂时不提交本地事务,但是容易造成数据的死锁。

AT模式:代理数据源,解析sql保存至undo_log表,成功清除undo_log相关数据,失败根据undo_log表数据回滚
存在的问题: 采用undo_log的形式逆向生成sql语句实现回滚,避免死锁现象但是容易出现脏读
手动补偿事务 TCC模式 TCC模式 大体一致,使用上有些区别
Saga模式 Saga模式 适用于业务流程长、多的业务
或参与者包含其它公司或遗留系统服务,无法提供 TCC 模式要求的三个接口
XA模式 XA模式 XA可以满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性
依赖数据库支持,性能低
高可用 Redis数据库 注册中心+Redis或Mysql数据库

综合分析

自动补偿事务:

  • 性能 :AT 模式应比 TSC 模式高;
  • 编程复杂度:TSC 模式比 AT 模式简单;

手动补偿事务:

  • 二者差不多,使用上有些区别,开发中会遇到相同的难题;

其他:

  • Saga 模式,学习难度高,上手困难,除了要构造状态图 json,需要开发手动补偿方法,类似 TCC 模式,TCC 模式存在的问题,Saga 模式也有;
  • XA 模式可以满足全局数据一致性,AT、TCC、Saga 都是补偿型,无法做到真正的全局一致性,缺点是性能低,需要数据库支持;

附录

空回滚

在没用调用TCC资源Try方法的情况下,调用了第二阶段的Cancel方法,Cancel方法需要识别出这是一个空回滚,然后直接返回成功。
出现原因是当一个分支事务所在服务宕机或网络异常,分支事务调用记录为失败,这个时候其实是没有执行Try阶段,当故障恢复后,分布式事务进行回滚则会调用第二阶段的Cacel方法,从而形成空回滚。
解决思路是关键就是要识别出这个空回滚。思路很简单就是需要知道一阶段是否执行,如果执行了,那就是正常回滚。如果没有执行,那就是空回滚。前面已经说过TM在发起全局事务时生成全局事务记录,全局事务ID贯穿整个分布式事务调用链条。再额外增加一张分支事务记录表,其中有全局事务ID和分支事务ID,第一阶段Try方法里会插入一条记录,表示一阶段执行了。Cancel接口里读取该记录,如果该记录存在,则正常回滚;如果该记录不存在,则是空回滚。

幂等

通过前面介绍已经了解到,为了保证TCC二阶段提交重试机制不会引发数据不一致,要求TCC的二阶段Try、Confirm和Cancel接口保证幂等,这样不会重复使用或者释放资源。如果幂等控制没有做好,很有可能导致数据不一致等严重问题。
解决思路在上述“分支事务记录”中增加执行状态,每次执行前都查询该状态。

悬挂

悬挂就是对于一个分布式事务,其二阶段Cancel接口比Try接口先执行。
出现原因是在RPC调用分支事务try时,先注册分支事务,再执行RPC调用,如果此时RPC调用的网络发生拥堵,通常RPC调用是有超时时间的,RPC超时以后,TM就会通知RM回滚该分布式事务,可能回滚完成后,RPC请求才到达参与者真正执行,而一个Try方法预留的业务资源,只有该分布式事务才能使用,该分布式事务第一阶段预留的业务资源就再也没有人能够处理了,对于这种情况,我们就成为悬挂,即业务资源预留后没法继续处理。
解决思路时候如果二阶段执行完成,那一阶段就不能再继续执行。再执行一阶段事务时判断在该全局事务下,“分支事务记录”表中是否已经有二阶段事务记录,如果有则不执行Try。

Sample说明

示例代码

1、下载nacos-server-1.4.1、seata-server-1.3.0,运行;

2、创建数据库,导入db.sql文件,修改确认 account-service、business-service、order-service、storage-service 配置文件,运行;

3、调用 business-service 中 com.szkingdom.seata.sample.BusinessController 相关方法即可

:+1: :+1: