https://dblab.xmu.edu.cn/post/spark-kafka-dashboard/
https://dblab.xmu.edu.cn/post/8116/
实验环境准备
Kafka安装
访问Kafka官方下载页面,下载稳定版本0.10.1.0的kafka.此安装包内已经附带zookeeper,不需要额外安装zookeeper.按顺序执行如下步骤:
cd ~/下载
sudo tar -zxf kafka_2.11-2.4.1.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.11-2.4.1/ ./kafka
sudo chown -R hadoop ./kafka
然而按照教程的版本会出bug,所以我们选择比较新的版本:
cd ~/下载
sudo tar -zxf kafka_2.12-3.2.0.tgz -C /usr/local
cd /usr/local
sudo mv kafka_2.12-3.2.0/ ./kafka
sudo chown -R hadoop ./kafka
测试简单实例
接下来在Ubuntu系统环境下测试简单的实例。Mac系统请自己按照安装的位置,切换到相应的指令。按顺序执行如下命令:
# 进入kafka所在的目录
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
命令执行后不会返回Shell命令输入状态,zookeeper就会按照默认的配置文件启动服务,请千万不要关闭当前终端.启动新的终端,输入如下命令:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
kafka服务端就启动了,请千万不要关闭当前终端。启动另外一个终端,输入如下命令:
cd /usr/local/kafka
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic dblab
topic是发布消息发布的category,以单节点的配置创建了一个叫dblab的topic.可以用list列出所有创建的topics,来查看刚才创建的主题是否存在。
bin/kafka-topics.sh --list --zookeeper localhost:2181
可以在结果中查看到dblab这个topic存在。接下来用producer生产点数据:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic dblab
并尝试输入如下信息:
hello hadoop
hello xmu
hadoop world
然后再次开启新的终端或者直接按CTRL+C退出。然后使用consumer来接收数据,输入如下命令:
cd /usr/local/kafka
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic dblab --from-beginning
./bin/kafka-console-consumer.sh --bootstrap-server localhost:2181 --topic codesheep --from-beginning
便可以看到刚才产生的三条信息。说明kafka安装成功。
数据处理和Python操作Kafka
数据预处理
数据集如下:
用户行为日志user_log.csv,日志中的字段定义如下:
- user_id | 买家id
- item_id | 商品id
- cat_id | 商品类别id
- merchant_id | 卖家id
- brand_id | 品牌id
- month | 交易时间:月
- day | 交易事件:日
- action | 行为,取值范围{0,1,2,3},0表示点击,1表示加入购物车,2表示购买,3表示关注商品
- age_range | 买家年龄分段:1表示年龄=50,0和NULL则表示未知
- gender | 性别:0表示女性,1表示男性,2和NULL表示未知
- province| 收获地址省份
数据具体格式如下:
user_id,item_id,cat_id,merchant_id,brand_id,month,day,action,age_range,gender,province
328862,323294,833,2882,2661,08,29,0,0,1,内蒙古
328862,844400,1271,2882,2661,08,29,0,1,1,山西
328862,575153,1271,2882,2661,08,29,0,2,1,山西
328862,996875,1271,2882,2661,08,29,0,1,1,内蒙古
328862,1086186,1271,1253,1049,08,29,0,0,2,浙江
328862,623866,1271,2882,2661,08,29,0,0,2,黑龙江
328862,542871,1467,2882,2661,08,29,0,5,2,四川
328862,536347,1095,883,1647,08,29,0,7,1,吉林
这个案例实时统计每秒中男女生购物人数,因此针对每条购物日志,我们只需要获取gender即可,然后发送给Kafka,接下来Structured Streaming再接收gender进行处理。
执行如下Shell命令来安装Python操作Kafka的代码库:
conda install kafka-python
之后是producer.py的代码:
# coding: utf-8
import csv
import time
from kafka import KafkaProducer
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
# 打开数据文件
csvfile = open("../data/user_log.csv","r")
# 生成一个可用于读取csv文件的reader
reader = csv.reader(csvfile)
for line in reader:
gender = line[9] # 性别在每行日志代码的第9个元素
if gender == 'gender':
continue # 去除第一行表头
time.sleep(0.1) # 每隔0.1秒发送一行数据
# 发送数据,topic为'sex'
producer.send('sex',line[9].encode('utf8'))
上述代码很简单,首先是先实例化一个Kafka生产者。然后读取用户日志文件,每次读取一行,接着每隔0.1秒发送给Kafka,这样1秒发送10条购物日志。这里发送给Kafka的topic为’sex’。
python操作kafka
我们可以写一个KafkaConsumer测试数据是否投递成功,代码如下,文件名为consumer.py:
from kafka import KafkaConsumer
consumer = KafkaConsumer('sex')
for msg in consumer:
print((msg.value).decode('utf8'))
在开启上述KafkaProducer和KafkaConsumer之前,需要先开启Kafka,命令如下:
cd /usr/local/kafka
bin/zookeeper-server-start.sh config/zookeeper.properties
打开一个新的命令行窗口,输入命令如下:
cd /usr/local/kafka
bin/kafka-server-start.sh config/server.properties
在Kafka开启之后,即可开启KafkaProducer和KafkaConsumer。
这时,你会看到屏幕上会输出一行又一行的数字,类似下面的样子:
Structured Streaming实时处理数据
配置Spark开发Kafka环境
下载Spark连接Kafka的代码库。然后把下载的代码库放到目录/usr/local/spark/jars目录下,命令如下:
sudo mv ~/下载/spark-streaming_2.12-3.2.0.jar /usr/local/spark/jars
sudo mv ~/下载/spark-streaming-kafka-0-10_2.12-3.2.0.jar /usr/local/spark/jars
然后在/usr/local/spark/jars目录下新建kafka目录,把/usr/local/kafka/libs下所有函数库复制到/usr/local/spark/jars/kafka目录下,命令如下
cd /usr/local/spark/jars
mkdir kafka
cd kafka
cp /usr/local/kafka/libs/* .
然后,修改 Spark 配置文件,命令如下:
cd /usr/local/spark/conf
sudo vim spark-env.sh
把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh,修改后的 spark-env.sh 类似如下:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
因为我使用的是anaconda中创建的python环境,所以介绍一下,怎么为spark设置python环境。
需要修改conf目录下的spark_env.sh:在这个文件的开头添加:
export PYSPARK_PYTHON=/home/hadoop/anaconda3/envs/py37/bin/python
#其中,py37应该为自行创建的环境名称
!需要注意的是,作者在给出代码的时候/usr/local/hadoop/bin/hadoopclasspath这一段是不用写的,如果直接复制上去会报错,去掉之后就好了。
建立pySpark项目
首先在/usr/local/spark/mycode新建项目目录
cd /usr/local/spark/mycode
mkdir kafka
然后在kafka这个目录下创建一个kafka_test.py文件。
from kafka import KafkaProducer
from pyspark.streaming import StreamingContext
#from pyspark.streaming.kafka import KafkaUtils
from pyspark import SparkConf, SparkContext
import json
import sys
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import window
from pyspark.sql.types import StructType, StructField
from pyspark.sql.types import TimestampType, StringType
from pyspark.sql.functions import col, column, expr
def KafkaWordCount(zkQuorum, group, topics, numThreads):
spark = SparkSession \
.builder \
.appName("KafkaWordCount") \
.getOrCreate()
spark.sparkContext.setLogLevel("ERROR")
topicAry = topics.split(",")
# 将topic转换为hashmap形式,而python中字典就是一种hashmap
topicMap = {}
for topic in topicAry:
topicMap[topic] = numThreads
#lines = KafkaUtils.createStream(ssc, zkQuorum, group, topicMap).map(lambda x : x[1])
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "sex") \
.load()
df.selectExpr( "CAST(timestamp AS timestamp)","CAST(value AS STRING)")
#lines = df.selectExpr("CAST(value AS STRING)")
windowedCounts = df \
.withWatermark("timestamp", "1 seconds") \
.groupBy(
window(col("timestamp"), "1 seconds" ,"1 seconds"),
col("value")) \
.count()
wind = windowedCounts.selectExpr( "CAST(value AS STRING)","CAST(count AS STRING)")
query = wind.writeStream.option("checkpointLocation", "/check").outputMode("append").foreach(sendmsg).start()
query.awaitTermination()
query.stop()
# 格式转化,将格式变为[{1: 3}]
def Get_dic(row):
res = []
#for elm in row:
tmp = {row[0]: row[1]}
res.append(tmp)
print(res)
return json.dumps(res)
def sendmsg(row):
print(row)
if row.count != 0:
msg = Get_dic(row)
# 实例化一个KafkaProducer示例,用于向Kafka投递消息
producer = KafkaProducer(bootstrap_servers='localhost:9092')
producer.send("result", msg.encode('utf8'))
# 很重要,不然不会更新
producer.flush()
if __name__ == '__main__':
# 输入的四个参数分别代表着
# 1.zkQuorum为zookeeper地址
# 2.group为消费者所在的组
# 3.topics该消费者所消费的topics
# 4.numThreads开启消费topic线程的个数
if (len(sys.argv) < 5):
print("Usage: KafkaWordCount <zkQuorum> <group> <topics> <numThreads>")
exit(1)
zkQuorum = sys.argv[1]
group = sys.argv[2]
topics = sys.argv[3]
numThreads = int(sys.argv[4])
print(group, topics)
KafkaWordCount(zkQuorum, group, topics, numThreads)
代码功能:
- 首先按每秒的频率读取Kafka消息;
- 然后对每秒的数据执行wordcount算法,统计出0的个数,1的个数,2的个数;
- 最后将上述结果封装成json发送给Kafka。
在运行代码之前,先启动hadoop:
cd /usr/local/hadoop #这是hadoop的安装目录
./sbin/start-dfs.sh
运行项目
编写好程序之后,接下来编写运行脚本,在/usr/local/spark/mycode/kafka目录下新建startup.sh文件,输入如下内容:
/usr/local/spark/bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 /usr/local/spark/mycode/kafka/kafka_test.py 127.0.0.1:2181 1 sex 1
其中最后四个为输入参数,含义如下
- 127.0.0.1:2181为Zookeeper地址
- 1 为consumer group标签
- sex为消费者接收的topic
- 1 为消费者线程数
sh startup.sh
最后在/usr/local/spark/mycode/kafka目录下,运行如下命令即可执行刚编写好的Structured Streaming程序
报错:
结果展示
利用Flask创建web程序,利用Flask-SocketIO实现实时推送数据,利用socket.io.js实现实时接收数据,hightlights.js展现数据
Flask-SocketIO实时推送数据
首先我们创建如图中的app.py文件,app.py的功能就是作为一个简易的服务器,处理连接请求,以及处理从kafka接收的数据,并实时推送到浏览器。app.py的代码如下:
import json
from flask import Flask, render_template
from flask_socketio import SocketIO
from kafka import KafkaConsumer
# 因为第一步骤安装好了flask,所以这里可以引用
app = Flask(__name__)
app.config['SECRET_KEY'] = 'secret!'
socketio = SocketIO(app)
thread = None
# 实例化一个consumer,接收topic为result的消息
consumer = KafkaConsumer('result')
# 一个后台线程,持续接收Kafka消息,并发送给客户端浏览器
def background_thread():
girl = 0
boy = 0
for msg in consumer:
data_json = msg.value.decode('utf8')
data_list = json.loads(data_json)
for data in data_list:
if '0' in data.keys():
girl = data['0']
elif '1' in data.keys():
boy = data['1']
else:
continue
result = str(girl) + ',' + str(boy)
print(result)
socketio.emit('test_message', {'data': result})
# 客户端发送connect事件时的处理函数
@socketio.on('test_connect')
def connect(message):
print(message)
global thread
if thread is None:
# 单独开启一个线程给客户端发送数据
thread = socketio.start_background_task(target=background_thread)
socketio.emit('connected', {'data': 'Connected'})
# 通过访问http://127.0.0.1:5000/访问index.html
@app.route("/")
def handle_mes():
return render_template("index.html")
# main函数
if __name__ == '__main__':
socketio.run(app, debug=True)
这段代码实现比较简单,最重要就是background_thread函数,该函数从Kafka接收消息,并进行处理,获得男女生每秒钟人数,然后将结果通过函数socketio.emit实时推送至浏览器。
浏览器获取数据并展示
index.html文件负责获取数据并展示效果,该文件中的代码内容如下:
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>DashBoard</title>
<script src="static/js/socket.io.js"></script>
<script src="static/js/jquery-3.1.1.min.js"></script>
<script src="static/js/highcharts.js"></script>
<script src="static/js/exporting.js"></script>
<script type="text/javascript" charset="utf-8">
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {
socket.emit('test_connect', {data: 'I\'m connected!'});
});
socket.on('test_message',function(message){
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
$('#girl').html(result[0]);
$('#boy').html(result[1]);
});
socket.on('connected',function(){
console.log('connected');
});
socket.on('disconnect', function () {
console.log('disconnect');
});
</script>
</head>
<body>
<div>
<b>Girl: </b><b id="girl"></b>
<b>Boy: </b><b id="boy"></b>
</div>
<div id="container" style="width: 600px;height:400px;"></div>
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // don't animate in old IE
marginRight: 10,
events: {
load: function () {
// set up the updating of the chart each second
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function () {
var x = (new Date()).getTime(), // current time
count1 = $('#girl').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#boy').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
}, 1000);
}
}
},
title: {
text: '男女生购物人数实时分析'
},
xAxis: {
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: '数量'
},
plotLines: [{
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
formatter: function () {
return '<b>' + this.series.name + '</b><br/>' +
Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [{
name: '女生购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: '男生购物人数',
data: (function () {
// generate an array of random data
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}]
});
});
</script>
</body>
</html>
socket.io.js
在index.html中包含了如下一段代码,就是用来调用socket.io.js和socket.io.js.map这两个js库文件的:
<script type="text/javascript" charset="utf-8">
// 创建连接服务器的链接
var socket = io.connect('http://' + document.domain + ':' + location.port);
socket.on('connect', function() {// 连上服务器后的回调函数
socket.emit('connect', {data: 'I\'m connected!'});
});
// 接收服务器实时发送的数据
socket.on('test_message',function(message){
console.log(message);
var obj = eval(message);
var result = obj["data"].split(",");
// 将男生和女生人数展示在html标签内
$('#girl').html(result[0]);
$('#boy').html(result[1]);
});
socket.on('connected',function(){
console.log('connected');
});
// 链接断开时的回调函数
socket.on('disconnect', function () {
console.log('disconnect');
});
</script>
highchart.js
在index.html中包含如下一段代码,就是调用highcharts.js库,来实时地从html标签内获取数据并展示在网页中。
<script type="text/javascript">
$(document).ready(function () {
Highcharts.setOptions({
global: {
useUTC: false
}
});
Highcharts.chart('container', {
chart: {
type: 'spline',
animation: Highcharts.svg, // 这个在ie浏览器可能不支持
marginRight: 10,
events: {
load: function () {
//设置图表每秒更新一次
var series1 = this.series[0];
var series2 = this.series[1];
setInterval(function () {
var x = (new Date()).getTime();// 获取当前时间
count1 = $('#girl').text();
y = parseInt(count1);
series1.addPoint([x, y], true, true);
count2 = $('#boy').text();
z = parseInt(count2);
series2.addPoint([x, z], true, true);
}, 1000);
}
}
},
title: { //设置图表名
text: '男女生购物人数实时分析'
},
xAxis: { //x轴设置为实时时间
type: 'datetime',
tickPixelInterval: 50
},
yAxis: {
title: {
text: '数量'
},
plotLines: [{ //设置坐标线颜色粗细
value: 0,
width: 1,
color: '#808080'
}]
},
tooltip: {
//规范显示时间的格式
formatter: function () {
return '<b>' + this.series.name + '</b><br/>' +
Highcharts.dateFormat('%Y-%m-%d %H:%M:%S', this.x) + '<br/>' +
Highcharts.numberFormat(this.y, 2);
}
},
legend: {
enabled: true
},
exporting: {
enabled: true
},
series: [{
name: '女生购物人数',
data: (function () {
// 随机方式生成初始值填充图表
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
},
{
name: '男生购物人数',
data: (function () {
// 随机方式生成初始值填充图表
var data = [],
time = (new Date()).getTime(),
i;
for (i = -19; i <= 0; i += 1) {
data.push({
x: time + i * 1000,
y: Math.random()
});
}
return data;
}())
}]
});
});
exporting.js
exporting.js这个库文件的功能是实现导出功能。
效果展示
python app.py
问题与处理
问题1 spark-env.sh添加代码错误
把 Kafka 相关 jar 包的路径信息增加到 spark-env.sh,修改后的 spark-env.sh 类似如下:
原代码:
export SPARK_DIST_CLASSPATH=$(/usr/local/hadoop/bin/hadoopclasspath):/usr/local/spark/jars/kafka/*:/usr/local/kafka/libs/*
!需要注意的是,作者在给出代码的时候/usr/local/hadoop/bin/hadoopclasspath这一段是不用写的,如果直接复制上去会报错,去掉之后就好了。
问题2 相关包没有安装
pip install gevent-websocket -i https://pypi.tuna.tsinghua.edu.cn/simple pip --trusted-host pypi.tuna.tsi
如上图,根据报错安装好包就行了。
问题3 java版本不一致
通过代码
java -version
javac -version
检查
这里经过检查没有问题。
问题4 启动时报错
究其原因,是步骤三复现出了问题: