Kafka核心参数(带完善)

news2024/11/17 13:24:02

客户端

api

Kafka提供了以下两套客户端API

  • HighLevel(重点)
  • LowLevel 

HighLevel API封装了kafka的运行细节,使用起来比较简单,是企业开发过程中最常用的客户端API。 而LowLevel API则需要客户端自己管理Kafka的运行细节,Partition,Offset这些数据都由客户端自行管理。这层API功能更灵活,但是使用起来非常复杂,也更容易出错。只在极少数对性能要求非常极致的场景才会偶尔使用

生产者发送消息

发送流程:

  1. 组装生产者核心配置参数
  2. 初始化生产者
  3. 组装消息
  4. 发送消息, 三种模式
    1. 单向发送, 不等待broker返回结果
    2. 同步发送
    3. 异步发送
  5. 关闭生产者

代码:

package com.kk.kafka.demo;

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class ProducerTest {

    public static final String KAFKA_URL = "192.168.6.128:9092";

    public static final String TOPIC = "oneTopic";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 组装生产者配置
        Properties ps = new Properties();
        ps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
        ps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        ps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 初始化生产者
        Producer<String, String> producer = new KafkaProducer<>(ps);
        for (int i = 0; i < 5; i++) {
            ProducerRecord<String, String> producerRecord = new ProducerRecord<>(TOPIC, "key" + i, "message" + i);
            // 同步发送
            producer.send(producerRecord);
            // 同步发送
            RecordMetadata metadata = producer.send(producerRecord).get();
            //异步发送
            producer.send(producerRecord, new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception e) {
                    if (metadata != null) {
                        System.out.println("Message sent successfully! Topic: " + metadata.topic() +
                                ", Partition: " + metadata.partition() +
                                ", Offset: " + metadata.offset() +
                                ", message: " + producerRecord.value());
                    } else {
                        System.err.println("Error sending message: " + e.getMessage());
                    }
                }
            });
        }
        producer.close();
    }
}

消费者消费消息

消费流程:

  1. 组装消费者核心配置参数
  2. 初始化消费者
  3. 订阅topic, 可订阅多个
  4. 拉取消息, 可配置超时时间
  5. 提交offset, 分为同步和异步两种方式, 服务端维护offset消费进度

代码:

package com.kk.kafka.demo;

import org.apache.kafka.clients.consumer.*;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class ConsumerTest {

    public static final String KAFKA_URL = "192.168.6.128:9092";

    public static final String TOPIC = "oneTopic";

    public static void main(String[] args) {
        // 组装消费者配置参数
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_URL);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "your-consumer-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
        // 初始化消费者
        Consumer<String, String> consumer = new KafkaConsumer<>(props);
        // 订阅topic
        consumer.subscribe(Arrays.asList(TOPIC));
        while (true) {
            // 拉取消息,  100毫秒超时时间
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofNanos(100));
            //处理消息
            for (ConsumerRecord<String, String> record : records) {
                System.out.println("start Consumer offset = " + record.offset() + ";key = " + record.key() + "; value= " + record.value());
            }
            //提交offset,消息就不会重复推送。
            //同步提交,表示必须等到offset提交完毕,再去消费下一批数据。
            consumer.commitSync();
            //异步提交,表示发送完提交offset请求后,就开始消费下一批数据了。不用等到Broker的确认。
//            consumer.commitAsync();
        }
    }
}

客户端整体流程

拦截器

序列化器

发送到Dequeue

Dequeue满了或者批次满了或者阈值时间 推到InflightRequest

send线程将InflightRequest推到服务端Partition, 满足一定阈值

缓存机制

broker给生产者ack

其他重要核心参数 

客户端管理offset

幂等性原理

事务消息

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1322978.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Axure RP9】中继器应用及相关案例

一 中继器简介 1.1 中继器是什么 中继器&#xff08;Repeater&#xff09;是一种高级的组件&#xff08;Widget&#xff09;&#xff0c;用于显示文本、图像和其他元素的重复集合。它是一个容器&#xff0c;容器中的每一个项目称作“item”&#xff0c;由于“item”中的数据由…

open3d bug:pcd转txt前后位姿发生改变

1、open3d bug&#xff1a;pcd转txt前后位姿发生改变 open3d会对原有结果进行一个微小位姿变换 import open3d as o3d import numpy as np# 读取PCD点云文件 pcd o3d.io.read_point_cloud(/newdisk/darren_pty/zoom_centered_s2.pcd)# 获取点云坐标 points pcd.points# 指定…

多维时序 | MATLAB实现SSA-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测

多维时序 | MATLAB实现SSA-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测 目录 多维时序 | MATLAB实现SSA-CNN-LSTM-Multihead-Attention多头注意力机制多变量时间序列预测预测效果基本介绍模型描述程序设计参考资料 预测效果 基本介绍 MATLAB实现SSA-CNN-LST…

我的创作纪念日——成为创作者第1024天

机缘 一、前言 早上收到CSDN的推送信息&#xff0c;今天是我成为创作者的第1024天&#xff0c;回想起自己已经好久没有写博客了&#xff0c;突然间很有感触&#xff0c;想水一篇文章&#xff0c;跟小伙伴们分享一下我的经历。 二、自我介绍 我出生在广东潮汕地区的一个小城…

IntelliJ IDE 插件开发 | (三)消息通知与事件监听

系列文章 IntelliJ IDE 插件开发 |&#xff08;一&#xff09;快速入门IntelliJ IDE 插件开发 |&#xff08;二&#xff09;UI 界面与数据持久化IntelliJ IDE 插件开发 |&#xff08;三&#xff09;消息通知与事件监听 前言 在前两篇文章中讲解了关于插件开发的基础知识&…

Python Pandas Excel/csv文件的保存与读取(第14讲)

Python Pandas Excel/csv文件的读取于保存(第14讲)         🍹博主 侯小啾 感谢您的支持与信赖。☀️ 🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔ꦿ🌹꧔…

1.【分布式】分布式事务详解

分布式事务 1.分布式事务是什么&#xff1f;数据库事务 2.分布式事务产生的原因&#xff1f;存储层拆分服务层拆分 3.分布式事务解决方案4.分布式事务有哪些开源组件SeateTCC 分布式服务组件基于消息补偿的最终一致性 5.两阶段提交&#xff0c;三阶段协议详解二阶段提交协议三阶…

Excel怎样统计一列中不同的数据分别有多少个?

文章目录 1.打开Excel数据表2.选择“插入”&#xff0c;“数据透视表”3.选择数据透视表放置位置4.将统计列分别拖到“行”和“数值”区间5.统计出一列中不同的数据分别有多少个 1.打开Excel数据表 2.选择“插入”&#xff0c;“数据透视表” 3.选择数据透视表放置位置 4.将统计…

设计模式(三)-结构型模式(5)-外观模式

一、为何需要外观模式&#xff08;Facade&#xff09;? 要实现一个大功能&#xff0c;我们需要将它拆分成多个子系统。然后每个子系统所实现的功能&#xff0c;就由一个称为外观的高层功能模块来调用。这种设计方式就称为外观模式。该模式在开发时常常被使用过&#xff0c;所…

每日一练 | 华为认证真题练习Day150

1、IEEE802.1Q定义的VLAN帧格式总长度为多少字节&#xff1f; A. 4 B. 2 C. 3 D. 1 2、运行STP的交换机会发送BPDU。下面关于BPDU的说法正确的是&#xff08;&#xff09;&#xff08;多选&#xff09; A. BPDU帧的Control字段值为3 B. BFDU使用的是IEEE 802.3标准的帧 …

<软考>软件设计师-5计算机网络(总结)

1 网络功能和分类 1-1计算机网络的功能 计算机网络是计算机技术与通信技术相结合的产物&#xff0c;它实现了远程通信、远程信息处理和资源共享。计算机网络的功能:数据通信、资源共享、负载均衡、高可靠性。 1-2计算机网络按分布范围划分 1-3网络的拓扑结构 总线型&#xff0…

Linux常用基本命令操作

目录 一、认识shell 1、什么是shell 2、命令的本质 3、内部命令和外部命令 4、harsh缓存 5、命令执行的过程 6、如果打了一个命令&#xff0c;提示该命令不存在 7、命令提示符 8、Linux系统文件夹 二、Linux常用命令 1、通用Linux命令行格式 2、编辑Linux命令行的辅…

代码随想录第三十六天(一刷C语言)|背包问题理论基础分割等和子集

创作目的&#xff1a;为了方便自己后续复习重点&#xff0c;以及养成写博客的习惯。 一、背包问题 题目&#xff1a;有n件物品和一个最多能背重量为w 的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品只能用一次&#xff0c;求解将哪些物品装…

PostgreSQL入门指南:快速学会创建和管理数据库!

当谈到数据库管理系统时&#xff0c;PostgreSQL是一个功能强大且广泛使用的开源关系型数据库。在本次讲解中&#xff0c;我将为您介绍如何创建和管理数据库&#xff0c;并提供一些有关PostgreSQL的基本概念和最佳实践的指导。 创建数据库 在开始之前&#xff0c;请确保您已经成…

Mac M系列安装配置VSCode

一、终端输入 安装command line tools xcode-select --install 这里是已经下载了 如果没有下载点击安装&#xff0c;等待安装完成即可 检验是否安装成功&#xff0c;终端输入 clang 如图所示是代表之前的command line tools安装 是安装成功的&#xff08;Clang会不断更新…

设计模式——代理模式(结构型)

引言 代理模式是一种结构型设计模式&#xff0c; 让你能够提供对象的替代品或其占位符。 代理控制着对于原对象的访问&#xff0c; 并允许在将请求提交给对象前后进行一些处理。 问题 为什么要控制对于某个对象的访问呢&#xff1f; 举个例子&#xff1a; 有这样一个消耗大量…

10.鸿蒙应用程序app创建第一个程序Helloworld

鸿蒙应用程序开发app_hap开发环境搭建 1.打开DevEco 2.创建项目 3.选择Empty Ability 4. 选择API6,支持java开发 5.点击Finish 6.启动本地模拟器参考方法 7.启动成功 8.运行程序 9.运行成功 其它文章点击专栏

spring之面向切面:AOP(2)

学习的最大理由是想摆脱平庸&#xff0c;早一天就多一份人生的精彩&#xff1b;迟一天就多一天平庸的困扰。各位小伙伴&#xff0c;如果您&#xff1a; 想系统/深入学习某技术知识点… 一个人摸索学习很难坚持&#xff0c;想组团高效学习… 想写博客但无从下手&#xff0c;急需…

Python Opencv实践 - 手部跟踪

使用mediapipe库做手部的实时跟踪&#xff0c;关于mediapipe的介绍,请自行百度。 mediapipe做手部检测的资料&#xff0c;可以参考这里&#xff1a; MediaPipe Hands: On-device Real-time Hand Tracking 论文阅读笔记 - 知乎论文地址&#xff1a; https://arxiv.org/abs/2006…

不用再找了,这是大模型实践最全的总结

随着ChatGPT的迅速出圈&#xff0c;加速了大模型时代的变革。对于以Transformer、MOE结构为代表的大模型来说&#xff0c;传统的单机单卡训练模式肯定不能满足上千&#xff08;万&#xff09;亿级参数的模型训练&#xff0c;这时候我们就需要解决内存墙和通信墙等一系列问题&am…