目录
- 一 消息传递方式
- 1.1 消息不可变
- 1.2 ASK消息模式
- 1.3 Tell消息模式
- 1.4 Forward消息模式
- 1.4 Pipe消息模式
有4种核心的Actor消息模式:Tell、Ask、Forward和Pipe。
一 消息传递方式
在这里,将从Actor之间发送消息的角度来介绍所有关于消息传递的概念。
● Ask:向Actor发送一条消息,返回一个Future。当Actor返回响应时,会完成Future。不会向消息发送者的邮箱返回任何消息。
● Tell。向Actor发送一条消息。所有发送至sender()的响应都会返回给发送消息的Actor。● Forward:将接收到的消息再发送给另一个Actor。所有发送至sender()的响应都会返回给原始消息的发送者。
● Pipe:用于将Future的结果返回给sender()或另一个Actor。如果正在使用Ask或是处理一个Future,那么使用Pipe可以正确地返回Future的结果。
1.1 消息不可变
- 前面提到过,消息应该是不可变的,由于Akka基于JVM,而Java和Scala都支持可变类型,所以是有可能发送可变消息的。
- 不过如果这么做,就可能会失去Akka在消除共享状态方面提供的诸多益处,一旦有了可变消息,就引入了一种风险:开发者可能会在某些时候开始修改消息,而他们修改消息的方式可能会破坏应用程序的运行。
- 当然,如果不修改消息的状态,那么要安全地使用可变消息也不是不可能,不过最好还是使用不可变消息,确保不会因为未来的变化而引入错误。
可变消息定义
// Java
public class Message {
public StringBuffer mutableBuffer;
public Message(StringBuffer: mutableBuffer) {
this.mutableBuffer = mutableBuffer;
}
}
// scala
class Message(var mutableBuffer: StringBuffer = new StringBuffer);
Message message = new Message(new StringBuffer("original"));
message.mutableBuffer = new StringBuffer("new");
val message = new Message(new StringBuffer("original"))
message.mutableBuffer = new StringBuffer("new")
这个例子新建了一条消息,然后修改mutableBuffer引用,使之指向另一个新建的StringBuffer,消息创建时传入的是StringBuffer (“original”),后来被修改成了StringBuffer(“new”)。这就是通过修改引用来修改消息的方法。
消息不可变
public class Message {
// final
public final StringBuffer mutableBuffer;
Message(StringBuffer mutableBuffer) {
this.mutableBuffer = mutableBuffer;
}
}
- 在Scala中,如果没有在声明中给出任何访问修饰符,成员变量的引用默认就是不可变的val。
- 现在我们Java中加入Final关键字,可以使引用设置为不可变,但是会出现问题,StringBuffer是可变的,因此可以修改修改StringBuffer对象本身。
public class ImmutableMessage{
public final String immutableType;
public ImmutableMessage(String immutableType) {
this.immutableType = immutableType;
}
}
class ImmutableMessage(immutableType: String)
- 由于String是不可变类型,因此可以通过使用String代替StringBuffer使得消息不可变。
总结
理解不可变性不仅仅对于Akka消息是至关重要的,对于如何进行安全的通用并发编程也是必不可少的,因此,无论什么时候,只要需要在线程之间共享数据,就应该首先考虑将数据定义为不可变。(重点:消息不可变)
1.2 ASK消息模式
- 在调用ask向Actor发起请求时,Akka实际上会在Actor系统中创建一个临时Actor。
- 接收请求的Actor在返回响应时使用的sender()引用就是这个临时Actor。
- 当一个Actor接收到ask请求发来的消息并返回响应时,这个临时Actor会使用返回的响应来完成Future。
- 因为sender()引用就指向临时Actor的路径,所以Akka知道要用哪个消息来完成Future。
要求
Ask模式要求定义一个超时参数,如果对方没有在超时参数限定的时间内返回这个ask的响应,那么Future就会返回失败。ask/?方法要求提供的超时参数可以是长整型的毫秒数,也可以是akka.util.Timeout,这种类型提供了更丰富的时间表达方式。
- Java模式
static import akka.pattern.Patterns.ask;
Timeout timeout = new akka.util.Timeout(
1,
java.util.concurrent.TimeUnit.SECONDS
);
Future future = ask(actor, message, timeout);
- Scala
import scala.concurrent.duration._
import akka.pattern.ask
// 设置超时时间
implicit val timeout = akka.util.Timeout(1 second)
val future = actorRef ? "message"
案例
- 查询时我们先去缓存中查询是否有数据
- 如果缓存中没用数据,我们去请求实际的url,获取数据,如果有数据,字节返回
- 获取到数据,交给实际的解析器进行进行返回
- 解析返回后,返回数据给请求客服端
- java
package article;
import akka.actor.AbstractActor;
import akka.actor.ActorRef;
import akka.actor.ActorSelection;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.pattern.AskableActorSelection;
import akka.util.Timeout;
import org.jboss.netty.handler.codec.http.HttpResponse;
import pojo.GetRequest;
import static scala.compat.java8.FutureConverters.toJava;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
/**
* @description: 解析Ask解析
* @author: shu
* @createDate: 2022/12/7 20:50
* @version: 1.0
*/
public class AskDemoArticleParser extends AbstractActor {
/**
* 缓存Actor路径
*/
private final ActorSelection cacheActor;
/**
* HttpActro 路径
*/
private final ActorSelection httpClientActor;
/**
* 实际解析路径
*/
private final ActorSelection artcileParseActor;
/**
* 超时时间
*/
private final Timeout timeout;
/**
* 当前Actor
*/
final ActorRef senderRef = sender();
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
/**
* 构造器
* @param cacheActorPath 文章html解析缓存路径
* @param httpClientActorPath http请求路径
* @param artcileParseActorPath 文章解析actor路径
* @param timeout 超时设置
*/
public AskDemoArticleParser(String cacheActorPath, String httpClientActorPath, String artcileParseActorPath, Timeout timeout) {
this.cacheActor = context().actorSelection(cacheActorPath);
this.httpClientActor = context().actorSelection(httpClientActorPath);
this.artcileParseActor = context().actorSelection(artcileParseActorPath);
this.timeout = timeout;
}
/**
* 消息的监听
* @return
*/
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ParseArticle.class, msg -> {
// 第一步先走缓存中查询是否有数据
final CompletionStage cacheResult =
toJava(new AskableActorSelection(cacheActor).ask( new GetRequest(msg.getUrl()), timeout));
log.info("缓存中的数据:{}"+cacheResult);
// 第二步:缓存中没用的话,就解析实时数据
final CompletionStage result = cacheResult.handle((x, t) -> { return (x!= null)
? CompletableFuture.completedFuture(x) :
// HTTP客户端的Actor发送ask请求,请求获取数据
toJava(new AskableActorSelection(httpClientActor).ask( msg.getUrl(), timeout))
.thenCompose(rawArticle -> toJava(
// 向用于文章解析的Actor发送ask请求,请求对原始文章进行解析。
new AskableActorSelection(artcileParseActor).ask(rawArticle, timeout))
);
}).thenCompose(x -> x);
log.info("实时处理的数据:{}"+cacheResult);
// 处理解析内容
result.handle((x, t) -> {
if(x != null) {
if(x instanceof String)
senderRef.tell(x, self());
} else if( x == null )
senderRef.tell(new akka.actor.Status.Failure((Throwable)t), self());
return null;
});
}).build();
}
}
缺点
- Ask模式看上去很简单,不过它是有隐藏的额外性能开销的,首先,ask会导致Akka在/temp路径下新建一个临时Actor。(需要一个中间商来代理获取到的消息)
- 这个临时Actor会等待从接收ask消息的Actor返回的响应,其次,Future也有额外的性能开销。
- Ask会创建Future,由临时Actor负责完成,这个开销并不大,但是如果需要非常高频地执行ask操作,那么还是要将这一开销考虑在内的。
总结
Ask很简单,不过考虑到性能,使用Tell是更高效的解决方案。
1.3 Tell消息模式
- Tell是ActorRef/ActorSelection类的一个方法,它也可以接受一个响应地址作为参数,接收消息的Actor中的sender()其实就是这个响应地址。
- 在Scala中,默认情况下,sender会被隐式定义为发送消息的Actor,如果没有sender(如在Actor外部发起请求),那么响应地址不会默认设置为任何邮箱(叫做DeadLetters)。
使用
- java 使用
sender().tell('需要发送的消息', self());
// self字段保存了这个actor的ActorRef。
final def sender(): ActorRef = context.sender()
- scala 使用
sender() ! '消息'
案例
package article;
import akka.actor.*;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
import akka.util.Timeout;
import org.jboss.netty.handler.codec.http.HttpResponse;
import pojo.GetRequest;
import pojo.SetRequest;
import java.util.concurrent.TimeoutException;
/**
* @description:
* @author: shu
* @createDate: 2022/12/9 20:07
* @version: 1.0
*/
public class TellDemoArticleParse extends AbstractActor {
/**
* 缓存Actor路径
*/
private final ActorSelection cacheActor;
/**
* HttpActro 路径
*/
private final ActorSelection httpClientActor;
/**
* 文章实际解析路径
*/
private final ActorSelection artcileParseActor;
/**
* 超时时间
*/
private final Timeout timeout;
/**
* 当前Actor
*/
final ActorRef senderRef = sender();
LoggingAdapter log = Logging.getLogger(getContext().system(), this);
public TellDemoArticleParse(ActorSelection cacheActor, ActorSelection httpClientActor, ActorSelection artcileParseActor, Timeout timeout) {
this.cacheActor = cacheActor;
this.httpClientActor = httpClientActor;
this.artcileParseActor = artcileParseActor;
this.timeout = timeout;
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.match(ParseArticle.class, msg -> {
// 运程Actor
ActorRef extraActor = buildExtraActor(sender(), msg.getUrl());
// 缓存Actor
cacheActor.tell(new ParseArticle(msg.getUrl()), extraActor);
// httpActor
httpClientActor.tell(msg.getUrl(), extraActor);
context().system().scheduler().scheduleOnce(
timeout.duration(),
extraActor,
"timeout",
context().system().dispatcher(),
ActorRef.noSender()
);
}).build();
}
// 返回一个aCTOR
private ActorRef buildExtraActor(ActorRef senderRef, String uri) {
class MyActor extends AbstractActor {
public MyActor() {
}
@Override
public Receive createReceive() {
return ReceiveBuilder.create()
.matchEquals(String.class, x ->
//if we get timeout, then fail
x.equals("timeout"), x -> {
senderRef.tell(
new Status.Failure(
new TimeoutException("timeout! ")
),
self()
);
context().stop(self());
})
.match(HttpResponse.class, httpResponse -> {
artcileParseActor.tell(
new ParseArticle(uri),
self()
);
})
.match(String.class, body -> {
//The cache response will come back before
//the HTTP response so we never parse in this case.
senderRef.tell(body, self());
context().stop(self());
})
.match(ParseArticle.class, articleBody -> {
cacheActor.tell(
new SetRequest(articleBody.getUrl(), "收到了消息"),
self()
);
senderRef.tell("收到了消息", self());
context().stop(self());
})
.build();
}
}
return context().actorOf(Props.create(MyActor.class,()->new MyActor()));
}
}
1.4 Forward消息模式
- Tell在语义上是用于将一条消息发送至另一个Actor,并将响应地址设置为当前的Actor。而Forward和邮件转发非常类似:初始发送者保持不变,只不过新增了一个收件人。
- 在使用tell时,我们指定了一个响应地址,或是将响应地址隐式设为发送消息的Actor。而使用forward传递消息时,响应地址就是原始消息的发送者
- 有时候我们需要将接受到的消息传递给另一个Actor来处理,而最终的处理结果需要传回给初始发起请求的一方。
//Java
actor.forward(result, getContext());
//Scala context信息是隐式传入的。
actor forward message
1.4 Pipe消息模式
很多情况下,需要将Actor中的某个Future返回给请求发送者。上文介绍过sender()是一个方法,所以要在Future的回调函数中访问sender(),我们必须存储一个指向sender()的引用:
//Java
final ActorRef senderRef = sender();
future.map(x -> {senderRef.tell(x, ActorRef.noSender())});
//Scala
val senderRef = sender();
future.map(x => senderRef ! x);(原文为future.map(x => senderRef ! ActorRef.noSender),有误。)
//Java
pipe(future, system.dispatcher()).to(sender());
//Scala
future pipeTo sender()
pipe(future) to sender()
pipe接受Future的结果作为参数,然后将其传递给所提供的Actor引用。在上面的例子中,因为sender()执行在当前线程上,所以我们可以直接调用sender(),而不用干一些奇怪的事情(比如把sender()引用存储在一个变量中),执行结果一切正确。这就好多了!