Kafka第二章:生产者案例

news2025/1/2 3:02:01

系列文章目录

Kafka第一章:环境搭建
Kafka第二章:生产者案例


文章目录

  • 系列文章目录
  • 前言
  • 一、创建项目
    • 1.创建包
    • 2.添加依赖
  • 二、编写代码
    • 1.普通异步发送
    • 2.同步发送
  • 三.生产者发送消息的分区策略
    • 1.指定分区
    • 2.自定义分区
  • 总结


前言

上次完成了Kafka的环境搭建,这次来完成一些有关生产者的项目。


一、创建项目

1.创建包

com.atguigu.kafka.producer
在这里插入图片描述

2.添加依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>3.4.0</version>
        </dependency>
    </dependencies>

二、编写代码

1.普通异步发送

需求:创建 Kafka 生产者,采用异步的方式发送到 Kafka Broker
注:不带回调
CustomProducer.java

package com.atguigu.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;


import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        // 0 配置
        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i));
        }

        // 3.关闭资源
        kafkaProducer.close();
    }
}

建议先用命令行启动环境,然后发送几条数据,确认集群可以正常使用。
在这里插入图片描述
运行文件
在这里插入图片描述
加回调函数
CustomProducerCallback.java

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallback {
    public static void main(String[] args) {
        // 0 配置
        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题: "+recordMetadata.topic()+"分区: "+recordMetadata.partition());
                    }

                }
            });
        }

        // 3.关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述

2.同步发送

只需在异步发送的基础上,再调用一下 get()方法即可。
CustomProducerSync.java

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 0 配置
        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu"+i)).get();
        }

        // 3.关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述
由于数据量小,结果和异步发送没区别

三.生产者发送消息的分区策略

1.指定分区

CustomProducerCallbackPatitions.java

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPatitions {
    public static void main(String[] args) {
        // 0 配置
        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first",1,"", "atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题: "+recordMetadata.topic()+"分区: "+recordMetadata.partition());
                    }

                }
            });
        }

        // 3.关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述
在这里插入图片描述

2.自定义分区

例如我们实现一个分区器实现,发送过来的数据中如果包含 atguigu,就发往 0 号分区,不包含 atguigu,就发往 1 号分区。
MyPartitioner.java

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String s, Object o, byte[] bytes, Object o1, byte[] bytes1, Cluster cluster) {

        //获取数据 atguigu hello
        String msgValues = o1.toString();

        int partition;

        if (msgValues.contains("atguigu")) {
            partition=0;
        }else {
            partition=1;
        }

        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> map) {

    }
}

将自定义分区代码,与之前的代码相关联
在CustomProducerCallbackPatitions.java中简单修改
新添加一行代码
在这里插入图片描述
然后将指定分区的数字和key都取消
在这里插入图片描述

修改后的代码

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerCallbackPatitions {
    public static void main(String[] args) {
        // 0 配置
        Properties properties = new Properties();

        // 连接集群
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092");

        // 指定对应的key和value的序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        //关联自定义分区
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG,"com.atguigu.kafka.producer.MyPartitioner");

        // 1.创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<>(properties);

        // 2.发送数据
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first","atguigu" + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("主题: "+recordMetadata.topic()+"分区: "+recordMetadata.partition());
                    }

                }
            });
        }

        // 3.关闭资源
        kafkaProducer.close();
    }
}

在这里插入图片描述
在这里插入图片描述


总结

Kafka第二次博客就暂时到这里,后边其实还有一些内容,但我觉得,那些对于新手来说,有些深入了,以后在写吧。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/337915.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

RabbitMQ-Exchanges交换机

一、介绍 RabbitMQ消息传递模型的核心思想是&#xff1a;生产者生产的消息从不会直接发送到队列。实际上&#xff0c;通常生产者甚至不知道这些消息传递到了哪些队列中。相反&#xff0c;生产者只能将消息发送到交换机&#xff0c;交换机工作的内容非常简单&#xff0c;一方…

七、Java框架之MyBatisPlus

黑马课程 文章目录1. MyBatisPlus入门1.1 MyBatisPlus入门案例步骤1&#xff1a;创建spring boot工程步骤2&#xff1a;配置application.yml步骤3&#xff1a;创建数据库表&#xff08;重点&#xff09;步骤4&#xff1a;编写dao层步骤5&#xff1a;测试1.2 标准数据层开发标准…

CSDN每日一练:一维数组的最大子数组和

题目名称&#xff1a;一维数组的最大子数组和 时间限制&#xff1a;1000ms内存限制&#xff1a;256M 题目描述 下面是一个一维数组的 “最大子数组的和” 的动态规划的解法 # include <iostream> # include <stdio.h> # include <string.h>int MaxSum(int* a…

多传感器融合定位十五-多传感器时空标定(综述)

多传感器融合定位十五-多传感器时空标定1. 多传感器标定简介1.1 标定内容及方法1.2 讲解思路2. 内参标定2.1 雷达内参标定2.2 IMU内参标定2.3 编码器内参标定2.4 相机内参标定3. 外参标定3.1 雷达和相机外参标定3.2 多雷达外参标定3.3 手眼标定3.4 融合中标定3.5 总结4. 时间标…

基于live555源码的rtsp服务器

下载live555源码 http://www.live555.com/liveMedia/public/ 在Linux系统的自定义目录下输入&#xff0c;下载源码&#xff1a; wget http://www.live555.com/liveMedia/public/live.2023.01.19.tar.gz解压源码&#xff1a; tar -xvf live.2023.01.19.tar.gz当前目录下运行&…

前端卷算法系列(一)

前端卷算法系列&#xff08;一&#xff09; 两数之和 给定一个整数数组 nums 和一个整数目标值 target&#xff0c;请你在该数组中找出 和为目标值 target 的那 两个 整数&#xff0c;并返回它们的数组下标。 你可以假设每种输入只会对应一个答案。但是&#xff0c;数组中同…

无重复字符的最长子串-力扣3-java

一、题目描述给定一个字符串 s &#xff0c;请你找出其中不含有重复字符的 最长子串 的长度。示例 1:输入: s "abcabcbb"输出: 3 解释: 因为无重复字符的最长子串是 "abc"&#xff0c;所以其长度为 3。示例 2:输入: s "bbbbb"输出: 1解释: 因为…

Springboot Web开发

文章目录一. 静态资源访问1. 配置静态资源访问前缀2. 修改默认静态资源存放目录3. Webjars4. 欢迎页支持5. 自定义Favicon二. 请求处理1. 路径变量2. 请求头处理3. 查询字符串处理4. 获取Cookie的值5. 获取请求体的值6. 获取请求域中的数据7. 矩阵变量一. 静态资源访问 只要静…

ChatGPT全球爆火,究竟有何特别之处?

​ 新年复工不过半月&#xff0c;职场人似乎就受到了来自AI的“威胁”&#xff0c;关于人工智能聊天系统ChatGPT的热搜频频上榜。 ChatGPT不仅拟人&#xff0c;更类人&#xff0c;甚至被认为是职场人的“敌人”。 那么&#xff0c;ChatGPT能替代哪些职业&#xff1f; 它自己的回…

STP协议基础

STP协议技术来源二层环路及危害二层交换机网络的冗余性与环路人为错误导致的二层环路二层环路带来的问题STP生成树协议STP概述STP基本概念桥ID根桥COSTRPC&#xff08;Root Path Cost&#xff09;根路径开销PORT ID端口IDBPDU桥协议数据单元STP的计算过程&#xff08;1&#xf…

VS中安装gismo库

文章目录前言一、下载安装paraview直接下载压缩包安装就可以了解压后按步骤安装即可二、gismo库的安装gismo库网址第一种方法&#xff1a;第二种方法第三种方法&#xff1a;用Cmake软件直接安装首先下载cmake软件[网址](https://cmake.org/download/)安装gismo库三、gismo库的使…

ChatGPT闭包解答

怎么理解javaScript闭包 JavaScript 闭包是一种特殊的对象&#xff0c;它包含了函数及其相关作用域中的变量。它允许函数访问并保存其外部作用域中的变量&#xff0c;即使该函数已经离开了其作用域。 闭包的一个常见应用场景是封装私有变量。例如&#xff0c;在一个对象的方法内…

Android IO 框架 Okio 的实现原理,如何检测超时?

本文已收录到 AndroidFamily&#xff0c;技术和职场问题&#xff0c;请关注公众号 [彭旭锐] 提问。 前言 大家好&#xff0c;我是小彭。 在上一篇文章里&#xff0c;我们聊到了 Square 开源的 I/O 框架 Okio 的三个优势&#xff1a;精简且全面的 API、基于共享的缓冲区设计以…

电机参数中力矩单位kgf.cm,Nm,mNm表示的含义

力的基本知识 质量和力的比例系数 质量和重力的关系有一个重力系数&#xff1a;g≈9.8 N/kg≈10,后面看到的1kgf就相当于1kg物体的力也就是10N 杠杆原理 对于同一个支点&#xff0c;在不考虑杠杆的重量的情况下&#xff0c;实现同样的作用效果&#xff0c;距离支点越近&…

vscode搭建python Django网站开发环境

这里使用pip安装的方式&#xff0c;打开命令行&#xff0c;输入执行&#xff1a; pip install django2.2这里选择安装2.2版本是因为是新的lts版本&#xff0c;长期支持稳定版。 接下来再安装pillow&#xff0c;Django底层一部分是基于pillow进行的。 pip install pillowpylint…

SpringBoot + Disruptor实现高并发内存消息队列

1. 简介 Disruptor是英国外汇交易公司LMAX开发的一个高性能队列&#xff0c;研发的初衷是解决内存队列的延迟问题(在性能测试中发现竟然与I/O操作处于同样的数量级)。基于Disruptor开发的系统单线程能支撑每秒600万订单&#xff0c;2010年在QCon演讲后&#xff0c;获得了业界关…

【思维模型】概率思维的价值:找到你的人生算法,实现阶级跃迁!

把同样公平的机会放在放在很多人面前,不同的人生算法,会得到迥然不同的结果。 概率思维是什么? 【ChatGPT】概率思维是一种通过使用数学模型来思考和评估不确定性事件的方法。它通过计算不同可能性的概率来预测事件的结果,并评估风险和机会。 概率思维的价值在于它可以帮…

ChatYuan元语AI: 类似ChatGPT功能型对话大模型

ChatYuan元语AI 元语智能开发团队训练了一个类似ChatGPT的功能型对话大模型ChatYuan. 类似ChatGPT模型, 中文开源版,功能型对话大语言模型. 功能有:支持训练端到端文本生成文本生成情感分析句子相似度零样本分类命名实体识别翻译自然语言推理问答文本纠错文本摘要FAQ问答文本…

终于体验了一下ChatGPT

再次尝试 隔了一天&#xff0c;今天&#xff08;2023-2-11&#xff09;再试一下。真的是一下。。。&#xff08;如果没有境外环境的&#xff0c;大家还是在网上找个共享账号试一下吧。网上有人分享的&#xff0c;大家细找一下就可以&#xff0c;我就不在这里发出来了。。。&…

微信小程序 Springboot校运会高校运动会管理系统

3.1小程序端 小程序登录页面&#xff0c;用户也可以在此页面进行注册并且登录等。 登录成功后可以在我的个人中心查看自己的个人信息或者修改信息等 在广播信息中我们可以查看校运会发布的一些信息情况。 在首页我们可以看到校运会具体有什么项目运动。 在查看具体有什么活动我…