B站学习视频https://www.bilibili.com/video/BV1LQ4y127n4?p=61&vd_source=8665d6da33d4e2277ca40f03210fe53a
文档资料:
链接:https://pan.baidu.com/s/1P_Ag1BYiPaF52EI19A0YRw?pwd=d03r
提取码:d03r
一 初始MQ
1. 同步通讯
2. 异步通讯
3. MQ常见架构
二 RabbitMQ 快速入门
1. RabbitMQ概述和安装 --单机部署
我们在Centos7虚拟机中使用Docker来安装
1.1.下载镜像
方式一:在线拉取
docker pull rabbitmq:3-management
方式二:从本地加载
在课前资料已经提供了镜像包:
上传到虚拟机中后,使用命令加载镜像即可:
docker load -i mq.tar
1.2.安装MQ
执行下面的命令来运行MQ容器:
docker run \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=123321 \
--name mq \
--hostname mq1 \
-p 15672:15672 \
-p 5672:5672 \
-d \
rabbitmq:3-management
1.2. RabbitMQ概述和安装 --集群部署
2.常见消息模型
3.快速入门 --Basic Queue 简单队列模型
三 SpringAMQP
1. Basic Queue 简单队列模型
- 结构
1.1 消息发送
- 16572是UI端口 5672消息端口
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import org.junit.Test;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class PublisherTest {
@Test
public void testSendMessage() throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.发送消息
String message = "hello, rabbitmq!";
channel.basicPublish("", queueName, null, message.getBytes());
System.out.println("发送消息成功:【" + message + "】");
// 5.关闭通道和连接
channel.close();
connection.close();
}
}
1.2 消息接收
- 16572是UI端口 5672消息端口
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class ConsumerTest {
public static void main(String[] args) throws IOException, TimeoutException {
// 1.建立连接
ConnectionFactory factory = new ConnectionFactory();
// 1.1.设置连接参数,分别是:主机名、端口号、vhost、用户名、密码
factory.setHost("127.0.0.1");
factory.setPort(5672);
factory.setVirtualHost("/");
factory.setUsername("admin");
factory.setPassword("123");
// 1.2.建立连接
Connection connection = factory.newConnection();
// 2.创建通道Channel
Channel channel = connection.createChannel();
// 3.创建队列
String queueName = "simple.queue";
channel.queueDeclare(queueName, false, false, false, null);
// 4.订阅消息
channel.basicConsume(queueName, true, new DefaultConsumer(channel){
@Override
public void handleDelivery(String consumerTag, Envelope envelope,
AMQP.BasicProperties properties, byte[] body) throws IOException {
// 5.处理消息
String message = new String(body);
System.out.println("接收到消息:【" + message + "】");
}
});
System.out.println("等待接收消息。。。。");
}
}
2. SpringAMQP入门案例
2.1 介绍
2.2 案例 -- 发送消息
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
port: 5672 # 端口
username: admin
password: 123
virtual-host: /
@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
}
2.3 案例 --接收消息
spring:
rabbitmq:
host: 127.0.0.1 # rabbitMQ的ip地址
port: 5672 # 端口
username: admin
password: 123
virtual-host: /
listener:
simple:
prefetch: 1
@Component //声明成 bean
public class SpringRabbitListener {
// @RabbitListener(queues = "simple.queue")
// public void listenSimpleQueue(String msg) {
// System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
// }
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
}
3. Work Queue 工作队列模型
3.1 介绍
- 接收信息
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
import java.time.LocalTime;
import java.util.Map;
@Component //声明成 bean
public class SpringRabbitListener {
/* @RabbitListener(queues = "simple.queue")
public void listenSimpleQueue(String msg) {
System.out.println("消费者接收到simple.queue的消息:【" + msg + "】");
}
*/
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue1(String msg) throws InterruptedException {
System.out.println("消费者1接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(20);
}
@RabbitListener(queues = "simple.queue")
public void listenWorkQueue2(String msg) throws InterruptedException {
System.err.println("消费者2........接收到消息:【" + msg + "】" + LocalTime.now());
Thread.sleep(200);
}
}
- 发送信息
@RunWith(SpringRunner.class) // @Autowired 可以注入
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSendMessage2SimpleQueue() {
String queueName = "simple.queue";
String message = "hello, spring amqp!";
rabbitTemplate.convertAndSend(queueName, message);
}
@Test
public void testSendMessage2WorkQueue() throws InterruptedException {
String queueName = "simple.queue";
String message = "hello, message__";
for (int i = 1; i <= 50; i++) {
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
}
}
}
-
application.yml
logging: pattern: dateformat: MM-dd HH:mm:ss:SSS spring: rabbitmq: host: 127.0.0.1 # rabbitMQ的ip地址 port: 5672 # 端口 username: admin password: 123 virtual-host: / #虚拟主机 listener: simple: prefetch: 1 # 预取 每次只取一条消息,处理完才能获取下一条消息
4. 发布模型介绍
5. 发布、订阅模型-Fanout
5.1 利用SpringAMQP演示FanoutExchange的使用 ( 发布、订阅模型-Direct)
5.2 步骤一 在consumer服务生命Exchange、QUEUE、Binding
6. 发布、订阅模型-Direct
6.1 案例
7. 发布、订阅模型-Topic
8. 消息转换器
推荐使用json