Akka 学习(八)路由与Dispatcher

news2025/1/14 10:54:57

目录

  • 一 编发编程
  • 二 Actor路由
    • 2.1 路由的作用
    • 2.2 路由的创建方式
    • 2.3 路由策略
    • 2.4 广播消息
    • 2.5 监督路由对象
    • 2.6 Akka 案例
  • 三 Dispatcher 任务分发
    • 3.1 什么是Dispatcher?
    • 3.2 Dispatcher的线程池
    • 3.3 Dispatcher的分类

一 编发编程

Akka 是一个用于实现分布式、并发、响应式应用程序的工具包和运行时环境。它是基于 Actor 模型的,并使用 Scala 语言实现,但也可以与 Java 一起使用。Actor 模型是一种软件架构,它将应用程序中的对象分配给演员,每个演员都是一个并发实体,并独立地执行它的任务。这样做的好处是,你可以更轻松地实现并发和分布式应用程序,并且不必担心多线程编程中的复杂性和错误。
Akka中提供了两种可以用来进行多核并行编程的抽象:Future和Actor。
实际上,要决定到底使用Actor还是Future其实并不简单。我曾经听别人说过一个通用准则:“Future用于并发,Actor用于状态。”
其实在请前面的案例中我们已经接触到了这两种方式:

Future

Akka Future 是一种用于表示异步计算结果的对象。它与 Java 的 Future 类似,但有一些不同之处。
在 Akka 中,Future 由一个 Actor 创建,并由另一个 Actor 消费。当创建者完成计算并设置了结果时,消费者会收到通知。这样,消费者就可以在不阻塞的情况下等待计算结果。

                         // 获取消息
                        Future sFuture = new AskableActorRef(context().actorOf(Props.create(MeterDemoActor.class))).ask(msg,Timeout.apply(1000,TimeUnit.SECONDS) );
                        CompletionStage<Meter> cs = toJava(sFuture);
                        CompletableFuture<Meter> future = (CompletableFuture<Meter>) cs;
                        // 消息发送给客服端
                        if (future.get() != null) {
                            sender().tell(future.get(), self());
                        }
  • Actor
 public Receive createReceive() {
        return ReceiveBuilder.create()
                .match(MeterRequest.class, x->{
                    System.out.println("收到电表请求消息");
                    sender().tell(new Meter("1001","测试"), self());
                })
                .matchEquals(String.class, System.out::println)
                // 未找到消息
                .matchAny(o ->
                        sender().tell(new Status.Failure(new ClassNotFoundException()), self())
                )
                .build();
    }

二 Actor路由

2.1 路由的作用

  1. 在创建了Router之后,当Router接收到消息时,就会将消息传递给Group/Pool中的一个或多个Actor
  2. 有多种策略可以用来决定Router选择下一个消息发送对象的顺序。在我们的例子中,所有的Actor都运行在本地,我们需要一个包含多个Actor的Router来支持使用多个CPU核进行并行运算。
  3. 如果Actor运行在远程机器上,也可以使用Router在服务器集群上分发工作

epub_22651331_46.jpg

2.2 路由的创建方式

在Akka中,Router是一个用于负载均衡和路由的抽象,创建Router时,必须要传入一个Actor Group,或者由Router来创建一个Actor Pool。

Actor Pool 方式创建路由

	// 创建路由方式一
    ActorRef workerRouter = context()
        .actorOf(Props.create(MeterDemoActor.class)
                 .withRouter(new RoundRobinPool(8)));

在这种情况下使用Router非常简单:照常实例化一个Actor,然后调用withRouter,并传入一个路由策略,以及希望Pool中包含的Actor数量。

Actor Group 方式创建路由

  // 创建方式二
    ActorRef router = context().actorOf(new RoundRobinGroup(actors.map(actor -> actor.path()).props());

2.3 路由策略

epub_22651331_47.jpg

2.4 广播消息

无论是使用Group还是Pool的形式来创建Router,都可以通过广播,将一条消息发送给所有Actor。

          router.tell(new akka.routing.Broadcast(msg));
          router ! akka.routing.Broadcast(msg)

2.5 监督路由对象

如果使用Pool的方式创建Router,由Router负责创建Actor,那么这些路由对象会成为Router的子节点。创建Router时,可以给Router提供一个自定义的监督策略。

    ActorRefworkerRouter = system.actorOf(Props.create(ArticleParseActor.class).withRouter(new RoundRobinPool(8).withSupervisorStrategy(strategy)));

2.6 Akka 案例

路由使用

package com.shu;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import pojo.Meter;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/11 17:25
 * @version: 1.0
 */

public class RouterExample {
    public static void main(String[] args) {
        ActorSystem system = ActorSystem.create("RouterExample");

        // 创建路由器
        ActorRef router = system.actorOf(new RoundRobinPool(5).props(Props.create(Meter.class)));

        // 向路由器发送消息
        for (int i = 0; i < 10; i++) {
            router.tell(new Meter(), ActorRef.noSender());
        }
    }
}

在上面的例子中,我们创建了一个名为“RouterExample”的 ActorSystem,然后创建了一个路由器。路由器使用了 RoundRobinPool 算法,它能够将消息轮流发送到不同的 actor,最后,我们向路由器发送了 10 个消息,路由器会将这些消息分拣到不同的 actor,并由这些 actor 来处理这些消息。

监督路由对象

import akka.actor.{Actor, ActorRef, Props, SupervisorStrategy}
import akka.routing.RoundRobinPool

class Router extends Actor {
  // 定义路由算法
  val router = context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router")
  // 定义监督策略
  override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
  def receive = {
    case message => router ! message
  }
}
//使用
val router = system.actorOf(Props[Router])

我们定义了一个名为 Router 的类,它继承自 Actor类。在 Router 类中,我们创建了一个路由器,并使用 RoundRobinPool 算法进行路由。我们还定义了一个监督策略,用于在路由器的子 actor 发生错误或崩溃时采取适当的措施。

三 Dispatcher 任务分发

3.1 什么是Dispatcher?

  • Dispatcher将如何执行任务与何时运行任务两者解耦。一般来说,Dispatcher会包含一些线程,这些线程会负责调度并运行任务,比如处理Actor的消息以及线程中的Future事件。
  • Dispatcher是Akka能够支持响应式编程的关键,是负责完成任务的机制。
  • akka Dispatcher 是用于调度 akka Actor 的线程池。
  • 它负责将 akka Actor 任务分配给线程池中的线程执行,并监控执行情况。
  • 在 akka 框架中,所有的 akka Actor 都会关联一个 Dispatcher,Dispatcher 能够有效地管理和调度 akka Actor 的执行,从而提高 Actor 的并发性能。

epub_22651331_48.jpg

如何获取Dispatcher

          system.dispatcher //actor system's dispatcher
          system.dispatchers.lookup("my-dispatcher"); //custom dispatcher

扩展

在 Akka 中,Dispatcher 是一个用于控制消息处理和线程分配的抽象。开发者可以通过使用 akka.dispatch.Dispatchers 类来获取特定的 Dispatcher。例如,下面的代码片段演示了如何获取用于执行阻塞 IO 操作的 Dispatcher:


import akka.dispatch.Dispatchers 
val ioDispatcher = Dispatchers.IO

在这个例子中,ioDispatcher 变量将包含一个用于执行阻塞 IO 操作的 Dispatcher。你可以使用这个 Dispatcher 来指定 Akka 要使用哪个线程池来执行特定的操作,以及如何处理超时等问题。
请注意,使用 Dispatcher 是可选的,并且默认情况下 Akka 会使用内置的默认 Dispatcher 来处理消息。但是,在某些情况下,如果你想要更细粒度地控制消息处理和线程分配,你可能需要使用 Dispatcher。

3.2 Dispatcher的线程池

  • Dispatcher基于Executor,所以在具体介绍Dispatcher之前,我们将介绍两种主要的Executor类型:ForkJoinPool和ThreadPool。
  • ThreadPool Executor有一个工作队列,队列中包含了要分配给各线程的工作。线程空闲时就会从队列中认领工作。由于线程资源的创建和销毁开销很大,而ThreadPool允许线程的重用,所以就可以减少创建和销毁线程的次数,提高效率。
  • ForkJoinPool Executor使用一种分治算法,递归地将任务分割成更小的子任务,然后把子任务分配给不同的线程运行。接着再把运行结果组合起来。由于提交的任务不一定都能够被递归地分割成ForkJoinTask,所以ForkJoinPool Executor有一个工作窃取算法,允许空闲的线程“窃取”分配给另一个线程的工作。
  • 由于工作可能无法平均分配并完成,所以工作窃取算法能够更高效地利用硬件资源。ForkJoinPool Executor几乎总是比ThreadPool的Executor效率更高,是我们的默认选择。

3.3 Dispatcher的分类

要在application.conf中定义一个Dispatcher,需要指定Dispatcher的类型和Executor。还可以指定Executor的具体配置细节,比如使用线程的数量,或是每个Actor一次性处理的消息数量。

        my-dispatcher {
            type = Dispatcher
            executor = "fork-join-executor"

            fork-join-executor {
                parallelism-min = 2 #Minimum threads
                parallelism-factor = 2.0 #Maximum threads per core
                parallelism-max = 10 #Maximum total threads
            }
            throughput = 100 #Max messages to process in an actor before moving on.
        }

● Dispatcher:默认的Dispatcher类型。将会使用定义的Executor,在Actor中处理消息。在大多数情况下,这种类型能够提供最好的性能。
● PinnedDispatcher:给每个Actor都分配自己独有的线程。这种类型的Dispatcher为每个Actor都创建一个ThreadPool Executor,每个Executor中都包含一个线程。如果希望确保每个Actor都能够立即响应,那么这似乎是个不错的方法。不过PinnedDispatcher比其他共享资源的方法效率更高的情况其实并不多。可以在单个Actor必须处理很多重要工作的时候试试这种类型的Dispatcher,否则的话不推荐使用。
●CallingThreadDispatcher:这个Dispatcher比较特殊,它没有Executor,而是在发起调用的线程上执行工作。这种Dispatcher主要用于测试,特别是调试。
● BalancingDispatcher:我们会在一些Akka文档中看到BalancingDispatcher。现在已经不推荐直接使用BalancingDispatcher了,应该使用前面介绍过的BalancingPool Router,使用BalancingPool时,Pool中的所有Actor会共享同一个邮箱,然后通过高效的工作窃取机制将任务重新分配给任何空闲的Actor。由于共享同一个邮箱,因此使用BalancingPool有助于确保在有工作的时候降低Actor的空闲率。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/80557.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mPEG-Phosphate,甲氧基-聚乙二醇-磷酸盐试剂供应

一&#xff1a;产品描述 1、名称 英文&#xff1a;mPEG-Phosphate 中文&#xff1a;甲氧基-聚乙二醇-磷酸盐 2、CAS编号&#xff1a;N/A 3、所属分类&#xff1a;Phosphate PEG Methoxy PE 4、分子量&#xff1a;可定制&#xff0c;2000/1000/3400/20000/5000/10000 5、…

认识Java中的反射与枚举

作者&#xff1a;~小明学编程 文章专栏&#xff1a;JavaSE基础 格言&#xff1a;目之所及皆为回忆&#xff0c;心之所想皆为过往 目录 反射 什么是反射&#xff1f; 常用的反射类 Class类 Class类中的相关方法 常用获得类中属性相关的方法 获得类中注解相关的方法 获得…

Java中的运算符--短路运算

文章目录0 写在前面1 介绍2 举例2.1 逻辑与 &&2.2 逻辑或 ||3 小技巧4 写在最后0 写在前面 JAVA中有两个短路运算&#xff0c;一个是短路与&#xff0c;一个是短路或。 所谓短路&#xff0c;就是当一个参与运算的操作数足以推断该表达式的值时&#xff0c;另一个操作数…

VirtualBox安装CentOS7

一&#xff1a;、下载CentOS7的镜像 下载地址&#xff1a;Downloadhttps://www.centos.org/download/ 进入后有三个版本可以选择&#xff1a; 1、DVD ISO 标准安装版&#xff0c;一般下载这个就可以了&#xff08;推荐&#xff09;本文以此为例&#xff01; 2、Everything…

2023跨年代码(烟花+雪花)

一眨眼&#xff0c;马上就2023年了&#xff0c;祝大家在新的一年里&#xff1a;身体健康平安&#xff0c;生活充实饱满&#xff0c;事业步步高升&#xff0c;心情阳光灿烂&#xff0c;财运滚滚而来&#xff0c;家庭美满幸福&#xff0c;新年开心快乐! 本文将给大家分享一些跨年…

GitHub Copilot

介绍 GitHub Copilot 是人工智能编程助手&#xff0c;它可以帮助你编写程序。在你用visual studio或visual studio code等软件设计工具进行编程时&#xff0c;它可以直接给你整行或整个方法的代码提示&#xff0c;并且提供多种提示方案供你选择。他是由openai公司&#xff08;马…

docker 及docker-compose network概念及操作详解

1. docker network概述 Docker通过使用网络驱动程序【network drivers】支持网络容器。默认情况下&#xff0c;Docker提供了多个网络驱动程序&#xff0c;如bridge 和overlay驱动程序。用户也可以自己写一个网络驱动插件&#xff0c;这样就可以创建自己的驱动程序。 Docker引…

SpringBoot整合Mybatis-Plus分页失效

场景&#xff1a;项目整合mybatis-Plus分页失效&#xff0c;current一直是1&#xff0c;size一直是10&#xff0c;total属性一直是0&#xff0c;数据分页不准 先看官网给的示例&#xff1a; 解决方案是新建mybatis-Plus的配置文件&#xff1a; package com.amc.config;import …

[附源码]计算机毕业设计海南琼旅旅游网Springboot程序

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; Springboot mybatis MavenVue等等组成&#xff0c;B/S模式…

混检阳性概率的计算(贝叶斯定理的一个应用例)

目录 1. 混检阳性概率的计算 2. 混管阳性时你阳性的概率有多大&#xff1f; 2.1 贝叶斯分析结果的解释 1. 混检阳性概率的计算 目前核酸混检的基本做法是十混一&#xff0c;如果阳性人群分布完全随机&#xff0c;那么做十混一混检为阳性的概率有多大呢&#xff1f; …

对immutable的理解?如何应用在react项目中?

一、是什么 Immutable&#xff0c;不可改变的&#xff0c;在计算机中&#xff0c;即指一旦创建&#xff0c;就不能再被更改的数据 对 Immutable 对象的任何修改或添加删除操作都会返回一个新的 Immutable 对象 Immutable 实现的原理是 Persistent Data Structure&#xff08…

笔试强训(四十二)

目录一、选择题二、编程题2.1 解读密码2.1.1 题目2.1.2 题解2.2 走迷宫2.2.1 题目2.2.2 题解一、选择题 &#xff08;1&#xff09;tcp套接字中&#xff0c;不会阻塞的是哪一种操作&#xff08;D&#xff09; A.read B.write C.accept D.bind bind函数不会阻塞执行流的 &#…

Stm32旧版库函数3——nrf24l01 16位数据 51单片机发送与stm32接收

51代码&#xff1a; #include <reg52.h> #include <intrins.h> typedef unsigned char uchar; typedef unsigned char uint; //****************************************NRF24L01端口定义*************************************** sbit MISO P1^7; sbit …

runnable、callable、consumer、supplier

Java 没有委托的概念&#xff1b; 相反&#xff0c;如果需要一个指向函数的指针&#xff0c;可以创建内联匿名类&#xff08;或 Java 8 的 lambda 表达式&#xff09;&#xff0c;它们是为此建议设计的某些接口的实现&#xff08;也称为 Java 8 的功能接口&#xff09;。 然而&…

Java项目:SSM汽车维修中心管理系统

作者主页&#xff1a;源码空间站2022 简介&#xff1a;Java领域优质创作者、Java项目、学习资料、技术互助 文末获取源码 项目介绍 本系统包括普通用户和管理员两种角色&#xff1b; 用户角色包含以下功能&#xff1a; 用户信息管理,查看车辆信息,维修记录查看等功能。 管理…

ORB-SLAM2 --- Tracking::GrabImageMonocular函数解析

目录 1.函数作用 2.到这步之前我们做了什么 3.code 4.函数解析 1.函数作用 哈哈哈&#xff0c;这其实应该是这个专栏的第一篇文章&#xff0c;也没什么必要写&#xff0c;但是我怕大家看的时候对单目还没有初始化没有进入跟踪线程前面比较懵逼&#xff0c;所以我补了此内…

Kali Linux安装go语言环境详解

今天继续给大家介绍渗透测试相关知识&#xff0c;本文主要内容是Kali Linux安装go语言环境。 免责声明&#xff1a; 本文所介绍的内容仅做学习交流使用&#xff0c;严禁利用文中技术进行非法行为&#xff0c;否则造成一切严重后果自负&#xff01; 再次强调&#xff1a;严禁对未…

【实操篇】Linux权限管理

目录 ●权限的基本介绍 ●rwx权限 ①rwx作用到文件 ②rwx作用到目录 ●修改权限——chmod ①、-、变更权限 ②数字变更权限 ●修改文件所有者——chown ●修改文件所在组——chgrp ●权限的基本介绍 从中随便找一行进行分析如下图所示&#xff1a; 1.文件类型 - &#x…

Hudi 0.12.0 搭建——集成 Hive3.1 与 Spark3.2

Hudi 搭建环境准备一、安装 Maven1.解压2.配置环境变量3.修改 Maven 下载源二、安装 Hudi1.解压2.配置环境变量3.修改 Hudi 下载源与对应版本号4.修改源码以兼容 Hadoop3.x5. 解决 Spark 依赖冲突6. 解决 Spark 向 Hudi 插入报错7. 编译 Hudi8. 启动测试集群其它生态安装与配置…

JSP ssh驾校管理系统myeclipse开发mysql数据库MVC模式java编程计算机网页设计

一、源码特点 JSP ssh驾校管理系统是一套完善的web设计系统&#xff08;系统采用ssh框架进行设计开发&#xff09;&#xff0c;对理解JSP java编程开发语言有帮助&#xff0c;系统具有完整的源代码和数据库&#xff0c;系统主要采用B/S模式 开发。开发环境为TOMCAT7.0,Mye…