Kafka-Java一:Spring实现kafka消息的简单发送

news2024/9/23 5:15:53

目录

写在前面

一、创建maven项目

二、引入依赖

2.1、maven项目创建完成后,需要引入以下依赖

2.2、创建工程目录

三、创建生产者

3.1、创建生产者,同步发送消息

3.2、创建生产者,异步发送消息

四、同步发送消息和异步发送消息的区别

五、报错处理思路


写在前面

        该文章通过spring只实现消息的简单发送,不实现消息的监听。

一、创建maven项目

        创建maven过程不再赘述。

二、引入依赖

2.1、maven项目创建完成后,需要引入以下依赖

    // kafka 依赖
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.4.0</version>
    </dependency>

   //  json依赖,demo中可能会用到该依赖,与kafka依赖无关
    <dependency>
      <groupId>com.alibaba</groupId>
      <artifactId>fastjson</artifactId>
      <version>2.0.10</version>
    </dependency>

2.2、创建工程目录

三、创建生产者

3.1、创建生产者,同步发送消息

        3.1.1、在MyProducer中实现如下代码

package com.demo.lxb.kafka;

import com.alibaba.fastjson.JSON;
import com.demo.lxb.entiry.Order;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;

/**
 * @Description:
 * @Author: lvxiaobu
 * @Date: 2023-10-23 17:06
 **/
public class MyProducer {

    private final  static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 一、设置参数
        // 配置kafka地址
        //props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.151.28:9092"); // 单机配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.151.28:9092,192.168.151.28:9092,192.168.151.28:9092"); // 集群配置
        // 配置消息 键值的序列化规则
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 二、声明消息对象
        // 未指定发送分区,具体撒送分区的计算公式: hash(key)%PartitionNum
        // 创建发送的消息: producerRecord
          // 参数1: 要发送的主题
          // 参数2: 消息的key值,可有可无,如果存在的话,该字段用来带入分区的计算公式
          // 参数3: value,具体的消息的内容,json格式的字符串
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String, String>
                            (TOPIC_NAME,
                            "mykey",
                            "hello-kafka");
        // 三、声明消息发送者
        Producer<String,String> producer = new KafkaProducer<String,String>(props);
        // 开发发送,并返回结果和元数据
        RecordMetadata recordMetadata = producer.send(producerRecord).get();

        System.out.println("发送消息结果: " + "topic-" + recordMetadata.topic() + " | partition-"
                + recordMetadata.partition() + " | offset-" + recordMetadata.offset());
    }
}

        执行main方法,结果如下:

        如果多次执行main方法,会发现offset偏移量的数字会发生变化。 

3.2、创建生产者,异步发送消息

        3.2.1、在MyProducer2中实现如下代码

package com.demo.lxb.kafka;

import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

/**
 * @Description: kafka 异步发送消息
 * @Author: lvxiaobu
 * @Date: 2023-10-23 17:06
 **/
public class MyProducer2 {

    private final  static String TOPIC_NAME = "topic0921";

    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 一、设置参数
        // 配置kafka地址
//        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
//                "192.168.151.28:9092"); // 单机配置
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
                "192.168.154.128:9092,192.168.154.128:9093,192.168.154.128:9094"); // 集群配置
        // 配置消息 键值的序列化规则
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class.getName());

        // 二、声明消息对象
        // 未指定发送分区,具体撒送分区的计算公式: hash(key)%PartitionNum
        // 创建发送的消息: producerRecord
          // 参数1: 要发送的主题
          // 参数2: 消息的key值,可有可无,如果存在的话,该字段用来带入分区的计算公式
          // 参数3: value,具体的消息的内容,json格式的字符串
        ProducerRecord<String,String> producerRecord = new ProducerRecord<String, String>
                            (TOPIC_NAME,
                            "mykey",
                            "hello-kafka2");
        // 三、声明消息发送者
        Producer<String,String> producer = new KafkaProducer<String,String>(props);
        // 异步发送消息,通过callback回调函数获取发送结果
        producer.send(producerRecord, new Callback() {
            public void onCompletion(RecordMetadata recordMetadata, Exception e) {
                if(e != null){
                    System.out.println("消息发送失败:" + e);
                }
                if(recordMetadata != null){
                    System.out.println("发送消息结果: " + "topic-" + recordMetadata.topic() + " | partition-"
                            + recordMetadata.partition() + " | offset-" + recordMetadata.offset());
                }
            }
        });

        Thread.sleep(50000L);

    }
}

执行 Main方法,会产生和同步发送消息一样的结果。

说明:Thread.sleep(50000L)是让主线程休眠50s,否则主线程在异步发送了消息以后就直接结束了,不会再输出callback中的输出语句

四、同步发送消息和异步发送消息的区别

消息的同步发送
  如果生产者发送的消息没有收到kafka的ack通知,生产者会产生阻塞,如果阻塞了3s仍然没有收到消息反馈,会进行消息发送的重试操作,重试的次数是3次。如果三次以后还不行,代码将抛出异常
消息的异步发送
  生产者发送消息后,会提供一个callback的回调方法,callback会获取消息是否发送成功的结果。但是需要注意,异步发送消息会出现消息的丢失。

五、报错处理思路

        3.2.1、检查Props配置Kafka地址是否正确

        3.2.2、检查Linux是否关闭防火墙

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1131894.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

ChinaSoft 论坛巡礼 | 开源软件生态健康度量论坛

2023年CCF中国软件大会&#xff08;CCF ChinaSoft 2023&#xff09;由CCF主办&#xff0c;CCF系统软件专委会、形式化方法专委会、软件工程专委会以及复旦大学联合承办&#xff0c;将于2023年12月1-3日在上海国际会议中心举行。 本次大会主题是“智能化软件创新推动数字经济与社…

京东销量(销额)数据分析:2023年9月京东奶粉行业品牌销售排行榜

鲸参谋监测的京东平台9月份奶粉市场销售数据已出炉&#xff01; 根据鲸参谋平台的数据显示&#xff0c;今年9月份&#xff0c;京东平台奶粉&#xff08;包括婴幼儿奶粉、孕妈奶粉、婴幼儿液态奶&#xff09;市场的销量约730万&#xff0c;环比降低约6%&#xff0c;同比降低约19…

【C语言】指针那些事(上)

C语言系列 文章目录 文章目录 一. 字符指针 一.&#xff08;1 &#xff09; 数组创建空间的地址和指针指向的地址 二. 指针数组 二.&#xff08;1&#xff09;指针数组模拟一个二维数组 ​ 三. 数组指针 三.(1)数组指针到底有什么用 对一维数组没有什么用 二.(…

VC++程序崩溃时,使用Visual Studio静态分析dump文件

1、通过Visual Studio直接把Dump文件打开 2、点击【仅限本机进行调试】&#xff0c;启动Dump 3.1、本机调试启动后&#xff0c;如果程序运行模块和pdb文件在同一个目录的&#xff0c;直接定位到异常代码行 3.2、如果显示找不到pdb文件&#xff0c;则需要通过【新建路径】设置…

Python-列表、元组和字典

目录 一、列表概述 二、列表的循环遍历 1、使用for循环遍历列表 2、使用while循环遍历列表 三、列表的常见操作 1、在列表中增加元素 &#xff08;1&#xff09;使用append方法 &#xff08;2&#xff09;使用extend方法 &#xff08;3&#xff09;使用insert方法 2、…

温馨提示!小心不法分子的隐藏陷阱

《绝地求生》国服的”老兵回归”活动一经推出就受到广大玩家欢迎&#xff0c;因此也有不法分子想趁虚而入。就在近日&#xff0c;我们接到玩家举报&#xff0c;发现一些不法分子通过伪基站邮件群发的形式&#xff0c;以“第XXXX位老兵奖励”为主题&#xff0c;向用户推荐一个非…

【薅羊毛】免费领取6个月语雀会员-关于语雀 23 日故障的公告

关于语雀 23 日故障的公告 关于语雀 23 日故障的公告 语雀团队 语雀 2023-10-24 21:11 发表于浙江 各位语雀的用户&#xff1a; 10 月 23 日语雀出现重大服务故障&#xff0c;且持续 7 个多小时才完全恢复&#xff0c;给用户使用造成极大不便&#xff0c;对此我们深感抱歉。…

Go 包操作之如何拉取私有的Go Module

Go 包操作之如何拉取私有的Go Module 在前面&#xff0c;我们已经了解了GO 项目依赖包管理与Go Module常规操作&#xff0c;Go Module 构建模式已经成为了 Go 语言的依赖管理与构建的标准。 在平时使用Go Module 时候&#xff0c;可能会遇到以下问题&#xff1a; 在某 modul…

关于报错java.util.ConcurrentModificationException: null的源码分析和解决

一般有这种问题,方法中至少会有List或者Map下的至少两个子类,有可能参数类型相同,也有可能不同都有可能触发这个问题!其主要原因是使用了ArrayList进行删除操作或者使用iterator遍历集合的同时对集合进行修改都有可能会出现这个问题 ArrayList属于List下的子类 需要区分的是Li…

AI小百科 - 什么是生成式AI中的提示语 “Prompt“

定义 "Prompt" 是指在生成式AI中用于指导模型生成输出的输入文本或问题。它是一种方法&#xff0c;通过提供特定的信息或指示&#xff0c;引导AI生成与所需任务相关的响应。 让我们用一个小学生可以理解的方式来解释提示语。想象一下&#xff0c;你是一名小学生&am…

深入剖析Java反射机制:解锁高级编程技巧,事半功倍

前言 在 Java中&#xff0c;反射机制&#xff08;Reflection&#xff09;非常重要&#xff0c;但对于很多开发者来说&#xff0c;这并不容易理解&#xff0c;甚至觉得有点神秘 目录 1. 简介 定义&#xff1a;Java语言中 一种 动态&#xff08;运行时&#xff09;访问、检测 …

docker安装minio作为图床服务

minio官网&#xff1a;MinIO | 高性能分布式存储&#xff0c;私有云存储 适用于AI的高性能分布式云存储&#xff0c;MinIO提供高性能、与S3 兼容的对象存储系统&#xff0c;让你自己能够构建自己的私有云储存服务。 MinIO原生支持 Kubernetes&#xff0c;它可用于每个独立的公…

迅为itop-3568开发板qt学习手册上新

基于RK3568的QT教程他来了~从C基础到QT编程实例再到项目实战&#xff0c;《iTOP-3568开发板QT学习手册》带你打通QT的任督二脉。 界面布局 3.5.1 水平布局 l Horizontal Layout&#xff1a;水平方向布局&#xff0c;组件自动在水平方向上分布 使用时先选中组件&#xff0…

外卖霸王餐系统 支持小程序,分站合作

它终于出来啦&#xff01;微客云分站终于正式发布&#xff01;&#xff01;&#x1f4e3;&#x1f4e3;&#x1f4e3; 先前的文章就已经预告过微客云要搞分站&#xff0c;很多小伙伴们一直在问老许分站什么时候出来&#xff0c;想加盟分站怎么做&#xff1f;有什么条件&#xf…

部署:端口映射相关问题

图片仅作示意用途 在很多现场部署环境里&#xff0c;网络管理是相对严格的&#xff0c;设备所在的子网如果需要和办公网所在的服务器通讯&#xff0c;需要通过专门的中间节点&#xff0c;一般还有严格的防火墙配置。此时&#xff0c;研发环境里&#xff0c;服务器与设备子网各…

【Python3】【力扣题】136. 只出现一次的数字

【力扣题】题目描述&#xff1a; 【Python3】代码&#xff1a; 1、解题思路&#xff1a;遍历列表元素&#xff0c;查看该元素在列表中共有多少个&#xff0c;返回个数为1的元素。 知识点&#xff1a;列表.count(...)&#xff1a;统计列表中某元素个数。 class Solution:def …

【实战项目】高并发内存池(下)

我们上篇文章&#xff08;高并发内存池&#xff08;上&#xff09;&#xff09;介绍了向高并发内存池申请资源的整个过程&#xff0c;本篇文章我们将会对申请后的空间资源释放的整个流程。同时也会对我们自己实现的内存池进行性能测试和优化。 文章目录 一、thread cache 回收资…

CentOS 7 安装和配置java环境

1 安装包准备 安装包可以通过下面地址进行版本选择安装&#xff1a; https://www.oracle.com/java/technologies/downloads/#java8 2 正式开始安装 本次分享的安装方法直接通过编辑/etc/profile文件实现java的安装 2.1 新建安装包存放目录 mkdir /java cd /java/ 2.2 解压安…

Mac电脑窗口管理Magnet中文 for mac

Magnet是一款Mac窗口管理工具&#xff0c;它可以帮助用户轻松管理打开的窗口&#xff0c;提高多任务处理效率。以下是Magnet的一些主要特点和功能&#xff1a; 分屏模式支持&#xff1a;Magnet支持多种分屏模式&#xff0c;包括左/右/顶部/底部 1/2 分屏、左/中/右 1/3 分屏、…

分享一下怎么做一个投票小程序链接

在这个数字化时代&#xff0c;微信小程序已经成为了我们生活中不可或缺的一部分。而投票小程序链接&#xff0c;更是具有广泛的应用场景和巨大的市场潜力。本文将详细介绍如何制作一个投票小程序链接&#xff0c;帮助大家了解其意义、设计思路、实现方法、亮点突出以及如何推广…