Kafka Producer发送消息流程之消息异步发送和同步发送

news2024/11/15 4:34:48

文章目录

  • 1. 异步发送
  • 2. 同步发送

在这里插入图片描述

1. 异步发送

Kafka默认就是异步发送,在Main线程中的多条消息,没有严格的先后顺序,Sender发送后就继续下一条,异步接受结果。

public class KafkaProducerCallbackTest {
    public static void main(String[] args) throws InterruptedException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test2",
                    ""+i,
                    "我是你爹"+i
            );
            //发送record
            producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("回调信息:消息发送成功");
                }
            });
            System.out.println("发送数据");
        }

        //关闭producer
        producer.close();
    }
}

Main线程中,对于多条数据,下一条消息的发送并不等待上一条消息的确认,而是继续发送。

2024-07-17 21:43:46.052 [kafka-producer-network-thread | producer-1] INFO  org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: BqIgDGtwTeeusL_ygHtn2w
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
发送数据
2024-07-17 21:43:46.075 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:43:46.280 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 6000 with epoch 0
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
回调信息:消息发送成功
2024-07-17 21:43:46.569 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

可以看到先是main线程循环发送完了多条数据,然后再异步收到通知。

2. 同步发送

消息有严格的先后顺序,下一条消息必须等到上一条消息的回调确认后,再发送,这是一个效率极低的过程。

按照流程图,上一条消息需要从生产者一直流转,多个步骤,到数据收集器,到Sender,最后还要等待回调确认,才可以开始下一条消息的流转。

public class KafkaProducerCallbackTest {
    public static void main(String[] args) throws InterruptedException, ExecutionException {
        //创建producer
        HashMap<String, Object> config = new HashMap<>();
        config.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:19092");
        config.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        config.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(config);

        for (int i = 0; i < 10; i++) {
            //创建record
            ProducerRecord<String, String> record = new ProducerRecord<String, String>(
                    "test2",
                    ""+i,
                    "我是你爹"+i
            );
            //发送record
            Future<RecordMetadata> send = producer.send(record, new Callback() {
                @Override
                public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                    System.out.println("回调信息:消息发送成功");
                }
            });
            System.out.println("发送数据");
            send.get();
        }

        //关闭producer
        producer.close();
    }
}
2024-07-17 21:49:19.586 [kafka-producer-network-thread | producer-1] INFO  o.a.k.c.producer.internals.TransactionManager - [Producer clientId=producer-1] ProducerId set to 5000 with epoch 0
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
发送数据
回调信息:消息发送成功
2024-07-17 21:49:19.823 [main] INFO  org.apache.kafka.clients.producer.KafkaProducer - [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2024-07-17 21:49:19.838 [main] INFO  org.apache.kafka.common.metrics.Metrics - Metrics scheduler closed

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1932523.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

k8s集群 安装配置 Prometheus+grafana+alertmanager

k8s集群 安装配置 Prometheusgrafanaalertmanager k8s环境如下&#xff1a;机器规划&#xff1a; node-exporter组件安装和配置安装node-exporter通过node-exporter采集数据显示192.168.40.180主机cpu的使用情况显示192.168.40.180主机负载使用情况 Prometheus server安装和配置…

如何用AI交互数字人一体机,打造政务服务新名片?

如今&#xff0c;将“高效办成一件事”作为优化政务服务、提升行政效能的重要抓手&#xff0c;各地方为了促进政务服务由传统模式向数字化、智能化方向转变&#xff0c;纷纷在政务服务场景融合了AI交互数字人&#xff0c;实现“无人化、智慧化”导办、帮办、代办等模式&#xf…

深度学习程序环境配置

深度学习环境配置 因为之前轻薄本没有显卡跑不起来&#xff0c;所以换了台电脑重新跑程序&#xff0c;故记录一下配置环境的步骤及常见错误 本人数学系&#xff0c;计算机部分知识比较匮乏&#xff0c;计算机专业同学可以略过部分内容 深度学习环境配置 深度学习环境配置 CUD…

Vue脚手架安装(保姆级)

&#x1f49d;&#x1f49d;&#x1f49d;欢迎来到我的博客&#xff0c;很高兴能够在这里和您见面&#xff01;希望您在这里可以感受到一份轻松愉快的氛围&#xff0c;不仅可以获得有趣的内容和知识&#xff0c;也可以畅所欲言、分享您的想法和见解。 非常期待和您一起在这个小…

WEB前端05-JavaScrip基本对象

JavaScript对象 1.Function对象 函数的创建 //方法一&#xff1a;自定义函数 function 函数名([参数]) {函数体[return 表达式] }//方法二&#xff1a;匿名函数 (function([参数]) {函数体[return 表达式] }); **使用场景一&#xff1a;定义后直接调用使用(只使用一次) (fun…

【Arduino IDE】安装及开发环境、ESP32库

一、Arduino IDE下载 二、Arduino IDE安装 三、ESP32库 四、Arduino-ESP32库配置 五、新建ESP32-S3N15R8工程文件 乐鑫官网 Arduino官方下载地址 Arduino官方社区 Arduino中文社区 一、Arduino IDE下载 ESP-IDF、MicroPython和Arduino是三种不同的开发框架&#xff0c;各自适…

如何防范场外个股期权的交易风险?

场外个股期权交易&#xff0c;作为金融衍生品市场的重要组成部分&#xff0c;为投资者提供了更为灵活和多样化的投资策略。然而&#xff0c;其高杠杆、高风险特性也使得投资者在追求高收益的同时&#xff0c;面临着较大的交易风险。为了有效防范这些风险&#xff0c;投资者需要…

达梦 ./disql SYSDBA/SYSDBA报错[-70028]:创建SOCKET连接失败. 解决方法

原因 达梦命令./disql SYSDBA/SYSDBA默认访问端口5236&#xff0c;如果初始化实例的时候修改了端口&#xff0c;需要指定端口访问 解决 ./disql SYSDBA/SYSDBA192.168.10.123:5237

手机如何伪装ip网络地址

伪装IP地址是指通过技术手段修改网络设备的IP地址&#xff0c;使其看起来像是来自另一个网络位置。这种技术通常用于隐藏真实的网络活动&#xff0c;以保护隐私。那么&#xff0c;手机如何伪装IP网络地址&#xff1f; 要在手机上伪装IP地址&#xff0c;‌可以通过下载和安装手机…

阿里云国际站:海外视频安全的DRM加密

随着科技的进步&#xff0c;视频以直播或录播的形式陆续开展海外市场&#xff0c;从而也衍生出内容安全的问题&#xff0c;阿里云在这方面提供了完善的内容安全保护机制&#xff0c;适用于不同的场景&#xff0c;如在视频安全提供DRM加密。 由图可以了解到阿里云保护直播安全的…

工业三防平板助力工厂生产数据实时管理

在当今高度数字化和智能化的工业生产环境中&#xff0c;工业三防平板正逐渐成为工厂实现生产数据实时管理的得力助手。这种创新的技术设备不仅能够在恶劣的工业环境中稳定运行&#xff0c;还为工厂的生产流程优化、效率提升和质量控制带来了前所未有的机遇。 工业生产场景通常充…

08-8.6.1 外部排序

&#x1f44b; Hi, I’m Beast Cheng &#x1f440; I’m interested in photography, hiking, landscape… &#x1f331; I’m currently learning python, javascript, kotlin… &#x1f4eb; How to reach me --> 458290771qq.com 喜欢《数据结构》部分笔记的小伙伴可以…

使用 Flask 3 搭建问答平台(一):项目结构搭建

一、项目基本结构 二、app.py from flask import Flask import config from exts import db from models import UserModel from blueprints.qa import bp as qa_bp from blueprints.auth import bp as auth_bp# 创建一个Flask应用实例&#xff0c;__name__参数帮助Flask确定应…

uniapp 开发 App 对接官方更新功能

插件地址&#xff1a;升级中心 uni-upgrade-center - App - DCloud 插件市场 首先创建一个 uni-admin 项目&#xff0c;选择你要部署的云开发服务商&#xff1a; 然后会自动下载模板&#xff0c;部署云数据库、云函数 第二步&#xff1a;将新创建的 uni-admin 项目托管到…

自动驾驶系列—智能巡航辅助功能中的车道变换功能介绍

文章目录 1. 背景介绍2. 功能定义3. 功能原理4. 传感器架构5. 实际应用案例5.1 典型场景1&#xff1a;换道时无其他交通参与者5.1.1 直道中的车道变换5.1.2 弯道中的车道变换5.1.3 综合场景应用 5.2 典型场景2&#xff1a;换道方向车道线非虚线5.3 典型场景3&#xff1a;换道方…

【Socket套接字编程】(实现TCP和UDP的通信)

&#x1f387;&#x1f389;&#x1f389;&#x1f389;点进来你就是我的人了 博主主页&#xff1a;&#x1f648;&#x1f648;&#x1f648;戳一戳&#xff0c;欢迎大佬指点&#xff01; 人生格言: 当你的才华撑不起你的野心的时候,你就应该静下心来学习! 欢迎志同道合的朋友…

Ubuntu 24.04安装Jellyfin媒体服务器图解教程

使用 Jellyfin 等开源软件创建媒体服务器肯定能帮助您管理和跨各种设备传输媒体集合。当你有一个封闭社区时&#xff0c;这尤其有用。 什么是 Jellyfin 媒体服务器&#xff1f; Jellyfin 媒体服务器&#xff0c;顾名思义&#xff0c;是一款开源软件&#xff0c;允许用户使用本…

高通Android 12 设置Global属性为null问题

1、最近在做app调用framework.jar需求&#xff0c;尝试在frameworks/base/packages/SettingsProvider/res/values/defaults.xml增加属性 <integer name"def_xxxxx">1</integer> 2、在frameworks\base\packages\SettingsProvider\src\com\android\provide…

mac环境下安装python3的图文教程

Python 是一种功能多样且强大的编程语言&#xff0c;在各个领域得到广泛应用。许多 Mac 用户都在其设备上安装和运行 Python&#xff0c;以运行特定的应用程序或创建、运行自己的 Python 脚本。 文章源自设计学徒自学网-http://www.sx1c.com/49441.html 虽然某些版本的 macOS…

jmeter-beanshell学习11-从文件获取指定数据

参数文件里的参数可能过段时间就不能用了&#xff0c;需要用新的参数。如果有多个交易&#xff0c;读不同的参数文件&#xff0c;但是数据还是一套&#xff0c;就要改多个参数文件。或者只想执行参数文件的某一行数据&#xff0c;又不想调整参数文件顺序。 第一个问题目前想到…