RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及批量发送消息、消息过滤实战

news2025/1/12 18:07:11

文章目录

      • 批量发送消息
      • 消息过滤

批量发送消息

批量发送消息可以减少网络的 IO 开销,让多个消息通过 1 次网络开销就可以发送,提升数据发送的吞吐量

在这里插入图片描述

虽然批量发送消息可以减少网络 IO 开销,但是一次也不能发送太多消息

批量消息直接将多个消息放入集合中发送即可,生产者代码如下:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");

        // 2、为生产者对象设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3、把我们的生产者直接启动起来
        producer.start();

        // 4、创建消息、并发送消息
        List<Message> reqList = new ArrayList<>(12);
        for (int i = 0; i < 12; i++) {
            // public Message(String topic, String tags, String keys, byte[] body) {
            Message message = new Message(
                    "custom-batch-topic",
                    "batchTag",
                    "CUSTOM_BATCH",
                    ("("+i+")Hello Message From BATCH Producer, " +
                            "date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
            );
            reqList.add(message);

        }

        // 利用生产者对象,将消息直接批量发送出去
        producer.send(reqList);

        System.out.println("Send Finished.");
    }
}

消费者代码如下:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch_group");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("custom-batch-topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}

消息过滤

消费者组中还可以有过滤操作,对同一个 Topic 下的消息的 Tag 标签进行过滤

但是使用消息过滤时需要 保证同一个消费组中消费的消息的 Tag 相同 ,如果同一个消费者组中的两个消费者订阅了不同的 Tag,比如消费者 A 订阅了 Tag1,消费者 B 订阅了 Tag2,那么可能 B 收到了 Tag1 的数据,发现不是自己想要的,于是将 Tag1 的数据过滤掉了,那么就导致了 A 也收不到 Tag1 的数据,造成数据消失的现象

消息过滤流程图如下:

在这里插入图片描述

消息过滤生产者如下:

public class FilterProducer {

    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        List<Order> list = new ArrayList<>();
        for (int i = 0; i < 12; i ++) {
            Order order = new Order();
            order.orderId = i;
            order.desc = "desc:" + i;
            order.tag = "tag" + i % 3;
            list.add(order);
        }
        for (Order order : list) {
            Message msg = new Message(
                    "Filter-Test-Topic",
                    order.tag,
                    (order.toString()).getBytes());
            msg.setKeys("Filter_Tag");
            msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
            // 直接将 msg 发送出去
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }

    public static class Order {
        int orderId;
        String desc;
        String tag;

        @Override
        public String toString() {
            return "orderId="+orderId+", desc="+desc+", tag="+tag;
        }
    }
}

过滤 tag 的几种用法:

过滤消息的 tag 主要修改一行代码:consumer.subscribe("Filter-Test-Topic", "tag1");,过滤也分几种情况:

  1. 过滤所有 tag

    consumer.subscribe("Filter-Test-Topic", "*");

  2. 过滤单个 tag

    consumer.subscribe("Filter-Test-Topic", "tag1");

  3. 过滤多个 tag

    consumer.subscribe("Filter-Test-Topic", "TG2 || TG3");

  4. 订阅 SQL92 方式(需要修改 custom.conf 文件,添加一行配置:enablePropertyFilter=true)

    consumer.subscribe("Filter-Test-Topic", MessageSelector.bySql("idx > 10"));

    这里的 idx > 10 的 idx 是在生产者中通过下边这行代码放入的:

    msg.putUserProperty("idx", new DecimalFormat("00").format(order.orderId));
    

消息过滤消费者代码如下(只过滤出 tag = tag1 的消息):

public class Subscribe02_Single_Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("Subscribe02_Single_Consumer");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("Filter-Test-Topic", "tag1");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs, ConsumeOrderlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeOrderlyStatus.SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
        System.out.println("Consumer Started Finished.");
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1319104.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

结构型设计模式(二)装饰器模式 适配器模式

装饰器模式 Decorator 1、什么是装饰器模式 装饰器模式允许通过将对象放入特殊的包装对象中来为原始对象添加新的行为。这种模式是一种结构型模式&#xff0c;因为它通过改变结构来改变被装饰对象的行为。它涉及到一组装饰器类&#xff0c;这些类用来包装具体组件。 2、为什…

将mjpg格式数转化成opencv Mat格式

该博客可以解决如下两个问题&#xff1a; 1、将mjpg格式数据转化成opencv Mat格式 2、v4l2_buffer 格式获取的mjpg格式数据转换成Mat格式。 要将 MJPEG 格式的数据转换为 OpenCV 的 Mat 格式&#xff0c;您可以使用 imdecode 函数。imdecode 函数可以将图像数据解码为 Mat 对象…

【MySQL】数据库和表的操作

数据库和表的操作 一、数据库的操作1. 创建数据库2. 字符集和校验规则&#xff08;1&#xff09;查看系统默认字符集以及校验规则&#xff08;2&#xff09;查看数据库支持的字符集&#xff08;3&#xff09;查看数据库支持的字符集校验规则&#xff08;4&#xff09;校验规则对…

B01、JVM与Java体系结构-01

字节码与多语言混合编程 字节码概述&#xff1a; 我们平时说的java字节码&#xff0c;指的是用java语言编译成的字节码。准确的说任何能在jvm平台上执行的字节码格式都是一样的。所以应该统称为&#xff1a;jvm字节码。不同的编译器&#xff0c;可以编译出相同的字节码文件&…

05 动态渲染数据

概述 实际上动态渲染数据&#xff0c;在《使用CDN开发Vue3项目》中就已经学习过了&#xff0c;核心代码如下&#xff1a; <div id"vue-app">{{text}}</div> <script src"https://cdn.staticfile.org/vue/3.0.5/vue.global.js"></sc…

jsp属性访问控制管理系统Myeclipse开发mysql数据库web结构java编程计算机网页项目

背景&#xff1a;为了解决共享数据授权访问的管理问题&#xff0c;出现了早期的自主访问控制&#xff08;Discretionary Access Control&#xff0c;DAC&#xff09;和强制访问控制&#xff08;Mandatory Access Control&#xff0c;MAC&#xff09;&#xff0c;随着计算机和技…

STM32F103RCT6开发板M3单片机教程06--定时器中断

前言 除非特别说明&#xff0c;本章节描述的模块应用于整个STM32F103xx微控制器系列&#xff0c;因为我们使用是STM32F103RCT6开发板是mini最小系统板。本教程使用是&#xff08;光明谷SUN_STM32mini开发板&#xff09; STM32F10X定时器(Timer)基础 首先了解一下是STM32F10X…

Unity中URP下的菲涅尔效果实现(URP下的法线和视线向量怎么获取)

文章目录 前言一、实现思路二、实现原理我们可以由下图直观的感受到 N 与 L夹角越小&#xff0c;点积越接近&#xff08;白色&#xff09;1。越趋近90&#xff0c;点积越接近0&#xff08;黑色&#xff09; 三、实现URP下的菲涅尔效果1、我们新建一个Shader&#xff0c;修改为最…

机器学习笔记 - 时间序列分析基础概念解释

一、简述 时间序列分析是一种统计方法,可检查定期收集的数据点以揭示潜在的模式。该技术与各个行业高度相关,因为它可以根据历史数据做出决策和预测。通过了解过去并预测未来,时间序列分析在金融、医疗保健、能源、供应链管理、天气预报、营销等领域发挥着至关重要的作用。 …

关于mysql存储过程中N/A和null的使用注意事项

oracle和mysql的存储过程大同小异&#xff0c;但是一些细节还是需要留意的。最近发现mysql的N/A和null在存储过程中容易忽略的一点&#xff0c;这会导致我们的存储过程提前结束。今天突然想起来了就记录一下。   mysql的N/A和null区别网上也说得很详细了&#xff0c;我就不赘…

频谱论文:面向频谱地图构建的频谱态势生成技术研究

#频谱# [1]李竟铭.面向频谱地图构建的频谱态势生成技术研究.2019.南京航空航天大学,MA thesis.doi:10.27239/d.cnki.gnhhu.2019.000556. &#xff08;南京航空航天大学&#xff09; 频谱地图是对无线电环境的抽象表达&#xff0c;它可以直观、多维度地展现频谱态势信息&…

部署智能合约以及 javascript 调用合约函数(Web3项目二实战之三)

在上一篇 智能合约是Web3项目的核心要务(Web3项目二实战之二) ,我们已然为项目编写了智能合约,在攥写完智能合约后,该项目将完成了一大部分,剩下无非就是用户界面交互的内容。 然而,在码完了智能合约代码后,起着承前启后关键性的便是,前端界面与智能合约的交互。 智能…

scroll-behavior属性使用方法

定义和用法&#xff1a; scroll-behavior 属性规定当用户单击可滚动框中的链接时&#xff0c;是否平滑地&#xff08;具动画效果&#xff09;滚动位置&#xff0c;而不是直线跳转。 <style>element{/* 核心代码 */scroll-behavior: smooth;} </style> 属性值&am…

gitlab ci pages

参考文章 gitlab pages是什么 一个可以利用gitlab的域名和项目部署自己静态网站的机制 开启 到gitlab的如下页面 通过gitlab.ci部署项目的静态网站 # build ruby 1/3: # stage: build # script: # - echo "ruby1"# build ruby 2/3: # stage: build …

翻译: 如何分析你的工作是否被AI替代 比如程序员、律师 Additional job analysis examples

我发现对于许多职业角色&#xff0c;人们心中都有一个标志性的任务&#xff0c;这个任务独特地定义了那个职业角色。例如&#xff0c;计算机程序员编写代码。医生可能会看病人。律师去法庭上争论案件。我认为当人们考虑人工智能的机会时&#xff0c;通常会本能地问&#xff0c;…

力扣200. 岛屿数量(java DFS解法)

Problem: 200. 岛屿数量 文章目录 题目描述思路解题方法复杂度Code 题目描述 思路 该问题可以归纳为一类遍历二维矩阵的题目&#xff0c;此类中的一部分题目可以利用DFS来解决&#xff0c;具体到本题目&#xff1a; 1.我们首先要针对于二维数组上的每一个点&#xff0c;尝试展…

基于AT89C52单片机的计算器设计与仿真

点击链接获取Keil源码与Project Backups仿真图&#xff1a; https://download.csdn.net/download/qq_64505944/88637995?spm1001.2014.3001.5503 源码获取 B 源码仿真图课程设计51 摘 要 计算器一般是指“电子计算器”,能进行数学运算的手持机器&#xff0c;拥有集成电路芯…

【Qt图书管理系统】4.系统设计与详细设计

文章目录 核心流程图软件架构设计流程图软件开发类图及功能点 核心流程图 用户登录图书查询图书借阅图书归还账户管理 软件架构设计 流程图 软件开发类图及功能点 Dlg_Login 登录界面 Cell_Main 主窗体 Cell_MyBook 我的书籍 Cell_BookMgr 书籍管理 Cell_RecoredMgr 借阅记录…

ASP.NET MVC实战之权限拦截Authorize使用

1&#xff0c;具体的实现方法代码如下 public class CustomAuthorizeAttribute : FilterAttribute, IAuthorizationFilter{/// <summary>/// 如果需要验证权限的时候&#xff0c;就执行进来/// </summary>/// <param name"filterContext"></par…

C# WPF上位机开发(知识产权ip保护)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 上位机软件如果是和硬件模块搭配开发&#xff0c;这个时候大部分上位机基本上都是白送的&#xff0c;不会收取相关的费用。但是&#xff0c;如果上…