1. 环境搭建
1.1 env-version
JDK | 1.8 |
Spring | 2.7.13 |
Maven | 3.6 |
ActiveMQ | 5.15.2 |
1.2 docker-compose.yml
version: '3.8'
services:
activemq:
image: rmohr/activemq:5.16.3
container_name: activemq
ports:
- "61616:61616"
- "8161:8161"
environment:
- ACTIVEMQ_ADMIN_LOGIN=admin
- ACTIVEMQ_ADMIN_PASSWORD=admin
- ACTIVEMQ_CONFIG_MINMEMORY=512
- ACTIVEMQ_CONFIG_MAXMEMORY=2048
# volumes:
# - ./data/activemq:/var/activemq/data
# - ./conf/activemq.xml:/var/activemq/conf/activemq.xml
networks:
- activemq-network
networks:
activemq-network:
driver: bridge
在这个docker-compose.yml
文件中:
-
activemq
服务使用了rmohr/activemq
Docker镜像,这是一个社区维护的ActiveMQ镜像。请确保选择一个与你的Spring Boot版本兼容的ActiveMQ版本。 -
container_name
设置了容器的名称。 -
ports
映射了ActiveMQ的JMS端口(61616)和管理控制台端口(8161)到宿主机的相同端口。 -
environment
部分设置了管理员账号和密码,以及JVM的最小和最大内存配置。这些可以根据需要进行调整。 -
volumes
部分映射了宿主机的目录到容器内部,用于持久化ActiveMQ的数据和配置文件。你需要创建相应的目录并放置你的activemq.xml
配置文件。 -
networks
定义了一个自定义网络,以便ActiveMQ服务可以连接到其他可能需要的Docker服务。
在使用这个docker-compose.yml
文件之前,请确保你已经创建了data
和conf
目录,并且在conf
目录中放置了自定义的activemq.xml
配置文件。如果不需要持久化存储,可以移除volumes
部分。
1.3 添加依赖
<!-- ActiveMQ 依赖 -->
<!-- https://mvnrepository.com/artifact/org.apache.activemq/activemq-spring -->
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-spring</artifactId>
<version>5.15.4</version>
<!-- 排除依赖 -->
<exclusions>
<exclusion>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</exclusion>
</exclusions>
</dependency>
<!-- Spring Boot 与 JMS 集成的 starter -->
<!-- https://mvnrepository.com/artifact/org.springframework.boot/spring-boot-starter-activemq -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
<version>2.7.12</version>
</dependency>
2. 工程结构
activemq/
├── src/
│ ├── main/
│ │ ├── java/
│ │ │ ├── com/
│ │ │ │ ├── xiaokai/
│ │ │ │ │ ├── ActiveMQApplication.java // 应用程序的主类,通常包含main方法
│ │ │ │ │ ├── config/ // 配置包
│ │ │ │ │ │ └── JmsConfig.java // ActiveMQ的配置类
│ │ │ │ │ ├── event/ // 事件包
│ │ │ │ │ │ ├── Eventinfo.java // 事件信息类:构建消息、send topic
│ │ │ │ │ ├── listener/ // 监听器包
│ │ │ │ │ │ └── MessageListener.java // 消息监听器类
│ │ │ │ │ ├── service/ // 服务包
│ │ │ │ │ │ └── ActiveMQService.java // ActiveMQ服务类
│ │ │ │ │
│ │ │ ├── resources/ // 资源文件
│ │ │ │ └── application.yml // Spring配置文件
│ │ │
│ │ ├── test/ // 测试代码
│ │ │ ├── java/ // 测试Java代码
│ │ │ │ ├── com/
│ │ │ │ │ ├── xiaokai/
│ │ │ │ │ └── ActiveMQTest.java // ActiveMQ的测试类
└── pom.xml // Maven构建配置文件(未在文件内容中列出)
ActiveMQApplication.java:项目的主类,通常包含启动Spring应用程序的main方法。
JmsConfig.java:配置ActiveMQ的Java配置类。
Eventinfo.java:可能用于表示事件信息的类。
MessageListener.java:消息监听器,用于监听并处理ActiveMQ消息。
ActiveMQService.java:服务类,可能包含与ActiveMQ交互的业务逻辑。
application.yml:Spring Boot的配置文件,用于配置应用程序的各种参数。
ActiveMQTest.java:用于测试ActiveMQ功能的测试类。
3. 示例代码
JmsConfig.java
package com.xiaokai.config;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import javax.jms.ConnectionFactory;
/**
* Author:yang
* Date:2024-10-19 15:57
*/
@Configuration
@EnableJms
public class JmsConfig {
@Bean
public ActiveMQConnectionFactory connectionFactory() {
ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
connectionFactory.setBrokerURL("tcp://116.198.242.56:61616");
connectionFactory.setUserName("admin");
connectionFactory.setPassword("admin");
return connectionFactory;
}
@Bean
public DefaultJmsListenerContainerFactory jmsListenerContainerFactory(ConnectionFactory connectionFactory) {
DefaultJmsListenerContainerFactory factory = new DefaultJmsListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);
factory.setSessionTransacted(true);
return factory;
}
}
MessageListener.java
package com.xiaokai.listener;
import lombok.extern.slf4j.Slf4j;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
/**
* Author:yang
* Date:2024-10-19 15:55
*/
@Component
@Slf4j
public class MessageListener {
// 监听队列test.queue
@JmsListener(destination = "test.queue")
public void onMessage(String message) {
log.info("Received message: " + message);
}
}
ActiveMQService .java
package com.xiaokai.service;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsMessagingTemplate;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Service;
/**
* Author:yang
* Date:2024-10-19 16:01
*/
@Service
@Slf4j
public class ActiveMQService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
public void send(String message) {
log.info("Sending message: {}", message);
jmsMessagingTemplate.convertAndSend("test.queue", message);
}
}
注:JmsMessagingTemplate作为Spring相关bean,封装了JmsTemplate 。总的来说JmsTemplate更底层,但是在使用过程中不需要过多关注底层实现。
@Autowired private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired private JmsTemplate jmsTemplate;
application.yml
spring:
activemq:
broker-url: tcp://116.198.242.56:61616
user: admin
password: admin
注:在JmsConfig.java配置文件中配置后,可以不需要配置文件,二者选其一。
测试:
package com.xiaokai;
import com.xiaokai.service.ActiveMQService;
import lombok.extern.slf4j.Slf4j;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;
/**
* Author:yang
* Date:2024-10-21 10:28
*/
@SpringBootTest
@RunWith(SpringRunner.class)
@Slf4j
public class ActiveMQTest {
@Autowired
private ActiveMQService activeMQService;
@Test
public void testSend(){
activeMQService.send("test");
}
}
结果:
Sending message: test
Started ActiveMQApplication in 1.368 seconds (JVM running for 1.799)
Received message: test
注:在ActiveMQ提供的可视化控制台可以查看相关信息。
访问:http://116.198.242.56:8161/admin/index.jsp
4. 消息模型
4.1 P2P模型
bean
@Bean
public Destination queue() {
return new ActiveMQQueue("test.queue");
}
消息监听器
@Component
@Slf4j
public class MessageListener {
// 监听队列test.queue
@JmsListener(destination = "test.queue")
public void onMessage1(String message) {
log.info("Received queue message1: " + message);
}
// 监听队列test.queue
@JmsListener(destination = "test.queue")
public void onMessage2(String message) {
log.info("Received queue message2: " + message);
}
}
消息服务
@Service
@Slf4j
public class ActiveMQService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Destination queue;
// 发送点对点消息
public void sendP2P(String message) {
log.info("Sending queue message: {}", message);
jmsMessagingTemplate.convertAndSend(queue, message);
}
}
application.yml
spring:
activemq:
broker-url: tcp://116.198.242.56:61616
user: admin
password: admin
# true表示使用发布/订阅模式,false表示使用点对点模式
jms:
pub-sub-domain: false
结论:点对点消息模式是将消息推送到queue中,消费者通过轮训的方式消费消息
4.2 发布/订阅模型
Bean
@Bean
public Destination topic() {
return new ActiveMQTopic("test.topic");
}
消息监听器
@Component
@Slf4j
public class PubMessageListener {
// 监听主题test.topic
@JmsListener(destination = "test.topic")
public void onMessage3(String message) {
log.info("Received topic message1: " + message);
}
// 监听主题test.topic
@JmsListener(destination = "test.topic")
public void onMessage4(String message) {
log.info("Received topic message2: " + message);
}
}
消息服务
@Service
@Slf4j
public class ActiveMQService {
@Autowired
private JmsMessagingTemplate jmsMessagingTemplate;
@Autowired
private Destination topic;
// 发送发布订阅消息
public void sendPubSub(String message) {
log.info("Sending topic message: {}", message);
jmsMessagingTemplate.convertAndSend(topic, message);
}
}
application.yml
spring:
activemq:
broker-url: tcp://116.198.242.56:61616
user: admin
password: admin
# true表示使用发布/订阅模式,false表示使用点对点模式
jms:
pub-sub-domain: true
注:需要将发布/订阅开关打开
结论:发送消息后,订阅主题的消费者都能收到同一条消息去消费。
5. 消息类型
5.1 普通消息
普通消息如上述案例,生产者生产消息后,由消费者消费消息,中间不需要做额外的事情。
5.2 延迟消息
延迟消息指在生产者生产带有延迟时间的消息后,broker接收到消息后,并不立即投送到队列或者主题,而是到达延迟时间后,再将消息投送到队列、主题。
配置ActiveMQ支持延迟消息: 修改ActiveMQ的配置文件activemq.xml
,确保<broker>
标签包含schedulerSupport="true"
属性。这允许ActiveMQ的计划任务功能,从而支持延迟消息。
<broker xmlns="http://activemq.apache.org/schema/core" brokerName="localhost" dataDirectory="${activemq.data}" schedulerSupport="true">
修改配置后,需要重启ActiveMQ服务器以使更改生效。
发送延迟消息
// 发送延迟消息
public void sendDelay(String message) {
HashMap<String, Object> properties = new HashMap<>();
properties.put(ScheduledMessage.AMQ_SCHEDULED_DELAY, 10000);
log.info("Sending delay queue message: {}", message);
jmsMessagingTemplate.convertAndSend(queue, message, properties);
}
消息监听器
...
5.3 事务消息
没啥用,用ActiveMQ实现事务消息还不如不用,辣鸡(狗头保命)