目录
- 一 路由Actor
- 二 Pool方式的方式创建路由
- 三 Group方式创建路由
消息可以通过多种方式送达目的地,比如tell、ask、forward等,这些方式是最常规也是最简单的,但是对于复杂的消息投递逻辑,比如轮询投递、随机投递、广播组等,就需要开发者自己去做一层封装,好在Akka已经提供了丰富的路由组件,可以很好地满足这类需求。在实际项目中,我们通常会使用路由器来做负载均衡和任务分派。
一 路由Actor
路由器可以是一个自包含的Actor,它通常管理着自己的所有Routee,一般来讲,我们会把这类路由配置在*.conf文件中,然后通过编码的方式加载并创建路由器,创建一个路由Actor有两种模式:pool和group。
- pool的方式表示路由器Actor会创建子Actor作为其Routee并对其监督和监控,当某个Routee终止时将其移除出去。
- group的方式表示可以将Routee的生产方式放在外部(不必自包含),然后路由器Actor通过路径(path)对这些目标进行消息发送。
基本使用
- 定义路由Actor
import akka.actor.UntypedActor;
/**
* @description: 路由Actor
* @author: shu
* @createDate: 2022/12/27 13:02
* @version: 1.0
*/
public class RouteeActor extends UntypedActor {
@Override
public void onReceive(Object msg) throws Exception {
System.out.println(getSelf()+"-->"+msg);
}
}
- 定义路由分发Actor
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.ActorRefRoutee;
import akka.routing.RoundRobinRoutingLogic;
import akka.routing.Routee;
import akka.routing.Router;
import java.util.ArrayList;
import java.util.List;
/**
* @description:
* @author: shu
* @createDate: 2022/12/27 13:04
* @version: 1.0
*/
class RouterTaskActor extends UntypedActor {
private Router router;
@Override
public void preStart() throws Exception {
List<Routee> listRoutee=new ArrayList<Routee>();
for(int i=0; i<2; i++) {
ActorRef ref=getContext().actorOf(Props.create(RouteeActor.class), "routeeActor"+i);
listRoutee.add(new ActorRefRoutee(ref));
}
router=new Router(new RoundRobinRoutingLogic(), listRoutee);
}
@Override
public void onReceive(Object msg) throws Exception {
router.route(msg, getSender());
}
public static void main(String[] args) {
ActorSystem system=ActorSystem.create("sys");
ActorRef routerActor=system.actorOf(Props.create(RouterTaskActor.class), "routerTaskActor");
routerActor.tell("helloA", ActorRef.noSender());
routerActor.tell("helloB", ActorRef.noSender());
routerActor.tell("helloC", ActorRef.noSender());
}
}
- 测试
我们可以发现消息经过了路由的分发,传递了不同的路由Actor,下是不同的路由类型
二 Pool方式的方式创建路由
使用pool方式定义的路由Actor会自动将Routee创建为自己的子级,这种层级关系在最开始就自动存在,不必通过getContext.actorOf的方式来指定。
对于路由功能来讲,我们最好有一个消息中转的过程,即不会直接通过路由器来发送消息,而是先经过一层中间转发的过程,这样有利于构建更加清晰的管理结构,所以这里我们首先定义一个Actor,作为消息中转处理器。
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import akka.routing.RoundRobinPool;
/**
* @description:
* @author: shu
* @createDate: 2022/12/27 14:29
* @version: 1.0
*/
class MasterRouterActor extends UntypedActor {
ActorRef router = null;
@Override
public void preStart() throws Exception {
router = getContext().actorOf(
new RoundRobinPool(3).props(Props.create(TaskActor.class)),
"taskActor");
System.out.println("router:"+router);
}
@Override
public void onReceive(Object msg) throws Exception {
router.tell(msg, getSender());
}
public static void main(String[] args) {
ActorSystem system=ActorSystem.create("sys");
ActorRef routerActor=system.actorOf(Props.create(MasterRouterActor.class), "routerTaskActor");
routerActor.tell("helloA", ActorRef.noSender());
routerActor.tell("helloB", ActorRef.noSender());
routerActor.tell("helloC", ActorRef.noSender());
}
}
上面写的代码式路由Actor,下面介绍一下配置是代码
akka.actor.deployment {
/masterRouterActor/taskActor {
router = round-robin-pool
nr-of-instances = 3
}
}
router = getContext().actorOf(
FromConfig.getInstance().props(Props.create(TaskActor.class)), "taskActor");
到这里,大家可能会有个疑问,假如此时Routee回复一个消息会怎样呢?到底该谁接收?
实际上,路由和forward一样,在整个消息转发的过程中并不会改变原始sender,所以消息会被回复给最初的sender。并且,在回复消息时可以让父Actor(路由Actor)成为自己的sender,这样在某种程度上可以隐藏自己的相关信息。对于pool方式来讲,另外一个需要注意的是:由于父监督原则,路由Actor承担着Routee的监督工作,当没有显式指定监督策略时,路由Actor默认会把失败上溯到上级。当路由Actor重启时,会重新创建Routee(子级Actor),并且在池中维护相同个数(nr-of-instances)的actor;当所有的Routee被终止时,路由Actor也会停止(watch它的生命周期,就可以收到它的Terminated消息)。
自定义监督策略
import akka.actor.*;
import akka.japi.Function;
import akka.japi.pf.DeciderBuilder;
import akka.routing.RoundRobinPool;
import scala.concurrent.duration.Duration;
/**
* @description:
* @author: shu
* @createDate: 2022/12/27 14:29
* @version: 1.0
*/
class MasterRouterActor extends UntypedActor {
ActorRef router = null;
/**
* 监督策越
*/
SupervisorStrategy strategy = new OneForOneStrategy(3,
Duration. create( "1 minute"), DeciderBuilder
.match(ArithmeticException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.resume())
.match(NullPointerException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.restart())
.match(IllegalArgumentException.class, e -> (SupervisorStrategy.Directive) SupervisorStrategy.stop())
.matchAny(o -> (SupervisorStrategy.Directive) SupervisorStrategy.escalate())
.build());
@Override
public void preStart() throws Exception {
router = getContext().actorOf(
new RoundRobinPool(3).withSupervisorStrategy(strategy ).props(Props.create(TaskActor.class)),
"taskActor");
System.out.println("router:"+router);
}
@Override
public void onReceive(Object msg) throws Exception {
router.tell(msg, getSender());
}
public static void main(String[] args) {
ActorSystem system=ActorSystem.create("sys");
ActorRef routerActor=system.actorOf(Props.create(MasterRouterActor.class), "routerTaskActor");
routerActor.tell("helloA", ActorRef.noSender());
routerActor.tell("helloB", ActorRef.noSender());
routerActor.tell("helloC", ActorRef.noSender());
}
}
三 Group方式创建路由
我们需要单独(外部)创建Routee时,可以采用group方式。在使用group路由前,需要先定义Routee-Actor,然后将它们以path的形式配置起来,路由Actor会通过path列表来进行路由。
//定义Routee
getContext().actorOf(Props.create(WorkTask.class), "wt1");
getContext().actorOf(Props.create(WorkTask.class), "wt2");
getContext().actorOf(Props.create(WorkTask.class), "wt3");
router=getContext().actorOf(FromConfig.getInstance().props(), "router");
其中/masterActor/router表示路由器的path, routees.paths用来配置Routee的path,并且本地和远程Actor都是支持的。假如希望这些path能更加动态地产生(或者依赖其他业务逻辑产生),可以使用编码的方式来实现
List<String> routeePaths = Arrays.asList("/user/masterActor/wt1", "/user/
masterActor/wt2", "/user/masterActor/wt3");
router=getContext().actorOf(new RoundRobinGroup(routeePaths).props(), "router");
还有一点要注意的就是:group方式并不限制Routee一定得在同一个层级下,比如当你想增加一个其他外部的Actor作为Routee时,仅仅需要将它的path配置在routees.paths中即可。