kafka(三)springboot集成kafka(1)介绍

news2024/11/16 21:29:03

基于kafka新版本

<dependencies>
     <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.0.0</version>
     </dependency>
</dependencies>

一、kafkaProducer

1、介绍

设计上比consumer要简单一些,因为不涉及组管理,即每个producer都是独立工作的。

(1)目前producer的主要功能是向某个topic的某个分区发送一条消息,这就涉及分区选择策略,在ProducerRecord中介绍。

(2)因为有ISR,因此在发送消息时,producer有多种选择来实现消息发送,如不等待任何副本的影响便返回、只等待leader副本响应返回等等。

2、发送流程

produce的发送主要流程概述如下:

  1. 拦截器对发送的消息拦截处理;

  2. 获取元数据信息;

  3. 序列化处理;

  4. 分区处理;

  5. 批次添加处理;

  6. 发送消息。

 

3、主要参数
3.1、acks:

有三个参数:0、1、all,数据可靠性的重要参数,可以保证消息不丢失。

Properties properties = new Properties();
properties.put(ProducerConfig.ACKS_CONFIG,"1");
3.2、 buffer.memory

指定了producer端用于缓冲消息的缓冲区大小,单位是字节。

3.3、compression.type

producer是否压缩消息。

3.4、batch.size

二、ProducerRecord

1、介绍

发送给Kafka Broker的key/value 值对,producer将待发送的消息封装进ProducerRecord实例类。

2、发送消息分区策略
 (1)指定了分区:

当发送时指定了partition就使用该partition。即kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)指定了发送到哪个具体的分区。

(2)轮询

如果kafka生产者发送的消息ProducerRecord(String topic, Integer partition, K key, V value)没有指定发送到哪个具体的分区,即partition=null(并且key也为空时,如果此时key不为空的话就会采用另一种分区策略key哈希分区策略),并且使用了默认的分区器,那么消息将被随机的发送到主题的各个可用分区上,分区器使用轮询的算法将消息均衡的分布到各个分区。

(3)key哈希分区策略

根据消息的key进行哈希计算,并将消息发送到对应的分区。保证相同key的消息始终被发送到同一个分区,确保消息的顺序性。

(4)自定义分区策略(即自定义Partitioner)

用户可以根据自己的需求实现自定义的分区策略,通过实现org.apache.kafka.clients.producer.Partitioner接口来自定义分区选择逻辑。

二、 KafkaConsumer

三、生产者发送消息应用

1、同步发送消息

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回 ack。 由于 send 方法返回的是一个 Future 对象,根据 Futrue 对象的特点,我们也可以实现同 步发送的效果,只需在调用 Future 对象的 get 方发即可。

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
import java.util.concurrent.ExecutionException;
 
public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 默认为异步发送
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));
            // 末尾加get为同步发送
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();
        }
 
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
2、异步发送消息
2.1、普通异步
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
 
import java.util.Properties;
 
public class CustomProducer {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "wtyy"));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}
2.2、带回调函数的异步发送

 回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是 RecordMetadata 和 Exception,如果 Exception 为 null,说明消息发送成功,如果 Exception 不为 null,说明消息发送失败。

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

import org.apache.kafka.clients.producer.*;
 
import java.util.Properties;
 
public class CustomProducerCallBack {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        // 序列化器的serialization是一个接口,找到他的实现类
        // 我们一般都是使用String
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
                    new Callback() {
                       @Override
                       public void onCompletion(RecordMetadata metadata, Exception exception) {
                           //(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法
                           //(2)消息发送失败  exception != null  也会调用该方法
                           if (exception == null) {
                               System.out.println(metadata);//使用打印演示
                           }else{
                               exception.printStackTrace();//打印异常信息
                           }
                       }
                    });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

四、消费者接收消息应用

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;
 
import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;
 
public class CustomConsumer {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
        // 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)
        //  broker的ip地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 配置  反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //配置消费者组(组名必须)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // 3. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 注册消费主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
        // 4.调用方法消费数据
        // 如果kafka集群没有新数据会造成空转
        // 填写参数为时间,如果没有拉取数据,线程睡眠一会
        while (true) {
            // 设置1s中消费的一批数据
            // Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1s
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 打印消费数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
            }
        }
        //5.关闭资源
//        consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1502120.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Python工具小技巧

Python工具小技巧 将.py后缀文件转化为.exe后缀文件安装PyinstallerPyinstaller参数大全 将.py后缀文件转化为.exe后缀文件 目前比较常见的打包exe方法都是通过Pyinstaller来实现的&#xff0c;本文也将使用这种常规方法。 安装Pyinstaller 首先我们要先安装Pyinstaller&…

Sora的新商业视角:从生态构建到未来产业协同

在科技飞速发展的当下&#xff0c;人工智能与机器学习不仅重塑了我们的生活方式&#xff0c;还深刻地改变了商业模式的构建方式。Sora&#xff0c;作为一款前沿的AI视频生成工具&#xff0c;其盈利路径和未来产业协同的可能性值得深入探讨。 Sora学习资料&#xff1a;使用方式…

前端javascript的DOM对象操作技巧,全场景解析

✨✨ 欢迎大家来到景天科技苑✨✨ &#x1f388;&#x1f388; 养成好习惯&#xff0c;先赞后看哦~&#x1f388;&#x1f388; 所属的专栏&#xff1a;前端泛海 景天的主页&#xff1a;景天科技苑 文章目录 1.js的DOM介绍2.节点元素层级关系3.通过js修改&#xff0c;清空节点…

新零售SaaS架构:订单履约系统架构设计(万字图文总结)

什么是订单履约系统&#xff1f; 订单履约系统用来管理从接收客户订单到将商品送达客户手中的全过程。 它连接了上游交易&#xff08;客户在销售平台下单环&#xff09;和下游仓储配送&#xff08;如库存管理、物流配送&#xff09;&#xff0c;确保信息流顺畅、操作协同&…

基于单片机的商品RFID射频安全防盗报警系统设计

目 录 摘 要 I Abstract II 引 言 1 1 系统方案设计 3 1.1 总体设计要求 3 1.2 总体设计方案选择 3 1.3 总体控制方案选择 4 1.4 系统总体设计 5 2 项目硬件设计 7 2.1 单片机控制设计 7 2.2 按键电路设计 10 2.3 蜂鸣器报警电路设计 10 2.4 液晶显示电路设计 11 2.5 射频识别…

计算数据集的总体标准差pstdev()

【小白从小学Python、C、Java】 【计算机等考500强证书考研】 【Python-数据分析】 计算数据集的总体标准差 pstdev() [太阳]选择题 pstdev() 的作用是&#xff08;&#xff09; import statistics a [0, 0, 8, 8] print("【显示】a ",a) print("【执行】st…

C++ Qt开发:QHostInfo主机地址查询组件

Qt 是一个跨平台C图形界面开发库&#xff0c;利用Qt可以快速开发跨平台窗体应用程序&#xff0c;在Qt中我们可以通过拖拽的方式将不同组件放到指定的位置&#xff0c;实现图形化开发极大的方便了开发效率&#xff0c;本章将重点介绍如何运用QHostInfo组件实现对主机地址查询功能…

考研复习-函数栈帧(详解)

目录 1. 什么是函数栈帧 2.函数栈帧的创建和销毁解析 2.1相关寄存器&#xff1a; 2.2相关汇编命令 运行代码&#xff1a; 3.预备知识&#xff1a; 4.正式开始&#xff1a; 4.1转到反汇编 4.2函数栈帧的创建 4.3函数栈帧的销毁 1. 什么是函数栈帧 我们在写C语言代码的时…

签约仪式如何策划和安排流程?如何邀约媒体现场见证报道

传媒如春雨&#xff0c;润物细无声&#xff0c;大家好&#xff0c;我是51媒体网胡老师。 签约仪式的策划和安排流程&#xff0c;以及邀约媒体现场见证报道&#xff0c;都是确保活动成功和提升影响力的关键环节。以下是一些建议&#xff1a; 签约仪式的策划和安排流程 明确目标…

UE5 UE4 开发常用工具AssetDeveTool

AssetDeveTool工具&#xff0c;支持UE5 5.0-.5.3 UE4 4.26/4.27 下载链接&#xff1a; 面包多 https://mbd.pub/o/bread/ZZubkphu 工坊&#xff1a; https://gf.bilibili.com/item/detail/1104960041 包含功能&#xff1a; 自动化批量展UV功能 快速选择功能 自动化批量减面功能…

启动查看工具总结

启动目标&#xff1a;2s内优秀&#xff0c;2-5s普通&#xff0c;之后的都需要优化&#xff0c;热启动则是1.5s-2s内 1 看下大致串联启动流程&#xff1a; App 进程在 Fork 之后&#xff0c;需要首先执行 bindApplication Application 的环境创建好之后&#xff0c;就开始activ…

【CSP试题回顾】202109-2-非零段划分

CSP-202109-2-非零段划分 关键点&#xff1a;差分数组 详见&#xff1a;【CSP考点回顾】差分数组 时间复杂度分析 使用差分数组的优势在于&#xff0c;它将问题转化为了在一次遍历中识别并利用关键变化点&#xff08;波峰和波谷&#xff09;&#xff0c;从而避免了对每个可能…

【重要公告】BSV区块链协会开始对Teranode节点软件进行技术测试

​​发表时间&#xff1a;2024年2月22日 Teranode节点软件将使BSV区块链网络的交易处理速度提升至每秒110万笔&#xff0c;从而拓宽企业和政府客户的区块链应用范围。 2024年2月22日&#xff0c;瑞士楚格 - BSV区块链协会宣布已经开始对Teranode节点软件进行技术测试&#xff…

软考高级:系统工程生命周期阶段概念和例题

作者&#xff1a;明明如月学长&#xff0c; CSDN 博客专家&#xff0c;大厂高级 Java 工程师&#xff0c;《性能优化方法论》作者、《解锁大厂思维&#xff1a;剖析《阿里巴巴Java开发手册》》、《再学经典&#xff1a;《Effective Java》独家解析》专栏作者。 热门文章推荐&am…

Vue:自动按需导入element-plus图标

自动导入使用 unplugin-icons 和 unplugin-auto-import 从 iconify 中自动导入任何图标集。 完整vite.config.js参考模板 https://download.csdn.net/download/ruancexiaoming/88928539 导入element-plus图标 命令行安装unplugin-icons pnpm i -D unplugin-icons//没有安装自…

Vue中用户权限如何处理?

Vue中用户权限如何处理&#xff1f; 在 Vue 中&#xff0c;可以采用多种方式来处理用户权限&#xff0c;以下是一些常见的方法&#xff1a; 1. 使用路由守卫 Vue Router 提供了 beforeEach 导航守卫&#xff0c;可以在路由跳转之前进行权限检查。例如&#xff1a; router.be…

【vue.js】文档解读【day 3】 | 条件渲染

如果阅读有疑问的话&#xff0c;欢迎评论或私信&#xff01;&#xff01; 文章目录 条件渲染前言&#xff1a;v-ifv-elsev-else-iftemplate中的v-ifv-showv-if vs v-show 条件渲染 前言&#xff1a; 在JavaScript中&#xff0c;我们知道条件控制语句可以控制程序的走向&#…

手写分布式配置中心(六)整合springboot(自动刷新)

对于springboot配置自动刷新&#xff0c;原理也很简单&#xff0c;就是在启动过程中用一个BeanPostProcessor去收集需要自动刷新的字段&#xff0c;然后在springboot启动后开启轮询任务即可。 不过需要对之前的代码再次做修改&#xff0c;因为springboot的配置注入value("…

746. 使用最小花费爬楼梯 (Swift版本)

题目 给你一个整数数组 cost&#xff0c;其中 cost[i] 是从楼梯第 i 个台阶向上爬需要支付的费用。一旦你支付此费用&#xff0c;即可选择向上爬一个或者两个台阶。 你可以选择从下标为 0 或下标为 1 的台阶开始爬楼梯。 请你计算并返回达到楼梯顶部的最低花费。 限制条件 2…

6. 虚拟机及Linux安装

虚拟机及Linux安装 进行嵌入式项目开发&#xff0c;第一步就是要建立嵌入式开发环境&#xff0c;主要包括安装 Bootloader 工具、不同平台的交叉编译器&#xff08;如ARM 平台的arm-linux-gcc&#xff09;、内核源码树&#xff08;在需要编译和配置内核时&#xff09;、在调试…