用户画像增量更新系列二

news2025/1/16 12:51:53

进行用户日志数据处理

原始日志数据

  

结果:

 

思路:按照user_id的行为一条条处理,根据用户的行为类型判别。

  • 由于sqlDF每条数据可能会返回多条结果,我们可以使用rdd.flatMap函数或者yield
    • 格式:["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"]
if sqlDF.collect():
    def _compute(row):
        # 进行判断行为类型
        _list = []
        if row.action == "exposure":
            for article_id in eval(row.articleId):
                _list.append(
                    [row.userId, row.actionTime, article_id, row.channelId, False, False, False, True, row.readTime])
            return _list
        else:
            class Temp(object):
                shared = False
                clicked = False
                collected = False
                read_time = ""

            _tp = Temp()
            if row.action == "share":
                _tp.shared = True
            elif row.action == "click":
                _tp.clicked = True
            elif row.action == "collect":
                _tp.collected = True
            elif row.action == "read":
                _tp.clicked = True
            else:
                pass
            _list.append(
                [row.userId, row.actionTime, int(row.articleId), row.channelId, _tp.shared, _tp.clicked, _tp.collected,
                 True,
                 row.readTime])
            return _list
    # 进行处理
    # 查询内容,将原始日志表数据进行处理
    _res = sqlDF.rdd.flatMap(_compute)
    data = _res.toDF(["user_id", "action_time","article_id", "channel_id", "shared", "clicked", "collected", "exposure", "read_time"])

合并历史数据,存储到user_article_basic表中

# 合并历史数据,插入表中
old = uup.spark.sql("select * from user_article_basic")
# 由于合并的结果中不是对于user_id和article_id唯一的,一个用户会对文章多种操作
new_old = old.unionAll(data)
  • HIVE目前支持hive终端操作ACID,不支持python的pyspark原子性操作,并且开启配置中开启原子性相关配置也不行。
new_old.registerTempTable("temptable")
# 按照用户,文章分组存放进去
uup.spark.sql(
        "insert overwrite table user_article_basic select user_id, max(action_time) as action_time, "
        "article_id, max(channel_id) as channel_id, max(shared) as shared, max(clicked) as clicked, "
        "max(collected) as collected, max(exposure) as exposure, max(read_time) as read_time from temptable "
        "group by user_id, article_id")

这里面需要根据用户ID和文章ID分组。

 

 

3.2.2 用户标签权重计算

3.2.2.1 画像存储

如何存储?

用户画像,作为特征提供给一些算法排序,方便与快速读取使用,选择存储在Hbase当中。如果离线分析也想要使用我们可以建立HIVE到Hbase的外部表。

  • 如果存到HIVE,建立HBASE关联过去,删除Hive表对HBase没有影响,但是先删除HBase表Hive就会报TableNotFoundException
  • HBase中的有同样的主键的行会被更新成最新插入的。可以依靠hbase来 新增/修改单条记录, 然后利用hive这个外表来实现hbase数据统计

HBase表设计

create 'user_profile', 'basic','partial','env'

示例:

put 'user_profile', 'user:2', 'partial:{channel_id}:{topic}': weights

put 'user_profile', 'user:2', 'basic:{info}': value

put 'user_profile', 'user:2', 'env:{info}': value

Hive关联表

create external table user_profile_hbase(
user_id STRING comment "userID",
information map<string, DOUBLE> comment "user basic information",
article_partial map<string, DOUBLE> comment "article partial",
env map<string, INT> comment "user env")
COMMENT "user profile table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,basic:,partial:,env:")
TBLPROPERTIES ("hbase.table.name" = "user_profile");

3.2.2.2 Spark SQL关联表读取问题?

创建关联表之后,离线读取表内容需要一些依赖包。解决办法:

  • 拷贝/root/bigdata/hbase/lib/下面hbase-*.jar 到 /root/bigdata/spark/jars/目录下
  • 拷贝/root/bigdata/hive/lib/h*.jar 到 /root/bigdata/spark/jars/目录下

上述操作三台虚拟机都执行一遍。

3.2.2.3 用户画像频道关键词获取与权重计算

  • 目标:获取用户1~25频道(不包括推荐频道)的关键词,并计算权重
  • 步骤:
    • 1、读取user_article_basic表,合并行为表文章画像中的主题词
    • 2、进行用户权重计算公式、同时落地存储

读取user_article_basic表

# 获取基本用户行为信息,然后进行文章画像的主题词合并
uup.spark.sql("use profile")
# 取出日志中的channel_id
user_article_ = uup.spark.sql("select * from user_article_basic").drop('channel_id')
uup.spark.sql('use article')
article_label = uup.spark.sql("select article_id, channel_id, topics from article_profile")
# 合并使用文章中正确的channel_id
click_article_res = user_article_.join(article_label, how='left', on=['article_id'])

对channel_id进行处理的原因:日志中的频道号,是通过Web后台进行埋点,有些并没有真正对应文章所属频道(推荐频道为0号频道,获取曝光文章列表时候埋点会将文章对应的频道在日志中是0频道。)

 

 

这样的主题词列表进行计算权重不方便对于用户的每个主题词权重计算,需要进行explode

 

 

# 将字段的列表爆炸
import pyspark.sql.functions as F
click_article_res = click_article_res.withColumn('topic', F.explode('topics')).drop('topics')

进行用户权重计算公式、同时落地存储。

3.2.2.4 用户画像之标签权重算法

用户标签权重 =( 行为类型权重之和) × 时间衰减

行为类型权重

分值的确定需要整体协商

行为分值
阅读时间(<1000)1
阅读时间(>=1000)2
收藏2
分享3
点击5

完成对关键行为赋予权重分值后,即可开始计算,首先我们把用户浏览(收听、观看)的内容全部按照上面内容标签化的方式打散成标签

时间衰减:1/(log(t)+1) ,t为时间发生时间距离当前时间的大小。

# 计算每个用户对每篇文章的标签的权重
def compute_weights(rowpartition):
    """处理每个用户对文章的点击数据
    """
    weightsOfaction = {
        "read_min": 1,
        "read_middle": 2,
        "collect": 2,
        "share": 3,
        "click": 5
    }

    import happybase
    from datetime import datetime
    import numpy as np
    #  用于读取hbase缓存结果配置
    pool = happybase.ConnectionPool(size=10, host='192.168.19.137', port=9090)

    # 读取文章的标签数据
    # 计算权重值
    # 时间间隔
    for row in rowpartition:

        t = datetime.now() - datetime.strptime(row.action_time, '%Y-%m-%d %H:%M:%S')
        # 时间衰减系数
        time_exp = 1 / (np.log(t.days + 1) + 1)

        if row.read_time == '':
            r_t = 0
        else:
            r_t = int(row.read_time)
        # 浏览时间分数
        is_read = weightsOfaction['read_middle'] if r_t > 1000 else weightsOfaction['read_min']

        # 每个词的权重分数
        weigths = time_exp * (
                    row.shared * weightsOfaction['share'] + row.collected * weightsOfaction['collect'] + row.
                    clicked * weightsOfaction['click'] + is_read)

#        with pool.connection() as conn:
#            table = conn.table('user_profile')
#            table.put('user:{}'.format(row.user_id).encode(),
#                      {'partial:{}:{}'.format(row.channel_id, row.topic).encode(): json.dumps(
#                          weigths).encode()})
#            conn.close()

click_article_res.foreachPartition(compute_weights)

落地Hbase中之后,在HBASE中查询,happybase或者hbase终端

import happybase
#  用于读取hbase缓存结果配置
pool = happybase.ConnectionPool(size=10, host='192.168.19.137', port=9090)

with pool.connection() as conn:
    table = conn.table('user_profile')
    # 获取每个键 对应的所有列的结果
    data = table.row(b'user:2', columns=[b'partial'])
    conn.close()
hbase(main):015:0> get 'user_profile', 'user:2'

同时在HIVE中查询

hive> select * from user_profile_hbase limit 1;
OK
user:1  {"birthday":0.0,"gender":null}  {"18:##":0.25704484358604845,"18:&#":0.25704484358604845,"18:+++":0.23934588700996243,"18:+++++":0.23934588700996243,"18:AAA":0.2747964402379244,"18:Animal":0.2747964402379244,"18:Author":0.2747964402379244,"18:BASE":0.23934588700996243,"18:BBQ":0.23934588700996243,"18:Blueprint":1.6487786414275463,"18:Code":0.23934588700996243,"18:DIR....................................................

3.2.3 基础信息画像更新

同时对于用户的基础信息也需要更新到用户的画像中。

    def update_user_info(self):
        """
        更新用户的基础信息画像
        :return:
        """
        self.spark.sql("use toutiao")

        user_basic = self.spark.sql("select user_id, gender, birthday from user_profile")

        # 更新用户基础信息
        def _udapte_user_basic(partition):
            """更新用户基本信息
            """
            import happybase
            #  用于读取hbase缓存结果配置
            pool = happybase.ConnectionPool(size=10, host='172.17.0.134', port=9090)
            for row in partition:

                from datetime import date
                age = 0
                if row.birthday != 'null':
                    born = datetime.strptime(row.birthday, '%Y-%m-%d')
                    today = date.today()
                    age = today.year - born.year - ((today.month, today.day) < (born.month, born.day))

                with pool.connection() as conn:
                    table = conn.table('user_profile')
                    table.put('user:{}'.format(row.user_id).encode(),
                              {'basic:gender'.encode(): json.dumps(row.gender).encode()})
                    table.put('user:{}'.format(row.user_id).encode(),
                              {'basic:birthday'.encode(): json.dumps(age).encode()})
                    conn.close()

        user_basic.foreachPartition(_udapte_user_basic)
        logger.info(
            "{} INFO completely update infomation of basic".format(datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
hbase(main):016:0> get 'user_profile', 'user:2'

3.2.4 用户画像增量更新定时开启

  • 用户画像增量更新代码整理
  • 添加定时任务以及进程管理

在main.py和update.py文件中增加

from offline.update_user import UpdateUserProfile


def update_user_profile():
    """
    更新用户画像
    """
    uup = UpdateUserProfile()
    if uup.update_user_action_basic():
        uup.update_user_label()
        uup.update_user_info()
scheduler.add_job(update_user_profile, trigger='interval', hours=2)

添加之后,进行supervisor的update。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/178424.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

总结:计算机中字符串比较大小的规则

总结&#xff1a;计算机中字符串比较大小的规则一背景&#xff1a;二Unicode编码表&#xff1a;字符越靠后&#xff0c;对应的十进制值越大三单个字符之间比较规则&#xff1a;四案例演示&#xff1a;单个字符与单个字符之间比较大小1.前提&#xff1a;汉字“一”与汉字“万”&…

Elasticsearch:Elasticsearch percolate 查询

Elasticsearch 通常如何工作&#xff1f; 我们将文档索引到 Elasticsearch 中并对其运行查询以获得满足提供的搜索条件的文档。 我们构造一个匹配或术语查询作为输入&#xff0c;匹配查询的文档作为结果返回。 但这不是 percolate query 的情况..... 让我们看看这篇文章中的 p…

10.Java方法学习知识点大全

文章目录前言一、什么是方法1.什么是方法?2.实际开发中,什么时候用到方法?3.实际开发中,方法有什么好处?二、最简单的方法定义和调用1.方法的格式2.方法的调用3.看代码说结果4.为什么要有带参数的方法呢?三、带参数的方法定义和调用1.带参数的方法定义和调用2.形参和实参3.…

NuSphere PhpED Pro 19.5 Crack

PhpED是PHP&#xff08;PHP IDE&#xff09;&#xff0c;HTML&#xff0c;CSS&#xff0c;XML&#xff0c;SMARTY&#xff0c;XHTML等的I ntegated Development Environment。 高级代码编辑器、可靠的 dbg 调试器、高效的数据库连接客户端以及快速安全的部署能力的平衡组合使 P…

90. 注意力分数及代码实现

1. 注意力分数 2. 拓展到高维度 3. Additive Attention ps&#xff1a; 这种的好处是&#xff0c;key&#xff0c;value&#xff0c;query的长度可以不一样 4. Scaled Dot-Product Attention n个query&#xff0c;m个key-value 对最后的结果是n x m的矩阵&#xff0c;第i行就表…

LeetCode[684]冗余连接

难度&#xff1a;中等题目&#xff1a;树可以看成是一个连通且 无环 的 无向 图。给定往一棵 n个节点 (节点值 1&#xff5e;n) 的树中添加一条边后的图。添加的边的两个顶点包含在 1到 n中间&#xff0c;且这条附加的边不属于树中已存在的边。图的信息记录于长度为 n的二维数组…

Python 压缩 css 文件,第三方模块推荐

本篇博客为大家详细介绍一下如何在 Python 中压缩 CSS 文件。 正式开始前&#xff0c;需要准备一个未压缩过的 CSS 文件。 Python 压缩 csscsscompressor 库使用在 Flask 中压缩 css 文件cssmin 库的用法rcssmin 库的用法总结csscompressor 库使用 在 Python 中可以使用多种方…

CSS之浮动以及清除浮动的几种方式

一. 什么是 CSS Float&#xff08;浮动&#xff09; CSS 的 Float&#xff08;浮动&#xff09;&#xff0c;会使元素向左或向右移动&#xff0c;其周围的元素也会重新排列。 Float&#xff08;浮动&#xff09;&#xff0c;往往是用于图像&#xff0c;但它在布局时一样非常有…

【大数据管理】Java实现字典树TireTree

实现字典树&#xff0c;支持插入和删除&#xff0c;能够打印每一层的数据示例数据“SJ”, “SHJ”, “SGYY”,"HGL" ,将这些数据插入前缀树&#xff0c;打印树&#xff0c;修改SHZ为SHHZ 解题思路 Trie树即字典树&#xff0c;又称单词查找树或键树&#xff0c;是一…

Linux下进程控制详解

目录 一、进程创建 1.1 初识fork 1.2 函数返回值 1.3 写时拷贝技术 1.4 fork函数的使用场景 1.5 fork函数的失败原因 二、进程终止 2.1 进程退出场景 2.2 进程退出码 2.3 进程正常退出方法 2.3.1 exit函数 2.3.2 _exit函数 2.3.3 return方法 2.3.4 方法分析对比 …

【LINUX修行之路】——工具篇gcc/g++的使用和自动化构建工具make/makefile

学习范围&#xff1a;✔️LINUX ✔️ gcc/g✔️make/makefile作者 &#xff1a;蓝色学者 文章目录一、前言二、概念什么是gcc/g&#xff1f;什么是make/makefile&#xff1f;三、教程3.1gcc/g命令3.2make/makefile依赖关系依赖方法编写makefile文件四、资源一、前言 欢迎大家来…

谷粒学院——Day20【项目总结】

❤ 作者主页&#xff1a;欢迎来到我的技术博客&#x1f60e; ❀ 个人介绍&#xff1a;大家好&#xff0c;本人热衷于Java后端开发&#xff0c;欢迎来交流学习哦&#xff01;(&#xffe3;▽&#xffe3;)~* &#x1f34a; 如果文章对您有帮助&#xff0c;记得关注、点赞、收藏、…

计算机组成原理实验-logisim实现自动售糖机

一.作业内容; 二.设计分析&#xff1a; 首先我们先确定输入和输出&#xff0c;根据题目的提示很明显可以看出因为每次可以投入10元或者5元硬币&#xff0c;当总钱数达到15元或者超过15元的时候&#xff0c;自动出糖&#xff0c;并且机器不找零&#xff0c;所以可以看出最大的钱…

基于 V2G 技术的电动汽车实时调度策略(Matlab代码实现)

&#x1f4a5;&#x1f4a5;&#x1f49e;&#x1f49e;欢迎来到本博客❤️❤️&#x1f4a5;&#x1f4a5; &#x1f3c6;博主优势&#xff1a;&#x1f31e;&#x1f31e;&#x1f31e;博客内容尽量做到思维缜密&#xff0c;逻辑清晰&#xff0c;为了方便读者。 ⛳️座右铭&a…

第九层(2):STL之string类

文章目录前情回顾string类string类的本质string与char*的区别string类的特点string类的构造函数string类内的字符串追加函数string类内的字符串查找函数string类内的字符串替换函数string类内的字符串比较函数string类内的字符单个访问函数string类内的插入函数string类内的删除…

最小化最大值+拓扑排序要点+概率

今天嫖来的两道题&#xff1a; D.ScoreofaTreeD. Score of a TreeD.ScoreofaTree E.EdgeReverseE. Edge ReverseE.EdgeReverse DDD题是比较离谱的一道题&#xff0c;你在做的时候好像是dp&#xff0c;但是选择的情况太多了&#xff0c;其实对于每一个节点来说&#xff0c;除了叶…

fpga实操训练(fpga和cpu之间的配合)

【 声明:版权所有,欢迎转载,请勿用于商业用途。 联系信箱:feixiaoxing @163.com】 cpu和fpga之间,各有各的优势,cpu开发比较快捷,程序员比较好找;fpga对于基础运算效率高,但是找人不好找。实际产品的开发中,一般cpu负责需要接口定义和个性化定制的地方,而fp…

【Datewhale一起吃瓜 Task3】啃瓜第四章

文章目录决策树学习过程预测过程如何划分信息熵信息增益增益率基尼指数泛化能力关键&#xff1a;剪枝预剪枝后剪枝比较缺失值处理&#xff1a;样本赋权&#xff0c;权重划分决策树 决策树基于“树”结构进行决策 每个内部节点对应于某个属性上的测试每个分支对应于该属性的某个…

OpenGL ES着色器语言(GLSL ES)规范 ——下篇

文章目录前言分支和循环if、if-elseforcontinue、break、discard着色器内置变量函数函数定义规范声明webgl内置函数存储限定字constattributeuniformvarying精度限定字预处理指令总结前言 本篇接上文继续对着色器语言规范进行讲解&#xff0c;本文的内容包括&#xff1a;分支和…

Windows下JetBrains GoLand环境配置记录

闲来无事&#xff0c;go go go 这篇文章不是最简单的配置方法&#xff0c;相对简单的配置方法见文末引用。 本文记录了我遇见的一些问题以及解决方案与解释。 Go编译环境配置 首先得前往谷歌的网站下载go语言的镜像文件&#xff1a; Downloads - The Go Programming Languag…