目录
- 一 默认邮箱配置
- 二 内置邮箱
- 三 自定义邮箱
- 四 配置邮箱
- 五 RequiresMessageQueue接口
Actor中的邮箱是一个队列结构,所有发送过来的消息都会在该队列进行排队,在默认情况下,它遵循先进先出(FIFO)的模式,假如需要改变这种默认处理方式,需要自定义邮箱或消息队列。
一 默认邮箱配置
Akka对邮箱提供了专门的配置项,即默认邮箱配置(default-mailbox),比如邮箱类型(mailbox-type)、邮箱容量(mailbox-capacity)、入队超时时间(mailbox-push-timeout-time)等
akka.actor.default-mailbox {
mailbox-type = "akka.dispatch.UnboundedMailbox"
mailbox-capacity = 1000
mailbox-push-timeout-time = 10s
}
- mailbox-type:邮箱类型,分为有界(Bounded)和无界(Unbounded), Akka默认采用UnboundedMailbox,表示不限制邮箱队列的大小。
- mailbox-capacity:邮箱容量,定义了有界邮箱(BoundedMail)的大小,该值只能是正数。
- mailbox-push-timeout-time:入队超时时间,主要是指push一个消息到有界邮箱的队列的超时时限。假如为负数,则表示无限超时,这可能会带来死锁问题。
二 内置邮箱
在Akka中,邮箱主要分为两大类:Unbounded和Bounded。Unbounded表示无界,即邮箱没有容量上的限制;Bounded表示有界,即邮箱有容量上的限制。
三 自定义邮箱
自定义邮箱消息优先级
import akka.actor.ActorSystem;
import akka.dispatch.PriorityGenerator;
import akka.dispatch.UnboundedStablePriorityMailbox;
import com.typesafe.config.Config;
/**
* @description: 自定义优先级
* @author: shu
* @createDate: 2022/12/23 19:23
* @version: 1.0
*/
class MsgPriorityMailBox extends UnboundedStablePriorityMailbox {
/**
* 返回值越小,优先级越高
* @param settings
* @param config
*/
public MsgPriorityMailBox(ActorSystem.Settings settings, Config config) {
super(new PriorityGenerator() {
@Override
public int gen(Object message) {
if (message.equals("张三")) {
return 0;
}else if(message.equals("李四")) {
return 1;
}else if(message.equals("王五")) {
return 2;
}else {
return 3;
}
}
});
}}
配置
msgprio-mailbox {
mailbox-type = "MsgPriorityMailBox"
}
测试
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
/**
* @description:
* @author: shu
* @createDate: 2022/12/23 14:08
* @version: 1.0
*/
class MsgPriorityActor extends UntypedActor {
@Override
public void onReceive(Object msg) throws Exception {
System.out.println(getSelf()+"--->"+msg+""
+Thread.currentThread().getName());
}
public static void main(String[] args) {
ActorSystem sys=ActorSystem.create("system");
ActorRef ref= sys. actorOf(Props.create(MsgPriorityActor.class).withMailbox("msgprio-mailbox"), "priorityActor");
Object[] messages= {"王五", "李四", "张三", "小二"};
for(Object msg:messages) {
ref.tell(msg, ActorRef.noSender());
}
}
}
我们可以看到我们控制了消息的优先级
四 配置邮箱
代码配置
- withMailbox来关联mailbox
ActorRef ref= sys. actorOf(Props.create(MsgPriorityActor.class).withMailbox("msgprio-mailbox"), "priorityActor");
配置文件
akka.actor.deployment {
/priorityActor {
mailbox = msgprio-mailbox
}}
配置dispatcher邮箱
my-msgprio-dispatcher {
type = Dispatcher
mailbox-type = "MsgPriorityMailBox"
#其他dispatcher配置在此省略
}
sys.actorOf(Props.create(MsgPriorityActor.class).withDispatcher("my-msgprio-dispa
tcher"), "priorityActor")
五 RequiresMessageQueue接口
为了让Actor自动拥有某个特定类型的邮箱,可以让该Actor实现RequiresMessage-Queue接口,并且设置接口泛型为该邮箱队列的语义接口。