kafka使用

news2024/11/13 12:38:54

异步发送数据

package com.shf.kafka.producer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import java.util.Properties;
/**
 * 异步发送
 */
public class CustomProducer {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.1.4:9092");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

异步回调

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

/**
 * 返回消息的信息
 */
public class CustomProducerCallback {
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.120.20:9092,192.168.120.20:9093,192.168.120.20:9094");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i), new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    if (e == null) {
                        System.out.println("success");
                        System.out.println("主体:"+recordMetadata.topic());
                        System.out.println("分区:"+recordMetadata.partition());
                    } else {
                        System.out.println("fail");
                    }
                }
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }

}

看原理图,返回参数就是RecordAccumulator中的
在这里插入图片描述

同步发送

import lombok.SneakyThrows;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class CustomProducerSync {
    @SneakyThrows
    public static void main(String[] args) {
        // 1. 创建 kafka 生产者的配置对象
        Properties properties = new Properties();

        // 2. 给 kafka 配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.120.20:9092,192.168.120.20:9093,192.168.120.20:9094");

        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);

        // 3. 创建 kafka 生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用 send 方法,发送消息
        for (int i = 0; i < 5; i++) {
            kafkaProducer.send(new ProducerRecord<>("first", "atguigu " + i)).get();
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }

}

原理图如下,保证生产者100%发送消息
在这里插入图片描述

分区情况

在这里插入图片描述
可以通过如果设置了key,那么分区则会通过对key进行取模得出对应的分区,自定义分区

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        String msgValues = value.toString();

        int partition;

        if (msgValues.contains("shf")) {
            partition = 0;
        } else {
            partition = 1;
        }
        return partition;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

docker 创建kafka

https://www.cnblogs.com/JcHome/p/16475990.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2095474.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法基础-双指针算法

最长连续不重复子序列 双指针[j, i]维护的是以a[i]结尾的最长连续不重复子序列[j, i - 1]是前一步得到的最长连续不重复子序列&#xff0c;所以如果[j, i]中有重复元素&#xff0c;一定是a[i]&#xff0c;所以[j, i - 1]中一定有一个数字与a[i]重复&#xff0c;因此右移j直到a[…

【AI学习笔记】AIGC,AI绘画 ComfyUI+ComfyUI Manager安装

【AI学习笔记】ComfyUIComfyUI Manager安装 最近在面向BOSS直聘学习ComfyUI的使用&#xff0c;但是不出意外&#xff0c;因为学习者们迥异的电脑配置以及杂乱的AI软件工具包互相纠缠&#xff0c;跟人工智能相关的环境安装多少都会遇到点教程预料不到的BUG。 推荐入门教程&…

基于SpringBoot的智能医院管理系统

&#x1f4a5;&#x1f4a5;源码和论文下载&#x1f4a5;&#x1f4a5;&#xff1a;基于SpringBoot的智能医院管理系统-源码论文报告数据库文件.rar 1. 系统介绍 随着计算机科学的迅猛发展和互联网技术的不断推进&#xff0c;人们的生活方式发生了巨大的变化&#xff0c;同时也…

华为云征文 | Tomcat保姆级安装教程

简介 华为云Flexus云服务是新一代开箱即用、体验跃级、面向中小企业和开发者打造的高品价比云服务产品。Flexus云服务器X实例是华为云Flexus云服务的一个产品。 Flexus云服务器X实例是新一代面向中小企业和开发者打造的柔性算力云服务器&#xff0c;可智能感知业务负载&#…

借鉴腾讯系统架构从小到大的过程 - 如何做好一个系统设计?不限于(慧哥)慧知开源充电桩平台

推荐一套企业级开源充电桩平台&#xff1a;完整代码包含多租户、硬件模拟器、多运营商、多小程序&#xff0c;汽车 电动自行车、云快充协议&#xff1b;——(慧哥)慧知开源充电桩平台&#xff1b;https://liwenhui.blog.csdn.net/article/details/134773779?spm1001.2014.3001…

华为云征文|Flexus X实例性能测评

在数字化转型时代&#xff0c;云服务器成为企业 IT 基础设施的核心&#xff0c;其性能直接影响业务运行效率和用户体验。 面对众多云服务商提供的多样配置&#xff0c;如何选择合适的云服务器就变得尤为重要。 云服务器的性能测试对于识别潜在性能瓶颈&#xff0c;确保在高并…

安装python软件

系统是32位还是64位 “此电脑"或者"我的电脑”&#xff0c;鼠标右键——属性&#xff0c;出现如下图查看电脑系统类型&#xff08;图中显示电脑系统类型是64位系统&#xff0c;安装Python则选择其名含有"adm64"字样的文件&#xff09;: 软件安装地址 全…

AtCoder ABC 369 C 题 题解

题目传送门 C - Count Arithmetic Subarrays (atcoder.jp) 题解&#xff1a; 本题可以先预处理好 与 之间的差值。首先每个数都是一个等差数列&#xff0c;接着&#xff0c;每两个数也是一个等差数列&#xff0c;然后可以看一个数字持续了几位&#xff0c;如果持续了位&am…

layui中 子页面获取父页面的数据

父页面中 window.autosend function (obj) {//获取对应行的数据&#xff0c;var datafather obj.data;// console.log(data)layer.open({type: 2,maxmin: true,title: 选择发送时间,shade: 0.1,// area: screen(),area: [600px, 400px],content: autosend,success: function(…

Linux 背景、命令

一、嵌入式、Linux背景 1、嵌入式&#xff1a; 硬件与软件相结合 定制、为硬件设计相关代码来进行操作&#xff0c;代码测试&#xff0c;烧进板子&#xff0c;通过语音、图像、按钮等操作方式来调用。 2、操作系统种类&#xff1a; Dos&#xff0c;Windows&#xff0c;Uni…

【小白教程(无伤速通)】Visual Studio中Libtorch安装与配置

1. Libtorch下载 Download here (Release version): https://download.pytorch.org/libtorch/cpu/libtorch-win-shared-with-deps-1.8.0%2Bcpu.zipDownload here (Debug version): https://download.pytorch.org/libtorch/cpu/libtorch-win-shared-with-deps-debug-1.8.0%2B…

嵌入式Linux:信号分类

目录 1、不可靠信号与可靠信号 1.1、不可靠信号 1.2、可靠信号 2、实时信号和非实时信号 2.1、非实时信号 2.2、实时信号 在Linux系统中&#xff0c;信号可以从两个不同的角度进行分类&#xff1a;一是从可靠性方面&#xff0c;将信号分为可靠信号与不可靠信号&#xff1…

小柴带你学AutoSar系列三、标准和规范篇(3)ModeManagement

目录 ModeManagementGuide 2 Overall mechanisms and concepts 2.1 Declaration of modes 2.2 Mode managers and mode users 2.3 Modes in the RTE 2.4 Modes in the Basic Software Scheduler 2.5 Communication of modes 3 Configuration of the Basic Software Mod…

系统思考—盲点突破

最‮发近‬现服务的一些‮业企‬明明‮经已‬投入了大‮资量‬源在‮化优‬产品‮服和‬务上&#xff0c;但‮觉总‬得缺少一些‮键关‬点来‮正真‬实现突破&#xff1f;团‮每队‬天都在忙碌&#xff0c;但‮绩业‬增长却‮来越‬越缓慢&#xff0c;问‮总题‬是层出不穷&…

华为云征文 | Flexus X与宝塔面板的完美结合,让云管理更轻松

需要了解 本文章主要讲述在华为云Flexus X实例 上通过命令行的方式安装宝塔面板&#xff0c;搭建项目基础软件&#xff0c;实现服务器可视化管理&#xff0c;一键部署业务代码&#xff0c;简化操作流程。选择合适的云服务器&#xff1a; 本文采用的是 华为云服务器 Flexus X 实…

22. Lammps命令学习-7之read_restart

来源&#xff1a; “码农不会写诗”公众号 链接&#xff1a;Lammps命令学习-6之read_data read_restart file 从restart文件中读取先前保存的系统配置从而可以继续先前的模拟。   https://docs.lammps.org/read_restart.html Syntax read_restart fileDescription 从restart…

Unity3D 资源引用列表

Unity3D 窗口绘制资源引用列表。 资源引用列表 我们可以在自定义窗口上绘制一个资源引用列表&#xff0c;筛选资源&#xff0c;点击引用&#xff0c;快速定位到资源文件夹的某个资源。 关于自定义窗口的基本实现&#xff0c;可以参考之前的文章《Unity3D 自定义窗口》。 获…

【函数模板】参数类型

一、默认参数 1.默认参数的调用 函数模板的参数类型可以指定一个默认值&#xff0c;在不传入参数类型的时候将使用默认参数类型来实例化函数模板。 例如&#xff1a; template<typename T, typename R int> auto add(T a, R b 0) -> decltype(a b) {std::cout &…

HTML5CSS3--CSS3的各种用法

1.background-origin 背景图起点&#xff1a; padding-box背景图像相对于内边距框来定位。border-box背景图像相对于边框盒来定位。content-box背景图像相对于内容框来定位。 2.background-clip 背景图裁剪&#xff1a; border-box默认值。背景绘制在边框方框内&#xff0…

AcWing 282. 石子合并

必看的视频讲解↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓↓ 【E28【模板】区间DP 石子合并——信息学竞赛算法】 合并过程总开销等于红色数字总和&#xff0c;可以理解为花费的总体力&#xff01; f数组的含义是f【i】【j】是从第i堆石子开始到第…