目录
- 一 基本案例
- 1.1 Java 版
- 1.2 Scala版
- 二 Actor的创建
- 2.1 ActorRef
- 2.2 Props
- 2.3 ActorSelection
- 三 Promise、Future和事件驱动的编程模型
- 3.1 阻塞与事件驱动
- 3.2 Future进行Actor响应
- 3.2.1 Java版
- 3.2.2 Scala 版
- 3.2.3 总结
- 3.3 成功处理
- 3.4 失败处理
- 3.5 恢复
- 3.6 链式调用
- 3.7 结果转换
- 3.8 组合Future
- 3.9 处理集合
一 基本案例
1.1 Java 版
需求:我们将在这个例子中构建一个简单的Actor,这个Actor接收一个字符串“Ping”,返回字符串“Pong”作为响应。
- 编码
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
/**
* @description: Java 收消息测试
* @author: shu
* @createDate: 2022/11/27 16:10
* @version: 1.0
*/
public class JavaPongActor extends AbstractActor {
public PartialFunction receive() {
// Actor字符匹配
return ReceiveBuilder
.matchEquals("Ping", s ->
// System.out.println("收到消息:"+s.toString()))
sender().tell("收到:OVER,OVER消息!", ActorRef.noSender()))
.matchAny(x ->
sender().tell(
new Status.Failure(new Exception("unknown message")), self()
))
.build();
}
}
- 测试
@Test
public void JavaPongActorTest(){
// 创建一个actor
TestActorRef<JavaPongActor> actorRef = TestActorRef.create(system, Props.create(JavaPongActor.class));
// 发送消息
actorRef.tell("Ping",ActorRef.noSender());
// 返回的消息
// actorRef.receive(ReceiveBuilder
// .matchEquals("收到:OVER,OVER消息!", message -> {
// System.out.printf("收到的key:%s",message);
// })
// .matchAny(o -> System.out.printf("收到的消息:%s",o))
// .build());
}
参数讲解
- AbstractActor:首先,我们继承了AbstractActor。这是一个Java 8特有的API,利用了Java 8的匿名函数(lambda)的特性。
- Receive:AbstractActor类有一个receive方法,其子类必须实现这个方法或是通过构造函数调用该方法。
- ReceiveBuilder:连续调用ReceiveBuilder的方法,为所有需要匹配处理的输入消息类型提供响应方法的描述,然后调用build()方法生成需要返回的PartialFunction。
- Match:ReceiveBuilder提供了一些值得一提的match方法,scala中的模式匹配。
ReceiveBuilder
.matchEquals("Ping", s -> System.out.println("It's Ping: " + s))
.match(String.class, s -> System.out.println("It's a string: " + s))
.matchAny(x -> System.out.println("It's something else: " + x))
.build
- tell():sender()函数会返回一个ActorRef,tell()是最基本的单向消息传输模式,第一个参数是我们想要发送至对方信箱的消息,第二个参数则是希望对方Actor看到的发送者。
1.2 Scala版
- 代码
import akka.actor.{Actor, Status}
import sun.rmi.runtime.Log
/**
* @description: Scala 收消息测试
* @author: shu
* @createDate: 2022/11/27 17:00
* @version: 1.0
*/
class ScalaPongActor extends Actor {
override def receive: Receive = {
case "Ping" => println("收到消息")
/
case "Ping" => sender() ! "Pong"
case _ =>
sender() ! Status.Failure(new Exception("unknown message"))
}
}
我们对比Java 版本来看,他更加简洁明了
- 测试
import akka.actor.ActorSystem
import akka.testkit.TestActorRef
import org.scalatest.{FunSpecLike, Matchers}
class AkkademyDbSpec extends FunSpecLike with Matchers {
// 获取系统实例
implicit val system = ActorSystem()
describe("akkademyDb") {
describe("given SetRequest") {
// 测试1
it("should place key/value into map") {
// 创建Actor实例
val actorRef = TestActorRef(new AkkaDba)
// 发送消息
actorRef ! SetRequest("key", "123456")
// 验证消息
val akkademyDb = actorRef.underlyingActor
akkademyDb.map.get("key") should equal("123456")
}
// 测试2
it("PONG TEST"){
val actorRef = TestActorRef(new ScalaPongActor)
actorRef ! ("Ping")
}
}
}
}
参数解释
- Actor:要定义一个Actor,首先要继承Actor基类。Actor基类是基本的Scala Actor API,非常简单,并且符合Scala语言的特性。
- Receive:在Actor中重写基类的receive方法。并且返回一个PartialFunction,要注意的是,receive方法的返回类型是Receive,Receive只不过是定义的一种类型,表示scala.PartialFunction[scala.Any,scala.Unit]。
- tell方法(!):我们使用tell方法向发送方发送响应消息,在Scala中,通过“! ”来调用tell方法,在tell方法“! ”的方法签名中,有一个隐式的ActorRef参数。如果在Actor外部调用tell方法的话,该参数的默认值会设为noSender。
def ! (message: Any)(implicit sender: ActorRef = Actor.noSender): Unit
- Actor中有一个隐式的变量self,Actor通过self得到消息发送者的值,因此Actor中tell方法的消息发送者永远是self。
二 Actor的创建
- 访问Actor的方法和访问普通对象的方法有所不同,我们从来都不会得到Actor的实例,从不调用Actor的方法,也不直接改变Actor的状态,反之,只向Actor发送消息。
- 除此之外,我们也不会直接访问Actor的成员,而是通过消息传递来请求获取关于Actor状态的信息,使用消息传递代替直接方法调用可以加强封装性。
2.1 ActorRef
- 在Akka中,这个指向Actor实例的引用叫做ActorRef。
- ActorRef是一个无类型的引用,将其指向的Actor封装起来,提供了更高层的抽象,并且给用户提供了一种与Actor进行通信的机制。
- 有一点可能相当明显:我们也正是在Actor系统中创建新的Actor并获取指向Actor的引用,actorOf方法会生成一个新的Actor,并返回指向该Actor的引用。
//Java
ActorRef actor = actorSystem.actorOf(Props.create(JavaPongActor.class));
//Scala
val actor: ActorRef =
actorSystem.actorOf(Props(classOf[ScalaPongActor]))
- 要注意的是,这里我们实际上并没有新建Actor,例如,我们没有调用actorOf(new PongActor)。
2.2 Props
- 为了保证能够将Actor的实例封装起来,不让其被外部直接访问,我们将所有构造函数的参数传给一个Props的实例。
- Props允许我们传入Actor的类型以及一个变长的参数列表。
//Java
Props.create(PongActor.class, arg1, arg2, argn);
//Scala
Props(classOf[PongActor], arg1, arg2, argn)
- 我们可以创建一个工厂方法,用于生成这样的Props示例
//Java
public static Props props(String response) {
return Props.create(this.class, response);
}
//Scala
object ScalaPongActor {
def props(response: String): Props = {
Props(classOf[ScalaPongActor], response)
}
}
- 然后就可以使用Props的工厂方法来创建Actor
//Java
ActorRef actor = actorSystem.actorOf(JavaPongActor.props("PongFoo"));
//Scala
val actor: ActorRef = actorSystem.actorOf(ScalaPongActor props
"PongFoo")
我的理解向前端组件的卡槽,用来传递值
2.3 ActorSelection
我们可以通过ActorRef.path来查看该路径
●akka://default/user/$$a
该路径是一个URI,它甚至可以指向使用akka.tcp协议的远程Actor,
● akka.tcp://my-sys@remotehost:5678/user/CharlieChaplin
理解
要注意的是,路径的前缀说明使用的协议是akka.tcp,并且指定了远程Actor系统的主机名和端口号,如果知道Actor的路径,就可以使用actorSelection来获取指向该Actor引用的ActorSelection(无论该Actor在本地还是远程)。
ActorSelection selection = system.actorSelection("akka.tcp://
actorSystem@host.jason-goodwin.com:5678/user/KeanuReeves");
三 Promise、Future和事件驱动的编程模型
3.1 阻塞与事件驱动
事件阻塞
- 进行IO操作时,编写的都是阻塞式的代码,我们调用一个同步的API时,调用的方法不会立即返回:应用程序会等待该调用执行完成,例如,如果发起一个HTTP请求的话,只有在请求完成后,才会收到返回的响应对象,由于发起调用的线程会暂停执行并等待,因此等待IO操作完成的代码都是阻塞的,在IO操作完成之前,发起调用的线程无法进行任何其他操作。
多线程知识参考:
多线程进行阻塞IO的问题?
● 代码没有在返回类型中明确表示错误
● 代码没有在返回类型中明确表示延时
● 阻塞模型的吞吐量受到线程池大小的限制
● 创建并使用许多线程会耗费额外的时间用于上下文切换,影响系统性能。
3.2 Future进行Actor响应
3.2.1 Java版
- 依赖
<dependency>
<groupId>org.scala-lang.modules</groupId>
<artifactId>scala-java8-compat_2.11</artifactId>
<version>1.0.2</version>
</dependency>
- 代码
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.Status;
import akka.japi.pf.ReceiveBuilder;
import scala.PartialFunction;
/**
* @description: Java 收消息测试
* @author: shu
* @createDate: 2022/11/27 16:10
* @version: 1.0
*/
public class JavaPongActor extends AbstractActor {
public PartialFunction receive() {
// Actor字符匹配
return ReceiveBuilder
.matchEquals("Ping", s ->
// System.out.println("收到消息:"+s.toString()))
sender().tell("Pong", ActorRef.noSender()))
.matchAny(x ->
sender().tell(
new Status.Failure(new Exception("unknown message")), self()
))
.build();
}
}
- 测试
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.pattern.AskableActorRef;
import akka.util.Timeout;
import org.junit.Test;
import scala.concurrent.Future;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import static scala.compat.java8.FutureConverters.toJava;
/**
* @description:
* @author: shu
* @createDate: 2022/11/27 18:49
* @version: 1.0
*/
public class PongActorTest {
ActorSystem system = ActorSystem.create();
ActorRef actorRef = system.actorOf(Props.create(JavaPongActor.class));
@Test
public void shouldReplyToPingWithPong() throws Exception {
// 采用Ask的方式来询问消息
Future sFuture =new AskableActorRef(actorRef).ask( "Ping", Timeout.apply(1000));
// 将scala转换成java
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture =
(CompletableFuture<String>) cs;
assert (jFuture.get(1000, TimeUnit.MILLISECONDS)
.equals("Pong"));
// 打印获取的值
System.out.println("获取的信息:"+jFuture.get());
}
@Test
public void shouldReplyToUnknownMessageWithFailure() throws
Exception {
Future sFuture = new AskableActorRef(actorRef).ask( "unknown", Timeout.apply(5000));
final CompletionStage<String> cs = toJava(sFuture);
final CompletableFuture<String> jFuture =
(CompletableFuture<String>) cs;
jFuture.get(1000, TimeUnit.MILLISECONDS);
}
}
成功
失败
3.2.2 Scala 版
- 代码
import akka.actor.{Actor, Status}
import sun.rmi.runtime.Log
/**
* @description: Scala 收消息测试
* @author: shu
* @createDate: 2022/11/27 17:00
* @version: 1.0
*/
class ScalaPongActor extends Actor {
override def receive: Receive = {
case "Ping" => sender() ! "Pong"
case _ =>
sender() ! Status.Failure(new Exception("unknown message"))
}
}
- 测试
/**
* @description:
* @author: shu
* @createDate: 2022/11/27 19:44
* @version: 1.0
*/
import akka.actor.{ActorRef, ActorSystem, Props}
import akka.pattern.ask
import akka.util.Timeout
import org.scalatest.{FunSpecLike, Matchers}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.language.postfixOps
class ScalaAskExamplesTest extends FunSpecLike with Matchers {
val system: ActorSystem = ActorSystem()
implicit val timeout: Timeout = Timeout(5 second)
val pongActor: ActorRef = system.actorOf(Props(classOf[ScalaPongActor]))
describe("Pong actor") {
// 正确测试
it("should respond with Pong") {
val future = pongActor ? "Ping" //uses the implicit timeout
val result = Await.result(future.mapTo[String], 1 second)
assert(result == "Pong")
println(result)
}
// 失败测试
it("should fail on unknown message") {
val future = pongActor ? "unknown"
intercept[Exception]{
Await.result(future.mapTo[String], 1 second)
}
println( Await.result(future.mapTo[String], 1 second))
}
}
}
成功
失败
3.2.3 总结
无论是在Java还是Scala的例子中,如果Future返回失败,那么阻塞线程会抛出异常。Future失败会产生一个Throwable, Java 8的CompletableFuture会对这个Throwable进行封装并抛出ExecutionException,而Scala API则会直接抛出这个Throwable。(Scala没有受检异常,所以可以直接抛出Throwable,而Java会在这里抛出一个非受检的异常类型。
注意:我们可以抽离成为公共类
/**
* 抽离成公共类
* @param message
* @return
*/
public CompletionStage<String> askPong(String message){
Future sFuture = new AskableActorRef(actorRef).ask(message, Timeout.apply(1000));
CompletionStage<String> cs = toJava(sFuture);
return cs;
}
/**
* 抽离成公共类
* @param message
* @return
*/
def askPong(message: String): Future[String] = (pongActor ? message).mapTo[String]
3.3 成功处理
- java
thenAccept方法操作返回结果
/**
* thenAccept测试
* @throws Exception
*/
@Test public void printToConsole() throws Exception {
askPong("Ping").
thenAccept(x -> System.out.println("replied with: " + x));
Thread.sleep(100);
}
- scala
onSuccess函数来处理
// 成功处理测试
it("should print to console"){
(pongActor ? "Ping").onSuccess({
case x: String => println("replied with: " + x)
})
Thread.sleep(100)
}
注意到onSuccess接受一个部分函数作为参数,所以非常适合用来处理Akka的无类型响应(通过模式匹配来判断返回结果的类型)。
3.4 失败处理
- java
/**
* 错误处理
*/
@Test
public void ErrorResult(){
askPong("cause error").handle((x, t) -> {
if(t!=null){
System.out.println("Error: " + t);
}
return null;
});
}
handle接受一个BiFunction作为参数,该函数会对成功或失败情况进行转换。handle中的函数在成功情况下会提供结果,在失败情况下则会提供Throwable,因此需要检查Throwable是否存在(结果和Throwable中只有一个不是null)。如果Throwable存在,就向日志输出一条语句。由于我们需要在该函数中返回一个值,而失败情况下又不需要对返回值做任何操作,因此直接返回null。
- scala
onFailure()函数
it("结果处理测试"){
askPong("causeError").onFailure {
case e: Exception => println("Got exception")
}
}
3.5 恢复
很多时候,在发生错误的时候我们仍然想要使用某个结果值。如果想要从错误中恢复的话,可以对该Future进行转换,使之包含一个成功的结果值。
- java
/**
* 失败默认值
*/
@Test
public void Recover(){
CompletionStage<String> cs = askPong("cause error")
.exceptionally(t -> {
return "default";
});
}
- scala
it("失败默认值"){
val f = askPong("causeError").recover {
case t: Exception => "default"
}
}
异步恢复
我们经常需要在发生错误时使用另一个异步方法来恢复,下面是两个用例。重试某个失败的操作。没有命中缓存时,需要调用另一个服务的操作。
- java
askPong("cause error")
.handle( (pong, ex) -> ex == null
? CompletableFuture.completedFuture(pong)
: askPong("Ping")
).thenCompose(x -> x);
- scala
askPong("causeError").recoverWith({
case t: Exception => askPong("Ping")
})
3.6 链式调用
应用函数式风格来处理延迟和失败的好处之一就是可以把多个操作组合起来,而在组合的过程中无需处理异常。我们可以把注意力放在成功的情况上,在链式操作的结尾再收集错误。
- java
CompletionStage<String> cs = askPong("Ping").thenCompose(x ->
askPong("Ping"));
- scala
val f: Future[String] = askPong("Ping").flatMap(x => askPong("Ping"))
3.7 结果转换
- java
/**
* 处理结果测试
* @throws ExecutionException
* @throws InterruptedException
*/
@Test
public void CoverResult() throws ExecutionException, InterruptedException {
// 将处理结果转为大写
CompletableFuture<String> future = (CompletableFuture<String>) askPong("Ping").thenApply(x -> x.toUpperCase(Locale.ROOT));
System.out.println(future.get());
}
/**
* 异步转换结果
*/
@Test
public void AsyncCoverResult(){
CompletionStage<CompletionStage<String>> futureFuture =
askPong("Ping").thenApply(this::askPong);
}
- scala
it("结果处理测试"){
//
val result= askPong("Ping").map(_.toUpperCase())
println(Await.result(result.mapTo[String], 1 second));
// 异步结果处理
val futureFuture: Future[Future[String]] =
askPong("Ping").map(x => {
askPong(x)
})
}
3.8 组合Future
- Java
@Test
public void CompletableFuture() throws ExecutionException, InterruptedException {
CompletableFuture<String> future = (CompletableFuture<String>) askPong("Ping")
.thenCombine(askPong("Ping"), (a, b) -> {
return a + b; //"PongPong"
});
System.out.println(future.get());
}
- scala
it("组合结果测试"){
val f1 = askPong("Ping")
val f2 = askPong("Ping")
val futureAddition: Future[String] = {
for (
res1 <- f1;
res2 <- f2
) yield res1 + res2
}
println(Await.result(futureAddition.mapTo[String], 1 second));
}
这个例子展示了一种处理多个不同类型Future的机制。通过这种方法,可以并行地执行任务,同时处理多个请求,更快地将响应返回给用户。
3.9 处理集合
在Scala中,如果我们有一个消息列表,对于列表中的每个消息,向PongActor发送查询,最后会得到如下的一个Future列表
// 返回列表
it("Future list"){
// list
val listOfFutures: List[Future[String]] = List("Ping", "Ping",
"failed").map(x => askPong(x))
// 转换结果为list
val futureOfList: Future[List[String]] = Future.sequence(listOfFutures)
futureOfList.foreach(x=> println(x))
}