Akka 进阶(三)Route 路由

news2025/1/12 1:05:10

目录

  • 一 路由Actor
  • 二 Pool方式的方式创建路由
  • 三 Group方式创建路由

消息可以通过多种方式送达目的地,比如tell、ask、forward等,这些方式是最常规也是最简单的,但是对于复杂的消息投递逻辑,比如轮询投递、随机投递、广播组等,就需要开发者自己去做一层封装,好在Akka已经提供了丰富的路由组件,可以很好地满足这类需求。在实际项目中,我们通常会使用路由器来做负载均衡和任务分派。
epub_933107_25.jpg

一 路由Actor

路由器可以是一个自包含的Actor,它通常管理着自己的所有Routee,一般来讲,我们会把这类路由配置在*.conf文件中,然后通过编码的方式加载并创建路由器,创建一个路由Actor有两种模式:pool和group。

  • pool的方式表示路由器Actor会创建子Actor作为其Routee并对其监督和监控,当某个Routee终止时将其移除出去。
  • group的方式表示可以将Routee的生产方式放在外部(不必自包含),然后路由器Actor通过路径(path)对这些目标进行消息发送。

基本使用

  • 定义路由Actor
import akka.actor.UntypedActor;

/**
 * @description: 路由Actor
 * @author: shu
 * @createDate: 2022/12/27 13:02
 * @version: 1.0
 */
public class RouteeActor extends UntypedActor {
        @Override
        public void onReceive(Object msg) throws Exception {
            System.out.println(getSelf()+"-->"+msg);
        }
}

  • 定义路由分发Actor
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;

import java.util.ArrayList;
import java.util.List;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/27 13:04
 * @version: 1.0
 */
class RouterTaskActor extends UntypedActor {
    private Router router;

    @Override
    public void preStart() throws Exception {
        List<Routee> listRoutee=new ArrayList<Routee>();
        for(int i=0; i<2; i++) {
            ActorRef ref=getContext().actorOf(Props.create(RouteeActor.class), "routeeActor"+i);
            listRoutee.add(new ActorRefRoutee(ref));
        }
        router=new Router(new RoundRobinRoutingLogic(), listRoutee);
    }


    @Override
    public void onReceive(Object msg) throws Exception {
        router.route(msg, getSender());
    }

    public static void main(String[] args) {
        ActorSystem system=ActorSystem.create("sys");
        ActorRef routerActor=system.actorOf(Props.create(RouterTaskActor.class), "routerTaskActor");
        routerActor.tell("helloA", ActorRef.noSender());
        routerActor.tell("helloB", ActorRef.noSender());
        routerActor.tell("helloC", ActorRef.noSender());
    }
}
  • 测试

image.png
我们可以发现消息经过了路由的分发,传递了不同的路由Actor,下是不同的路由类型
epub_933107_27.jpg

二 Pool方式的方式创建路由

使用pool方式定义的路由Actor会自动将Routee创建为自己的子级,这种层级关系在最开始就自动存在,不必通过getContext.actorOf的方式来指定。
对于路由功能来讲,我们最好有一个消息中转的过程,即不会直接通过路由器来发送消息,而是先经过一层中间转发的过程,这样有利于构建更加清晰的管理结构,所以这里我们首先定义一个Actor,作为消息中转处理器。

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinPool;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/27 14:29
 * @version: 1.0
 */
class MasterRouterActor extends UntypedActor {


    ActorRef router = null;


    @Override
    public void preStart() throws Exception {
        router = getContext().actorOf(
                new RoundRobinPool(3).props(Props.create(TaskActor.class)),
                "taskActor");
        System.out.println("router:"+router);

    }


    @Override
    public void onReceive(Object msg) throws Exception {
        router.tell(msg, getSender());
    }

    public static void main(String[] args) {
        ActorSystem system=ActorSystem.create("sys");
        ActorRef routerActor=system.actorOf(Props.create(MasterRouterActor.class), "routerTaskActor");
        routerActor.tell("helloA", ActorRef.noSender());
        routerActor.tell("helloB", ActorRef.noSender());
        routerActor.tell("helloC", ActorRef.noSender());
    }
}

image.png
上面写的代码式路由Actor,下面介绍一下配置是代码

        akka.actor.deployment {
            /masterRouterActor/taskActor {
              router = round-robin-pool
              nr-of-instances = 3
            }
        }


        router = getContext().actorOf(
            FromConfig.getInstance().props(Props.create(TaskActor.class)), "taskActor");

到这里,大家可能会有个疑问,假如此时Routee回复一个消息会怎样呢?到底该谁接收?
实际上,路由和forward一样,在整个消息转发的过程中并不会改变原始sender,所以消息会被回复给最初的sender。并且,在回复消息时可以让父Actor(路由Actor)成为自己的sender,这样在某种程度上可以隐藏自己的相关信息。对于pool方式来讲,另外一个需要注意的是:由于父监督原则,路由Actor承担着Routee的监督工作,当没有显式指定监督策略时,路由Actor默认会把失败上溯到上级。当路由Actor重启时,会重新创建Routee(子级Actor),并且在池中维护相同个数(nr-of-instances)的actor;当所有的Routee被终止时,路由Actor也会停止(watch它的生命周期,就可以收到它的Terminated消息)。

自定义监督策略

import akka.actor.*;
import akka.japi.Function;
import akka.japi.pf.DeciderBuilder;
import akka.routing.RoundRobinPool;
import scala.concurrent.duration.Duration;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/27 14:29
 * @version: 1.0
 */
class MasterRouterActor extends UntypedActor {


    ActorRef router = null;

    /**
     * 监督策越
     */
    SupervisorStrategy strategy = new OneForOneStrategy(3,
            Duration. create( "1 minute"),  DeciderBuilder
            .match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
            .match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
            .match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
            .matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
            .build());




    @Override
    public void preStart() throws Exception {
        router = getContext().actorOf(
                new RoundRobinPool(3).withSupervisorStrategy(strategy  ).props(Props.create(TaskActor.class)),
                "taskActor");
        System.out.println("router:"+router);

    }


    @Override
    public void onReceive(Object msg) throws Exception {
        router.tell(msg, getSender());
    }

    public static void main(String[] args) {
        ActorSystem system=ActorSystem.create("sys");
        ActorRef routerActor=system.actorOf(Props.create(MasterRouterActor.class), "routerTaskActor");
        routerActor.tell("helloA", ActorRef.noSender());
        routerActor.tell("helloB", ActorRef.noSender());
        routerActor.tell("helloC", ActorRef.noSender());
    }
}

三 Group方式创建路由

我们需要单独(外部)创建Routee时,可以采用group方式。在使用group路由前,需要先定义Routee-Actor,然后将它们以path的形式配置起来,路由Actor会通过path列表来进行路由。

        //定义Routee
        getContext().actorOf(Props.create(WorkTask.class), "wt1");
        getContext().actorOf(Props.create(WorkTask.class), "wt2");
        getContext().actorOf(Props.create(WorkTask.class), "wt3");
        router=getContext().actorOf(FromConfig.getInstance().props(), "router");

其中/masterActor/router表示路由器的path, routees.paths用来配置Routee的path,并且本地和远程Actor都是支持的。假如希望这些path能更加动态地产生(或者依赖其他业务逻辑产生),可以使用编码的方式来实现

        List<String>  routeePaths  =  Arrays.asList("/user/masterActor/wt1",  "/user/
            masterActor/wt2", "/user/masterActor/wt3");
        router=getContext().actorOf(new RoundRobinGroup(routeePaths).props(), "router");

还有一点要注意的就是:group方式并不限制Routee一定得在同一个层级下,比如当你想增加一个其他外部的Actor作为Routee时,仅仅需要将它的path配置在routees.paths中即可。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/118145.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

IB课程预估分,请认真规划学习进程

近年受疫情影响&#xff0c;IB考试形式与分数情况&#xff0c;有不小的波动&#xff0c;年度出分依然吸引着众人目光。尤其明后年准备参加大考的同学&#xff0c;更关心实战难度和考试分数的变化趋势。 01预估分&#xff1a;严重被压低的预期分数 全球疫大环境下&#xff0c;IB…

合宙 ESP32C3 烧录 Micropython 后连接端口报错

合宙esp32c3 开发板烧录micropython 后连接VScode 或 Thonny报错&#xff1a; Device is busy or does not respond. 1. 原因&#xff1a; 烧录的micropython bin文件有问题。 问题参考&#xff1a; 链接: 合宙ESP32-C3 烧录Micropython报错入坑记 2. 解决办法&#xff1a…

H3C smart-link实验 C套拆解

H3C smart-link实验 C套拆解一、项目拓扑二、项目需求三、配置步骤1.vlan-trunk2.STP3.smart-link四、测试一、项目拓扑 二、项目需求 总部局域网内sw3进行双上行链路灵活备份&#xff0c;smart-link组1 引用实例1(绑定vlan10)的流量从经过sw1的链路通向出口路由器r1&#xff…

10个优秀的Python库,实用且有趣

序言 哈喽兄弟们&#xff0c;今天分享10个优秀的Python库&#xff0c;超级实用&#xff01; 为什么这么多人选择学习python?首先&#xff0c;python是一门全场景编程语言&#xff0c;对于初学编程的人而言&#xff0c;选择一门全场景编程语言是非常不错的选择;其次&#xff…

PHP aws-sdk-php文件存储的实现与应用

前言 最近项目需要用到对象存储&#xff0c;将所有上传文件&#xff0c;存储到BOS云存储上。在开发过程中&#xff0c;遇到一些小小的问题&#xff0c;做个简单记录。 功能实现 1 下载sdk&#xff08;以下两种方式&#xff0c;任选其一即可&#xff09; &#xff08;1&#…

手绘图说电子元器件-电阻,电容,电感

电阻器与电位器 电阻器是最基本的电子元件,电位器是最基本的可调电子元件,它们广泛应用在各种电子电路中。 电阻器 电阻器是限制电流的元件,通常简称为电阻,是一种最基本、最常用的电子元件,包括固定电阻器、可变电阻器、敏感电阻器等。 电阻器的主要参数有电阻值和额…

FFT(2)

DFT到FFT 这是DFT公式 对DFT代数变换 将DFT的计算&#xff0c;分为计数组和偶数组。 惊奇的发现&#xff1a;只需要改变WkNW_k^NWkN​的符号即可得到X&#xff08;k&#xff09;的另一半项数的结果。 得到FFT算法&#xff08;蝶形运算&#xff09; 惊奇的发现&#xff1…

docker安装教程,即学即会

docker教程&#xff1a; https://www.runoob.com/docker/docker-tutorial.html卸载docker 较旧的 Docker 版本称为 docker 或 docker-engine 。如果已安装这些程序&#xff0c;请卸载它们以及相关的依赖项。 yum remove docker docker-client docker-client-latest docker-co…

maven打包缺少依赖异常eu.neilalexander:jnacl:jar:1.0.0 was not found in...解决

在Linux系统的服务器上使用脚本部署项目&#xff0c;脚本的逻辑是&#xff1a; 进入到工作空间的项目文件夹从SVN拉取最新代码命令svn up执行mvn clean执行mvn package进入jar包生成的target文件夹nohub java -jar xxxxx.jar >/dev/null 2>&1 & 在项目打包过程…

C++迭代器详解

思考一个问题&#xff1a;我们该如何遍历一个字符串呢&#xff1f; 方法一&#xff1a;正常遍历 string s1("hello"); for(size_t i 0;i<s1.size();i) {cout<<s1[i]<<" ";//[]是一个重载运算符&#xff0c;实际上调用了s1.operator[](i)…

【爬虫+数据清洗+可视化分析】用Python分析哔哩哔哩“阳了“的评论数据

目录 一、背景介绍 二、爬虫代码 爬虫部分不作讲解。 三、可视化代码 3.1 读取数据 3.2 数据清洗 3.3 可视化 3.3.1 IP属地分析-柱形图 3.3.2 评论时间分析-折线图 3.3.3 点赞数分布-直方图 3.3.4 评论内容-情感分布饼图 3.3.5 评论内容-词云图 三、演示视频 一、…

Ansible常用模块

ping模块 验证主机的连通性 [rootmonster1 ~]# ansible all -m ping 192.168.71.131 | SUCCESS > {"ansible_facts": {"discovered_interpreter_python": "/usr/bin/python"}, "changed": false, "ping": "pong&q…

Java中mongodb指定DB通过aggregate聚合查询操作示例

目录 前言&#xff1a; 应用场景&#xff1a; 命令描述&#xff1a;​ 代码示例&#xff1a; 聚会查询&#xff1a; 数量查询&#xff1a; 前言&#xff1a; 大家都知道&#xff0c;mongodb是一个非关系型数据库&#xff0c;也就是说&#xff0c;mongodb数据库中的每张表…

node-express学习总结

项目搭建 1. 使用express提供的框架构建&#xff08;不需要&#xff09; 2. 从零开始&#xff08;推荐&#xff09;安装 初始化项目 npm init -y安装express npm install express1.express的基本使用 创建js文件 const express require(express) // 1&#xff0c;创建服…

SSL/TLS类安全漏洞及SLB安全漏洞问题

SSL/TLS类安全漏洞及SLB安全漏洞问题1 : 问题背景1.1、SSL/TLS类漏洞-Sweet32 攻击1.2、SSL/TLS类漏洞-弱密码套件2 : 解决思路2.1、学习SSL/TLS是什么2.2、安装检测工具2.3、升级OpenSSL2.4、调整加密算法3 : 总结3.1、比较环境的不同3.2、解决该问题3.3、相关资源1 : 问题背景…

创建进程与进程地址空间

目录 创建进程 进程地址空间 为什么要用虚拟地址呢&#xff1f; 什么是进程地址空间&#xff1f; 为什么要写时拷贝呢&#xff1f; 创建进程 前面提到使用fork可以创建子进程&#xff0c;现在介绍fork创建子进程的细节。 fork创建子进程的时候&#xff0c;子进程的内核数…

如何给公司内网搭建一个专用的DNS服务器?

如何给公司内网搭建一个专用的DNS服务器&#xff1f; 引言 平时做域名解析&#xff0c;一般直接修改的/etc/hosts文件。对于服务器数量小的情况完全可以&#xff0c;但是如果服务器数量较多&#xff0c;每个都修改比较麻烦。 DNS是作为域名解析。在实际的生产过程中&#xff…

基于ASP.NET C#的服装商城管理系统

摘 要 本毕业设计的内容是设计并且实现一个基于net语言的服装商城管理系统。它是在Windows下&#xff0c;以SQL Server为数据库开发平台&#xff0c;服装商城管理系统的功能已基本实现&#xff0c;主要包括用户、服装信息、通知公告、留言板、订单信息等。 论文主要从系统的分…

飞腾FT-2000/4处理器+复旦微FPGA+国产操作系统解决方案(2)

XM-1104飞腾核心处理主板 ▶体积小、功耗低、高性能。 ▶功能接口多样化&#xff0c;采用高密度连接器&#xff0c;抗震效果好。 ▶成本低&#xff0c;扩展性强&#xff0c;根据用户的需求定制各种底板。 ▶产品灵活&#xff0c;便于维护&#xff0c;生命周期长。 指标 参数 …

SuperMap GIS地质体数据处理QA

作者:hyy 一、地质体数据简介 什么是三维地质建模&#xff1f;百度上给出的解析是&#xff1a;将地质、测井、地球物理资料和各种解释结果或者概念模型综合在一起生成的三维定量随机模型。 已建成的地质模型可以为我们提供很多信息。首先是地质的三维可视化。通过三维可视化&…