ActiveMQ指南

news2024/11/15 20:29:41

入门

官网: http://activemq.apache.org/

ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。

JMS

JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。

优势
  • 异步:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端;
  • 解耦:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。
  • 可靠:JMS保证消息只会递送一次。可能你遇到过重复创建消息问题,而JMS能帮你避免该问题。
消息模型
点对点(Point-To-Point)
  • 概述
    点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发送到由某个特定名字标示的消息队列(Queue)中,消息消费者从这个特定队列中获取对应消息。在消息传送给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。
    在点对点消息传送模型中,应用程序由消息队列(Queue),发送者(Sender),接收者(Receiver)组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。
  • 特点
    • 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
    • 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响消息被发送到消息队列中。
    • 当接收者收到消息的时候,会发送确认收到通知(acknowledgement)到消息队列。
  • 模型图
发布订阅(Publish/Subscribe)
  • 概述
    在发布/订阅消息模型中,发布者发布一个消息,该消息通过主题(Topic)传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅主题(Topic)。主题(Topic)主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
    在发布与订阅消息传送模型中,应用程序由主题(Topic),发布者(Publisher),订阅者(Subscriber)组成。客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。
  • 特点
    • 每个消息可以有多个消费者即多个订阅者。
    • 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
    • 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
  • 模型图
消息消费
在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
  • 阻塞:订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。
  • 异步:订阅者或接收者需要注册一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
JMS 组成

JMS API
要素作用
Destination表示消息所走通道的目标定义,用来定义消息从发送端发出后要走的通道,而不是接收方。Destination属于管理类对象
ConnectionFactory顾名思义,用于创建连接对象,ConnectionFactory属于管理类的对象
Connection连接接口,所负责的重要工作时创建Session
Session会话接口,这是一个非常重要的对象,消息发送者、消息接收者以及消息对象本身,都是通过这个会话对象创建的
MessageConsumer消息的消费者,也就是订阅消息并处理消息的对象
MessageProducer消息的生产者,也就是用来发送消息的对象
Message消息,包含了消息头、消息体、消息属性

  • ConnectionFactory
    创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。
  • Destination
    Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic
  • Connection
    Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产 生一个或多个Session
  • Session
    Session 是我们对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。
  • Producter
    Producter(消息生产者):消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。
  • Consumer
    Consumer(消息消费者):消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。
  • MessageListener
    消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。
  • Message
    发送的消息实例

ActiveMQ安装及控制台

原生 JMS API 入门案例

导入依赖
<dependencies>
  <!--  activemq  所需要的jar 包-->
  <dependency>
    <groupId>org.apache.activemq</groupId>
    <artifactId>activemq-all</artifactId>
    <version>5.15.9</version>
  </dependency>
  <!--  activemq 和 spring 整合的基础包 -->
  <dependency>
    <groupId>org.apache.xbean</groupId>
    <artifactId>xbean-spring</artifactId>
    <version>3.16</version>
  </dependency>
</dependencies>
Queue(PTP模式)的生产及消费
消息生产者
package com.at.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce {
    //  linux 上部署的activemq 的 IP 地址 + activemq 的端口号
    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    // 目的地的名称
    public static final String QUEUE_NAME = "jdbc01";


    public static void main(String[] args) throws  Exception{
        // 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 2 通过连接工厂,获得连接 connection 并启动访问。
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        // 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的生产者
        MessageProducer messageProducer = session.createProducer(queue);
        // 6 通过messageProducer 生产 3 条 消息发送到消息队列中
        for (int i = 1; i < 4 ; i++) {
            // 7  创建消息
            TextMessage textMessage = session.createTextMessage("msg--" + i);
            // 8  通过messageProducer发送给mq
            messageProducer.send(textMessage);
        }
        // 9 关闭资源
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** 消息发送到MQ完成 ****");
    }
}

控制台显示:

Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。

Number Of Consumers:消费者数量,消费者端的消费者数量。

Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。

Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。

总结:

当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。

当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。

当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。

消息消费者
  • 阻塞方式:
package  com.at.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

// 消息的消费者
public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        // 5 创建消息的消费者
        MessageConsumer messageConsumer = session.createConsumer(queue);
        while(true){
            // reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
// reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
            // 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
            TextMessage message = (TextMessage)messageConsumer.receive(); 
            if (null != message){
                System.out.println("****消费者的消息:"+message.getText());
            }else {
                break;
            }
        }
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

控制台显示:

  • 非阻塞方式
package  com.at.activemq.queue;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

// 消息的消费者  也就是回答消息的系统
public class JmsConsumer {

    public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";

    public static final String QUEUE_NAME = "jdbc01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);

        /* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
           通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
         */
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message)  {
//  instanceof 判断是否A对象是否是B类的子类
                    if (null != message  && message instanceof TextMessage){
                        TextMessage textMessage = (TextMessage)message;
                        try {
                            System.out.println("****消费者的消息:"+textMessage.getText());
                        }catch (JMSException e) {
                            e.printStackTrace();
                        }
                }
            }
        });
        // 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
        // 实际开发中,我们的程序会一直运行,这句代码都会省略。
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
总结
  • 2种消费方式
    同步阻塞方式(receive)
    订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
    异步非阻塞方式(监听器onMessage())
    订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。
  • 队列的特点
    点对点消息传递域的特点如下:
    (1) 每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
    (2) 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费
    者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
    (3) 消息被消费后队列中不会再存储,
    所以消费者不会消费到已经被消费掉的消息。
  • 消息消费情况
    情况1:只启动消费者1
    结果:消费者1会消费所有的数据。
    情况2:先启动消费者1,再启动消费者2。
    结果:消费者1消费所有的数据。消费者2不会消费到消息。
    情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。
    结果:消费者1和消费者2平摊了消息。各自消费3条消息。
Topic(Pub/Sub模式)的生产及消费
消息生产者
package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsProduce_topic {

    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws  Exception{
             ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        Topic topic = session.createTopic(TOPIC_NAME);

        MessageProducer messageProducer = session.createProducer(topic);
        for (int i = 1; i < 4 ; i++) {
            TextMessage textMessage = session.createTextMessage("topic_name--" + i);
            messageProducer.send(textMessage);
            MapMessage mapMessage = session.createMapMessage();
        }
        messageProducer.close();
        session.close();
        connection.close();
        System.out.println("  **** TOPIC_NAME消息发送到MQ完成 ****");
    }
}
消息消费者
package  com.at.activemq.topic;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class JmsConsummer_topic {
    public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
    public static final String TOPIC_NAME = "topic01";

    public static void main(String[] args) throws Exception{
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 4 创建目的地 (两种 : 队列/主题   这里用主题)
        Topic topic = session.createTopic(TOPIC_NAME);

        MessageConsumer messageConsumer = session.createConsumer(topic);
// MessageListener接口只有一个方法,可以使用lambda表达式
        messageConsumer.setMessageListener( (message) -> {
            if (null != message  && message instanceof TextMessage){
                     TextMessage textMessage = (TextMessage)message;
                    try {
                      System.out.println("****消费者text的消息:"+textMessage.getText());
                    }catch (JMSException e) {
                    }
                }
        });

        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

存在多个消费者,每个消费者都能收到,自从自己启动后所有生产的消息。

  • 控制台显示
    topic有多个消费者时,消费消息的数量 ≈ 在线消费者数量*生产消息的数量。
    下图展示了:我们先启动了3个消费者,再启动一个生产者,并生产了3条消息。

Spring 与 ActvieMQ 整合

消息生产者

  • 引入依赖
<dependencies>
    <dependency>
        <groupId>org.apache.activemq</groupId>
        <artifactId>activemq-all</artifactId>
        <version>5.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-core</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-web</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-oxm</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-tx</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jdbc</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-webmvc</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-aop</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-context-support</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-test</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
        <version>5.0.2.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>javax.jms</groupId>
        <artifactId>javax.jms-api</artifactId>
        <version>2.0.1</version>
    </dependency>
    <dependency>
        <groupId>org.apache.xbean</groupId>
        <artifactId>xbean-spring</artifactId>
        <version>3.7</version>
    </dependency>
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>4.12</version>
    </dependency>
</dependencies>
  • 编写Spring整合ActiveMQ配置:applicationContext-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:amp="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!--1.创建连接工厂对象-->
    <amp:connectionFactory
		id="connetionFactory"
		brokerURL="tcp://192.168.66.133:61616"
		userName="admin"
		password="admin"/>
    <!--2.创建缓存连接工厂-->
    <bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
        <!--注入连接工厂-->
        <property name="targetConnectionFactory" ref="connetionFactory"/>
        <!--缓存消息数据-->
        <property name="sessionCacheSize" value="5"/>
    </bean>
    <!--3.创建用于点对点发送的JmsTemplate-->
    <bean id="jmsQueueTemplate"
class="org.springframework.jms.core.JmsTemplate">
        <!--注入缓存连接工厂-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--指定是否为发布订阅模式-->
        <property name="pubSubDomain" value="false"/>
    </bean>
    <!--4.创建用于发布订阅发送的JmsTemplate-->
    <bean id="jmsTopicTemplate"
class="org.springframework.jms.core.JmsTemplate">
        <!--注入缓存连接工厂-->
        <property name="connectionFactory" ref="cachingConnectionFactory"/>
        <!--指定是否为发布订阅模式-->
        <property name="pubSubDomain" value="true"/>
    </bean>
</beans>
  • 编写测试类,实现发送消息
package com.idata.tuxi.wizard.ui;

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;

/**
 * 演示Spring与ActiveMQ整合
 */
@RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
@ContextConfiguration("classpath:applicationContext-producer.xml") // 加载spring配置文件
public class SpringProducer {
    //点对点模式
    @Autowired
    @Qualifier("jmsQueueTemplate")
    private JmsTemplate jmsQueueTemplate;
    //发布订阅模式
    @Autowired
    @Qualifier("jmsTopicTemplate")
    private JmsTemplate jmsTopicTemplate;

    /**
     * 点对点发送
     */
    @Test
    public void ptpSender() {
        /**
         * 参数一:指定队列的名称
         * 参数二:MessageCreator接口,我们需要提供该接口的匿名内部实现
         */
        jmsQueueTemplate.send("spring_queue", new MessageCreator() {
		//我们只需要返回发送的消息内容即可
         @Override
         public Message createMessage(Session session) throws JMSException {
				//创建文本消息
                TextMessage textMessage = session.createTextMessage("spring test
                        message");
                return textMessage;
            }
        });
        System.out.println("消息发送已完成");
    }

    /**
     * 发布订阅发送
     */
    @Test
    public void psSender() {
        jmsTopicTemplate.send("spring_topic", new MessageCreator() {
            @Override
            public Message createMessage(Session session) throws JMSException {
//创建文本消息
                TextMessage textMessage = session.createTextMessage("spring test
                        message--topic");
                return textMessage;
            }
        });
        System.out.println("消息发送已完成");
    }
}

消息消费者

  • 编写监听器:监听主题消息、队列消息
    @Component
    public class EmailMessageListener implements MessageListener {
        @Override
        public void onMessage(Message message) {
            MapMessage mapMessage = (MapMessage) message;
            try {
                String email = mapMessage.getString("email");
                System.out.println("消费消息:" + email);
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
  • 编写Spring整合ActiveMQ配置:applicationContext-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans
    xmlns="http://www.springframework.org/schema/beans"
    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xmlns:aop="http://www.springframework.org/schema/aop"
    xmlns:context="http://www.springframework.org/schema/context"
    xmlns:tx="http://www.springframework.org/schema/tx"
    xmlns:amq="http://activemq.apache.org/schema/core"
    xmlns:jms="http://www.springframework.org/schema/jms"
    xsi:schemaLocation="
        http://www.springframework.org/schema/beans
        http://www.springframework.org/schema/beans/spring-beans.xsd
        http://www.springframework.org/schema/aop
        http://www.springframework.org/schema/aop/spring-aop.xsd
        http://www.springframework.org/schema/context
        http://www.springframework.org/schema/context/spring-context.xsd
        http://www.springframework.org/schema/tx
        http://www.springframework.org/schema/tx/spring-tx.xsd
        http://www.springframework.org/schema/jms
        http://www.springframework.org/schema/jms/spring-jms.xsd
        http://activemq.apache.org/schema/core
        http://activemq.apache.org/schema/core/activemq-core.xsd">
    <!-- 1. 创建ActiveMQ连接工厂 -->
    <amq:connectionFactory
        id="amqConnectionFactory"
        userName="admin" password="admin"
        brokerURL="tcp://192.168.12.132:61616"/>
    <!-- 2. 创建缓存工厂 -->
    <bean id="cachingConnectionFactory"
		class="org.springframework.jms.connection.CachingConnectionFactory">
        <!-- 注入 连接工厂-->
        <property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
        <!-- session缓存数目 -->
        <property name="sessionCacheSize" value="5"></property>
    </bean>
    <!--开启注解扫描-->
    <context:component-scan basepackage="cn.itcast.spring_activemq_consumer"/>
    <!--
        配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。
        jms:listener-container
        destination-type 监听的JMS消息类型(queue、topic)
        connection-factory Spring的缓存连接工厂
        jms:listener
        destination 对应MQ中队列名称或主题名称
        rel 消息监听器类(实现MessageListener接口)
	-->
    <!-- 3.1 监听指定名称(email)的队列中的消息-->
    <jms:listener-container destination-type="queue" connection-factory="cachingConnectionFactory">
        <jms:listener destination="email" ref="emailMessageListener"/>
    </jms:listener-container>
    <!-- 3.2 监听指定名称(email)的主题中的消息 -->
    <jms:listener-container destination-type="topic" connection-factory="cachingConnectionFactory">
        <jms:listener destination="sms" ref="smsMessageListener"/>
    </jms:listener-container>
</beans>
  • 编写测试类,实现发送消息
   /**
     * Spring整合ActiveMQ消费消息
     */
    public class Consumer {
        public static void main(String[] args) throws IOException {
            ApplicationContext ac =
                    new ClassPathXmlApplicationContext("applicationContext-activemq-consumer.xml");
            System.in.read();
        }
    }

SpringBoot 与 ActiveMQ 整合

消息生产者

  • 引入依赖
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    <relativePath/>
</parent>
<properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    <maven.compiler.source>1.8</maven.compiler.source>
    <maven.compiler.targer>1.8</maven.compiler.targer>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-activemq</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
        <scope>test</scope>
    </dependency>
</dependencies>
  • 配置
server:
	port: 9001 #端口
spring:
	application:
		name: activemq-producer # 服务名称
	# springboot与activemq整合配置
	activemq:
		broker-url: tcp://192.168.66.133:61616 # 连接地址
		user: admin # activemq用户名
		password: admin # activemq密码
	# 指定发送模式 (点对点 false , 发布订阅 true)
	jms:
		pub-sub-domain: false
  • 编写启动类
package com.itheima.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 生产者启动类
*/
@SpringBootApplication
public class ProducerApplication {
	public static void main(String[] args) {
		SpringApplication.run(ProducerApplication.class,args);
	}
}
  • 编写生产者
package com.itheima.producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* 演示SpringBoot与ActiveMQ整合- 消息生产者
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
	//JmsMessagingTemplate: 用于工具类发送消息,通过主题名/队列名自动区分
	@Autowired
	private JmsMessagingTemplate jmsMessagingTemplate;
	@Test
	public void ptpSender(){
        /**
        * 参数一:队列的名称或主题名称
        * 参数二:消息内容
        */
		jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot message");
	}
}

消息消费者

  • 引入依赖
    与生产者一致
  • 配置
server:
	port: 9002 #端口
spring:
	application:
		name: activemq-consumer # 服务名称
	# springboot与activemq整合配置
	activemq:
		broker-url: tcp://192.168.66.133:61616 # 连接地址
		user: admin # activemq用户名
		password: admin # activemq密码
	# 指定发送模式 (点对点 false , 发布订阅 true)
	jms:
		pub-sub-domain: false
activemq:
	name: springboot_queue
  • 编写启动类
package com.itheima.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 消息消费者启动类
*/
@SpringBootApplication
public class ConsumerApplication {
	public static void main(String[] args) {
		SpringApplication.run(ConsumerApplication.class,args);
	}
}
  • 编写消费者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
*/
@Component // 放入IOC容器
public class MsgListener {
    /**
    * 用于接收消息的方法
    * destination: 队列的名称或主题的名称
    */
    @JmsListener(destination = "${activemq.name}")
    public void receiveMessage(Message message){
    	if(message instanceof TextMessage){
    		TextMessage textMessage = (TextMessage)message;
            try {
                System.out.println("接收消息:"+textMessage.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
    	}
    }
}

可通过@JmsListener下指定对应消费的是Queue/Topic

@Slf4j
@Component
public class JMSConfiguration {
	
    /**
     * 队列容器工厂
     */
    @Bean
    public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(false);
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }

    /**
     * 主题队列工厂
     */
    @Bean
    public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setPubSubDomain(true);
        factory.setConcurrency("1");
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
}

消费时使用

@JmsListener(destination = DestinationConsts.MASTER_CLEANING,containerFactory = "queueListenerFactory")

JMS消息组成

整个JMS协议组成结构如下:

结构描述
JMS Provider消息中间件/消息服务器
JMS Producer消息生产者
JMS Consumer消息消费者
JMS Message消息(重要)

JMS Message消息由三部分组成:

1)消息头

2)消息体

3)消息属性

JMS消息头

JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:

斜体 为重要的消息头

名称描述
JMSDestination消息发送的 Destination,在发送过程中由提供者设置
JMSMessageID唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的 JMSMessageID
JMSDeliveryMode消息持久化。包含值 DeliveryMode.PERSISTENT 或者DeliveryMode.NON_PERSISTENT。
JMSTimestamp提供者发送消息的时间,由提供者在发送过程中设置
JMSExpiration消息失效的时间,毫秒,值 0 表明消息不会过期,默认值为0
JMSPriority消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认值为4
JMSCorrelationID通常用来链接响应消息与请求消息,由发送消息的 JMS 程序设置。
JMSReplyTo请求程序用它来指出回复消息应发送的地方,由发送消息的 JMS 程序设置
JMSTypeJMS 程序用它来指出消息的类型。
JMSRedelivered消息的重发标志,false,代表该消息是第一次发生,true,代表该消息为重发消息

不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,**因此开发者使用以上setJMSXXX()**方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:JMSCorrelationID,JMSReplyTo,JMSType。

JMS消息体

在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:

JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。

  • TextMessage–一个字符串对象 *
  • MapMessage–一套名称-值对
  • ObjectMessage–一个序列化的 Java 对象 *
  • BytesMessage–一个字节的数据流 *
  • StreamMessage – Java原始值的数据流
TextMessage
  • 生产
/**
* 发送TextMessage消息
*/
@Test
public void testMessage(){
	jmsTemplate.send(name, new MessageCreator() {
		@Override
		public Message createMessage(Session session) throws JMSException {
			TextMessage textMessage = session.createTextMessage("文本消息");
			return textMessage;
		}
	});
}
  • 消费
/**
* 接收TextMessage的方法
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
	if(message instanceof TextMessage){
		TextMessage textMessage = (TextMessage)message;
		try {
			System.out.println("接收消息:"+textMessage.getText());
		} catch (JMSException e) {
			e.printStackTrace();
		}
	}
}
MapMessage
  • 生产
/**
* 发送MapMessage消息
*/
@Test
public void mapMessage(){
	jmsTemplate.send(name, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            MapMessage mapMessage = session.createMapMessage();
            mapMessage.setString("name","张三");
            mapMessage.setInt("age",20);
            return mapMessage;
        }
	});
}
  • 消费
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
    if(message instanceof MapMessage){
        MapMessage mapMessage = (MapMessage)message;
        try {
            System.out.println("名称:"+mapMessage.getString("name"));
            System.out.println("年龄:"+mapMessage.getString("age"));
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
ObjectMessage
  • 生产
//发送ObjectMessage消息
@Test
public void test2(){
    jmsTemplate.send(name, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            User user = new User();
            user.setName("小苍");
            user.setAge(18);
            ObjectMessage objectMessage = session.createObjectMessage(user);
            return objectMessage;
        }
    });
}
  • 消费
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
    if(message instanceof ObjectMessage){
        ObjectMessage objectMessage = (ObjectMessage)message;
        try {
            User user = (User)objectMessage.getObject();
            System.out.println(user.getUsername());
            System.out.println(user.getPassword());
        } catch (JMSException e) {
       		e.printStackTrace();	
        }
    }
}
  • 注意:ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。
spring:
	activemq:
		broker-url: tcp://192.168.66.133:61616
		user: admin
		password: admin
		packages:
			trust-all: true # 添加所有包到信任列表
BytesMessage
  • 生产
//发送BytesMessage消息
@Test
public void test3(){
    jmsTemplate.send(name, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            BytesMessage bytesMessage = session.createBytesMessage();
            try {
                File file = new File("d:/spring.jpg");
                FileInputStream in = new FileInputStream(file);
                byte[] bytes = new byte[(int)file.length()];
                in.read(bytes);
                bytesMessage.writeBytes(bytes);
            } catch (Exception e) {
            	e.printStackTrace();
            }
            return bytesMessage;
        }
    });
}
  • 消费
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
    BytesMessage bytesMessage = (BytesMessage)message;
    FileOutputStream out = new FileOutputStream("d:/abc.jpg");
    byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
    bytesMessage.readBytes(buf);
    out.write(buf);
    out.close();
}
StreamMessage
  • 生产
//发送StreamMessage消息
@Test
public void test4(){
    jmsTemplate.send(name, new MessageCreator() {
        @Override
        public Message createMessage(Session session) throws JMSException {
            StreamMessage streamMessage = session.createStreamMessage();
            streamMessage.writeString("你好,ActiveMQ");
            streamMessage.writeInt(20);
            return streamMessage;
        }
    });
}
  • 消费
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
    StreamMessage streamMessage = (StreamMessage)message;
    String str = streamMessage.readString();
    int i = streamMessage.readInt();
    System.out.println(str);
    System.out.println(i);
}

JMS消息属性

我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。

message.setStringProperty("Property",Property); //自定义属性

JMS 消息可靠性

消息持久化

什么是持久化消息?

保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。

存储方式

ActiveMQ提供了以下三种的消息存储方式:

  1. Memory 消息存储-基于内存的消息存储。
  2. AMQ Message Store, 基于文件的存储机制,是以前的默认机制,现在不再使用
  3. 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复能力。
  4. 基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。

ActiveMQ持久化流程图

AMQ Message Store

AMQ 消息存储

基于文件的存储机制,是以前的默认机制,现在不再使用。

AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。

kahaDB Message Store

官网文档:http://activemq.aache.org/kahadb

KahaDB是从ActiveMQ5.4(含)开始默认的持久化插件,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。

KahaDB是一个专门针对消息持久化的解决方案.它对典型的消息使用模式进行了优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。

存储原理

kahadb在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比这就非常简洁了。

  1. db-.og KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-.og。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.og、db-2.og、db-3.og·"。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
  2. db.data 该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree (B树),使用B-Tree作为家引指向db-.log里面存储的消息。
  3. db.free 当前db.data文件里哪些页面是空闲的,文件具体内容是所有空闲页的ID
  4. db.redo 用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。
  5. lock文件锁,表示当前获得kahadb读写权限的broker。

配置

  • 配置文件activemq.xml中。日志文件的存储目录在:%activemq安装目录%/data/kahadb
<persistenceAdapter>
	<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
LevelDB Message Store

这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储在它提供比KahaDB更快的持久性。
但它不使用自定义B-Tree实现来索引预写日志,而是使用基于LevelDB的索引。

由于还推出了Replicated LevelDB,且其余各种原因,暂时不作为默认推荐的存储方式。官网还是推荐默认使用kahaDB。

配置

  • 配置文件activemq.xml中。
< persistenceAdapter >
	<levelDBdirectory = "activemq-data" />
</persistenceAdapter >

JDBC Message Store

  • 添加mysql数据库的驱动包到lib文件夹
  • 修改activemq.xml
<!--配置数据库连接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
    <property name="driverClassName" value="com.mysql.jdbc.Driver" />
    <property name="url"value="jdbc:mysql://192.168.66.133:3306/db_activemq" />
    <property name="username" value="root" />
    <property name="password" value="123456"/>
</bean>
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter> 
	<!--createTableOnStartup建议在第一次启动开启后关闭 -->
    <jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>

默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。

  • 需要我们准备一个mysql数据库,并创建一个名为activemq的数据库。新建的数据库要采用latin1 或者ASCII编码。
    https://blog.csdn.net/JeremyJiaming/article/details/88734762
    • ACTIVEMQ_MSGS
      如果是queue,在没有消费者消费的情况下会将消息保存到activemq msgs表中,只要有任意一个消费者已经消费过了消费之后这些消息将会立即被删除。
    • ACTIVEMQ_ACKS
      如果是topic,一般是先启动消费订阅然后再生产的情况下会将消息保存到activemq acks。
    • ACTIVEMQ_LOCK
  • 重启activemq
JDBC Message Store with ActiveMQ Journal

这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。

举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。

为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。

Queue(PTP模式)持久化与非持久化

Queue非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。

Queue持久化,当服务器宕机,消息依然存在。queue消息默认是持久化的。(默认配置)

Topic(Pub/Sub模式)持久化与非持久化

topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。

topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。

  • 消费者需要提前接入配置
// 设置客户端ID。向MQ服务器注册自己的名称
connection.setClientID("marrry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");

注意:

  1. 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。
  2. 然后再运行生产者发送消息。
  3. 之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。

控制台显示

消息的事务性

消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。

一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。

生产者、消费者与消息服务器直接都支持事务性;

ActionMQ的事务主要偏向在生产者的应用。

ActionMQ消息事务流程图:

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

生产者事务
  • 手动提交方式
/**
* 事务性发送--方案一
*/
@Test
public void sendMessageTx(){
    //获取连接工厂
    ConnectionFactory connectionFactory =jmsMessagingTemplate.getConnectionFactory();
    Session session = null;
    try {
        //创建连接
        Connection connection = connectionFactory.createConnection();
        /**
        * 参数一:是否开启消息事务
        */
        session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        //创建生产者
        MessageProducer producer =  session.createProducer(session.createQueue(name));
        for(int i=1;i<=10;i++){
            //模拟异常
            if(i==4){
           	 int a = 10/0;
            }
            TextMessage textMessage = session.createTextMessage("消息--" +i);
            producer.send(textMessage);
        }
        //注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达
        MQ服务器
        session.commit();
    } catch (JMSException e) {
   		e.printStackTrace();
        //消息事务回滚
        try {
        	session.rollback();
        } catch (JMSException e1) {
        	e1.printStackTrace();
        }
    }
}
  • 配置自动开启事务
    • 配置类
package com.itheima;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
	@Bean
	public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
		return new JmsTransactionManager(connectionFactory);
	}
}
  • 业务类
package com.itheima.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息发送的业务类
*/
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
    @Value("${activemq.name}")
    private String name;
    @Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
    public void sendMessage(){
        for(int i=1;i<=10;i++) {
            //模拟异常
            if(i==4){
                int a = 10/0;
            }
            jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
    	}
    }
}
  • 测试发送方法
@Autowired
private MessageService messageService;
/**
* 事务性发送--方案二: Spring的JmsTransactionManager功能
*/
@Test
public void sendMessageTx2(){
	messageService.sendMessage();
}

生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。

消费者事务
package com.itheima.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
    /**
    * 接收消息的方法
    */
    @JmsListener(destination="${activemq.name}",containerFactory = "jmsQueryListenerFactory")
    public void receiveMessage(TextMessage textMessage,Session session) throws JMSException {
        try {
            System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"  + textMessage.getJMSRedelivered());
            int i = 100/0; //模拟异常
            session.commit();//提交事务
        } catch (JMSException e) {
            try {
            session.rollback();//回滚事务
            } catch (JMSException e1) {
            }
            e.printStackTrace();
        }
    }
}

消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。

问题

(1) 问:消费者和生产者需要同时操作事务才行吗?

  答:消费者和生产者的事务,**完全没有关联,各自是各自的事务。**

消息确认(签收)机制

JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:

描述
Session.AUTO_ACKNOWLEDGE当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息
Session.CLIENT_ACKNOWLEDGE客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认
Session.DUPS_ACKNOWLEDGE该选择只是会话迟钝确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true
Session.SESSION_TRANSACTED开始事务的情况下,可以使用该方式。该种方式很少使用到。
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

注意消息确认机制与事务机制是冲突的,只能选其中一种。所以演示消息确认前,先关闭事务。

		**事务偏向生产者,签收偏向消费者**。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。
auto_acknowledge 自动确认
  • 添加配置类:
package com.itheima;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
    @Bean(name="jmsQueryListenerFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory=new
        DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(false); // 不开启事务操作
        factory.setSessionAcknowledgeMode(1); //自动确认
        return factory;
    }
}
  • 消费者
package com.itheima.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
/**
* 接收消息的方法
*/
@JmsListener(destination="${activemq.name}",containerFactory ="jmsQueryListenerFactory")
    public void receiveMessage(TextMessage textMessage){
        try {
            System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"
            + textMessage.getJMSRedelivered());
            throw new RuntimeException("test");
        } catch (JMSException e) {
			e.printStackTrace();
        }
    }
}

如果消费方接收消息失败,JMS服务器会重发消息,默认重发6次。

client_acknowledge 手动确认
  • 配置类
package com.itheima;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
    @Bean(name="jmsQueryListenerFactory")
    public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory){
        DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        factory.setSessionTransacted(false); // 不开启事务操作
        factory.setSessionAcknowledgeMode(4); //手动确认
        return factory;
    }
}
  • 消费者
package com.itheima.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
    /**
    * 接收消息的方法
    */
    @JmsListener(destination="${activemq.name}",containerFactory ="jmsQueryListenerFactory")
    public void receiveMessage(TextMessage textMessage){
        try {
            System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"
            + textMessage.getJMSRedelivered());
            textMessage.acknowledge(); // 确认收到消息,一旦消息确认,消息不会重新发送
            throw new RuntimeException("test");
        } catch (JMSException e) {
        	e.printStackTrace();
   		}
    }
}

JMS 消息投递方式

异步投递

ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。

同步发送

消息生产者使用持久(Persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。

异步发送

如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞 Producer.send方法。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能。不过这也带来了额外的问题,就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加

想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true属性

  1. 当alwaysSyncSend=true
    系统将会忽略useAsyncSend设置的值都采用同步
  2. 当alwaysSyncSend=false
  • NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送
  • 如果useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。
    如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送

总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送!

配置异步投递
//1.在连接上配置
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
//2.通过ConnectionFactory
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
//3.通过connection
((ActiveMQConnection)connection).setUseAsyncSend(true);

注意:如果是Spring或SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递

@Configuration
public class ActiveConfig {
/**
* 配置用于异步发送的非持久化JmsTemplate
*/
@Autowired
@Bean
public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    template.setExplicitQosEnabled(true);
    template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
    return template;
}
/**
* 配置用于同步发送的持久化JmsTemplate
*/
@Autowired
@Bean
public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
    JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
    return template;
}
异步投递如何确认

由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。

这时,可以给异步投递方法接收回调,以确认消息是否发送成功!

import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;

import javax.jms.*;
import java.util.UUID;

public class Jms_TX_Producer {

    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";

    private static final String ACTIVEMQ_QUEUE_NAME = "Async";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        activeMQConnectionFactory.setUseAsyncSend(true);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                //添加回调
                textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu");
                final String  msgId = textMessage.getJMSMessageID();
                activeMQMessageProducer.send(textMessage, new AsyncCallback() {
                    public void onSuccess() {
                        System.out.println("成功发送消息Id:"+msgId);
                    }

                    public void onException(JMSException e) {
                        System.out.println("失败发送消息Id:"+msgId);
                    }
                });
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            activeMQMessageProducer.close();
            session.close();
            connection.close();
        }
    }
}

控制台观察发送消息的信息:

延迟投递和定时投递

官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

四大属性

属性值类型描述
AMQ_SCHEDULED_DELAYlong延迟投递的时间(毫秒)
AMQ_SCHEDULED_PERIODlong重复投递的时间间隔(毫秒)
AMQ_SCHEDULED_REPEATint重复投递次数
AMO_SCHEDULED_CRONStringCron表达式(注:此corn表达式并非Quartz框架中的corn表达式,而是linux中corntab中的表达 式,基本顺序是"分(0-59) 时(0-23) 日(1-31) 月(1-12) 星期几(1-7) ")
除了基本的时间单位外,cron 表达式还支持一些特殊的字符,如:
  • 表示所有可能的值,例如,在第五个字段中表示每个月都执行。
    ? 表示不指定值,例如,?在第六个字段中表示不关心周几。
    - 表示范围,例如,1-5 在第四个字段中表示 1 号到5 号都执行表示列表,例如,1,3,5 在第四个字段中表示 1 号、3 号、5 号.都执行。
    / 表示步长,例如,/5 在第一字段中表示每 5 秒执行一次。
    示例:https://www.ngui.cc/el/2728544.html?action=onClick
    http://tech.zhiding.net/tech/2023/0430/59884.html |
配置
  • 修改activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" ...
	schedulerSupport="true" >
......
</broker>

注意:添加schedulerSupport="true"配置

  • 代码
import org.apache.activemq.*;
import javax.jms.*;
import java.util.UUID;

public class Jms_TX_Producer {

    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";

    private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer messageProducer = session.createProducer(queue);
        long delay =  10*1000;
        long period = 5*1000;
        int repeat = 3 ;
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                // 延迟的时间
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                // 重复投递的时间间隔
                textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                // 重复投递的次数
                textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                // 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。
                messageProducer.send(textMessage);
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            messageProducer.close();
            session.close();
            connection.close();
        }
    }
}

使用 JmsTemplate结合 MessagePostProcessor

long delay =  10*1000;
long period = 5*1000;
int repeat = 3 ;
this.jmsTemplate.convertAndSend(this.getDestination(), t, new MessagePostProcessor() {
    @Override
    public Message postProcessMessage(Message message) throws JMSException {
          // 延迟的时间
          message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
          // 重复投递的时间间隔
          message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
          // 重复投递的次数
          message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
          return message;
     }
});

发送成功后,可以登录activemq的webconsole查看消息的属性:
在scheduled面板中,可以看到延时的消息
外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传
注:在开启消息持久化存储的前提下,就算把相应的queue在webconsole面板中删除(即删除队列),只要投递的时间尚未到,该消息也不会删除,仍然能正常延时投递。
此外,在queues面板中,如何查看某条具体的消息,也可以通过属性发现这条消息是延时消息,参考下图:

延迟/定时任务删除

通过ActiveMQ提供的ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION,可以将延迟/定时计划移除。

//在消费的时候获取当计划id
String scheduled_id = objectMessage.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
this.jmsTemplate.convertAndSend(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, scheduled_id, 
                                message -> {
      //通过添加之前添加的消息标识                              
      message.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
      message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, scheduled_id);
            return message;
    });

JMS 消息的重试机制

官网文档:http://activemq.apache.org/redelivery-policy

消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。

具体哪些情况会引发消息重发

  1. Client用了transactions且再session中调用了rollback
  2. Client用了transactions且再调用commit之前关闭或者没有commit
  3. Client在CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover

有毒消息(Poison ACK)

一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)

属性说明

属性描述
collisionAvoidanceFactor设置防止冲突范围的正负百分比,只有启用useColisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15。
maximumRedeliveries最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认值为6
maximumRedeliveryDelay最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。
initialRedeliveryDelay初始重发延迟时间,默认1000L
redeliveryDelay重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L
useCollisionAvoidance启用防止冲突功能,默认false
useExponentialBackOff启用指数倍数递增的方式增加延迟时间,默认false
backOfMutiolier重连时间间隔递增倍数,只有值大于1和启用useExponentalBackOff参数时大生效。默认是5

配置重试机制

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import javax.jms.*;
import java.io.IOException;

public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
    private static final String ACTIVEMQ_QUEUE_NAME = "dead01";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        // 修改默认参数,设置消息消费重试3次
        RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
        redeliveryPolicy.setMaximumRedeliveries(3);
        activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {
            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    TextMessage textMessage = (TextMessage) message;
                    try {
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                        //session.commit();
                    }catch (Exception e){
                        e.printStackTrace();
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}

JMS 消息的死信队列

官网文档: http://activemq.apache.org/redelivery-policy

异常消息规避处理的集合,主要处理失败或者过期的消息。

配置

  • 共享死信策略(sharedDeadLetterStrategy)
    不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
<deadLetterStrategy>
	<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
  • 个人死信策略(individualDeadLetterStrategy)
    可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
<destinationPolicy>
    <policyMap>
        <policyEntries>
            <policyEntry queue=">">
                <deadLetterStrategy>
                	<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
                </deadLetterStrategy>
            </policyEntry>
            <policyEntry topic=">" >
                <pendingMessageLimitStrategy>
                	<constantPendingMessageLimitStrategy limit="1000"/>
                </pendingMessageLimitStrategy>
            </policyEntry>
        </policyEntries>
    </policyMap>
</destinationPolicy>

把DeedLetter放入各自的死信通道中,
对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”
对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic.”
比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order’。我们使用“queuePrefix”“topicPrefix”来指定上述前缀。
默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue: 不过开发者也可以指定为Topic。

<policyEntry queue="order">
    <deadLetterStrategy>
    	<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" />
    </deadLetterStrategy>
</policyEntry>

将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。
属性“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。

自动删除过期消息

过期消息是值生产者指定的过期时间,超过这个时间的消息。

有时需要直接删除过期的消息而不需要发送到死队列中,“processExpired”表示是否将过期消息放入死信队列,默认为true:

<policyEntry queue= ">" >
    <deadletterStrategy>
    	<sharedDeadLetterStrategy processExpired= "false" />
    </deadLetterStrategy>
</policyEntry>

存放非持久消息到死信队列中

默认情况下,Activemq不会把非持久的死消息发送到死信队列中。

processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。

非持久性如果你想把非持久的消息发送到死队列中,需要设置属性processNonPersistent=“true"

<policyEntry queue= ">" >
    <deadLetterStrategy>
    	<sharedDeadLetterStrategy processNonPersistent= "true" />
    </deadLetterStrategy>
</policyEntry>

消息的幂等性

网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。

如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。

如果上面两种情况还不行,准备-个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。

扩展

broker 多实例

  • broker是什么
    相当于一个ActiveMQ服务器实例。
使用配置文件

启动broker时指定配置文件,可以帮助我们在一台服务器上启动多个broker。实际工作中一般一台服务器只启动一个broker。

使用嵌入式方式

用ActiveMQ Broker作为独立的消息服务器来构建Java应用。ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。

  • 依赖引用
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.10.1</version>
</dependency>
  • 嵌入式broker的启动类
import org.apache.activemq.broker.BrokerService;

public class EmbedBroker {

    public static void main(String[] args) throws Exception {
        //ActiveMQ也支持在vm中通信基于嵌入的broker
        BrokerService brokerService = new BrokerService();
        brokerService.setPopulateJMSXUserID(true);
        brokerService.addConnector("tcp://127.0.0.1:61616");
        brokerService.start();
   }
}

ActiveMQ的传输协议

ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。

activemq传输协议的官方文档:http://activemq.apache.org/configuring-version-5-transports.html

<transportConnectors>
    <transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="mqtt" uri="mqtt://0.0.0.0:1884?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
    <transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600"/>
</transportConnectors>

在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如

描述amqp协议的监听端口时,采用的URI描述格式为“amqp://······”;

描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”;

唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire

支持的传输协议

除了tcp和nio协议,其他的了解就行。各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。如: mqtt是物联网专用协议,采用的中间件一般是mosquito。ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。

协议描述
TCP默认的协议,性能相对可以
NIO基于代P协议之上的,进行了扩展和优化,具有更好的扩展性
UDP性能比TCP更好,但是不具有可靠性
SSL安全链接
HTTP(S)基于HTTP或者HTTPS
VMVM本身不是协议,当客户端和代理在同一个Java虚拟机(VM)中运行时,他们之间需要通信,但不想占用网络通道,而是直接通信,可以使用该方式
TCP协议
  • Transmission Control Protocol(TCP)是默认的。TCP的Client监听端口61616
  • 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。
  • TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。
    关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference

TCP传输的的优点:

TCP协议传输可靠性高,稳定性强

高效率:字节流方式传递,效率很高

有效性、可用性:应用广泛,支持任何平台

NIO协议
  • New I/O API Protocol(NIO)
  • NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
  • 适合使用NIO协议的场景:
    • 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
    • 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
  • NIO连接的URI形式:nio://hostname:port?key=value&key=value
    关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html
AMQP协议
STOMP协议
MQTT协议
NIO案例

ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIO的IO模型。

所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型。

  • 修改配置文件activemq.xml
    • 修改配置文件activemq.xml在 节点下添加如下内容:
    • 修改完成后重启activemq: service activemq restart
    • 查看管理后台,可以看到页面多了nio
  • 生产者
package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;

public class Jms_TX_Producer {
	//仅此处不同
    private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61618";

    private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";

    public static void main(String[] args) throws JMSException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageProducer producer = session.createProducer(queue);
        try {
            for (int i = 0; i < 3; i++) {
                TextMessage textMessage = session.createTextMessage("tx msg--" + i);
                producer.send(textMessage);
            }
            System.out.println("消息发送完成");
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            //8.关闭资源
            producer.close();
            session.close();
            connection.close();
        }
    }
}
  • 消费者
package com.activemq.demo;

import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;

public class Jms_TX_Consumer {
   	//仅此处不同
    private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61618";
    private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";

    public static void main(String[] args) throws JMSException, IOException {
        ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
        Connection connection = activeMQConnectionFactory.createConnection();
        connection.start();
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
        MessageConsumer messageConsumer = session.createConsumer(queue);
        messageConsumer.setMessageListener(new MessageListener() {

            public void onMessage(Message message) {
                if (message instanceof TextMessage) {
                    try {
                        TextMessage textMessage = (TextMessage) message;
                        System.out.println("***消费者接收到的消息:   " + textMessage.getText());
                    } catch (Exception e) {
                        System.out.println("出现异常,消费失败,放弃消费");
                    }
                }
            }
        });
        System.in.read();
        messageConsumer.close();
        session.close();
        connection.close();
    }
}
NIO增强

URI格式头以”nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持openwire协议。如何让其他协议传输底层也使用NIO网络IO模型呢?

  • 使用auto关键字
  • 使用“+”符号来为端口设置多种特性
  • 如果我们既需要某一个端口支持NIo网络io模型,又需要它支持多个协议

修改配置文件activemq.xml

<transportConnectors>
      <transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&amp;wireFormat.maxFrameSize=104857600&amp;org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&amp;org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
</transportConnectors>

auto : 针对所有的协议,他会识别我们是什么协议。

nio:使用NIO网络IO模型

修改配置文件后重启activemq。

代码

//使用nio模型的tcp协议生产者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Producer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException {
         ......
    }
}

//使用nio模型的tcp协议消费者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException, IOException {
       ......
    }
}

//使用nio模型的nio协议生产者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Producer {
    private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException {
       ......
    }
}

//使用nio模型的nio协议消费者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Consumer {
    private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
    private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";

    public static void main(String[] args) throws JMSException, IOException {
        ......
    }
}

ActiveMQ多节点集群

从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2071972.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux自旋锁和读写锁

在前面的文章中我们已经介绍了有关互斥锁的概念与使用&#xff0c;本篇将开始介绍在 Linux 中的自旋锁和读写锁。这三种锁分别用于在不同的应用场景之中&#xff0c;其中互斥锁最为常用&#xff0c;但是我们需要了解一下其他的锁。 对于自旋锁和读写锁都介绍了其原理以及接口使…

【信创】麒麟KylinOS V10打开root登录桌面权限

原文链接&#xff1a;【信创】麒麟KylinOS V10打开root登录桌面权限 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇关于在麒麟KYLINOS V10上如何打开root用户登录桌面的文章。在大多数Linux发行版中&#xff0c;出于安全考虑&#xff0c;root用户默认情况下是禁止直…

KRTS网络模块:TCP服务端、客户端实例

KRTS网络模块:TCP服务端、客户端实例 目录 KRTS网络模块:TCP服务端、客户端实例TCP简介KRST服务端简介核心特性界面设计核心代码 KRTS客户端简介核心特性界面设置核心代码 运行实例 Socket模块基于Packet模块&#xff0c;实时提供更高的协议&#xff0c;如RAW-IP、TCP 和 UDP(参…

【求助帖】用PyTorch搭建MLP网络时遇到奇怪的问题

求助&#xff1a;我在测试自己搭建的通用MLP网络时&#xff0c;发现它与等价的参数写死的MLP网络相比效果奇差无比&#xff0c;不知道是哪里出了问题&#xff0c;请大佬们帮忙看下。 我写的通用MLP网络&#xff1a; class MLP(nn.Module):def __init__(self, feature_num, cl…

3、Unity【基础】Resources资源场景动态加载

文章目录 一、Resources资源动态加载1、Unity中特殊文件夹1、工程路径获取2、Resources资源文件夹3、StreamingAssets流动资源文件夹4、persistentDataPath持久数据文件夹5、Plugins插件文件夹6、Editor编辑器文件夹7、默认资源文件夹StandardAssets 2、Resources同步加载1、Re…

Auto-Editor

文章目录 一、关于 Auto-Editor安装系统兼容性版权 二、切割自动切割的方法看看自动编辑器删掉了什么 三、导出到编辑器命名时间线按 Clip 分割 四、手工编辑五、更多的选择 一、关于 Auto-Editor github : https://github.com/WyattBlue/auto-editor (2.8k star – 2408)主页…

ubuntu 20.04系统安装pytorch

1.1 安装gcc 安装cuda之前&#xff0c;首先应该安装gcc&#xff0c;安装cuda需要用到gcc&#xff0c;否则报错。可以先使用下方指令在终端查看是否已经安装gcc。 gcc --version 如果终端打印如下则说明已经安装。 如果显示“找不到命令 “gcc”......”使用下方指令安装 su…

阅读笔记5:董超底层视觉之美|时空的交错与融合——论视频超分辨率

原文链接&#xff1a;https://mp.weixin.qq.com/s/pmJ56Y0-dbIlYbHbJyrfAA 1. 多帧超分和时空超分 视频超分的本质就是多帧超分&#xff0c;多帧超分的历史远早于视频超分。 在早期&#xff0c;Super Resolution专指多帧超分&#xff0c;因为只有多帧超分才能补充进入真实的信…

Golang | Leetcode Golang题解之第368题最大整除子集

题目&#xff1a; 题解&#xff1a; func largestDivisibleSubset(nums []int) (res []int) {sort.Ints(nums)// 第 1 步&#xff1a;动态规划找出最大子集的个数、最大子集中的最大整数n : len(nums)dp : make([]int, n)for i : range dp {dp[i] 1}maxSize, maxVal : 1, 1fo…

对讲模块升级的重要性-OTA空中升级与串口升级

在现代通信设备的设计中&#xff0c;灵活的升级能力已成为评估模块性能的重要标准。无论是在开发过程中&#xff0c;还是在产品的生命周期内&#xff0c;支持OTA和串口升级的模块可以极大地提高设备的可维护性和适应性。 SA618F30&#xff0c;作为一款高性价比、高集成度的大功…

SSRF 302跳转攻击redis写入ssh公钥实现远程登录

目录 SSRF漏洞 SSRF攻击Redis 302跳转 漏洞复现&#xff1a; index.html: index.php: 攻击步骤&#xff1a; 1.生成ssh公钥数据&#xff1a; 2.用SSH公钥数据伪造Redis数据&#xff1a; 3.在自己的服务器上写302跳转&#xff1a; 4.最后尝试在.ssh目录下登录&#…

Golang | Leetcode Golang题解之第371题两整数之和

题目&#xff1a; 题解&#xff1a; func getSum(a, b int) int {for b ! 0 {carry : uint(a&b) << 1a ^ bb int(carry)}return a }

MySQL主从复制之GTID模式

目录 1 MySQL 主从复制 GTID 模式介绍 2 传统复制模式与GTID复制模式的区别 3 GTID模式核心参数 4 GTID 实现自动复制原理 4.1 GTID基本概念 4.2 GTID复制流程 5 GTID 实现自动定位 5.1 配置 my.cnf 5.2 配置 SLAVE 实现自动定位 5.3 测试 6 GTID 模式 故障转移的方法流程 6.1…

如何使用ssm实现宠物领养系统+vue

TOC ssm103宠物领养系统vue 课题背景 在当今的社会&#xff0c;可以说是信息技术的发展时代&#xff0c;在社会的方方面面无不涉及到各种信息的处理。信息是人们对客观世界的具体描述&#xff0c;是人们进行交流与联系的重要途径。人类社会就处在一个对信息进行有效合理的加…

mysql数据库----简单认识库的操作

目录 1.区分概念 2.什么是数据库 3.数据库的创建和销毁 4.数据库编码初识 5.查询系统默认编码配置 6.两个查询编码表的指令 7.创建指定编码的数据库 8.不同编码的区别 第一个编码方式&#xff1a; 第二个编码方式&#xff1a; 查询结果说明&#xff1a; 9.数据库的增…

QT Quick QML 网络助手——TCP客户端

GitHub 源码: QmlLearningPro &#xff0c;选择子工程 Nettools.pro QML 其它文章请点击这里: QT QUICK QML 学习笔记 ● 运行效果&#xff1a; 左侧为常用的网络调试工具&#xff0c;右侧为本项目 UI 效果&#xff0c;前端使用 QML &#xff0c;后端使用C &#xff…

ArkTs之:数据懒加载——LazyForEach的用法

官方描述 LazyForEach从提供的数据源中按需迭代数据&#xff0c;并在每次迭代过程中创建相应的组件。当在滚动容器中使用了LazyForEach&#xff0c;框架会根据滚动容器可视区域按需创建组件&#xff0c;当组件滑出可视区域外时&#xff0c;框架会进行组件销毁回收以降低内存占…

我在某日重新下载了idea

# 1 Maven设置 2 字体样式,字体颜色 3 插件 1,fitten code和通义灵码 2,one dark theme主题 3,mybatisX 4,Rainbow Brackets 5,Key Promoter X 设置 自动导入包

Ps:首选项 - 常规

Ps菜单&#xff1a;编辑/首选项 Edit/Preferences 快捷键&#xff1a;Ctrl K Photoshop 首选项中的“常规” General选项卡主要用于调整 Photoshop 的整体工作行为和用户体验。这些设置让用户可以根据个人习惯和工作流程定制软件的响应方式和界面布局&#xff0c;从而提高工作…

下载的word中的mathtype公式双击无法打开编辑器

原因分析&#xff1a; 该word中的此公式不是通过word内置的mathtype插入公式的&#xff0c;而是从mathtype编辑器中复制粘贴到word中的。 后者的方式当被其他人下载接收后&#xff0c;无法修改此公式&#xff0c;而且该公式也不能被其他人复制&#xff0c;会报错如下&#xff…