KAFKA第二课之生产者(面试重点)

news2025/2/9 6:50:31

生产者学习

1.1 生产者消息发送流程

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。
生产者如何发送的?
现在Main线程中将数据进行处理,处理成IO型数据,然后调用sender进行发送
Main:
1.读取生产者配置
2.产生数据
3.过滤数据(校验什么的)
4.序列化
5.放入缓冲区 RecordAccumulator
6.发送Sender

细节: 考虑的问题 1.生产者配置的读取和修改 2.数据的过滤与分区, 3.缓冲区是如何设置的,大小
4.发送(发送失败怎么样,请求区的大小)
这里注意一下,可以在缓冲区对数据进行压缩,这样就提高缓冲区的容量和发送的数据量,提高吞吐量

1.2 同步发送与异步发送

1.什么是同步和异步

同步就是,串行,一条龙 异步 一起运行
举例: 餐馆点餐
同步: 需要等服务员过来,让服务员记录,
异步: 点餐APP直接点餐,交给队列,让他自己运行

2.发送的同步异步

同步:需要得到返回值
异步:发送过去不管了

3. 分区好处

啥是分区?
将一个数据块分成多个数据块
将数据分布式处理了
存储: 可以分在多个机器上, 也可以整多个副本。便于存储,同时提高健壮性
IO:多个数据块可以同时进行发送接收消费。生产者可以以分区为单位发送数据,消费者可以以分区为单位进行消费

4. 默认分区器

前提条件: 1.分区 2.key值
规则:

  • 1存在,按1分区
  • 1不存在,按2.key值对分区数取余得到的值分区
  • 1.2都不存在 随机选个分区,等这个批次发送完了,再换

3 就是粘性分区
那么粘性分区的缺点是什么?
因为缓冲区溢出的条件是,大小和时间双重判断,如果大小不够,但是时间够了,还是会发走,这样,最后导致,分区上产生数据倾斜
如何解决的?
3.3.1 Kafka去掉粘性分区的时间控制,批次只由大小判断

1.3.自定义分区器

1.思路

  • 1.实现接口Parititoner,重写相关方法
  • 2.修改配置 将partitioner设置为默认配置

2.1 自定义分区器代码

public class MyPartitioner implements Partitioner {
    //  自定义分区器 实现partitioner接口

    // 1.分区方法
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取消息
        String data = value.toString();
        // 创建partition 作为最后的分区标识
        int partitions;

        // 分区逻辑
        // 根据含有的字符串进行判断 判断进入哪个分区
        if (data.contains("atguigu")){
             partitions = 0;
        } else if (data.contains("shangguigu")){
            partitions = 1;
        } else {
            partitions = 2;
        }
        return partitions;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

2.2 主类

package com.atguigu.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class ProducerClientAsync {
    public static void main(String[] args) {
        // 0 配置对象
        Properties properties = new Properties();

        //  --指定kafka的Broker地址
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092,hadoop103:9092");
        //  -- 1.指定序列化器 序列化器的全限定类名
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //.setProperty(ProducerConfig.LINGER_MS_CONFIG,"0");
        // -- 2.设置分区器
        properties.setProperty(ProducerConfig.PARTITIONER_CLASS_CONFIG,MyPartitioner.class.getName());
        // -- 3.获取客户端连接对象
        KafkaProducer<String,String> kafkaProducer= new KafkaProducer<String,String>(properties);
        //  key是主题  v是发送内容  这里注意一下
        // -- 4.发送数据
        String[] str= {"atguigu","111","atguigu","shangguigu","222"};
        for (int i =0; i < str.length; i++) {
            System.out.println(str[i]);
            try {
                kafkaProducer.send(new ProducerRecord<>("first", str[i]), new Callback() {
                    @Override
                    public void onCompletion(RecordMetadata metadata, Exception exception) {
                        if (exception == null){

                            System.out.println("主题:" + metadata.topic() + "->"  + "分区:" + metadata.partition());
                        }else {
                            // 出现异常打印
                            exception.printStackTrace();
                        }
                    }
                }).get();
            } catch (InterruptedException e) {
                throw new RuntimeException(e);
            } catch (ExecutionException e) {
                throw new RuntimeException(e);
            }
        }

        kafkaProducer.close();
    }
}

在这里插入图片描述

3.面试细节

1.如何提高生产者的吞吐量

  • 批次大小调到16
  • 将等待时间改成50-100ms 默认是0
  • 压缩数据量,这样每次发送的数据就多了
  • 加大缓冲区大小,进来的数据变多,发送也能提上去

2.生产者如何保证数据可靠性的

主要通过ack机制

1.什么是ACK机制?

根据ack值来决定Kafka集群服务端的存储应答

  • ack=0 最低 生产者只管发送,不用接收
  • ack=1 中等 生产者发送完需要等待Leader保存后回应,
  • ack=-1 最高 生产者发送完需要等待所有副本保存后回应

2.分析ACK机制

性能与安全是成反比的
所以,-1虽然最安全,但是效率最低

3.如果将ACK调到-1会出现什么问题?

有可能出现数据重复发送与接收
比如,在同步的瞬间,Leader死掉,但是其他副本已经落盘,这时候,就是问题了。
因为Leader死掉了,所以会直接更换Leader,选出一个副本作为Leader,注意,这时显示没有收到内容,所以,send重新发送,这时候,每个副本上,收到的就是2份该数据了。

4.应用场景

acks=0 几乎不用
acks=1 传输普通日志,允许丢失
acks=-1 传输高可靠性数据,一般与钱有关

5.ACK=-1一定可靠么?

不一定
如果分区副本数设置为1 ,或者ISR里应答的最小副本数设置为1(默认也是1),这时候,ack=1效果相同了。
也就是说,应答一个,就能走,就没意义了
所以需要完全可靠就需要配置一下
ACK=-1 & 分区副本大于等于2 & ISR应答最小副本数量大于等于2

3. 数据去重

1.概念

至少一次:一次或者多次 完全可靠
在这里插入图片描述
最多一次:直接不管回复只管发送 ack=0

至少:保证数据不丢失,但是无法保证数据不重复
最多: 无法保证数据不丢失

1.如何解决数据的重复发送与接收的问题,同时保证数据的不丢失

注意,这里解决的是sender和服务端的重复发送与接收,而不是生产者本身发送多个重复消息的问题,这个要搞清楚。
一般重复问题,都是通过标识来判别,从而去重的
Kafka 0.11 引入 幂等性和事务
精确一次: 幂等性 +至少一次(ack=-1 & 分区副本>=2 & ISR最小副本>=2)

4.幂等性

1.概念

啥是幂等性,标识一个消息的唯一标识
<pid,partition,Seqnumber>
Pid 是会话ID,每次重新生成会话,就会重新生成PID
partition是分区 标识 消息是哪个分区的
Seqnumber是单调递增的标识,注意,这是每个分区独享的
这三个在一起,才是唯一标识。

2.如何使用幂等性

开启参数enable.idempotence 默认为true,false关闭。
开启开关就行

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/864626.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

vant van-tabs van-pull-refresh van-list 标签栏+上拉加载+下拉刷新

<template><div class"huibj"><div class"listtab"><!--顶部导航--><div class"topdh"><topnav topname"余额明细"></topnav></div><!--Tab 标签--><van-tabs v-model"…

126、高频Redis面试题:如何保证Redis和数据库数据一致性

高频Redis面试题:如何保证Redis和数据库数据一致性 缓存用法如何更新缓存先更新缓存&#xff0c;再更新数据库先更新数据库&#xff0c;再更新缓存先删除缓存&#xff0c;再更新数据库延时双删&#xff08;删除缓存&#xff0c;更新数据库&#xff0c;再延时删除缓存&#xff0…

Linux 内存管理新特性 - Memory folios 解读 | 龙蜥技术

本文内容基于 Linux 5.16&#xff0c;folio 基础部分开始合入。截止到目前 Linux 6.5&#xff0c;folio 已经有很大进展&#xff0c;会在后续文章中介绍。作者&#xff1a;徐宇。 01 folio [ˈfoʊlioʊ] 是什么 引用 LWN: Memory folios &#xff1a;https://lwn.net/Articl…

基于Selenium技术方案的爬取界面内容实践

1. 定位页面&#xff08;多窗口切换&#xff09; WebDriver提供了处理多个窗口的能力&#xff0c;这是通过使用“WebDriver.switchTo.window()”方法来切换到已知名称的窗口来实现的。如果名称未知&#xff0c;您可以使用“WebDriver.getWindowHandles()”获取已知窗口列表。您…

苍穹外卖项目解读(五 完结) POI Easyexcel excel操作

前言 HM新出springboot入门项目《苍穹外卖》&#xff0c;笔者打算写一个系列学习笔记&#xff0c;“苍穹外卖项目解读”&#xff0c;内容主要从HM课程&#xff0c;自己实践&#xff0c;以及踩坑填坑出发&#xff0c;以技术&#xff0c;经验为主&#xff0c;记录学习&#xff0…

Java课题笔记~ JSP内置对象

(1)九个内置对象 jsp的内置对象&#xff1a;JSP内置对象是不需要声明和创建就可以在JSP页面脚本中使用的成员变量。 九个内置对象&#xff1a; 1.out对象 在JSP页面中&#xff0c;经常需要向客户端发送文本内容&#xff0c;这时&#xff0c;可以使用out对象来实现。out对象…

引入三阶失真的非线性放大器的模拟输出及使用中值滤波器去除峰值研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

【Spring】-Spring中Bean对象的存取

作者&#xff1a;学Java的冬瓜 博客主页&#xff1a;☀冬瓜的主页&#x1f319; 专栏&#xff1a;【Framework】 主要内容&#xff1a;往spring中存储Bean对象的三大方式&#xff1a;XML方式(Bean标签)&#xff1b;五大类注解&#xff1b;方法注解。从spring中取对象的两种方式…

穿越未来:探索虚拟现实科技的未来前景

虚拟现实&#xff08;Virtual Reality&#xff0c;简称VR&#xff09;科技&#xff0c;正如一颗崭新的明星&#xff0c;迅猛崛起&#xff0c;为人类带来前所未有的体验和想象空间。随着科技的飞速发展&#xff0c;VR 科技的未来充满了无限的可能性&#xff0c;正将我们引向一个…

【LNMP(分布式)】

目录 一、LNMP是什么 二、实际步骤 1.启用虚拟机 1.1 启动三台虚拟机分别命名为nginx&#xff0c;mysql&#xff0c;php 1.2 分别配置基础环境 1.3 测试外网连通性 2.更新源 3.安装nginx并配置 3.1 下载nginx源码包并安装 3.2 配置nginx 4.安装mysql并配置 4.1 安装…

在.NET 6.0中自定义接口路由

在本文中&#xff0c;我们将讨论ASP.NET Core中的新路由。我们将了解什么是接口(endpoints)路由&#xff0c;它是如何工作的&#xff0c;它在哪里使用&#xff0c;以及如何创建自己的路由。 本文主题&#xff1a; 探索接口路由创建自定义接口创建更复杂的接口 名词定义&#…

029 - integer types 整数类型

MySQL支持SQL标准整数类型 INTEGER&#xff08;或INT&#xff09;和 SMALLINT。作为一个可扩展标准&#xff0c;MySQL也支持整数类型 TINYINT&#xff0c;MEDIUMINT和 BIGINT。下表显示了每种整数类型所需的存储空间和范围。 表11.1 MySQL支持的整数类型的必需存储和范围 类型…

日常问题——idea工具中SpringBoot使用@AutoWriter 爆红

&#x1f61c;作 者&#xff1a;是江迪呀✒️本文关键词&#xff1a;日常BUG、BUG、问题分析☀️每日 一言 &#xff1a;存在错误说明你在进步&#xff01; 一、问题描述 使用AutoWrited注解注入Bean时&#xff0c;变量下面有红线&#xff0c;但是不影响运行。 二…

【idea】点击idea启动没反应

RT 点击idea启动的时候没反应&#xff0c;接着百度报错&#xff0c;基本跟他们的也不一样。 首先我是做版本升级。其次&#xff0c;我之前是破解的。如果你也是跟我一样的话&#xff0c;那问题可能就处在破解上了 解决方式 首先&#xff0c;是跟大部分解决思路一样。先找到项…

【趋势检测和隔离】使用小波进行趋势检测和隔离研究(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

基于Java+SpringMVC+Mybaties+layui+Vue+elememt基于协同过滤的电影推荐系统的设计与实现

一.项目介绍 基于协调过滤的电影推荐系统的设计与实现 本系统分为普通用户以及管理员两类 普通用户&#xff1a; 登录系统查看最新推荐电影、收藏、评论、查看电影信息、 对电影进行评分、浏览电影资讯信息、查看个人信息以及浏览收藏…

【Python数据容器】--- 元组的基本使用

个人主页&#xff1a;平行线也会相交 欢迎 点赞&#x1f44d; 收藏✨ 留言✉ 加关注&#x1f493;本文由 平行线也会相交 原创 收录于专栏【Python小白从入门到精通】&#x1f388; 本专栏旨在分享学习Python的一点学习心得&#xff0c;欢迎大家在评论区讨论&#x1f48c; 元组…

系统学习Linux-Redis集群

目录 一、Redis主从复制 概念 作用 缺点 流程 二、Reids哨兵模式&#xff08;sentinel&#xff09; 概念 作用 缺点 结构 搭建 三、redis集群 概述 原理 架构细节 选举过程 实验环境模拟 一、Redis主从复制 概念 是指将一台Redis服务器的数据&#xff0c;复制…

linux查看进程绑定cpu核是否成功

运行top命令&#xff0c;可以看到进程以及进程cpu占有率 然后查看是否有P属性&#xff0c;这个属性用来查看进程绑定的cpu核 这里没有看到cpu占用核心 运行top后&#xff0c;按 " f "键进入top配置界面&#xff0c;然后按上下键选择P选项&#xff0c;此时可以看到P选…

(二)结构型模式:2、桥接模式(Bridge Pattern)(C++实现示例)

目录 1、桥接模式&#xff08;Bridge Pattern&#xff09;含义 2、桥接模式应用场景 3、桥接模式的UML图学习 4、C实现桥接模式的示例 1、桥接模式&#xff08;Bridge Pattern&#xff09;含义 桥接模式是一种结构型设计模式&#xff0c;它将抽象部分与实现部分分离&#…