kafka-高级篇(下载安装、快速入门、使用场景)

news2025/1/8 5:41:13

在这里插入图片描述

文章目录

    • Kafka介绍
    • Kafka使用场景
    • kafka概述和安装
      • kafka概述
      • kafka安装配置
    • kafka快速入门
      • 创建项目
      • 导入依赖
      • 发送消息
      • 接收消息
    • kafka生产者详解
      • 发送类型
      • 参数详解
    • kafka消费者详解
      • 消息有序性
      • Kafka消息有序性
      • 提交和偏移量

更多相关内容可查看

Kafka介绍

Apache Kafka是一个开源的分布式事件流平台,由LinkedIn公司开发并于2011年贡献给Apache软件基金会。Kafka设计用于处理大规模实时数据,它能够处理每秒数百万条消息,因此被广泛应用于大数据和实时分析领域。

Kafka的主要特点包括:

  1. 高吞吐量:Kafka能够处理每秒数百万条消息,满足大规模数据处理的需求。

  2. 分布式:Kafka通过分布式系统设计,提供数据冗余和容错能力。

  3. 实时性:Kafka能够实时处理数据,适合需要快速响应的场景。

  4. 持久性:Kafka将数据存储在磁盘上,即使系统崩溃,数据也不会丢失。

  5. 可扩展性:Kafka可以通过添加更多的服务器来扩展处理能力。

Kafka使用场景

自媒体用户发布文章成功之后需要进行文章的审核 , 审核通过之后才会发布到APP端供用户查看 , 审核功能因为耗时较久 , 长时间阻塞会影响用户体验 , 而且长时间阻塞会严重影响系统的吞吐量
所以为了实现功能之间的解耦 , 提升用户体验 , 我们可以抽取一个独立的审核服务 , 文章发布成功之后自媒体服务通过MQ通知审核服务进行文章审核 , 如下图所示 :
image.png

为什么要选择使用Kafka作为消息中间件

  • 因为我们后期会使用MQ进行行为数据采集 , 对于消息的吞吐量要求更高
  • 因为后期会进行文章的实时推荐 , 会使用到一些实时流计算技术 , Kafka提供这么一个技术 Kafka Stream , 开发成本和运维成本会更低一些

kafka概述和安装

kafka概述

消息中间件对比

特性ActiveMQRabbitMQRocketMQKafka
开发语言javaerlangjavascala(JVM)
单机吞吐量万级万级10万级100万级
时效性msusmsms级以内
可用性高(主从)高(主从)非常高(分布式)非常高(分布式)
功能特性成熟的产品、较全的文档、各种协议支持好并发能力强、性能好、延迟低MQ功能比较完善,扩展性佳只支持主要的MQ功能,主要应用于大数据领域

消息中间件对比-选择建议

消息中间件建议
Kafka追求高吞吐量,适合产生大量数据的互联网服务的数据收集业务
RocketMQ可靠性要求很高的金融互联网领域,稳定性高,经历了多次阿里双11考验
RabbitMQ性能较好,社区活跃度高,数据量没有那么大,优先选择功能比较完备的RabbitMQ

kafka介绍
Kafka 是一个分布式流媒体平台,类似于消息队列或企业消息传递系统。
kafka官网:http://kafka.apache.org/

image.png

kafka介绍-名词解释
image.png

  • producer:发布消息的对象称之为主题生产者(Kafka topic producer)
  • topic:Kafka将消息分门别类,每一类的消息称之为一个主题(Topic)
  • consumer:订阅消息并处理发布的消息的对象称之为主题消费者(consumers)
  • broker:已发布的消息保存在一组服务器中,称之为Kafka集群。集群中的每一个服务器都是一个代理(Broker)。 消费者可以订阅一个或多个主题(topic),并从Broker拉数据,从而消费这些已发布的消息。

kafka安装配置

可查看文章【Linux】Docker安装kafka教程(保姆篇)

kafka快速入门

  • 生产者发送消息,多个消费者只能有一个消费者接收到消息
  • 生产者发送消息,多个消费者都可以接收到消息

创建项目

image.png
image.png

导入依赖

<dependencies>
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-test</artifactId>
    </dependency>
</dependencies>

发送消息

启动引导类

@SpringBootApplication
public class KafkaProducerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaProducerApplication.class, args);
    }
}

在kafka-producer项目中编写生产者代码发送消息 , 创建application.yml配置文件, 配置Kafka连接信息

spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer

配置消息主题

@Configuration
public class KafkaConfig {

    @Bean
    public NewTopic newTopic(){
        return TopicBuilder.name("topic.my-topic1").build();
    }
}

发送消息到Kafka

package com.heima.kafka;

import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.SuccessCallback;

import java.util.concurrent.ExecutionException;

@SpringBootTest
public class KafkaProducerTest {

    @Autowired
    private KafkaTemplate kafkaTemplate;

    @Test
    public void testSend() throws ExecutionException, InterruptedException {
        kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka !");
    }
}

接收消息

启动引导类

@SpringBootApplication
public class KafkaConsumerApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaConsumerApplication.class, args);
    }
}

在kafka-consumer项目中编写消费者代码接收消息 , 创建application.yml配置文件, 配置Kafka连接信息

spring:
  application:
    name: kafka-consumer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer

创建监听器类, 监听kafka消息

@Component
public class KafkaConsumerListener {

    @KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
    public void listenTopic1group1(ConsumerRecord<String, String> record) {
        String key = record.key();
        String value = record.value();
        System.out.println("group1中的消费者接收到消息:" + key + " : " + value+"));
    }
}

kafka生产者详解

发送类型

同步发送: 使用send()方法发送,它会返回一个Future对象,调用get()方法进行等待,就可以知道消息是否发送成功,因为如果发送操作失败,kafkaTemplate.send().get()会抛出异常,而不会返回SendResult。如果SendResult被成功返回,那么就意味着消息已经被成功发送到Kafka。

消息偏移量:是Kafka用于标识消息在主题中的位置的一个数字。每个新的消息都会被赋予一个比前一个消息大的偏移量。

@Test
public void testSend() throws ExecutionException, InterruptedException {
    同步发送
    SendResult result = (SendResult) kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka").get();
    //打印发送结果中的消息偏移量。
    System.out.println(result.getRecordMetadata().offset());
}

异步发送 :调用send()方法,并指定一个回调函数,服务器在返回响应时调用函数

@Test
public void testSend() throws ExecutionException, InterruptedException {
    //异步发送
    ListenableFuture future = kafkaTemplate.send("kafka.topic.my-topic1", "kafka", "hello kafka");
    future.addCallback(result -> {
        //消息发送成功执行
        SendResult sendResult = (SendResult) result;
        System.out.println(sendResult.getRecordMetadata().offset());
    }, throwable -> {
        //消息发送失败执行
        System.out.println("发送消息出现异常:" + throwable);
    });

    Thread.sleep(1000);
}

参数详解

签收机制 : acks

代码的配置方式:

spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 10  # 重试次数
      compression-type: gzip  # 消息压缩算法
      batch-size: 16KB  #批量提交的数据大小
      acks: all  # 消息确认机制  0: 不签收 , 1 : leader签收 , all : leader和follower都签收

参数的选择说明

确认机制说明
acks=0生产者在成功写入消息之前不会等待任何来自服务器的响应,消息有丢失的风险,但是速度最快
acks=1(默认值)只要集群首领节点收到消息,生产者就会收到一个来自服务器的成功响应
acks=all只有当所有参与赋值的节点全部收到消息时,生产者才会收到一个来自服务器的成功响应

追求极致的吞吐量和性能使用 acks=0
追求是数据安全, 消息发送不丢失 , acks=all
既要吞吐量也要可靠性 : acks=1 (折中方案)

重试机制 : retries

生产者从服务器收到的错误有可能是临时性错误,在这种情况下,retries参数的值决定了生产者可以重发消息的次数,如果达到这个次数,生产者会放弃重试返回错误,默认情况下,生产者会在每次重试之间等待100ms
代码中配置方式:

spring:
  application:
    name: kafka-producer
  kafka:
    bootstrap-servers: 118.25.197.221:9092
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      retries: 10  # 重试次数

为了提高消息投递的成功率, 可以将重试次数设为一个很大的值 , 例如 : 999999999999999

kafka消费者详解

消息有序性

所谓消息有序性就是保证Kafka消息消费的顺序和发送的顺序保持一致 ,
应用场景 : 比如客户开车出事故了需要保险公司来处理,至少要有以下几个步骤: 报案、查勘定损、立案、收单理算支付、结案等环节,这些环节是严格有序的。保险公司每完成一个环节,需要给中保信(监管保险公司的)推送数据,如果推送顺序有问题,会返回错误,比如上一个环节还没有完成。同样电商行业也是如此,下单、支付、发货都是有序的。

Kafka消息有序性

我们知道Kafka中的每个分区中的数据是有序的,但有序性仅限于当前的分区中。比如我们现在往一个topic中发送消息 , 这个topic有两个分区 , 默认采用轮询策略, 那么这个topic分区0中插入数据 1,3,5,然后在分区1中插入数据2,4,6 , 这时如果消费者想要读取这个topic的数据,他就可能随机从分区0和分区1中读取数据,比如读出结果为1,3,2,5,4,6。这时可以看到读到的数据顺序已经不是插入的顺序了。

方法一 : 一个 Topic 只对应一个 Partion
消息在被追加到 Partition(分区)的时候都会分配一个特定的偏移量(offset)。Kafka 通过偏移量(offset)来保证消息在分区内的顺序性。每次添加消息到 Partition(分区) 的时候都会采用尾加法,如下图所示。Kafka 只能为我们保证 Partition(分区) 中的消息有序,而不能保证 Topic(主题) 中的 Partition(分区) 的有序。
image.png
所以,我们就有一种很简单的保证消息消费顺序的方法:一个 Topic 只对应一个 Partion , 这种方式影响Kafka效率

方法二 : 按键路由
Kafka 中发送 1 条消息的时候,可以指定 topic, partition, key,data(数据) 4 个参数。如果你发送消息的时候指定了 partion 的话,所有消息都会被发送到指定的 partion。并且,同一个 key 的消息可以保证只发送到同一个 partition ! 这样我们就可以为需要保证顺序的消息设置同一个Key , 这样就能保证这组消息都发送到同一个分区中 , 从而保证消息顺序性

@Test
public void sendTopic3() {
    kafkaTemplate.send("test.topic03", "order_1001", "kafka!");
}

提交和偏移量

Kafka会记录每条消息的offset(偏移量) , 消费者可以使用offset来追踪消息在分区的位置 , 所以在Kafka中消息消费采用的是pull模型, 由消费者主动去Kafka Brocker中拉取消息
之前说过Kafka的消费者再均衡机制 : 如果消费者发生崩溃或有新的消费者加入群组,就会触发再均衡 , 例如:
image.png
如果消费者2挂掉以后,会发生再均衡,消费者2负责的分区会被其他消费者进行消费
image.png
再均衡后不可避免会出现一些问题(消息丢失&消息重复消费)

问题一:消息重复消费

如果提交偏移量小于客户端处理的最后一个消息的偏移量,那么处于两个偏移量之间的消息就会被重复处理。
问题二:消息丢失

如果提交的偏移量大于客户端的最后一个消息的偏移量,那么处于两个偏移量之间的消息将会丢失。
如果想要解决这些问题,还要知道目前kafka提交偏移量的方式 , 提交偏移量的方式有两种,分别是自动提交偏移量和手动提交

  • 自动提交偏移量 :当enable.auto.commit被设置为true,提交方式就是让消费者自动提交偏移量,每隔5秒消费者会自动把从poll()方法接收的最大偏移量提交上去
  • 手动提交偏移量 : 当enable.auto.commit被设置为false , 需要程序员手动提交偏移量

手动提交偏移量 : 同步提交
enable.auto.commit设置为false,让应用程序决定何时提交偏移量。使用commitSync()提交偏移量,commitSync()将会提交poll返回的最新的偏移量,所以在处理完所有记录后要确保调用了commitSync()方法。否则还是会有消息丢失的风险。
只要没有发生不可恢复的错误,commitSync()方法会一直尝试直至提交成功,如果提交失败也可以记录到错误日志里。

@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());

    //同步提交偏移量
    consumer.commitSync();  
}

手动提交偏移量 : 异步提交
手动提交有一个缺点,那就是当发起提交调用时应用会阻塞。当然我们可以减少手动提交的频率,但这个会增加消息重复的概率(和自动提交一样)。另外一个解决办法是,使用异步提交的API。

@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());

    //异步提交偏移量
    consumer.commitAsync();
}

手动提交偏移量 : 同步和异步组合提交
异步提交也有个缺点,那就是如果服务器返回提交失败,异步提交不会进行重试。相比较起来,同步提交会进行重试直到成功或者最后抛出异常给应用。异步提交没有实现重试是因为,如果同时存在多个异步提交,进行重试可能会导致位移覆盖。

@KafkaListener(topics = "kafka.topic.my-topic1", groupId = "group1")
public void listenTopic1group1(KafkaConsumer consumer, ConsumerRecord<String, String> record) {
    String key = record.key();
    String value = record.value();
    System.out.println("group1中的消费者接收到消息:" + key + " : " + value+",偏移量:"+record.offset());

    //同步异步, 结合提交
    try {
        consumer.commitAsync();
    } catch (Exception e) {
        e.printStackTrace();
        consumer.commitSync();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1863401.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

BarTender版软件下载及安装教程

​根据行业数据显示强大的配套应用软件甚至能够管理系统安全性、网络打印功能、文档发布、打印作业记录等&#xff0c;为满足不同的需要和预算&#xff0c;BarTender 提供四个版本&#xff0c;每个都拥有卓越的功能和特性。根据软件大数据显示多国语言支持&#xff1a;轻松设计…

web刷题记录

[HDCTF 2023]SearchMaster 打开环境&#xff0c;首先的提示信息就是告诉我们&#xff0c;可以用post传参的方式来传入参数data 首先考虑的还是rce&#xff0c;但是这里发现&#xff0c;不管输入那种命令&#xff0c;它都会直接显示在中间的那一小行里面&#xff0c;而实际的命令…

[分布式网络通讯框架]----RPC通信原理以及protobuf的基本使用

RPC &#xff08;Remote Procedure Call Protocol&#xff09;远程过程调用协议。 RPC特点 RPC协议&#xff1a;目前典型的RPC实现包括&#xff1a;Dubbo、Thrift、GRPC、Hetty等。网络协议和网络IO模型透明&#xff1a;RPC客户端认为自己是在调用本地对象&#xff0c;无需关…

Redis之优惠券秒杀

文章目录 全局ID生成器添加优惠券实现优惠券秒杀下单超卖问题悲观锁和乐观锁相关文章乐观锁执行逻辑乐观锁解决超卖问题 一人一单功能超卖问题相关文章一人一单执行逻辑代码实现集群模式下锁失效 分布式锁基于Redis的分布式锁Redis实现分布式锁流程实现分布式锁初级版本分布式锁…

2024年河北省特岗教师报名流程详细图解

最近有很多学员们问特岗教师具体的报名流程 给大家安排! 特岗报名步骤 第步: 电脑搜索“河北特岗招聘”登录进行注册 第步:注册后重新登录 第步: 根据个人情况选择填写自己的学历 第步:填写个人信息 (需要上传的电子版的照片、普通话证、学历证书、教资证等) 第步:选择岗位报名…

【源码+文档+调试讲解】企业人才引进服务平台

摘 要 随着信息时代的来临&#xff0c;过去的传统管理方式缺点逐渐暴露&#xff0c;对过去的传统管理方式的缺点进行分析&#xff0c;采取计算机方式构建企业人才引进服务平台。本文通过课题背景、课题目的及意义相关技术&#xff0c;提出了一种企业信息、招聘信息、应聘信息等…

敏捷开发笔记(第8章节)--单一职责原则(SRP)

1&#xff1a;PDF上传链接 【免费】敏捷软件开发(原则模式与实践)资源-CSDN文库 这条原则曾经在Tom DeMaro和Meilir Page-Jones的著作中描述过&#xff0c;并称之为内聚性。他们把内聚性定义为&#xff1a;一个模块的组成元素之间的功能相关性。 8.1 单一职责原则&#xff08…

【面试干货】Java中==和equals()的区别

【面试干货】Java中和equals&#xff08;&#xff09;的区别 1、操作符2、equals()方法3、总结 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Java中&#xff0c;和equals()是两个常用的比较操作符和方法&#xff0c;但它们之间的用法和…

制图工具(13)地理数据库初始化工具

一、需求背景 地理数据库库体初始化 作为GIS数据管理者&#xff0c;当你拿到数据库表结构&#xff0c;需要你创建一个数据库&#xff1f; 你需要将几个地理数据库的属性结构进行组合、修改&#xff0c;提供一个库体结构&#xff1f; 将不同作业单位&#xff0c;不同作业人员…

图神经网络实战(15)——SEAL链接预测算法

图神经网络实战&#xff08;15&#xff09;——SEAL链接预测算法 0. 前言1. SEAL 框架1.1 基本原理1.2 算法流程 2. 实现 SEAL 框架2.1 数据预处理2.2 模型构建与训练 小结系列链接 0. 前言 我们已经学习了基于节点嵌入的链接预测算法&#xff0c;这种方法通过学习相关的节点嵌…

【第三方JSON库】org.json.simple用法初探—Java编程【Eclipse平台】【不使用项目管理工具】【不添加依赖解析】

本文将重点介绍&#xff0c;在不使用项目管理工具&#xff0c;不添加依赖解析情况下&#xff0c;【第三方库】JSON.simple库在Java编程的应用。 JSON.simple是一种由纯java开发的开源JSON库&#xff0c;包含在JSON.simple.jar中。它提供了一种简单的方式来处理JSON数据和以JSO…

SQL Server 2022从入门到精通

大家好&#xff0c;我是爱编程的喵喵。双985硕士毕业&#xff0c;现担任全栈工程师一职&#xff0c;热衷于将数据思维应用到工作与生活中。从事机器学习以及相关的前后端开发工作。曾在阿里云、科大讯飞、CCF等比赛获得多次Top名次。现为CSDN博客专家、人工智能领域优质创作者。…

架构是怎样练成的-楼宇监控系统案例

目录 概要 项目背景 原系统设计方案 改进后的设计方案 小结 概要 绝大多数人掌握的架构都是直接学习&#xff0c;慢慢地才能体会到一个架构的好处。架构是一种抽象&#xff0c;是为了复用目的而对代码做的抽象。通过一个项目的改造&#xff0c;理解架构是如何产生的&…

[C++][设计模式][抽象工厂]详细讲解

目录 1.动机2.模式定义3.要点总结4.代码感受1.代码一2.代码二 -- 工厂方法3.代码三 -- 抽象工厂 1.动机 在软件系统中&#xff0c;经常面临着“一系列相互依赖的对象”的创建工作&#xff1b;同时&#xff0c;由于需求的变化&#xff0c;往往存在更多系列对象的创建工作如何应…

【ARM】MDK工程切换高版本的编译器后出现error A1137E报错

【更多软件使用问题请点击亿道电子官方网站】 1、 文档目标 解决工程从Compiler 5切换到Compiler 6进行编译时出现一些非语法问题上的报错。 2、 问题场景 对于一些使用Compiler 5进行编译的工程&#xff0c;要切换到Compiler 6进行编译的时候&#xff0c;原本无任何报错警告…

Redis-哨兵模式-主机宕机-推选新主机的过程

文章目录 1、为哨兵模式准备配置文件2、启动哨兵3、主机6379宕机3.4、查看sentinel控制台日志3.5、查看6380主从信息 4、复活63794.1、再次查看sentinel控制台日志 1、为哨兵模式准备配置文件 [rootlocalhost redis]# ll 总用量 244 drwxr-xr-x. 2 root root 150 12月 6 2…

免费APP分发平台:小猪APP分发如何解决开发者的痛点

你是否曾为自己开发的APP找不到合适的分发平台而烦恼&#xff1f;你是否因为高昂的分发费用而望而却步&#xff1f;放心吧&#xff0c;你并不是一个人。很多开发者都面临同样的问题。但别担心&#xff0c;小猪APP分发来了&#xff0c;它可以帮你解决这些问题。 小猪app封装www…

微软结束将数据中心置于海底的实验

2016 年&#xff0c;微软 宣布了一项名为"纳蒂克项目"&#xff08;Project Natick&#xff09;的实验。基本而言&#xff0c;该项目旨在了解数据中心能否在海洋水下安装和运行。经过多次较小规模的测试运行后&#xff0c;该公司于 2018 年春季在苏格兰海岸外 117 英尺…

《Redis设计与实现》阅读总结-2

第 7 章 压缩列表 1. 概念&#xff1a; 压缩列表是列表键和哈希键的底层实现之一。当一个列表键只包含少量列表项&#xff0c;并且每个列表项是小整数值或长度比较短的字符串&#xff0c;那么Redis就会使用压缩类别来做列表键的底层实现。哈希键里面包含的所有键和值都是最小…

基于ESP8266串口WIFI模块ESP-01S在AP模式(即发射无线信号( WiFi))下实现STC单片机与手机端网路串口助手相互通信功能

基于ESP8266串口WIFI模块ESP-01S在AP模式(即发射无线信号( WiFi))下实现STC单片机与手机端网路串口助手相互通信功能 ESP8266_01S引脚功能图ESP8266_01S原理图ESP8266_01S尺寸图检验工作1、USB-TTL串口工具(推荐使用搭载CP2102芯片的安信可USB-T1串口)与ESP8266_01S WiFi…