Spring Boot集成Akka Cluster实现在分布式节点中执行任务

news2024/9/19 7:59:26

1.写在前面

前面已经写过akka的很多文章了,具体如下:

  • Spring Boot集成akka actor快速入门Demo
  • Spring Boot集成Akka Stream快速入门Demo
  • Spring Boot集成Akka remoting快速入门Demo
  • Spring Boot集成Akka Cluster快速入门Demo

今天主要讲一下如何在一个akka集群环境中提交任务并在集群中执行

2.代码工程

pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>springboot-demo</artifactId>
        <groupId>com.et</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>akka</artifactId>

    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>


    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-autoconfigure</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor_2.13</artifactId>
            <version>2.6.0</version>
        </dependency>

        <!-- Akka Streams -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-stream_2.13</artifactId>
            <version>2.6.0</version>
        </dependency>
        <!-- Akka Actor dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-actor-typed_2.13</artifactId>
            <version>2.6.0</version>
        </dependency>

        <!-- Akka Remote dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-remote_2.13</artifactId>
            <version>2.6.0</version>
        </dependency>
        <!-- Akka Cluster dependency -->
        <dependency>
            <groupId>com.typesafe.akka</groupId>
            <artifactId>akka-cluster-typed_2.13</artifactId>
            <version>2.6.0</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
        </dependency>
    </dependencies>

</project>

MasterActor

 

  • 作用MasterActor 是系统中的主控制器或协调者。它负责管理和分配任务,监控工作进度,以及处理系统的全局状态。
  • 功能
    • 任务分配:接收任务请求并将这些任务分发给工作节点(WorkActor)。
    • 协调工作:协调多个 WorkActor 的工作,确保任务按预期执行。
    • 结果汇总:汇总来自 WorkActor 的结果,可能还会对结果进行处理或存储。
    • 监控与容错:监控 WorkActor 的状态,处理异常情况,进行故障恢复等。

 

package com.et.akka.cluster;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.actor.ActorRef;
import akka.event.Logging;
import akka.event.LoggingAdapter;

import java.util.HashMap;
import java.util.Map;

public class MasterActor extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);
    private final ActorRef workerRouter;

    // Constructor to initialize the worker router
    public MasterActor(ActorRef workerRouter) {
        this.workerRouter = workerRouter;
    }

    public static Props props(ActorRef workerRouter) {
        return Props.create(MasterActor.class, workerRouter);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(TaskMessage.class, msg -> {
                    log.info("Received task message: {}", msg.task);
                    workerRouter.tell(msg, getSelf()); // Forward task to worker router
                })
                .build();
    }
}

WorkerRouterActor

 

  • 作用WorkerRouterActor 是一个路由器,负责将任务分发给多个 WorkActor 实例。它通常用于负载均衡和高效地管理工作负载。
  • 功能
    • 任务路由:根据路由策略将任务分发给一个或多个 WorkActor 实例。常见的路由策略包括轮询、随机、最少工作量等。
    • 负载均衡:确保工作负载在所有 WorkActor 实例中均匀分布。
    • 动态调整:可以根据系统负载动态调整 WorkActor 的数量,进行扩展或收缩。

 

package com.et.akka.cluster;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Terminated;
import akka.routing.RoundRobinPool;
import akka.routing.Router;

public class WorkerRouterActor extends AbstractActor {
    private final ActorRef router;

    public WorkerRouterActor(int numberOfWorkers) {
        this.router = getContext().actorOf(new RoundRobinPool(numberOfWorkers).props(WorkerActor.props()), "workerRouter");
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(TaskMessage.class, msg -> router.tell(msg, getSelf()))
                .match(Terminated.class, t -> getContext().stop(getSelf()))
                .build();
    }

    public static Props props(int numberOfWorkers) {
        return Props.create(WorkerRouterActor.class, numberOfWorkers);
    }
}

WorkActor

 

  • 作用WorkActor 是实际执行任务的工作单元。它负责处理具体的工作负载,并返回结果给 MasterActor 或通过 WorkerRouterActor 进行汇总。
  • 功能
    • 执行任务:接收任务并执行相关操作,如计算、数据处理等。
    • 报告结果:完成任务后,将结果发送回 MasterActor 或其他负责汇总结果的组件。
    • 错误处理:处理任务执行过程中可能出现的错误或异常情况。

 

package com.et.akka.cluster;

import akka.actor.AbstractActor;
import akka.actor.Props;
import akka.event.Logging;
import akka.event.LoggingAdapter;

public class WorkerActor extends AbstractActor {
    private final LoggingAdapter log = Logging.getLogger(getContext().getSystem(), this);

    public static Props props() {
        return Props.create(WorkerActor.class);
    }

    @Override
    public Receive createReceive() {
        return receiveBuilder()
                .match(TaskMessage.class, msg -> {
                    log.info("Processing task: {}", msg.task);
                    // Simulate task processing
                    Thread.sleep(1000);
                    log.info("Task completed: {}", msg.task);
                })
                .build();
    }
}

ClusterApp2

package com.et.akka.cluster;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.cluster.Cluster;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;

public class ClusterApp2 {

    public static void main(String[] args) {
        // Load configuration
        Config config = ConfigFactory.load();
        ActorSystem system = ActorSystem.create("ClusterSystem", config);

        // Create WorkerRouterActor with 5 workers
        ActorRef workerRouter = system.actorOf(WorkerRouterActor.props(5), "workerRouter");

        // Create MasterActor
        ActorRef masterActor = system.actorOf(MasterActor.props(workerRouter), "masterActor");

        // Log cluster membership
        Cluster cluster = Cluster.get(system);
        System.out.println("Cluster initialized with self member: " + cluster.selfAddress());

        // Submit tasks
        masterActor.tell(new TaskMessage("Task 1"), ActorRef.noSender());
        masterActor.tell(new TaskMessage("Task 2"), ActorRef.noSender());
        masterActor.tell(new TaskMessage("Task 3"), ActorRef.noSender());

        // Keep system alive for demonstration purposes
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        
        system.terminate();
    }
}

只是一些关键代码,所有代码请参见下面代码仓库

代码仓库

  • GitHub - Harries/springboot-demo: a simple springboot demo with some components for example: redis,solr,rockmq and so on.(akka)

3.测试

启动ClusterApp2类之中的main方法,查看日志,发现任务执行成功

Cluster initialized with self member: akka://ClusterSystem@127.0.0.1:2551
22:03:04.948 [ClusterSystem-akka.actor.default-dispatcher-15] INFO com.et.akka.cluster.MasterActor - Received task message: Task 1
22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 1
22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.MasterActor - Received task message: Task 2
22:03:04.949 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.MasterActor - Received task message: Task 3
22:03:04.950 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 2
22:03:04.950 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Processing task: Task 3
22:03:05.951 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 3
22:03:05.952 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 2
22:03:05.952 [ClusterSystem-akka.actor.default-dispatcher-16] INFO com.et.akka.cluster.WorkerActor - Task completed: Task 1

4.引用

  • https://doc.akka.io/docs/akka/current/common/cluster.html#gossip

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2140768.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Qt ORM模块使用说明

附源码&#xff1a;QxOrm是一个C库资源-CSDN文库 使用说明 把QyOrm文件夹拷贝到自己的工程项目下, 在自己项目里的Pro文件里添加include($$PWD/QyOrm/QyOrm.pri)就能使用了 示例test_qyorm.h写了表的定义,Test_QyOrm_Main.cpp中写了所有支持的功能的例子: 通过自动表单添加…

C++——异常处理机制(try/catch/throw)

一、什么是异常处理机制 C++中的异常处理机制是一种用来检测和处理程序执行期间可能存在的异常情况的技术。它允许开发者编写健壮的代码,能够提前预判和处理程序执行可能会出现的错误,保证程序正常执行,而不会导致程序崩溃。 C++异常处理主要由几个关键字组成: try、cat…

C++笔记之std::map的实用操作

C++笔记之std::map的实用操作 code review 文章目录 C++笔记之std::map的实用操作1.初始化1.1.使用列表初始化1.2.使用 `insert` 方法1.3.使用 `emplace` 方法1.4.复制构造1.5.移动构造2.赋值2.1.列表赋值2.2.插入元素2.3.批量插入3.取值3.1.使用 `[]` 操作符3.2.使用 `at()` …

Vue路由配置、网络请求访问框架项目、element组件介绍学习

系列文章目录 第一章 基础知识、数据类型学习 第二章 万年历项目 第三章 代码逻辑训练习题 第四章 方法、数组学习 第五章 图书管理系统项目 第六章 面向对象编程&#xff1a;封装、继承、多态学习 第七章 封装继承多态习题 第八章 常用类、包装类、异常处理机制学习 第九章 集…

回归预测|基于开普勒优化相关向量机的数据回归预测Matlab程序KOA-RVM 多特征输入单输出 含基础RVM

回归预测|基于开普勒优化相关向量机的数据回归预测Matlab程序KOA-RVM 多特征输入单输出 含基础RVM 文章目录 一、基本原理1. **相关向量机&#xff08;RVM&#xff09;**2. **开普勒优化算法&#xff08;KOA&#xff09;**3. **KOA-RVM回归预测模型**总结 二、实验结果三、核心…

k8s集群备份与迁移

什么是 Velero? Velero 是一个用Go语言开发的开源工具&#xff0c;用于 Kubernetes 集群的备份、恢复、灾难恢复和迁移。 Velero备份工作流程 当用户发起velero backup create时&#xff0c;会执行如下四个动作&#xff1a; velero客户端调用Kubernetes API创建自定义资源并…

启动windows更新/停止windows更新,在配置更新中关闭自动更新的方法

在Windows操作系统中&#xff0c;启动或停止Windows更新&#xff0c;以及调整“配置更新”的关闭方法&#xff0c;涉及多种途径&#xff0c;这里将详细阐述几种常用的专业方法。 启动Windows更新 1.通过Windows服务管理器&#xff1a; -打开“运行”对话框&#xff08;…

15. 三数之和(实际是双指针类型的题目)

15. 三数之和 15. 三数之和 给你一个整数数组 nums &#xff0c;判断是否存在三元组 [nums[i], nums[j], nums[k]] 满足 i ! j、i ! k 且 j ! k &#xff0c;同时还满足 nums[i] nums[j] nums[k] 0 。请你返回所有和为 0 且不重复的三元组。 注意&#xff1a;答案中不可以…

Uniapp的alertDialog返回值+async/await处理确定/取消问题

今天在使用uniui的alertDialog时&#xff0c;想添加一个确定/取消的警告框时 发现alertDialog和下面的处理同步进行了&#xff0c;没有等待alaertDialog处理完才进行 查询后发现问题在于 await 关键字虽然被用来等待 alertDialog.value.open() 的完成&#xff0c;但是 alertDi…

Android中的冷启动,热启动和温启动

在App启动方式中分为三种&#xff1a;冷启动&#xff08;cold start&#xff09;、热启动&#xff08;hot start&#xff09;、温启动&#xff08;warm start&#xff09; 冷启动&#xff1a; 系统不存在App进程&#xff08;App首次启动或者App被完全杀死&#xff09;时启动A…

使用 GaLore 预训练LLaMA-7B

项目代码&#xff1a; https://github.com/jiaweizzhao/galorehttps://github.com/jiaweizzhao/galore 参考博客&#xff1a; https://zhuanlan.zhihu.com/p/686686751 创建环境 基础环境配置如下&#xff1a; 操作系统: CentOS 7CPUs: 单个节点具有 1TB 内存的 Intel CP…

F12抓包11:UI自动化 - Recoder(记录器)

课程大纲 使用场景&#xff08;导入和导出&#xff09;: ① 测试的重复性工作&#xff0c;本浏览器录制并进行replay&#xff1b; ② 导入/导出录制脚本&#xff0c;移植后replay&#xff1b; ③ 导出给开发进行replay复现bug&#xff1b; ④ 进行前端性能分析。 1、录制脚…

kubernetes 学习 尚硅谷

出自 https://www.bilibili.com/video/BV13Q4y1C7hS 相关命令 kubeadm init &#xff1a;将当前节点创建为主节点 kubectl get nodes&#xff1a;获取集群所有节点 kubectl apply -f xxx.yaml&#xff1a;根据配置文件&#xff0c;给集群创建资源 kubectl delete -f xx.yaml&…

【C++】模板进阶:深入解析模板特化

C语法相关知识点可以通过点击以下链接进行学习一起加油&#xff01;命名空间缺省参数与函数重载C相关特性类和对象-上篇类和对象-中篇类和对象-下篇日期类C/C内存管理模板初阶String使用String模拟实现Vector使用及其模拟实现List使用及其模拟实现容器适配器Stack与Queue 本章将…

判断关系模式的无损连接(表格法)

目录 前言 一、什么是无损连接&#xff1f; 二、如何判断无损连接&#xff1f; 1.表格法 2.示例题 D选项构造初始的判断表如下&#xff1a; 总结 前言 在数据库设计中&#xff0c;确保数据的完整性和有效性是至关重要的。在关系数据库中&#xff0c;函数依赖和无损连接是…

docker|Oracle数据库|docker快速部署Oracle11g和数据库的持久化(可用于生产环境)

一、 容器数据持久化的概念 docker做为容器化的领先技术&#xff0c;现在广泛应用于各个平台中&#xff0c;但不知道什么时候有一个说法是docker并不适用容器化数据库&#xff0c;说容器化的数据库性能不稳定&#xff0c;其实&#xff0c;这个说法主要是因为对docker的数据持…

零基础5分钟上手亚马逊云科技-利用API网关管理API

简介 欢迎来到小李哥全新亚马逊云科技AWS云计算知识学习系列&#xff0c;适用于任何无云计算或者亚马逊云科技技术背景的开发者&#xff0c;通过这篇文章大家零基础5分钟就能完全学会亚马逊云科技一个经典的服务开发架构方案。 我会每天介绍一个基于亚马逊云科技AWS云计算平台…

C语言 ——— 编写代码,将一个长整数用逗号隔开,每3位一个逗号,并输出打印

目录 题目要求 代码实现 题目要求 对于一个较大的整数 N (1 < N < 2,000,000,000) &#xff0c;将 N 每个 3 位加上一个逗号&#xff0c;并且最后输出打印 举例说明&#xff1a; 输入&#xff1a;1980364535 输出&#xff1a;1,980,364,535 代码实现 代码演示&#…

详解JUC

Java并发工具包&#xff08;Java Util Concurrent&#xff0c; 简称JUC&#xff09;是Java提供的一组用于简化多线程编程的类和接口&#xff0c;它包含了用于线程同步、并发数据结构、线程池、锁、原子操作以及其他并发实用工具的丰富集合。 1. 线程池 线程池是 Java 并发编程…

【Go】Go语言中的数组基本语法与应用实战

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; &#x1f3c6; 作者简介&#xff1a;景天科技苑 &#x1f3c6;《头衔》&#xff1a;大厂架构师&#xff0c;华为云开发者社区专家博主&#xff0c;…