MQ消息中间件
- 1、应用场景
- 1、流量削峰
- 2、应用解耦
- 3、异步处理
- 2、MQ分类
- 1、ActiveMQ
- 2、kafka
- 3、RocketMQ
- 4、RabbitMQ
- 3、RabbitMQ详解
- 3.1、核心概念
- 3.2 RabbitMQ基本知识点
- 3.3消息发布确认
- 3.4 交换机
1、应用场景
1、流量削峰
将访问的大流量通过消息队列做缓冲,我们可以取消服务器的QPS的最大瓶颈,将所有的请求都先存储在队列中,之后服务器再进行消费,避免了高峰期处理请求造成的损失。
2、应用解耦
将有前后关联业务的应用,如果后者服务宕机或者异常,这次的请求将是失败的。那么前者可以通过将请求的数据缓存到队列中,不影响前者服务的正常使用,当后者服务正常后对数据进行消费。保证了系统的可用性。
3、异步处理
在接口调用的时候,如果某一个接口数据处理特别慢,这将影响了前者不可以操作其他任何事情。那么将请求的数据存在消息队列中,等数据处理结束后,消息队列回调前者,将数据返回给前者。
2、MQ分类
1、ActiveMQ
2、kafka
该中间件是为大数据而生,处理百万级TPS的吞吐量,在数据采集、传输、存储的过程中发挥着重要作用。大多数应用于大量数据收集业务。
优点: 性能好,吞吐量高;时效性可用性非常高,是分布式部署,一个数据多个副本,保证了数据的安全性;消费者采用pull方式获取消息,保证了消息的有序性,消费不重复;有对应的web管理界面,在日志采集、实时计算领域比较成熟。
缺点: kafka单机超过64个队列/分区,Load会明显升高,队列越多,laod越高,发送消息响应时间变长;使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;当一台代理服务器宕机后,会产生消息乱序。社区更新慢
3、RocketMQ
来源于阿里巴巴,用Java语言实现参考了kafka的设计模式。在订单、流计算、消息推送、日志流式处理、binlog分发等场景。大多数应用于金融互联网领域。
优点: 单机吞吐量十万级别 ;分布式架构,消息0丢失,分布式架构,扩展性高;
缺点: 支持的客户端语言较少,仅有java及c++,C++还不成熟
4、RabbitMQ
该中间件是接受、存储、转发消息数据的一个中间件。应用于中小型公司的服务业务。
优点:因为erlang语言的高并发特性好,所以性能较好;万级别吞吐量,MQ功能比较完备,健壮,易用性,可跨平台,支持多种语言;有对应的web管理界面;支持AJAX稳定齐全;社区更新较快。
缺点: 学习成本高,商业版需要收费。
3、RabbitMQ详解
3.1、核心概念
MQ消息主要包括:生产者(producer)、交换机(exchage)、队列(queue)、消费者(consumer);
**六大模式:**简单模式,工作模式、发布订阅模式、路由模式、发布确认模式。
中间件内部(MQServer/messageBroker)主要包含两部分:交换机与队列。
交换机和队列是一对多关系,队列和消费者是一对一关系。
broker:接受和分发消息的应用。
Virtual host:出于多租户和安全因素设计,把一个AMQP的基本组件划分到一个虚拟分组中,类似namespace;当不同的用户使用MQServer时,每个用户中可以有划分个Vhost,用户可以在自己的Vhost中创建自己的Exchage\queue等。
connection:生产者/消费者与MQServer创建的一次TCP连接。
channel:在connnection中产生的信道,逻辑上的连接,主要为了减少创建connection的巨大开销。信道是完全隔离的,在每个信道中都会标识信道ID和与MQServer中识别信道的标识。如果是多线程进行,那么也是每个线程创建自己的channel。
exchange:broker接受到消息后,先通过exchange进行路由寻找,根据发放规则进行推送。规则包含:direct(点对点)、主题(发布订阅)、fanout()。
binding:是交换机与队列之间的对应虚拟关系,保存的在exchage的路由key表中,用于对消息的分发。
3.2 RabbitMQ基本知识点
消息队列首先生产者和消费者分别需要建立连接Connection,然后分别建立信道。所以一般会建立一个公共类,进行共享一些配置,比如连接、队列主题,交换机名称,路由匹配键名称等等。
设置一个工具类进行处理这些共性问题。
public class RabbitMQUtils {
private static ConnectionFactory factory;
//静态代码块:类加载时执行一次
static {
factory = new ConnectionFactory();
factory.setHost("192.168.77.138");
factory.setPort(5672);
factory.setUsername("admin");
factory.setPassword("123");
}
//获取连接对象
public static Connection getConnection() {
try {
return factory.newConnection();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
return null;
}
//关闭通道连接和连接对象
public static void closeConnection(Channel channel, Connection conn) {
if (channel != null) {
try {
channel.close();
} catch (IOException e) {
e.printStackTrace();
} catch (TimeoutException e) {
e.printStackTrace();
}
}
if (conn != null) {
try {
conn.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
生产者创建信道,发送消息。
public class Producer {
public void pro() throws IOException, TimeoutException {
// 获取连接
Connection connection = RabbitMQUtils.getConnection();
//获取连接中通道
Channel channel = connection.createChannel();
//通道绑定对应消息队列的声明
channel.queueDeclare("hello", true, false, false, null);
//发布消息
channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "message".getBytes(“UTF-8”));
//关闭连接===>使用工具类
RabbitMQUtils.closeConnection(channel, connection);
}
对于队列声明(queueDeclare)的参数的说明
参数位置 | 参数名 | 描述 | 默认值 |
---|---|---|---|
1 | queue | 队列名称 | 必填项 |
2 | durable | 用来定义队列消息是否要持久化, true 持久化队列 , false 不持久化 | true |
3 | exclusive | 是否独占队列 : true 独占队列 , 默认 false 不独占 | false |
4 | autoDelete | 是否自动删除,最后一个消费者消费完毕后是否断开连接 | false |
5 | queueName | 额外附加参数 | null |
对于发布消息(basicPublish)的参数说明
参数位置 | 参数名 | 描述 | 默认值 |
---|---|---|---|
1 | exchange | 发送给哪个交换机 | “” |
2 | routeKey | l路由的key值,也就是队列名称 | |
3 | props | 其他参数,传递息额外的设置 | MessageProperties.PERSISTENT_TEXT_PLAIN:代表持久化消息 |
4 | body.type | 消息体字节 | byte[] |
消费者代码构建
public class Consumer {
public static void main(String[] args) throws IOException, TimeoutException {
//获取连接对象
Connection connection = RabbitMQUtils.getConnection();
//创建通道
Channel channel = connection.createChannel();
//通道绑定队列:与生产端一致
channel.queueDeclare("hello", true, false, false, null);
// 声明接收消息
DeliverCallback deliverCallback = (consumerTag,message) ->{
System.out.println("取出消息:===>" + new String(message.getBody()));
// 这儿可以对消息进行手动应答消费
/**
* 适用于手动应答情况下
* 第一个参数表示消息标记
* 第二参数表示 是否批量响应未应答的消息
*/
// channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
}
// 声明取消消息时回调
CancelCallback cancleCallback = (consumerTag) ->{
System.out.println("消息回调");
}
// 消费者消费消息
channel.basicConsume("hello", true,deliverCallback,cancleCallback);
}
}
消费者消费消息(basicConsume)参数说明
参数位置 | 参数名 | 描述 | 默认值 |
---|---|---|---|
1 | queue | 队列名称 | |
2 | autoAck | 是否自动应答,true 自动应答,false 手动应答 | 一般是手动应答 |
3 | deliverCallback | 消费消息 | |
4 | cancleCallback | 取消消费消息回调方法 |
当多个线程同时进行消费队列消息时,默认采用的方式是轮询的方式。也可以通过设置参数更改为不公平分发和设计预取值的方式进行预订消费能力。
3.3消息发布确认
消息确认方式有三类:单次确认、批量确认、异步确认三种。
在生产者信道发布的时候声明需要确认,并且通过回调值进行确认。
// 消息发布确认
channel.confirmSelect();
// 每次的发布的情况,返回boolean值
Boolean flag = channel.waitForConfirms();
/**
*异步确认,
*异步确认不需要等待发布情况,broker会对执行情况通知生产者
*生产者需要在发布之前通过监听器对之后的发送情况进行监听
*/
// 创建回调函数监听器
//成功发布监听器
ConfirmCallback ackCallback = (deliveryTag,multipe) -> {
System.out.println("成功发布的消息:===>" + deliveryTag);
};
//失败发布监听器
ConfirmCallback nackCallback = (deliveryTag,multipe) -> {
System.out.println("失败发布的消息:===>" + deliveryTag);
};
// 第一个参数为成功,第二个为失败
channel.addConfirmListener(ackCallback,nackCallback)
思考:如何解决异步未确认的消息?
解决方案就是把未确认的消息放到一个 基于内存的、能被发布线程访问的队列。比如,用ConcurrentLinkedQueue 、ConcurrentSkipListMap这个队列在confirm callbacks与发布线程之间进行消息传递。