【Kafka】SASL认证的Kafka客户端代码示例(spring-kafka和原生客户端)

news2025/1/10 16:00:41

文章目录

  • spring-kafka
  • 原生客户端
  • Tips

spring-kafka

添加依赖:

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.6.3</version>
        </dependency>

添加spring-kafka相关配置:

#=============== 集群通用配置 ================
spring.kafka.bootstrap-servers=kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092
spring.kafka.security.protocol=SASL_PLAINTEXT
spring.kafka.properties.sasl.mechanism=PLAIN
spring.kafka.properties.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="zhurunhua" password="pwd";

#=============== producer  =================
spring.kafka.producer.retries=5
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=1000
spring.kafka.producer.buffer-memory=1000000
# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer  ==================
spring.kafka.consumer.group-id=zhurunhua-test-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

消费者:

@Component
public class TestPlainConsumer {

    @KafkaListener(topics = {"zhurunhua-test-topic"})
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }

}

topic可以从配置文件读取,代码中通过${}的方式获取配置的topic:

@Component
public class TestPlainConsumer {

    @KafkaListener(topics = {"${kafka.subscribe.topic}"})
    public void consumer(ConsumerRecord<String, String> record) {
        System.out.println(record.value());
    }

}

生产者:

@Component
public class TestPlainProducer {

    @Autowired
    private KafkaTemplate<String, String> kafkaTemplate;

    @Value("${kafka.subscribe.topic}")
    private String topic;

    public void send(String message) {
        ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topic, message);
        future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
            @Override
            public void onFailure(Throwable throwable) {
                System.err.println(throwable);
            }

            @Override
            public void onSuccess(SendResult<String, String> sendResult) {
                System.out.println("发送结果:" + sendResult);
            }
        });
    }
}

原生客户端

引入依赖

    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>2.6.0</version>
      <scope>compile</scope>
    </dependency>

客户端代码

public class TestSaslClient {

    private final static String TOPIC = "zhurunhua-test-topic";

    private final static String BROKERS = "kafka-dyskevxt-headless.kafka-uat.svc.xke.test.xdf.cn:29092";

    private static KafkaConsumer<String, String> consumer;

    private static KafkaProducer<String, String> producer;

    static {
        Properties c = initConfig();
        consumer = new KafkaConsumer<>(c);
        producer = new KafkaProducer<>(c);
    }

    /**
     * 初始化配置
     *
     * @return java.util.Properties
     * @date 2023/04/17 5:51 下午
     */
    public static Properties initConfig() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BROKERS);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "zhurunhua-test-1");

        // SASL/PLAIN 认证配置
        props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, SecurityProtocol.SASL_PLAINTEXT);
        props.put(SaslConfigs.SASL_MECHANISM, PlainSaslServer.PLAIN_MECHANISM);
        props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"zhurunhua\" password=\"pwd\";");

        // 可选设置属性
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest");
        // 自动提交offset,每1s提交一次
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, 1000);
        return props;
    }

    /**
     * 消费者示例
     *
     * @date 2023/04/17 5:51 下午
     */
    public void subscribe() {
        consumer.subscribe(Collections.singleton(TOPIC));
        try {
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                for (ConsumerRecord<String, String> record : records) {
                    System.out.println(record);
                }
            }
        } catch (Exception e) {
            System.out.println("consumer error : " + e);
        } finally {
            consumer.close();
        }
    }

    /**
     * 生产者示例
     *
     * @date 2023/04/17 5:52 下午
     */
    public void send() {
        ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC, 0, "zhurunhua", "测试消息");
        producer.send(record, (recordMetadata, e) -> {
            if (Objects.nonNull(e)) {
                System.out.println("send error: " + e);
            } else {
                System.out.println(recordMetadata);
            }
        });
    }
}

Tips

  1. SecurityProtocol定义在枚举:org.apache.kafka.common.security.auth.SecurityProtocol中:

  1. sasl.mechanism配置项有:
    1. GSSAPI
    2. PLAIN
    3. SCRAM-SHA-256
    4. SCRAM-SHA-512
    5. OAUTHBEARER

更多安全和认证相关资料,参考:https://kafka.apache.org/documentation/#security_overview

  1. 若使用ssl,相关配置示例:

    spring.kafka.ssl.trust-store-type=JKS
    spring.kafka.ssl.key-store-type=JKS
    spring.kafka.ssl.protocol=SSL
    spring.kafka.ssl.key-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.trust-store-password=rxu4G5kPyAqTkERk
    spring.kafka.ssl.key-store-location=/kafka-jks/keystore.jks
    spring.kafka.ssl.trust-store-location=/kafka-jks/keystore.jks
    
  2. 使用不同的sasl.mechanism时,需要注意sasl.jaas.config的配置中,LoginModule不同:

    1. PLAIN对应的是:PlainLoginModule
    2. SCRAM相关对应的是:ScramLoginModule
  3. kafka认证的参数相对来说比较复杂,需要理解每个参数的含义,错一个就会失败,启动日志会打印客户端配置,可用于协助排查问题

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/440798.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

pytorch 39 yolov5_obb的onnx部署及其优化

进行部署要求配置opencv和onnxruntime环境,这里不累述。 1、模型导出 yolov5_obb项目的使用可以参考:https://hpg123.blog.csdn.net/article/details/129366477 下载yolov5s_csl_dotav1_best.pt,并执行以下命令,得到yolov5s_csl_dotav1_best.onnx python export.py --we…

【Java】文件类 File 中的文件操作与文件读写

文件操作 File 类 属性 修饰符及类型属性说明static StringpathSeparator路径分隔符&#xff0c;String 类型表示static charpathSeparator路径分隔符&#xff0c;char 类型表示 构造方法 方法签名说明File(File parent, String child)根据父目录 File 对象 孩子路径&…

JavaSE学习进阶day05_01 Collection集合

第九章 Collection集合 9.1 集合概述 在前面基础班我们已经学习过并使用过集合ArrayList<E> ,那么集合到底是什么呢? 集合&#xff1a;集合是java中提供的一种容器&#xff0c;可以用来存储多个数据。 集合和数组既然都是容器&#xff0c;它们有什么区别呢&#xff…

华特转债上市价格预测

华特转债 基本信息 转债名称&#xff1a;华特转债&#xff0c;评级&#xff1a;AA-&#xff0c;发行规模&#xff1a;6.46亿元。 正股名称&#xff1a;华特气体&#xff0c;今日收盘价&#xff1a;93.75元&#xff0c;转股价格&#xff1a;84.22元。 当前转股价值 转债面值 / …

如何成为一名优秀的自动化测试开发工程师?

目录 前言 精通编程语言 掌握自动化测试框架 熟悉测试方法和流程 熟练使用测试工具 具备团队协作能力 学习新技术和工具 以下是更为具体的建议&#xff1a; 总结 前言 自动化测试作为软件测试领域中发展最快的一个分支&#xff0c;已经成为了许多企业提升软件质量和效…

C语言断言函数的应用

对于断言&#xff0c;相信大家都不陌生&#xff0c;大多数编程语言也都有断言这一特性。简单地讲&#xff0c;断言就是对某种假设条件进行检查。 在 C 语言中&#xff0c;断言被定义为宏的形式&#xff08;assert(expression)&#xff09;&#xff0c;而不是函数&#xff0c;其…

vue element-ui web端 引入高德地图,并获取经纬度

发版前接到一个临时新需求 &#xff0c;需要在web端地址选择时用地图&#xff0c;并获取经纬度。 临阵发版之际加需求&#xff0c;真的是很头疼&#xff0c;于是赶紧找度娘&#xff0c;找api。 我引入的是高德地图&#xff0c;首先要去申请key &#xff0c; 和密钥&#xff0c;…

【Typora-使用手册】Typora使用手册 常用设置 常用快捷键

【Typora-使用手册】Typora使用手册 & 常用设置 & 常用快捷键 1&#xff09;Typora简介1.1.Typora是什么1.2.下载地址1.3.下载注意事项 2&#xff09;Markdown语法总结2.1.标题编写2.1.1.大标题2.1.2.小标题 2.2.单选框2.3.删除线2.4.表情包2.5.字体加粗2.6.斜体2.7.表…

4.HDFS概述

如果说HDFS是存储,则Yarn就是cpu和内存,mapreduce就是程序。 1.HDFS文件块大小 HDFS中的文件在物理.上是分块存储(Block) ,block默认保存3份块的大小可以通过配置参数(dfs blocksize)来规定,默认大小在Hadoop2 .x版本中是128M,老版本中是64M。 解释:块的大小:10ms*100*…

分布式链路追踪—SkyWalking

文章目录 1. 总览2. 为什么要使用分布式链路追踪3. 了解OpenTracingOpenTracing数据模型 4. 使用分布式链路追踪的好处5. SkyWalking相关问题思考5.1 如何自动采集数据5.2 如何跨进程传递5.3 traceId如何保证全局唯一5.4 请求量大&#xff0c;采集数据对性能的影响 1. 总览 2. …

ES6 总结

概述 笔记内容为参考《JavaScript 高级程序设计 (第4版)》相关内容进行 ES6 部分知识的总结。主要涉及的知识是变量声明、对象解构、函数和对象的扩展内容、集合引用类型的扩展和面向对象编程等。 ES6 学习系列笔记 ES6 总结Symbol、Map、SetES6 中的类&#xff08;class&am…

LeetCode刷题集(三)(26 删除有序数组中的重复项)

学习目标&#xff1a; 基本掌握LeetCode中的26删除有序数组中的重复项 学习内容&#xff1a;LeetCode 26删除有序数组中的重复项 题目描述&#xff1a; 给你一个 升序排列 的数组 nums &#xff0c;请你 原地 删除重复出现的元素&#xff0c;使每个元素 只出现一次 &#xff0c…

刘二大人《Pytorch深度学习实践》第十一讲卷积神经网络(高级篇)

文章目录 Inception-v1实现Skip Connect实现 Inception-v1实现 Inception-v1中使用了多个11卷积核&#xff0c;其作用&#xff1a; (1)在大小相同的感受野上叠加更多的卷积核&#xff0c;可以让模型学习到更加丰富的特征。传统的卷积层的输入数据只和一种尺寸的卷积核进行运算&…

windows系统本地批量预览svg图标

一、为何需要此操作 目前前端使用图标大致分为两类&#xff1a; iconfont方式&#xff1a;通过引入在线或者下载到本地的iconfont.css类文件实现显示图标第二类是封装图标组件&#xff0c;通过传入指定的svg名称快速生成图标 目前第二种是比较方便的&#xff0c;不需要频…

【记录】Truenas Scale|中危漏洞,需要SMB签名

部分内容参考&#xff1a;等保测试问题——需要SMB签名(SMB Signing not Required) 以及 ChatGPT。 Truenas常用SMB服务&#xff0c;但默认并不开启SMB签名。这样具有中间人攻击的风险。 一、漏洞详情 1.1 漏洞报告 漏洞提示如下&#xff1a; 1.2 漏洞介绍 SMB是一个协议名…

Mybatis-Plus -01 Mybatis-Plus入门

Mybatis-Plus入门 1 Mybatis-Plus1.1 Mybatis-Plus简介1.2 Mybatis-Plus特性1.3 Mybatis-Plus框架结构1.1 Mybatis-Plus简介1.2 Mybatis-Plus特性1.3 Mybatis-Plus框架结构 2 Mybatis-Plus 快速入门2.1 数据库准备2.2 导入mybatis-plus依赖2.3 Spring整合MP2.4 编写实体类2.5 编…

i.MX8MP平台开发分享(gicv3篇)-- set_handle_irq及中断路由过程分析

专栏目录&#xff1a;专栏目录传送门 平台内核i.MX8MP5.15.71 文章目录 set_handle_irqhard中断入口 set_handle_irq(gic_handle_irq);set_handle_irq 这个函数的功能很简单&#xff0c;将gic_handle_irq设置为中断处理函数。在发生中断异常后&#xff0c;内核就会切入到这个…

060201面积-定积分在几何学上的应用-定积分的应用

文章目录 1 平面图形的面积1.1 直角坐标情形1.2 极坐标情形1.2.1 极坐标的定义1.1.2 曲边扇形的面积 结语 1 平面图形的面积 1.1 直角坐标情形 ①平面图形由 y f ( x ) , y 0 , x a , y b yf(x),y0,xa,yb yf(x),y0,xa,yb围成图像的面积&#xff0c;如下图1.1-1所示&#…

防洪决策指挥系统(Axure高保真原型)

使用Axure制作的rp高保真原型防洪决策指挥系统可用于行业参考、实际业务需求开发、学习交流使用&#xff0c;本原型需求可以作为开发使用&#xff0c;业务需求均为作者本人行业经验。本系统包括水系展示系统、城区调度决策系统、实时监测预警和防洪调度四大功能模块的界面。 原…

MICCAI 2023 FLARE国际竞赛:打造腹部泛癌CT分割Foundation Models

竞赛官网 CodaLab - Competitionhttps://codalab.lisn.upsaclay.fr/competitions/12239 背景介绍 腹部器官是相当常见的患癌部位&#xff0c;例如结直肠癌和胰腺癌&#xff0c;分别位列癌症死亡率排名的第二位和第三位。Computed tomography (CT) 成像可以为医生提供重要的诊…