文章目录
- 应用场景
- 与 SpringBoot 集成
- 示例
应用场景
AKKA 是一个用于构建高并发、分布式和容错应用程序的开源框架。它基于Actor模型,提供了强大的并发抽象和工具,适用于各种业务场景。以下是一些使用AKKA框架的常见业务场景的示例:
-
实时数据处理:AKKA提供了轻量级的Actor模型,可以用于处理实时数据流。您可以创建多个Actor来处理数据的不同部分,并使用消息传递机制进行通信和协调。这在实时监控、实时分析和实时推送等场景中非常有用。
-
并发任务执行:AKKA的Actor模型使得并发任务的执行变得简单。您可以将任务分解为多个独立的Actor,并让它们并行地执行。每个Actor可以负责处理一部分任务,并通过消息传递进行协调和结果汇总。这在批处理、并行计算和任务调度等场景中非常有用。
-
分布式系统:AKKA提供了分布式Actor模型,可以在多个节点上分布Actor的实例。这使得构建分布式系统变得更加容易。您可以使用AKKA的远程Actor和集群功能来实现分布式的任务分发、数据共享和容错机制。
-
微服务架构:AKKA可以作为构建微服务架构的基础。每个微服务可以由一个或多个Actor组成,并使用消息传递进行通信。AKKA的容错机制和监督策略可以帮助实现高可用性和容错性的微服务。
-
实时通信和聊天应用:AKKA提供了高效的消息传递机制,适用于实时通信和聊天应用。每个用户可以由一个Actor表示,消息可以通过Actor之间的邮箱进行传递。这使得实现实时聊天、通知和协作功能变得更加简单。
与 SpringBoot 集成
添加依赖
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-slf4j_2.12</artifactId>
<version>2.5.22</version>
</dependency>
集成要点
由于 ActorSystem 的创建不是依赖new方式,而是通过 create 方法,所以我们需要写一个 Bean 来生产 ActorSystem。另外 Actor,它也是通过 actorOf() 方法创建的,所以我们也需要写生产 Actor 引用的方法,Akka 提供了 IndirectActorProducer 接口,通过实现该接口,我们就可以实现DI(依赖注入)。集成 SpringBoot 之后,ActorSystem 范围内的依赖都会交给 SpringBoot 来管理,并且每个ActorSystem都会持有一个 ApplicationContext。
Actor 生产者
实现IndirectActorProducer,用于生产Actor,既然是交给Spring管理,所以必须 ApplicationContext对象 和 bean名称
import akka.actor.Actor;
import akka.actor.IndirectActorProducer;
import org.springframework.context.ApplicationContext;
public class ActorProducer implements IndirectActorProducer {
private ApplicationContext context;
private String beanName;
public ActorProducer(ApplicationContext context,String beanName){
this.context=context;
this.beanName=beanName;
}
@Override
public Actor produce() {
return (Actor) context.getBean(beanName);
}
@Override
public Class<? extends Actor> actorClass() {
return (Class<? extends Actor>) context.getType(beanName);
}
}
扩展组件
构造 Props,可以用于创建 ActorRef 对象
import akka.actor.Extension;
import akka.actor.Props;
import org.springframework.context.ApplicationContext;
public class SpringExt implements Extension {
private ApplicationContext context;
public void init(ApplicationContext context) {
System.out.println("applicationContext初始化...");
this.context = context;
}
public Props create(String beanName) {
return Props.create(ActorProducer.class, this.context, beanName);
}
}
扩展组件的提供者
通过 SpringExtProvider 我们可以获取到 SpringExt,通过 SpringExt 我们可以使用 Props 创建 ActorRef 对象
import akka.actor.AbstractExtensionId;
import akka.actor.ExtendedActorSystem;
public class SpringExtProvider extends AbstractExtensionId<SpringExt> {
private static SpringExtProvider provider = new SpringExtProvider();
public static SpringExtProvider getInstance() {
return provider;
}
@Override
public SpringExt createExtension(ExtendedActorSystem extendedActorSystem) {
return new SpringExt();
}
}
配置类
用于初始化 ActorSystem,并扫描到纳入到容器的 Actor
import akka.actor.ActorSystem;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class ScanConfig {
private final ApplicationContext context;
@Autowired
public ScanConfig(ApplicationContext context) {
this.context = context;
}
@Bean
public ActorSystem createSystem() {
ActorSystem system = ActorSystem.create("system");
SpringExtProvider.getInstance().get(system).init(context);
return system;
}
}
示例
创建一个 Controller
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.sie.mbm.mom.common.core.util.R;
import com.sie.mbm.mom.framework.security.annotation.Inner;
import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import org.springframework.validation.annotation.Validated;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import javax.annotation.Resource;
@Tag(name = "test")
@RestController
@RequestMapping("/test")
@Validated
@Inner(value = false)
public class TestController {
@Resource
private ActorSystem actorSystem;
@PostMapping("/test")
@Operation(summary = "test")
public R search( ) {
ActorRef pcm = actorSystem.actorOf(Props.create(BossActor.class));
pcm.tell("I AM MASTER.TELLING BOSS", ActorRef.noSender());
return R.ok();
}
}
创建一个 Actor 接受信息
import akka.actor.UntypedAbstractActor;
public class BossActor extends UntypedAbstractActor {
@Override
public void onReceive(Object message) {
System.out.println(message);
}
}
成功输出