1、数据模拟器代码
-
1- 创建一个topic, 放置后续物联网的数据 search-log-topic
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --create --topic search-log-topic --partitions 3 --replication-factor 2
-
2- 将代码放置到项目中:
import json
import random
import sys
import time
import os
from kafka import KafkaProducer
from kafka.errors import KafkaError
# 锁定远端操作环境, 避免存在多个版本环境的问题
os.environ['SPARK_HOME'] = '/export/server/spark'
os.environ["PYSPARK_PYTHON"] = "/root/anaconda3/bin/python"
os.environ["PYSPARK_DRIVER_PYTHON"] = "/root/anaconda3/bin/python"
# 快捷键: main 回车
if __name__ == '__main__':
print("模拟物联网数据")
# 1- 构建一个kafka的生产者:
producer = KafkaProducer(
bootstrap_servers=['node1:9092', 'node2:9092', 'node3:9092'],
acks='all',
value_serializer=lambda m: json.dumps(m).encode("utf-8")
)
# 2- 物联网设备类型
deviceTypes = ["洗衣机", "油烟机", "空调", "窗帘", "灯", "窗户", "煤气报警器", "水表", "燃气表"]
while True:
index = random.choice(range(0, len(deviceTypes)))
deviceID = f'device_{index}_{random.randrange(1, 20)}' # 设备ID
deviceType = deviceTypes[index] # 设备类型
deviceSignal = random.choice(range(10, 100)) # 设备信号
# 组装数据集
print({'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
'time': time.strftime('%s')})
# 发送数据
producer.send(topic='search-log-topic',
value={'deviceID': deviceID, 'deviceType': deviceType, 'deviceSignal': deviceSignal,
'time': time.strftime('%s')}
)
# 间隔时间 5s内随机
time.sleep(random.choice(range(1, 5)))
-
测试, 观察是否可以正常生成:
./kafka-topics.sh --bootstrap-server node1.itcast.cn:9092,node2.itcast.cn:9092 --describe --topic search-log-topic
2、需求说明
目前咱们有一个模拟器程序, 可以向Kafka不断的写入数据
要做的是, 用Spark的结构化流接收数据, 并且对数据进行统计分析操作:
-
求: 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度
需求分析:
1- 需要按照设备类型进行分组,也就是维度是设备类型deviceType
2- 指标
设备数量:deviceID
平均信号强度:deviceSignal
示例数据:
{'deviceID': 'device_1_1', 'deviceType': '油烟机', 'deviceSignal': 23, 'time': '1668848417'} {'deviceID': 'device_0_4', 'deviceType': '洗衣机', 'deviceSignal': 55, 'time': '1668848418'}
deviceID: 设备ID deviceType: 设备类型 deviceSignal: 设备信号 time : 设备发送时间戳
3、代码实现
from pyspark import SparkConf, SparkContext import os from pyspark.sql import SparkSession import pyspark.sql.functions as F # 绑定指定的Python解释器 os.environ['SPARK_HOME'] = '/export/server/spark' os.environ['PYSPARK_PYTHON'] = '/root/anaconda3/bin/python3' os.environ['PYSPARK_DRIVER_PYTHON'] = '/root/anaconda3/bin/python3' def sql(): # SQL # 3.2- 拆解数据结构。将json解析得到单个的字段 """ get_json_object(参数1,参数2):用来解析json串。一次只能得到一个字段的值 参数1:要解析的json字段名称 参数2:字段的解析路径 $.字段路径 """ etl_df = spark.sql(""" select get_json_object(value,'$.deviceID') as deviceID, get_json_object(value,'$.deviceType') as deviceType, get_json_object(value,'$.deviceSignal') as deviceSignal, get_json_object(value,'$.time') as time from iot """) etl_df.createTempView("etl") # 3.3- 各种信号强度>30各种类型的设备数量 和 它们的平均信号强度 result_df = spark.sql(""" select deviceType, count(deviceID) as cnt_deviceID, round(avg(deviceSignal),2) as avg_deviceSignal from etl where deviceSignal>30 group by deviceType """) # 4- 数据输出 # 5- 启动流式任务 result_df.writeStream.format('console').outputMode('complete').start().awaitTermination() def dsl(): result_df = etl_tmp_df.select( F.get_json_object('value', '$.deviceID').alias('deviceID'), F.get_json_object('value', '$.deviceType').alias('deviceType'), F.get_json_object('value', '$.deviceSignal').alias('deviceSignal'), F.get_json_object('value', '$.time').alias('time') ).where('deviceSignal>30').groupBy('deviceType').agg( F.count('deviceID').alias('cnt_deviceID'), F.round(F.avg('deviceSignal'), 2).alias('avg_deviceSignal') ) # 4- 数据输出 # 5- 启动流式任务 result_df.writeStream.format('console').outputMode('complete').start().awaitTermination() if __name__ == '__main__': # 1- 创建SparkSession对象 spark = SparkSession.builder\ .config("spark.sql.shuffle.partitions",2)\ .appName('iot')\ .master('local[*]')\ .getOrCreate() # 2- 数据输入 init_df = spark.readStream\ .format("kafka") \ .option("kafka.bootstrap.servers", "node1.itcast.cn:9092,node2.itcast.cn:9092") \ .option("subscribe", "search-log-topic") \ .load() # 3- 数据处理 # 3.1- 数据ETL:进行数据类型转换,将value字段bytes->字符串 etl_tmp_df = init_df.selectExpr("cast(value as string) as value") etl_tmp_df.createTempView('iot') # SQL # sql() # DSL dsl()
运行结果截图:
结构化流不支持的操作:
-
多个流同时聚合
-
limit和take不能使用
-
不能使用去重操作
-
Few types of outer joins on streaming Datasets are not supported. See the support matrix in the Join Operations section for more details.