三、Kafka生产者

news2024/10/7 12:18:56

目录

    • 3.1 生产者消息发送流程
      • 3.1.1 发送原理
    • 3.2 异步发送 API
    • 3.3 同步发送数据
    • 3.4 生产者分区
      • 3.4.1 kafka分区的好处
      • 3.4.2 生产者发送消息的分区策略
      • 3.4.3 自定义分区器
    • 3.5 生产者如何提高吞吐量
    • 3.6 数据可靠性

3.1 生产者消息发送流程

3.1.1 发送原理

3.2 异步发送 API

3.3 同步发送数据

3.4 生产者分区

3.4.1 kafka分区的好处

  • 便于合理使用存储资源,每个Partition在一个Broker上存储,可以把海量的数据按照分区切割成一块一块数据存储在多台Broker上。合理控制分区的任务,可以实现负载均衡的效果
  • 提高并行度,生产者可以以分区为单位发送数据;消费者可以以分区为单位进行消费数据。

3.4.2 生产者发送消息的分区策略

在这里插入图片描述

3.4.3 自定义分区器

1、需求:
例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区

2、定义类实现 Partitioner 接口,重写 partition()方法。

public class MyPartitioner implements Partitioner {
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {

        // 获取数据 atguigu  hello
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("atguigu")){
            partition = 0;
        }else {
            partition = 1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

3、使用分区器的方法,在生产者的配置中添加分区器参数

public class CustomProducerCallbackPartitions {

    public static void main(String[] args) throws InterruptedException {

        // 0 配置
        Properties properties = new Properties();

        // 连接集群 bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.239.11:9092");

        // 指定对应的key和value的序列化类型 key.serializer
//        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 关联自定义分区器
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.producer.MyPartitioner");

        // 1 创建kafka生产者对象
        // "" hello
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata metadata, Exception exception) {

                    if (exception == null) {
                        System.out.println("主题: " + metadata.topic() + " 分区: " + metadata.partition());
                    }
                }
            });

            Thread.sleep(2);
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述



3.5 生产者如何提高吞吐量

  • 分批次发送消息
  • 对生产者消息采用压缩

四个重要参数:
在这里插入图片描述

public class CustomProducerParameters {

    public static void main(String[] args) {

        // 0 配置
        Properties properties = new Properties();

        // 连接kafka集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.239.11:9092");

        // 序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 缓冲区大小
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,33554432);

        // 批次大小
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,16384);

        // linger.ms
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 1);

        // 压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"snappy");


        // 1 创建生产者
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2 发送数据
        for (int i = 0; i < 50; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3 关闭资源
        kafkaProducer.close();
    }
}

3.6 数据可靠性

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/904182.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SAP MM学习笔记26- SAP中 振替转记(转移过账)和 在库转送(库存转储)2- 品目Code振替转记 和 在库转送

SAP 中在库移动 不仅有入库&#xff08;GR&#xff09;&#xff0c;出库&#xff08;GI&#xff09;&#xff0c;也可以是单纯内部的转记或转送。 1&#xff0c;振替转记&#xff08;转移过账&#xff09; 2&#xff0c;在库转送&#xff08;库存转储&#xff09; 1&#xff…

代码部署到服务器

前言&#xff1a;相信看到这篇文章的小伙伴都或多或少有一些编程基础&#xff0c;懂得一些linux的基本命令了吧&#xff0c;本篇文章将带领大家服务器如何部署一个使用django框架开发的一个网站进行云服务器端的部署。 文章使用到的的工具 Python&#xff1a;一种编程语言&…

阿里云无影云电脑/云桌面收费价格表_使用申请方法

阿里云无影云电脑配置具体收费价格表&#xff0c;4核8G企业办公型云电脑可以免费使用3个月&#xff0c;无影云电脑地域不同价格不同&#xff0c;无影云电脑费用是由云桌面配置、云盘、互联网访问带宽、AD Connector 、桌面组共用桌面session 等费用组成&#xff0c;阿里云百科分…

Linux驱动开发(Day5)

思维导图&#xff1a; 不同设备号文件绑定&#xff1a;

springboot sl4j2 写入日志到mysql

问题描述 springboot初始化的时候&#xff0c;会先初始化日志然后再加载数据源如果用配置文件进行初始化&#xff0c;那么会出现数据源没有加载成功&#xff0c;导致空指针异常 报错排查如下&#xff1a; 搜索报错信息&#xff0c;OBjects.invoke is Null打断点发现。dataso…

简历本-专业在线简历制作下载网站 自带智能简历诊断

简历本是一个高效的在线简历制作与管理工具&#xff0c;为求职者提供专业简历模板&#xff0c;使用简历本5分钟就能制作一份优秀简历&#xff0c;可随时随地将简历下载为Word、PDF、图片格式文件&#xff0c;可在线发送或投递&#xff0c;不过使用需要注册登陆&#xff0c;提供…

【NLP】1、BERT | 双向 transformer 预训练语言模型

文章目录 一、背景二、方法 论文&#xff1a;BERT: Pre-training of Deep Bidirectional Transformers for Language Understanding 出处&#xff1a;Google 一、背景 在 BERT 之前的语言模型如 GPT 都是单向的模型&#xff0c;但 BERT 认为虽然单向&#xff08;从左到右预测…

【机器学习实战】朴素贝叶斯:过滤垃圾邮件

【机器学习实战】朴素贝叶斯&#xff1a;过滤垃圾邮件 0.收集数据 这里采用的数据集是《机器学习实战》提供的邮件文件&#xff0c;该文件有ham 和 spam 两个文件夹&#xff0c;每个文件夹中各有25条邮件&#xff0c;分别代表着 正常邮件 和 垃圾邮件。 这里需要注意的是需要…

Brain:背内侧前额叶/背侧前扣带皮层(dmPFC/dACC)的相关争议

摘要 背内侧前额叶皮层/背侧前扣带皮层(dmPFC/dACC)是一个功能存在诸多理论和争议的脑区。甚至其精确的解剖边界也饱受争议。在过去的几十年里&#xff0c;dmPFC/dACC与15种以上的认知过程相关联&#xff0c;这些过程有时看起来完全无关(例如&#xff0c;身体感知、认知冲突)。…

c++优先级队列的模拟实现代码

了解&#xff1a; 1.优先队列是一种容器适配器&#xff0c;根据严格的弱排序标准&#xff0c;它的第一个元素总是它所包含的元素中最大的。 2. 类似于堆&#xff0c;在堆中可以随时插入元素&#xff0c;并且只能检索最大堆元素(优先队列中位于顶部的元素)。 3. 优先队列被实现为…

开箱报告,Simulink Toolbox库模块使用指南(四)——S-Fuction模块

文章目录 前言 S-Fuction模块 电路方程模型 编写S函数 仿真验证 Tips 分析和应用 总结 前言 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff08;一&#xff09;——powergui模块》 见《开箱报告&#xff0c;Simulink Toolbox库模块使用指南&#xff…

三维重建 PyQt Python MRP 四视图(横断面,冠状面,矢状面,3D)

本文实现了 Python MPR 的 四视图&#xff0c;横断面&#xff0c;冠状面&#xff0c;矢状面&#xff0c;3D MPR(multi-planner reformation)也称多平面重建&#xff0c;多重面重建是将扫描范围内所有的轴位图像叠加起来再对某些标线标定的重组线所指定的组织进行冠状、矢状位、…

(win系统)MSVCP100/110/120/140.dll丢失 - 解决方案

首先我们来介绍一下什么是dll dll简称动态链接库它可以节省存储空间&#xff1a;由于DLL可以被多个程序共享&#xff0c;因此可以减少磁盘空间的使用。也能提高代码重用率&#xff1a;通过使用DLL,我们可以将一些常用的功能封装成独立的模块&#xff0c;从而提高代码的重用率。…

库克很高兴中国成全球最大的iPhone市场,人民日报的呼吁该重视了

日前苹果CEO库克在微博发文&#xff0c;庆祝苹果公司进入中国市场30周年&#xff0c;对员工、顾客和合作伙伴感谢&#xff0c;库克如此高兴的原因在于今年二季度iPhone在中国市场的销量超越美国市场&#xff0c;由此中国市场首次成为苹果的最大收入来源市场。 中国超越美国成为…

【C++_primary】类和对象 —— 类

类 ~ ~ ~ 一、面向过程和面向对象初步认识a. 面向过程编程b. 面向对象编程例如&#xff1a;无人机送货系统1、面向过程编程方式2、面向对象编程方式 二、类的引入1、定义类的关键字2、栈的手动实现a. C语言实现栈b. C实现栈 三、类的定义类的两种定义方式&#xff1a; 四、类的…

合宙Air724UG LuatOS-Air LVGL API--对象

对象 概念 在 LVGL 中&#xff0c;用户界面的基本构建块是对象。例如&#xff0c;按钮&#xff0c;标签&#xff0c;图像&#xff0c;列表&#xff0c;图表或文本区域。 属性 基本属性 所有对象类型都共享一些基本属性&#xff1a; Position (位置) Size (尺寸) Parent (父母…

ATTCK实战系列——红队实战(一)

目录 搭建环境问题 靶场环境 web 渗透 登录 phpmyadmin 应用 探测版本 写日志获得 webshell 写入哥斯拉 webshell 上线到 msf 内网信息收集 主机发现 流量转发 端口扫描 开启 socks 代理 服务探测 getshell 内网主机 浏览器配置 socks 代理 21 ftp 6002/700…

vue3 基础知识

vue3创建一个项目 PS D:\code> npm init vuelatestVue.js - The Progressive JavaScript Framework√ Add TypeScript? ... No / Yes √ Add JSX Support? ... No / Yes √ Add Vue Router for Single Page Application development? ... No / Yes √ Add Pinia for sta…

五、Spring MVC 接收请求参数以及数据回显、乱码问题

文章目录 一、Spring MVC 接收请求参数二、Spring MVC 数据回显三、SpringMVC 返回中文乱码问题 一、Spring MVC 接收请求参数 客户端或者前端通过 URL 请求传递过来的参数&#xff0c;在控制器中如何接收&#xff1f; 1、当参数和 Controller 中的方法参数一致时&#xff0c;无…

C语言:分支语句和循环语句(超详解)

目录 ​编辑 什么是语句&#xff1f; 分支语句&#xff08;选择结构&#xff09; if语句&#xff1a; 应该注意的是&#xff1a; switch语句&#xff1a; 运用练习&#xff1a; 循环语句 while循环&#xff1a; for循环&#xff1a; break和continue在for循环中&…