一、主从和集群架构的特点
1.1 主从架构的-Master/slave模式特点
读写分离,纵向扩展,所有的写操作一般在master上完成,slave只提供一个热备
1.2 集群架构-Cluster模式特点
分布式的一种存储,水平的扩展,消息的分布式共享
二、ActiveMQ的主从架构
2.1 架构图
2.2 特点
- 只有master对外提供服务,也就是说,producer和consumer智能连接master
- 一个master下面可以有一个或者多个slave,slave不对外提供服务
- 一个slave只能属于一个master
- 整个主从架构中只有一个master,否则容易造成数据复制到混乱
- master、slave之间的数据是同步的(共享方式有3种)是一个同步的复制
2.3 实现方式(3种)
- 基于文件(Shared File System),需要创建一个共享的持久化文件
<persistenceAdapter>
<kahaDB directory="/data/kahadb"/> # 自定义的地址
</persistenceAdapter>
主要是通过共享目录存储目录来实现master和slave的热备,谁先启动,谁就可以最终取得共享目录的控制权成为master,其它的应用就只能作为slave
- 基于数据库(JDBC Master Slave),需要创建一个共享的数据库
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource" destory-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="root" />
<property name="poolPreparedStatements" value="true"/>
</bean>
<persistenceAdapter>
<jdbcPersistenceAdapter dataDirectory="${activemq.base}/data" dataSource="#mysql-ds"/>
</persistenceAdapter>
与shared filesystem方式类似,知识共享的存储介质由文件系统改成数据库而已
- 基于LevelDB复制(Replicated LevelDB Store),需要zookeeper的支持,例如使用activemq本身就支持的LevelDB持久化。
2.4 实现 (基于LevelDB复制)
2.4.1 准备环境
这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202
三台服务器均下载安装activemq
[root@master /]# mkdir myactivemq
[root@master /]# cd myactivemq
[root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
[root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
2.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
zkAddress 为zookeeper的地址
<!--
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
-->
<persistenceAdapter>
<replicatedLevelDB
directory="${activemq.data}/leveldb"
replicas="3"
bind="tcp://0.0.0.0:0"
zkAddress="192.168.190.200:2181"
hostname="192.168.190.201" <!-- 集群中任意一台服务器ip或通过host文件配置的主机名 -->
sync="local_disk"
zkPath="/activemq/leveldb-stores"
/>
</persistenceAdapter>
如果是单机需要修改下边配置,61616端口修改,其它全部注掉,防止端口冲突,我们这里是多服务器配置,可以不用更改,本配置忽略下边配置
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<!--
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
-->
</transportConnectors>
2.4.2 启动
先启动zookeeper,然后分别启动三台服务器的activemq
[root@master bin]# pwd
/myactivemq/apache-activemq-5.15.9/bin
[root@master bin]# ./activemq start
我这里先启动了241的节点,所以241为主master节点
因此我们只能访问241的控制界面
2.4.3 宕机测试
选择master节点, ./activemq stop
我们发现其中一台从节点被选举为主节点
重新启动原来master,其会作为slave服务器继续提供服务
三、ActiveMq的集群架构
3.1 架构图
3.2 集群的配置实现
3.2.1 静态的发现
需要在节点的配置文件中,显示的配置其他节点的IP地址和服务端口号,例如:
<networkConnectors>
<networkConnector uri="static:(tcp://192.168.190.200:61616,tcp://192.168.190.201:61616,tcp://192.168.190.202:61616)" />
</networkConnectors>
3.2.3 动态的发现
通过广播的方式,动态的发现其它节点。例如:
<networkConnectors>
<networkConnector uri="multicast://default" /> <!-- 这里defalut广播名可以是随意取的 -->
</networkConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
/>
<!--
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
-->
</transportConnectors>
在每一个节点中activemq.xml配置文件中配置上述代码。其中defalut是我们自定义的名称,在discoveryUri属性中进行引用即可。有利于集群节点的动态变更。
注意,如单机部署多个activemq示例,jetty.xml文件中的8161端口也需要更改
3.3 集群的具体实现(动态的发现)
3.4.1 准备环境 (同2.4.1)
这里准备3台服务器:192.168.190.200、192.168.190.201、192.168.190.202
三台服务器均下载安装activemq
[root@master /]# mkdir myactivemq
[root@master /]# cd myactivemq
[root@master opt]# wget https://archive.apache.org/dist/activemq/5.15.9/apache-activemq-5.15.9-bin.tar.gz
[root@master opt]# tar -zxvf apache-activemq-5.15.9-bin.tar.gz
3.4.2 修改配置文件 config/activemq.xml (三台服务器activemq相同配置)
<networkConnectors>
<networkConnector uri="multicast://default" /> <!-- 这里defalut广播名可以是随意取的 -->
</networkConnectors>
<transportConnectors>
<!-- DOS protection, limit concurrent connections to 1000 and frame size to 100MB -->
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600" discoveryUri="multicast://default"
/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1883?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
3.4.3 启动三台服务
我这里启动2台的能成功加入集群,启动三台失败,暂未找到原因...当启动第三台的时候,查看日志报错如下
2024-04-15 12:06:18,843 | WARN | Failed to add Connection id=localhost->localhost-36779-1713153927901-42:2, clientId=NC_localhost_outbound due to {} | org.apache.activemq.broker.TransportConnection | ActiveMQ Transport: tcp:///192.168.190.202:50616@61616
javax.jms.InvalidClientIDException: Broker: localhost - Client: NC_localhost_outbound already connected from tcp://192.168.190.200:47904
at org.apache.activemq.broker.region.RegionBroker.addConnection(RegionBroker.java:247)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.jmx.ManagedRegionBroker.addConnection(ManagedRegionBroker.java:227)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.advisory.AdvisoryBroker.addConnection(AdvisoryBroker.java:119)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.BrokerFilter.addConnection(BrokerFilter.java:99)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.TransportConnection.processAddConnection(TransportConnection.java:849)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.jmx.ManagedTransportConnection.processAddConnection(ManagedTransportConnection.java:77)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.command.ConnectionInfo.visit(ConnectionInfo.java:139)[activemq-client-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.TransportConnection.service(TransportConnection.java:336)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:200)[activemq-broker-5.15.9.jar:5.15.9]
at org.apache.activemq.transport.MutexTransport.onComm