文章目录
- 前言
- 一、应用同步的配置与实现原理
- 二、应用同步源码分析
- 三、如何获取集群的节点列表
- 四、通讯模块Tribe
- 五、集群的Session同步
- 六、集群的Session共享
- 总结
前言
相信大家对Tomcat的集群部署都不陌生,以往,我们手动搭建一个Tomcat的集群环境,然后手动部署每个Tomcat上面的应用,保证他们都是相同的应用程序包,以便负载均衡的时候不会出现问题
但是问题来了,如果我们项目源码修改了,重新打包,这时候就要给每个Tomcat单独替换里面的war包,相当麻烦!
Tomcat给我们提供了一种十分简单的解决方案:
应用同步
一、应用同步的配置与实现原理
大家在修改Tomcat的server.xml配置文件的时候,应该都注意到了他这一行注释,
cluster这个单词格外的引人瞩目,没错他就是Tomcat里面集群的配置类,他里面提供了我们想要的应用同步的而功能!
前提,我们已经配置了多个Tomcat的集群环境,可以用nginx或者Apache做负载均衡器。
然后我们只需在server.xml配置中的 标签里面添加下面红色框里面的配置,就可以在集群环境,开启各个Tomcat之间的应用自动同步的功能。这里的配置是设置了 temp/ 目录作为临时保存同步war文件的目录, webapps/目录作为Tomcat的部署目录, watch/目录是监听目
Tomcat在启动的时候,会开启一个定时任务,一直检测监听目录文件改变,只要里面的文件保存时间改变了,就触发一系列方法,更新(传输与接收)war包到集群中的其他节点。
集群环境的每个节点都实现了发送端、与接收端的任务,每个节点他们是相等的关系,实际上吗,没有主从之分。
二、应用同步源码分析
监听watch目录是怎么监听文件变化的,又是怎么处理相关逻辑的,下面我们根据源码来了解!
我们先从集群节点中的,同步请求发送端开始讲起。
tomcat在启动时,开启的定时任务,会调用SimpleTcpCluster类的backgroundProcess(),里面通过clusterDeployer对象,调用其父类FarmWarDeployer的backgroundProcess()
在FarmWarDeployer的backgroundProcess方法中,调用了WarWatcher的check()方法,这个方法只要就是实现监听文件,做文件的变更判断
WarWatcher的check()方法中,有两个核心的地方:
- 获取配置文件指定的监听目录下的文件列表
- 遍历文件获取上次文件修改时间做对比,判断到文件修改了,就执行war包同步的逻辑
上面走到的fileModified()方法,是FarmWarDeployer类下的一个方法,这个类主要是实现部署war包的。
触发了FarmWarDeployer的fileModified()方法后,调用里面的copy()复制war包到指定的部署目录(实现这台tomcat的部署)
下面再调FarmWarDeployer的install()方法,获取集群的节点列表
再给集群节点发送同步的请求
下面的源码分析就到了集群同步接收端的节点,如何实现获取应用并实现部署
Tomcat在启动的时候,会开启一个定时任务NioReplicationTask,他会开启通道,定时接收同步请求
在drainChannel()方法里面,源码一层一层地点进去后发现,调用的是FarmWarDeployer的check()方法
再通过反射调用HostConfig的check()方法
然后通过deployApps()再调用deployWAR()方法,在这一行,将已经封装好的应用数据添加到context,里面调用StandardHost的addChild
将这个context(应用)部署到host里面
三、如何获取集群的节点列表
- 通过SimpleTcpCluster的registerMember(),用来注册一个集群节点
-
SimpleTcpCluster的unregisterMember(),用来排除一个集群节点
-
tomcat启动的时候通过生命周期Lifecycle执行SimpleTcpCluster的startInternal(),先注册自己作为localMember
-
启动一个线程(McastServiceImpl里面的线程类ReceiverThread),一直循环调用receive()一直通过socket不停地接收来自局域网等网段的信息(网络中各个ip一直在相互ping)
调用memberDataReceived(),将信息封装成member(包含ip等信息)
当这次调用的member被判断到是不在集群中,就调用里面的service.memberAdded()
注意:这里虽然调用了memberAdded(),但是不一定会加入到当前集群列表
然后里面一层一层调用memberAdded()
当调用到TcpFailureDetector的memberAdded(),如果这里的notify是true,就代表当前member跟localMember(这台tomcat)是属于同一集群的
notify这个值,是通过Membership来实现多播心跳发送与接收,只要server.xml配置了SimpleTcpCluster,他就会启用Membership来实现多播心跳检测。
同一集群的tomcat如果都配置了SimpleTcpCluster,也就能接收对方的心跳,就将这个notify设置为true,只有属于同一集群的,才会调用到SimpleTcpCluster的memberAdded(),在这里判断,如果该节点还没注册,就调registerMember()注册
四、通讯模块Tribe
Apache Tribes是Tomcat的一个模块,支持服务器集群中的组通信。
Tribes是一个具有组通信能力的消息传递框架,这些是在Tomcat 5容器的集群/session复制代码之外创建的。它是为Tamcat集群实 现提供的通信框架。它的目的之一是简化分布式应用点对点(peer-to-peer)及点对组(peer-to-group)通信。Tribes支持两种 类型的消息传递:可用于两个节点间事件的并发(concurrent)消息传递和可用于发送消息给多个节点的平行(parallel)消息传递
整个Tribes的设计核心可以用下图表示:
Tribes的设计思路,主要是通过拦截器与监听器来实现的,拦截器则是对底层数据的一种统一额外加工处理,监听器则作为接口提供应用层对数据做业务逻辑处理,组成了一个优雅的设计方案
- 应用层:
应用层面主要就是一些监听器,Tomcat内置了一些类,实现了这些监听器里面指定的方法,接受IO层传输过来的信息,并对其做响应的逻辑处理。
- 拦截器栈:
拦截器栈提供了在消息传送到应用层之前对消息进行一些额外的操作,例如对某些信息进行过滤编码等等操作。
- 在IO层有三个重要的模块:
MembershipService 模块主要负责组成员关系的维护,包括维护现有成员及发现新成员。
ChannelSender 模块负责向组内其他成员发送消息及其各种机制的详细实现
ChannelReceiver 模块用于接收组内其他成员发送过来的消息及其各种机制的详细实现
下面我会从底层往上,逐层通过具体的组件,结合源码来讲解:
MembershipService
一个集群包含若干成员,要对这些成员进行管理就必须要有一张包含所有成员的列表,当要对某个节点做操作时通过这个列表可以准确找到该节点的地址进而对该节点发送操作消息。
Tribes的设计是基于同等节点之间的通信,并不存在主节点选举的问题,每个节点都是相等的关系。它具备自动发现节点,即新节点加入要通知集群其他成员更新成员列表,让每个节点都能及时更新成员列表,每个节点通过交换机各自都维护一份集群成员表,且他们隔一段时间向交换机组播自己节点消息,即心跳操作。
集群的成员列表在 SimpleTcpCluster 类的成员属性 memberOnameMap 里:
Tribes通过 registerMember()、unregisterMember()方法,实现集群节点的注册和卸载,更新各自节点的集群节点列表
举个例子:
一个Tomcat集群,一开始有两个节点,A和B。
A和B各自分别都创建一个节点信息发射器和节点信息接收器,让他们运行于独立的线程中。发射器用于向组内发送自己节点的消息,而接收器则用于接收其他节点发送过来的节点消息并进行处理。
A和B都定时向交换机发送心跳检测,如果超过一定时间交换机没有接受到某个节点的心跳,则会给所有节点发送信息,然后所有节点更新自己的集群节点列表,将他移除。
如果这时候集群新增了节点C,节点C的Tomcat会通过发射器,向交换机发送自己的讯号,交换机负责传递,告诉每个节点,更新他们各自的集群节点列表,新增这个节点。
Tomcat提供了一个接口 MembershipService。我们可以通过他,获取集群环境中的其他成员节点,获取各个节点的信息。
例如,下面的这些方法:
等等方法 …
ChannelSender
刚才也说了,Tribes是个通讯框架,那就肯定有消息的发送者,ChannelSender 就充当这个角色。
ChannelSender 是个接口,Tomcat中他有一个唯一的实现类 ReplicationTransmitter ,他是作为消息发送的实际执行者。
这个类最核心的方法是 sendMessage(),他一目了然,先获取发送消息的承载类,然后利用他,传入集群环境的其他节点,分别对他们发送消息。
MultiPointSender接口的实现类有多个,实现了sendMessage()方法,作为实际的发送执行者,有以下几个,分别处理不同的场景。
ChannelReceive
消息的传递,有发送者,自然也有接受者,ChannelReceive就充当这个接收者。他负责接收处理其他节点从消息发送通道发送过来的消息。
本质其实是每个节点暴露一个端口作为服务端,去监听客户端,接受客户端发送的消息;而每个节点又充当客户端,去连接集群中其他节点的服务端,发送自己的消息。
ChannelSender就是充当客户端的,而ChannelReceiver充当服务端。
从ChannelReceive的类继承关系中可以看到,Tribes的消息接受端,有两种处理的方式,NIO和BIO。
。
BIO:
同步阻塞,服务器实现模式为一个连接一个线程,常使用于连接数目比较小且固定的架构,程序简单易理解,但是性能较低。
NIO:
同步非阻塞,基于Channel(通道)和Buffer(缓冲区)进行操作的,数据总是从通道读取到缓冲区中,或者从缓冲区写入到通道中,Selector(选择器)用于监听多个通道事件,因此使用单个线程就可以监听多个客户端通道,能够大大提升处理的效率。
NioReceiver 这个接收端处理类中值得一提的是,他引入了任务池的概念,在加载这个类时,已经创建好任务池,把接收任务提前定义好放入内存中,当接收到来自集群节点的消息时,可直接获取使用而不用再实例化。
ChannelInterceptor
通道拦截器。为了提高系统的可扩展性和灵活性,实现在应用层提供对源消息统一处理,Tribes在通道中引入通道拦截器。
通过类继承关系可以看到,目前Tomcat提供了以下几个通道拦截器:
这些具体的拦截器都是继承类 ChannelInterceptorBase,源码跟踪发现,启动时,会通过通道的任务池executor,遍历集群成员列表,分别启动一个任务线程,去执行拦截器。
MembershipListener和ChannelListener
为了更清晰更好地划分职责,Tribes设计了IO层和应用层,IO层专心负责网络传输方面的逻辑处理,把接收到的数据往应用层传送,应用层发送的数据也是通过此IO层发送。
而应用层的处理入口,就是MembershipListener和ChannelListener。
从名字可以看出,这里用了监听器模式,在信息传输时,触发安装好的监听器,使之执行监听器里面的处理逻辑。这些事件主要包含了集群成员的加入和退出、消息报文接收完毕等信息。所以分成两类监听器:
MembershipListener:跟集群成员的变化相关
ChannelListener:是跟集群消息接收发送相关
MembershipListener接口有两个抽象方法:
当集群环境添加了节点时,其他节点就会接收信息,这个信息进来Tomcat的通道时,就会触发监听器的 memberAdded()方法(memberAdded一直监听者集群环境过来的而信息),就会根据当前节点Tomcat执行的模式,去调用对应的方法,执行具体的处理逻辑。
移除节点也一样。
ChannelListener接口有两个抽象方法:
当消息通过通道传递进来时,触发监听器的 messageReceived(),根据当前节点Tomcat执行的模式,去调用对应的方法,执行具体的处理逻辑。
而accept()方法,则是由通道调用,以确定监听器是否将处理此消息。
五、集群的Session同步
Session的同步,是指在一个Tomcat的集群环境下,不借助其他第三方的组件,由Tomcat内部通讯机制,实现各个节点之间的Session信息的同步。
如果集群中有一个Tomcat实例的会话变了,它会通过会话管理器将改变的动作消息封装成消息然后调用集群对象Cluster,由Tribes将会话同步请求发出去。集群中会话同步的过程中,通信过程是以ClusterMessage为对象进行传输的,将实际的会话数据载体SessionMessageImpl序列化,进行传输,到其他Tomcat实例的时候会反序列化,消息由Tribes接收之后向Cluster上传。最后达到会话管理器,它根据动作消息同步会话
- Ssession同步的配置
每个节点的Tomcat,在配置了集群的前提下,在 标签里面,加入 标签,配置上 ClusterSessionListener。
针对ClusterSessionListener源码进行分析:
里面代码不多,主要就一个方法messageReceived(),用于接受集群环境传输过来的要同步的session信息。
通过ClusterMessage封装会话信息,传进来,解析这是对应哪个context应用的会话
拿到集群管理器里面对应的context,将会话信息设置进去
消息的接收方法messageDataReceived(),源码跟踪进去,最终到了DetalManager类的messageReceived()。
从下图可以看到,根据接收的会话消息实体里面的类型,分别做不同的处理,例如会话信息的创建、修改、移除、获取 …
我们点进去created方法,看看创建会话的代码细节
这个方法是创建一个新的session会话信息,可以看到,他先将全局的接受计数器属性 +1,然后创建一个空的session,把sessionId,会话信息等设置进去。
这个方法是集群的其他节点请求获取所有session信息的处理方法,他会把当前节点tomcat的所有session信息查询出来,然后通过集群发送出去。
其他剩余的session修改、移除、获取等的方法,这里就不看了,代码不复杂。
六、集群的Session共享
前面说的session同步,是在Tomcat集群环境下,实现每个节点的会话信息的同步,从而可以使集群的整体,无论访问哪个节点,获取到的会话信息都是一样的。
但是这种方案,有很大的弊端,他是不断通过网络IO去发起请求,接收请求,去不断地在每个节点执行同步,不止存在网络的延迟,还会有性能的损耗。所以自从这种方案诞生,就没几个开发者愿意使用这种方式。
大部分人,还是会选择使用session的共享。共享,就是每个节点使用的session,都是来自同一个地方,拿数据写数据都是在同一个地方操作,这样就不需要同步数据,防止出现数据延迟,数据不一致的问题。
但是Tomcat原生是不支持这种共享方式的,所以有一位开源作者,开发了一个用第三方中间件redis来存储会话信息的的实现方案。
这种方案他的实现方式看以下图:
他跟用Redis做缓存的方式是一样的,只不过他现在是用Redis来存储Session会话的信息,说白了,他们其实是一个意思,只是如果原本项目代码写了会话的方式,这里可以直接,用会话共享,实现项目的集群部署。
- 使用方法
把 tomcat8.5-redis-session-manager.jar 和相关的依赖包,放到Tomcat安装目录下的lib目录。
然后修改Tomcat的 context.xml 配置文件,在 标签里面加入如下配置:
- 原理讲解
从上面的修改配置中我们看到,引入了两个类:
com.s.tomcat.redissessions.RedisSessionHandlerValve
com.s.tomcat.redissessions.RedisSessionManager
从他们的包名可以看出来他们都是来自我们给Tomcat引入的第三方依赖包。这个包主要实现的功能是:
在Tomcat里添加一个阀门,当session内容变动时,触发这个阀门,向配置好的redis插入数据,把session的信息同步到redis里面,在具体的应用中,当我们通过代码获取当前的Session时,调用的是这个包里面的方法而不是Tomcat的方法,然后这个方法会去redis,获取我们想要的会话信息并返回,这个过程,应用的程序编写者是完全不感知的,他和原本的获取Tomcat会话信息的代码是一致的。
下面出现的所有代码截图,都是来自tomcat8.5-redis-session-manager.jar的代码
我们打开 RedisSessionManager 类的源码看到,他继承了Tomcat的抽象类ManagerBase,这个类就是用来管理信息的一个父类,他的子类将用来承载例如Session的会话数据的管理。
而RedisSessionManager继承了ManagerBase,自然也继承了他管理会话的能力,然后他重写了里面的 createSession()、findSession()、add()、remove()等等方法,利用redis,去实现会话信息的创建和获取。
我们以createSession()方法来探索一下,可以看到,他先通过配置文件的redis连接信息,获取redis的连接,以sessionId作为key在redis创建一条作为session的父级数据。
下面这两段代码,他创建了一个内部类似Session管理的对象(RedisSession继承了Tomcat的StandardSession,StandardSession就是Tomcat的Session信息承载类),然后将会话信息设置进去,调用saveInternal()方法,把对象的信息写进redis。
然后我们再分析下findSession(),刚才那个方法,通过第三方的jar包,在里面维护了一个充当session对象的类,然后将会话信息写进redis。
这里,通过传入的sessionId,去redis查询出对应的信息,反序列化成一个对象,才封装成session对象返回。
到这里就讲解完这个包如何实现redis转存Session信息了。
总结
欢迎指出我的错误!