MQ消息中间件

news2025/1/23 10:44:05

MQ消息中间件

    • 1、应用场景
      • 1、流量削峰
      • 2、应用解耦
      • 3、异步处理
    • 2、MQ分类
      • 1、ActiveMQ
      • 2、kafka
      • 3、RocketMQ
      • 4、RabbitMQ
    • 3、RabbitMQ详解
      • 3.1、核心概念
      • 3.2 RabbitMQ基本知识点
      • 3.3消息发布确认
      • 3.4 交换机

1、应用场景

1、流量削峰

将访问的大流量通过消息队列做缓冲,我们可以取消服务器的QPS的最大瓶颈,将所有的请求都先存储在队列中,之后服务器再进行消费,避免了高峰期处理请求造成的损失。

2、应用解耦

将有前后关联业务的应用,如果后者服务宕机或者异常,这次的请求将是失败的。那么前者可以通过将请求的数据缓存到队列中,不影响前者服务的正常使用,当后者服务正常后对数据进行消费。保证了系统的可用性。

3、异步处理

在接口调用的时候,如果某一个接口数据处理特别慢,这将影响了前者不可以操作其他任何事情。那么将请求的数据存在消息队列中,等数据处理结束后,消息队列回调前者,将数据返回给前者。

2、MQ分类

1、ActiveMQ

2、kafka

该中间件是为大数据而生,处理百万级TPS的吞吐量,在数据采集、传输、存储的过程中发挥着重要作用。大多数应用于大量数据收集业务。
优点: 性能好,吞吐量高;时效性可用性非常高,是分布式部署,一个数据多个副本,保证了数据的安全性;消费者采用pull方式获取消息,保证了消息的有序性,消费不重复;有对应的web管理界面,在日志采集、实时计算领域比较成熟。
缺点: kafka单机超过64个队列/分区,Load会明显升高,队列越多,laod越高,发送消息响应时间变长;使用短轮询方式,实时性取决于轮询间隔时间,消费失败不支持重试;当一台代理服务器宕机后,会产生消息乱序。社区更新慢

3、RocketMQ

来源于阿里巴巴,用Java语言实现参考了kafka的设计模式。在订单、流计算、消息推送、日志流式处理、binlog分发等场景。大多数应用于金融互联网领域。
优点: 单机吞吐量十万级别 ;分布式架构,消息0丢失,分布式架构,扩展性高;
缺点: 支持的客户端语言较少,仅有java及c++,C++还不成熟

4、RabbitMQ

该中间件是接受、存储、转发消息数据的一个中间件。应用于中小型公司的服务业务。
优点:因为erlang语言的高并发特性好,所以性能较好;万级别吞吐量,MQ功能比较完备,健壮,易用性,可跨平台,支持多种语言;有对应的web管理界面;支持AJAX稳定齐全;社区更新较快。
缺点: 学习成本高,商业版需要收费。

3、RabbitMQ详解

3.1、核心概念

在这里插入图片描述

MQ消息主要包括:生产者(producer)、交换机(exchage)、队列(queue)、消费者(consumer);
**六大模式:**简单模式,工作模式、发布订阅模式、路由模式、发布确认模式。
中间件内部(MQServer/messageBroker)主要包含两部分:交换机与队列
交换机和队列是一对多关系,队列和消费者是一对一关系。
broker:接受和分发消息的应用。
Virtual host:出于多租户和安全因素设计,把一个AMQP的基本组件划分到一个虚拟分组中,类似namespace;当不同的用户使用MQServer时,每个用户中可以有划分个Vhost,用户可以在自己的Vhost中创建自己的Exchage\queue等。
connection:生产者/消费者与MQServer创建的一次TCP连接。
channel:在connnection中产生的信道,逻辑上的连接,主要为了减少创建connection的巨大开销。信道是完全隔离的,在每个信道中都会标识信道ID和与MQServer中识别信道的标识。如果是多线程进行,那么也是每个线程创建自己的channel。
exchange:broker接受到消息后,先通过exchange进行路由寻找,根据发放规则进行推送。规则包含:direct(点对点)、主题(发布订阅)、fanout()。
binding:是交换机与队列之间的对应虚拟关系,保存的在exchage的路由key表中,用于对消息的分发。

3.2 RabbitMQ基本知识点

消息队列首先生产者和消费者分别需要建立连接Connection,然后分别建立信道。所以一般会建立一个公共类,进行共享一些配置,比如连接、队列主题,交换机名称,路由匹配键名称等等。
设置一个工具类进行处理这些共性问题。

public class RabbitMQUtils {
    private static ConnectionFactory factory;

    //静态代码块:类加载时执行一次
    static {
        factory = new ConnectionFactory();
        factory.setHost("192.168.77.138");
        factory.setPort(5672);
        factory.setUsername("admin");
        factory.setPassword("123");
    }

    //获取连接对象
    public static Connection getConnection() {
        try {
            return factory.newConnection();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return null;
    }

    //关闭通道连接和连接对象
    public static void closeConnection(Channel channel, Connection conn) {
        if (channel != null) {
            try {
                channel.close();
            } catch (IOException e) {
                e.printStackTrace();
            } catch (TimeoutException e) {
                e.printStackTrace();
            }
        }
        if (conn != null) {
            try {
                conn.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}


生产者创建信道,发送消息。

	public class Producer {
    	public void pro() throws IOException, TimeoutException {
		// 获取连接
        Connection connection = RabbitMQUtils.getConnection();
        //获取连接中通道
        Channel channel = connection.createChannel();
        
        //通道绑定对应消息队列的声明
        channel.queueDeclare("hello", true, false, false, null);
        //发布消息
        channel.basicPublish("", "hello", MessageProperties.PERSISTENT_TEXT_PLAIN, "message".getBytes(“UTF-8”));
        //关闭连接===>使用工具类
        RabbitMQUtils.closeConnection(channel, connection);
}

对于队列声明(queueDeclare)的参数的说明

参数位置参数名描述默认值
1queue队列名称必填项
2durable用来定义队列消息是否要持久化, true 持久化队列 , false 不持久化true
3exclusive是否独占队列 : true 独占队列 , 默认 false 不独占false
4autoDelete是否自动删除,最后一个消费者消费完毕后是否断开连接false
5queueName额外附加参数null

对于发布消息(basicPublish)的参数说明

参数位置参数名描述默认值
1exchange发送给哪个交换机“”
2routeKeyl路由的key值,也就是队列名称
3props其他参数,传递息额外的设置MessageProperties.PERSISTENT_TEXT_PLAIN:代表持久化消息
4body.type消息体字节byte[]

消费者代码构建

public class Consumer {
    public static void main(String[] args) throws IOException, TimeoutException {

        //获取连接对象
        Connection connection = RabbitMQUtils.getConnection();
        //创建通道
        Channel channel = connection.createChannel();
        //通道绑定队列:与生产端一致
        channel.queueDeclare("hello", true, false, false, null);
		// 声明接收消息
		DeliverCallback deliverCallback = (consumerTag,message) ->{
                System.out.println("取出消息:===>" + new String(message.getBody()));
                // 这儿可以对消息进行手动应答消费
                /**
                * 适用于手动应答情况下
				* 第一个参数表示消息标记
				* 第二参数表示 是否批量响应未应答的消息
				*/
                // channel.basicAck(delivery.getEnvelope().getDeliveryTag(),false);
         }
        // 声明取消消息时回调
		CancelCallback cancleCallback = (consumerTag) ->{
                System.out.println("消息回调");
         }
         // 消费者消费消息
         channel.basicConsume("hello", true,deliverCallback,cancleCallback);

    }
}


消费者消费消息(basicConsume)参数说明

参数位置参数名描述默认值
1queue队列名称
2autoAck是否自动应答,true 自动应答,false 手动应答一般是手动应答
3deliverCallback消费消息
4cancleCallback取消消费消息回调方法

当多个线程同时进行消费队列消息时,默认采用的方式是轮询的方式。也可以通过设置参数更改为不公平分发和设计预取值的方式进行预订消费能力。

3.3消息发布确认

消息确认方式有三类:单次确认、批量确认、异步确认三种。
在生产者信道发布的时候声明需要确认,并且通过回调值进行确认。

// 消息发布确认
channel.confirmSelect();
// 每次的发布的情况,返回boolean值
Boolean flag = channel.waitForConfirms();

/**
*异步确认,
*异步确认不需要等待发布情况,broker会对执行情况通知生产者
*生产者需要在发布之前通过监听器对之后的发送情况进行监听
*/
// 创建回调函数监听器
//成功发布监听器
ConfirmCallback ackCallback = (deliveryTag,multipe) -> {
 System.out.println("成功发布的消息:===>" + deliveryTag);
};
//失败发布监听器
ConfirmCallback nackCallback = (deliveryTag,multipe) -> {
System.out.println("失败发布的消息:===>" + deliveryTag);
};
// 第一个参数为成功,第二个为失败
channel.addConfirmListener(ackCallback,nackCallback)

思考:如何解决异步未确认的消息?
解决方案就是把未确认的消息放到一个 基于内存的、能被发布线程访问的队列。比如,用ConcurrentLinkedQueue 、ConcurrentSkipListMap这个队列在confirm callbacks与发布线程之间进行消息传递。

3.4 交换机

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/87179.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

二叉树-二叉树的基础遍历(3)

二叉树的遍历的三种方式 1.前序遍历; 先访问根结点,然后再访问左子树,最后访问右子树 2.中序遍历; 先访问左子树,中间访问根节点,最后访问右子树 3.后序遍历; 先访问左子树,再访问右…

PicoRV32 笔记 05

接口信号 PicoRV32 提供一个本地存储器接口,Native Memory Interface。 本地存储器接口采用 valid-ready握手信号。这种机制在axi总线中使用相同,能够实现流控, 和axi总线不同点,PicoRV32本地接口使用一组valid-ready信号&…

什么是SWDM4和100G QSFP28 SWDM4光模块?

随着OM5多模光纤(MMF)的推广和40G或100G数据中心传输网络的大规模部署,SWDM技术逐渐进入人们的视野并开始得到应用。那么,什么是SWDM4呢?什么是100G SWDM4光模块?它们的优势是什么?跟着易天光通信ETU-LINK看下面的文字…

十、原型、原型链、闭包和立即执行函数、插件开发初始

十、原型、原型链、闭包和立即执行函数、插件开发初始 原型 什么是原型(prototype)? 无论何时,只要创建一个函数,就会按照特定的规则为这个函数创建一个prototype属性,指向原型对象。 function Car(){}…

Vue入门与指令

Vue入门 1.1、MVVM编程思想 MVVM:页面输入改变数据,数据改变影响页面数据展示与渲染。 M(model):普通的javascript数据对象。 V(view):前端展示页面。 VM(ViewModel&…

Jmeter初了解-接口并发测试

Jmeter初了解-接口并发测试 介绍 我们在开发的时候,经常会需要进行接口压力测试,确定接口运行的稳定情况 这里我们就使用java开发的测试工具Jmeter来进行测试。 Jmeter 官网地址 Apache JMeter™应用程序是开源软件,是一个 100% 纯 Jav…

Pytorch安装详细过程及遇到的问题解决

一、Aanconda的安装 可以参考笔者的这篇博客:Anaconda安装详细教程 二、准备工作 1、查看本机的python的版本(本机python解释器版本为3.8.5) 2、单击启动Anaconda Prompt创建新虚拟环境 3、在Anaconda Prompt依次执行以下命令,创建名字为pytorch的虚拟…

nn.Embedding使用

nn.Embedding是一种词嵌入的方式,跟one-hot相似但又不同,会生成低维稠密向量,但是初始是随机化的,需要根据模型训练时进行调节,若使用预训练词向量模型会比较好。 1. one-hot one-hot是给定每个单词一个索引&#xf…

概论_第4章__方差D(X)的定义和性质

一 定义 通常以此公式来计算: 就是说: 方差 X的平方再求期望 —— X的期望的平方 即 括号里面的平方的期望减去期望的平方, 怎样求期望点击:概论_第4章__期望的定义和性质 注意: 方差不可能为负数。 2. …

如何快速拥有自己的虚拟形象?

元宇宙(Metaverse),是人类运用数字技术构建的,由现实世界映射或超越现实世界,可与现实世界交互的虚拟世界,具备新型社会体系的数字生活空间。 可见元宇宙第一步是创建专属虚拟形象,但创建3D虚拟…

Android入门第45天-手工发送一个BroadCast

简介 上一篇我们讲了简单的动态BroadCast,今天我们通过手工来发送一条BroadCast进一步来了解BroadCast。 在上一篇里我们使用BroadCast监听网络状态,今天我们要完成的是自己发一条自自己的消息来触发BroadCast Receiver。 设计 为了让Receiver收听到…

蓝牙耳机无延迟哪款好?适合打游戏的无线蓝牙耳机

手机可以说是人手必备,随声得还有蓝牙耳机,随着3.5耳机孔得消失,蓝牙耳机可以说是现在得主流,无论哪个年龄段都可以佩戴蓝牙耳机,日常听歌、追剧,和朋友玩游戏佩戴蓝牙耳机,已经成为一种生活方式…

mybatis06:MyBatis的多表操作

目录 1.一对一关系 2.一对多查询 3.多对多查询 4例题演示 ​5.知识小结 1.一对一关系 2.一对多查询 3.多对多查询 4例题演示 前置准备 对应的依赖 <dependencies><!-- mysql驱动 --><dependency><groupId>mysql</groupId><artifactId&…

外汇交易:流行图表指标盘点

您所学到的关于交易的一切都像一种工具&#xff0c;已被添加到外汇交易者的工具箱中。当您在正确的时间使用正确的工具时&#xff0c;您的图表指标工具将为您提供更好的机会做出正确的交易决策。 布林带 布林带用来衡量市场的波动性。它们的作用类似于迷你支撑位和阻力位。 布…

MES系统为何与工厂数字化转型联系紧密

随着数字化技术的发展&#xff0c;MES系统的定义也是在不断的变化。但是&#xff0c;计划调度、质量管理、生产执行以及数据采集&#xff0c;一直都是MES的核心功能。 工厂数字化改造&#xff0c;对于制造业来说并不是一场革命。很多工厂在十年前就实现了车间设备的联网&#…

EXCEL基础:数据有效性设置与从身份证号码提取出生日期、性别操作

如下所示&#xff0c;为某公司的人员信息表&#xff0c;以下操作均是基于该表格&#xff0c;声明&#xff1a;该表格来自网络&#xff01; 下面进行【数据有效性】的设置&#xff1a; 先选中区域&#xff0c;弹出【数据有效性】对话框&#xff0c;在【设置】里的【允许】里输入…

win11设置java环境变量

python环境变量比java简单很多&#xff0c;而java比较麻烦&#xff0c;下面这些步骤应该是一步不能少&#xff0c;必须新建两个而且移动到最上面 一、找到设置环境变量 只要是windows系统&#xff0c;他就长这样&#xff0c;需要找到这个页面 很多之前的文章都会说&#xff1…

[附源码]Node.js计算机毕业设计电影院订票系统Express

项目运行 环境配置&#xff1a; Node.js最新版 Vscode Mysql5.7 HBuilderXNavicat11Vue。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分离等等。 环境需要 1.运行环境&#xff1a;最好是Nodejs最新版&#xff0c;我…

六、作用域,作用域链,预编译,闭包基础

六、作用域&#xff0c;作用域链&#xff0c;预编译&#xff0c;闭包基础 使用AO,GO说明作用域和作用域链 AO与函数有关&#xff0c;函数能创造出独立的空间&#xff0c;但是这句话不太对&#xff0c;接下来就是解释&#xff1a; 对象 每个对象都有属性和方法&#xff1a; …

MobileNetV2原理说明及实践落地

本文参考&#xff1a; 轻量级网络——MobileNetV2_Clichong的博客-CSDN博客_mobilenetv2 1、MobileNetV2介绍 MobileNetV1主要是提出了可分离卷积的概念&#xff0c;大大减少了模型的参数个数&#xff0c;从而缩小了计算量。但是在CenterNet算法中作为BackBone效果并不佳&…