ActiveMq是一种开源的java程序,支持Java消息服务(JMS) 1.1 版本
一、持久化机制
1、KahaDB:5.4及之后版本,默认使用日志文件
activemq.xml默认使用KahaDB持久化存储,默认配置安装路径data目录下
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
KahaDB日志文件默认配置安装路径data目录下
db-num.log:存储消息数据,32M一个,超过会自动生成新的db-num+1.log文件,不在使用删除或者归档此文件
db.data :存储索引,持久化BTree索引db.log中的记录
db.free :空闲页面的id
db.redo : 消息恢复,KahaDB强制退出后启动,用于恢复BTree索引
lock :文件锁,获取KahaDB需求权限Broker
2、AMQ :5.3及之前的版本,文件存储形式,写入速度快,容易恢复。默认存储大小32M
3、JDBC:数据库持久化消息(例如mqsql)
数据库连接驱动:mysql-connector-java.version.jar放在mq安装lib目录中
修改持久化方式:activemq.xml中的persistenceAdapter修改为jdbc方式
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#jdbcsouce" createTablesOnStartup="true" />
</persistenceAdapter>
#jdbcsouce:mysql数据源配置
createTablesOnStartup : mq重启自动创建持久化jdbc所需表。默认true
mysql数据库配置(#jdbcsouce):
<bean id="jdbcsource" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://数据库IP/activemq?relaxAutoCommit=true"/>
<property name="username" value="用户名"/>
<property name="password" value="密码"/>
<property name="poolPreparedStatements" value="true"/>
</bean>
</beans>
数据库创建:mq重启后,自动创建表
activemq_msgs : 消息数据;
activemq_acks : 订阅关系;
activemq_lock : 集群环境使用,记录哪个broker是当前的 master broker
Journal数据库缓存插件:消息jdbc持久化单条消费qps较低,通过缓存批量写入jdbc提高消费速度,activemq.xm配置如下
<persistenceFactory>
<journalPersistenceAdapterFactory
journalLogFiles="4"
journalLogFileSize="32768"
useJournal="true"
useQuickJournal="true"
dataSource="#mysql-ds"
dataDirectory="activemq-data />
</persistenceFactory>
4、LevelDB:5.8及以后版本,文件存储。比KahaDB快,基于LevelDB索引,不是BTree索引(未普及,建议KahaDB),activemq.xml中如下
<persistenceAdapter>
<kahaDB directory="activemq-data“/>
</persistenceAdapter>
二、常见问题
1、Mq服务器宕机怎么办
宕机重启:非持久化消息存储在内存中,持久化消息是存储在文件中,重启会持久化消息从文件恢复
消息文件超过文件限制
1)、非持久化:生产者阻塞,消费者停止消费。系统可连接无法生产和消费挂了状态
2)、持久化:生产者阻塞,消费者正常消费。系统待消息消费掉一部分,文件空间释放后可继续生产发送消息,恢复正常
2、消息丢失
如果客户端发送数据后直接close,心跳包发送失败报SocketException,缓存数据作废,服务端reade接收者缓存数据时报Software caused connetiion abort:recv failed。
原因:非持久化消息超过内存设置,写入文件会阻塞所有操作,持续时间超过15s。如果客户端发送完消息,发送连接close时,服务器正好此操作,导致超过15s未应答,客户端直接调用socket关闭tcp连接。这时候接收者缓存数据作废,导致消息丢失
解决方案:非持久消息及时处理不阻塞
持久化消息机制
启动事务,commit成功后等待服务器响应
3、持久化消慢
非持久化默认异步,持久化默认同步,通过开启事务采用异步发送提高效率
4、集群消费消息不均匀
原因:mq采用prefetch机制,通过批量获取数据(默认1000条),批消息未处理完,管理控制台消息认可见,不会分配其他消费者
解决方案:设置prefetch设小点
5、消息发送失败处理
设置重发延迟时间和重传次数