Kafka从0到消费者开发

news2024/12/23 6:27:43

安装ZK

Index of /zookeeper/zookeeper-3.9.2

  1. 下载安装包

一定要下载-bin的,不带bin的是源码,没有编译的,无法执行。-bin的才可以执行

  1. 解压
tar -zxvf apache-zookeeper-3.9.2-bin.tar.gz

  1. 备份配置
cp zoo_sample.cfg zoo_sample.cfg-back

  1. 配置命名并修改配置
# 创建zk数据路径
mkdir -p /data/zk
# 修改配置
mv zoo_sample.cfg zoo.cfg  && vim zoo.cfg

  1. 配置变更内容
###########################  变更区
# 将数据目录变更为新的路径
# dataDir=/tmp/zookeeper
dataDir=/data/zk
###########################  变更区


##########################################配置原文件
# The number of milliseconds of each tick
tickTime=2000
# The number of ticks that the initial 
# synchronization phase can take
initLimit=10
# The number of ticks that can pass between 
# sending a request and getting an acknowledgement
syncLimit=5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just 
# example sakes.
# dataDir=/tmp/zookeeper
dataDir=/data/zk
# the port at which the clients will connect
clientPort=2181
# the maximum number of client connections.
# increase this if you need to handle more clients
#maxClientCnxns=60
#
# Be sure to read the maintenance section of the 
# administrator guide before turning on autopurge.
#
# https://zookeeper.apache.org/doc/current/zookeeperAdmin.html#sc_maintenance
#
# The number of snapshots to retain in dataDir
#autopurge.snapRetainCount=3
# Purge task interval in hours
# Set to "0" to disable auto purge feature
#autopurge.purgeInterval=1

## Metrics Providers
#
# https://prometheus.io Metrics Exporter
#metricsProvider.className=org.apache.zookeeper.metrics.prometheus.PrometheusMetricsProvider
#metricsProvider.httpHost=0.0.0.0
#metricsProvider.httpPort=7000
#metricsProvider.exportJvmInfo=true

  1. 启动zk
./bin/zkServer.sh start

  1. 常用命令
# 启动
./zkServer.sh start

# 停止
./zkServer.sh stop

# 状态
./zkServer.sh status

  1. 集群改造

集群需要多台机器

每台机器的配置,都要配置上下面的内容

  • 配置
server.2=173.16.250.31:12888:13888
server.1=173.16.250.32:12888:13888
# 配置解析
# server.2=173.16.250.31:12888:13888
## server 是固定前缀
## 2 代表当前节点id,可以自定义,数字或字母标识,只要唯一标识一个节点即可。
## 173.16.250.31 节点id
## 12888 master和slave通信端口 默认2888
## 13888 leader选举端口,默认3888

注意,自己当前节点,及其他的节点,均要配置

  • 创建当前节点id=》myid

注意:myid需要创建在dataDir目录下,看配置的dataDir目录是什么,否则会起不来

# 这里的2 就是当前节点的id,等于 server.2=xxxx:2888:3888 ,中的2
echo "2" > /data/zk/myid

  1. 验证集群状态
# 集群状态下,登录每一台zk服务器,查看状态,会显示当前的节点是否为leader

./zkServer.sh status


# 非主节点显示
[root@localdomain bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zk/apache-zookeeper-3.9.2-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: follower

# 主节点显示
[root@localdomain bin]# ./zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /opt/zk/apache-zookeeper-3.9.2-bin/bin/../conf/zoo.cfg
Client port found: 2181. Client address: localhost. Client SSL: false.
Mode: leader

安装kafka

kafka首页:Apache Kafka

点击下载即可

  1. 解压安装包
tar -zxvf kafka_2.13-3.7.0.tgz

  1. 修改配置
vim /${KAFKA_HOME}/config/server.properties

broker.id=0 //初始是0,每个 server 的broker.id 都应该设置为不一样的,就和 myid 一样 我的三个服务分别设置的是 1,2,3
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log

#在log.retention.hours=168 下面新增下面三项
message.max.byte=5242880
default.replication.factor=2
replica.fetch.max.bytes=5242880

#设置zookeeper的连接端口
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181

# 设置局域网内其他机器可以访问,如果不设置,只能localhost能访问,会导致后面写消费者时,java程序连接不上。这里写自己的服务ip即可。
advertised.listeners=PLAINTEXT://173.16.250.32:9092

配置解释

broker.id=0  #当前机器在集群中的唯一标识,和zookeeper的myid性质一样
port=9092 #当前kafka对外提供服务的端口默认是9092
host.name=192.168.1.7 #这个参数默认是关闭的,在0.8.1有个bug,DNS解析问题,失败率的问题。
num.network.threads=3 #这个是borker进行网络处理的线程数
num.io.threads=8 #这个是borker进行I/O处理的线程数
log.dirs=/usr/local/kafka/kafka_2.12-2.3.0/log #消息存放的目录,这个目录可以配置为“,”逗号分割的表达式,上面的num.io.threads要大于这个目录的个数这个目录,如果配置多个目录,新创建的topic他把消息持久化的地方是,当前以逗号分割的目录中,那个分区数最少就放那一个
socket.send.buffer.bytes=102400 #发送缓冲区buffer大小,数据不是一下子就发送的,先回存储到缓冲区了到达一定的大小后在发送,能提高性能
socket.receive.buffer.bytes=102400 #kafka接收缓冲区大小,当数据到达一定大小后在序列化到磁盘
socket.request.max.bytes=104857600 #这个参数是向kafka请求消息或者向kafka发送消息的请请求的最大数,这个值不能超过java的堆栈大小
num.partitions=1 #默认的分区数,一个topic默认1个分区数
log.retention.hours=168 #默认消息的最大持久化时间,168小时,7天
message.max.byte=5242880  #消息保存的最大值5M
default.replication.factor=2  #kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务
replica.fetch.max.bytes=5242880  #取消息的最大直接数
log.segment.bytes=1073741824 #这个参数是:因为kafka的消息是以追加的形式落地到文件,当超过这个值的时候,kafka会新起一个文件
log.retention.check.interval.ms=300000 #每隔300000毫秒去检查上面配置的log失效时间(log.retention.hours=168 ),到目录查看是否有过期的消息如果有,删除
log.cleaner.enable=false #是否启用log压缩,一般不用启用,启用的话可以提高性能
zookeeper.connect=192.168.1.7:2181,192.168.1.8:2181,192.168.1.9:2181 #设置zookeeper的连接端口

详细说明

常规配置
这些参数是 kafka 中最基本的配置

broker.id

每个 broker 都需要有一个标识符,使用 broker.id 来表示。它的默认值是 0,它可以被设置成其他任意整数,在集群中需要保证每个节点的 broker.id 都是唯一的。

port

如果使用配置样本来启动 kafka ,它会监听 9092 端口,修改 port 配置参数可以把它设置成其他任意可用的端口。

zookeeper.connect

用于保存 broker 元数据的地址是通过 zookeeper.connect 来指定。localhost:2181 表示运行在本地 2181 端口。该配置参数是用逗号分隔的一组 hostname:port/path 列表,每一部分含义如下:

hostname 是 zookeeper 服务器的服务名或 IP 地址

port 是 zookeeper 连接的端口

/path 是可选的 zookeeper 路径,作为 Kafka 集群的 chroot 环境。如果不指定,默认使用跟路径

log.dirs

Kafka 把消息都保存在磁盘上,存放这些日志片段的目录都是通过 log.dirs 来指定的。它是一组用逗号分隔的本地文件系统路径。如果指定了多个路径,那么 broker 会根据 "最少使用" 原则,把同一分区的日志片段保存到同一路径下。要注意,broker 会向拥有最少数目分区的路径新增分区,而不是向拥有最小磁盘空间的路径新增分区。

num.recovery.threads.per.data.dir

对于如下 3 种情况,Kafka 会使用可配置的线程池来处理日志片段

服务器正常启动,用于打开每个分区的日志片段;

服务器崩溃后启动,用于检查和截断每个分区的日志片段;

服务器正常关闭,用于关闭日志片段

默认情况下,每个日志目录只使用一个线程。因为这些线程只是在服务器启动和关闭时会用到,所以完全可以设置大量的线程来达到井行操作的目的。特别是对于包含大量分区的服务器来说,一旦发生崩愤,在进行恢复时使用井行操作可能会省下数小时的时间。设置此参数时需要注意,所配置的数字对应的是 log.dirs 指定的单个日志目录。也就是说,如果 num.recovery.threads.per.data.dir 被设为 8,并且 log.dir 指定了 3 个路径,那么总共需要 24 个线程。

auto.create.topics.enable

默认情况下,Kafka 会在如下 3 种情况下创建主题

当一个生产者开始往主题写入消息时

当一个消费者开始从主题读取消息时

当任意一个客户向主题发送元数据请求时

delete.topic.enable

如果你想要删除一个主题,你可以使用主题管理工具。默认情况下,是不允许删除主题的,delete.topic.enable 的默认值是 false 因此你不能随意删除主题。这是对生产环境的合理性保护,但是在开发环境和测试环境,是可以允许你删除主题的,所以,如果你想要删除主题,需要把 delete.topic.enable 设为 true。

主题默认配置
Kafka 为新创建的主题提供了很多默认配置参数,下面就来一起认识一下这些参数

num.partitions

num.partitions 参数指定了新创建的主题需要包含多少个分区。如果启用了主题自动创建功能(该功能是默认启用的),主题分区的个数就是该参数指定的值。该参数的默认值是 1。要注意,我们可以增加主题分区的个数,但不能减少分区的个数。

default.replication.factor

这个参数比较简单,它表示 kafka保存消息的副本数,如果一个副本失效了,另一个还可以继续提供服务default.replication.factor 的默认值为1,这个参数在你启用了主题自动创建功能后有效。

log.retention.ms

Kafka 通常根据时间来决定数据可以保留多久。默认使用 log.retention.hours 参数来配置时间,默认是 168 个小时,也就是一周。除此之外,还有两个参数 log.retention.minutes 和 log.retentiion.ms 。这三个参数作用是一样的,都是决定消息多久以后被删除,推荐使用 log.retention.ms。

log.retention.bytes

另一种保留消息的方式是判断消息是否过期。它的值通过参数 log.retention.bytes 来指定,作用在每一个分区上。也就是说,如果有一个包含 8 个分区的主题,并且 log.retention.bytes 被设置为 1GB,那么这个主题最多可以保留 8GB 数据。所以,当主题的分区个数增加时,整个主题可以保留的数据也随之增加。

log.segment.bytes

上述的日志都是作用在日志片段上,而不是作用在单个消息上。当消息到达 broker 时,它们被追加到分区的当前日志片段上,当日志片段大小到达 log.segment.bytes 指定上限(默认为 1GB)时,当前日志片段就会被关闭,一个新的日志片段被打开。如果一个日志片段被关闭,就开始等待过期。这个参数的值越小,就越会频繁的关闭和分配新文件,从而降低磁盘写入的整体效率。

log.segment.ms

上面提到日志片段经关闭后需等待过期,那么 log.segment.ms 这个参数就是指定日志多长时间被关闭的参数和,log.segment.ms 和 log.retention.bytes 也不存在互斥问题。日志片段会在大小或时间到达上限时被关闭,就看哪个条件先得到满足。

message.max.bytes

broker 通过设置 message.max.bytes 参数来限制单个消息的大小,默认是 1000 000, 也就是 1MB,如果生产者尝试发送的消息超过这个大小,不仅消息不会被接收,还会收到 broker 返回的错误消息。跟其他与字节相关的配置参数一样,该参数指的是压缩后的消息大小,也就是说,只要压缩后的消息小于 mesage.max.bytes,那么消息的实际大小可以大于这个值

这个值对性能有显著的影响。值越大,那么负责处理网络连接和请求的线程就需要花越多的时间来处理这些请求。它还会增加磁盘写入块的大小,从而影响 IO 吞吐量。

  1. 启动kafka
./bin/kafka-server-start.sh -daemon ./config/server.properties

  1. 检查服务是否启动
# 执行命令 jps
6201 QuorumPeerMain
7035 Jps
6972 Kafka

  1. 验证
  • 创建topic
bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 2 --partitions 1 --topic test

--replication-factor 2   复制两份

--partitions 1 创建1个分区

--topic 创建主题

  • 查看topic是否创建成功
bin/kafka-topics.sh --list --bootstrap-server 127.0.0.1:9092

  • 创建生产者
./kafka-console-producer.sh --broker-list 127.0.0.1:9092 --topic test

  • 创建消费者
bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --topic test --from-beginning

  • 查看topic状态

bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic test


# 下面是显示的详细信息
Topic:cxuantopic PartitionCount:1 ReplicationFactor:2 Configs:
Topic: cxuantopic Partition: 0 Leader: 1 Replicas: 1,2 Isr: 1,2

# 分区为为1  复制因子为2   主题 cxuantopic 的分区为0 
# Replicas: 0,1   复制的为1,2

  1. kafka集群配置

多台机器启动集群环境,配置改动很少,都很简单

  • 配置修改
# 这里多台服务器,改成不一样的
broker.id=1
# 改为自己的ip地址
advertised.listeners=PLAINTEXT://173.16.250.32:9092
# 配置所有zk节点
zookeeper.connect=173.16.250.32:2181,173.16.250.31:2181

其他的按照正常上面的流程,重新启动即可。

如果遇到

开发生产者及消费者

  1. maven依赖

    <dependency>
      <groupId>org.springframework.kafka</groupId>
      <artifactId>spring-kafka</artifactId>
    </dependency>

  1. 配置

spring:
  kafka:
    consumer:
      bootstrap-servers: 173.16.250.32:9092
      # 配置消费者消息offset是否自动重置(消费者重连会能够接收最开始的消息)
      auto-offset-reset: earliest
    producer:
      bootstrap-servers: 173.16.250.32:9092
      # 发送的对象信息变为json格式
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer

  1. 生产者服务
package cn.huadingyun.bp.tidb.sync.provider;

import lombok.RequiredArgsConstructor;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
 * @author rubik
 * created of 2024/4/11 20:24 for cn.huadingyun.bp.tidb.sync.provider
 */
@Component
@RequiredArgsConstructor
public class KafkaProducerService {

    private final KafkaTemplate<String, Object> kafkaTemplate;

    public void send(String topic, Object message) {
        kafkaTemplate.send(topic, message);
    }

}

  1. 生产者接口
package cn.huadingyun.bp.tidb.sync.api;

import cn.huadingyun.bp.tidb.sync.provider.KafkaProducerService;
import cn.huadingyun.framework.dto.model.Result;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
 * @author rubik
 * created of 2024/4/11 20:22 for cn.huadingyun.bp.tidb.sync.api
 */
@RestController
@RequestMapping("kafka")
@RequiredArgsConstructor
public class KafkaProvider {

    private final KafkaProducerService kafkaProducerService;

    @GetMapping("send")
    public Result<String> send() {
        kafkaProducerService.send("test", "hello");
        return Result.success("okk");
    }

}

  1. 消费者
package cn.huadingyun.bp.tidb.sync.listener;

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * @author rubik
 * created of 2024/4/11 20:01 for cn.huadingyun.bp.tidb.sync.listener
 */
@Component
public class KafkaListenerDemo {


    @KafkaListener(topics = {"test"}, groupId = "test")
    public void listen(String message) {
        System.err.println(message);
    }
}

注意,这里的groupId必须指定,否则启动会报错。可以在配置的时候统一指定

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1658681.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Angular入门

Angular版本&#xff1a;Angular 版本演进史概述-天翼云开发者社区 - 天翼云 安装nodejs&#xff1a;Node.js安装与配置环境 v20.13.1(LTS)-CSDN博客 Angular CLI是啥 Angular CLI 是一个命令行接口(Angular Command Line Interface)&#xff0c;是开发 Angular 应用的最快、最…

【Java基础】时间相关的类

需要引入类包java.util.Date* 用构造方法得到当前时间 Date d new Date(); 用构造方法将long类型的时间值转成日期 Date dd new Date(time); 得到当前时间的毫秒值 Long System.currentTimeMillis() 把Date类型转为long类型 long getTime() Date d new Date();//重写了…

亚马逊云科技中国峰会:与你开启云计算与前沿技术的探索之旅

亚马逊云科技中国峰会&#xff1a;与你开启云计算与前沿技术的探索之旅 Hello,我是科技博主Maynor&#xff0c;非常高兴地向你们推荐亚马逊云科技中国峰会&#xff0c;这是一场将于 5 月 29 日至 30 日在上海世博中心举办的科技盛会&#xff0c;如果你对云计算、行业发展新趋势…

探索人工智能的深度神经网络:理解、应用与未来

深度神经网络&#xff08;DNNs&#xff09;是一种人工智能模型&#xff0c;其灵感来自于人脑神经元之间的连接。它们由多个层次组成&#xff0c;每一层都包含多个神经元&#xff0c;这些神经元通过权重连接在一起。信息通过网络的输入层传递&#xff0c;并经过一系列隐藏层&…

【Leetcode每日一题】 分治 - 交易逆序对的总数(难度⭐⭐⭐)(74)

1. 题目解析 题目链接&#xff1a;LCR 170. 交易逆序对的总数 这个问题的理解其实相当简单&#xff0c;只需看一下示例&#xff0c;基本就能明白其含义了。 2.算法原理 归并排序的基本思路 归并排序将数组从中间分成两部分&#xff0c;在排序的过程中&#xff0c;逆序对的来…

如何把公章盖在电子档文件上?

将公章盖在电子档文件上&#xff0c;尤其是确保其法律效力和安全性&#xff0c;通常涉及以下步骤&#xff1a; 准备工作 获取合法的电子公章&#xff1a;确保你拥有公司或机构正式授权的电子公章图像&#xff0c;且该图像经过了必要的加密或数字签名处理&#xff0c;以确保其…

特征提取与深度神经网络(二)

关键点/角点检测 2011论文-ORB关键点检测&#xff0c;比SIFT与SURF速度更快。 ORB算法可以看出两个部分组成&#xff1a;快速关键点定位BRIEF描述子生成 Fast关键点检测&#xff1a; 选择当前像素点P&#xff0c;阈值T&#xff0c;周围16个像素点&#xff0c;超过连续N12个像素…

SparkSQL概述

1.1. SparkSQL介绍 SparkSQL&#xff0c;就是Spark生态体系中的构建在SparkCore基础之上的一个基于SQL的计算模块。SparkSQL的前身不叫SparkSQL&#xff0c;而是叫做Shark。最开始的时候底层代码优化、SQL的解析、执行引擎等等完全基于Hive&#xff0c;总是Shark的执行速度要比…

SpringCloud:认识微服务

程序员老茶 &#x1f648;作者简介&#xff1a;练习时长两年半的Java up主 &#x1f649;个人主页&#xff1a;程序员老茶 &#x1f64a; P   S : 点赞是免费的&#xff0c;却可以让写博客的作者开心好久好久&#x1f60e; &#x1f4da;系列专栏&#xff1a;Java全栈&#…

让数据更「高效」一点!IvorySQL在Neon平台上的迅速部署和灵活应用

IvorySQL本身就是一个100%兼容PostgreSQL最新内核的开源数据库系统&#xff0c;而Neon Autoscaling Platform通常支持多种数据库和应用程序。将IvorySQL集成到该平台后&#xff0c;可以进一步增强与其他系统和应用程序的兼容性&#xff0c;同时更全面的体验IvorySQL的Oracle兼容…

lint 代码规范,手动修复,以及vscode的第三方插件eslint自动修复

ESlint代码规范 不是语法规范&#xff0c;是一种书写风格&#xff0c;加多少空格&#xff0c;缩进多少&#xff0c;加不加分号&#xff0c;类似于书信的写作格式 ESLint:是一个代码检查工具&#xff0c;用来检查你的代码是否符合指定的规则(你和你的团队可以自行约定一套规则)…

【3dmax笔记】032: 编辑顶点

一、编辑顶点概述 (1)启动安装好的3dmax软件。 (2)选择顶视图,用图形画出一个矩形。 (3)选择矩形,右击鼠标,将矩形转换成可编辑样条线。 (4)进入顶点层级。 展开可编辑样条线,选择顶点层级(快捷键为1,在不展开样条线的情况下也可以选择顶点层级)。选择后,可以…

python:做柱状图

import matplotlib.pyplot as plt # 数据 categories [A, B, C, D] values [23, 45, 56, 78] # 创建柱状图 plt.bar(categories, values) # 添加标题和标签 plt.title(柱状图示例) plt.xlabel(类别) plt.ylabel(数值) # 显示图形 plt.show() D:\software\新建文件夹\python\L…

TODESK远程开机的原理

在现代计算机技术飞速发展的背景下&#xff0c;远程控制软件成为我们日常工作中不可或缺的工具。其中&#xff0c;ToDesk作为一款高效且易用的远程控制软件&#xff0c;备受用户青睐。那么&#xff0c;ToDesk远程开机的原理是什么呢&#xff1f;本文将为你揭晓这个秘密。 KKVie…

[淘宝销量]—采集分析—实例参考▶

[干货] 本文爬取淘宝的搜索结果&#xff0c;包含标题、价格、原价、店铺、月销量字段。将结果保存成csv格式&#xff0c;并作简单分析。以手机为例。【淘宝销量】 用到的python库&#xff1a;selenium、urllib、pyquery、pandas。 1.爬取页面分析 1.1 获取URL 打开淘宝&am…

Python | Leetcode Python题解之第79题单词搜索

题目&#xff1a; 题解&#xff1a; class Solution:def exist(self, board: List[List[str]], word: str) -> bool:def dfs(i, j, k):if not 0 < i < len(board) or not 0 < j < len(board[0]) or board[i][j] ! word[k]: return Falseif k len(word) - 1: r…

Linux线程(一)初识线程

目录 一、什么是线程 二、线程和进程的区别 三、线程的操作 1、创建线程 2、获取线程ID 3、线程的终止与等待 4、线程分离 一、什么是线程 在Linux中&#xff0c;线程&#xff08;thread&#xff09;是一种轻量级进程&#xff08;Light-weight Process, LWP&#xff09…

五一超级课堂---Llama3-Tutorial(Llama 3 超级课堂)---第一节 Llama 3 本地 Web Demo 部署

课程文档&#xff1a; https://github.com/SmartFlowAI/Llama3-Tutorial 课程视频&#xff1a; https://space.bilibili.com/3546636263360696/channel/collectiondetail?sid2892740&spm_id_from333.788.0.0 操作平台&#xff1a; https://studio.intern-ai.org.cn/consol…

全面解析C++11与C++20线程(含内容)

昨晚跟一些小伙伴做了第一次直播尝试&#xff0c;一起探讨了C11 thread与 C20的jthread&#xff0c;于此同时给大家出了几个问题&#xff0c;在直播之外不会公布答案&#xff0c;所以以后直播还是得跟着走起。 总共有22人参加直播&#xff0c;氛围相当不错&#xff0c;没有录播…

Linux 无名信号量(Semaphore)的使用

目录 一、无名信号量的概念二、无名信号量相关函数三、信号量的使用步骤四、应用场景五、测试代码 一、无名信号量的概念 Linux无名信号量&#xff08;Semaphore&#xff09;   在Linux操作系统中&#xff0c;信号量&#xff08;Semaphore&#xff09;是一种用于进程间或线程…