使用布隆过滤器的flink十亿级数据实时过滤实践一

news2025/2/12 22:09:31

1项目背景

1.1 需求

实时推荐项目需求如下:根据用户实时行为(如关注,播放,收藏)推荐该UP主(关注的up主,播放视频发布up主,收藏up主)或其相似UP主的作品,UP主及相似UP主下的作品是提前离线召回排序计算好了,存在redis中的数据。
由于是实时触发,有些作者的离线数据是没有生成的(如新的up主),实时推荐时需要将这部分用户下实时行为下的UP主给过滤掉。由于底层有20亿数据(用户+UP主为唯一id生成的数据)需要过滤,因此考虑使用布隆过滤器进行过滤。

1.2 布隆过滤器实时过滤实现思路

有以下两不需要考虑:
1 怎样构建布隆过滤器,即创建更新存储布隆过滤器
2 当布隆过滤器实时更新时,flink里的布隆过滤器怎样随之实时更新
(由于推荐数据是每天更新的,因此布隆过滤器数据也是每天更新,也就要求flink中使用的布隆过滤器也要实时更新)

2 布隆过滤器存入redis

此部分针对上面的问题1 实现
有以下三步:
1 使用com.google.common.hash.BloomFilter 构建布隆过滤器,写入数据
2 将布隆过滤器存入redis
3 从redis读取布隆过滤器数据,转换为com.google.common.hash.BloomFilter 过滤器进行使用


import com.google.common.hash.{BloomFilter, Funnels}
import jutil.JedisClient
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import java.nio.charset.Charset


object bloomfilterByGuavaTest {
  val expectedSize=200000000
  val falsePositiveRate=0.01
  def main(args: Array[String]): Unit = {

   // 1 创建布隆过滤器
    val bloomKey="foryou_bloom_filter"
    val bloomFilter:BloomFilter[String] = BloomFilter.create[String](Funnels.stringFunnel(Charset.forName("UTF-8")), expectedSize, falsePositiveRate)
    //1.1 写入数据
    for(i <- 1 to 10) {
      bloomFilter.put(s"num${i}")
       }

    // 2 redis连接并写入序列化的布隆过滤器
    // 2.1 构建redis连接器
    val redisClient = new JedisClient(Conf.redisHosts)
    // 2.2 布隆过滤器写入redis
    val outputStream = new ByteArrayOutputStream()
    bloomFilter.writeTo(outputStream)
    val bitSetByteArray = outputStream.toByteArray
    redisClient.del(bloomKey)
    redisClient.set(bloomKey, bitSetByteArray)
    redisClient.expireBySecond(bloomKey,7*24*60*60)

//   3 从redis读取布隆过滤器,并进行过滤
    val bitSetByteArray2:Array[Byte] = redisClient.get(bloomKey).asInstanceOf[Array[Byte]]
    val bf = BloomFilter.readFrom[String](new ByteArrayInputStream(bitSetByteArray2), Funnels.stringFunnel(Charset.forName("UTF-8")))

    println("是否包含num1",bf.mightContain("num1"))
    println("是否包含num8",bf.mightContain("num8"))
    println("是否包含num13",bf.mightContain("num13"))
    redisClient.close()

  }

打印结果如下
在这里插入图片描述

3 使用flink 的BroadcastProcessFunction实时更新布隆过滤器

此部分针对上面的问题2实现
思路:
1 构建一个流实定时读取redis中的布隆过滤器(有坑,需要注意Kryo序列化失败问题)
2 将业务流和布隆过滤器流使用connect相结合
3 自定义实现BroadcastProcessFunction方法
基于篇幅原因,此部分写于下篇播客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/639706.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

react---生命周期

目录 1.新旧生命周期对比 2.常用的3个钩子函数 3.生命周期&#xff08;旧&#xff09; 4.生命周期&#xff08;新&#xff09; React 中为我们提供了一些生命周期钩子函数&#xff0c;让我们能在 React 执行的重要阶段&#xff0c;在钩子函数中做一些事情。 1.新旧生命周期…

数据安全--17--数据安全管理之数据传输

本博客地址&#xff1a;https://security.blog.csdn.net/article/details/131061729 一、数据传输概述 数据传输有两个主体&#xff0c;一个是数据发送方&#xff0c;另一个是数据接收方。数据在通过不可信或者较低安全性的网络进行传输时&#xff0c;容易发生数据被窃取、伪…

Mybatis源码学习之全局配置文件和映射文件解析

全局配置文件和映射文件解析 全局配置文件解析 public static void main(String[] args) throws IOException {// 读取配置文件InputStream is Resources.getResourceAsStream("org/apache/ibatis/builder/MapperConfig1.xml");// 创建SqlSessionFactory工厂SqlSes…

JDK11 官网下载(提供网盘下载资源)

目录 引言一、Oracle&#xff08;甲骨文&#xff09;二、JDK11下载1.JDK11下载入口2.JDK版本说明3.JDK11下载前说明4.JDK11下载 三、网盘下载1.资源提供说明2.资源列表清单&#xff08;持续更新中...&#xff09;3.获取方式 总结 引言 我们要学习 Java 语言去开发 Java 程序&a…

k8s 基本架构

k8s 中支持的 node 数 和 pod 数 k8s 也是逐步发展过来的&#xff0c;来看看以前和现在支持的 node 数 和 pod 数对比 node 即 节点 &#xff0c; 早期的 k8s 版本能够支持 100 台节点&#xff0c;现在 k8s 可以支持到 2000 台了 pod 数&#xff0c;早期的版本可以支持 1000 …

Android 自定义View 之 Dialog弹窗

Dialog弹窗 前言正文一、弹窗视图帮助类二、弹窗控制类三、监听接口四、样式五、简易弹窗六、常规使用七、简易使用八、源码 前言 在日常开发中用到弹窗是比较多的&#xff0c;常用于提示作用&#xff0c;比如错误操作提示&#xff0c;余额不足提示&#xff0c;退出登录提示等&…

linux 内核版本和发行版本

当要明确自己的Linux系统的版本号时&#xff0c;大多数情况是用命令确定Linux内核版本的。不过这个还是要与CentOS的版本号&#xff08;就是你使用的Linux系统的发行版本&#xff09;区分开来&#xff0c;这两个不是一个东西。 一、发行版本号 比如当时安装CentOS时&#x…

【Python】集合 set ① ( 集合定义 | 集合特点 | 代码示例 - 集合定义 )

文章目录 一、集合特点二、集合定义三、代码示例 - 集合定义 一、集合特点 在之前 的博客中 介绍了 列表 / 元组 / 字符串 数据容器 , 列表 支持 定义后 , 增加元素 / 修改元素 / 删除元素 , 并且 列表中可以存储 重复 / 有序 的元素 ;元组 定义后 不能 进行 增加元素 / 修改元…

(转载)有导师学习神经网络的回归拟合(matlab实现)

神经网络的学习规则又称神经网络的训练算法&#xff0c;用来计算更新神经网络的权值和阈值。学习规则有两大类别&#xff1a;有导师学习和无导师学习。在有导师学习中&#xff0c;需要为学习规则提供一系列正确的网络输入/输出对(即训练样本),当网络输入时&#xff0c;将网络输…

对于Promise的理解

1.什么是回调地狱 多层异步函数的嵌套叫做回调地狱 代码1&#xff1a; setTimeout(() > {console.log(吃饭);setTimeout(() > {console.log(睡觉);setTimeout(() > {console.log(打豆豆);}, 1000);}, 2000);}, 3000); 代码2: 通过id获取用户名,通过用户名获取邮箱…

如何自动识别快递单号和批量查询的方法

最近有很多朋友问我&#xff0c;有没有办法批量查询快递单号&#xff0c;查询该快递单号的所有物流发货信息&#xff1f;今天小编就来分享一个实用的查询技巧&#xff0c;教大家轻松查询多个快递单号&#xff0c;还可以一键保存查询数据&#xff0c;一起来看看吧。 首先今天我们…

PoseNet深度网络进行6D位姿估计的训练,python3实现

0.相关github网址 原版github代码-caffe实现tensorflow实现&#xff0c;相关版本较低&#xff0c;python2&#xff0c;本文根据此代码迁移到python3上面。pytorch实现&#xff0c;但将骨干模型从goglenet改成了resnet&#xff0c;实验效果得到提升&#xff0c;但没公布预训练权…

快递单号一键批量查询的具体操作方法和步骤

最近做电商的朋友对一个话题很感兴趣&#xff1a;如何批量查询快递单号&#xff1f;今天&#xff0c;小编给你安利一款软件&#xff1a;固乔快递查询助手&#xff0c;支持大量快递单号的批量查询。下面我们来看看批量查询的具体操作方法。 小伙伴们需要先在“固乔科技”的官网上…

session与cookie的来源与区别

目录 1.什么是HTTP&#xff1f; 2.cookie 3.session 4.cookie和session的区别 如果你对于session 和cookie 只有一点模糊理解&#xff0c;那么此文章能帮你更深入理解session和cookie &#xff0c;依旧和上篇文章一样&#xff0c;我们采用问题的方式来一步步探索&#xff0…

第七章 测试

文章目录 第七章 测试7.1 编码7.1.1 选择程序设计语言1. 计算机程序设计语言基本上可以分为汇编语言和高级语言2. 从应用特点看&#xff0c;高级语言可分为基础语言、结构化语言、专用语言 7.1.2 编码风格 7.2 软件测试基础7.2.1 软件测试的目标7.2.2 软件测试准则7.2.3 测试方…

JVM基础面试题及原理讲解

基本问题 介绍下 Java 内存区域&#xff08;运行时数据区&#xff09;Java 对象的创建过程&#xff08;五步&#xff0c;建议能默写出来并且要知道每一步虚拟机做了什么&#xff09;对象的访问定位的两种方式&#xff08;句柄和直接指针两种方式&#xff09; 拓展问题 Strin…

Flutter Widget 生命周期 key探究

Widget 在Flutter中&#xff0c;一切皆是Widget&#xff08;组件&#xff09;&#xff0c;Widget的功能是“描述一个UI元素的配置数据”&#xff0c;它就是说&#xff0c;Widget其实并不是表示最终绘制在设备屏幕上的显示元素&#xff0c;它只是描述显示元素的一个配置数据。 …

分库分表 21 招

&#xff08;一&#xff09;好好的系统&#xff0c;为什么要分库分表&#xff1f; 咱们先介绍下在分库分表架构实施过程中&#xff0c;会接触到的一些通用概念&#xff0c;了解这些概念能够帮助理解市面上其他的分库分表工具&#xff0c;尽管它们的实现方法可能存在差异&#…

自动化测试框架seldom

创建项目 | seldom文档 这个框架还是不错的&#xff0c;一直在优化&#xff0c;测试框架里的功能这里都有了。 seldom继承unittest单元测试框架&#xff0c;可以用来做UI和接口自动化项目。 安装 pip install seldom 创建项目 > seldom -P mypro 创建测试用例 # tes…

第8章 维护

文章目录 第8章 维护一、软件交付使用的工作二、软件交付使用的方式1) 直接方式2) 并行方式3) 逐步方式 8.1 软件维护的定义1、软件维护的定义2、软件维护的原因3、软件维护的类型1、改正性维护2、适应性维护3、完善性维护4、预防性维护 8.2 软件维护的特点8.2.1结构化维护和非…