rocketMq的使用和消费模式(异步消息,单项消息(使用mq处理日志),推迟任务(占座买票),批量消息)

news2025/1/23 1:04:12

rocketMq开始使用

    • rocketmq是怎么使用的
    • 消费模式
    • 异步消息
    • 单项消息:使用mq处理日志
    • 延迟任务(占座买票)
    • 批量

rocketmq是怎么使用的

public class ASimpleTest {
    @Test
    public void simpleProduce() throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("test-producer-group");//定义生产者组(可以有多个生产者一起往主题里发)
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        producer.setSendMsgTimeout(30000); // 设置超时为30秒
        producer.start();

        try {
            Message message = new Message("testTopic", "我是一个简单的消息".getBytes());
            SendResult sendResult = producer.send(message); // 使用默认超时
            System.out.println("发送结果: " + sendResult.getSendStatus());
        } catch (Exception e) {
            e.printStackTrace(); // 打印异常信息
        } finally {
            producer.shutdown();
        }
    }

    @Test
     public void simpleConsume() throws Exception{
            //创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test-consumer-group");
        //连接namesrv
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        //订阅一个主题*标识订阅这个主题中所有的消息  后续会有消息过滤
        consumer.subscribe("testTopic", "*");
        //设置一个监听器(一直监听,异步回调)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
               //这个就是消费的方法(业务逻辑)
                System.out.println("我是消费者");
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容"+new String(msgs.get(0).getBody()));
                System.out.println("消费上下文"+context);
                //返回值,CONSUME_SUCCESS成功,消息会从mq出队
                //RECONSUME_LATER(报错/null) 失败,消息会重新回到队列,过一会重新投递出来,给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        //启动
        consumer.start();
        //挂起jvm
        System.in.read();//一直读,就不停了

    }
    }

在这里插入图片描述
在这里插入图片描述

消费模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push是服务端【MQ】主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

Pull是客户端需要主动到服务端取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

Push模式也是基于pull模式的,只能客户端内部封装了api,一般场景下,上游消息生产量小或者均速的时候,选择push模式。在特殊场景下,例如电商大促,抢优惠券等场景可以选择pull模式

在这里插入图片描述

异步消息

public class BASyncTest {
    @Test
    public void  asyncProducer()throws Exception{
        DefaultMQProducer producer=new DefaultMQProducer("async-producer-group");
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        producer.start();
        Message message = new Message("asyncTopic", "我是一个异步消息".toString().getBytes());
        producer.send(message, new SendCallback() {//我们发送完消息之后,不是等待他返回,而是先去执行其他任务,如果收到消息,则执行回调函数
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }
            @Override
            public void onException(Throwable e) {
                System.err.println("发送失败:"+e.getMessage());
            }
        });
        System.out.println("我先执行");
        System.in.read();//挂起
    }
}

在这里插入图片描述

单项消息:使用mq处理日志

在这里插入图片描述
代码

public class COnewayTest {
    @Test
    public void onewayProducer()throws Exception{
        DefaultMQProducer producer = new DefaultMQProducer("oneway_producer_group");
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        producer.start();
        Message message = new Message("onewayTopic", "日志XXX".getBytes());
        producer.sendOneway(message);//单项消息,没有返回值
        System.out.println("发送成功");
        producer.shutdown();
    }
}

在这里插入图片描述

延迟任务(占座买票)

在这里插入图片描述

批量

在这里插入图片描述
在这里插入图片描述
详细代码:

public class EBatchTest {
    @Test
    public void testBatchProducer() throws Exception {
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("batch-producer-group");
        // 设置nameServer地址
        producer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        // 启动实例
        producer.start();
        List<Message> msgs = Arrays.asList(
                new Message("batchTopic", "我是一组消息的A消息".getBytes()),
                new Message("batchTopic", "我是一组消息的B消息".getBytes()),
                new Message("batchTopic", "我是一组消息的C消息".getBytes())

        );
        SendResult send = producer.send(msgs);
        System.out.println(send);
        // 关闭实例
        producer.shutdown();
    }
    @Test
    public  void  msConsumer()throws Exception{
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("batch-consumer-group");//DefaultMQPushConsumer!!PUSh
        consumer.setNamesrvAddr(MqConstant.NAME_SRV_ADDR);
        consumer.subscribe("batchTopic","*");
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("收到信息了"+new Date());
                System.out.println(msgs.size());//发的时候是捆绑一起发,消费的时候是单个消费
                System.out.println(new String(msgs.get(0).getBody()));
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
        System.in.read();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1965610.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分享IP 地址混淆知识

由于IPv4 地址资源的枯竭促使 IPv6 技术的广泛应用&#xff0c;从而形成了 IPv4 和 IPv6 并存的局面。这就逐渐出现了 IP 地址混淆导致的网络问题。 IP 地址混淆的表现形式 地址分配错误 在同时支持两种协议的网络中&#xff0c;可能会出现将 IPv4 地址错误地分配给 IPv6 接口…

正则采集器之三——前端搭建

前端使用有名的饿了么管理后台&#xff0c;vue3版本vue3-element-admin&#xff0c;首先从gitee中克隆一个vue3-element-admin模板代码vue3-element-admin: Vue3 Element Admin开箱即用的中后台管理系统前端解决方案&#xff0c;然后在此基础上进行开发。 1、修改vite.config.…

正点原子imx6ull-mini-Linux驱动之按键输入实验(9)

在前几章我们都是使用的 GPIO 输出功能&#xff0c;还没有用过 GPIO 输入功能&#xff0c;本章我们就来学 习一下如果在 Linux 下编写 GPIO 输入驱动程序。I.MX6U-ALPHA 开发板上有一个按键&#xff0c;我们 就使用此按键来完成 GPIO 输入驱动程序&#xff0c;同时利用第四十七…

LeetCode40题: 组合总和 II(原创)

【题目描述】 给定一个候选人编号的集合 candidates 和一个目标数 target &#xff0c;找出 candidates 中所有可以使数字和为 target 的组合。 candidates 中的每个数字在每个组合中只能使用 一次 。 注意&#xff1a;解集不能包含重复的组合。 示例 1: 输入: candidates [1…

安装MongoDB UI客户端工具:mongodb-compass-1.40.2-win32-x64.msi

文章目录 1、安装 mongodb-compass-1.40.2-win32-x64.msi2、安装后配置链接地址&#xff1a; 1、安装 mongodb-compass-1.40.2-win32-x64.msi 2、安装后配置链接地址&#xff1a;

读书其实并没有那么大的作用

开场白 Hey&#xff0c;书虫们和生活探索者们&#xff01;今天我们来聊聊一个老生常谈却又常谈常新的话题——读书。有人说&#xff0c;读书能改变命运&#xff0c;但也有人说&#xff0c;读书不过是生活的调味品。那么&#xff0c;读书到底有啥用&#xff1f;让我们一起来扒一…

卫星导航系统的应用领域与发展前景

当人们提到卫星导航系统&#xff0c;往往会联想到车载导航仪或手机上的地图应用。然而&#xff0c;卫星导航系统的应用远不止于此&#xff0c;它在许多领域都发挥着重要作用。下面将介绍几个卫星导航系统的应用领域及其发展前景。首先是海洋航行安全领域。在过去&#xff0c;海…

搜维尔科技:Haption:对于遥控机器人来说,触觉技术是什么

力反馈遥控机器人有哪些好处&#xff1f; 遥控机器人是机器人技术领域的一个领域&#xff0c;主要涉及远距离控制半自主机器人。它被定义为遥操作和远程呈现的结合。第一部分&#xff0c;遥控操作&#xff0c;使操作员能够远程控制机器人。第二部分&#xff0c;远程呈现&#…

全网最适合入门的面向对象编程教程:29 类和对象的Python实现-断言与防御性编程和help函数的使用

全网最适合入门的面向对象编程教程&#xff1a;29 类和对象的 Python 实现-断言与防御性编程和 help 函数的使用 摘要&#xff1a; 在 Python 中&#xff0c;断言是一种常用的调试工具&#xff0c;它允许程序员编写一条检查某个条件。本文主要介绍了断言的应用场景和特点以及 …

jmeter-beanshell学习13-设置等待时间

接口测试时候&#xff0c;如果交易成功&#xff0c;一切正常&#xff0c;如果交易失败&#xff0c;可能会涉及回滚。之前写的都是做完交易&#xff0c;紧接着去查库&#xff0c;就可能遇到还没回滚完成&#xff0c;已经查完库了&#xff0c;查出来的数据不准确。既然写beanshel…

前端低代码必备:FrontendBlocks 4.0版本重磅发布,助力Uniapp-X原生APP开发

项目介绍 本软件是一款强大的所见即所得前端页面设计器&#xff0c;是低代码开发领域的基础设施&#xff0c;生成的代码不依赖于任何框架&#xff0c;实测可以将前端布局工作的耗时减少80%以上&#xff0c;最关键的是&#xff0c;它实现了人人都可以写前端页面的梦想。 不用写…

相似度计算方法

一、相似度计算方法 相似度算法是计算两个或多个对象之间相似程度的方法&#xff0c;这些对象可以是文本、图像、音频等不同类型的数据。在计算机科学、信息检索、推荐系统、数据挖掘等领域中&#xff0c;相似度算法具有广泛的应用。 二、应用场景 搜索引擎&#xff1a;用于文…

实验3-2 计算符号函数的值

//实验3-2 计算符号函数的值#include <stdio.h> #include <math.h>int main() {int n 0;scanf("%d",&n);int sign;if(n > 0)sign1;else if(n < 0)sign-1;else sign0;printf("sign(%d) %d", n, sign); }

0731作业+梳理

一、作业 1.用两个进程完成拷贝 代码&#xff1a; #include<myhead.h> //定义一个求文件长度函数 int line(const char *pd1,const char *pd2) { int fd1 -1; int fd2 -1; //以只读形式打开源文件 if((fd1 open(pd1,O_RDONLY))-1) { p…

人最大的内耗,是不肯放过自己

你是否也有过这样的经历&#xff1a; 对别人漫不经心的一句话就琢磨很久&#xff0c;生怕产生隔阂&#xff1b;对自己曾经犯过的错误念念不忘&#xff0c;始终无法释怀&#xff1b;工作里出现一点小失误&#xff0c;便整宿翻来覆去难以入眠......每天陷在迷茫、焦虑、恐慌的情…

matlab 2022a 安装教程

下载安装包 &#xff0c;多个压缩包&#xff0c;依次解压 第一步 第二步 2、输入文件安装密钥&#xff1a;“50874-33247-14209-37962-45495-25133-28159-33348-18070-60881-29843-35694-31780-18077-36759-35464-51270-19436-54668-35284-27811-01134-26918-26782-54088” 50…

二百四十九、Linux——修改ulimit限制数量:打开文件的最大数量和用户进程的最大数量

一、目的 在安装OceanBase时脚本报错 [ERROR] OBD-1007: (127.0.0.1) The value of the ulimit parameter "open files" must not be less than 20000 (Current value: 1024), Please execute echo -e "* soft nofile 20000\n* hard nofile 20000" >&…

TiDB系列之:TiCDC同步TiDB数据库数据到Kafka集群Topic

TiDB系列之&#xff1a;TiCDC同步TiDB数据库数据到Kafka集群Topic 一、Changefeed 概述Changefeed 状态流转操作 Changefeed 二、同步数据到Kafka创建同步任务&#xff0c;复制增量数据 KafkaSink URI 配置 kafka最佳实践TiCDC 使用 Kafka 的认证与授权TiCDC 集成 Kafka Connec…

移动硬盘有盘符却难启?数据恢复全攻略

现象解析&#xff1a;移动硬盘有盘符打不开的谜团 在日常的数字生活中&#xff0c;移动硬盘作为数据存储与传输的重要工具&#xff0c;扮演着不可或缺的角色。然而&#xff0c;当用户遇到移动硬盘在系统中显示盘符却无法正常访问的情况时&#xff0c;无疑会令人感到焦头烂额。…

hackme漏洞打靶

1.安装好靶机后点击启动进入这样的一个页面 然后我们就要去找这个靶机的IP地址&#xff0c;首先将该虚拟机网卡设置为net模式&#xff0c;然后在物理机中查看自己ip&#xff0c;看看vmnet8的地址c段是什么&#xff0c;我这里是209&#xff0c;然后用工具去扫描该c段下哪个ip开放…