RockerMQ学习

news2024/12/24 11:41:04

消息中间件以前常用RabbitMQ和ActiveMQ,由于业务需要,后期业务偏向大数据,现着重学习一下RocketMQ(RocketqMQ原理同ctg-mq),后续更新Kafka

一、RocketMQ特性

在这里插入图片描述

  • Kafka特性 (高性能分布式)
    吞吐量大,支持消息大量推挤,支持topic离线,支持分布式,使用ZooKeeper实现负载均衡,支持Hadoop数据并行加载

  • RocketMQ特性
    (1)Broker 服务器:Broker 服务器是RocketMQ的核心,主要功能:消息的处理(接收生产者Producer发送过来的消息(持久化),推送消息给消费者Consumer),消息的存储。NameServer 服务器:记录Producert信息、Broker信息、Consumer信息、Topic主题信息,NameServer服务器在这里作为控制中心、注册中心、路由,
    服务启动顺序,先启动NameServer再启动Broker,将Broker服务器的ip注册到NameServer服务中
    业务流程:如果生产者Producer需要推送消息至Broker服务器中,需要先去NameServer服务器中查找到对应的Broker服务器,然后生产者端Producer与Broker服务器建立连接
    (2)能够保证严格的消息顺序(顺序消费、顺序拉取)
    丰富的消息拉取模式:push模式(等待Broker推送消息,推荐使用,),pull模式(主动向Broker主动拉取消息),pull与push模式同时可以满足使用需求的情况下,建议优先使用push模式
    (3)可以多节点生产和多节点消费
    (4)消息事务机制,目前只有RocketMQ支持,Kafka和RabbitMQ不支持
    (5)亿级消息堆积
    (6)吞吐量高,但比Kafka低
    (7)消息重推、死信队列

  • RabbitMQ特性
    吞吐量比Kafka、RocketMQ低…
    在这里插入图片描述

二、RocketMQ消费模式

1.Push推模式-DefaultMQPushConsumer原理

Consumer消费者向Broker服务器发送请求,Consumer通过请求与Broker服务器保持一种长连接的形式,Broker服务器每5s检查一次是否存在消息,如果有就推送给Consumer消费者

2.Pull拉模式-DefaultMQPullConsumer原理

Consumer消费者主动去Broker服务器拉取数据,一般使用本地定时任务去拉取,由于需要保证消息的及时性,一般推荐使用Push推模式订阅消息

3. 轮询监控机制

在这里插入图片描述
RocketMQ默认将Producer生产者消息发送至4(不一定4个)个队列中进行存储,Consumer消费方通过轮询的方式去监控这个4个队列(轮询监控机制)

4.ack机制

LocalTransactionState标识消息的状态,通过判断返回的枚举值enum做出相应处理

  1. COMMIT_MESSAGE
    消息可见,目前事务消息分为提交不可见消息和可见消息
  2. ROLLBACK_MESSAGE
    消息需要回滚
  3. UNKNOW
    消息异常或超时时返回该枚举值,重复回查信息
    在这里插入图片描述

三、RocketMQ实战

(1)消息发送实现流程

  1. 引入pom依赖,目前最新版本为5.3.0,推荐使用4.4.0
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>版本</version>
</dependency>

<dependency>
    <groupId>com.alibaba.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rocketmq</artifactId>
</dependency>
  1. 创建DefualtMQProducer实例对象
  2. 设置NaemServer地址
  3. 开启DefaultMQProducer
  4. 创建消息Message
  5. 发送消息
  6. 关闭DefaultMQProducer

(2)消息消费流程实现

目前业务需要实现消费业务,着重学习消费端逻辑

  1. 创建DefaultMQPushConsumer
  2. 设置NameServer地址
  3. 设置subscribe,这里是要读取的主题信息
  4. 创建监听器MessageListener
  5. 获取消息信息
  6. 返回消息读取状态

消费者流程代码,这里使用Push推模式实现,并设置了消息拉取最大上限setConsumeMessageBatchMaxSize(2)为2条消息
监听器这么选择普通监听器MessageListenerConcurrently,如果需要实现顺序消费可以选用MessageListenerOrderly
消息消费时如果有异常出现,注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发RocketMQ消息重推机制
如果消息消费成功,只需返回枚举类enum ConsumeConcurrentlyStatus.CONSUME_SUCCESS即可表示消息推送成功

public class Consumer {
    public static void main(String[] args) throws MQClientException {

        //1. 创建DefaultMQPushConsumer
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("demo_consumer_group");
        //2. 设置NameServer地址
        consumer.setNamesrvAddr("192.168.211.141:9876");
        //3. 设置subscribe,这里是要读取的主题信息
        consumer.subscribe("Topic_Name",//执行要消费的主题
                "Tags || TagsA || TagsB");//过滤规则  "*"则表示全部订阅
        //4. 创建监听器MessageListener
        //4.1 设置消息拉取最大数(上限)-最大拉取两条
        consumer.setConsumeMessageBatchMaxSize(2);
        consumer.setMessageListener(new MessageListenerConcurrently() {
            /*
                MessageListenerConcurrently 普通消息的接收
                MessageListenerOrderly 顺序消息的接收
             */
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                /**
                 * List<MessageExt> msgs 可以从Broker获取多条数据,默认是32条,可以设置上限
                 */
                //5. 获取消息信息
                //迭代消息信息
                for (MessageExt msg : msgs) {
                    //获取主题
                    String topic = msg.getTopic();
                    //获取标签
                    String tags = msg.getTags();

                    //获取信息
                    byte[] body = msg.getBody();
                    try {
                        String result = new String(body, RemotingHelper.DEFAULT_CHARSET);

                        //todo 实现业务...
                        System.out.println("Consumer消费信息--topic: " + topic + ", tags: " + tags + ", result: "+ result);

                    } catch (UnsupportedEncodingException e) {
                        //throw new RuntimeException(e);
                        //注意这里不要抛出异常,打印异常日志即可,直接返回消息失败枚举值 RECONSUME_LATER 触发重推机制
                        e.printStackTrace();
                        //消息消费失败,重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                    //6. 返回消息读取状态
                    //消息消费成功
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;

                }
                return null;
            }
        });

        //开启RockerMQ消费端
        consumer.start();

    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2047037.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

MySQL数据库进阶知识(三)《优化》

学习目标&#xff1a; 一周掌握SQL优化知识 学习内容&#xff1a; 一、插入数据 1.insert优化 批量插入 insert into tb_test values(1,Tom),(2,Cat),(3,Jerry);手动提交事务 start transaction; insert into tb_test values(1,Tom),(2,Cat),(3,Jerry); insert into tb_te…

03 网络编程 TCP传输控制协议

目录 1、TCP基本特征 2、TCP通信流程基本原理 &#xff08;1&#xff09;基本原理 &#xff08;2&#xff09;TCP通信代码实现 &#xff08;3&#xff09;核心API解析 1&#xff09;地址绑定--bind 2)设置监听-listen 3)等待连接请求-accept-产生一个已连接套接字 4&a…

WSL2与Windows之间的网络互访

文章目录 1.环境2.WSL访问Windows上的服务3.Windows访问WSL上的服务 1.环境 1.宿主机 Windows 10 2.WSL ubuntu 18.04 LTS 3.Windows 10上的 vEthernet (WSL) 已启用 2.WSL访问Windows上的服务 1.防火墙设置 2.查看访问Windows的IP 172.21.112.1, 使用该IP访问Windows上的服务…

玩机进阶教程-----回读 备份 导出分区来制作线刷包 回读分区的写入与否 修改xml脚本

很多工作室需要将修改好的系统导出来制作线刷包。前面分享过很多制作线刷包类的教程。那么一个机型中有很多分区。那些分区回读后要写入。那些分区不需要写入。强写有可能会导致不开机 不进系统的故障。首先要明白。就算机型全分区导出后在写回去 都不一定可以开机进系统。那么…

【JVM】JVM 实战调优指南赋案例(保姆篇)

文章目录 JVM 实战调优指南引言1. JVM基础知识1.1 JVM架构1.2 JVM垃圾回收 2. 垃圾回收调优2.1 垃圾回收日志2.2 GC日志分析2.3 调优策略2.3.1 调整堆大小2.3.2 选择合适的GC算法2.3.3 调整垃圾回收线程 3. 内存管理调优3.1 内存泄漏检测3.2 堆转储分析3.3 内存分配策略 4. 线程…

基于飞桨框架的稀疏计算使用指南

本文作者-是 Yu 欸&#xff0c;华科在读博士生&#xff0c;定期记录并分享所学知识&#xff0c;博客关注者5w。本文将详细介绍如何在 PaddlePaddle 中利用稀疏计算应用稀疏 ResNet&#xff0c;涵盖稀疏数据格式的础知识、如何创建和操作稀疏张量&#xff0c;以及如何开发和训练…

在阿里云上部署 Docker并通过 Docker 安装 Dify

目录 一、在服务器上安装docker和docker compose 1.1 首先关闭防火墙 1.2 安装docker依赖包 1.3 设置阿里云镜像源并安装docker-ce社区版 1.4 开启docker服务并设置开机自启动 1.5 查看docker版本信息 1.6 设置镜像加速 1.7 将docker compose环境复制到系统的bin目录下…

【计算机网络】应用层自定义协议与序列化

记得在上一节我们说过TCP中的读取时需要改进&#xff0c;这节就可以解决读取问题了。 目录 应用层再谈 "协议"网络版计算机方案一方案二 序列化 和 反序列化 重新理解 read、write、recv、send 和 tcp 为什么支持全双工 应用层 再谈 “协议” 我们在UDP与TCP中写的…

力扣高频SQL 50题(基础版)第四十七题之1321.餐馆营业额变化增长

力扣高频SQL 50题&#xff08;基础版&#xff09;第四十七题 1321.餐馆营业额变化增长 题目说明 表: Customer ---------------------- | Column Name | Type | ---------------------- | customer_id | int | | name | varchar | | visited_on | date | | amount | …

后端开发刷题 | 排序算法--冒泡排序

描述 有一个长度为7的无序数组&#xff0c;按照从小到大的顺序排序后输出。 输入描述&#xff1a; 数组中的数据 输出描述&#xff1a; 数组中数据排序后输出 示例1&#xff1a; 输入&#xff1a; 13 11 9 7 5 3 1输出&#xff1a; 1 3 5 7 9 11 13 算法思想&#xf…

Type-C PD芯片与OTG功能:边充电边数据同时进行 LDR6028

在科技飞速发展的今天&#xff0c;智能设备已成为我们日常生活中不可或缺的一部分。从智能手机到平板电脑&#xff0c;再到笔记本电脑&#xff0c;这些设备不仅极大地丰富了我们的生活方式&#xff0c;也对充电与数据传输技术提出了更高要求。Type-C PD&#xff08;Power Deliv…

WPF篇(19)-TabControl控件+TreeView树控件

TabControl控件 TabControl表示包含多个共享相同的空间在屏幕上的项的控件。它也是继承于Selector基类&#xff0c;所以TabControl也只支持单选操作。另外&#xff0c;TabControl的元素只能是TabItem&#xff0c;这个TabItem继承于HeaderedContentControl类&#xff0c;所以Ta…

EE trade:黄金的基础知识点

黄金&#xff0c;这种闪耀着金色光芒的贵金属&#xff0c;自古以来就吸引着人类的目光&#xff0c;并深深地影响着人类文明进程。从古代文明的装饰品到现代社会的投资工具&#xff0c;黄金始终扮演着重要的角色。本文整理了黄金的必备常识、黄金的基础知识点。 一、黄金的独特…

达梦数据库系列—48.DMHS实现Mysql到DM8的同步

目录 DMHS实现Mysql到DM8的同步 1、准备介质 2、安装 3、准备源端Mysql和目标端DM8 软件安装 数据库创建 打开归档 开启附加日志 创建辅助表 Mysql客户端驱动 Mysql端安装ODBC 检查依赖包 创建连接用户 创建测试表 4、同步配置 修改服务配置 Mysql到Dm单向同步…

CVPR2023《DNF: Decouple and Feedback Network for Seeing in the Dark》暗光图像增强论文阅读笔记

相关链接 论文链接 https://openaccess.thecvf.com/content/CVPR2023/papers/Jin_DNF_Decouple_and_Feedback_Network_for_Seeing_in_the_Dark_CVPR_2023_paper.pdf 代码链接 https://github.com/Srameo/DNF 摘要 RAW数据的独特属性在低光照图像增强方面展现出巨大潜力。…

ansible环境搭建

任务背景 公司的服务器越来越多, 维护⼀些简单的事情都会变得很繁琐。⽤ shell脚本来管理少量服务器效率还⾏, 服务器多了之后, shell脚本⽆ 法实现⾼效率运维。这种情况下&#xff0c;我们需要引⼊⾃动化运维⼯具, 对 多台服务器实现⾼效运维。 任务要求 通过管理服务器能够…

nginx核心配置示例

目录 1、nginx location的详细使用 &#xff08;1&#xff09;精确匹配 &#xff08;2&#xff09;区分大小写 &#xff08;3&#xff09;不区分大小写 &#xff08;4&#xff09;匹配文件名后缀 2、nginx下的用户认证 3、nginx自定义错误页面 4、自定义错误日志 5、n…

Scrapy框架进阶攻略:代理设置、请求优化及链家网实战项目全解析

scrapy框架 加代理 付费代理IP池 middlewares.py # 代理IP池 class ProxyMiddleware(object):proxypool_url http://127.0.0.1:5555/randomlogger logging.getLogger(middlewares.proxy)async def process_request(self, request, spider):async with aiohttp.ClientSess…

【乐吾乐大屏可视化组态编辑器】状态切换

状态切换 开关状态 开关的断开和闭合。可以拖拽国家电网图库中的“开”与“关”两个组件&#xff0c;选中对齐重叠在一起后&#xff0c;右键选择“组合为状态”&#xff0c;在“外观”面板可以任意切换状态。 想实现点击开关图元就可以切换开关状态&#xff0c;可以选中图元添…

基于 springboot 2 和 vue 3 的 博客论坛系统

1. 网站信息 博客论坛系统&#xff1a;http://106.53.164.141:8200 本网站是 基于 SpringBootVue 前后端分离的博客论坛系统 前台用户&#xff1a;注册登录&#xff1b;博客和活动相关的展示、浏览、点赞、收藏、评论、编辑等功能 后台管理员&#xff1a;管理公告、博客、活…