【电影推荐系统】实时推荐

news2024/10/5 19:09:45

目录

原因

由于实时性,所以算法设计需要满足一下两点

算法设计

算法实现

算法公式

 完整代码


原因

用户对电影的偏好随着时间的推移总是会发生变化的。此时离线系统无法解决,需要实时推荐。

由于实时性,所以算法设计需要满足一下两点

1 根据用户最近几次评分进行推荐;

2 计算量要小,满足响应时间上面的实时;

算法设计

借鉴基于物品Item-CF的算法,将相似度和评分结合起来。

算法实现

1 当用户完成一次评分后,从Redis里取最近的k次评分的数据(mid,score);

2 备选电影(与评分电影的相似的电影)和评分过的电影,根据电影形似度矩阵计算相似度;

3 为提高精准率和召回率,设置偏移项,进行模型的鼓励与惩罚。

算法公式

其中:

表示用户 u 对电影 r 的评分;

sim(q,r)表示电影 q 与电影 r 的相似度,设定最小相似度为 0.6,当电影 q 和 电影 r 相似度低于 0.6 的阈值,则视为两者不相关并忽略;

sim_sum 表示 q 与 RK 中电影相似度大于最小阈值的个数;

incount 表示 RK 中与电影 q 相似的、且本身评分较高(>=3)的电影个数;

recount 表示 RK 中与电影 q 相似的、且本身评分较低(<3)的电影个数;

核心代码

 def computeMovieScore(candidateMovies: Array[Int],userRecentlyRatings: Array[(Int, Double)],simMovies:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]]): Array[(Int, Double)] = {
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()
    for(candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings ){
      val simScore =getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )
      if(simScore > 0.7){
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
        if (userRecentlyRating._2 > 3) {
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1
        } else {
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    scores.groupBy(_._1).map{
      case (mid, scoreList) =>
        (mid,scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray
  }

 完整代码

package com.qh.streaming

import com.mongodb.casbah.commons.MongoDBObject
import com.mongodb.casbah.{MongoClient, MongoClientURI}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}
import redis.clients.jedis.Jedis

/**
 * 连接助手
 * 序列化
 */
object ConnHelper extends Serializable {
  lazy val jedis = new Jedis("hadoop100") //redis
  lazy val mongoClient = MongoClient(MongoClientURI("mongodb://hadoop100:27017/recommender"))
}

case class MongoConfig(uri: String, db: String)

// 定义一个基准推荐对象
case class Recommendation(mid: Int, score: Double)

// 定义基于预测评分的用户推荐列表
case class UserRecs(uid: Int, recs: Seq[Recommendation])

// 定义基于LFM电影特征向量的电影相似度列表
case class MovieRecs(mid: Int, recs: Seq[Recommendation])

object StreamingRecommender {

  val MAX_USER_RATINGS_NUM = 20
  val MAX_SIM_MOVIES_NUM = 20
  val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
  val MONGODB_RATING_COLLECTION = "Rating"
  val MONGODB_MOVIE_RECS_COLLECTION = "MovieRecs"

  def main(args: Array[String]): Unit = {
    val config = Map(
      "spark.cores" -> "local[*]",
      "mongo.uri" -> "mongodb://hadoop100:27017/recommender",
      "mongo.db" -> "recommender",
      "kafka.topic" -> "recommender"
    )

    val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("StreamingRecommender")
    //    spark2.x sparksession已封装除了 sparkstream上下文之外的 session
    val spark = SparkSession.builder().config(sparkConf).getOrCreate()

    // 拿到streaming context
    val sc = spark.sparkContext
    val ssc = new StreamingContext(sc, Seconds(2)) // batch duration 批处理时间 微批次

    import spark.implicits._

    implicit val mongoConfig = MongoConfig(config("mongo.uri"), config("mongo.db"))

    //加载电影相似度矩阵,广播
    val simMovieMatrix = spark.read
      .option("uri", mongoConfig.uri)
      .option("collection", MONGODB_MOVIE_RECS_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[MovieRecs]
      .rdd //为了查询相似度的过程更快,转换成map
      .map {
        movieRecs =>
        (movieRecs.mid, movieRecs.recs.map(x => (x.mid, x.score)).toMap)
      }.collectAsMap()

    val simMovieMatrixBroadCast = sc.broadcast(simMovieMatrix)

    // 定义kafka连接参数
    val kafkaParam = Map(
      "bootstrap.servers" -> "hadoop100:9092",
      "key.deserializer" -> classOf[StringDeserializer],  //反序列化
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "recommender",
      "auto.offset.reset" -> "latest"   //偏移量初始化设置
    )
    //kafka创建一个DStream
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
      ssc, LocationStrategies.PreferConsistent, ConsumerStrategies.Subscribe[String, String](Array(config("kafka.topic")), kafkaParam )
    )

    //把原始数据 UID|MID|SCORE|TIMESTAMP =>评分流
    val ratingStream = kafkaStream.map {
      msg =>
        val attr = msg.value().split("\\|")
        (attr(0).toInt, attr(1).toInt, attr(2).toDouble, attr(3).toInt)
    }

    //继续做流式处理,核心实时算法
    ratingStream.foreachRDD{
      rdds => rdds.foreach{
        case (uid,mid, score, timestamp) =>
          println("rating data coming ! >>>>>>>>>>>>>>>>")

          //1 从redis里获取当前用户最近的k次评分,保存成Array[(mid, score)]
          val userRecentlyRatings = getUserRecentlyRating( MAX_USER_RATINGS_NUM, uid, ConnHelper.jedis )
          //2 从相似度矩阵中获取当前电影最相似的N个电影,作为备选列表,Array[mid]
//              根据uid过滤,将该用户评分过的电影过滤掉
//              在mongodb里面进行过滤
          val candidateMovies = getTopSimMovies( MAX_SIM_MOVIES_NUM, mid, uid, simMovieMatrixBroadCast.value )
          //3 最每个备选电影计算推荐优先级,得到当前用户的实时推荐列表,Array[(mid, score)]
          val streamResc = computeMovieScore( candidateMovies, userRecentlyRatings, simMovieMatrixBroadCast.value )
          //4 把推荐数据保存到mongodb
          saveDataToMongoDB( uid, streamResc )

      }
    }

    ssc.start()



    println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> streaming started")
    ssc.awaitTermination()



  }
//  redis返回的是java类,需要引入转换类
  import scala.collection.JavaConversions._

  /**
   * 从 redis 中选取 该用户最近 num次的 对某个电影的评分
   * @param num  选取的最近评分记录的 数量
   * @param uid  该用户的 ID
   * @param jedis
   * @return 该用户的最近num次 的评分记录
   */
  def getUserRecentlyRating(num: Int, uid: Int, jedis: Jedis): Array[(Int, Double)] = {
    //从redis读数据,用户评分数据保存在 uid:UID 为Key的队列里, value MID:SCORE
    jedis.lrange("uid:" + uid, 0, num-1)
      .map{
        item =>
          val attr = item.split("\\:")
          ( attr(0).trim.toInt, attr(1).trim.toDouble )  //uid , score
      }
      .toArray
  }

  /**
   * 根据当前电影 从相似度矩阵 选取除了用户已看的电影 之外的 num个形似的电影列表
   * @param num   要选取相似的电影数量
   * @param mid   当前低钠盐的ID
   * @param uid   当前评分的用户ID
   * @param simMovies  相似度矩阵
   * @param mongoConfig
   * @return  过滤后的 备选电影的列表
   */
  def getTopSimMovies(num: Int, mid: Int, uid: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]])
                     (implicit mongoConfig: MongoConfig): Array[Int] = {
    //从相似度矩阵中取出所有相似的电影
    val allSimMovies: Array[(Int, Double)] = simMovies(mid).toArray
    //从mongodb中查询用户已看过的电影, 用于过滤
    val ratingExist = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
      .find( MongoDBObject("uid" -> uid) )
      .toArray
      .map{
        item => item.get("mid").toString.toInt
      }
    //过于该用户看过的电影(用户如果评过分数 就代表用户看过该电影)
    allSimMovies.filter( x => ! ratingExist.contains(x._1) )
      .sortWith(_._2>_._2)
      .take(num)
      .map( x => x._1 )
  }



  def computeMovieScore(candidateMovies: Array[Int],
                        userRecentlyRatings: Array[(Int, Double)],
                        simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Array[(Int, Double)] = {
    //定义一个ArrayBuffer.用户保存每一个备选电影的基础得分
    val scores = scala.collection.mutable.ArrayBuffer[(Int, Double)]()
    // 定义一个HashMap,保存每一个备选电影的增强减弱因子
    val increMap = scala.collection.mutable.HashMap[Int, Int]()
    val decreMap = scala.collection.mutable.HashMap[Int, Int]()

    for( candidateMovie <- candidateMovies; userRecentlyRating <- userRecentlyRatings ){
      //先拿到相似度(备选电影和 最近评分电影)
//      相当于从 相似度矩阵中 最近取 相应坐标 的值
//      可能 没有值 ,所以需要 设置默认值
      val simScore = getMoviesSimScore( candidateMovie, userRecentlyRating._1, simMovies )

//      相似度矩阵 在 离线计算中 存储时,筛选条件为 > 0.6
//      这里进一步 筛选
      if(simScore > 0.7){
        //* 打分 => 备选电影的基础推荐得分
//        ArrayBuffer +=  运算符重载 第一个()为 +=()的函数() 第二个() 为 元组
        scores += ( (candidateMovie, simScore * userRecentlyRating._2) )
//        统计 增强因子 和 减弱因子
        if (userRecentlyRating._2 > 3) {
          increMap(candidateMovie) = increMap.getOrDefault(candidateMovie, 0) + 1 //设置默认值
        } else {
          decreMap(candidateMovie) = decreMap.getOrDefault(candidateMovie, 0) + 1
        }
      }
    }
    //      除 总数
//     合并相同mid 的 groupby
    scores.groupBy(_._1).map{
//          groupBY => Map[Int, ArrayBuffer[(Int, Double)]
      case (mid, scoreList) =>
        ( mid, scoreList.map(_._2).sum / scoreList.length + log(increMap.getOrDefault(mid, 1)) - log(decreMap.getOrDefault(mid, 1)) )
    }.toArray

  }

  /**
   * 获取两个电影之间的相似度
   * @param mid1
   * @param mid2
   * @param simMovies
   * @return
   */
  def getMoviesSimScore(mid1: Int, mid2: Int, simMovies: scala.collection.Map[Int, scala.collection.immutable.Map[Int, Double]]): Double = {
    // 用模式匹配判断是否为 空值
    simMovies.get(mid1) match {
      case Some(sims1) => sims1.get(mid2) match {
        case Some(sims2) => sims2
        case None => 0.0
      }
      case None => 0.0
    }
  }

  /**
   * 自定义 log 将 底数作为超参数
   * 超参数N 默认为 10
   */
  def log(x: Int):Double ={
    val N = 10
//    对数换底公式
    math.log(x) / math.log(N)
  }

//覆盖 更新
  def saveDataToMongoDB(uid: Int, streamRecs: Array[(Int, Double)])(implicit mongoConfig: MongoConfig): Unit ={
//    连接表
val streamRecsCollection = ConnHelper.mongoClient(mongoConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
//    如果有 先删除
    streamRecsCollection.findAndRemove( MongoDBObject("uid" -> uid) )
//    再更新
    streamRecsCollection.insert(MongoDBObject("uid" -> uid,
      "recs" -> streamRecs.map(x => MongoDBObject("mid" -> x._1, "score" -> x._2))))
  }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/725654.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

GIS杂记(二):Arcgis对采样点进行裁剪,获取指定区域内的采样点

有时候需要对栅格数据进行采样处理&#xff0c;如果采样点过多则会使得采样时间过长&#xff0c;今天在进行数据采样时&#xff0c;使用了1km*1km的渔网建立的采样点&#xff0c;大概有1百万个点&#xff0c;程序运行时间大概4个小时&#xff0c;但是其中有绝大部分数据都是空值…

微信小程序使用animation.css

animation.css是一款纯css动画库&#xff0c;其中提供了丰富的动画效果 我们直接下载animation.css&#xff0c;即可使用其中的样式 其官网为&#xff1a;Animate.css | A cross-browser library of CSS animations. 1.下载 使用npm下载animation.css&#xff1a; npm inst…

LVS简介及LVS-NAT负载均衡群集的搭建

目录 一、LVS群集简介 1.群集的含义和应用场景 2.性能扩展方式 3.群集的分类 负载均衡&#xff08;LB&#xff09; 高可用&#xff08;HA&#xff09; 高性能运算&#xff08;HPC&#xff09; 二、LVS负载均衡群集简介及搭建 1.负载均衡群集架构 2.三种工作模式 3.启…

基于servlet的简单登录界面

前端登录发起请求 1.安装axios axios 是一个 HTTP 的网络请求库 安装 npm install axios &#xff08;脚手架中&#xff09; 在 main.js 中配置 axios //导入网络请求库 import axios from axios; //设置访问后台服务器地址&#xff1a;ip&#xff0c;端口和项目名字&#xff0…

基于Python Django实现KNN协同电影推荐系统

系统说明 基于Python Django实现KNN协同电影推荐系统&#xff0c;有用户端和管理后端&#xff0c;完整可在任何环境运行。 KNN&#xff08;K-Nearest Neighbor&#xff09;算法是机器学习算法中最基础、最简单的算法之一。它既能用于分类&#xff0c;也能用于回归。KNN通过测量…

使用前端JS上传文件到阿里云的OSS服务器,PHP生成STS临时访问凭证

官方教程地址&#xff1a;https://help.aliyun.com/document_detail/383950.html?spma2c4g.383952.0.0 这篇文章主要是指出官方教程没有说明的地方 后端代码 并非是完全完全不需要后端的参与。需要后端生成凭证&#xff0c;防止秘钥泄露 这里是官方的说明文档&#xff1a;使…

obj文件解析及用meshlab查看

举例 它以txt打开后如下所示 v -0.3 0 0.3 v 0.4 0 0 v -0.2 0.3 -0.1 v 0 0.4 0 # 4 verticesg head s 1 f 1/1/1 2/1/1 4/1/1 f 1/1/1 2/1/1 3/1/1 f 2/1/1 4/1/1 3/1/1 f 1/1/1 4/1/1 3/1/1一般而言obj文件以txt格式打开后包含如下片段 v -0.3 0 0.3 vt 0.625 0.458 0.00…

FPGA纯verilog实现UDP协议栈,GMII接口驱动88E1111,提供工程源码和技术支持

目录 1、前言2、我这里已有的UDP方案3、该UDP协议栈性能4、详细设计方案网络PHYGMII AXIS接口模块AXIS FIFOUDP协议栈 5、vivado工程1-->B50610 工程6、上板调试验证并演示准备工作查看ARPUDP数据回环测试 7、福利&#xff1a;工程代码的获取 1、前言 目前网上的fpga实现ud…

密码学学习笔记(七):Modular arithmetic - 模算数

简介 模算术是整数的一种算术结构&#xff0c;其中数字在达到特定值时“环绕”。模运算使我们能够简单地生成群、环和域&#xff0c;这是大多数现代公钥密码系统的基本构造部分。其中数字超过一定值后&#xff08;称为模&#xff09;后会“卷回”到较小的数值。 模算数常见的…

5.2ORB-SLAM3之回环矫正

1.简介 在上一章《回环检测之检测是否存共视区域》已经介绍了检测共视区域的部分&#xff0c;接下来就是对共视区域进行回环矫正或者地图融合。 回环矫正和之前的ORBSLAM系列一致&#xff0c;就是消除因为长时间运动产生的位姿累计误差和尺度漂移。在ORBSLAM3中新增了多地图系…

idea集成maven-mvnd

maven-mvnd是什么&#xff1f; 参考文档&#xff1a; Maven加强版 — mvnd的使用测试 - 知乎 1.下载mvnd安装包 Releases apache/maven-mvnd GitHub 2.修改配置文件&#xff1a;安装包中的conf目录下的mvnd.properties文件 配置maven settings的地址&#xff1a; 注意&am…

MySQL配置主从备份

文章目录 1.什么是主从备份2. 原理3.配置主服务器4.配置从服务器4.1进入数据库&#xff0c;准备建立连接4.2开启 slave 连接&#xff0c;主备机连接成功&#xff0c;数据开始同步4.3查看有关从属服务器线程的关键参数的信息 1.什么是主从备份 主从复制简单来说就是主库把增删改…

环境搭载vscode

Windows 10 下 VS Code 配置 C 开发环境&#xff08;MinGW&#xff09; 读书读傻了哟 配置 C/C 环境   主要是配置launch.json、tasks.json这两个文件&#xff08;当然&#xff0c;还有别的.json文件&#xff0c;可有可无&#xff09;。这两个文件位于.vscode文件夹下&#…

mysql--第一天基础操作

1.创建数据库 2.查询创建数据的语句 3.使用数据库&#xff0c;查询当前默认的数据库以及使用的编码方式校验规则 4.删除数据库 5.在一张表中定义多个字段&#xff0c;要使用今天提到的所有的数据类型&#xff08;数字&#xff0c;文本&#xff0c;日期&#xff09; 查看表结构

产品方案设计高效的4大注意事项

做产品方案时&#xff0c;我们容易遭遇&#xff1a;未澄清需求、未梳理业务方案、缺少思考过程以及缺少对比方案等误区&#xff0c;往往会造成产品方案并不能完全解决用户问题&#xff0c;项目后期容易遇到需求变更等风险。 因此如何如何高效设计产品方案&#xff1f;就显得尤为…

SpringCloud入门实战(十二)-Sleuth+Zipkin分布式请求链路跟踪详解

&#x1f4dd; 学技术、更要掌握学习的方法&#xff0c;一起学习&#xff0c;让进步发生 &#x1f469;&#x1f3fb; 作者&#xff1a;一只IT攻城狮 &#xff0c;关注我&#xff0c;不迷路 。 &#x1f490;学习建议&#xff1a;1、养成习惯&#xff0c;学习java的任何一个技术…

Django的数据库配置、生成(创建)过程、写入数据、查看数据的学习过程记录

目录 01-配置数据库信息02-安装Python的MySQL数据库驱动程序 mysqlclient03-安装Mysql&#xff0c;并启动Mysql04-定义Django的数据库模型(定义数据表-编写models.py文件)05-按照数据的配置生成数据库(执行迁移命令)05-01-生成迁移执行文件05-02-执行数据库模型迁移 06-查看数据…

Vue.js Js引入相关

Vue.js vue.js 新增了一些语法,有一些旧的模组并没有使用"先进"的export和import语法 即 es语法进行模块化。 <script></script>但 editor.md 真的很好用. 但很抱歉,它在vue中无法使用 es6 进行导入。 所以需要使用传统的方式进行导入。 很多人会把js…

【ARM Coresight 系列文章 2.1 - ARM Coresight 组件介绍】

文章目录 1.1 Coresight 组件介绍1.1.1 Trace sources1.1.2 Trace Sinks1.1.2 Trace links 1.1 Coresight 组件介绍 图 1-1 1.1.1 Trace sources 什么是 Trace source? 在ARM Coresight技术中&#xff0c;Trace Source是指处理器中的一个组件&#xff0c;用于产生和发送跟踪数…

全球十大看黄金走势免费app软件最新名单推荐(综合版)

选择黄金走势免费app软件时&#xff0c;有几个关键因素需要考虑。首先&#xff0c;要选择可靠的软件平台&#xff0c;确保其在金融市场上拥有良好的声誉和高度的信任度。此外&#xff0c;软件应提供及时准确的市场数据&#xff0c;包括实时行情、交易量和技术指标等&#xff0c…