多线程拉取+kafka推送

news2025/1/3 20:58:21

多线程拉取+kafka推送

1 多线程

在本次需求中,多线程部分我主要考虑了一个点,就是线程池的配置如何最优。因为数据量级比较大,所以这个点要着重处理,否则拉取的时间会非常长或者是任务失败会比较频繁;
因为数据的量级比较大,所以我决定进行分组,然后循环,一个组作为一个任务批次丢到线程池中,当该组拉取结束后,把该组拉取的结果进行数据推送。可以理解为我们采用了小步快跑的方式;
在这个过程中我们需要考虑的点有 一、每组多少条任务比较合适、二、如何感知该组是否已经拉取完成,因为它决定了我们推送的时机;
首先是每组多少条任务比较合适这个问题;
我的策略是每组的任务数和核心线程数保持一致,然后把工作队列的大小也设置为核心线程数的大小并将非核心线程数置为0。所以这就必须要处理线程池的配置问题:

@Bean("privatePoolExecutor")
	public ThreadPoolTaskExecutor executor() {
		ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
		//配置核心线程数-CPU密集型
		executor.setCorePoolSize(Runtime.getRuntime().availableProcessors() + 1);
		//配置最大线程数: 其值减去核心线程数就是非核心线程数
		executor.setMaxPoolSize(Runtime.getRuntime().availableProcessors() + 1);
		//配置队列大小
		executor.setQueueCapacity(Runtime.getRuntime().availableProcessors() + 1);
		//线程池维护线程所允许的空闲时间
		executor.setKeepAliveSeconds(30);
		//配置线程池中的线程的名称前缀
		executor.setThreadNamePrefix("BIJob-privatePoolExecutor-");
		//设置线程池关闭的时候等待所有任务都完成再继续销毁其他的Bean
		executor.setWaitForTasksToCompleteOnShutdown(true);
		// 拒绝策略:CALLER_RUNS的含义是不在新线程中执行任务,而是由调用者所在的线程来执行
		executor.setRejectedExecutionHandler(new ThreadPoolExecutor.CallerRunsPolicy());
		//执行初始化
		executor.initialize();
		return executor;
	}

首先可以看到我把核心线程数置为CPU核心数+1,非核心线程数置为0且阻塞队列大小也设置为CPU核心数+1,这是一个非常定制化的配置。因为这建立在我能确定该工作队列的大小能保证我每组的任一条任务都不会去执行拒绝策略[介绍1],我之所以可以确定的原因就是因为我把每组的任务数设定为了CPU核心数+1,这样即使在一个循环中所有的线程都被阻塞时,我的工作队列也能确保任务的不丢失;

接下来我们来看一下参数为什么这样配置;
首先是核心线程数为什么要配置成这么多,依据是什么;
这个点是我通过查询的结果,一种是加1这种叫CPU密集型任务,还有一种是乘2这种叫做IO密集型。之所以选择前者是因为我发现如果我采用后者那我数据拉取失败的概率较大,每次都有超时。所以我是被迫的;
接下来我们介绍第二个大问题,也就是如何感知该组的任务已经执行结束了呢?
此处需要我们使用一个类叫做CountDownLatch,伪代码:

// 将阻塞次数设定为数组长度
CountDownLatch latch = new CountDownLatch(loadList.size);
// 单次循环
loadList.forEach((value) -> privatePoolExecutor.execute(() -> {
try{
        pushList.add(loadData(value));
}catch(Ex e){
    log.err(e);
}finally{
    // 执行一次,次数-1
    latch.countDown();
}
}));
try{
    // 不为0就一直阻塞在这里
    latch.await();
}catch(Ex e){
    log.err(e);
}
// 为0后开始推送
pushData(pushList);

使用该类之后,我们就可以感知到该组任务在什么时候执行结束了;

2 kafka部分

这部分没有什么特殊的,就是简单的导入依赖然后使用boot封装的kafkaTemplate这个Bean来推送消息;
主要介绍一下它的属性配置:

# kafka配置
# kafka服务器集群
spring.kafka.bootstrap-servers=xxx:9092,xxx:9092,xxx:9092
# 消息压缩算法,批量时提效
spring.kafka.producer.compression-type=lz4
# 生产者日志记录ID
spring.kafka.producer.client-id=xxx
# ack确认方式,0发送给主broker直接返回、1发送后要确认主broker入盘,-1发送后确认主broker入盘且从broker也入盘
spring.kafka.producer.acks=1
# 批量消息大小16KB,其中满足batch-size或linger.ms其中任意一个条件时会自动触发消息推送
spring.kafka.producer.batch-size=16384
# 生产端缓冲区大小32M
spring.kafka.producer.buffer-memory=33554432
# 失败重试次数
spring.kafka.producer.retries=3
# 提交延时,接收到主broker消息后延迟1s发送下一个消息
spring.kafka.producer.properties.linger.ms=1000
# 单条消息最大字节数10M
spring.kafka.producer.properties.max.request.size=10485760

3 介绍

介绍1:
说到这里我们就必须要知道线程池的运行机制,它里面是支持corePoolSize、maximumPoolSize、keepAliveTime、unit、workQueue、threadFactory、handler(jdk1.8-帮助文档搜ThreadPoolExecutor)这七大参数的;
但是呢,在这里我们只介绍corePoolSize、maximumPoolSize、workQueue、handler这四个,它们四个之间的在有界工作队列的联动机制:
1 线程池收到任务后,首先判断此时是否存在空闲的核心线程,如果有就启动核心线程然后处理。如果没有就把任务放到工作队列中;
2 如果工作队列没有满就继续插入,如果满了就需要判断是否存在非核心线程,如果存在就启动,然后执行任务。如果不存在就执行拒绝策略;
说到这里,这句话就非常容易理解了;在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/442661.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring Security OAuth2.0(一)-----前言-授权码模式及代码实例

什么是 OAuth2 OAuth 是一个开放标准,该标准允许用户让第三方应用访问该用户在某一网站上存储的私密资源(如头像、照片、视频等),而在这个过程中无需将用户名和密码提供给第三方应用。实现这一功能是通过提供一个令牌&#xff08…

如何治理“网络暴力” 在人类文明不断发展向前的进程中,大数据时代应运而来。数学建模解题步骤,愚见而已,欢迎指错和探讨呀~

题目可见文章:(20条消息) 如何治理“网络暴力” 在人类文明不断发展向前的进程中,大数据时代应运而来。 数学建模,90%成品论文,附附件、原题、代码 注,水平有限,非广告,仅供交流参考&#xff0c…

6、ThingsBoard使用jar包自己构建镜像部署

1、概述 这一节主要讲解你自己使用jar包构建镜像,一般在很多企业中,都是使用Jenkins配置流水线,自动打包,然后拷贝程序在target目录下生成的jar包,然后使用Dockerfile文件进行构建镜像,其实我这一节讲的也是类似,只是不使用Jenkins来实现自动,原理都一样,估计也是很多…

C++ MySQL存储二进制数据、存储照片

版权声明:本文为CSDN博主「intfre」的原创文章,遵循CC 4.0 BY-SA版权协议,转载请附上原文出处链接及本声明。 原文链接:https://blog.csdn.net/nibiru_holmes/article/details/51387047 0x01 首先MySQL支持二进制的类型有Blob: …

Doris-1.2.0升级到Doris-1.2.4

0 背景 在使用doris-1.2.0版本时发现BE节点无故宕机,自己尝试解决无果后再官网寻找解决方案,发现在doris-1.2.0版本中存在这样的隐患bug导致BE节点宕机。 而在咨询社区之后建议对doris进行升级,升级版本doris-1.2.4。该版本是解决1.2.x问题…

Springboot集成neo4j实现知识图谱关系图

提示:文章写完后,目录可以自动生成,如何生成可参考右边的帮助文档 文章目录 前言一、neo4j是什么?二、安装步骤1.启动2.使用2.简单命令 二、使用springboot集成neo4j1.引入依赖2.功能实现3.查询关系节点4. 查询指定评委和指定选手…

基于matlab使用光线追踪自定义 CDL 通道模型

一、前言 此示例演示如何使用光线追踪分析的输出自定义 CDL 通道模型参数。该示例演示如何: 指定发射器和接收器在 3D 环境中的位置。 使用光线追踪来计算通道的几何方面:光线数量、角度、延迟和衰减。 使用光线追踪分析的结果配置 CDL 通道模型。 使用相…

KDYZ-YM晶闸管伏安特性测试仪

一、概述 晶闸管的伏安特性是晶闸管的基本特性,这项特性的好坏,直接影响到器件在整机上的正常使用。因此,检测晶闸管的伏安特性在晶闸管器件的生产、经销及使用过程中都是十分重要的。 测试方法符合国标JB/T7624-94《整流二极管测试方法》和J…

深入理解Go语言中的函数【单元测试】14

文章目录 go test工具测试函数测试函数的格式测试函数示例 测试组子测试测试覆盖率基准测试基准测试函数格式基准测试示例性能比较函数重置时间并行测试 Setup与TearDownTestMain子测试的Setup与Teardown 示例函数示例函数示例 go test工具 Go语言中的测试依赖go test命令。编…

Three.js+TypeScript+Webpack学习记录(一)

使用环境参考 Node.js v16.19.1 VSCode 插件:Live Server 正文 初始化新工程 安装好 node 环境后,新建一个空项目文件夹,执行 npm init 一路回车即可。 然后配置 npm 所需要的包,直接列一下 package.json {"name":…

SpringBoot集成模板引擎Thymeleaf

本博文重点内容共3个方面&#xff0c;分别是&#xff1a; 在SpringBoot中使用thymeleaf自定义thymeleaf工具类导出静态网页thymeleaf常用的标签 一、在SpringBoot中使用thymeleaf pom.xml <!--Thymeleaf 启动器--><dependency><groupId>org.springframewo…

java基础总结-java技术栈快速复习

java基础 java基础概念 java概述和语言背景 java语言是没有sun公司&#xff08;Stanford University Network&#xff1a;斯坦福大学网络&#xff09;在1995年推出的计算机语言java之父&#xff1a;詹姆斯高斯林&#xff08;James Gosling&#xff09;2009年&#xff0c;sun公…

2021地理设计组二等奖:基于多源数据的黑龙江省森林康养适宜性评价及康养产品设计

一、作品背景 1.森林康养产业蓬勃发展 为适应逐渐增加的康养需求&#xff1b;国家草原局印发《关于促进森林康养产业发展的意见》&#xff0c;在《意见》中指出到2022年&#xff0c;建成基础设施基本完善、产业布局较为合理的区域性森林康养服务体系&#xff1b;到2035年&…

网络工程师的水平检测1

水平测试 文章目录 水平测试填空题&#xff08;11分&#xff09;判断题&#xff08;9分&#xff09;选择题&#xff08;8分&#xff09;简答题&#xff08;26分&#xff09;子网划分&#xff08;24分&#xff09;实验拓扑&#xff08;19分&#xff09;填空题&#xff08;5分&am…

变压器绕制

变压器同名端 1、变压器同名端&#xff0c;是指在变压器绕制的时候&#xff0c;各绕组方向统一&#xff0c;同名端同时都为进线&#xff08;起始端&#xff09; 或出线&#xff08;结束端)。若某一个绕组骨架插入夹头方向反向&#xff0c;则相应该绕组进出线同时反向。同名端&a…

CCTV-TIME特别关注:首届医药港国际健康美食文化嘉年华

“食在广州 味在方舟”首届医药港国际健康美食文化嘉年华4月28日在健康方舟盛大开幕! 【央媒时代TOP中国时代周刊中国品牌万里行CCTV-TIME特别关注】健康美食、滋补靓汤、异国风情、非遗文化、治愈萌宠、灯光夜市、亲子玩乐、浪漫许愿树……五一长假,广州的这场精彩嘉年华活动…

vue3+ts+pinia+vite一次性全搞懂

vue3tspiniavite项目 一&#xff1a;新建一个vue3ts的项目二&#xff1a;安装一些依赖三&#xff1a;pinia介绍、安装、使用介绍pinia页面使用pinia修改pinia中的值 四&#xff1a;typescript的使用类型初识枚举 一&#xff1a;新建一个vue3ts的项目 前提是所处vue环境为vue3&…

springboot文件上传

1.新建文件上传页面 在static目录中新建upload-test.html&#xff0c;上传页面代码如下所示&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><title>springboot文件上传测试</title> <…

编译时报Clang SA is not enabled问题解决

报此问题应该是swap不足导致的&#xff0c;原因是用的虚拟机&#xff0c;改为16G内存问题排除 具体解决如下&#xff1a; 1.free -h 查看当前分区大小和使用情况 2.扩展分区大小 2.1首先删除系统默认分区 sudo swapoff /swapfile sudo rm /swapfile 2.2新建swap分区&#xf…

ROS学习第十六节—— 头文件与源文件

https://download.csdn.net/download/qq_45685327/87708569 1.自定义头文件调用 新建功能包 在该路径下创建头文件 /home/xiaoming/demo0/src/hello_head/include/hello_head 编写以下代码 #ifndef _HELLO_H #define _HELLO_Hnamespace hello_ns{class HelloPub {public:vo…