2.11日学习打卡----初学RocketMQ(二)

news2025/1/25 1:43:20

2.11日学习打卡

一. RocketMQ整合springboot

首先配置pom.xml文件

       <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>annotationProcessor</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-spring-boot-starter</artifactId>
            <version>2.2.3</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <scope>test</scope>
        </dependency>

其次在配置application.yml文件

spring:
  application:
    name: RocketMq

rocketmq:
  name-server: 192.168.66.100:9876(自己的地址监听9876端口)
  producer:
    group: my-group

编写测试类

生产者

package com.jjy.rocketmq;

import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.junit.jupiter.api.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.junit4.SpringRunner;

//@RunWith(SpringRunner.class)
//@SpringBootTest(classes = {RocketMqApplication.class})
@SpringBootTest
class RocketMqApplicationTests {
    @Autowired
    private RocketMQTemplate rocketMQTemplate;
    @Test
    void testSendMessage() {
        //向broker发送消息
        //第一个参数是主题(当时自己创建的主题)
        //第二个参数是消息内容
        rocketMQTemplate.convertAndSend("topicWarning","hello,RocketMQ");
    }

}

消费者(新建一个springboot项目)

pom.xml文件同producer工程

application.yml配置

spring:
  application:
    name: RocketMQConsumer(自己的项目名字)

rocketmq:
  name-server: 192.168.66.100:9876

消息监听器

package com.jjy.rocketmqconumers.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

@Slf4j
@Component
@RocketMQMessageListener(topic="topicWarning",consumerGroup="myconsumer")
//consumerGroup(里面的参数随便写就行自己编个名字)
public class Consumer implements RocketMQListener<String> {


    @Override
    
    public void onMessage(String message) {
    //使用日志的形式写出来 也开始使用控制台
        log.info("Received messsge:" + message);
     //使用控制台
     System.out.println(message);
    }
}

在这里插入图片描述

二. RocketMQ架构

技术架构

在这里插入图片描述
RocketMQ架构上主要分为四部分,如上图所示:

  • Producer:消息发布的角色,支持分布式集群方式部署。
  • Consumer:消息消费的角色,支持分布式集群方式部署。支持以push推,pull拉两种模式消费消息。
  • NameServer:管理Broker代理服务器。
  • BrokerServer:RocketMQ的核心,负责消息的接收和转发。

部署架构

在这里插入图片描述
RocketMQ 网络部署特点:

  • NameServer是一个几乎无状态节点,可集群部署,节点之间无
    任何信息同步。
  • Broker部署相对复杂,Broker分为Master与Slave,一个Master可以对应多个Slave,但是一个Slave只能对应一个Master,Master与Slave 的对应关系通过指定相同的BrokerName,不同的BrokerId 来定义,BrokerId为0表示Master,非0表示Slave。Master也可以部署多个。每个Broker与NameServer集群中的所有节点建立长连接,定时注册Topic信息到所有NameServer。
  • Producer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic 服务的Master建立长连接,且定时向Master发送心跳。Producer完全无状态,可集群部署。
  • Consumer与NameServer集群中的其中一个节点(随机选择)建立长连接,定期从NameServer获取Topic路由信息,并向提供Topic服务的Master、Slave建立长连接,且定时向Master、Slave发送心跳。Consumer既可以从Master订阅消息,也可以从Slave订阅消息。
  • 结合部署架构图,集群工作流程可作如下描述:
    • 启动NameServer,通过监听端口,等待Broker、Producer、Consumer连上来,相当于一个路由控制中心。
    • Broker启动,跟所有的NameServer保持长连接,定时发送心跳包。心跳包中包含当前Broker信息(IP+端口等)以及存储所有Topic信息。注册成功后,NameServer集群中就有Topic跟Broker的映射关系。
    • 收发消息前,先创建Topic,创建Topic时需要指定该Topic要存储在哪些Broker上,也可以在发送消息时自动创建Topic。
    • Producer发送消息,启动时先跟NameServer集群中的其中一台建立长连接,并从NameServer中获取当前发送的Topic存在哪些Broker上,包含Topic中所有队列列表然后选择一个队列,与队列所在的Broker建立长连接再向Broker发消息。
    • Consumer跟Producer类似,跟其中一台NameServer建立长连接,获取当前订阅Topic存在哪些Broker上,然后直接跟Broker建立连接通道,开始消费消息。

三.RocketMQ高级特性

消息存储

在这里插入图片描述

  • 存储介质
    • 关系型数据库
      ActiveMQ(默认采用的KahaDB做消息存储)可选用JDBC的方式来做消息持久化,通过简单的xml配置信息即可实现JDBC消息存储。由于,普通关系型数据库(如Mysql)在单表数据量达到千万级别的情况下,其IO读写性能往往会出现瓶颈。在可靠性方面,该种方案非常依赖DB,如果一旦DB出现故障,则MQ的消息就无法落盘存储会导致线上故障。
    • 文件
      目前业界较为常用的几款产品(RocketMQ/Kafka/RabbitMQ)均采用的是消息刷盘至所部署虚拟机/物理机的文件系统来做持久化(刷盘一般可以分为异步刷盘和同步刷盘两种模式)。消息刷盘为消息存储提供了一种高效率、高可靠性和高性能的数据持久化方式。除非部署MQ机器本身或是本地磁盘损坏,否则一般是不会出现无法持久化的故障问题
      同步的可靠性高 异步的效率高
      同步:就是假如你是老板让员工区送邮件在员工送完邮件向你报告的期间一直属于等待状态;
      异步: 在等待报告期间回去做其他事情
    • 性能对比
      文件系统 > 关系型数据库DB

负载均衡

在这里插入图片描述
RocketMQ中的负载均衡都在Client端完成,具体来说的话,主要可以分为Producer端发送消息时候的负载均衡和Consumer端订阅消息的负载均衡

Producer的负载均衡

在这里插入图片描述
如图所示,5 个队列可以部署在一台机器上,也可以分别部署在 5 台不同的机器上,发送消息通过轮询队列的方式 发送,x每个队列接收平均的消息量。通过增加机器,可以水平扩展队列容量。 另外也可以自定义方式选择发往哪个队列。

Consumer的负载均衡
在这里插入图片描述
如图所示,如果有 5 个队列,2 个 consumer,那么第一个Consumer 消费 3 个队列,第二consumer 消费 2 个队列。 这样即可达到平均消费的目的,可以水平扩展 Consumer 来提高消费能力。但是 Consumer 数量要小于等于队列数 量,如果 Consumer超过队列数量,那么多余的Consumer 将不能消费消息 。

事务消息

在这里插入图片描述
Apache RocketMQ在4.3.0版中已经支持分布式事务消息,这里RocketMQ采用了2PC的思想来实现了提交事务消息,同时增加一个补偿逻辑来处理二阶段超时或者失败的消息,如下图所示。
在这里插入图片描述
事务消息发送步骤如下:

  • 生产者将半事务消息发送至消息队列RocketMQ服务端。

  • 消息队列RocketMQ服务端将消息持久化成功之后,向生产者返回Ack确认消息已经发送成功,此时消息为半事务消息。

  • 生产者开始执行本地事务逻辑。

  • 生产者根据本地事务执行结果向服务端提交二次确认结果(Commit或是Rollback),服务端收到确认结果后处理逻辑如下:

    • 二次确认结果为Commit:服务端将半事务消息标记为可投递,并投递给消费者。
    • 二次确认结果为Rollback:服务端不会将该消息投递给消费者,并按照如下逻辑进行回查处理。
  • 事务消息回查步骤如下:

    • 在断网或者是生产者应用重启的特殊情况下,上述步骤4提交的二次确认最终未到达服务端,经过固定时间后,服务端将对消息生产者即生产者集群中任一生产者实例发起消息回查。
    • 生产者收到消息回查后,需要检查对应消息的本地事务执行的最终结果。
    • 生产者根据检查得到的本地事务的最终状态再次提交二次确认,服务端仍按照步骤4对半事务消息进行处理。

顺序消息

消息有序指的是按照消息的发送顺序来消费(FIFO)。RocketMQ可以保证消息有序,消息有序分为部分有序和全局有序。全局有序是指某个Topic下的所有消息都要保证顺序;部分顺序消息只要保证每一组消息被顺序消费即可。

如果要实现全局顺序消息,那么只能使用一个队列,一个生产者,这会严重影响性能。
在这里插入图片描述

因此,我们常说的顺序消息通常指部分顺序消息。
在这里插入图片描述

顺序消费的原理解析,在默认的情况下消息发送会采取轮询方式把消息发送到不同的分区队列;而消费消息的时候从多个queue上拉取消息,这种情况发送和消费是不能保证顺序。但是如果控制发送的顺序消息只依次发送到同一个queue中,消费的时候只从这个queue上依次拉取,则就保证了顺序。当发送和消费参与的queue
只有一个,则是全局有序;如果多个queue参与,则为分区有序,即相对每queue,消息都是有序的。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1446134.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Java图形化界面编程——处理位图 笔记

2.8.3 处理位图 ​ 如果仅仅绘制一些简单的几何图形&#xff0c;程序的图形效果依然比较单调 。 AWT 也允许在组件上绘制位图&#xff0c; Graphics 提供了 drawlmage() 方法用于绘制位图&#xff0c;该方法需要一个Image参数一一代表位图&#xff0c;通过该方法就可 以绘制出…

苹果Mac键盘如何将 F1 到 F12 取消按Fn

苹果电脑安装了Win10操作系统之后&#xff0c;F1到F12用不了怎么办的解决方法。本文将介绍一些解决方法&#xff0c;帮助您解决无法使用F1到F12功能键的问题。 使用 Mac系统的人都知道&#xff0c;Mac系统默认是没有开启 F1-F12 的使用的&#xff0c;平时我们使用的系统都可以使…

【C语言】实现双向链表

目录 &#xff08;一&#xff09;头文件 &#xff08;二&#xff09; 功能实现 &#xff08;1&#xff09;初始化 &#xff08;2&#xff09;打印链表 &#xff08;3&#xff09; 头插与头删 &#xff08;4&#xff09;尾插与尾删 &#xff08;5&#xff09;指定位置之后…

降噪和音频修复 iZotope RX 7 Advanced

iZotope RX 7 Advanced 是一款功能强大的音频修复和增强软件&#xff0c;它能够帮助用户轻松应对各种音频问题&#xff0c;提供全面的工具和技术来优化和改善音频质量。 首先&#xff0c;iZotope RX 7 Advanced 具有出色的降噪功能。无论是背景噪音、杂音还是其他干扰因素&…

【Java EE初阶十二】网络编程TCP/IP协议(二)

1. 关于TCP 1.1 TCP 的socket api tcp的socket api和U大片的socket api差异很大&#xff0c;但是和前面所讲的文件操作很密切的联系 下面主要讲解两个关键的类&#xff1a; 1、ServerSocket&#xff1a;给服务器使用的类&#xff0c;使用这个类来绑定端口号 2、Socket&#xf…

服务治理中间件-Eureka

目录 简介 搭建Eureka服务 注册服务到Eureka 简介 Eureka是Spring团队开发的服务治理中间件&#xff0c;可以轻松在项目中&#xff0c;实现服务的注册与发现&#xff0c;相比于阿里巴巴的Nacos、Apache基金会的Zookeeper&#xff0c;更加契合Spring项目&#xff0c;缺点就是…

论文阅读:《Deep Learning-Based Human Pose Estimation: A Survey》——Part 1:2D HPE

目录 人体姿态识别概述 论文框架 HPE分类 人体建模模型 二维单人姿态估计 回归方法 目前发展 优化 基于热图的方法 基于CNN的几个网络 利用身体结构信息提供构建HPE网络 视频序列中的人体姿态估计 2D多人姿态识别 方法 自上而下 自下而上 2D HPE 总结 数据集…

GEE:最小距离(minimumDistance)回归教程(样本点、特征添加、训练、精度、参数优化)

作者:CSDN @ _养乐多_ 对于分类问题,这个输出通常是一个类别标签 ,而对于回归问题,输出通常是一个连续的数值。回归可以应用于多种场景,包括预测土壤PH值、土壤有机碳、土壤水分、碳密度、生物量、气温、海冰厚度、不透水面积百分比、植被覆盖度等。 本文将介绍在Google…

位运算+leetcode(1)

基础 1.基础知识 以下都是针对数字的二进制进行操作 >> 右移操作符<< 左移操作符~ 取反操作符 & 有0就是0&#xff0c;全一才一 | 有一才一 &#xff0c;全0才0^ 相同为0&#xff0c;相异为1 异或( ^ )运算的规律 a ^ 0 a a ^ a 0a ^ b ^ c a ^ (b …

2本对微服务拆分有帮助的书

迁移到云原生应用架构 可在线观看的免费书籍 https://pivotal.io/platform-as-a-service/migrating-to-cloud-native-application-architectures-ebook 微服务架构设计模式 世界十大架构师之一&#xff1a;克里斯理查森著

【51单片机】LED点阵屏(江科大)

9.1LED点阵屏 1.LED点阵屏介绍 LED点阵屏由若干个独立的LED组成,LED以矩阵的形式排列,以灯珠亮灭来显示文字、图片、视频等。 2.LED点阵屏工作原理 LED点阵屏的结构类似于数码管,只不过是数码管把每一列的像素以“8”字型排列而已。原理图如下 每一行的阳极连在一起,每一列…

如何在C# Windows Forms应用程序中实现控件之间的连接线

帮我实现绘图工具多个控件连接线&#xff0c;请用c#代码实现 实现绘图工具中多个控件之间的连接线功能&#xff0c;可以通过以下几个步骤来进行&#xff1a; 定义连接线的数据模型&#xff1a;首先需要定义一个模型来表示连接线&#xff0c;这个模型应该包含起点和终点的坐标。…

【Effective Objective - C 2.0】——读书笔记(四)

文章目录 二十三、通过委托与数据源协议进行对象间通信二十四、将类的实现代码分散到便于管理的数个分类之中二十五、总是为第三方的分类名称加前缀二十六、切勿在分类里面声明属性二十七、使用“class-continuation分类”隐藏实现细节二十八、通过协议提供匿名对象 二十三、通…

【Chrono Engine学习总结】4-vehicle-4.1-vehicle的基本概念

由于Chrono的官方教程在一些细节方面解释的并不清楚&#xff0c;自己做了一些尝试&#xff0c;做学习总结。 1、基本介绍 Vehicle Overview Vehicle Mannel Vehicle的官方demo 1.1 Vehicle的构型 一个车辆由许多子系统构成&#xff1a;悬挂、转向、轮子/履带、刹车/油门、动…

书生谱语-大语言模型测试demo

课程内容简介 通用环境配置 开发机 InterStudio pip 换源 临时使用镜像源安装&#xff0c;如下所示&#xff1a;some-package 为你需要安装的包名 pip install -i https://mirrors.cernet.edu.cn/pypi/web/simple some-package设置pip默认镜像源&#xff0c;升级 pip 到最新…

力扣1122. 数组的相对排序(哈希表)

Problem: 1122. 数组的相对排序 文章目录 题目描述思路及解法复杂度Code 题目描述 思路及解法 1.利用arr2创建一个无序映射&#xff08;map集合&#xff09;&#xff0c;以其中的元素作为键&#xff0c;值默认设置为0&#xff1b; 2.扫描arr1数组统计arr2元素在其中的个数(将个…

Pandas教程12:常用的pd.set_option方法,显示所有行和列+不换行显示等等...

---------------pandas数据分析集合--------------- Python教程71&#xff1a;学习Pandas中一维数组Series Python教程74&#xff1a;Pandas中DataFrame数据创建方法及缺失值与重复值处理 Pandas数据化分析&#xff0c;DataFrame行列索引数据的选取&#xff0c;增加&#xff0c…

matlab发送串口数据,并进行串口数据头的添加,我们来看下pwm解析后并通过串口输出的效果

uintt16位的话会在上面前面加上00&#xff0c;16位的话一定是两个字节&#xff0c;一共16位的数据 如果是unint8的话就不会&#xff0c; 注意这里给的是13&#xff0c;但是现实的00 0D&#xff0c;这是大小端的问题&#xff0c;在matlanb里设置&#xff0c;我们就默认用这个模式…

python 笔记:shapely(形状篇)

主要是点&#xff08;point&#xff09;、线&#xff08;linestring&#xff09;、面&#xff08;surface&#xff09; 1 基本方法和属性 object.area 返回对象的面积&#xff08;浮点数&#xff09; object.bounds 返回一个&#xff08;minx, miny, maxx, maxy&#xff09;元…