参考文章
- Gitter Chat,Akka 在线交流平台
- Akka Forums,Akka 论坛
- Akka in GitHub,Akka 开源项目仓库
- Akka Official Website,Akka 官网
- Akka Java API,Akka 应用程序编程接口
- 《Akka入门与实践》 [加]Jason Goodwin(贾森·古德温)
文章系列
- Akka 学习(一)Actor 初步认识与环境搭建 已完成
- Akka 学习(二)第一个入门程序 已完成
- Akka 学习(三)Actor的基本使用 已完成
- Akka 学习(四)Remote Actor 已完成
- Akka 学习(五)消息传递的方式 已完成
- Akka 学习(六)Actor的监督机制 已完成
- Akka 学习(七)Actor的生命周期 已完成
- Akka 学习(八)路由与Dispatcher 已完成
- Akka 学习(九)Akka Cluster 已完成
Akka 基础篇就此结束了,Akka基础篇主要介绍Akka的基本概念与一些基本术语,使用方式
代码:https://github.com/Eason-shu/Akka# 一 CAP理论
CAP理论是由Eric Brewer提出的,指的是一个分布式系统中的一致性(Consistency)、可用性(Availability)和分区容错性(Partition tolerance)之间的一个权衡。在分布式系统中,这三个概念是相互矛盾的,因此,系统必须在这三个概念之间进行权衡,以确定在特定情况下采取哪种策略。例如,在一个分布式系统中,如果要同时保证一致性和可用性,就必须牺牲分区容错性。
1.1 C -一致性(Consistency)
一致性是指客户端会返回某条记录最新的值。假如有一个银行账号,如果我们在存入一张400美元的支票之后马上试图取回400美元,我们希望系统能够给出正确的账户余额,并且允许我们取回400美元。
1.2 A可用性(Availability)
可用性是指一个没有发生失败的节点能够返回一个合理的响应。(例如,给出某个写操作是否成功的精确描述)。
1.3 P -分区容错性(Partition Tolerance)
分区容错性指的是如果某个节点由于暂时的网络错误而被从网络中移除,那么系统可以继续正常运行。如果数据被冗余备份到三个节点,那么如果其中一个节点暂时变得不可用,而另两个节点仍然能够正常运行[插图],那么就认为系统具备分区容错性。
二 Akka Cluster状态的监听
2.1 节点的状态
- Akka Cluster基于Remoting,但是更强大,用处也更,如果使用Remoting,那么我们需要在基础设施或者代码中自己考虑高可用性这样的问题。
- 而Akka Cluster会负责解决很多这些问题,因此是我们构建分布式Actor系统的绝佳选择。
- joining - 当尝试加入集群时的初始状态
- up - 加入集群后的正常状态
- **leaving / exiting **- 节点退出集群时的中间状态
- down - 集群无法感知某节点后,将其标记为down
- removed - 从集群中被删除,以后也无法再加入集群
- fd* - 这个表示akka的错误检测机制Faiulre Detector被触发后,将节点标记为unreachable
- unreachable* - unreachable不是一个真正的节点状态,更多的像是一个flag,用来描述集群无法与该节点进行通讯。当错误检测机制侦测到这个节点又能正常通讯时,会移除这个flag。
2.2 节点状态案例
加入依赖
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-cluster_2.13</artifactId>
<version>2.5.23</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.typesafe.akka/akka-contrib -->
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-contrib_2.13</artifactId>
<version>2.5.23</version>
</dependency>
编写一个Actor
import akka.actor.AbstractActor;
import akka.cluster.Cluster;
import akka.cluster.ClusterEvent;
import akka.event.Logging;
import akka.event.LoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
/**
* @description:
* @author: shu
* @createDate: 2022/12/12 10:34
* @version: 1.0
*/
public class ClusterController extends AbstractActor {
protected final LoggingAdapter log = Logging.getLogger(context().system(), this);
Cluster cluster = Cluster.get(getContext().system());
@Override
public void preStart() throws Exception {
cluster.subscribe(self(), (ClusterEvent.SubscriptionInitialStateMode) ClusterEvent.initialStateAsEvents(), ClusterEvent.MemberEvent.class, ClusterEvent.UnreachableMember.class);
log.info("I'm about to start! Code: {} ", getSelf().hashCode());
}
@Override
public void postStop() throws Exception {
cluster.unsubscribe(self());
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(ClusterEvent.MemberUp.class, mUp->log.info("Member is Up: {}", mUp.member()))
.match(ClusterEvent.UnreachableMember.class, mUnreachable->log.info("Member detected as unreachable: {}", mUnreachable.member()))
.match(ClusterEvent.MemberRemoved.class, mRemoved->log.info("Member is Removed: {}", mRemoved.member()))
.match(ClusterEvent.LeaderChanged.class, msg->log.info("Leader is changed: {}", msg.getLeader()))
.match(ClusterEvent.RoleLeaderChanged.class, msg->log.info("RoleLeader is changed: {}", msg.getLeader()))
.match(ClusterEvent.MemberEvent.class, event->{})
.build();
}
}
配置文件
akka {
actor {
provider = "akka.cluster.ClusterActorRefProvider"
}
remote {
netty.tcp {
hostname = "127.0.0.1"
port = 0
}
artery {
enabled = on
canonical.hostname = "127.0.0.1"
canonical.port = 0
}
}
cluster {
seed-nodes = [
"akka://ClusterSystem@127.0.0.1:2552",
"akka://ClusterSystem@127.0.0.1:2551"
]
}
}
启动类
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* @description:
* @author: shu
* @createDate: 2022/12/12 11:04
* @version: 1.0
*/
public class MainSystem {
public static void main( String[] args )
{
if(args.length==0)
startup(new String[] {"2551", "2552", "0"});
else
startup(args);
}
public static void startup(String[] ports){
ExecutorService pool = Executors.newFixedThreadPool(ports.length);
for(String port : ports){
pool.submit(()->{
// Using input port to start multiple instances
Config config = ConfigFactory.parseString(
"akka.remote.netty.tcp.port=" + port + "\n" +
"akka.remote.artery.canonical.port=" + port)
.withFallback(ConfigFactory.load());
// Create an Akka system
ActorSystem system = ActorSystem.create("ClusterSystem", config);
// Create an
system.actorOf(Props.create(ClusterController.class), "ClusterListener");
});
}
}
}
节点初始化
2552的初始化
2551的初始化
说明
- 在底层有一个逻辑上的leader节点,负责协调状态的变化,集群会从逻辑上对节点进行排序,集群中的所有节点都会遵循这个顺序。排序列表中的第一个节点就是leader节点。
- Leader节点会对加入和离开集群的请求做出响应,并修改集群成员的状态。
- 在加入集群时,将要加入的节点会将其状态设为Joining。Leader节点会做出响应,将其状态改为Up。
- 如果一个节点将状态设置为Leaving,那么leader节点会做出响应,先将其状态改为Exiting,然后再改成Removed。
- 值得注意的是,如果一个节点被标记为不可达,那么Akka集群不会改变任何节点的状态:也就是说,在该节点由于无法从不可达状态恢复,被标记为Down之前,Akka Cluster将不会改变任何节点的状态。
- 如果节点无法访问并且被标记为Down,那么该节点在这之后将服务重新加入集群。结果会产生两个分离的集群(形成所谓的“左右脑”的情况)。