SpringBoot 集成 Kafka

news2025/1/11 12:56:55

SpringBoot 集成 Kafka

  • 1 安装 Kafka
  • 2 创建 Topic
  • 3 Java 创建 Topic
  • 4 SpringBoot 项目
    • 4.1 pom.xml
    • 4.2 application.yml
    • 4.3 KafkaApplication.java
    • 4.4 CustomizePartitioner.java
    • 4.5 KafkaInitialConfig.java
    • 4.6 SendMessageController.java
  • 5 测试

1 安装 Kafka

Docker 安装 Kafka

2 创建 Topic

创建两个topic:topic1、topic2,其分区和副本数都设置为1 (可以在Java代码中创建)

PS C:\Users\Administrator> docker exec -it kafka /bin/sh

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic1
Created topic topic1.

$ kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic topic2
Created topic topic2.

3 Java 创建 Topic

package com.xu.mq.demo.test.service;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.admin.NewTopic;

/**
 * kafka 初始化配置类 创建 Topic
 *
 * @author Administrator
 * @date 2023年2月17日11点30分
 */
@Configuration
public class KafkaInitialConfig {

    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    @Bean
    public NewTopic audioUploadTopic() {
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic textUploadTopic() {
        return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);
    }

}

4 SpringBoot 项目

4.1 pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>2.7.8</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<groupId>com.xu</groupId>
	<artifactId>kafka</artifactId>
	<version>0.0.1-SNAPSHOT</version>

	<name>kafka</name>

	<description>Demo project for Spring Boot</description>

	<properties>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>

		<dependency>
			<groupId>cn.hutool</groupId>
			<artifactId>hutool-all</artifactId>
			<version>5.8.12</version>
		</dependency>

		<dependency>
			<groupId>org.apache.kafka</groupId>
			<artifactId>kafka-streams</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.kafka</groupId>
			<artifactId>spring-kafka-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-web</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>

		<dependency>
			<groupId>org.projectlombok</groupId>
			<artifactId>lombok</artifactId>
			<optional>true</optional>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-devtools</artifactId>
			<scope>runtime</scope>
			<optional>true</optional>
		</dependency>

	</dependencies>

	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
				<configuration>
					<excludes>
						<exclude>
							<groupId>org.projectlombok</groupId>
							<artifactId>lombok</artifactId>
						</exclude>
					</excludes>
				</configuration>
			</plugin>
		</plugins>
	</build>

</project>

4.2 application.yml

server:
  port: 8001

spring:
  application:
    name: hello-kafka
  kafka:
    # 以逗号分隔的地址列表,用于建立与Kafka集群的初始连接(kafka 默认的端口号为9092)
    bootstrap-servers: 192.168.1.92:9092
    producer:
      # 发生错误后,消息重发的次数。
      retries: 0
      #当有多个消息需要被发送到同一个分区时,生产者会把它们放在同一个批次里。该参数指定了一个批次可以使用的内存大小,按照字节数计算。
      batch-size: 16384
      # 设置生产者内存缓冲区的大小。
      buffer-memory: 33554432
      # 键的序列化方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 值的序列化方式
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # acks=0 : 生产者在成功写入消息之前不会等待任何来自服务器的响应。
      # acks=1 : 只要集群的首领节点收到消息,生产者就会收到一个来自服务器成功响应。
      # acks=all :只有当所有参与复制的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应。
      acks: all
    consumer:
      # 自动提交的时间间隔 在spring boot 2.X 版本中这里采用的是值的类型为Duration 需要符合特定的格式,如1S,1M,2H,5D
      auto-commit-interval: 1S
      # 该属性指定了消费者在读取一个没有偏移量的分区或者偏移量无效的情况下该作何处理:
      # latest(默认值)在偏移量无效的情况下,消费者将从最新的记录开始读取数据(在消费者启动之后生成的记录)
      # earliest :在偏移量无效的情况下,消费者将从起始位置读取分区的记录
      auto-offset-reset: earliest
      # 是否自动提交偏移量,默认值是true,为了避免出现重复数据和数据丢失,可以把它设置为false,然后手动提交偏移量
      enable-auto-commit: true
      # 键的反序列化方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      # 值的反序列化方式
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    listener:
      # 在侦听器容器中运行的线程数。
      concurrency: 4

4.3 KafkaApplication.java

package com.xu.mq.demo;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.EnableKafka;

/**
 * @author Administrator
 */
@EnableKafka
@SpringBootApplication
public class KafkaApplication {

	public static void main(String[] args) {
		SpringApplication.run(DemoApplication.class, args);
	}

}

4.4 CustomizePartitioner.java

package com.xu.kafka.config;


import java.util.Map;

import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

/**
 * @author Administrator
 */
public class CustomizePartitioner implements Partitioner {

    /**
     * 自定义分区规则
     *
     * @param topic      The topic name
     * @param key        The key to partition on (or null if no key)
     * @param keyBytes   The serialized key to partition on( or null if no key)
     * @param value      The value to partition on or null
     * @param valueBytes The serialized value to partition on or null
     * @param cluster    The current cluster metadata
     * @return
     */
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        return 0;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }

}

4.5 KafkaInitialConfig.java

package com.xu.kafka.config;


import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import org.apache.kafka.clients.admin.NewTopic;

/**
 * kafka 初始化配置类 创建 Topic
 *
 * @author Administrator
 * @date 2023年2月17日11点30分
 */
@Configuration
public class KafkaInitialConfig {

    public static final String AUDIO_UPLOAD_TOPIC = "AudioUploadTopic";

    public static final String TEXT_UPLOAD_TOPIC = "TextUploadTopic";

    @Bean
    public NewTopic audioUploadTopic() {
        return new NewTopic(AUDIO_UPLOAD_TOPIC, 1, (short) 1);
    }

    @Bean
    public NewTopic textUploadTopic() {
        return new NewTopic(TEXT_UPLOAD_TOPIC, 1, (short) 1);
    }

}

4.6 SendMessageController.java

package com.xu.kafka.message.controller;


import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import com.xu.kafka.config.KafkaInitialConfig;

import cn.hutool.json.JSONUtil;

/**
 * @author Administrator
 */
@RequestMapping(value = "/kafka")
@RestController
public class SendMessageController {

    @Autowired
    private KafkaTemplate template;

    /**
     * KafkaTemplate 发送消息
     *
     * @param message
     */
    @GetMapping("/test1/{message}")
    public void test1(@PathVariable("message") String message) {
        template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message);
    }

    /**
     * KafkaTemplate 发送消息 有回调
     *
     * @param message
     */
    @GetMapping("/test2/{message}")
    public void test2(@PathVariable("message") String message) {
        template.send(KafkaInitialConfig.AUDIO_UPLOAD_TOPIC, message).addCallback(success -> {
            System.out.println("发送成功\t" + success);
        }, fail -> {
            System.out.println("发送失败\t" + fail);
        });
    }
}

5 测试

在这里插入图片描述

发送成功	SendResult [producerRecord=ProducerRecord(topic=AudioUploadTopic, partition=null, headers=RecordHeaders(headers = [], isReadOnly = true), key=null, value=有回调的消息推送111, timestamp=null), recordMetadata=AudioUploadTopic-0@4]

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/351935.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据库原理及应用期末复习汇总(附某高校期末真题试卷)

文章目录《数据库原理及应用》试题1一、选择题&#xff08;共35分&#xff09;二、填空&#xff08;每空1分&#xff0c;共20分&#xff09;三、T-SQL综合题(共35分)四、综合应用题(共10分)《数据库原理及应用》试题2一、选择题&#xff08;共35分&#xff09;二、填空&#xf…

handler解析(5)常见面试题

目录 1.请大致讲下handler的工作原理 2.handler.postDelay原理 3.一个线程有几个Looper?几个Handler&#xff1f; 4. Handler内存泄漏原因&#xff1f;以及解决方案 5.为何主线程可以new Handler如果想要在子线程中new Handler要做些什么准备&#xff1f; 6.消息退出是调…

R语言广义可加模型在空气环境污染方面的应用(1)

粉丝私信我希望复制一篇文章的图片&#xff0c;图片来源于文章&#xff1a;Wu C, Yan Y, Chen X, Gong J, Guo Y, Zhao Y, Yang N, Dai J, Zhang F, Xiang H. Short-term exposure to ambient air pollution and type 2 diabetes mortality: A population-based time series st…

中频采样和IQ采样的比较和转换

一、什么是中频采样&#xff0c;什么是IQ采样 射频接收系统通常使用数字信号处理算法进行信号解调和分析&#xff0c;因此需要使用ADC对信号进行采样。根据采样频率的不同&#xff0c;可以分为射频直接采样、中频采样、IQ采样。射频采样和中频采样只需要一路ADC&#xff0c;采…

搜索引擎ES相关问题

一、什么是倒排索引&#xff1f;有什么好处&#xff1f;索引&#xff1a; 从ID到内容。倒排索引&#xff1a; 从内容到ID。好处&#xff1a; 比较适合做关键字检索。 可以控制数据的总量。提高查询效率。搜索引擎为什么比MySQL查询快&#xff1f; lucence文章 -》 term ->排…

element-ui中el-table点击其他自定义按钮展开table中某一行

element-ui中el-table点击其他自定义按钮展开table中某一行 在日常开发中&#xff0c;我们遇见了会有点击某些按钮&#xff0c;使得表格行展开的需求&#xff0c;这时候去查看文档 element-ui&#xff08;table&#xff09; 这里官方提供了示例为在行最左侧有一个展开合并ico…

JAVA开发测试(jmeter如何测试性能与估算)

对C的业务网站或应用&#xff0c;进行性能测试来评估使用服务器情况是必不可少的一项工作。 一、测试工具&#xff1a; Apache JMeter 可以用于对服务器、网络或对象模拟巨大的负载&#xff0c;来自不同压力类别下测试它们的强度和分析整体性能&#xff0c;是Apache组织开发的…

CCF-CSP真题《202212-1 现值计算》思路+python满分题解

想查看其他题的真题及题解的同学可以前往查看&#xff1a;CCF-CSP真题附题解大全 试题编号&#xff1a;202212-1试题名称&#xff1a;现值计算时间限制&#xff1a;1.0s内存限制&#xff1a;512.0MB问题描述&#xff1a; 问题描述 评估一个长期项目的投资收益&#xff0c;资金的…

中点BH算法对任意斜率的直线扫描转换方法

作者&#xff1a;非妃是公主 专栏&#xff1a;《计算机图形学》 博客地址&#xff1a;https://blog.csdn.net/myf_666 个性签&#xff1a;顺境不惰&#xff0c;逆境不馁&#xff0c;以心制境&#xff0c;万事可成。——曾国藩 文章目录专栏推荐专栏系列文章序一、算法原理二、…

六“元”数智增长模型,企业元宇宙时代的经营新范式

摘要&#xff1a;在中国传统哲学里&#xff0c;“元”表示最基本的、最根本的东西;在企业管理经营中&#xff0c;将“元”解释为企业的核心竞争力或者基础能力;元宇宙下&#xff0c;“元”就代表数智化下的新场景&#xff0c;来支撑企业的各种业务创新。 一、元宇宙下的“元” …

分享IDEA通过插件 【一键自动生成】 在线api接口文档

开发写代码已经很辛苦&#xff0c;相信每个开发人员都不想写接口文档&#xff0c;但是不写又不行。尤其现在开发的项目偏向于前后端分离&#xff0c;在没有接口的情况下&#xff0c;前后端很难对接联调&#xff0c;测试也无法很好的测试。现在IDEA的插件仓库里有款插件&#xf…

qt 内存泄漏处理办法

windows 版本windows msvc版本可以使用vld检测可以得到内存泄漏点的调用堆栈&#xff0c;如果可以的话&#xff0c;还可以得到其所在文件及行号&#xff1b;可以得到泄露内存的完整数据&#xff1b;可以设置内存泄露报告的级别。缺点&#xff1a;1.只针对 Visual C &#xff08…

VUE -- defineExpose

defineExpose定义demo定义 defineExpose定义&#xff1a;用于组件通信中父级组件调用操作子组建方法和响应式属性参数能力 在使用definExpose前需要了解两个拷贝对象函数 对象copy&#xff1a;shallowReactive 与 数据 copy&#xff1a;shallowRef 这两个都是vue包里面的 简…

图片文字识别OCR调研-中文

直接看效果对比 tesseract-ocr 该识别引擎最新版本tesseract4添加了支持神经网络&#xff08;LSTM&#xff09;的&#xff0c;该引擎专注于线条识别&#xff0c; 同时也保留了Tesseract OCR 引擎&#xff0c;该引擎通过识别字符模式来工作。 我们需求端的后台语言是go&#x…

时尚高级实用,零跑C01满足各种用车需求

零跑C01在新能源车市场上销量可观且口碑较好&#xff0c;为什么消费者会相中这个国产车全域自主研发的新能源车呢&#xff1f;下面的介绍会给出答案。就其外观而言&#xff0c;零跑C01的外观定位于中大型轿车&#xff0c;在外观设计上充分考虑到美学观念。零跑给出了七个车身颜…

扬帆优配|日均客运量恢复,民航业加速复苏,外资买入2股超亿元

春运民航客运量康复至疫情前七成。 2月16日&#xff0c;民航局举行2月例行新闻发布会。会上介绍&#xff0c;自1月7日至2月15日&#xff0c;春运40天&#xff0c;民航运送旅客5523万人次&#xff0c;日均客运量138万人次&#xff0c;同比去年春运添加39%&#xff0c;康复至2019…

Lesson5.1---Python 之 NumPy 简介和创建数组

一、NumPy 简介 NumPy&#xff08;Numerical Python&#xff09;是 Python 的一种开源的数值计算扩展。这种工具可用来存储和处理大型矩阵&#xff0c;比 Python 自身的嵌套列表&#xff08;nested list structure&#xff09;结构要高效的多&#xff08;该结构也可以用来表示…

【贰】嵌入式系统的分类

随手拍拍&#x1f481;‍♂️&#x1f4f7; 日期: 2022.08.31 地点: 杭州 介绍: 2022.08.31下午一点&#xff0c;在闷热的学校里实在是待不下去了&#xff0c;跑到了门口的钱塘江边散了一会儿步&#x1f6b6;正值盛夏&#xff0c;八月即将完结&#xff0c;日子越过越快&#x1…

FPGA MAX 10 10M50系列10M50DAF484C8G/10M50DAF484C7G/10M50DCF484C7G规格

介绍MAX 10器件是单芯片、非易失性低成本可编程逻辑器件(pld)&#xff0c;用于集成最优的系统组件集。MAX 10设备的亮点包括:内部存储双配置闪存用户闪存即时支持集成模数转换器(adc)支持Nios II单芯片软核处理器MAX 10设备是系统管理、I/O扩展、通信控制平面、工业、汽车和消费…

ant design vue 组件中经常会出现 label过长被盖住的情况

ant design vue 组件中经常会出现 label过长被盖住的情况&#xff0c;我还特地找了解决方法&#xff1a;当过长时让他换行显示&#xff0c;还写了一篇博客记录&#xff0c;今天同样是写代码&#xff0c;但并没有做特殊的设置&#xff0c;结果却出乎意料的正常&#xff0c;过长自…