rocketmq Listener 消费消息的优雅方式(基于SPEL)

news2025/1/13 3:31:00

DefaultMQPushConsumer 配置

package repayment.config;

import cn.itcast.wanxinp2p.repayment.message.diy.DefaultMessageListenerConcurrently;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.springframework.beans.BeansException;
import org.springframework.context.ApplicationContext;
import org.springframework.context.ApplicationContextAware;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RocketMQConsumerConfig {

    // 如果定义了多个 DefaultMQPushConsumer, 请注意 形参 的名字
    @Bean(initMethod = "start", destroyMethod = "shutdown")
    public DefaultMQPushConsumer defaultMQPushConsumer(DefaultMessageListenerConcurrently messageListener) throws MQClientException {
        // 初始化consumer,并设置consumer group name
        DefaultMQPushConsumer consumer = new     DefaultMQPushConsumer("DEFAULT_CONSUMER_GROUP");

        // 设置NameServer地址
        consumer.setNamesrvAddr("localhost:9876");
        //订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
        consumer.subscribe("TEST_TOPIC", "*");

        //注册回调接口来处理从Broker中收到的消息
        consumer.registerMessageListener(messageListener);
        return consumer;
    }


}

自定义 MessageListener

需要特别注意 MessageListener 使用的是 @Autowired 注入的是 MessageHandler 类型的接口

并且执行了 MessageHandler  的getELFilter(),[通过SPEL计算得出]和 test()

计算是该MessageExt否符合.

对于符合的MessageHandler , 先对其 MessageExt 提取Body. 再 执行 具体处理消息的逻辑onMessage()

package repayment.message;

import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.common.message.MessageExt;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.expression.EvaluationContext;
import org.springframework.expression.ExpressionParser;
import org.springframework.expression.spel.standard.SpelExpressionParser;
import org.springframework.expression.spel.support.SimpleEvaluationContext;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

@Slf4j
@Component
public class DefaultMessageListenerConcurrently implements MessageListenerConcurrently {

    @Autowired
    private List<MessageHandler> rocketMQListenerList;

    @SuppressWarnings("unchecked")
    @Override
    public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
        for (MessageExt messageExt : msgs) {
            log.debug("received msg: {}", messageExt);
            try {
                long now = System.currentTimeMillis();
                // rocketMQListener 选择
                ExpressionParser parser = new SpelExpressionParser();
                EvaluationContext cont = SimpleEvaluationContext.forReadWriteDataBinding().build();
                cont.setVariable("messageExt", messageExt);
                Optional<MessageHandler> first = rocketMQListenerList.stream()
                        .filter(rocketMQListener -> {
                            String elFilter = rocketMQListener.getELFilter();
                            if (StringUtils.isBlank(elFilter))
                                return true;
                            return parser.parseExpression(elFilter).getValue(cont, Boolean.class);
                        })
                        .filter(rocketMQListener -> rocketMQListener.test(messageExt))
                        .findFirst();
                // 注意,如果筛选完成没有获取到 rocketMQListener 则自此会抛出异常
                MessageHandler rocketMQListener = first.get();

                // 转换消息并执行
                rocketMQListener.onMessage(rocketMQListener.convertMessage(messageExt));

                long costTime = System.currentTimeMillis() - now;
                log.debug("consume {} cost: {} ms", messageExt.getMsgId(), costTime);
            } catch (Exception e) {
                log.warn("consume message failed. messageExt:{}", messageExt, e);
                return ConsumeConcurrentlyStatus.RECONSUME_LATER;
            }
        }

        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    }
}

MessageHandler 接口定义

package repayment.message;

import org.apache.rocketmq.common.message.MessageExt;

/**
 * @param <T> 消息 body 的数据类型,如果没有重写 convertMessage 方法, 则建议<T> 为 String
 *            消费 RocketMQ 消息的帮助类
 */
public interface MessageHandler<T> {

    /**
     * 通过 SPEL 筛选 MessageHandler 的方式
     * SPEL 上下文设置的了 #messageExt
     *
     * @return 稍后用于计算的 SPEL 表达式(默认返回空字符串,代表不过滤)
     */
    default String getELFilter() {
        return "";
    }

    /**
     * 通过 messageExt  筛选 MessageHandler 的普通方式
     * 默认返回 空字符串,代表不过滤。
     *
     * @param messageExt MessageExt
     * @return true:保留,false:丢弃
     */
    default boolean test(MessageExt messageExt) {
        return true;
    }


    /**
     * @param messageExt MessageExt
     * @return 默认为字符串类型的数据
     */
    default T convertMessage(MessageExt messageExt) {
        return (T) new String(messageExt.getBody());
    }

    /**
     * 具体处理消息的逻辑
     *
     * @param message messageExt.body
     */
    void onMessage(T message);


}

自定义的 HelloMessageHandler

用于解析 topic = TEST_TOPIC, Tags.contains("tag0") 的消息

package repayment.message.handler;

import cn.itcast.wanxinp2p.repayment.ann.MQSelect;
import org.apache.rocketmq.common.message.MessageExt;

// topic = TEST_TOPIC, Tags.contains("tag0")
@Component
public class HelloMessageHandler implements MessageHandler<String> {

    @Override
    public String getELFilter() {
        return "#messageExt.topic == 'TEST_TOPIC'";
    }

    @Override
    public boolean test(MessageExt messageExt) {
        return messageExt.getTags().contains("tag0");
    }

    @Override
    public void onMessage(String message) {
        System.out.println(message);
    }

}

Debug 调试效果

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1493155.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

go语言基础 -- 文件操作

基础的文件操作方法 go里面的文件操作封装在os包里面的File结构体中&#xff0c;要用的时候最好去查下官方文档&#xff0c;这里介绍下基本的文件操作。 打开关闭文件 import("os" ) func main() {// Open返回*File指针&#xff0c;后续的操作都通过*File对象操作…

【XR806开发板试用】第一篇-基于ArchLinux配置开发环境并烧录

前段时间很幸运的申请到了XR806的这块开发板子。由于出差、生病还有各种事情的影响&#xff0c;这周末才有空拿出来收到的板子把玩一番。前段时间也抽空看了看其他工程师的体验文章。初步了解到全志为这块板子提供了比较方便的Linux开发环境。嗯&#xff0c;毕竟搞嵌入式嘛&…

Python乱码恢复

比如说网页是ISO-8859-1编码&#xff0c;然后requests得到的是乱码&#xff0c; 那么这样操作就可以还原数据&#xff1a;res.text.encode(‘ISO-8859-1’).decode(‘utf-8’) 乱码恢复网站&#xff0c;可以知道是什么编码http://www.mytju.com/classCode/tools/messyCodeReco…

【深度学习笔记】计算机视觉——风格迁移

风格迁移 摄影爱好者也许接触过滤波器。它能改变照片的颜色风格&#xff0c;从而使风景照更加锐利或者令人像更加美白。但一个滤波器通常只能改变照片的某个方面。如果要照片达到理想中的风格&#xff0c;可能需要尝试大量不同的组合。这个过程的复杂程度不亚于模型调参。 本…

2024-阿里巴巴灵犀互娱校招内推

灵犀互娱是阿里集团旗下研运一体游戏品牌&#xff0c;在业务模式上&#xff0c;灵犀互娱面向全球&#xff0c;研运一体&#xff0c;坚持精品&#xff0c;打造爆款&#xff0c;重视服务玩家。 访问链接即刻开启内推&#xff1a;https://talent.lingxigames.com/campus/qrcode/…

第十二篇:学习python数据清洗

文章目录 一、啥是数据清洗二、将表格数据导入pandas中1. 准备工作2. 引入csv文件2.1 引入pandas库2.2 读取文件/修改名称3.2 快速浏览数据2.4 修改名字2.5 查找缺失值2.6 删除缺失值 3. 引入Excel文件3.1 引入pandas库3.2 读取Excel文件的人均GDP数据3.3 查看数据类型和non-nu…

速卖通关键字搜索API接口实战:Python代码与搜索策略解析

一、速卖通关键字搜索API简介 速卖通&#xff08;AliExpress&#xff09;作为阿里巴巴旗下的国际电商平台&#xff0c;为卖家和买家提供了便捷的交易渠道。其开放平台提供的API接口允许开发者集成速卖通的各种功能&#xff0c;其中之一就是关键字搜索API。通过这个API&#xf…

备考2024年北京高考数学:20114~2023十年选择题练习和解析

距离2024年高考还有三个月的时间&#xff0c;如何用三个月的时间再提高北京数学高考的成绩&#xff1f;吃透历年真题以及背后的知识点是行之有效的方法 之一。 今天我们来看一下2014-2023年的北京市高考数学的选择题&#xff0c;从过去十年&#xff08;2014-2023&#xff09;的…

【JSON2WEB】09 Amis-editor的代码移植到json2web

【JSON2WEB】01 WEB管理信息系统架构设计 【JSON2WEB】02 JSON2WEB初步UI设计 【JSON2WEB】03 go的模板包html/template的使用 【JSON2WEB】04 amis低代码前端框架介绍 【JSON2WEB】05 前端开发三件套 HTML CSS JavaScript 速成 【JSON2WEB】06 JSON2WEB前端框架搭建 【J…

大语言模型的Scaling laws(尺度定律)的正确认识

源自&#xff1a;人工智能前沿讲习 “人工智能技术与咨询” 发布 实验一 声明:公众号转载的文章及图片出于非商业性的教育和科研目的供大家参考和探讨&#xff0c;并不意味着支持其观点或证实其内容的真实性。版权归原作者所有&#xff0c;如转载稿涉及版权等问题&#xff0c;…

数据分析案例-二手车用户数据可视化分析(文末送书)

&#x1f935;‍♂️ 个人主页&#xff1a;艾派森的个人主页 ✍&#x1f3fb;作者简介&#xff1a;Python学习者 &#x1f40b; 希望大家多多支持&#xff0c;我们一起进步&#xff01;&#x1f604; 如果文章对你有帮助的话&#xff0c; 欢迎评论 &#x1f4ac;点赞&#x1f4…

为什么被蜜蜂蛰了会肿得像馒头

有的人却只是一点点小鼓包。 病情分析&#xff1a;蜜蜂体内存在一种有毒物质&#xff0c;其主要成分是蚁酸&#xff0c;这种成分进入人体后&#xff0c;会和血液发生反应&#xff0c;导致皮肤表现出红肿和瘙痒的症状。一些人群还会对蜜蜂表现出过敏反应&#xff0c;此类人群在…

活动策划整体流程需要考虑哪些要素

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 活动策划整体流程中需要考虑的要素非常多&#xff0c;这些要素通常涵盖从策划前的准备到活动结束后的总结&#xff0c;以下是一些关键的考虑要素&#xff1a; 活动目标&#xff1a;确定活…

单片机为什么需要时钟?2种时钟电路对比?

目录 一、晶体振荡器&#xff08;Crystal Oscillator&#xff09;的核心知识 二、单片机为什么需要时钟电路&#xff1f; 三、单片机的时钟电路方案 01、外部晶振方案 02、内部晶振方案 四、总结 单片机研发设计的项目中&#xff0c;它的最小电路系统包含 电源电路复位…

QT:颜色选择器

普通 Qt提供了一个现成的QColorDialog类。 用法: #include <QColorDialog>QColor color QColorDialog::getColor(Qt::white, this); if(!color.isValid()){//点击 关闭 或 cancel 颜色无效 }else {ui->text->setText(color.name());//类似##ffffQRgb rgb colo…

Android9-W517-使用NotificationListenerService监听通知

目录 一、前言 二、前提 三、方案 方案一 方案二 方案三 方案四 方案五 方案六 方案七 四、关于NotificationListenerService类头注释 五、结论 一、前言 NotificationListenerService可以让应用监听所有通知&#xff0c;但是无法获得监听通知的权限&#xff0c;如…

x86 Ubuntu上编译eudev给龙芯loongarch64架构主机使用

1、下载eudev库eudev-master.zip&#xff0c;链接&#xff1a;eudev库官方地址 2、下载龙芯的交叉编译工具&#xff1a;loongson-gnu-toolchain-8.3-x86_64-loongarch64-linux-gnu-rc1.2.tar.xz&#xff0c;链接&#xff1a;龙芯交叉编译官方地址 3、交叉编译器环境搭建 (1)、…

Spring-Cloud中服务发现是什么?干什么的?怎么用?

&#x1f413; 是什么 Spring Cloud通过Eureka或Consul等服务注册与发现组件来实现微服务间的相互感知。服务提供者将自己的服务信息注册到注册中心&#xff0c;服务消费者从注册中心获取服务提供者的信息&#xff0c;从而进行服务调用。 &#x1f413; 干什么 在Spring Cloud…

nodejs安装教程(及过程中的易错)

nodejs&#xff1a;Nodejs 是基于 Chrome 的 V8 引擎开发的一个 C 程序&#xff0c;目的是提供一个 JS 的运行环境。 npm&#xff1a;npm 是 Node Package Manager 的缩写&#xff0c;意思是 Node 的包管理系统&#xff0c;是最大的软件包仓库 下载nodejs 首先我们需要在node…

VNC 与 虚拟机 保姆级 快速入门图文指导

Time: 2024年3月5日22:31:49 By[ V ]: MemoryErHero 重要的事情先说三遍: 1 虚拟机内无需安装 VNC-Viewer-7.0.1-Windows 2 虚拟机内无需安装 VNC-Viewer-7.0.1-Windows 3 虚拟机内无需安装 VNC-Viewer-7.0.1-Windows 1 VNC 图文安装 流程 ① VNC-Viewer-7.0.1-Windows.e…