微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题

news2024/9/25 0:37:41

~~微服务 RocketMQ-延时消息 消息过滤 管控台搜索问题~~

  • RocketMQ-延时消息
    • 实现延时消息
  • RocketMQ-消息过滤
    • Tag标签过滤
    • SQL标签过滤
  • 管控台搜索问题


RocketMQ-延时消息

给消息设置延时时间,到一定时间,消费者才能消费的到,中间件内部通过每秒钟扫描,判断是否到达要求时间
现在RocketMq并不支持任意时间的延时,需要设置几个固定的延时等级,从1s到2h分别对应着等级1到18"1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";

但这是默认的,我们可以修改
在这里插入图片描述
在这里插入图片描述
想修改可以去rocketmq的conf文件夹,修改broker.conf配置参数
该时间是指消息在中间件里面存储的时间

实现延时消息

消费者类:

public class Consumer {
    public static void main(String[] args) throws Exception {
        //定义消息消费者(在同一个JVM中,消费者的组名不能重复)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("helloConsumerGroup");
        //设置nameServer地址
        consumer.setNamesrvAddr("43.143.161.59:9876");
        //设置订阅的主题
        consumer.subscribe("helloTopic","*");
        //设置消息的监听器
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("消息消费时间:"+new Date()+",消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

生产者类:

public class Producer {
    public static void main(String[] args) throws Exception {
        //定义一个生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("helloGroup");
        //连接nameServer
        producer.setNamesrvAddr("43.143.161.59:9876");
        //启动生产者
        producer.start();
        //设置消息发送的目的地
        String topic = "helloTopic";
        //发送消息
        Message msg = new Message(topic,("延时消息,发送时间:"+new Date()).getBytes(Charset.defaultCharset()));
        //设置消息延时级别
        msg.setDelayTimeLevel(3);
        producer.sendOneway(msg);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        //关闭资源
        producer.shutdown();
    }
}

RocketMQ-消息过滤

Tag标签过滤

用Tag方式进行过滤的方法是传入感兴趣的Tag标签,Tag标签是一个普通字符串,是在创建Message的时候添加的,一个Message只能有一个Tag。使用Tag方式过滤非常高效。

生产者类:

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("tagProduceGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "tagFilterTopic";
        Message msg1 = new Message(topic,"TagA",("消息A").getBytes(Charset.defaultCharset()));
        Message msg2 = new Message(topic,"TagB",("消息B").getBytes(Charset.defaultCharset()));
        Message msg3 = new Message(topic,"TagC",("消息C").getBytes(Charset.defaultCharset()));
        producer.sendOneway(msg1);
        producer.sendOneway(msg2);
        producer.sendOneway(msg3);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        producer.shutdown();
    }
}

消费者类:

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("tagFilterConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("tagFilterTopic","TagA || TagC");
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果
在这里插入图片描述

SQL标签过滤

可以过滤内容,像写where一样

生产者类:

public class Producer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("sqlProduceGroup");
        producer.setNamesrvAddr("43.143.161.59:9876");
        producer.start();
        String topic = "sqlFilterTopic";
        Message msg1 = new Message(topic,("美女A,年龄22,体重45").getBytes(Charset.defaultCharset()));
        msg1.putUserProperty("age","22");
        msg1.putUserProperty("weight","45");
        Message msg2 = new Message(topic,("美女B,年龄25,体重60").getBytes(Charset.defaultCharset()));
        msg2.putUserProperty("age","25");
        msg2.putUserProperty("weight","60");
        Message msg3 = new Message(topic,("美女C,年龄40,体重70").getBytes(Charset.defaultCharset()));
        msg3.putUserProperty("age","40");
        msg3.putUserProperty("weight","70");
        producer.sendOneway(msg1);
        producer.sendOneway(msg2);
        producer.sendOneway(msg3);
        System.out.println("消息发送完毕.");
        TimeUnit.SECONDS.sleep(5);
        producer.shutdown();
    }
}

消费者类:

public class Consumer {
    public static void main(String[] args) throws Exception {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sqlFilterConsumerGroup");
        consumer.setNamesrvAddr("43.143.161.59:9876");
        consumer.subscribe("sqlFilterTopic", MessageSelector.bySql("age>23 and weight>60"));
        consumer.setMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                for(MessageExt msg:list){
                    String s = new String(msg.getBody(), Charset.defaultCharset());
                    System.out.println("消息的内容:"+s);
                }
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //启动消费者
        consumer.start();
    }
}

运行结果:
在这里插入图片描述
原因是因为默认是不支持sql过滤的,需要更改配置文件之后重启broker服务
在这里插入图片描述
在文件最后一行添加enablePropertyFilter=true即可
随后重新启动broker服务
在这里插入图片描述
之后运行结果
在这里插入图片描述


管控台搜索问题

为什么有时候管控台的消息都没有显示收到此消息,但消费者却能消费?

因为时间问题,因为我们的rocketmq是部署在虚拟机上的,当我们虚拟机和windows时间是同步的时候,消息是没有问题的,控制台显示时间内上下波动一小时的消息,但当虚拟机关掉的时候,时间是不动的,windows的时间却因为电脑里面的一个物理小电池,时间还在正常运行,两者时间不同步了,造成我们发消息是虚拟机的时间,控制台显示的是windows的时间,但消费因为并没有按照时间过滤,所以还是可以接收的到,把时间改一下又可以看到消息了

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/340453.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

TCP的运输连接管理

TCP的运输连接管理 文章目录TCP的运输连接管理TCP报文格式简介首部各个字段的含义控制位(flags)TCP的连接建立抓包验证一些细节及解答TCP连接释放抓包验证一些细节及解答参考TCP是面向连接的协议。运输连接是用来传送TCP报文的。TCP运输连接的建立和释放时每一次面向连接的通信…

第一部分:简单句——第一章:简单句的核心——二、简单句的核心变化(主语/宾语/表语的变化)

二、简单句的核心变化 简单句的核心变化其实就是 一主一谓&#xff08;n. v.&#xff09; 表达一件事情&#xff0c;谓语动词是其中最重要的部分&#xff0c;谓语动词的变化主要有四种&#xff1a;三态加一否&#xff08;时态、语态、情态、否定&#xff09;&#xff0c;其中…

MSI_MSI-X中断之源码分析

MSI_MSI-X中断之源码分析 文章目录MSI_MSI-X中断之源码分析一、 怎么发出MSI/MSI-X中断1.1 在RK3399上体验1.1.1 安装工具1.1.2 查看设备MSI-X信息1.1.3 验证MSI-X信息二、 怎么使用MSI/MSI-X三、 MSI/MSI-X中断源码分析3.1 IRQ Domain创建流程3.1.1 GIC3.1.2 ITS3.1.3 PCI MSI…

Linux C/C++ timeout命令实现(运行具有时间限制)

Linux附带了大量命令&#xff0c;每个命令都是唯一的&#xff0c;并在特定情况下使用。Linux timeout命令的一个属性是时间限制。可以为任何命令设置时间限制。如果时间到期&#xff0c;命令将停止执行。 如何使用timeout命令 我们将解释如何使用Linux timeout命令 timeout […

七、Git远程仓库操作——团队成员内协作

1. github远程协作的两种方式 前面我写的笔记&#xff0c;都是自己一个人在玩&#xff0c;无论是本地操作还是推送到远程都是自己推送到自己的仓库。 如果是别人拥有这个仓库&#xff0c;而我想对这个仓库的内容更改后&#xff0c;然后想推送更新到这个仓库&#xff0c;我们要…

【随笔】我迟到的2022年度总结:突破零粉丝,1个月涨粉1000+,2023年目标3万+

前言 我是21年12月注册的csdn&#xff0c; 作为用户平时看看文章&#xff0c;从未参与过写文章这件事。 但这一年的时间我见证了很多新号的崛起&#xff0c;有的号我平时关注比较多&#xff0c;看着他们从零粉丝突破了三万甚至五万的粉丝量。 在csdn上遇到了我的贵人&#x…

位运算 | 1356. 根据数字二进制下 1 的数目排序

LeetCode 1356. 根据数字二进制下 1 的数目排序 给你一个整数数组 arr 。请你将数组中的元素按照其二进制表示中数字 1 的数目升序排序。如果存在多个数字二进制中 1 的数目相同&#xff0c;则必须将它们按照数值大小升序排列。 文章讲解https://www.programmercarl.com/1356.%…

【微电网】基于风光储能和需求响应的微电网日前经济调度(Python代码实现)

目录 1 概述 2 知识点及数学模型 3 算例实现 3.1算例介绍 3.2风光参与的模型求解 3.3 风光和储能参与的模型求解 3.5 风光储能和需求响应都参与模型求解 3.6 结果分析对比 4 Python代码及算例数据 1 概述 近年来&#xff0c;微电网、清洁能源等已成为全球关注的热点…

Linux Vim编辑器基础讲解

目录 vim三个模式 命令模式 输入模式&#xff08;insert 插入模式、编辑模式&#xff09; 末行模式 编辑简单文档 什么是vim Vim是文本编辑器&#xff0c;是Linux上最常用的文本编辑器 Vim可以建立、编辑、显示文件 绝大多数Linux都会携带vim或者vi vim编辑器和vi编辑器的…

windbg-应用层实时调试

调试符号windbg使用一个或多个目录来存放符号条件&#xff0c;并使用环境变量_NT_SYMBOL_PATH来指向这些环境变量的位置&#xff0c;对操作系统内部模块的符号文件&#xff0c;一般用http://msdl.microsoft.com/download/symbols配置如下&#xff1a;SRV*C:\Symbols*http://msd…

手把手教你部署ruoyi前后端分离版本

下载源码&#xff08;当前版本3.8.5&#xff09;RuoYi-Vue: &#x1f389; 基于SpringBoot&#xff0c;Spring Security&#xff0c;JWT&#xff0c;Vue & Element 的前后端分离权限管理系统&#xff0c;同时提供了 Vue3 的版本 (gitee.com)创建数据库(一定要是这三个&…

【STM32】【HAL库】遥控关灯3 遥控器

相关连接 【STM32】【HAL库】遥控关灯0 概述 【STM32】【HAL库】遥控关灯1主机 【STM32】【HAL库】遥控关灯2 分机 【STM32】【HAL库】遥控关灯3 遥控器 需求 硬件遥控器 控制一个灯的开关(2个按键),发射RF433或红外 使用纽扣电池供电 一键启动,低待机功耗 硬件设计 一键…

推荐系统开源工具RecBole学习

文章全文首发&#xff1a;码农的科研笔记&#xff08;公众号&#xff09; RecBole是由AI Box团队开发的基于Pytorch的推荐系统算法库。该框架从数据处理、模型开发和算法训练都有涉及&#xff0c;能方便进行算法构建和实验对比。 数据组织形式 RecBole约定了一个统一、易用的数…

发生异常: AttributeError ‘xxx’ object has no attribute ‘ooo’

python 发生异常: AttributeError ‘xxx’ object has no attribute ‘ooo’ 原因&#xff1a; 函数调用发生在变量定义之前 示例分析&#xff1a; 在apple.py文件中代码如下&#xff1a; class Apple():def __init__(self):self.eat()self.pricedef eat(self):print("吃…

Spring Security in Action 第十八章 手把手OAuth2应用

本专栏将从基础开始&#xff0c;循序渐进&#xff0c;以实战为线索&#xff0c;逐步深入SpringSecurity相关知识相关知识&#xff0c;打造完整的SpringSecurity学习步骤&#xff0c;提升工程化编码能力和思维能力&#xff0c;写出高质量代码。希望大家都能够从中有所收获&#…

Java:SpringMVC的使用(2)

目录第十二章 REST风格CRUD练习12.1 搭建环境12.2 实现功能思路第十三章 SpringMVC消息转换器13.1 消息转换器概述13.2 使用消息转换器处理请求报文(1) 使用RequestBody获取请求体(2) 使用HttpEntity\<T>获取请求体及请求头13.3 使用消息转换器处理响应报文(1) 使用Respo…

llvm 创建外部调用函数方法

llvm 创建外部调用函数方法 2023-02-12 15:26:19 sizaif 文章目录llvm 创建外部调用函数方法法一:声名参数类型及函数类型在llvm IR中处理并调用函数:外部函数&#xff1a;法二声明函数在llvm IR中处理并函数调用外部函数法一: 声名参数类型及函数类型 // Fun Ty static Fun…

【CS224W】(task3)NetworkX工具包实践

note 节点可以为任意可哈希的对象&#xff0c;比如字符串、图像、XML对象&#xff0c;甚至另一个Graph、自定义的节点对象。通过这种方式可以自由灵活地构建&#xff1a;图为节点、文件为节点、函数为节点&#xff0c;等灵活的图形式。暂时省略&#xff1a;【B5】计算机网络图…

vue3学习资料整理

一、一个后端程序员为什么要学习前端&#xff1f; 1.网上找到的学习理由 《Java后端的我也要学Node.js 了》 https://blog.csdn.net/yusimiao/article/details/104689007 《nodejs后端开发的优缺点&#xff08;nodejs的概念与特征详解&#xff09;》 https://www.1pindao.co…

2023级浙江大学MEM提面最新经验分享

一、个人背景背景&#xff1a;本人毕业于某211大学工程管理相关专业&#xff0c;目前定居在杭州&#xff0c;在某汽车制造公司工作&#xff0c;负责研发无人驾驶项目。我申请的是浙大MEM提前批面试&#xff0c;因为通过提面优秀资格顺利上岸录取&#xff0c;之前感到对自己有帮…