RabbitMQ之生产批量发送

news2024/12/27 14:55:12

为什么要用生产批量发送?
批量发送消息,可以提高MQ发送性能。但是 RabbitMQ 并没有提供了批量发送消息的 API 接口,使用 spring-amqp 的 BatchingRabbitTemplate 实现批量能力。
SimpleBatchingStrategy 发送策略满足以下规则会进行发送:

batchSize :超过收集的消息数量的最大条数。
bufferLimit :超过收集的消息占用的最大内存。
timeout :超过收集的时间的最大等待时长,单位:毫秒。不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。
不过值得注意的是,我们一次发送十条消息到 RabbitMQ Broker 中去,在 RabbitMQ Broker 显示的也是 1 个消息
如下图所示:
在这里插入图片描述
以下为代码实战

===================================》配置类代码
@Configuration
public class RabbitConfiguration {
    @Resource
    ConnectionFactory connectionFactory;

    /**
     * 注入一个批量 template
     * Spring-AMQP 通过 BatchingRabbitTemplate 提供批量发送消息的功能。如下是三个条件,满足任一即会批量发送:
     * <p>
     * 【数量】batchSize :超过收集的消息数量的最大条数。
     * 【空间】bufferLimit :超过收集的消息占用的最大内存。
     * 【时间】timeout :超过收集的时间的最大等待时长,单位:毫秒。
     * 不过要注意,这里的超时开始计时的时间,是以最后一次发送时间为起点。也就说,每调用一次发送消息,都以当前时刻开始计时,重新到达 timeout 毫秒才算超时。
     *
     * @return BatchingRabbitTemplate
     */
    @Bean
    public BatchingRabbitTemplate batchRabbitTemplate() {
        // 创建 BatchingStrategy 对象,代表批量策略
        // 超过收集的消息数量的最大条数。
        int batchSize = 10;
        // 每次批量发送消息的最大内存 b
        int bufferLimit = 1024 * 1024;
        // 超过收集的时间的最大等待时长,单位:毫秒
        int timeout = 10 * 1000;
        BatchingStrategy batchingStrategy = new SimpleBatchingStrategy(batchSize, bufferLimit, timeout);
        // 创建 TaskScheduler 对象,用于实现超时发送的定时器
        TaskScheduler taskScheduler = new ConcurrentTaskScheduler();
        // 创建 BatchingRabbitTemplate 对象
        BatchingRabbitTemplate batchTemplate = new BatchingRabbitTemplate(batchingStrategy, taskScheduler);
        batchTemplate.setConnectionFactory(connectionFactory);
        return batchTemplate;
    }
===========================》生产者代码
@Component
public class Producer09 {
    @Resource
    private BatchingRabbitTemplate batchingRabbitTemplate;

    public void syncSend(String id, String routingKey) {
        Message09 message = new Message09();
        message.setId(id);
        batchingRabbitTemplate.convertAndSend(Message09.EXCHANGE, routingKey, message);
    }
}
==================================>消费者
@RabbitListener(queues = Message09.QUEUE)
@Component
@Slf4j
public class Consumer09 {

    @RabbitHandler
    public void onMessage(Message09 message) {
        log.info("[{}][Consumer09 onMessage][线程编号:{} 消息内容:{}]", LocalDateTime.now(), Thread.currentThread().getId(), message);
    }
}
======================================>测试类
 @Resource
    Producer09 producer09;

    @Test
    void syncSend() throws InterruptedException {
        // 循环发送十个,观察消费者情况
        for (int i = 0; i < 10; i++) {
            String id = UUID.randomUUID().toString();
            producer09.syncSend(id, Message09.ROUTING_KEY);
        }
        log.info("[{}][test producer09 syncSend] 发送成功", LocalDateTime.now());
        // 测试结果是,只有等到十秒过后,或者条数达到10条才会 推送 (满足RabbitConfiguration 的配置才会发送)
        TimeUnit.SECONDS.sleep(12);
    }

以上的是RabbitMQ之生产批量发送代码 若需完整代码 可识别二维码后 给您发代码。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1643464.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

扩展学习|国内外用户画像相关进展一览

文献来源&#xff1a;徐芳,应洁茹.国内外用户画像研究综述[J].图书馆学研究,2020(12):7-16.DOI:10.15941/j.cnki.issn1001-0424.2020.12.002. 一、用户画像的概念 用户画像概念一经提出,便被广泛应用到精准营销等领域。后来,作为一种描绘用户特征、表达用户诉求的有效工具,用户…

Angular进阶-NVM管理Node.js实现不同版本Angular环境切换

一、NVM介绍 1. NVM简介 Node Version Manager&#xff08;NVM&#xff09;是一个用于管理多个Node.js版本的工具。它允许用户在同一台机器上安装和使用多个Node.js版本&#xff0c;非常适合需要同时进行多个项目的开发者。NVM是开源的&#xff0c;支持MacOS、Windows和Linux…

LLM应用:工作流workflow创建自定义模版使用

参考: https://www.coze.cn/ 本案例是在coze平台上操作的,也有其他工具支持工作流的创建例如dify;也例如图像生成的comfyui工作流工具 创建自定义模版 可以根据自己需求创建自己的工作流工具;本文案例是创建一个联网搜索的LLM应用: 创建工作流页面: https://www.coze.c…

Java面试——不安全的集合类

​ 系统性学习&#xff0c;移步IT-BLOG-CN Java 中有许多的集合&#xff0c;常用的有List&#xff0c;Set&#xff0c;Queue&#xff0c;Map。 其中 List&#xff0c;Set&#xff0c;Queue都是Collection&#xff08;集合&#xff09;&#xff0c;List中<>的内容表示其中…

Linux CPU 飙升 排查五步法

排查思路-五步法 1. top命令定位应用进程pid 找到最耗时的CPU的进程pid top2. top-Hp[pid]定位应用进程对应的线程tid 找到最消耗CPU的线程ID // 执行 top -Hp [pid] 定位应用进程对应的线程 tid // 按shift p 组合键&#xff0c;按照CPU占用率排序 > top -Hp 111683.…

华为手机ip地址怎么切换

随着移动互联网的普及&#xff0c;IP地址成为了我们手机上网的重要标识。然而&#xff0c;在某些情况下&#xff0c;我们可能需要切换手机的IP地址&#xff0c;以更好地保护个人隐私、访问特定地区的内容或服务&#xff0c;或者出于其他网络需求。华为手机作为市场上的热门品牌…

【uniapp】H5+、APP模拟浏览器环境内部打开网页

前言 今天将智能体嵌入到我的项目中&#xff0c;当作app应用时&#xff0c;发现我使用的webview组件&#xff0c;无论H5怎么登录都是未登录&#xff0c;而APP却可以&#xff0c;于是进行了测试&#xff0c;发现以下几种情况&#xff1a; 方法<a>标签webviewAPP✅✅网页…

Spring扩展点(一)Bean生命周期扩展点

Bean生命周期扩展点 影响多个Bean的实例化InstantiationAwareBeanPostProcessorBeanPostProcessor 影响单个Bean的实例化纯粹的生命周期回调函数InitializingBean&#xff08;BeanPostProcessor 的before和after之间调用&#xff09;DisposableBean Aware接口在生命周期实例化过…

Hive大数据任务调度和业务介绍

目录 一、Zookeeper 1.zookeeper介绍 2.数据模型 3.操作使用 4.运行机制 5.一致性 二、Dolphinscheduler 1.Dolphinscheduler介绍 架构 2.架构说明 该服务内主要包含: 该服务包含&#xff1a; 3.FinalShell主虚拟机启动服务 4.Web网页登录 5.使用 5-1 安全中心…

[入门] Unity Shader前置知识(5) —— 向量的运算

在Unity中&#xff0c;向量无处不在&#xff0c;我想很多人都使用过向量类的内置方法 normalized() 吧&#xff0c;我们都知道该方法是将其向量归一化从而作为一个方向与速度相乘&#xff0c;以达到角色朝任一方向移动时速度都相等的效果&#xff0c;但内部具体是如何将该向量进…

【计算机科学速成课】笔记二

笔记一 文章目录 7.CPU阶段一&#xff1a;取指令阶段阶段二&#xff1a;解码阶段阶段三&#xff1a;执行阶段 8.指令和程序9.高级CPU设计——流水线与缓存 7.CPU CPU也叫中央处理器&#xff0c;下面我们要用ALU&#xff08;输入二进制&#xff0c;会执行计算&#xff09;、两种…

STM32之HAL开发——ADC入门介绍

ADC简介 模数转换&#xff0c;即Analog-to-Digital Converter&#xff0c;常称ADC&#xff0c;是指将连续变量的模拟信号转换为离散的数字信号的器件&#xff0c;比如将模温度感器产生的电信号转为控制芯片能处理的数字信号0101&#xff0c;这样ADC就建立了模拟世界的传感器和…

C++异常处理实现(libstdc++)

摘要&#xff1a;为了更好的理解C中异常处理的实现&#xff0c;本文简单描述了Itanium ABI中异常处理的流程和llvm/libsdc简要实现。 关键字&#xff1a;C,exception,llvm,clang C他提供了异常处理机制来对程序中的错误进行处理&#xff0c;避免在一些异常情况下无法恢复现场而…

Android C++ 开发调试 LLDB 工具的使用

文章目录 调试环境准备基础命令Breakpoint CommandsWatchpoint CommandsExamining VariablesEvaluating ExpressionsExamining Thread StateExecutable and Shared Library Query Commands 参考&#xff1a; Android 中在进行 NDK 开发的时候&#xff0c;我们经常需要进行 C 代…

漏洞挖掘之某厂商OAuth2.0认证缺陷

0x00 前言 文章中的项目地址统一修改为: a.test.com 保护厂商也保护自己 0x01 OAuth2.0 经常出现的地方 1&#xff1a;网站登录处 2&#xff1a;社交帐号绑定处 0x02 某厂商绑定微博请求包 0x02.1 请求包1&#xff1a; Request: GET https://www.a.test.com/users/auth/weibo?…

88、动态规划-乘积最大子数组

思路&#xff1a; 首先使用递归来解&#xff0c;从0开始到N&#xff0c;每次都从index开始到N的求出最大值。然后再次递归index1到N的最大值&#xff0c;再求max。代码如下&#xff1a; // 方法一&#xff1a;使用递归方式找出最大乘积public static int maxProduct(int[] num…

局部性原理和磁盘预读

局部性原理 磁盘预读 \

Linux---软硬链接

软链接 我们先学习一下怎样创建软链接文件&#xff0c;指令格式为&#xff1a;ln -s 被链接的文件 生成的链接文件名 我们可以这样记忆&#xff1a;ln是link的简称&#xff0c;s是soft的简称。 我们在下面的图片中就是给test文件生成了一个软链接mytest&#xff1a; 我们来解…

【Linux—进程间通信】共享内存的原理、创建及使用

什么是共享内存 共享内存是一种计算机编程中的技术&#xff0c;它允许多个进程访问同一块内存区域&#xff0c;以此作为进程间通信&#xff08;IPC, Inter-Process Communication&#xff09;的一种方式。这种方式相对于管道、套接字等通信手段&#xff0c;具有更高的效率&…

【skill】onedrive的烦人问题

Onedrive的迷惑行为 安装Onedrive&#xff0c;如果勾选了同步&#xff0c;会默认把当前用户的数个文件夹&#xff08;桌面、文档、图片、下载 等等&#xff09;移动到安装时提示的那个文件夹 查看其中的一个文件的路径&#xff1a; 这样一整&#xff0c;原来的文件收到严重影…