RabbitMQ的使用以及整合到SpringBoot中
一、比较:
(1)、传统请求服务器:
(2)、通过MQ去操作数据库:
通过MQ去操作数据库,从而达到削峰的效果;
问题现象:
(1)、海量数据;
(2)、高并发;
解决方法:
其中最为代表的解决方法:流量削峰-----》使用MQ
二、常见的中间件:
前端:nginx
后端数据库:Mycat
消息中间件:RocketMQ Kafka-----》Kafka应用在大数据领域容易掉消息
三、MQ应用场景:
(1)、异步解耦:
同步:比如用洗衣机洗衣服的时候,洗衣机洗完衣服了,你才能干别的事。没有洗完,你不能干别的事;
异步就是洗衣机洗衣服的时候同时我们该干嘛干嘛;
(2)、削峰填谷。
(3)、消息分发。
四、RabbitMQ 的组成:
过程:
五、基础使用:
开发中使用:
只需要:
(1)创建生产者----->生产消息
(2)创建消费者---------》消费消息
就可以了
平常使用步骤:
(1)对于生产者:
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.创建并设置队列属性
channel.queueDeclare("QUEUE_HELLOWORD", false, false, false, null);、
/**
queueDeclare中参数:
(1)第一个参数:队列名称;
(2)第二个参数:队列是否要持久化,避免重启丢失消息,持久化:即保存到磁盘;
(3)第三个参数:是否排他性:设置成true时只能在同一个连接对象即当前连接对象中操作,
在其他连接对象拿不到对象,一般设置成falsh;
(4)第四个参数:是否自动删除消息,当指定为true时,没有消费者链接的时候,会把我们的队列删除掉;
(5)第五个参数:是否要设置一些额外设置,比如 死信队列;
*/
//6.创建消息
String xiaoxi="这是生成这消息111111";
//7.发送消息
channel.basicPublish("","QUEUE_HELLOWORD",null,xiaoxi.getBytes());
/**
basicPublish中参数:
(1)第一个参数:交换机名称;
(2)第二个参数:路由 key----》订阅广播模式没有;
(3)第三个参数:消息属性;
(4)第四个参数:消息的内容:---》是Byte数组格式传输,其他类型转换为JSON,再转成字符串,再转换Byte数组;
*/
//8.关闭资源
//1.关闭通道
channel.close();
//2.关闭连接对象
connection.close();
(2)对于消费者:
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.创建并设置队列属性
channel.queueDeclare("QUEUE_HELLOWORD", true, false, false, null);、
/**
queueDeclare中参数:
(1)第一个参数:队列名称;
(2)第二个参数:队列是否要持久化,避免重启丢失消息,持久化:即保存到磁盘;
(3)第三个参数:是否排他性:设置成true时只能在同一个连接对象即当前连接对象中操作,
在其他连接对象拿不到对象,一般设置成falsh;
(4)第四个参数:是否自动删除消息,当指定为true时,没有消费者链接的时候,会把我们的队列删除掉;
(5)第五个参数:是否要设置一些额外设置,比如 死信队列;
*/
//6.监听消息,消费消息
channel.basicConsume("QUEUE_HELLOWORD", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.printf("接收到消息:"+new String(delivery.getBody()));
//手动签收
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("11111");
}
});
/**
basicConsume中参数:
(1)第一个参数:队列名称与生产者保持一致;
(2)第二个参数:是否自动签收 :true自动;
(3)第三个参数:是两个匿名函数,第一个:当消息从mq中取出来之后会回调这个DeliverCallback函数
中handle方法,消费者就在方法中参数delivery中拿到消息信息进行处理;
第二个:当消息从mq消息取消了会回调这个 CancelCallback函数中handle方法。;
当第二个参数设置成falsh时:
不再自动签收,可设置为手动签收,当消息业务执行成功后再手动签收。
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
*/
六、MQ常见工作模式:
(1)、简单模式:
生产者:
package cn.js.hello;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 20:49
* @Version: 1.0
* @introduce:
*/
public class PermissTest01 {
public static void main(String[] args) throws Exception{
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.设置队列属性
channel.queueDeclare("QUEUE_HELLOWORD", false, false, false, null);
//6.创建消息
String xiaoxi="这是生成这消息111111";
//7.发送消息
channel.basicPublish("","QUEUE_HELLOWORD",null,xiaoxi.getBytes());
//8.关闭资源
//1.关闭通道
channel.close();
//2.关闭连接对象
connection.close();
System.out.println("消息发送完毕");
}
}
消费者:
package cn.js.hello;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 21:37
* @Version: 1.0
* @introduce:
*/
public class consum {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.设置队列属性
channel.queueDeclare("QUEUE_HELLOWORD", false, false, false, null);
//监听消息,消费消息
channel.basicConsume("QUEUE_HELLOWORD", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.printf("接收到消息:"+new String(delivery.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("11111");
}
});
}
}
(2)、工作模式(work)
生产者:
package cn.js.work;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 20:49
* @Version: 1.0
* @introduce:
*/
public class WorkTest01 {
public static void main(String[] args) throws Exception{
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.设置队列属性
channel.queueDeclare("QUEUE_Work", true, false, false, null);
for(int i=0 ;i<20 ;i++){
//6.创建消息
String xiaoxi="这是生产者生成的消息"+" "+i;
//7.发送消息
channel.basicPublish("","QUEUE_Work",null,xiaoxi.getBytes());
}
//8.关闭资源
//1.关闭通道
channel.close();
//2.关闭连接对象
connection.close();
System.out.println("消息发送完毕");
}
}
消费者1:
package cn.js.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 21:37
* @Version: 1.0
* @introduce:
*/
public class consum1 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//轮巡 一次一个
channel.basicQos(1);
//5.设置队列属性
channel.queueDeclare("QUEUE_Work", true, false, false, null);
try {
Thread.sleep(2000);
//监听消息,消费消息 改为不是自动签收
channel.basicConsume("QUEUE_Work", false, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("消费者1:接收到消息:"+new String(delivery.getBody()));
//同时设置手动签收
channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("11111");
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
消费者2:
package cn.js.work;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 21:37
* @Version: 1.0
* @introduce:
*/
public class consum2 {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//轮巡 一次一个
channel.basicQos(1);
//5.设置队列属性
channel.queueDeclare("QUEUE_Work", true, false, false, null);
try {
Thread.sleep(1000);
//监听消息,消费消息
channel.basicConsume("QUEUE_Work", true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("消费者1:接收到消息:"+new String(delivery.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("11111");
}
});
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
(3)、订阅模式:
交换机类型:
(1).广播---》fanout
(2).定向-----》Direct
(3).通配符-----》Topic
步骤:
订阅模式与其他模式的区别?
生产者:
在第五步的时候:
创建交换机:
channel.exchangeDeclare("03-pubsub1","fanout");
/**
exchangeDeclare参数:
1.交换机名称
2.交换机类型
*/
在发送消息的时候:
给交换机指定名称;
消费者:
前五步与上述生产者的前五步一样,
第六步,创建队列。
//1.创建队列
String queue = channel.queueDeclare().getQueue();
第七步,绑定队列与交换机。
channel.queueBind(queue,"03-pubsub1","");
queueBind参数:
1.队列名称。
2.交换机名称。
3.路由 KEY 和生产者发消息的第二个参数相同。
或者:
//第六步,创建队列并设置属性。
channel.queueDeclare("队列名称",true,false,false,null);
//第七步,绑定队列与交换机。
channel.queueBind("队列名称","03-pubsub1","");
两者都是为了创建队列并获取队列名称
生产者:
package cn.js.pubsub3;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.MessageProperties;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-04 13:38
* @Version: 1.0
* @introduce:
*/
public class pubsubTest {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.创建设置交换机
channel.exchangeDeclare("03-pubsub1","fanout");
//6.创建消息
String xiaoxi="这是生产者生成的消息";
//7.发送消息
channel.basicPublish("03-pubsub1","", MessageProperties.PERSISTENT_TEXT_PLAIN,xiaoxi.getBytes());
//8.关闭资源
//1.关闭通道
channel.close();
//2.关闭连接对象
connection.close();
System.out.println("消息发送完毕");
}
}
消费者1:
package cn.js.pubsub3;
import com.rabbitmq.client.*;
import java.io.IOException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 21:37
* @Version: 1.0
* @introduce:
*/
public class consum1 {
public static void main(String[] args) throws Exception {
//1 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2 设置 rabbititmq ip 地址
connectionFactory.setHost("localhost");
//3 创建 Conection 对象
Connection connection = connectionFactory.newConnection();
//4 创建 Chanel
Channel channel = connection.createChannel();
//5 创建交换机设置交换机属性
channel.exchangeDeclare("03-pubsub1","fanout");
//6.创建队列设置队列属性
String queue = channel.queueDeclare().getQueue();
//7.绑定队列与交换机
channel.queueBind(queue,"03-pubsub1","");
//8.使用 chanel 去 rabbitmq 中去取消息进行消费
/**
* 第一个参数:队列名称
* 第二个参数:是否自动签收
*/
channel.basicConsume(queue, true, new DeliverCallback() {
/**
* 当消息从 mq 中取出来了会回调这个方法
* 消费者消费消息就在这个 handle中去进行处理
*/
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("消费者 1 消息内容为:" + new String(message.getBody()));
}
}, new CancelCallback() {
/**
* 当消息取消了会回调这个方法
* @param consumerTag
* @throws IOException
*/
public void handle(String consumerTag) throws IOException {
System.out.println("1111");
}
});
}
}
消费者2:
package cn.js.pubsub3;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-03 21:37
* @Version: 1.0
* @introduce:
*/
public class consum2 {
public static void main(String[] args) throws Exception {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.设置交换机
channel.exchangeDeclare("03-pubsub1","fanout");
String queue = channel.queueDeclare().getQueue();
//6.绑定交换机与队列
channel.queueBind(channel.queueDeclare().getQueue(), "QUEUE_propub", "");
channel.queueBind(queue,"03-pubsub1",""); //7.监听消息,消费消息 改为不是自动签收
channel.basicConsume(queue, true, new DeliverCallback() {
@Override
public void handle(String s, Delivery delivery) throws IOException {
System.out.println("消费者2:接收到消息:" + new String(delivery.getBody()));
}
}, new CancelCallback() {
@Override
public void handle(String s) throws IOException {
System.out.println("11111");
}
});
}
}
(4)、路由模式:
注意:
队列和交换机的绑定已经是不能任意绑定了。而是指定一个路由key只有队列的key和消息的key一致才会接收到消息
交换机类型设置为Direct
步骤和订阅模式相同。:
1.只是在生产者发送消息时指定一个key。
2.消费者绑定队列的时候也配置一个key,两者一样。
3.当两者相同时,消费者才能拿到消息进行消费。
生产者:
package cn.js.Routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-04 20:12
* @Version: 1.0
* @introduce:
*/
public class RoutingTest {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.设置交换机
channel.exchangeDeclare("04-Routing", BuiltinExchangeType.DIRECT);
//6.创建消息
String xiaoxi="这是Routing生产者生成的消息";
//7.发送消息
channel.basicPublish("04-Routing","info",null,xiaoxi.getBytes());
//8.关闭资源
//1.关闭通道
channel.close();
//2.关闭连接对象
connection.close();
System.out.println("消息发送完毕");
}
}
消费者1:
package cn.js.Routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-04 20:15
* @Version: 1.0
* @introduce:
*/
public class Routingconsum01 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2 设置 rabbititmq ip 地址
connectionFactory.setHost("localhost");
//3 创建 Conection 对象
Connection connection = connectionFactory.newConnection();
//4 创建 Chanel
Channel channel = connection.createChannel();
//5 创建交换机
channel.exchangeDeclare("04-Routing",BuiltinExchangeType.DIRECT);
//6.创建队列设置队列属性
String queue = channel.queueDeclare().getQueue();
//7.绑定队列与交换机
channel.queueBind(queue, "04-Routing", "info");
channel.queueBind(queue, "04-Routing", "error");
channel.queueBind(queue, "04-Routing", "warning");
//8使用 chanel 去 rabbitmq 中去取消息进行消费
/**
* 第一个参数:队列名称
* 第二个参数:是否自动签收
*/
channel.basicConsume(queue, true, new DeliverCallback() {
/**
* 当消息从 mq 中取出来了会回调这个方法
* 消费者消费消息就在这个 handle中去进行处理
*/
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("消费者 1 消息内容为:" + new String(message.getBody()));
}
}, new CancelCallback() {
/**
* 当消息取消了会回调这个方法
* @param consumerTag
* @throws IOException
*/
public void handle(String consumerTag) throws IOException {
System.out.println("1111");
}
});
}
}
消费者2:
package cn.js.Routing;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-04 20:15
* @Version: 1.0
* @introduce:
*/
public class Routingconsum02 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2 设置 rabbititmq ip 地址
connectionFactory.setHost("localhost");
//3 创建 Conection 对象
Connection connection = connectionFactory.newConnection();
//4 创建 Chanel
Channel channel = connection.createChannel();
//5 创建交换机
channel.exchangeDeclare("04-Routing",BuiltinExchangeType.DIRECT);
//6.创建队列设置队列属性
String queue = channel.queueDeclare().getQueue();
//7.绑定队列与交换机
channel.queueBind(queue, "04-Routing", "error");
channel.queueBind(queue, "04-Routing", "warning");
//8.使用 chanel 去 rabbitmq 中去取消息进行消费
/**
* 第一个参数:队列名称
* 第二个参数:是否自动签收
*/
channel.basicConsume(queue, true, new DeliverCallback() {
/**
* 当消息从 mq 中取出来了会回调这个方法
* 消费者消费消息就在这个 handle中去进行处理
*/
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("消费者 2 消息内容为:" + new String(message.getBody()));
}
}, new CancelCallback() {
/**
* 当消息取消了会回调这个方法
* @param consumerTag
* @throws IOException
*/
public void handle(String consumerTag) throws IOException {
System.out.println("1111");
}
});
}
}
(4)、通配符模式:
步骤:
1、生产者和路由模式生产者相同,定义一个路由key 交换机类型设置为TOPic 如 emp.save
2、在消费者上绑定队列时,可以使用通配符 如 emp.*
生产者:
package cn.js.Topic;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-04 20:42
* @Version: 1.0
* @introduce:
*/
public class TopicTest {
public static void main(String[] args) throws IOException, TimeoutException {
//1.创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//设置RabbimtMQ 的IP地址
connectionFactory.setHost("localhost");
//3.通过连接工厂创建连接对象
Connection connection = connectionFactory.newConnection();
//4.通过连接对象创建连接通道
Channel channel = connection.createChannel();
//5.设置交换机
channel.exchangeDeclare("05-Topic", BuiltinExchangeType.TOPIC);
//6.创建消息
String xiaoxi1="这是Routing生产者生成的消息order1.save";
String xiaoxi2="这是Routing生产者生成的消息order1.delete";
String xiaoxi3="这是Routing生产者生成的消息order1.insert";
//7.发送消息
channel.basicPublish("05-Topic","order1.save",null,xiaoxi1.getBytes());
channel.basicPublish("05-Topic","order1.delete",null,xiaoxi2.getBytes());
channel.basicPublish("05-Topic","order1.insert",null,xiaoxi3.getBytes());
//8.关闭资源
//1.关闭通道
channel.close();
//2.关闭连接对象
connection.close();
System.out.println("消息发送完毕");
}
}
消费者:
package cn.js.Topic;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
/**
* @BelongsProject: RabbitMQ
* @Author: com.js
* @CreateTime: 2023-03-04 20:45
* @Version: 1.0
* @introduce:
*/
public class Topicconsum01 {
public static void main(String[] args) throws IOException, TimeoutException {
//1 创建一个连接工厂
ConnectionFactory connectionFactory = new ConnectionFactory();
//2 设置 rabbititmq ip 地址
connectionFactory.setHost("localhost");
//3 创建 Conection 对象
Connection connection = connectionFactory.newConnection();
//4 创建 Chanel
Channel channel = connection.createChannel();
//5 创建交换机
channel.exchangeDeclare("05-Topic",BuiltinExchangeType.TOPIC);
//6.创建队列设置队列属性
String queue = channel.queueDeclare().getQueue();
//7.绑定队列与交换机
channel.queueBind(queue, "05-Topic", "order1.*");
//8使用 chanel 去 rabbitmq 中去取消息进行消费
/**
* 第一个参数:队列名称
* 第二个参数:是否自动签收
*/
channel.basicConsume(queue, true, new DeliverCallback() {
/**
* 当消息从 mq 中取出来了会回调这个方法
* 消费者消费消息就在这个 handle中去进行处理
*/
public void handle(String consumerTag, Delivery message) throws IOException {
System.out.println("消费者 1 消息内容为:" + new String(message.getBody()));
}
}, new CancelCallback() {
/**
* 当消息取消了会回调这个方法
* @param consumerTag
* @throws IOException
*/
public void handle(String consumerTag) throws IOException {
System.out.println("1111");
}
});
}
}
七、SpringBoot整合MQ
1、步骤(简单模式):
1.导入依赖,配置yml文件
2.生产者:
SpringBoot已经将MQ封装好了:
直接使用:
@Autowired
private RabbitTemPlate rabbitTemPlate;
发送消息:
rabbitTemPlate.convertAndsend();
convertAndsend中参数:
1.交换机名称
2.队列名称
3.消息内容
3.消费者:
定义一个消费者类,多个消费者就定义多个消费者类
@Component
public class QueueListener {
@RabbitListener(queuesToDeclare = @Queue("boot_queue"))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel){
System.out.println("收到消息:"+msg);
}
}
生产者:
package cn.js.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 21:16
* @Version: 1.0
* @introduce:
*/
@RestController
@RequestMapping("/get")
public class producerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/hello/{msg}")
public String HelloTest(@PathVariable("msg") String msg){
System.out.println(msg);
rabbitTemplate.convertAndSend("","hello",msg);
return "发送成功";
}
}
消费者:
package cn.js.compert;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:01
* @Version: 1.0
* @introduce:
*/
@Component
public class Consumer {
@RabbitListener(queuesToDeclare = @Queue("hello"))
public void consum(String msg){
System.out.println("消费者收到消息:"+msg);
}
}
2、集成工作模式:
步骤:
1.生产者和简单模式一样不用东动
2.消费者中配置yml文件:
(1).签收模式 (手动签收)
(2).轮巡分配
spring.rabbitmq.listener.simple.acknowledge-mode=manual
spring.rabbitmq.listener.simple.prefetch=1
3.在消费者类中方法上面的参数中添加
@Component
public class Worker {
@RabbitListener(queuesToDeclare = @Queue("boot_worker"))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("工作者1:"+msg);
channel.basicAck(deliveryTag,false);
}
}
生产者:
package cn.js.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 21:16
* @Version: 1.0
* @introduce:
*/
@RestController
@RequestMapping("/get")
public class producerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/work/{msg}")
public String workTest(@PathVariable("msg") String msg){
System.out.println(msg);
for(int i= 0;i<20;i++){
rabbitTemplate.convertAndSend("","boot_worker","msg:"+i);
}
return "发送成功";
}
}
消费者1
package cn.js.compert.work;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:23
* @Version: 1.0
* @introduce:
*/
@Component
public class workconsum1 {
@RabbitListener(queuesToDeclare = @Queue("boot_worker"))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("工作者1:"+msg);
channel.basicAck(deliveryTag,false);
}
}
消费者2:
package cn.js.compert.work;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:23
* @Version: 1.0
* @introduce:
*/
@Component
public class workconsum {
@RabbitListener(queuesToDeclare = @Queue("boot_worker"))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("工作者2:"+msg);
channel.basicAck(deliveryTag,false);
}
}
3、集成订阅模式:
步骤:
1.生产者:
指定交换机:定义交换机名称
注意此模式生产者没有队列
2.消费者:
在消费者方法上: //value:队列名称
@RabbitListener(bindings = @QueueBinding(value = @Queue,exchange = @Exchange(name = "boot_pubsub",type = "fanout")))
加上:exchange = @Exchange(name = "boot_pubsub",type = "fanout")//绑定定交换机,以及交换机类型 广播模式
生产者:
package cn.js.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 21:16
* @Version: 1.0
* @introduce:
*/
@RestController
@RequestMapping("/get")
public class producerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/PubSub/{msg}")
public String PubSub(@PathVariable("msg") String msg){
System.out.println(msg);
rabbitTemplate.convertAndSend("PubSub","",msg);
return "发送成功";
}
}
消费者1:
package cn.js.compert.PubSub;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:38
* @Version: 1.0
* @introduce:
*/
@Component
public class PubSubconsum1 {
@RabbitListener(bindings =
@QueueBinding(value = @Queue,exchange = @Exchange(name = "PubSub",type = "fanout")))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
System.out.println("收到消息1:"+msg);
channel.basicAck(deliveryTag,false);
}
}
消费者2:
package cn.js.compert.PubSub;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:38
* @Version: 1.0
* @introduce:
*/
@Component
public class PubSubconsum2 {
@RabbitListener(bindings =
@QueueBinding(value = @Queue,exchange = @Exchange(name = "PubSub",type = "fanout")))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG)long deliveryTag, Channel channel) throws Exception {
System.out.println("收到消息2:"+msg);
channel.basicAck(deliveryTag,false);
}
}
4、集成路由模式
步骤:
1.生产者:
1.生产者指定路由Key
2.此模式生产者没有队列,交换机类型设置为Direct
2.消费者:
在消费者方法的注解上指定路由Key要与生产者路由Key相同,也可以配置多个路由key
@RabbitListener(bindings = @QueueBinding(value = @Queue(name = "boot_rounting_queue01"),
exchange = @Exchange(name = "boot_rounting_exchange",type = "direct"),
key = {"error","info"}//-------》配置多个路由key
))
生产者:
package cn.js.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 21:16
* @Version: 1.0
* @introduce:
*/
@RestController
@RequestMapping("/get")
public class producerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/Routing/{msg}/{key}")
public String Routing(@PathVariable("msg") String msg,@PathVariable("key") String key){
System.out.println(msg);
System.out.println(key);
rabbitTemplate.convertAndSend("Routing",key,msg);
return "发送成功";
}
}
消费者1:
package cn.js.compert.Routing;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:56
* @Version: 1.0
* @introduce:
*/
@Component
public class Routingconsum1 {
@RabbitListener(bindings =
@QueueBinding(
value = @Queue,
exchange =@Exchange(value = "Routing",type = "direct"),
key = {"info","error"}))
public void consumer(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("消费者 1-消息内容为:" + msg);
channel.basicAck(deliveryTag,true);
}
}
消费者2:
package cn.js.compert.Routing;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:56
* @Version: 1.0
* @introduce:
*/
@Component
public class Routingconsum2 {
@RabbitListener(bindings =
@QueueBinding(
value = @Queue,
exchange =@Exchange(value = "Routing",type = "direct"),
key = {"info","error"}))
public void consumer(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("消费者 2-消息内容为:" + msg);
channel.basicAck(deliveryTag,true);
}
}
5、集成通配符模式
步骤:
1.和路由模式一样,只是在消费者中绑定路由key,是以通配符的方式绑定;
2.交换机类型设置为Topic
生产者:
package cn.js.controller;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 21:16
* @Version: 1.0
* @introduce:
*/
@RestController
@RequestMapping("/get")
public class producerController {
@Autowired
private RabbitTemplate rabbitTemplate;
@GetMapping("/Topic/{msg}/{key}")
public String Topic(@PathVariable("msg") String msg,@PathVariable("key") String key){
System.out.println(msg);
System.out.println(key);
rabbitTemplate.convertAndSend("Topic",key,msg);
return "发送成功";
}
}
消费者1:
package cn.js.compert.Topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:23
* @Version: 1.0
* @introduce:
*/
@Component
public class Topicconsum1 {
@RabbitListener(bindings =
@QueueBinding(
value = @Queue,
exchange =@Exchange(value = "Topic",type = "topic"),
key = {"emp.save"}))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("Topic工作者2:"+msg);
channel.basicAck(deliveryTag,false);
}
}
消费者2:
package cn.js.compert.Topic;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @BelongsProject: SpringBootMQ
* @Author: com.js
* @CreateTime: 2023-03-04 22:23
* @Version: 1.0
* @introduce:
*/
@Component
public class Topicconsum2 {
@RabbitListener(bindings =
@QueueBinding(
value = @Queue,
exchange =@Exchange(value = "Topic",type ="topic"),
key = {"emp.save"}))
public void receiveMsg(String msg, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws IOException {
System.out.println("Topic工作者2:"+msg);
channel.basicAck(deliveryTag,false);
}
}
代码地址:
整合代码