Kafka基础_2

news2025/1/13 10:03:06

Kafka系列

注:大家觉得博客好的话,别忘了点赞收藏呀,本人每周都会更新关于人工智能和大数据相关的内容,内容多为原创,Python Java Scala SQL 代码,CV NLP 推荐系统等,Spark Flink Kafka Hbase Hive Flume等等~写的都是纯干货,各种顶会的论文解读,一起进步。
今天继续和大家分享一下Kafka系列
#博学谷IT学习技术支持


文章目录

  • Kafka系列
  • 前言
  • 一、Topic的分片与副本机制
    • 1.什么是分片呢
    • 2.什么是副本呢
  • 二、kafka如何保证数据不丢失
    • 1.生产者如何保证数据不丢失
      • 1.ACK机制:
      • 2.相关思考点:
      • 3.相关的配置:
      • 4.模拟同步发送数据:
      • 5.模拟异步缓冲池发送数据:
    • 2.Broker端如何保证数据不丢失
    • 3.消费端如何保证数据不丢失
      • 1使用手动的方式提交偏移量信息:
  • 三、kafka的消息存储和查询机制
  • 四、kafka中生产者的数据分发策略
  • 五、kafka的消费者负载均衡的机制
    • 1点对点消费模式:
    • 2发布订阅消费模式:
  • 六、通过命令的方式查看数据积压问题
  • 总结


前言

Kafka是Apache旗下的一款开源免费的消息队列的中间件产品,最早是由领英公司开发的, 后期共享给Apache, 目前已经是Apache旗下的顶级开源的项目, 采用语言为Scala
在这里插入图片描述

适用场景: 数据传递工作, 需要将数据从一端传递到另一端, 此时可以通过Kafka来实现, 不局限两端的程序

​ 在实时领域中, 主要是用于流式的数据处理工作
在这里插入图片描述


一、Topic的分片与副本机制

1.什么是分片呢

分片: 逻辑概念
相当于将一个topic(大容器)拆分为N多个小的容器, 多个小的容器构建为一个Topic(大容器)

目的:
1- 提高读写的效率: 分片可以分布在不同节点上, 在进行读写的时候, 可以让多个节点一起负责
2- 分布式存储: 解决单台节点存储容量有限的问题

分片的数量: 分片是可以创建N多个, 理论上没有任何的限制

2.什么是副本呢

副本: 物理概念
针对每个分片的数据, 可以设置备份, 可以将其备份多个

目的:
提高数据的可靠性, 防止数据丢失

副本的数量: 副本的数量最多与节点的数量保持一致, 但是一般设置2个 或者 3个最多了

二、kafka如何保证数据不丢失

1.生产者如何保证数据不丢失

在这里插入图片描述

1.ACK机制:

当生产者将数据生产到Broker后, Broker应该给予一个ack确认响应, 在kafka中, 主要提供了三种ack的方案:
ack=0 : 生产者只管发送数据, 不关心不接收Broker给予的响应
ack=1 : 生产者将数据发送到Broker端, 需要等待Broker端对应的Topic上对应分片上的主副本接收到消息后, 才认为发送成功了
ack=-1|ALL: 生产者将数据发送到Broker端, 需要等待Broker端对应的Topic上对应分片上的所有的副本都接收到消息后, 才认为发送成功了

效率角度:  0  > 1 > -1

安全角度:  -1 > 1 > 0

2.相关思考点:

思考1: 如果Broker迟迟没有给予ACK响应如何解决呢?

解决方案: 设置超时时间, 如果超时触发重试策略, 如果多次重试依然无法解决, 此时程序直接报错

思考2: 每发送一次数据, broker就要给予一次ACK响应, 这样是否会对网络带宽产生影响, 如何解决?

解决方案: 会,引入缓存池, 满足一批数据后, 异步发送给Broker端, Broker端只需要针对这一批数据给予一次响应即可

3.相关的配置:

1- buffer.memory : 缓存池的大小 默认值: 33554432(32M)

2- 重试次数:
retries : 重试次数 默认值 2147483647 但是最终重试不完全取决于此参数
delivery.timeout.ms : 一次发送数据总的超时时间 默认值为 120000(120s)
request.timeout.ms : 一次请求的超时时间 默认值为30000(30s)
最终重试次数: (delivery.timeout.ms / request.timeout.ms) -1

3- 一批数据阈值: 大小 OR 时间 满足哪个执行哪个
batch.size: 一批数据的大小 默认值 16384(16kb)
linger.ms: 每一批次的间隔时间 默认值 0

4.模拟同步发送数据:

package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class KafkaProducerSyncTest {
    // 演示 如何同步发送数据
    public static void main(String[] args) {

        // 1. 创建Kafka的核心对象
        Properties props = new Properties();
        props.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");
        props.put("acks",-1);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer<String,String>(props);
        
        //2.执行发送数据  同步发送模式

        try {
            kafkaProducer.send(new ProducerRecord("test01","张三111")).get();
            // 发送成功了 ....
        } catch (Exception e) {
            // 发送失败了: 一旦失败, 就会抛出异常
            // 注意:  一旦有异常, 程序已经自动完成重试操作后, 依然无法发送成功的状态
            
            // 在此处编写发送失败后, 解决方案....
            e.printStackTrace();
        }    
    }
}

5.模拟异步缓冲池发送数据:

package com.itheima.kafka.producer;

import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;

import java.util.Properties;

public class KafkaProducerASyncTest {
    // 演示 如何异步发送数据
    public static void main(String[] args) {

        // 1. 创建Kafka的核心对象
        Properties props = new Properties();
        props.put("bootstrap.servers","node1:9092,node2:9092,node3:9092");
        props.put("acks","-1");
        props.put("batch.size",32768);
        props.put("linger.ms",3000);
        props.put("key.serializer","org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer","org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer kafkaProducer = new KafkaProducer<String,String>(props);

        //2.执行发送数据  异步发送模式
        kafkaProducer.send(new ProducerRecord("test01", "张三111"), new Callback() {

            @Override
            public void onCompletion(RecordMetadata metadata, Exception exception) {
                // 回调函数: 底层在异步发送数据的时候, 发送一次, 就会调用一次回调函数, 如果exception不为null, 说明发送失败了
                System.out.println(exception);
                if(exception != null){
                    // 认为数据发送失败
                    // 在此处编写处理失败的逻辑代码
                }
                // 发送成功了....
            }
        });
        
        //3 .释放资源
        kafkaProducer.close();
    }
}

2.Broker端如何保证数据不丢失

保证方案: 磁盘存储 + 多副本 + ack为-1

3.消费端如何保证数据不丢失

在这里插入图片描述
总结: 通过此种方式, 可以保证消费端不会将数据丢失, 但是存在重复消费的问题

1使用手动的方式提交偏移量信息:

package com.itheima.kafka.consumer;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class KafkaConsumer2Test {
    // 演示 手动提交偏移量
    public static void main(String[] args) {
        // 1- 创建Kafka的消费者的核心对象: KafkaConsumer
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("group.id", "t1"); // 消费者组的ID
        props.put("enable.auto.commit", "false"); // 是否自动提交偏移量offset
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // key值的反序列化的类
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); // value的值反序列化的类

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //2. 订阅topic: 表示消费者从那个topic来消费数据  可以指定多个
        consumer.subscribe(Arrays.asList("test01"));

        while (true) {
            // 3. 从kafka中获取消息数据, 参数表示当kafka中没有消息的时候, 等待的超时时间, 如果过了等待的时间, 返回空对象(对象存在, 但是内部没有数据  相当于空容器)
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> record : records) {
                long offset = record.offset();
                String key = record.key();
                String value = record.value();
                // 偏移量: 每一条数据 其实就是一个偏移量 , 每个分片单独统计消息到达了第几个偏移量 偏移量从 0 开始的
                System.out.println("消息的偏移量为:"+offset+"; key值为:"+key + "; value的值为:"+ value);

            }
            // 每消费完一批 提交一次偏移量
            // 注意: 一旦使用手动提交偏移量, 千万要注意, 必须写提交偏移量代码, 否则会导致大量的数据重复消费
            consumer.commitAsync(); // 异步提交方式
            //consumer.commitSync();
        }
    }
}

三、kafka的消息存储和查询机制

在这里插入图片描述
查询数据的步骤:

  • 1- 确定消息被存储在那个segment片段中
  • 2- 先去对应segment端中index文件, 从这个索引文件中, 查询对应偏移量, 在Log文件的什么位置上进行存储的
  • 3- 根据返回的在log文件的具体的位置信息, 基于磁盘顺序查询方式查询Log文件, 找到对应位置上的数据即可

四、kafka中生产者的数据分发策略

生产者的数据分发策略: 生产者将数据生产到Kafka的某个Topic中, Topic可以被分为多个分片, 最终一条消息只能被其中一个分片所接收,那么最终是由哪个分区来接收数据呢? 这就是生产者的数据分发

Kafka所支持的分区策略:
1- 粘性分区策略(2.4版本下, 支持轮询策略) Java客户端支持, 但是Python客户端不支持
2- hash取模的策略
3- 指定分区策略
4- 随机分区策略: Python客户端是支持的 Java不支持
5- 自定义分区策略

五、kafka的消费者负载均衡的机制

在这里插入图片描述

1点对点消费模式:

让所有监听这个topic的消费者都 在 同一个消费者组

2发布订阅消费模式:

让所有监听这个topic的消费者都 不在 同一个消费者组

六、通过命令的方式查看数据积压问题

在这里插入图片描述


总结

以上就是今天要讲的内容,本文主要介绍了Kafka的基础和一些常规Java的操作。以后会有更多关于kafka的企业级综合案例。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/124952.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

18-RocketMQ源码解读

NameServer启动 1、功能回顾 NameServer的核心作用 一是维护Broker的服务地址并进行及时的更新。 二是给Producer和Consumer提供服务获取Broker列表。 2、启动流程-源码重点 整个NameServer的核心就是一个NamesrvController对象。这个controller对象就跟java Web开发中的Contr…

3D可视化大屏是如何实现的?

3D可视化是指拥有3D效果的数据可视化&#xff0c;对于所要展示的数据可视化内容还原出真实场景&#xff0c;并实时接入数据&#xff0c;在面对复杂操作时灵活应对&#xff0c;使得整个场景在大屏上的展示更具立体、更具科技感、更具易用性。 物联网时代&#xff0c;可视化大屏的…

【发表案例】传感器网络及电路类,仅1个月26天录用

【期刊简介】IF&#xff1a;1.0-2.0&#xff0c;JCR4区&#xff0c;中科院4区 【检索情况】SCI 在检&#xff0c;正刊 【征稿领域】自主传感器网络的高级接口电路及其应用 【参考周期】2个月左右录用 【截稿日期】2023.1.31 重要时间节点&#xff1a;仅1个月26天录用 2022/12…

神经网络中常用的权重初始化方法及为何不能全初始化为0

1.权重初始化的重要性 神经网络的训练过程中的参数学习时基于梯度下降算法进行优化的。梯度下降法需要在开始训练时给每个参数赋予一个初始值。这个初始值的选取十分重要。在神经网络的训练中如果将权重全部初始化为0&#xff0c;则第一遍前向传播过程中&#xff0c;所有隐藏层…

深度学习笔记:感知机

感知机&#xff08;perceptron&#xff09;为神经网络的起源算法。感知机接受多个输入信号&#xff0c;输出一个信号。感知机信号只有0和1。 在上图的感知机中&#xff0c;x1和x2两个输入信号会分别乘以其对应权重(weight) w1和w2&#xff0c;传入神经元。神经元计算传来信号综…

Disentangled Face Attribute Editing via Instance-Aware Latent Space Search翻译

论文地址 代码地址 摘要 最近的研究表明&#xff0c;生成对抗网络&#xff08;GAN&#xff09;的潜空间中存在一组丰富的语义方向&#xff0c;这使得各种面部属性编辑应用成为可能。然而&#xff0c;现有的方法可能会遇到属性变化不好的问题&#xff0c;从而导致在更改所需属…

JS中数组对象使用

文章目录一、创建数组对象二、数组翻转1.检测数组2.翻转数组&#xff1a;三、添加数组元素1.push方法2.unshift方法四、删除数组元素1.pop方法2.shift方法&#x1f918;案例1五、数组排序六、数组索引方法1.indexof(数组元素)2.lastIndexOf方法&#x1f91f;案例2七、数组转化为…

数字验证学习笔记——SystemVerilog芯片验证16 ——约束控制块随机函数

一、约束块控制 一个类可以包含多个约束块。可以把不同约束块用于不同测试。一般情况下&#xff0c;各个约束块之间的约束内容是互相协调不违背的&#xff0c;因此通过随机函数产生随机数时可以找到合适的解 如果子类继承父类&#xff0c;也继承了父类的约束&#xff0c;这个时…

基于蒙特卡诺的电动汽车充电负荷曲线研究(充电开始时间,充电电量,充电功率)(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【C++】 STL-vector模拟实现

文章目录vector源码的内容:成员变量默认构造函数构造函数1-无参构造构造函数2 -使用n个相同的值构造构造函数3-使用迭代器区间构造拷贝构造函数**传统写法**现代写法赋值重载函数opeartor传统写法现代写法析构函数迭代器begin & end任意类型vector容器迭代器通用遍历方式:容…

paddleOCRv3之四: rec识别部分用 tensorRT(C++)部署

文章目录1. 简介&#xff1a;速度测试2. paddle 模型转onnx3. onnx转为tensorRT的engine模型4. tensorRT在vs2017中的配置5. 源码1. 简介&#xff1a; tensorRT是nvdia GPU模型部署的一个框架&#xff0c;似乎只是部分开源&#xff0c;github地址.大多数时候用这个框架去部署模…

十九、Docker容器监控之CAdvisor+InfluxDB+Granfana

1、概述 Docker自带查询容器状态的命令&#xff1a;docker stats&#xff0c;可以看到容器的ID\名称、占用CPU、内存等信息 但是我们不能时时刻刻的盯着这个命令&#xff0c;并且这个都是实时数据不能留痕&#xff0c;如果这个时候某一个容器挂了&#xff0c;我们想查看下当时…

webpack性能优化

splitChunks webpack splitChunks minSize: 只有到目标文件超过这个minSize时才会分包。cacheGroups: 可以对某个第三方包进行单独分离出来 例如&#xff1a; splitChunks: {minSize: 300 * 1024&#xff0c;chunks: all,name: aaa,cacheGroups: {jquery: {name: jquery,test…

SCADA平台在风电场测量的应用,实现风电场的高效管理

一、应用背景 随着煤碳、石油等能源的逐渐枯竭&#xff0c;人类越来越重视可再生能源的利用。风能作为一种清洁的可再生能源日益受到世界各国的重视。中国风能储量大&#xff0c;分布面广&#xff0c;仅陆地上的风能储量就约2.53亿千瓦。我国的风电发展起步较晚&#xff0c;但…

大数据教学实训沙盘介绍

沙盘的作用主要有3个&#xff1a; 1、采集真实数据&#xff0c;解决教学中缺少真实数据的困扰&#xff1b; 2、形成从数据采集、预处理、挖掘建模、模型部署的业务闭环&#xff0c;可以把构建模型发布到沙盘系统上&#xff0c;根据模型产生真实的反馈不断的修正模型精度&#x…

DoIP协议从入门到精通系列——车辆声明

上篇文章对DoIP中物理连接做了说明和描述,介绍了以太网应用到车载网络中重要的两个组织: IEEE;OPEN联盟。本文主要对物理连接后,车辆进行自属信息声明过程做一个完整描述。 一、基础信息 DoIP协议标准由一个或多个DoIP实体实施,具体取决于车辆的网络架构。如下图是车辆网…

SuperMap iServer在不同系统中设置开机自启动--Windows篇

目录前言1.删除已有的 SuperMap iServer 系统服务2.注册 SuperMap iServer 系统服务3.设置 SuperMap iServer 系统服务开机自启动实例作者&#xff1a;kxj 前言 在成功部署SuperMap iServer之后&#xff0c;每次重启电脑都需要手动去启动iServer&#xff0c;如何能让iServer在…

HTML5 Web Worker(多线程处理)

文章目录HTML5 Web Worker(多线程处理)概述简单使用处理复杂数据HTML5 Web Worker(多线程处理) 概述 JavaScript的执行环境是单线程的&#xff0c;也就是一次只能执行一个任务。如果遇到多个任务时&#xff0c;只能排队依次执行。 在HTML5中&#xff0c;可以使用Web Worker创…

小程序集成Three.js,使用npm安装gsap动画库

0.视频演示 three.js集成gsap创建物体动画gsap作为简单易用的补间动画库&#xff0c;获得开发者一致好评。 在小程序中&#xff0c;我们集成了Three.js第三方库&#xff0c;可以创建和加载模型及场景&#xff0c;但是做动画还是需要第三方库的支持。 下面详细说明如何在小程序…

Java SPI机制详解

一、什么是SPI SPI全称Service Provider Interface&#xff0c;是Java提供的一种服务发现机制。实现服务接口和服务实现的解耦。 Java SPI 实际上是“基于接口的编程&#xff0b;策略模式&#xff0b;配置文件”组合实现的动态加载机制&#xff0c;实现不修改任何代码的情况下…