Kafka的安装及接入SpringBoot

news2025/1/19 3:19:05

环境:windows、jdk1.8、springboot2

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/

1.概述

        Kafka 是一种高性能、分布式的消息队列系统,最初由 LinkedIn 公司开发,并于2011年成为 Apache 顶级项目。它设计用于处理大规模的实时数据流,具有高吞吐量、低延迟、持久性等特点,被广泛应用于构建实时数据管道、日志收集、事件驱动架构等场景。

        详细概述见Kafka概述:

1.1 Kafka的作用

  • 发布和订阅记录流
  • 持久存储记录流,Kafka中的数据即使消费后也不会消失
  • 在系统或应用之间构建可靠获取数据的实时流数据管道
  • 构建转换或响应数据流的实时流应用程序
  • Kafka可以处理源源不断产生的数据

1.2 Kafka的一些概念

  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic 就是Rabbitmq中的queue)

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)

  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)

  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

2.Kafka下载安装

Apache KafkaApache Kafka: A Distributed Streaming Platform.icon-default.png?t=N7T8https://kafka.apache.org/downloads        选择最新版就可以

2.1 配置kafka

        解压下载的文件,修改 config 文件夹下的 zookeeper.properties

        修改 config 文件夹下的 server.properties

        当需要外网访问时要配置advertised.listeners(比如连云服务器的kafka)

advertised.listeners=PLAINTEXT://xxx.xxx.xxx.xxx:9092

 

2.2 启动 zookeeper

        Zookeeper 在 Kafka 中充当了分布式协调服务的角色,帮助 Kafka 实现了集群管理、元数据存储、故障恢复、领导者选举等功能,是 Kafka 高可用性、可靠性和分布式特性的重要支撑。

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

zookeeper-server-start.bat ../../config/zookeeper.properties

        可以本地访问看一下:http://localhost:2181/ 

2.3 启动Kafka 

        kafka_2.13-3.7.0\bin\windows文件夹中输入命令:

kafka-server-start.sh ../../config/server.properties

        访问路径: http://localhost:9092/ 

2.4 便捷启动脚本

        两个脚本放到Kafka的目录(kafka_2.13-3.7.0)中

cd bin\windows

zookeeper-server-start.bat ../../config/zookeeper.properties

cd bin\windows

kafka-server-start.bat ../../config/server.properties

3.springboot集成Kafka

3.1 环境搭建

(1)添加pom依赖

<!-- 继承Spring boot工程 -->
<parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.2.8.RELEASE</version>
</parent>
<properties>
    <fastjson.version>1.2.58</fastjson.version>
</properties>
<dependencies>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>${fastjson.version}</version>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

(2)配置类application.yml

        生产者:

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092
    producer:
      retries: 0
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

        消费者:

spring:
  kafka:
    bootstrap-servers: xxx.xxx.xxx.xxx:9092
    consumer:
      group-id: kafka-demo-kafka-group
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

(3)启动类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
 
@SpringBootApplication
public class KafkaApp {
    public static void main(String[] args) {
        SpringApplication.run(KafkaApp.class, args);
    }
}

3.2 消息生产者

        junit测试,新建消息发送方

import org.junit.Test;
import org.junit.runner.RunWith;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.test.context.junit4.SpringRunner;
​
​
@RunWith(SpringRunner.class)
@SpringBootTest
public class KafkaSendTest {
    @Autowired
    private KafkaTemplate<String,String> kafkaTemplate; //如果这里有红色波浪线,那是假错误
​
    @Test
    public void sendMsg(){
        String topic = "spring_test";
        kafkaTemplate.send(topic,"hello spring boot kafka!");
        System.out.println("发送成功.");
        while (true){ //保存加载ioc容器
​
        }
    }
}

3.3 消息消费者

        新建监听类:

​
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
​
@Component
public class MyKafkaListener {
​
 //    以下两种方法都行
    
 // 指定监听的主题
//    @KafkaListener(topics = "spring_test")
//    public void receiveMsg(String message){
//        System.out.println("接收到的消息:"+message);
//    }
​
 
    @KafkaListener(topics = "spring_test")
    public void handleMessage(ConsumerRecord<String, String> record) {
        System.out.println("接收到消息,偏移量为: " + record.offset() + " 消息为: " + record.value());
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1667916.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【C/C++】内存分布

本文第一部分主要介绍了程序内存区域的划分以及数据的存储。第二部分有一段代码和一些题目&#xff0c;全面直观得分析了程序中的数组在内存中的存储。 因为不同的数据有不同的存储需求&#xff0c;各区域满足不同的需求&#xff0c;所以程序内存会有区域的划分。 根据需求的不…

第02章 计算机网络概述

2.1 本章目标 了解计算机网络的定义了解计算机网络的功能了解计算机网络的分类了解计算机网络的组成 2.2 计算机网络的定义 2.3 计算机网络的功能 2.4 计算机网络的分类 物理拓扑结构分类&#xff1a;总线型、环型、星型 2.5 计算机网络的组成 网络适配器(NIC)接口规格分类&a…

AI大模型探索之路-训练篇21:Llama2微调实战-LoRA技术微调步骤详解

系列篇章&#x1f4a5; AI大模型探索之路-训练篇1&#xff1a;大语言模型微调基础认知 AI大模型探索之路-训练篇2&#xff1a;大语言模型预训练基础认知 AI大模型探索之路-训练篇3&#xff1a;大语言模型全景解读 AI大模型探索之路-训练篇4&#xff1a;大语言模型训练数据集概…

DRF渲染之异常处理

异常处理 【1 】引言 Django REST Framework 这个就是我们常常说的DRF APIView的dispatch方法&#xff1a; 当请求到达视图时&#xff0c;DRF 的 APIView 类会调用 dispatch 方法来处理请求。在 dispatch 方法中&#xff0c;有一个关键的步骤是处理异常。如果在视图类的方法…

BGP综合大实验

实验要求 1.AS1中存在两个环回&#xff0c;一个地址是192.168.1.0/24&#xff0c;改地址不能在任何协议中宣告&#xff1b;AS3中存在两个环回&#xff0c;一个地址为192.168.2.0/24&#xff0c;该地址不能在任何协议中宣告&#xff0c;最终要求这两个环回可以ping通&#xff1b…

AI绘画动漫转真人详细教程

从小到大&#xff0c;我们看过的动漫、玩过的游戏有很多很多 但我们会发现里面的角色或者人物都是二次元的 我就会好奇这些动漫人物在现实中会长什么样 而现在&#xff0c;我们通过AI绘画竟然就能还原出来他们现实中的样子 除了动漫角色和游戏人物&#xff0c;古代的画像、经典…

K-AI01,K-AO01,K-BUS02和利时

K-AI01,K-AO01,K-BUS02和利时1.将工程师站的计算机开机&#xff1b;2.开机后鼠标双击桌面上的“maintenance tool”图标&#xff0c;K-AI01,K-AO01,K-BUS02和利时。出现如下图标&#xff1a; 按顺序点击”图标中的箭头所指的按钮&#xff0c;出现如下画面选中画面中需要强制的逻…

C++语言题库(三)—— PAT

目录 1. 打印点、圆、圆柱信息 2. 国际贸易统计 3. 设计一个类CRectangle 4. 定义一个时间类 5. 定义一个Date类 6. 定义一个Time类 7. 设计一个People类 8. 平均成绩 9. 计算若干个学生的总成绩及平均成绩 11. 使用面向对象的方法求长方形的周长 1. 打印点、圆、圆柱…

16 华三数据中心最流行的技术 M-LAG

STP和MTP&#xff08;第二十二课&#xff09;-CSDN博客 VRRP技术和浮动路由(第二十六课)_vrrp 浮动路由-CSDN博客 VRRP DHCP ACL NAT 网络核心路由技术综述 (第十课)-CSDN博客 04 交换机的IRF的配置-CSDN博客 1 M-LAG AI介绍 M-LAG&#xff08;Multi-Chassis Link Aggrega…

57 读取/写出/读取 文件的过程的调试

前言 问题来自于文章 请教文件读写问题 请教文件读写问题 - 内核源码-Chinaunix vim 编辑文件, 实际上删除了原有的文件建立了一个新的文件? Ls –ail . 查看 inode 编号不一样了 这里主要是 调试一下 这一系列流程 测试用例 就是一个程序, 读取 1.txt 两次, 两次之间间隔…

数据结构-二叉树-红黑树

一、红黑树的概念 红黑树是一种二叉搜索树&#xff0c;但在每个节点上增加一个存储位表示节点的颜色&#xff0c;可以是Red或者BLACK&#xff0c;通过对任何一条从根到叶子的路径上各个节点着色方式的限制&#xff0c;红黑树确保没有一条路径会比其他路径长出两倍&#xff0c;…

【C++杂货铺铺】AVL树

目录 &#x1f308;前言&#x1f308; &#x1f4c1; 概念 &#x1f4c1; 节点的定义 &#x1f4c1; 插入 &#x1f4c1; 旋转 1 . 新节点插入较高左子树的左侧---左左&#xff1a;右单旋 2. 新节点插入较高右子树的右侧---右右&#xff1a;左单旋 3. 新节点插入较高左…

在vue3中,如何优雅的使用echarts之实现大屏项目

前置知识 效果图 使用技术 Vue3 Echarts Gasp Gasp&#xff1a;是一个 JavaScript动画库,它支持快速开发高性能的 Web 动画。在本项目中&#xff0c;主要是用于做轨迹运动 所需安装的插件 npm i echarts npm i countup.js 数字滚动特效 npm i gsap javascript动画库 np…

全新时代的降临——比亚迪,助力未来出行

近日&#xff0c;世界舞台中央聚焦&#xff0c;比亚迪登上欧洲顶级赛事赞助席位&#xff0c;让全球见证中国新能源汽车传奇崛起&#xff01;作为新能源领袖品牌&#xff0c;比亚迪现已累计销售突破730万辆&#xff0c;全球每售出五辆新能源汽车&#xff0c;便有一辆来自比亚迪。…

vivado Virtex-7 配置存储器器件

Virtex-7 配置存储器器件 下表所示闪存器件支持通过 Vivado 软件对 Virtex -7 器件执行擦除、空白检查、编程和验证等配置操作。 本附录中的表格所列赛灵思系列非易失性存储器将不断保持更新 &#xff0c; 并支持通过 Vivado 软件对其中所列非易失性存储器 进行擦除、…

【中航证券军工】北摩高科2023年报2024Q1点评:聚焦航空及军工主赛道,民机业务有望成为第二曲线

事件 公司4月24日公告&#xff0c;2024Q1实现营收&#xff08;2.40亿元&#xff0c;同比-23.71%)&#xff0c;归母净利润&#xff08;0.73亿元&#xff0c;同比-45.63%)&#xff0c;毛利率&#xff08;62.63%&#xff0c;同比-7.22pcts)&#xff0c;净利率&#xff08;37.34%&…

安装conda并搭建python环境(入门教程)

文章目录 1. 什么是 conda&#xff1f;1.1 Conda 与 Anaconda 的区别1.2 Conda 与 pip 的区别 2. 下载安装3. 配置并使用 conda3.1 配置下载源3.2 环境管理3.2.1 创建&#xff08;删除&#xff09;环境3.2.2 激活&#xff08;切换&#xff09;环境3.2.2 下载&#xff08;卸载&a…

Sping源码(七)—ConfigurationClassPostProcessor ——@PropertySources解析

序言 先来简单回顾一下ConfigurationClassPostProcessor大致的一个处理流程&#xff0c;再来详细的讲解PropertySources注解的处理逻辑。 详细的步骤可参考ConfigurationClassPostProcessor这篇帖子。 流程图 从获取所有BeanDefinition -> 过滤、赋值、遍历 -> 解析 -&…

常用的简单友好的工单系统(免费)- WGCAT

最近在项目中&#xff0c;有工单系统的需求场景&#xff0c;所以想寻找一款轻量简单的运维工单软件&#xff0c;主要用来记录和处理工作中的一些故障、维护&#xff0c;主要用来记录设备的维护状态&#xff0c;包括服务器、主机、交换机那些 WGCAT&#xff0c;是一款简单轻量的…

2024中国(重庆)人工智能展览会8月举办

2024中国(重庆)人工智能展览会8月举办 邀请函 主办单位&#xff1a; 中国航空学会 重庆市南岸区人民政府 招商执行单位&#xff1a; 重庆港华展览有限公司 【报名I59交易会 2351交易会 9466】 展会背景&#xff1a; 2024中国航空科普大会暨第八届全国青少年无人机大赛在…