activemq消息中间件

news2025/1/16 18:58:26

ActiveMQ消息中间件详解

下载地址:https://activemq.apache.org/activemq-5015009-release

1、MQ的产品种类

1.1、消息中间件的特性/共同特性/共同维度

  • Kafka(大数据专用、由java/scala编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • RabbitMQ(erlang编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • RocketMQ(java编写
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合
  • ActiveMQ
    • API发送和接收
    • MQ的高可用性
    • MQ的集群和容错配置
    • MQ持久化
      • radis 特性:持久化
    • 延时发送/定时投递
    • 签收机制
    • Spring整合

1.2 入门场景使用概述

订单秒杀系统下单之后存在在业务的流程

(读取订单、库存检查、库存冻结、余额检查、余额冻结、订单生成、余额扣减、库存扣减、生成流水、余额冻结、库存解冻)

RPC接口基本是同步调用,整体的服务性能遵循“木桶理论”,即整体系统的耗时取决于最慢的那个接口。比如A调用B/C/D都是50 ms,但是B调用B1花费的时间为2000 ms,那么将会拖累整个系统的服务性能。

image-20230531220137860

注:在设计系统时,明确达到的目标

  1. 要做到系统解耦,当新的模块接进来时,要做到代码的改动最小;能够解耦
  2. 设置流量缓冲池,可以让后端按照系统自身的吞吐能力进行消费,不被冲垮;能够削峰
  3. 强弱依赖梳理能将非常关键调用链路的操作异步化并提升整体系统的吞吐能力;能够异步

消息中间件的作用:解耦、削峰、异步;

定义:发送者把消息发送给消息服务器,消息服务器将消息存放在若干队列/主题中,在合适的时候,消息服务器会将消息转发给消息接收者。在这个过程中,发送和接收都是异步的,也就是发送无需等待,而且发送者和接收者的生命周期也没有必然的关系;尤其在发布pub/订阅sub模式下,也可以完成一对多的通信,即让一个消息有多个接收者。

image-20230531220905761

队列(queue):相当于发短信,一对一。

主题(topic):相当于朋友圈,需要订阅,公众号;一对多。

特点

  • 异步处理
  • 应用系统之间解耦
  • 削峰

image-20230531222008307

ActiveMQ的解压安装

  • 在官网进行下载(下载地址:https://activemq.apache.org/activemq-5015009-release)

image-20230603215545727

注:建议下载linux版本,消息队列大多数情况都是在集群的环境下进行部署,而集群的使用大多数是使用linux

  • 上传linux压缩包并进行解压(注:外来文件放在/opt下

image-20230603220051707

  • 进行启动(在bin目录下)
./activemq start #启动
./activemq stop #关闭
./activemq restart #重启
  • 查看启动是否成功activemq的默认进程编号为61616,查看进程是否被占用
netstat -anp|grep 61616
lsof -i 61616
ps -ef|grep activemq
#可以访问http://localhost:8161 可现可视化界面
#默认的用户名/密码: admin/admin

注:页面访问,点击进行登录

  • 携带日志启动
./activemq start > 路径/run_activemq.log

image-20230603221649898

1.3、JMS编码总体架构

image-20230604165513141

2、代码编写

  • IDEA创建新的maven工程,并引进相应版本的依赖。可在你下载ActiveMQ的版本网页查找对应版本的maven依赖

image-20230604171146556

<dependency>
  <groupId>org.apache.activemq</groupId>
  <artifactId>activemq-all</artifactId>
  <version>5.15.9</version>
</dependency>

2.1、队列(Queues)实现

注:ActiveMQ的访问地址的方式为tcp的方式

进行生产者进行消息发送

//生产者
public static  void queueDemo() throws JMSException {
        String path="tcp://192.168.160.128:61616/";
        String name ="queue01";

        //创建连接工厂,采用默认的用户名和密码
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(path);
        //创建连接并启动
        Connection connection = connectionFactory.createConnection();
        connection.start();
        //创建会话session,第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(name);
        //创建消息生产者
        MessageProducer producer = session.createProducer(queue);
        //进行消息发送
        for (int i =0;i<3;i++){
            //创建消息
            String message = "这是第"+i+"条消息";
            //创建消息
            Message textMessage = session.createTextMessage(message);
            //消息发送
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.err.println("消息发送完成");

    }

//消费者
public static void queueConsumersDemo() throws JMSException {
        String path = "tcp://192.168.160.128:61616";
        String name ="queue01";
        //创建连接工厂
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(path);
        //创建链接
        Connection connection = connectionFactory.createConnection();
        //启动
        connection.start();
        //创建session
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        //创建消费者
        Queue queue = session.createQueue(name);
        //创建消费者
        MessageConsumer consumer = session.createConsumer(queue);
        while (true){
            //可以通过recive设置等待时间,当超时时,消费者自动关闭
            //Message receive = consumer.receive(4000L);
            Message receive = consumer.receive();
            if (receive != null){
                System.err.println(receive);
            }
        }
    }

注:tcp对应的进程的端口为61616,注意连接的地址

image-20230604174451423

image-20230604180026455

队列监听setMessageListener

 public static void setListener() throws JMSException, IOException {
        ActiveMQConnectionFactory MQ = new ActiveMQConnectionFactory(path);
        Connection conn = MQ.createConnection();
        conn.start();

        Session session = conn.createSession(false, AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(name);
        MessageConsumer consumer = session.createConsumer(queue);

        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                if (message!=null ){
                    try {
//                        TextMessage receive = (TextMessage) consumer.receive();
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("==>>>"+textMessage.getText());
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            }
        });
        //防止后台关闭,对MQ进行监听,当有消息的时候进行输出
        System.in.read();
        consumer.close();
        session.close();
        conn.close();
    }

在当多个消费者进行等待,之后生产者进行消息的产生,每个消费者对消息进行平均分配,类似于负载均衡。

例:当两个消费者在进行等待时,生产者产生6条消息,则两个消费者对者6条消息进行平均分配,没人三条。

2.2、主题(topic)代码实现

特点:

  1. 生产者将消息发送到topic中,每个消息可以有多个消费者,属于1:N的关系
  2. 生产者和消费者之间有时间上的相关性。订阅某一个主题的消费者只能消费自它订阅之后发布的消息
  3. 生产者生产时,topic不保存消息它是无状态不能落地,例如无人订阅就去生产,那是一条废消息,所以,一般先启动消费者在启动生产者

JMS规范允许客户创建持久订阅,还在一定程度上放松了时间上的相关性要求。持久订阅允许消费者消费它在未处于激活状态时发送的消息。一句话,类似于微信公众号的订阅

//主题生产者
    public static void topicProduce() throws JMSException {
        ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);
        Connection connection = mq.createConnection();
        connection.start();
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);

        MessageProducer producer = session.createProducer(topic);

        for (int i =0; i <3; i++){

            String tpc = "这是第一条消息";
            TextMessage textMessage = session.createTextMessage(tpc);
            producer.send(textMessage);
        }
        producer.close();
        session.close();
        connection.close();
        System.err.println("主题发送成功====");

    }
//创建消费者
public static void topicConsumer() throws JMSException, IOException {
        ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);
        Connection connection = mq.createConnection();
        connection.start();
        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        MessageConsumer consumer = session.createConsumer(topic);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                TextMessage textMessage = (TextMessage) message;
                try {
                    System.err.println("============="+textMessage.getText());
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
        System.in.read();
        consumer.close();
        session.close();
        connection.close();
    }

注:主题的启动顺序,先启动消费者,在启动生产者。消费者每个都会得到一份生产者发出的所有的信息。

image-20230615220347023

3、浅谈JMS与JavaEE

3.1、什么是JavaEE

JavaEE是一套使用Java进行企业级应用开发的大家一直遵循的13个核心规范工业标准。JavaEE平台提供了一个基于组件的方法来加快设计、开发、装配即部署企业应用程序。

image-20230615221826581

3.2、什么时JMS(Java消息服务)

Java消息服务是指的两个应用程序之间进行异步通信的API,它为标准消息协议和消息服务提供了一组通用接口,包括创建、发送、读取消息等,用于支持Java应用程序的开发,在JavaEE中,当两个应用程序使用JMS进行通信时,他们之间并不是直接相连,而是通过一个共同的消息收发服务组件关联起来以达到解耦、异步、削峰的效果

image-20230615222420549

3.3、MQ中间件的其他落地产品

image-20230615222605081

特性ActiveMQRabbitMQRocketMQKafka
单机吞吐量万级,比 RocketMQ、Kafka 低一个数量级同 ActiveMQ10 万级,支撑高吞吐10 万级,高吞吐,一般配合大数据类的系统来进行实时数据计算、日志采集等场景
opic 数量对吞吐量的影响topic 可以达到几百/几千的级别,吞吐量会有较小幅度的下降,这是 RocketMQ 的一大优势,在同等机器下,可以支撑大量的 topictopic 从几十到几百个时候,吞吐量会大幅度下降,在同等机器下,Kafka 尽量保证 topic 数量不要过多,如果要支撑大规模的 topic,需要增加更多的机器资源
时效性ms 级微秒级,这是 RabbitMQ 的一大特点,延迟最低ms 级延迟在 ms 级以内
可用性高,基于主从架构实现高可用同 ActiveMQ非常高,分布式架构非常高,分布式,一个数据多个副本,少数机器宕机,不会丢失数据,不会导致不可用
消息可靠性有较低的概率丢失数据基本不丢经过参数优化配置,可以做到 0 丢失同 RocketMQ
功能支持MQ 领域的功能极其完备基于 erlang 开发,并发能力很强,性能极好,延时很低MQ 功能较为完善,还是分布式的,扩展性好功能较为简单,主要支持简单的 MQ 功能,在大数据领域的实时计算以及日志采集被大规模使用
其他Apache软件基金会开发、起步较早,但没有经过大量吞吐场景验证,目前社区不是很活跃开源,稳定,社区活跃度高阿里出品,目前已交给Apache,但社区活跃度较低Apache软件基金会开发、开源、高通吐量,社区活跃度高

3.4、JMS组成结构和特点

  1. JMS provider 实现JMS接口和规范的消息中间件,也就是我们的MQ服务器
  2. JMS producer 消息的生产者 创建和发送消息的客户端应用
  3. JMS consumer 消息消费者 接受和处理JMS消息的终端
  4. JMS message 产生的消息信息体
    • 消息头
    • 消息属性
    • 消息体 封装具体的消息数据,5中消息体格式,发送和接收的消息体类型必须一一对应

注:发送什么类型的消息,就得接收什么类型的消息,要一一对应

消息持久模式与非持久模式

  • 持久性,应该被传送“一次仅仅一次”,这意味着如果JMS提供者出现故障,该消息不会丢失。他会在服务器恢复之后再次传递(注:主要没有被发送,便会一直在消息服务器中存储)
  • 非持久,最多会传送一次,只要服务器出现故障,消息就会被丢失

消息头属性

  1. JMSDestination 消息发动的目的地
  2. JMSDeliveryMode 消息是否持久
  3. JMSExpiration 消息的过期时间(默认时永不过期)可设置消息在一定时间之后会过期。消息的过期时间Destination的send方法中的timeToLive值加上发送时刻的GMT时间值如果timeToLive的值为0,表示消息永不过期,如果发送后,在消息过期时间之后消息还没有被发送到消息的目的地,则该消息被清除。
  4. JMSPriority 消息优先级,从0-9十个级别,0-4是普通消息,5-9是加急消息。JMS不要求MQ严格按照者是个优先级发送消息,但必须保重加急消息要先于普通消息到达默认级别是4
  5. JMSMessageId 消息ID,消息的唯一识别方式。

消息体的五种属性

  1. TextMessage 字符串类型
  2. MapMessage Map类型
  3. BytesMessage 字节类型(二进制数组消息)
  4. StreamMessage 流类型
  5. ObjectMessage 对象类型

消息属性

如果需要除消息头字段以外的值,那么可以使用消息属性,识别/去重/重点标注等操作非常有用的方法。他们是以属性名和**属性值对(K:V)**的形式指定的,可以将属性是为消息头的扩展,属性指定一些消息头没有包括的附加消息,比如可以在属性里指定消息选择器。

消息的属性就像可以分配给一条消息的附加消息头一样,他们可以允许开发者添加有关消息的不透明附加消息,他们还用于暴漏消息选择器在消息过滤是使用的数据。

例:

TextMessage message = session.createTextMessage();
message.setText(text);
message.setStringProperty("username","ABC")// 进行消息自定义

3.5、消息持久性

3.5.1、持久的队列(Queue)

消息队列中,默认的消息为消息持久化

采用持久性消息,当ActiveMQ宕机时,未消费的消息的数量保持不变,有利于保持消息而不被丢失。

队列的默认传送模式,此模式保证这些消息被成功的发送一次和成功的使用一次。对于这些消息可靠性是优先考虑的因素。

可靠性另一个重要的方面是确保持久性消息传送至目标后,消息服务在向消费者传送它们之前不会丢失这些消息。

3.5.2、持久主题(topic)

  • 先启动订阅,在启动生产
	//持久化主题,消息消费者
    public static void lastingTopicConsumer() throws JMSException, IOException {
        ActiveMQConnectionFactory mq = new ActiveMQConnectionFactory(path);
        Connection connection = mq.createConnection();
       connection.setClientID(name);


        Session session = connection.createSession(false, AUTO_ACKNOWLEDGE);
        Topic topic = session.createTopic(topicName);
        TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");

        connection.start();

        //主题的订阅者
        Message receive = topicSubscriber.receive();
        while (receive!=null){

            TextMessage textMessage =(TextMessage) receive;
            System.out.println("====="+textMessage.getText());
            receive = topicSubscriber.receive(5000);
        }


        System.in.read();
        session.close();
        connection.close();
    }
  1. 一定要先运行一次消费者,类似于订阅这个主题
  2. 然后在运行生产者发送信息
  3. 无论消费者是否在线,都会接收到,不在线的话,下次连接的时候,会把没有收到的消息接收下来

4、ActiveMQ的broker

定义:相当于一个ActiveMQ服务器实例

Broker其实就是实现了用代码的形式启动ActiveMQ将MQ嵌入到Java代码中,以便随时用随时启动,

在用的时候再去启动这样节省了资源,也保证了可靠性。

方式:

用ActiveMQ Broker作为独立的消息服务器来构建java应用

ActiveMQ也支持在VM中通信基于嵌入式的broker,能够无缝的集成其他java应用

所需mvn依赖

		//ActiveMQ核心依赖
		<dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
            
         // 整合spring所需要的依赖  
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
            
          //进行json数据格式转换,在进行Java内嵌activqMQ时,需要引进,否则会报错
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>

代码实现

//相当于在本机上边开启了一个ActiveMQ的服务
public static void main(String[] args) throws Exception {
        BrokerService brokerService = new BrokerService();
        brokerService.setUseJmx(true);
        brokerService.addConnector("tcp://localhost:61616");
        brokerService.start();
    }

image-20230705214051813

5、spring整合ActiveMQ

application.xml

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:aop="http://www.springframework.org/schema/aop"
       xmlns:tx="http://www.springframework.org/schema/tx"
       xmlns:context="http://www.springframework.org/schema/context"
       xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx.xsd http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd">

    <!--开启包的自动扫描-->
    <context:component-scan base-package="com.atguigu.activemq"/>
    <bean id="jmsFactory" class="org.apache.activemq.pool.PooledConnectionFactory" destroy-method="stop">
        <property name="connectionFactory">
            <bean class="org.apache.activemq.ActiveMQConnectionFactory">
                <property name="brokerURL" value="tcp:192.168.160.128"/>
            </bean>
        </property>
        <property name="maxConnections" value="100"/>
    </bean>
    <bean id="destinationQueue" class="org.apache.activemq.command.ActiveMQQueue">
        <constructor-arg index="0" value="spring-active-queue"/>
    </bean>
    <bean id="jsmTemplate" class="org.springframework.jms.core.JmsTemplate">
        <property name="connectionFactory" ref="jmsFactory"/>
        <property name="defaultDestination" ref="destinationQueue"/>
        <property name="messageConverter">
            <bean class="org.springframework.jms.support.converter.SimpleMessageConverter"/>
        </property>
    </bean>
</beans>

生产者

package com.atguigu.activemq;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.stereotype.Service;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

@Service
public class Producer {
    @Autowired
    private JmsTemplate  jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        Producer producer =(Producer) applicationContext.getBean("Producer");
        producer.jmsTemplate.send(new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
                TextMessage textMessage = session.createTextMessage("********当前消息");
                return textMessage;
            }
        });
    }
}

消费者

package com.atguigu.activemq;

import org.apache.xbean.spring.context.ClassPathXmlApplicationContext;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;


@Service
public class Customer {
    @Autowired
    private JmsTemplate jmsTemplate;

    public static void main(String[] args) {
        ApplicationContext applicationContext = new ClassPathXmlApplicationContext("applicationContext.xml");
        Customer customer =(Customer) applicationContext.getBean("Customer");
        String receiveAndConvert =(String) customer.jmsTemplate.receiveAndConvert();
        System.out.println(receiveAndConvert);
    }
}

mvn依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>ActiveMQDemo</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!--activemq需要的jav包-->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-all</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.apache.xbean</groupId>
            <artifactId>xbean-spring</artifactId>
            <version>3.16</version>
        </dependency>
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
            <version>2.9.5</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-jms</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
            <version>5.15.9</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-core</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-context</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-aop</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-orm</artifactId>
            <version>4.3.23.RELEASE</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjrt</artifactId>
            <version>1.6.1</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
            <version>1.6.8</version>
        </dependency>
        <dependency>
            <groupId>cglib</groupId>
            <artifactId>cglib</artifactId>
            <version>2.1_2</version>
        </dependency>
        <!--下面是junit/log4等通用配置-->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>
        <dependency>
            <groupId>ch.qos.logback</groupId>
            <artifactId>logback-classic</artifactId>
            <version>1.2.3</version>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.16.18</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.12</version>
        </dependency>
    </dependencies>
    <properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
    </properties>

</project>

6、springBoot整合ActiveMQ

7、ActiveMQ的传输协议

  • ActiveMQ支持的client-broker通讯协议有:TCP、NIO、 UDP、SSL、Https(Http)、VM 。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。

image-20230715134903343

注:在ActiveMQ中默认的使用TCP协议,也默认支持多种的传输协议

设置支持NIO网络协议

<transportConnector name="nio" uri="nio://0.0.0.0:61618"/>

设置进行多协议支持,在5.13之后在支持auto多协议同时支持

<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61617?auto.protocols=default,stomp"/>

image-20230715142346935

8、ActiveMQ的消息存储和持久化

为了避免以为宕机之后数据的丢失,需要做到重启之后可以恢复消息队列,消息系统一般都会采用持久化机制。ActiveMQ的消息持久化机制又JDBC、AMQ、KahaDB(默认使用)、LevelDB,无论使用哪一种持久化的机制,消息存储的逻辑都是一致的。

消息存储机制:

就是在发送者在发送出去之后,消息中心首先将消息存储到本地的数据文件、内存数据库或者远程数据库等在试图将消息在发送给接收者,成功则将消息从存储中进行删除,失败则继续尝试发送。消息中心启动以后首先要检查指定的存储位置,如果有未发送成功的消息,则需要将消息发送出去。

KahaDB:在5.3版本之后建议推荐使用KahaDB存储方式,在5.4版本之后,默认使用kahaDB存储方式

image-20230715145324601

默认的文件的存储位置在activeMQ下的data文件中

image-20230715145453094

8.1、KahaDB存储机制的原理

  • 在KahaDB在消息目录进行存储时。只有4类文件和一个lock。以下四个文件还有一个db.free 四类文件一把锁

  • KahaDB会将消息存储在db-.log文件之中,当一个文件已满时(默认的一个文件的大小为32MB),会自动创建一个新的文件进行相关的存储。例:db-1.log、db-2.log …。当不会再有引用到数据文件中的任何数据消息时,文件会被删除或者时归档。
  • db.data该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,他是消息的索引文件,本质是B-Tree(B树),使用B-Tree作为索引执行db-.log文件中存储的消息
  • db.free当前db.data文件那些页面是空闲的,文件具体内通过是所有空闲页面的ID
  • db.radio是用来进行消息恢复的,当KahaDB消息存储被强制退出后启动,用于恢复BTree索引
  • lock为文件的读取进行添加锁机制,防止数据出现混乱。

8.2、JDBC消息存储(一部分消息会被存储到数据库中)

注:对于长时间的存储,建议使用JDBC的存储方式

  • 将数据库驱动jar包放到MQ的lib文件夹下
  • 做JDBC吃持久化的配置,对文件进行修改适配

image-20230715153921537

<persistenceAdapter> 
  <jdbcPersistenceAdapter dataSource="#mysql-ds" createTablesOnStartup="true"/>   
</persistenceAdapter>

dataSource指定将要引用的持久化数据库的bean名称;
createTablesOnStartup 是否在启动的时候创建数据表,默认值是true;
这样每次启动都会去创建数据表,一般是第一次启动的时候设置为true 之后改成false;

  • 数据库连接池的配置
relaxAutoCommit 表示进行自动提交

<bean id="mysql-ds" class="org.apache.commons.dbcp2.BasicDataSource" destroy-method="close"> 
    <property name="driverClassName" value="com.mysql.jdbc.Driver"/> 
    <property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/> 
    <property name="username" value="activemq"/> 
    <property name="password" value="activemq"/> 
    <property name="poolPreparedStatements" value="true"/> 
</bean> 

注:注意配置信息将要添加的位置,否则可能会报错:

**image-20230715155441934

  • 建仓SQL和建表说明
  1. 创建对应名称的数据库
  2. 创建表,默认表名:
    1. ACTIVEMQ_MSGS
    2. ACTIVEMQ_ACKS
    3. ACTIVEMQ_LOCK
  3. 如果新建数据库OK+上述的配置OK+代码运行OK,3表会自动生成。

ACTIVEMQ_MSGS表字段:

ID:自增数据库主键
CONTAINER:消息的Destination
MSGID_PROD:消息发送者的主键
MSG_SEQ:是发下哦那个消息的顺序,MSGID_PROD+MSG_SEQ可以足证JMS的MessageID
EXPIRATION:消息的过期时间,存储的是从1970-01-01
MSG:消息本体的Java序列对象的二进制数据
PRIORITY:优先级,0-9,数值越大优先级越高

ACTIVEMQ_ACKS表字段:用于存储订阅关系,如果是持久化topic,订阅者和服务器的订阅关系在这个表里面进行保存;

CONTAINER:消息的介绍
SUB_DEST:如果使用的是Static集群,这个字段会有集群其他系统的信息
CLIENT_ID:每个订阅者都必须有一个唯一的客户端ID用以区分
SUB_NAME:订阅者名称
SELECTOR:选择器,可以选择之消费满足条件的信息,条件可以用自定义的属性进行实现,支持多属性ANDOR
LAST_ACKED_ID:记录消费过的消息的ID

可能出现的错误

在这里插入图片描述

解决方法:

在保证activeMQ在使用JDBC消息持久化时,所需要的jar包,以及数据库密码,用户、地址都没有错误的情况下,造成报错的原因可能是数据库远程连接的权限没有被放开,导致数据库在进行远程连接时,连接失败。这种情况下,需要放开数据库远程连接的权限。

  • 在本地通过连接数据库的工具或者cmd命令框进入到数据库都可以,在这我采用cmd
mysql -u root -p

image-20230716162207402

  • 查看mysql中存在的库,并使用mysql库
show databases;
use mysql;

image-20230716162533190

  • 查看对应表
select User,Host from user;

image-20230716162455449

注:如果权限没有被放开的情况下,root所对应的Host为localhost,这是我们需要将其改为%

update set Host='%' where User='root';
  • 进行配置刷新
flush privileges;

注:在修改完成之后一定要进行配置刷新,否则相关修改不起作用

  • 在服务器重启activeMQ,进行日志查看

image-20230716162852413

  • 启动成功,activeMQ会在你对应的数据库中创建相关的表

image-20230716162949701

注:在使用消息持久化存储机制时,一定要将activeMQ设置为持久化。否则不会将信息存储到数据库中

点对点类型
在DeliveryMode设置为NODE_PERSISTENCE(非持久化)时,消息保存在内存中

在DeliveryMode设置为PERSISTENCE(持久化),消息保存在broker的相应的文件或者数据库中

而且点对点类型中消息一旦被消费,消息就会在存储的位置进行删除操作。

在使用Topic时,在消息持久化,消息在被消费的时候,消息不会被删除。

8.3、开发中遇到的问题

如果是queue

在没有消费者消费的情况下会将信息保存在activemq_msgs表中,只要有任意消费者已经消费过了,消费之后这些消息将被删除。

如果时topic

一般是先启动消费订阅然后在生产的情况下,会将消息保存到activemq_acks表中

数据库jar包

记得需要使用到的相关的jar文件,放置到lib目录下,mysql-jdbc驱动的jar包和对应的数据库连接池的jar包。默认是dbcp

createTableOnStartup属性
在jdbcPersistenceAdapter标签中设置了createTableOnStartup属性为true时,在第一次启动ActiveMQ时,ActiveMQ服务节点会自动在数据库中创建相关的数据库表,启动完成后可以去掉这个属性,或者是将属性值修改为false。其属性值默认为true,建议在不用的时候将属性值,修改为false

下划线的问题

“java.lang.lllegalStateExcepton:BeanFactory not initialized or already closed”

产生报错的原因是因为您的操作系统的机器名中存在“_”符号,请修改机器名并重启可以解决相关问题。

8.4、高性能缓存(ActiveMQ Journal)

在activemq.xml中进行配置

<persistenceFactory>
    <journalPersistenceAdapterFactory journalLogFiles="4"
                                      journalLogFileSize=""
                                      useJournal="true"
                                      useQuickJournal="true"
                                      dataSource="#mysql-ds"
                                      dataDirectory="activemq-data"/>
</persistenceFactory>    

配置完成之后,对activeMQ进行重启操作

9、ActiveMQ的高级特性

9.1、ActiveMQ异步传输以及确认发送成功

注:ActiveMQ默认的消息发送方式为异步传输,但是建议在进行代码编写时,再次将消息传输的方式设置为异步
注:当消息设置为不采用事务,但是却将消息设置为持久化时,MQ会默认将消息的传输 方式设置为同步消息传输。这样的传输方式,只有当消息发送出去之后,只有被接受返回成功之后,才会进行下一个消息的处理,容易造成堵塞。

注:在进行异步消息传输时,MQ允许存在极少量的数据丢失,也会存在极少量数据丢失的情况。

例:

cf = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");

((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);

((ActiveMQConnection)connection).setUseAsyncSend(true);

注:在使用异步消息队列是,注意要采用消息的回调,来确认消息是否发送成功

 public static  void queueDemo() throws JMSException {
//        String path="tcp://192.168.160.128:61616/";
        String name ="queue01";

        //创建连接工厂,采用默认的用户名和密码
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
        //创建连接并启动
        Connection connection = connectionFactory.createConnection();

        connection.start();
        //创建会话session,第一个参数是事务,第二个参数是签收
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        //创建目的地
        Queue queue = session.createQueue(name);
        //创建消息生产者
        ActiveMQMessageProducer producer = (ActiveMQMessageProducer) session.createProducer(queue);

        //进行消息发送
        for (int i =0;i<3;i++){
            //创建消息
            String message = "这是第"+i+"条消息";
            //创建消息
            Message textMessage = session.createTextMessage(message);
            textMessage.setJMSMessageID(UUID.randomUUID().toString().replaceAll("-","")); //设置消息的id
            //消息发送
            producer.send(textMessage, new AsyncCallback() {
                @Override
                public void onSuccess() {
                    //进行消息的回调
                    System.out.println("发送成功的消息"+textMessage.toString());
                }

                @Override
                public void onException(JMSException e) {
                     //进行消息的回调
                    System.out.println("发送失败的消息"+textMessage.toString());
                }
            });
        }
        producer.close();
        session.close();
        connection.close();
        System.err.println("消息发送完成");

    }

9.2、消息的延时发送和定时发送

image-20230726211612426

9.3、分发策略

9.4、ActiveMQ重试机制(重新交付政策)

官网地址:https://activemq.apache.org/redelivery-policy

引起消息重发的情况

Client用了transaction且在session中调用了rollback();(没被签收被回调)

Client用了transactions且在调用commit()之前关闭或者时没有commit(事务没有被提交)

Client在CLIENT_ACKNOWLEDGE的传递模式下,在session中调用了recover()

默认:每一秒钟发送6次,当消息发送超过最大次数时,该消息会被标志位异常消息,最终会被存储到死信队列

常用属性:

image-20230726213519746

9.5、死信队列

处理发送失败的消息

  • 一般的生产环境之中,MQ一般会设置两个队列:核心业务队列和死信队列
  • 核心业务队列:就是处理正常的业务信息,死信队列主主要是处理异常的业务队列信息

将所有的DeadLetter保存到一个共享的队列之中,是ActiveMQ的默认的策略

共享队列默认为ActiveMQ.DLQ可以通过deadLetterQueue属性进行设定

<deadLetterStratege>
    <shareDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStratege>   

可以将定时自动删除死信队列中的消息,或者可以存储非持久的异常消息

9.6、如何保证消息不被重复消费

可以通过radis进行相关的解决

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/794024.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【uni-app2.0】实现登录页记住密码功能

使用uni-app的uni.setStorageSync()和uni.getStorageSync()方法来存储和读取密码 在登录页中添加一个记住密码的u-checkbox选项&#xff0c;并在data里面添加一个rememberPwd的布尔值&#xff0c;在每次点击记住密码change的时候来记录用户的选择 <u-checkbox-group place…

Spring Boot 自定义启动画面

文章目录 自定 Banner获取属性设置颜色实操关闭 Banner参考 我们启动项目的之后&#xff0c;会在控制台上看到类似下面的画面&#xff1a; 那么&#xff0c;我们是否可以自定义呢&#xff1f; 肯定可以 自定 Banner 上面的截图信息就是 Banner 信息&#xff0c;我们可以在项目…

JavaScript |(二)JavaScript自定义对象及函数 | 尚硅谷JavaScript基础实战

学习来源&#xff1a;尚硅谷JavaScript基础&实战丨JS入门到精通全套完整版 文章目录 &#x1f4da;自定义对象&#x1f407; 对象的分类&#x1f407;对象基本操作&#x1f407;对象的属性&#x1f407;基本和引用数据类型&#x1f407;对象字面量 &#x1f4da;函数&#…

边写代码边学习之全连接Dense

1. 全连接原理 全连接神经网络&#xff08;Fully Connected Neural Network&#xff09;是一种最基本的神经网络结构&#xff0c;也被称为多层感知器&#xff08;Multilayer Perceptron&#xff0c;MLP&#xff09;。其原理是模拟人脑神经元之间的连接方式&#xff0c;通过多个…

AI视频监控综合管理平台EasyCVR多分屏默认播放协议的配置优化

智能视频监控平台EasyCVR可拓展性强、开放度高&#xff0c;既能作为业务平台使用&#xff0c;也能作为视频能力层被调用和集成。视频监控综合管理平台兼容度高&#xff0c;支持自由调用、支持与第三方集成。在AI能力的接入上&#xff0c;TSINGSEE青犀视频平台可支持AI智能分析网…

SD NAND【商业】

SD NAND【商业】 前言版权推荐SD NAND外观NAND与TF卡的区别雷龙CS SD NAND(贴片式TF卡)性能体验及应用 最后 前言 2023-7-23 16:20:19 因为本人对硬件了解不是很多&#xff0c;所以该篇参考自官方文档。 以下内容源自《【商业】》 仅供学习交流使用 版权 禁止其他平台发布…

linux 学成之路(基础篇)(二十三)MySQL服务(下)

目录 一、用户权限管理概述 二、用户权限类型 三、用户赋予权限 四、删除权限 五、删除用户 一、用户权限管理概述 数据库用户权限管理是数据库系统中非常重要的一个方面&#xff0c;它用于控制不同用户访问和操作数据库的权限范围。数据库用户权限管理可以保护敏感数据和…

QT项目打包成软件进行发布的三种方式

目录 一、打包成绿色便携版 二、打包成单文件版 三、打包成可安装版本 本教程对应的IDE是Qt Creater。 保证绿色便携版能正常运行才能够打包成单文件版本和可安装版本。 一、打包成绿色便携版 特点&#xff1a;给别人发送的时候需要先制作成一个压缩包文件&#xff0c;解…

【javaSE】 递归与汉诺塔详解

目录 递归 生活中的故事 递归的概念 递归的必要条件 示例 递归执行过程分析 代码示例 递归练习 练习一 执行过程图 练习二 执行过程图 练习三 执行流程图 ​编辑斐波那契数列 汉诺塔 汉诺塔问题解析 总结 递归 关于递归博主在C语言部分也进行了详解&#xff…

Rabbit MQ整合springBoot

一、pom依赖二、消费端2.1、application.properties 配置文件2.2、消费端核心组件 三、生产端3.1、application.properties 配置文件2.2、生产者 MQ消息发送组件四、测试1、生产端控制台2、消费端控制台 一、pom依赖 <dependency><groupId>org.springframework.boo…

【lesson4】linux权限

文章目录 权限权限是什么&#xff1f;对人权限对角色和文件权限权限修改改属性改人 权限 权限分为两种对人权限和对角色和文件的权限 权限是什么&#xff1f; 在脑海中我们对权限有一定的理解那么权限的定义到底是什么我们却说不出来&#xff0c;接下来我们来举个例子介绍一…

黑客和网络安全学习资源,限时免费领取,点这里!

统计数据显示&#xff0c;目前我国网安人才缺口达140万之多… 不管你是网络安全爱好者还是有一定工作经验的从业人员 不管你是刚毕业的行业小白还是想跳槽的专业人员 都需要这份超级超级全面的资料 几乎打败了市面上90%的自学资料 并覆盖了整个网络安全学习范畴 来 收藏它&…

MySQL基础(三)用户权限管理

目录 前言 一、概述 二、用户权限类型 1.CREATE 2.DROP 三、用户赋权 例子 四、权限删除 例子 五、用户删除 例子 总结 前言 关于MySQL的权限简单的理解就是MySQL允许你做你权利以内的事情&#xff0c;不可以越界。MySQL服务器通过权限表来控制用户对数据库的访问&…

[SSM]Spring中的JabcTemplate

目录 十三、JdbcTemplate 13.1环境准备 13.2新增 13.3修改 13.4删除 13.5查询 13.6查询一个值 13.7批量添加 13.8批量修改 13.9批量删除 13.10使用回调函数 13.11使用德鲁伊连接池 十三、JdbcTemplate JdbcTemplate是Spring提供的一个JDBC模板类&#xff0c;是对JDBC…

如何使用一个数据库构建一个消耗大量IOPS的应用程序

​我很喜欢关于社交媒体和数据库的创作主意。所以&#xff0c;让我们以一个新的方向来探索&#xff1a;看看Twitch.tv或任何具有即时通讯功能的平台。如果你刚开始接触数据库&#xff0c;可以阅读之前的那篇文章&#xff1a;社交媒体中的“点赞”“喜欢”是如何存储在数据库中的…

ubuntu开机自启动

ubuntu开机自启动 1、建一个test.sh脚本&#xff0c;并写入 #!/bin/sh gnome-terminal -x bash -c ‘cd /home/文件路径/;python3 main.py’ exit 0 2、:wq!保存 3、创建rc-local.service文件&#xff08;sudo vim /etc/systemd/system/rc-local.service&#xff09;&#xf…

Python post请求发送的是Form Data的类型

常规的Form Data 大部分的Form Data 可以直接都是可以通过正常的post请求进行提交的 import requestsheaders {自己设置的请求头键: 自己设置的请求头键,Content-Type: 网页接受的数据类型 }form_data {对应的键1&#xff1a;对应的值1,对应的键2&#xff1a;对应的值2, }r…

【C++】C++11右值引用|新增默认成员函数|可变参数模版|lambda表达式

文章目录 1. 右值引用和移动语义1.1 左值引用和右值引用1.2 左值引用和右值引用的比较1.3右值引用的使用场景和意义1.4 左值引用和右值引用的深入使用场景分析1.5 完美转发1.5.1 万能引用1.5.2 完美转发 2. 新的类功能2.1 默认成员函数2.2 类成员变量初始化2.3 强制生成默认函数…

(链表) 剑指 Offer 25. 合并两个排序的链表 ——【Leetcode每日一题】

❓剑指 Offer 25. 合并两个排序的链表 难度&#xff1a;简单 输入两个递增排序的链表&#xff0c;合并这两个链表并使新链表中的节点仍然是递增排序的。 示例1&#xff1a; 输入&#xff1a;1->2->4, 1->3->4 输出&#xff1a;1->1->2->3->4->4 …

畅捷通TPlus DownloadProxy.aspx 存在任意文件读取漏洞 附POC

文章目录 畅捷通TPlus DownloadProxy.aspx 存在任意文件读取漏洞 附POC1. 畅捷通TPlus DownloadProxy.aspx 简介2.漏洞描述3.影响版本4.fofa查询语句5.漏洞复现6.POC&EXP7.整改意见8.往期回顾 畅捷通TPlus DownloadProxy.aspx 存在任意文件读取漏洞 附POC 免责声明&#x…