使用RabbitMQ
1 Docker安装RabbitMQ
1.1 安装RabbitMQ
# Pull the image that bundles the management web UI
docker pull rabbitmq:3.8.8-management
# Create the container
# 5672: port used by applications; 15672: web console port
docker run \
  --interactive --tty --detach \
  --name my-rabbitmq \
  --restart always \
  --publish 15672:15672 \
  --publish 5672:5672 \
  --env RABBITMQ_DEFAULT_USER=rabbitmq \
  --env RABBITMQ_DEFAULT_PASS=123456 \
  --volume /home/rabbitmq/data:/var/lib/rabbitmq \
  rabbitmq:3.8.8-management
1.2 访问RabbitMQ
# 访问网页
http://192.168.108.200:15672
2 SpringBoot使用RabbitMQ
2.1 说明
- 引入“spring-boot-starter-amqp”依赖;
- 使用“@Configuration”配置消息队列;
- 使用RabbitTemplate发送消息;
- 使用@RabbitListener监听消息;
2.2 工程目录
2.3 pom.xml
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.mason</groupId>
<artifactId>myrabbitmq</artifactId>
<version>1.0</version>
<properties>
<java.version>11</java.version>
<spring-boot-version>2.3.12.RELEASE</spring-boot-version>
<maven.compiler.source>11</maven.compiler.source>
<maven.compiler.target>11</maven.compiler.target>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!-- Configure RabbitMQ -->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
</dependencies>
<dependencyManagement>
<!-- 注意:尽量在此配置Spring-Boot版本,子应用中可以不配置SpringBoot版本 -->
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-dependencies</artifactId>
<version>${spring-boot-version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<!-- 配置Maven插件 -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.1</version>
<configuration>
<source>${java.version}</source>
<target>${java.version}</target>
<encoding>UTF-8</encoding>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.0</version>
<configuration>
<skipTests>true</skipTests>
</configuration>
</plugin>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
<version>${spring-boot-version}</version>
</plugin>
</plugins>
</build>
</project>
2.4 yml
# Embedded web server settings
server:
  port: 8081
  servlet:
    context-path: /myrm
spring:
  # Application name
  application:
    name: my-rabbitmq
  # RabbitMQ connection settings
  rabbitmq:
    host: 192.168.108.200
    port: 5672
    username: rabbitmq
    password: 123456
2.5 config
ConfigRabbitmq.java
package com.mason.myrabbitmq.config;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import static com.mason.myrabbitmq.config.MyRoutingKey.QUEUE_HELLO;
/**
 * Spring configuration that declares the RabbitMQ queue used by this application.
 */
@Configuration
public class ConfigRabbitmq {

    /**
     * Registers the {@code QUEUE_HELLO} queue as a bean.
     *
     * @return a queue named {@code QUEUE_HELLO}, created with the durable flag set to {@code true}
     */
    @Bean
    public Queue createDiscoverQueue() {
        final boolean durable = true;
        return new Queue(QUEUE_HELLO, durable);
    }
}
MyRoutingKey.java
package com.mason.myrabbitmq.config;
/**
 * Holder for the queue / routing-key names shared by the configuration,
 * the publishing controller and the listening service.
 */
public final class MyRoutingKey {

    /** Name of the demo queue; also used as the routing key when publishing. */
    public static final String QUEUE_HELLO = "queue_hello";

    /** Constants holder: not meant to be instantiated. */
    private MyRoutingKey() {
    }
}
2.6 controller
QueueController.java
package com.mason.myrabbitmq.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import static com.mason.myrabbitmq.config.MyRoutingKey.QUEUE_HELLO;
/**
 * REST controller that publishes a test message to RabbitMQ.
 * {@code @RestController} is the combination of {@code @Controller} and {@code @ResponseBody}.
 */
@RestController
@RequestMapping("/test")
public class QueueController {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * Handles requests to {@code /test/send}: logs to stdout, then sends a fixed
     * text message using {@code QUEUE_HELLO} as the routing key.
     *
     * @return the literal string {@code "Success"} once the send call has returned
     */
    @RequestMapping("/send")
    public String sendMessage() {
        // The payload keeps its trailing space exactly as originally sent.
        final String payload = "Hello My RabbitMQ ";
        System.out.println("1 发送数据");
        System.out.println("Hello My RabbitMQ");
        rabbitTemplate.convertAndSend(QUEUE_HELLO, payload);
        return "Success";
    }
}
2.7 service
QueueService.java
package com.mason.myrabbitmq.service;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Service;
import static com.mason.myrabbitmq.config.MyRoutingKey.QUEUE_HELLO;
/**
 * Service that consumes messages from the {@code QUEUE_HELLO} queue.
 */
@Service
public class QueueService {

    /**
     * Listener callback for {@code QUEUE_HELLO}; prints a marker line followed by the message.
     *
     * @param msg the message payload as a string
     */
    @RabbitListener(queues = QUEUE_HELLO)
    public void receiveMessage(final String msg) {
        System.out.println("2 获取RabbitMQ数据");
        System.out.println(msg);
    }
}
2.8 MyrabbitmqApplication.java
package com.mason.myrabbitmq;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
/**
 * Spring Boot entry point for the RabbitMQ demo application.
 */
@SpringBootApplication
public class MyrabbitmqApplication {

    /**
     * Boots the Spring application context.
     *
     * @param args command-line arguments, passed through to Spring Boot
     */
    public static void main(String[] args) {
        // Instance form of the static SpringApplication.run(Class, String...) helper.
        final SpringApplication application = new SpringApplication(MyrabbitmqApplication.class);
        application.run(args);
    }
}
2.9 截图
3 Python使用RabbitMQ
3.1 安装pika
# pika是操作RabbitMQ的依赖包
pip install pika
3.2 工程目录
3.3 main.py
# 导入channel
from my_decorator import channel
from my_rabbitmq import init_queue
# Importing my_rabbitmq is what applies the @queue decorator; this call only prints a marker.
init_queue()

# Run the app
if __name__ == '__main__':
    print("RabbitMQ start consuming")
    try:
        # Blocks and dispatches incoming messages to the registered callbacks.
        channel.start_consuming()
    except KeyboardInterrupt:
        # Ctrl+C: leave the consume loop cleanly instead of dying with a traceback.
        channel.stop_consuming()
3.4 my_decorator.py
# 导入pika使用RabbitMQ
import pika
from pika.adapters.blocking_connection import BlockingChannel
# 导入routing key
from my_routing_key import RoutingKey
# 连接RabbitMQ
def conn_rabbitmq(
    host: str = "192.168.108.200",
    port: int = 5672,
    username: str = "rabbitmq",
    password: str = "123456",
) -> BlockingChannel:
    """Open a blocking connection to RabbitMQ and return a new channel on it.

    The defaults reproduce the previously hard-coded settings, so calling
    ``conn_rabbitmq()`` with no arguments behaves as before.

    :param host: RabbitMQ server host name or IP address
    :param port: port the application connects to
    :param username: login user name
    :param password: login password
    :return: a channel created from the newly opened blocking connection
    """
    # Build the connection settings
    credentials = pika.PlainCredentials(username=username, password=password)
    parameters = pika.ConnectionParameters(host=host, port=port, credentials=credentials)
    # Connect, then hand back a channel
    connection = pika.BlockingConnection(parameters)
    return connection.channel()
# Module-level channel, created on import and shared by the code below
# and by modules that import ``channel`` from here.
channel = conn_rabbitmq()


def declare_routing_key():
    """Declare the queue named ``RoutingKey.my_routing_key`` on the shared channel."""
    channel.queue_declare(queue=RoutingKey.my_routing_key)


# Declare the queue as soon as this module is imported.
declare_routing_key()
# 自定义装饰器
def queue(queue_name):
    """
    Register the decorated function as the consumer callback for a queue.

    Registration happens when the decorator is applied; messages are only
    delivered once consuming is started on the shared channel. Messages are
    consumed with ``auto_ack=True``.

    :param queue_name: name of the RabbitMQ queue to listen on
    :return: a decorator that registers the function and returns it unchanged
    """
    def decorator(func):
        channel.basic_consume(queue=queue_name, auto_ack=True, on_message_callback=func)
        # Return the callback so the decorated name stays bound to the
        # function instead of being replaced by None.
        return func

    return decorator
3.5 my_rabbitmq.py
# 使用装饰器监听rabbitmq
from my_decorator import queue
# 导入routing key
from my_routing_key import RoutingKey
def init_queue():
    """Print a marker showing that the queue module has been initialised.

    The function performs no registration itself; it only prints.
    """
    print("初始化QueueMessage")
@queue(RoutingKey.my_routing_key)
def get_message(ch, method, properties, body):
    """Consumer callback for the ``RoutingKey.my_routing_key`` queue.

    Prints a marker line followed by the raw message body. ``ch``, ``method``
    and ``properties`` are accepted because the callback is invoked with four
    arguments, but they are not used here.

    :param ch: channel argument passed by the consumer (unused)
    :param method: delivery method argument (unused)
    :param properties: message properties argument (unused)
    :param body: message payload, printed as received
    """
    print("2 接收RabbitMQ")
    print(body)
3.6 my_routing_key.py
# 设置RoutingKey
class RoutingKey:
    """Names shared by the publisher and the consumer."""

    # Used both as the declared queue name and as the routing key when publishing.
    my_routing_key = "my_routing_key"
3.7 my_send_message.py
from my_decorator import channel
# 导入routing key
from my_routing_key import RoutingKey
def send_data():
    """Publish one fixed text message with ``RoutingKey.my_routing_key`` as the routing key.

    An empty exchange name is passed, so delivery relies on the routing key alone.
    """
    print("1 发送RabbitMQ")
    body = "Hello my data"
    channel.basic_publish(exchange='', routing_key=RoutingKey.my_routing_key, body=body)


# Send once when the script is run (also runs on import, as before).
send_data()
3.8 截图
- 运行main.py,实时接收信息;
- 运行my_send_message.py发送消息;