kafka基本概念、springboot整合kafka、kafka常见问题

news2025/1/23 9:20:47

kafka基本概念

Kafka是一种高吞吐量、分布式、基于发布/订阅的消息系统。

基本概念:

  • broker:就是一个kafka服务,可以有多个broker形成集群

  • toptic:每个broker里面可以有若干个toptic(类似于标签,将消息分类)

  • partition:一个toptic里面可以有多个分区,分区是物理存储,消息会被追加到分区log末端

  • 副本:一个分区可以有多个副本(类似于主从复制,副本因子一直在同步主分区的数据,如果主分区宕了,其中副本因子升级为主分区)

  • Zookeeper:保存着集群 broker、 topic、 partition等meta 数据;另外,还负责broker故障发现, partition leader选举,负载均衡等功能

  • Consumer Group:消费者分组,每个Consumer必须属于一个group

  • offset:消息在日志中的位置,可以理解是消息在partition上的偏移量(每个消费者组维护自己对某个分区的offset)

img

kafka的安装部署

1、单机安装部署

kafka依赖zookeeper,而ZooKeeper服务器是用Java创建的,它运行在JVM之上。

所以,安装kafka之前先安装Java、zookeeper。

教程参考

1

2、kafka集群

搭建kafka集群也很简单,如果要搭建三台集群。在一台机子上安装部署zookeeper,然后三台机子都安装部署上kafka,注意每个kafka的【server.properties】不一样, 主要修改以下四个地方:

#节点id
broker.id=0
#配置本机IP和端口
listeners=PLAINTEXT://10.1.0.8:9092
#配置kafka日志记录位置
log.dirs=/data/kafka-logs
#配置kafka连接zookeeper的地址
zookeeper.connect=10.1.0.9:2181

最后每台机器上启动kafka即可【参考】

springboot整合kafka

1、pom依赖

引入spring-kafka依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.3.10.RELEASE</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>
    </dependencies>

2、配置文件

kafka相关配置,也可以写配置类,此处以配置文件示例:

properties版本:

#============== kafka ===================
# 指定kafka 代理地址,可以多个
#spring.kafka.bootstrap-servers=123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
spring.kafka.bootstrap-servers=114.116.115.153:9092
#=============== producer生产者  =======================


spring.kafka.producer.retries=0
# 每次批量发送消息的数量
spring.kafka.producer.batch-size=16384
# 缓存容量
spring.kafka.producer.buffer-memory=33554432

# 指定消息key和消息体的编解码方式
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer

#=============== consumer消费者  =======================
# 指定默认消费者group id,如果消费的时候不指定消费者组,则以这个默认的
spring.kafka.consumer.group-id=test-app

spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
spring.kafka.consumer.auto-commit-interval=100ms

# 指定消息key和消息体的编解码方式
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer

#spring.kafka.consumer.bootstrap-servers=192.168.8.111:9092
#spring.kafka.consumer.zookeeper.connect=192.168.8.103:2181
#指定tomcat端口
server.port=8063 

yaml版本:

spring:
  # KAFKA
  kafka:
    #指定kafka 代理地址,可以多个
    #bootstrap-servers: 123.xxx.x.xxx:19092,123.xxx.x.xxx:19093,123.xxx.x.xxx:19094
    bootstrap-servers: 114.116.115.153:9092
    #=============== producer生产者配置 =======================
    producer:
      retries: 0
      # 每次批量发送消息的数量
      batch-size: 16384
      # 缓存容量
      buffer-memory: 33554432
      # ָ指定消息key和消息体的编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
    #=============== consumer消费者配置  =======================
    consumer:
      #指定默认消费者的group id
      group-id: test-app
      #earliest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      #latest
      #当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      #none
      #topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: latest
      enable-auto-commit: true
      auto-commit-interval: 100ms
      #指定消费key和消息体的编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

3、controller

package com.tzq.test.controller;

import com.tzq.test.utils.KafkaUtil;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
@RequestMapping("/kafka")
public class KafkaController {

    @Autowired
    KafkaUtil kafkaUtil;


    /**
     * 向kafka发送消息
     * @return
     */
    @GetMapping("/sendMsg")
    public String sendMessageToKafka(@RequestParam String topic,@RequestParam String key,@RequestParam String msg) {
        //kakfa的推送消息方法有多种,可以采取带有任务key的,也可以采取不带有的(不带时默认为null)
        String tetTopic = "testTopic";
        kafkaUtil.send(topic, key, msg);
        return "hi guy!";
    }


}

4、kafka核心工具类

package com.tzq.test.utils;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class KafkaUtil {

    @Autowired
    private KafkaTemplate<String, Object> kafkaTemplate;
    private final Logger logger = LoggerFactory.getLogger(KafkaUtil.class);


    /**
     * 向卡夫卡发送消息
     * @param topic 主题
     * @param taskid key
     * @param jsonStr 消息字符串
     */
    public void send(String topic, String taskid, String jsonStr) {
        //发送消息
        ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send(topic, taskid, jsonStr);
        future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
            @Override
            //推送成功
            public void onSuccess(SendResult<String, Object> result) {
                logger.info(topic + " 生产者 发送消息成功:" + result.toString());
            }
            @Override
            //推送失败
            public void onFailure(Throwable ex) {
                logger.info(topic + " 生产者 发送消息失败:" + ex.getMessage());
            }
        });
    }


    /**
     * 消费kafka里面的消息
     * @param record
     */
    //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
    @KafkaListener(topics = {"testTopic2"},groupId = "testGroup1")
    public void receive(ConsumerRecord<?, ?> record){
        logger.info("消费得到的消息1---key: " + record.key());
        logger.info("消费得到的消息1---value: " + record.value().toString());
    }

    /**
     * 消费kafka里面的消息
     * @param record
     */
    //下面的主题是一个数组,可以同时订阅多主题,只需按数组格式即可,也就是用“,”隔开
    @KafkaListener(topics = {"testTopic2"},groupId = "testGroup2")
    public void receive2(ConsumerRecord<?, ?> record){
        logger.info("消费得到的消息2---key: " + record.key());
        logger.info("消费得到的消息2---value: " + record.value().toString());
    }

}

@KafkaListener(topics = {“testTopic2”},groupId = “testGroup1”)

可以指定监听的toptic(可同时监听多个),可以设置guoupid

FAQ

1、kafka的消息被消费后会删除吗?

rabbitmq的消息消费后会被删除,而kafka的消息消费后不会删除,每个消费者组记录并维护自己对订阅toptic的分区里消息的offset,这样就可以知道自己读到了哪位置的数据了。kafka的消息只有到过期时间或者磁盘满时才会被删除。

image-20230129150425426

2、kafka如何保证消息的有序性

kafka只能保证一个分区内是有序的,不能保证整个toptic有序。

如果想要保证有序,方案:

  1. topic内只建一个分区
  2. 将有顺序的消息发送时设置的key要一样,这样经过hash算法,确保相同key的消息一定会存到同一个分区内

3、一条消息知道要被发送到哪个分区?

默认情况下,Kafka根据传递消息的key来进行分区的分配,即hash(key) % numPartitions。

key为null时,会从缓存中取分区id或者随机取一个

4、单播和广播

单播:一个消费者组订阅toptic

广播:多个消费者组订阅同一个toptic,每个消费者组都会消费这个toptic的消息(因为每个消费者组都会记录并维护自己对订阅toptic的分区里消息的offset,这样不同消费者组对同一个toptic消息的消费就互不影响了)

5、消费者组内的消费者数量最大为多少?

消费者组内的消费者数量建议小于toptic的分区数量,因为toptic中的一个分区只能被同一个消费者组内某一个消费者消费,如果消费者数量大于分区数量,则一定有消费者空闲了!

6、一个消费者组可以订阅多个toptic吗?

一个消费者组可以订阅多个toptic!

反之亦可,一个toptic可以被多个消费者组订阅,即实现了广播的功能!

7、分区的副本数目最大多少?

分区的副本数目最大按机器数量来,即broker数量

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/185840.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

从Bug中学习--Bug根因分析法

从Bug中学习--Bug根因分析法 目录&#xff1a;导读 1、认识Bug 2、Bug的发现 3、Bug的产生 4、Bug的改进 5、总结 一提起测试&#xff0c;大多数人很容易就会联想到Bug。的确&#xff0c;测试的日常工作离不开Bug&#xff0c;测试工作很重要的一部分就是发现Bug。但是&#xf…

Coraza:一款功能强大的企业级OWASP Web应用程序防火墙

关于Coraza Coraza是一款功能强大的企业级OWASP Web应用程序防火墙框架&#xff0c;该工具基于Golang开发&#xff0c;不仅支持Modsecurity的Seclang语言&#xff0c;而且能够100%兼容OWASP核心规则集。 该工具完全开源&#xff0c;任何开发人员都可以根据自己的需求轻松完成…

GPDB中AOCO列存页的checksum

GPDB中AOCO列存页的checksum为了保证数据的正确性&#xff0c;AOCO列存页采用CRC32即循环冗余校验算法来进行校验。首先看下页结构。页类型有四种&#xff1a;AOSmallContentHeader、AOLargeContentHeader、AONonBulkDenseContentHeader和AOBulkDenseContentHeader。页头的大小…

【初阶数据结构】——详解几个常见的经典排序算法

文章目录1. 排序的概念及其运用1.1 排序的概念1.2 排序的应用1.3 常见的排序算法2. 插入排序2.1 直接插入排序算法思想举例&#xff08;升序&#xff09;代码实现直接插入排序特性总结2.2 希尔排序( 缩小增量排序 )算法思想代码实现希尔排序特性总结3. 选择排序3.1 直接选择排序…

Hadoop基础之《(7)—Hadoop三种运行模式》

一、hadoop有三种运行模式 1、本地模式 数据存储在linux本地&#xff0c;不用 2、伪分布式集群 数据存储在HDFS&#xff0c;测试用 3、完全分布式集群 数据存储在HDFS&#xff0c;同时多台服务器工作。企业大量使用 二、单机运行 单机运行就是直接执行hadoop命令 1、例子-…

AntV结合Vue实现导出图片功能

一、业务场景&#xff1a; AntV 组织图操作完毕以后&#xff0c;需要点击按钮将画布以图片的形式导出 二、问题描述&#xff1a; 官网上有4个方法&#xff0c;我用的是 graph.toFullDataURL(callback, type, imageConfig) 三、具体实现步骤&#xff1a; &#xff08;1&#x…

Three.js纹理投影简明教程

纹理投影是一种将纹理映射到 3D 对象并使其看起来像是从单个点投影的方法。 把它想象成投射到云上的蝙蝠侠符号&#xff0c;云是我们的对象&#xff0c;蝙蝠侠符号是我们的纹理。 它用于游戏和视觉效果&#xff0c;以及创意世界的更多部分。 工具&#xff1a;使用 NSDT场景编辑…

Linux 入门教程||Linux 简介||Linux 安装

Linux 简介 Linux内核最初只是由芬兰人李纳斯托瓦兹&#xff08;Linus Torvalds&#xff09;在赫尔辛基大学上学时出于个人爱好而编写的。 Linux是一套免费使用和自由传播的类Unix操作系统&#xff0c;是一个基于POSIX和UNIX的多用户、多任务、支持多线程和多CPU的操作系统。…

pdf文件怎么压缩?pdf文件变小的简单方法

工作中&#xff0c;pdf文件的使用是非常广泛的&#xff0c;一些特殊的场景下对于pdf文件的大小是有着严格规定的&#xff0c;所以pdf文件压缩成了必备的一项技能&#xff0c;那么怎么将pdf压缩&#xff08;https://www.yasuotu.com/pdfyasuo&#xff09;呢&#xff1f;下面介绍…

一个完整的渗透学习路线是怎样的?如何成为安全渗透工程师?

前言 1/我是如何学习黑客和渗透&#xff1f; 我是如何学习黑客和渗透测试的&#xff0c;在这里&#xff0c;我就把我的学习路线写一下&#xff0c;让新手和小白们不再迷茫&#xff0c;少走弯路&#xff0c;拒绝时间上的浪费&#xff01; 2/学习常见渗透工具的使用 注意&…

2023年江苏建筑安全员精选真题题库及答案

百分百题库提供建筑安全员考试试题、安全员证考试真题、安全员证考试题库等,提供在线做题刷题&#xff0c;在线模拟考试&#xff0c;助你考试轻松过关。 250.施工升降机防坠安全器在装机使用时,应按吊笼额定载重量进行坠落试验,以后至少()个月应进行一次额定载重量的坠落试验 …

辨析Web Service, SOAP, REST, OData之间的关系与区别

最近发现&#xff0c;对于刚刚接触HTTP服务的同学&#xff0c;在一些基础概念上容易混乱。很多同学搞不清楚Web Service&#xff0c;SOAP&#xff0c;REST以及OData这些技术之间的关系与区别。文本会尽量用最简洁的方式&#xff0c;解释这几个概念&#xff0c;并附上一些资料的…

第一章:在Mac OS上安装Go语言开发包

各位朋友们大家好&#xff01; 本节主要为大家讲解如何在Mac OS上安装Go语言开发包&#xff0c;大家可以在Go语言官网下载对应版本的的安装包&#xff0c;如下图所示。 安装Go语言开发包 Mac OS 的Go语言开发包是 .pkg 格式的&#xff0c;双击我们下载的安装包即可开始安装。…

I.MX6ULL内核开发1:内核模块实验

目录 一、实验环境 二、编译4.19.35版内核 1、下载linux内核源码 2、安装必要的环境工具库 3、一键编译内核 4、获取编译出来的内核相关文件&#xff08;与makefile文件一致&#xff09; 三、内核模块代码分析 1、内核模块头文件 2、内核模块打印函数 3、文中语法分析…

filter滤镜实现网页置灰(纪念日)效果

目录 前言关键代码兼容ie的做法定位错乱的原因 前言 一些特殊纪念日的时候&#xff0c;很多网站的首页进行置灰处理。这种效果实际上是用滤镜filter实现的&#xff0c;几行css就可以实现。 在实现整个页面置灰的过程中&#xff0c;要注意页面中有定位的元素&#xff0c;就需…

java中 == 和 equels

1、 和 equals的区别 是操作符 操作符专门用来比较变量的值是否相同。对于基本类型变量来说&#xff0c;只能使用 &#xff0c;因为基本类型的变量没有方法。使用比较是值比较。对于引用类型的变量来说&#xff0c;比较的两个引用对象的地址是否相等。 equals 是方法 equals方…

Linux kdump配置步骤和注意事项(基于debian、OpenEuler和自定义编译内核的Linux)

1、kdump简单描述 kdump是Linux中的一个内核转储机制&#xff0c;主要用于当Linux内核发生崩溃时&#xff0c;将该内核相关的信息和崩溃原因通过转储的形式保留下来&#xff0c;在debian系统中&#xff0c;相关信息会存储在dump文件中&#xff0c;在OpenEuler和CentOS等系统中…

utils:crypto-js的基本使用和(加密/解密)功能封装

目录一、基本使用1. 资源下载2. 目录结构3. 代码二、功能封装1. 封装代码2. 使用说明(1) 引入utils工具(2) 使用工具一、基本使用 1. 资源下载 crypto-js 2. 目录结构 3. 代码 <html xmlns"http://www.w3.org/1999/xhtml"> <head> <meta http-equ…

【Linux】磁盘结构/文件系统/软硬链接/动静态库

文章目录前言一、磁盘结构1、磁盘的物理结构2、磁盘的存储结构3、磁盘的逻辑结构二、文件系统1、对 IO 单位的优化2、磁盘分区与分组3、对分组的具体管理方法4、文件操作三、软硬链接1、理解硬链接2、理解软链接3、理解 . 和 ..四、静动态库1、什么是动静态库2、动静态库的制作…

JavaScript 中的字符串

JavaScript 字符串 什么是字符串&#xff1f; 字符串是用来存储和处理文本数据的 比如&#xff1a;一句话、a等 字符串的创建方式 字面量方式进行创建 <!DOCTYPE html> <html lang"en"><head><meta charset"UTF-8" /><meta…