SpringBoot集成kafka接收对象消息

news2025/1/17 1:13:52

SpringBoot集成kafka接收对象消息

  • 1、生产者
  • 2、消费者
  • 3、工具类
  • 4、消息实体对象
  • 5、配置文件
  • 6、启动类
  • 7、测试类
  • 8、测试结果

在这里插入图片描述

1、生产者

package com.power.producer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

import javax.annotation.Resource;
import java.util.Date;

@Component
public class EventProducer {

    @Resource
    private KafkaTemplate<String,Object> kafkaTemplate;

    public void sendEvent2(){
        User user = User.builder().id(10001).phone("15676767676").birthday(new Date()).build();
        String userJson = JSONUtils.toJSON(user);
        kafkaTemplate.send("helloTopic",userJson);
    }

}

2、消费者

package com.power.consumer;

import com.power.model.User;
import com.power.util.JSONUtils;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.stereotype.Component;

import java.util.function.Consumer;

@Component
public class EventConsumer {

    //采用监听的方式接收事件(消息,数据)
    @KafkaListener(topics = {"helloTopic"},groupId="helloGroup")
        public void onEvent(String userJson,
                        @Header(value=KafkaHeaders.RECEIVED_TOPIC) String topic,
                        @Header(value=KafkaHeaders.RECEIVED_PARTITION_ID) String partition,
                        ConsumerRecord<String,String> record){
        User user =JSONUtils.toBean(userJson,User.class);
        System.out.println("读取/消费到的事件,user:"+user+",topic:"+topic+",partition:"+partition);
        System.out.println("读取/消费到的事件:"+record.toString());

    }

}

3、工具类

package com.power.util;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class JSONUtils {

    private static final ObjectMapper OBJECTMAPPER = new ObjectMapper();

    public static String toJSON(Object object){
        try {
            return OBJECTMAPPER.writeValueAsString(object);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }

    public static <T> T toBean(String json,Class<T> clazz){
        try {
            return OBJECTMAPPER.readValue(json,clazz);
        } catch (JsonProcessingException e) {
            throw new RuntimeException(e);
        }
    }
}

4、消息实体对象

package com.power.model;

import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.util.Date;

@Builder
@AllArgsConstructor
@NoArgsConstructor
@Data
public class User {

    private Integer id;

    private String phone;

    private Date birthday;

}

5、配置文件

spring:
  application:
    #应用名称
    name: spring-boot-02-kafka-base

  #kafka连接地址(ip+port)
  kafka:
    bootstrap-servers: <你的kafka服务器IP>:9092

6、启动类

package com.power;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class KafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(KafkaApplication.class, args);
		System.out.println("启动成功--------------------------");
	}
}

7、测试类

package com.power;

import com.power.producer.EventProducer;
import org.junit.jupiter.api.Test;
import org.springframework.boot.test.context.SpringBootTest;

import javax.annotation.Resource;

@SpringBootTest
public class SpringBoot02KafkaBaseApplication {

    @Resource
    private EventProducer eventProducer;

    @Test
    void sendEvent2(){
        eventProducer.sendEvent2();
    }

}

8、测试结果

先启动消费者
在启动生产者测试类
已接收到消息对象数据:

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2070488.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

UEStudio V24 中文授权版

UEStudio是一款集成开发环境&#xff08;IDE&#xff09;软件&#xff0c;主要用于编写和编辑各种类型的代码&#xff0c;包括C/C、Java、HTML、PHP、Perl、Python等。 软件截图&#xff1a; 使用说明&#xff1a; 解压后&#xff0c;双击start_UEStudio.bat来运行软件 下载地…

【计算机组成原理】计算机系统概述<1>

学习目标&#xff1a; 掌握计算机组成原理的基础知识巩固 例如&#xff1a; 信息化世界的组成 计算机系统概述 计算机硬件基本组成 各个硬件的工作原理 计算机软件 计算机系统的多层次结构 计算机系统的工作原理 计算机性能指标 学习内容&#xff1a; 1.0、初入计算机组成原…

Apollo9.0 PNC源码学习之Planning模块—— Lattice规划(七):横纵向运动轨迹的优选

参考文章: (1)Apollo6.0代码Lattice算法详解——Part 7: 获得最优轨迹 (2)Lattice算法详解 0 前言 // 优选出cost最小的trajectory// 7. always get the best pair of trajectories to combine; return the first// collision-free trajectory.size_t constraint_failure…

Latent-OFER:使用潜在向量进行检测、屏蔽和重建,以实现遮挡的面部表情识别

论文&#xff1a;Latent-OFER: Detect, Mask, and Reconstruct with Latent Vectors for Occluded Facial Expression Recognition 摘要&#xff1a;所提出的方法Latent-OFER可以检测遮挡&#xff0c;将面部被遮挡的部分恢复为未被遮挡的部分&#xff0c;并识别它们&#xff0…

【Java自动化学习】Web自动化

一、环境安装 环境搭建安装见此博客文章链接&#xff1a;https://blog.csdn.net/qq_73471456/article/details/130836494 二、元素定位、等待方式 见此之前的博客文章&#xff1a;selenium操作使用方式 三、下拉框定位 四、iframe 切换元素定位 注意事项&#xff1a;连续定…

数学排列组合

我突然想发一篇文章(别问我为什么[doge]) 排列组合大家都听过吧,今天的主角就是排列组合。 废话不多说,直接开始 先来看几道题目&#xff1a; :由1&#xff0c;2&#xff0c;3&#xff0c;4组成不同的三位数有几种? :有四个人&#xff0c;每两个人都要握手一次&#xff0c;要握…

【秋招笔试】8.24美团秋招(算法岗)-三语言题解

🍭 大家好这里是 春秋招笔试突围,一起备战大厂笔试 💻 ACM金牌团队🏅️ | 多次AK大厂笔试 | 编程一对一辅导 ✨ 本系列打算持续跟新 春秋招笔试题 👏 感谢大家的订阅➕ 和 喜欢💗 和 手里的小花花🌸 ✨ 笔试合集传送们 -> 🧷春秋招笔试合集 🍒 本专栏已收…

[JAVA] 什么是Java线程同步机制?

在单线程程序中&#xff0c;每次只能做一件事情&#xff0c;后面的事情需要等待前面的事情完成后才可以进行&#xff0c;如果使用多线程程序&#xff0c;就会发生两个线程抢占资源的问题&#xff0c;所以在多线程编程中&#xff0c;需要防止这些资源访问的冲突&#xff0c;Java…

LED显示屏原理及其系统组成

随着城市化进程的加快&#xff0c;LED显示屏的需求在各个行业中迅速增长。无论是用于广告宣传、信息发布&#xff0c;还是场馆显示&#xff0c;LED显示屏都扮演着重要的角色。然而&#xff0c;对于很多人来说&#xff0c;LED显示屏的工作原理及其系统组成可能并不为熟知。本文将…

589. N 叉树的前序遍历(递归法)

目录 一&#xff1a;题目&#xff1a; 二&#xff1a;代码&#xff1a; 三&#xff1a;结果&#xff1a; 一&#xff1a;题目&#xff1a; 给定一个 n 叉树的根节点 root &#xff0c;返回 其节点值的 前序遍历 。 n 叉树 在输入中按层序遍历进行序列化表示&#xff0c;每…

Java JNA调用C函数常见问题及解决方法

目录 1 undefined symbol&#xff1a;xxx2 Java映射C数组乱码3 Java使用String接收不到C函数返回的char*4 Unable to load DLL xxx.dll5 java.lang.UnsatisfiedLinkError: %1 不是有效的 Win32 应用程序6 无效的ELF头7 Structure array elements must use contiguous memory8 j…

备考计算机二级Python之Day4下篇

实例解析--猜数字游戏 编写一个“猜数字游戏”的程序&#xff0c;在1~1000之间随机产生一个数&#xff0c;然后请用户循环猜这个数字&#xff0c;对于每个答案只回答“猜大了”或“猜小了”&#xff0c;直到猜对为止。输出用户的猜测次数。 使用Python语言的随机标准库random…

开源游戏开发引擎LayaAir

LayaAir是一款由Layabox公司推出的次世代全平台3D引擎&#xff0c;它支持2D、3D、VR与AR产品的开发&#xff0c;并允许开发者一次开发后同时发布为Web、小游戏、Native APP等多种平台的产品。 LayaAir引擎提供强大的IDE集成环境&#xff0c;包含3D场景编辑器、材质编辑器、粒子…

【linux中高级命令】

杀进程 1、lsof列出所有打开的文件&#xff0c;‌包括网络连接&#xff0c;‌从而提供关于系统状态的宝贵信息‌ #可以直接查看端口的进程 lsof -i:端口获取到进程&#xff0c;使用kill命令和PID来终止进程 kill -9 [PID]2、列出使用该端口的进程信息&#xff0c;‌包括PID …

linux上datax 安装以及使用

前言 DataX 是一款由阿里巴巴开源的数据同步工具&#xff0c;旨在帮助用户实现不同数据源之间的高效数据迁移和同步。无论是从传统的关系型数据库、NoSQL 数据库&#xff0c;还是到大数据存储系统&#xff0c;DataX 都能够轻松应对各种数据同步需求。通过简单的配置和灵活的插…

【解压即玩】最终幻想7 重制版中文+预购特典+全DLC,难忘的一作

数年前&#xff0c;一家名为神罗的公司&#xff08;起初称为神罗工程所&#xff09;发现了一种深埋于地底的神秘生物遗骸&#xff08;被称为杰诺瓦&#xff09;以及一种名为“魔晄”的能源。这家公司通过将这种生物遗骸浸泡在魔晄中来生产电力&#xff0c;从而迅速崛起成为全球…

一套在线工具管理服务器+DB+Redis+Mongo等

Team IDE是一个基于Web的、集成了多种开发工具和服务&#xff08;MySql、Oracle、金仓、达梦、神通等数据库、SSH、FTP、Redis、Zookeeper、Kafka、Elasticsearch、Mongodb&#xff09;的一体化开发环境&#xff0c;它不仅为开发者提供了便捷的开发体验&#xff0c;还支持团队协…

【文本 >>> 语音】⭐️SpringBoot 结合 jacob 简单实现一个文本朗读功能

目录 &#x1f378;前言 &#x1f37b;一、环境准备 &#x1f37a;二、依赖引入 &#x1f49e;️三、简单启动 &#x1f379;四、接口改造 4.1 封装为一个工具类 4.2 暴露一个接口 4.3 测试 &#x1f331;五、扩展 &#x1f378;前言 小伙伴们大家好&#xff0c;上次…

初学Python如何快速入门(内附详细攻略),一文讲清

目前python可以说是一门非常火爆的编程语言&#xff0c;应用范围也非常的广泛&#xff0c;工资也挺高&#xff0c;未来发展也极好。 Python究竟应该怎么学呢&#xff0c;我自己最初也是从零基础开始学习Python的&#xff0c;给大家分享Python的学习思路和方法。一味的买书看书…