Seata 手动事务控制详解
在Spring Boot项目中,通过Seata的API实现分布式事务的手动控制是一项关键技能。以下是基于Seata的RootContext管理全局事务ID(XID)以及通过Seata API或编程式事务接口实现分支事务注册、提交和回滚的具体说明与示例。
Seata API 核心功能
- 高级API:全局事务编程
高级API提供了对全局事务的显式控制能力,包括开启、提交、回滚及状态查询等功能,适用于需要精细化管理事务边界的场景。
GlobalTransaction 接口
public interface GlobalTransaction {
void start() throws TransactionException; // 开启事务,默认配置
void start(int timeout) throws TransactionException; // 指定超时时间
void start(int timeout, String name) throws TransactionException; // 指定事务名和超时时间
void finish() throws TransactionException; // 提交事务
void cancel() throws TransactionException; // 回滚事务
GlobalStatus checkStatus() throws TransactionException; // 获取事务状态
String getGlobalId() throws TransactionException; // 获取全局事务ID
}
GlobalTransactionContext 工具类
public class GlobalTransactionContext {
public static GlobalTransaction fetchOrCreate() {
// 获取当前线程绑定的GlobalTransaction实例,若无则创建新实例
GlobalTransaction tx = fetch();
if (tx == null) {
return createNewInstance();
}
return tx;
}
public static GlobalTransaction restore(String xid) throws TransactionException {
// 重新载入指定XID的全局事务实例(仅用于状态查询,不可开启事务)
return new DefaultGlobalTransaction(xid, GlobalStatus.Unknown, GlobalTransactionRole.Launcher) {
@Override
public void start(int timeout, String name) throws TransactionException {
throw new IllegalStateException("Cannot START on a RESTORED GlobalTransaction.");
}
};
}
}
TransactionalTemplate 模板类
public class TransactionalTemplate {
public Object run(TransactionalExecutor businessLogic) throws ExecutionException {
GlobalTransaction tx = GlobalTransactionContext.fetchOrCreate();
try {
tx.start(businessLogic.timeout(), businessLogic.name()); // 开启事务
Object result = businessLogic.execute(); // 执行业务逻辑
tx.finish(); // 提交事务
return result;
} catch (Throwable ex) {
try {
tx.cancel(); // 回滚事务
throw new ExecutionException(tx, Code.RollbackDone, ex);
} catch (TransactionException txe) {
throw new ExecutionException(tx, Code.RollbackFailure, ex);
}
}
}
}
使用示例
// 显式事务控制
GlobalTransaction tx = GlobalTransactionContext.fetchOrCreate();
try {
tx.start(30000, "order_process"); // 开启事务,超时时间30秒
orderService.createOrder(order); // 业务操作1
accountService.updateBalance(order.getUserId(), order.getAmount()); // 业务操作2
tx.finish(); // 提交事务
} catch (Exception e) {
tx.cancel(); // 回滚事务
throw new RuntimeException("Transaction failed", e);
}
// 使用模板方法简化
TransactionalTemplate template = new TransactionalTemplate();
template.run(() -> {
orderService.createOrder(order);
accountService.updateBalance(order.getUserId(), order.getAmount());
return true;
});
- 低级API:分支事务与XID传播
低级API通过RootContext管理事务上下文(XID),支持跨服务的事务传播,适合需要自定义事务传播逻辑的场景。
RootContext 核心方法
public class RootContext {
private static final ThreadLocal<String> CONTEXT_HOLDER = new ThreadLocal<>();
public static String getGlobalId() { // 获取当前线程绑定的XID
return CONTEXT_HOLDER.get();
}
public static void attach(String xid) { // 绑定XID到当前线程
CONTEXT_HOLDER.set(xid);
}
public static String detach() { // 解绑当前线程的XID
return CONTEXT_HOLDER.remove();
}
}
跨服务XID传播实现
- Feign拦截器示例(Spring Cloud Alibaba)
public class SeataFeignInterceptor implements Client {
@Override
public Response execute(Request request, Request.Options options) {
String xid = RootContext.getGlobalId();
if (xid != null) {
Map<String, String> headers = new HashMap<>();
headers.put(RootContext.KEY_XID, xid); // 在请求头中携带XID
// 修改请求并传递XID
Request modifiedRequest = updateRequestHeaders(request, headers);
return delegate.execute(modifiedRequest, options);
}
return delegate.execute(request, options);
}
}
- Dubbo过滤器示例
@Activate(group = {Constants.PROVIDER, Constants.CONSUMER})
public class SeataDubboInterceptor implements Filter {
@Override
public Result invoke(Invoker<?> invoker, Invocation invocation) {
String xid = RootContext.getGlobalId();
if (xid != null) {
RpcContext.getContext().setAttachment(RootContext.KEY_XID, xid); // 在RPC上下文中传递XID
}
return invoker.invoke(invocation);
}
}
使用示例
// 服务A中开启全局事务
GlobalTransaction tx = GlobalTransactionContext.fetchOrCreate();
tx.start(30000, "cross_service_operation");
String xid = tx.getGlobalId();
// 调用服务B(通过Feign/Dubbo传递XID)
RootContext.attach(xid); // 手动绑定XID(若拦截器未自动处理)
serviceB.performTask(); // 服务B内部通过RootContext.getGlobalId()获取XID
RootContext.detach(); // 解绑XID(可选)
// 服务B中注册分支事务
BranchHandler.registerBranch(xid, "service_b_branch");
高级与低级API对比
| 维度 | 高级API | 低级API |
|---|---|---|
| 核心功能 | 全局事务边界控制、提交/回滚、状态查询 | 事务上下文(XID)传播、分支事务管理 |
| 适用场景 | 显式事务控制、复杂业务逻辑编排 | 微服务间XID传递、自定义RPC框架集成 |
| 线程模型 | 基于ThreadLocal的XID绑定 |
直接操作线程上下文 |
| 典型用例 | 订单创建涉及库存、账户、积分等多服务调用 | Feign/Dubbo拦截器实现XID自动传播 |
最佳实践建议
- 优先使用高级API
- 大多数场景下,
GlobalTransaction+TransactionalTemplate的组合可以满足需求。 - 示例:订单服务调用库存服务、账户服务、积分服务时需保证一致性。
- 低级API适用场景
- 自定义RPC框架集成(如gRPC、Thrift)。
- 特殊事务传播需求(如MQ消息消费事务)。
- 示例:RocketMQ消费者处理消息时绑定XID。
- 性能优化
- 避免在高频短事务中频繁调用
start/finish,改用模板方法减少对象创建。 - 监控
RootContext线程绑定开销(可通过异步线程池优化)。
- 异常处理范式
try {
tx.start();
// 业务操作
tx.finish();
} catch (TransactionException te) {
if (tx.checkStatus() == GlobalStatus.CommitFailed) {
// 重试逻辑
} else {
// 告警处理
}
} catch (BusinessException be) {
tx.cancel();
// 业务补偿
}
代码实例
- 手动绑定XID并执行分支事务
import io.seata.core.context.RootContext;
import io.seata.tm.api.GlobalTransaction;
import io.seata.tm.api.GlobalTransactionContext;
@Service
public class OrderService {
@Autowired
private InventoryService inventoryService;
public void placeOrderWithManualTx() {
// 1. 手动开始全局事务(若未通过注解)
GlobalTransaction tx = GlobalTransactionContext.fetchOrCreate();
try {
tx.start(300000, "place_order"); // 超时时间5分钟
String xid = RootContext.getGlobalId();
System.out.println("New Global Transaction XID: " + xid);
// 2. 执行本地事务(订单创建)
Order order = new Order();
order.setUserId("1001");
order.setProductId("P001");
order.setQuantity(10);
// orderMapper.insert(order); // 假设的数据库操作
// 3. 调用其他服务(库存扣减)
inventoryService.decreaseStockManual(order.getProductId(), order.getQuantity(), xid);
// 4. 提交全局事务
tx.finish();
} catch (Exception e) {
// 5. 异常时回滚
tx.cancel();
throw new RuntimeException("Transaction failed", e);
}
}
}
- 库存服务手动参与事务
import io.seata.core.context.RootContext;
@Service
public class InventoryService {
public void decreaseStockManual(String productId, int quantity, String xid) {
// 绑定XID到当前线程
RootContext.attach(xid);
try {
// 执行库存扣减逻辑
// inventoryMapper.updateStock(productId, quantity); // 假设的数据库操作
System.out.println("Stock decreased for product: " + productId);
} finally {
// 解绑XID
RootContext.detach();
}
}
}
关键点说明
- XID传播
- 使用
RootContext.attach(xid)将全局事务ID绑定到当前线程,确保分支事务关联到同一全局事务。 - 跨服务调用时,需通过HTTP头或RPC上下文传递XID(如Feign的
RequestInterceptor)。
- 分支事务管理
- 每个分支事务应在
try-finally中解绑XID,避免线程复用导致事务混乱。 - 若分支事务失败,需抛出异常触发全局回滚。
- 与注解模式对比
- 注解模式(
@GlobalTransactional):声明式,适合简单场景。 - API模式:编程式,适合需要动态控制事务边界的复杂场景(如条件分支、异步调用)。
适用场景
- 动态事务边界:根据运行时条件决定是否参与事务。
- 异步事务处理:在异步回调中手动绑定XID。
- 遗留系统集成:无法修改代码的旧服务通过API参与事务。
注意事项
- 确保XID绑定/解绑在同一个线程内完成。
- 设置合理的超时时间,通过
tx.start(timeout, name)控制。 - 捕获异常并调用
tx.cancel(),避免事务悬挂。