入门
官网: http://activemq.apache.org/
ActiveMQ 是Apache出品,最流行的,能力强劲的开源消息总线。ActiveMQ 是一个完全支持JMS1.1和J2EE 1.4规范的 JMS Provider实现。
JMS
JMS即Java消息服务(Java Message Service)应用程序接口,是一个Java平台中关于面向消息中间件(MOM)的API,用于在两个应用程序之间,或分布式系统中发送消息,进行异步通信。Java消息服务是一个与具体平台无关的API,绝大多数MOM提供商都对JMS提供支持。
优势
- 异步:发送消息者可以在发送消息后进行其它的工作,不用等待接收者的回应,而接收者客户端获取消息的时候,不需要主动发送请求,消息会自动发送给可用的客户端;
- 解耦:客户进程和服务对象进程不要求都正常运行,如果由于服务对象崩溃或者网络故障导致客户的请求不可达,客户不会接收到异常,消息中间件能保证消息不会丢失。
- 可靠:JMS保证消息只会递送一次。可能你遇到过重复创建消息问题,而JMS能帮你避免该问题。
消息模型
点对点(Point-To-Point)
- 概述
点对点模型用于消息生产者和消息消费者之间点到点的通信。消息生产者将消息发送到由某个特定名字标示的消息队列(Queue)中,消息消费者从这个特定队列中获取对应消息。在消息传送给消费者之前它被存储在这个队列中。队列可以是持久的,以保证在消息服务出现故障时仍然能够传递消息。
在点对点消息传送模型中,应用程序由消息队列(Queue),发送者(Sender),接收者(Receiver)组成。每一个消息发送给一个特殊的消息队列,该队列保存了所有发送给它的消息(除了被接收者消费掉的和过期的消息)。 - 特点
- 每个消息只有一个消费者(Consumer)(即一旦被消费,消息就不再在消息队列中)。
- 发送者和接收者之间在时间上没有依赖性,也就是说当发送者发送了消息之后,不管接收者有没有正在运行,它不会影响消息被发送到消息队列中。
- 当接收者收到消息的时候,会发送确认收到通知(acknowledgement)到消息队列。
- 模型图
发布订阅(Publish/Subscribe)
- 概述
在发布/订阅消息模型中,发布者发布一个消息,该消息通过主题(Topic)传递给所有的客户端。在这种模型中,发布者和订阅者彼此不知道对方,是匿名的且可以动态发布和订阅主题(Topic)。主题(Topic)主要用于保存和传递消息,且会一直保存消息直到消息被传递给客户端。
在发布与订阅消息传送模型中,应用程序由主题(Topic),发布者(Publisher),订阅者(Subscriber)组成。客户端将消息发送到主题。多个发布者将消息发送到Topic,系统将这些消息传递给多个订阅者。 - 特点
- 每个消息可以有多个消费者即多个订阅者。
- 发布者和订阅者之间有时间上的依赖性。针对某个主题(Topic)的订阅者,它必须创建一个订阅者之后,才能消费发布者的消息,而且为了消费消息,订阅者必须保持运行的状态。
- 为了缓和这样严格的时间相关性,JMS允许订阅者创建一个可持久化的订阅。这样,即使订阅者没有被激活(运行),它也能接收到发布者的消息。
- 模型图
消息消费
在JMS中,消息的产生和消息是异步的。对于消费来说,JMS的消息者可以通过两种方式来消费消息。
- 阻塞:订阅者或接收者调用receive方法来接收消息,receive方法在能够接收到消息之前(或超时之前)将一直阻塞。
- 异步:订阅者或接收者需要注册一个消息监听器。当消息到达之后,系统自动调用监听器的onMessage方法。
JMS 组成
JMS API
要素 | 作用 |
---|---|
Destination | 表示消息所走通道的目标定义,用来定义消息从发送端发出后要走的通道,而不是接收方。Destination属于管理类对象 |
ConnectionFactory | 顾名思义,用于创建连接对象,ConnectionFactory属于管理类的对象 |
Connection | 连接接口,所负责的重要工作时创建Session |
Session | 会话接口,这是一个非常重要的对象,消息发送者、消息接收者以及消息对象本身,都是通过这个会话对象创建的 |
MessageConsumer | 消息的消费者,也就是订阅消息并处理消息的对象 |
MessageProducer | 消息的生产者,也就是用来发送消息的对象 |
Message | 消息,包含了消息头、消息体、消息属性 |
- ConnectionFactory
创建Connection对象的工厂,针对两种不同的jms消息模型,分别有QueueConnectionFactory和TopicConnectionFactory两种。 - Destination
Destination的意思是消息生产者的消息发送目标或者说消息消费者的消息来源。对于消息生产者来说,它的Destination是某个队列(Queue)或某个主题(Topic);对于消息消费者来说,它的Destination也是某个队列或主题(即消息来源)。所以,Destination实际上就是两种类型的对象:Queue、Topic - Connection
Connection表示在客户端和JMS系统之间建立的链接(对TCP/IP socket的包装)。Connection可以产 生一个或多个Session - Session
Session 是我们对消息进行操作的接口,可以通过session创建生产者、消费者、消息等。Session 提供了事务的功能,如果需要使用session发送/接收多个消息时,可以将这些发送/接收动作放到一个事务中。 - Producter
Producter(消息生产者):消息生产者由Session创建,并用于将消息发送到Destination。同样,消息生产者分两种类型:QueueSender和TopicPublisher。可以调用消息生产者的方法(send或publish方法)发送消息。 - Consumer
Consumer(消息消费者):消息消费者由Session创建,用于接收被发送到Destination的消息。两种类型:QueueReceiver和TopicSubscriber。可分别通过session的createReceiver(Queue)或createSubscriber(Topic)来创建。当然,也可以session的creatDurableSubscriber方法来创建持久化的订阅者。 - MessageListener
消息监听器。如果注册了消息监听器,一旦消息到达,将自动调用监听器的onMessage方法。EJB中的MDB(Message-Driven Bean)就是一种MessageListener。 - Message
发送的消息实例
ActiveMQ安装及控制台
原生 JMS API 入门案例
导入依赖
<dependencies>
<!-- activemq 所需要的jar 包-->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.15.9</version>
</dependency>
<!-- activemq 和 spring 整合的基础包 -->
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.16</version>
</dependency>
</dependencies>
Queue(PTP模式)的生产及消费
消息生产者
package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce {
// linux 上部署的activemq 的 IP 地址 + activemq 的端口号
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
// 目的地的名称
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception{
// 1 按照给定的url创建连接工厂,这个构造器采用默认的用户名密码。该类的其他构造方法可以指定用户名和密码。
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 2 通过连接工厂,获得连接 connection 并启动访问。
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
// 3 创建会话session 。第一参数是是否开启事务, 第二参数是消息签收的方式
Session session = connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地(两种 :队列/主题)。Destination是Queue和Topic的父类
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的生产者
MessageProducer messageProducer = session.createProducer(queue);
// 6 通过messageProducer 生产 3 条 消息发送到消息队列中
for (int i = 1; i < 4 ; i++) {
// 7 创建消息
TextMessage textMessage = session.createTextMessage("msg--" + i);
// 8 通过messageProducer发送给mq
messageProducer.send(textMessage);
}
// 9 关闭资源
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** 消息发送到MQ完成 ****");
}
}
控制台显示:
Number Of Pending Messages:等待消费的消息,这个是未出队列的数量,公式=总接收数-总出队列数。
Number Of Consumers:消费者数量,消费者端的消费者数量。
Messages Enqueued:进队消息数,进队列的总消息量,包括出队列的。这个数只增不减。
Messages Dequeued:出队消息数,可以理解为是消费者消费掉的数量。
总结:
当有一个消息进入这个队列时,等待消费的消息是1,进入队列的消息是1。
当消息消费后,等待消费的消息是0,进入队列的消息是1,出队列的消息是1。
当再来一条消息时,等待消费的消息是1,进入队列的消息就是2。
消息消费者
- 阻塞方式:
package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 消息的消费者
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
// 5 创建消息的消费者
MessageConsumer messageConsumer = session.createConsumer(queue);
while(true){
// reveive() 一直等待接收消息,在能够接收到消息之前将一直阻塞。 是同步阻塞方式 。和socket的accept方法类似的。
// reveive(Long time) : 等待n毫秒之后还没有收到消息,就是结束阻塞。
// 因为消息发送者是 TextMessage,所以消息接受者也要是TextMessage
TextMessage message = (TextMessage)messageConsumer.receive();
if (null != message){
System.out.println("****消费者的消息:"+message.getText());
}else {
break;
}
}
messageConsumer.close();
session.close();
connection.close();
}
}
控制台显示:
- 非阻塞方式
package com.at.activemq.queue;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
// 消息的消费者 也就是回答消息的系统
public class JmsConsumer {
public static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
public static final String QUEUE_NAME = "jdbc01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
javax.jms.Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
/* 通过监听的方式来消费消息,是异步非阻塞的方式消费消息。
通过messageConsumer 的setMessageListener 注册一个监听器,当有消息发送来时,系统自动调用MessageListener 的 onMessage 方法处理消息
*/
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
// instanceof 判断是否A对象是否是B类的子类
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("****消费者的消息:"+textMessage.getText());
}catch (JMSException e) {
e.printStackTrace();
}
}
}
});
// 让主线程不要结束。因为一旦主线程结束了,其他的线程(如此处的监听消息的线程)也都会被迫结束。
// 实际开发中,我们的程序会一直运行,这句代码都会省略。
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
总结
- 2种消费方式
同步阻塞方式(receive)
订阅者或接收者抵用MessageConsumer的receive()方法来接收消息,receive方法在能接收到消息之前(或超时之前)将一直阻塞。
异步非阻塞方式(监听器onMessage())
订阅者或接收者通过MessageConsumer的setMessageListener(MessageListener listener)注册一个消息监听器,当消息到达之后,系统会自动调用监听器MessageListener的onMessage(Message message)方法。 - 队列的特点
点对点消息传递域的特点如下:
(1) 每个消息只能有一个消费者,类似1对1的关系。好比个人快递自己领取自己的。
(2) 消息的生产者和消费者之间没有时间上的相关性。无论消费者在生产者发送消息的时候是否处于运行状态,消费
者都可以提取消息。好比我们的发送短信,发送者发送后不见得接收者会即收即看。
(3) 消息被消费后队列中不会再存储,
所以消费者不会消费到已经被消费掉的消息。 - 消息消费情况
情况1:只启动消费者1。
结果:消费者1会消费所有的数据。
情况2:先启动消费者1,再启动消费者2。
结果:消费者1消费所有的数据。消费者2不会消费到消息。
情况3:生产者发布6条消息,在此之前已经启动了消费者1和消费者2。
结果:消费者1和消费者2平摊了消息。各自消费3条消息。
Topic(Pub/Sub模式)的生产及消费
消息生产者
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsProduce_topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
MessageProducer messageProducer = session.createProducer(topic);
for (int i = 1; i < 4 ; i++) {
TextMessage textMessage = session.createTextMessage("topic_name--" + i);
messageProducer.send(textMessage);
MapMessage mapMessage = session.createMapMessage();
}
messageProducer.close();
session.close();
connection.close();
System.out.println(" **** TOPIC_NAME消息发送到MQ完成 ****");
}
}
消息消费者
package com.at.activemq.topic;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class JmsConsummer_topic {
public static final String ACTIVEMQ_URL = "tcp://192.168.17.3:61616";
public static final String TOPIC_NAME = "topic01";
public static void main(String[] args) throws Exception{
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 4 创建目的地 (两种 : 队列/主题 这里用主题)
Topic topic = session.createTopic(TOPIC_NAME);
MessageConsumer messageConsumer = session.createConsumer(topic);
// MessageListener接口只有一个方法,可以使用lambda表达式
messageConsumer.setMessageListener( (message) -> {
if (null != message && message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("****消费者text的消息:"+textMessage.getText());
}catch (JMSException e) {
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
存在多个消费者,每个消费者都能收到,自从自己启动后所有生产的消息。
- 控制台显示
topic有多个消费者时,消费消息的数量 ≈ 在线消费者数量*生产消息的数量。
下图展示了:我们先启动了3个消费者,再启动一个生产者,并生产了3条消息。
Spring 与 ActvieMQ 整合
消息生产者
- 引入依赖
<dependencies>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-all</artifactId>
<version>5.11.2</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-core</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-web</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-oxm</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-tx</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jdbc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-webmvc</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-aop</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-context-support</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-test</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>5.0.2.RELEASE</version>
</dependency>
<dependency>
<groupId>javax.jms</groupId>
<artifactId>javax.jms-api</artifactId>
<version>2.0.1</version>
</dependency>
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-spring</artifactId>
<version>3.7</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
- 编写Spring整合ActiveMQ配置:applicationContext-producer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:amp="http://activemq.apache.org/schema/core"
xsi:schemaLocation="http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!--1.创建连接工厂对象-->
<amp:connectionFactory
id="connetionFactory"
brokerURL="tcp://192.168.66.133:61616"
userName="admin"
password="admin"/>
<!--2.创建缓存连接工厂-->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!--注入连接工厂-->
<property name="targetConnectionFactory" ref="connetionFactory"/>
<!--缓存消息数据-->
<property name="sessionCacheSize" value="5"/>
</bean>
<!--3.创建用于点对点发送的JmsTemplate-->
<bean id="jmsQueueTemplate"
class="org.springframework.jms.core.JmsTemplate">
<!--注入缓存连接工厂-->
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<!--指定是否为发布订阅模式-->
<property name="pubSubDomain" value="false"/>
</bean>
<!--4.创建用于发布订阅发送的JmsTemplate-->
<bean id="jmsTopicTemplate"
class="org.springframework.jms.core.JmsTemplate">
<!--注入缓存连接工厂-->
<property name="connectionFactory" ref="cachingConnectionFactory"/>
<!--指定是否为发布订阅模式-->
<property name="pubSubDomain" value="true"/>
</bean>
</beans>
- 编写测试类,实现发送消息
package com.idata.tuxi.wizard.ui;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.core.MessageCreator;
import org.springframework.test.context.ContextConfiguration;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 演示Spring与ActiveMQ整合
*/
@RunWith(SpringJUnit4ClassRunner.class) // junit与spring整合
@ContextConfiguration("classpath:applicationContext-producer.xml") // 加载spring配置文件
public class SpringProducer {
//点对点模式
@Autowired
@Qualifier("jmsQueueTemplate")
private JmsTemplate jmsQueueTemplate;
//发布订阅模式
@Autowired
@Qualifier("jmsTopicTemplate")
private JmsTemplate jmsTopicTemplate;
/**
* 点对点发送
*/
@Test
public void ptpSender() {
/**
* 参数一:指定队列的名称
* 参数二:MessageCreator接口,我们需要提供该接口的匿名内部实现
*/
jmsQueueTemplate.send("spring_queue", new MessageCreator() {
//我们只需要返回发送的消息内容即可
@Override
public Message createMessage(Session session) throws JMSException {
//创建文本消息
TextMessage textMessage = session.createTextMessage("spring test
message");
return textMessage;
}
});
System.out.println("消息发送已完成");
}
/**
* 发布订阅发送
*/
@Test
public void psSender() {
jmsTopicTemplate.send("spring_topic", new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
//创建文本消息
TextMessage textMessage = session.createTextMessage("spring test
message--topic");
return textMessage;
}
});
System.out.println("消息发送已完成");
}
}
消息消费者
- 编写监听器:监听主题消息、队列消息
@Component
public class EmailMessageListener implements MessageListener {
@Override
public void onMessage(Message message) {
MapMessage mapMessage = (MapMessage) message;
try {
String email = mapMessage.getString("email");
System.out.println("消费消息:" + email);
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 编写Spring整合ActiveMQ配置:applicationContext-consumer.xml
<?xml version="1.0" encoding="UTF-8"?>
<beans
xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xmlns:context="http://www.springframework.org/schema/context"
xmlns:tx="http://www.springframework.org/schema/tx"
xmlns:amq="http://activemq.apache.org/schema/core"
xmlns:jms="http://www.springframework.org/schema/jms"
xsi:schemaLocation="
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/aop/spring-aop.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/tx
http://www.springframework.org/schema/tx/spring-tx.xsd
http://www.springframework.org/schema/jms
http://www.springframework.org/schema/jms/spring-jms.xsd
http://activemq.apache.org/schema/core
http://activemq.apache.org/schema/core/activemq-core.xsd">
<!-- 1. 创建ActiveMQ连接工厂 -->
<amq:connectionFactory
id="amqConnectionFactory"
userName="admin" password="admin"
brokerURL="tcp://192.168.12.132:61616"/>
<!-- 2. 创建缓存工厂 -->
<bean id="cachingConnectionFactory"
class="org.springframework.jms.connection.CachingConnectionFactory">
<!-- 注入 连接工厂-->
<property name="targetConnectionFactory" ref="amqConnectionFactory"></property>
<!-- session缓存数目 -->
<property name="sessionCacheSize" value="5"></property>
</bean>
<!--开启注解扫描-->
<context:component-scan basepackage="cn.itcast.spring_activemq_consumer"/>
<!--
配置消息监听器类,监听队列或主题消息模型中的消息。从而实现消费消息。
jms:listener-container
destination-type 监听的JMS消息类型(queue、topic)
connection-factory Spring的缓存连接工厂
jms:listener
destination 对应MQ中队列名称或主题名称
rel 消息监听器类(实现MessageListener接口)
-->
<!-- 3.1 监听指定名称(email)的队列中的消息-->
<jms:listener-container destination-type="queue" connection-factory="cachingConnectionFactory">
<jms:listener destination="email" ref="emailMessageListener"/>
</jms:listener-container>
<!-- 3.2 监听指定名称(email)的主题中的消息 -->
<jms:listener-container destination-type="topic" connection-factory="cachingConnectionFactory">
<jms:listener destination="sms" ref="smsMessageListener"/>
</jms:listener-container>
</beans>
- 编写测试类,实现发送消息
/**
* Spring整合ActiveMQ消费消息
*/
public class Consumer {
public static void main(String[] args) throws IOException {
ApplicationContext ac =
new ClassPathXmlApplicationContext("applicationContext-activemq-consumer.xml");
System.in.read();
}
}
SpringBoot 与 ActiveMQ 整合
消息生产者
- 引入依赖
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.1.RELEASE</version>
<relativePath/>
</parent>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.targer>1.8</maven.compiler.targer>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
- 配置
server:
port: 9001 #端口
spring:
application:
name: activemq-producer # 服务名称
# springboot与activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 连接地址
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false
- 编写启动类
package com.itheima.producer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 生产者启动类
*/
@SpringBootApplication
public class ProducerApplication {
public static void main(String[] args) {
SpringApplication.run(ProducerApplication.class,args);
}
}
- 编写生产者
package com.itheima.producer;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.test.context.junit4.SpringJUnit4ClassRunner;
/**
* 演示SpringBoot与ActiveMQ整合- 消息生产者
*/
@RunWith(SpringJUnit4ClassRunner.class)
@SpringBootTest(classes = ProducerApplication.class)
public class SpringBootProducer {
//JmsMessagingTemplate: 用于工具类发送消息,通过主题名/队列名自动区分
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Test
public void ptpSender(){
/**
* 参数一:队列的名称或主题名称
* 参数二:消息内容
*/
jmsMessagingTemplate.convertAndSend("springboot_queue","spring boot message");
}
}
消息消费者
- 引入依赖
与生产者一致 - 配置
server:
port: 9002 #端口
spring:
application:
name: activemq-consumer # 服务名称
# springboot与activemq整合配置
activemq:
broker-url: tcp://192.168.66.133:61616 # 连接地址
user: admin # activemq用户名
password: admin # activemq密码
# 指定发送模式 (点对点 false , 发布订阅 true)
jms:
pub-sub-domain: false
activemq:
name: springboot_queue
- 编写启动类
package com.itheima.consumer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
* 消息消费者启动类
*/
@SpringBootApplication
public class ConsumerApplication {
public static void main(String[] args) {
SpringApplication.run(ConsumerApplication.class,args);
}
}
- 编写消费者
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
/**
* 用于监听消息类(既可以用于队列的监听,也可以用于主题监听)
*/
@Component // 放入IOC容器
public class MsgListener {
/**
* 用于接收消息的方法
* destination: 队列的名称或主题的名称
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
}
可通过@JmsListener下指定对应消费的是Queue/Topic
@Slf4j
@Component
public class JMSConfiguration {
/**
* 队列容器工厂
*/
@Bean
public JmsListenerContainerFactory<?> queueListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(false);
factory.setConnectionFactory(connectionFactory);
return factory;
}
/**
* 主题队列工厂
*/
@Bean
public JmsListenerContainerFactory<?> topicListenerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setPubSubDomain(true);
factory.setConcurrency("1");
factory.setConnectionFactory(connectionFactory);
return factory;
}
}
消费时使用
@JmsListener(destination = DestinationConsts.MASTER_CLEANING,containerFactory = "queueListenerFactory")
JMS消息组成
整个JMS协议组成结构如下:
结构 | 描述 |
---|---|
JMS Provider | 消息中间件/消息服务器 |
JMS Producer | 消息生产者 |
JMS Consumer | 消息消费者 |
JMS Message | 消息(重要) |
JMS Message消息由三部分组成:
1)消息头
2)消息体
3)消息属性
JMS消息头
JMS消息头预定义了若干字段用于客户端与JMS提供者之间识别和发送消息,预编译头如下:
斜体 为重要的消息头
名称 | 描述 |
---|---|
JMSDestination | 消息发送的 Destination,在发送过程中由提供者设置 |
JMSMessageID | 唯一标识提供者发送的每一条消息。这个字段是在发送过程中由提供者设置的,客户机只能在消息发送后才能确定消息的 JMSMessageID |
JMSDeliveryMode | 消息持久化。包含值 DeliveryMode.PERSISTENT 或者DeliveryMode.NON_PERSISTENT。 |
JMSTimestamp | 提供者发送消息的时间,由提供者在发送过程中设置 |
JMSExpiration | 消息失效的时间,毫秒,值 0 表明消息不会过期,默认值为0 |
JMSPriority | 消息的优先级,由提供者在发送过程中设置。优先级 0 的优先级最低,优先级 9 的优先级最高。0-4为普通消息,5-9为加急消息。ActiveMQ不保证优先级高就一定先发送,只保证了加急消息必须先于普通消息发送。默认值为4 |
JMSCorrelationID | 通常用来链接响应消息与请求消息,由发送消息的 JMS 程序设置。 |
JMSReplyTo | 请求程序用它来指出回复消息应发送的地方,由发送消息的 JMS 程序设置 |
JMSType | JMS 程序用它来指出消息的类型。 |
JMSRedelivered | 消息的重发标志,false,代表该消息是第一次发生,true,代表该消息为重发消息 |
不过需要注意的是,在传送消息时,消息头的值由JMS提供者来设置,**因此开发者使用以上setJMSXXX()**方法分配的值就被忽略了,只有以下几个值是可以由开发者设置的:JMSCorrelationID,JMSReplyTo,JMSType。
JMS消息体
在消息体中,JMS API定义了五种类型的消息格式,让我们可以以不同的形式发送和接受消息,并提供了对已有消息格式的兼容。不同的消息类型如下:
JMS 定义了五种不同的消息正文格式,以及调用的消息类型,允许你发送并接收一些不同形式的数据,提供现有消息格式的一些级别的兼容性。
- TextMessage–一个字符串对象 *
- MapMessage–一套名称-值对
- ObjectMessage–一个序列化的 Java 对象 *
- BytesMessage–一个字节的数据流 *
- StreamMessage – Java原始值的数据流
TextMessage
- 生产
/**
* 发送TextMessage消息
*/
@Test
public void testMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
TextMessage textMessage = session.createTextMessage("文本消息");
return textMessage;
}
});
}
- 消费
/**
* 接收TextMessage的方法
*/
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof TextMessage){
TextMessage textMessage = (TextMessage)message;
try {
System.out.println("接收消息:"+textMessage.getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
MapMessage
- 生产
/**
* 发送MapMessage消息
*/
@Test
public void mapMessage(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name","张三");
mapMessage.setInt("age",20);
return mapMessage;
}
});
}
- 消费
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof MapMessage){
MapMessage mapMessage = (MapMessage)message;
try {
System.out.println("名称:"+mapMessage.getString("name"));
System.out.println("年龄:"+mapMessage.getString("age"));
} catch (JMSException e) {
e.printStackTrace();
}
}
}
ObjectMessage
- 生产
//发送ObjectMessage消息
@Test
public void test2(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
User user = new User();
user.setName("小苍");
user.setAge(18);
ObjectMessage objectMessage = session.createObjectMessage(user);
return objectMessage;
}
});
}
- 消费
@JmsListener(destination = "${activemq.name}")
public void receiveMessage(Message message){
if(message instanceof ObjectMessage){
ObjectMessage objectMessage = (ObjectMessage)message;
try {
User user = (User)objectMessage.getObject();
System.out.println(user.getUsername());
System.out.println(user.getPassword());
} catch (JMSException e) {
e.printStackTrace();
}
}
}
- 注意:ActiveMQ5.12后 ,为了安全考虑,ActiveMQ默认不接受自定义的序列化对象,需要将自定义的加入到受信任的列表。
spring:
activemq:
broker-url: tcp://192.168.66.133:61616
user: admin
password: admin
packages:
trust-all: true # 添加所有包到信任列表
BytesMessage
- 生产
//发送BytesMessage消息
@Test
public void test3(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
try {
File file = new File("d:/spring.jpg");
FileInputStream in = new FileInputStream(file);
byte[] bytes = new byte[(int)file.length()];
in.read(bytes);
bytesMessage.writeBytes(bytes);
} catch (Exception e) {
e.printStackTrace();
}
return bytesMessage;
}
});
}
- 消费
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
BytesMessage bytesMessage = (BytesMessage)message;
FileOutputStream out = new FileOutputStream("d:/abc.jpg");
byte[] buf = new byte[(int)bytesMessage.getBodyLength()];
bytesMessage.readBytes(buf);
out.write(buf);
out.close();
}
StreamMessage
- 生产
//发送StreamMessage消息
@Test
public void test4(){
jmsTemplate.send(name, new MessageCreator() {
@Override
public Message createMessage(Session session) throws JMSException {
StreamMessage streamMessage = session.createStreamMessage();
streamMessage.writeString("你好,ActiveMQ");
streamMessage.writeInt(20);
return streamMessage;
}
});
}
- 消费
@JmsListener(destination="${activemq.name}")
public void receiveMessage(Message message) throws Exception {
StreamMessage streamMessage = (StreamMessage)message;
String str = streamMessage.readString();
int i = streamMessage.readInt();
System.out.println(str);
System.out.println(i);
}
JMS消息属性
我们可以给消息设置自定义属性,这些属性主要是提供给应用程序的。对于实现消息过滤功能,消息属性非常有用,JMS API定义了一些标准属性,JMS服务提供者可以选择性的提供部分标准属性。
message.setStringProperty("Property",Property); //自定义属性
JMS 消息可靠性
消息持久化
什么是持久化消息?
保证消息只被传送一次和成功使用一次。在持久性消息传送至目标时,消息服务将其放入持久性数据存储。如果消息服务由于某种原因导致失败,它可以恢复此消息并将此消息传送至相应的消费者。虽然这样增加了消息传送的开销,但却增加了可靠性。
存储方式
ActiveMQ提供了以下三种的消息存储方式:
- Memory 消息存储-基于内存的消息存储。
- AMQ Message Store, 基于文件的存储机制,是以前的默认机制,现在不再使用
- 基于日志消息存储方式,KahaDB是ActiveMQ的默认日志存储方式,它提供了容量的提升和恢复能力。
- 基于JDBC的消息存储方式-数据存储于数据库(例如:MySQL)中。
ActiveMQ持久化流程图
AMQ Message Store
AMQ 消息存储
基于文件的存储机制,是以前的默认机制,现在不再使用。
AMQ是一种文件存储形式,它具有写入速度快和容易恢复的特点。消息存储再一个个文件中文件的默认大小为32M,当一个文件中的消息已经全部被消费,那么这个文件将被标识为可删除,在下一个清除阶段,这个文件被删除。AMQ适用于ActiveMQ5.3之前的版本。
kahaDB Message Store
官网文档:http://activemq.aache.org/kahadb
KahaDB是从ActiveMQ5.4(含)开始默认的持久化插件,可用于任何场景,提高了性能和恢复能力。消息存储使用一个事务日志和仅仅用一个索引文件来存储它所有的地址。
KahaDB是一个专门针对消息持久化的解决方案.它对典型的消息使用模式进行了优化。数据被追加到data logs中。当不再需要log文件中的数据的时候,log文件会被丢弃。
存储原理
kahadb在消息保存目录中只有4类文件和一个lock,跟ActiveMQ的其他几种文件存储引擎相比这就非常简洁了。
- db-.og KahaDB存储消息到预定义大小的数据记录文件中,文件命名为db-.og。当数据文件已满时,一个新的文件会随之创建,number数值也会随之递增,它随着消息数量的增多,如每32M一个文件,文件名按照数字进行编号,如db-1.og、db-2.og、db-3.og·"。当不再有引用到数据文件中的任何消息时,文件会被删除或归档。
- db.data 该文件包含了持久化的BTree索引,索引了消息数据记录中的消息,它是消息的索引文件,本质上是B-Tree (B树),使用B-Tree作为家引指向db-.log里面存储的消息。
- db.free 当前db.data文件里哪些页面是空闲的,文件具体内容是所有空闲页的ID
- db.redo 用来进行消息恢复,如果KahaDB消息存储在强制退出后启动,用于恢复BTree索引。
- lock文件锁,表示当前获得kahadb读写权限的broker。
配置
- 配置文件activemq.xml中。日志文件的存储目录在:%activemq安装目录%/data/kahadb
<persistenceAdapter>
<kahaDB directory="${activemq.data}/kahadb"/>
</persistenceAdapter>
LevelDB Message Store
这种文件系统是从ActiveMQ5.8之后引进的,它和KahaDB非常相似,也是基于文件的本地数据库储在它提供比KahaDB更快的持久性。
但它不使用自定义B-Tree实现来索引预写日志,而是使用基于LevelDB的索引。
由于还推出了Replicated LevelDB,且其余各种原因,暂时不作为默认推荐的存储方式。官网还是推荐默认使用kahaDB。
配置
- 配置文件activemq.xml中。
< persistenceAdapter >
<levelDBdirectory = "activemq-data" />
</persistenceAdapter >
JDBC Message Store
- 添加mysql数据库的驱动包到lib文件夹
- 修改activemq.xml
<!--配置数据库连接池-->
<bean name="mysql-ds" class="com.alibaba.druid.pool.DruidDataSource" destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver" />
<property name="url"value="jdbc:mysql://192.168.66.133:3306/db_activemq" />
<property name="username" value="root" />
<property name="password" value="123456"/>
</bean>
<!--JDBC Jdbc用于master/slave模式的数据库分享 -->
<persistenceAdapter>
<!--createTableOnStartup建议在第一次启动开启后关闭 -->
<jdbcPersistenceAdapter dataSource="#mysql-ds" createTableOnStartup="true"/>
</persistenceAdapter>
默认是的dbcp数据库连接池,如果要换成其他数据库连接池,需要将该连接池jar包,也放到lib目录下。
- 需要我们准备一个mysql数据库,并创建一个名为activemq的数据库。新建的数据库要采用latin1 或者ASCII编码。
https://blog.csdn.net/JeremyJiaming/article/details/88734762- ACTIVEMQ_MSGS
如果是queue,在没有消费者消费的情况下会将消息保存到activemq msgs表中,只要有任意一个消费者已经消费过了消费之后这些消息将会立即被删除。 - ACTIVEMQ_ACKS
如果是topic,一般是先启动消费订阅然后再生产的情况下会将消息保存到activemq acks。 - ACTIVEMQ_LOCK
- ACTIVEMQ_MSGS
- 重启activemq
JDBC Message Store with ActiveMQ Journal
这种方式克服了JDBC Store的不足,JDBC每次消息过来,都需要去写库读库。ActiveMQ Journal,使用高速缓存写入技术,大大提高了性能。当消费者的速度能够及时跟上生产者消息的生产速度时,journal文件能够大大减少需要写入到DB中的消息。
举个例子:生产者生产了1000条消息,这1000条消息会保存到journal文件,如果消费者的消费速度很快的情况下,在journal文件还没有同步到DB之前,消费者已经消费了90%的以上消息,那么这个时候只需要同步剩余的10%的消息到DB。如果消费者的速度很慢,这个时候journal文件可以使消息以批量方式写到DB。
为了高性能,这种方式使用日志文件存储+数据库存储。先将消息持久到日志文件,等待一段时间再将未消费的消息持久到数据库。该方式要比JDBC性能要高。
Queue(PTP模式)持久化与非持久化
Queue非持久,当服务器宕机,消息不存在(消息丢失了)。即便是非持久,消费者在不在线的话,消息也不会丢失,等待消费者在线,还是能够收到消息的。
Queue持久化,当服务器宕机,消息依然存在。queue消息默认是持久化的。(默认配置)
Topic(Pub/Sub模式)持久化与非持久化
topic默认就是非持久化的,因为生产者生产消息时,消费者也要在线,这样消费者才能消费到消息。
topic消息持久化,只要消费者向MQ服务器注册过,所有生产者发布成功的消息,该消费者都能收到,不管是MQ服务器宕机还是消费者不在线。
- 消费者需要提前接入配置
// 设置客户端ID。向MQ服务器注册自己的名称
connection.setClientID("marrry");
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Topic topic = session.createTopic(TOPIC_NAME);
// 创建一个topic订阅者对象。一参是topic,二参是订阅者名称
TopicSubscriber topicSubscriber = session.createDurableSubscriber(topic,"remark...");
注意:
- 一定要先运行一次消费者,等于向MQ注册,类似我订阅了这个主题。
- 然后再运行生产者发送消息。
- 之后无论消费者是否在线,都会收到消息。如果不在线的话,下次连接的时候,会把没有收过的消息都接收过来。
控制台显示
消息的事务性
消息事务,是保证消息传递原子性的一个重要特征,和JDBC的事务特征类似。
一个事务性发送,其中一组消息要么能够全部保证到达服务器,要么都不到达服务器。
生产者、消费者与消息服务器直接都支持事务性;
ActionMQ的事务主要偏向在生产者的应用。
ActionMQ消息事务流程图:
生产者事务
- 手动提交方式
/**
* 事务性发送--方案一
*/
@Test
public void sendMessageTx(){
//获取连接工厂
ConnectionFactory connectionFactory =jmsMessagingTemplate.getConnectionFactory();
Session session = null;
try {
//创建连接
Connection connection = connectionFactory.createConnection();
/**
* 参数一:是否开启消息事务
*/
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
//创建生产者
MessageProducer producer = session.createProducer(session.createQueue(name));
for(int i=1;i<=10;i++){
//模拟异常
if(i==4){
int a = 10/0;
}
TextMessage textMessage = session.createTextMessage("消息--" +i);
producer.send(textMessage);
}
//注意:一旦开启事务发送,那么就必须使用commit方法进行事务提交,否则消息无法到达
MQ服务器
session.commit();
} catch (JMSException e) {
e.printStackTrace();
//消息事务回滚
try {
session.rollback();
} catch (JMSException e1) {
e1.printStackTrace();
}
}
}
- 配置自动开启事务
- 配置类
package com.itheima;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.connection.JmsTransactionManager;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.transaction.PlatformTransactionManager;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean
public PlatformTransactionManager transactionManager(ConnectionFactory connectionFactory) {
return new JmsTransactionManager(connectionFactory);
}
}
- 业务类
package com.itheima.producer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
/**
* 消息发送的业务类
*/
@Service
public class MessageService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Value("${activemq.name}")
private String name;
@Transactional // 对消息发送加入事务管理(同时也对JDBC数据库的事务生效)
public void sendMessage(){
for(int i=1;i<=10;i++) {
//模拟异常
if(i==4){
int a = 10/0;
}
jmsMessagingTemplate.convertAndSend(name, "消息---"+i);
}
}
}
- 测试发送方法
@Autowired
private MessageService messageService;
/**
* 事务性发送--方案二: Spring的JmsTransactionManager功能
*/
@Test
public void sendMessageTx2(){
messageService.sendMessage();
}
生产者开启事务后,执行commit方法,这批消息才真正的被提交。不执行commit方法,这批消息不会提交。执行rollback方法,之前的消息会回滚掉。生产者的事务机制,要高于签收机制,当生产者开启事务,签收机制不再重要。
消费者事务
package com.itheima.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
/**
* 接收消息的方法
*/
@JmsListener(destination="${activemq.name}",containerFactory = "jmsQueryListenerFactory")
public void receiveMessage(TextMessage textMessage,Session session) throws JMSException {
try {
System.out.println("消息内容:" + textMessage.getText() + ",是否重发:" + textMessage.getJMSRedelivered());
int i = 100/0; //模拟异常
session.commit();//提交事务
} catch (JMSException e) {
try {
session.rollback();//回滚事务
} catch (JMSException e1) {
}
e.printStackTrace();
}
}
}
消费者开启事务后,执行commit方法,这批消息才算真正的被消费。不执行commit方法,这些消息不会标记已消费,下次还会被消费。执行rollback方法,是不能回滚之前执行过的业务逻辑,但是能够回滚之前的消息,回滚后的消息,下次还会被消费。消费者利用commit和rollback方法,甚至能够违反一个消费者只能消费一次消息的原理。
问题
(1) 问:消费者和生产者需要同时操作事务才行吗?
答:消费者和生产者的事务,**完全没有关联,各自是各自的事务。**
消息确认(签收)机制
JMS消息只有在被确认之后,才认为已经被成功地消费了。消息的成功消费通常包含三个阶段:客户接收消息、客户处理消息和消息被确认。在事务性会话中,当一个事务被提交的时候,确认自动发生。在非事务性会话中,消息何时被确认取决于创建会话时的应答模式(acknowledgement mode)。该参数有以下三个可选值:
值 | 描述 |
---|---|
Session.AUTO_ACKNOWLEDGE | 当客户成功的从receive方法返回的时候,或者从MessageListener.onMessage方法成功返回的时候,会话自动确认客户收到的消息 |
Session.CLIENT_ACKNOWLEDGE | 客户通过消息的acknowledge方法确认消息。需要注意的是,在这种模式中,确认是在会话层上进行:确认一个被消费的消息将自动确认所有已被会话消费的消息。例如,如果一个消息消费者消费了10个消息,然后确认第5个消息,那么所有10个消息都被确认 |
Session.DUPS_ACKNOWLEDGE | 该选择只是会话迟钝确认消息的提交。如果JMS provider失败,那么可能会导致一些重复的消息。如果是重复的消息,那么JMS provider必须把消息头的JMSRedelivered字段设置为true |
Session.SESSION_TRANSACTED | 开始事务的情况下,可以使用该方式。该种方式很少使用到。 |
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
注意:消息确认机制与事务机制是冲突的,只能选其中一种。所以演示消息确认前,先关闭事务。
**事务偏向生产者,签收偏向消费者**。也就是说,生产者使用事务更好点,消费者使用签收机制更好点。
auto_acknowledge 自动确认
- 添加配置类:
package com.itheima;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory=new
DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(false); // 不开启事务操作
factory.setSessionAcknowledgeMode(1); //自动确认
return factory;
}
}
- 消费者
package com.itheima.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
/**
* 接收消息的方法
*/
@JmsListener(destination="${activemq.name}",containerFactory ="jmsQueryListenerFactory")
public void receiveMessage(TextMessage textMessage){
try {
System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"
+ textMessage.getJMSRedelivered());
throw new RuntimeException("test");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
如果消费方接收消息失败,JMS服务器会重发消息,默认重发6次。
client_acknowledge 手动确认
- 配置类
package com.itheima;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.SimpleJmsListenerContainerFactory;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.ConnectionFactory;
import javax.jms.Session;
/**
*
*/
@Configuration
public class ActiveMqConfig {
@Bean(name="jmsQueryListenerFactory")
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory){
DefaultJmsListenerContainerFactory factory=new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(false); // 不开启事务操作
factory.setSessionAcknowledgeMode(4); //手动确认
return factory;
}
}
- 消费者
package com.itheima.consumer;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.Session;
import javax.jms.TextMessage;
/**
* 消息消费者
*/
@Component
public class Consumer {
/**
* 接收消息的方法
*/
@JmsListener(destination="${activemq.name}",containerFactory ="jmsQueryListenerFactory")
public void receiveMessage(TextMessage textMessage){
try {
System.out.println("消息内容:" + textMessage.getText() + ",是否重发:"
+ textMessage.getJMSRedelivered());
textMessage.acknowledge(); // 确认收到消息,一旦消息确认,消息不会重新发送
throw new RuntimeException("test");
} catch (JMSException e) {
e.printStackTrace();
}
}
}
JMS 消息投递方式
异步投递
ActiveMQ支持同步、异步两种发送的模式将消息发送到broker,模式的选择对发送延时有巨大的影响。producer能达到怎样的产出率(产出率=发送数据总量/时间)主要受发送延时的影响,使用异步发送可以显著的提高发送的性能。
同步发送
消息生产者使用持久(Persistent)传递模式发送消息的时候,Producer.send() 方法会被阻塞,直到broker 发送一个确认消息给生产者(ProducerAck),这个确认消息暗示broker已经成功接收到消息并把消息保存到二级存储中。
异步发送
如果应用程序能够容忍一些消息的丢失,那么可以使用异步发送。异步发送不会在受到broker的确认之前一直阻塞 Producer.send方法。我们通常在发送消息量比较密集的情况下使用异步发送,它可以很大的提升Producer性能。不过这也带来了额外的问题,就是需要消耗较多的Client端内存同时也会导致broker端性能消耗增加。
想要使用异步,在brokerURL中增加 jms.alwaysSyncSend=false&jms.useAsyncSend=true属性
- 当alwaysSyncSend=true
系统将会忽略useAsyncSend设置的值都采用同步。 - 当alwaysSyncSend=false
- NON_PERSISTENT”(非持久化)、事务中的消息将使用“异步发送
- 如果useAsyncSend=true,“PERSISTENT”类型的消息使用异步发送。
如果useAsyncSend=false,“PERSISTENT”类型的消息使用同步发送
总结: 默认情况(alwaysSyncSend=false,useAsyncSend=false),非持久化消息、事务内的消息均采用异步发送;对于持久化消息采用同步发送!
配置异步投递
//1.在连接上配置
new ActiveMQConnectionFactory("tcp://locahost:61616?jms.useAsyncSend=true");
//2.通过ConnectionFactory
((ActiveMQConnectionFactory)connectionFactory).setUseAsyncSend(true);
//3.通过connection
((ActiveMQConnection)connection).setUseAsyncSend(true);
注意:如果是Spring或SpringBoot项目,通过修改JmsTemplate的默认参数实现异步或同步投递
@Configuration
public class ActiveConfig {
/**
* 配置用于异步发送的非持久化JmsTemplate
*/
@Autowired
@Bean
public JmsTemplate asynJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
template.setExplicitQosEnabled(true);
template.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
return template;
}
/**
* 配置用于同步发送的持久化JmsTemplate
*/
@Autowired
@Bean
public JmsTemplate synJmsTemplate(PooledConnectionFactory pooledConnectionFactory) {
JmsTemplate template = new JmsTemplate(pooledConnectionFactory);
return template;
}
异步投递如何确认
由于消息不阻塞,生产者会认为所有 send 的消息均被成功发送至 MQ。如果 MQ 突然宕机,此时生产者端内存中尚未被发送至 MQ 的消息都会丢失。
这时,可以给异步投递方法接收回调,以确认消息是否发送成功!
import org.apache.activemq.ActiveMQConnection;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.ActiveMQMessageProducer;
import org.apache.activemq.AsyncCallback;
import javax.jms.*;
import java.util.UUID;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Async";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
activeMQConnectionFactory.setUseAsyncSend(true);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
ActiveMQMessageProducer activeMQMessageProducer = (ActiveMQMessageProducer)session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
//添加回调
textMessage.setJMSMessageID(UUID.randomUUID().toString()+"orderAtguigu");
final String msgId = textMessage.getJMSMessageID();
activeMQMessageProducer.send(textMessage, new AsyncCallback() {
public void onSuccess() {
System.out.println("成功发送消息Id:"+msgId);
}
public void onException(JMSException e) {
System.out.println("失败发送消息Id:"+msgId);
}
});
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
activeMQMessageProducer.close();
session.close();
connection.close();
}
}
}
控制台观察发送消息的信息:
延迟投递和定时投递
官网文档:http://activemq.apache.org/delay-and-schedule-message-delivery.html
四大属性
属性 | 值类型 | 描述 |
---|---|---|
AMQ_SCHEDULED_DELAY | long | 延迟投递的时间(毫秒) |
AMQ_SCHEDULED_PERIOD | long | 重复投递的时间间隔(毫秒) |
AMQ_SCHEDULED_REPEAT | int | 重复投递次数 |
AMO_SCHEDULED_CRON | String | Cron表达式(注:此corn表达式并非Quartz框架中的corn表达式,而是linux中corntab中的表达 式,基本顺序是"分(0-59) 时(0-23) 日(1-31) 月(1-12) 星期几(1-7) ") |
除了基本的时间单位外,cron 表达式还支持一些特殊的字符,如: |
- 表示所有可能的值,例如,在第五个字段中表示每个月都执行。
? 表示不指定值,例如,?在第六个字段中表示不关心周几。
- 表示范围,例如,1-5 在第四个字段中表示 1 号到5 号都执行表示列表,例如,1,3,5 在第四个字段中表示 1 号、3 号、5 号.都执行。
/ 表示步长,例如,/5 在第一字段中表示每 5 秒执行一次。
示例:https://www.ngui.cc/el/2728544.html?action=onClick
http://tech.zhiding.net/tech/2023/0430/59884.html |
配置
- 修改activemq.xml
<broker xmlns="http://activemq.apache.org/schema/core" ...
schedulerSupport="true" >
......
</broker>
注意:添加schedulerSupport="true"配置
- 代码
import org.apache.activemq.*;
import javax.jms.*;
import java.util.UUID;
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "Schedule01";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer messageProducer = session.createProducer(queue);
long delay = 10*1000;
long period = 5*1000;
int repeat = 3 ;
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
// 延迟的时间
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 重复投递的时间间隔
textMessage.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 重复投递的次数
textMessage.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
// 此处的意思:该条消息,等待10秒,之后每5秒发送一次,重复发送3次。
messageProducer.send(textMessage);
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
messageProducer.close();
session.close();
connection.close();
}
}
}
使用 JmsTemplate结合 MessagePostProcessor
long delay = 10*1000;
long period = 5*1000;
int repeat = 3 ;
this.jmsTemplate.convertAndSend(this.getDestination(), t, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws JMSException {
// 延迟的时间
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
// 重复投递的时间间隔
message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
// 重复投递的次数
message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
return message;
}
});
发送成功后,可以登录activemq的webconsole查看消息的属性:
在scheduled面板中,可以看到延时的消息
注:在开启消息持久化存储的前提下,就算把相应的queue在webconsole面板中删除(即删除队列),只要投递的时间尚未到,该消息也不会删除,仍然能正常延时投递。
此外,在queues面板中,如何查看某条具体的消息,也可以通过属性发现这条消息是延时消息,参考下图:
延迟/定时任务删除
通过ActiveMQ提供的ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION
,可以将延迟/定时计划移除。
//在消费的时候获取当计划id
String scheduled_id = objectMessage.getStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID);
this.jmsTemplate.convertAndSend(ScheduledMessage.AMQ_SCHEDULER_MANAGEMENT_DESTINATION, scheduled_id,
message -> {
//通过添加之前添加的消息标识
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULER_ACTION, ScheduledMessage.AMQ_SCHEDULER_ACTION_REMOVE);
message.setStringProperty(ScheduledMessage.AMQ_SCHEDULED_ID, scheduled_id);
return message;
});
JMS 消息的重试机制
官网文档:http://activemq.apache.org/redelivery-policy
消费者收到消息,之后出现异常了,没有告诉broker确认收到该消息,broker会尝试再将该消息发送给消费者。尝试n次,如果消费者还是没有确认收到该消息,那么该消息将被放到死信队列重,之后broker不会再将该消息发送给消费者。
具体哪些情况会引发消息重发
- Client用了transactions且再session中调用了rollback
- Client用了transactions且再调用commit之前关闭或者没有commit
- Client在CLIENT_ACKNOWLEDGE的传递模式下,session中调用了recover
有毒消息(Poison ACK)
一个消息被redelivedred超过默认的最大重发次数(默认6次)时,消费的回个MQ发一个“poison ack”表示这个消息有毒,告诉broker不要再发了。这个时候broker会把这个消息放到DLQ(死信队列)
属性说明
属性 | 描述 |
---|---|
collisionAvoidanceFactor | 设置防止冲突范围的正负百分比,只有启用useColisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。默认值为0.15。 |
maximumRedeliveries | 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。默认值为6 |
maximumRedeliveryDelay | 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。默认为-1。 |
initialRedeliveryDelay | 初始重发延迟时间,默认1000L |
redeliveryDelay | 重发延迟时间,当initialRedeliveryDelay=0时生效,默认1000L |
useCollisionAvoidance | 启用防止冲突功能,默认false |
useExponentialBackOff | 启用指数倍数递增的方式增加延迟时间,默认false |
backOfMutiolier | 重连时间间隔递增倍数,只有值大于1和启用useExponentalBackOff参数时大生效。默认是5 |
配置重试机制
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.RedeliveryPolicy;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61626";
private static final String ACTIVEMQ_QUEUE_NAME = "dead01";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
// 修改默认参数,设置消息消费重试3次
RedeliveryPolicy redeliveryPolicy = new RedeliveryPolicy();
redeliveryPolicy.setMaximumRedeliveries(3);
activeMQConnectionFactory.setRedeliveryPolicy(redeliveryPolicy);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
final Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
TextMessage textMessage = (TextMessage) message;
try {
System.out.println("***消费者接收到的消息: " + textMessage.getText());
//session.commit();
}catch (Exception e){
e.printStackTrace();
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
JMS 消息的死信队列
官网文档: http://activemq.apache.org/redelivery-policy
异常消息规避处理的集合,主要处理失败或者过期的消息。
配置
- 共享死信策略(sharedDeadLetterStrategy)
不管是queue还是topic,失败的消息都放到这个队列中。下面修改activemq.xml的配置,可以达到修改队列的名字。
<deadLetterStrategy>
<sharedDeadLetterStrategy deadLetterQueue="DLQ-QUEUE"/>
</deadLetterStrategy>
- 个人死信策略(individualDeadLetterStrategy)
可以为queue和topic单独指定两个死信队列。还可以为某个话题,单独指定一个死信队列。
<destinationPolicy>
<policyMap>
<policyEntries>
<policyEntry queue=">">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true" />
</deadLetterStrategy>
</policyEntry>
<policyEntry topic=">" >
<pendingMessageLimitStrategy>
<constantPendingMessageLimitStrategy limit="1000"/>
</pendingMessageLimitStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>
把DeedLetter放入各自的死信通道中,
对于Queue而言,死信通道的前缀默认为“ActiveMQ.DLQ.Queue.”
对于Topic而言,死信通道的前缀默认为“ActiveMQ.DLQ.Topic.”
比如队列Order,那么它对应的死信通道为“ActiveMQ.DLQ.Queue.Order’。我们使用“queuePrefix”“topicPrefix”来指定上述前缀。
默认情况下,无论是Topic还是Queue,broker将使用Queue来保存DeadLeader,即死信通道通常为Queue: 不过开发者也可以指定为Topic。
<policyEntry queue="order">
<deadLetterStrategy>
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="false" />
</deadLetterStrategy>
</policyEntry>
将队列Order中出现的DeadLetter保存在DLQ.Order中,不过此时DLQ.Order为Topic。
属性“useQueueForTopicMessages”,此值表示是否将Topic的DeadLetter保存在Queue中,默认为true。
自动删除过期消息
过期消息是值生产者指定的过期时间,超过这个时间的消息。
有时需要直接删除过期的消息而不需要发送到死队列中,“processExpired”表示是否将过期消息放入死信队列,默认为true:
<policyEntry queue= ">" >
<deadletterStrategy>
<sharedDeadLetterStrategy processExpired= "false" />
</deadLetterStrategy>
</policyEntry>
存放非持久消息到死信队列中
默认情况下,Activemq不会把非持久的死消息发送到死信队列中。
processNonPersistent”表示是否将“非持久化”消息放入死信队列,默认为false。
非持久性如果你想把非持久的消息发送到死队列中,需要设置属性processNonPersistent=“true"
<policyEntry queue= ">" >
<deadLetterStrategy>
<sharedDeadLetterStrategy processNonPersistent= "true" />
</deadLetterStrategy>
</policyEntry>
消息的幂等性
网络延迟传输中,会造成进行MQ重试中,在重试过程中,可能会造成重复消费。
如果消息是做数据库的插入操作,给这个消息做一个唯一主键,那么就算出现重复消费的情况,就会导致主键冲突,避免数据库出现脏数据。
如果上面两种情况还不行,准备-个第三服务方来做消费记录。以redis为例,给消息分配一个全局id,只要消费过该消息,将<id,message>以K-V形式写入redis。那消费者开始消费前,先去redis中查询有没消费记录即可。
扩展
broker 多实例
- broker是什么
相当于一个ActiveMQ服务器实例。
使用配置文件
启动broker时指定配置文件,可以帮助我们在一台服务器上启动多个broker。实际工作中一般一台服务器只启动一个broker。
使用嵌入式方式
用ActiveMQ Broker作为独立的消息服务器来构建Java应用。ActiveMQ也支持在vm中通信基于嵌入的broker,能够无缝的集成其他java应用,以便随时用随时启动,在用的时候再去启动这样能节省了资源,也保证了可用性。这种方式,我们实际开发中很少采用,因为他缺少太多了东西,如:日志,数据存储等等。
- 依赖引用
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.10.1</version>
</dependency>
- 嵌入式broker的启动类
import org.apache.activemq.broker.BrokerService;
public class EmbedBroker {
public static void main(String[] args) throws Exception {
//ActiveMQ也支持在vm中通信基于嵌入的broker
BrokerService brokerService = new BrokerService();
brokerService.setPopulateJMSXUserID(true);
brokerService.addConnector("tcp://127.0.0.1:61616");
brokerService.start();
}
}
ActiveMQ的传输协议
ActiveMQ支持的client-broker通讯协议有:TCP、NIO、UDP、SSL、Http(s)、VM。其中配置Transport Connector的文件在ActiveMQ安装目录的conf/activemq.xml中的标签之内。
activemq传输协议的官方文档:http://activemq.apache.org/configuring-version-5-transports.html
<transportConnectors>
<transportConnector name="openwire" uri="tcp://0.0.0.0:61616?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="amqp" uri="amqp://0.0.0.0:5672?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="stomp" uri="stomp://0.0.0.0:61613?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="mqtt" uri="mqtt://0.0.0.0:1884?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
<transportConnector name="ws" uri="ws://0.0.0.0:61614?maximumConnections=1000&wireFormat.maxFrameSize=104857600"/>
</transportConnectors>
在上文给出的配置信息中,URI描述信息的头部都是采用协议名称:例如
描述amqp协议的监听端口时,采用的URI描述格式为“amqp://······”;
描述Stomp协议的监听端口时,采用URI描述格式为“stomp://······”;
唯独在进行openwire协议描述时,URI头却采用的“tcp://······”。这是因为ActiveMQ中默认的消息协议就是openwire
支持的传输协议
除了tcp和nio协议,其他的了解就行。各种协议有各自擅长该协议的中间件,工作中一般不会使用activemq去实现这些协议。如: mqtt是物联网专用协议,采用的中间件一般是mosquito。ws是websocket的协议,是和前端对接常用的,一般在java代码中内嵌一个基站(中间件)。stomp好像是邮箱使用的协议的,各大邮箱公司都有基站(中间件)。
协议 | 描述 |
---|---|
TCP | 默认的协议,性能相对可以 |
NIO | 基于代P协议之上的,进行了扩展和优化,具有更好的扩展性 |
UDP | 性能比TCP更好,但是不具有可靠性 |
SSL | 安全链接 |
HTTP(S) | 基于HTTP或者HTTPS |
VM | VM本身不是协议,当客户端和代理在同一个Java虚拟机(VM)中运行时,他们之间需要通信,但不想占用网络通道,而是直接通信,可以使用该方式 |
TCP协议
- Transmission Control Protocol(TCP)是默认的。TCP的Client监听端口61616
- 在网络传输数据前,必须要先序列化数据,消息是通过一个叫wire protocol的来序列化成字节流。
- TCP连接的URI形式如:tcp://HostName:port?key=value&key=value,后面的参数是可选的。
关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/tcp-transport-reference
TCP传输的的优点:
TCP协议传输可靠性高,稳定性强
高效率:字节流方式传递,效率很高
有效性、可用性:应用广泛,支持任何平台
NIO协议
- New I/O API Protocol(NIO)
- NIO协议和TCP协议类似,但NIO更侧重于底层的访问操作。它允许开发人员对同一资源可有更多的client调用和服务器端有更多的负载。
- 适合使用NIO协议的场景:
- 可能有大量的Client去连接到Broker上,一般情况下,大量的Client去连接Broker是被操作系统的线程所限制的。因此,NIO的实现比TCP需要更少的线程去运行,所以建议使用NIO协议。
- 可能对于Broker有一个很迟钝的网络传输,NIO比TCP提供更好的性能。
- NIO连接的URI形式:nio://hostname:port?key=value&key=value
关于Transport协议的可选配置参数可以参考官网http://activemq.apache.org/configuring-version-5-transports.html
AMQP协议
STOMP协议
MQTT协议
NIO案例
ActiveMQ这些协议传输的底层默认都是使用BIO网络的IO模型。只有当我们指定使用nio才使用NIO的IO模型。
所以为了首先提高单节点的网络吞吐性能,我们需要明确指定Active的网络IO模型。
- 修改配置文件activemq.xml
- 修改配置文件activemq.xml在 节点下添加如下内容:
- 修改完成后重启activemq: service activemq restart
- 查看管理后台,可以看到页面多了nio
- 修改配置文件activemq.xml在 节点下添加如下内容:
- 生产者
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
public class Jms_TX_Producer {
//仅此处不同
private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61618";
private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
public static void main(String[] args) throws JMSException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageProducer producer = session.createProducer(queue);
try {
for (int i = 0; i < 3; i++) {
TextMessage textMessage = session.createTextMessage("tx msg--" + i);
producer.send(textMessage);
}
System.out.println("消息发送完成");
} catch (Exception e) {
e.printStackTrace();
} finally {
//8.关闭资源
producer.close();
session.close();
connection.close();
}
}
}
- 消费者
package com.activemq.demo;
import org.apache.activemq.ActiveMQConnectionFactory;
import javax.jms.*;
import java.io.IOException;
public class Jms_TX_Consumer {
//仅此处不同
private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61618";
private static final String ACTIVEMQ_QUEUE_NAME = "nio-test";
public static void main(String[] args) throws JMSException, IOException {
ActiveMQConnectionFactory activeMQConnectionFactory = new ActiveMQConnectionFactory(ACTIVEMQ_URL);
Connection connection = activeMQConnectionFactory.createConnection();
connection.start();
Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = session.createQueue(ACTIVEMQ_QUEUE_NAME);
MessageConsumer messageConsumer = session.createConsumer(queue);
messageConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
if (message instanceof TextMessage) {
try {
TextMessage textMessage = (TextMessage) message;
System.out.println("***消费者接收到的消息: " + textMessage.getText());
} catch (Exception e) {
System.out.println("出现异常,消费失败,放弃消费");
}
}
}
});
System.in.read();
messageConsumer.close();
session.close();
connection.close();
}
}
NIO增强
URI格式头以”nio”开头,表示这个端口使用以TCP协议为基础的NIO网络IO模型。但是这样的设置方式,只能使这个端口支持openwire协议。如何让其他协议传输底层也使用NIO网络IO模型呢?
- 使用auto关键字
- 使用“+”符号来为端口设置多种特性
- 如果我们既需要某一个端口支持NIo网络io模型,又需要它支持多个协议
修改配置文件activemq.xml
<transportConnectors>
<transportConnector name="auto+nio" uri="auto+nio://0.0.0.0:61608?maximumConnections=1000&wireFormat.maxFrameSize=104857600&org.apache.activemq.transport.nio.SelectorManager.corePoolSize=20&org.apache.activemq.transport.nio.Se1ectorManager.maximumPoo1Size=50"/>
</transportConnectors>
auto : 针对所有的协议,他会识别我们是什么协议。
nio:使用NIO网络IO模型
修改配置文件后重启activemq。
代码
//使用nio模型的tcp协议生产者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException {
......
}
}
//使用nio模型的tcp协议消费者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "tcp://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException, IOException {
......
}
}
//使用nio模型的nio协议生产者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Producer {
private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException {
......
}
}
//使用nio模型的nio协议消费者。除灰色背景外,其他代码和之前一样
public class Jms_TX_Consumer {
private static final String ACTIVEMQ_URL = "nio://118.24.20.3:61608";
private static final String ACTIVEMQ_QUEUE_NAME = "auto-nio";
public static void main(String[] args) throws JMSException, IOException {
......
}
}
ActiveMQ多节点集群
从5.9开始提供了LevelDB和Zookeeper的数据复制方法,用于Master-slave方式的首选数据复制方案。