大数据学习05-Kafka分布式集群部署

news2025/1/15 22:09:23

系统环境:centos7
软件版本:jdk1.8、zookeeper3.4.8、hadoop2.8.5
本次实验使用版本 kafka_2.12-3.0.0

一、安装

Kafka官网
在这里插入图片描述

将安装包上传至linux服务器上
在这里插入图片描述
解压

tar -zxvf kafka_2.12-3.0.0.tgz -C /home/local/

移动目录至kafka

mv kafka_2.12-3.0.0/ kafka

二、部署

配置Kafka环境

vi /etc/profile

添加如下配置

#kafka
export KAFKA_HOME=/home/local/kafka
export PATH=$PATH:${KAFKA_HOME}/bin

修改server.properties文件

vim /home/local/kafka/config/server.properties

修改参数如下:

broker.id=0
listeners=PLAINTEXT://192.168.245.200:9092
log.dirs=/tmp/kafka-logs
zookeeper.connect=192.168.245.200:2181,192.168.245.201:2181,192.168.245.202:2181

参数说明:
broker.id : 集群内全局唯一标识,每个节点上需要设置不同的值
listeners:这个IP地址也是与本机相关的,每个节点上设置为自己的IP地址
log.dirs :存放kafka消息的
zookeeper.connect : 配置的是zookeeper集群地址

分发kafka安装目录

for i in {1..2};do scp -r /home/local/kafka root@slave${i}:/home/local/;done

三、启动

进入kafka安装目录下

./bin/kafka-server-start.sh ./config/server.properties &

kafka相关命令


创建topic
kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test

显示所有topic
kafka-topics.sh --list --bootstrap-server localhost:9092

产生消息
kafka-console-producer.sh --broker-list localhost:9092 --topic test

消费消息
kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

删除topic
kafka-topics.sh --delete --bootstrap-server localhost:9092 --topic test

四、flink与kafka结合示例

首先 ,构建maven工程,加入flink与kafka的一些依赖:

<?xml version="1.0" encoding="UTF-8"?>

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>bigdata-kafka_2.12-3.0.0</artifactId>
    <version>1.0-SNAPSHOT</version>

    <name>bigdata-kafka_2.12-3.0.0</name>
    <!-- FIXME change it to the project's website -->
    <url>http://www.example.com</url>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <maven.compiler.source>1.8</maven.compiler.source>
        <maven.compiler.target>1.8</maven.compiler.target>
        <flink-version>1.14.0</flink-version>
        <scala.binary.version>2.11.2</scala.binary.version>

    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>${flink-version}</version>
        </dependency>
        <dependency>
            <groupId>junit</groupId>
            <artifactId>junit</artifactId>
            <version>4.11</version>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>

第一个,flink生产者示例代码:

package com.example;

import org.apache.commons.lang3.RandomStringUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;

import java.io.Serializable;
import java.util.Properties;

public class KafkaProducerExample {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.245.200:9092");
        DataStream<String> stream = env.addSource(new SimpleStringGenerator());
        stream.addSink(new FlinkKafkaProducer<String>("test", new SimpleStringSchema(), props));
        env.execute();
    }
}


class SimpleStringGenerator implements SourceFunction<String>, Serializable {
    private static final long serialVersionUID = 1L;
    private volatile boolean isRunning = true;

    @Override
    public void run(SourceContext<String> ctx) throws Exception {
        while (isRunning) {
            String str = RandomStringUtils.randomAlphanumeric(5);
            ctx.collect(str);
            Thread.sleep(1000);
        }
    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

因为flink是生产者,需要启动一个kafka的消费者终端,然后运行本示例:
启动kafka

bin/kafka-server-start.sh config/server.properties &

启动一个kafka的消费者终端

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic test

终端内容
在这里插入图片描述
第二个,flink消费者示例代码:

package com.example;

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class KafkaConsumerApp {
    public static void main(String[] args) {
        try {
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            Properties properties = new Properties();
            properties.setProperty("bootstrap.servers", "master:9092");
            properties.setProperty("group.id", "flink");
            DataStream<String> stream = env.addSource(new FlinkKafkaConsumer<String>("test", new SimpleStringSchema(), properties));
            stream.map(new MapFunction<String, Object>() {
                @Override
                public Object map(String value) throws Exception {
                    return "flink: " + value;
                }
            }).print();
            env.execute("consumer");
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

为了测试,我们先开启一个生产者,不断往kafka中发送消息。

 kafka-console-producer.sh --broker-list master:9092 --topic test

终端
在这里插入图片描述

控制台
在这里插入图片描述
打印结果符合预期,flink与kafka结合的示例就演示完成了,主要的还是熟悉flink编程。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/780515.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

十二.Redis模拟集群搭建

配置环境 查看环境信息 127.0.0.1:6379> info replication #查看当前redis的信息 # Replication role:master #角色 master主机 connected_slaves:0 #从机数量为0 master_failover_state:no-failover master_replid:115f37a0ec195680ef754d6915738b0c0a05f450 master_replid…

学习UE的FArchive的最基础功能

目标 尝试FArchive的最基础功能。代码参考这里。 0. FArchive是什么&#xff1f; 源代码里FArchive的注释如下&#xff1a; Base class for archives that can be used for loading, saving, and garbage collecting in a byte order neutral way 它是 archive 的基类。&am…

【高并发基础】基本锁算法及原理

系列综述&#xff1a; &#x1f49e;目的&#xff1a;本系列是个人整理为了秋招面试的&#xff0c;整理期间苛求每个知识点&#xff0c;平衡理解简易度与深入程度。 &#x1f970;来源&#xff1a;材料主要源于多处理器编程的艺术进行的&#xff0c;每个知识点的修正和深入主要…

elasticsearch操作(命令方式)

说明&#xff1a;elasticsearch是目前最流行的搜索引擎&#xff0c;功能强大&#xff0c;非常受欢迎。特点是倒排索引&#xff0c;在海量数据的模糊查找领域&#xff0c;效率非常高。elasticsearch的安装和使用参考&#xff1a;http://t.csdn.cn/yJdZb。 本文介绍在es的索引库…

Blender自动化脚本,无人值守批量渲图/渲视频

渲染视频是个非常耗时的大工程&#xff0c;如果要渲染多个视频或者每个视频还需要切换不同的贴图、颜色等&#xff0c;工作量就更离谱了&#xff0c;所以不得不用脚本实现自动化。 Blender的脚本是用Python编写&#xff0c;比PS的js要方便很多。再下载一套Blender对应版本的AP…

十分钟掌握 Vim 编辑器核心功能

十分钟掌握 Vim 编辑器核心功能 文章目录 十分钟掌握 Vim 编辑器核心功能&#x1f468;‍&#x1f3eb;内容一&#xff1a;前言【Vim是什么】&#x1f468;‍&#x1f52c;内容二&#xff1a;Vim 常用模式&#x1f468;‍&#x1f680;内容三&#xff1a;基本操作&#x1f468;…

三十二章:Progressive Semantic Segmentation ——渐进式语义分割

0.摘要 这项工作的目标是在不过载GPU内存使用或丢失输出分割图中的细节的情况下对高分辨率图像进行分割。内存限制意味着我们必须对大图像进行降采样&#xff0c;或者将图像分为局部补丁进行分离处理。然而&#xff0c;前一种方法会丢失细节&#xff0c;而后一种方法由于缺乏全…

前端学习——Vue (Day1)

Vue 快速上手 Vue 是什么 创建 Vue 实例 Vue2官网&#xff1a;https://v2.cn.vuejs.org/ <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge…

Nexus3部署、配置+SpringBoot项目Demo

Docker部署Nexus 搜索Nexus3镜像&#xff1a;[rootlocalhost ~]# docker search nexus 拉取Nexus3镜像&#xff1a;[rootlocalhost ~]# docker pull sonatype/nexus3 启动Nexus3前查看虚拟机端口是否被占用&#xff1a;[rootlocalhost ~]# netstat -nultp 通过Docker Hub查看安…

MySQL——备份恢复

数据库备份&#xff0c;数据库为school&#xff0c;素材如下 1.创建student和score表 CREATE TABLE student ( id INT(10) NOT NULL UNIQUE PRIMARY KEY , name VARCHAR(20) NOT NULL , sex VARCHAR(4) , birth YEAR, department VARCHAR(20) , address …

【unity3D】水平方向上UI自动排列整齐(Horizontal Layout Group组件)

&#x1f497; 未来的游戏开发程序媛&#xff0c;现在的努力学习菜鸡 &#x1f4a6;本专栏是我关于游戏开发的学习笔记 &#x1f236;本篇是unity的Horizontal Layout Group Horizontal Layout Group 属性介绍属性详解使用以及效果展示补充 属性介绍 属性功能padding布局组边缘…

C语言——qsort函数的使用(详解)

qsort函数详解 前言&#xff1a;一、qsort函数的含义1.1 函数的参数1.2 参数的含义 二、用不同类型数据&#xff0c;测试sqort2.1 对数组内整数进行排序2.2对数组内浮点数进行排序2.3对字符串进行排序2.4对结构体进行排序 三、模拟实现qsort函数 前言&#xff1a; qsort&#…

RK3588平台开发系列讲解(Camera篇)V4L2 接口查询设备能力

文章目录 一、查询设备的基本信息二、查询设备支持的视频格式三、查询支持分辨率四、查询支持的帧率范围沉淀、分享、成长,让自己和他人都能有所收获!😄 📢在使用 V4L2 进行视频采集前,需要先通过查询设备能力来获取设备可以提供的视频格式、分辨率等信息。 一、查询设…

springboot 配置Knife4j- Swagger3.0

一、导入maven包 <dependency><groupId>com.github.xiaoymin</groupId><artifactId>knife4j-spring-boot-starter</artifactId><version>3.0.2</version></dependency> 二、配置config-swagger工具 package com.exceltotxt.…

MySQL连接查询与存储过程

一、连接查询1.1 内连接1.2 左连接1.3 右连接 二、存储过程2.1 概述2.2 简介2.3 优点2.4 语法2.5 举例2.5.1 创建存储过程2.5.2 调用存储过程2.5.3 查看存储过程2.5.4 存储过程的参数2.5.5 修改存储过程2.5.6 删除存储过程 三、总结 一、连接查询 MySQL 的连接查询&#xff0c…

Java集合是Set

HashSet集合 HashSet集合的特点 HashSet常用方法 ①&#xff1a;add(Object o)&#xff1a;向Set集合中添加元素&#xff0c;不允许添加重复数据。 ②&#xff1a;size()&#xff1a;返回Set集合中的元素个数 public class Test {public static void main(String[] args) {…

LeetCode107. 二叉树的层序遍历 II

107. 二叉树的层序遍历 II 文章目录 [107. 二叉树的层序遍历 II](https://leetcode.cn/problems/binary-tree-level-order-traversal-ii/)一、题目二、题解方法一&#xff1a;正常层序遍历翻转数组 一、题目 给你二叉树的根节点 root &#xff0c;返回其节点值 自底向上的层序…

HTTP Header定制,客户端使用Request,服务器端使用Response

在服务器端通过request.getHeaders()是无效的&#xff0c;只能使用response.getHeaders()。 Overridepublic Object beforeBodyWrite(Object body, MethodParameter returnType, MediaType mediaType,Class selectedConverterType, ServerHttpRequest request, ServerHttpRespo…

前端将css.html.js打包到一起在手机打开

过程我是按照下面的执行的&#xff0c;大家可以直接参考这个博客里的过程&#xff0c;下面我记录一下遇到的一些问题&#xff0c;我的电脑是mac 打包教程 1.执行命令npm install electron 在安装Electron时报错command sh -c node install.js 在指令后面添加 --ignore-scripts…

Linux 系统中异常与中断

文章目录 异常与中断的关系中断的处理流程异常向量表Linux 系统对中断的处理ARM 处理器程序运行的过程程序被中断时&#xff0c;怎么保存现场Linux 系统对中断处理的演进Linux 对中断的扩展&#xff1a;硬件中断、软件中断硬件中断软件中断 中断处理原则&#xff1a;耗时中断的…