kafka数据拉取和发送

news2025/2/26 14:08:16

文章目录

  • 一、原生 KafkaConsumer
    • 1、pom文件引入kafka
    • 2、拉取数据
    • 3、发送数据
  • 二、在spring boot中使用@KafkaListener
    • 1、添加依赖
    • 2、application.yml
    • 3、消息拉取:consumer
    • 4、自定义ListenerContainerFactory
    • 5、消息发送:producer
    • 6、kafka通过clientId鉴权时的鉴权失败问题

一、原生 KafkaConsumer

1、pom文件引入kafka

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.12</artifactId>
</dependency>

2、拉取数据

简单说只要以下几个步骤:
1、获取kafka地址,并设置Properties
2、获取consumer:KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
3、订阅topic:consumer.subscribe(topic);
4、拉取数据:consumer.poll()
5、遍历数据
6、示例:

package com.yogi.test.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.springframework.beans.factory.InitializingBean;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.Node;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.common.serialization.StringSerializer;

@Component
public class TestMsgConsumer implements InitializingBean {
   

    @Value("${test.kafka.address:127.0.0.1:9092}")
    private String kafkaAddress;
    @Value("${test.kafka.msg.topic:topic_test_1,topic_test_2}")
    private String msgTopic;
    @Value("${test.consumer.name:yogima}")
    private String consumerGroupId;

    /**
     * 消费开关: true-消费,false-暂停消费
     * 在服务正常停止时用于停止继续消费数据,将缓存中的数据发送完即可
     */
    private Boolean consumeSwitch = true;

    public void consumerMessage(List<String> topic, String groupId) {
   
        LOGGER.info("consumer topic list1:{}",topic.toString());
        Properties props = new Properties();
        /**
         * 指定一组host:port对,用于创建与Kafka broker服务器的Socket连接,可以指定多组,使用逗号分隔,对于多broker集群,只需配置
         * 部分broker地址即可,consumer启动后可以通过这些机器找到完整的broker列表
         */
        LOGGER.info("test.kafka.address:{}",kafkaAddress);
        props.put("bootstrap.servers", kafkaAddress);
        /**
         * 指定group名字,能唯一标识一个consumer group,如果不显示指定group.id会抛出InvalidGroupIdException异常,通常为group.id
         * 设置一个有业务意义的名字即可
         */
        props.put("group.id", groupId);
        /**
         * 自动提交位移
         */
        props.put("enable.auto.commit", Boolean.TRUE);
        /**
         * 位移提交超时时间
         */
        props.put("auto.commit.interval.ms", "1000");
        /**
         * 从最早的消息开始消费
         * 1,earliest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
         * 2,latest 当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
         */
        props.put("auto.offset.reset", "latest");
        /**
         * 指定消费解序列化操作。consumer从broker端获取的任何消息都是字节数组的格式,因此需要指定解序列化操作才能还原为原本对象,
         * Kafka对绝大部分初始类型提供了解序列化器,consumer支持自定义解序列化器org.apache.kafka.common.serialization.Deserializer
         * org.apache.kafka.common.serialization.ByteArrayDeserializer
         * StringDeserializer
         */
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        /**
         * 对消息体进行解序列化,与key解序列化类似
         */
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //一次从kafka中poll出来的数据条数,max.poll.records条数据需要在在session.timeout.ms这个时间内处理完
        props.put("max.poll.records", "500");
        //fetch.message.max.bytes (默认 1MB) – 消费者能读取的最大消息。这个值应该大于或等于message.max.bytes。
        props.put("fetch.message.max.bytes", "300000000");

        KafkaConsumer<String, String> consumer;

        try{
   
            /**
             * 通过Properties实例对象构建KafkaConsumer对象,可同时指定key、value序列化器
             */
            LOGGER.info("start set consumer,props:{}",props.toString());
            consumer = new KafkaConsumer<>(props);
            LOGGER.info("set consumer finished");
            /**
             * 订阅consumer group需要消费的topic列表
             */
            LOGGER.info("consumer topic list:{}",topic.toString());
            consumer.subscribe(topic);
        }catch (Exception e){
   
            LOGGER.info("consumer subscribe failed,msg:{},cause:{},e:{}",e.getMessage(),e.getCause(),e);
            return;
        }

        /**
         * 并行从订阅topic获取多个分区消息,为此新版本consumer的poll方法使用类似Linux的 selec I/O机制,
         * 所有相关的事件都发生在一个事件循环中,这样consuner端只使用一个线程就能完成所有类型I/o操作
         */
        try {
   
            while (true) {
   
                if (!consumeSwitch) {
   
                    try {
   
                        Thread.sleep(30000);
                    } catch (InterruptedException e) {
   
                        LOGGER.error("err msg:" + e.getMessage());
                    }
                }
                /**
                 * 指定超时时间,通常情况下consumer拿到了足够多的可用数据,会立即从该方法返回,但若当前没有足够多数据
                 * consumer会处于阻塞状态,但当到达设定的超时时间,则无论数据是否足够都为立即返回
                 */
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1L));
                /**
                 * poll调用返回ConsumerRecord类分装的Kafka消息,之后会根据自己业务实现信息处理,对于consumer而言poll方法
                 * 返回即认为consumer成功消费了消息
                 */
                for (ConsumerRecord<String, String> record : records) {
   
                    LOGGER.debug("offset = {}, key = {}, value = {}"

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2306397.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多智能体框架

多个不同的角色的Agent&#xff0c;共同完成一份复杂的工作。由一个统筹管理的智能体&#xff0c;自主规划多个智能体分别做什么&#xff0c;以及执行的顺序。 agent 应该包含的属性 执行特定任务 根据其角色和目标做出决策 能够使用工具来实现目标 与其他代理沟通和协作 保留…

C#中级教程(1)——解锁 C# 编程的调试与错误处理秘籍

一、认识错误&#xff1a;编程路上的 “绊脚石” 在 C# 编程中&#xff0c;错误大致可分为两类&#xff1a;语法错误和语义错误&#xff08;逻辑错误&#xff09;。语法错误就像是写作文时的错别字和病句&#xff0c;编译器一眼就能识别出来&#xff0c;比如变量名拼写错误、符…

Jmeter接口并发测试

Apache JMeter 是一款开源的性能测试工具&#xff0c;广泛用于接口并发测试、负载测试和压力测试。以下是使用 JMeter 进行接口并发测试的详细步骤&#xff1a; 一、准备工作 安装 JMeter 下载地址&#xff1a;Apache JMeter 官网 确保已安装 Java 环境&#xff08;JMeter 依…

MySQL-增删改查

一、Create(创建) &#x1f4d6; 语法&#xff1a; INSERT INTO table_name(value_list); 当我们使用表的时候&#xff0c;就可以使用这个语法来向表中插入元素~ 我们这边创建一个用于示范的表(Student)~ create table student( id int, name varchar(20), chinese int, math…

开源堡垒机 JumpServer 社区版实战教程:发布机的配置与Website资产配置使用

文章目录 开源堡垒机 JumpServer 社区版实战教程&#xff1a;发布机的配置与Website资产配置使用一、功能简述二、应用发布机2.1 版本要求2.2 创建应用发布机2.2.1 通过WinRM的协议进行应用发布机的创建2.2.2 通过OpenSSH的协议进行应用发布机的创建2.2.2.1 下载OpenSSH2.2.2.2…

代码随想录算法训练day64---图论系列8《拓扑排序dijkstra(朴素版)》

代码随想录算法训练 —day64 文章目录 代码随想录算法训练前言一、53. 117. 软件构建—拓扑排序二、47. 参加科学大会---dijkstra&#xff08;朴素版&#xff09;总结 前言 今天是算法营的第64天&#xff0c;希望自己能够坚持下来&#xff01; 今天继续图论part&#xff01;今…

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷(四)

2024-2025 学年广东省职业院校技能大赛 “信息安全管理与评估”赛项 技能测试试卷&#xff08;四&#xff09; 第一部分&#xff1a;网络平台搭建与设备安全防护任务书第二部分&#xff1a;网络安全事件响应、数字取证调查、应用程序安全任务书任务 1&#xff1a;应急响应&…

单片机的串口(USART)

Tx - 数据的发送引脚&#xff0c;Rx - 数据的接受引脚。 串口的数据帧格式 空闲状态高电平&#xff0c;起始位低电平&#xff0c;数据位有8位校验位&#xff0c;9位校验位&#xff0c;停止位是高电平保持一位或者半位&#xff0c;又或者两位的状态。 8位无校验位传输一个字节…

动态规划(背包问题)--是否逆序使用的问题--二进制拆分的问题

动态规划&#xff08;背包问题&#xff09; 题目链接01背包代码 完全背包问题代码 多重背包问题 I代码 什么时候适用逆序多重背包问题 II&#xff08;超百万级的复杂度&#xff09;代码 关于二进制拆分 题目链接 01背包 代码 #include <iostream> #include <vector&…

Mac 版 本地部署deepseek ➕ RAGflow 知识库搭建流程分享(附问题解决方法)

安装&#xff1a; 1、首先按照此视频的流程一步一步进行安装&#xff1a;(macos版&#xff09;ragflowdeepseek 私域知识库搭建流程分享_哔哩哔哩_bilibili 2、RAGflow 官网文档指南&#xff1a;https://ragflow.io 3、RAGflow 下载地址&#xff1a;https://github.com/infi…

姿态矩阵/旋转矩阵/反对称阵

物理意义&#xff0c;端点矢量角速率叉乘本身向量&#xff1b; 负号是动系b看固定系i是相反的&#xff1b; 一个固定 在惯性导航解算中&#xff0c;旋转矢量的叉乘用于描述姿态矩阵的微分方程。你提到的公式中&#xff0c; ω i b b \boldsymbol{\omega}_{ib}^b \times ωibb…

【大语言模型】【整合版】DeepSeek 模型提示词学习笔记(散装的可以看我之前的学习笔记,这里只是归纳与总结了一下思路,内容和之前发的差不多)

以下是个人笔记的正文内容: 原文在FlowUs知识库上&#xff0c;如下截图。里面内容和这里一样&#xff0c;知识排版好看一点 一、什么是 DeepSeek 1. DeepSeek 简介 DeepSeek 是一家专注于通用人工智能&#xff08;AGI&#xff09;的中国科技公司&#xff0c;主攻大模型研发与…

ollama无法通过IP:11434访问

目录 1.介绍 2.直接在ollama的当前命令窗口中修改&#xff08;法1&#xff09; 3.更改ollama配置文件&#xff08;法2&#xff09; 3.1更新配置 3.2重启服务 1.介绍 ollama下载后默认情况下都是直接在本地的11434端口中运行&#xff0c;绑定到127.0.0.1(localhost)&#x…

Bugku CTF CRYPTO

Bugku CTF CRYPTO 文章目录 Bugku CTF CRYPTO聪明的小羊ok[-<>]散乱的密文.!? 聪明的小羊 描 述: 一只小羊翻过了2个栅栏 fa{fe13f590lg6d46d0d0} 分 析&#xff1a;栅栏密码&#xff0c;分2栏&#xff0c;一个栏里有11个 ①手动解密 f a { f e 1 3 f 5 9 0 l g 6 d 4 …

【洛谷】【ARC100E】Or Plus Max(高维前缀和)

传送门&#xff1a;Or Plus Max 高维前缀和 题目描述 長さ 2N の整数列 A0​, A1​, ..., A2N−1​ があります。&#xff08;添字が 0 から始まることに注意&#xff09; 1 ≤ K ≤ 2N−1 を満たすすべての整数 K について、次の問題を解いてください。 i,j を整数と…

SmolLM2:多阶段训练策略优化和高质量数据集,小型语言模型同样可以实现卓越的性能表现

SmolLM2 采用创新的四阶段训练策略&#xff0c;在仅使用 1.7B 参数的情况下&#xff0c;成功挑战了大型语言模型的性能边界&#xff1a; 在 MMLU-Pro 等测试中超越 Qwen2.5-1.5B 近 6 个百分点数学推理能力&#xff08;GSM8K、MATH&#xff09;优于 Llama3.2-1B在代码生成和文…

《Effective Objective-C》阅读笔记(中)

目录 接口与API设计 用前缀避免命名空间冲突 提供“全能初始化方法” 实现description方法 尽量使用不可变对象 使用清晰而协调的命名方式 方法命名 ​编辑类与协议命名 为私有方法名加前缀 理解OC错误模型 理解NSCopying协议 协议与分类 通过委托与数据源协议进行…

Hbase客户端API——语句大全

目录 创建表&#xff1a; 插入数据&#xff1a; 删除数据&#xff1a; 修改数据&#xff1a; 查询数据&#xff1a;Get 查询数据&#xff1a;Scan 查询数据&#xff1a;过滤查询 创建表&#xff1a; 检验&#xff1a; 插入数据&#xff1a; 验证 一次多条数据插入 验证&…

MQ(Message Queue)

目录 MQ(Message Queue)基本概念 为什么要使用消息队列&#xff1f; 使用消息队列有什么缺点&#xff1f; 如何保证消息不丢失?(如何保证消息的可靠性传输?/如何处理消息丢失的问题?) 通用的MQ场景&#xff1a; RabbitMQ如何保证消息不丢失&#xff1f; 生产者丢数据…

计算机网络————(三)

前文二 前文一 Websocket协议 是一种存在TCP协议之上的协议 当客户端需要了解服务器是否更新就需要不断给客户端发送请求询问是否更新&#xff0c;这行会造成服务端压力很大 而Websocket相当于服务器一旦更新了就会给客户端发送消息表明自己更新了&#xff0c;类似客户端订阅…