Kafka - 异步/同步发送API

news2025/1/11 3:00:53

文章目录

  • 异步发送
    • 普通异步发送
      • 异步发送流程
      • Code
    • 带回调函数的异步发送
      • 带回调函数的异步发送流程
      • Code
  • 同步发送API

在这里插入图片描述


异步发送

普通异步发送

需求:创建Kafka生产者,采用异步的方式发送到Kafka broker

异步发送流程

在这里插入图片描述

Code

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.0</version>
</dependency>

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducer {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            RecordMetadata art = kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-" + i)).get();
            System.out.println(art.offset());
            System.out.println("over - " + i);
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

输出

31
over - 0
32
over - 1
33
over - 2
34
over - 3
35
over - 4
36
over - 5
37
over - 6
38
over - 7
39
over - 8
40
over - 9

忽略我这个offset … 我都发了好多次了…

看控制台的吧

在这里插入图片描述


带回调函数的异步发送

回调函数callback()会在producer收到ack时调用,为异步调用。

该方法有两个参数分别是RecordMetadata(元数据信息)和Exception(异常信息)。

  • 如果Exception为null,说明消息发送成功,
  • 如果Exception不为null,说明消息发送失败

带回调函数的异步发送流程

在这里插入图片描述

注意:消息发送失败会自动重试,不需要我们在回调函数中手动重试。

Code

package com.artisan.pc;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerWithCallBack {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 添加回调
            // 该方法在Producer收到ack时调用,为异步调用
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-callback-" + i), (recordMetadata, e) -> {
                // 没有异常,输出信息到控制台
                System.out.println("主题" + recordMetadata.topic() + ", 分区:" + recordMetadata.partition() + ", 偏移量:" + recordMetadata.offset());
            });
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

在这里插入图片描述

控制台

在这里插入图片描述


同步发送API

同步发送的意思就是,一条消息发送之后,会阻塞当前线程,直至返回ack。
由于send方法返回的是一个Future对象,根据Futrue对象的特点,我们也可以实现同步发送的效果,只需在调用Future对象的get方发即可

在这里插入图片描述

package com.artisan.pc;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @author 小工匠
 * @version 1.0
 * @mark: show me the code , change the world
 */
public class CustomProducerSync {

    public static void main(String[] args) throws ExecutionException, InterruptedException {

        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.126.170:9092");

        // key,value序列化
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // 3. 创建kafka生产者对象
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer<String, String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 通过Future接口的get实现同步阻塞
            kafkaProducer.send(new ProducerRecord<>("art", "kafka-msg-get-" + i)).get() ;
        }

        // 5. 关闭资源
        kafkaProducer.close();

    }

}
    

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1137811.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

墨西哥专线大型工程设备海运

随着全球经济一体化的不断推进&#xff0c;国际贸易日益繁荣&#xff0c;跨国物流需求不断增长。墨西哥作为拉美地区的经济大国&#xff0c;拥有丰富的资源和庞大的市场&#xff0c;吸引了许多中国企业的投资。然而&#xff0c;由于地理位置的原因&#xff0c;许多大型工程设备…

23种设计模式【创建型模式】详细介绍之【建造者模式】

建造者模式&#xff1a;构建复杂对象的精妙设计 设计模式的分类和应用场景总结建造者模式&#xff1a;构建复杂对象的精妙设计建造者模式的核心思想建造者模式的参与者Java示例&#xff1a;建造者模式 设计模式的分类和应用场景总结 可以查看专栏设计模式&#xff1a;设计模式 …

Anaconda-2023版 下载安装配置(国内镜像+图文详解)

1、Anaconda 下载 1.1 国内镜像下载 Anaconda 是跨平台的&#xff0c;版本众多&#xff0c;小伙伴们按需下载。本文是Win11系统 下载途径一&#xff1a;官方网站 https://www.anaconda.com/download/ 下载途径二&#xff1a;国内清华镜像 https://mirrors.tuna.tsinghua.…

嵌入式基础

本篇文章主要转载自 嵌入式复习&#xff0c;便于自己复习使用。 1 基本概念 嵌入式系统概念&#xff1a;以应用为中心&#xff0c;以计算机为基础&#xff0c;软件、硬件可裁剪&#xff0c;功能、可靠性、成本、体积、功耗严格要求的专用计算机系统&#xff08;国内&#xff0…

Power BI 傻瓜入门 12. 抽取报表

本章内容包含&#xff1a; 设置报表采用各种报表过滤方法探索高级报表功能消化报表发布过程 将您可以使用Power BI创建的每个可视化视为为为数据集提供不同的见解。可视化也可以是独立的&#xff0c;也可以与许多其他可视化相结合。无论哪种方式&#xff0c;可视化的输出都是…

使用Typecho搭建个人博客网站,并内网穿透实现公网访问——“cpolar内网穿透”

使用Typecho搭建个人博客网站&#xff0c;并内网穿透实现公网访问 文章目录 使用Typecho搭建个人博客网站&#xff0c;并内网穿透实现公网访问前言1. 安装环境2. 下载Typecho3. 创建站点4. 访问Typecho5. 安装cpolar6. 远程访问Typecho7. 固定远程访问地址8. 配置typecho 前言 …

linux中断(中断一)

中断是操作系统中至关重要的机制&#xff0c;它能够显著提高系统的响应性能和并发处理能力。   中断是指在 CPU 正常运行期间&#xff0c;由外部或内部事件引起的一种机制。当中断发生时&#xff0c;CPU会停止当前正在执行的程序&#xff0c;并转而执行触发该中断的中断处理程…

drawio特性

drawio的特性 drawio是领先的基于Web技术的草图和图表功能功能的应用。 保证数据的安全 集成了各种不同的平台&#xff0c;和提供了在线的免费编辑器&#xff0c;可以使用app.diagrams.net来方案&#xff0c;drawio本身不会存储用户的数据。 随着互联网时代的发展&#xff0…

死锁是什么?如何避免?如何排查?为什么这样排查 详细总结

1.死锁是什么 多个线程访问资源 线程加锁不当 会造成死锁。导致所有线程被阻塞&#xff0c;且无法解开 2.死锁的产生原因 1.加锁后忘记解锁 2.重复加锁&#xff0c;造成死锁 3.B锁内部调用函数A &#xff0c;A运行是又加锁 导致A,B均无法运行 3.如何避免死锁 多检查…

【vector题解】杨辉三角 | 删除有序数组中的重复项 | 只出现一次的数字Ⅱ

杨辉三角 力扣&#xff08;LeetCode&#xff09;官网 - 全球极客挚爱的技术成长平台 给定一个非负整数 numRows&#xff0c;生成「杨辉三角」的前 numRows 行。 在「杨辉三角」中&#xff0c;每个数是它左上方和右上方的数的和。 示例 1: 输入: numRows 5 输出: [[1],[1,1…

操作系统【OS】死锁

常考点 系统资源不足不是系统产生死锁的原因&#xff0c;资源不足只会对进程造成“饥饿”【详见王道操作系统书P153第4题】 A&#xff1a;资源不足和资源分配不足还是有区别的啦~ 死锁是什么&#xff1f; 死锁是多个进程因为竞争资源而造成的一种互相等待 为什么会出现死锁&…

软考系统架构师知识点集锦二:软件工程

一、考情分析 二、考点精讲 2.1 软件过程模型 &#xff08;1&#xff09;原型模型 典型的原型开发方法模型。适用于需求不明确的场景,可以帮助用户明确需求。可以分为[抛弃型原型]与[演化型原型] 原型模型两个阶段: 1、原型开发阶段;2、目标软件开发阶段。 &#x…

Unity报错:Microsoft Visual C# Compiler version

Unity报错:Microsoft Visual C# Compiler version 问题解决方案总结 问题 Microsoft Visual C# Compiler version 2.9.1.65535 (9d34608e) Copyright © Microsoft Corporation 切换版本或者使用老项目的时候可能会出现这个报错&#xff0c;这个报错就是项目设置的问题 …

计算机起源(三)

一、前言 计算机在完成了硬件和操作系统的构建后&#xff0c;最重要的一件事情就是要处理数据。在现代意义的数据库出现之前(20世纪60年代)&#xff0c;人们通过人工和文件系统的方式来存储、管理数据。在人工管理时期&#xff0c;人们常使用穿孔纸带来管理数据 &#xff0c;虽…

华为机试题:HJ4 字符串分隔

目录 第一章、算法题1.1&#xff09;题目描述1.2&#xff09;解题思路与答案1.3&#xff09;牛客链接 友情提醒: 先看文章目录&#xff0c;大致了解文章知识点结构&#xff0c;点击文章目录可直接跳转到文章指定位置。 第一章、算法题 1.1&#xff09;题目描述 题目描述&…

前端html+css+js实现的2048小游戏,很完善。

源码下载地址 支持&#xff1a;远程部署/安装/调试、讲解、二次开发/修改/定制 逻辑用的是JavaScript&#xff0c;界面用canvas实现&#xff0c;暂时还没有添加动画。 视频浏览地址

mac安装并使用wireshark

mac安装并使用wireshark 1 介绍 我们在日常开发过程中&#xff0c;遇到了棘手的问题时&#xff0c;免不了查看具体网络请求情况&#xff0c;这个时候就需要用到抓包工具。比较著名的抓包工具就属&#xff1a;wireshark、fildder。我这里主要介绍wireshark。 2 安装 以mac安装为…

电压放大电路适用于什么场合(电压放大器)

电压放大电路是一种常见的电子电路&#xff0c;可将输入信号的电压放大到更高的水平&#xff0c;从而提高信号的强度和可靠性。它在各个领域都有广泛的应用&#xff0c;下面将介绍电压放大电路的适用场合。 电压放大电路广泛应用于音频领域。音频信号的传输和处理需要较高的电压…

你真的了解CPU和GPU?

目录 先举个栗子 CPU 什么是CPU CPU的定义 CPU的组成 CPU的功能 GPU 什么是GPU GPU的定义 GPU的组成 GPU的功能 CPU和GPU的区别 先举个栗子 假设你正在编辑一份文档&#xff0c;这时可以将CPU和GPU的角色比喻为文档编辑过程中的两个不同任务。 1. CPU CPU就好比是…

自动驾驶之—LaneAF学习相关总结

0.前言&#xff1a; 最近在学习自动驾驶方向的东西&#xff0c;简单整理一些学习笔记&#xff0c;学习过程中发现宝藏up 手写AI 1. 概述 Laneaf思想是把后处理放在模型里面。重点在于理解vaf&#xff0c; haf&#xff0c;就是横向聚类&#xff1a;中心点&#xff0c;纵向聚类&…