文章详情

短信预约-IT技能 免费直播动态提醒

请输入下面的图形验证码

提交验证

短信预约提醒成功

基于golang的轻量级工作流框架Fastflow

2024-04-02 19:55

关注

Fastflow 是什么?用一句话来定义它:一个 基于golang协程支持水平扩容的分布式高性能工作流框架
它具有以下特点:

为什么要开发 Fastflow

组内有很多项目都涉及复杂的任务流场景,比如离线任务,集群上下架,容器迁移等,这些场景都有几个共同的特点:

流程耗时且步骤复杂,比如创建一个 k8s 集群,需要几十步操作,其中包含脚本执行、接口调用等,且相互存在依赖关系。

任务量巨大,比如容器平台每天都会有几十万的离线任务需要调度执行、再比如我们管理数百个K8S集群,几乎每天会有集群需要上下节点、迁移容器等。

我们尝试过各种解法:

当然 Github 上也还有其他的任务流引擎,我们也都评估过,无法满足需求。比如 kubeflow 是基于 Pod 执行任务的,比起 进程 更为重量,还有一些项目,要么就是没有经过海量数据的考验,要么就是没有考虑可伸缩性,面对大量任务的执行无法水平扩容。

Concept

工作流模型

fastflow 的工作流模型基于 DAG(Directed acyclic graph),下图是一个简单的 DAG 示意图:

在这个图中,首先 A 节点所定义的任务会被执行,当 A 执行完毕后,B、C两个节点所定义的任务将同时被触发,而只有 B、C 两个节点都执行成功后,最后的 D 节点才会被触发,这就是 fastflow 的工作流模型。

工作流的要素

fastflow 执行任务的过程会涉及到几个概念:Dag, Task, Action, DagInstance

Dag

描述了一个完整流程,它的每个节点被称为 Task,它定义了各个 Task 的执行顺序和依赖关系,你可以通过编程 or yaml 来定义它

一个编程式定义的DAG

dag := &entity.Dag{
BaseInfo: entity.BaseInfo{
ID: "test-dag",
},
Name: "test",
Tasks: []entity.Task{
{ID: "task1", ActionName: "PrintAction"},
{ID: "task2", ActionName: "PrintAction", DependOn: []string{"task1"}},
{ID: "task3", ActionName: "PrintAction", DependOn: []string{"task2"}},
},
}

对应的yaml如下:

id: "test-dag"
name: "test"
tasks:
- id: "task1"
actionName: "PrintAction"
- id: ["task2"]
actionName: "PrintAction"
dependOn: ["task1"]
- id: "task3"
actionName: "PrintAction"
dependOn: ["task2"]

同时 Dag 可以定义这个工作流所需要的参数,以便于在各个 Task 去消费它:

id: "test-dag"
name: "test"
vars:
fileName:
desc: "the file name"
defaultValue: "file.txt"
filePath:
desc: "the file path"
defaultValue: "/tmp/"
tasks:
- id: "task1"
actionName: "PrintAction"
params:
writeName: "{{fileName}}"
writePath: "{{filePath}}"

Task

它定义了这个节点的具体工作,比如是要发起一个 http 请求,或是执行一段脚本等,这些不同动作都通过选择不同的 Action 来实现,同时它也可以定义在何种条件下需要跳过 or 阻塞该节点。
下面这段yaml演示了 Task 如何根据某些条件来跳过运行该节点。

id: "test-dag"
name: "test"
vars:
fileName:
desc: "the file name"
defaultValue: "file.txt"
tasks:
- id: "task1"
actionName: "PrintAction"
preCheck:
- act: skip #you can set "skip" or "block"
conditions:
- source: vars # source could be "vars" or "share-data"
key: "fileName"
op: "in"
values: ["warn.txt", "error.txt"]

Task 的状态有以下几个:

Action

Action 是工作流的核心,定义了该节点将执行什么操作,fastflow携带了一些开箱即用的Action,但是一般你都需要根据具体的业务场景自行编写,它有几个关键属性:

自行开发的 Action 在使用前都必须先注册到 fastflow,如下所示:

type PrintParams struct {
Key string
Value string
}
type PrintAction struct {
}
// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
return "PrintAction"
}
func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
cinput := params.(*ActionParam)
fmt.Println("action start: ", time.Now())
fmt.Println(fmt.Sprintf("params: key[%s] value[%s]", cinput.Key, cinput.Value))
return nil
}
func (a *PrintAction) ParameterNew() interface{} {
return &PrintParams{}
}
func main() {
...
// Register action
fastflow.RegisterAction([]run.Action{
&PrintAction{},
})
...
}

DagInstance

当你开始运行一个 Dag 后,则会为本次执行生成一个执行记录,它被称为 DagInstance,当它生成以后,会由 Leader 实例将其分发到一个健康的 Worker,再由其解析、执行。

实例类型与Module

首先 fastflow 是一个分布式的框架,意味着你可以部署多个实例来分担负载,而实例被分为两类角色:

而不同节点能够承担不同的功能,其背后是不同的 模块 在各司其职,不同节点所运行的模块如下图所示:

NOTE

从上面的图看,Leader 实例会比 Worker 实例多运行一些模块用于执行中仲裁者相关的任务,模块之间的协作关系如下图所示:

其中各个模块的职责如下:

Tips

以上模块的分布机制仅仅只是 fastflow 的默认实现,你也可以自行决定实例运行的模块,比如在 Leader 上不再运行 Worker 的实例,让其专注于任务调度。

GetStart

更多例子请参考项目下面的 examples 目录

准备一个Mongo实例

如果已经你已经有了可测试的实例,可以直接替换为你的实例,如果没有的话,可以使用Docker容器在本地跑一个,指令如下:

docker run -d --name fastflow-mongo --network host mongo

运行 fastflow

运行以下示例

package main
import (
"fmt"
"log"
"time"
"github.com/shiningrush/fastflow"
mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
"github.com/shiningrush/fastflow/pkg/entity/run"
"github.com/shiningrush/fastflow/pkg/mod"
mongoStore "github.com/shiningrush/fastflow/store/mongo"
)
type PrintAction struct {
}
// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
return "PrintAction"
}
func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
fmt.Println("action start: ", time.Now())
return nil
}
func main() {
// Register action
fastflow.RegisterAction([]run.Action{
&PrintAction{},
})
// init keeper, it used to e
keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
Key: "worker-1",
// if your mongo does not set user/pwd, youshould remove it
ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Prefix: "test",
})
if err := keeper.Init(); err != nil {
log.Fatal(fmt.Errorf("init keeper failed: %w", err))
}
// init store
st := mongoStore.NewStore(&mongoStore.StoreOption{
// if your mongo does not set user/pwd, youshould remove it
ConnStr: "mongodb://root:pwd@127.0.0.1:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Prefix: "test",
})
if err := st.Init(); err != nil {
log.Fatal(fmt.Errorf("init store failed: %w", err))
}
go createDagAndInstance()
// start fastflow
if err := fastflow.Start(&fastflow.InitialOption{
Keeper: keeper,
Store: st,
// use yaml to define dag
ReadDagFromDir: "./",
}); err != nil {
panic(fmt.Sprintf("init fastflow failed: %s", err))
}
}
func createDagAndInstance() {
// wait fast start completed
time.Sleep(time.Second)
// run some dag instance
for i := 0; i < 10; i++ {
_, err := mod.GetCommander().RunDag("test-dag", nil)
if err != nil {
log.Fatal(err)
}
time.Sleep(time.Second * 10)
}
}

程序运行目录下的test-dag.yaml

id: "test-dag"
name: "test"
tasks:
- id: "task1"
actionName: "PrintAction"
- id: "task2"
actionName: "PrintAction"
dependOn: ["task1"]
- id: "task3"
actionName: "PrintAction"
dependOn: ["task2"]

Basic

Task与Task之间的通信

由于任务都是基于 goroutine 来执行,因此任务之间的 context 是共享的,意味着你完全可以使用以下的代码:

func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error {
ctx.WithValue("key", "value")
return nil
}

func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error {
val := ctx.Context().Value("key")
return nil
}

但是注意这样做有个弊端:当节点重启时,如果任务尚未执行完毕,那么这部分内容会丢失。
如果不想因为故障or升级而丢失你的更改,可以使用 ShareData 来传递进行通信,ShareData 是整个 在整个 DagInstance 的生命周期都会共享的一块数据空间,每次对它的写入都会通过 Store 组件持久化,以确保数据不会丢失,用法如下:

func (a *UpAction) Run(ctx run.ExecuteContext, params interface{}) error {
ctx.ShareData().Set("key", "value")
return nil
}

func (a *DownAction) Run(ctx run.ExecuteContext, params interface{}) error {
val := ctx.ShareData().Get("key")
return nil
}

任务日志

fastflow 还提供了 Task 粒度的日志记录,这些日志都会通过 Store 组件持久化,用法如下:

func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error {
ctx.Trace("some message")
return nil
}

使用Dag变量

上面的文章中提到,我们可以在 Dag 中定义一些变量,在创建工作流时可以对这些变量进行赋值,比如以下的Dag,定义了一个名为 `fileName 的变量

id: "test-dag"
name: "test"
vars:
fileName:
desc: "the file name"
defaultValue: "file.txt"

随后我们可以使用 Commander 组件来启动一个具体的工作流:

mod.GetCommander().RunDag("test-id", map[string]string{
"fileName": "demo.txt",
})

这样本次启动的工作流的变量则被赋值为 demo.txt,接下来我们有两种方式去消费它

1.带参数的Action

id: "test-dag"
name: "test"
vars:
fileName:
desc: "the file name"
defaultValue: "file.txt"
tasks:
- id: "task1"
action: "PrintAction"
params:
# using {{var}} to consume dag's variable
fileName: "{{fileName}}"

PrintAction.go:

type PrintParams struct {
FileName string `json:"fileName"`
}

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
return "PrintAction"
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
cinput := params.(*ActionParam)

fmt.Println(fmt.Sprintf("params: file[%s]", cinput.FileName, cinput.Value))
return nil
}

func (a *PrintAction) ParameterNew() interface{} {
return &PrintParams{}
}

2.编程式读取
fastflow 也提供了相关函数来获取 Dag 变量

func (a *Action) Run(ctx run.ExecuteContext, params interface{}) error {
// get variable by name
ctx.GetVar("fileName")

// iterate variables
ctx.IterateVars(func(key, val string) (stop bool) {
...
})
return nil
}

分布式锁

如前所述,你可以在直接使用 Keeper 模块提供的分布式锁,如下所示:

...
mod.GetKeeper().NewMutex("mutex key").Lock(ctx.Context(),
mod.LockTTL(time.Second),
mod.Reentrant("worker-key1"))
...

其中:

到此这篇关于基于golang的轻量级工作流框架Fastflow的文章就介绍到这了,更多相关go  Fastflow内容请搜索编程网以前的文章或继续浏览下面的相关文章希望大家以后多多支持编程网!

阅读原文内容投诉

免责声明:

① 本站未注明“稿件来源”的信息均来自网络整理。其文字、图片和音视频稿件的所属权归原作者所有。本站收集整理出于非商业性的教育和科研之目的,并不意味着本站赞同其观点或证实其内容的真实性。仅作为临时的测试数据,供内部测试之用。本站并未授权任何人以任何方式主动获取本站任何信息。

② 本站未注明“稿件来源”的临时测试数据将在测试完成后最终做删除处理。有问题或投稿请发送至: 邮箱/279061341@qq.com QQ/279061341

软考中级精品资料免费领

  • 历年真题答案解析
  • 备考技巧名师总结
  • 高频考点精准押题
  • 2024年上半年信息系统项目管理师第二批次真题及答案解析(完整版)

    难度     813人已做
    查看
  • 【考后总结】2024年5月26日信息系统项目管理师第2批次考情分析

    难度     354人已做
    查看
  • 【考后总结】2024年5月25日信息系统项目管理师第1批次考情分析

    难度     318人已做
    查看
  • 2024年上半年软考高项第一、二批次真题考点汇总(完整版)

    难度     435人已做
    查看
  • 2024年上半年系统架构设计师考试综合知识真题

    难度     224人已做
    查看

相关文章

发现更多好内容

猜你喜欢

AI推送时光机
位置:首页-资讯-后端开发
咦!没有更多了?去看看其它编程学习网 内容吧
首页课程
资料下载
问答资讯