【源码分析】zeebe actor模型源码解读

news2024/12/28 18:36:01

zeebe actor 模型🙋‍♂️

如果有阅读过zeebe 源码的朋友一定能够经常看到actor.run() 之类的语法,那么这篇文章就围绕actor.run 方法,说说zeebe actor 的模型。

环境⛅

zeebe release-8.1.14

actor.run() 是怎么开始的🌈

zeebe actor 模型

LongPollingActivateJobsHandler.java

以LongPollingActivateJobsHandler 的激活任务方法为例,我们可以看到run 方法实际上执行ActorControl类的run 方法,让我们进到run 方法中。

	private ActorControl actor;

    public void activateJobs(final InflightActivateJobsRequest request) {
        actor.run(
                () -> {
                    final InFlightLongPollingActivateJobsRequestsState state =
                            getJobTypeState(request.getType());

                    if (state.shouldAttempt(failedAttemptThreshold)) {
                        activateJobsUnchecked(state, request);
                    } else {
                        completeOrResubmitRequest(request, false);
                    }
                });
    }

ActorControl

可以看到scheduleRunnable 的目标是构造ActorJob,然后将job 添加到ActorTask 中,添加的方式分为insert 和submit。其实到这里我们就可以认为actor.run 就已经结束了,因为insert 和submit 方法主要就是将job 添加到task 的jobQueues 中,对于job 的执行要等到队列不断被线程pop 到当前job 的时候。

	final ActorTask task;
	
    @Override
    public void run(final Runnable action) {
        scheduleRunnable(action);
    }
    private void scheduleRunnable(final Runnable runnable) {
        final ActorThread currentActorThread = ActorThread.current();

        if (currentActorThread != null && currentActorThread.getCurrentTask() == task) {
            final ActorJob newJob = currentActorThread.newJob();
            newJob.setRunnable(runnable);
            newJob.onJobAddedToTask(task);

            // 插入到执行队列
            task.insertJob(newJob);
        } else {
            final ActorJob job = new ActorJob();
            job.setRunnable(runnable);
            job.onJobAddedToTask(task);

            // 提交到外部队列
            // submit 实际上是将task 放到thread group 里边
            task.submit(job);
        }
    }

job 是怎么被执行的⚡

并不是任意一个ActorControl 都可以执行run 方法的,按照上图所示,Actor 会在broker 生命周期开始要进行注册 ,也就是说ActorControl 中的task 会注册到taskQueues。然后“线程池”不断从taskQueues 中pop 出task,每个task 中又会有多个job,按照策略选取不同的job 执行,我们可以认为job 就是actor.run(Runnable runnable) 中的runnable。

Gateway.java

gateway 注册task

  
  private CompletableFuture<ActivateJobsHandler> submitActorToActivateJobs(
      final ActivateJobsHandler handler) {
    final var future = new CompletableFuture<ActivateJobsHandler>();
    final var actor =
        Actor.newActor()
            .name("ActivateJobsHandler")
            .actorStartedHandler(handler.andThen(t -> future.complete(handler)))
            .build();
	
	// 将task 注册到TaskQueues
    actorSchedulingService.submitActor(actor);
    return future;
  }

ActorThreadGroup.java

就是上面提到的“线程池”,负责初始化每一条ActorThread 线程,并为其分配默认的WorkStealingGroup

	protected final String groupName;
    protected final ActorThread[] threads;
    protected final WorkStealingGroup tasks;
    protected final int numOfThreads;

	// 构造器,初始化每条线程,并为其分配一个默认的WorkStealingGroup 任务队列
    public ActorThreadGroup(
            final String groupName, final int numOfThreads, final ActorSchedulerBuilder builder) {
        this.groupName = groupName;
        this.numOfThreads = numOfThreads;

        tasks = new WorkStealingGroup(numOfThreads);

        threads = new ActorThread[numOfThreads];

        for (int t = 0; t < numOfThreads; t++) {
            final String threadName = String.format("%s-%d", groupName, t);
            final ActorThread thread =
                    builder
                            .getActorThreadFactory()
                            .newThread(
                                    threadName,
                                    t,
                                    this,
                                    tasks,
                                    builder.getActorClock(),
                                    builder.getActorTimerQueue(),
                                    builder.isMetricsEnabled());

            threads[t] = thread;
        }
    }
	
	// start
    public void start() {
        for (final ActorThread actorThread : threads) {

            // 启动每一个ActorThread
            actorThread.start();
        }
    }

ActorThread.java

ActorThread 继承自Thread,可以看到start=>run=>doWork 的引用流程,在doWork 方法中,首先从taskScheduler 中获取当前task,然后执行当前task


	// 继承自Thread 
    @Override
    public synchronized void start() {
        if (UNSAFE.compareAndSwapObject(
                this, STATE_OFFSET, ActorThreadState.NEW, ActorThreadState.RUNNING)) {
            
            // super.start 会执行下面的run 方法
            super.start();
        } else {
            throw new IllegalStateException("Cannot start runner, not in state 'NEW'.");
        }
    }

	// 主要执行doWork 方法
    @Override
    public void run() {
        idleStrategy.init();
		
        while (state == ActorThreadState.RUNNING) {
            try {
                doWork();
            } catch (final Exception e) {
                LOG.error("Unexpected error occurred while in the actor thread {}", getName(), e);
            }
        }

        state = ActorThreadState.TERMINATED;

        terminationFuture.complete(null);
    }
	private void doWork() {
        submittedCallbacks.drain(this);

        if (clock.update()) {
            timerJobQueue.processExpiredTimers(clock);
        }

		// 从taskScheduler 中获取当前task
        currentTask = taskScheduler.getNextTask();

        if (currentTask != null) {
            final var actorName = currentTask.actor.getName();
            try (final var timer = actorMetrics.startExecutionTimer(actorName)) {

                // 执行当前任务
                executeCurrentTask();
            }
            if (actorMetrics.isEnabled()) {
                actorMetrics.updateJobQueueLength(actorName, currentTask.estimateQueueLength());
                actorMetrics.countExecution(actorName);
            }
        } else {
            idleStrategy.onIdle();
        }
    }

	private void executeCurrentTask() {
        final var properties = currentTask.getActor().getContext();
        MDC.setContextMap(properties);
        idleStrategy.onTaskExecuted();

        boolean resubmit = false;

        try {
			// 真正执行当前任务
            resubmit = currentTask.execute(this);
        } catch (final Throwable e) {
            FATAL_ERROR_HANDLER.handleError(e);
            LOG.error("Unexpected error occurred in task {}", currentTask, e);
        } finally {
            MDC.remove("actor-name");
            clock.update();
        }

        if (resubmit) {
            currentTask.resubmit();
        }
    }

ActorTask.java

ActorTask 的执行流程,它会不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll(),从这里可以看到submittedJobs 和fastlaneJobs 的区别!

找到job 后开始执行当前job


	public boolean execute(final ActorThread runner) {
        schedulingState.set(TaskSchedulingState.ACTIVE);

        boolean resubmit = false;

        // 不断从订阅的列表中拉取job,poll 方法会更新当前currentJob, 如果一次逻辑执行中从fastlaneJobs 中poll 到了任务,那么currentJob != null 会短路返回true,而不进行poll()
        while (!resubmit && (currentJob != null || poll())) {
            currentJob.execute(runner);

            switch (currentJob.schedulingState) {
                case TERMINATED -> {
                    final ActorJob terminatedJob = currentJob;

                    // 从fastlaneJobs任务集合中拉取任务
                    currentJob = fastLaneJobs.poll();

                    // 如果是通过订阅触发的任务
                    if (terminatedJob.isTriggeredBySubscription()) {
                        final ActorSubscription subscription = terminatedJob.getSubscription();

                        // 如果订阅是一次性的,那么在订阅触发后则将订阅移除
                        if (!subscription.isRecurring()) {
                            removeSubscription(subscription);
                        }

                        // 执行订阅的回调任务
                        subscription.onJobCompleted();
                    } else {
                      
                        runner.recycleJob(terminatedJob);
                    }
                }
                case QUEUED ->
                    // the task is experiencing backpressure: do not retry it right now, instead re-enqueue
                    // the actor task.
                    // this allows other tasks which may be needed to unblock the backpressure to run
                        resubmit = true;
                default -> {
                }
            }

            if (shouldYield) {
                shouldYield = false;
                resubmit = currentJob != null;
                break;
            }
        }

        if (currentJob == null) {
            resubmit = onAllJobsDone();
        }

        return resubmit;
    }
    private boolean poll() {
        boolean result = false;

        result |= pollSubmittedJobs();
        result |= pollSubscriptions();

        return result;
    }

ActorJob.java

ActorJob 的执行逻辑

还记得上面说过ActorJob 可以理解为runnable 的吗,在invoke 中ActorJob 中的runnable 真正执行了,至此job 的执行过程结束

	void execute(final ActorThread runner) {
        actorThread = runner;
        observeSchedulingLatency(runner.getActorMetrics());
        try {

            // 执行actor 的 callable 或者 runnable 方法
            invoke();

            if (resultFuture != null) {
                resultFuture.complete(invocationResult);
                resultFuture = null;
            }

        } catch (final Throwable e) {
            FATAL_ERROR_HANDLER.handleError(e);
            task.onFailure(e);
        } finally {
            actorThread = null;

            // 无论那种情况,成功或者失败,都要判断是否job 应该被resubmitted
            // in any case, success or exception, decide if the job should be resubmitted
            if (isTriggeredBySubscription() || runnable == null) {
                schedulingState = TaskSchedulingState.TERMINATED;
            } else {
                schedulingState = TaskSchedulingState.QUEUED;
                scheduledAt = System.nanoTime();
            }
        }
    }
    
    private void invoke() throws Exception {
        if (callable != null) {
            invocationResult = callable.call();
        } else {
            // only tasks triggered by a subscription can "yield"; everything else just executes once
            if (!isTriggeredBySubscription()) {
                final Runnable r = runnable;
                runnable = null;
                r.run();
            } else {
            	// runnable 真正执行
                runnable.run();
            }
        }
    }

总结📝

本文中的激活例子其实只是列举了Actor 的实现原理,想一想文中提到的功能用一个真正的线程池可以很好的解决。但是actor模型 的特性远不仅如此,对于其他特性在zeebe 中是如何实现的还请读者自己去挖掘🤏~

zeebe 团队真的是太喜欢functional programming了,找一个方法的调用链头都大了😅

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/957019.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

APP出海推广前要做哪些事?

一个移动应用APP从开发完成到成功出海&#xff0c;中间要经历哪些过程&#xff1f;上架应用商店、数据对接、各渠道推广要做些什么&#xff1f; 1、上架应用市场 对于想要出海的APP开发商来说&#xff0c;谷歌应用商店Google Play和苹果应用商店Apple APP Store是主要的发行渠…

QT 插件化图像算法研究平台

因工作和生活需要&#xff0c;做了一个插件化的图像算法研究平台。 相关技术如下&#xff1a; 一、插件化 实现了基本的插件框架&#xff0c;能载入插件、加载菜单。 主程序只有插件载入功能。 主窗体、其它任何功能及窗体均由各种插件提供。 二、Opencv相关插件&#xf…

JVM的故事——虚拟机类加载机制

虚拟机类加载机制 文章目录 虚拟机类加载机制一、概述二、类加载的时机三、类加载的过程四、类加载器 一、概述 本章将要讲解class文件如何进入虚拟机以及虚拟机如何处理这些class文件。Java虚拟机把class文件加载到内存&#xff0c;并对数据进行校验、转换解析和初始化&#…

Qt+C++桌面计算器源码

程序示例精选 QtC桌面计算器源码 如需安装运行环境或远程调试&#xff0c;见文章底部个人QQ名片&#xff0c;由专业技术人员远程协助&#xff01; 前言 这篇博客针对<<QtC桌面计算器源码>>编写代码&#xff0c;代码整洁&#xff0c;规则&#xff0c;易读。 学习与…

素数之谜揭秘:一文详解试除法判断素数

这是我非常喜欢的一道编程题目。不要小看这道题&#xff0c;它看似简单&#xff0c;实则奥妙无穷。由于这是C语言的入门篇&#xff0c;只介绍最简单&#xff0c;也最容易想到的方法&#xff1a;试除法。但哪怕是试除法&#xff0c;也有不少变化。 要想了解试除法&#xff0c;首…

Flowable7 设计器

1、flowable7 已经在主版本上移除了Flowable UI相关的包&#xff0c;包含bpm-json相关的所有包和流程设计器相关前端文件。 2、flowable7 版本目前只保留了xml运行相关的包&#xff0c;ui modeler已经移除 3、目前官方给的回复是只能在 flowable 云产品上使用设计器&#xff…

进程的组成:PCB、程序段、数据段

进程的组成:PCB、程序段、数据段 什么是进程 在操作系统中,进程是资源分配和程序执行的基本单位,它是操作系统动态执行的一个程序。 进程是一个动态的概念,当一个程序运行时,它就是一个进程,进程需要相应的系统资源:内存、CPU、文件等等,以保证其能够正确运行。对于同一个程…

短线炒股必杀技

一、短线交易入门基础 1.什么是短线 短线交易博取的是短期差价收益&#xff0c;一般不太关心股票的业绩和潜质&#xff0c;只关心个股近期是否会上涨&#xff0c;具体涨多少。短线投资者以技术派为主&#xff0c;主要依据技术图表进行分析。一般短线投资者的通常持股周期是以几…

设计师都去哪些网站找样机素材

在当今的设计领域&#xff0c;3D样机素材已经成为一个重要的领域。3D样机素材可以让设计师更好地展示他们的设计理念和概念&#xff0c;也可以帮助客户更好地理解设计。为了帮助设计师更容易地创建3D样机素材&#xff0c;以下是我推荐的10个易于使用的3D样机素材网站。 即时设…

那个学C++不没有点大病?一点点癫狂的语法混乱版note和有一点点长的无语的标题,让人怀疑精神状态尼奥

类型转换 切勿混用无符号类型和有符号类型 表达式中两者都有时&#xff0c;有符号类型会转化为无符号类型&#xff0c;当该值为负时会出现非预期结果&#xff1b; unsigned a 1; int b -1; cout<<a*b;//输出 4294967295 //详解: b的源码&#xff1a;100...1 负数转补…

PL端DDR4读写测试实验(未完成)

文章目录 DDR4介绍实验过程编写XDC使用IP核上板验证TODO 参考 DDR4介绍 开发板PL有一颗16bit的DDR4。 先说明硬件信号&#xff08;按该芯片&#xff09;&#xff1a; 信号名说明DQData input/output&#xff0c;双向数据线&#xff08;这个芯片是x16的&#xff0c;使用DQ[15…

SpringBoot初级开发--多环境配置的集成(9)

在Springboot的开发中&#xff0c;我们经常要切换各种各样的环境配置&#xff0c;比如现在是开发环境&#xff0c;然后又切换到生产环境&#xff0c;这个时候用多环境配置就是一个明智的选择。接下来我们沿用上一章的工程来配置多环境配置工程。 1.准备多环境配置文件 这里我…

时序预测 | MATLAB实现基于PSO-GRU、GRU时间序列预测对比

时序预测 | MATLAB实现基于PSO-GRU、GRU时间序列预测对比 目录 时序预测 | MATLAB实现基于PSO-GRU、GRU时间序列预测对比效果一览基本描述程序设计参考资料 效果一览 基本描述 MATLAB实现基于PSO-GRU、GRU时间序列预测对比。 1.MATLAB实现基于PSO-GRU、GRU时间序列预测对比&…

批量剪辑工具:轻松垂直翻转倒立视频画面

你是否曾经遇到这样的情况&#xff1a;拍摄的视频画面是倒立的&#xff0c;但你需要在正立的情况下观看。这时候&#xff0c;你需要一款视频批量剪辑工具来帮助你垂直翻转倒立的视频画面。 首先第一步&#xff0c;我们要打开【视频剪辑高手】&#xff0c;登录账号。 第二步&…

偏置曲柄滑块机构连杆上的双尖点轨迹

偏置曲柄滑块机构是一种常见的机械传动机构&#xff0c;由曲柄、偏置滑块和连杆组成。其中&#xff0c;偏置滑块具有急回特性&#xff0c;可以使机构在运动过程中产生快速的反向运动。 偏置曲柄滑块机构中&#xff0c;连杆上的双尖点轨迹指的是连杆在偏置曲柄滑块机构的运动过…

MOS的减速加速电路设计

引言&#xff1a;在开始讲解MOS的减速加速电路之前&#xff0c;我们还是先来回顾MOS开启与关闭的根本机制。以NMOS为例&#xff0c;开启NMOS本质是对G极进行充电&#xff0c;至Cgs电荷充满&#xff0c;G极才会达到控制端电平值或者开启阈值&#xff0c;关断NMOS时&#xff0c;G…

嵌入式开发之syslog和rsyslog构建日志记录

1.syslogd作客户端 BusyBox v1.20.2 (2022-04-06 16:19:14 CST) multi-call binary.Usage: syslogd [OPTIONS]System logging utility-n Run in foreground-O FILE Log to FILE (default:/var/log/messages)-l N Log only messages more urge…

QT DAY 2

window.cpp #include "window.h" #include<QDebug> #include<QIcon> Window::Window(QWidget *parent) //构造函数的定义: QWidget(parent) //显性调用父类的构造函数 {//this->resize(430,330);this->resize(QSize(800,600));// this…

音视频入门基础理论知识

文章目录 前言一、视频1、视频的概念2、常见的视频格式3、视频帧4、帧率5、色彩空间6、采用 YUV 的优势7、RGB 和 YUV 的换算 二、音频1、音频的概念2、采样率和采样位数①、采样率②、采样位数 3、音频编码4、声道数5、码率6、音频格式 三、编码1、为什么要编码2、视频编码①、…

无涯教程-Android - CheckBox函数

CheckBox是可以由用户切换的on/off开关。为用户提供一组互不排斥的可选选项时,应使用复选框。 CheckBox 复选框属性 以下是与CheckBox控件相关的重要属性。您可以查看Android官方文档以获取属性的完整列表以及可以在运行时更改这些属性的相关方法。 继承自 android.widget.T…