Akka 学习(七)Actor的生命周期

news2025/1/17 22:02:58

在Actor的生命周期中会调用几个方法,我们在需要时可以重写这些方法。
● prestart():在构造函数之后调用。
● postStop():在重启之前调用。
● preRestart(reason, message):默认情况下会调用postStop()。
● postRestart():默认情况下会调用preStart()。

一 生命周期

1.1 基本介绍

package com.shu;

import akka.actor.AbstractActor;
import scala.Option;

import java.util.Optional;

/**
 * @description: 生命周期ActorDemo
 * @author: shu
 * @createDate: 2022/12/10 11:33
 * @version: 1.0
 */
public class LifeActorDemo extends AbstractActor {

    /**
     * 在构造函数之后调用 ,可以完成一些初始化
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preStart() throws Exception, Exception {
        super.preStart();
        System.out.println("Life 初始化");
    }

    /**
     * 在重启之前调用
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void postStop() throws Exception, Exception {
        super.postStop();
        System.out.println("Life 即将重启");
    }
    


    /**
     * 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
     * @param reason
     * @param message
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preRestart(Throwable reason, Option<Object> message) throws Exception, Exception {
        super.preRestart(reason, message);
        System.out.println("Life 即将重启 调用preStart初始化");
    }



    /**
     * 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
     * @param reason
     * @throws Exception
     */
    @Override
    public void postRestart(Throwable reason) throws Exception, Exception {
        super.postRestart(reason);
        System.out.println("Life 即将重启 调用postStop方法");
    }

    /**
     * 收到消息
     * @return
     */
    @Override
    public Receive createReceive() {
        return null;
    }
}

  • 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
  • 这样我们就能够决定,到底是只在Actor启动或停止的时候调用一次preStart和postStop,还是每次重启一个Actor的时候就调用preStart和postStop。

epub_22651331_38.jpg

1.2 自定义监督策略

重写Actor的supervisorStrategy方法

    /**
     * 可以制定系你的监督策越
     * @return
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
//         super.supervisorStrategy();
         return new OneForOneStrategy(2, Duration.create("1 minute"), PartialFunction.empty());
    }

1.3 终止或kill一个Actor

有多种不同的方法可以用来停止一个Actor,下面任一方法都可以停止Actor:
● 调用ActorSystem.stop(actorRef);
● 调用ActorContext.stop(actorRef);
● 给Actor发送一条PoisonPill消息,会在Actor完成消息处理后将其停止;
● 给Actor发送一条kill消息,会导致Actor抛出ActorKilledException异常

对比

  1. 调用context.stop或system.stop会导致Actor立即停止
  2. 发送PoisonPill消息则会在Actor处理完消息后将其停止
  3. 不同的是,kill不会马上直接停止Actor,而是会导致Actor抛出一个ActorKilledException,

1.4 生命周期监控和DeathWatch

  • 监督机制描述了如何对子Actor的状态进行响应。
  • 而Actor也可以对其他任何Actor进行监督。
  • 通过调用context.watch(actorRef)注册后,Actor就能够监控另一个Actor的终止,而调用context.unwatch(actorRef)就可以取消监控注册。
  • 如果被监控的Actor停止了,负责监控的Actor就会收到一条Terminated(ActorRef)消息。

1.5 状态

我们已经介绍过,Actor能够安全地存储状态,它允许我们使用无锁的方式并发处理状态,现在我们就来介绍Actor如何通过不同的状态来改变它的行为。

1.5.1 在状态之间暂存消息(stash)

  • Akka提供了一种叫做stash的机制来支持这一功能。stash消息会把消息暂存到一个独立的队列中,该队列中存储目前无法处理的消息
  • unstash则把消息从暂存队列中取出,放回邮箱队列中,Actor就能继续处理这些消息了。
  • 在我们实际开发中,比如终端不在线,需要上线后执行一些操作,我就可以用这个机制来解决这个问题,实际上就是把消息缓存到一个队列中,但是如果缓存过多会造成内存泄漏,邮箱拥挤。
        if(cantHandleMessage) {
            // 缓存消息
            stash();
        } else {
            // 处理消息
            handleMessage(message);
            // 取出消息
            unstash()
        } 

要注意的是,虽然stash()和unstash()在希望快速改变状态的时候使用起来非常方便,但是stash消息的状态一定要和某个时间限制进行绑定,否则就有可能填满邮箱。

案例

        private Boolean online = false;
        public PartialFunction receive() {
            return RecieveBuilder
                .match(GetRequest.class, x -> {
                    if(online) {
                          processMessage(x);
                    } else {
                        stash();
                    }
                })
                .match(Connected.class, x -> {
                    online = true;
                    unstash();
                )
                .match(Disconnected.class, x -> online = false)
                .build();

1.5.2 热交换(Hotswap):Become/Unbecome

Akka提供了become()和unbecome(),用于管理不同的行为,这一用法可以大大改善代码的可读性。在Actor的context()中,有两个方法:
● become(PartialFunction behavior):这个方法将receive块中定义的行为修改为一个新的PartialFunction。
● unbecome():这个方法将Actor的行为修改回默认行为。

        public PartialFunction receive() {
            return RecieveBuilder
                .match(GetRequest.class, x -> stash())
                .match(Connected.class, x -> {
                    context().become(online);
                    unstash();
                  })
                  .build();
        }
        final private PartialFunction<Object, BoxedUnit> online(
            final ActorRef another) {
                return RecieveBuilder
                    .match(GetRequest.class, x -> processMessage(x))
                    .build();
            }

每个状态的行为都定义在自己独立的PartialFunction中,在PartialFunction中,使用模式匹配来定义不同的行为。这样我们就能够互不影响地阅读Actor中不同状态的行为。

1.5.3 有限自动机(Finite State Machine, FSM)

和热交换很相似的是,FSM中也有状态以及基于状态的行为变化,跟热交换比起来,FSM是一个更重量级的抽象概念,需要更多的代码和类型才能够实现并运行。所以通常来说,热交换是一个更简单、可读性更高的选择。

        when(DISCONNECTED,
            matchEvent(FlushMsg.class, (msg, container) -> stay())
                .event(GetRequest.class, (msg, container) -> {
                    container.add(msg);
                    return stay();
                })
                .event(Tcp.Connected.class, (msg, container) -> {
                    if(container.getFirst() == null) {
                          return goTo(CONNECTED);
                    } else {
                          return goTo(CONNECTED_AND_PENDING);
                    }
                }));

        when(CONNECTED,
            matchEvent(FlushMsg.class, (msg, container) -> stay()) {
                .event(GetRequest.class, (msg, container) -> {
                    container.add(msg);
                    return goTo(CONNECTED_AND_PENDING);
                }));

        when(CONNECTED_AND_PENDING,
            matchEvent(FlushMsg.class, (msg, container) -> {
                container = new EventQueue();
                return stay();
            })
            .event(GetRequest.class, (msg, container) -> {
                container.add(msg);
                return goTo(CONNECTED_AND_PENDING);
            }));
        scala.PartialFunction pf = ReceiveBuilder.match(String.class,
            x -> System.out.println(x)).build();
        when(CONNECTED, pf);

1.7 案例

结构图
纯属虚构,方便自己理解上面的知识(Java代码实现)

失败生命周期的处理

● prestart():在构造函数之后调用。
● postStop():在重启之前调用。
● preRestart(reason, message):默认情况下会调用postStop()。
● postRestart():默认情况下会调用preStart()。
image.png
我们可以发现当Actor内部发生了错误,他并不是终止了程序而是重新启动。

自定义监督策越

    /**
     * 自定义监督策越
     */
    private static SupervisorStrategy strategy = new OneForOneStrategy(
                    10,
                    Duration.ofMinutes(1),
                    DeciderBuilder
                            .match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
                            .match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
                            .match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
                            .matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
                            .build());
    /**
     * 自定义策越
     * @return
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

10和Duration.create(1, TimeUnit.MINUTES)分别传递给maxNrOfRetries和withinTimeRange参数,这意味着策略每分钟重新启动一个子级最多10次。如果在withinTimeRange持续时间内重新启动计数超过maxNrOfRetries,则子 Actor 将停止。

基本效果图

  • 服务端
  1. 上线
  2. 请求数据
  3. 下线

image.png

  • 客服端

image.png

关键代码

  • 服务端
package com.shu.terminal;

import akka.actor.*;
import akka.io.Tcp;
import akka.japi.Function;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import com.shu.meter.MeterDemoActor;
import pojo.Login;
import pojo.Logout;
import pojo.Meter;
import pojo.MeterRequest;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Future;

import static scala.compat.java8.FutureConverters.toJava;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/10 14:52
 * @version: 1.0
 */
public class TerminalDemoActor extends AbstractActor {

    /**
     * 自定义监督策越
     */
    private static SupervisorStrategy strategy = new OneForOneStrategy(
                    10,
                    Duration.ofMinutes(1),
                    DeciderBuilder
                            .match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
                            .match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
                            .match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
                            .matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
                            .build());


    /**
     * 在线状态
     */
    private Boolean Online;

    /**
     * 在构造函数之后调用 ,可以完成一些初始化
     *
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preStart() throws Exception, Exception {
        super.preStart();
        System.out.println("Life 初始化");
    }

    /**
     * 在重启之前调用
     *
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void postStop() throws Exception, Exception {
        super.postStop();
        System.out.println("Life 即将重启");
    }


    /**
     * 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
     *
     * @param reason
     * @param message
     * @throws Exception
     * @throws Exception
     */
    @Override
    public void preRestart(Throwable reason, Option<Object> message) throws Exception, Exception {
        super.preRestart(reason, message);
        System.out.println("Life 即将重启 调用preStart初始化");
    }


    /**
     * 自定义策越
     * @return
     */
    @Override
    public SupervisorStrategy supervisorStrategy() {
        return strategy;
    }

    /**
     * 收到小消息
     *
     * @return
     */
    @Override
    public Receive createReceive() {
        return ReceiveBuilder.create()
                // 连接成功
                .match(Login.class, x -> {
                    // 在线状态改变
                    setOnline(true);
                    // 回应消息,登录成功
                    sender().tell(1001, self());
                    System.out.println("收到登录请求");
                })
                // 连接成功
                .match(Logout.class, x -> {
                    // 在线状态改变
                    setOnline(false);
                    // 回应消息,登录成功
                    sender().tell(1002, self());
                    System.out.println("收到注销请求");
                })
                // 请求数据
                .match(MeterRequest.class, msg -> {
                    // 在线
                    if (getOnline()) {
                        // 获取消息
                        Future sFuture = new AskableActorRef(context().actorOf(Props.create(MeterDemoActor.class))).ask(msg,Timeout.apply(1000,TimeUnit.SECONDS) );
                        CompletionStage<Meter> cs = toJava(sFuture);
                        CompletableFuture<Meter> future = (CompletableFuture<Meter>) cs;
                        // 消息发送给客服端
                        if (future.get() != null) {
                            sender().tell(future.get(), self());
                        }
                    }
                })
                // 未找到消息
                .matchAny(o ->
                        sender().tell(new Status.Failure(new ClassNotFoundException()), self())
                )
                .build();

    }


    public Boolean getOnline() {
        return Online;
    }

    public void setOnline(Boolean online) {
        Online = online;
    }
}

  • 客服端
package client;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.*;

import java.util.Date;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;

import static scala.compat.java8.FutureConverters.toJava;

/**
 * @description:
 * @author: shu
 * @createDate: 2022/12/10 18:22
 * @version: 1.0
 */
public class TerminalClient {
    private final ActorSystem system = ActorSystem.create("LocalSystem");

    private final ActorSelection remoteTerminal;

    public TerminalClient(String remoteAddress) {
        remoteTerminal = system.actorSelection("akka.tcp://terminal@" +
                remoteAddress+ "/user/terminal-server");
    }


    /**
     * 获取消息
     * @param key
     * @param value
     * @return
     */
    public CompletionStage getMeterInfo(String key, int value) {
        System.out.println(remoteTerminal);
        return toJava(new AskableActorSelection(remoteTerminal).ask(new MeterRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));
    }


    /**
     * 上线
     * @return
     */
    public CompletionStage sendLogin() {
        System.out.println(remoteTerminal);
        return toJava(new AskableActorSelection(remoteTerminal).ask(new Login("1001", new Date()), Timeout.apply(5000, TimeUnit.SECONDS)));
    }


    /**
     * 下线
     * @return
     */
    public CompletionStage sendLogout() {
        System.out.println(remoteTerminal);
        return toJava(new AskableActorSelection(remoteTerminal).ask(new Logout("1001", new Date()), Timeout.apply(5000, TimeUnit.SECONDS)));
    }


}

具体案例代码:https://github.com/Eason-shu/Akka
demo03 ,demo04

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/78612.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

D/A转换器

性能指标&#xff1a;转换精度&#xff0c;转换速度 相互之间是矛盾的&#xff0c;精度越高&#xff0c;相比而言速度就会慢一些 权电阻网络D/A转换器 阻值的选取是按照二进制的位权来选择的&#xff0c;所以我们看到了这个结构&#xff0c;和我们刚才分析的是一致的 权电阻网…

基于花授粉算法优化的lssvm回归预测-附代码

基于花授粉算法优化的lssvm回归预测 - 附代码 文章目录基于花授粉算法优化的lssvm回归预测 - 附代码1.数据集2.lssvm模型3.基于花授粉算法优化的LSSVM4.测试结果5.Matlab代码摘要&#xff1a;为了提高最小二乘支持向量机&#xff08;lssvm&#xff09;的回归预测准确率&#xf…

c++11 std::thread和mutex用法

c11 std::thread和mutex用法thread和mutex用法thread简单示例thread构造函数梳理thread关键成员函数mutex使用thread和mutex用法 本文对c 11中的std::thread 和 mutex作简要的使用说明 thread简单示例 #include <iostream> #include <string> #include <thre…

mysql性能监控

一.使用show profile查询剖析工具&#xff0c;查看mysql语句执行时间&#xff1a; 官网&#xff1a;https://dev.mysql.com/doc/refman/8.0/en/show-profile.html mysql -uroot -p //进入数据库服务器 use 数据库名 //进入数据库 set profiling1; //开启profiling参数 select…

Python基础(十二):字典的详细讲解

文章目录 字典的详细讲解 一、字典的应用场景 二、创建字典的语法

具身智能综述和应用(Embodied AI)

什么是具身智能&#xff1f; 目前人工智能的进展&#xff0c;在诸多数据源和数据集&#xff08;Youtube、Flickr、Facebook&#xff09;、机器计算能力&#xff08;CPU、GPU、TPU&#xff09;的加持下&#xff0c;已经在CV、NLP上取得了许多任务&#xff08;如目标检测、语义分…

Python学习----闭包和装饰器

情景&#xff1a; 当我们调用函数的时候&#xff0c;函数调用完成之后&#xff0c;函数内定义的变量都会被销毁&#xff0c;但是我们有时候需要保存函数内的这个变量&#xff0c;每次在这个变量的基础上完成一系列的操作&#xff0c;比如&#xff1a;每次在这个变量的基础上和其…

【全网惟一面向软件测试人员的Python基础教程】- 学Python之前要搞懂的道理

全网惟一面向软件测试人员的Python基础教程 起点&#xff1a;《python软件测试实战宝典》介绍 第一章 为什么软件测试人员要学习Python 第二章 学Python之前要搞懂的道理 文章目录全网惟一面向软件测试人员的Python基础教程计算机的本质是什么&#xff1f;什么是编程呢&#x…

Java泛型的使用和原理

文章目录泛型-概述基础使用泛型类的使用泛型类派生子类泛型接口泛型方法类型通配符类型通配符上限类型通配符下限常用泛型标识符类型擦除使用注意泛型与数组泛型和反射其他泛型-概述 Java 泛型&#xff08;generics&#xff09;是 JDK 5 中引入的一个新特性&#xff0c;泛型提…

第十四届蓝桥杯集训——JavaC组第八篇——进制转换

第十四届蓝桥杯集训——JavaC组第八篇——进制转换 目录 第十四届蓝桥杯集训——JavaC组第八篇——进制转换 短除法 十进制转二进制示例&#xff1a; 十进制转换二进制 十进制转换八进制 十进制转换十六进制 二进制转十进制 八进制转十进制 十六进制转十进制 进制转换…

【✨十五天搞定电工基础】半导体器件

本章要求1. 理解PN结的单向导电性&#xff0c;三极管的电流分配和电流放大作用 2. 了解二极管、稳压管和三极管的基本构造、工作原理和特性曲线&#xff0c;理解主要参数的意义 3. 会分析含有二极管的电路 目录 一、半导体基础知识 1、本征半导体的导电机理 2、杂质半导体 …

部分核心技术(持续更新)

文章目录1.Schedule&#xff08;定时任务&#xff09;2.高并发线程安全的解决方案2.1为什么不适用同步锁&#xff08;Synchronized&#xff09;&#xff1f;2.2 Redis的分布式锁setnx2.3 redisson分布式锁&#xff08;看门狗机制&#xff09;2.3.1 Redis的分布式锁setnx产生的问…

保姆级入门nest笔记

使用 NEXT 搭建后台服务接口 https://docs.nestjs.com/ # 准备工作 安装 node 全局安装 nest npm i -给nestjs/cli nest --version # 创建项目 创建项目next new 启动项目npm run start 或 npm run start:dev 访问接口 localhost:3000 获取命令解释 next g -h # 快速创建…

Pixracer接线图 及电调调参 BLheliSuite

Pixracer接线指南 pixracer官方链接 正反面引脚定义 接口含义 BLheliSuite调参软件 官方下载&#xff1a; https://www.mediafire.com/folder/dx6kfaasyo24l/BLHeliSuite 我使用了如下软件https://www.mediafire.com/file/9uccf1zy3wqb1w5/BLHeliSuite32_32.9.0.3.zip/fil…

Bio-Net:编解码器结构的循环双向连接网络

目录 摘要 方法 循环双向跳跃连接 前向跳跃连接 后向跳跃连接 递归的推断训练 BiO-Net网络结构 总结 摘要 对UNet以前的扩展主要集中对现有模块的改进或者提出新的模块来提高性能。因此这些变量通常会导致模型的复杂性不可忽视的增加。为了解决这种复杂性的问题。在本…

redis cluster 集群安装

redis cluster 集群安装 redis集群方案 哨兵集群 如图&#xff0c;实际上还是一个节点对外提供服务&#xff0c;所以虽然是三台机器&#xff0c;但是还是一台机器的并发量&#xff0c;而且master挂了之后&#xff0c;整个集群不能对外提供服务 cluster集群 多个主从集群节点…

五、伊森商城 前端基础-Vue 整合ElementUI快速开发 p28

目录 一、安装 1、安装ElementUI 2、在main.js文件中引入 2.1、引入ElementUI组件 2.2、让Vue使用ElementUI组件 二、使用 1、在hello.vue组件使用单选框 2、使用ElementUI快速搭建后台管理系统 2.1、修改App.vue 3、修改功能成动态显示 3.1、编写快速生成组件的模板 3…

java计算机毕业设计ssm学习互助平台网站8f554(附源码、数据库)

java计算机毕业设计ssm学习互助平台网站8f554&#xff08;附源码、数据库&#xff09; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff0…

C++11之引用

文章目录目的为啥要引入右值引用什么是右值引用右值引用作用移动构造函数移动语义 std::move移动语义注意事项完美转发博客目的 了解对应左值引用&#xff0c; 右值引用&#xff0c;移动语义&#xff0c; 完美转发含义。 右值引用&#xff08;及其支持的移动语义Move semanti…

1562_AURIX_TC275_电源监控

全部学习汇总&#xff1a; GreyZhang/g_TC275: happy hacking for TC275! (github.com) 这一次的学习笔记内容比较少&#xff0c;因为有几页的文档内容跟之前看过的DataSheet内容雷同。因此&#xff0c;相应的学习笔记不再整理。 之前的学习笔记&#xff1a; (56条消息) 1451_…