【RocketMQ】RocketMQ入门

news2025/1/10 3:01:07

【RocketMQ】RocketMQ入门

文章目录

  • 【RocketMQ】RocketMQ入门
    • 1. 消费模式
    • 2. 发送/消费 消息
      • 2.1 同步消息
      • 2.2 异步消息
      • 2.3 单向消息
      • 2.4 延迟消息
      • 2.5 批量消息
      • 2.6 顺序消息

1. 消费模式

MQ的消费模式大致分为两种,一种是推Push,一种是拉pull。

Push模式:

  1. 优点:
    • 及时性较好
  2. 缺点:
    • 客户端没有做好流控的话容易导致客户端消息堆积甚至崩溃。

Pull模式:

  1. 优点:
    • 客户端可以根据自己的消费能力进行消费
  2. 缺点:
    • 拉取频率不好控制,频繁容易造成客户端压力过大,拉取间隔长容易造成消费不及时。

Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式


2. 发送/消费 消息

参考文档:RocketMQ官方文档

以下代码采用的都是rocketmq的原生api

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.4.0</version>
</dependency>

2.1 同步消息

同步发送是最常用的方式,是指消息发送方发出一条消息后,会在收到服务端同步响应之后才发下一条消息的通讯方式,可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

image-20230525153823533

生产者发送消息代码如下:

@Test
public void simpleProducer() throws Exception {
    //创建一个生产者 (制定一个组名)
    DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");
    //连接namesrv
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //启动
    producer.start();
    for (int i = 1; i <= 10; i++) {
        //创建消息
        Message message = new Message("testTopic", ("我是一个简单的消息" + i).getBytes());
        //发送消息
        SendResult sendResult = producer.send(message);
        System.out.println(sendResult.getSendStatus());
    }
    //关闭生产者
    producer.shutdown();
}

消费者消费信息代码如下:

@Test
public void simpleConsumer() throws Exception {
    //创建一个消费者
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
    //连接namesrv
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //订阅一个主题 * 标识订阅这个主题中所有消息,后期会有消息过滤
    consumer.subscribe("testTopic", "*");
    //设置一个监听器
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            //这个就是消费的方法(业务处理)
            System.out.println("我是消费者");
            System.out.println(list.get(0).toString());
            System.out.println("消息内容:" + new String(list.get(0).getBody()));
            System.out.println("消费上下文" + consumeConcurrentlyContext);
            //返回值 CONSUME_SUCCESS 成功,消息会从mq出队
            // RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    //启动
    consumer.start();
    //TimeUnit.SECONDS.sleep(100);
    //挂起当前jvm
    System.in.read();
}

2.2 异步消息

异步发送是指发送方发出一条消息后,不等服务端返回响应,接着发送下一条消息的通讯方式。

消息发送方在发送了一条消息后,不需要等待服务端响应即可发送第二条消息,发送方通过回调接口接收服务端响应,并处理响应结果。异步发送一般用于链路耗时较长,对响应时间较为敏感的业务场景。例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

image-20230525154600960

注:异步发送生产者需要实现异步发送回调接口。

生产者发送消息代码如下:

@Test
public void asyncProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
    //连接
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    //启动
    producer.start();
    Message message = new Message("asyncTopic", "我是一个异步消息".getBytes());
    producer.send(message, new SendCallback() {
        @Override
        public void onSuccess(SendResult sendResult) {
            System.out.println("发送成功");
        }

        @Override
        public void onException(Throwable throwable) {
            System.out.println("发送失败" + throwable.getMessage());
        }
    });
    System.out.println("我先执行");
    System.in.read();
}

消费者代码基本和同步消息的相同,不展示。


2.3 单向消息

发送方只负责发送消息,不等待服务端返回响应且没有回调函数触发,即只发送请求不等待应答。此方式发送消息的过程耗时非常短,一般在微秒级别。适用于某些耗时非常短,但对可靠性要求并不高的场景,例如日志收集

image-20230525154748027

生产者发送消息代码如下:

@Test
public void onewayProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("oneway-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();
    Message message = new Message("onewayTopic", "这是一条单向消息".getBytes());
    producer.sendOneway(message);
    System.out.println("成功");
    producer.shutdown();
}

2.4 延迟消息

延迟消息发送是指消息发送到Apache RocketMQ后,并不期望立马投递这条消息,而是延迟一定时间后才投递到Consumer进行消费。应用场景是外卖15分钟未支付则取消订单。

RokcketMQ一共支持18个等级的延迟投递,具体时间如下:

投递等级(delay level)延迟时间投递等级(delay level)延迟时间
11s106min
25s117min
310s128min
430s139min
51min1410min
62min1520min
73min1630min
84min171h
95min182h

生产者发送消息代码如下:

@Test
public void msProducer() throws Exception {
    DefaultMQProducer producer = new DefaultMQProducer("ms-producer-group");
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    producer.start();

    Message message = new Message("orderMsTopic", "订单消息".getBytes());
    //设置延迟等级
    //messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h"
    message.setDelayTimeLevel(3);//10s
    producer.send(message);
    System.out.println("发送事件:" + new Date());
    producer.shutdown();
}

2.5 批量消息

在对吞吐率有一定要求的情况下,Apache RocketMQ可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

image-20230525160131302

注:批量消息大小不能超过1MIB(1024*1024),同一批的 topic 必须相同

生产者发送消息代码如下:

@Test
public void testBatchProducer() throws Exception{
    // 创建默认的生产者
    DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
    // 设置nameServer地址
    producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    // 启动实例
    producer.start();
    List<Message> msgs = Arrays.asList(
            new Message("batchTopic", "我是一组消息的A消息".getBytes()),
            new Message("batchTopic", "我是一组消息的B消息".getBytes()),
            new Message("batchTopic", "我是一组消息的C消息".getBytes())

    );
    SendResult send = producer.send(msgs);
    System.out.println(send);
    // 关闭实例
    producer.shutdown();
}

消费者消费信息代码如下:

@Test
public void msConsumer() throws Exception {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("bathc-consumer-group");
    consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
    consumer.subscribe("batchTopic", "*");
    consumer.registerMessageListener(new MessageListenerConcurrently() {
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
            System.out.println("收到消息了:" + new Date());
            System.out.println(list.size());
            System.out.println("消息体是:" + new String(list.get(0).getBody()));
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        }
    });
    consumer.start();
    System.in.read();
}

2.6 顺序消息

待续。。。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/567775.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

在变压器厂中使用 ISA-95 应用程序进行调度集成

介绍 在工业批量和连续生产/运营环境中&#xff0c;调度涉及将诸如罐、反应器和其他加工设备之类的资源分配给生产/运营任务。第 4 层生产/运营计划确定要制造什么产品、要制造多少产品以及何时制造。根据设备、物料、人员和班次的可用性&#xff0c;随着时间的推移分配资源。…

CSDN中如何获得铁粉(用心篇)

✅作者简介&#xff1a;2022年博客新星 第八。热爱国学的Java后端开发者&#xff0c;修心和技术同步精进。 &#x1f34e;个人主页&#xff1a;Java Fans的博客 &#x1f34a;个人信条&#xff1a;不迁怒&#xff0c;不贰过。小知识&#xff0c;大智慧。 &#x1f49e;当前专栏…

快速实现pytest自定义配置项,让Web自动化测试更便捷!

目录 前言&#xff1a; 一、什么是pytest.ini 二、在pytest.ini中添加自定义配置项 三、使用自定义配置项 四、结论 前言&#xff1a; WEB自动化测试是一个重要的环节&#xff0c;需要结合框架和工具进行开发。在WEB自动化测试中&#xff0c;常用的是pytest框架&#xff…

go sync包

官方文档&#xff1a;https://pkg.go.dev/sync 临界区 临界区(critical section)是指包含有共享数据的一段代码&#xff0c;这些代码可能被多个线程访问 或修改。临界区的存在就是为了保证当有一个线程在临界区内执行的时候&#xff0c;不能有其他任何线程被允许在临界区执行…

workquue

参考 讲解Linux内核工作队列workqueue源码分析 - 知乎 浅谈Linux内核中断下半部——工作队列&#xff08;work queue&#xff09; - 知乎 kernel/workqueue.c 初始化 /** 6004 * workqueue_init_early - early init for workqueue subsystem 6005 * 6006 * This is th…

字节测开5年经验之谈,1分钟了解自动化测试..

引子 写在最前面&#xff1a;目前自动化测试并不属于新鲜的事物&#xff0c;或者说自动化测试的各种方法论已经层出不穷&#xff0c;但是&#xff0c;能够明白自动化测试并很好落地实施的团队还不是非常多&#xff0c;我们接来下用通俗的方式来介绍自动化测试…… 本文共有2410…

WebSocket全双工通信SpringBoot实现

【IT老齐238】十分钟上手WebSocket全双工通信协议_哔哩哔哩_bilibili【IT老齐238】十分钟上手WebSocket全双工通信协议, 视频播放量 8348、弹幕量 23、点赞数 318、投硬币枚数 157、收藏人数 257、转发人数 30, 视频作者 IT老齐, 作者简介 老齐的个人V: itlaoqi001 ~~欢迎前来交…

kubernetes01

kubernetes基础 kubernetes介绍 Kubernetes是Google在2014年开源的一款容器集群系统&#xff0c;简称k8s Kubernetes用于容器化应用程序部署、扩展和管理&#xff0c;目标是让容器化应用简单高效 官方网站&#xff1a;https://kubernetes.io/ 官方文档&#xff1a;https://ku…

PFC(Priority Flow Control)及PFC Storm介绍

文章目录 PFCPFC Storm PFC PFC是一种流量控制机制&#xff0c;用于保证网络中的无损传输&#xff0c;常用于RDMA网络中&#xff0c;以下具体介绍其机制。 如图所示&#xff0c;发送方的出端口发送数据包给接收方的入端口。在发送方的出端口&#xff0c;数据包在至多八个队列中…

AI智能照片编辑:AI Photo for Mac

AI Photo是一款Mac平台上的智能照片编辑软件&#xff0c;它基于人工智能技术&#xff0c;可以帮助用户快速、轻松地对照片进行编辑和美化。AI Photo提供了多种智能修复和美化功能&#xff0c;包括自动调整色彩、对比度、亮度、清晰度等&#xff0c;使得照片的质量得到有效提升。…

二叉树:填充每个节点的下一个右侧节点指针(java)

leetcode116:填充每个节点的下一个右侧节点指针 leetcode原题链接&#xff1a;题目描述递归解法一递归方法二&#xff08;效率更高&#xff09;二叉树专题 leetcode原题链接&#xff1a; 116题&#xff1a;填充每个节点的下一个右侧节点指针 题目描述 给定一个 完美二叉树 &a…

【STL模版库】vector介绍及使用 {构造函数,迭代器,容量相关接口,增删查改;动态二维数组}

一、vector的介绍 vector是表示可变大小数组的序列容器。就像数组一样&#xff0c;vector也采用的连续存储空间来存储元素。也就是意味着可以采用下标对vector的元素进行访问&#xff0c;和数组一样高效。但是又不像数组&#xff0c;它的大小是可以动态改变的&#xff0c;而且它…

Shell iptales防火墙设置

文章目录 Linux 防火墙1.Linux包过滤防火墙概述2.四表五链四表五链 3.规则链之间的匹配顺序主机型防火墙网络型防火墙 4.规则链内的匹配顺序 Linux 防火墙 1.Linux包过滤防火墙概述 Linux 系统的防火墙: IP信息包过滤系统&#xff0c;它实际上由两个组件netfilter 和 iptable…

029:Mapbox GL绘制铁路黑白交替的线段

第029个 点击查看专栏目录 本示例的目的是介绍演示如何在vue+mapbox中加载数据显示铁路标识的那种黑白交替的线段。 直接复制下面的 vue+mapbox源代码,操作2分钟即可运行实现效果 文章目录 示例效果配置方式示例源代码(共94行)相关API参考:专栏目标示例效果 配置方式 1)…

湍流的数值模拟方法概述

湍流&#xff0c;又称紊流&#xff0c;是一种极其复杂、极不规则、极不稳定的三维流动。湍流场内充满着尺度大小不同的旋涡&#xff0c;大旋涡尺度可以与整个流畅区域相当&#xff0c;而小漩涡尺度往往只有流场尺度千分之一的数量级&#xff0c;最小尺度旋涡的尺度通过其耗散掉…

蓝桥杯--挖地雷

没有白走的路&#xff0c;每一步都算数&#x1f388;&#x1f388;&#x1f388; 题目&#xff1a; 已知有很多的地窖&#xff0c;每一个地窖中又藏着很多的地雷&#xff0c;每个地窖之间都存在着相连性&#xff0c;但是不是任意的地窖都是相连的&#xff0c;要求我们找出一次能…

技术干货 | 在 PostgreSQL 中设置查询超时

在 Navicat Monitor 3 监控工具中的查询分析器画面顶部&#xff0c;我们设置了一个图表&#xff0c;用以显示等待时间最长的查询&#xff1a; 能够标识出滞后的查询非常重要&#xff0c;因为它们可以让一切陷入瘫痪。 除了在标识出慢速查询并对其进行修复外&#xff0c;另一种…

【2023 · CANN训练营第一季】昇腾AI入门课(TensorFlow)第二章——TensorFlow模型迁移训练

1.AI模型开发基础知识入门 1.1具备Python编程经验 a.使用位置和关键字参数定义和调用函数 b.字典、列表、集合 (创建、访问和迭代) c.for循环&#xff0c; for具有多个迭代器变量的循环 (例如&#xff0c;for a,b in [(1,2),(3,4)]) d.if/else条件块和条件表达式 e.字符串格式…

Echarts绘制折线图,超简单,源码点击即可运行!【文末源码地址】

文章目录 前言Apache Echarts绘制基础折线图绘制带标记的折线图绘制多条折线图绘制带标签的折线图完整源码地址 前言 本文包含的代码仅为部分片段&#xff0c;完整源码有详细注释&#xff0c;可在文末领取&#xff01; 在当今数字化时代&#xff0c;数据可视化已成为一种必不…

day12 - 图像修复

在图像处理的过程中&#xff0c;经常会遇到图像存在多余的线条或者噪声的情况&#xff0c;对于这种情况我们会先对图像进行预处理&#xff0c;去除掉对图形内容有影响的噪声&#xff0c;在进行后续的处理。 本节实验我们介绍使用图像膨胀来处理图形的多余线条&#xff0c;进行…