SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

news2024/10/6 5:57:19

SpringBoot对接kafka,批量、并发、异步获取消息,并动态、批量插入库表

 更多优秀文章,请扫码关注个人微信公众号或搜索“程序猿小杨”添加。

一、背景

        因业务发展需要,需要对接kafka,快速批量接收消息日志,避免消息日志累积过多,必须做到数据处理后,动态插入到库表(相同表结构,不同表名)下,并且还要支持批量事务提交,实现消息快速消费。(注意:源码文章最后有获取方式)

二、核心代码

2.1、开启批量、并发消费

kafka:
    bootstrap-servers: 10.1.*.*:9092     #服务器的ip及端口,可以写多个,服务器之间用“:”间隔
    producer: #生产者配置 
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer: #消费者配置
      #指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
      group-id: myGroup                 #设置消费者的组id default:Group
      enable-auto-commit: true  #设置自动提交offset
      auto-commit-interval: 2000  #默认值为5000
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      #值的反序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      auto-offset-reset: latest
      max-poll-records: 2000  #批量一次最大拉取数据量 默认500
    listener:
      # poll-timeout: 1000
      type: batch  # 开启批量消费
      concurrency: 3  #指定listener 容器中的线程数,用于提高并发量
    properties:
      session:
        timeout:
          ms: 120000  #默认10000
        max:
          poll:
            interval:
              ms: 600000  #默认300000(5分钟)

       说明:type: batch  # 开启批量消费, max-poll-records: 2000,批量消费每次最多消费记录数。这里设置 max-poll-records是2000,并不是说如果没有达到2000条消息,我们就一直等待。而是说一次poll最多返回的记录数为2000。concurrency: 3  #指定listener 容器中的线程数,用于提高并发量。注意:并发量根据实际分区数决定,必须小于等于分区数,否则会有线程一直处于空闲状态。例如:设置concurrency为3,也就是将会启动3条线程进行监听,而要监听的topic有5个partition,意味着将有2条线程都是分配到2个partition,还有1条线程分配到1个partition。

2.2、多线程异步配置

    具体配置参加前面文章:SpringBoot使用@Async实现多线程异步

    注意:在启动类上需要加上注解@EnableAsync,开启异步。

2.3、redis相关配置

1、yml相关配置:

spring:
  redis:
    # 地址
    host: 127.0.0.1
    # 端口,默认为6379
    port: 6379
    # 密码
    # 连接超时时间
    timeout: 10s
    lettuce:
      pool:
        # 连接池中的最小空闲连接
        min-idle: 0
        # 连接池中的最大空闲连接
        max-idle: 8
        # 连接池的最大数据库连接数
        max-active: 8
        # #连接池最大阻塞等待时间(使用负值表示没有限制)
        max-wait: -1ms

2、RedisConfig配置

package com.wonders.config;

import com.fasterxml.jackson.annotation.JsonAutoDetect;
import com.fasterxml.jackson.annotation.PropertyAccessor;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cache.annotation.CachingConfigurerSupport;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.RedisConnectionFactory;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.serializer.Jackson2JsonRedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

/**
 * 〈自定义redis序列化方式〉
 * @author yangyalin
 * @create 2018/11/1
 * @since 1.0.0
 */
@Configuration
public class RedisConfig extends CachingConfigurerSupport {
    /**
     * @Author yangyalin
     * @Description redisTemplate序列化使用的jdkSerializeable, 存储二进制字节码(默认), 所以自定义序列化类
     * 用于存储可视化内容
     * @Date 15:07 2018/11/1
     * @Param [redisConnectionFactory]
     * @return org.springframework.data.redis.core.RedisTemplate<java.lang.Object,java.lang.Object>
     **/
    @Bean
    public RedisTemplate<Object, Object> redisTemplate(RedisConnectionFactory redisConnectionFactory){
        RedisTemplate<Object,Object> redisTemplate=new RedisTemplate();
        redisTemplate.setConnectionFactory(redisConnectionFactory);
        //使用jackson2JsonRedisSerializer替换默认序列化
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer=new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper=new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);
        //设置key和value的序列化规则
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashKeySerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }
}

2.4、动态表名

    <!--插入到kafka日志临时表中-->
    <insert id="insertMsgInfoTemp" parameterType="com.wonders.entity.KafkaMsgConfig">
      INSERT INTO ${logTableName}("EVN_LOG_ID", "TABLE_NAME", "OPERATION", "PK_VALUE1", "PK_VALUE2",
           "PK_VALUE3", "PK_VALUE4", "PK_VALUE5", "TRANS_FLAG", "PKS", "BASE_CODE", "PLA_BRANCH_CODE",
           "CREATE_TIME","MSG_PRODUCE_TIME")
      VALUES (#{id,jdbcType=VARCHAR}, #{tableName,jdbcType=VARCHAR}, #{operation,jdbcType=VARCHAR},
            #{pk1,jdbcType=VARCHAR}, #{pk2,jdbcType=VARCHAR},#{pk3,jdbcType=VARCHAR},
            #{pk4,jdbcType=VARCHAR},#{pk5,jdbcType=VARCHAR}, 'Y',
            #{pks,jdbcType=VARCHAR}, #{baseCode,jdbcType=VARCHAR},
            #{plaBranchCode,jdbcType=VARCHAR},sysdate,#{msgProduceTime,jdbcType=VARCHAR})
    </insert>

    说明:1、#{} :会根据参数的类型进行处理,当传入String类型,则会为参数加上双引号(占位符);2、${} :将参数取出不做任何处理,直接放入语句中,就是简单的字符串替换(替换符)。

2.5、sql批量提交

public void batchInsert(List<KafkaMsgInfo> kafkaMsgInfoList) throws Exception{
        //如果自动提交设置为true,将无法控制提交的条数,改为最后统一提交
        // 创建session实列
        SqlSessionFactory sqlSessionFactory = ApplicationContextUtils.getBean("sqlSessionFactory");
        // 开启批量处理模式 BATCH 、关闭自动提交事务 false
        SqlSession sqlSession = sqlSessionFactory.openSession(ExecutorType.BATCH,false);
        KafkaMsgConfigMapper KafkaMsgMapper = sqlSession.getMapper(KafkaMsgConfigMapper.class);
        int BATCH = 1000;
        for (int i = 0,size=kafkaMsgInfoList.size(); i < size; i++) {
            //循环插入 + 开启批处理模式
            KafkaMsgMapper.insertKafkaMsgInfo(kafkaMsgInfoList.get(i));
            if (i != 0 && i % BATCH == 0) {
                sqlSession .commit();
            }
        }
        // 一次性提交事务
        sqlSession.commit();
        // 关闭资源
        sqlSession.close();
    }
2.6、业务代码
 @KafkaListener(topics = {"${mykafka.topics:mytopic}"})
    public void myMQConsumer(List<String> msgList){
        log.info("接收到的消息条数size:"+msgList.size());
        //计算程序耗时时间
        StopWatch stopWatch = new StopWatch();
        // 开始计时
        stopWatch.start();
        this.getKafkaMsgAndDel(msgList);  //2、接收kafka日志并解析
        stopWatch.stop();
        log.info("本次任务耗时(秒):" + stopWatch.getLastTaskTimeMillis()/1000 + "s");
    }

三、测试结果

序号kafka数量(万条)消耗(秒)
113
21013
3100120

 

更多详细资料,请关注个人微信公众号或搜索“程序猿小杨”添加。

回复:源码,可以获取该项目对应的源码及表结构,开箱即可使用。

推荐文章:

    1、SpringBoot使用@Async实现多线程异步;

    2、SpringBoot用线程池ThreadPoolTaskExecutor异步处理百万级数据;

    3、SpringBoot用线程池ThreadPoolExecutor处理百万级数据。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/661898.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

客服岗位必备的在线工具:内部知识库

随着互联网的快速发展&#xff0c;越来越多的企业开始将客户服务转移到在线平台上&#xff0c;以便更好地满足客户需求。在线客服是这一趋势中的重要组成部分&#xff0c;它不仅能够提高客户满意度&#xff0c;还能够提高客户忠诚度和品牌形象。但是在进行在线客服工作时&#…

Vault从入门到精通系列之一:深入了解安全工具Vault、Vault根令牌和解封密钥,详细整理部署Vault的详细步骤

Vault从入门到精通系列之一&#xff1a;深入了解安全工具Vault、Vault根令牌和解封密钥&#xff0c;详细整理部署Vault的详细步骤 一、深入了解安全工具Vault二、Vault根令牌和解封密钥的含义和作用三、centos7上部署和启动Vault的流程四、vault下载地址五、安装vault六、启动V…

爬虫程序采集网络数据

目录 一、Xampp搭建本地网站 二、认识Html标签 三、爬虫程序范例 &#xff08;一&#xff09;调用模块 &#xff08;二&#xff09;加载需要爬虫的网址 &#xff08;三&#xff09;爬取内容选取 &#xff08;四&#xff09;爬取内容保存 &#xff08;五&#xff09; 完整爬…

【MySQL】不就是多表查询

前言 嗨&#xff01;小伙伴们大家好呀&#xff0c;忙碌的一周就要开始&#xff01;在此之前我们学习的MySQL数据库的各种操作都是在一张表之中&#xff0c;今天我们学习要对多张表进行相关操作&#xff0c;相比较于单一的表来说&#xff0c;多张表操作相对复杂一些&#xff0c;…

GaussDB云数据库SQL应用系列—分区表管理

目录 前言 一、分区表基本原理 二、分区表主要优势 三、分区表常见场景 四、GaussDB分区表管理&#xff08;示例&#xff09; 示例一&#xff1a;创建范围分区表(RANGE) 示例二&#xff1a;创建哈希分区表&#xff08;HASH&#xff09; 示例三&#xff1a;创建列表分区…

Python基础(8)——转换数据类型

Python基础&#xff08;8&#xff09;——转换数据类型 文章目录 Python基础&#xff08;8&#xff09;——转换数据类型目标一. 转换数据类型的作用二. 转换数据类型的函数三. 快速体验四. 实验总结 目标 数据类型转换的必要性数据类型转换常用方法 一. 转换数据类型的作用 …

【数据库原理与实践】DS系的期末考题(2023)

前排感谢 感谢在数据库期末考试中进行截图保存题目的大数据同学,给隔壁计算机同学一些小安慰呜呜 选择题 由于顺序其实无关紧要,这里遂不再按题号进行整理 答案仅是本人初步作答,可能存在错误,欢迎指出 8:B 数据的定义:数据库中存储的基本对象 9:ABC 数据库系统DBS组…

神经网络中的损失函数

在《神经网络中常见的激活函数》一文中对激活函数进行了回顾&#xff0c;下图是激活函数的一个子集—— 而在神经网络领域中的另一类重要的函数就是损失函数&#xff0c;那么&#xff0c;什么是损失函数呢&#xff1f; 损失函数是将随机事件或其有关随机变量的取值映射为非负实…

网络安全学术顶会——CCS '22 议题清单、摘要与总结(上)

注意&#xff1a;本文由GPT4与Claude联合生成。 按语&#xff1a;ChatGPT在计算机领域的翻译质量还是欠缺一些&#xff0c;翻译出来的中文有的不够自然&#xff0c;经常完全按照英文的表达方式来&#xff0c;导致中文特别长&#xff0c;很绕。GPT4的翻译效果相对ChatGPT效果要好…

第38步 深度学习图像识别:VGG19建模(Tensorflow)

基于WIN10的64位系统演示 一、写在前面 &#xff08;1&#xff09;预训练模型和迁移学习 预训练模型就像是一个精心制作的省力工具&#xff0c;它是在大量的数据上进行训练&#xff0c;然后将学习到的模型参数保存下来。然后&#xff0c;我们可以直接使用这些参数&#xff0…

gitlab_ci.yml展示单元测试报告 (FREE)

CI/CD 流水线通常包含验证您的代码的测试作业。 如果测试失败&#xff0c;流水线将失败并通知用户。处理合并请求的人必须检查作业日志并查看测试失败的地方&#xff0c;以便可以修复它们。 您可以将作业配置为使用单元测试报告&#xff0c;极狐GitLab 会显示有关合并请求的报…

一种实现Spring动态数据源切换的方法 | 京东云技术团队

1 目标 不在现有查询代码逻辑上做任何改动&#xff0c;实现dao维度的数据源切换&#xff08;即表维度&#xff09; 2 使用场景 节约bdp的集群资源。接入新的宽表时&#xff0c;通常uat验证后就会停止集群释放资源&#xff0c;在对应的查询服务器uat环境时需要查询的是生产库…

SegNeXt:重新思考语义分割中卷积注意力设计

论文链接&#xff1a;https://arxiv.org/abs/2209.08575 github&#xff1a; https://github.com/Visual-Attention-Network/SegNeXt 参考视频&#xff1a;【翻译成中文带你读】SegNext论文逐行精读&#xff0c;30分钟就能快速了解其奥秘&#xff01;-人工智能/深度学习/计算…

Triton教程---存储代理

Triton教程—存储代理 存储库代理使用在加载或卸载模型时运行的新功能扩展了 Triton。 您可以在加载模型时引入自己的代码来执行身份验证、解密、转换或类似操作。 测试版&#xff1a;存储库代理 API 是测试版质量&#xff0c;并且会针对一个或多个版本进行非向后兼容的更改。…

牛客网 2023 最新 “Java 面试八股文+各大厂的面试真题“出炉,面面俱到,太全了

一转眼 2023 年已经过了大半了&#xff0c;不知道你金三银四上岸了&#xff0c;还是等着秋招呢&#xff1f;大家从 Boss 直聘上或者其他招聘网站上都可以看到 Java 岗位众多&#xff0c;Java 岗位的招聘薪酬天差地别&#xff0c;人才要求也是五花八门。而很多 Java 工程师求职过…

css3 grid 布局

特别声明&#xff1a;这篇博客转载于阮一峰老师&#xff0c;转载是为了方便日后复习&#xff0c;实在写的太棒了。 目录 一、概述 二、基本概念 2.1 容器和项目 2.2 行和列 2.3 单元格 2.4 网格线 三、容器属性 3.1 display 属性 3.2grid-template-columns 属性&#x…

[Studio3T]无限试用

新建文本文件 echo off ECHO 重置Studio 3T的使用日期...... REG DELETE "HKEY_CURRENT_USER\Software\JavaSoft\Prefs\3t\mongochef\enterprise" /f RMDIR /s /q %USERPROFILE%\.3T\studio-3t\soduz3vqhnnja46uvu3szq-- RMDIR /s /q %USERPROFILE%\.3T\studio-3t\L…

数字电路基础---触发器

数字电路基础---触发器 触发器&#xff08;Flip-Flop&#xff09;也是数字电路中的一种具有记忆功能的逻辑元件。触发器对脉冲边沿敏感的存储单元电路&#xff0c;它只在触发脉冲的上升沿&#xff08;或下降沿&#xff09;瞬间改变其状态。在数字电路中可以记录二进制数字信号“…

crfclust.bdb过大

有套11204集群环境&#xff0c;现场反馈/u01使用率100%&#xff0c;数据库无法使用了&#xff0c;本以为是aud文件太多导致的&#xff0c;查看后发现是crfclust.bdb多大了&#xff0c;有100多G了 [roothydb1 hydb1]#du -sh crfclust.bdb 101G crfclust.bdb [roothydb1 hydb…

31个最佳 JavaScript 片段

这里有 20 个有用的 JavaScript 片段&#xff0c;可以在您处理项目时为您提供帮助&#xff1a; 1.获取当前日期和时间&#xff1a; const now new Date(); 2. 检查变量是否为数组&#xff1a; Array.isArray(variable); 3.合并两个数组&#xff1a; const newArray array1.co…