RabbitMQ
- 生产者通过-》通道-》交换机-》投到消息队列-》再通过通道-》消费者
- 分布式架构
- 何谓分布式系统
通俗一点: 就是一个请求由服务器端的 多个服务 (服务或者系统)协同处理完成
和单体架构不同的是,单体架构是一个请求发起ivm调度线程(确切的是tomcat线程池)分线程hread来处理请求直到释放,而分布式是系统是:一个请求是由多个系统共同来协同完成,jvm和环境都可能是独立。如果生活中的比喻的话,单体架构就想建设一个小房子很快就能够搞定,如果你要建设一个鸟巢者大型的建筑,你就必须是各个环节的协司和分布,这样目的也是项目发展都后期的时候要去部署和思考的问题
- 中间件
- 串行与并行(异步,流量削峰)
-
协议
-
什么是持久化
简单来说就是将数据存入磁盘, 而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。
-
消息分发策略的机制对比
- 高可用–》可搭建集群
rabbitMQ的入门以及安装
简单概述:
RabbitMQ是一个开源的遵循AMQP协议实现的基于Erlang语言编写,支持多种客户端(语言)。用于在分布式系统中存储消息,转发消息,具有高可用,高可扩性,易用性等特征。
rabbitmq下载地址:https://www.rabbitmq.com/download.html
下载地址就在erlang官网:https://github.com/rabbitmq/erlang-rpm/releases
笔记地址:https://www.kuangstudy.com/zl/rabbitmq#1366709584634437634
- 解压,下载环境
rpm -Uvh erlang-25.3-1.el9.aarch64.rpm
yum install -y erlang
- 安装成功
- 启动成功
- 谨记,云服务器放开端口后,重启一下rabbitmq便可访问远程rabbitmq(web图形化)
rabbitmq的角色分类
什么是amqp
AMQP全称: Advanced Message Queuing Protocol(高级消息队列协议)。是应用层协议的一个开发标准,为面向消
息的中间件设计。
amqp消费者流转过程
- 选择指定的交换机,指定的路由key(过滤)找到指定的值
- 都会有默认交换机,由交换机发送消息,没有交换机就会默认自带的交换机
Rabbitmq的六种工作模式
simple简单模式、work工作模式、publish/subscribe订阅模式、routing路由模式、topic 主题模式、RPC模式。
-
simple简单模式为一个队列中一条消息,只能被一个消费者消费。
-
Work工作模式为一个生产者,多个消费者,每个消费者获取到的消息唯一。
-
publish/subscribe订阅模式为一个生产者发送的消息被多个消费者获取。
-
routing路由模式为生产者发送的消息主要根据定义的路由规则决定往哪个队列发送。
-
topic 主题模式为生产者,一个交换机(topicExchange),模糊匹配路由规则,多个队列,多个消费者。
-
RPC模式为客户端 Client 先发送消息到消息队列,远程服务端 Server 获取消息,然后再写入另一个消息队列,向原始客户端 Client 响应消息处理结果。
简单模式
生产者
package com.liang.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协义基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1.创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");//注意账号有无权限
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,梁伟浩!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue1", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者
package com.liang.rabbitmq.simple;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Consumer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协义基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1.创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");//注意账号有无权限
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,梁伟浩!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routing
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicConsume("queue1", true, new DeliverCallback() {
public void handle(String consumer, Delivery message) throws IOException {
System.out.println("收到的消息是" + new String(message.getBody(), "utf-8"));
}
},new CancelCallback(){
public void handle(String s) throws IOException {
System.out.println("接受失败");
}
});
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
发布与订阅模式(fanout)
package com.liang.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协义基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1.创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");//注意账号有无权限
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,梁伟浩!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
String exchange = "fanout-exchange";
// @params2: 队列名称/routing
String routeKey = "";
//交换机类型
String type = "fanout";
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchange, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
路由模式(direct)
- 增加路由key进行过滤
package com.liang.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协义基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1.创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");//注意账号有无权限
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,梁伟浩!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
String exchange = "direct-exchange";
// @params2: 队列名称/routing
String routeKey = "email";//对应是queue1
//交换机类型
String type = "direct";
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchange, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
主题模式(topic)
#代表一个或者多个多级,可有可无
*有且只有一个,一定要有
package com.liang.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协义基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1.创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");//注意账号有无权限
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,梁伟浩!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
String exchange = "topic-exchange";
// @params2: 队列名称/routing
String routeKey = "com.course.user";//对应是queue1
//交换机类型
String type = "topic";
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish(exchange, routeKey, null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
work模式-轮询模式(round-robin)
- 类型:无
- 特点:该模式接收消息是当有多个消费者接入时,消息的分配摸式是一个消费者分配一条,直至消息消费完战
轮询
生产者
package com.liang.rabbitmq.work.fair;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
public class Producer {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 6: 准备发送消息的内容
//===============================end topic模式==================================
for (int i = 1; i <= 20; i++) {
//消息的内容
String msg = "lwh:" + i;
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
// @params2: 队列名称/routingkey
// @params3: 属性配置
// @params4: 发送消息的内容
channel.basicPublish("", "queue8", null, msg.getBytes());
}
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者1
package com.liang.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue8", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(200);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2
package com.liang.rabbitmq.work.lunxun;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue8", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(1000);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
公平分发(能者多得),需要手动应答
消费者1
package com.liang.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work1 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work1");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
// channel.queueDeclare("queue1", false, false, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
// 6: 定义接受消息的回调
final Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue8", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work1-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(1000);
//改成手动应答
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work1-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
消费者2
package com.liang.rabbitmq.work.fair;
import com.rabbitmq.client.*;
import java.io.IOException;
public class Work2 {
public static void main(String[] args) {
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("消费者-Work2");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
*
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
// 这里如果queue已经被创建过一次了,可以不需要定义
//channel.queueDeclare("queue1", false, true, false, null);
// 同一时刻,服务器只会推送一条消息给消费者
//channel.basicQos(1);
// 6: 定义接受消息的回调
final Channel finalChannel = channel;
finalChannel.basicQos(1);
finalChannel.basicConsume("queue8", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
try{
System.out.println("Work2-收到消息是:" + new String(delivery.getBody(), "UTF-8"));
Thread.sleep(3000);
finalChannel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}catch(Exception ex){
ex.printStackTrace();
}
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
}
});
System.out.println("Work2-开始接受消息");
System.in.read();
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null && connection.isOpen()) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
参数模式
- 创立交换机选择对应的type
代码创建交换机并绑定队列
- 如果消费一个不存在的队列会直接报错
package com.liang.rabbitmq.simple;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.TrafficListener;
public class Producer {
public static void main(String[] args) {
// 所有的中间件技术都是基于tcp/ip协义基础之上构建新型的协议规范,只不过rabbitmq遵循的是amqp
//ip port
//1.创建连接工程
// 1: 创建连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
// 2: 设置连接属性
connectionFactory.setHost("175.178.89.93");
connectionFactory.setPort(5672);
connectionFactory.setVirtualHost("/");
connectionFactory.setUsername("guest");//注意账号有无权限
connectionFactory.setPassword("guest");
Connection connection = null;
Channel channel = null;
try {
// 3: 从连接工厂中获取连接
connection = connectionFactory.newConnection("生产者");
// 4: 从连接中获取通道channel
channel = connection.createChannel();
// 5: 申明队列queue存储消息
/*
* 如果队列不存在,则会创建
* Rabbitmq不允许创建两个相同的队列名称,否则会报错。
* @params1: queue 队列的名称
* @params2: durable 队列是否持久化
* @params3: exclusive 是否排他,即是否私有的,如果为true,会对当前队列加锁,其他的通道不能访问,并且连接自动关闭
* @params4: autoDelete 是否自动删除,当最后一个消费者断开连接之后是否自动删除消息。
* @params5: arguments 可以设置队列附加参数,设置队列的有效期,消息的最大长度,队列的消息生命周期等等。
* */
channel.queueDeclare("queue1", false, false, false, null);
// 6: 准备发送消息的内容
String message = "你好,梁伟浩!!!";
// 7: 发送消息给中间件rabbitmq-server
// @params1: 交换机exchange
String exchange = "direct-message-exchange";
// @params2: 队列名称/routing
String routeKey = "";//对应是queue1
//交换机类型
String type = "direct";
// @params3: 属性配置
// @params4: 发送消息的内容
//交换机会不会随者服务器重启造成丢失,如果是true代表不丢失,false重启就会去失
channel.exchangeDeclare(exchange, type,true);
//声明队列
channel.queueDeclare("queue5", true,false,false,null);
channel.queueDeclare("queue6", true,false,false,null);
channel.queueDeclare("queue7", true,false,false,null);
//绑定队列与交换机的关系
channel.queueBind("queue5", exchange,"order");
channel.queueBind("queue6", exchange,"order");
channel.queueBind("queue7", exchange,"course");
channel.basicPublish(exchange, "order", null, message.getBytes());
System.out.println("消息发送成功!");
} catch (Exception ex) {
ex.printStackTrace();
System.out.println("发送消息出现异常...");
} finally {
// 7: 释放连接关闭通道
if (channel != null && channel.isOpen()) {
try {
channel.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
if (connection != null) {
try {
connection.close();
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
}
}
当有多个消费者时,我们的消息会被哪个消费者消费呢,我们又该如何均衡消费者消费信息的多少呢:主要有两种模式:
-
论询模式的分发:个消费者条,按均分配;
-
公平分发: 根据消费的消费,讲行公平分发,处理快的处理的多,处理悍的处理的少;按劳分配;
rabbitmq使用场景
- 解耦、削峰、异步
- 高内聚低耦合
-
静态方法只能调用静态方法
-
版本问题
-
注意yml和properties配置文件
-
@Configuration加这个@bean项目启动自动扫描编译运行
-
@Configuration表示springboot会自动扫描 @bean
-
在Spring框架中,Bean是最基本的组件,它是Spring框架中的一个Java对象。在实例化阶段,Spring框架会创建Bean实例,并根据Bean的定义,注入Bean的属性和依赖关系。
-
在实例化阶段完成后,Spring框架会对Bean进行初始化。在初始化阶段,Spring框架会调用Bean的初始化方法,并执行各种初始化操作,例如数据源的初始化、连接的建立等
fanout消费者
在对应的消费者类加上注解并注明到spring组件会自动对队列的消息进行消费
- @Component
- @service
@Component
@RabbitListener(queues = {"duanxin.fanout.queue"})
public class FanoutdaunxinConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("duanxin.fanout.queue==接收到的订单信息是"+message);
}
springboot整合direct模式
-
比fanout多一个路由key进行过滤
-
@bean方法的名称不可以重复,否则会报错
-
生产者
-
config类
package com.liang.rabbitmqspringboot.config;
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
@Configuration
public class DirectRabbitMqConfiguration {
//: 声明注册direct模式的交换机
@Bean
public DirectExchange directExchange(){
return new DirectExchange("direct_order_exchange",true,false);
}
// 2: 声明队列 sms.fanout.queue email.fanout.queue,duanxin.fanout.queue
@Bean
public Queue smsQueueDirect(){
System.out.println("9999999999999999999999999999999999999999999999999999999999999999");
return new Queue("sms.direct.queue",true);
}
@Bean
public Queue duanxinQueueDirect(){
System.out.println("888888888888888888888888888888888888888888888888888888888888");
return new Queue("duanxin.direct.queue",true);
}
@Bean
public Queue emailQueueDirect(){
System.out.println("11111111111111111111111111111111111111111111");
return new Queue("email.direct.queue",true);
}
// 3: 完成绑定关系(队列和交换机完成绑定关系)
@Bean
public Binding smsBingdingDirect(){
return BindingBuilder.bind(smsQueueDirect()).to(directExchange()).with("sms");
}
@Bean
public Binding duanxinBingdingDirect(){
return BindingBuilder.bind(duanxinQueueDirect()).to(directExchange()).with("duanxin");
}
@Bean
public Binding emailBingdingDirect(){
return BindingBuilder.bind(emailQueueDirect()).to(directExchange()).with("email");
}
}
- service类
package com.liang.rabbitmqspringboot.service;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import java.util.UUID;
@Service
public class OrderService {
@Autowired
private RabbitTemplate rabbitTemplate;
public void makeOrder(String userid, String productid, int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderid = UUID.randomUUID().toString();
System.out.println("订单生成成功"+orderid);
3: 通过Mp来完成消息的分发
//参数1: 交换机 参数2:路中key/queue队列名称 参数3:消息内容
String exchangeName = "fanout_order_exchange";
String routingKey = "";
rabbitTemplate.convertAndSend(exchangeName,routingKey,orderid);
}
public void makeOrderDirect(String userid, String productid, int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderid = UUID.randomUUID().toString();
System.out.println("订单生成成功"+orderid);
3: 通过Mp来完成消息的分发
//参数1: 交换机 参数2:路中key/queue队列名称 参数3:消息内容
String exchangeName = "direct_order_exchange";
rabbitTemplate.convertAndSend(exchangeName,"email",orderid);
rabbitTemplate.convertAndSend(exchangeName,"duanxin",orderid);
}
}
- 测试类
package com.liang.rabbitmqspringboot;
import com.liang.rabbitmqspringboot.service.OrderService;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import sun.applet.Main;
@SpringBootTest
class RabbitmqSpringbootApplicationTests {
@Autowired
private OrderService orderService;
@Test
void contextLoads() {
orderService.makeOrder("2", "2", 12);
}
@Test
void contextLoadsDirect() {
orderService.makeOrderDirect("3", "3", 13);
}
}
springboot整合主题模式
package com.liang.rabbitmqspringbootconsumer.service.topic;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.*;
import org.springframework.stereotype.Component;
@Component
@RabbitListener(bindings = @QueueBinding(
value = @Queue(value = "sms.topic.queue",durable = "true"
,autoDelete = "false"),
exchange = @Exchange(value = "topic_order_exchange",
type = ExchangeTypes.TOPIC),
key = "#.com.#"
))
public class TopicSmsConsumer {
@RabbitHandler
public void reviceMessage(String message){
System.out.println("sms.topic.queue==接收到的订单信息是"+message);
}
}
rabbitQM-过期时间TTL
过期时间TTL表示可以对消息设置预期的时间,在这个时间内都可以被消费者接收获取,过了之后消息将自动被删除RabbitMO可以对 消息和队列 设置TTL。目前有两种方法可以设置。
- ttl队列过期时间,过期消息的时候移除到死性队列中(queue会显示有ttl标识)
- ttl消息过期时间,直接移除掉过期信息
- config
package com.liang.rabbitmqspringboot.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class ttlRabbitMqConfiguration {
//: 声明注册direct模式的交换机
@Bean
public DirectExchange ttlDirectExchange(){
return new DirectExchange("direct_order_exchange",true,false);
}
// 2: 声明队列 sms.fanout.queue email.fanout.queue,duanxin.fanout.queue
@Bean
public Queue smsQueuettlDirect(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);//int类型
return new Queue("ttl.direct.queue",true,false,false,args);
}
@Bean
public Queue smsQueuettlDirectMessage(){
return new Queue("ttl.message.direct.queue",true);
}
@Bean
public Binding emailBingdingDirectTTL(){
return BindingBuilder.bind(smsQueuettlDirectMessage()).to(ttlDirectExchange()).with("ttl");
}
@Bean
public Binding emailBingdingDirectTTLmessage(){
return BindingBuilder.bind(ttlDirectExchange()).to(ttlDirectExchange()).with("ttlMessage");
}
}
public void makeOrderTTL(String userid, String productid, int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderid = UUID.randomUUID().toString();
System.out.println("订单生成成功"+orderid);
3: 通过Mp来完成消息的分发
//参数1: 交换机 参数2:路中key/queue队列名称 参数3:消息内容
String exchangeName = "ttl_direct_exchange";
String routKey = "ttl";
rabbitTemplate.convertAndSend(exchangeName,routKey,orderid);
}
public void makeOrderTTLMessage(String userid, String productid, int num){
// 1: 根据商品id查询库存是否充足
// 2: 保存订单
String orderid = UUID.randomUUID().toString();
System.out.println("订单生成成功"+orderid);
3: 通过Mp来完成消息的分发
//参数1: 交换机 参数2:路中key/queue队列名称 参数3:消息内容
String exchangeName = "ttl_direct_exchange";
String routKey = "ttlMessage";
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setExpiration("5000");
message.getMessageProperties().setContentEncoding("utf-8");
return message;
}
};
rabbitTemplate.convertAndSend(exchangeName,routKey,orderid,messagePostProcessor);
}
死信队列
DLX,全称为Dead-Letter-Exchange,可以称之为死信交换机,也有人称之为死信邮箱。当消息在一个队列中变成死信dead message)之后,它能被重新发送到另一个交换机中,这个交换机就是DLX,绑定DLX的队列就称之为死信队列.消息变成死信,可能是由于以下的原区”
。消息被拒绝
。消息过期
。队列达到最大长度
- 设置死信队列
@Bean
public Queue smsQueuettlDirect(){
Map<String, Object> args = new HashMap<>();
args.put("x-message-ttl", 5000);//int类型
args.put("x-dead-letter-exchange", "dead_direct_exchange");
args.put("x-dead-letter-routing-key", "dead");//fanout不需要路由key
return new Queue("ttl.direct.queue",true,false,false,args);
}
- 消息队列过期后会到对应设置的死信队列中
- 内存不足(Memory)
- 内存换页,把内存的值设置到0.5百分比左右,消息转移到磁盘里,因为磁盘比内存大的多得多
rabbitMQ集群
- 从节点挂了,数据不会丢失
- 主节点挂了,从节点无法启动