1.业务分析
2.数据截图
3.代码实现:
main.py:
#cording:utf8
from pyspark import SparkConf, SparkContext
from pyspark.storagelevel import StorageLevel
from defs import content_jieba, filter_word, append_word, extract_user_and_word
from operator import add
if __name__ == '__main__':
conf = SparkConf().setAppName('test')
sc = SparkContext(conf=conf)
# 读取数据
file_rdd = sc.textFile('hdfs://pyspark01:8020/input/SogouQ.txt')
# 对数据进行划分
split_rdd = file_rdd.map(lambda x: x.split('\t'))
# 因为要做多个需求,split_RDD 作为基础的RDD会被多次使用
split_rdd.persist(StorageLevel.DISK_ONLY)
# TODO: 需求1:用户搜素的关键‘词’分析
# 主要分析热点词
# 将所有的搜索内容取出
# print(split_rdd.takeSample(True, 3))
context_rdd = split_rdd.map(lambda x: x[2])
# 对搜索的内容进行分词分析
word_rdd = context_rdd.flatMap(content_jieba)
# print(word_rdd.collect())
# 对数据进行过滤,补全
# 博学 谷 -> 博学谷
# 传智播 客 -> 传智播客
# 院校 帮 -> 院校帮
filtered_rdd = word_rdd.filter(filter_word)
# 数据补全
append_rdd = filtered_rdd.map(append_word)
# 对单词进行分组 聚合 排序 求出前五名
result_1 = append_rdd.reduceByKey(lambda a, b: a + b).\
sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
print('需求1结果:', result_1)
# TODO:需求2:用户和关键词组合分析
# 1,我喜欢传智播客
# 1+我 1+喜欢 1+传智播客
user_content_rdd = split_rdd.map(lambda x: (x[1],x[2]))
# 对用户的搜索内容进行分词,分词和用户的ID再次组合
user_word_rdd = user_content_rdd.flatMap(extract_user_and_word)
# 对单词进行分组 聚合 排序 求出前五名
result_2 = user_word_rdd.reduceByKey(lambda a, b: a + b).\
sortBy(lambda x: x[1], ascending=False, numPartitions=1).take(5)
print("需求2的结果:", result_2)
# TODO:需求3:热门搜索时间段分析
# 取出来所有时间
time_rdd = split_rdd.map(lambda x: x[0])
# 对事件进行处理,只保留小时精度即可
hour_with_one_rdd = time_rdd.map(lambda x: (x.split(":")[0],1))
# 分组 聚合 排序
result_3 = hour_with_one_rdd.reduceByKey(add).\
sortBy(lambda x: x[1],ascending=False,numPartitions=1).collect()
print("需求3的结果:", result_3)
defs.py:
import jieba
def content_jieba(data):
'''使用jieba分词库 对数据分词处理'''
seg = jieba.cut_for_search(data)
l = list()
for word in seg:
l.append(word)
return l
def filter_word(data):
return data not in ['谷', "帮", '客']
def append_word(data):
if data == '院校' : data = '院校帮'
if data == '博学' : data = '博学谷'
if data == '传智播' : data = '传智播客'
return (data, 1)
def extract_user_and_word(data):
'''传入数据 例(1,我喜欢传智播客)'''
user_id = data[0]
content = data[1]
# 对content进行访问
words = content_jieba(content)
return_list = list()
for word in words:
# 过滤 谷、帮、客
if filter_word(word):
return_list.append((user_id + '_' + append_word(word)[0],1))
return return_list