说明:Topic(主题)交换机的大致流程是:交换机与一个或者多个队列绑定,绑定时使用的绑定键(binding key)可以包含通配符;当消息的路由键(routing key)与某个队列的绑定键匹配时,该队列就会接收到这条消息。
通配符规则:
#:匹配零个或多个词
*:匹配恰好一个词
例如:
topic.#:能匹配 topic、topic.xxx 或者 topic.xxx.xxx
topic.*:只能匹配 topic.xxx
工程图:
A.总体maven依赖
pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Aggregator POM (packaging "pom") for the demo; it lists the MqCustomer and MqProducer modules. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<artifactId>spring-boot-starter-parent</artifactId> <!-- artifactId of the inherited parent project -->
<groupId>org.springframework.boot</groupId> <!-- groupId of the inherited parent project -->
<version>2.2.2.RELEASE</version> <!-- version of the inherited parent project -->
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>RabbitMqSpringbootDemo</groupId>
<artifactId>RabbitMqSpringbootDemo</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>pom</packaging>
<!-- Child modules built together with this project. -->
<modules>
<module>MqCustomer</module>
<module>MqProducer</module>
</modules>
<name>RabbitMqSpringbootDemo Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<!-- No dependencies are declared at the aggregator level; each module declares its own. -->
<dependencies>
</dependencies>
<build>
<finalName>RabbitMqSpringbootDemo</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
B.生产者
1.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Producer module of the demo: a Spring Boot web application that publishes messages to RabbitMQ. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>RabbitMqSpringbootDemo</artifactId>
<groupId>RabbitMqSpringbootDemo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>MqProducer</artifactId>
<packaging>jar</packaging>
<name>MqProducer Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot core starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot test support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring MVC web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Development-time tooling -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- AMQP (RabbitMQ) support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- fastjson: 1.2.83 is the release that fixes the autoType deserialization vulnerability present in 1.2.80 and earlier -->
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.83</version>
</dependency>
<!-- commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
</dependencies>
<build>
<finalName>MqProducer</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2.application.yml
# Producer application settings.
server:
  port: 8080            # HTTP port of the producer's REST endpoints
spring:
  rabbitmq:
    # Connection settings for the RabbitMQ broker.
    port: 5672
    host: 192.168.18.145
    # Demo credentials; move them out of the file for anything beyond a local test.
    username: admin
    password: admin
    virtual-host: /
3.TopicRabbitConfig
package com.dev.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
/**
 * Declares the RabbitMQ objects used by the topic-exchange demo: two queues,
 * one topic exchange named {@code topicExchange}, and one wildcard binding
 * per queue.
 *
 * @author lqw
 * @date 2024-02-28 10:40
 */
@Configuration
public class TopicRabbitConfig {

    /** Name of the queue that is bound with the pattern {@code topic.highSchool.#}. */
    public static final String high = "topic.highSchool.one";

    /** Name of the queue that is bound with the pattern {@code topic.middleSchool.#}. */
    public static final String middle = "topic.middleSchool.one";

    /** @return the queue named {@link #high} */
    @Bean
    public Queue oneQueue() {
        final Queue highSchoolQueue = new Queue(high);
        return highSchoolQueue;
    }

    /** @return the queue named {@link #middle} */
    @Bean
    public Queue twoQueue() {
        final Queue middleSchoolQueue = new Queue(middle);
        return middleSchoolQueue;
    }

    /** @return the topic exchange both queues are bound to */
    @Bean
    TopicExchange exchange() {
        final String exchangeName = "topicExchange";
        return new TopicExchange(exchangeName);
    }

    /** @return the binding of {@link #oneQueue()} to the exchange with key {@code topic.highSchool.#} */
    @Bean
    Binding bindingExchangeMessageA() {
        return bindWithPattern(oneQueue(), "topic.highSchool.#");
    }

    /** @return the binding of {@link #twoQueue()} to the exchange with key {@code topic.middleSchool.#} */
    @Bean
    Binding bindingExchangeMessageB() {
        return bindWithPattern(twoQueue(), "topic.middleSchool.#");
    }

    /** Binds {@code queue} to {@link #exchange()} using {@code bindingKey} as the binding key. */
    private Binding bindWithPattern(Queue queue, String bindingKey) {
        final TopicExchange topicExchange = exchange();
        return BindingBuilder.bind(queue).to(topicExchange).with(bindingKey);
    }
}
4.TopicRabbitController
package com.dev.controller;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.HashMap;
import java.util.Map;
/**
 * Message producer for the topic-exchange demo. Each endpoint publishes a
 * small map to the {@code topicExchange} exchange under a fixed routing key.
 *
 * @author lqw
 * @date 2024-02-27 14:47
 */
@Slf4j
@RestController
@RequestMapping("topic")
public class TopicRabbitController {

    // RabbitTemplate supplies the send/receive operations; only sending is used here.
    @Autowired
    RabbitTemplate rabbitTemplate;

    /**
     * Publishes with routing key {@code topic.highSchool.two}, which is meant
     * to reach the queue {@code topic.highSchool.one}.
     *
     * @return the literal {@code "ok"}
     */
    @GetMapping("/sendMessageA")
    public String sendMessageA() {
        return publish("topic.highSchool.two", "张龙", "高中");
    }

    /**
     * Publishes with routing key {@code topic.middleSchool.two}, which is meant
     * to reach the queue {@code topic.middleSchool.one}.
     *
     * @return the literal {@code "ok"}
     */
    @GetMapping("/sendMessageB")
    public String sendMessageB() {
        return publish("topic.middleSchool.two", "赵虎", "初中");
    }

    /**
     * Publishes with the three-word suffix routing key
     * {@code topic.middleSchool.two.cc}, which is also meant to reach the
     * queue {@code topic.middleSchool.one}.
     *
     * @return the literal {@code "ok"}
     */
    @GetMapping("/sendMessageBB")
    public String sendMessageBB() {
        return publish("topic.middleSchool.two.cc", "赵虎B", "初中B");
    }

    /** Builds the name/school payload and sends it to {@code topicExchange} under {@code routingKey}. */
    private String publish(String routingKey, String name, String school) {
        Map<String, Object> payload = new HashMap<>();
        payload.put("name", name);
        payload.put("school", school);
        rabbitTemplate.convertAndSend("topicExchange", routingKey, payload);
        return "ok";
    }
}
5.AppP
package com.dev;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the producer application.
 *
 * @author Li Qingwei
 * @date 2024-02-27 14:20
 */
@SpringBootApplication
public class AppP {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that
     *             options such as {@code --server.port=...} take effect
     */
    public static void main(String[] args) {
        SpringApplication.run(AppP.class, args);
    }
}
C.消费者
1.pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<!-- Consumer module of the demo: a Spring Boot application that listens on RabbitMQ queues. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>RabbitMqSpringbootDemo</artifactId>
<groupId>RabbitMqSpringbootDemo</groupId>
<version>1.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>MqCustomer</artifactId>
<packaging>jar</packaging>
<name>MqCustomer Maven Webapp</name>
<!-- FIXME change it to the project's website -->
<url>http://www.example.com</url>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<dependencies>
<!-- Spring Boot core starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<!-- Spring Boot test support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!-- Spring MVC web starter -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Development-time tooling -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-devtools</artifactId>
<optional>true</optional>
</dependency>
<!-- AMQP (RabbitMQ) support -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!-- commons-lang3 -->
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.4</version>
</dependency>
<!-- Lombok -->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.16.10</version>
</dependency>
</dependencies>
<build>
<finalName>MqCustomer</finalName>
<pluginManagement><!-- lock down plugins versions to avoid using Maven defaults (may be moved to parent pom) -->
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<version>3.1.0</version>
</plugin>
<!-- see http://maven.apache.org/ref/current/maven-core/default-bindings.html#Plugin_bindings_for_war_packaging -->
<plugin>
<artifactId>maven-resources-plugin</artifactId>
<version>3.0.2</version>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.0</version>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
</plugin>
<plugin>
<artifactId>maven-war-plugin</artifactId>
<version>3.2.2</version>
</plugin>
<plugin>
<artifactId>maven-install-plugin</artifactId>
<version>2.5.2</version>
</plugin>
<plugin>
<artifactId>maven-deploy-plugin</artifactId>
<version>2.8.2</version>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
2.application.yml
# Consumer application settings.
server:
  port: 8081            # HTTP port of the consumer application
spring:
  rabbitmq:
    # Connection settings for the RabbitMQ broker.
    port: 5672
    host: 192.168.18.145
    # Demo credentials; move them out of the file for anything beyond a local test.
    username: admin
    password: admin
3.TopicRabbitListenerA
package cn.ct.listeners;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer for the topic-exchange demo; listens on the queue
 * {@code topic.highSchool.one}.
 *
 * @author lqw
 * @date 2024-02-27 14:58
 */
@Component
public class TopicRabbitListenerA {

    /**
     * Prints each message received from the queue, once per message.
     *
     * <p>{@code @RabbitHandler} is kept from the original; it is only required
     * when {@code @RabbitListener} is placed on the class rather than the method.
     *
     * @param msg the converted message payload
     */
    @RabbitListener(queues = "topic.highSchool.one")
    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Rabbitmq topic : " + msg);
    }
}
4.TopicRabbitListenerB
package cn.ct.listeners;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.util.Map;
/**
 * Consumer for the topic-exchange demo; listens on the queue
 * {@code topic.middleSchool.one}.
 *
 * @author lqw
 * @date 2024-02-27 14:58
 */
@Component
public class TopicRabbitListenerB {

    /**
     * Prints each message received from the queue, once per message.
     *
     * <p>{@code @RabbitHandler} is kept from the original; it is only required
     * when {@code @RabbitListener} is placed on the class rather than the method.
     *
     * @param msg the converted message payload
     */
    @RabbitListener(queues = "topic.middleSchool.one")
    @RabbitHandler
    public void process(Map<String, Object> msg) {
        System.out.println("Rabbitmq topic : " + msg);
    }
}
5.AppC
package cn.ct;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Entry point of the consumer application.
 *
 * @author lqw
 * @date 2024-02-27 14:20
 */
@SpringBootApplication
public class AppC {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments; forwarded to Spring Boot so that
     *             options such as {@code --server.port=...} take effect
     */
    public static void main(String[] args) {
        SpringApplication.run(AppC.class, args);
    }
}