# Kafka_深入探秘者(1):初识 kafka

news2024/7/1 14:12:33

Kafka_深入探秘者(1):初识 kafka

一、kafka 特性

1、Kafka :最初是由 Linkedln 公司采用 Scala 语言开发的一个多分区、多副本并且基于 ZooKeeper 协调的分布式消息系统,现在已经捐献给了 Apache 基金会。目前 Kafka 已经定位为一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流处理等多种特性而被广泛应用。

2、Apache Kafka 是一个分布式的发布-订阅消,息系统,能够支撑海量数据的数据传递。在离线和实时的消息处理业务系统中,Kafka 都有广泛的应用。Kafka 将消息持久化到磁盘中,并对消息创建了备份保证了数据的安全。Kafka 在保证了较高的处理速度的同时,又能保证数据处理的低延迟和数据的零丢失。

3、示例图:

在这里插入图片描述

4、kafka 特性

  • (1)高吞吐量、低延迟: kafka 每秒可以处理几十万条消息,它的延迟最低只有几毫秒,每个主题可以分多个分区,消费组对分区进行消费操作;

  • (2)可扩展性: kafka 集群支持热扩展;

  • (3)持久性、可靠性: 消息被持久化到本地磁盘,并且支持数据备份防止数据丢失;

  • (4)容错性: 允许集群中节点失败(若副本数量为n,则允许n-1个节点失败);

  • (5)高并发: 支持数千个客户端同时读写;

5、kafka 使用场景

  • (1)日志收集:

一个公司可以用 Kafka 可以收集各种服务的 l0g,通过kafka以统一接口服务的方式开放给各种 consumer,例如 Hadoop、Hbase、Solr 等;

  • (2)消息系统:

解耦和生产者和消费者、缓存消息等:

  • (3)用户活动跟踪:

Kafka经常被用来记录web用户或者app用户的各种活动,如浏览网页、搜索、点击等活动,这些活动信息被各个服务器发布到 kafka 的 topic 中,然后订阅者通过订阅这些 topic 来做实时的监控分析,或者装载到 Hadoop、数据仓库中做离线分析和挖掘;

  • (4)运营指标:

Kafka 也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。

  • (5)流式处理:

比如 spark streaming 和 storm。

6、技术优势

1)可伸缩性: Kafka 的两个重要特性造就了它的可伸缩性。

  • 1、Kafka 集群在运行期间可以轻松地扩展或收缩(可以添加或删除代理),而不会宕机。
  • 2、可以扩展一个 Kafka 主题来包含更多的分区。由于一个分区无法扩展到多个代理,所以它的容量受到代理磁盘空间的限制。能够增加分区和代理的数量意味着单个主题可以存储的数据量是没有限制的。

2)容错性和可靠性:

Kafka 的设计方式使某个代理的故障能够被集群中的其他代理检测到。由于每个主题都可以在多个代理上复制,所以集群可以在不中断服务的情况下从此类故障中恢复并继续运行。

3)吞吐量: 代理能够以超快的速度有效地存储和检索数据

二、kafka 概念详解

1、kafka 官网: https://kafka.apache.org/

2、kafka 概念 流程图:

2.png

3、Producer

生产者即数据的发布者,该角色将消息发布到 Kafka 的 topic 中。broker 接收到生产者发送的消息后,broker 将该消息追加到当前用于追加数据的 segment 文件中。生产者发送的消息,存储到一个 partition 中,生产者也可以指定数据存储的 partition。

4、Consumer

消费者可以从 broker 中读取数据。消费者可以消费多个 topic 中的数据。

5、Topic

在 Kafka 中,使用一个类别属性来划分数据的所属类,划分数据的这个类称为 topic。如果把Kafka看做为一个数据库,topic 可以理解为数据库中的一张表,topic 的名字即为表名。

6、Partition

topic 中的数据分割为一个或多个 partition。每个 topic 至少有一个 partition。每个 partition 中的数据使用多个 segment 文件存储。partition 中的数据是有序的,partition 间的数据丢失了数据的顺序。如果 topic 有多个 partition,消费数据时就不能保证数据的顺序。在需要严格保证消息的消费顺序的场景下,需要将 partition 数目设为 1。

7、Partition offset

每条消息都有一个当前 Partition 下唯一的64字节的 offset,它指明了这条消息的起始位置。

8、Replicas of partition

副本是一个分区的备份。副本不会被消费者消费,副本只用于防止数据丢失,即消费者不从为 folower 的 partition 中消费数据,而是从为 leader 的 partition 中读取数据。副本之间是一主多从的关系。

9、Broker

Kafka 集群包含一个或多个服务器,服务器节点称为 broker。broker 存储 topic 的数据。如果某 topic 有N个 partition,集群有N个 broker,那么每个 broker 存储该 topic 的一个 partition。如果某 topic 有N个 partition,集群有(N+M)个 broker,那么其中有N个 broker 存储该 topic 的一个 partition,剩下的M个 broker 不存储该 topic 的 partition 数据。如果某 topic 有N个 partition,集群中 broker 数目少于N个,那么一个 broker 存储该 topic 的一个或多个 partition。在实际生产环境中,尽量避免这种情况的发生,这种情况容易导致 Kafka 集群数据不均衡。

10、Leader

每个 partition 有多个副本,其中有且仅有一个作为 Leader,Leader 是当前负责数据的读写的 partition。

11、Follower

Follower 跟随 Leader,所有写请求都通过 Leader 路由,数据变更会广播给所有 Follower,Follower 与 Leader 保持数据同步。如果 Leader 失效,则从 Follower 中选举出一个新的Leader。当 Follower 与 Leader 挂掉、卡住或者同步太慢,leader 会把这个 follower 从 “in syncreplicas"(ISR) 列表中删除,重新创建一个 Follower。

12、Zookeeper

Zookeeper 负责维护和协调 broker。当 Kafka 系统中新增了 broker 或者某个 broker 发生故障失效时,由 ZooKeeper 通知生产者和消费者。生产者和消费者依据 Zookeeper 的 broker 状态信息与 broker 协调数据的发布和订阅任务。

13、AR(Assigned Replicas)

分区中所有的副本统称为 AR。

14、ISR(In-Sync Replicas)

所有与 Leader 部分保持一定程度的副(包括 Leader 副本在内)本组成 ISR。

15、OSR(Out-of-Sync-Replicas)

与 Leader 副本同步滞后过多的副本。

16、HW(High Watermark)

高水位,标识了一个特定的 offset,消费者只能拉取到这个 offset 之前的消息。

17、LEO(Log End offset)

即日志末端位移(log end offset),记录了该副本底层日志(l0g)中下一条消息的位移值。注意是下一条消息!也就是说,如果 LEO=10,那么表示该副本保存了10条消息,位移值范围是[0,9]。

在这里插入图片描述

三、kafka 环境配置 jdk、zookeeper 下载安装

1、 jdk-12 下载安装

Oracle官网下载JDK

https://www.oracle.com/technetwork/java/javase/downloads/jdk12-downloads-5295953.html

Java 开发工具包_JDK 各个版本 下载:

jdk.java.net
http://jdk.java.net/

Oracle 官网:
https://www.oracle.com/java/technologies/javase-downloads.html

  1. JDK-7 下载:
    http://jdk.java.net/java-se-ri/7

  2. JDK-8 下载:
    https://jdk.java.net/java-se-ri/8-MR5

  3. JDK-9 下载:
    http://jdk.java.net/java-se-ri/9

Java 平台标准版 9 参考实现
为Java SE 9(官方参考实现JSR 379)仅在从现有的开源代码是基于JDK 9项目在OpenJDK的社区。

  1. JDK-10 下载:
    http://jdk.java.net/java-se-ri/10

  2. JDK-11 下载:
    http://jdk.java.net/java-se-ri/11

  3. JDK-12 下载:
    http://jdk.java.net/java-se-ri/12

  4. JDK-13 下载:
    http://jdk.java.net/java-se-ri/13

  5. JDK-14 下载:
    http://jdk.java.net/java-se-ri/14

  6. JDK-15 下载:
    http://jdk.java.net/java-se-ri/15

  7. JDK-16 下载:
    http://jdk.java.net/java-se-ri/16

2、jdk-12 环境变量配置


sudo vim /etc/profile 

# 添加 jdk 环境变量配置
export JAVA_HOME=/opt/java/jdk-12.0.1
export JRE_HOME-${JAVA_HOME}/jre
#export MAVEN_HOME=/opt/maven/apache-maven-3.5.0
export CLASSPATH=.:${JAVA_HOME}/lib:${JRE_HOME}/lib
#export PATH=S{JAVA_HOME}/bin:${MAVEN_HOME}/bin:$PATH
export PATH=S{JAVA_HOME}/bin:$PATH

# 或者简单配置
export JAVA_HOME=/usr/local/java/jdk-12
export PATH=${JAVA_HOME}/bin:$PATH

# 配置完 JDK 记得断开连接重新连接 或者重启系统。

# 测试 jdk 是否安装配置成功
java -version

3、zookeeper 下载安装

1)Zookeeper 是安装 Kafka 集群的必要组件,Kafka 通过 Zookeeper 来实施对元数据信息的管理,包括集群、主题分区等内容。

2)同样在官网下载安装包到指定目录解压缩,步骤如下:

ZooKeeper 官网: http://zookeeper.apache.org
https://github.com/apache/zookeeper/tags?after=release-3.8.0-1

3)把 zookeeper 上传至服务器,并解压


# 切换目录:
cd /usr/local/zookeeper/
# 解压即安装
tar -zxvf apache-zookeeper-3.6.3-bin.tar.gz -C /usr/local/zookeeper/

4)修改 Zookeeper 的配置文件,首先进入安装路径 conf 目录,并将 zoo_sample.cfg 文件修改为 zoo.cfg,并对核心参数进行配置。文件内容如下:


# 切换目录
cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/conf/

# 将 zoo_sample.cfg 文件修改为 zoo.cfg
mv zoo_sample.cfg zoo.cfg

# 对核心参数进行配置
vim zoo.cfg


# The number of milliseconds of each tick
#zk服务器的心跳时间
tickTime-2080
# The number of ticks that the initial
#synchronization phase can take
#投票选举新Leader的初始化时间
initlimit=10
# The number of ticks that can pass between
# sendidg a request and getting an acknowledgementsyncLimit-5
# the directory where the snapshot is stored.
# do not use /tmp for storage, /tmp here is just
# example sakes.

# 数据目录(需要新建此目录)
dataDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/data

# 日志目录
dataLogDir=/usr/local/zookeeper/apache-zookeeper-3.6.3-bin/log

# the port at which the clients will connect
#Zookeeper对外服务端口,保持默认
clientPort-2181

4、启动 Zookeeper 命令: bin/zkServer.sh start


# 切换目录
cd /usr/local/zookeeper/apache-zookeeper-3.6.3-bin/
# 启动 Zookeeper
bin/zkServer.sh start
# 查询 zookeeper 是否启动成功
ps -ef | grep zookeeper 
# 或者 
jps -l

四、kafka 环境配置 kafka 下载安装

1、 下载安装解压 kafka

官网下载地址: https://kafka.apache.org/downloads


# 切换目录
cd /usr/local/kafka/

# 解压即安装
tar -zxvf kafka-2.12.2.8.0.tar.gz -C /usr/local/kafka/

2、配置 kafka 参数


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 修改 kafka 配置参数:
vim config/server.properties

# server.properties 配置中需要关注以下几个参数:

# 表示 broker 的编号,如果集群中有多个 broker,则每个 broker 的编号需要设置的不同
broker.id=0 

# brokder 对外提供的服务入口地址( kafka 监听地址:你的虚拟机 IP 地址)
# listeners=PLAINTEXT://127.0.0.1:9092 
listeners=PLAINTEXT://172.18.30.110:9092 

# 设置存放消息日志文件的地址
log.dirs=/tmp/kafka/log 

# Kafka 所需 Zookeeper 集群地址
zookeeper.connect=lolalhost:2181 

3、启动 kafka


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 启动 kafka
bin/kafka-server-start.sh config/server.properties

4、查询 kafka 是否启动成功:


# 重新打开一个终端

# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 查询 kafka 是否启动成功
ps -ef | grep kafka 
# 或者 
jps -l

五、kafka 消息的生产与消费

1、创建一个主题

1)命令: bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1

2)参数说明:

–zookeeper :指定了 kafka 所连接的 zookeeper 服务地址。
–topic : 指定了所要创建主题的名称
–partitions : 指定了分区个数
–replication-factor : 指定了副本因子
–create : 创建主题的动作指令。


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 创建一个名为 heima 的 主题
bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic heima --partitions 2 --replication-factor 1

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --create --topic heima --partitions 2 --replication-factor 1

2、展示出当前所有主题


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 展示出当前所有主题
bin/kafka-topics.sh --zookeeper 1ocalhost:2181 --1ist

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --1ist

3、 查看主题详情:


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 查看主题详情
bin/kafka-topics.sh --zookeeper localhost:2181 --describe --topic heima

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-topics.sh --zookeeper 172.18.30.110:2181 --describe --topic heima

4、启动消费端接收消息:

命令: bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima

参数说明:
–bootstrap-server 指定了连接 Kafka 集群的地址
–topic 指定了消费端订阅的主题


# 重新打开一个终端:

# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 启动消费端接收消息
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic heima

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-console-consumer.sh --bootstrap-server 172.18.30.110:9092 --topic heima

5、生产端发送消息

命令: bin/kafka-console-producer.sh --broker-list localhost:9092 --topic heima

参数说明:
–broker-list 指定了连接的 Kafka 集群的地址
–topic 指定了发送消息时的主题


# 重新打开一个终端:

# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 生产端发送消息
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic heima

# 或者把 localhost 换成 填写你的 虚拟机 IP 地址(如:172.18.30.110):
bin/kafka-console-producer.sh --broker-list 172.18.30.110:9092 --topic heima

在这里插入图片描述

在这里插入图片描述

六、kafka java 第一个程序 001

1、打开 idea 创建 artifactId 名为 kafka_learn 的 maven 工程。


	--> idea --> File 
	--> New --> Project 
	--> Maven 
		Project SDK: ( 1.8(java version "1.8.0_131" ) 
	--> Next 
	--> Groupld : ( djh.it )
		Artifactld : ( kafka_learn )
		Version : 1.0-SNAPSHOT
	--> Name: ( kafka_learn )
		Location: ( ...\kafka_learn\ )	
	--> Finish
	

2、在 kafka_learn 工程的 pom.xml 文件中导入依赖坐标。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>djh.it</groupId>
    <artifactId>kafka_learn</artifactId>
    <version>1.0-SNAPSHOT</version>
    <name>kafka_learn</name>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.1.8.RELEASE</version>
        <relativePath></relativePath>
    </parent>

    <properties>
        <java.version>8</java.version>
<!--        <scala.version>2.11</scala.version>-->
        <scala.version>2.12</scala.version>
        <slf4j.version>1.7.21</slf4j.version>
<!--        <kafka.version>2.0.0</kafka.version>-->
        <kafka.version>2.8.0</kafka.version>
        <lombok.version>1.18.8</lombok.version>
        <junit.version>4.11</junit.version>
        <gson.version>2.2.4</gson.version>
        <protobuff.version>1.5.4</protobuff.version>
<!--        <spark.version>2.3.1</spark.version>-->
        <spark.version>2.4.8</spark.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_${scala.version}</artifactId>
            <version>${kafka.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>

        <dependency>
            <groupId>com.google.code.gson</groupId>
            <artifactId>gson</artifactId>
            <version>${gson.version}</version>
        </dependency>

        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>${junit.version}</version>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>${lombok.version}</version>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-core</artifactId>
            <version>${protobuff.version}</version>
        </dependency>

        <dependency>
            <groupId>io.protostuff</groupId>
            <artifactId>protostuff-runtime</artifactId>
            <version>${protobuff.version}</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-core</artifactId>
            <version>2.9.4</version>
        </dependency>

        <dependency>
            <groupId>com.fasterxml.jackson.module</groupId>
            <artifactId>jackson-module-scala_2.11</artifactId>
            <version>2.9.5</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.apache.zookeeper</groupId>
                    <artifactId>zookeeper</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-log4j12</artifactId>
                </exclusion>
                <exclusion>
                    <groupId>log4j</groupId>
                    <artifactId>log4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>
<!-- kafka_learn\pom.xml -->

3、在 kafka_learn 工程中,创建 生产者 ProducerFastStart.java 类

/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java
 *
 *  2024-6-21 创建 生产者 ProducerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;

import io.protostuff.StringSerializer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import java.util.Properties;

public class ProducerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    public static void main( String[] args ) {
        Properties properties = new Properties();
        //1)设置 key 序列化器
        properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值序列化器
        properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        //properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址
        properties.put("bootstrap.servers", brokerList);
        //properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "hello-kafka-test!");
        try{
            producer.send(record);
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

在这里插入图片描述

七、kafka java 第一个程序 002

1、在 kafka_learn 工程中,创建 消费者 ConsumerFastStart.java 类


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ConsumerFastStart.java
 *
 *  2024-6-21 创建 消费者 ConsumerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;


import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    private static final String groupId = "group.demo";

    public static void main( String[] args ) {
        Properties properties = new Properties();
        //1)设置 key 序列化器
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //2)设置值序列化器
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        //3)设置集群地址
        properties.put("bootstrap.servers", brokerList);
        properties.put("group.id", groupId);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        while (true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record : records){
                System.out.println(record.value());
            }
        }
    }
}

2、在 kafka_learn 工程中,启动 消费者 ConsumerFastStart.java 类 和 生产者 ProducerFastStart.java 类,进行测试。会发现,消费者这边接收到,生产者发送的消费。

消费者接收消息.png

八、kafka 优化与总结

1、在 kafka_learn 工程中,修改 生产者 ProducerFastStart.java 类 ,进行优化代码。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ProducerFastStart.java
 *
 *  2024-6-21 创建 生产者 ProducerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    public static void main( String[] args ) {

        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //2)设置重试次数 -- 优化代码
        properties.put(ProducerConfig.RETRIES_CONFIG, 10);

        //3)设置值序列化器 -- 优化代码
        //properties.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        //4)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);

        KafkaProducer<String,String> producer = new KafkaProducer<String, String>(properties);
        ProducerRecord<String,String> record = new ProducerRecord<>(topic, "kafka-demo", "优化-2024-6-21-kafka-test!");
        try{
            producer.send(record);
        }catch (Exception e){
            e.printStackTrace();
        }
        producer.close();
    }
}

2、在 kafka_learn 工程中,修改 消费者 ConsumerFastStart.java 类,进行优化代码。


/**
 *  kafka_learn\src\main\java\djh\it\kafka\learn\chapter1\ConsumerFastStart.java
 *
 *  2024-6-21 创建 消费者 ConsumerFastStart.java 类
 */
package djh.it.kafka.learn.chapter1;


import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
//注意导包,一定要导成 kafka 的序列化包
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

public class ConsumerFastStart {

    //private static final String brokerList = "localhost:9092";
    private static final String brokerList = "172.18.30.110:9092";

    private static final String topic = "heima";

    private static final String groupId = "group.demo";

    public static void main( String[] args ) {
        Properties properties = new Properties();
        //1)设置 key 序列化器 -- 优化代码
        //properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //2)设置值序列化器 -- 优化代码
        //properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        //3)设置集群地址 -- 优化代码
        //properties.put("bootstrap.servers", brokerList);
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, brokerList);
        //properties.put("group.id", groupId);
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, groupId);

        KafkaConsumer<String,String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singletonList(topic));

        while (true){
            ConsumerRecords<String,String> records = consumer.poll(Duration.ofMillis(1000));
            for(ConsumerRecord<String,String> record : records){
                System.out.println(record.value());
            }
        }
    }
}

在这里插入图片描述

3、初识 kafka 总结:

kafka 启动前,在进行配置 kafka 参数时,即修改 kafka/kafka_2.12-2.8.0/config/server.properties 配置文件时,需要注意以下几点:

  • 1) zookeeper.connect

指明 Zookeeper 主机地址,如果 zookeeper 是集群则以逗号隔开,如:172.6.14.61:2181,172.6.14.62:2181,172.6.14.63:21B1

  • 2) listeners 监听列表 配置:

broker 对外提供服务时绑定的 IP 和端口。多个以逗号隔开,如果监听器名称不是一个安全的协议,listener.security.protocol.map 也必须设置。主机名称设置 0.0.0.0 绑定所有的接口,主机名称为空则绑定默认的接口。如:PLAINTEXT://myhost:9092,SSL:/1:9091 CLIENT://0.0.0.0:9092,REPLICATION://localhost:9093

  • 3) broker.id

broker 的唯一标识符,如果不配置则自动生成,建议配置且一定要保证集群中必须唯一,默认-11.4.4

  • 4) log.dirs

日志数据存放的目录,如果没有配置则使用 log.dir,建议此项配置。

  • 5) message.max.bytes

服务器接受单个消息的最大大小,默认 1000012 约等于 976.6KB。


# 切换目录
cd /usr/local/kafka/kafka_2.12-2.8.0/

# 修改 kafka 配置参数:
vim config/server.properties

# server.properties 配置中需要关注以下几个参数:

# 1Kafka 所需 Zookeeper 集群地址
zookeeper.connect=lolalhost:2181 

# 2)listeners 监听列表 配置:brokder 对外提供的服务入口地址( kafka 监听地址:你的虚拟机 IP 地址)
# listeners=PLAINTEXT://127.0.0.1:9092 
listeners=PLAINTEXT://172.18.30.110:9092 

# 3)表示 broker 的编号,如果集群中有多个 broker,则每个 broker 的编号需要设置的不同
broker.id=0 

# 4)log.dirs 设置存放消息日志文件的地址
log.dirs=/tmp/kafka/log 


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1863714.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

职升网:环评师考试成绩查询时间分享!

成绩查询时间 根据多个省市地区发布的2024年环境影响评价工程师的报名通知&#xff0c;预计2024年环境影响评价工程师考试成绩的查询时间将在2024年7月下旬开启。 成绩合格标准 2024年环境影响评价师考试的合格标准如下&#xff1a; 环境影响评价相关法律法规&#xff1a;科…

基于Python/MNE处理fnirs数据

功能性近红外光谱技术在脑科学领域被广泛应用&#xff0c;市面上也已经有了许多基于MATLAB的优秀工具包及相关教程&#xff0c;如&#xff1a;homer、nirs_spm等。而本次教程将基于Python的MNE库对fNIRS数据进行处理。 本次教程基于&#xff1a;https://mne.tools/stable/auto_…

Vue3 按钮根据屏幕宽度展示折叠按钮

文章目录 一、组件封装二、使用三、最终效果(参考)四、参考 一、组件封装 ButtonFold.vue 1、获取父组件的元素&#xff0c;根据元素创建动态插槽 2、插槽中插入父元素标签。默认效果和初始状态相同。 3、当屏幕宽度缩小时&#xff0c;部分按钮通过 dropdown 的方式展示出来&a…

APT 组织也在利用云存储进行攻击

研究人员发现&#xff0c;各类攻击者都在攻击行动中将恶意脚本、远控木马和诱饵文档等恶意文件上传到云服务器上&#xff0c;各种恶意文件组合起来完成恶意攻击。 某个攻击组织从发送钓鱼邮件到植入远控木马的过程如下所示&#xff1a; 攻击链 多个恶意文件串联起了整个攻击行…

【ai】tx2 nx: yolov4-triton-tensorrt 成功部署server

isarsoft / yolov4-triton-tensorrt运行发现插件未注册? 【ai】tx2 nx: jetson Triton Inference Server 部署YOLOv4 【ai】tx2 nx: jetson Triton Inference Server 运行YOLOv4 对main 进行了重新构建 【ai】tx2 nx :ubuntu查找NvInfer.h 路径及哪个包、查找符号【ai】tx2…

Swift开发——简单App设计

App的界面设计需要具有大量的图像并花费大量的时间,这样的应用不方便学习和交流,这里重点介绍SwiftUI界面元素的用法,通过简单App设计过程的讲解,展示图形用户界面应用程序的设计方法。 01、简单App设计 按照9.1节工程MyCh0901的创建方法,创建一个新的工程MyCh0902,此时工…

基于SSM的医药垃圾分类管理系统

文章目录 项目介绍主要功能截图:部分代码展示设计总结项目获取方式🍅 作者主页:超级无敌暴龙战士塔塔开 🍅 简介:Java领域优质创作者🏆、 简历模板、学习资料、面试题库【关注我,都给你】 🍅文末获取源码联系🍅 项目介绍 基于SSM的医药垃圾分类管理系统,java项目…

浅谈逻辑控制器之while控制器

浅谈逻辑控制器之while控制器 “While控制器”是一种高级控制结构&#xff0c;它允许用户基于特定条件来循环执行其下的子采样器或控制器&#xff0c;直至该条件不再满足。本文旨在详细介绍While控制器的功能、配置方法、使用场景以及实践示例&#xff0c;帮助测试工程师高效利…

龙芯CPU架构上使用向日葵远程工具

原文链接&#xff1a;龙芯CPU架构上使用向日葵远程工具 Hello&#xff0c;大家好啊&#xff01;今天给大家带来一篇在龙芯CPU上使用向日葵远程控制软件的文章。向日葵是一款强大的远程控制软件&#xff0c;能够帮助用户轻松地实现远程桌面访问和控制。本文将详细介绍如何在龙芯…

DevExpress WPF中文教程:Grid - 如何排序、分组、过滤数据(设计时)?

DevExpress WPF拥有120个控件和库&#xff0c;将帮助您交付满足甚至超出企业需求的高性能业务应用程序。通过DevExpress WPF能创建有着强大互动功能的XAML基础应用程序&#xff0c;这些应用程序专注于当代客户的需求和构建未来新一代支持触摸的解决方案。 无论是Office办公软件…

【学习】科大睿智解读ITSS认证中咨询机构的作用

企业拥有ITSS认证这不仅将为企业开拓商机&#xff0c;提升竞争力&#xff0c;还能促使企业改进内部运维流程&#xff0c;提高服务质量&#xff0c;为客户提供更优质的IT运维支持。在ITSS认证中&#xff0c;咨询机构扮演着重要的角色&#xff0c;其主要作用包括以下几个方面&…

Apache APISIX遇到504超时的解决办法

说明&#xff1a; Apache APISIX版本&#xff1a;v3.9.0Apache APISIX Dashboard版本&#xff1a;v3.0.1 当使用Apache APISIX开源网关&#xff0c;通过接口上传或下载大文件等时&#xff0c;出现如下“504 Gateway Time-out”错误信息&#xff0c;它表示网关或代理服务器未能…

通达信擒牛亮剑出击抄底主升浪指标公式源码

通达信擒牛亮剑出击抄底主升浪指标公式源码&#xff1a; ABC1:(CLOSE-REF(CLOSE,1))/REF(CLOSE,1)*100; ABC2:IF(CLOSE>OPEN,CLOSE,OPEN); ABC3:IF(CLOSE>OPEN,OPEN,CLOSE); ABC4:LLV(ABC2,4); ABC5:HHV(ABC3,4); ABC6:ABC2>ABC4 AND ABC3<ABC4 AND ABC2>ABC5 …

emqx4.4.3关于如何取消匿名登录,添加认证用户这件事

emqx4.4.3如何取消匿名登录&#xff0c;添加认证用户 emqx版本&#xff1a;4.4.3 背景&#xff1a;使用docker搭建完emqx后&#xff0c;使用 MQTTX 连接总是超时&#xff1a; 检查Java项目 是否有接口&#xff1a;https://XXXX:80/mqtt/auth? 若有&#xff0c;则具体逻辑查询…

上海亚商投顾:沪指5连阴 工业母机概念逆势走强

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。 一.市场情绪 三大指数今日继续调整&#xff0c;沪指午后一度跌近1%&#xff0c;随后探底回升跌幅收窄&#xff0c;创业板指…

多维度mysql性能优化手段实践

数据库优化维度有四个:硬件升级、系统配置、表结构设计、SQL语句及索引。 优化选择: 优化成本:硬件升级>系统配置>表结构设计>SQL语句及索引。 优化效果:硬件升级<系统配置<表结构设计<SQL语句及索引。 系统配置优化 保证从内存中读取数据 MySQL会在内…

鼠标与键盘交互设计

自学python如何成为大佬(目录):https://blog.csdn.net/weixin_67859959/article/details/139049996?spm1001.2014.3001.5501 在海龟绘图中&#xff0c;也支持与鼠标或键盘的交互操作。它提供了监听键盘按键事件、鼠标事件以及定时器等方法&#xff0c;下面分别进行介绍。 1键…

【python013】pyinstaller打包PDF提取脚本为exe工具

1.在日常工作和学习中&#xff0c;遇到类似问题处理场景&#xff0c;如pdf文件核心内容截取&#xff0c;这里将文件打包成exe可执行文件&#xff0c;实现功能简便使用。 2.欢迎点赞、关注、批评、指正&#xff0c;互三走起来&#xff0c;小手动起来&#xff01; 3.欢迎点赞、关…

视频文件太大怎么压缩?十大视频压缩软件可解决您的问题

您是否已经受够了无法上传视频文件&#xff0c;因为它们太大了&#xff1f;如果您正在积极寻找免费下载的视频压缩软件&#xff0c;下面概述了目前在线提供的 10 个功能更强大的软件。 我们建议您在决定下载之前先通读一下这个简短的介绍。我们不希望您随意点击一个选项&#…

STM32定时器篇——通用定时器的使用(定时中断,PWM输出)

一、通用定时器的类型以及应用功能&#xff1a; 通用定时器有&#xff1a;TIM2、TIM3、TIM4、TIM5&#xff0c;其总线挂载于APB1上&#xff0c;且有基本定时器的所有功能&#xff08;定时中断、主模式触发ADC&#xff09;&#xff0c;并额外具有内外时钟源选择&#xff0c;输入…