文章目录
- 前言
- RabbitMQ
- 1 同步调用和异步调用
- 2 常见的MQ对比
- 3 安装RabbitMQ
- 4 RabbitMQ学习
- 4.1 helloworld学习
- 5 Spring AMQP
- 5.1 AMQP的入门案例(使用rabbittemplate进行消息发送和接受)
- 5.2 RabbitMQ的workquene
- 5.3 发布订阅模型(exchange(广播fanout 路由direct 话题topic))
- 5.3.1 fanout 广播
- 5.3.2 direct 路由
- 5.3.3 topic 话题
- 5.3.4 消息转换器(默认我们传一个对象给rabbitmq spring会使用默认的jdk objectoutputstream进行序列化)
- 总结
前言
RabbitMQ
1 同步调用和异步调用
1 同步调用适用于大多数场景 比如差一个订单状态 我们要求时效性
2 异步调用适用于高并发场景,依赖于异步管理器,能提高吞吐量,反应速度。
2 常见的MQ对比
3 安装RabbitMQ
我使用的是centos8
直接安装rabbitmq会有很多问题 因为centos8
021年12月31日CentOS 8操作系统版本结束了生命周期(EOL),Linux社区已不再维护该操作系统版本。后续新的服务器建议使用CentOS Stream,或者其他linux版本,按照社区规则,CentOS 8的源地址http://mirror.centos.org/centos/8/内容已移除
这里使用Docker的方式进行安装
1 安装rabbitmq
docker pull rabbitmq
2 启动rabbitmq
默认方式
登录的时候用户名密码都是guest
docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 -p 5672:5672 rabbitmq
配置用户名密码
docker run \
-e RABBITMQ_DEFAULT_USER=dongjiming \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
3 安装web插件
先执行docker ps 拿到当前的镜像ID
进入容器
安装插件
ctrl+p+q退出当前容器
docker ps
docker exec -it 镜像ID /bin/bash
rabbitmq-plugins enable rabbitmq_management
访问地址
http://linuxip地址:15672,这里的用户名和密码输你配置的
可以配置用户信息
# 创建一个rabbitmq用户
rabbitmqctl add_user [账号] [密码]
# 给具体的一个用户设置身份权限
rabbitmqctl set_user_tags [账号] administrator
# 给具体的一个用户修改密码
rabbitmqctl change_password [username] [new password]
# 删除一个用户
rabbitmqctl delete_user [username]
# 列出所有用户清单
rabbitmqctl list_users
# 为用户设置 administrator 角色
rabbitmqctl.bat set_permission -p / [username] ".*" ".*" ".*"
rabbitmqctl.bat set_permission -p / root ".*" ".*" ".*"
4 RabbitMQ学习
4.1 helloworld学习
依赖
<dependencies>
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.29</version>
</dependency>
</dependencies>
生产者
package helloworld;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 描述: Hello World 的发送类,连接到RabbitMQ服务端,然后发送一条消息,然后退出。
*/
public class Send {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("139.224.237.247");
factory.setUsername("admin");
factory.setPassword("11111");
//建立连接
Connection connection = factory.newConnection();
//获得信道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//发布消息
String message = "Hello World!11";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
System.out.println("发送了消息:" + message);
//关闭连接
channel.close();
connection.close();
}
}
消费者
package helloworld;
import com.rabbitmq.client.AMQP.BasicProperties;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* 描述: 接收消息,并打印,持续运行
*/
public class Recv {
private final static String QUEUE_NAME = "hello";
public static void main(String[] args) throws IOException, TimeoutException {
//创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
//设置RabbitMQ地址
factory.setHost("139.224.237.247");
factory.setUsername("admin");
factory.setPassword("1111");
//建立连接
Connection connection = factory.newConnection();
//获得信道
Channel channel = connection.createChannel();
//声明队列
channel.queueDeclare(QUEUE_NAME,false,false,false,null);
//接收消息并消费
channel.basicConsume(QUEUE_NAME, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
BasicProperties properties, byte[] body) throws IOException {
String message = new String(body, "UTF-8");
System.out.println("收到消息:" + message);
}
});
}
}
5 Spring AMQP
5.1 AMQP的入门案例(使用rabbittemplate进行消息发送和接受)
发送端send
1 导入依赖
<dependencies>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
<!--单元测试-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
</dependencies>
2 配置文件
logging:
pattern:
dateformat: MM-dd HH:mm:ss:SSS
spring:
rabbitmq:
host: 139.224.237.247 # rabbitMQ的ip地址
port: 5672 # 端口
username: admin
password: 11111
virtual-host: /
3 引入使用
@SpringBootTest
public class SpringAmqpTest {
//注入模板
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
//指定队列发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
接收端
1 导入依赖
2 配置文件
这两步和上面相同
3 编写类注册Bean开启监听
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
}
}
5.2 RabbitMQ的workquene
workquene就是一个队列 多个消费者消费
每条信息只能被一个消费者消费
其目的是提高性能 避免消息队列中信息堆积
5.3 发布订阅模型(exchange(广播fanout 路由direct 话题topic))
5.3.1 fanout 广播
把消息发给交换机
交换机会把消息发送给每一个和它绑定的队列
1 首先使用配置类 配置Bean的方式 声明交换机 队列1 队列2 并进行交换机和队列的绑定
package cn.itcast.mq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.FanoutExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class FanoutConfig {
// itcast.fanout
@Bean
public FanoutExchange fanoutExchange(){
return new FanoutExchange("itcast.fanout");
}
// fanout.queue1
@Bean
public Queue fanoutQueue1(){
return new Queue("fanout.queue1");
}
// 绑定队列1到交换机
@Bean
public Binding fanoutBinding1(Queue fanoutQueue1, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue1)
.to(fanoutExchange);
}
// fanout.queue2
@Bean
public Queue fanoutQueue2(){
return new Queue("fanout.queue2");
}
// 绑定队列2到交换机
@Bean
public Binding fanoutBinding2(Queue fanoutQueue2, FanoutExchange fanoutExchange){
return BindingBuilder
.bind(fanoutQueue2)
.to(fanoutExchange);
}
2 配置Bean 加 @rabbitlistener(指定队列名称) 的方式进行监听消费
@Component
public class SpringRabbitListener {
@RabbitListener(queues = "fanout.queue1")
public void listenFanoutQueue1(String msg) {
System.out.println("消费者接收到fanout.queue1的消息:【" + msg + "】");
}
@RabbitListener(queues = "fanout.queue2")
public void listenFanoutQueue2(String msg) {
System.out.println("消费者接收到fanout.queue2的消息:【" + msg + "】");
}
}
3 向交换机发送消息(发送前先启动消费者端 进行监听)
@Test
public void testSendFanoutExchange() {
// 交换机名称
String exchangeName = "itcast.fanout";
// 消息
String message = "hello, every one!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
发现消息同时被两个消息队列消费
5.3.2 direct 路由
生产者 发送消息时会指定 交换机 和 routingkey
交换机再根据 routingkey 和 队列绑定的bindingkey比较
相同则会把消息发给这个队列
1 使用@rabbitlistener的方式 声明交换机 队列 绑定 binding-key 消费方法
@Component
public class SpringRabbitListener {
//生命交换机 队列 进行绑定 并且指定bindingkey 和消费方法
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "itcast.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者接收到direct.queue2的消息:【" + msg + "】");
}
}
2 发送端编写代码 指定routingkey和消息
@Test
public void testSendDirectExchange() {
// 交换机名称
String exchangeName = "itcast.direct";
// 消息
String message = "hello, red!";
// 发送消息 (指定routingkey为red 这样两个队列都会收到消息)
rabbitTemplate.convertAndSend(exchangeName, "red", message);
}
指定routingkey red
指定routingkey yellow
5.3.3 topic 话题
和direct 几乎一样 不过topic的key 以 . 分割并且可以使用通配符
@Component
public class SpringRabbitListener {
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String msg){
System.out.println("消费者接收到topic.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "itcast.topic", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String msg){
System.out.println("消费者接收到topic.queue2的消息:【" + msg + "】");
}
}
@Test
public void testSendTopicExchange() {
// 交换机名称
String exchangeName = "itcast.topic";
// 消息
String message = "今天天气不错,我的心情好极了!";
// 发送消息
rabbitTemplate.convertAndSend(exchangeName, "china.weather", message);
}
5.3.4 消息转换器(默认我们传一个对象给rabbitmq spring会使用默认的jdk objectoutputstream进行序列化)
总结
提示:这里对文章进行总结:
例如:以上就是今天要讲的内容,本文仅仅简单介绍了pandas的使用,而pandas提供了大量能使我们快速便捷地处理数据的函数和方法。