RocketMQ-快速实战

news2024/11/23 22:34:53

MQ简介

MQ:MessageQueue,消息队列。是在互联网中使用非常广泛的一系列服务中间件。

Message:消息。消息是在不同进程之间传递的数据。这些进程可以部署在同一台机器上,也可以分布在不同机器上。(数据形式:二进制压缩数据、RPC、http,都属于进程间通讯的机制)

Queue:队列。队列原意是指一种具有FIFO(先进先出)特性的数据结构,是用来缓存数据的。对于消息中间件产品来说,能不能保证FIFO特性,尚值得考量。但是,所有消息队列都是需要具备存储消息,让消息排队的能力。

作用:

  • 异步,提高系统的响应速度、吞吐量。

  • 解耦,减少服务之间的影响。提高系统整体的稳定性以及可扩展性。另外,解耦后可以实现数据分发。生产者发送一个消息后,可以由一个或者多个消费者进行消费,并且消费者的增加或者减少对生产者没有影响。

  • 消峰,以稳定的系统资源应对突发的流量冲击。

RocketMQ产品特点

RocketMQ介绍

RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。

早期阿里使用ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿里开始关注Kafka。但是Kafka是针对日志收集场景设计的,他的高级功能并不是很贴合阿里的业务场景。尤其当他的Topic过多时,由于Partition文件也会过多,这就会加大文件索引的耗时,会严重影响IO性能。于是阿里才决定自研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最大问题就是多Topic下的IO性能压力。但是产品在阿里内部的不断改进,RocketMQ开始体现出一些不一样的优势。

RocketMQ特点

当今互联网MQ产品众多,其中,影响力和使用范围最大的当数Apache Kafka、RabbitMQ、Apache RocketMQ以及Apache Plusar。这几大产品虽然都是典型的MQ产品,但是由于设计和实现上的一些差异,造成他们适合于不同的细分场景。

优点缺点适合场景
Apache Kafka吞吐量非常大,性能非常好,集群高可用。会有丢数据的可能,功能比较单一日志分析、大数据采集
RabbitMQ消息可靠性高,功能全面。erlang语言不好定制。吞吐量比较低。企业内部小规模服务调用
Apache Pulsar基于Bookeeper构建,消息可靠性非常高。周边生态还有差距,目前使用的公司比较少。企业内部大规模服务调用
Apache RocketMQ高吞吐、高性能、高可用。功能全面。客户端协议丰富。使用java语言开发,方便定制。服务加载比较慢。几乎全场景,特别适合金融场景

其中RocketMQ,孵化自阿里巴巴。历经阿里多年双十一的严格考验,RocketMQ可以说是从全世界最严苛的高并发场景中摸爬滚打出来的过硬产品,也是少数几个在金融场景比较适用的MQ产品。从横向对比来看,RocketMQ与Kafka和RabbitMQ相比。RocketMQ的消息吞吐量虽然和Kafka相比还是稍有差距,但是却比RabbitMQ高很多。在阿里内部,RocketMQ集群每天处理的请求数超过5万亿次,支持的核心应用超过3000个。而RocketMQ最大的优势就是他天生就为金融互联网而生。他的消息可靠性相比Kafka也有了很大的提升,而消息吞吐量相比RabbitMQ也有很大的提升。另外,RocketMQ的高级功能也越来越全面,广播消费、延迟队列、死信队列等等高级功能一应俱全,甚至某些业务功能比如事务消息,已经呈现出领先潮流的趋势。

RocketMQ快速实战

快速搭建RocketMQ服务

RocketMQ的官网地址: RocketMQ · 官方网站 | RocketMQ

下载页面地址:下载 | RocketMQ

当前最新的版本是5.x,这是一个着眼于云原生的新版本,给 RocketMQ 带来了非常多很亮眼的新特性。但是目前来看,企业中用得还比较少。因此,我们这里采用的还是更为稳定的4.9.5版本。

注:在2020年下半年,RocketMQ新推出了5.0的大版本,这对于RocketMQ来说,是一个里程碑式的大版本。在这个大版本中,RocketMQ对整体功能做了一次大的升级。增加了很多非常有用的新特性,也对已有功能重新做了升级。

比如在具体功能方面,在4.x版本中,对于定时消息,只能设定几个固定的延迟级别,而5.0版本中,已经可以指定具体的发送时间了。在客户端语言方面,4.x版本,RocketMQ原生只支持基于Netty框架的Java客户端。而在5.0版本中,增加了对Grpc协议的支持,这基本上就解除了对客户端语言的限制。在服务端架构方面,4.x版本只支持固定角色的普通集群和可以动态切换角色的Dledger集群,而在5.0版本中,增加了Dledger Controller混合集群模式,即可以混合使用Dledger的集群机制以及 Broker 本地的文件管理机制。

但是功能强大,同时也意味着问题会很多。所以目前来看,企业中直接用新版本的还比较少。小部分使用新版本的企业,也大都是使用内部的改造优化版本。

这里下载的是这个版本:

上传到服务器并解压:(unzip rocketmq-all-4.9.5-bin-release.zip

RocketMQ建议的运行环境需要至少12G的内存,这是生产环境比较理想的资源配置。但是我买的云服务器是2核4g,所以需要修改启动配置:(:set number临时显示行号)

注意:生产环境不建议修改上面两个配置。

RocketMQ是基于Java开发的,所以依赖Java开发环境,安装JDK步骤省略,建议采用1.8版本

RocketMQ的后端服务分为nameserver和broker两个服务:

# 第一步:启动nameserver服务,进入安装目录执行命令
nohup bin/mqnamesrv &
# 是否启动成功可以通过jps检查,启动成功或失败可以查看nohup.out文件

# 为了方便测试在conf/broker.conf文件添加配置:
autoCreateTopicEnable=true
# 注意:如果是云服务器,还需要额外添加一行配置
brokerIP1 = 你的公网IP

# 第二步:启动broker服务,进入安装目录执行命令
nohup bin/mqbroker &

注意:

1、在实际服务部署时,通常会将RocketMQ的部署地址添加到环境变量当中。例如使用vi ~/.bash_profile指令,添加以下内容

export ROCKETMQ_HOME=/home/rocket/rocketmq-all-4.9.5-bin-release // 修改为你的安装目录
PATH=$ROCKETMQ_HOME/bin:$PATH
export PATH

2、停止RocketMQ服务可以通过mqshutdown指令进行,停止服务有短暂延迟,不建议kill杀进程

mqshutdown namesrv # 关闭nameserver服务
mqshutdown broker # 关闭broker服务

快速实现消息收发

1、命令行快速实现消息收发

第一步:需要配置一个环境变量NAMESRV_ADDR,指向之前启动的nameserver服务。

通过vi ~/.bash_profile添加以下配置。然后使用source ~/.bash_profile让配置生效。

export NAMESRV_ADDR='localhost:9876'

修改后文件:

第二步:通过指令启动RocketMQ的消息生产者发送消息。默认往RocketMQ中发送1000条消息

tools.sh org.apache.rocketmq.example.quickstart.Producer 
    
...消息发送日志
SendResult [sendStatus=SEND_OK, msgId=7F0000018FBA1B6D358697CBE7FB03E7, offsetMsgId=C0A800DA00002A9F000000000005DA64, messageQueue=MessageQueue [topic=TopicTest, brokerName=hcss-ecs-3744, queueId=3], queueOffset=499]
11:25:22.820 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[127.0.0.1:9876] result: true
11:25:22.825 [NettyClientSelector_1] INFO RocketmqRemoting - closeChannel: close the connection to remote address[192.168.0.218:10911] result: true

第三步:可以启动消息消费者接收之前发送的消息

tools.sh org.apache.rocketmq.example.quickstart.Consumer
    
...消息消费日志
ConsumeMessageThread_please_rename_unique_group_name_4_15 Receive New Messages: [MessageExt [brokerName=hcss-ecs-3744, queueId=2, storeSize=192, queueOffset=199, sysFlag=0, bornTimestamp=1701312827986, bornHost=/192.168.0.218:32850, storeTimestamp=1701312827987, storeHost=/192.168.0.218:10911, msgId=C0A800DA00002A9F00000000000256D2, commitLogOffset=153298, bodyCRC=748130833, reconsumeTimes=0, preparedTransactionOffset=0, toString()=Message{topic='TopicTest', flag=0, properties={MIN_OFFSET=0, MAX_OFFSET=250, CONSUME_START_TIME=1701314617997, UNIQ_KEY=7F0000018E561B6D358697AEFE52031F, CLUSTER=DefaultCluster, TAGS=TagA}, body=[72, 101, 108, 108, 111, 32, 82, 111, 99, 107, 101, 116, 77, 81, 32, 55, 57, 57], transactionId='null'}]]

注意:这个Consumer消费者的指令并不会主动结束,他会继续挂起,等待消费新的消息。可以使用CTRL+C停止该进程。

2、搭建Maven客户端项目

第一步:创建一个标准的maven项目,在pom.xml中引入以下核心依赖

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.5</version>
</dependency>

第二步:就可以直接创建一个简单的消息生产者

public class Producer
{
    public static void main(String[] args)
        throws MQClientException, InterruptedException
    {
        // 初始化一个消息生产者
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");
        // 指定nameserver地址
        producer.setNamesrvAddr("192.168.232.128:9876");
        // 启动消息生产者服务
        producer.start();
        for (int i = 0; i < 2; i++)
        {
            try
            {
                // 创建消息。消息由Topic,Tag和body三个属性组成,其中Body就是消息内容
                Message msg =
                    new Message("TopicTest", "TagA", ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET));
                // 发送消息,获取发送结果
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            }
            catch (Exception e)
            {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // 消息发送完后,停止消息生产者服务。
        producer.shutdown();
    }
}

注意:对于生产者,需要指定对应的nameserver服务的地址,这个地址需要指向你自己的服务器。

第三步:创建一个消息消费者接收RocketMQ中的消息。

public class Consumer
{
    public static void main(String[] args)
        throws InterruptedException, MQClientException
    {
        // 构建一个消息消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
        // 指定nameserver地址
        consumer.setNamesrvAddr("192.168.232.128:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
        // 订阅一个感兴趣的话题,这个话题需要与消息的topic一致
        consumer.subscribe("TopicTest", "*");
        // 注册一个消息回调函数,消费到消息后就会触发回调。
        consumer.registerMessageListener(new MessageListenerConcurrently()
        {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
            {
                msgs.forEach(messageExt -> {
                    try
                    {
                        System.out.println("收到消息:" + new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                    }
                    catch (UnsupportedEncodingException e)
                    {
                    }
                });
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者服务
        consumer.start();
        System.out.print("Consumer Started");
    }
}

注意:对于消费者,同样需要指定nameserver的地址,另外消费者需要在RocketMQ中订阅具体的Topic,只有发送到这个Topic上的消息才会被这个消费者接收到

生产消费报错:RemotingTooMuchRequestException: sendDefaultImpl call timeout

解决方法:

1、在conf/broker.conf 中加入配置:

namesrvAddr = 你的公网IP:9876
brokerIP1 = 你的公网IP

2、重启broker,启动命令指定配置文件:

nohup mqbroker -n localhost:9876 -c conf/broker.conf &

重启完成,上面的生产者消费者测试代码通过

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1274120.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

NASM安装和结合nodepad++进行编译的过程

mov ax,0x30 mov bx,0xc0 add ax,bx times 502 db 0 db 0x55 db 0xAA nasm安装地址: https://www.nasm.us/ 下载exe安装 在命令行提示符输入nasm编译命令 nasm exam.asm -f bin -o exam.bin 此时输入回车将会执行编译过程。 1&#xff0c;启动NotePad&#xff0c;在菜单上选…

【驱动】串口驱动分析(三)-serial driver

简介 前两节我们介绍串口驱动的框架和tty core部分。这节我们介绍和硬件紧密相关的串口驱动部分。 UART驱动部分依赖于硬件平台&#xff0c;而TTY驱动和具体的平台无关。虽然UART部分依赖于平台&#xff0c;但是不管是哪个硬件平台&#xff0c;驱动的思路都是一致的&#xff…

vue3中的provide与inject跨层级组件(祖孙)间通信

provide和inject提供依赖注入&#xff0c;功能类似 vue2.x 的provide/inject 实现跨层级组件(祖孙)间通信 子或孙子组件接收到的数据可以用于读取显示&#xff0c;也可以进行修改&#xff0c;同步修改父&#xff08;祖&#xff09;组件的数据。 注意&#xff1a;无论子组件…

微服务--08--Seata XA模式 AT模式

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 分布式事务Seata 1.XA模式1.1.两阶段提交1.2.Seata的XA模型1.3.优缺点 AT模式2.1.Seata的AT模型2.2.流程梳理2.3.AT与XA的区别 分布式事务 > 事务–01—CAP理论…

Constraintlayout

goneMargin 约束的View隐藏时的margin 约束链风格 chainStyle 权重 bias 设置宽高比 w,h 百分比 GuideLine 基线 上下的间距 Group 指定一系列View进行绑定进行操作 通过init加载 然后setIds进行绑定 然后通过group进行操作 Layer 设置动画 Barrier Flow

QT线程的使用 循环中程序的等待

QT线程的使用 循环中程序的等待 先看效果1 pro文件2 头文件3 源文件4 ui文件先看效果 1 pro文件 QT += concurrent2 头文件 #ifndef MAINWINDOW_H #define MAINWINDOW_H

⭐ Unity 开发bug —— 打包后shader失效或者bug (我这里用Shader做两张图片的合并发现了问题)

1.这里我代码没啥问题~~~编辑器里也没毛病 void Start(){// 加载底图和上层图片string backgroundImagePath Application.streamingAssetsPath "/background.jpg";Texture2D backgroundTexture new Texture2D(2, 2);byte[] backgroundImageData System.IO.File.R…

qt5.15播放音频示例(4种方法)

文章目录 Qt播放音频方法一 QMediaPlayer方法二 QSound方法三 QSoundEffect方法四 QAudioOutput问题1 播放无声问题2 QAudioOutput播放嗡嗡声的问题参考Qt播放音频 在linux系统中,可以通过aplay进行简单的播放音频,如 aplay /opt/Audio/test.wav在图形界面,也可以封装apla…

【机器学习】集成学习算法之AdaBoost

文章目录 基本步骤示例生成第 1 棵决策树生产第 2 棵决策树生成第 T 棵决策树加权投票 sklearn 实现 基本步骤 首先&#xff0c;是初始化训练数据的权值分布 D 1 D_1 D1​。假设有 m m m 个训练样本数据&#xff0c;则每一个训练样本最开始时&#xff0c;都被赋予相同的权值…

传统算法:使用 Pygame 实现广度优先搜索(BFS)

使用 Pygame 模块实现了广度优先搜索(BFS)的动画演示。首先,通过邻接矩阵表示了一个图的结构,其中每个节点表示一个字符,每个字符的邻居表示与之相邻的节点。然后,通过广度优先搜索算法按层级顺序访问节点,过程中通过动画效果可视化每一步的变化。每次访问一个节点,该节…

计算机服务器中了_locked勒索病毒如何处理,_locked勒索病毒解密数据恢复

网络技术的不断发展&#xff0c;给企业的生产生活提供了极大便利&#xff0c;越来越多的企业走向数字化办公时代&#xff0c;但网络的发展也为网络安全埋下隐患&#xff0c;网络安全威胁不断增加。近期&#xff0c;云天数据恢复中心陆续接到很多企业的求助&#xff0c;企业的计…

制造企业建设数字工厂管理系统的难点主要有哪些

随着科技的飞速发展&#xff0c;制造企业正面临着从传统生产模式向数字化、智能化转型的挑战。其中&#xff0c;建设数字工厂管理系统是实现这一目标的重要途径。然而&#xff0c;在实际操作过程中&#xff0c;制造企业往往会遇到一系列难点。本文将对这些难点进行详细的分析。…

kNN-NER: Named Entity Recognition with Nearest Neighbor Search

原文链接&#xff1a;https://arxiv.org/pdf/2203.17103.pdf 预发表论文 介绍 受到增强式检索方法的启发&#xff0c;作者提出了kNN-NER&#xff0c;通过检索训练集中k个邻居的标签分布来提高模型命名实体识别分类的准确性。该框架能够通过充分利用训练信息来解决样本类别不平衡…

基于STC12C5A60S2系列1T 8051单片机的液晶显示器LCD1602显示整数、小数应用

基于STC12C5A60S2系列1T 8051单片机的液晶显示器LCD1602显示整数、小数应用 STC12C5A60S2系列1T 8051单片机管脚图STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式及配置STC12C5A60S2系列1T 8051单片机I/O口各种不同工作模式介绍液晶显示器LCD1602简单介绍IIC通信简单介绍…

Qt应用开发--国产工业开发板全志T113-i的部署教程

Qt在工业上的使用场景包括工业自动化、嵌入式系统、汽车行业、航空航天、医疗设备、制造业和物联网应用。Qt被用来开发工业设备的用户界面、控制系统、嵌入式应用和其他工业应用&#xff0c;因其跨平台性和丰富的功能而备受青睐。 Qt能够为工业领域带来什么好处&#xff1a; -…

MAVEN冲突解决

MAVEN冲突解决 1.安装下面这个插件 2.安装成功点击pom文件 dependency analyzer标志&#xff0c;说明maven helper插件就安装成功 3.点击dependency analyzer之后就会进入到下面的页面 4.标记红色就是版本冲突&#xff0c;右击complie&#xff0c;排除不是使用的 5.POM 文件…

单体架构demo

idea 新建maven项目 1、外层pom.xml 2、jar 包pom.xml 3、主要pom.xml 这个打包插件放在有main 启动模块中 <build><plugins><plugin><groupId>org.springframework.boot</groupId><artifactId>spring-boot-maven-plugin</artifact…

3D模型渲染导致电脑太卡怎么办?

在线工具推荐&#xff1a; 三维数字孪生场景工具 - GLTF/GLB在线编辑器 - Three.js AI自动纹理化开发 - YOLO 虚幻合成数据生成器 - 3D模型在线转换 - 3D模型预览图生成服务 1、什么是3D渲染&#xff1f; 3D渲染是指通过计算机图形学技术将三维模型转化为二维图像的过程…

[安洵杯 2019]easy_web

打开环境 img传参还有cmd img应该是base&#xff0c;先解码看看 3535352e706e67 这个好像是十六进制的&#xff0c;再解 访问一下看看&#xff0c;得到一张图片 尝试base解码&#xff0c;但是没有什么发现 再看看地址栏出现index.php,应该是要下载源码&#xff0c;但是还没有…

Vue3-数据交互请求工具设计

1.安装axios pnpm add axios 2.利用axios.create创建一个自定义的axios来使用 参考官网&#xff1a;axios中文文档|axios中文网 | axios 在src/utils文件夹下新建request.js&#xff0c;封装axios模块 import axios from axios const baseURL const instance axios.creat…