你像天外来物一样,求之不得(咳咳,指offer)🌹
文章目录
- 什么是MQ?
- MQ的优势与劣势
- 使用MQ需要满足的条件
- 常见的MQ产品
- 关于RabbitMQ
- 生产者
- 消费者
- 工作模式
- 订阅模式
- 路由模式
- 通配符模式
什么是MQ?
Message Queue
消息队列,是在消息的传输过程中保存消息的容器,多用于分布式系统之间通信。
MQ的优势与劣势
优势:
应用解耦
:比如一个订单系统需要和库存系统、支付系统等配合起来才能完成支付操作,如果库存系统挂了,那么订单系统也不能正常工作了;但是引入MQ之后,即使库存系统发生故障,只需要将订单系统要求库存系统执行的操作保存到MQ中就可以了,不会影响到订单系统。
异步提速
:如果两个系统直接耦合,那么所执行的操作都是同步的,如果涉及到多个系统,那么同步就会耗费较多的时间,加入MQ之后,可以实现消息的异步发送,达到一个提速的作用,给客户一个较好的体验。
削峰填谷
:在用户请求量很大的情况下,一个系统所容纳的请求量又是有限的,在这种情况下,可以引入MQ,MQ的作用就是承载用户请求,起到一个缓冲的作用。
劣势:
系统可用性降低
:系统引入的外部依赖越多,系统的稳定性就越差。引入MQ之后需要保证MQ的高可用。
系统复杂度提高
:需要保证消息传递的顺序性以及消息不会被重复消费。
一致性问题
:通过MQ给多个系统发送消息,有的系统处理数据成功,而有的处理数据失败,如何保证数据的一致性问题。
使用MQ需要满足的条件
生产者不需要从消费者获得反馈; 容许短暂的不一致; 解耦、提速等方面的收益要超过加入、管理MQ的成本
常见的MQ产品
关于RabbitMQ
基础架构:
六种工作模式:简单模式、workqueues、发布订阅模式、路由模式、主题模式、远程调用模式
生产者
package com.thorn.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare("hello_world",true,false,false,null);
String body = "hello rabbitmq";
// 发送消息
channel.basicPublish("","hello_world",null,body.getBytes());
// 释放资源
channel.close();
connection.close();
}
}
运行上述程序之后,就能看到消息队列了。
消费者
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_HelloWorld {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare("hello_world",true,false,false,null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println(consumerTag );
System.out.println(envelope.getRoutingKey());
System.out.println(envelope.getExchange());
System.out.println(properties);
System.out.println(new String(body));
}
};
channel.basicConsume("hello_world",true,consumer);
}
}
执行结果,拿到消息队列中的消息了。
这是简单模式下的。
工作模式
工作队列模式:
多个消费者共同监听一个生产者,主要适用于任务较多的情况。
生产者(一口气生产十条消息):
package com.thorn.producer;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_WorkQueues {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare("work_queues",true,false,false,null);
for (int i = 0; i < 10; i++) {
String body = i + " hello work_queues~~~";
// 发送消息
channel.basicPublish("","work_queues",null,body.getBytes());
}
// 释放资源
channel.close();
connection.close();
}
}
消费者(需要创建两个):
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_WorkQueues1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建队列
channel.queueDeclare("work_queues",true,false,false,null);
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("body:"+new String(body));
}
};
channel.basicConsume("work_queues",true,consumer);
}
}
第一个消费者:
第二个消费者:
可以看到,一共生产了十条消息,每个消费者消费五条消息。
订阅模式
引入了交换机,交换机去创建两个队列。
生产者:
package com.thorn.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Pubhub {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "test_fanout";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT,true,false,false,null);
// 创建队列
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
channel.queueBind(queue1Name,exchangeName,"");
channel.queueBind(queue2Name,exchangeName,"");
// 发送消息
String body = "日志信息:张三调用方法findAll,日志级别为info";
channel.basicPublish(exchangeName,"",null,body.getBytes());
// 释放资源
channel.close();
connection.close();
}
}
消费者:
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台:");
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_PubSub2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_fanout_queue1";
String queue2Name = "test_fanout_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息保存到数据库:");
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
路由模式
生产者:
package com.thorn.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Routing {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "test_direct";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.DIRECT,true,false,false,null);
// 创建队列
String queue1Name = "test_direct_queue1";
String queue2Name = "test_direct_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
// 队列1绑定
channel.queueBind(queue1Name,exchangeName,"error");
// 队列2绑定
channel.queueBind(queue2Name,exchangeName,"info");
channel.queueBind(queue2Name,exchangeName,"error");
channel.queueBind(queue2Name,exchangeName,"warning");
// 发送消息
String body = "日志信息:张三调用方法findAll,日志级别为info";
channel.basicPublish(exchangeName,"info",null,body.getBytes());
// 释放资源
channel.close();
connection.close();
}
}
将error级别的信息放到队列1,将error、info、warning级别的信息放到队列2.
消费者1:
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
String queue2Name = "test_direct_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台:");
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
消费者2:
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Routing2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_direct_queue1";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息保存到数据库:");
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
可以看到消费者1可以收到消息,而消费者2收不到消息
通配符模式
生产者:
package com.thorn.producer;
import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Producer_Topics {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
// 创建交换机
String exchangeName = "test_topic";
channel.exchangeDeclare(exchangeName, BuiltinExchangeType.TOPIC,true,false,false,null);
// 创建队列
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
channel.queueDeclare(queue1Name,true,false,false,null);
channel.queueDeclare(queue2Name,true,false,false,null);
// 绑定队列和交换机
channel.queueBind(queue1Name,exchangeName,"#.error");
channel.queueBind(queue1Name,exchangeName,"order.*");
// 队列2打印所有信息
channel.queueBind(queue2Name,exchangeName,"*.*");
// 发送消息
String body = "日志信息:张三调用方法findAll,日志级别为info";
channel.basicPublish(exchangeName,"order.info",null,body.getBytes());
// 释放资源
channel.close();
connection.close();
}
}
消费者1:
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topics1 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息存入到数据库");
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue1Name,true,consumer);
}
}
消费者2:
package com.thorn.consumer;
import com.rabbitmq.client.*;
import java.io.IOException;
import java.util.concurrent.TimeoutException;
public class Consumer_Topics2 {
public static void main(String[] args) throws IOException, TimeoutException {
// 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 设置参数
factory.setVirtualHost("/thorns");
factory.setUsername("lwj");
factory.setPassword("lwj");
// 创建链接
Connection connection = factory.newConnection();
// 创建channel
Channel channel = connection.createChannel();
String queue1Name = "test_topic_queue1";
String queue2Name = "test_topic_queue2";
// 接收消息
Consumer consumer = new DefaultConsumer(channel){
// 回调方法,当收到消息后,会自动执行该方法
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
System.out.println("将日志信息打印到控制台");
System.out.println("body:"+new String(body));
}
};
channel.basicConsume(queue2Name,true,consumer);
}
}
消费者1和消费者2都能收到消息:
恭喜你又学会了一项技术!
如果觉得有帮助的小伙伴点个赞吧~感谢收看!