kafka客户端调用

news2024/12/13 9:08:56

kafka客户端调用

  • springboot整合kafka
  • java调用kafka
  • 其他问题

springboot整合kafka

  • 手动提交需要在配置文件配置kafka属性 kafka.listener.ack-mode: manual
    
    @Component
    public class MyKafkaListener {
    
        @Autowired
        private SaslClient saslClient;
        //监听所有分区
        @KafkaListener(topics ={ "主题" },groupId = "消费组")
        //监听指定分区
    //    @KafkaListener(
    //            topicPartitions ={
    //                    @TopicPartition(topic = "主题"
                partitionOffsets = {
                        @PartitionOffset(partition = "3", initialOffset = "323"),
                        @PartitionOffset(partition = "4", initialOffset = "6629")
                }
    //   ) },groupId = "消费组")
        public void onMessage(ConsumerRecord<String, String> record, Acknowledgment  ack) {
    
            try {
                saslClient.consume(record);
            } catch (Exception e) {
                e.printStackTrace();
            } finally{
              ack.acknowledge();//手动提交 
            }
        }
    
    }
    
  • yml增加配置
    kafka:
      listener:
        ack-mode: manual # 配置手动提交
      bootstrap-servers: # 服务组
      consumer:
        isolation-level: read-committed
        enable-auto-commit: false #关闭自动提交
        #auto-commit-interval: 1000
        auto-offset-reset: earliest #当各分区下无初始偏移量或偏移量不再可用时,从最早的消息记录开始读取
        key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        max-poll-records: 2
      properties:
         #安全认证
        security:
          protocol: SASL_PLAINTEXT
        sasl:
          mechanism: SCRAM-SHA-512
          jaas:
            config: org.apache.kafka.common.security.scram.ScramLoginModule required username="用户" password="密码";  
        session:
          timeout:
            ms: 24000
        max:
          poll:
            interval:
              ms: 30000
    

java调用kafka

  //主题
    @Value("${kafakaData.topic}")
    private String topic;

    //消费
    @Value("${kafkaData.group}")
    private String group;

    @Value("${kafkaData.jaas}")
    private String jaas;

    @Value("${kafkaData.key}")
    private String key;

    @Value("${kafkaData.brokers}")
    private String brokers; //需使用安全接入点的地址

    public void consume() throws Exception {
        Properties properties = new Properties();
        properties.put("security.protocol", "SASL_PLAINTEXT");
        properties.put("sasl.mechanism", "SCRAM-SHA-512");
        properties.put("bootstrap.servers", brokers);
        properties.put("group.id", group);
        properties.put("enable.auto.commit", "false");
        properties.put("auto.offset.reset", "earliest");
        properties.put("max.poll.records", 2); //每次poll的最大数量
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("sasl.jaas.config", jaas);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        // 重置消费者的偏移量到最早的偏移量
        consumer.subscribe(Arrays.asList(topic));
        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(3000));//deprecated
            System.out.printf("poll records size = %d%n", records.count());
            try{
                for (ConsumerRecord<String, String> record : records) {
                    //对加密数据解密
                    String publicDecrypt = RSAUtil.publicDecrypt(record.value(), RSAUtil.getPublicKey(key));
                    JSONObject jsonObject = JSONObject.parseObject(publicDecrypt);
                    String msg = jsonObject.getString("msg");
                    String page = jsonObject.getString("page");
                    String size = jsonObject.getString("size");
                    String time = jsonObject.getString("time");
                    String total = jsonObject.getString("total");
                    String type = jsonObject.getString("type");
                    String operation = jsonObject.getString("operation");
                    //todo 业务处理
                }

            }catch (Exception e){
                e.printStackTrace();
            }finally {
                consumer.commitAsync();//手动提交
            }
        }
    }

其他问题

  • 每次消费一条数据必须提交,否则会影响分区,导致偏移量错位,后面就消费不到数据了

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2258697.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

FireFox火狐浏览器企业策略禁止更新

一直在用火狐浏览器&#xff0c;但是经常提示更新&#xff0c;进入浏览器右上角就弹出提示&#xff0c;比较烦。多方寻找&#xff0c;一直没有找到合适的方案&#xff0c;毕竟官方没有给出禁用检查更新的选项&#xff0c;甚至about:config里都没有。 最终找到了通过企业策略控…

鲲鹏麒麟安装Kafka-v1.1.1

因项目需要在鲲鹏麒麟服务器上安装Kafka v1.1.1&#xff0c;因此这里将安装配置过程记录下来。 环境说明 # 查看系统相关详细信息 [roottest kafka_2.12-1.1.1]# uname -a Linux test.novalocal 4.19.148 #1 SMP Mon Oct 5 22:04:46 EDT 2020 aarch64 aarch64 aarch64 GNU/Li…

EFAK kafka可视化管理工具部署使用

简介&#xff1a;EFAK是开源的可视化和管理软件。它允许您查询、可视化、提醒和探索您的指标&#xff0c;无论它们存储在何处。简单来说&#xff0c;它为您提供了将 Kafka 集群数据转换为漂亮的图形和可视化效果的工具。 环境&#xff1a;①操作系统&#xff1a;CentOS7.6&…

YOLOv11改进,YOLOv11添加引入U-Netv2分割网络中SDI信息融合模块+GSConv卷积,助力小目标

# 理论介绍 完成本篇需要参考以下两篇文章,并已添加到YOLOv11代码中 YOLOv11改进,YOLOv11添加GSConv卷积+Slim-neck,助力小目标检测,二次创新C3k2结构YOLOv11改进,YOLOv11添加U-Netv2分割网络中SDI信息融合模块,助力小目标检测下文都是手把手教程,跟着操作即可添加成功…

Linux dd命令读写flash之误区

1. 问题 通常在Linux系统上需使用dd命令读写flash设备&#xff0c;个人最近调试了一款spi-nor flash芯片&#xff0c;分区分配了8MB大小的分区&#xff0c;是用dd命令验证读写flash时&#xff0c;出现校验失败。 使用如下命令读写8KB数据就会出现校验数据失败 time dd if/dev…

六、nginx负载均衡

负载均衡&#xff1a;将四层或者七层的请求分配到多台后端的服务器上。 从而分担整个业务的负载。提高系统的稳定性&#xff0c;也可以提高高可用&#xff08;备灾&#xff0c;其中一台后端服务器如果发生故障不影响整体业务&#xff09;. 负载均衡的算法 round robin 轮询 r…

Python使用Selenium库获取 网页节点元素、名称、内容的方法

我们要用到一些网页源码信息&#xff0c;例如获取一些节点的class内容&#xff0c; 除了使用Beautifulsoup来解析&#xff0c;还可以直接用Selenium库打印节点&#xff08;元素&#xff09;名称&#xff0c;用来获取元素的文本内容或者标签名。 例如获取下面的class的内容&am…

scala的泛型2

package test55 //隐式转换 //1.隐式函数 //2.隐式类 //3.隐式对象 //4.函数的隐式参数//泛型&#xff1a;类型参数化。 //Pair 约定一对数据 class Pair[T](var x:T, var y:T) //泛型的应用场景&#xff1a; //1.泛型函数 //2.泛型类 //3.泛型特质 object test2 {def main(arg…

服务器数据恢复—热备盘上线过程中硬盘离线导致raid5阵列崩溃的数据恢复案例

服务器数据恢复环境&#xff1a; 两组分别由4块SAS接口硬盘组建的raid5阵列&#xff0c;两组raid5阵列划分LUN并由LVM管理&#xff0c;格式化为EXT3文件系统。 服务器故障&#xff1a; RAID5阵列中一块硬盘未知原因离线&#xff0c;热备盘自动激活上线替换离线硬盘。在热备盘上…

基于Qwen2-VL模型针对LaTeX OCR任务进行微调训练 - 数据处理

基于Qwen2-VL模型针对LaTeX OCR任务进行微调训练 - 数据处理 flyfish 基于Qwen2-VL模型针对LaTeX_OCR任务进行微调训练_-_LoRA配置如何写 基于Qwen2-VL模型针对LaTeX_OCR任务进行微调训练_-_单图推理 基于Qwen2-VL模型针对LaTeX_OCR任务进行微调训练_-_原模型_单图推理 基于Q…

华为开源自研AI框架昇思MindSpore应用案例:基于MindSpore框架的SGD优化器案例实现

SGD优化器基本原理讲解 随机梯度下降&#xff08;SGD&#xff09;是一种迭代方法&#xff0c;其背后基本思想最早可以追溯到1950年代的Robbins-Monro算法&#xff0c;用于优化可微分目标函数。 它可以被视为梯度下降优化的随机近似&#xff0c;因为它用实际梯度&#xff08;从…

【C++】闰年判断问题完整解析与代码优化

博客主页&#xff1a; [小ᶻ☡꙳ᵃⁱᵍᶜ꙳] 本文专栏: C 文章目录 &#x1f4af;前言&#x1f4af;题目描述&#x1f4af;我的解法分析 &#x1f4af;老师解法分析代码 1&#xff08;未优化版本&#xff09;分析 代码 2&#xff08;优化版本&#xff09;分析 &#x1f4af…

云和恩墨 zCloud 与华为云 GaussDB 完成兼容性互认证

近日&#xff0c;云和恩墨&#xff08;北京&#xff09;信息技术有限公司&#xff08;以下简称&#xff1a;云和恩墨&#xff09;的多元数据库智能管理平台 zCloud 与华为云计算技术有限公司&#xff08;以下简称&#xff1a;华为云&#xff09;的 GaussDB 数据库完成了兼容性互…

HtmlRAG开源,RAG系统联网搜索能力起飞~

网络是RAG系统中使用的主要外部知识来源&#xff0c;许多商业系统&#xff0c;如ChatGPT和Perplexity&#xff0c;都使用网络搜索引擎作为他们的主要检索系统。传统的RAG系统将网页的HTML内容转换为纯文本后输入LLM&#xff0c;这会导致结构和语义信息的丢失。 HTML转换为纯文…

Android显示系统(10)- SurfaceFlinger内部结构

一、前言&#xff1a; 之前讲述了native层如何使用SurfaceFlinger&#xff0c;我们只是看到了简单的API调用&#xff0c;从本文开始&#xff0c;我们逐步进行SurfaceFlinger内部结构的分析。话不多说&#xff0c;莱茨狗~ 二、类图&#xff1a; 2.1、总体架构&#xff1a; 先…

PPT技巧:将幻灯片里的图片背景设置为透明

在PPT中添加了图片&#xff0c;想要将图片中的背景设置为透明或者想要抠图&#xff0c;有什么方法吗&#xff1f;今天分享两个方法。 方法一&#xff1a; 添加图片&#xff0c;选中图片之后&#xff0c;点击【图片格式】功能&#xff0c;点击最左边的【删除背景】 PPT会自动帮…

以太网链路详情

文章目录 1、交换机1、常见的概念1、冲突域2、广播域3、以太网卡1、以太网卡帧 4、mac地址1、mac地址表示2、mac地址分类3、mac地址转换为二进制 2、交换机的工作原理1、mac地址表2、交换机三种数据帧处理行为3、为什么会泛洪4、转发5、丢弃 3、mac表怎么获得4、同网段数据通信…

CNCF云原生生态版图

CNCF云原生生态版图 概述什么是云原生生态版图如何使用生态版图 项目和产品&#xff08;Projects and products&#xff09;会员&#xff08;Members&#xff09;认证合作伙伴与提供商&#xff08;Certified partners and providers&#xff09;无服务&#xff08;Serverless&a…

每日计划-1212

1. 完成 SQL1 查询所有列 2. 八股部分 1) C 中多态性在实际项目中的应用场景有哪些&#xff1f; &#xff08;1&#xff09;图形绘制系统 场景描述&#xff1a;在一个图形绘制软件中&#xff0c;可能有多种图形&#xff0c;如圆形、矩形、三角形等。每种图形都需要有自己的…

Autosar CP RTE:一个例子简要介绍工作原理

以下是一个示例&#xff0c;展示如何通过AUTOSAR的RTE机制利用配置&#xff08;ARXML文件&#xff09;来实现软件组件集成&#xff0c;包含对应的C源代码以及模拟自动生成的RTE框架代码的示例。请注意&#xff0c;实际的AUTOSAR项目会复杂得多&#xff0c;这里只是一个简化且示…