Kafka核心原理

news2025/3/1 3:06:35

一、kafka安装步骤

(1)配置profile文件
vim /etc/profile

// KAFKA
export KAFKA_HOME=/opt/soft/kafka212
export PATH=$KAFKA_HOME/bin:$PATH

source /etc/profile
(2)创建kfkdata目录

cd /opt/soft/kafka212/

mkdir kfkdata

(3)进入config目录配置server.properties文件
// Kafka服务器id号
 21 broker.id=0

// 设置主机IP地址和端口号
 36 advertised.listeners=PLAINTEXT://192.168.91.11:9092

// 存储日志文件的目录
 60 log.dirs=/opt/soft/kafka212/kfkdata

123 zookeeper.connect=192.168.91.11:2181

// 设置连接zk的超时时间(毫秒)
126 zookeeper.connection.timeout.ms=18000

// 主题启用
137 delete.topic.enable=true                         
(4)进入config目录启动Kafka

先启动zookeeper后启动Kafka

// 先启动zookeeper
zhServer.sh start
// 后启动kafka
nohup kafka-server-start.sh /opt/soft/kafka212/config/server.properties &

二、kafka常用命令

// partitions 分区  replication-factor 副本数
// 创建主题
kafka-topics.sh --create --zookeeper 192.168.91.128:2181 --topic kb23 --partitions 1 --replication-factor 1

// 查看主题内容
kafka-topics.sh --zookeeper 192.168.91.11:2181 --list

// 生产者,生产消息
kafka-console-producer.sh --topic kb23 --broker-list 192.168.78.131:9092

// 消费者
kafka-console-consumer.sh --bootstrap-server 192.168.78.131:9092 --topic bigdate --from-beginning

// 查看队列详情
kafka-topics.sh --zookeeper 192.168.91.128:2181 --describe --topic kb23

// 查看队列消息数量
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.91.11:9092 --topic bigdate2

// 删除topic
kafka-topics.sh --zookeeper 192.168.91.11:2181 --delete --topic bigdate

三、Kafka架构

Topic:维护一个主题中的消息,可视为消息分类

Producer:向Kafka主题发布(生产)消息

Consumer:订阅(消费)主题并处理消息

Broker:Kafka集群中的服务器

四、Kafka

(1)topic

主题是已发布消息的类别名称

发布和订阅数据必须指定主题

主题副本数量不大于Brokers个数

(2)partition

一个主题包含多个分区,默认按Key Hash分区

每个Partition对应一个文件夹<topic_name>-<partition_id>

每个Partition被视为一个有序的日志文件(LogSegment)

Replication策略是基于Partition,而不是Topic

每个Partition都有一个Leader,0或多个Followers

(3)Kafka Message header(消息头,固定长度)

offset:唯一确定每条消息在分区内的位置

CRC32:用crc32校验消息

"magic":表示本次发布Kafka服务程序协议版本号

"attributes":表示为独立版本、或标识压缩类型、或编码类型

(4)Kafka Message body(消息体)

key:表示消息键,可选

value bytes payload:表示实际消息数据

(5)Kafka Producer

生产者将消息写入到Broker

Producer直接发送消息到Broker上的Leader Partition

Producer客户端自己控制着消息被推送到哪些Partition
        随机分配、自定义分区算法等

Batch推送提高效率

(6)Kafka Consumer
①消费者通过订阅消费消息

offset的管理是基于消费组(group.id)的级别

每个Partition只能由同一消费组内的一个Consumer来消费

每个Consumer可以消费多个分区

消费过的数据仍会保留在Kafka中

消费者不能超过分区数量

②消费模式

队列:所有消费者在一个消费组内

发布/订阅:所有消费者被分配到不同的消费组

(7)ZooKeeper在Kafka中的作用
①Broker注册并监控状态

/brokers/ids

②Topic注册

/brokers/topics

③生产者负载均衡

每个Broker启动时,都会完成Broker注册过程,生产者会通过该节点的变化来动态地感知到Broker服务器列表的变更

④offset维护

Kafka早期版本使用ZooKeeper为每个消费者存储offset,由于ZooKeeper写入性能较差,从0.10版本后,Kafka使用自己的内部主题维护offset

(8)Kafka数据流
①副本同步 ISR(In-Sync Replica)
②容灾 Leader Partition
③高并发 读写性能 Consumer Group
④负载均衡

(9)Kafka API
①Producer API
②Consumer API
③Streams API
④Connector API

五、kafka生产消费者模式

package nj.zb.kb23.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.Scanner;
/*
生产者消费模式
 */
public class MyProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        /*
        应答机制
        0:不需要等待broker任何响应,无法确保数据正确送到broke中
        1:只需要得到分区副本中leader的确认就OK,可能会丢失数据(极端情况下)
        -1:等到所有副本确认收到信息,响应时间最长,数据最安全,不会丢失数据,(极端情况下,可能会重复)
         */
        properties.put(ProducerConfig.ACKS_CONFIG,"-1");
        KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
        Scanner scanner = new Scanner(System.in);
        while (true){
            System.out.println("请输入内容:");
            String msg=scanner.nextLine();
            if (msg.equals("tt")) {
                break;
            }
            ProducerRecord<String, String> record = new ProducerRecord<>("bigdate",msg);
            producer.send(record);
        }
    }
}

 

// 创建表bigdate
kafka-topics.sh --create --zookeeper 192.168.91.11:2181 --topic bigdate --partitions 1 --replication-factor 1
// 查看列队信息
kafka-console-consumer.sh --bootstrap-server 192.168.91.11:9092 --topic bigdate --from-beginning

六、kafka生产者生产消息

package nj.zb.kb23.kafka;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/*
生产者生产消息
 */
public class MyProducer2 {
    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.91.11:9092");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,StringSerializer.class);
        /*
         * 开启重试,如果发送消息失败,默认是0,只发送一次,可以设置重置3次(共4次)
         * 每次重试的间隔是100ms,可以手动设置10000
         */
        properties.put(ProducerConfig.RETRIES_CONFIG,3);//properties.put(ProducerConfig."retries",3);
        properties.put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG,10000);//每个分区10000条信息
        /*
        BUFFER_MEMORY_CONFIG   缓存内存           默认值  32M
        BATCH_SIZE_CONFIG     批大小配置          默认值  16KB
        SEND_BUFFER_CONFIG   每次批量发送的值
        */
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,102400); //1024 byte kb
        properties.put(ProducerConfig.SEND_BUFFER_CONFIG,102400);
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);  //64M
        //应答机制:-1 all
        properties.put(ProducerConfig.ACKS_CONFIG,"-1");
        //多线程
        ExecutorService executorService = Executors.newCachedThreadPool();
        //模拟10个线程,同时向Kafka传递数据
        for (int i = 0; i < 10; i++) {
            Thread thread = new Thread(new Runnable() {
                @Override
                public void run() {
                    //一个线程代表一个人
                    KafkaProducer<String,String> producer = new KafkaProducer<>(properties);
                    String threadName = Thread.currentThread().getName();
                    System.out.println(threadName);
                    //每个线程传递1000条数据
                    for (int j = 0; j < 10000; j++) {
                        ProducerRecord<String, String> record = new ProducerRecord<>("bigdate1", threadName + " " + j);
                        try {
                            //让主机休息10毫微秒
                            Thread.currentThread().sleep(10);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        producer.send(record);
                    }
                }
            });
            executorService.execute(thread);
        }
        executorService.shutdown();
        while (true){
            //让主线程多休息10秒,主线程关闭,子线程没有跟上,所有数据缺失
            Thread.sleep(10000);
            if (executorService.isTerminated()){
                System.out.println("game over!");
                break;
            }
        }
    }
}

// 查看进度
[root@kb23 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.91.11:9092 --topic bigdate1 --from-beginning
pool-1-thread-4 9997
pool-1-thread-9 9997
pool-1-thread-1 9997
pool-1-thread-8 9998
pool-1-thread-4 9999
pool-1-thread-7 9999
pool-1-thread-3 9999
pool-1-thread-5 9999
pool-1-thread-2 9999
^CProcessed a total of 100000 messages

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1026852.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

OpenCV实现图像去水印功能(inpaint)

水印定位 需要根据图像特征获取水印的位置。 如图所示&#xff0c;图像左下角、右下角有水印。第一步&#xff0c;我们首先得定位水印所在位置。 Mat gray;cvtColor(src, gray, COLOR_BGR2GRAY);//图像二值化&#xff0c;筛选出白色区域部分Mat thresh;threshold(gray, thres…

可转债实战与案例分析——成功的和失败的可转债投资案例、教训与经验分享

实战与案例分析——投资案例研究 股票量化程序化自动交易接口 一、成功的可转债投资案例 成功的可转债投资案例提供了有价值的经验教训&#xff0c;以下是一个典型的成功案例&#xff1a; 案例&#xff1a;投资者B的成功可转债投资 投资者B是一位懂得风险管理的投资者&#…

LLM各层参数详细分析(以LLaMA为例)

网上大多分析LLM参数的文章都比较粗粒度&#xff0c;对于LLM的精确部署不太友好&#xff0c;在这里记录一下分析LLM参数的过程。 首先看QKV。先上transformer原文 也就是说&#xff0c;当h&#xff08;heads&#xff09; 1时&#xff0c;在默认情况下&#xff0c; W i Q W_i…

RabbitMQ - 死信、TTL原理、延迟队列安装和配置

目录 一、死信交换机 1.1、什么是死信交换机 1.2、TTL 1.2.1、什么是 TTL 1.2.2、通过 TTL 模拟触发死信 二、延迟队列 2.1、什么是延迟队列 2.2、配置延迟队列插件 2.2.1、延迟队列配置 a&#xff09;下载镜像 b&#xff09;运行容器 c&#xff09;刚刚设定的Rabb…

jmeter下载安装教程

一、下载安装jdk&#xff08;jmeter需要&#xff09; 1、首页下载jdk&#xff0c;地址&#xff1a;Java Downloads | Oracle 2、下载se&#xff0c;注意需要oracle账号&#xff0c;注册即可 这里的8u384代表JDK 8版本&#xff0c;384代表子版本&#xff0c;u是update(更新)的…

flink集群与资源@k8s源码分析-运行时

1 运行时 运行时提供了Flink作业运行过程依赖的基础执行环境,包含Dispatcher、ResourceManager、JobManager和TaskManager等核心组件,本节分析资源相关运行时组件构建和启动。 flink没有使用spring,缺少ioc的构建过程相当复杂,所有依赖手动关联和置入,为了共享组件,fli…

jenkins容器内配置python项目运行环境(Python3.7.3)

目录 1.查看启动的容器2.进入jenkins容器内部3.使用wget&#xff1a;提示没有wget命令4.查看jenkins容器系统版本5.换成国内源&#xff08;阿里&#xff09;5.更新apt-get6.安装wget7.创建python存放目录8.下载python9.解压10.安装依赖11.运行脚本configure12.make编译make ins…

汽车三高试验离不开的远程试验管理平台-TFM

随着信息技术的高速发展&#xff0c;企业对远程试验实时监控与数据管理的需求日益增强。而利用远程试验信息协同技术&#xff0c;可突破部门与地域的限制&#xff0c;并把试验现场的车辆状态信息、试验数据和分析结果实时传输给数据分析部门和设计部门等&#xff0c;从而缩短时…

什么是HTTP/2?它与HTTP/1.1相比有什么改进?

聚沙成塔每天进步一点点 ⭐ 专栏简介⭐ HTTP/2 简介⭐ 主要的改进和特点1. 多路复用&#xff08;Multiplexing&#xff09;2. 头部压缩&#xff08;Header Compression&#xff09;3. 服务器推送&#xff08;Server Push&#xff09;4. 二进制传输&#xff08;Binary Protocol&…

12基于MATLAB的短时傅里叶变换( STFT),连续小波变换( CWT),程序已调通,可以直接运行。

基于MATLAB的短时傅里叶变换( STFT),连续小波变换( CWT),程序已调通&#xff0c;可以直接运行

jdk exe安装包如何自制zip解压版

jdk8 oracle官方下载页面 https://www.oracle.com/java/technologies/downloads/#java8-windows 可以看到&#xff0c;只有exe安装包 下载最新的exe安装包 解压 用7Zip解压 里面有好几个JAVA_CAB*文件夹&#xff0c;我们只需要关注两个&#xff1a;9和10&#xff0c;JAVA_CA…

【操作系统笔记】内存分配

内存对齐 问题&#xff1a;为什么需要内存对齐呢&#xff1f; 主要原因是为了兼容&#xff0c;为了让程序可以运行在不同的处理器中&#xff0c;有很多处理器在访问内存的时候&#xff0c;只能从特定的内存地址读取数据。换个说法就是处理器每次只能从内存取出特定个数字节的数…

卡尔曼滤波(Kalman Filter)C#测试

一、操作过程 刚学了一下卡尔曼滤波&#xff0c;具体原理还没细看&#xff0c;大致过程如下 分为两步&#xff0c;第一步Predict&#xff0c;以下两个公式 第二步Correct&#xff0c;以下三个公式 公式看起来很复杂&#xff0c;其中是我们要处理的数据&#xff0c; 是滤…

HTTP 协商缓存 ETag、If-None-Match

&#xff08;1&#xff09;浏览器第一次跟服务器请求一个资源&#xff0c;服务器在返回这个资源的同时&#xff0c;在respone header加上ETag。 ETag是服务器根据当前请求的资源生成的一个唯一标识。 这个唯一标识是一个字符串&#xff0c;只要资源有变化这个串就不同&#xff…

CSS的学习

1.认识CSS CSS 叫做"层叠样式表" “层叠样式表” 样式 --> 大小,位置,间距,颜色,字体,表框背景… 统称为"样式",描述了一个网页长什么样子~ 层叠 --> 针对一个html的元素/标签,可以同时应用多组CSS样式~~ 多组样式会叠加在一起~~ CSS描述的是页…

cocosCreator 之 Graphics绘制基础图形,五角星,线型图,柱形图

版本&#xff1a; 3.4.0 环境&#xff1a; Mac Graphics组件 Graphics组件主要用于绘画使用&#xff0c;属于渲染组件。继承结构&#xff1a; #mermaid-svg-WHveKVDzMTXmCbpg {font-family:"trebuchet ms",verdana,arial,sans-serif;font-size:16px;fill:#333;}#mer…

Android Studio 创建项目不自动生成BuildConfig文件

今天在AS上新建项目发现找不到BuildConfig文件&#xff0c;怎么clear都不行。通过多方面查找发现原来gradle版本不同造成的&#xff0c;Gradle 8.0默认不生成 BuildConfig 文件。 如上图&#xff0c;8.0版本是没有source文件夹 上图是低于8.0版本有source文件夹 针对这个问题&…

Jenkins学习笔记1

CI 服务器&#xff1a; 认识Jenkins&#xff1a; Jenkins是一个可扩展的持续集成&#xff08;CI&#xff09;引擎&#xff0c;是一个开源项目&#xff0c;旨在提供一个开放易用的软件平台&#xff0c;使得软件持续集成变成可能。Jenkins非常易于安装和配置&#xff0c;简单易…

算法leetcode|83. 删除排序链表中的重复元素(rust重拳出击)

文章目录 83. 删除排序链表中的重复元素&#xff1a;样例 1&#xff1a;样例 2&#xff1a;提示&#xff1a; 分析&#xff1a;题解&#xff1a;rust&#xff1a;go&#xff1a;c&#xff1a;python&#xff1a;java&#xff1a; 83. 删除排序链表中的重复元素&#xff1a; 给…

Docker从认识到实践再到底层原理(六-1)|Docker容器基本介绍+命令详解

前言 那么这里博主先安利一些干货满满的专栏了&#xff01; 首先是博主的高质量博客的汇总&#xff0c;这个专栏里面的博客&#xff0c;都是博主最最用心写的一部分&#xff0c;干货满满&#xff0c;希望对大家有帮助。 高质量博客汇总 然后就是博主最近最花时间的一个专栏…