文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

怎么用C#实现SAGA分布式事务

2023-06-28 23:27

关注

这篇文章将为大家详细讲解有关怎么用C#实现SAGA分布式事务,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

背景

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。

市面上使用比较多的分布式事务框架,支持 SAGA 的,大部分都是 JAVA 为主的,没有提供 C# 的对接方式,或者是对接难度大,一定程度上让人望而却步。

下面就基于这个框架来实践一下银行转账的例子。

前置工作

dotnet add package Dtmcli --version 0.3.0

成功的 SAGA

先来看一下一个成功完成的 SAGA 时序图。

怎么用C#实现SAGA分布式事务

上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。

微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。

下面是两个服务的正向操作和补偿操作的处理。

OutApi

app.MapPost("/api/TransOut", (string branch_id, string gid, string op, TransRequest req) => {    // 进行 数据库操作    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");    return Results.Ok(TransResponse.BuildSucceedResponse());});app.MapPost("/api/TransOutCompensate", (string branch_id, string gid, string op, TransRequest req) =>{    // 进行 数据库操作    Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");    return Results.Ok(TransResponse.BuildSucceedResponse());});

InApi

app.MapPost("/api/TransIn", (string branch_id, string gid, string op, TransRequest req) =>{    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");    return Results.Ok(TransResponse.BuildSucceedResponse());});app.MapPost("/api/TransInCompensate", (string branch_id, string gid, string op, TransRequest req) =>{    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");    return Results.Ok(TransResponse.BuildSucceedResponse());});

注:示例为了简单,没有进行实际的数据库操作。

到此各个子事务的处理已经 OK 了,然后是开启 SAGA 事务,进行分支调用

var userOutReq = new TransRequest() { UserId = "1", Amount = -30 };var userInReq = new TransRequest() { UserId = "2", Amount = 30 };var ct = new CancellationToken();var gid = await dtmClient.GenGid(ct);var saga = new Saga(dtmClient, gid)    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)    .Add(inApi + "/TransIn", inApi + "/TransInCompensate", userInReq)    ;var flag = await saga.Submit(ct);Console.WriteLine($"case1, {gid} saga 提交结果 = {flag}");

到这里,一个完整的 SAGA 分布式事务就编写完成了。

搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。

怎么用C#实现SAGA分布式事务

当然,上面的情况太理想了,转出转入都是一次性就成功了。

但是实际上我们会遇到许许多多的问题,最常见的应该就是网络故障了。

下面来看一个异常的 SAGA 示例

异常的 SAGA

做一个假设,用户1的转出是正常的,但是用户2在转入的时候出现了问题。

由于事务已经提交给 dtm 了,按照 SAGA 事务的协议,dtm 会重试未完成的操作。

这个时候用户2 这边会出现什么样的情况呢?

转入其实成功了,但是 dtm 收到错误 (网络故障等)转入没有成功,直接告诉 dtm 失败了 (应用异常等)

无论是那一种,dtm 都会进行重试操作。这个时候会发生什么呢?我们继续往下看。

先看一下事务失败交互的时序图

怎么用C#实现SAGA分布式事务

再通过调整上面成功的例子,来比较直观的看看出现的情况。

在 InApi 加多一个转入失败的处理接口

app.MapPost("/api/TransInError", (string branch_id, string gid, string op, TransRequest req) =>{    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作--失败,gid={gid}, branch_id={branch_id}, op={op}");    //return Results.BadRequest();    return Results.Ok(TransResponse.BuildFailureResponse());});

失败的返回有两种,一种是状态码大于 400,一种是状态码是 200 并且响应体包含 FAILURE,上面的例子是第二种

调整一下调用方,把转入正向操作替换成上面这个返回错误的接口。

var saga = new Saga(dtmClient, gid)    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)    .Add(inApi + "/TransInError", inApi + "/TransInCompensate", userInReq);

运行结果如下:

怎么用C#实现SAGA分布式事务

在这个例子中,只考虑补偿/重试成功的情况下。

用户1 转出的 30 块钱最终是回到了他的帐号上,他没有出现损失。

用户2 就有点苦逼了,转入没有成功,返回了失败,还触发了转入的补偿机制,结果就是把用户2 还没进帐的 30 块钱给多扣了,这个就是上面的情况2,常见的空补偿问题。

这个时候就要在进行转入补偿的时候做一系列的判断,转入有没有成功,转出有没有失败等等,把业务变的十分复杂。

如果出现了上述的情况1,会发生什么呢?

用户2 第一次已经成功转入 30 块钱,返回的也是成功,但是网络出了点问题,导致 dtm 认为失败了,它就会进行重试,相当于用户2 还会收到第二个转入 30 块钱的请求!也就是说这次转帐,用户2 会进账 60 块钱,翻倍了,也就是说这个请求不是幂等。

同样的,要处理这个问题,在进行转入的正向操作中也要进行一系列的判断,同样会把复杂度上升一个级别。

前面有提到 dtm 提供了子事务屏障的功能,保证了幂等、空补偿等常见问题。

怎么用C#实现SAGA分布式事务

再来看看这个子事务屏障的功能有没有帮我们简化上面异常处理。

子事务屏障

子事务屏障,需要根据 trans_type,gid,branch_id 和 op 四个内容进行创建。

这4个内容 dtm 在回调时会放在 querysting 上面。

客户端里面提供了 IBranchBarrierFactory 来供我们使用。

空补偿

针对上面的异常情况(用户2 凭空消失 30 块钱),对转入的补偿进行子事务屏障的改造。

app.MapPost("/api/BarrierTransInCompensate", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>{    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);    using var db = Db.GeConn();    await barrier.Call(db, async (tx) =>    {        // 转入失败的情况下,不应该输出下面这个        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】补偿操作,gid={gid}, branch_id={branch_id}, op={op}");        // tx 参数是事务,可和本地事务一起提交回滚        await Task.CompletedTask;    });    Console.WriteLine($"子事务屏障-补偿操作,gid={gid}, branch_id={branch_id}, op={op}");    return Results.Ok(TransResponse.BuildSucceedResponse());});

Call 方法就是关键所在了,需要传入一个 DbConnection 和真正的业务操作,这里的业务操作就是在控制台输出补偿操作的信息。

同样的,我们再调整一下调用方,把转入补偿操作替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)    .Add(inApi + "/TransInError", inApi + "/BarrierTransInCompensate", userInReq)    ;

再来运行这个例子。

怎么用C#实现SAGA分布式事务

会发现转入的补偿操作并没执行,控制台没有输出补偿信息,而是输出了

Will not exec busiCall, isNullCompensation=True, isDuplicateOrPend=False

这个就表明了,这个请求是个空补偿,是不应该执行业务方法的,既空操作。

再来看一下,转入成功的,但是 dtm 收到了失败的信号,不断重试造成重复请求的情况。

幂等

针对用户2 转入两次 30 块钱的异常情况,对转入的正向操作进行子事务屏障的改造。

app.MapPost("/api/BarrierTransIn", async (string branch_id, string gid, string op, string trans_type, TransRequest req, IBranchBarrierFactory factory) =>{    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】请求来了!!! gid={gid}, branch_id={branch_id}, op={op}");    var barrier = factory.CreateBranchBarrier(trans_type, gid, branch_id, op);    using var db = Db.GeConn();    await barrier.Call(db, async (tx) =>    {        var c = Interlocked.Increment(ref _errCount);        // 模拟一个超时执行        if (c > 0 && c < 2) await Task.Delay(10000);        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】正向操作,gid={gid}, branch_id={branch_id}, op={op}");        await Task.CompletedTask;    });    return Results.Ok(TransResponse.BuildSucceedResponse());});

这里通过一个超时执行来让 dtm 进行转入正向操作的重试。

同样的,我们再调整一下调用方,把转入的正向操作也替换成上面带子事务屏障的接口。

var saga = new Saga(dtmClient, gid)    .Add(outApi + "/TransOut", outApi + "/TransOutCompensate", userOutReq)    .Add(inApi + "/BarrierTransIn", inApi + "/BarrierTransInCompensate", userInReq)    ;

再来运行这个例子。

怎么用C#实现SAGA分布式事务

可以看到转入的正向操作确实是触发了多次,第一次实际上是成功,只是响应比较慢,导致 dtm 认为是失败了,触发了第二次请求,但是第二次请求并没有执行业务操作,而是输出了

Will not exec busiCall, isNullCompensation=False, isDuplicateOrPend=True

这个就表明了,这个请求是个重复请求,是不应该执行业务方法的,保证了幂等。

到这里,可以看出,子事务屏障确实解决了幂等和空补偿的问题,大大降低了业务判断的复杂度和出错的可能性。

关于怎么用C#实现SAGA分布式事务就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯