Kafka篇:Kafka搭建、使用、及Flink整合Kafka文档

news2025/1/11 15:57:34

一、Kafka搭建

1、上传并解压改名

tar -xvf kafka_2.11-1.0.0.tgz

mv kafka_2.11-1.0.0 kafka-1.0.0

2、配置环境变量

      vim /etc/profile

export KAFKA_HOME=/usr/local/soft/kafka-1.0.0
export PATH=$PATH:$KAFKA_HOME/bin  

     source /etc/profile (使环境变量生效)

3、修改配置文件

      vim config/server.properties

#将从节点的broker.id修改为1,2

broker.id=0       

#指定数据存放的位置,包含了Kafka集群中所有topic的分区数据 
log.dirs=/usr/local/soft/kafka-1.0.0/data         

#指定Kafka如何连接到其依赖的ZooKeeper集群
zookeeper.connect=master:2181,node1:2181,node2:2181/kafka       

4、将kafka文件同步到node1,node2

4.1  同步kafka文件

scp -r kafka-1.0.0/ node1:`pwd`
scp -r kafka-1.0.0/ node2:`pwd`

4.2  因为kafka不是主从架构的分布式组件,而是去中心化的架构,没有主从节点之分,所以也要将master中的而环境变量同步到node1和node2中

scp /etc/profile node1:/etc/
scp /etc/profile node2:/etc/

4.3  在ndoe1和node2中执行source

source /etc/profile

4.4  修改node1和node2中的broker.id为1,2

# node1
broker.id=1
# node2
broker.id=2

5、启动kafka

前提:已经安装了zookeeper,没安装可以参考以前的写的一个zookeeper的搭建文档

链接:http://t.csdnimg.cn/qpuKV

启动步骤:

5.1 因为kafka使用zookeeper保存元数据需要,所以需要先在三个节点上启动zookeeper(也是去中心化架构)

zkServer.sh start

5.2 查看zookeeper启动的状态

zkServer.sh status       #一个节点Mode:leader,剩余节点Mode: follower说明启动成功

5.3  启动kafka,每个节点中都要启动(去中心化的架构)

# -daemon后台启动

kafka-server-start.sh -daemon /usr/local/soft/kafka-1.0.0/config/server.properties

5.4  测试是否成功

#生产者
kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic 

二、Kafka的使用

1、创建topic

注意:在生产和消费数据时,如果topic不存在会自动创建一个分区为1,副本为1的topic

--replication-factor     #每一个分区的副本数量, 同一个分区的副本不能放在同一个节点,副本的数量不能大于kafak集群节点的数量

--partition          #分区数, 根据数据量设置

--zookeeper      #zk的地址,将topic的元数据保存在zookeeper中 ​

kafka-topics.sh --create --zookeeper master:2181,node1:2181,node2:2181/kafka --replication-factor 2 --partitions 3 --topic bigdata

删除topic
kafka-topics.sh --delete --topic  bigdata--zookeeper master:2181,node1:2181,node2:2181/kafka

 

2、查看topic描述信息

kafka-topics.sh --describe  --zookeeper master:2181,node1:2181,node2:2181/kafka --topic kfkuse

3、获取所有topic

kafka-topics.sh --list --zookeeper master:2181,node1:2181,node2:2181/kafka

__consumer_offsetsL是kafka自带的用于保存消费偏移量的topic

 

4、创建控制台生产者

kafka-console-producer.sh --broker-list master:9092,node1:9092,node2:9092 --topic kfkuse

5、创建控制台消费者

--from-beginning   从头消费,, 如果不在执行消费的新的数据

kafka-console-consumer.sh --bootstrap-server master:9092,node1:9092,node2:9092 --from-beginning --topic bigdata

6、kafka数据保存的方式及相关事项

# 1、保存的文件
/usr/local/soft/kafka_2.11-1.0.0/data

# 2、每一个分区每一个副本对应一个目录

# 3、每一个分区目录中可以有多个文件, 文件时滚动生成的
00000000000000000000.log
00000000000000000001.log
00000000000000000002.log

# 4、滚动生成文件的策略
log.segment.bytes=1073741824           #达到1GB左右滚动生成一个文件
log.retention.check.interval.ms=300000      #每隔5分钟检查一次文件数据是否被清理

# 5、文件删除的策略,默认为7天,以文件为单位删除
log.retention.hours=168

三、Flink整合kafka

1、IDEA中整合

  1.1 根据自己的flink版本添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-connector-kafka</artifactId>
    <version>1.15.2</version>
</dependency>

  1.2 案例

    案例1:编写flink代码,使用flink从kafka中读数据

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * 使用flink从kafka中读数据
 */
public class Demo1KafkaSource {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        /*
         *内置的位点初始化器包括:
         *     // 从消费组提交的位点开始消费,不指定位点重置策略
         *     .setStartingOffsets(OffsetsInitializer.committedOffsets())
         *     // 从消费组提交的位点开始消费,如果提交位点不存在,使用最早位点
         *     .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.EARLIEST))
         *     // 从时间戳大于等于指定时间戳(毫秒)的数据开始消费
         *     .setStartingOffsets(OffsetsInitializer.timestamp(1657256176000L))
         *     // 从最早位点开始消费
         *     .setStartingOffsets(OffsetsInitializer.earliest())
         *     // 从最末尾位点开始消费
         *     .setStartingOffsets(OffsetsInitializer.latest());
         */
        //创建kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")     //设置kafka集群列表
                .setTopics("hash_students")     //指定消费的topic
                .setStartingOffsets(OffsetsInitializer.earliest())     //从最早点开始消费
                .setValueOnlyDeserializer(new SimpleStringSchema())      //设置读取数据的编码格式utf-8
                .build();

        //使用kafka source
        DataStreamSource<String> studentsDS = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        studentsDS.print();

        env.execute();

    }
}

 

案例2:使用flink从kafka中读json数据,解析json格式的数据

使用阿里的fastjson工具,要先添加fastjson依赖

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.83</version>
</dependency>
import com.alibaba.fastjson.JSON;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo3Json {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        //创建kafka source
        KafkaSource<String> source = KafkaSource.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")     //设置kafka集群列表
                .setTopics("cars")     //指定消费的topic
                .setGroupId("flink_group1")     //指定消费者组
                .setStartingOffsets(OffsetsInitializer.earliest())     //从最早尾位点开始消费
                .setValueOnlyDeserializer(new SimpleStringSchema())      //设置读取数据的编码格式utf-8
                .build();

        //使用kafka source
        DataStreamSource<String> kafkaSource = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        //解析json格式的数据
        DataStream<car> cars = kafkaSource.map(line -> JSON.parseObject(line, car.class));

        cars.print();
        env.execute();
    }
}


@Data
@AllArgsConstructor
@NoArgsConstructor
class car{
    private String car;
    private String city_code;
    private String county_code;
    private String card;
    private String camera_id;
    private String orientation;
    private Long road_id;
    private Long time;
    private Double speed;
}

 

案例3:将本地文件的数据生产(写入)到kafka中

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class Demo2FIleToKafkaSink {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<String> carsDS = env.readTextFile("flink/data/cars_sample.json");

        //创建kafka sink
        KafkaSink<String> sink = KafkaSink.<String>builder()
                .setBootstrapServers("master:9092,node1:9092,node2:9092")//kafka集群列表
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic("cars")//指定topic
                        .setValueSerializationSchema(new SimpleStringSchema())//指定数据格式
                        .build()
                )
                //指定数据处理的语义
                .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .build();

        //使用kafka sink
        carsDS.sinkTo(sink);

        env.execute();
    }
}

 

2、集群中整合

将flink-sql-connector-kafka-1.15.2.jar包上传到Flink的lib目录下就行

注意:一些外部的依赖,比如说上述案例中使用的fastjson依赖也要上传,否则会报错或者会运行不成功。

至于在什么集群中运行,以及集群中运行的详细步骤,我在Flink系列三中以及详细写过,在此不再赘述了。

文章链接:http://t.csdnimg.cn/W1yZL

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1721417.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

SPHINX的输出文档格式

SPHINX的输出文档格式 SPHINX的输出文档格式更多信息 SPHINX的输出文档格式 用rst编写&#xff0c;然后用sphinx-build进行编译&#xff0c;还是效果相当不错地&#xff0c;只要掌握了格式&#xff0c;可以一次编译&#xff0c;多种格式输出&#xff0c;主要是用的可能是html和…

Java对象的比较——equals方法,Comparable接口,Comparator接口

Java对象的比较——equals方法&#xff0c;Comparable接口&#xff0c;Comparator接口 1. equals方法2. Comparable接口3. Comparator接口 1. equals方法 在判断两个整数是否相同时&#xff0c;我们可以使用以下方式&#xff1a; System.out.println(1 2); System.out.printl…

多普云DPGo摄影测量航线规划软件

1.航线代规划。支持GSR航线&#xff08;大疆精灵4RTKSDK遥控器&#xff09;、DJI Pilot航线&#xff08;大疆精灵4RTK、M300&#xff09;、DJI Pilot2航线&#xff08;大疆精灵4RTK、M300、Mavic3E&#xff09;。 2.DPGO三维模型满足毫米级精度要求&#xff1a;已知被摄范围&am…

基于java的CRM客户关系管理系统(二)

目录 第二章 相关技术介绍 2.1 后台介绍 2.1.1 B/S平台模式 2.1.2 MVC 2.1.3 Spring 2.1.4 Hibernate 2.1.5 Struts 2.2 前端介绍 2.2.1 JSP网页技术 2.3 开发工具 2.4 本章小结 前面内容请移步 基于java的CRM客户关系管理系统&#xff08;二&#xff09; 资源…

vscode编辑器创建分支注意事项?!

最近在公司开发项目时&#xff0c;不小心将自己分支的东西提交到公司的master的分支&#xff0c;大家看看是什么情况&#xff1f; 先上图&#xff1a; 从图上看&#xff0c;我这边用了GITLENS这个插件&#xff0c;在创建分支时&#xff0c;有个create branch from&#xff0c;有…

如何选择软件开发服务商

在当今数字化快速发展的时代&#xff0c;软件已经成为企业运营不可或缺的一部分。然而&#xff0c;对于许多非技术背景的企业来说&#xff0c;如何选择一个合适的软件开发服务商却是一个不小的挑战。本文将从需求分析、服务商评估、合同条款以及后期维护等方面&#xff0c;详细…

【GD32F303红枫派使用手册】第五节 FMC-片内Flash擦写读实验

5.1 实验内容 通过本实验主要学习以下内容&#xff1a; FMC控制器原理&#xff1b; FMC擦写读操作&#xff1b; 5.2 实验原理 5.2.1 FMC控制器原理 FMC即Flash控制器&#xff0c;其提供了片上Flash操作所需要的所有功能&#xff0c;在GD32F303系列MCU中&#xff0c;Flash…

环卫车北斗GPS视频监控定位解决方案的应用与优势

一、引言 随着城市化进程的加快&#xff0c;环卫车作为城市环境卫生的重要保障力量&#xff0c;其运行效率与安全性直接关系到城市形象与居民生活品质。然而&#xff0c;传统的环卫车管理模式往往存在信息不对称、调度不合理、行驶不规范等问题&#xff0c;导致城市道路污染和…

Java中的软引用,你了解吗?

哈喽&#xff0c;各位小伙伴们&#xff0c;你们好呀&#xff0c;我是喵手。运营社区&#xff1a;C站/掘金/腾讯云&#xff1b;欢迎大家常来逛逛 今天我要给大家分享一些自己日常学习到的一些知识点&#xff0c;并以文字的形式跟大家一起交流&#xff0c;互相学习&#xff0c;一…

打工人福音,办公神器软件

今天分享2个免VIP的办公神器app&#xff0c;建议所有手机安装&#xff0c;第一个布丁扫描http://www.budingscan.com &#xff0c;无广告不收费&#xff0c;这是VIVO 推出的一款完全免费的扫描APP&#xff0c;支持文档&#xff06;证件扫描、OCR文字&#xff06;表格识别提取、…

解决Windows 10通过SSH连接Ubuntu 20.04时的“Permission Denied”错误

在使用SSH连接远程服务器时&#xff0c;我们经常可能遇到各种连接错误&#xff0c;其中“Permission denied, please try again”是较为常见的一种。本文将分享一次实际案例的解决过程&#xff0c;帮助你理解如何排查并解决这类问题。 问题描述 在尝试从Windows 10系统通过SS…

js:flex弹性布局

目录 代码&#xff1a; 1、 flex-direction 2、flex-wrap 3、justify-content 4、align-items 5、align-content 代码&#xff1a; <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta name"viewp…

Mysql基础教程(13):GROUP BY

MySQL GROUP BY 【 GROUP BY】 子句用于将结果集根据指定的字段或者表达式进行分组。 有时候&#xff0c;我们需要将结果集按照某个维度进行汇总。这在统计数据的时候经常用到&#xff0c;考虑以下的场景&#xff1a; 按班级求取平均成绩。按学生汇总某个人的总分。按年或者…

【spring】Spring Boot3.3.0发布啦

spring最新版本 springboot官网&#xff1a;Spring Boot :: Spring Boot Spring Boot 3.3 发行说明&#xff1a;https://github.com/spring-projects/spring-boot/wiki/Spring-Boot-3.3-Release-Notes 开发环境的要求对比表 Spring BootJDKSpringMavenGradle3.3.017 ~ 226.1…

Nginx实战:LUA脚本_环境配置安装

目录 一、什么是LUA脚本 二、Nginx中的LUA脚本 1、主要特点 2、用途 三、如何在nginx中使用LUA脚本 1、原生nginx 2、OpenResty 3、nginx lua配置验证 一、什么是LUA脚本 Nginx Lua 脚本是 Nginx 与 Lua 语言集成的结果&#xff0c;它允许你使用 Lua 语言编写Nginx 模块…

【Redis】List源码剖析

大家好&#xff0c;我是白晨&#xff0c;一个不是很能熬夜&#xff0c;但是也想日更的人。如果喜欢这篇文章&#xff0c;点个赞&#x1f44d;&#xff0c;关注一下&#x1f440;白晨吧&#xff01;你的支持就是我最大的动力&#xff01;&#x1f4aa;&#x1f4aa;&#x1f4aa…

使用第三方工具percona-xtrabackup进行数据备份与恢复

目录 准备工作 开始安装 innobackupex的使用 完全备份 增量备份 数据恢复 本次需要用到的软件 mysql 5.7.35percona-xtrabackup-24-2.4.8 ps&#xff1a;---MySQL必须是5.7的版本&#xff0c;在8.0之后已经不支持 percona-xtrabackup-24 系统版本CentOS7.9 准备工作 …

面试题 17.05. 字母与数字(前缀和)

给定一个放有字母和数字的数组&#xff0c;找到最长的子数组&#xff0c;且包含的字母和数字的个数相同。 返回该子数组&#xff0c;若存在多个最长子数组&#xff0c;返回左端点下标值最小的子数组。若不存在这样的数组&#xff0c;返回一个空数组。 示例 1: 输入: ["…

SpringBoot+layui实现Excel导入操作

excel导入步骤 第三方插件引入插件 效果图 &#xff08;方法1&#xff09;代码实现&#xff08;方法1&#xff09;Html代码&#xff08; 公共&#xff09;下载导入模板 js实现 &#xff08;方法1&#xff09;上传文件实现 效果图&#xff08;方法2&#xff09;代码实现&#xf…

一碗米线火了24年,蒙自源六一再献新作

当一碗热气腾腾的米线在餐桌上飘香四溢&#xff0c;你是否会想起那个陪伴了无数食客24年的名字——蒙自源&#xff1f;在这个充满欢笑与童真的六一儿童节&#xff0c;蒙自源米线品牌再度发力&#xff0c;用全新的儿童餐系列为孩子们带来了一份特别的节日礼物。 蒙自源&#xf…