ActiveMQ的消息存储和持久化
- MQ的高可用
- 事务
- 持久
- 签收
- 可持久化 (类似于与mq消息的同步机制)
为了避免意外宕机以后丢失信息,需要做到重启后可以恢复消息队列,消息系统一半都会采用持久化机制
。
ActiveMQ的消息持久化机制
- ActiveMQ的消息持久化机制有
- JDBC
- AMQ
- KahaDB
- LevelDB
- 无论使用哪种持久化方式,消息的存储逻辑都是一致的。
- 逻辑
- 就是在发送者将消息发送出去后
- 消息中心首先将消息存储到本地数据文件、内存数据库或者远程数据库等。
- 再试图将消息发给接收者
- 成功则将消息从存储中删除,失败则继续尝试尝试发送。
消息中心启动以后,要先检查指定的存储位置是否有未成功发送的消息,如果有,则会先把存储位置中的消息发出去。
官网
ActiveMQ的消息持久化机制有哪些?
-
AMQ Message Store
- 基于文件的存储方式,是以前的默认消息存储,现在不用了。
- 具有写入速度快和容易恢复的特点
- 消息存储在一个个文件中,文件默认大小为32M,当一个存储文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清楚阶段买这个文件可删除
AMQ适用于ActiveMQ5.3之前的版本
-
KahaDB
(默认)- 基于日志文件,从ActiveMQ5.4开始默认的持久化插件,我们从配置文件 activemq.xml 中可以看到
- KahaDB是一个基于文件的持久性数据库,它是使用它的消息代理的本地数据库。
- db-.log 为 kahaDB 存储消息的数据记录文件,文件大小限定32M,大于32M就会递增下一个数据文件,比如:log-2.log、log-3.log 以此类推。当不再引用数据文件中的任何消息时,文件就会被删除或归档。
- db.data 文件包含了持久化的 BTree 索引,是消息的索引文件,本质上是 B-Tree 树,使用 B-Tree 作为索引指向 db-.log 里面存储的消息。
- db.free 记录 db.data 文件中哪些页面是空闲的,后面建索引则优先从空闲页中创建。文件的具体内容是所有空闲页的ID
- db.redo 用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引
- lock 文件为文件锁,表示当前获得 kahaDB 读写权限的 broker
- 它已经针对快速持久性进行了优化。
- 它是自ActiveMQ 5.4以来的默认存储机制。
- KahaDB比其前身AMQ消息存储使用更少的文件描述符,并提供更快的恢复
- 基于日志文件,从ActiveMQ5.4开始默认的持久化插件,我们从配置文件 activemq.xml 中可以看到
-
LevelDB消息存储
- 从 ActiveMQ 5.8 之后引进的,和 KahaDB 非常相似
- 基于文件的本地数据存储形式,但是它提供比 KahaDB 更快的持久性。
- 虽然还不是默认的消息存储,但我们希望此存储实现在未来的版本中成为默认的。
- 但它不适用自定义 B-Tree 实现来索引预写日志,而是使用基于 LevelDB 的索引。要使用 LevelDB 需要修改配置文件为:
<persistenceAdapter>
<levelDB directory="activemq-data"/>
</persistenceAdapter>
-
JDBC消息存储
-
原理图
-
添加mysql数据库的驱动包到lib文件夹
-
jdbcPersistenceAdapter配置
-
数据库连接池配置
-
建仓sql和建表说明
- ACTIVEMQ_MSGS
queue和topic都存储在里面
- ACTIVEMQ_ACKS
存储持久订阅的信息和最后一个持久订阅接收的消息ID
- ACTIVEMQ_LOCK
表ACTIVEMQ_LOCK在集群环境下才有用,只有一个Broker可以获取消息,称为Master Broker,其他的只能作为备份等待Master Broker不可用,才可能成为下一个Master Broker。这个表用于记录哪个Broker是当前的Master Broker
- ACTIVEMQ_MSGS
-
验证和数据表变化
- 下面我先以 queue 为例,想要消息保存到数据库生效,必须在创建消息生产者的时候开启持久化模式(消费者无需改写代码):
这样,当生产者发送消息成功后就可以在数据库中的 activemq_msgs 表看到相应的数据了。而当消费者消费消息之后,数据库的相应记录就会被删除。
- 点到点(queue)
- 在点对点类型中
- 当DeliveryMode设置为NON_PERSISTENCE时,消息被保存在内存中
- 当DeliveryMode设置为PERSISTENCE时,消息保存在broker的相应的文件或者数据库中。
- 而且点对点类型中消息一旦被Consumer消费,就从数据中删除
- 发布/订阅(topic)
- 设置了持久订阅数据库里面会保存订阅者的信息
- 我们先启动一下持久化topic 的消费者。看到ACTIVEMQ_ACKS 数据表多了一条消息。LAST_ACKED_ID记录了CLIENT_ID最后签收的一条消息.
- 我们启动持久化生产6条数据,ACTIVE_MSGS数据表新增6条数据,消费者消费所有的数据后,表里的数据并没有消失。持久化的topic的消息不管是否被消费,是否有消费者,产生的数据永远存在且只存储一条。需要注意,持久化topic数据量太大会导致性能下降。
- 下面我先以 queue 为例,想要消息保存到数据库生效,必须在创建消息生产者的时候开启持久化模式(消费者无需改写代码):
-
-
JDBC Message store with ActiveMQ Journal
- 这种方式克服了 JDBC 存储的不足:JDBC 每次消息过来,都要去读写数据库,比较消耗性能,所以引进了 ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能
- 要使用 ActiveMQ Journa 的话,需要修改配置文件,把持久化工厂修改为如下
- 当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。 - 总结: 以前是实时写入mysql,在使用了journal后,数据会被journal处理,如果在一定时间内journal处理(消费)完了,就不写入mysql,如果没消费完,就写入mysql,起到一个缓存的作用。在慢慢的同步数据库
-
持久化总结