离线ctr特征中心更新

news2025/1/8 6:05:43

3.8 离线ctr特征中心更新

学习目标

  • 目标
    • 了解特征服务中心的作用
  • 应用

3.8.1 特征服务中心

 

 

特征服务中心可以作为离线计算用户与文章的高级特征,充当着重要的角色。可以为程序提供快速的特征处理与特征结果,而且不仅仅提供给离线使用。还可以作为实时的特征供其他场景读取进行

原则是:用户,文章能用到的特征都进行处理进行存储,便于实时推荐进行读取

  • 存储形式
    • 存储到数据库HBASE中
    • 构造好样本,存储到TFRecords文件给TensorFlow模型训练

首先确定创建特征结果HBASE表:

create 'ctr_feature_user', 'channel'
 4                         column=channel:13, timestamp=1555647172980, value=[]                        
 4                         column=channel:14, timestamp=1555647172980, value=[]                        
 4                         column=channel:15, timestamp=1555647172980, value=[]                        
 4                         column=channel:16, timestamp=1555647172980, value=[]                        
 4                         column=channel:18, timestamp=1555647172980, value=[0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073, 0.2156294170196073]                                                      
 4                         column=channel:19, timestamp=1555647172980, value=[]                        
 4                         column=channel:20, timestamp=1555647172980, value=[]                        
 4                         column=channel:2, timestamp=1555647172980, value=[]                         
 4                         column=channel:21, timestamp=1555647172980, value=[]

create 'ctr_feature_article', 'article'
 COLUMN                     CELL                                                                        
 article:13401             timestamp=1555635749357, value=[18.0,0.08196639249252607,0.11217275332895373,0.1353835167902181,0.16086650318453152,0.16356418791892943,0.16740082750337945,0.18091837445730974,0.1907214431716628,0.2........................-0.04634634410271921,-0.06451843378804649,-0.021564142420785692,0.10212902152136256]

创建HIVE外部表,

create external table ctr_feature_user_hbase(
user_id STRING comment "user_id",
user_channel map comment "user_channel")
COMMENT "ctr table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,channel:")
TBLPROPERTIES ("hbase.table.name" = "ctr_feature_user");

create external table ctr_feature_article_hbase(
article_id STRING comment "article_id",
article_feature map comment "article")
COMMENT "ctr table"
STORED BY 'org.apache.hadoop.hive.hbase.HBaseStorageHandler'
WITH SERDEPROPERTIES ("hbase.columns.mapping" = ":key,article:")
TBLPROPERTIES ("hbase.table.name" = "ctr_feature_article");

3.8.2 用户特征中心更新

  • 目的:计算用户特征更新到HBASE
  • 步骤:
    • 获取特征进行用户画像权重过滤
    • 特征批量存储

获取特征进行用户画像权重过滤

# 构造样本
ctr.spark.sql("use profile")

user_profile_hbase = ctr.spark.sql(
    "select user_id, information.birthday, information.gender, article_partial, env from user_profile_hbase")

# 特征工程处理
# 抛弃获取值少的特征
user_profile_hbase = user_profile_hbase.drop('env', 'birthday', 'gender')

def get_user_id(row):
    return int(row.user_id.split(":")[1]), row.article_partial

user_profile_hbase_temp = user_profile_hbase.rdd.map(get_user_id)

from pyspark.sql.types import *

_schema = StructType([
    StructField("user_id", LongType()),
    StructField("weights", MapType(StringType(), DoubleType()))
])

user_profile_hbase_schema = ctr.spark.createDataFrame(user_profile_hbase_temp, schema=_schema)

def frature_preprocess(row):

    from pyspark.ml.linalg import Vectors

    channel_weights = []
    for i in range(1, 26):
        try:
            _res = sorted([row.weights[key] for key
                           in row.weights.keys() if key.split(':')[0] == str(i)])[:10]
            channel_weights.append(_res)
        except:
            channel_weights.append([0.0] * 10)

    return row.user_id, channel_weights

res = user_profile_hbase_schema.rdd.map(frature_preprocess).collect()

特征批量存储,保存用户每个频道的特征

import happybase
# 批量插入Hbase数据库中
pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
with pool.connection() as conn:
    ctr_feature = conn.table('ctr_feature_user')
    with ctr_feature.batch(transaction=True) as b:
        for i in range(len(res)):
            for j in range(25):
                b.put("{}".format(res[i][0]).encode(),{"channel:{}".format(j+1).encode(): str(res[i][1][j]).encode()})
    conn.close()

3.8.2 文章特征中心更新

文章特征有哪些?

  • 关键词权重
  • 文章的频道
  • 文章向量结果

存储这些特征以便于后面实时排序时候快速使用特征

  • 步骤:
    • 1、读取相关文章画像
    • 2、进行文章相关特征处理和提取
    • 3、合并文章所有特征作为模型训练或者预测的初始特征
    • 4、文章特征存储到HBASE

读取相关文章画像

ctr.spark.sql("use article")
article_profile = ctr.spark.sql("select * from article_profile")

进行文章相关特征处理和提取

def article_profile_to_feature(row):
    try:
        weights = sorted(row.keywords.values())[:10]
    except Exception as e:
        weights = [0.0] * 10
    return row.article_id, row.channel_id, weights
article_profile = article_profile.rdd.map(article_profile_to_feature).toDF(['article_id', 'channel_id', 'weights'])

article_profile.show()

 

 

article_vector = ctr.spark.sql("select * from article_vector")
article_feature = article_profile.join(article_vector, on=['article_id'], how='inner')
def feature_to_vector(row):
    from pyspark.ml.linalg import Vectors
    return row.article_id, row.channel_id, Vectors.dense(row.weights), Vectors.dense(row.articlevector)
article_feature = article_feature.rdd.map(feature_to_vector).toDF(['article_id', 'channel_id', 'weights', 'articlevector'])

指定所有文章特征进行合并

# 保存特征数据
cols2 = ['article_id', 'channel_id', 'weights', 'articlevector']
# 做特征的指定指定合并
article_feature_two = VectorAssembler().setInputCols(cols2[1:4]).setOutputCol("features").transform(article_feature)

结果:

+----------+----------+--------------------+--------------------+--------------------+
|article_id|channel_id|             weights|       articlevector|            features|
+----------+----------+--------------------+--------------------+--------------------+
|        26|        17|[0.19827163395829...|[0.02069368539384...|[17.0,0.198271633...|
|        29|        17|[0.26031398249056...|[-0.1446092289546...|[17.0,0.260313982...|
|       474|        17|[0.49818598558926...|[0.17293323921293...|[17.0,0.498185985...|
|      1677|        17|[0.19827339246090...|[-0.1303829028565...|[17.0,0.198273392...|
|      1697|         6|[0.25105539265038...|[0.05229978313861...|[6.0,0.2510553926...|
|      1806|        17|[0.18449119772340...|[0.02166337053188...|[17.0,0.184491197...|
|      1950|        17|[0.33331407122173...|[-0.3318378543653...|[17.0,0.333314071...|
|      2040|        17|[0.38583431341698...|[-0.0164312324191...|[17.0,0.385834313...|
|      2250|         6|[0.46477621366740...|[-0.0597617824653...|[6.0,0.4647762136...|
|      2453|        13|[0.50514620188273...|[-0.1038588426578...|[13.0,0.505146201...|
|      2509|        13|[0.15138306650944...|[0.04533940468085...|[13.0,0.151383066...|
|      2529|        17|[0.11634963900866...|[0.02575729180313...|[17.0,0.116349639...|
|      2927|         6|[0.28513034617795...|[0.09066218648052...|[6.0,0.2851303461...|
|      3091|         6|[0.23478830492918...|[0.08091488655859...|[6.0,0.2347883049...|
|      3506|        17|[0.22844780420769...|[0.08157531127196...|[17.0,0.228447804...|
|      3764|        15|[0.27265314149033...|[-0.1795835048850...|[15.0,0.272653141...|
|      4590|        19|[0.40296288036812...|[0.07013928253496...|[19.0,0.402962880...|
|      4823|        19|[0.21729897161021...|[0.04938335582130...|[19.0,0.217298971...|
|      4894|        19|[0.11699953656531...|[0.04255864598683...|[19.0,0.116999536...|
|      5385|        15|[0.34743921088686...|[0.10922433026109...|[15.0,0.347439210...|
+----------+----------+--------------------+--------------------+--------------------+
only showing top 20 rows

保存到特征数据库中

# 保存到特征数据库中
def save_article_feature_to_hbase(partition):
    import happybase
    pool = happybase.ConnectionPool(size=10, host='hadoop-master')
    with pool.connection() as conn:
        table = conn.table('ctr_feature_article')
        for row in partition:
            table.put('{}'.format(row.article_id).encode(),
                     {'article:{}'.format(row.article_id).encode(): str(row.features).encode()})

article_feature_two.foreachPartition(save_article_feature_to_hbase)

3.8.3 离线特征中心定时更新

添加update.py更新程序

def update_ctr_feature():
    """
    定时更新用户、文章特征
    :return:
    """
    fp = FeaturePlatform()
    fp.update_user_ctr_feature_to_hbase()
    fp.update_article_ctr_feature_to_hbase()

添加apscheduler定时运行

# 添加定时更新用户文章特征结果的程序,每个4小时更新一次
scheduler.add_job(update_ctr_feature, trigger='interval', hours=4)

完整代码:

import os
import sys
# 如果当前代码文件运行测试需要加入修改路径,否则后面的导包出现问题
BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
sys.path.insert(0, os.path.join(BASE_DIR))
from offline import SparkSessionBase
# from offline.utils import textrank, segmentation
import happybase
import pyspark.sql.functions as F
from datetime import datetime
from datetime import timedelta
import time
import gc


class FeaturePlatform(SparkSessionBase):
    """特征更新平台
    """
    SPARK_APP_NAME = "featureCenter"
    ENABLE_HIVE_SUPPORT = True

    def __init__(self):
        # _create_spark_session
        # _create_spark_hbase用户spark sql 操作hive对hbase的外部表
        self.spark = self._create_spark_hbase()

    def update_user_ctr_feature_to_hbase(self):
        """
        :return:
        """
        clr.spark.sql("use profile")

        user_profile_hbase = self.spark.sql(
            "select user_id, information.birthday, information.gender, article_partial, env from user_profile_hbase")

        # 特征工程处理
        # 抛弃获取值少的特征
        user_profile_hbase = user_profile_hbase.drop('env', 'birthday', 'gender')

        def get_user_id(row):
            return int(row.user_id.split(":")[1]), row.article_partial

        user_profile_hbase_temp = user_profile_hbase.rdd.map(get_user_id)

        from pyspark.sql.types import *

        _schema = StructType([
            StructField("user_id", LongType()),
            StructField("weights", MapType(StringType(), DoubleType()))
        ])

        user_profile_hbase_schema = self.spark.createDataFrame(user_profile_hbase_temp, schema=_schema)

        def frature_preprocess(row):

            from pyspark.ml.linalg import Vectors

            channel_weights = []
            for i in range(1, 26):
                try:
                    _res = sorted([row.weights[key] for key
                                   in row.weights.keys() if key.split(':')[0] == str(i)])[:10]
                    channel_weights.append(_res)
                except:
                    channel_weights.append([])

            return row.user_id, channel_weights

        res = user_profile_hbase_schema.rdd.map(frature_preprocess).collect()

        # 批量插入Hbase数据库中
        pool = happybase.ConnectionPool(size=10, host='hadoop-master', port=9090)
        with pool.connection() as conn:
            ctr_feature = conn.table('ctr_feature_user')
            with ctr_feature.batch(transaction=True) as b:
                for i in range(len(res)):
                    for j in range(25):
                        b.put("{}".format(res[i][0]).encode(),
                              {"channel:{}".format(j + 1).encode(): str(res[i][1][j]).encode()})
            conn.close()

    def update_article_ctr_feature_to_hbase(self):
        """
        :return:
        """
        # 文章特征中心
        self.spark.sql("use article")
        article_profile = self.spark.sql("select * from article_profile")

        def article_profile_to_feature(row):
            try:
                weights = sorted(row.keywords.values())[:10]
            except Exception as e:
                weights = [0.0] * 10
            return row.article_id, row.channel_id, weights

        article_profile = article_profile.rdd.map(article_profile_to_feature).toDF(
            ['article_id', 'channel_id', 'weights'])

        article_vector = self.spark.sql("select * from article_vector")
        article_feature = article_profile.join(article_vector, on=['article_id'], how='inner')

        def feature_to_vector(row):
            from pyspark.ml.linalg import Vectors
            return row.article_id, row.channel_id, Vectors.dense(row.weights), Vectors.dense(row.articlevector)

        article_feature = article_feature.rdd.map(feature_to_vector).toDF(
            ['article_id', 'channel_id', 'weights', 'articlevector'])

        # 保存特征数据
        cols2 = ['article_id', 'channel_id', 'weights', 'articlevector']
        # 做特征的指定指定合并
        article_feature_two = VectorAssembler().setInputCols(cols2[1:4]).setOutputCol("features").transform(
            article_feature)

        # 保存到特征数据库中
        def save_article_feature_to_hbase(partition):
            import happybase
            pool = happybase.ConnectionPool(size=10, host='hadoop-master')
            with pool.connection() as conn:
                table = conn.table('ctr_feature_article')
                for row in partition:
                    table.put('{}'.format(row.article_id).encode(),
                              {'article:{}'.format(row.article_id).encode(): str(row.features).encode()})

        article_feature_two.foreachPartition(save_article_feature_to_hbase)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/179831.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【My Electronic Notes系列——直流稳压电源】

目录 序言: 🏆🏆人生在世,成功并非易事,他需要破茧而出的决心,他需要永不放弃的信念,他需要水滴石穿的坚持,他需要自强不息的勇气,他需要无畏无惧的凛然。要想成功&…

快速安装OpenShift在Ubuntu系统上并使用

目录 OpenShift简介: 服务器信息 安装Docker 安装OpenShift 访问Dashboard oc常用命令 附 OpenShift简介: OpenShift 是一个开源容器应用平台,由 Red Hat 开发。它建立在 Kubernetes 之上,并提供用于部署、扩展和管理容器…

【Linux】基础:线程的同步与互斥

【Linux】基础:线程的同步与互斥 摘要:本文主要介绍线程的同步与互斥方面的内容,分为理论与实现两部分完成。首先从整体上介绍线程同步与互斥相关概念,在理解概念后对两者分开介绍。在互斥方面,主要介绍内容为互斥量的…

LinuxC—线程

线程 1 线程的基本概念 什么是线程 进程其实是一个容器,当我们在编程的时候实际上是在以线程为单位进行编程,包括处理器的调度也是以线程为单位的,一个进程可以有多个线程,一个进程的多个线程共享相同的进程空间,所以…

设计模式 - 创建型模式_抽象工厂模式

文章目录创建型模式概述Case场景模拟工程模拟早期单机Redis的使用Bad ImplBetter Impl (抽象⼯⼚模式重构代码)定义适配接⼝实现集群适配器接口代理方式的抽象工厂类单元测试小结创建型模式 创建型模式提供创建对象的机制, 能够提升已有代码…

0、Spring工程构建Spring快速入门Spring配置文件详解注入Sprint相关API

1、Spring工程构建 创建工程项目目录文件夹 IDEA选择项目new一个module 配置案例 aop创建 创建并下载完毕后,点击file选择projert 选择按照的jdk版本 output选择当前目录, 点击右下方apply 选择facets,点击""号选择web 选择当前…

Pinia状态管理

1、Pinia和Vuex的对比 1.1、什么是Pinia呢? Pinia(发音为/piːnjʌ/,如英语中的“peenya”)是最接近pia(西班牙语中的菠萝)的词; Pinia开始于大概2019年,最初是作为一个实验为Vue…

Linux使用操作

文章目录各类小技巧(快捷键)软件安装systemctl软连接日期、时区IP地址、主机名IP地址和主机名虚拟机配置固定IP网络传输下载和网络请求端口进程管理主机状态环境变量上传、下载压缩、解压各类小技巧(快捷键) 强制停止 Linux某些程…

python语法 dot函数

dot是numpy里的函数,主要用于求向量相乘,矩阵乘法,矩阵与向量乘法一、一维向量相乘要求元素个数相同,相当于求内积,对应元素相乘再相加,“1*3 2*4 11”二、矩阵和矩阵相乘遵循矩阵乘法法则“左行 * 右列”…

高通平台开发系列讲解(WIFI篇)什么是WLAN无线局域网

文章目录 一、什么是WLAN1.1、WLAN发展史1.2、WLAN工作频段二、高通相关文件2.1、配置文件2.2、开机启动2.3、wpa_supplicant沉淀、分享、成长,让自己和他人都能有所收获!😄 📢本文将基于高通平台介绍什么是无线局域网。 一、什么是WLAN 在WLAN领域被大规模推广和商用的是…

【编程入门】开源记事本(鸿蒙Java版)

背景 前面已输出多个系列: 《十余种编程语言做个计算器》 《十余种编程语言写2048小游戏》 《17种编程语言10种排序算法》 《十余种编程语言写博客系统》 《十余种编程语言写云笔记》 本系列对比云笔记,将更为简化,去掉了网络调用&#xff0…

WebSocket 入门:简易聊天室

大家好,我是前端西瓜哥,今天我们用 WebSocket 来实现一个简单的聊天室。 WebSocket 是一个应用层协议,有点类似 HTTP。但和 HTTP 不一样的是,它支持真正的全双工,即不仅客户端可以主动发消息给服务端,服务…

基于Tkinter制作简易的串口bootloader上位机

文章目录前言1.测试设备1.1 UART Bootloaer软件架构图1.2 UART Bootloader流程图1.3 通信数据处理1.3.1 S19文件的简单介绍1.3.2 S19文件的传输方式1.3.2 接收数据之后的处理1.4 链接文件设置1.4.1 Bootloader设置1.4.2 Application设置2.上位机2.1 参考资料2.2 Tkinter简介2.3…

C++初阶:vector类

文章目录1 vector介绍2 实现vector2.1 类的定义2.2 默认成员函数2.2.1 构造函数2.2.2 析构函数2.2.3 拷贝构造2.2.4 赋值重载2.3访问接口2.4 容量接口2.5 修改接口2.5.1 尾插尾删2.5.2 任意位置插入2.5.3 任意位置删除2.6 其他接口1 vector介绍 1 vector是表示可变大小数组的序…

每日学术速递1.26

CV - 计算机视觉 今天带来的是北航IRIP实验室被国际人工智能联合会议IJCAI-ECAI 2022接收的3篇论文。 IJCAI 是人工智能领域中最主要的学术会议之一,原为单数年召开,自2015年起改为每年召开,本次IJCAI与ECAI一起召开。IJCAI官网显示&#xf…

【Linux】冯诺依曼体系结构与操作系统概念理解

👑作者主页:安 度 因 🏠学习社区:StackFrame 📖专栏链接:Linux 文章目录一、前言二、冯诺依曼体系结构1、体系简述2、内存的重要性3、硬件方案解释软件行为4、体系结构中的数据流动5、拓展三、操作系统简述…

ch1 操作系统启动

lab1 实验准备 按照实验解压后进入oslab中,按照make编译。 cd /home/shiyanlou/oslab/ tar -zxvf hit-oslab-linux-20110823.tar.gz \-C /home/shiyanlou/ ./run cd ./linux-0.11/ make all make clean ..... make all运行脚本即可启动内核 调试 汇编级调试和C语…

贪心算法的题目

每一步都做出一个局部最优的选择,最终的结果就是全局最优 只有一部分问题才能用贪心算法(严格来讲,一个问题能不能用贪心算法需要证明的) 2022.8.30 蔚来笔试题: 有a个y,b个o,c个u,用这些字母拼成一个字符串&#xf…

Anaconda软件中的 Environments 及 Jupyter Lab使用方法介绍

来源:投稿 作者:助教-Frank 编辑:学姐 本篇是打造舒适的AI开发环境系列-软件篇1 上期内容:学人工智能电脑&主机八大件配置选择指南 本文的重点: (1)Environments使用中如何安装python包.; (2)Jupyter Lab如何在…

Kettle(6):表输入组件——mysql转mysql

1 需求 前面我们已经将Excel中数据抽取到了MySQL的t_user表中。 现在有了新需求,要将MySQL数据库中的 t_user 表中的数据抽取出来,装载到另外一张表 t_user1中。 2 构建Kettle数据流图 2.1 从核心对象的输入组件中,将「表输入」组件拖拽到中…