在Actor的生命周期中会调用几个方法,我们在需要时可以重写这些方法。
● prestart():在构造函数之后调用。
● postStop():在重启之前调用。
● preRestart(reason, message):默认情况下会调用postStop()。
● postRestart():默认情况下会调用preStart()。
一 生命周期
1.1 基本介绍
package com.shu;
import akka.actor.AbstractActor;
import scala.Option;
import java.util.Optional;
/**
* @description: 生命周期ActorDemo
* @author: shu
* @createDate: 2022/12/10 11:33
* @version: 1.0
*/
public class LifeActorDemo extends AbstractActor {
/**
* 在构造函数之后调用 ,可以完成一些初始化
* @throws Exception
* @throws Exception
*/
@Override
public void preStart() throws Exception, Exception {
super.preStart();
System.out.println("Life 初始化");
}
/**
* 在重启之前调用
* @throws Exception
* @throws Exception
*/
@Override
public void postStop() throws Exception, Exception {
super.postStop();
System.out.println("Life 即将重启");
}
/**
* 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
* @param reason
* @param message
* @throws Exception
* @throws Exception
*/
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception, Exception {
super.preRestart(reason, message);
System.out.println("Life 即将重启 调用preStart初始化");
}
/**
* 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
* @param reason
* @throws Exception
*/
@Override
public void postRestart(Throwable reason) throws Exception, Exception {
super.postRestart(reason);
System.out.println("Life 即将重启 调用postStop方法");
}
/**
* 收到消息
* @return
*/
@Override
public Receive createReceive() {
return null;
}
}
- 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
- 这样我们就能够决定,到底是只在Actor启动或停止的时候调用一次preStart和postStop,还是每次重启一个Actor的时候就调用preStart和postStop。
1.2 自定义监督策略
重写Actor的supervisorStrategy方法
/**
* 可以制定系你的监督策越
* @return
*/
@Override
public SupervisorStrategy supervisorStrategy() {
// super.supervisorStrategy();
return new OneForOneStrategy(2, Duration.create("1 minute"), PartialFunction.empty());
}
1.3 终止或kill一个Actor
有多种不同的方法可以用来停止一个Actor,下面任一方法都可以停止Actor:
● 调用ActorSystem.stop(actorRef);
● 调用ActorContext.stop(actorRef);
● 给Actor发送一条PoisonPill消息,会在Actor完成消息处理后将其停止;
● 给Actor发送一条kill消息,会导致Actor抛出ActorKilledException异常
对比
- 调用context.stop或system.stop会导致Actor立即停止
- 发送PoisonPill消息则会在Actor处理完消息后将其停止
- 不同的是,kill不会马上直接停止Actor,而是会导致Actor抛出一个ActorKilledException,
1.4 生命周期监控和DeathWatch
- 监督机制描述了如何对子Actor的状态进行响应。
- 而Actor也可以对其他任何Actor进行监督。
- 通过调用context.watch(actorRef)注册后,Actor就能够监控另一个Actor的终止,而调用context.unwatch(actorRef)就可以取消监控注册。
- 如果被监控的Actor停止了,负责监控的Actor就会收到一条Terminated(ActorRef)消息。
1.5 状态
我们已经介绍过,Actor能够安全地存储状态,它允许我们使用无锁的方式并发处理状态,现在我们就来介绍Actor如何通过不同的状态来改变它的行为。
1.5.1 在状态之间暂存消息(stash)
- Akka提供了一种叫做stash的机制来支持这一功能。stash消息会把消息暂存到一个独立的队列中,该队列中存储目前无法处理的消息
- unstash则把消息从暂存队列中取出,放回邮箱队列中,Actor就能继续处理这些消息了。
- 在我们实际开发中,比如终端不在线,需要上线后执行一些操作,我就可以用这个机制来解决这个问题,实际上就是把消息缓存到一个队列中,但是如果缓存过多会造成内存泄漏,邮箱拥挤。
if(cantHandleMessage) {
// 缓存消息
stash();
} else {
// 处理消息
handleMessage(message);
// 取出消息
unstash()
}
要注意的是,虽然stash()和unstash()在希望快速改变状态的时候使用起来非常方便,但是stash消息的状态一定要和某个时间限制进行绑定,否则就有可能填满邮箱。
案例
private Boolean online = false;
public PartialFunction receive() {
return RecieveBuilder
.match(GetRequest.class, x -> {
if(online) {
processMessage(x);
} else {
stash();
}
})
.match(Connected.class, x -> {
online = true;
unstash();
)
.match(Disconnected.class, x -> online = false)
.build();
1.5.2 热交换(Hotswap):Become/Unbecome
Akka提供了become()和unbecome(),用于管理不同的行为,这一用法可以大大改善代码的可读性。在Actor的context()中,有两个方法:
● become(PartialFunction behavior):这个方法将receive块中定义的行为修改为一个新的PartialFunction。
● unbecome():这个方法将Actor的行为修改回默认行为。
public PartialFunction receive() {
return RecieveBuilder
.match(GetRequest.class, x -> stash())
.match(Connected.class, x -> {
context().become(online);
unstash();
})
.build();
}
final private PartialFunction<Object, BoxedUnit> online(
final ActorRef another) {
return RecieveBuilder
.match(GetRequest.class, x -> processMessage(x))
.build();
}
每个状态的行为都定义在自己独立的PartialFunction中,在PartialFunction中,使用模式匹配来定义不同的行为。这样我们就能够互不影响地阅读Actor中不同状态的行为。
1.5.3 有限自动机(Finite State Machine, FSM)
和热交换很相似的是,FSM中也有状态以及基于状态的行为变化,跟热交换比起来,FSM是一个更重量级的抽象概念,需要更多的代码和类型才能够实现并运行。所以通常来说,热交换是一个更简单、可读性更高的选择。
when(DISCONNECTED,
matchEvent(FlushMsg.class, (msg, container) -> stay())
.event(GetRequest.class, (msg, container) -> {
container.add(msg);
return stay();
})
.event(Tcp.Connected.class, (msg, container) -> {
if(container.getFirst() == null) {
return goTo(CONNECTED);
} else {
return goTo(CONNECTED_AND_PENDING);
}
}));
when(CONNECTED,
matchEvent(FlushMsg.class, (msg, container) -> stay()) {
.event(GetRequest.class, (msg, container) -> {
container.add(msg);
return goTo(CONNECTED_AND_PENDING);
}));
when(CONNECTED_AND_PENDING,
matchEvent(FlushMsg.class, (msg, container) -> {
container = new EventQueue();
return stay();
})
.event(GetRequest.class, (msg, container) -> {
container.add(msg);
return goTo(CONNECTED_AND_PENDING);
}));
scala.PartialFunction pf = ReceiveBuilder.match(String.class,
x -> System.out.println(x)).build();
when(CONNECTED, pf);
1.7 案例
结构图
纯属虚构,方便自己理解上面的知识(Java代码实现)
失败生命周期的处理
● prestart():在构造函数之后调用。
● postStop():在重启之前调用。
● preRestart(reason, message):默认情况下会调用postStop()。
● postRestart():默认情况下会调用preStart()。
我们可以发现当Actor内部发生了错误,他并不是终止了程序而是重新启动。
自定义监督策越
/**
* 自定义监督策越
*/
private static SupervisorStrategy strategy = new OneForOneStrategy(
10,
Duration.ofMinutes(1),
DeciderBuilder
.match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
.match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
.match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
.matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
.build());
/**
* 自定义策越
* @return
*/
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
10和Duration.create(1, TimeUnit.MINUTES)分别传递给maxNrOfRetries和withinTimeRange参数,这意味着策略每分钟重新启动一个子级最多10次。如果在withinTimeRange持续时间内重新启动计数超过maxNrOfRetries,则子 Actor 将停止。
基本效果图
- 服务端
- 上线
- 请求数据
- 下线
- 客服端
关键代码
- 服务端
package com.shu.terminal;
import akka.actor.*;
import akka.io.Tcp;
import akka.japi.Function;
import akka.japi.pf.DeciderBuilder;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import com.shu.meter.MeterDemoActor;
import pojo.Login;
import pojo.Logout;
import pojo.Meter;
import pojo.MeterRequest;
import scala.Option;
import scala.PartialFunction;
import scala.concurrent.Await;
import scala.concurrent.Future;
import static scala.compat.java8.FutureConverters.toJava;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
/**
* @description:
* @author: shu
* @createDate: 2022/12/10 14:52
* @version: 1.0
*/
public class TerminalDemoActor extends AbstractActor {
/**
* 自定义监督策越
*/
private static SupervisorStrategy strategy = new OneForOneStrategy(
10,
Duration.ofMinutes(1),
DeciderBuilder
.match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
.match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
.match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
.matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
.build());
/**
* 在线状态
*/
private Boolean Online;
/**
* 在构造函数之后调用 ,可以完成一些初始化
*
* @throws Exception
* @throws Exception
*/
@Override
public void preStart() throws Exception, Exception {
super.preStart();
System.out.println("Life 初始化");
}
/**
* 在重启之前调用
*
* @throws Exception
* @throws Exception
*/
@Override
public void postStop() throws Exception, Exception {
super.postStop();
System.out.println("Life 即将重启");
}
/**
* 要注意的是preRestart和postRestart只在重启的时候才会被调用。它们默认调用了preStart和postStop,但是调用它们的时候就不再直接调用preStart和postStop了。
*
* @param reason
* @param message
* @throws Exception
* @throws Exception
*/
@Override
public void preRestart(Throwable reason, Option<Object> message) throws Exception, Exception {
super.preRestart(reason, message);
System.out.println("Life 即将重启 调用preStart初始化");
}
/**
* 自定义策越
* @return
*/
@Override
public SupervisorStrategy supervisorStrategy() {
return strategy;
}
/**
* 收到小消息
*
* @return
*/
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
// 连接成功
.match(Login.class, x -> {
// 在线状态改变
setOnline(true);
// 回应消息,登录成功
sender().tell(1001, self());
System.out.println("收到登录请求");
})
// 连接成功
.match(Logout.class, x -> {
// 在线状态改变
setOnline(false);
// 回应消息,登录成功
sender().tell(1002, self());
System.out.println("收到注销请求");
})
// 请求数据
.match(MeterRequest.class, msg -> {
// 在线
if (getOnline()) {
// 获取消息
Future sFuture = new AskableActorRef(context().actorOf(Props.create(MeterDemoActor.class))).ask(msg,Timeout.apply(1000,TimeUnit.SECONDS) );
CompletionStage<Meter> cs = toJava(sFuture);
CompletableFuture<Meter> future = (CompletableFuture<Meter>) cs;
// 消息发送给客服端
if (future.get() != null) {
sender().tell(future.get(), self());
}
}
})
// 未找到消息
.matchAny(o ->
sender().tell(new Status.Failure(new ClassNotFoundException()), self())
)
.build();
}
public Boolean getOnline() {
return Online;
}
public void setOnline(Boolean online) {
Online = online;
}
}
- 客服端
package client;
import akka.actor.ActorSelection;
import akka.actor.ActorSystem;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import pojo.*;
import java.util.Date;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.TimeUnit;
import static scala.compat.java8.FutureConverters.toJava;
/**
* @description:
* @author: shu
* @createDate: 2022/12/10 18:22
* @version: 1.0
*/
public class TerminalClient {
private final ActorSystem system = ActorSystem.create("LocalSystem");
private final ActorSelection remoteTerminal;
public TerminalClient(String remoteAddress) {
remoteTerminal = system.actorSelection("akka.tcp://terminal@" +
remoteAddress+ "/user/terminal-server");
}
/**
* 获取消息
* @param key
* @param value
* @return
*/
public CompletionStage getMeterInfo(String key, int value) {
System.out.println(remoteTerminal);
return toJava(new AskableActorSelection(remoteTerminal).ask(new MeterRequest(key, value), Timeout.apply(5000, TimeUnit.SECONDS)));
}
/**
* 上线
* @return
*/
public CompletionStage sendLogin() {
System.out.println(remoteTerminal);
return toJava(new AskableActorSelection(remoteTerminal).ask(new Login("1001", new Date()), Timeout.apply(5000, TimeUnit.SECONDS)));
}
/**
* 下线
* @return
*/
public CompletionStage sendLogout() {
System.out.println(remoteTerminal);
return toJava(new AskableActorSelection(remoteTerminal).ask(new Logout("1001", new Date()), Timeout.apply(5000, TimeUnit.SECONDS)));
}
}
具体案例代码:https://github.com/Eason-shu/Akka
demo03 ,demo04