Spark+Kafka构建实时分析Dashboard

news2024/11/25 22:59:57

Spark+Kafka构建实时分析Dashboard

  • 说明
  • 一、案例介绍
  • 二、实验环境准备
    • 1、实验系统和软件要求
    • 2、系统和软件的安装
      • (1)安装Spark
      • (2)安装Kafka
      • (3)安装Python
      • (4)安装Python依赖库
      • (5)安装PyCharm
  • 三、数据处理和Python操作Kafka
  • 四、Structured Streaming实时处理数据
    • 1、配置Spark开发Kafka环境
    • 2、建立pySpark项目
    • 3、运行项目
    • 4、测试程序
  • 五、结果展示
    • 1、Flask-SocketIO实时推送数据
    • 2、浏览器获取数据并展示
    • 3、效果展示
    • 4、相关问题的解决方法

说明

Spark+Kafka构建实时分析Dashboard【林子雨】
官方实验步骤:https://dblab.xmu.edu.cn/post/spark-kafka-dashboard/
前几天刚做完这个实验,学了不少知识,也遇到了不少问题,在这里记录一下自己的实验过程,与小伙伴们一起探讨下。

一、案例介绍

案例概述(详情见官网)

二、实验环境准备

1、实验系统和软件要求

Ubuntu: 16.04
Spark: 3.2.0  (注意Spark版本,官网教程安装的Spark 2.1.0版本亲测不行,后面实验莫名报错)
kafka: 2.8.0
Python: 3.7.13
Flask: 1.1.2
Flask-SocketIO: 5.1.0 (这个版本也不行的,不过后面有解决方法)
kafka-python: 2.0.2

2、系统和软件的安装

(1)安装Spark

详细步骤见官网教程:Spark的安装和使用
安装Hadoop
①、创建hadoop用户

# a、创建 hadoop 用户,并使用 /bin/bash 作为 shell
sudo useradd -m hadoop -s /bin/bash
# b、设置用户密码
sudo passwd hadoop
# c、为 hadoop 用户增加管理员权限
sudo adduser hadoop sudo

在这里插入图片描述
②、更新apt

sudo apt-get update

在这里插入图片描述
③、安装vi/vim

sudo apt-get install vim

在这里插入图片描述
④、安装Java环境

cd /usr/lib
sudo mkdir jvm #创建/usr/lib/jvm目录用来存放JDK文件
cd ~ #进入hadoop用户的主目录
cd Downloads  #注意区分大小写字母,刚才已经通过FTP软件把JDK安装包jdk-8u162-linux-x64.tar.gz上传到该目录下
sudo tar -zxvf ./jdk-8u162-linux-x64.tar.gz -C /usr/lib/jvm  #把JDK文件解压到/usr/lib/jvm目录下

在这里插入图片描述
设置环境变量

cd ~
vim ~/.bashrc

在这里插入图片描述
在这里插入图片描述
查看是否安装成功

source ~/.bashrc  # 让配置文件生效
java -version

在这里插入图片描述
这里官网还有其他两种方法安装JDK,其中第二种方法我试过了,不太行。
⑤、安装 Hadoop
这里我没使用 md5 等检测工具校验文件完整性,因为我比较懒。

# 解压到/usr/local中
sudo tar -zxf ~/下载/hadoop-2.6.0.tar.gz -C /usr/local    
cd /usr/local/
# 将文件夹名改为hadoop
sudo mv ./hadoop-2.6.0/ ./hadoop
# 修改文件权限          
sudo chown -R hadoop ./hadoop      

在这里插入图片描述
输入如下命令来检查 Hadoop 是否可用,成功则会显示 Hadoop 版本信息:

cd /usr/local/hadoop
./bin/hadoop version

在这里插入图片描述
Hadoop伪分布式配置
修改2个配置文件 core-site.xml 和 hdfs-site.xml。

vim ./etc/hadoop/core-site.xml

在这里插入图片描述

vim ./etc/hadoop/hdfs-site.xml

在这里插入图片描述
配置完成后,执行 NameNode 的格式化:

cd /usr/local/hadoop
./bin/hdfs namenode -format

在这里插入图片描述
开启 NameNode 和 DataNode 守护进程

cd /usr/local/hadoop
./sbin/start-dfs.sh  

在这里插入图片描述
通过命令 jps 来判断是否成功启动
在这里插入图片描述
安装 Spark
这里建议安装Spark3.2.0以上版本,我之前安装Spark2.4.0,后面执行程序时还是报错。

sudo tar -zxf ~/下载/spark-3.2.0-bin-without-hadoop.tgz -C /usr/local/
cd /usr/local
sudo mv ./spark-3.2.0-bin-without-hadoop/ ./spark
sudo chown -R hadoop:hadoop ./spark          

在这里插入图片描述
修改Spark的配置文件spark-env.sh。

cd /usr/local/spark
cp ./conf/spark-env.sh.template ./conf/spark-env.sh
vim ./conf/spark-env.sh

在这里插入图片描述
在这里插入图片描述
通过运行Spark自带的示例,验证Spark是否安装成功。

cd /usr/local/spark
bin/run-example SparkPi 2>&1 | grep "Pi is"

在这里插入图片描述

(2)安装Kafka

详细步骤见官网教程:Kafka的安装和简单实例测试

cd ~/下载
sudo tar -zxf kafka_2.12-2.8.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-2.8.0/ ./kafka
sudo chown -R hadoop ./kafka

在这里插入图片描述
测试简单实例

# 进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties

在这里插入图片描述
启动新的终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties

在这里插入图片描述
启动另外一个终端,输入如下命令:

cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
bin/kafka-topics.sh --list --zookeeper localhost:2181  

在这里插入图片描述
接下来用producer生产点数据

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab

在这里插入图片描述
使用consumer来接收数据,输入如下命令:

cd /usr/local/kafka
# 官网那个命令失效了,用下面这个命令
bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic dblab --from-beginning  

在这里插入图片描述

(3)安装Python

详细步骤见官网教程:Anaconda的下载和使用方法
安装Anaconda方便切换python版本,而且也方便后面安装依赖。如果自己的python版本匹配的话,不安装其实也没啥问题。
在这里插入图片描述
安装过程有是否运行conda初始化,这里一定要输入yes,然后回车。教程是这样说的,但是当时实验的时候,我没留意它的安装进度,啪的一下,它就给我自动选择了no,然后刷新环境变量,(base)也不显示。如果你也遇到这个问题,莫慌,用下面命令初始化即可。

cd ~/anaconda3/bin/
/home/hadoop/anaconda3/bin/conda init

在这里插入图片描述
使用如下命令刷新环境变量

source ~/.bashrc

之后会发现命令行前面出现(base)的字样,就代表已经安装成功,并且环境变量和默认python都已经装配好。

我安装的python版本是3.7.13,用conda指令安装和切换即可:
在这里插入图片描述
切换版本:
在这里插入图片描述

(4)安装Python依赖库

这里一定要先切换到你要使用的python版本。

conda install flask
conda install flask-socketio
conda install kafka-python # 这个命令已经失效了

运行上面第三条指令时候寄了,提示我到下面这个网站下载。
所以咱可以用新的命令下载kafka-python依赖库。

conda install -c conda-forge kafka-python

在这里插入图片描述

(5)安装PyCharm

cd ~  #进入当前hadoop用户的主目录
sudo tar -zxvf ~/下载/pycharm-community-2016.3.5.tar.gz -C /usr/local  #把pycharm解压缩到/usr/local目录下
cd /usr/local
sudo mv pycharm-community-2016.3.5 pycharm  #重命名
sudo chown -R hadoop ./pycharm  #把pycharm目录权限赋予给当前登录Ubuntu系统的hadoop用户

在这里插入图片描述
执行如下命令启动PyCharm

cd /usr/local/pycharm
./bin/pycharm.sh  #启动PyCharm

在这里插入图片描述

三、数据处理和Python操作Kafka

详情见官网教程:数据处理和Python操作Kafka
写如下Python代码,文件名为producer.py:
在这里插入图片描述
写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py:
在这里插入图片描述
在开启KafkaProducer和KafkaConsumer之前,需要先开启Kafka。
点击Run 'producer’以及Run ‘consumer’,来运行生产者和消费者。如果生产者和消费者运行成功,则在consumer窗口会输出如下信息:
在这里插入图片描述

四、Structured Streaming实时处理数据

1、配置Spark开发Kafka环境

在这里插入图片描述
在/usr/local/spark/jars目录下新建kafka目录,把/usr/local/kafka/libs下所有函数库复制到/usr/local/spark/jars/kafka目录下,命令如下:

cd /usr/local/spark/jars
mkdir kafka
cd kafka
cp /usr/local/kafka/libs/* .

在这里插入图片描述
修改 Spark 配置文件,命令如下:
在这里插入图片描述
在这里插入图片描述

2、建立pySpark项目

在kafka这个目录下创建一个kafka_test.py文件
在这里插入图片描述

3、运行项目

编写好程序之后,接下来编写运行脚本,在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:
在这里插入图片描述
运行如下命令即可执行刚编写好的Structured Streaming程序:

sh startup.sh

在这里插入图片描述

4、测试程序

下面开启之前编写的KafkaProducer投递消息,然后将KafkaConsumer中接收的topic改为result,验证是否能接收topic为result的消息,更改之后的KafkaConsumer为:
在这里插入图片描述
在同时开启Structured Streaming项目,KafkaProducer以及KafkaConsumer之后,可以在KafkaConsumer运行窗口看到如下输出:
在这里插入图片描述

五、结果展示

1、Flask-SocketIO实时推送数据

创建如图中的app.py文件,app.py的功能就是作为一个简易的服务器,处理连接请求,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:
在这里插入图片描述

2、浏览器获取数据并展示

index.html文件负责获取数据并展示效果,该文件中的代码内容如下:
在这里插入图片描述

3、效果展示

经过以上步骤,一切准备就绪,我们就可以启动程序来看看最后的效果。启动步骤如下:
①、确保kafka开启。
②、开启producer.py模拟数据流。
③、启动Structured Streaming实时处理数据。
④、启动app.py。
启动后的效果如下图:
在这里插入图片描述
用浏览器访问上图中给出的网址 http://127.0.0.1:5000/ ,就可以看到最终效果图了。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

4、相关问题的解决方法

①、运行app.py报错:WebSocket transport not available. Install simple-websocket for improved performance.
解决方法:在使用的python环境下执行以下命令

pip install simple-websocket

②、运行app.py报错:
在这里插入图片描述
解决方法:flask-socketio版本问题,将版本改为4.3.1版本。
a、删除原先版本

conda remove flask-socketio

在这里插入图片描述
b、重新下载新版本

conda install flask-socketio==4.3.1

在这里插入图片描述
③、运行app.py报错:ImportError:cannot import name 'run_with_reloader' from 'werkzeug.serving'
在这里插入图片描述
解决方法:在使用的python环境下执行以下命令。

pip3 install --upgrade flask==1.1.4
pip3 install --upgrade Werkzeug==1.0.1
pip3 install --upgrade itsdangerous==1.1.0
pip3 install --upgrade Jinja2==2.11.2
pip3 install --upgrade MarkupSafe==2.0.1

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/574311.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

数据仓库hive本地/远程模式安装

文章目录 1.简述2.本地模式2.1安装包准备2.2解压安装mysql2.2.1 解压mysql安装包2.2.2 安装mysql相关组件2.2.3 修改my.cnf配置文件2.2.4 更改所属组2.2.5 启动mysql服务 2.3 hive解压安装及配置2.3.1 解压2.3.2 改名2.3.3 配置环境变量2.3.4 添加hive-site.xml配置文件2.3.5 放…

《Spring Guides系列学习》guide6 - guide10

要想全面快速学习Spring的内容,最好的方法肯定是先去Spring官网去查阅文档,在Spring官网中找到了适合新手了解的官网Guides,一共68篇,打算全部过一遍,能尽量全面的了解Spring框架的每个特性和功能。 接着上篇看过的gu…

VIBRO METER 带缓冲输出的机械监控系统接口套件

高质量、高可靠性的接口套件,用于现有机械监控系统的缓冲“原始”传感器输出信号。该接口套件支持多达25个通道,可以是动态或转速计(速度)信号。接口套件采用DIN导轨安装,通常安装在装有VM600或VibroSmart机械监控系统的外壳中。 特征 支持…

为 Kotlin 的函数添加作用域限制(以 Compose 为例)

前言 不知道各位是否已经开始了解 Jetpack Compose? 如果已经开始了解并且上手写过。那么,不知道你们有没有发现,在 Compose 中对于作用域(Scopes)的应用特别多。比如, weight 修饰符只能用在 RowScope 或…

docker快速部署hue+hue集成hive

首先需要安装hive,hive的安装在HIVE的安装与配置_EEEurekaaa!的博客-CSDN博客 安装完成之后,使用脚本命令启动hdfs和hive的相关服务。 一、安装docker # 安装yum-config-manager配置工具 $ yum -y install yum-utils # 设置yum源 $ yum-co…

《俞军产品方法论》- 站在更高的角度来拓展产品经理的内涵和边界

关于作者 俞军,互联网产品大神级人物。他是早年百度唯一的产品经理,主持了百度搜索这款产品的无数次进化,并主持设计了百度贴吧、百度 知道等世界级创新产品,后来又成为滴滴出行的产品负责人。他的 “ 俞军产品经理十二条 ” &a…

TexSAW|2023|Cryptography&Misc|WP

Cryptography|Crack the crime 用 nc 连上后,直接得到第一题 是一个简单的base64加密,解密如下: Meet in dubai on Tuesday 填入之后可获得第二题 猜测是古典加密,随后经过N次尝试后发现是rot13加密,解密…

蓝牙耳机怎么选?蓝牙耳机哪些性价比高?2023年蓝牙耳机推荐,蓝牙耳机品牌排行榜,蓝牙耳机最全选购指南

蓝牙耳机怎么选?蓝牙耳机哪些性价比高?2023年蓝牙耳机推荐,蓝牙耳机品牌排行榜,蓝牙耳机最全选购指南 观前提醒,本文中你将会了解到: |蓝牙耳机抄作业环节 |蓝牙耳机基础知识 &a…

快速上手kettle

一、前言 最近由于工作需要,需要用到kettle工具进行数据迁移转换。特意找资料学习了一下,kettle基本操作算是学会了。 所学的也结合实际工作进行了验证。为了防止以后用到忘记了,便写了几篇文章记录一下。 二 、ETL简介 ETL ( Extract-Tran…

synchronized 关键字和 volatile 关键字有什么区别?

synchronized 关键字和 volatile 关键字有什么区别? 在 Java 中,synchronized 关键字和 volatile 关键字都可以用来实现线程安全,但是它们有不同的用途和实现方式。本文将介绍 synchronized 关键字和 volatile 关键字的区别,包括…

二肽二氨基丁酰苄基酰胺二乙酸盐/Dipeptide Diaminobutyroyl Benzylamide Diacetate/SYN-AKE

作用机理----二肽二氨基丁酰苄基酰胺二乙酸盐 类蛇毒三肽通过松弛面部肌肉而作为有效的平滑和祛皱活性产品, 该活性三肽作用方式与 Temple Viper 毒蛇毒液的神经肌肉阻断化合物Waglerin 1 一致。类蛇毒三肽作用于突触后膜, 是肌肉烟碱乙酰胆碱受体(nmAChR)可逆转的拮抗剂。类蛇…

docker安装单机nacos、rocketmq、reids、xxl-job、minio、elasticsearch、kibana

启动容器报错 直接删除那个name后边的就可以 安装nacos 首先需要拉取对应的镜像文件:docker pull nacos/nacos-server 挂载目录: mkdir -p /mydata/nacos/logs/ #新建logs目录mkdir -p /mydata/nacos/init.d/ vim /myda…

使用 Kotlin 的 Opt-in (选择加入)功能注解API提示当前非稳定API

前言 之前在给公司项目封装库的时候,领导告诉我封装的漂亮一点,等以后公司发展起来了可能需要把这个库提供给第三方接入使用。 此时,就有这么一个问题:某些功能函数使用条件比较苛刻,直接使用可能会出现意想不到的后…

Mock.js 的语法规范学习

Mock.js 有一套完整的语法规范,可以好好学学。 Mock.js 的语法规范包括两部分: 数据模板定义规范(Data Template Definition,DTD) 数据占位符定义规范(Data Placeholder Definition,DPD) 数…

【mediasoup】12: ChannelRequest控制指令

rust 是把worker 当做lib 调用的。node是当做一个进程每一个ChannelRequest 就是一个外部发给worker的控制指令worker要负责处理。控制指令的处理实际是worker做的,worker可能立即执行,可能交给对应的handler去处理 worker根据指令id 来处理 处理完毕后才发消息ack 给控制侧 …

# Spring Boot 中如何使用 Spring Cloud Sleuth 来实现分布式跟踪?

Spring Boot 中如何使用 Spring Cloud Sleuth 来实现分布式跟踪? 在微服务架构中,通常会有多个服务相互协作,为了方便排查问题,我们需要对服务之间的调用进行跟踪。Spring Cloud Sleuth 是 Spring Cloud 生态中的分布式跟踪解决方…

charles使用

charles​ 一、概念​ charles是一款非常优秀的抓包工具,全平台支持,在mac,windows,linux上都可以使用,既可以抓 取web端的包,也可以抓app端的包。 ​ charles主要的功能包括如下几点: ​ 截取…

Linux网络服务:SSH远程访问及控制2

目录 一、理论 1.构建密钥对验证的SSH体系 2.TCP Wrappers访问控制 二、实验 1.ecdsa免密连接 2.rsa免密连接 一、理论 1.构建密钥对验证的SSH体系 (1)免密连接原理 ① 手动添加客户端的公钥到服务端 ② 服务端收到客户端的公钥后使用客户端公钥…

C++——引用

引用的概念 初步理解:引用相当于给变量取了一个别名,它和引用的变量共用同一块空间。 就好比孙悟空有很多外号,例如孙行者,齐天大圣,斗战胜佛,但是它们所指都是孙悟空。同样的,如果齐天大圣大…

如何在 Ubuntu 22.04 上安装 Python Pip?

Python Pip 是 Python 的包管理器,它允许您轻松地安装和管理 Python 包和库。在 Ubuntu 22.04 上安装 Python Pip 是非常简单的。 本文将详细介绍如何在 Ubuntu 22.04 上安装 Python Pip,并为您提供逐步指南。 步骤 1:更新软件包列表 在安装…