【大数据之Kafka】三、Kafka生产者之消息发送流程及同步异步发送API

news2025/1/22 16:59:52

  将外部传送给过来的数据发送到kafka集群。

1 发送原理

(1)创建main()线程,创建producer对象,调用send方法,经过拦截器(可选)、序列化器、分区器。

(2)分区器将数据发送到分区中,每个分区创建一个队列(分区是在内存中完成的),内存总大小为32M,每个批次的大小为16K。

(3)sender线程将缓冲队列中的数据读取出来发往Kafka集群,根据batch.size和linger.ms拉取数据(即每批次的数据满了之后或者设置的时间到了之后拉取数据)。

(4)sender线程拉取数据,以每个节点为一组,当第一个请求数据发送到broker1中,broker没有及时应答,还是能发送第二个请求,最多有5个请求都没有收到应答就不会再继续发送请求。

(5)selector打通输入流和输出流。

(6)链路接通后发送数据。

(7)Kafka集群收到数据后根据副本机制进行副本同步。

(8)Kafka集群收到数据后根据应答机制进行应答。

(9)selector根据Kafka集群反馈的消息进行判断。

(10)如果成功则删掉该请求同时在缓冲队列里清理掉对应的每一个分区的数据;如果失败则进行重试,重新发送请求,知道成功为止。

在这里插入图片描述

2 生产者重要参数列表

在这里插入图片描述

3 异步发送API

3.1 普通异步发送

(1)需求:创建Kafka 生产者,采用异步的方式发送到 Kafka Broker
(2)分析:异步发送即将外部的数据发送到缓冲队列里(不管缓冲队列中的数据有没有发送到Kafka集群)。

步骤:
(1)创建kafka工程,在pom.xml中导入依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>3.0.0</version>
    </dependency>
</dependencies>

(2)创建类:com.astudy.kafka.producer.CustomProducer

package com.study.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        //0.创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        //给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //1.创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //2.调用 send 方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","test"+i));
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

(3)测试:
在hadoop102上开启Kafka消费者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息:
在这里插入图片描述

3.2 带回调函数的异步发送

分析:
  回调函数会在 producer 收到 ack 时调用,为异步调用,该方法有两个参数,分别是元数据信息(RecordMetadata)和异常信息(Exception),如果 Exception 为 null,说明消息发送成功,如果Exception 不为 null,说明消息发送失败。

package com.study.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) throws InterruptedException {
        //0.创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        //给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //1.创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //2.调用 send 方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "test" + i), new Callback() {
                // 该方法在 Producer 收到 ack 时调用,为异步调用
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        // 没有异常,输出信息到控制台
                        System.out.println("topic:" + recordMetadata.topic() + "  partition:" + recordMetadata.partition());
                    }else {
                        // 出现异常打印
                        e.printStackTrace();
                    }

                }
            });
            // 延迟一会会看到数据发往不同分区
            Thread.sleep(2);
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

测试:
在hadoop102上开启Kafka消费者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息:
在这里插入图片描述
在这里插入图片描述

4 同步发送API

分析:只需在异步发送的基础上,再调用一下 get()方法即可。

package com.study.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        //0.创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        //给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // key,value 序列化(必须):key.serializer,value.serializer
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());


        //1.创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        //2.调用 send 方法,发送消息
        for (int i = 0; i < 3; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","test"+i)).get();
        }

        //3.关闭资源
        kafkaProducer.close();
    }
}

测试:
在hadoop102上开启Kafka消费者:

 kafka-console-consumer.sh --bootstrap-server hadoop102:9092,hadoop103:9092 --topic first

在 IDEA 中执行代码,观察 hadoop102 控制台中是否接收到消息:
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/883530.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

java面试题(16):Mysql一致性视图是啥时候建立的

1 演示错误案例 先给大家来一个错误演示。 我们打开两个会话窗口&#xff0c;默认情况下隔离级别是可重复读&#xff0c;我们来看下&#xff1a; 首先在 A 会话中查看当前 user 表&#xff0c;查看完成后开启事务&#xff1a; 可以看到id3的数据sex是男。 接下来在 B 会话中…

场景入门12----关卡切换和流送

在游戏时往往需要切换关卡&#xff0c;有两种方法&#xff0c;关卡切换和推送。关卡切换的方法是进入了一个新的地图&#xff0c;这时人物的值都复原了&#xff0c;一般都是在一个地图上推送关卡。 关卡切换 首先&#xff0c;文件新建一个新关卡&#xff0c;命名。找到之前制…

地址解析协议-ARP

ARP协议 无论网络层使用何种协议&#xff0c;在实际网络的链路上传输数据帧时&#xff0c;最终必须使用硬件地址 地址解析协议&#xff08;Address Resolution Protocol&#xff0c;ARP&#xff09;&#xff1a;完成IP地址到MAC地址的映射&#xff0c;每个主机都有一个ARP高速缓…

C语言刷题训练DAY.4

1.计算体重指数 解题思路&#xff1a; 这里我们只需要按照他的要求写出公式。 注意&#xff1a;身高要换算成米&#xff0c;打印的结构是个浮点数&#xff0c;打印的格式要相对应 解题代码&#xff1a; #include<stdio.h> int main() {int weight 0;int height 0;…

Python自动化小技巧18——自动化资产月报(word设置字体表格样式,查找替换文字)

案例背景 每月都要写各种月报&#xff0c;经营管理月报&#xff0c;资产月报.....这些报告文字目标都是高度相似的&#xff0c;只是需要替换为每个月的实际数据就行&#xff0c;如下&#xff1a; (打码是怕信息泄露.....) 可以看到&#xff0c;这个报告的都是高度模板化&…

浏览器 - 事件循环机制详解

目录 1&#xff0c;浏览器进程模型进程线程浏览器的进程和线程1&#xff0c;浏览器进程2&#xff0c;网络进程3&#xff0c;渲染进程 2&#xff0c;渲染主线程事件循环异步同步 JS 为什么会阻塞渲染任务优先级 3&#xff0c;常见面试题1&#xff0c;如何理解 js 的异步2&#x…

时序预测 | MATLAB实现基于KNN K近邻的时间序列预测-递归预测未来(多指标评价)

时序预测 | MATLAB实现基于KNN K近邻的时间序列预测-递归预测未来(多指标评价) 目录 时序预测 | MATLAB实现基于KNN K近邻的时间序列预测-递归预测未来(多指标评价)预测结果基本介绍程序设计参考资料 预测结果 基本介绍 基于KNN K近邻的时间序列预测-递归预测未来(多指标评价) …

数据集成与流动优化:解锁企业数据的无限潜力

在当今数字化时代&#xff0c;企业拥有海量数据&#xff0c;这些数据散落在不同部门、系统和平台之间&#xff0c;形成了所谓的“数据孤岛”。要想实现数据的最大化价值&#xff0c;就必须解决数据集成与流动的挑战。本文将深入探讨数据集成与流动优化的重要性&#xff0c;以及…

833-字符串中查找与替换

题目描述&#xff1a; 你会得到一个字符串 s (索引从 0 开始)&#xff0c;你必须对它执行 k 个替换操作。替换操作以三个长度均为 k 的并行数组给出&#xff1a;indices, sources, targets。 要完成第 i 个替换操作: 检查 子字符串 sources[i] 是否出现在 原字符串 s 的索…

POSTGRESQL 关于安装中自动启动的问题 详解

开头还是介绍一下群&#xff0c;如果感兴趣Polardb ,mongodb ,MySQL ,Postgresql ,redis &#xff0c;SQL SERVER ,ORACLE,Oceanbase 等有问题&#xff0c;有需求都可以加群群内有各大数据库行业大咖&#xff0c;CTO&#xff0c;可以解决你的问题。加群请加 liuaustin3微信号 &…

使用SpringBoot + Thymeleaf 完成简单的用户登录

&#x1f600;前言 本篇博文是关于Thymeleaf 的综合案例&#xff0c; 使用SpringBoot Thymeleaf 完成简单的用户登录-列表功能&#xff0c;希望你能够喜欢&#x1f60a; &#x1f3e0;个人主页&#xff1a;晨犀主页 &#x1f9d1;个人简介&#xff1a;大家好&#xff0c;我是晨…

数据结构--最短路径 Floyd算法

数据结构–最短路径 Floyd算法 F l o y d 算法&#xff1a;求出每⼀对顶点之间的最短路径 \color{red}Floyd算法&#xff1a;求出每⼀对顶点之间的最短路径 Floyd算法&#xff1a;求出每⼀对顶点之间的最短路径 使⽤动态规划思想&#xff0c;将问题的求解分为多个阶段 对于n个顶…

re学习(31)BUUCTF-xx(多层加密)

参考文章&#xff1a;【BUUCTF逆向 [2019红帽杯]xx】_nb_What_DG的博客-CSDN博客 re学习笔记&#xff08;26&#xff09;BUUCTF-re-[2019红帽杯]xx_Forgo7ten的博客-CSDN博客 还有B站 水番正文 IDA64位载入 shiftF12查看字符串 交叉引用找到关键代码 使用findcrypt插件找到…

H13-922题库 HCIP-GaussDB-OLAP V1.5

**H13-922 V1.5 GaussDB(DWS) OLAP题库 华为认证GaussDB OLAP数据库高级工程师HCIP-GaussDB-OLAP V1.0自2019年10月18日起&#xff0c;正式在中国区发布。当前版本V1.5 考试前提&#xff1a; 掌握基本的数据库基础知识、掌握数据仓库运维的基础知识、掌握基本Linux运维知识、…

互联网发展历程:速度与效率,交换机的登场

互联网的演进就像一场追求速度与效率的竞赛&#xff0c;每一次的技术升级都为我们带来更快、更高效的网络体验。然而&#xff0c;在网络的初期阶段&#xff0c;人们面临着数据传输速度不够快的问题。一项关键的技术应运而生&#xff0c;那就是“交换机”。 速度不足的困境&…

SpringBoot + Mybatis多数据源

一、配置文件 spring: # datasource: # username: root # password: 123456 # url: jdbc:mysql://127.0.0.1:3306/jun01?characterEncodingutf-8&serverTimezoneUTC # driver-class-name: com.mysql.cj.jdbc.Driverdatasource:# 数据源1onedata:jdbc-url: j…

希尔排序【Java算法】

文章目录 1. 概念2. 思路3. 代码实现 1. 概念 希尔排序也是一种插入排序&#xff0c;它是简单插入排序经过改进之后的一个更高效的版本&#xff0c;也称为缩小增量排序。希尔排序在数组中采用跳跃式分组的策略&#xff0c;通过某个增量将数组元素划分为若干组&#xff0c;然后分…

linux学习(自写shell)[11]

打印出提示信息获取用户键盘输入 cmd_line[NUM];用来保存完整的命令行 #include <stdio.h> #include <stdlib.h> #include <string.h> #include <unistd.h> #include <sys/wait.h>#define NUM 1024 char cmd_line[NUM]; //shell int main() {wh…

AI Chat 设计模式:15. 桥接模式

本文是该系列的第十五篇&#xff0c;采用问答式的方式展开&#xff0c;问题由我提出&#xff0c;答案由 Chat AI 作出&#xff0c;灰色背景的文字则主要是我的一些思考和补充。 问题列表 Q.1 如果你是第一次接触桥接模式&#xff0c;那么你会有哪些疑问呢&#xff1f;A.1Q.2 什…

FreeRTOS(独立看门狗监测任务执行与低功耗Tickless模式)

资料来源于硬件家园&#xff1a;资料汇总 - FreeRTOS实时操作系统课程(多任务管理) 目录 一、独立看门狗介绍 二、看门狗监测多任务执行思路 1、监测目标 2、监测方案 3、应用注意事项 三、看门狗监测多任务编程 1、STM32cubeMX配置 2、代码编写 四、低功耗Tickless模…