快速搞定分布式Kafka

news2025/1/12 13:41:59

本文从kafka中的实际应用场景分析,讲述kafka的一些基本概念。再讲述zookeeper集群环境的构建;kafka的搭建以及脚本文件编写;最后是一个快速入门的demo.内容会比较多,希望大家能有所收获!

1.Kafka(MQ)实战应用场景剖析

Kafka(mq)应用场景
1、Kafka之异步化、服务解耦、削峰填谷
2、Kafka海量日志收集
3、Kafka之数据同步应用
4、Kafka之实时计算分析

Kafka异步化实战:
异步化解耦,消息推送,持久化存储保证数据可靠性,避免需要重传。
在这里插入图片描述
Kafka服务解耦,削峰填谷:
下单请求-redis实现库存校验(list存储)
在这里插入图片描述
实现服务解耦指的是请求和接收方可以是两个完全独立的系统;
通常消息队列是作为不同服务之间信息传递的桥梁。

KAFKA海量日志收集:(logstash日志解析)
在这里插入图片描述

KAFKA之数据同步实战:
在这里插入图片描述
KAFKA实时计算分析:
在这里插入图片描述

2.Kafka基础概念

1集群架构简介
在这里插入图片描述
在这里插入图片描述

Topic主题与Partition分区(一对多的关系)
一个分区只能属于单个主题,同一个主题下可以有不同的分区。
分区里面包含不同的消息。分区可以理解为是一个可追加的日志文件。
Kafka可是通过offset保证消息在分区内的顺序性的。
路由规则有关分区区号,分区选择那个分区;
分区的目的是分散磁盘io;创建主题的时候也可以去制定分区的个数的。
通过增加分区,实现水平扩展

2副本概念(replica)
在这里插入图片描述
绿色p1是个主分片或主副本,
Broker2上的紫色p1是副本。副本分片。优点类似elasticsearch
通过replica,实现集群故障转移,保证了集群高可用。
用空间换取数据的高可靠性,稳定性。

3ISR详解
在这里插入图片描述
ISR集合的理解:数据录取及时进入ISR集合(比如p1s1的拉取时间为50ms,p1s2的pull时间为200ms,都小于规定超时时间1s。两者皆进入ISR.)跟数据一直性相关,副本从主副本的拉取时间
超时的话会进入OSR集合。
Leader副本(p1)主要是维护和跟踪ISR集合中的滞后状态。
ISR集合跟OSR集合是动态的。比如后面有消息来后,p1s2的时间超过规定时间1s,
则将p1s2放入OSR集合。
当p1宕机,一般是从ISR选取副本进行替代成为leader副本,即新的P1。

ISR只是模型,映射到实际的存储的一些基本概念。
HW:high watermark ,高水位线,消费者只能最多拉取到高水位线的消息
LEO:log end offset,日志文件的最后一条记录offset(偏移量)
ISR集合与HW和LEO直接存在着密不可分的关系
在这里插入图片描述
高水位线下一次需要拉取的数据是6
在这里插入图片描述
理解:当写入消息3和4时,此时leader已经成功写入,hw没变,
Leo变成4.HW没变是因为还没有完成数据同步,3和4目前consumer
目前还没有办法处理。
在这里插入图片描述
消息3同步的比较快,此时hw变成3.LEO变为4.
因为follower2对于数据同步的比较慢,所以暂定hw为3

3.构建zookeeper集群环境

在这里插入图片描述
在这里插入图片描述

4.开机启动与连接工具介绍

实现开机自启动步骤:
cd /etc/rc.d/init.d/
Touch zookeeper
Chmod 777 zookeeper
vi zookeeper

开启启动zookeeper脚本:

#!/bin/bash

#chkconfig:2345 20 90

#description:zookeeper

#processname:zookeeper

export JAVA_HOME=/usr/java/jdk1.8.0_301
export PATH=$JAVA_HOME/bin:$PATH
case $1 in
          start) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh start;;
          stop) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh stop;;
          status) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh status;;
          restart) su root /home/software5/zookeeper-3.6.2/bin/zkServer.sh restart;;
          *) echo "require start|stop|status|restart" ;;
esac

使用ZooInspector图形界面连接zookeeper。
使用到的命令:java -jar.\zookeeper-dev-ZooInspector.jar

5.Kafka环境搭建

Kafka版本:kafka_2.12
管控台:kafkaManager2.0.0.2
协调服务:zookeeper-3.4.6
Kafka环境验证(操作台控制)

Kafka环境搭建:
准备zookeeper环境(zookeeper-3.6.2)
下载kafka安装包:kafka
上传到虚拟机:/home/software6
解压到/usr/local目录下:
tar -zxvf kafka_2.12-2.1.0.tgz.gz -C /usr/local
重命名压缩的文件:mv kafka_2.12-2.1.0 kafka_2.12
修改kafka配置文件:

cd kafka_2.12/config
vi server.properties
  ## 修改内容:
  ## The id of the broker. This must be set to a unique integer for each broker
  broker.id=0
  port=9092
  host.name=192.168.11.221
  dvertised.host.name=192.168.11.221
  log.dirs=/usr/local/kafka_2.12/kafka-logs
  num.partitions=5
  zookeeper.connect=192.168.11.221:2181,192.168.11.222:2181,192.168.11.223:2181

创建kafka存储消息(log日志数据)的目录

mkdir /usr/local/kafka_2.12/kafka-logs

kafka配置成功,执行启动命令,启动kafka。

/usr/local/kafka_2.12/bin/kafka-server-start.sh  /usr/local/kafka_2.12/config/server.properties &
/usr/local/kafka_2.12/bin/kafka-server-start.sh /usr/local/kafka_2.12/config/server.properties &

Kafka-manager管控台搭建与脚本测试验证
安装kafka Manager可视化管控台:安装到192.168.56.107上
解压zip文件:
unzip kafka-manager-2.0.0.2.zip -d /usr/local
修改配置文件: vi /usr/local/kafka-manager-2.0.0.2/conf/appication.conf
修改的内容:
kafka-manager.zkhosts=“192.168.56.107:2181,192.168.56.110:2181,
192.168.56.111:2181”

192.168.56.107节点启动kafka manager 控制台
/usr/local/kafka-manager-2.0.0.2/bin/kafka-manager &

浏览器访问控制台:默认端口号是9000
http://192.168.56.107:9000/

集群验证:
通过控制台创建了一个topic为"test" 2个分区 1个副本

消费发送与接收验证
cd /usr/local/kafka_2.12/bin
## 启动发送消息的脚本
## --broker-list 192.168.11.221 指的是kafka broker的地址列表
## --topic test 指的是把消息发送到test主题
./kafka-console-producer.sh --broker-list 192.168.56.107:9092 --topic topic-quickstart
## 启动接收消息的脚本
./kafka-console-consumer.sh --bootstrap-server 192.168.56.107:9092 --topic topic-serial

6.Kafka快速入门

Kafka急速入门:producer:
配置生产者参数属性,创建生产者对象;构建消息producerrecord
发送消息send;关闭生产者
Kafka快速入门:Consumer
配置消费者参数写构造消费者对象;订阅主题;
拉取消息并进行消费处理;提交消费偏移量,关闭消费者
kafka是通过序列化好的key然后去进行分区的。

Kafka急速入门:producer:

import com.alibaba.fastjson.JSON;
import com.bfxy.kafka.api.Const;
import com.bfxy.kafka.api.User;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class QuickStartProducer {
    public static void main(String[] args) {
        Properties properties = new Properties();

        //1.配置生产者启动的关键属性参数
        //1.1 BOOTSTRAP_SERVERS_CONFIG,连接kafka集群服务列表,如果有多个,使用","进行分隔
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.56.107:9092");
        //1.2 CLIENT_ID_CONFIG: 这个属性的目的是标记kafka cilent的ID
        properties.put(ProducerConfig.CLIENT_ID_CONFIG,"quickstart-producer");
        //1.3对kafka的key和value做序列化(kafka只能识别二进制数据)
        //org.apache.kafka.common.serialization.Serialization

        //key;是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        //value:实际发送消息的内容
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2.创建kafka生产者对象,传递properties属性参数集合
        KafkaProducer<String ,String> producer = new KafkaProducer<>(properties);

        //3.构造消息内容
        User user = new User("003","张飞");
        //需要将user对象转化为string,arg1:topic  arg2:实际消息体的内容
        ProducerRecord<String,String> record =
                new ProducerRecord<String,String>(Const.TOPIC_QUICKSTART,
                        JSON.toJSONString(user));

        //4.发送消息
        producer.send(record);

        //5.关闭生产者,生产环境中一般不关闭
        producer.close();

    }
}

Kafka快速入门:Consumer

import com.bfxy.kafka.api.Const;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

/**
 * @author nly
 */
public class QuickStartConsumer {

    public static void main(String[] args) throws InterruptedException {
        Properties properties = new Properties();

        //1.配置属性参数
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.56.107:9092");
        //key;是kafka用于做消息投递计算具体投递到对应的主题的哪一个partition而需要的
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //value:实际发送消息的内容
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //非常重要的属性配置,与我们消费者订阅组有关系
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "topic-quickstart");
        //常规属性:会话连接超时时间
        properties.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 10000);
        //常规属性:消费者提交offset:自动提交&手工提交,默认是自动提交
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,true);
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,5000);

        //2.创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        //3.订阅你感兴趣的主题:Const.TOPIC_QUICKSTART
        consumer.subscribe(Collections.singletonList(Const.TOPIC_QUICKSTART));

        System.err.println("quickstart consumer started ...");

        //4.采用拉取的方式消费数据
        while (true){
                 //等待多久进行一次数据的拉取
                 //拉取TOPIC_QUICKSTART主题里面所有的消息
                 //topic和partition是一对多的关系,一个topic可以有多个partition
                 ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(3000));
                 //因为消息是在partition中存储的,所以需要遍历partition集合
                 for (TopicPartition topicPartition : records.partitions()){
                     //通过topicPartition获取制定的消息集合,就是获取到当前topicPartition下面所有的消息
                     // 在records对象中的数据集合
                     List<ConsumerRecord<String,String>> partitionRecords = records.records(topicPartition);
                     //获取到每一个TopicPartition
                     String topic =topicPartition.topic();
                     //获取当前topicPartition下的消息条数
                     int size = partitionRecords.size();

                     System.out.println(String.format("--- 获取topic: %s,  分区位置 : %s, 消息总数: %s",
                             topic,
                             topicPartition.partition(),
                             size));

                     for(int i = 0; i <  size; i++){
                         ConsumerRecord<String,String> consumerRecord = partitionRecords.get(i);
                         // 实际数据内容
                         String value = consumerRecord.value();
                         // 当前获取的消息的偏移量
                         long offset = consumerRecord.offset();
                         //ISR : High Watermark,如果要提交的话,比如提交当前消息的offset
                         //表示下一次从什么位置(offset)拉取消息
                         long commitOffset = offset + 1;

                         System.err.println(String.format("获取实际消息value: %s,  消息offset: %s, 提交offset: %s",
                                 value,offset,commitOffset));
                     }
                 }
             }
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1968752.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

linux系统ShellCheck检查shell脚步语法正确的工具

目录 ShellCheck 安装ShellCheck 、dnf、yum 源代码编译 步骤如下&#xff1a; 示例命令&#xff1a; 方法三&#xff1a;使用其他第三方仓库、COPR 仓库 假设 ShellCheck 输出如下&#xff1a; 分析输出 修改脚本 再次运行 ShellCheck 1. Shell 脚本最佳实践 主题…

vcpkg install libtorch[cuda] -allow-unsupported-compiler

在vcpkg中不懂如何使用 nvcc 的 -allow-unsupported-compiler, 所以直接注释了CUDA中对版本的检查代码. C:\Program Files\NVIDIA GPU Computing Toolkit\CUDA\v11.8\include\crt\host_config.h 奇了怪了,我是用的是vs2022,但是还是被检查为不支持的编译器!!! 可以试一下改这…

搭建gitlab代码托管仓库(解决centos7无法使用默认数据源问题)

公司的gitlab由于停电&#xff0c;又崩了&#xff0c;每次停电都会崩掉。所以就想到自己学一下搭建gitlab代码中心&#xff0c;后面在搞一个jenkins自动发版学习一下&#xff0c;慢慢搞吧。 在弄的时候&#xff0c;发现Centos7居然在2024年6月31日停止维护了。这就离谱了&…

职教国培丨高职教师数据分析与挖掘课程实施能力提升培训班莅临泰迪智能科技参观调研

7月28日&#xff0c;由广东机电职业技术学院牵头&#xff0c;广东泰迪智能科技股份有限公司为合作单位的“2024年高职教师数据分析与挖掘课程实施能力提升培训班”老师莅临广东泰迪智能科技股份有限公司产教融合实训基地参观调研&#xff0c;来自广东省各地36位高校教师参与本次…

如何在 Kali Linux 上安装和使用 Docker 和 Docker Compose

Docker 和 Docker Compose 是现代开发者必备的工具&#xff0c;特别是当你需要在不同的环境中部署应用时。本文将详细介绍如何在 Kali Linux 上安装 Docker 和 Docker Compose&#xff0c;并使用它们启动服务。即使你是个技术小白&#xff0c;也能轻松跟随这篇指南完成操作。 …

Ecovadis认证:企业申请Ecovadis认证条件

Ecovadis认证是一种用于评估和评价企业可持续发展绩效的认证体系。该认证由Ecovadis公司提供&#xff0c;目的是帮助公司了解和改善其环境、社会和治理&#xff08;ESG&#xff09;实践。 Ecovadis认证主要基于四个方面进行评估&#xff1a;环境、劳工和人权、道德采购以及可持…

Python——记录pip问题(解决下载慢、升级失败问题)

在python开发中&#xff0c;经常需要使用到各种各样的库。 pip又是我们常用的安装工具。但是国外的源下载速度实在太慢&#xff0c;经常导致超时。 有很多朋友刚刚学Python的时候&#xff0c;会来问为什么pip下载东西这么慢啊&#xff1f; 而且pycharm里面下载库也是非常的慢…

Linux服务器安装MySQL8.0

序号类型地址1MySQLLinux&#xff08;centos 7.5&#xff09;服务器安装MySQL5.72MySQLLinux服务器安装MySQL8.03MySQLMySQL操作之概念、SQL约束&#xff08;一&#xff09;4MySQLMySQL操作之数据定义语言&#xff08;DDL)&#xff08;二&#xff09;5MySQLMySQL操作之数据操作…

React三原理和路由

代码下载 React 组件通讯原理 setState() 说明 setState() 是异步更新数据的&#xff0c;使用该语法时&#xff0c;后面的 setState() 不要依赖于前面的 setState()&#xff0c;可以多次调用 setState() &#xff0c;只会触发一次重新渲染&#xff1a; this.setState({ coun…

CPQ报价管理系统 | 成本报价CPQ解决方案

一、成本报价流程现状 1、传统流程 2、业务痛点 ①、数据手工重复输入环节多、易错&#xff0c;为保障准确性需多次复核&#xff0c;影响报价效率 ②、原材波动较大&#xff0c;但是当前询价流程只有一次性&#xff0c;原材成本发生变化&#xff0c;无法及时更新变化提醒报价…

类和对象(作业篇)

简简单单整理一下咱们的小作业&#xff0c;这次的作业比较简单&#xff0c;只有选择题&#xff1a; public class Test{private float f1.0f;int m12;static int n1;public static void main(String args[]){Test tnew Test();} }A&#xff1a;抛开private不说&#xff0c;先看…

解析顺序表【数据结构】

1.线性表 线性表&#xff08;linear list&#xff09;是n个具有相同特性的数据元素的有线序列。线性表是一种在实际中广泛使用的数据结构&#xff0c;常见的线性表有&#xff1a;顺序表、链表、栈、队列、字符串… 线性表在逻辑上是线性结构&#xff0c;也就是说是连续的一条线…

HTML 字符集详解及示例

文章目录 摘要引言从ASCII到UTF-8的演变ASCII 字符集ANSI字符集ISO-8859-1字符集UTF-8字符集 示例代码运行Demo小结表格总结未来展望参考资料 摘要 本文介绍了HTML中的字符集演变历史&#xff0c;从最初的ASCII到现代的UTF-8&#xff0c;并提供了设置字符集的示例代码。文中涵…

图形编辑器基于Paper.js教程10:导入导出svg,导入导出json数据

深入了解Paper.js&#xff1a;实现SVG和JSON的导入导出功能 Paper.js是一款强大的矢量绘图JavaScript库&#xff0c;非常适合用于复杂的图形处理和交互式网页应用。本文将详细介绍如何在Paper.js项目中实现SVG和JSON格式的导入导出功能&#xff0c;这对于开发动态图形编辑器等…

git reset --soft(回退commit,保留add)

参考博客&#xff1a;git reset --soft命令的使用-CSDN博客感觉博客中举的例子不是很好。读者自行判断。举的例子的场景适合使用revert&#xff0c;撤销就行了。另外建议看下边这篇博客&#xff0c;这篇详细介绍了reset和revert&#xff0c;带图。但是要注意这个reset是hard的&…

mysql 内存一直增长(memory/sql/thd::main_mem_root)

mysql版本 8.0.14 发现过程 查询总内存 SELECT t.EVENT_NAME, t.CURRENT_NUMBER_OF_BYTES_USED FROM performance_schema.memory_summary_global_by_event_name t ORDER BY t.CURRENT_NUMBER_OF_BYTES_USED DESC;前&#xff1a; memory/sql/thd::main_mem_root 1…

第十五天啦 2024.8.1 (Spring框架)

1.从宏观上看spring框架和springboot Spring框架解决了企业级的开发的复杂性&#xff0c;它是一个容器框架&#xff0c;用于装java对象&#xff08;Bean&#xff09;&#xff0c;使程序间的依赖关系交由容器统一管理&#xff0c;松耦合&#xff0c;提高了可测试性和维护效率&a…

网络原理的TCP/IP

TCP/IP协议 1)应用层 应用层和应用程序直接相关,与程序员息息相关的一层协议,应用层协议,里面描述的内容,就是写的程序,通过网络具体按照啥样的方式来进行传输,不同的应用程序,就可以用不同的应用层协议,在实际开发的过程中,需要程序员自制应用层协议 应用层协议本质上就是对…

主题巴巴WordPress主题合辑打包下载+主题巴巴SEO插件

主题巴巴WordPress主题合辑打包下载&#xff0c;包含博客一号、博客二号、博客X、门户一号、门户手机版、图片一号、杂志一号、自媒体一号、自媒体二号和主题巴巴SEO插件。

5行代码快速Git配置ssh

0 流程步骤 检查本地主机是否已经存在ssh key生成ssh key获取ssh key公钥内容&#xff08;id_rsa.pub&#xff09;复制该内容&#xff0c;到Github账号上添加公钥&#xff0c;进入Settings设置验证是否设置成功 1 代码 # 1.检查本地主机是否已经存在ssh key cd ~/.ssh ls # …