RocketMQ下载安装及基本使用

news2025/1/13 10:12:22

目录

消息队列的作用

消息队列的优势

应用解耦

异步提速

削峰填谷

RocketMQ介绍

RocketMQ特点

RocketMQ安装下载(4.9.5版本)

RocketMQ启动可视化管理服务

RocketMQ实现基本消息收发


消息队列的作用

       队列是一种FIFO先进先出的数据结构。消息则是跨进程传递的数据。一个典型的MQ系统,会将消息消息由生产者发送到MQ进行排队,然后根据一定的顺序交由消息的消费者进行处理。

消息队列的优势

应用解耦

                   

       假如用户访问订单系统,而订单系统跟其他系统是强耦合的,如图如果库存系统挂了,那么整个订单系统也都不能用了。如果这种情况还想要增加新的XX系统进来,那么就只能修改源代码来完成。系统的耦合性越高,容错性就越低,可维护性就越低。

                  

        通过引入MQ做到应用解耦,库存系统出现异常可以等库存系统恢复后去MQ中拿消息,此时不影响别的系统调用,如果还要加入新的系统比如XX系统,那么只需XX系统去MQ中拿取消息进行处理即可。使用MQ可以提升容错性和可维护性。

异步提速

                      

原先用户请求订单系统,需要等到订单系顺序调用其他系统无误后返回,比较耗时。

                       

      现在通过引入MQ,订单系统只需要把信息发送到MQ中即可,相当于完成了之前顺序请求其他系统的步骤,时间成本大大减低。

削峰填谷

                         

以上场景中激增请求会打垮系统,造成服务不可用。

                       

 通过将激增请求先放到MQ当前,然后系统再根据自身情况拉取请求来消费

                        

RocketMQ介绍

       RocketMQ是阿⾥巴巴开源的⼀个消息中间件,在阿⾥内部历经了双⼗⼀等很多⾼并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的⼀个顶级项⽬。

       早期阿⾥使⽤ActiveMQ,但是,当消息开始逐渐增多后,ActiveMQ的IO性能很快达到了瓶颈。于是,阿⾥开始关注Kafka。但是Kafka是针对⽇志收集场景设计的,他的⾼级功能并不是很贴合阿⾥的业务场景。尤其当他的 Topic过多时,由于Partition⽂件也会过多,这就会加⼤⽂件索引的耗时,会严重影响IO性能。于是阿⾥才决定⾃研中间件,最早叫做MetaQ,后来改名成为RocketMQ。最早他所希望解决的最⼤问题就是多Topic下的IO性能压⼒。但是产品在阿⾥内部的不断改进,RocketMQ开始体现出⼀些不⼀样的优势。

RocketMQ特点


RocketMQ安装下载(4.9.5版本)

        RocketMQ的官⽹地址: RocketMQ · 官方网站 | RocketMQ 。在下载⻚⾯可以获取RocketMQ的源码包以及运⾏包。下载⻚⾯地址:下载 | RocketMQ

下载完成后解压后如图:

配置环境变量

              

配置完成后进入bin目录下,打开cmd通过以下命令启动nameserver和broker。

//启动nameserver
start mqnamesrv.cmd
//启动broker
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

启动后如下图所示即启动完成


RocketMQ启动可视化管理服务

在之前的下载界面,一直往下拉会看到如下

     

       这是一个springboot项目,要求maven版本在3.2以上,可以选择直接启动,也可以选择将该项目打成一个jar包,然后用java -jar的方式启动,本人因为用jar包启动访问管理服务很慢,所以选择本地启动项目。访问localhost:8080即可访问。


RocketMQ实现基本消息收发

引入依赖

        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.5</version>
        </dependency>

生产者代码

public class Producer {
    public static void main(String[] args) throws MQClientException, InterruptedException {

        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_group_name");

        producer.setNamesrvAddr("127.0.0.1:9876");

        producer.start();

        for (int i = 0; i < 2; i++) {
            try {
                Message msg = new Message("TopicTest" /* Topic */,
                    "TagA" /* Tag */,
                    ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) 
                );
                msg.setKeys("testProducer"+i);

                SendResult sendResult = producer.send(msg);

                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        producer.shutdown();
    }
}

消费者代码

public class Consumer {

    public static void main(String[] args) throws InterruptedException, MQClientException {

        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("please_rename_unique_group_name_4");
      
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);

        consumer.subscribe("TopicTest", "*");

        consumer.registerMessageListener(new MessageListenerConcurrently() {

            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                ConsumeConcurrentlyContext context) {
                msgs.forEach(messageExt -> {
                    try {

                        System.out.println("收到消息:"+new String(messageExt.getBody(), RemotingHelper.DEFAULT_CHARSET));
                        System.out.println("消息Key:"+messageExt.getKeys());
                    } catch (UnsupportedEncodingException e) {}
                });
                System.out.printf("%s Receive New Messages: %s %n", Thread.currentThread().getName(), msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.out.printf("Consumer Started.%n");
    }
}

启动后结果如下:

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1440138.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多线程JUC:多线程的实现和常用成员方法(守护、礼让、插入线程)

&#x1f468;‍&#x1f393;作者简介&#xff1a;一位大四、研0学生&#xff0c;正在努力准备大四暑假的实习 &#x1f30c;上期文章&#xff1a;首期文章 &#x1f4da;订阅专栏&#xff1a;多线程&JUC 希望文章对你们有所帮助 JUC的学习也是需要一些计算机、操作系统的…

【算法设计与分析】求根节点到叶节点数字之和

&#x1f4dd;个人主页&#xff1a;五敷有你 &#x1f525;系列专栏&#xff1a;算法分析与设计 ⛺️稳中求进&#xff0c;晒太阳 题目 给你一个二叉树的根节点 root &#xff0c;树中每个节点都存放有一个 0 到 9 之间的数字。 每条从根节点到叶节点的路径都代表一个数…

高可用 k8s 1.29 一键安装脚本, 丝滑至极

博客原文 文章目录 集群配置配置清单集群规划集群网络规划 环境初始化主机配置 配置高可用ApiServer安装 nginx安装 Keepalived 安装脚本需要魔法的脚本不需要魔法的脚本配置自动补全加入其余节点 验证集群 集群配置 配置清单 OS&#xff1a; ubuntu 20.04kubernetes&#xf…

HarmonyOS class类对象基础使用

按我们之前的写法 就是 Entry Component struct Dom {p:Object {name: "小猫猫",age: 21,gf: {name: "小小猫猫",age: 18,}}build() {Row() {Column() {// ts-ignoreText(this.p.gf.name)}.width(100%)}.height(100%)} }直接用 Object 一层一层往里套 这…

MySQL用心总结

大家好&#xff0c;好久不见&#xff0c;今天笔者用心一步步写一份mysql的基础操作指南&#xff0c;欢迎各位点赞收藏 -- 启动MySQL net start mysql-- 创建Windows服务 sc create mysql binPath mysqld_bin_path(注意&#xff1a;等号与值之间有空格) mysql -h 地址 -…

【Flink状态管理(二)各状态初始化入口】状态初始化流程详解与源码剖析

文章目录 1. 状态初始化总流程梳理2.创建StreamOperatorStateContext3. StateInitializationContext的接口设计。4. 状态初始化举例&#xff1a;UDF状态初始化 在TaskManager中启动Task线程后&#xff0c;会调用StreamTask.invoke()方法触发当前Task中算子的执行&#xff0c;在…

如何从 iPhone 上恢复永久删除的照片

您的 iPhone 上缺少照片吗&#xff1f;讽刺的是&#xff0c;iPhone 的许多高级功能可能正是这个问题如此普遍的原因。幸运的是&#xff0c;还有很多方法可以从 iPhone 恢复已删除的照片&#xff0c;具体取决于您设备的设置方式。 本文涵盖了所有这些内容。该过程根据您的具体情…

git安装配置

1、下载安装 下载地址 2、配置git用户 git config --global user.name "yw" git config --global user.email "88888qq.com" 3、git init 初始化 4、生成ssh密钥 mkdir .ssh //创建文件夹cd .ssh //进入新建文件夹 ssh-keygen -t rsa // 输入密钥文…

计算机服务器中了halo勒索病毒如何处理,halo勒索病毒解密数据恢复

网络技术的不断发展与应用&#xff0c;为企业的生产生活提供了极大便利&#xff0c;但网络数据安全威胁无处不在&#xff0c;近日&#xff0c;云天数据恢复中心接到某连锁超市求助&#xff0c;企业计算机服务器被halo勒索病毒攻击&#xff0c;导致计算机系统瘫痪&#xff0c;无…

将xyz格式的GRACE数据转成geotiff格式

我们需要将xyz格式的文件转成geotiff便于成图&#xff0c;或者geotiff转成xyz用于数据运算&#xff0c;下面介绍如何实现这一操作&#xff0c;采用GMT和matlab两种方法。 1.GMT转换 我们先准备一个xyz文件&#xff0c;这里是一个降水文件。在gmt中采用以下的语句实现xyz转grd…

【Spring】GoF 之工厂模式

一、GoF 23 设计模式简介 设计模式&#xff1a;一种可以被重复利用的解决方案 GoF&#xff08;Gang of Four&#xff09;&#xff0c;中文名——四人组 《Design Patterns: Elements of Reusable Object-Oriented Software》&#xff08;即《设计模式》一书&#xff09;&…

TCP 传输控制协议

1 TCP 1.1 TCP 最主要的特点 1.TCP 是面向连接的运输层协议。 2.每一条 TCP 连接只能有两个端点 (endpoint)&#xff0c;每一条 TCP 连接只能是点对点的&#xff08;一对一&#xff09;。 3.TCP 提供可靠交付的服务。 4.TCP 提供全双工通信。 5.面向字节流 TCP 中的“流…

【Qt】Android上运行keeps stopping, Desktop上正常

文章目录 问题 & 背景背景问题 解决方案One More ThingTake Away 问题 & 背景 背景 在文章【Qt】最详细教程&#xff0c;如何从零配置Qt Android安卓环境中&#xff0c;我们在Qt中配置了安卓开发环境&#xff0c;并且能够正常运行。 但笔者在成功配置并完成上述文章…

Git、github与gitee码云

1.git核心是两个仓库&#xff1a;本地仓库和远程仓库 主要用于团队合作和代码版本控制&#xff08;个人现有版本代码出错可回溯上个提交版本的代码&#xff09; 远程仓库国际主流githut&#xff0c;但外网速度问题&#xff0c;国内可使用码云gitee github&#xff1a;https:…

Springboot拦截器中跨域失效的问题、同一个接口传入参数不同,一个成功,一个有跨域问题、拦截器和@CrossOrigin和@Controller

Springboot拦截器中跨域失效的问题 一、概述 1、具体场景 起因&#xff1a; 同一个接口&#xff0c;传入不同参数进行值的修改时&#xff0c;一个成功&#xff0c;另一个竟然失败&#xff0c;而且是跨域问题拦截器内的request参数调用getHeader方法时&#xff0c;获取不到前端…

CSS:九宫格布局

九宫格布局效果如下&#xff1a; HTML 结构&#xff1a; <div class"container"><div class"item">1</div><div class"item">2</div><div class"item">3</div><div class"item&q…

机器学习系列——(十六)回归模型的评估

引言 在机器学习领域&#xff0c;回归模型是一种预测连续数值输出的重要工具。无论是预测房价、股票价格还是天气温度&#xff0c;回归模型都扮演着不可或缺的角色。然而&#xff0c;构建模型只是第一步&#xff0c;评估模型的性能是确保模型准确性和泛化能力的关键环节。本文…

【力扣】快乐数,哈希集合 + 快慢指针 + 数学

快乐数原题地址 方法一&#xff1a;哈希集合 定义函数 getNext(n) &#xff0c;返回 n 的所有位的平方和。一直执行 ngetNext(n) &#xff0c;最终只有 2 种可能&#xff1a; n 停留在 1 。无限循环且不为 1 。 证明&#xff1a;情况 1 是存在的&#xff0c;如力扣的示例一…

(十八)springboot实战——spring securtity注解方式的授权流程源码解析

前言 在上一节内容中&#xff0c;我们介绍了如何在FilterSecurityInterceptor过滤器中处理用户的授权流程&#xff0c;并分析了其源码&#xff0c;spring security还提供了方法级别的授权方式&#xff0c;通过EnableMethodSecurity注解启用权限认证流程&#xff0c;只需要在方…

每日一题——LeetCode1422.分割字符串的最大得分

方法一 暴力枚举 枚举所有分割点的情况&#xff0c;取最大得分 var maxScore function(s) {let res 0;const n s.length;for (let i 1; i < n; i) {let score 0;for (let j 0; j < i; j) {if (s[j] 0) {score;}}for (let j i; j < n; j) {if (s[j] 1) {sco…