Akka 学习(五)消息传递的方式

news2025/1/12 1:48:30

目录

  • 一 消息传递方式
    • 1.1 消息不可变
    • 1.2 ASK消息模式
    • 1.3 Tell消息模式
    • 1.4 Forward消息模式
    • 1.4 Pipe消息模式

有4种核心的Actor消息模式:Tell、Ask、Forward和Pipe。

一 消息传递方式


在这里,将从Actor之间发送消息的角度来介绍所有关于消息传递的概念。
● Ask:向Actor发送一条消息,返回一个Future。当Actor返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
● Tell。向Actor发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。● Forward:将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都会返回给原始消息的发送者。
● Pipe:用于将Future的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe可以正确地返回Future的结果。

1.1 消息不可变

  • 前面提到过,消息应该是不可变的,由于Akka基于JVM,而Java和Scala都支持可变类型,所以是有可能发送可变消息的。
  • 不过如果这么做,就可能会失去Akka在消除共享状态方面提供的诸多益处,一旦有了可变消息,就引入了一种风险:开发者可能会在某些时候开始修改消息,而他们修改消息的方式可能会破坏应用程序的运行。
  • 当然,如果不修改消息的状态,那么要安全地使用可变消息也不是不可能,不过最好还是使用不可变消息,确保不会因为未来的变化而引入错误。

可变消息定义


// Java
public class Message {
            public StringBuffer mutableBuffer;
            public Message(StringBuffer: mutableBuffer) {
                this.mutableBuffer = mutableBuffer;
            }
        }
// scala
class Message(var mutableBuffer: StringBuffer = new StringBuffer);
        Message message = new Message(new StringBuffer("original"));
        message.mutableBuffer = new StringBuffer("new");

        val message = new Message(new StringBuffer("original"))
        message.mutableBuffer = new StringBuffer("new")

这个例子新建了一条消息,然后修改mutableBuffer引用,使之指向另一个新建的StringBuffer,消息创建时传入的是StringBuffer (“original”),后来被修改成了StringBuffer(“new”)。这就是通过修改引用来修改消息的方法。
epub_22651331_25.jpg

消息不可变

        public class Message {
            // final 
            public final StringBuffer mutableBuffer;

            Message(StringBuffer mutableBuffer) {
                this.mutableBuffer = mutableBuffer;
            }
        }
  • 在Scala中,如果没有在声明中给出任何访问修饰符,成员变量的引用默认就是不可变的val。
  • 现在我们Java中加入Final关键字,可以使引用设置为不可变,但是会出现问题,StringBuffer是可变的,因此可以修改修改StringBuffer对象本身。
        public class ImmutableMessage{
            public final String immutableType;
            public ImmutableMessage(String immutableType) {
                this.immutableType = immutableType;
            }
        }
        class ImmutableMessage(immutableType: String)
  • 由于String是不可变类型,因此可以通过使用String代替StringBuffer使得消息不可变。

总结

理解不可变性不仅仅对于Akka消息是至关重要的,对于如何进行安全的通用并发编程也是必不可少的,因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。(重点:消息不可变)

1.2 ASK消息模式

epub_22651331_27.jpg

  • 在调用ask向Actor发起请求时,Akka实际上会在Actor系统中创建一个临时Actor。
  • 接收请求的Actor在返回响应时使用的sender()引用就是这个临时Actor。
  • 当一个Actor接收到ask请求发来的消息并返回响应时,这个临时Actor会使用返回的响应来完成Future。
  • 因为sender()引用就指向临时Actor的路径,所以Akka知道要用哪个消息来完成Future。

要求

Ask模式要求定义一个超时参数,如果对方没有在超时参数限定的时间内返回这个ask的响应,那么Future就会返回失败。ask/?方法要求提供的超时参数可以是长整型的毫秒数,也可以是akka.util.Timeout,这种类型提供了更丰富的时间表达方式。
epub_22651331_29.jpg

  • Java模式
static import akka.pattern.Patterns.ask;

Timeout timeout = new akka.util.Timeout(
    1,
    java.util.concurrent.TimeUnit.SECONDS
);
Future future = ask(actor, message, timeout);
  • Scala
import scala.concurrent.duration._
import akka.pattern.ask
// 设置超时时间
implicit val timeout = akka.util.Timeout(1 second)
val future = actorRef ? "message"

案例

  1. 查询时我们先去缓存中查询是否有数据
  2. 如果缓存中没用数据,我们去请求实际的url,获取数据,如果有数据,字节返回
  3. 获取到数据,交给实际的解析器进行进行返回
  4. 解析返回后,返回数据给请求客服端
  • java
package article;

import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import org.jboss.netty.handler.codec.http.HttpResponse;
import pojo.GetRequest;
import static scala.compat.java8.FutureConverters.toJava;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

/**
* @description: 解析Ask解析
* @author: shu
* @createDate: 2022/12/7 20:50
* @version: 1.0
*/
public class AskDemoArticleParser extends AbstractActor {

    /**
* 缓存Actor路径
*/
    private final ActorSelection cacheActor;
    /**
* HttpActro 路径
*/
    private final ActorSelection httpClientActor;
    /**
* 实际解析路径
*/
    private final ActorSelection artcileParseActor;
    /**
* 超时时间
*/
    private final Timeout timeout;
    /**
*  当前Actor
*/
    final ActorRef senderRef = sender();


    LoggingAdapter log = Logging.getLogger(getContext().system(), this);

    /**
* 构造器
* @param cacheActorPath 文章html解析缓存路径
* @param httpClientActorPath http请求路径
* @param artcileParseActorPath 文章解析actor路径
* @param timeout 超时设置
*/
    public AskDemoArticleParser(String cacheActorPath, String httpClientActorPath, String artcileParseActorPath, Timeout timeout) {
        this.cacheActor = context().actorSelection(cacheActorPath);
        this.httpClientActor = context().actorSelection(httpClientActorPath);
        this.artcileParseActor = context().actorSelection(artcileParseActorPath);
        this.timeout = timeout;
    }


    /**
* 消息的监听
* @return
*/
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
            .match(ParseArticle.class, msg -> {
                // 第一步先走缓存中查询是否有数据
                final CompletionStage cacheResult =
                    toJava(new AskableActorSelection(cacheActor).ask( new GetRequest(msg.getUrl()), timeout));
                log.info("缓存中的数据:{}"+cacheResult);
                // 第二步:缓存中没用的话,就解析实时数据
                final CompletionStage result = cacheResult.handle((x, t) -> { return (x!= null)
                    ? CompletableFuture.completedFuture(x) :
                    // HTTP客户端的Actor发送ask请求,请求获取数据
                    toJava(new AskableActorSelection(httpClientActor).ask( msg.getUrl(), timeout))
                    .thenCompose(rawArticle -> toJava(
                        // 向用于文章解析的Actor发送ask请求,请求对原始文章进行解析。
                        new AskableActorSelection(artcileParseActor).ask(rawArticle, timeout))
                                );
                                                                            }).thenCompose(x -> x);
                log.info("实时处理的数据:{}"+cacheResult);
                // 处理解析内容
                   result.handle((x, t) -> {
                   if(x != null) {
                   if(x instanceof String) 
                   senderRef.tell(x, self());
                   } else if( x == null )
                   senderRef.tell(new akka.actor.Status.Failure((Throwable)t), self());
                   return null;
                   });
                   }).build();
                   }
}


缺点

  1. Ask模式看上去很简单,不过它是有隐藏的额外性能开销的,首先,ask会导致Akka在/temp路径下新建一个临时Actor。(需要一个中间商来代理获取到的消息)
  2. 这个临时Actor会等待从接收ask消息的Actor返回的响应,其次,Future也有额外的性能开销。
  3. Ask会创建Future,由临时Actor负责完成,这个开销并不大,但是如果需要非常高频地执行ask操作,那么还是要将这一开销考虑在内的。

总结

Ask很简单,不过考虑到性能,使用Tell是更高效的解决方案。

1.3 Tell消息模式

epub_22651331_31.jpg

  • Tell是ActorRef/ActorSelection类的一个方法,它也可以接受一个响应地址作为参数,接收消息的Actor中的sender()其实就是这个响应地址。
  • 在Scala中,默认情况下,sender会被隐式定义为发送消息的Actor,如果没有sender(如在Actor外部发起请求),那么响应地址不会默认设置为任何邮箱(叫做DeadLetters)。

使用

  • java 使用
 sender().tell('需要发送的消息', self());
// self字段保存了这个actor的ActorRef。
  final def sender(): ActorRef = context.sender()
  • scala 使用
   sender() ! '消息' 

案例

package article;

import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.util.Timeout;
import org.jboss.netty.handler.codec.http.HttpResponse;
import pojo.GetRequest;
import pojo.SetRequest;

import java.util.concurrent.TimeoutException;


/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/9 20:07
 * @version: 1.0
 */
public class TellDemoArticleParse  extends AbstractActor {
    /**
     * 缓存Actor路径
     */
    private final ActorSelection cacheActor;
    /**
     * HttpActro 路径
     */
    private final ActorSelection httpClientActor;
    /**
     * 文章实际解析路径
     */
    private final ActorSelection artcileParseActor;
    /**
     * 超时时间
     */
    private final Timeout timeout;
    /**
     * 当前Actor
     */
    final ActorRef senderRef = sender();


    LoggingAdapter log = Logging.getLogger(getContext().system(), this);


    public TellDemoArticleParse(ActorSelection cacheActor, ActorSelection httpClientActor, ActorSelection artcileParseActor, Timeout timeout) {
        this.cacheActor = cacheActor;
        this.httpClientActor = httpClientActor;
        this.artcileParseActor = artcileParseActor;
        this.timeout = timeout;
    }






    @Override
    public Receive createReceive() {

        return  ReceiveBuilder.create()
                .match(ParseArticle.class, msg -> {
                    // 运程Actor
                    ActorRef extraActor = buildExtraActor(sender(), msg.getUrl());
                    // 缓存Actor
                    cacheActor.tell(new ParseArticle(msg.getUrl()), extraActor);
                    // httpActor
                    httpClientActor.tell(msg.getUrl(), extraActor);

                    context().system().scheduler().scheduleOnce(
                            timeout.duration(),
                            extraActor,
                            "timeout",
                            context().system().dispatcher(),
                            ActorRef.noSender()
                    );
                }).build();
    }


    // 返回一个aCTOR
    private ActorRef buildExtraActor(ActorRef senderRef, String uri) {

        class MyActor extends AbstractActor {
            public MyActor() {
            }

            @Override
            public Receive createReceive() {
                return ReceiveBuilder.create()
                        
                        .matchEquals(String.class, x ->
                                //if we get timeout, then fail
                                x.equals("timeout"), x -> {
                            senderRef.tell(
                                    new Status.Failure(
                                            new TimeoutException("timeout! ")
                                    ),
                                    self()
                            );
                            context().stop(self());
                        })

                        .match(HttpResponse.class, httpResponse -> {
                            artcileParseActor.tell(
                                    new ParseArticle(uri),
                                    self()
                            );
                        })

                        .match(String.class, body -> {
                            //The cache response will come back before
                            //the HTTP response so we never parse in this case.
                            senderRef.tell(body, self());
                            context().stop(self());
                        })

                        .match(ParseArticle.class, articleBody -> {
                            cacheActor.tell(
                                    new SetRequest(articleBody.getUrl(), "收到了消息"),
                                    self()
                            );
                            senderRef.tell("收到了消息", self());
                            context().stop(self());
                        })


                        .build();
            }
        }
        
        return context().actorOf(Props.create(MyActor.class,()->new MyActor()));
    }
}


1.4 Forward消息模式

  • Tell在语义上是用于将一条消息发送至另一个Actor,并将响应地址设置为当前的Actor。而Forward和邮件转发非常类似:初始发送者保持不变,只不过新增了一个收件人。
  • 在使用tell时,我们指定了一个响应地址,或是将响应地址隐式设为发送消息的Actor。而使用forward传递消息时,响应地址就是原始消息的发送者
  • 有时候我们需要将接受到的消息传递给另一个Actor来处理,而最终的处理结果需要传回给初始发起请求的一方。

epub_22651331_34.jpg

          //Java
          actor.forward(result, getContext());

          //Scala context信息是隐式传入的。
          actor forward message

1.4 Pipe消息模式

很多情况下,需要将Actor中的某个Future返回给请求发送者。上文介绍过sender()是一个方法,所以要在Future的回调函数中访问sender(),我们必须存储一个指向sender()的引用:

          //Java
          final ActorRef senderRef = sender();
          future.map(x -> {senderRef.tell(x, ActorRef.noSender())});

          //Scala
          val senderRef = sender();
          future.map(x => senderRef ! x);(原文为future.map(x => senderRef ! ActorRef.noSender),有误。)
          //Java
          pipe(future, system.dispatcher()).to(sender());

          //Scala
          future pipeTo sender()
          pipe(future) to sender()

pipe接受Future的结果作为参数,然后将其传递给所提供的Actor引用。在上面的例子中,因为sender()执行在当前线程上,所以我们可以直接调用sender(),而不用干一些奇怪的事情(比如把sender()引用存储在一个变量中),执行结果一切正确。这就好多了!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/76341.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【多线程(六)】并发工具类的基本使用、ConcurrentHashMap1.7版本及1.8版本底层原理分析

文章目录6.并发工具类6.1 并发工具类-Hashtable6.2 并发工具类-ConcurrentHashMap基本使用6.3 并发工具类-ConcurrentHashMap1.7原理6.4 并发工具类-ConcurrentHashMap1.8原理6.5 并发工具类-CountDownLatch6.6并发工具类-Semaphore总结6.并发工具类 6.1 并发工具类-Hashtable…

一文看懂MySQL中order by排序语句的原理

order by 是怎么工作的? 表定义 CREATE TABLE t1 ( id int(11) NOT NULL, city varchar(16) NOT NULL, name varchar(16) NOT NULL, age int(11) NOT NULL, addr varchar(128) DEFAULT NULL, PRIMARY KEY (id), KEY city (city)) ENGINEInnoDB;SQL语句可以…

零基础入门JavaWeb——Vue的生命周期

一、概念 在编程领域,生命周期是一个很常见的概念。一个对象从创建、初始化、工作、释放、清理和销毁,会经历很多环节的演变。 二、Vue对象的生命周期 三、生命周期钩子函数 Vue允许在特定的生命周期环节中通过钩子函数加入我们的代码。 3.1 示例代码…

基于双向LSTM模型进行电力需求预测(Matlab代码实现)

💥💥💥💞💞💞欢迎来到本博客❤️❤️❤️💥💥💥 🎉作者研究:🏅🏅🏅主要研究方向是电力系统和智能算法、机器学…

尚硅谷笔记——求和案例纯react版、redux精简版

家人们天气冷啦注意保暖呀,不要像我一样因为冷而不想起床学习,冬日里也不能放弃训练 看了两遍尚硅谷的redux课程,把reduc案例代码重新敲了一次为了加深印象还是写个播客把,强烈推荐大家看尚硅谷课太细致啦 redux 是什么&#x…

即将到来的2023,国内元宇宙开始“割”企业了?

元宇宙爆火一年后,UTONMOS即将成为全球化全部实现ERC-721协议NFT链上垂直游戏价值生态的系统平台,旨在通过利用自身所拥有的各类头部资源和游戏化打造内容层的融合,建立一个元气满满的元宇宙Web3.0平台。 通过数字藏品技术的应用&#xff0c…

Flask框架

Flask一 前言二 快速使用三 内置配置变量四 配置文件的写法五 路由六 cbv写法6.1 快速使用6.2 cbv加装饰器6.3 as_view的执行流程6.4 as_view的name参数6.5 继承View写cbv七 模板语法7.1 渲染变量7.2 变量的循环7.3 逻辑判断一 前言 Flask是一个基于Python开发并且依赖jinja2模…

Fluent中模型设置和数据的复用

1 背景 在实际工程中,必然存在利用仿真比较各类设计方案优劣的场景。 对于复杂模型,逐个设置各个设计方案的仿真模型并从头开始计算结果,既易错也耗时。因此需要通过模型设置和数据的复用,达到防错和提高工作效率。 2 模型设置复…

基于Docker做MySQL主从搭建与Django的读写分离

目录 基于Docker做MySQL主从搭建 django读写分离 基于Docker做MySQL主从搭建 主从的作用:写数据数据时使用主库,从库只用来读数据,这样做能够减少数据库压力,主从搭建可以一主一从,也可以是一主多从。 mysql主从配…

肝2022世界杯,怒写企业级镜像私仓Docker+Harbor实践

2022-12-09 揭幕2022卡塔尔世界杯4强角逐的第一天,越来越精彩了 同时记录程序猿的成长~ 1.背景 由于期望搭建一个企业级CICD的环境,开始尝试常规的gitlabjenkinsk8sdocker harborspringboot开始练手 其中版本如下: 1.gitlab: GitLab Com…

天权信安catf1ag网络安全联合公开赛---wp

文章目录misc简单隐写十位马WebhistoryCrypto疑惑ezrsapasswdre遗失的物品misc 简单隐写 丢进kali binwalk 分离一下 得到一个加密的压缩包 内含flag.txt 使用jphs无密码得到一个txt 得到password:catf1agcatf1agcatf1ag 解压压缩包得到一串字符串 dbug1bh{KQit_x1o_Z0v_…

threejs官方demo学习(2):相机

webgl_camera 不知道是哪里写的有问题,最终的效果,跟官方案例有比较大的差距。不过可以学到的知识点挺多的。 知识点 CameraHelper 相机辅助对象,用于模拟相机视锥体 // 创建透视相机 cameraPerspective new THREE.PerspectiveCamera(5…

二叉树路径和(c#)

问题描述 给定一个二叉树的根和一个整数值,如果二叉树中有根节点到叶子节点的路径上节点值的和等于给定的整数值,则返回真,否则返回假。 叶子节点:没有孩子的节点。 示例 示例1 Input: root [5,4,8,11,null,13,4,7,2,null,null,null,1], t…

两个List<Integer>在相同的值比较返回值为false的问题解析

写在前面:今天刷LeetCode的时候发现一个测试用例始终过不去&#xff0c;代码出问题处大概表述如下: List<Integer> a new ArrayList<>(); a.add(300); List<Integer> b new ArrayList<>(); b.add(300); if(a.get(0) b.get(0)){ 代码块B } else{ 代…

[生成 pdf 详解]

目录 前言: pom需要的依赖: 测试类: 效果: 生成表格PDF: 其他复杂的格式就去研究那个 如何生成吧 测试类代码: 前言: 摸鱼来的 pom需要的依赖: <dependency><groupId>com.itextpdf</groupId><artifactId>itextpdf</artifactId><vers…

【计算机毕业设计】76.垃圾分类系统源码

一、系统截图&#xff08;需要演示视频可以私聊&#xff09; 摘 要 随着现在网络的快速发展&#xff0c;网上管理系统也逐渐快速发展起来&#xff0c;网上管理模式很快融入到了许多国有企业的之中&#xff0c;随之就产生了“垃圾分类系统”&#xff0c;这样就让垃圾分类系统更…

web前端大作业:旅游网页主题网站设计——武汉旅游网页设计(11页)HTML+CSS+JavaScript

&#x1f468;‍&#x1f393;学生HTML静态网页基础水平制作&#x1f469;‍&#x1f393;&#xff0c;页面排版干净简洁。使用HTMLCSS页面布局设计,web大学生网页设计作业源码&#xff0c;这是一个不错的旅游网页制作&#xff0c;画面精明&#xff0c;排版整洁&#xff0c;内容…

AWS CodeCommit SSH公钥配置

在本地计算机上的终端中&#xff0c;运行 ssh-keygen 命令&#xff0c;并按照说明将文件保存到您的配置文件的 .ssh 目录中。 其中windows用户可以使用git bash。 这会生成&#xff1a; codecommit_rsa 文件&#xff0c;该文件为私有密钥文件。 codecommit_rsa.pub 文件&#…

Android 搜索匹配的文字之后显示成红色

先简单看一下效果&#xff1a; 实现的主要代码&#xff1a; /*** * param color 需要提示的演示* param txt 字符串信息* param keyword 搜索的关键字* return*/ private SpannableString matchSearchText(int color, String txt, String keyword) {SpannableString spannableS…

基于springboot vue前后端分离的赛事疫情管理系统源码

开发工具&#xff1a;idea (eclipse) 环境&#xff1a;jdk1.8 mysql5.7&#xff0c; navcat 演示视频&#xff1a; 【java毕业设计】基于springboot vue前后端分离的赛事疫情管理系统源码许多年以前&#xff0c;人们在对数据进行统计和记录时候&#xff0c;使用的是纸和笔&…