【电影推荐系统】基于 ALS 的协同过滤推荐算法

news2025/1/24 16:26:49

目录

目的

用户电影推荐矩阵主要思路如下

1 UserId 和 MovieID 做笛卡尔积,产生(uid,mid)的元组

2 通过模型预测(uid,mid)的元组。

3 将预测结果通过预测分值进行排序。

4 返回分值最大的 K 个电影,作为当前用户的推荐。

5 排序根据余弦相似度进行排序;

6 用均方根误差RMSE进行模型评估和参数调整   ;

由于电影的特征是稳定不变的,所以离线训练电影相似矩阵,节约实时计算时间。

 完整代码

offlineRecommender.scala

ALSTrainer.scala


目的

训练出用户电影推荐矩阵,电影相似度矩阵。

用户电影推荐矩阵主要思路如下

1 UserId 和 MovieID 做笛卡尔积,产生(uid,mid)的元组

val userMovies = userRDD.cartesian(movieRDD)

2 通过模型预测(uid,mid)的元组。

3 将预测结果通过预测分值进行排序。

4 返回分值最大的 K 个电影,作为当前用户的推荐。

核心代码    

val preRatings = model.predict(userMovies)
    val userRecs = preRatings
      .filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (uid, recs) => UserResc(uid, recs.toList
          .sortWith(_._2 > _._2)
          .take(USER_MAX_RECOMMENDATION)
          .map(
            x => Recommendation(x._1, x._2)
          )
        )
      }
      .toDF()

5 排序根据余弦相似度进行排序;

6 用均方根误差RMSE进行模型评估和参数调整   ;

均方根误差RMSE公式

核心代码 

 def adjustALSParam(trainRDD: RDD[Rating], testRDD: RDD[Rating]): Unit = {
    val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1 ))
      yield {
        val model = ALS.train(trainRDD, rank, 5, lambda)
        val rmse = getRMSE(model, testRDD)
        (rank, lambda, rmse)
      }
    println(result.minBy(_._3)

由于电影的特征是稳定不变的,所以离线训练电影相似矩阵,节约实时计算时间。

核心代码   

val movieFeatures = model.productFeatures.map{
        case (mid, feature) => (mid, new DoubleMatrix(feature))
      }
     val movieRecs = movieFeatures.cartesian(movieFeatures)
       .filter{
         case (a,b ) => a._1 != b._1
       }
       .map{
         case(a, b) => {
           val simScore = this.consinSim(a._2, b._2)
           ( a._1, (b._1, simScore) )
         }
       }
       .filter(_._2._2 > 0.6)   
       .groupByKey()
       .map{
         case (mid, item) => MoiveResc(mid, item
           .toList
           .sortWith(_._2 > _._2)
           .map(x => Recommendation(x._1, x._2)))
       }
       .toDF()

 完整代码

offlineRecommender.scala

package com.qh.offline

import org.apache.spark.SparkConf
import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.sql.SparkSession
import org.jblas.DoubleMatrix

//跟sparkMl 里面区分
case class MovieRating(uid: Int, mid: Int, score: Double, timestamp: Int)

case class MongoConfig(uri: String, db: String)

//基准推荐对象
case class Recommendation(mid: Int, score: Double)

//基于预测评分的用户推荐列表
case class UserResc(uid: Int, recs: Seq[Recommendation]) //类别, top10基准推荐对象

//基于LFM电影特征向量的电影相似度列表
case class MoiveResc(mid: Int, recs: Seq[Recommendation])

/**
 * 基于隐语义模型的协同过滤 推荐
 *
 * 用户电影推荐矩阵
 * 通过ALS训练出来的Model计算所有当前用户电影的推荐矩阵
 * 1. UserID和MovieId做笛卡尔积
 * 2. 通过模型预测uid,mid的元组
 * 3. 将预测结果通过预测分值进行排序
 * 4. 返回分值最大的K个电影,作为推荐
 * 生成的数据结构 存到UserRecs表中
 *
 * 电影相似度矩阵
 * 模型评估和参数选取
 *
 */
object offline {

  val MONGODB_RATING_COLLECTION = "Rating"
  val USER_RECS = "UserRecs"
  val MOVIE_RECS = "MovieRecs"
  val USER_MAX_RECOMMENDATION = 20

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("offline")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    //加载数据
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map(rating => (rating.uid, rating.mid, rating.score)) //去掉时间戳
      .cache() //缓存,诗酒花在内存中
    //预处理
    val userRDD = ratingRDD.map(_._1).distinct()
    val movieRDD = ratingRDD.map(_._2).distinct()

    //训练LFM
    val trainData = ratingRDD.map(x => Rating(x._1, x._2, x._3))
    val (rank, iterations, lambda) = (200, 5, 0.1)
    val model = ALS.train(trainData, rank, iterations, lambda)

    //基于用户和电影的隐特征,计算预测评分,得到用户的推荐列表
    //    笛卡尔积空矩阵
    val userMovies = userRDD.cartesian(movieRDD)

    //预测
    val preRatings = model.predict(userMovies)

    val userRecs = preRatings
      .filter(_.rating > 0)
      .map(rating => (rating.user, (rating.product, rating.rating)))
      .groupByKey()
      .map {
        case (uid, recs) => UserResc(uid, recs.toList
          .sortWith(_._2 > _._2)
          .take(USER_MAX_RECOMMENDATION)
          .map(
            x => Recommendation(x._1, x._2)
          )
        )
      }
      .toDF()

    userRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", USER_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    //基于电影隐特征,计算相似度矩阵,得到电影的相似度列表
    val movieFeatures = model.productFeatures.map{
        case (mid, feature) => (mid, new DoubleMatrix(feature))
      }

    //电影与电影 笛卡尔积
     val movieRecs = movieFeatures.cartesian(movieFeatures)
       .filter{
         case (a,b ) => a._1 != b._1
       }
       .map{
         case(a, b) => {
           val simScore = this.consinSim(a._2, b._2)
           ( a._1, (b._1, simScore) )
         }
       }
       .filter(_._2._2 > 0.6)    //过滤出相似度
       .groupByKey()
       .map{
         case (mid, item) => MoiveResc(mid, item
           .toList
           .sortWith(_._2 > _._2)
           .map(x => Recommendation(x._1, x._2)))
       }
       .toDF()

    movieRecs.write
      .option("uri", mongoConfig.uri)
      .option("collection", MOVIE_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()

    spark.stop()
  }

  /*
  求解余弦相似度
   */
  def consinSim(matrixA: DoubleMatrix, matrixB: DoubleMatrix):Double = {
//    .dot 点乘  .norm2 L2范数,就是模长
    matrixA.dot(matrixB) / (matrixA.norm2() * matrixB.norm2())
  }
}

ALSTrainer.scala

package com.qh.offline

import breeze.numerics.sqrt
import com.qh.offline.offline.MONGODB_RATING_COLLECTION
import org.apache.spark.mllib.recommendation.{ALS, MatrixFactorizationModel, Rating}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
 * 调参
 * 通过ALS寻找lMF参数,打印到控制台输出
 */
object ALSTrainer {
  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender"
    )
    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("offline")
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    import spark.implicits._
    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))
    val ratingRDD = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_RATING_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRating]
      .rdd
      .map(rating => Rating(rating.uid, rating.mid, rating.score)) //去掉时间戳
      .cache()
    //   随机切分数据集=>训练集 测试集
    val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    val trainRDD = splits(0)
    val testRDD = splits(1)

    //    模型参数选择
    adjustALSParam(trainRDD, testRDD)

    spark.close()

  }

  def adjustALSParam(trainRDD: RDD[Rating], testRDD: RDD[Rating]): Unit = {
    val result = for (rank <- Array(50, 100, 200, 300); lambda <- Array(0.01, 0.1, 1 ))
      yield {
        val model = ALS.train(trainRDD, rank, 5, lambda)
        val rmse = getRMSE(model, testRDD)
        (rank, lambda, rmse)
      }
    println(result.minBy(_._3))
  }

  def getRMSE(model: MatrixFactorizationModel, data: RDD[Rating]): Double = {
    //      计算预测评分
    val userProducts = data.map(item => (item.user, item.product))
    val predictRating = model.predict(userProducts)
    //uid 和 mid 交集 做被减数
    val observed = data.map(item => ((item.user, item.product), item.rating)) //实际观测值和预测值
    val predict = predictRating.map(item => ((item.user, item.product), item.rating))

    sqrt(
      observed.join(predict).map {
        case ((uid, mid), (actual, pre)) => //内连接没有数据冗余,不需要groupby
          val err = actual - pre
          err * err
      }.mean()
    )
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/728180.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

js debugger的两种方式

第一种&#xff1a;在js代码中加上debugger class ReactiveEffect {constructor(fn, scheduler) {this.fn fn;this.scheduler scheduler;this.active true;this.deps [];console.log("创建 ReactiveEffect 对象");}run() {debugger; console.log("run…

Spring高手之路8——Spring Bean模块装配的艺术:@Import详解

文章目录 1. Spring手动装配基础2. Spring框架中的模块装配2.1 Import注解简单使用 3. Import模块装配的四种方式3.1 Import注解的功能介绍3.2 导入普通类与自定义注解的使用3.3 导入配置类的策略3.4 使用ImportSelector进行选择性装配3.5 使用ImportBeanDefinitionRegistrar进…

指针进阶详解

目录 指针基本概念 1.字符指针 2.指针数组 3.数组指针 对数组名的理解 小结 指针基本概念 在初阶指针中我们了解到一些指针的基本概念: 1.指针就是个变量&#xff0c;用来存放地址&#xff0c;地址唯一标识一块内存 2.指针的大小是固定的4/8个字节&#xff08;32位/64位平台&…

详解c++---哈希封装

目录标题 哈希桶的代码哈希桶的修改迭代器的实现const迭代器 哈希桶的代码 通过前面的学习大家应该能够很容易的写出下面的代码&#xff1a; #pragma once #include<iostream> #include<vector> using namespace std; template<class K,class V> struct Ha…

2023年互联网行业研究报告

第一章 行业概况 互联网行业是一个广泛的领域&#xff0c;包括所有利用互联网技术进行商业活动的企业和组织。这个行业的核心是互联网&#xff0c;一个全球性的网络&#xff0c;连接着数以亿计的计算设备和用户&#xff0c;使他们可以共享信息、资源和服务。 互联网行业包括网…

apache 安装配置 基础篇(-)

download 地址 apache下载 ApacheHaus是免安装的&#xff0c; 然后解压上面的文件&#xff0c;把里面 因apache 默认端口是80&#xff0c;如果这个端口被占用&#xff0c;apache服务是启动不起来的 netstat -ano|findstr 80 apache 修改端口号 创建apache服务 在apa…

ESP32-H2 固件烧录需满足的硬件环境整理

ESP32-H2 默认通过 UART0 &#xff08;即 TXD&#xff08;GPIO24&#xff09;和 RXD&#xff08;GPIO23&#xff09;&#xff09;下载固件。 Windows 下可使用 Flash download tool 工具来下载编译后的 bin 文件&#xff1b; 运行 flash_download_tool.exe 的文件 选择开发…

SkyEye处理器仿真系列:龙芯2K1000处理器

​SkyEye简介&#xff1a; 天目全数字实时仿真软件SkyEye作为基于可视化建模的硬件行为级仿真平台&#xff0c;能够为嵌入式软件提供虚拟化运行环境&#xff0c;开发、测试人员可在该虚拟运行环境上进行软件开发、软件测试和软件验证活动。小到芯片&#xff0c;大到系统&#…

win10 DBeaver (升级)下载、安装、彻底卸载

DBeaver &#xff08;升级&#xff09;下载及安装 一、DBeaver 下载二、安装三、DBeaver 的基本使用 - mysql连接四、DBeaver 彻底卸载 DBeaver是一种通用数据库管理工具&#xff0c;适用于需要以专业方式使用数据的每个人&#xff1b;适用于开发人员&#xff0c;数据库管理员&…

苹果笔买原装的还是随便买?便宜好用的手写笔推荐

自从ipad和其他的平板电脑都搭配上了电容笔以后&#xff0c;电容笔很好地取代了我们的手指&#xff0c;书写的效率就大大提升了&#xff0c;但由于苹果原装电容笔的价格不够人性化&#xff0c;一直高居不下给普通人带来了很大的负担&#xff0c;特别是对于学生们来说&#xff0…

QT DAY1

做一个窗口界面 #include "mainwindow.h" #include "ui_mainwindow.h"MainWindow::MainWindow(QWidget *parent) :QMainWindow(parent),ui(new Ui::MainWindow) {ui->setupUi(this);//设置窗口标题、图标this->setWindowTitle("Fly_Chat")…

6、Flume安装部署

按照采集通道规划&#xff0c;需在hadoop102&#xff0c;hadoop103&#xff0c;hadoop104三台节点分别部署一个Flume。可参照以下步骤先在hadoop102安装&#xff0c;然后再进行分发。 1、Flume入门 1.1、 Flume安装部署 1.1.1、 安装地址 &#xff08;1&#xff09; Flume官…

全网最牛,Web自动化测试Selenium八大元素定位实战(详细)

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 安装Selenium和下…

BFT 最前线|OpenAI暂时下线ChatGPT”浏览“功能;Stability AI CEO:5年内,人类程序员将不复存在

原创 | 文 BFT机器人 AI视界 TECHNOLOGY NEWS 01 Open AI暂时下线ChatGPT“浏览”功能 日前OpenAI方面宣布&#xff0c;面向ChatGPT Plus用户的"浏览"功能会在某些情况下出现故障&#xff0c;因此已于7月3日暂时禁用了这一功能。该功能是为了提高ChatGPT的搜索体验…

威胁检测和取证日志分析

在网络中&#xff0c;威胁是指可能影响其平稳运行的恶意元素。因此&#xff0c;对于任何希望搁置任何财政损失或生产力下降机会的组织来说&#xff0c;威胁检测都是必要的。为了先发制人地阻止来自各种来源的任何此类攻击&#xff0c;需要有效的威胁检测情报。 威胁检测可以是…

mmap函数

参考 https://blog.csdn.net/bhniunan/article/details/104105153void *mmap(void *addr, size_t len, int prot, int flags, int fd, off_t offset);参数 addr&#xff1a;出参&#xff0c; 指定映射的起始地址&#xff0c;通常设为NULL&#xff0c;由内核来分配 len&#x…

网络编程3——TCP Socket实现的客户端服务器通信完整代码(详细注释帮你快速理解)

文章目录 前言一、理论准备Socket套接字是什么TCP协议的特点 二、TCP 流套接字提供的APIServerSocket APISocket API 三、代码实现请求响应式 客户端服务器服务器客户端疑惑解答为什么服务器进程需要手动指定端口号而客户端进程不需要为什么客户端中的服务器IP与端口号是"…

Mysql架构篇--Mysql 主从同步方案

文章目录 前言一、传统的主从复制&#xff1a;1 原理&#xff1a;2 缺点&#xff1a; 二、半同步复制&#xff08;Semi-Synchronous Replication&#xff09;&#xff1a;三、组复制&#xff1a;1 原理&#xff1a;2 实现&#xff1a;2.1 myql 实例安装&#xff1a;2.1 myql 实…

量子近似优化算法(QAOA)入门(1):从量子绝热算法(QAA)角度的直观理解

文章目录 前言&#xff1a;量子计算的本质是测量一、基于量子逻辑电路的常用算法1.NISQ&#xff1a;Noisy Intermediate-Scale Quantum&#xff08;含噪声中等规模量子&#xff09; 二、量子绝热算法&#xff08;QAA&#xff1a;Quantum Adiabatic Algorithm&#xff09;1.QAA的…

【KingFusion】用KingFusion3.6创建一个客户端工程的步骤

哈喽&#xff0c;大家好&#xff0c;我是雷工&#xff01; 今天学习用KingFusion3.6创建一个客户端工程&#xff0c;以下记录创建过程。 客户端组件作为KingFusion3.6的数据展示功能模块&#xff0c;其主要功能是通过组态组态式配置以及丰富的图表元素、动画连接等多样的展示形…