Kafka怎么发送JAVA对象并在消费者端解析出JAVA对象--示例

news2025/1/4 20:22:55

1、在pom.xml中加入依赖

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
            <version>3.1.6</version>
        </dependency>

2、配置application.yml

加入Kafka的配置

spring
  kafka:
    #Kafka地址,可以是一个,也可以是Kafka集群的地址,多个地址用逗号分隔
    bootstrap-servers: 192.168.57.1xx:9093,192.168.57.1xx:9094,192.168.57.1xx:9095
 
    producer:
      # 消息确认模式:0=不等待确认,1=等待leader确认,all=所有副本确认
      acks: 1
      # 发送失败时的重试次数,0表示不重试
      retries: 0
      # 批量发送时的批次大小(字节)
      batch-size: 30720000 # 30MB
      # 生产者的内存缓冲区大小(字节)
      buffer-memory: 33554432 # 32MB
      # Key的序列化器类
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # Value的序列化器类
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
 
    consumer:
      # 消费者所属的组ID
      group-id: test-kafka
      # 禁用自动提交offset,改为手动提交
      enable-auto-commit: false
      # 偏移量重置策略:
      # earliest:从最早的记录开始消费
      # latest:从最新的记录开始消费
      auto-offset-reset: earliest
      # Key的反序列化器类
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # Value的反序列化器类
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 每次poll()调用返回的最大消息条数
      max-poll-records: 2
      session:
        # 消费者会话超时时间,超时未发送心跳将被认为失联(毫秒)
        timeout:
          ms: 300000 # 5分钟
 
    listener:
      # 如果指定的主题不存在,是否让应用启动失败,false表示不会报错
      missing-topics-fatal: false
      # 消费模式:single=单条消息,batch=批量消费
      type: single
      # 消费确认模式:
      # manual_immediate:手动确认消息,立即提交offset
      ack-mode: manual_immediate

这里的生产者value的序列化器用org.apache.kafka.common.serialization.StringSerializer
 ,消费者value的序列化器用org.apache.kafka.common.serialization.StringDeserializer即可。

(这里不需要自定义序列化器,但在代码需要将JAVA对象转化为JSON字符串发送)

3、config、producer、consumer代码

3.1、User.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;


@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public class User {
    private int id;
    private String name;
}

3.2、Task.java

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
public
class Task {
    private int id;
    private String description;
    private User assignedUser;
}

模拟嵌套类 

3.3、KafkaConfig.java

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.EnableKafka;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
 
@EnableKafka
@Configuration
public class KafkaConfig {
 
    // 单条消费监听器工厂,手动提交offset
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> singleFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        factory.getContainerProperties().setAckMode(org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return factory;
    }
 
 
}

3.4、KafkaProducer.java

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.kafka.core.KafkaTemplate;

@SpringBootApplication
public class KafkaProducer {

    public static void main(String[] args) {
        SpringApplication.run(KafkaProducer.class, args);
    }

    @Bean
    CommandLineRunner commandLineRunner(KafkaTemplate<String, String> kafkaTemplate) {
        return args -> {
            String topic = "task-topic";
            ObjectMapper objectMapper = new ObjectMapper();
            for (int i = 1; i <= 5; i++) {
                // 定义一个对象实例
                User user = User.builder().id(1).name("Alice").build();
                Task task = Task.builder().id(101).description("Complete report").assignedUser(user).build();
                     //JAVA对象转化为JSON字符串
                String message =  objectMapper.writeValueAsString(task);

                kafkaTemplate.send(topic, message);
                System.out.println("Sent: " + message);
                Thread.sleep(500); // 模拟消息发送间隔
            }
        };
    }
}

序列化:使用 Jackson 的 ObjectMapperTask 对象转化为 JSON 字符串,方法 writeValueAsString() 将 Java 对象转为 JSON 字符串。

3.5、SingleConsumer.java

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Service;



@Service
public class SingleConsumer {
    @KafkaListener(topics = "task-topic", groupId = "test-group", containerFactory = "singleFactory", autoStartup = "true")
    public void listen(ConsumerRecord<String, String> record, Acknowledgment acknowledgment) throws JsonProcessingException {
        String message = record.value();

        ObjectMapper objectMapper = new ObjectMapper();
        Task task = objectMapper.readValue(message,Task.class);
        // 取出
        System.out.println("User - Received: " + task.getAssignedUser());
        // 手动提交offset
        acknowledgment.acknowledge();
    }
}

反序列化: 使用 ObjectMapper 将 JSON 字符串 message 转换回 Task 对象,方法 readValue() 可以将 JSON 字符串解析为指定的 Java 对象类型。

4、测试

启动KafkaProducer.java

可以解析出JAVA对象中User

 

成功!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2257371.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

为了安全,自己搭建KMS,成功激活Office2010

在本篇文章中&#xff0c;将全过程描述Office Professional Plus 2010 With SP1 VOL从下载到自建KMS服务器再到激活的过程。本文展示的是64位版本&#xff0c;32位版本的方法类似。 特别注意&#xff1a;KMS激活仅限于VOL 版本&#xff0c;其他的零售版无法激活&#xff01;&am…

Unity 基于Collider 组件在3D 物体表面放置3D 物体

实现 从鼠标点击的屏幕位置发送射线&#xff0c;以射线监测点击到的物体&#xff0c;根据点击物体的法线向量调整放置物体的位置及朝向。 Ray ray Camera.main.ScreenPointToRay(Input.mousePosition); if (Physics.Raycast(ray, out RaycastHit hit, 100)) {obj.transform.…

【RDMA】RDMA read和write编程实例(verbs API)

WRITE|READ编程&#xff08;RDMA read and write with IB verbs&#xff09; &#xff08;本文讲解的示例代码在&#xff1a;RDMA read and write with IB verbs | The Geek in the Corner&#xff09; 将 RDMA 与verbs一起使用非常简单&#xff1a;首先注册内存块&#xff0c…

HTML5教程-表格宽度设置,最大宽度,自动宽度

HTML表格宽度 参考&#xff1a;html table width HTML表格是网页设计中常用的元素之一&#xff0c;可以用来展示数据、创建布局等。表格的宽度是一个重要的参数&#xff0c;可以通过不同的方式来设置表格的宽度&#xff0c;本文将详细介绍HTML表格宽度的不同设置方式和示例代…

2024年12月9日Github流行趋势

项目名称&#xff1a;ollama / ollama 项目维护者&#xff1a;mxyng, jmorganca, dhiltgen, BruceMacD, technovangelist等项目介绍&#xff1a;快速上手使用Llama 3.2、Mistral、Gemma 2及其他大型语言模型。项目star数&#xff1a;101,591项目fork数&#xff1a;8,117 项目名…

IntelliJ+SpringBoot项目实战(29)--如何将Beetl的模板文件放在独立的文件目录

在实际的项目开发中&#xff0c;为了方便前端人员调试页面&#xff0c;所以有必要将Beetl的模板文件放在独立的目录下&#xff0c;方便前端人员维护&#xff0c;而不是打包到项目的jar包中&#xff0c;如果打包到项目的jar包中还有另外的问题&#xff0c;就是一改动页面就要重新…

在Ubuntu上使用IntelliJ IDEA:开启你的Java开发之旅!

你好&#xff0c;年轻的学徒&#xff01;&#x1f9d1;‍&#x1f4bb; 是时候踏上进入Java开发世界的史诗之旅了&#xff0c;我们的得力助手将是强大的IntelliJ IDEA。准备好了吗&#xff1f;出发吧&#xff01; 在我们开始之前&#xff0c;我们需要下载这个工具。但是&#…

GWAS分析先做后学

大家好&#xff0c;我是邓飞。 GWAS分析是生物信息和统计学的交叉学科&#xff0c;上可以学习编程&#xff0c;下可以学习统计。对于Linux系统&#xff0c;R语言&#xff0c;作图&#xff0c;统计学&#xff0c;机器学习等方向&#xff0c;都是一个极好的入门项目。生物信息如…

LeetCode—189. 轮转数组(中等)

题目描述&#xff1a; 给定一个整数数组 nums&#xff0c;将数组中的元素向右轮转 k 个位置&#xff0c;其中 k 是非负数。 示例1&#xff1a; 输入: nums [1,2,3,4,5,6,7], k 3输出:[5,6,7,1,2,3,4] 解释: 向右轮转 1 步:[7,1,2,3,4,5,6] 向右轮转 2 步:[6,7,1,2,3,4,5] 向…

ModelScope-Agent(1): 基于开源大语言模型的可定制Agent系统

目录 简介快速入门 简介 github地址 快速入门 看前两篇&#xff0c;调用千问API和天气API # 选用RolePlay 配置agent from modelscope_agent.agents.role_play import RolePlay # NOQArole_template 你扮演一个天气预报助手&#xff0c;你需要查询相应地区的天气&#x…

go语言的成神之路-标准库篇-fmt标准库

目录 一、三种类型的输出 print&#xff1a; println&#xff1a; printf&#xff1a; 总结&#xff1a; 代码展示&#xff1a; 二、格式化占位符 %s&#xff1a;用于格式化字符串。 %d&#xff1a;用于格式化整数。 %f&#xff1a;用于格式化浮点数。 %v&#xff1…

pyenv 安装脚本解读

pyenv 安装脚本 curl https://pyenv.run | bash执行上面这一行脚本就可以安装pyenv来满足你对 Python 多版本共存以及切换的支持。 pyenv搭配virtualenv可以满足你对Python虚拟环境版本的支持。个人感觉pyenv比conda更轻量&#xff0c;更推荐使用。 那么上面的脚本到底干了什…

OpenAI 12Days 第二天 强化微调(RFT):推动语言模型在科学研究中的应用

OpenAI 12Days 第二天 强化微调&#xff08;RFT&#xff09;&#xff1a;推动语言模型在科学研究中的应用 文章目录 OpenAI 12Days 第二天 强化微调&#xff08;RFT&#xff09;&#xff1a;推动语言模型在科学研究中的应用RFT的工作原理与应用领域案例研究&#xff1a;基因突变…

柯桥职场商务英语生活英语口语培训外贸纺织口语学习

"等一下"该怎么说&#xff1f; 大家应该都知道&#xff0c;wait a moment是一个祈使句&#xff0c;祈使句就难免带有命令的口吻&#xff0c;还有点不耐烦。 如果你把“等一下”说成wait a moment&#xff0c;外国人多半认为你是个傲慢无礼的人。毕竟在他们看来wait a…

嵌入式蓝桥杯学习7 产生PWM

Cubemx配置 打开cubemx&#xff0c;前面的配置看上文&#xff0c;这里主要配置定时器产生PWM波。 以PA1的TIM2-CH2通道为例进行演示。 1.在Timers中打开TIM2,将Channel2配置为PWM Generation CH2。 2.将Clock Source 选择为Internal Clock。 3.配置Paramater Settings中的参…

SQL Server中SELECT (Transact-SQL)语法定义和解释

语法定义&#xff1a; <SELECT statement> ::[ WITH { [ XMLNAMESPACES , ] [ <common_table_expression> [ , ...n ] ] } ]<query_expression>[ ORDER BY <order_by_expression> ][ <FOR Clause> ][ OPTION ( <query_hint> [ , ...n ] )…

证书监控续签工具

详细实现方式以及代码下载请前往https://www.passerma.com/article/91 一个基于 go 的 ssl 证书监控续签工具&#xff0c;用于监控阿里云免费 ssl 证书的有效期&#xff0c;并在证书即将过期时自动续签证书 仅支持续签阿里云的免费证书 页面 使用方法 1.申请阿里云账号 2.…

【开源】A063—基于Spring Boot的农产品直卖平台的设计与实现

&#x1f64a;作者简介&#xff1a;在校研究生&#xff0c;拥有计算机专业的研究生开发团队&#xff0c;分享技术代码帮助学生学习&#xff0c;独立完成自己的网站项目。 代码可以查看项目链接获取⬇️&#xff0c;记得注明来意哦~&#x1f339; 赠送计算机毕业设计600个选题ex…

HDR视频技术之六:色调映射

图像显示技术的最终目的就是使得显示的图像效果尽量接近人们在自然界中观察到的对应的场景。 HDR 图像与视频有着更高的亮度、更深的位深、更广的色域&#xff0c;因此它无法在常见的普通显示器上显示。 入门级的显示器与播放设备&#xff08;例如普通人家使用的电视&#xff0…

【C++笔记】map和set的使用

前言 各位读者朋友们大家好&#xff01;上期我们讲完了二叉搜索树这一数据结构&#xff0c;这一期我们来讲STL中的map和set这两大容器。这两个容器的底层是红黑树&#xff0c;红黑树的底层是平衡二叉搜索树。 目录 前言一. 序列式容器和关联式容器二. set系列的使用2.1 set类…