SpringBoot-Learning系列之Kafka整合

news2024/12/24 11:06:31

SpringBoot-Learning系列之Kafka整合

本系列是一个独立的SpringBoot学习系列,本着 What Why How 的思想去整合Java开发领域各种组件。

file

  • 消息系统

    • 主要应用场景
      • 流量消峰(秒杀 抢购)、应用解耦(核心业务与非核心业务之间的解耦)
      • 异步处理、顺序处理
      • 实时数据传输管道
      • 异构语言架构系统之间的通信
        • 如 C语言的CS客户端的HIS系统与java语言开发的互联网在线诊疗系统的交互
  • Kafka是什么

    kafka是一个消息队列产品,基于Topic partitions的设计,能达到非常高的消息发送处理性能。是java领域常用的消息队列。

    核心概念:

    • 生产者(Producer) 生产者应用向主题队列中投送消息数据
    • 消费者 (Consumer) 消费者应用从订阅的Kafka的主题队列中获取数据、处理数据等后续操作
    • 主题 (Topic) 可以理解为生产者与消费者交互的桥梁
    • 分区 (Partition) 默认一个主题有一个分区,用户可以设置多个分区。每个分区可以有多个副本(Replica)。分区的作用是,将数据划分为多个小块,提高并发性和可扩展性。每个分区都有一个唯一的标识符,称为分区号。消息按照键(key)来进行分区,相同键的消息会被分配到同一个分区中。分区可以有不同的消费者同时消费。副本的作用是提供数据的冗余和故障恢复。每个分区可以有多个副本,其中一个被称为领导者(leader),其他副本被称为追随者(follower)。领导者负责处理读写请求,而追随者只负责复制领导者的数据。如果领导者宕机或不可用,某个追随者会被选举为新的领导者,保证数据的可用性。
  • windows 安装kafka

    本地环境DockerDeskTop+WSL2,基于Docker方式安装Kafka

    2.8.0后不需要依赖zk了

    • 拉取镜像

      docker pull wurstmeister/zookeeper
      
      docker pull wurstmeister/kafka
      
    • 创建网络

      docker network create kafka-net --driver bridge
      
    • 安装zk

      docker run --net=kafka-net --name zookeeper -p 21810:2181 -d wurstmeister/zookeeper
      
    • 安装kafka

      docker run -d --name kafka --publish 9092:9092 \
      --link zookeeper \
      --env KAFKA_ZOOKEEPER_CONNECT=172.31.192.1:2181 \
      --env KAFKA_ADVERTISED_HOST_NAME=172.31.192.1 \
      --env KAFKA_ADVERTISED_PORT=9092  \
      --volume /etc/localtime:/etc/localtime \
      wurstmeister/kafka:latest
      
    • 测试

      telnet localhost:9092
      
  • SpringBoot集成

    SpringBoot3.1.0+jdk17

    • pom依赖

      								```
      										<?xml version="1.0" encoding="UTF-8"?>
      										<project xmlns="http://maven.apache.org/POM/4.0.0"
      														 xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
      														 xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      												<modelVersion>4.0.0</modelVersion>
      												<parent>
      														<groupId>org.springframework.boot</groupId>
      														<artifactId>spring-boot-starter-parent</artifactId>
      														<version>3.1.0</version>
      														<relativePath/> <!-- lookup parent from repository -->
      												</parent>
      												<groupId>io.github.vino42</groupId>
      												<artifactId>springboot-kafka</artifactId>
      												<version>1.0-SNAPSHOT</version>
      
      												<properties>
      														<java.version>17</java.version>
      														<maven.compiler.source>17</maven.compiler.source>
      														<maven.compiler.target>17</maven.compiler.target>
      														<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
      												</properties>
      
      
      												<dependencies>
      														<dependency>
      																<groupId>org.projectlombok</groupId>
      																<artifactId>lombok</artifactId>
      																<optional>true</optional>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-test</artifactId>
      																<scope>test</scope>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-web</artifactId>
      																<exclusions>
      																		<exclusion>
      																				<groupId>org.springframework.boot</groupId>
      																				<artifactId>spring-boot-starter-logging</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.springframework.boot</groupId>
      																<artifactId>spring-boot-starter-log4j2</artifactId>
      														</dependency>
      														<!--kafka-->
      														<dependency>
      																<groupId>org.springframework.kafka</groupId>
      																<artifactId>spring-kafka</artifactId>
      																<exclusions>
      																		<!--排除掉 自行添加最新的官方clients依赖-->
      																		<exclusion>
      																				<groupId>org.apache.kafka</groupId>
      																				<artifactId>kafka-clients</artifactId>
      																		</exclusion>
      																</exclusions>
      														</dependency>
      														<dependency>
      																<groupId>org.apache.kafka</groupId>
      																<artifactId>kafka-clients</artifactId>
      																<version>3.5.1</version>
      														</dependency>
      														<dependency>
      																<groupId>com.google.code.gson</groupId>
      																<artifactId>gson</artifactId>
      																<version>2.10.1</version>
      														</dependency>
      														<dependency>
      																<groupId>cn.hutool</groupId>
      																<artifactId>hutool-all</artifactId>
      																<version>5.8.21</version>
      														</dependency>
      
      												</dependencies>
      												<build>
      														<plugins>
      																<plugin>
      																		<groupId>org.springframework.boot</groupId>
      																		<artifactId>spring-boot-maven-plugin</artifactId>
      																		<version>3.1.0</version>
      																</plugin>
      														</plugins>
      												</build>
      										</project>
      						```
      
    • 配置

      spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量发送消息的数量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息体的编解码方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
      #      MANUAL	poll()拉取一批消息,处理完业务后,手动调用Acknowledgment.acknowledge()先将offset存放到map本地缓存,在下一次poll之前从缓存拿出来批量提交
            #      MANUAL_IMMEDIATE	每处理完业务手动调用Acknowledgment.acknowledge()后立即提交
            #      RECORD	当每一条记录被消费者监听器(ListenerConsumer)处理之后提交
            #      BATCH	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后提交
            #      TIME	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,距离上次提交时间大于TIME时提交
            #      COUNT	当每一批poll()的数据被消费者监听器(ListenerConsumer)处理之后,被处理record数量大于等于COUNT时提交
            #      COUNT_TIME	TIME或COUNT满足其中一个时提交
            ack-mode: manual_immediate
          consumer:
            group-id: test
            # 是否自动提交
            enable-auto-commit: false
            max-poll-records: 100
            #      用于指定消费者在启动时、重置消费偏移量时的行为。
            #      earliest:消费者会将消费偏移量重置为最早的可用偏移量,也就是从最早的消息开始消费。
            #      latest:消费者会将消费偏移量重置为最新的可用偏移量,也就是只消费最新发送的消息。
            #      none:如果找不到已保存的消费偏移量,消费者会抛出一个异常
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息体的编解码方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      server:
        port: 8888spring:
        kafka:
          bootstrap-servers: 172.31.192.1:9092
          producer:
            retries: 0
            # 每次批量发送消息的数量
            batch-size: 16384
            buffer-memory: 33554432
            # 指定消息key和消息体的编解码方式
            key-serializer: org.apache.kafka.common.serialization.StringSerializer
            value-serializer: org.apache.kafka.common.serialization.StringSerializer
          listener:
            missing-topics-fatal: false
            ack-mode: manual_immediate
          consumer:
            group-id: test
            enable-auto-commit: false
            max-poll-records: 100
            auto-offset-reset: earliest
            auto-commit-interval: 100
            # 指定消息key和消息体的编解码方式
            key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
            properties:
              max.poll.interval.ms: 3600000
      
    • 生产者代码示例

      package io.github.vino42.publiser;
      
      import com.google.gson.Gson;
      import com.google.gson.GsonBuilder;
      import org.springframework.beans.factory.annotation.Autowired;
      import org.springframework.kafka.core.KafkaTemplate;
      import org.springframework.stereotype.Component;
      
      /**
       * =====================================================================================
       *
       * @Created :   2023/8/30 21:29
       * @Compiler :  jdk 17
       * @Author :    VINO
       * @Copyright : VINO
       * @Decription : kafak 消息生产者
       * =====================================================================================
       */
      @Component
      public class KafkaPublishService {
          @Autowired
          KafkaTemplate kafkaTemplate;
      
          /**
           * 这里为了简单 直接发送json字符串
           *
           * @param json
           */
          public void send(String topic, String json) {
              kafkaTemplate.send(topic, json);
          }
      }
      
      
          @RequestMapping("/send")
          public String send() {
              IntStream.range(0, 10000).forEach(d -> {
                  kafkaPublishService.send("test", RandomUtil.randomString(16));
              });
              return "ok";
          }
      
      
    • 消费者

      @Component
      @Slf4j
      public class CustomKafkaListener {
      
          @org.springframework.kafka.annotation.KafkaListener(topics = "test")
          public void listenUser(ConsumerRecord<?, String> record, Acknowledgment acknowledgment) {
              try {
                  String key = String.valueOf(record.key());
                  String body = record.value();
                  log.info("\n=====\ntopic:test,key{},message:{}\n=====\n", key, body);
                  log.info("\n=====\ntopic:test,key{},payLoadJson:{}\n=====\n", key, body);
              } catch (Exception e) {
                  e.printStackTrace();
              } finally {
                  //手动ack
                  acknowledgment.acknowledge();
              }
          }
      }
      

SpringBoot Learning系列 是笔者总结整理的一个SpringBoot学习集合。可以说算是一个SpringBoot学习的大集合。欢迎Star关注。谢谢观看。

file
关注公众号不迷路

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1000125.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

多要素气象站:自动观测、数据可查

随着科技的不断发展&#xff0c;气象监测已经从传统的人工观测转变为自动化观测。多要素气象站作为自动化气象观测的重要组成部分&#xff0c;为天气预报提供了更加精准、实时的数据。 一、多要素气象站的优势 自动化程度高&#xff1a;多要素气象站采用先进的传感器和数据采…

算法AB实验平台进化历程和挑战

1 AB 平台简介 AB 实验平台这几年在互联网公司得到了越来越广泛的应用&#xff0c;采用 AB 实验来评估产品和技术迭代效果也成为主流的业务新功能效果评估方式&#xff0c;数据驱动的文化在这几年得到了不少公司的广泛的认同&#xff0c;通过数据和指标来说明产品效果也得到了…

win11电脑怎么设置定时关机

我们可以给电脑设置一个定时关机的功能&#xff0c;这样当我们有事情而无法在电脑前等待关机的时候就可以让电脑自己关机了&#xff0c;那么win11系统怎么设置定时关机功能呢&#xff0c;这里小编给大家带来win11电脑定时关机的设置方法&#xff0c;还不太清楚的小伙伴快来看一…

行业追踪,2023-09-11

自动复盘 2023-09-11 凡所有相&#xff0c;皆是虚妄。若见诸相非相&#xff0c;即见如来。 k 线图是最好的老师&#xff0c;每天持续发布板块的rps排名&#xff0c;追踪板块&#xff0c;板块来开仓&#xff0c;板块去清仓&#xff0c;丢弃自以为是的想法&#xff0c;板块去留让…

数据库直连提示 No suitable driver found for jdbc:postgresql

背景&#xff1a;我在代码里使用直连的方式在数据库中创建数据库等&#xff0c;由于需要适配各个数据库服务所以我分别兼容了mysql、postgresql、oracal等。但是在使用过程中会出现错误&#xff1a; No suitable driver found for jdbc:postgresql 但是我再使用mysql的直连方式…

JOSEF约瑟 DL-41 DL-42 DL-43 DL-44 导轨式电流继电器 过负荷和短路保护

DL-41导轨式电流继电器是一种常用于电机、变压器和输电线的过负荷和短路保护线路中的起动元件。 DL-41导轨式电流继电器具有体积小、外形直观、安装接线方便等优点&#xff0c;是随着开关柜体不断改进而新研发的配套产品。该继电器采用凸出式固定结构&#xff0c;也可卡装在35…

Unity中Shader使用最简屏幕坐标并且实现屏幕扭曲

文章目录 前言一、在之前写的shader中&#xff0c;用于对屏幕坐标取样的pos是在顶点着色器中完成计算的&#xff0c;然而还有一种更为简洁的方法&#xff0c;就是用顶点着色器中传给片元着色器的pos来给屏幕抓取进行采样原理&#xff1a;在顶点着色器中&#xff0c;o.pos是裁剪…

flv怎么转换成mp3?挑选三个方法给大家

flv怎么转换成mp3&#xff1f;FLV&#xff08;Flash Video&#xff09;是一种被广泛应用于互联网的流行视频格式&#xff0c;然而该格式并非适用于所有设备和媒体播放器。相反&#xff0c;MP3作为数字音频格式&#xff0c;能够将高质量的音频文件压缩成相对较小的大小&#xff…

IDEA使用database

一、导出数据库表结构 右键数据库、表&#xff0c;选择SQL Generator 可以查看多表的创建语句、删除语句、清空语句 1.创建脚本 初始创建脚本 rdbms提供的脚本 definition provided by rdbms server 2.脚本是否关联数据库名称 Qualify objects with schema names: aut…

记录造数据测试接口

一、前言 在java开发中经常需要造数据进行测试接口&#xff0c;这里记录一下常用的通过造数据测试接口的方法。 二、一般的接口传参方式 1、接口的方式最好是使用JSON或者map的方式&#xff0c;这样的好处是传参可以灵活伸缩&#xff0c;返回的结果也最好是JSON或者map的方式…

【Spring面试】六、@Autowired、@Configuration、第三方Bean的配置

文章目录 Q1、如何让自动注入没有找到依赖Bean时不报错&#xff1f;Q2、如何让自动注入找到多个依赖的Bean时不报错&#xff1f;Q3、Autowired注解有什么作用&#xff1f;Q4、Autowired和Resource之间的区别Q5、Autowired注解自动装配的过程是怎样的&#xff1f;Q6、Configurat…

sql server事务隔离别 、 mysql 事务隔离级别、并发性问题

隔离级别和锁 SQL中 mysql 、Oracle 、sql server 等数据库 都是客户端和服务器架构的软件&#xff0c;对于同一个服务器来说&#xff0c;可以有若干个客户端与之连接&#xff0c;每个客户端与服务器连接上之后&#xff0c;就可以称为一个 【会话&#xff08;session&#xff0…

leaflet 加载地图-引入各种地图

leaflet 加载地图-引入各种地图 一、智图地图 1、Geoq.Normal.Gray &#xff08;或 Geoq.Normal.Map&#xff09;灰色 let gaoDeLayer L.tileLayer.chinaProvider(Geoq.Normal.Gray);gaoDeLayer.addTo(this.map);2、Geoq.Normal.Warm 黄色 let gaoDeLayer L.tileLayer.chin…

计算机竞赛 基于计算机视觉的身份证识别系统

0 前言 &#x1f525; 优质竞赛项目系列&#xff0c;今天要分享的是 基于机器视觉的身份证识别系统 该项目较为新颖&#xff0c;适合作为竞赛课题方向&#xff0c;学长非常推荐&#xff01; &#x1f9ff; 更多资料, 项目分享&#xff1a; https://gitee.com/dancheng-sen…

2023年数维杯数学建模A题河流-地下水系统水体污染研究求解全过程文档及程序

2023年数维杯数学建模 A题 河流-地下水系统水体污染研究 原题再现&#xff1a; 河流对地下水有着直接地影响&#xff0c;当河流补给地下水时&#xff0c;河流一旦被污染&#xff0c;容易导致地下水以及紧依河流分布的傍河水源地将受到不同程度的污染&#xff0c;这将严重影响…

安全可靠的文件传输服务助力完成更高效的医疗保健工作(上)

医疗保健工作是关乎人类健康和社会进步的重要领域&#xff0c;需要处理和传输大量医疗数据&#xff0c;如患者资料、医疗图像、化验单、电子病历、诊断建议等。这些数据不仅涉及患者的个人隐私和医疗安全&#xff0c;还关系到医院的运行效率和医疗水平。 因此&#xff0c;如何…

推荐5款同类型中独树一帜的软件

今天要给大家推荐的是5款软件&#xff0c;每个都是同类软件中的个中翘楚&#xff0c;请大家给我高调地使用起来&#xff0c;不用替我藏着掖着。 1.动画演示制作——Focusky ​ Focusky 是一款专业的动画演示制作软件&#xff0c;可以让你用简单直观的方式制作各种折线图、柱状…

智慧工地:让工地可视化、数字化、智能化

智慧工地平台功能包括&#xff1a;劳务管理、施工安全管理、视频监控管理、机械安全管理、危大工程监管、现场物料监管、绿色文明施工、安全隐患排查、施工综合管理、施工质量管理、设备管理、系统管理等模块。 一、项目开发环境 技术架构&#xff1a;微服务 开发语言&#…

win11怎么删除PIN码

现在有很多用户都将电脑更新升级成win11系统&#xff0c;一些小伙伴对新系统的某些操作不是很熟悉&#xff0c;近期就有一部分小伙伴想要知道win11系统如何删除PIN码&#xff0c;这里小编就给大家详细介绍一下win11删除PIN码的教程&#xff0c;有需要的小伙伴快来看一看吧。 w…

解决: 使用html2canvas和print-js打印组件时, 出现空白页

如图所示: 当我利用html2canvas转换成图片后, 然后使用print-js打印多张图片, 会出现空白页 使用html2canvas和print-js打印组件的文章可参考这个: Vue-使用html2canvas和print-js打印组件 解决: 因为是使用html2canvas转换成图片后才打印的, 而图片是行内块级元素, 会有间隙…