Kafka(三)(集成SpringBoot)

news2025/1/13 13:34:46

第三章 Kafka集成 SpringBoot

SpringBoot 是一个在 JavaEE 开发中非常常用的组件。可以用于 Kafka 的生产者,也可以 用于 SpringBoot 的消费者。

在初始化springboot环境的时候要勾选kafka依赖

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

<dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka-test</artifactId>
            <scope>test</scope>
        </dependency>

3.1 SpringBoot 生产者

(1)修改 SpringBoot 核心配置文件 application.propeties, 添加生产者相关信息

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: hadoop102:9092,hadoop103:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test

(2)创建 controller 从浏览器接收数据, 并写入指定的 topic

package cn.jxust.springbootkafka.Controller;/*
 *
 *  @author pengjx
 *
 * */

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class ProducerController {

    @Autowired
    private KafkaTemplate<String,String> stringKafkaTemplate;

    @RequestMapping("/atguigu")
    public String data(String msg){
        stringKafkaTemplate.send("first",msg);
        return "ok";
    }
}

(3)在浏览器中给/atguigu 接口发送数据 http://localhost:8080/atguigu?msg=hello

3.2 SpringBoot 消费者

(1)修改 SpringBoot 核心配置文件 application.propeties

server:
  port: 8080
spring:
  kafka:
    bootstrap-servers: hadoop102:9092,hadoop103:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      group-id: test

(2)创建类消费 Kafka 中指定 topic 的数据

package cn.jxust.springbootkafka.Consumer;/*
 *
 *  @author pengjx
 *
 * */

import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.annotation.KafkaListener;

@Configuration
public class KafkaConsumer {

    @KafkaListener(topics = "first")
    public void getData(String msg){
        System.out.println("消息是:"+msg);
    }

}

3.3 工具类

KafkaProducer
package cn.jxust.springbootkafka.utils;/*
 *
 *  @author pengjx
 *
 * */

import cn.hutool.json.JSONUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
@Slf4j
public class KafkaProducer {

    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    private static final String TOPIC="first";
    public void send(Object object){

        String jsonStr = JSONUtil.toJsonStr(object);
        log.info("准备发送消息为:{}", jsonStr);

        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(TOPIC, jsonStr);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            public void onFailure(Throwable ex) {
                //发送失败的处理
                log.info(TOPIC + " - 生产者 发送消息失败:" + ex.getMessage());
            }

            @Override
            public void onSuccess(SendResult<String, Object> result) {
                //成功的处理
                log.info(TOPIC + " - 生产者 发送消息成功:" + result.toString());

            }
        });


    }


}
KafkaConsumer
package cn.jxust.springbootkafka.utils;/*
 *
 *  @author pengjx
 *
 * */

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

@Component
@Slf4j
public class KafkaConsumer {

    @KafkaListener(topics = "first")
    public void topicTest(ConsumerRecord<?,?> record){
        Optional<?> message = Optional.ofNullable(record.value());
        if(message.isPresent()){
            Object object = message.get();
            log.info("topic.group1 消费了: Topic:" + "first" + ",Message:" + object);
        }
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1448500.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【基础】第K大与第K小数

说明 给定一个长度为N(0< n< 10000)的序列&#xff0c;保证每一个序列中的数字a[i]是正整数 &#xff0c;编程要求求出整个序列中第k大的数字减去第k小的数字的值m&#xff0c;并判断m是否为质数。(0< k< n) 输入数据 第一行为2个数n&#xff0c;k&#xff08;…

步步深入 k8s 使用 pv pvc sc 在 nfs 基础上共享存储

博客原文 文章目录 前言集群环境nfs 环境搭建pod 挂载 nfs架构图 pvc 方式挂载 nfs架构图 storageclass 方式动态申请 pv架构图 参考 前言 持久化卷&#xff08;Persistent Volume, PV&#xff09;允许用户将外部存储映射到集群&#xff0c;而持久化卷申请&#xff08;Persist…

[Java][算法 滑动窗口]Day 02---LeetCode 热题 100---08~09

第一题 无重复字符串的最长子串 思路 其实就是在字符串S中 找到没有重复的最长子串的长度 这道题的难点就是在于如何判断最长并且无重复 首先 最长长度 可以使用变量max记录保存 再者 判断有无重复 最简单的方法就是 暴力遍历法 即对于每次找的子串都再次寻找遍历…

【Django】Django内建用户系统

Django内建用户系统 14.1 Django中的用户认证 Django带有一个用户认证系统系统&#xff0c;它处理用户用户账号、组、权限以及基于cookie的用户会话。 用户可以直接使用Django自带的用户表。 官方文档&#xff1a;https://docs.djangoproject.com/zh-hans/2.2/topics/auth/ …

计算机组成原理 2 数据表示

机器数 研究机器内的数据表示&#xff0c;目的在于组织数据&#xff0c;方便计算机硬件直接使用。 需要考虑&#xff1a; 支持的数据类型&#xff1b; 能表示的数据精度&#xff1b; 是否有利于软件的移植 能表示的数据范围&#xff1b; 存储和处理的代价&#xff1b; ... 真值…

ELAdmin 配置定时任务

定义方法 在自己的 Module 中写个要执行的方法。 比如获取微信公众号的 accessToken&#xff0c;每两个小时更新一次。这种的其实使用 Spring 的 Scheduled 更方便些&#xff0c;此处仅为演示。 package me.zhengjie.mp.task;import com.alibaba.fastjson.JSON; import lombo…

基于Linux的nfs、samba网络服务搭建

我学的Ubuntu&#xff0c;以它为例子 一、nfs(linux <---> linux) 1.1.nfs首先搭建服务端&#xff08;对外共享&#xff09; //安装nfs核心服务 sudo apt update sudo apt install nfs-kernel-server //配置nfs文件(指定共享文件) sudo vim /etc/exports //重启nf…

H5 渐变3D旋转个人主页引导页源码

H5 渐变3D旋转个人主页引导页源码 源码介绍&#xff1a;一款渐变3D旋转个人主页引导页源码&#xff0c;可以做个人主页/旗下网站引导 下载地址&#xff1a; https://www.changyouzuhao.cn/10392.html

第三百四十五回

文章目录 1. 概念介绍2. 方法与功能2.1 基本用法2.2 加密算法 3. 示例代码4. 内容总结 我们在上一章回中介绍了"FlutterCacheManager组件"相关的内容&#xff0c;本章回中将介绍一个加密工具包.闲话休提&#xff0c;让我们一起Talk Flutter吧。 1. 概念介绍 加密主要…

【MySQL进阶之路】通过实操理解 explain 执行计划

欢迎关注公众号&#xff08;通过文章导读关注&#xff1a;【11来了】&#xff09;&#xff0c;及时收到 AI 前沿项目工具及新技术的推送&#xff01; 在我后台回复 「资料」 可领取编程高频电子书&#xff01; 在我后台回复「面试」可领取硬核面试笔记&#xff01; 文章导读地址…

【刷题记录】——2024寒假day9编程题

本系列博客为个人刷题思路分享&#xff0c;有需要借鉴即可。 1.目录大纲&#xff1a; 2.题目链接&#xff1a; T1:LINK T2:LINK 3.详解思路&#xff1a; T1: 思路&#xff1a; /*** Note: The returned array must be malloced, assume caller calls free().*/#include<…

计算机网络——11EMail

EMail 电子邮件&#xff08;EMail&#xff09; 3个主要组成部分 用户代理邮件服务器简单邮件传输协议&#xff1a;SMTP 用户代理 又名“邮件阅读器”撰写、编辑和阅读邮件输入和输出邮件保存在服务器上 邮件服务器 邮箱中管理和维护发送给用户的邮件输出报文队列保持待发…

【成长记录】第一次写博客入csdn榜单了 还是第一.....

庆祝一下第一次拿综合榜榜一 Java内容榜第二 总之谢谢大家支持 小苏会继续努力的 可以看看我的新作 嘻嘻&#x1f601;&#x1f924;&#x1f449;&#x1f3fb;&#x1f448;&#x1f3fb; 谢谢大家

web3知识体系汇总

web3.0知识体系 1.行业发展 2. web3的特点&#xff1a; 1、统一身份认证系统 2、数据确权与授权 3、隐私保护与抗审查 4、去中心化运行 Web3.0思维技术思维✖金融思维✖社群思维✖产业思维”&#xff0c;才能从容理解未来Web3.0时代的大趋势。 3.技术栈 Web3.jsSolidit…

Spring Boot 笔记 005 环境搭建

1.1 创建数据库和表&#xff08;略&#xff09; 2.1 创建Maven工程 2.2 补齐resource文件夹和application.yml文件 2.3 porn.xml中引入web,mybatis,mysql等依赖 2.3.1 引入springboot parent 2.3.2 删除junit 依赖--不能删&#xff0c;删了会报错 2.3.3 引入spring web依赖…

2.14数据结构与算法学习日记

洛谷P1934 封印 题目背景 很久以前&#xff0c;魔界大旱&#xff0c;水井全部干涸&#xff0c;温度也越来越高。为了拯救居民&#xff0c;夜叉族国王龙溟希望能打破神魔之井&#xff0c;进入人界“窃取”水灵珠&#xff0c;以修复大地水脉。可是六界之间皆有封印&#xff0c;…

洛谷_P1059 [NOIP2006 普及组] 明明的随机数_python写法

这道题的关键在于去重和排序&#xff0c;去重可以联想到集合&#xff0c;那排序直接使用sort方法。 n int(input()) data set(map(int,input().split( ))) data list(data) data.sort() print(len(data)) for i in data:print(i,end )

Python编程之旅:从入门到精通

在数字世界的无尽宇宙中&#xff0c;Python无疑是一颗璀璨的明星。其简洁易懂的语法、丰富的库和广泛的应用领域&#xff0c;使得Python成为了众多初学者的首选编程语言。那么&#xff0c;如何学习Python呢&#xff1f;本文将带你一步步踏上Python编程的旅程。 一、入门篇&…

1.Electron初始与安装

这里写目录标题 一、前言二、下载三、简要总结 一、前言 原文以及该系列后续文章请参考&#xff1a;安装Electron 随着前端的不断强盛&#xff0c;现在的前端已经不再满足于网页开发了&#xff0c;而是在尝试能否使用前端的开发逻辑来开发PC端的桌面软件。 即用html、js、css…

软件价值12-射箭游戏

射箭游戏&#xff0c;按空格键发射&#xff0c;打击移动靶&#xff0c;左上角显示成绩状态。 代码&#xff1a; import pygame import sys import random# 初始化Pygame pygame.init()# 设置窗口大小 SCREEN_WIDTH 800 SCREEN_HEIGHT 600 screen pygame.display.set_mode((…