ActiveMQ基础学习简单记录

news2025/1/12 1:49:08

ActiveMQ基础学习简单记录

  • JMS是什么
  • JMS消息模型
    • JMS Message Type
  • Activemq
    • 安装
    • 概念强化
    • JMS的跨平台性
      • JMS通用接口
      • JMS希望达到的目标是什么
    • Activemq发送消息的三种模式
        • 至少一次
        • 至多一次
        • 精确一次
        • 可重复确认模式
        • 小结
    • Activemq支持众多协议
    • Activemq支持的定时消息,延迟消息,优先级消息
    • 消息选择器
    • Spring整合


本文为Activemq简单入门文章


JMS是什么

JMS即Java Message Service,是JavaEE的消息服务接口。JMS主要有两个版本:1.1和2.0。2.0和1.1相比,主要是简化了收发消息的代码。

JMS为Java程序提供了一种通用方法, 用于创建、发送、接收和读取企业消息系统中的消息。

JMS是一组接口定义,如果我们要使用JMS,还需要选择一个具体的JMS产品。常用的JMS服务器有开源的ActiveMQ,商业服务器如WebLogic、WebSphere等也内置了JMS支持。

在这里插入图片描述
使用消息服务,而不是直接调用对方的API,它的好处是:

  • 双方各自无需知晓对方的存在,消息可以异步处理,因为消息服务器会在Consumer离线的时候自动缓存消息;
  • 如果Producer发送的消息频率高于Consumer的处理能力,消息可以积压在消息服务器,不至于压垮Consumer;
  • 通过一个消息服务器,可以连接多个Producer和多个Consumer。

JMS消息模型

JMS提供了两种消息模型:

  • PTP(点对点消息模型)
    在这里插入图片描述

  • Pub/Sub(发布订阅消息模式)
    在这里插入图片描述

  • Queue是一种一对一的通道

    • 如果Consumer离线无法处理消息时,Queue会把消息存起来,等Consumer再次连接的时候发给它。
    • 设定了持久化机制的Queue不会丢失消息。
    • 如果有多个Consumer接入同一个Queue,那么它们等效于以集群方式处理消息
      • 例如,发送方发送的消息是A,B,C,D,E,F,两个Consumer可能分别收到A,C,E和B,D,F,即每个消息只会交给其中一个Consumer处理。
  • Topic则是一种一对多通道。

    • 一个Producer发出的消息,会被多个Consumer同时收到,即每个Consumer都会收到一份完整的消息流。
    • 如果某个Consumer暂时离线,过一段时间后又上线了,那么在它离线期间产生的消息还能不能收到呢?
      • 这取决于消息服务器对Topic类型消息的持久化机制。
        • 如果消息服务器不存储Topic消息,那么离线的Consumer会丢失部分离线时期的消息,如果消息服务器存储了Topic消息,那么离线的Consumer可以收到自上次离线时刻开始后产生的所有消息。
        • JMS规范通过Consumer指定一个持久化订阅可以在上线后收取所有离线期间的消息,如果指定的是非持久化订阅,那么离线期间的消息会全部丢失。

注意:

  • 如果一个Topic的消息全部都持久化了,并且只有一个Consumer,那么它和Queue其实是一样的。实际上,很多消息服务器内部都只有Topic类型的消息架构,Queue可以通过Topic“模拟”出来。
  • 无论是Queue还是Topic,对Producer没有什么要求。多个Producer也可以写入同一个Queue或者Topic,此时消息服务器内部会自动排序确保消息总是有序的。

上面是消息服务的基本模型。当某个到达消息服务器时,Producer和Consumer通常是通过TCP连接消息服务器,在编写JMS程序时,又会遇到ConnectionFactory、Connection、Session等概念,其实这和JDBC连接是类似的:

  • ConnectionFactory:代表一个到消息服务器的连接池,类似JDBC的DataSource;
  • Connection:代表一个到消息服务器的连接,类似JDBC的Connection;
  • Session:代表一个经过认证后的连接会话;
  • Message:代表一个消息对象。

在JMS 1.1中,发送消息的代码示例如下:

try {
    Connection connection = null;
    try {
        // 创建连接:
        connection = connectionFactory.createConnection();
        // 创建会话:
        Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
        // 创建一个Producer并关联到某个Queue:
        MessageProducer messageProducer = session.createProducer(queue);
        // 创建一个文本消息:
        TextMessage textMessage = session.createTextMessage(text);
        // 发送消息:
        messageProducer.send(textMessage);
    } finally {
        // 关闭连接:
        if (connection != null) {
            connection.close();
        }
    }
} catch (JMSException ex) {
    // 处理JMS异常
}

JMS 2.0改进了一些API接口,发送消息变得更简单:

try (JMSContext context = connectionFactory.createContext()) {
    context.createProducer().send(queue, text);
}

注意:

  • JMSContext实现了AutoCloseable接口,可以使用try(resource)语法,代码更简单。

JMS Message Type

JMS的消息类型支持以下几种:

消息类型适用场景通信机制消息传递模型
TextMessage传递文本消息 String XML JSON同步或异步点对点或发布/订阅
MapMessage传递带有多个属性的消息同步或异步点对点或发布/订阅
BytesMessage传递二进制数据同步或异步点对点或发布/订阅
StreamMessage传递流式数据,适用于需要分块读取数据的场景同步或异步点对点或发布/订阅
ObjectMessage传递Java对象同步或异步点对点或发布/订阅
Message用于接收任意类型的消息,适用于需要动态解析消息类型的场景同步或异步点对点或发布/订阅

JMS支持的消息头字段(Message header fields)有:

在这里插入图片描述


Activemq

Activemq简介:

  • ActiveMQ是一个开源的消息中间件,实现了JMS规范。它提供了很多高级特性,比如消息分组、消息持久化、事务、异步发送等。ActiveMQ支持多种协议和编程语言,可以很方便地与Java、.NET、C++、Ruby等语言进行集成。
  • 在ActiveMQ中,消息由生产者发送到队列或主题,消费者从队列或主题中接收消息。ActiveMQ还提供了许多扩展功能,如消息分组、延迟发送、异步发送等。同时,ActiveMQ也支持多种协议,如STOMP、AMQP、OpenWire等,可以很方便地与其他系统进行集成。
  • 除了基本的JMS功能外,ActiveMQ还提供了许多高级功能,如消息选择器、消息传递规则、消息转发等。此外,ActiveMQ还支持多种消息类型,包括文本、对象、字节流等。
  • ActiveMQ可以作为单个Java应用程序的嵌入式消息中间件,也可以作为分布式系统的消息中间件。它支持多种部署模式,如集群模式、Master-Slave模式等。
  • ActiveMQ的插件机制允许通过插件扩展其功能,例如实现消息过滤、路由和安全认证。插件是以Java类的形式存在的,可以通过配置文件或编程的方式来加载它们。ActiveMQ提供了很多可用的插件,比如JAAS身份认证插件、STOMP支持插件、AMQP支持插件等。此外,ActiveMQ还允许自定义插件,以满足特定的需求。

插件:

  • Camel 插件:将 ActiveMQ 与 Apache Camel 集成,以支持各种数据转换和消息路由模式。
  • LevelDB 存储插件:使用 LevelDB 作为消息存储的替代方案。
  • MQTT 插件:支持使用 MQTT 协议进行消息传递。
  • STOMP 插件:支持使用 STOMP 协议进行消息传递。
  • Virtual Topics 插件:提供虚拟主题,以简化发布订阅模式的实现。
  • WebSocket 插件:支持使用 WebSocket 协议进行消息传递。
  • JMX 监控插件:提供 JMX 监控功能,以监视 ActiveMQ 运行时的性能和状态信息。

安装

ActiveMQ Classic或者ActiveMQ Artemis的关系:

  • ActiveMQ Classic原来就叫ActiveMQ,是Apache开发的基于JMS 1.1的消息服务器,目前稳定版本号是5.x
  • ActiveMQ Artemis是由RedHat捐赠的HornetQ服务器代码的基础上开发的,目前稳定版本号是2.x。
    • 和ActiveMQ Classic相比,Artemis版的代码与Classic完全不同,并且,它支持JMS 2.0,使用基于Netty的异步IO,大大提升了性能。
    • 此外,Artemis不仅提供了JMS接口,它还提供了AMQP接口,STOMP接口和物联网使用的MQTT接口。选择Artemis,相当于一鱼四吃。

所以,我们这里直接选择ActiveMQ Artemis。从官网下载最新的2.x版本,解压后设置环境变量ARTEMIS_HOME,指向Artemis根目录,例如C:\Apps\artemis,然后,把ARTEMIS_HOME/bin加入PATH环境变量:

  • Windows下添加%ARTEMIS_HOME%\bin到Path路径;
  • Mac和Linux下添加$ARTEMIS_HOME/bin到PATH路径。

Artemis有个很好的设计,就是它把程序和数据完全分离了。我们解压后的ARTEMIS_HOME目录是程序目录,要启动一个Artemis服务,还需要创建一个数据目录。我们把数据目录直接设定在项目spring-integration-jmsjms-data目录下。执行命令artemis create jms-data

在这里插入图片描述

在创建过程中,会要求输入连接用户和口令,这里我们设定admin和password,以及是否允许匿名访问(这里选择N)。

此数据目录jms-data不仅包含消息数据、日志,还自动创建了两个启动服务的命令bin/artemisbin/artemis-service,前者在前台启动运行,按Ctrl+C结束,后者会一直在后台运行。

我们把目录切换到jms-data/bin,直接运行artemis run即可启动Artemis服务:

在这里插入图片描述
启动成功后,Artemis提示可以通过URL: http://localhost:8161/console访问管理后台。注意不要关闭命令行窗口

如果Artemis启动时显示警告:AMQ222212: Disk Full! … Clients will report blocked.这是因为磁盘空间不够,可以在etc/broker.xml配置中找到<max-disk-usage>并改为99。

在这里插入图片描述


概念强化

上面简单介绍了JMS的基本概念,本节结合Activemq来具体说明JMS的使用。

JMS中的核心概念:

  • JMS Client:用来发送和接收消息的Java程序
  • Non-JMS client: 不使用 JMS API 的客户端应用程序。这意味着这些应用程序不使用 JMS 提供的接口和协议与消息中间件进行交互。

注意:

  • Non-JMS client 可能使用其他协议或方式与消息中间件进行通信,例如: 直接使用底层的消息中间件提供的原生 API、使用自定义的消息格式或协议进行通信等。
  • 这就好像你不使用java提供的JBDC统一驱动接口进行调用,而直接调用各个第三方厂商提供的驱动实现类一样
  • 需要注意的是,使用 Non-JMS client 进行消息通信可能会导致与特定消息中间件的耦合性增加。因为它们直接依赖于消息中间件提供的接口和协议,所以在切换或迁移到其他消息中间件时可能需要进行修改和适配
  • Non-JMS client 的存在也提供了一种灵活性和自由度,可以根据特定需求选择更适合的通信方式和协议。它们可能适用于特定的应用场景或需要与非 JMS 兼容的系统进行集成的情况。
  • Messages:JMS Client之间进行通讯的消息
  • JMS Provider (JMS提供者):实现了JMS规范的消息系统,提供了消息服务和管理功能

类比: mysql-connector-java

  • Administered Objects(被管理的对象):是预先配置的JMS对象,通常由管理员或开发者创建 --> ConnectionFactory和Destination(可以理解为队列Queue)

注意:

  • 在JMS(Java Message Service)中,Administered Objects(管理对象)是由JMS提供者(如消息中间件)管理和提供的一些资源,包括队列(Queue)、主题(Topic)、连接工厂(ConnectionFactory)等。这些对象提供了与消息传递相关的基础设施,并允许应用程序与消息中间件进行交互。
  • 通过使用 Administered Objects,应用程序可以更方便地与消息中间件进行交互,而无需了解底层的通信协议和细节。应用程序可以通过配置或通过编程的方式访问和使用这些对象,以满足不同的消息传递需求

引入相关依赖:

        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>artemis-jms-client</artifactId>
            <version>2.11.0</version>
        </dependency>
         <dependency>
            <groupId>javax.json</groupId>
            <artifactId>javax.json-api</artifactId>
            <version>1.1.4</version>
        </dependency>
        <dependency>
            <groupId>org.glassfish</groupId>
            <artifactId>javax.json</artifactId>
            <version>1.1.4</version>
        </dependency>       

低版本的Artemis的Client依赖包中,没有下面这个问题,无需引入。
高版本的Artemis的Client依赖包中,虽然依赖了jakarta.jms:jakarta.jms-api,但是由于artemis-jakarta-client包依赖的jakarta.jms-api版本比较老,内部包名还是旧版本的javax.jms开头:
在这里插入图片描述
因此需要手动引入高版本jakarta.jms-api,以maven依赖的就近查找原则,覆盖旧版本:
在这里插入图片描述

上面已经介绍过了JMS中支持的消息类型,下面我们来看一下如何创建并发送不同类型的消息:

  • 发布一条不含有任何消息内容的异常文本消息 – 不含有有效负载的简单通知,仅包含JSM消息头和消息属性,用于事件通知
    public Message createMessage(Session session) throws JMSException {
        Message m = session.createMessage();
        m.setStringProperty("exception", "java.lang.NosuchMethodException");
        return m;
    }
  • 发布一条仅包含文本类型的消息以及XML和JSON
    public Message createMessage(Session session) throws JMSException {
        TextMessage tm = session.createTextMessage();
        tm.setText("{ \"name\": \"John\", \"age\": 30}");
        return tm;
    }
  • 发布一组key/value作为消息内容
    public Message createMessage(Session session) throws JMSException {
        MapMessage mm = session.createMapMessage();
        mm.setInt("id", 2017);
        mm.setString("name", "zhangsan");
        mm.setString("password", "123456");
        return mm;
    }
  • 发布一组二进制数组作为消息内容 — 可以传输任意二进制类型的数据,如: 图像和音频
        MessageProducer producer = session.createProducer(queue);
        BytesMessage message = session.createBytesMessage();
        // 设置消息内容
        byte[] payload = "Hello, JMS!".getBytes();
        message.writeBytes(payload);
        // 发送消息
        producer.send(message);
  • 发布可序列化的java对象作为消息内容
    public Message createMessage(Session session) throws JMSException {
        ObjectMessage om = session.createObjectMessage();
        User user = new User();
        user.setID(2020001);
        user.setName("zhangsan");
        om.setObject(user);
        return om;
    }
  • 传递流式数据,用于处理连续数据流的消息类型 --> 如: 用于传输和处理来自传感器等源的实时数据

生产者:

import jakarta.jms.*;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

public class Producer {
    public static void main(String[] args) {
        try {
            // 创建连接工厂和连接
 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616","root","123456");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 创建会话和队列
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test");

            // 创建生产者并发送多个消息
            MessageProducer producer = session.createProducer(destination);

            for (int i = 0; i < 5; i++) {
                // 创建消息并设置消息内容
                StreamMessage message = session.createStreamMessage();
                message.writeInt(i);
                message.writeString("Message " + i);

                // 发送消息
                producer.send(message);
                System.out.println("Sent message: " + message);
            }

            // 关闭连接和会话
            producer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者:

import jakarta.jms.*;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

public class Consumer {
    public static void main(String[] args) {
        try {
            // 创建连接工厂和连接
 ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616","root","123456");
            Connection connection = connectionFactory.createConnection();
            connection.start();

            // 创建会话和队列
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue("test");

            // 创建消费者并接收消息
            MessageConsumer consumer = session.createConsumer(destination);

            while (true) {
                Message message = consumer.receive();
                if (message instanceof StreamMessage) {
                    StreamMessage streamMessage = (StreamMessage) message;
                    int intValue = streamMessage.readInt();
                    String stringValue = streamMessage.readString();
                    System.out.println("Received message: " + intValue + ", " + stringValue);
                }else {
                    break;
                }
            }

            // 关闭连接和会话
            consumer.close();
            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

JMS的跨平台性

在这里插入图片描述
JMS的跨平台实现与JDBC类似,核心思路是如何定位到第三方厂商提供的服务实现类。

就像JDBC中,我们需要通过JDK SPI机制获取Drive接口实现类一样,JMS同样需要采用一种服务发现机制,获取第三方提供的:

  • ConnectionFactory: 客户端用来创建同JMS提供者之间的连接的对象
  • Destination :客户端用来指明消息被发送的目的地以及客户端接收消息的来源 --> Queue

被管理的对象一般被管理员放在JNDI名字空间中,通常在JMS客户端应用的文档中说明它所需要的JMS被管理对象,以及应以何种JNDI名字来提供这些JMS被管理对象。


JMS通用接口

在这里插入图片描述
JMS 1.1中各个接口之间的关系如下图所示:

在这里插入图片描述
JMS 2.0中改进了一些API接口,此时各个接口之间的关系如下所示:

在这里插入图片描述
在JMS(Java消息服务)的简化API中,单个JMSContext对象包含了经典API中由两个独立对象提供的行为,即ConnectionSession。尽管该规范中提到了JMSContext具有底层的“连接”和“会话”,但简化API并不使用ConnectionSession接口。

在简化API中,JMSContext作为一个统一的入口点,用于创建JMS对象、发送和接收消息以及管理事务。它以更简洁和方便的方式封装了连接和会话的行为。

通过使用JMSContext,可以执行诸如创建生产者或消费者、发送和接收消息、提交或回滚事务等操作。JMSContext在后台管理底层的连接和会话,提供了一个简化和更直观的编程模型。

简化API的目标是使JMS的使用更加简单和直观,减少开发人员直接使用底层连接和会话对象的需求。相反,他们可以依赖于JMSContext来处理这些细节,集中精力进行核心消息操作。

总之,JMSContext是JMS简化API中的主要对象,它整合了连接和会话的功能,并提供了更简洁、易用的编程模型,使JMS的使用更加便捷。

下面列举一个demo示例:

生产者:

package org.example;

import javax.jms.*;

public class JMSProducer {

    public static void main(String[] args) {
        // 设置 ActiveMQ 的连接信息
        String brokerUrl = "tcp://localhost:61616";
        String username = "root";
        String password = "123456";
        String queueName = "jms/queue/mail";

        // 创建连接工厂
        ConnectionFactory connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(brokerUrl);

        try (JMSContext context = connectionFactory.createContext(username, password)) {
            // 创建目标队列
            Destination destination = context.createQueue(queueName);

            // 创建消息生产者
            javax.jms.JMSProducer producer = context.createProducer();

            // 创建文本消息
            TextMessage message = context.createTextMessage();
            message.setText("Hello, JMS!");

            // 发送消息
            producer.send(destination, message);

            System.out.println("Message sent successfully.");
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者:

package org.example;

import javax.jms.*;

public class JMSConsumer {

    public static void main(String[] args) {
        // 设置 ActiveMQ 的连接信息
        String brokerUrl = "tcp://localhost:61616";
        String username = "root";
        String password = "123456";
        String queueName = "jms/queue/mail";

        // 创建连接工厂
        ConnectionFactory connectionFactory = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory(brokerUrl);

        try (JMSContext context = connectionFactory.createContext(username, password)) {
            // 创建目标队列
            Destination destination = context.createQueue(queueName);

            // 创建消息消费者
            javax.jms.JMSConsumer consumer = context.createConsumer(destination);

            // 接收消息
            Message message = consumer.receive();

            if (message instanceof TextMessage) {
                TextMessage textMessage = (TextMessage) message;
                System.out.println("Received message: " + textMessage.getText());
            } else {
                System.out.println("Received message of unsupported type.");
            }
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}


JMS希望达到的目标是什么

目标1:JMS提供一种标准的、平台无关的方法,使分布式应用程序之间可以可靠地交换消息。

  • JMS允许应用程序发送和接收消息,这些消息可以是文本消息、字节消息或对象消息,可以在同步或异步模式下进行发送和接收。
  • JMS 还提供了一些高级特性,如消息持久化、应答机制、事务支持和消息过滤,以满足不同应用程序的需求。

目标2:实现松耦合的消息传递机制,使得分布式应用程序能够通过消息的方式进行通信,而不需要知道对方的具体实现。

  • 这种松耦合的通信机制使得分布式应用程序能够更加灵活、可扩展、可维护,同时也能提高应用程序的可靠性和可用性。

Activemq发送消息的三种模式

至少一次

At least once 模式确保消息至少被传递一次,但是可能会被传递多次

  • 在这种模式下,消息消费者在接收到消息后必须向ActiveMQ发送一个确认消息,告知ActiveMQ消息已经被正确接收并处理。
  • 如果消费者没有发送确认消息,则ActiveMQ会认为消息未被正确处理,尝试重新传递消息,直到收到确认消息为止。
  • 如果消费者发送的ack丢失,会导致Broker重新发送消息。
  • 该模式下需要注意消息的幂等性

以下是使用 ActiveMQ 实现 “at least once” 语义的消息消费者代码示例:

package org.example;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;

public class JMSConsumer {

    public static void main(String[] args) {
        String brokerUrl = "tcp://localhost:61616";
        String username = "your-username";
        String password = "your-password";
        String queueName = "jms/queue/mail";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try (Connection connection = connectionFactory.createConnection(username, password)) {
            connection.start();
            //CLIENT_ACKNOWLEDGE:表示消费者receive消息后必须手动的调用acknowledge()方法进行签收
            Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            String text = textMessage.getText();
                            System.out.println("Received message: " + text);
                            // 执行具体的业务逻辑

                            // 手动确认消息已被处理
                            message.acknowledge();
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 暂停主线程,等待消息到达和处理
            Thread.sleep(30000);

            session.close();
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

至多一次

At most once模式确保消息最多被传递一次

  • 在这种模式下,消息消费者在接收到消息后不需要发送确认消息,ActiveMQ会假设消息已经被正确处理,不会尝试重新传递消息

以下是一个使用 ActiveMQ 的消息消费者代码示例,展示了 “at most once” 的行为:

package org.example;

import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;

import javax.jms.*;

public class JMSConsumer {

    public static void main(String[] args) {
        String brokerUrl = "tcp://localhost:61616";
        String username = "your-username";
        String password = "your-password";
        String queueName = "jms/queue/mail";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try (Connection connection = connectionFactory.createConnection(username, password)) {
            connection.start();
            //AUTO_ACKNOWLEDGE:表示在消费者receive消息的时候自动的签收    
            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            String text = textMessage.getText();
                            System.out.println("Received message: " + text);
                            // 执行具体的业务逻辑
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 暂停主线程,等待消息到达和处理
            Thread.sleep(30000);

            session.close();
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

精确一次

Exactly once模式确保消息只被传递一次

  • 要实现消息的精确一次发送(Exactly Once),可以使用 ActiveMQ 提供的事务机制来确保消息的可靠性传递。

以下是使用 ActiveMQ 实现精确一次发送语义的消息生产者和消费者代码示例:

生产者:

public class JMSProducer {

    public static void main(String[] args) {
        String brokerUrl = "tcp://localhost:61616";
        String username = "your-username";
        String password = "your-password";
        String queueName = "jms/queue/mail";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try (Connection connection = connectionFactory.createConnection(username, password)) {
            connection.start();

            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(queueName);
            MessageProducer producer = session.createProducer(destination);

            // 创建并发送消息
            TextMessage message = session.createTextMessage("Hello, ActiveMQ!");
            producer.send(message);

            // 提交事务
            session.commit();

            session.close();
            connection.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}

消费者:

public class JMSConsumer {

    public static void main(String[] args) {
        String brokerUrl = "tcp://localhost:61616";
        String username = "your-username";
        String password = "your-password";
        String queueName = "jms/queue/mail";

        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(brokerUrl);

        try (Connection connection = connectionFactory.createConnection(username, password)) {
            connection.start();

            Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
            Destination destination = session.createQueue(queueName);
            MessageConsumer consumer = session.createConsumer(destination);

            consumer.setMessageListener(new MessageListener() {
                public void onMessage(Message message) {
                    try {
                        if (message instanceof TextMessage) {
                            TextMessage textMessage = (TextMessage) message;
                            String text = textMessage.getText();
                            System.out.println("Received message: " + text);

                            // 执行具体的业务逻辑

                            // 手动提交事务
                            session.commit();
                        }
                    } catch (JMSException e) {
                        e.printStackTrace();
                    }
                }
            });

            // 暂停主线程,等待消息到达和处理
            Thread.sleep(30000);

            session.close();
            connection.close();
        } catch (JMSException | InterruptedException e) {
            e.printStackTrace();
        }
    }
}

在这个代码示例中,我们创建了一个消息生产者和一个消息消费者。消息生产者使用 Session.SESSION_TRANSACTED 模式创建了会话,并在发送消息后提交了事务。消息消费者也使用 Session.SESSION_TRANSACTED 模式创建了会话,并在处理消息后手动提交了事务。

通过使用事务机制,可以确保消息在发送和接收过程中的可靠性。如果消息发送或处理过程中发生异常,事务会回滚,消息不会被确认,从而实现了消息的精确一次发送语义(Exactly Once)。

ActiveMQ的事务机制的底层原理涉及到消息的持久化和日志的记录。

  • 当使用事务提交时,ActiveMQ会将事务中的消息写入持久化存储,通常是磁盘上的数据库或文件系统。这样可以确保在发生故障或断电等情况下,消息的持久性得到保证,不会丢失。

  • 同时,ActiveMQ会将事务的操作记录在事务日志中。事务日志记录了所有发送、接收和确认消息的操作,以及事务的状态信息。这样可以在系统恢复时,根据事务日志的内容来恢复之前未完成的事务,并确保事务的一致性。

  • 事务提交的过程可以简述为以下几个步骤:

    1. 在事务提交时,将事务中的消息写入持久化存储。
    2. 将事务的操作记录写入事务日志。
    3. 标记事务为已提交。
    4. 释放事务相关资源。
  • 在事务回滚的情况下,会根据事务日志中的记录进行回滚操作,包括将持久化存储中的消息删除或标记为未发送状态,并将事务标记为已回滚。

  • 通过持久化存储和事务日志的机制,ActiveMQ能够提供消息的可靠性传递和事务的原子性,确保消息在发送和接收过程中的可靠性和一致性。这样可以保证消息在分布式系统中的可靠性传递,并提供消息的精确一次发送语义(Exactly
    Once)。


可重复确认模式

DUPS_OK_ACKNOWLEDGE(重复确认模式)适用于消息重复消费不会造成严重问题的场景,

Session session = connection.createSession(false, Session.DUPS_OK_ACKNOWLEDGE);

需要注意的是,由于DUPS_OK_ACKNOWLEDGE模式下允许消息的重复消费,因此不能将该模式应用于对数据准确性要求极高的场景,例如金融交易等。


小结

此部分内容转载此文: Activemq中的四种消息确认模式

AUTO_ACKNOWLEDGE : 自动确认,这就意味着消息的确认时机将由consumer择机确认."择机确认"似乎充满了不确定性,这也意味着,开发者必须明确知道"择机确认"的具体时机,否则将有可能导致消息的丢失,或者消息的重复接受.那么在ActiveMQ中,AUTO_ACKNOWLEDGE是如何运作的呢?

  1. 对于consumer而言,optimizeAcknowledge属性只会在AUTO_ACK模式下有效。
  2. 其中DUPS_ACKNOWLEGE也是一种潜在的AUTO_ACK,只是确认消息的条数和时间上有所不同。
  3. 在“同步”(receive)方法返回message之前,会检测optimizeACK选项是否开启,如果没有开启,此单条消息将立即确认,所以在这种情况下,message返回之后,如果开发者在处理message过程中出现异常,会导致此消息也不会redelivery,即"潜在的消息丢失";如果开启了optimizeACK,则会在unAck数量达到prefetch * 0.65时确认,当然我们可以指定prefetchSize = 1来实现逐条消息确认。
  4. 在"异步"(messageListener)方式中,将会首先调用listener.onMessage(message),此后再ACK,如果onMessage方法异常,将导致client端补充发送一个ACK_TYPE为REDELIVERED_ACK_TYPE确认指令;如果onMessage方法正常,消息将会正常确认(STANDARD_ACK_TYPE)。此外需要注意,消息的重发次数是有限制的,每条消息中都会包含“redeliveryCounter”计数器,用来表示此消息已经被重发的次数,如果重发次数达到阀值,将会导致发送一个ACK_TYPE为POSION_ACK_TYPE确认指令,这就导致broker端认为此消息无法消费,此消息将会被删除或者迁移到"dead letter"通道中。

因此当我们使用messageListener方式消费消息时,通常建议在onMessage方法中使用try-catch,这样可以在处理消息出错时记录一些信息,而不是让consumer不断去重发消息;如果你没有使用try-catch,就有可能会因为异常而导致消息重复接收的问题,需要注意你的onMessage方法中逻辑是否能够兼容对重复消息的判断。

CLIENT_ACKNOWLEDGE : 客户端手动确认,这就意味着AcitveMQ将不会“自作主张”的为你ACK任何消息,开发者需要自己择机确认。

  • 在此模式下,开发者需要需要关注几个方法:
1. message.acknowledge()2. ActiveMQMessageConsumer.acknowledege()3. ActiveMQSession.acknowledge()
  • 其中1和3是等效的,将当前session中所有consumer中尚未ACK的消息都一起确认,2只会对当前consumer中那些尚未确认的消息进行确认。开发者可以在合适的时机必须调用一次上述方法。
  • 我们通常会在基于Group(消息分组)情况下会使用CLIENT_ACKNOWLEDGE,我们将在一个group的消息序列接受完毕之后确认消息(组);不过当你认为消息很重要,只有当消息被正确处理之后才能确认时,也可以使用此ACK_MODE。
  • 如果开发者忘记调用acknowledge方法,将会导致当consumer重启后,会接受到重复消息,因为对于broker而言,那些尚未真正ACK的消息被视为“未消费”。
  • 开发者可以在当前消息处理成功之后,立即调用message.acknowledge()方法来"逐个"确认消息,这样可以尽可能的减少因网络故障而导致消息重发的个数;当然也可以处理多条消息之后,间歇性的调用acknowledge方法来一次确认多条消息,减少ack的次数来提升consumer的效率,不过这仍然是一个利弊权衡的问题。
  • 除了message.acknowledge()方法之外,ActiveMQMessageConumser.acknowledge()和ActiveMQSession.acknowledge()也可以确认消息,只不过前者只会确认当前consumer中的消息。其中sesson.acknowledge()和message.acknowledge()是等效的。
  • 无论是“同步”/“异步”,ActiveMQ都不会发送STANDARD_ACK_TYPE,直到message.acknowledge()调用。如果在client端未确认的消息个数达到prefetchSize * 0.5时,会补充发送一个ACK_TYPE为DELIVERED_ACK_TYPE的确认指令,这会触发broker端可以继续push消息到client端。(参看PrefetchSubscription.acknwoledge方法)
  • 在broker端,针对每个Consumer,都会保存一个因为"DELIVERED_ACK_TYPE"而“拖延”的消息个数,这个参数为prefetchExtension,事实上这个值不会大于prefetchSize * 0.5,因为Consumer端会严格控制DELIVERED_ACK_TYPE指令发送的时机(参见ActiveMQMessageConsumer.ackLater方法),broker端通过“prefetchExtension”与prefetchSize互相配合,来决定即将push给client端的消息个数,count = prefetchExtension + prefetchSize - dispatched.size(),其中dispatched表示已经发送给client端但是还没有“STANDARD_ACK_TYPE”的消息总量;
  • 由此可见,在CLIENT_ACK模式下,足够快速的调用acknowledge()方法是决定consumer端消费消息的速率;如果client端因为某种原因导致acknowledge方法未被执行,将导致大量消息不能被确认,broker端将不会push消息,事实上client端将处于“假死”状态,而无法继续消费消息。我们要求client端在消费1.5*prefetchSize个消息之前,必须acknowledge()一次;通常我们总是每消费一个消息调用一次,这是一种良好的设计。
  • 此外需要额外的补充一下:所有ACK指令都是依次发送给broker端,在CLIET_ACK模式下,消息在交付给listener之前,都会首先创建一个DELIVERED_ACK_TYPE的ACK指令,直到client端未确认的消息达到"prefetchSize * 0.5"时才会发送此ACK指令,如果在此之前,开发者调用了acknowledge()方法,会导致消息直接被确认(STANDARD_ACK_TYPE)。broker端通常会认为“DELIVERED_ACK_TYPE”确认指令是一种“slow consumer”信号,如果consumer不能及时的对消息进行acknowledge而导致broker端阻塞,那么此consumer将会被标记为“slow”,此后queue中的消息将会转发给其他Consumer。

DUPS_OK_ACKNOWLEDGE : "消息可重复"确认,意思是此模式下,可能会出现重复消息,并不是一条消息需要发送多次ACK才行。它是一种潜在的"AUTO_ACK"确认机制,为批量确认而生,而且具有“延迟”确认的特点。对于开发者而言,这种模式下的代码结构和AUTO_ACKNOWLEDGE一样,不需要像CLIENT_ACKNOWLEDGE那样调用acknowledge()方法来确认消息。

  • 在ActiveMQ中,如果所在Destination是Queue通道,我们真的可以认为DUPS_OK_ACK就是“AUTO_ACK + optimizeACK + (prefetch > 0)”这种情况,在确认时机上几乎完全一致;此外在此模式下,如果prefetchSize =1 或者没有开启optimizeACK,也会导致消息逐条确认,从而失去批量确认的特性。
  • 如果Destination为Topic,DUPS_OK_ACKNOWLEDGE才会产生JMS规范中诠释的意义,即无论optimizeACK是否开启,都会在消费的消息个数>=prefetch * 0.5时,批量确认(STANDARD_ACK_TYPE),在此过程中,不会发送DELIVERED_ACK_TYPE的确认指令,这是和AUTO_ACK的最大的区别。
  • 这也意味着,当consumer故障重启后,那些尚未ACK的消息会重新发送过来。

SESSION_TRANSACTED : 当session使用事务时,就是使用此模式。在事务开启之后,和session.commit()之前,所有消费的消息,要么全部正常确认,要么全部redelivery。这种严谨性,通常在基于GROUP(消息分组)或者其他场景下特别适合。在SESSION_TRANSACTED模式下,optimizeACK并不能发挥任何效果,因为在此模式下,optimizeACK会被强制设定为false,不过prefetch仍然可以决定DELIVERED_ACK_TYPE的发送时机。

  • 因为Session非线程安全,那么当前session下所有的consumer都会共享同一个transactionContext;同时建议,一个事务类型的Session中只有一个Consumer,已避免rollback()或者commit()方法被多个consumer调用而造成的消息混乱。

  • 当consumer接受到消息之后,首先检测TransactionContext是否已经开启,如果没有,就会开启并生成新的transactionId,并把信息发送给broker;此后将检测事务中已经消费的消息个数是否 >= prefetch * 0.5,如果大于则补充发送一个“DELIVERED_ACK_TYPE”的确认指令;这时就开始调用onMessage()方法,如果是同步(receive),那么即返回message。上述过程,和其他确认模式没有任何特殊的地方。

  • 当开发者决定事务可以提交时,必须调用session.commit()方法,commit方法将会导致当前session的事务中所有消息立即被确认;事务的确认过程中,首先把本地的deliveredMessage队列中尚未确认的消息全部确认(STANDARD_ACK_TYPE);此后向broker发送transaction提交指令并等待broker反馈,如果broker端事务操作成功,那么将会把本地deliveredMessage队列清空,新的事务开始;如果broker端事务操作失败(此时broker已经rollback),那么对于session而言,将执行inner-rollback,这个rollback所做的事情,就是将当前事务中的消息清空并要求broker重发(REDELIVERED_ACK_TYPE),同时commit方法将抛出异常。

  • 当session.commit方法异常时,对于开发者而言通常是调用session.rollback()回滚事务(事实上开发者不调用也没有问题),当然你可以在事务开始之后的任何时机调用rollback(),rollback意味着当前事务的结束,事务中所有的消息都将被重发。需要注意,无论是inner-rollback还是调用session.rollback()而导致消息重发,都会导致message.redeliveryCounter计数器增加,最终都会受限于brokerUrl中配置的"jms.redeliveryPolicy.maximumRedeliveries",如果rollback的次数过多,而达到重发次数的上限时,消息将会被DLQ(dead letter)。

INDIVIDUAL_ACKNOWLEDGE : 单条消息确认,这种确认模式,我们很少使用,它的确认时机和CLIENT_ACKNOWLEDGE几乎一样,当消息消费成功之后,需要调用message.acknowledege来确认此消息(单条),而CLIENT_ACKNOWLEDGE模式先message.acknowledge()方法将导致整个session中所有消息被确认(批量确认)。


Activemq支持众多协议

这里以In Jvm协议进行讲解:

ActiveMQ在内部支持以下的In-JVM协议:

  1. VM Transport Protocol(VM传输协议):这是ActiveMQ默认的In-JVM协议,它允许在同一个Java虚拟机(JVM)内的不同线程之间进行消息的传递。使用VM传输协议,消息可以在应用程序内部的不同组件之间快速传递,而无需通过网络进行通信。

  2. VM Transport Bridge(VM传输桥):这是ActiveMQ提供的用于连接不同Java虚拟机(JVM)之间的In-JVM通信的协议。它允许在不同的JVM实例之间通过内存进行消息传递,提供了一种简单而高效的方式来实现进程间通信。

这些In-JVM协议适用于在同一个Java虚拟机内的不同线程或不同进程之间进行高性能的消息传递。它们可以避免使用网络通信带来的延迟和开销,提供了更快的消息传递速度和更低的资源消耗。

ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory("vm://localhost");
factory.setObjectMessageSerializationDefered(true);

Activemq还支持AMQP协议,MQTT协议等,这里不多列举了。


Activemq支持的定时消息,延迟消息,优先级消息

这里简单介绍一下,具体配置和使用说明,大家参考官方文档或自行查询资料学习。
在这里插入图片描述

  • 延迟消息
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long time = 60 * 1000;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, time);
producer.send(message); 
  • 您可以将消息设置为在初始延迟时等待,重复传递 10 次,每次重新传递之间等待 10 秒:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
long delay = 30 * 1000;
long period = 10 * 1000;
int repeat = 9;
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
producer.send(message);
  • 通过CRON表达式设置消息被间隔发送一次
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
producer.send(message);
  • 假设您希望一条消息传递 10 次,每条消息之间有一秒钟的延迟 - 并且您希望每小时发生一次 - 您将这样做:
MessageProducer producer = session.createProducer(destination);
TextMessage message = session.createTextMessage("test msg");
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_CRON, "0 * * * *");
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, 1000);
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, 1000);
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, 9);
producer.send(message);

消息选择器

ActiveMQ 是一个流行的开源消息中间件,它支持多种消息选择器的方式。消息选择器允许您从消息队列中选择特定的消息,以便只有满足某些条件的消息会被消费者接收。

在 ActiveMQ 中,消息选择器使用 SQL-92 类似的语法来定义选择条件。您可以在创建消费者时使用消息选择器,通过在消息选择器表达式中指定条件来选择感兴趣的消息。

下面是一些常见的消息选择器示例:

  1. 使用消息属性进行选择:
String selector = "color = 'red'";
MessageConsumer consumer = session.createConsumer(destination, selector);

上述代码中,消息选择器指定了一个属性 color 的值为 'red' 的消息将被选择。

  1. 使用 JMS 头部属性进行选择:
String selector = "JMSPriority > 5";
MessageConsumer consumer = session.createConsumer(destination, selector);

上述代码中,消息选择器指定了一个 JMS 头部属性 JMSPriority 的值大于 5 的消息将被选择。

  1. 使用消息体内容进行选择:
String selector = "text LIKE '%important%'";
MessageConsumer consumer = session.createConsumer(destination, selector);

上述代码中,消息选择器指定了一个消息体内容包含 'important' 关键字的消息将被选择。

请注意,消息选择器只能应用于支持消息选择器功能的消息中间件,而且消费者必须使用带有消息选择器的 createConsumer 方法来创建。另外,使用过多的消息选择器可能会对系统性能产生负面影响,因此在使用时需要权衡选择条件的复杂性和性能需求。

这里只给出一个简单的Demo,关于消息选择器的更多知识,可以参考官方文档,或者自行查找资料学习:

生产者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MessageProducerExample {
    public static void main(String[] args) throws JMSException {
        // 连接到 ActiveMQ 代理
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建目标队列
        Destination destination = session.createQueue("myQueue");

        // 创建生产者
        MessageProducer producer = session.createProducer(destination);

        // 创建消息
        TextMessage message1 = session.createTextMessage();
        message1.setText("Message 1");
        message1.setStringProperty("color", "red");

        TextMessage message2 = session.createTextMessage();
        message2.setText("Message 2");
        message2.setStringProperty("color", "blue");

        // 发送消息
        producer.send(message1);
        producer.send(message2);

        // 关闭连接
        connection.close();
    }
}

消费者:

import org.apache.activemq.ActiveMQConnectionFactory;

import javax.jms.*;

public class MessageConsumerExample {
    public static void main(String[] args) throws JMSException {
        // 连接到 ActiveMQ 代理
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory("tcp://localhost:61616");
        Connection connection = connectionFactory.createConnection();
        connection.start();

        // 创建会话
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

        // 创建目标队列
        Destination destination = session.createQueue("myQueue");

        // 创建消费者并设置消息选择器
        MessageConsumer consumer = session.createConsumer(destination, "color = 'red'");

        // 接收消息
        Message message = consumer.receive();
        if (message instanceof TextMessage) {
            TextMessage textMessage = (TextMessage) message;
            System.out.println("Received message: " + textMessage.getText());
        }

        // 关闭连接
        connection.close();
    }
}

上述示例中,生产者向名为 “myQueue” 的队列发送了两条消息,每条消息都带有一个名为 “color” 的属性。然后消费者通过设置消息选择器 “color = ‘red’”,只接收具有红色属性的消息。


Spring整合

此部分内容转载: 廖雪峰大佬的JMS入门教程

  • 在上面依赖基础上,引入如下依赖
      <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-jms</artifactId>
      </dependency>
  • AppConfig中,通过@EnableJmsSpring自动扫描JMS相关的Bean,并加载JMS配置文件jms.properties
@EnableJms
@PropertySource({"classpath:/jms.properties"})
@SpringBootApplication
public class AppConfig {
    public static void main(String[] args) {
        SpringApplication.run(AppConfig.class, args);
    }
}
  • 首先要创建的Bean是ConnectionFactory,即连接消息服务器的连接池:
    @Bean
    ConnectionFactory createJMSConnectionFactory(
            @Value("${jms.uri:tcp://localhost:61616}") String uri,
            @Value("${jms.username:admin}") String username,
            @Value("${jms.password:password}") String password) {
        return new ActiveMQJMSConnectionFactory(uri, username, password);
    }

因为我们使用的消息服务器是ActiveMQ Artemis,所以ConnectionFactory的实现类就是消息服务器提供的ActiveMQJMSConnectionFactory,它需要的参数均由配置文件读取后传入,并设置了默认值。

  • 我们再创建一个JmsTemplate,它是Spring提供的一个工具类,和JdbcTemplate类似,可以简化发送消息的代码:
    @Bean
    JmsTemplate createJmsTemplate(@Autowired ConnectionFactory connectionFactory) {
        return new JmsTemplate(connectionFactory);
    }
  • 下一步要创建的是JmsListenerContainerFactory,
    @Bean("jmsListenerContainerFactory")
    DefaultJmsListenerContainerFactory createJmsListenerContainerFactory(@Autowired ConnectionFactory connectionFactory) {
        DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory);
        return factory;
    }
  • 除了必须指定Bean的名称为jmsListenerContainerFactory外,这个Bean的作用是处理和Consumer相关的Bean。我们先跳过它的原理,继续编写MessagingService来发送消息:
@Component
public class MessagingService {
    @Autowired
    private ObjectMapper objectMapper;
    @Autowired
    private JmsTemplate jmsTemplate;

    public void sendMailMessage(MailMessage msg) throws Exception {
        final String text = objectMapper.writeValueAsString(msg);
        jmsTemplate.send("jms/queue/mail", new MessageCreator() {
            public Message createMessage(Session session) throws JMSException {
                return session.createTextMessage(text);
            }
        });
    }

    public static class MailMessage {
        private String header;
        private String body;
    }
}

JMS的消息类型支持以下几种:

  • TextMessage:文本消息;
  • BytesMessage:二进制消息;
  • MapMessage:包含多个Key-Value对的消息;
  • ObjectMessage:直接序列化Java对象的消息;
  • StreamMessage:一个包含基本类型序列的消息。

最常用的是发送基于JSON的文本消息,上述代码通过JmsTemplate创建一个TextMessage并发送到名称为jms/queue/mail的Queue。

注意:Artemis消息服务器默认配置下会自动创建Queue,因此不必手动创建一个名为jms/queue/mail的Queue,但不是所有的消息服务器都会自动创建Queue,生产环境的消息服务器通常会关闭自动创建功能,需要手动创建Queue。

再注意到MailMessage是我们自己定义的一个JavaBean,真正的JMS消息是创建的TextMessage,它的内容是JSON。

当用户注册成功后,我们就调用MessagingService.sendMailMessage()发送一条JMS消息,此代码十分简单,这里不再贴出。

下面我们要详细讨论的是如何处理消息,即编写Consumer。从理论上讲,可以创建另一个Java进程来处理消息,但对于我们这个简单的Web程序来说没有必要,直接在同一个Web应用中接收并处理消息即可。

处理消息的核心代码是编写一个Bean,并在处理方法上标注@JmsListener

@Component
public class MailMessageListener {
    final Logger logger = LoggerFactory.getLogger(getClass());

    @Autowired ObjectMapper objectMapper;
    @Autowired MailService mailService;

    @JmsListener(destination = "jms/queue/mail", concurrency = "10")
    public void onMailMessageReceived(Message message) throws Exception {
        logger.info("received message: " + message);
        if (message instanceof TextMessage) {
            String text = ((TextMessage) message).getText();
            MailMessage mm = objectMapper.readValue(text, MailMessage.class);
            mailService.sendRegistrationMail(mm);
        } else {
            logger.error("unable to process non-text message!");
        }
    }
}    

注意到@JmsListener指定了Queue的名称,因此,凡是发到此Queue的消息都会被这个onMailMessageReceived()方法处理,方法参数是JMS的Message接口,我们通过强制转型为TextMessage并提取JSON,反序列化后获得自定义的JavaBean,也就获得了发送邮件所需的所有信息。

下面问题来了:Spring处理JMS消息的流程是什么?

如果我们直接调用JMS的API来处理消息,那么编写的代码大致如下:

// 创建JMS连接:
Connection connection = connectionFactory.createConnection();
// 创建会话:
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个Consumer:
MessageConsumer consumer = session.createConsumer(queue);
// 为Consumer指定一个消息处理器:
consumer.setMessageListener(new MessageListener() { 
    public void onMessage(Message message) {
        // 在此处理消息... 
    }
});
// 启动接收消息的循环:
connection.start();

我们自己编写的MailMessageListener.onMailMessageReceived()相当于消息处理器:

consumer.setMessageListener(new MessageListener() { 
    public void onMessage(Message message) {
        mailMessageListener.onMailMessageReceived(message); 
    }
});

所以,Spring根据AppConfig的注解@EnableJms自动扫描带有@JmsListener的Bean方法,并为其创建一个MessageListener把它包装起来。

注意到前面我们还创建了一个JmsListenerContainerFactory的Bean,它的作用就是为每个MessageListener创建MessageConsumer并启动消息接收循环。

再注意到@JmsListener还有一个concurrency参数,10表示可以最多同时并发处理10个消息,5-10表示并发处理的线程可以在5~10之间调整。

因此,Spring在通过MessageListener接收到消息后,并不是直接调用mailMessageListener.onMailMessageReceived(),而是用线程池调用,因此,要时刻牢记,onMailMessageReceived()方法可能被多线程并发执行,一定要保证线程安全。

我们总结一下Spring接收消息的步骤:

通过JmsListenerContainerFactory配合@EnableJms扫描所有@JmsListener方法,自动创建MessageConsumerMessageListener以及线程池,启动消息循环接收处理消息,最终由我们自己编写的@JmsListener方法处理消息,可能会由多线程同时并发处理。

要验证消息发送和处理,我们注册一个新用户,可以看到如下日志输出:

2020-06-02 08:04:27 INFO  c.i.learnjava.web.UserController - user registered: bob@example.com
2020-06-02 08:04:27 INFO  c.i.l.service.MailMessageListener - received message: ActiveMQMessage[ID:9fc5...]:PERSISTENT/ClientMessageImpl[messageID=983, durable=true, address=jms/queue/mail, ...]]
2020-06-02 08:04:27 INFO  c.i.learnjava.service.MailService - [send mail] sending registration mail to bob@example.com...
2020-06-02 08:04:30 INFO  c.i.learnjava.service.MailService - [send mail] registration mail was sent to bob@example.com.

可见,消息被成功发送到Artemis,然后在很短的时间内被接收处理了。

使用消息服务对发送Email进行改造的好处是,发送Email的能力通常是有限的,通过JMS消息服务,如果短时间内需要给大量用户发送Email,可以先把消息堆积在JMS服务器上慢慢发送,对于批量发送邮件、短信等尤其有用。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/541445.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C++技能树】类和对象的使用 --初始化列表,static,友元,内部类,匿名对象的理解与使用

Halo&#xff0c;这里是Ppeua。平时主要更新C语言&#xff0c;C&#xff0c;数据结构算法…感兴趣就关注我bua&#xff01; 类和对象的使用 0. 初始化列表explicit关键字 1.Static静态成员变量2.友元2.1.友元函数2.2.友元类 3.内部类4.匿名对象4.匿名对象至此初始化列表,static…

【Linux】2. Shell运行原理与Linux权限操作

专栏导读 &#x1f341;作者简介&#xff1a;余悸&#xff0c;在读本科生一枚&#xff0c;致力于 C方向学习。 &#x1f341;收录于 C 专栏&#xff0c;本专栏主要内容为 C 初阶、 C 进阶、 STL 详解等&#xff0c;持续更新中&#xff01; &#x1f341;相关专栏推荐&#xff1…

Cloud Studio 有“新”分享

GitHub仓库推荐 Awesome Open Source Applications - 收集了各种开源应用程序&#xff0c;包括 Web 应用、桌面应用、移动应用等。Cloud Studio 一键运行 Free for Dev - 收集了各种免费的开源应用程序和工具&#xff0c;包括 Web 应用、桌面应用、移动应用等。Cloud Studio 一…

kaggle经典赛 | IEEE欺诈检测竞赛金牌方案分享

https://www.kaggle.com/competitions/ieee-fraud-detection 赛题背景 想象一下&#xff0c;站在杂货店的收银台&#xff0c;身后排着长队&#xff0c;收银员不那么安静地宣布你的卡被拒绝了。在这一刻&#xff0c;你可能没有考虑决定你命运的数据科学。 尴尬&#xff0c;并…

一文搞定验证码(上部分)

1.背景 目前收到反馈,存在一类用户,在利用会员权益大量进行二次销售;而且还是自动进行操作的. 那么意味着他们有一个自动平台在对我们的商品进行二次销售. 这是就该我们的主角登场了. 验证码模块可以有效防止机器人刷接口 2.开源验证码框架 通过在网上查找资料, 发现了几个验…

C++:采用哈希表封装unordered_map和unordered_set

目录 一. 如何使用一张哈希表封装unordered_map和unordered_set 二. 哈希表迭代器的实现 2.1 迭代器成员变量及应当实现的功能 2.2 operator函数 2.3 operator*和operator->函数 2.4 operator!和operator函数 2.5 begin()和end() 2.6哈希表迭代器实现代码 三. unord…

渗透测试--6.2.mdk3攻击wifi

前言 本次依然使用Kali虚拟机系统&#xff0c;win11主机&#xff0c;网卡Ralink 802.11 配合mdk3进行wifi伪造、连接设备查看、解除认证攻击。本次实验只用于学习交流&#xff0c;攻击目标为自家的手机热点&#xff0c;请勿违法使用&#xff01; 目录 前言 1.Deauth攻击原…

Electron简介、安装、实践

本文中的所有代码均存放在https://github.com/MADMAX110/my-electron-app Electron是什么&#xff1f; Electron是一个开源的框架&#xff0c;可以使用JavaScript, HTML和CSS来构建跨平台的桌面应用程序。Electron的核心是由Chromium和Node.js组成&#xff0c;它们分别提供了渲…

【springboot 开发工具】接口文档我正在使用它生成,舒坦

前言 先来描述下背景&#xff1a;由于新公司业务属于自研产品开发&#xff0c;但是发现各产品业务线对于接口文档暂时还是通过集成Swagger来维护&#xff0c;准确来说是knife4j&#xff08;Swagger的增强解决方案&#xff09;。但是对于5年的后端开发老说&#xff0c;早就厌倦…

Java-线程安全的四个经典案例和线程池

单例模式 有些对象&#xff0c;在一个程序中应该只有唯一 一个实例&#xff08;光靠人保证不靠谱 借助语法来保证&#xff09; 就可以使用单例模式 在单例模式下 对象的实例化被限制了 只能创建一个 多了的也创建不了 单例模式分为两种&#xff1a;饿汉模式和懒汉模式 饿汉模式…

[Java基础]—SpringBoot

Springboot入门 Helloworld 依赖 <parent><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-parent</artifactId><version>2.3.4.RELEASE</version> </parent><dependencies><depend…

软件测试基础知识整理(四)- 软件开发模型、测试过程模型

目录 一、软件开发模型 1.1 瀑布模型 1.1.1 特点 1.1.2 优缺点 1.2 快速原型模型&#xff08;了解&#xff09; 1.2.1 特点 1.2.2 优缺点 1.3 螺旋模型&#xff08;了解&#xff09; 1.3.1 特点 1.3.2 优缺点 二、测试过程模型 2.1 V模型&#xff08;重点&#xff…

LeetCode_29. 两数相除

目录 题目描述 思路分析 我的题解 题目描述 给你两个整数&#xff0c;被除数 dividend 和除数 divisor。将两数相除&#xff0c;要求 不使用 乘法、除法和取余运算。 整数除法应该向零截断&#xff0c;也就是截去&#xff08;truncate&#xff09;其小数部分。例如&#xff…

8个免费的高质量UI图标大全网站

UI图标素材是设计师必不可少的设计元素。 高质量的UI图标会让设计师的设计效率事半功倍。 本文分享8个免费的高质量UI图标大全网站。 即时设计资源社区 即时设计资源广场中精选了多款专业免费的UI图标设计资源&#xff0c;无需下载即可一键保存源文件&#xff0c;同时还提供…

深入浅析Linux Perf 性能分析工具及火焰图

Perf Event 子系统 Perf 是内置于 Linux 内核源码树中的性能剖析&#xff08;profiling&#xff09;工具。它基于事件采样的原理&#xff0c;以性能事件为基础&#xff0c;支持针对处理器相关性能指标与操作系统相关性能指标的性能剖析。可用于性能瓶颈的查找与热点代码的定位…

Maven PKIX path building failed 错误提示

最近公司的项目突然出现了下面的提示。 PKIX path building failed: sun.security.provider.certpath.SunCertPathBuilderException: unable to find valid certification path to requested target -> [Help 2]问题和解决 出现上面的提示的问题是因为 SSL 签名的问题。 …

经典面试题:理解Cookie和Session之间的区别

文章目录 一、Cookie概念先知1、Cookie是什么&#xff1f;2、Cookie从哪里来&#xff1f;3、Cookie要存到哪里去&#xff1f;4、Cookie是存在哪里的&#xff1f;5、浏览器是如何通过Cookie来记录的&#xff1f;6、Cookie的过期时间有什么用&#xff1f; 二、见见Cookie三、会话…

软件设计师考试笔记,已通过

目录 系统可靠度 外部实体 内聚类型 编译过程 逆波兰式 前驱图 scrum框架模型 编译和解释 有限自动机 聚簇索引和非聚簇索引 二叉树的前序,中序,后序遍历 动态规划贪心算法 算法 01背包问题 系统可靠度 1. 串联部件可靠度 串联部件想要这条路走通&#xff0c;只有…

软件测试行业7年了,薪资从10k到了22k,感觉到头了?

蜕变之前 明天的希望&#xff0c;让我们忘了今天的痛苦。 怎样区别一个废柴和一个精英&#xff1f;看外貌&#xff0c;看气质&#xff0c;看谈吐&#xff0c;看消费… 有人忙着把人和人进行分类&#xff0c;有人忙着怎么从这一阶层过渡到上一阶层。当你很累的时候&#xff0c…

引入外部文件实现步骤

1.引入数据库相关依赖 2.创建外部属性文件&#xff0c;properties格式&#xff0c;定义数据信息&#xff1a;用户名 密码 地址等 3.创建spring配置文件&#xff0c;引入context命名空间&#xff0c;引入属性文件&#xff0c;使用表达式完成注入 <beans xmlns"http://w…