RocketMQ发送消息

news2024/10/6 5:58:47

一.消费模式

MQ的消费模式可以大致分为两种,一种是 推Push,一种是 拉Pull。 

  • Push 是 服务端 (MQ) 主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。
  • Pull 是 客户端 需要主动到 服务端 (MQ) 取数据,优点是客户端可以依据自己的消费能力进行消费,但拉取的频率也需要用户自己控制,拉取频繁容易造成服务端和客户端的压力,拉取间隔长又容易造成消费不及时。

Push模式也是基于Pull模式的,所以不管是Push模式还是Pull模式,都是Pull模式。一般情况下,优先选择Pull模式

二.同步消息(***) 

同步消息 发送过后会有一个返回值,也就是mq服务器接收到消息后返回的一个确认,这种方式非常安全,但是性能上并没有这么高,而且在mq集群中,也是要等到所有的从机都复制了消息以后才会返回,所以针对重要的消息可以选择这种方式。

可靠的同步传输被广泛应用于各种场景,如重要的通知消息、短消息通知等。

原生依赖引入:

        <!--  原生api,不是starter      -->
        <dependency>
            <groupId>org.apache.rocketmq</groupId>
            <artifactId>rocketmq-client</artifactId>
            <version>4.9.0</version>
        </dependency>

同步消息生产者:

public class Producer {
    public static void main(String[] args) throws Exception {
            /*
             1. 谁来发?
             2. 发给谁?
             3. 怎么发?
             4. 发什么?
             5. 发的结果是什么?
             6. 关闭连接
             **/
            //1.创建一个发送消息的对象Producer,并指定生产者组名
            DefaultMQProducer producer = new DefaultMQProducer("sync-producer-group");
            //2.设定发送的命名服务器地址
            producer.setNamesrvAddr("ip:9876");
            producer.setSendMsgTimeout(1000000);

            //3.1启动发送的服务
            producer.start();
            //4.创建要发送的消息对象,指定topic,指定内容body
            Message msg = new Message("sync-topic", "hello-rocketmq".getBytes(StandardCharsets.UTF_8));
            //3.2发送消息
            SendResult result = producer.send(msg);
            System.out.println("返回结果:" + result);
            //5.关闭连接
            producer.shutdown();
    }
}

同步消息消费者:

public class Consumer {
    public static void main(String[] args) throws Exception {
        //1.创建一个接收消息的对象Consumer,并指定消费者组名
        //两种模式:①消费者定时拉取模式  ②建立长连接让Broker推送消息(选择第二种)
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("sync-producer-group");
        //2.设定接收的命名服务器地址
        consumer.setNamesrvAddr("ip:9876");
        //3.订阅一个主题,* 表示订阅这个主题的所有消息,后期会有消息过滤
        consumer.subscribe("sync-topic","*");
        //设置当前消费者的消费模式(默认模式:负载均衡)
        consumer.setMessageModel(MessageModel.CLUSTERING);
        //3.设置监听器,用于接收消息(一直监听,异步回调,异步线程)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            //消费消息
            //消费上下文:consumeConcurrentlyContext
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                // 这个就是消费的方法 (业务处理)
                System.out.println("我是消费者");
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
                System.out.println("消费上下文:" + context);

                //签收消息,消息会从mq出队
                //如果返回 RECONSUME_LATER 或 null 或 产生异常 那么消息会重新 回到队列 过一会重新投递出来 ,给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        //4.启动接收消息的服务
        consumer.start();
        System.out.println("接受消息服务已经开启!");
        //5 不要关闭消费者!因为需要监听!
        //挂起
        System.in.read();
    }
}

三.异步消息(***)

异步消息通常用在对响应时间敏感的业务场景,即发送端不能容忍长时间地等待Broker的响应。发送完以后会有一个异步消息通知。

例如,视频上传后通知启动转码服务,转码完成后通知推送转码结果等。

 异步消息生产者:

public class AsyncProducer {
    public static void main(String[] args) throws Exception {
        DefaultMQProducer producer = new DefaultMQProducer("async-producer-group");
        producer.setNamesrvAddr("ip:9876");
        producer.start();
        Message message = new Message("async-topic", "我是一个异步消息".getBytes());
        //没有返回值的
        producer.send(message, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                System.out.println("发送成功");
            }

            @Override
            public void onException(Throwable e) {
                System.err.println("发送失败:" + e.getMessage());
            }
        });
        System.out.println("我先执行");
        //需要接收异步回调,这里需要挂起
        System.in.read();
    }
}

消费者无特殊变化:

public class SimpleConsumer {
    public static void main(String[] args) throws Exception{
        // 创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("async-producer-group");
        // 连接namesrv
        consumer.setNamesrvAddr("ip:9876");
        // 订阅一个主题  * 标识订阅这个主题中所有的消息  后期会有消息过滤
        consumer.subscribe("async-topic", "*");
        // 设置一个监听器 (一直监听的, 异步回调方式)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                // 这个就是消费的方法 (业务处理)
                System.out.println("我是消费者");
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
                System.out.println("消费上下文:" + context);
                // 返回值  CONSUME_SUCCESS成功,消息会从mq出队
                // RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
        // 挂起当前的jvm
        System.in.read();
    }
}

四.单向消息(*)

这种方式主要用在不关心发送结果的场景,这种方式吞吐量很大,但是存在消息丢失的风险,一般用于结果不重要的场景,例如日志信息的发送

 

单向消息生产者:

public class SingleWayProducer {
    public static void main(String[] args) throws Exception{
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("single-way-producer-group");
        // 设置nameServer地址
        producer.setNamesrvAddr("ip:9876");
        // 启动实例
        producer.start();
        Message msg = new Message("single-way-topic", ("单向消息").getBytes());
        // 发送单向消息
        producer.sendOneway(msg);
        // 关闭实例
        producer.shutdown();
    }

}

日志服务的编写思路

产生日志的服务利用MQ发送单向消息,不用等回复,大大减少了发送日志的时间,由log-service统一写入日志表中。并且由于日志过于庞大,可以对日志进行冷热分离,近一个月的为热数据,近一年的为冷数据(实际情况据业务而定),存储的位置不同,时间过于久远的日志可以删掉

五.延迟消息(***)

消息放入MQ后,过一段时间,才会被监听到,然后消费

比如下订单业务,提交了一个订单就可以发送一个延时消息,15min后去检查这个订单的状态,如果还是未付款就取消订单释放库存(订单超时)

在分布式定时调度触发、任务超时处理等场景,使用 RocketMQ 的延时消息可以简化定时调度任务的开发逻辑,实现高性能、可扩展、高可靠的定时触发能力。

延迟等级 

延迟消息生产者:

public class DelayProducer {
    public static void main(String[] args) throws Exception{
        // 创建默认的生产者
        DefaultMQProducer producer = new DefaultMQProducer("delay-producer-group");
        // 设置nameServer地址
        producer.setNamesrvAddr("ip:9876");
        // 启动实例
        producer.start();
        Message msg = new Message("delay-topic", ("延迟消息").getBytes());
        // 给这个消息设定一个延迟等级
        // messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h
        msg.setDelayTimeLevel(3);
        // 发送单向消息
        producer.send(msg);
        // 打印时间
        System.out.println(new Date());
        // 关闭实例
        producer.shutdown();
    }
}

 延迟消息消费者(无特殊变化):

public class MSConsumer {
    public static void main(String[] args) throws Exception{
        // 创建一个消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay-producer-group");
        // 连接namesrv
        consumer.setNamesrvAddr("ip:9876");
        // 订阅一个主题  * 标识订阅这个主题中所有的消息  后期会有消息过滤
        consumer.subscribe("delay-topic", "*");
        // 设置一个监听器 (一直监听的, 异步回调方式)
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println(msgs.get(0).toString());
                System.out.println("消息内容:" + new String(msgs.get(0).getBody()));
                System.out.println("收到时间:"+new Date());
                // 返回值  CONSUME_SUCCESS成功,消息会从mq出队
                // RECONSUME_LATER(报错/null) 失败 消息会重新回到队列 过一会重新投递出来 给当前消费者或者其他消费者消费的
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动
        consumer.start();
        // 挂起当前的jvm
        System.in.read();
    }
}

可以通过打印一下时间差来检测一下(第一次有误差很正常)

六.批量消息

Rocketmq可以一次性发送一组消息,那么这一组消息会被当做一个消息消费。

在对吞吐率有一定要求的情况下,可以将一些消息聚成一批以后进行发送,可以增加吞吐率,并减少API和网络调用次数。

将消息打包成 Collection<Message> msgs 传入方法中即可,需要注意的是批量消息的大小不能超过 1MiB(否则需要自行分割),其次同一批 batch 中 topic 必须相同。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/668931.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

分布式光伏电站智能管理系统

随着能源需求的增加&#xff0c;各种各样的光伏电站工程建设出现不同形式的技术缺陷。设计了分布式光伏电站区域智能系统&#xff0c;实现 了各个园区用电数据的集中管理。对光伏电站分布式运营管理进行了研究&#xff0c;采用集中运营管理中心的方法&#xff0c;建立了区域分布…

蓝牙L2CAP协议简介及报文格式

概述 逻辑链路控制和适配协议&#xff08;Logical Link Control and Adaptation Protocol&#xff0c;L2CAP&#xff09;是蓝牙的核心协议&#xff0c;负责适配基带中的上层协议。它同链路管理器并行工作&#xff0c;向上层协议提供定向连接的和无连接的数据业务。L2CAP具有分…

Baumer工业相机堡盟工业相机如何通过BGAPISDK将相机图像写入相机内存(C++)

Baumer工业相机堡盟工业相机如何通过BGAPISDK将相机图像写入相机内存&#xff08;C&#xff09; Baumer工业相机Baumer工业相机BGAPISDK和相机内存的技术背景Baumer工业相机通过BGAPISDK将相机图像写入相机内存功能1.引用合适的类文件2.通过BGAPISDK将相机图像写入相机内存功能…

Cortext-M3系统:异常(3)

1、异常 异常响应系统是再M3内核水平上的&#xff0c;支持众多的系统异常和外部中断。1-15为系统异常&#xff0c;大于16为外部中断。除了个别异常的优先级被定死外&#xff0c;其它异常的优先级都是可编程的。优先级数值越小&#xff0c;优先级越高。CM3支持中断嵌套&#xff…

网络嗅探与ARP欺骗

目录 一、网络嗅探概述 1.1 网络嗅探的概念 1.1.2 一把双刃剑 1.1.3 特点 1.2 网络嗅探的原理 1.2.1 网络嗅探的条件 1.2.2 网卡的工作模式 1.2.3 局域网的传输技术 1.3 网络嗅探的前提 1.3.1 网卡设置为混杂模式 1.3.2 同处在一个广播式局域网内 1.4 嗅探工具的使…

4.23 TCP状态转换 4.24半关闭、端口复用

4.23 TCP状态转换 2MSL(Maximum Segment Lifetime) 主动断开连接的一方&#xff0c;最后进入一个TIME_WAIT状态&#xff0c;这个状态会持续&#xff1a;2msl msl&#xff1a;官方建议&#xff1a;2分钟&#xff0c;实际是30s 当 TCP 连接主动关闭方接收到被动关闭方发送的 FIN…

【kubernetes】部署kube-apiserver与kubectl

前言:二进制部署kubernetes集群在企业应用中扮演着非常重要的角色。无论是集群升级,还是证书设置有效期都非常方便,也是从事云原生相关工作从入门到精通不得不迈过的坎。通过本系列文章,你将从虚拟机准备开始,到使用二进制方式从零到一搭建起安全稳定的高可用kubernetes集…

Flutter 组件集录 | RawMagnifier 组件 - 拿起你的八倍镜

theme: cyanosis 1. 前言 今天看 Flutter 源码&#xff0c;偶然发现 Magnifier 组件&#xff0c;这单词不就是 放大镜 嘛! 再结合新版 Flutter 中输入文本的放大镜效果&#xff0c;直觉告诉我这玩意应该可以放大任何组件。如下所示&#xff0c;背景是一张图片&#xff0c;使用 …

0013-TIPS-pawnyable : Race-Condition

原文 Linux Kernel PWN | 040204 Pawnyable之竞态条件 Holstein v4: Race Condition 题目下载 漏洞代码 #include <linux/module.h> #include <linux/kernel.h> #include <linux/cdev.h> #include <linux/fs.h> #include <linux/uaccess.h> #i…

使用 Vite + Vue3 + Element-Plus + Pinia + Ts 搭建 Vue3 项目

使用 Vite Vue3 Element-Plus Pinia Ts 搭建 Vue3 项目 使用Vite搭建配置Router配置 Element-Plus配置sass配置Pinia配置解析 符号&#xff0c;并找到对应的路径TypeScript忽略类型检查 使用Vite搭建 Vite 需要 Node.js 版本 14.18&#xff0c;16。然而&#xff0c;有些模…

chatgpt赋能python:Python指定小数点位数的完整指南

Python指定小数点位数的完整指南 Python是一种高级编程语言&#xff0c;广泛用于科学、统计和数学计算。在许多情况下&#xff0c;我们需要对浮点数进行更精确的计算。Python 中保留小数位数的能力很强&#xff0c;本文将向您介绍如何在 Python 中指定小数点后的位数。 为什么…

购买服务器/安装宝塔

1、服务器的选择 本人知道并了解一丢丢的就这四个平台&#xff1a; 1、阿里云 2、腾讯云 3、硅云 4、亚马逊 个人觉得阿里云是YYDS&#xff0c;啥都挺方便的&#xff0c;唯一不足就是有点小贵&#xff0c;但是新用户第一次购买还是很优惠的。 腾讯云有的云服务器是真的便宜&am…

【Batch_size 与 梯度 之间的关系】

chatGPT 回答 梯度更新与批大小&#xff08;batch size&#xff09;之间有密切的关系。批大小是指在训练过程中一次迭代所使用的样本数量。 在深度学习中&#xff0c;梯度下降是一种常用的优化算法&#xff0c;用于更新模型参数以最小化损失函数。梯度是损失函数对于模型参数…

Gradio Flagging模块解析与实践

❤️觉得内容不错的话&#xff0c;欢迎点赞收藏加关注&#x1f60a;&#x1f60a;&#x1f60a;&#xff0c;后续会继续输入更多优质内容❤️ &#x1f449;有问题欢迎大家加关注私戳或者评论&#xff08;包括但不限于NLP算法相关&#xff0c;linux学习相关&#xff0c;读研读博…

基于html+css的图展示135

准备项目 项目开发工具 Visual Studio Code 1.44.2 版本: 1.44.2 提交: ff915844119ce9485abfe8aa9076ec76b5300ddd 日期: 2020-04-16T16:36:23.138Z Electron: 7.1.11 Chrome: 78.0.3904.130 Node.js: 12.8.1 V8: 7.8.279.23-electron.0 OS: Windows_NT x64 10.0.19044 项目…

大数据大作业(课程设计)

题目&#xff1a;信息爬取字数统计及可视化 内容及要求&#xff1a; 配置Hadoop平台&#xff1b;利用爬虫技术爬取任一门户网站新闻栏目一定时间段内的新闻信息&#xff0c;保存为一个或多个文件并上传到Hadoop平台以本人学号命名的文件夹下&#xff1b;利用MapReduce框架编程完…

CSS3-显示模式

显示模式 1 块级显示 2 行内显示 3 行内块显示 4 元素显示模式转换 5 拓展 1 块级显示 属性&#xff1a;display:block 显示特点&#xff1a; 1 独占一行&#xff08;一行只能显示一个&#xff09; 2 宽度默认是父元素的宽度&#xff0c;高度默认由内容撑开 3 可以设置宽高 代表…

Cortext-M3系统:异常系统(5)

1、使用中断 在CM3中&#xff0c;NVIC为我们搞定了使用中断时的很多例行任务&#xff0c;如优先级检查、入栈/出栈、取向量等。不过在NVIC能行使职能之前&#xff0c;还需要我们做好如下的初始化工作&#xff1a;建立堆栈、建立向量表、分配各中断的优先级、使能中断。 1.1 建…

node笔记_读取目录的文件

文章目录 ⭐前言⭐fs.readdirSync&#x1f496; 读取目录 不加withFileTypes&#x1f496; 读取目录 加withFileTypes&#x1f496; 读取目录时 判断元素文件还是目录 ⭐结束 ⭐前言 大家好&#xff0c;我是yma16&#xff0c;本文分享关于node读取目录文件 往期文章 node_wind…

【java】Jconsole 开启远程连接遇到的一些坑

文章目录 背景一、JMX二、配置远程连接2.1、Java 程序启动2.2、tomcat 启动2.3、无法远程问题排查2.4、解决方案 三、关闭 tomcat 报错3.1、问题分析3.2、问题解决 总结 背景 最近在学习 JVM&#xff0c;其中涉及到性能、内存等指标分析需要使用工具分享&#xff0c;Java 提供…