RocketMQ安装与使用

news2024/9/24 0:36:53

什么是消息中间件

消息中间件利用高效可靠的消息传递机制进行平台无关的数据交流,并基于数据通信来进行分布式系统的集成。通过提供消息传递和消息排队模型,它可以在分布式环境下扩展进程间的通信。对于消息中间件,常见的角色大致也就有Producer(生产者)、Consumer(消费者)例如:寄快递

消息中间件使用场景

异步处理

场景说明:用户注册后,需要发注册邮件和注册短信

将注册信息写入数据库成功后,发送注册邮件,再发送注册短信,以上三个任务全部完成后,返回给客户端

引入消息队列,改造后的架构如下

按照以上约定,用户的响应时间相当于是注册信息写入数据库的时间,也就是50毫秒。注册邮件,发送短信写入消息队列后,直接返回,因此写入消息队列的速度很快,基本可以忽略,因此架构改变后,系统的吞吐量比串行提高了3倍,比并行提高了2倍

应用解耦

场景说明:用户下单后,订单系统需要通知库存系统,传统的做法是订单系统通过调用库存系统的接口来对库存进行操作

解耦合后:

订单系统:假如在下单时库存系统不能正常使用,也不影响正常下单,因为下单后,订单系统写入消息队列就不再关心其他的后续操作了,实现订单系统与库存系统的应用解耦

常见消息中间件比较

特性MQ

ActiveMQ

RabbitMQ

RocketMQ

Kafka

生产者消费者模式

支持

支持

支持

支持

发布订阅模式

支持

支持

支持

支持

请求回应模式

支持

支持

不支持

不支持

Api完备性

多语言支持

支持

支持

java

支持

单机吞吐量

万级

万级

万级

十万级

消息延迟

微秒级

毫秒级

毫秒级

可用性

高(主从)

高(主从)

非常高(分布式)

非常高(分布式)

消息丢失

理论上不会丢失

理论上不会丢失

文档的完备性

较高

提供快速入门

社区活跃度

商业支持

商业云

商业云

安装

解压即安装,注意 jdk 版本为 8

启动

启动broker使用命令,可以开启自动创建topic,否则会报错

建议使用命令启动

start mqnamesrv.cmd
start mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true

autoCreateTopicEnable=true开启自动创建topic

使用

硬编码方式发送

导入依赖无需配置

<!--MQ-->
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-client</artifactId>
  <version>4.4.0</version>
</dependency>
发送同步消息

这种可靠性同步地发送方式使用的比较广泛,比如:重要的消息通知,短信通知。

  1. 创建消息生产者, 指定生产者所属的组名
  2. 指定Nameserver地址
  3. 启动生产者
  4. 创建消息对象,指定主题、标签和消息体
  5. 发送消息
  6. 关闭生产者
package com.ape.rocketmq.test;

import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.exception.RemotingException;

// 发送同步消息
public class RocketMQSendTest01 {
    public static void main(String[] args) throws MQClientException, MQBrokerException, RemotingException, InterruptedException {
        //1. 创建消息生产者, 指定生产者所属的组名
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        //2. 指定Nameserver地址
        producer.setNamesrvAddr("127.0.0.1:9876");
        //3. 启动生产者
        producer.start();
        for (int i = 0; i < 10; i++) {
            //4. 创建消息对象,指定主题、标签和消息体
            Message msg = new Message("myTopic", "myTag", ("十行代码九个错误八个警告竟敢说七日精通六天学会五湖四海也不见如此三心二意之程序简直一等下流" + i).getBytes());
            //5. 发送消息
            SendResult sendResult = producer.send(msg);
            System.out.println(sendResult);
        }
        //6. 关闭生产者
        producer.shutdown();
    }
}
发送异步消息

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。只会等待MQ发送状态

相比于同步消息,就是在发送的时候new一个SendCallback类重写onSuccess以及onException方法

package com.ape.rocketmq.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

//发送异步消息
//异步发送比较浪费性能,经常会失败,所以发送多几次并且让线程休眠几秒
public class RocketMQSendTest02 {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("myTopic", "myTag2", ("十行代码九个错误八个警告竟敢说七日精通六天学会五湖四海也不见如此三心二意之程序简直一等下流" + i).getBytes());
            //            发送消息时候new一个类
            producer.send(message, new SendCallback() {
                @Override
                public void onSuccess(SendResult sendResult) {
                    System.out.println("发送成功:" + sendResult);
                }

                @Override
                public void onException(Throwable throwable) {
                    System.out.println("发送失败:" + throwable);
                }
            });
            TimeUnit.SECONDS.sleep(3);
        }
        producer.shutdown();
    }
}
发送单向消息

单向发送消息 类似UPD 只管发不管能不能收到,这种方式主要用在不特别关心发送结果的场景,例如日志发送。

相比于同步消息发送时候没有返回值

package com.ape.rocketmq.test;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.util.concurrent.TimeUnit;

// 单向发送消息 类似UPD 只管发不管能不能收到
public class RocketMQSendTest03 {
    public static void main(String[] args) throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("myproducer-group");
        producer.setNamesrvAddr("localhost:9876");
        producer.start();
        for (int i = 0; i < 10; i++) {
            Message message = new Message("myTopic", "myTag3", ("我是单向发送,类似于UPD" + i).getBytes());
            // 发送单向消息,没有任何返回结果
            producer.send(message);
            TimeUnit.SECONDS.sleep(3);
        }
        producer.shutdown();
    }
}

硬编码方式接收

消息接收步骤:

  1. 创建消息消费者, 指定消费者所属的组名
  2. 指定Nameserver地址
  3. 指定消费者订阅的主题和标签
  4. 设置回调函数,编写处理消息的方法
  5. 启动消息消费者
package com.ape.rocketmq.test;

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;

import java.util.List;

// 接收消息
public class RocketMQReceiveTest01 {
    public static void main(String[] args) throws Exception {
        //        创建消费者 指定所属组
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("myConsumer-group");
        //        指定NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        //        指定订阅生产者的主题和标签
        consumer.subscribe("myTopic", "*");
        //CLUSTERING-clustering   集群
        //BROADCASTING-broadcasting  广播
        //        consumer.setMessageModel(MessageModel.BROADCASTING);
        //        设置回调方法 编写消息处理方法
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                System.out.println(new String(list.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //5. 启动消息消费者
        consumer.start();
        System.out.println("消费者1已经启动……");
    }
}
负载均衡模式(默认模式)

不设置接收模式默认就是负载均衡,轮询

也就是不写consumer.setMessageModel(MessageModel.BROADCASTING);

广播模式

消费者采用广播的方式消费消息,每个消费者(订阅同一个主题的)都能接收到消息,并且每个消费者消费的消息都是相同的

consumer.setMessageModel(MessageModel.BROADCASTING);

集群模式

consumer.setMessageModel(MessageModel.CLUSTERING);

与spingboot集成

业务场景:下单成功之后,向下单用户发送短信

添加依赖与配置文件
<dependency>
  <groupId>org.apache.rocketmq</groupId>
  <artifactId>rocketmq-spring-boot-starter</artifactId>
  <version>2.0.2</version>
</dependency>

生产者配置文件

server:
  port: 8091

rocketmq:
  name-server: localhost:9876
  producer:
    group: shop-order

消费者配置文件

server:
  port: 8071
rocketmq:
  name-server: localhost:9876

项目地址E:\Codes\Idea_java_works\apesource\springboot\微服务\springboot_rocketmq02

结构如下

生产者
package com.apesource.shoporder.controller;

import com.apesource.shopcommon.pojo.Order;
import org.apache.rocketmq.spring.core.RocketMQTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;


@RestController
public class OrderController {
    @Autowired(required = false)
    private RocketMQTemplate template;

    @RequestMapping("/order/prod/{pid}")
    public Order order(Integer pid){
        //        下单创建订单 根据商品id查询数据库的到信息赋值给order  这里直接新建order模拟
        Order order = new Order();
        order.setUid(1);
        order.setUsername("测试用户");
        order.setPid(pid);
        order.setPname("大豫竹");
        order.setPprice(2.0);
        order.setNumber(1);

        //        下订单 给数据库的order表新增一行数据  这里输出来模拟
        System.out.println(order);
        //        下单成功 给用户发短信 这里直接发送order对象来模拟
        template.convertAndSend("order-topic",order);
        return order;
    }
}
消费者监听器
package com.apesource.shopuser.listener;

import com.apesource.shopcommon.pojo.Order;
import org.apache.rocketmq.spring.annotation.RocketMQMessageListener;
import org.apache.rocketmq.spring.core.RocketMQListener;
import org.springframework.stereotype.Component;

//接收信息并且发送短信给用户
@Component
@RocketMQMessageListener(consumerGroup = "shop-user", topic = "order-topic")
public class SmsListener implements RocketMQListener<Order> {
    @Override
    public void onMessage(Order order) {
        System.out.println(order);
    }
}

按照逻辑前端发送请求

order服务会打印order对象,user服务也会打印order对象

order服务

user服务

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2126608.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV高阶操作

在图像处理与计算机视觉领域&#xff0c;OpenCV&#xff08;Open Source Computer Vision Library&#xff09;无疑是最为强大且广泛使用的工具之一。从基础的图像读取、 1.图片的上下&#xff0c;采样 下采样&#xff08;Downsampling&#xff09; 下采样通常用于减小图像的…

日志相关知识

1.作用 a.为了代替System.out.println()&#xff0c;可以定义格式&#xff0c;重定向文件等。 b.可以存档&#xff0c;便于追踪问题。 c.可以按级别分类&#xff0c;便于打开或关闭某些级别。 d.可以根据配置文件调整日志&#xff0c;无需修改代码。 …

如何逆转Instagram账号流量减少?实用技巧分享

Instagram作为全球十大社媒之一&#xff0c;不仅是个人分享生活的平台&#xff0c;还是跨境卖家进行宣传推广和客户开发的关键工具。在运营Instagram的过程中&#xff0c;稍有不慎就容易出现账号被限流的情况&#xff0c;对于账号状态和运营工作的进行都十分不利。 一、如何判断…

图片预览、拖拽和缩放组件分享

业务场景 项目中不需要点击小图然后展示大图&#xff0c;类似于elementui中的Image图片组件。适用于直接展示大图&#xff0c;支持拖拽和缩放的场景&#xff0c;比如&#xff1a;用户需要比对两种数据的图片展示&#xff0c;左右两侧进行展示。 效果图 使用方式 在components…

宏任务和微任务+超全面试真题

概念 微任务和宏任务是在异步编程中经常使用的概念&#xff0c;用于管理任务的执行顺序和优先级。 宏任务&#xff1a;setTimeout, setInterval&#xff0c;I/O 操作和 UI 渲染等。微任务&#xff1a; Promise 回调、async/await等 微任务通常比宏任务具有更高的优先级。 执…

S7-1500替代S7-300全解析系列

硬件篇上 01 概述工控人加入PLC工业自动化精英社群 2022年十月初的时候&#xff0c;想必工控圈的小伙伴们都被S7-300系列即将于2023年10月1日退市的消息刷屏了吧&#xff1f;倒退到2020年的10月1日&#xff0c;同样伴随我们多年的ET200S系列也已经悄无声息地退市了。在感叹经…

GEE 将本地 GeoJSON 文件上传到谷歌资产

在地理信息系统&#xff08;GIS&#xff09;领域&#xff0c;Google Earth Engine&#xff08;GEE&#xff09;是一个强大的平台&#xff0c;它允许用户处理和分析大规模地理空间数据。本文将介绍如何使用 Python 脚本批量上传本地 GeoJSON 文件到 GEE 资产存储&#xff0c;这对…

Qt (16)【Qt 事件 —— Qt 事件简介 | 如何重写相关的 Event 函数】

阅读导航 引言一、事件介绍二、如何重写相关的 Event 函数1. 事件的处理简介2. 示例重写鼠标相关的 Event 函数&#xff08;1&#xff09;新建Qt项目&#xff0c;设计UI文件&#xff08;2&#xff09;新添加MyLabel类&#xff08;3&#xff09;重写enterEvent()方法和leaveEven…

分享一个爬虫数据挖掘 农村产权交易数据可视化平台 数据分析大数据 Java、python双版(源码、调试、LW、开题、PPT)

&#x1f495;&#x1f495;作者&#xff1a;计算机源码社 &#x1f495;&#x1f495;个人简介&#xff1a;本人 八年开发经验&#xff0c;擅长Java、Python、PHP、.NET、Node.js、Android、微信小程序、爬虫、大数据、机器学习等&#xff0c;大家有这一块的问题可以一起交流&…

形式向好、成本较低、可拓展性较高的名厨亮灶开源了。

简介 AI视频监控平台, 是一款功能强大且简单易用的实时算法视频监控系统。愿景在最底层打通各大芯片厂商相互间的壁垒&#xff0c;省去繁琐重复的适配流程&#xff0c;实现芯片、算法、应用的全流程组合&#xff0c;减少企业级应用约 95%的开发成本&#xff0c;在强大视频算法加…

建筑业首个通过算法备案的大模型发布

建筑业首个通过算法备案的大模型发布 9月10日上午&#xff0c;上海建工四建集团与中国建筑出版传媒有限公司携手推出了Construction-GPT PRO版&#xff0c;这是一款专为建筑行业设计的施工知识大模型。该模型能够理解和生成长达8000字符的内容&#xff0c;其回答速度达到毫秒级…

LLM大模型学习:NLP三大特征抽取器(CNN/RNN/TF)

NLP三大特征抽取器&#xff08;CNN/RNN/TF&#xff09; 结论&#xff1a;RNN已经基本完成它的历史使命&#xff0c;将来会逐步退出历史舞台&#xff1b;CNN如果改造得当&#xff0c;将来还是有希望有自己在NLP领域的一席之地&#xff1b;而Transformer明显会很快成为NLP里担当…

Linux 信息安全:构建坚固的防御体系

摘要&#xff1a; 本文围绕 Linux 信息安全展开。阐述了 Linux 在信息技术中的重要地位&#xff0c;强调信息安全的重要性以及 Linux 信息安全面临复杂网络环境、演变攻击手段与内部威胁等挑战。详细介绍了 Linux 系统的安全架构与机制&#xff0c;包括用户与权限管理、文件系统…

Hexo框架学习——从安装到配置

第一章 Hexo入门 Hexo 是一个快速、简洁且高效的博客框架。 1.1 Hexo的下载与安装 1.1.1 Hexo下载 在下载Hexo之前&#xff0c;我们需要确保电脑上已经安装好以下软件&#xff1a; Node.js (Node.js 版本需不低于 10.13&#xff0c;建议使用 Node.js 12.0 及以上版本) Git…

你真的懂吗系列——串口通信

你真的懂吗 文章目录 你真的懂吗前言二、什么是串口通信二、STM32的串口三、什么是数据通信 前言 串口通信是一种设备间常用的串行通信方式&#xff0c;串口按位&#xff08;bit&#xff09;发送和接收字节。尽管比字节&#xff08;byte&#xff09;的串行通信慢&#xff0c;但…

机器学习算法-决策树算法

文章目录 什么是决策树&#xff1f;决策树的基本概念决策树的构建过程决策树的优缺点优点&#xff1a;缺点&#xff1a; 决策树的优化决策树的应用决策树的实现工具 特征选择准则1. 信息增益&#xff08;Information Gain&#xff09;计算公式&#xff1a;熵&#xff08;Entrop…

ubuntu20.4安装Qt5.15.2

ubantu20.4镜像下载地址&#xff1a; https://releases.ubuntu.com/focal/ubuntu-20.04.6-desktop-amd64.iso Qt5.15.2下载地址&#xff1a; https://download.qt.io/official_releases/online_installers/ 安装步骤 1、进入地址后选择对应安装包&#xff0c;我这是ubuntu…

Redis进阶(二)--Redis高级特性和应用

文章目录 第二章、Redis高级特性和应用一、Redis的慢查询1、慢查询配置2、慢查询操作命令3、慢查询建议 二、Pipeline三、事务1、Redis的事务原理2、Redis的watch命令3、Pipeline和事务的区别 四、Lua1、Lua入门&#xff08;1&#xff09;安装Lua&#xff08;2&#xff09;Lua基…

虚幻引擎 | (类恐鬼症)玩家和NPC语音聊天

SETUP&#xff1a;工具和插件 工具&#xff1a;elevenlabs或者讯飞&#xff0c;用于Speech Synthesis&#xff08;语音合成&#xff0c;text to speech&#xff09;。 https://elevenlabs.io/app/speech-synthesis/text-to-speechhttps://elevenlabs.io/app/speech-synthesis…

海外云手机——跨国业务的高效工具

海外云手机是一种基于云计算的虚拟手机服务&#xff0c;依托海外服务器实现跨国网络访问。这项服务不仅具备传统智能手机的所有功能&#xff0c;还突破了地域限制&#xff0c;为跨国业务提供更加便捷、高效、安全的解决方案。 随着全球化的加速和互联网的快速普及&#xff0c;跨…