Kafka:Kafka详解

news2024/9/21 8:01:24

Kafka

消息中间件

区别于rabbitmq,kafka更适用于量级较大的数据(100w级),主要在大数据领域使用

Kafka介绍

一个分布式流媒体平台,类似于消息队列或企业消息传递系统

Kafak的结构如下

在这里插入图片描述

producer:发布消息的对象
topic:Kafak将消息分门别类,每类的消息称为一个主题(Topic)
consumer:订阅主题并处理发布的消息的对象称为主题消费者
broker:已发布的消息保存在一组服务器中,称为kafka集群,集群中的每一个服务器都是一个代理(broker)

消费者可以订阅一个或者多个的主题,并从broker中拉取数据,从而消费这些已发布的消息.

Kafka对zookeeper强依赖,需要使用zk进行分区的负载均衡以及节点的注册
docker安装zk和kafka
docker pull zookeeper:3.4.14
docker run -d --name zookeeper --restart=always -p 2181:2181 zookeeper:3.4.14
docker pull wurstmeister/kafka:2.12-2.3.1
docker run -d --name kafka \
--env KAFKA_ADVERTISED_HOST_NAME=192.168.200.130 \
--env KAFKA_ZOOKEEPER_CONNECT=192.168.200.130:2181 \
--env KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://192.168.200.130:9092 \
--env KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:9092 \
--env KAFKA_HEAP_OPTS="-Xmx256M -Xms256M" \
--restart=always \
--net=host wurstmeister/kafka:2.12-2.3.1

Kafka入门

kafka的生产者消费者模型分为单播和多播

单播:同组内存在多个消费者,一个主题只能发送消息到同一个组内的一个消费者
多播:不同组的多个消费者,一个主题可以发送消息到不同组的多个不同消费者
<dependency>    
	<groupId>org.apache.kafka</groupId>    
	<artifactId>kafka-clients</artifactId>
</dependency>

Kafka的高可用设计(集群模式)

多个broker组成一个cluster集群

集群中的某一条机器宕机,其他机器上的broker依然能对外提供服务

Kafka的备份机制(republication)

消息的备份称为副本(replica)

分为两种副本

领导者副本(leader replica)
追随者副本(follower replica) 分为ISR 和 普通

同步方式:

领导者副本直接接收发布者的消息

随后领导者副本会同步将消息复制保存到ISR follower副本,异步将消息复制保存到普通follower副本

如果leader失效,需要选举出新的leader

依据以下原则选举:

选举优先从ISR中选举,因为ISR副本是同步的

如果ISR中没有生效的副本,就从普通中选举一个
在这里插入图片描述

如果所有副本都失效

可以等待ISR副本活过来保证数据完整性,也可以选举第一个活过来的保证数据可用性

Kafka的生产者详解

消息发送:

分为同步发送和异步发送
同步发送:
ProducerRecord<String,String> record = new ProducerRecord<String, String>("itheima-topic","100001","hello kafka");
RecordMetadata recordMetadata = producer.send(ProducerRecord).get();//获取发送的结果
异步发送:

传入一个callback对象处理回调结果

//异步消息发送
producer.send(ProducerRecord, new Callback() {
    @Override
    public void onCompletion(RecordMetadata recordMetadata, Exception e) {
        if(e != null){
            System.out.println("记录异常信息到日志表中");
        }
        System.out.println(recordMetadata.offset());
    }
});

生产者配置

消息确认配置acks

//ack配置  消息确认机制
prop.put(ProducerConfig.ACKS_CONFIG,"all");
acks = 0 //生产者不会等待任何来自服务器的响应
acks = 1 //集群leader收到消息后,生产者会接受一个来自服务器的响应
acks = -1/all(默认) //参与复制的所有生产者全部收到消息后,生产者才会收到响应

重试次数retries

//重试次数
prop.put(ProducerConfig.RETRIES_CONFIG,10);

消息压缩方式

消息默认不会被压缩

//数据压缩
prop.put(ProducerConfig.COMPRESSION_TYPE_CONFIG,"lz4");
| snappy | 占用较少的  CPU,  却能提供较好的性能和相当可观的压缩比, 如果看重性能和网络带宽,建议采用 |
| lz4    | 占用较少的 CPU, 压缩和解压缩速度较快,压缩比也很客观        |
| gzip   | 占用较多的  CPU,但会提供更高的压缩比,网络带宽有限,可以使用这种算法 |

Kafka消费者详解

消费者组:一个或者多个消费者组成的群体

topic会分发消息给每个消费者组中的一个消费者

消息有序性

跨分区的消息无法决定先后顺序

如果需要保证消息的有序性需要让多个消费者去监听同一个分区

提交和偏移量

消费者会在消费前或者消费后向__consumer_offset的特殊topic中发送偏移量,记录每个消息的处理进度

如果消费者崩溃或者新的消费者加入就会触发负载均衡

如果使用默认方式自动提交偏移量,就会在消费前直接自动提交偏移量,可能会出现重复消费或者漏消费的情况

所以我们需要使用手动提交的方式提交偏移量

手动提交(同步,会自动重试):

consumer.commitSync()

异步提交:

consumer.commitAsync(new OffsetCommitCallback() {
        @Override
        public void onComplete(Map<TopicPartition, OffsetAndMetadata> map, Exception e) {
            if(e!=null){
                System.out.println("记录错误的提交偏移量:"+ map+",异常信息"+e);
            }
        }
    });

Spring继承Kafka

引入依赖
<!-- kafkfa -->
    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
        <exclusions>
            <exclusion>
                <groupId>org.apache.kafka</groupId>
                <artifactId>kafka-clients</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
    </dependency>
Topic的创建和删改(可省略)
public static void createTopic(String topicName, int partitions, short replicas) throws Exception {
        NewTopic newTopic = new NewTopic(topicName, partitions, replicas);
        CreateTopicsResult topics = adminClient.createTopics(Collections.singleton(newTopic));
        topics.all().get();
        log.info("【{}】topic创建成功", topicName);
    }

    /**
     * @Title deleteTopic
     * @Description 删除topic
     * @param topicName  topic名称
     * @return void
     */
    public static void deleteTopic(String topicName) throws Exception {
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Collections.singleton(topicName));
        deleteTopicsResult.all().get();
        log.info("【{}】topic删除成功", topicName);

    }

    /**
     * @Title updateTopicRetention
     * @Description 修改topic的过期时间
     * @param topicName  topic名称
     * @param ms  过期时间(毫秒值)
     * @return void
     */
    public static void updateTopicRetention(String topicName, String ms) throws Exception {
        ConfigResource resource = new ConfigResource(ConfigResource.Type.TOPIC, topicName);
        ConfigEntry configEntry = new ConfigEntry(TopicConfig.RETENTION_MS_CONFIG, ms);
        Config config = new Config(Collections.singleton(configEntry));
        // 创建AlterConfigsOptions
        AlterConfigsOptions alterConfigsOptions = new AlterConfigsOptions().timeoutMs(10000);
        // 执行修改操作
        adminClient.alterConfigs(Collections.singletonMap(resource, config), alterConfigsOptions).all().get();
        log.info("【{}】topic过期时间设置完成,过期时间为:{}毫秒", topicName, ms);
    }
生产者发送消息
@Resource
private KafkaTemplate<String,String> kafkaTemplate;

kafkaTemplate.send("topic","test");
消费者消费消息
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;
import org.springframework.util.StringUtils;

@Component
public class Listener {

    @KafkaListener(topics = "topic")
    public void onMessage(String message){
        if(!StringUtils.isEmpty(message)){
            System.out.println(message);
        }

    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1927817.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

《0基础》学习Python——第十一讲__时间函数

一、时间函数是Python中的内置函数和模块&#xff0c;用于处理日期和时间相关的操作。以下是常用的时间函数的种类和用法&#xff1a; 1、time.time()&#xff1a;返回当前时间的时间戳。 时间戳&#xff08;timestamp&#xff09;是一种表示日期和时间的方式&#xff0c;它是一…

高频面试题基本总结回顾4(含笔试高频算法整理)

目录 一、基本面试流程回顾 二、基本高频算法题展示 三、基本面试题总结回顾 &#xff08;一&#xff09;Java高频面试题整理 &#xff08;二&#xff09;JVM相关面试问题整理 &#xff08;三&#xff09;MySQL相关面试问题整理 &#xff08;四&#xff09;Redis相关面试…

使用 SSH 通过 VS Code 连接企业服务器并拉取 Git 仓库代码的指南

文章目录 前言一、SSH 是什么&#xff1f;1.1 SSH 的主要特性和用途1.2 SSH 的工作原理 二、 为什么使用 SSH 而不是 HTTPS三、使用步骤3.1 生成 SSH 密钥3.2 配置 VS Code 远程连接3.3 通过 SSH 克隆 Git 仓库3.4 安装必要的组件 总结 前言 在现代软件开发中&#xff0c;远程…

Sentinel-1 Level 1数据处理的详细算法定义(四)

《Sentinel-1 Level 1数据处理的详细算法定义》文档定义和描述了Sentinel-1实现的Level 1处理算法和方程,以便生成Level 1产品。这些算法适用于Sentinel-1的Stripmap、Interferometric Wide-swath (IW)、Extra-wide-swath (EW)和Wave模式。 今天介绍的内容如下: Sentinel-1 L…

鸿蒙语言基础类库:【@ohos.data.storage (轻量级存储)】

轻量级存储 轻量级存储为应用提供key-value键值型的文件数据处理能力&#xff0c;支持应用对数据进行轻量级存储及查询。数据存储形式为键值对&#xff0c;键的类型为字符串型&#xff0c;值的存储数据类型包括数字型、字符型、布尔型。 说明&#xff1a; 开发前请熟悉鸿蒙开发…

红色文化3D虚拟数字展馆搭建意义深远

在房地产与土地市场的浪潮中&#xff0c;无论是新城规划、乡村振兴&#xff0c;还是商圈建设&#xff0c;借助VR全景制作、虚拟现实和web3d开发技术打造的全链条无缝VR看房新体验。不仅极大提升了带看与成交的转化率&#xff0c;更让购房者足不出户&#xff0c;即可享受身临其境…

前端Vue组件化实践:自定义轮播图组件的探索与应用

在前端开发领域&#xff0c;随着业务逻辑的不断丰富和系统规模的日益扩大&#xff0c;传统的开发方式逐渐暴露出种种弊端。其中&#xff0c;最突出的问题之一便是修改一个小的功能或细节可能导致整个系统的逻辑调整&#xff0c;造成开发效率低下和维护困难。为了应对这些挑战&a…

部署大语言模型并对话

随着人工智能技术的飞速发展&#xff0c;大语言模型&#xff08;Large Language Models, LLMs&#xff09;因其强大的语言理解和生成能力而备受关注。OpenWebUI &#xff0c;原名 Ollama WebUI &#xff0c;是一款专为大语言模型&#xff08;LLM&#xff09;设计的先进 Web 交互…

pdf文件怎么转换为jpg图片?这几种转换方法操作起来很简单!

pdf文件怎么转换为jpg图片&#xff1f;在数字化洪流席卷职场的当下&#xff0c;PDF文档虽一度稳坐信息传输与储存的宝座&#xff0c;却逐渐显露出其在效率与便捷性追求中的疲态&#xff0c;随着技术疆界的不断拓宽&#xff0c;我们愈发深刻地意识到&#xff0c;PDF那复杂的格式…

Python array的特点及使用

1、Python array的特点及使用 1.1、python array为什么只能接收指定类型数据 array 模块提供了一种叫做 array 的数据结构&#xff0c;它表示一块连续的内存空间&#xff0c;所有的元素必须是相同的类型。这是因为在内存中&#xff0c;数组元素存储在连续的位置上&#xff0c…

【256 Days】我的创作纪念日

目录 &#x1f33c;01 机缘 &#x1f33c;02 收获 &#x1f33c;03 日常 &#x1f33c;04 成就 &#x1f33c;05 憧憬 最近收到官方来信&#xff0c; 突然发现&#xff0c;不知不觉间&#xff0c;距离发布的第一篇博客已过256天&#xff0c;这期间我经历了春秋招、毕业答辩…

Type-C PD芯片:引领充电技术的新纪元

随着科技的飞速发展&#xff0c;人们对电子设备的依赖日益加深&#xff0c;对充电速度、效率和安全性的要求也越来越高。在这样的背景下&#xff0c;Type-C PD&#xff08;Power Delivery&#xff09;芯片应运而生&#xff0c;以其高效、安全、智能的特点&#xff0c;成为了充电…

gorm多表联合查询 Joins方法 LEFT JOIN , RIGHT JOIN , INNER JOIN, FULL JOIN 使用总结

gorm中多表联合查询&#xff0c;我们可以使用Joins来完成&#xff0c;这个Joins方法很灵活&#xff0c;我们可以非常方便的多多表进行联合查询&#xff0c; 我们先来看看这个方法的官方定义和使用示例&#xff1a; Joins方法定义和使用示例 当然我们这里要说的使用方式是官方示…

网络运输层之(2)UDP协议

网络运输层之(2)UDP协议 Author: Once Day Date: 2024年7月14日 一位热衷于Linux学习和开发的菜鸟&#xff0c;试图谱写一场冒险之旅&#xff0c;也许终点只是一场白日梦… 漫漫长路&#xff0c;有人对你微笑过嘛… 全系列文章可参考专栏: 通信网络技术_Once-Day的博客-CSDN…

SQL server 练习题2

课后作业 作业 1&#xff1a;自己查找方法&#xff0c;将 homework_1.xls 文件数据导入到 SQLServer 的 homework 数据库中。数据导入完成后&#xff0c;把表名统一改为&#xff1a;外卖表 如下所示&#xff1a; 作业 2&#xff1a;找出所有在 2020 年 5 月 1 日至 5 月 31 …

离散数学,自反和反自反 ,对称和反对称,传递关系 ,复合关系和逆关系 ,关系的闭包

目录 1.自反和反自反 自反性 反自反性 判断关系是自反或是反自反 2.对称和反对称 对称性 反对称性 判断关系是对称或是反对称 3.传递关系 4.复合关系和逆关系 复合关系 逆关系 关系运算的性质 5.关系的闭包 闭包的性质 1.自反和反自反 自反性 反…

适合初创企业的有效 CRM 策略

客户关系管理 (CRM) 是任何企业的重要组成部分&#xff0c;尤其是对于旨在与客户建立牢固而有意义的关系的初创公司而言。实施良好的 CRM 策略不仅可以简化您的销售流程&#xff0c;还可以提高客户满意度和保留率。在本文中&#xff0c;我们将介绍初创公司有效 CRM 策略的关键组…

原生APP外包开发成本的估算

原生APP外包开发成本的估算取决于多种因素。根据经验&#xff0c;原生APP外包开发成本一般在几十万到几百万人民币之间。对于功能复杂、要求高的大型APP&#xff0c;开发成本可能更高&#xff0c;甚至达到上千万人民币。北京木奇移动技术有限公司&#xff0c;专业的软件外包开发…

前端Vue组件化实践:自定义银行卡号格式化组件的探索与应用

在前端开发中&#xff0c;随着业务逻辑的复杂化和应用规模的扩大&#xff0c;传统的一体式开发方式逐渐显露出其局限性。任何微小的改动或新功能的增加都可能牵一发而动全身&#xff0c;导致整体逻辑的修改&#xff0c;进而增加了开发成本和维护难度。为了解决这一问题&#xf…

Linux系统的用户组管理和权限以及创建用户

1.Linux是多用户的操作系统&#xff0c;正如在Windows系统中可以进行用户账号的切换&#xff0c;Linux同样允许多用户操作。在Linux服务器环境中&#xff0c;通常由多名运维人员共同管理&#xff0c;而这些运维人员各自拥有不同的权限和级别。因此&#xff0c;我们可以根据每个…