.Net Core工作流WorkFlowCore

news2024/11/26 0:49:29

WorkFlowCore是一个针对.NetCore的轻量级的工作流引擎,提供了FluentAPI、多任务、持久化以及并行处理的功能,适合于小型工作流、责任链的需求开发。支持工作流长期运行,提供了各种持久化方式。

本篇开发环境为.Net7,此处不演示Jsonyaml配置,详细文档请查看官方文档和项目源码地址

 一、安装与基础使用

通过以下命令安装

Install-Package WorkflowCore

然后注入WorkFlowCore

builder.Services.AddWorkflow();

 WorkFlowCore主要分为两部分:步骤工作流

 步骤

 多个步骤组成一个工作流,每个步骤都可以有输入并产生输出,这些输出可以传递回其所在的工作流。通过创建继承抽象类StepBody或StepBodyAsync的类,并且实现Run或RunAsync方法来定义步骤,很明显它们的区别是是否异步

public class FirstStepBody: StepBody
    {
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Console.WriteLine("Hello world!First");
            return ExecutionResult.Next();
        }
    }

工作流

 通过继承IWorkflow接口定义一个工作流,接口只有IdVersionBuild方法(内部可以执行多个步骤),工作流主机使用这些信息来标识工作流

public class MyWorkflow :IWorkflow
    {
        public string Id => "HelloWorld";
        public int Version => 1;
        public void Build(IWorkflowBuilder<object> builder)
        {
            builder
                .StartWith<FirstStepBody>()
                .Then<FirstStepBody>();
        }
    }

工作流如果想使用必须在工作流主机中通过RegisterWorkflow()方法注册,并且通过Start()方法启动主机,当然也可以通过Stop()方法停止工作流。执行工作流需要使用StartWorkflow()方法,参数为工作流类的Id,如下

 [ApiController]
    [Route("[controller]")]
    public class WeatherForecastController : ControllerBase
    {
        private readonly IWorkflowHost _workflowHost;
        public WeatherForecastController(IWorkflowHost workflowHost)
        {
            _workflowHost = workflowHost;
        }
        [HttpGet(Name = "get")]
        public ContentResult Get()
        {
            if (!_workflowHost.Registry.IsRegistered("HelloWorld",1))
            {
                _workflowHost.RegisterWorkflow<MyWorkflow>();
            }
            _workflowHost.Start();
            _workflowHost.StartWorkflow("HelloWorld");
            //host.Stop();
            return Content("ok");
        }
    }

 当然也可以在构建web服务的时候统一注册,然后就可以直接执行啦

var host = app.Services.GetService<IWorkflowHost>();
host.RegisterWorkflow<MyWorkflow>();
host.Start();

二、在步骤之间传递参数

每个步骤都是一个黑盒,因此它们支持输入和输出。这些输入和输出可以映射到一个数据类,该数据类定义与每个工作流实例相关的自定义数据。

以下示例显示了如何定义步骤的输入和输出,然后显示了如何使用内部数据的类型化类定义工作流,以及如何将输入和输出映射到自定义数据类的属性。

//步骤包含属性,并且计算
    public class FirstStepBody: StepBody
    {
        public int Input1 { get; set; }
        public int Input2 { get; set; }
        public int Output { get; set; }
        public override ExecutionResult Run(IStepExecutionContext context)
        {
            Output = Input1 + Input2;
            Console.WriteLine(Output);
            return ExecutionResult.Next();
        }
    }
    //工作流包含输入输出的赋值
    public class MyWorkflow :IWorkflow<MyDataClass>
    {
        public string Id => "HelloWorld";
        public int Version => 1;
        public void Build(IWorkflowBuilder<MyDataClass> builder)
        {
            builder
                .StartWith<FirstStepBody>()
                .Input(step => step.Input1,data => data.Value1)
                .Input(step => step.Input2, data => 100)
                .Output(data => data.Answer, step => step.Output)
                .Then<FirstStepBody>()
                .Input(step => step.Input1, data => data.Value1)
                .Input(step => step.Input2, data => data.Answer)
                .Output(data => data.Answer, step => step.Output);
        }
    }
    //工作流的属性类
    public class MyDataClass
    {
        public int Value1 { get; set; }
        public int Value2 { get; set; }
        public int Answer { get; set; }
    }
    //执行工作流传入参数
    MyDataClass myDataClass = new MyDataClass();
    myDataClass.Value1 = 100;
    myDataClass.Value2 = 200;
    //不传入myDataClass则每次执行都是新的数据对象
    _workflowHost.StartWorkflow("HelloWorld", myDataClass);

从上述例子可以看到工作流可以定义一个初始的类作为参数传入,每个步骤可以有自己的属性字段去接收参数(可以是工作流类的字段,也可以是固定值),可以用Input方法传入,Output方法输出赋值。如果在工作流执行时不传入参数每次执行都是新的对象的默认值,比如在StartWorkflow方法中不传myDataClass,运行结果是100100,否则是200300

三、外部事件

工作流可以使用WaitFor方法进行等待,通过外部触发此事件,将事件产生的数据传递给工作流,并且让工作流继续执行下面的步骤。示例如下:

public class MyWorkflow :IWorkflow<MyDataClass>
    {
        //省略。。。。
        public void Build(IWorkflowBuilder<MyDataClass> builder)
        {
            builder
                .StartWith<FirstStepBody>()
                .Input(step => step.Input1,data => data.Value1)
                .Input(step => step.Input2, data => 100)
                .Output(data => data.Answer, step => step.Output)
                .WaitFor("MyEvent",key => "EventKey")
                .Output(data => data.Answer,step => step.EventData)
                .Then<FirstStepBody>()
                .Input(step => step.Input1, data => data.Value1)
                .Input(step => step.Input2, data => data.Answer)
                .Output(data => data.Answer, step => step.Output);
        }
    }
    //。。。
    [HttpGet(Name = "get")]
    public ContentResult Get()
    {
        MyDataClass myDataClass = new MyDataClass();
        myDataClass.Value1 = 100;
        myDataClass.Value2 = 200;
        _workflowHost.StartWorkflow("HelloWorld", myDataClass);
            return Content("ok");
        }
  [HttpPost(Name = "event")]
  public ContentResult PublishEvent()
  {
    _workflowHost.PublishEvent("MyEvent", "EventKey", 200);
    return Content("ok");
  }

 使用WaitFor方法可以使工作流等待监听指定事件的执行,有两个入参事件名称事件关键字。通过工作流主机去触发PublishEvent执行指定的事件,有三个入参触发事件名称触发事件关键字和事件参数

 需要执行事件,工作流才会继续下一步,如下动图演示:

 

  可以为等待事件设置有效时间,在有效时间之前执行事件是不会继续下一步流程的,只有当大于有效时间之后执行事件才会继续下一步步骤。如下代码设置,为工作流执行时间一天后执行事件才会继续执行,否则就等待不动。

WaitFor("MyEvent",key => "EventKey", data => DateTime.Now.AddDays(1))

四、活动

活动被定义为在工作流中可以被等待的外部工作队列中的步骤。

在本例中,工作流将等待活动activity-1,直到活动完成才继续工作流。它还将data.Value1的值传递给活动,然后将活动的结果映射到data.Value2

然后我们创建一个worker来处理活动项的队列。它使用GetPendingActivity方法来获取工作流正在等待的活动和数据。

    //.....
    builder
    .StartWith<FirstStepBody>()
    .Input(step => step.Input1,data => data.Value1)
    .Input(step => step.Input2, data => 100)
    .Output(data => data.Answer, step => step.Output)
    .Activity("activity-1", (data) => data.Value1)
    .Output(data => data.Value2, step => step.Result)
    .Then<FirstStepBody>()
    .Input(step => step.Input1, data => data.Value1)
    .Input(step => step.Input2, data => data.Answer)
    .Output(data => data.Answer, step => step.Output);
    //....
    [HttpPost(Name = "active")]
   public ContentResult PublishEvent()
   {
    var activity = _workflowHost.GetPendingActivity("activity-1", "worker1", TimeSpan.FromMinutes(1)).Result;
    if (activity != null)
    {
      Console.WriteLine(activity.Parameters);
      _workflowHost.SubmitActivitySuccess(activity.Token, 100);
    }
    return Content("ok");
   }

活动可以看作一个等待的步骤可以传入参数和输出参数,和事件的区别是事件不能输入参数而是单纯的等待。

五、错误处理

每个步骤都可以配置自己的错误处理行为,可以在以后重试、挂起工作流或终止工作流。

    public void Build(IWorkflowBuilder<object> builder)
    {
        builder                
            .StartWith<HelloWorld>()
                .OnError(WorkflowErrorHandling.Retry,TimeSpan.FromMinutes(10))
            .Then<GoodbyeWorld>();
    }

工作流的流程控制包括分支、循环等各种操作

决策分支

在工作流中定义多个独立分支,并根据表达式值选择满足条件的分支执行。

使用IWorkflowBuilderCreateBranch方法定义分支。然后我们可以使用branch方法选择一个分支。

选择表达式将与通过branch方法列出的分支相匹配,匹配的分支将安排执行。匹配多个分支将导致并行分支运行。

如果data.Value1的值为1,则此工作流将选择branch1,如果为2,则选择branch2

  var branch1 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 1")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 1");

  var branch2 = builder.CreateBranch()
    .StartWith<PrintMessage>()
        .Input(step => step.Message, data => "hi from 2")
    .Then<PrintMessage>()
        .Input(step => step.Message, data => "bye from 2");
  builder
    .StartWith<HelloWorld>()
    .Decide(data => data.Value1)
        .Branch((data, outcome) => data.Value1 == "one", branch1)
        .Branch((data, outcome) => data.Value1 == "two", branch2);

并行ForEach

使用ForEach方法启动并行for循环

  public class ForEachWorkflow : IWorkflow
  {
      public string Id => "Foreach";
      public int Version => 1;
      public void Build(IWorkflowBuilder<object> builder)
      {
          builder
              .StartWith<SayHello>()
              .ForEach(data => new List<int>() { 1, 2, 3, 4 })
                  .Do(x => x
                      .StartWith<DisplayContext>()
                          .Input(step => step.Message, (data, context) => context.Item)
                      .Then<DoSomething>())
              .Then<SayGoodbye>();
      }        
  }

While循环

使用While方法启动while循环

  public class WhileWorkflow : IWorkflow<MyData>
  {
      public string Id => "While";
      public int Version => 1;
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .While(data => data.Counter < 3)
                  .Do(x => x
                      .StartWith<DoSomething>()
                      .Then<IncrementStep>()
                          .Input(step => step.Value1, data => data.Counter)
                          .Output(data => data.Counter, step => step.Value2))
              .Then<SayGoodbye>();
      }        
  }

If判断

使用If方法执行if判断

  public class IfWorkflow : IWorkflow<MyData>
  { 
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .If(data => data.Counter < 3).Do(then => then
                  .StartWith<PrintMessage>()
                      .Input(step => step.Message, data => "Value is less than 3")
              )
              .If(data => data.Counter < 5).Do(then => then
                  .StartWith<PrintMessage>()
                      .Input(step => step.Message, data => "Value is less than 5")
              )
              .Then<SayGoodbye>();
      }        
  }

并行

使用Parallel方法并行执行任务

  public class ParallelWorkflow : IWorkflow<MyData>
  {
      public string Id => "parallel-sample";
      public int Version => 1;
      public void Build(IWorkflowBuilder<MyData> builder)
      {
          builder
              .StartWith<SayHello>()
              .Parallel()
                  .Do(then => 
                      then.StartWith<Task1dot1>()
                          .Then<Task1dot2>()
                  .Do(then =>
                      then.StartWith<Task2dot1>()
                          .Then<Task2dot2>()
              .Join()
              .Then<SayGoodbye>();
    }        
}

Schedule

使用Schedule方法在工作流中注册在指定时间后执行的异步方法

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Schedule(data => TimeSpan.FromSeconds(5)).Do(schedule => schedule
        .StartWith(context => Console.WriteLine("Doing scheduled tasks"))
    )
    .Then(context => Console.WriteLine("Doing normal tasks"));

使用Recure方法在工作流中设置一组重复的后台步骤,直到满足特定条件为止

builder
    .StartWith(context => Console.WriteLine("Hello"))
    .Recur(data => TimeSpan.FromSeconds(5), data => data.Counter > 5).Do(recur => recur
        .StartWith(context => Console.WriteLine("Doing recurring task"))
    )
    .Then(context => Console.WriteLine("Carry on"));

saga允许在saga transaction中封装一系列步骤,并为每一个步骤提供补偿步骤,使用CompensateWith方法在对应的步骤后面添加补偿步骤,补偿步骤将会在步骤抛出异常的时候触发。

如下示例,步骤Task2如果抛出一个异常,那么补偿步骤UndoTask2UndoTask1将被触发。

builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
        .CompensateWith<CleanUp>()
    .Then(context => Console.WriteLine("End"));

也可以指定重试策略,在指定时间间隔后重试。

builder
    .StartWith(context => Console.WriteLine("Begin"))
    .Saga(saga => saga
        .StartWith<Task1>()
            .CompensateWith<UndoTask1>()
        .Then<Task2>()
            .CompensateWith<UndoTask2>()
        .Then<Task3>()
            .CompensateWith<UndoTask3>()
    )
    .OnError(Models.WorkflowErrorHandling.Retry, TimeSpan.FromSeconds(5))
    .Then(context => Console.WriteLine("End"));

八、持久化

可以使用RedisMongdbSqlserver等持久化,具体可以看文档,此处使用Redis,先安装nuget

Install-Package WorkflowCore.Providers.Redis

然后注入就可以了

builder.Services.AddWorkflow(cfg =>
{
    cfg.UseRedisPersistence("localhost:6379", "app-name");
    cfg.UseRedisLocking("localhost:6379");
    cfg.UseRedisQueues("localhost:6379", "app-name");
    cfg.UseRedisEventHub("localhost:6379", "channel-name");
    //cfg.UseMongoDB(@"mongodb://mongo:27017", "workflow");
    //cfg.UseElasticsearch(new ConnectionSettings(new Uri("http://elastic:9200")), "workflows");
});

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/844895.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

代码调试3:coco数据集生成退化图

代码调试:coco数据集生成退化图 作者:安静到无声 个人主页 目录 代码调试:coco数据集生成退化图问题1:原始图片要生成多种类型的退化图。问题2:输入尺寸的匹配问题。问题3:如何将缩放后的图片恢复到原始尺寸?遇到灰色图片怎么办。问题4:如何设计出端到端的的程序问题5…

uni-app离线打包高德地图导入android studio不能正常显示

本人使用的uni-app SDK版本&#xff1a;Android-SDK3.8.7.81902_20230704 1.导入以上文件&#xff0c;依赖已经自动添加了 2.确保这个正常引入 3.修改AndroidMainifest.xml,添加自己的密钥

使用gpt对对话数据进行扩增,对话数据扩增,数据增强

我们知道一个问题可以使用很多方式问&#xff0c;但都可以使用完全一样的回答&#xff0c;基于这个思路&#xff0c;我们可以很快的扩增我们的数据集。思路就是使用chatgpt或者gpt4生成类似问题&#xff0c;如下&#xff1a; 然后我们可以工程化这个过程&#xff0c;从而快速扩…

python -- 如何将nc数据中的时间转换为北京时区的时间

在nc数据处理时&#xff0c;以ERA5的小时数据为例&#xff0c;使用的时间为UTC&#xff0c;不同时区存在时间上的差异&#xff0c;如何将其转化为北京当地的时间呢? https://confluence.ecmwf.int/display/CKB/ERA5%3Adatadocumentation #!/usr/bin/env python3 # -*- cod…

uniapp 微信小程序 echarts地图 点击显示类目

效果如图&#xff1a; 在tooltip内axisPointer内添加 label:{show:true} 即可显示“请求离婚”的标题

CMU-CERT内部威胁数据集 r4.2版本介绍

CMU-CERT内部威胁数据集 r4.2版本介绍 一、相关介绍二、CMU-CERT r4.2版本内容三、重大变更 一、相关介绍 “CMU”是卡内基梅隆大学&#xff08;Carnegie Mellon University&#xff09;的简称。 “CERT”是卡内基梅隆大学的一个研究中心叫“CERT”&#xff0c;主要研究内部威…

Acwing.876 快速幂求逆元

题目 给定n组ai ,pi&#xff0c;其中p;是质数,求α;模p;的乘法逆元&#xff0c;若逆元不存在则输出impossible。 输入格式 第一行包含整数n。 接下来n行&#xff0c;每行包含一个数组ai, pi&#xff0c;数据保证p;是质数。 输出格式 输出共n行&#xff0c;每组数据输出一…

【C++】哈希开散列 | unordered系列容器的封装

文章目录 一.开散列1. 开散列的概念2. 开散列结构3. Insert 插入4. Find 查找5. Insert 扩容6. Erase 删除7. 析构函数8. 其它函数接口9. 性能测试 二.封装1. 封装内部结构2. 实现接口 三.代器器1. 迭代器的定义2. 常用接口3. 迭代器4. begin()、end()5. find的改动6. 下标访问…

如果您需要高质量的电源模块,不要犹豫,选择YB5011 非隔离AC-DC!

您是否正在寻找高质量的电源模块&#xff1f;我们昱灿电子推荐YB5011 非隔离AC-DC。它具有广泛的输入电压范围和高达90%的高效率。这款电源模块还配备了多种保护功能&#xff0c;如过载和短路保护&#xff0c;确保您的设备始终处于安全状态。不仅如此&#xff0c;YB5011还采用了…

Linux命令200例:whereis用于搜索以及定位二进制文件

&#x1f3c6;作者简介&#xff0c;黑夜开发者&#xff0c;全栈领域新星创作者✌&#xff0c;阿里云社区专家博主&#xff0c;2023年6月csdn上海赛道top4。 &#x1f3c6;本文已收录于专栏&#xff1a;Linux命令大全。 &#x1f3c6;本专栏我们会通过具体的系统的命令讲解加上鲜…

【从零开始学习JAVA | 第四十一篇】深入JAVA锁机制

目录 前言&#xff1a; 引入&#xff1a; 锁机制&#xff1a; CAS算法&#xff1a; 乐观锁与悲观锁&#xff1a; 总结&#xff1a; 前言&#xff1a; 在多线程编程中&#xff0c;线程之间的协作和资源共享是一个重要的话题。当多个线程同时操作共享数…

windows(iis)服务器部署安装wordpress(php)网站教程

该教程包含iis安装,php安装,mysql安装,php网站部署上线,windows服务部署php网站,只需要这一篇文章就够了。 该教程为iis服务器部署安装wordpress(php)网站教程,同样适用wordpress网站迁移。 配置要求 1、windows服务器安装iis windows服务器安装iis管理器 打开控制面…

最新AI创作系统ChatGPT源码V2.5.8/支持GPT4.0+GPT联网提问/支持ai绘画Midjourney+Prompt+MJ以图生图+思维导图生成!

使用Nestjs和Vue3框架技术&#xff0c;持续集成AI能力到系统&#xff01; 最新版【V2.5.8】更新&#xff1a; 新增 MJ 官方图片重新生成指令功能同步官方 Vary 指令 单张图片对比加强 Vary(Strong) | Vary(Subtle)同步官方 Zoom 指令 单张图片无限缩放 Zoom out 2x | Zoom ou…

2020年09月 Python(一级)真题解析#中国电子学会#全国青少年软件编程等级考试

一、单选题 第1题 Python自带的编程环境是&#xff1f; A&#xff1a;PyScripter B&#xff1a;Spyder C&#xff1a;Notepad D&#xff1a;IDLE 正确的答案是&#xff1a;D Python自带的编程环境是IDLE&#xff08;Integrated Development and Learning Environment&a…

LVS工作环境配置

一、LVS-DR工作模式配置 模拟环境如下&#xff1a; 1台客户机 1台LVS负载调度器 2台web服务器 1、环境部署 &#xff08;1&#xff09;LVS负载调度器 yum install -y ipvsadm # 在LVS负载调度器上进行环境安装 ifconfig ens33:200 192.168.134.200/24 # 配置LVS的VIP…

Synchronized同步锁的优化方法 待完工

Synchronized 和后来出的这个lock锁的区别 在并发编程中&#xff0c;多个线程访问同一个共享资源时&#xff0c;我们必须考虑如何维护数据的原子性。在 JDK1.5 之前&#xff0c;Java 是依靠 Synchronized 关键字实现锁功能来做到这点的。Synchronized 是 JVM 实现的一种内置锁…

【枚举】CF1706 C

有人一道1400写了一个小时 Problem - C - Codeforces 题意&#xff1a; 思路&#xff1a; 首先先去观察样例&#xff1a; 很显然&#xff0c;对于n是奇数的情况&#xff0c;只有一种情况&#xff0c;直接操作偶数位就好了 主要是没搞清楚n是偶数的情况 其实有个小技巧&…

前沿分享-鱼形机器人

可能并不太前沿了&#xff0c;是21年底的新闻了&#xff0c;但是看见了就顺便发一下吧。 大概就是&#xff0c;通过在pH响应型水凝胶中编码不同的膨胀速率而构建了一种环境适应型变形微机器人,让微型机器人直接向癌细胞输送药物从而减轻药物带来副作用。 技术原理是&#xff0c…

第五章 树与二叉树

一、数据结构定义 二叉树的顺序存储结构二叉树的链式存储结构&#xff08;即二叉链表&#xff09;&#xff08;因为有两个指针所以是二叉链表&#xff0c;如果再加一个指向父节点的指针就是三叉链表&#xff09;typedef struct BTNode{ // BT即Binary Tree&#xff0c;二叉树i…

Reinforcement Learning with Code 【Code 4. DQN】

Reinforcement Learning with Code 【Code 4. DQN】 This note records how the author begin to learn RL. Both theoretical understanding and code practice are presented. Many material are referenced such as ZhaoShiyu’s Mathematical Foundation of Reinforcement…