目录
- 一 编发编程
- 二 Actor路由
- 2.1 路由的作用
- 2.2 路由的创建方式
- 2.3 路由策略
- 2.4 广播消息
- 2.5 监督路由对象
- 2.6 Akka 案例
- 三 Dispatcher 任务分发
- 3.1 什么是Dispatcher?
- 3.2 Dispatcher的线程池
- 3.3 Dispatcher的分类
一 编发编程
Akka 是一个用于实现分布式、并发、响应式应用程序的工具包和运行时环境。它是基于 Actor 模型的,并使用 Scala 语言实现,但也可以与 Java 一起使用。Actor 模型是一种软件架构,它将应用程序中的对象分配给演员,每个演员都是一个并发实体,并独立地执行它的任务。这样做的好处是,你可以更轻松地实现并发和分布式应用程序,并且不必担心多线程编程中的复杂性和错误。
Akka中提供了两种可以用来进行多核并行编程的抽象:Future和Actor。
实际上,要决定到底使用Actor还是Future其实并不简单。我曾经听别人说过一个通用准则:“Future用于并发,Actor用于状态。”
其实在请前面的案例中我们已经接触到了这两种方式:
Future
Akka Future 是一种用于表示异步计算结果的对象。它与 Java 的 Future 类似,但有一些不同之处。
在 Akka 中,Future 由一个 Actor 创建,并由另一个 Actor 消费。当创建者完成计算并设置了结果时,消费者会收到通知。这样,消费者就可以在不阻塞的情况下等待计算结果。
// 获取消息
Future sFuture = new AskableActorRef(context().actorOf(Props.create(MeterDemoActor.class))).ask(msg,Timeout.apply(1000,TimeUnit.SECONDS) );
CompletionStage<Meter> cs = toJava(sFuture);
CompletableFuture<Meter> future = (CompletableFuture<Meter>) cs;
// 消息发送给客服端
if (future.get() != null) {
sender().tell(future.get(), self());
}
- Actor
public Receive createReceive() {
return ReceiveBuilder.create()
.match(MeterRequest.class, x->{
System.out.println("收到电表请求消息");
sender().tell(new Meter("1001","测试"), self());
})
.matchEquals(String.class, System.out::println)
// 未找到消息
.matchAny(o ->
sender().tell(new Status.Failure(new ClassNotFoundException()), self())
)
.build();
}
二 Actor路由
2.1 路由的作用
- 在创建了Router之后,当Router接收到消息时,就会将消息传递给Group/Pool中的一个或多个Actor
- 有多种策略可以用来决定Router选择下一个消息发送对象的顺序。在我们的例子中,所有的Actor都运行在本地,我们需要一个包含多个Actor的Router来支持使用多个CPU核进行并行运算。
- 如果Actor运行在远程机器上,也可以使用Router在服务器集群上分发工作
2.2 路由的创建方式
在Akka中,Router是一个用于负载均衡和路由的抽象,创建Router时,必须要传入一个Actor Group,或者由Router来创建一个Actor Pool。
Actor Pool 方式创建路由
// 创建路由方式一
ActorRef workerRouter = context()
.actorOf(Props.create(MeterDemoActor.class)
.withRouter(new RoundRobinPool(8)));
在这种情况下使用Router非常简单:照常实例化一个Actor,然后调用withRouter,并传入一个路由策略,以及希望Pool中包含的Actor数量。
Actor Group 方式创建路由
// 创建方式二
ActorRef router = context().actorOf(new RoundRobinGroup(actors.map(actor -> actor.path()).props());
2.3 路由策略
2.4 广播消息
无论是使用Group还是Pool的形式来创建Router,都可以通过广播,将一条消息发送给所有Actor。
router.tell(new akka.routing.Broadcast(msg));
router ! akka.routing.Broadcast(msg)
2.5 监督路由对象
如果使用Pool的方式创建Router,由Router负责创建Actor,那么这些路由对象会成为Router的子节点。创建Router时,可以给Router提供一个自定义的监督策略。
ActorRefworkerRouter = system.actorOf(Props.create(ArticleParseActor.class).withRouter(new RoundRobinPool(8).withSupervisorStrategy(strategy)));
2.6 Akka 案例
路由使用
package com.shu;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.routing.RoundRobinPool;
import pojo.Meter;
/**
* @description:
* @author: shu
* @createDate: 2022/12/11 17:25
* @version: 1.0
*/
public class RouterExample {
public static void main(String[] args) {
ActorSystem system = ActorSystem.create("RouterExample");
// 创建路由器
ActorRef router = system.actorOf(new RoundRobinPool(5).props(Props.create(Meter.class)));
// 向路由器发送消息
for (int i = 0; i < 10; i++) {
router.tell(new Meter(), ActorRef.noSender());
}
}
}
在上面的例子中,我们创建了一个名为“RouterExample”的 ActorSystem,然后创建了一个路由器。路由器使用了 RoundRobinPool 算法,它能够将消息轮流发送到不同的 actor,最后,我们向路由器发送了 10 个消息,路由器会将这些消息分拣到不同的 actor,并由这些 actor 来处理这些消息。
监督路由对象
import akka.actor.{Actor, ActorRef, Props, SupervisorStrategy}
import akka.routing.RoundRobinPool
class Router extends Actor {
// 定义路由算法
val router = context.actorOf(RoundRobinPool(5).props(Props[Worker]), "router")
// 定义监督策略
override val supervisorStrategy = SupervisorStrategy.stoppingStrategy
def receive = {
case message => router ! message
}
}
//使用
val router = system.actorOf(Props[Router])
我们定义了一个名为 Router 的类,它继承自 Actor类。在 Router 类中,我们创建了一个路由器,并使用 RoundRobinPool 算法进行路由。我们还定义了一个监督策略,用于在路由器的子 actor 发生错误或崩溃时采取适当的措施。
三 Dispatcher 任务分发
3.1 什么是Dispatcher?
- Dispatcher将如何执行任务与何时运行任务两者解耦。一般来说,Dispatcher会包含一些线程,这些线程会负责调度并运行任务,比如处理Actor的消息以及线程中的Future事件。
- Dispatcher是Akka能够支持响应式编程的关键,是负责完成任务的机制。
- akka Dispatcher 是用于调度 akka Actor 的线程池。
- 它负责将 akka Actor 任务分配给线程池中的线程执行,并监控执行情况。
- 在 akka 框架中,所有的 akka Actor 都会关联一个 Dispatcher,Dispatcher 能够有效地管理和调度 akka Actor 的执行,从而提高 Actor 的并发性能。
如何获取Dispatcher
system.dispatcher //actor system's dispatcher
system.dispatchers.lookup("my-dispatcher"); //custom dispatcher
扩展
在 Akka 中,Dispatcher 是一个用于控制消息处理和线程分配的抽象。开发者可以通过使用 akka.dispatch.Dispatchers 类来获取特定的 Dispatcher。例如,下面的代码片段演示了如何获取用于执行阻塞 IO 操作的 Dispatcher:
import akka.dispatch.Dispatchers
val ioDispatcher = Dispatchers.IO
在这个例子中,ioDispatcher 变量将包含一个用于执行阻塞 IO 操作的 Dispatcher。你可以使用这个 Dispatcher 来指定 Akka 要使用哪个线程池来执行特定的操作,以及如何处理超时等问题。
请注意,使用 Dispatcher 是可选的,并且默认情况下 Akka 会使用内置的默认 Dispatcher 来处理消息。但是,在某些情况下,如果你想要更细粒度地控制消息处理和线程分配,你可能需要使用 Dispatcher。
3.2 Dispatcher的线程池
- Dispatcher基于Executor,所以在具体介绍Dispatcher之前,我们将介绍两种主要的Executor类型:ForkJoinPool和ThreadPool。
- ThreadPool Executor有一个工作队列,队列中包含了要分配给各线程的工作。线程空闲时就会从队列中认领工作。由于线程资源的创建和销毁开销很大,而ThreadPool允许线程的重用,所以就可以减少创建和销毁线程的次数,提高效率。
- ForkJoinPool Executor使用一种分治算法,递归地将任务分割成更小的子任务,然后把子任务分配给不同的线程运行。接着再把运行结果组合起来。由于提交的任务不一定都能够被递归地分割成ForkJoinTask,所以ForkJoinPool Executor有一个工作窃取算法,允许空闲的线程“窃取”分配给另一个线程的工作。
- 由于工作可能无法平均分配并完成,所以工作窃取算法能够更高效地利用硬件资源。ForkJoinPool Executor几乎总是比ThreadPool的Executor效率更高,是我们的默认选择。
3.3 Dispatcher的分类
要在application.conf中定义一个Dispatcher,需要指定Dispatcher的类型和Executor。还可以指定Executor的具体配置细节,比如使用线程的数量,或是每个Actor一次性处理的消息数量。
my-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2 #Minimum threads
parallelism-factor = 2.0 #Maximum threads per core
parallelism-max = 10 #Maximum total threads
}
throughput = 100 #Max messages to process in an actor before moving on.
}
● Dispatcher:默认的Dispatcher类型。将会使用定义的Executor,在Actor中处理消息。在大多数情况下,这种类型能够提供最好的性能。
● PinnedDispatcher:给每个Actor都分配自己独有的线程。这种类型的Dispatcher为每个Actor都创建一个ThreadPool Executor,每个Executor中都包含一个线程。如果希望确保每个Actor都能够立即响应,那么这似乎是个不错的方法。不过PinnedDispatcher比其他共享资源的方法效率更高的情况其实并不多。可以在单个Actor必须处理很多重要工作的时候试试这种类型的Dispatcher,否则的话不推荐使用。
●CallingThreadDispatcher:这个Dispatcher比较特殊,它没有Executor,而是在发起调用的线程上执行工作。这种Dispatcher主要用于测试,特别是调试。
● BalancingDispatcher:我们会在一些Akka文档中看到BalancingDispatcher。现在已经不推荐直接使用BalancingDispatcher了,应该使用前面介绍过的BalancingPool Router,使用BalancingPool时,Pool中的所有Actor会共享同一个邮箱,然后通过高效的工作窃取机制将任务重新分配给任何空闲的Actor。由于共享同一个邮箱,因此使用BalancingPool有助于确保在有工作的时候降低Actor的空闲率。