ActiveMQ消息中间件的发布订阅模式 主题 topic
topic生产端案例(配合topic消费端测试):SpringBoot+ActiveMQ Topic 生产端
ActiveMQ版本:apache-activemq-5.16.5
案例源码:SpringBoot+ActiveMQ-发布订阅Demo
SpringBoot集成ActiveMQ Topic消费端的pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Maven build for the Spring Boot + ActiveMQ topic consumer demo. -->
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.0.5.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>boot.example.topic.customer</groupId>
    <artifactId>boot-example-topic-demo-customer-2.0.5</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>boot-example-topic-demo-customer-2.0.5</name>
    <description>Demo project for Spring Boot</description>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <!-- Shared entity module that provides the BootProvider message payload class -->
        <dependency>
            <groupId>boot.example.demo.entity</groupId>
            <artifactId>boot-example-demo-entity-2.0.5</artifactId>
            <version>0.0.1-SNAPSHOT</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- ActiveMQ starter -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-activemq</artifactId>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
        <!--
            Needed for spring.activemq.pool.enabled=true.
            The version is intentionally not pinned here: it is inherited from the
            Spring Boot parent's dependency management so that every ActiveMQ artifact
            on the classpath resolves to the same release. To target another ActiveMQ
            client release, override the activemq.version property rather than pinning
            this single artifact.
        -->
        <dependency>
            <groupId>org.apache.activemq</groupId>
            <artifactId>activemq-pool</artifactId>
        </dependency>
        <!-- Swagger 2 API documentation -->
        <dependency>
            <groupId>io.springfox</groupId>
            <artifactId>springfox-swagger2</artifactId>
            <version>2.9.2</version>
        </dependency>
        <!-- Swagger UI front end (the article opens it at /doc.html) -->
        <dependency>
            <groupId>com.github.xiaoymin</groupId>
            <artifactId>swagger-bootstrap-ui</artifactId>
            <version>1.9.2</version>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <!-- Repackage the build output into a single executable jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
</project>
application.properties
# HTTP port of this consumer application (the article opens http://localhost:8044/doc.html).
server.port=8044
# Address of the ActiveMQ broker to connect to.
spring.activemq.broker-url=tcp://127.0.0.1:61616
# Broker credentials.
spring.activemq.user=admin
spring.activemq.password=admin
# Connect to the external broker above rather than an in-memory one.
spring.activemq.in-memory=false
# NOTE(review): trusting all packages lets any class carried in an ObjectMessage be deserialized;
# acceptable for a local demo, but consider listing only the required packages elsewhere.
spring.activemq.packages.trust-all=true
# Connection pooling (the activemq-pool dependency in pom.xml is declared for this setting).
spring.activemq.pool.enabled=true
spring.activemq.pool.max-connections=6
# Presumably milliseconds; confirm against the Spring Boot property reference.
spring.activemq.pool.idle-timeout=30000
spring.activemq.pool.expire-timeout=0
# Use topics (publish/subscribe) instead of queues as the default destination type.
spring.jms.pub-sub-domain=true
topic消费端启动类AppTopicCustomer
package boot.example.topic.customer;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.jms.annotation.EnableJms;
/**
 * Entry point of the topic consumer demo application.
 *
 * @author 蚂蚁舞
 */
@EnableJms
@SpringBootApplication
public class AppTopicCustomer {

    /**
     * Starts the Spring application and then prints a greeting to standard output.
     *
     * @param args command-line arguments forwarded to {@link SpringApplication#run}
     */
    public static void main(String[] args) {
        final Class<?> primarySource = AppTopicCustomer.class;
        SpringApplication.run(primarySource, args);
        System.out.println("Hello World!");
    }
}
ActiveMqConfig
package boot.example.topic.customer.config;
import javax.jms.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.config.DefaultJmsListenerContainerFactory;
import org.springframework.jms.config.JmsListenerContainerFactory;
/**
 * JMS configuration that registers the listener container factory used by the
 * topic listeners in this application.
 */
@Configuration
@EnableJms
public class ActiveMqConfig {

    /**
     * Creates the listener container factory for topic (publish/subscribe) mode.
     * Listener methods select it through {@code containerFactory = "jmsListenerContainerTopic"}.
     *
     * @param activeMQConnectionFactory connection factory injected by Spring
     * @return a container factory with the publish/subscribe domain enabled
     */
    @Bean
    public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
        final DefaultJmsListenerContainerFactory topicFactory = new DefaultJmsListenerContainerFactory();
        topicFactory.setConnectionFactory(activeMQConnectionFactory);
        // pubSubDomain = true switches the containers from queues to topics.
        topicFactory.setPubSubDomain(true);
        return topicFactory;
    }
}
ActiveMQConstant
package boot.example.topic.customer.config;
import boot.example.queue.entity.BootProvider;
import java.util.LinkedList;
import java.util.List;
/**
 * Topic names and simple in-memory stores for the publish/subscribe demo.
 *
 * <p>In publish/subscribe mode a producer publishes a message to a topic and several
 * subscribers consume it. Unlike point-to-point mode, a message published to a topic is
 * delivered to every subscriber. With a non-durable subscription the message is not kept
 * for consumers that are not connected when the producer publishes it.
 *
 * <p>The stores below are appended to by the JMS listener methods and returned by the REST
 * controllers, so they are written and read from different threads. They are therefore
 * {@link java.util.concurrent.CopyOnWriteArrayList} instances, which can be iterated
 * (for example while a response is being serialized) while another thread appends.
 *
 * @author 蚂蚁舞
 */
public class ActiveMQConstant {

    /** Default topic. */
    public static final String defaultTopic = "myw_topic";

    /** Topic carrying a single String. */
    public static final String stringTopic = "stringTopic";

    /** Topic carrying a {@code List<String>}. */
    public static final String stringListTopic = "stringListTopic";

    /** Topic carrying a single object. */
    public static final String objTopic = "objTopic";

    /** Topic carrying a list of objects. */
    public static final String objListTopic = "objListTopic";

    /** Messages received on the default topic. */
    public static final List<String> defaultList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Messages received on {@link #stringTopic}. */
    public static final List<String> stringTopicList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Messages received on {@link #stringListTopic}. */
    public static final List<List<String>> stringListTopicList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Messages received on {@link #objTopic}. */
    public static final List<BootProvider> objTopicList = new java.util.concurrent.CopyOnWriteArrayList<>();

    /** Messages received on {@link #objListTopic}. */
    public static final List<List<BootProvider>> objListTopicList = new java.util.concurrent.CopyOnWriteArrayList<>();
}
DefaultTopicConsumerService
package boot.example.topic.customer.service;
import boot.example.topic.customer.config.ActiveMQConstant;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.JMSException;
import javax.jms.TextMessage;
/**
 * Listener for the default topic; received text is printed and kept in memory.
 *
 * @author 蚂蚁舞
 */
@Service
public class DefaultTopicConsumerService {

    /**
     * Handles a text message from {@link ActiveMQConstant#defaultTopic}.
     * A {@code null} message or a message without text is ignored.
     *
     * @param textMessage the incoming JMS text message
     * @throws JMSException if the text cannot be read from the message
     */
    @JmsListener(destination = ActiveMQConstant.defaultTopic)
    public void message(TextMessage textMessage) throws JMSException {
        final String body = (textMessage != null) ? textMessage.getText() : null;
        if (body != null) {
            System.out.println("默认消费者:" + body);
            ActiveMQConstant.defaultList.add(body);
        }
    }
}
ATopicConsumerService
package boot.example.topic.customer.service;
import boot.example.queue.entity.BootProvider;
import boot.example.topic.customer.config.ActiveMQConstant;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.ObjectMessage;
import java.util.List;
/**
 * Subscriber "A": listens on the four demo topics, prints each payload and stores it
 * in the matching list of {@link ActiveMQConstant}.
 *
 * @author 蚂蚁舞
 */
@Service
public class ATopicConsumerService {

    /** Stores a String received from {@link ActiveMQConstant#stringTopic}. */
    @JmsListener(destination = ActiveMQConstant.stringTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("A-TopicConsumer接收到消息...." + msg);
        ActiveMQConstant.stringTopicList.add(msg);
    }

    /** Stores a list of Strings received from {@link ActiveMQConstant#stringListTopic}. */
    @JmsListener(destination = ActiveMQConstant.stringListTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("A-TopicConsumer接收到集合主题消息...." + list);
        ActiveMQConstant.stringListTopicList.add(list);
    }

    /**
     * Stores a {@link BootProvider} received from {@link ActiveMQConstant#objTopic}.
     * A {@code null} message or an empty payload is ignored.
     */
    @JmsListener(destination = ActiveMQConstant.objTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = (objectMessage == null) ? null : objectMessage.getObject();
        if (payload == null) {
            return;
        }
        // Print before casting so the output order matches the cast order of the payload handling.
        System.out.println("A-TopicConsumer接收到对象主题消息...." + payload);
        ActiveMQConstant.objTopicList.add((BootProvider) payload);
    }

    /**
     * Stores a list of {@link BootProvider} received from {@link ActiveMQConstant#objListTopic}.
     * A {@code null} message or an empty payload is ignored.
     */
    @JmsListener(destination = ActiveMQConstant.objListTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = (objectMessage == null) ? null : objectMessage.getObject();
        if (payload == null) {
            return;
        }
        System.out.println("A-TopicConsumer接收到的对象集合主题消息..." + payload);
        // Element type cannot be verified at runtime; the cast relies on what the producer sends.
        @SuppressWarnings("unchecked")
        final List<BootProvider> providers = (List<BootProvider>) payload;
        ActiveMQConstant.objListTopicList.add(providers);
    }
}
BTopicConsumerService
package boot.example.topic.customer.service;
import boot.example.queue.entity.BootProvider;
import boot.example.topic.customer.config.ActiveMQConstant;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Service;
import javax.jms.ObjectMessage;
import java.util.List;
/**
 * Subscriber "B": listens on the same four demo topics as subscriber "A", prints each
 * payload and stores it in the matching list of {@link ActiveMQConstant}.
 */
@Service
public class BTopicConsumerService {

    /** Stores a String received from {@link ActiveMQConstant#stringTopic}. */
    @JmsListener(destination = ActiveMQConstant.stringTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveStringTopic(String msg) {
        System.out.println("B-TopicConsumer接收到消息...." + msg);
        ActiveMQConstant.stringTopicList.add(msg);
    }

    /** Stores a list of Strings received from {@link ActiveMQConstant#stringListTopic}. */
    @JmsListener(destination = ActiveMQConstant.stringListTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveStringListTopic(List<String> list) {
        System.out.println("B-TopicConsumer接收到集合主题消息...." + list);
        ActiveMQConstant.stringListTopicList.add(list);
    }

    /**
     * Stores a {@link BootProvider} received from {@link ActiveMQConstant#objTopic}.
     * A {@code null} message or an empty payload is ignored.
     */
    @JmsListener(destination = ActiveMQConstant.objTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveObjTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = (objectMessage == null) ? null : objectMessage.getObject();
        if (payload == null) {
            return;
        }
        System.out.println("B-TopicConsumer接收到对象主题消息...." + payload);
        ActiveMQConstant.objTopicList.add((BootProvider) payload);
    }

    /**
     * Stores a list of {@link BootProvider} received from {@link ActiveMQConstant#objListTopic}.
     * A {@code null} message or an empty payload is ignored.
     */
    @JmsListener(destination = ActiveMQConstant.objListTopic, containerFactory = "jmsListenerContainerTopic")
    public void receiveObjListTopic(ObjectMessage objectMessage) throws Exception {
        final Object payload = (objectMessage == null) ? null : objectMessage.getObject();
        if (payload == null) {
            return;
        }
        System.out.println("B-TopicConsumer接收到的对象集合主题消息..." + payload);
        // Element type cannot be verified at runtime; the cast relies on what the producer sends.
        @SuppressWarnings("unchecked")
        final List<BootProvider> providers = (List<BootProvider>) payload;
        ActiveMQConstant.objListTopicList.add(providers);
    }
}
BootDefaultTopicCustomerController
package boot.example.topic.customer.controller;
import boot.example.topic.customer.config.ActiveMQConstant;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * REST endpoint exposing the messages received on the default topic.
 *
 * @author 蚂蚁舞
 */
@RestController
@RequestMapping("/customer")
public class BootDefaultTopicCustomerController {

    /**
     * Returns the list that the default-topic listener appends to.
     *
     * @return {@link ActiveMQConstant#defaultList} itself, not a copy
     */
    @GetMapping("/myw_topic")
    public List<String> myw_topic() {
        final List<String> received = ActiveMQConstant.defaultList;
        return received;
    }
}
BootTopicCustomerController
package boot.example.topic.customer.controller;
import boot.example.queue.entity.BootProvider;
import boot.example.topic.customer.config.ActiveMQConstant;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
/**
 * REST endpoints exposing the messages received on the String, String-list, object and
 * object-list topics. Each endpoint returns the backing list itself, not a copy.
 *
 * @author 蚂蚁舞
 */
@RestController
@RequestMapping("/customer")
public class BootTopicCustomerController {

    /** Returns the Strings stored in {@link ActiveMQConstant#stringTopicList}. */
    @GetMapping("/stringTopicList")
    public List<String> stringTopicList() {
        final List<String> received = ActiveMQConstant.stringTopicList;
        return received;
    }

    /** Returns the String lists stored in {@link ActiveMQConstant#stringListTopicList}. */
    @GetMapping("/stringListTopicList")
    public List<List<String>> stringListTopicList() {
        final List<List<String>> received = ActiveMQConstant.stringListTopicList;
        return received;
    }

    /** Returns the objects stored in {@link ActiveMQConstant#objTopicList}. */
    @GetMapping("/objTopicList")
    public List<BootProvider> objTopicList() {
        final List<BootProvider> received = ActiveMQConstant.objTopicList;
        return received;
    }

    /** Returns the object lists stored in {@link ActiveMQConstant#objListTopicList}. */
    @GetMapping("/objListTopicList")
    public List<List<BootProvider>> objListTopicList() {
        final List<List<BootProvider>> received = ActiveMQConstant.objListTopicList;
        return received;
    }
}
SwaggerConfig UI测试
package boot.example.topic.customer.config;
import com.google.common.base.Predicates;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import springfox.documentation.builders.ApiInfoBuilder;
import springfox.documentation.builders.PathSelectors;
import springfox.documentation.builders.RequestHandlerSelectors;
import springfox.documentation.service.ApiInfo;
import springfox.documentation.spi.DocumentationType;
import springfox.documentation.spring.web.plugins.Docket;
import springfox.documentation.swagger2.annotations.EnableSwagger2;
/**
 * Swagger 2 configuration for the demo's REST endpoints.
 *
 * <p>The UI is served at http://localhost:XXXX/doc.html; the host and port depend on the
 * actual project.
 *
 * @author 蚂蚁舞
 */
@Configuration
@EnableSwagger2
public class SwaggerConfig {

    /**
     * Builds the Swagger docket: every request handler is selected, and three path
     * selectors are applied in sequence (any path, not matching {@code /error.*},
     * matching {@code /.*}).
     *
     * @return the configured {@link Docket}
     */
    @Bean
    public Docket createRestApi() {
        final Docket docket = new Docket(DocumentationType.SWAGGER_2).apiInfo(apiInfo());
        // NOTE(review): how repeated paths(...) calls combine depends on the springfox
        // version; the original call sequence is kept unchanged.
        return docket.select()
                .apis(RequestHandlerSelectors.any())
                .paths(PathSelectors.any())
                .paths(Predicates.not(PathSelectors.regex("/error.*")))
                .paths(PathSelectors.regex("/.*"))
                .build()
                .apiInfo(apiInfo());
    }

    /** Creates the title, description and version shown in the generated documentation. */
    private ApiInfo apiInfo() {
        final ApiInfoBuilder info = new ApiInfoBuilder();
        info.title("demo");
        info.description("demo接口");
        info.version("0.01");
        return info.build();
    }
}
测试使用的对象BootProvider
package boot.example.queue.entity;
import java.io.Serializable;
import java.util.Date;
/**
 * Serializable payload used in the ActiveMQ object messages of this demo.
 *
 * <p>Per the original author: the package of this class must be identical on the sending
 * and receiving side, otherwise deserialization fails.
 *
 * @author 蚂蚁舞
 */
public class BootProvider implements Serializable {

    private int id;
    private String name;
    // Defaults to the moment the instance is created.
    private Date date = new Date();

    /** Creates an instance with id 0, no name and the current date. */
    public BootProvider() {
    }

    /**
     * Creates an instance with the given id and name and the current date.
     *
     * @param id   numeric identifier
     * @param name display name
     */
    public BootProvider(int id, String name) {
        this.id = id;
        this.name = name;
    }

    /** @return the identifier */
    public int getId() {
        return this.id;
    }

    /** @param id the identifier to set */
    public void setId(int id) {
        this.id = id;
    }

    /** @return the name, possibly {@code null} */
    public String getName() {
        return this.name;
    }

    /** @param name the name to set */
    public void setName(String name) {
        this.name = name;
    }

    /** @return the stored date instance (not a copy) */
    public Date getDate() {
        return this.date;
    }

    /** @param date the date to store (kept by reference) */
    public void setDate(Date date) {
        this.date = date;
    }

    /** Renders the instance as {@code BootProvider{id=..., name='...', date=...}}. */
    @Override
    public String toString() {
        final StringBuilder text = new StringBuilder("BootProvider{");
        text.append("id=").append(this.id);
        text.append(", name='").append(this.name).append('\'');
        text.append(", date=").append(this.date);
        text.append('}');
        return text.toString();
    }
}
代码结构
├─boot-example-demo-entity-2.0.5
│ │ pom.xml
│ │
│ ├─src
│ │ └─main
│ │ └─java
│ │ └─boot
│ │ └─example
│ │ └─queue
│ │ └─entity
│ │ BootProvider.java
├─boot-example-topic-demo-customer-2.0.5
│ │ pom.xml
│ ├─src
│ │ ├─main
│ │ │ ├─java
│ │ │ │ └─boot
│ │ │ │ └─example
│ │ │ │ └─topic
│ │ │ │ └─customer
│ │ │ │ │ AppTopicCustomer.java
│ │ │ │ │
│ │ │ │ ├─config
│ │ │ │ │ ActiveMqConfig.java
│ │ │ │ │ ActiveMQConstant.java
│ │ │ │ │ SwaggerConfig.java
│ │ │ │ │
│ │ │ │ ├─controller
│ │ │ │ │ BootDefaultTopicCustomerController.java
│ │ │ │ │ BootTopicCustomerController.java
│ │ │ │ │
│ │ │ │ └─service
│ │ │ │ ATopicConsumerService.java
│ │ │ │ BTopicConsumerService.java
│ │ │ │ DefaultTopicConsumerService.java
│ │ │ │
│ │ │ └─resources
│ │ │ application.properties
│ │ │
│ │ └─test
│ │ └─java
│ │ └─boot
│ │ └─example
│ │ └─topic
│ │ └─customer
│ │ AppTest.java
启动后访问(ActiveMQ必须提前启动)
http://localhost:8044/doc.html
与topic发送端联合交互测试topic订阅 (不是持久化)
发送端发送
订阅端查看收到的临时数据(非持久订阅,因此订阅端必须在发送端发送消息之前启动,否则收不到消息)
再看控制台a和b都收到了数据
持久化订阅
持久化订阅在springboot activemq里需要配置
这里新建了一个发布端和两个订阅端(clientId 分别为 topic_client1、topic_client2),使用的 topic 为 myw_topic_p;两个订阅端的配置分别如下:
// Listener container factory for topic mode with a durable subscription (client "topic_client1").
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
    final DefaultJmsListenerContainerFactory durableTopicFactory = new DefaultJmsListenerContainerFactory();
    durableTopicFactory.setConnectionFactory(activeMQConnectionFactory);
    // pubSubDomain = true enables the publish/subscribe domain.
    durableTopicFactory.setPubSubDomain(true);
    // Client identifier of this durable subscriber; when several subscribers use durable
    // subscriptions each identifier must be unique, otherwise problems may occur.
    durableTopicFactory.setClientId("topic_client1");
    // Durable subscription: after going offline and coming back, the subscriber
    // still receives the topic's messages.
    durableTopicFactory.setSubscriptionDurable(true);
    return durableTopicFactory;
}
// Listener container factory for topic mode with a durable subscription (client "topic_client2").
@Bean
public JmsListenerContainerFactory<?> jmsListenerContainerTopic(ConnectionFactory activeMQConnectionFactory) {
    final DefaultJmsListenerContainerFactory durableTopicFactory = new DefaultJmsListenerContainerFactory();
    durableTopicFactory.setConnectionFactory(activeMQConnectionFactory);
    // pubSubDomain = true enables the publish/subscribe domain.
    durableTopicFactory.setPubSubDomain(true);
    // Client identifier of this durable subscriber; when several subscribers use durable
    // subscriptions each identifier must be unique, otherwise problems may occur.
    durableTopicFactory.setClientId("topic_client2");
    // Durable subscription: after going offline and coming back, the subscriber
    // still receives the topic's messages.
    durableTopicFactory.setSubscriptionDurable(true);
    return durableTopicFactory;
}
// Topic name used by the durable-subscription demo (one publisher, two subscribers).
public static final String defaultTopic = "myw_topic_p";
代码目录
把三个应用分别启动。由于集成了 Swagger UI,可以直接在浏览器里访问:
发送端
http://localhost:8045/doc.html
订阅端1
http://localhost:8046/doc.html
订阅端2
http://localhost:8047/doc.html
在这里发送端发送消息,订阅端1和订阅端2都可以收到消息的
发送消息三条
蚂蚁舞1
蚂蚁舞2
蚂蚁舞3
订阅端1和订阅端2的控制台输出,表示都收到了三条消息,那么发布订阅模式是可以的
持久化测试一,订阅端1关停,此时订阅端2能正常收到发送端的消息
蚂蚁舞4
蚂蚁舞5
蚂蚁舞6
订阅端2的控制台
在ActiveMQ上
此时启动订阅端1测试持久化,可以看到它离线期间发送的消息(蚂蚁舞4、蚂蚁舞5、蚂蚁舞6)都收到了
持久化测试二,订阅端1和订阅端2都关停,先发送三条消息到ActiveMQ上,然后把ActiveMQ给关停后启动(重启,验证ActiveMQ有没有保存消息)
蚂蚁舞7
蚂蚁舞8
蚂蚁舞9
关停重启
启动订阅端1和启动订阅端2
订阅端1
订阅端2
可以看到订阅端1和订阅端2都收到了蚂蚁舞7 8 9三条消息