【数据去重】海量数据实时去重方案

news2024/9/22 18:29:22

文章目录

  • Prologue
  • 布隆过滤器去重
    • 什么是布隆过滤器
    • 实现的核心思想
    • 怎么理解
  • 内嵌RocksDB状态后端去重
  • 引入外部K-V存储去重

Prologue

数据去重(data deduplication)是我们大数据攻城狮司空见惯的问题了。除了统计UV等传统用法之外,去重的意义更在于消除不可靠数据源产生的脏数据——即重复上报数据或重复投递数据的影响,使流式计算产生的结果更加准确。本文以Flink处理日均亿级别及以上的日志数据为背景,讨论除了朴素方法(HashSet)之外的三种实时去重方案,即:布隆过滤器、RocksDB状态后端、外部存储。

布隆过滤器去重

什么是布隆过滤器

布隆过滤器(Bloom Filter)是一种空间效率非常高的随机数据结构,它利用位数组(BitSet)表示一个集合,并通过一定数量的哈希函数将元素映射为位数组中的位置,用于检查一个元素是否属于这个集合。

实现的核心思想

对于一个元素,通过多个哈希函数生成多个哈希值,将对应的位在位数组中设为 1,若多个哈希值对应的位都为 1,则认为该元素可能在集合中;若至少有一个哈希值对应的位为 0,则该元素一定不在集合中。这种方法可以在较小的空间中实现高效的查找,但可能存在误判率(false positive)。

怎么理解

一个典型的布隆过滤器包含三个参数: 位数组的大小(即存储元素的个数); 哈希函数的个数; 填充因子(即误判率),即将元素数量与位数组大小的比值。
在这里插入图片描述

以之前用过的子订单日志模型为例,假设上游数据源产生的消息为<Integer, Long, String>三元组,三个元素分别代表站点ID、子订单ID和数据载荷。由于数据源只能保证at least once语义(例如未开启correlation ID机制的RabbitMQ队列),会重复投递子订单数据,导致下游各统计结果偏高。现引入Guava的BloomFilter来去重,直接上代码说事。

 // dimensionedStream是个DataStream<Tuple3<Integer, Long, String>>
  DataStream<String> dedupStream = dimensionedStream
    .keyBy(0)
    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
    .name("process_sub_order_dedup").uid("process_sub_order_dedup");

  // 去重用的ProcessFunction
  public static final class SubOrderDeduplicateProcessFunc
    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);
    private static final int BF_CARDINAL_THRESHOLD = 1000000;
    private static final double BF_FALSE_POSITIVE_RATE = 0.01;

    private volatile BloomFilter<Long> subOrderFilter;

    @Override
    public void open(Configuration parameters) throws Exception {
      long s = System.currentTimeMillis();
      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
      long e = System.currentTimeMillis();
      LOGGER.info("Created Guava BloomFilter, time cost: " + (e - s));
    }

    @Override
    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
      long subOrderId = value.f1;
      if (!subOrderFilter.mightContain(subOrderId)) {
        subOrderFilter.put(subOrderId);
        out.collect(value.f2);
      }
      ctx.timerService().registerProcessingTimeTimer(UnixTimeUtil.tomorrowZeroTimestampMs(System.currentTimeMillis(), 8) + 1);
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
      long s = System.currentTimeMillis();
      subOrderFilter = BloomFilter.create(Funnels.longFunnel(), BF_CARDINAL_THRESHOLD, BF_FALSE_POSITIVE_RATE);
      long e = System.currentTimeMillis();
      LOGGER.info("Timer triggered & resetted Guava BloomFilter, time cost: " + (e - s));
    }

    @Override
    public void close() throws Exception {
      subOrderFilter = null;
    }
  }

  // 根据当前时间戳获取第二天0时0分0秒的时间戳
  public static long tomorrowZeroTimestampMs(long now, int timeZone) {
    return now - (now + timeZone * 3600000) % 86400000 + 86400000;
  }

这里先按照站点ID为key分组,然后在每个分组内创建存储子订单ID的布隆过滤器。布隆过滤器的期望最大数据量应该按每天产生子订单最多的那个站点来设置,这里设为100万,并且可容忍的误判率为1%。根据上面科普文中的讲解,单个布隆过滤器需要8个哈希函数,其位图占用内存约114MB,压力不大。

每当一条数据进入时,调用BloomFilter.mightContain()方法判断对应的子订单ID是否已出现过。当没出现过时,调用put()方法将其插入BloomFilter,并交给Collector输出。

另外,通过注册第二天凌晨0时0分0秒的processing time计时器,就可以在onTimer()方法内重置布隆过滤器,开始新一天的去重。

(吐槽一句,Guava的BloomFilter竟然没有提供清零的方法,有点诡异)

内嵌RocksDB状态后端去重

布隆过滤器虽然香,但是它不能做到100%精确。在必须保证万无一失的场合,我们可以选择Flink自带的RocksDB状态后端,这样不需要依赖其他的组件。之前已经讲过,RocksDB本身是一个类似于HBase的嵌入式K-V数据库,并且它的本地性比较好,用它维护一个较大的状态集合并不是什么难事。

首先我们要开启RocksDB状态后端(平常在生产环境中,也建议总是使用它),并配置好相应的参数。这些参数同样可以在flink-conf.yaml里写入。

RocksDBStateBackend rocksDBStateBackend = new RocksDBStateBackend(Consts.STATE_BACKEND_PATH, true);
rocksDBStateBackend.setPredefinedOptions(PredefinedOptions.FLASH_SSD_OPTIMIZED);
rocksDBStateBackend.setNumberOfTransferingThreads(2);
rocksDBStateBackend.enableTtlCompactionFilter();

env.setStateBackend(rocksDBStateBackend);
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.enableCheckpointing(5 * 60 * 1000);

RocksDB的调优是个很复杂的话题,详情参见官方提供的tuning guide,以及Flink配置中与RocksDB相关的参数,今后会挑时间重点分析一下RocksDB存储大状态时的调优方法。好在Flink已经为我们提供了一些预调优的参数,即PredefinedOptions,请务必根据服务器的实际情况选择。我们的Flink集群统一采用SSD做存储,故选择的是PredefinedOptions.FLASH_SSD_OPTIMIZED。

另外,由于状态空间不小,打开增量检查点以及设定多线程读写RocksDB,可以提高checkpointing效率,检查点周期也不能太短。还有,为了避免状态无限增长下去,我们仍然得定期清理它(即如同上节中布隆过滤器的复位)。当然,除了自己注册定时器之外,我们也可以利用Flink提供的状态TTL机制,并打开RocksDB状态后端的TTL compaction filter,让它们在RocksDB后台执行compaction操作时自动删除。特别注意,状态TTL仅对时间特征为处理时间时生效,对事件时间是无效的。

接下来写具体的业务代码,以上节的<站点ID, 子订单ID, 消息载荷>三元组为例,有两种可实现的思路:

仍然按站点ID分组,用存储子订单ID的MapState(当做Set来使用)保存状态;
直接按子订单ID分组,用单值的ValueState保存状态。
显然,如果我们要用状态TTL控制过期的话,第二种思路更好,因为粒度更细。代码如下。

  // dimensionedStream是个DataStream<Tuple3<Integer, Long, String>>
  DataStream<String> dedupStream = dimensionedStream
    .keyBy(1)
    .process(new SubOrderDeduplicateProcessFunc(), TypeInformation.of(String.class))
    .name("process_sub_order_dedup").uid("process_sub_order_dedup");

  // 去重用的ProcessFunction
  public static final class SubOrderDeduplicateProcessFunc
    extends KeyedProcessFunction<Tuple, Tuple3<Integer, Long, String>, String> {
    private static final long serialVersionUID = 1L;
    private static final Logger LOGGER = LoggerFactory.getLogger(SubOrderDeduplicateProcessFunc.class);

    private ValueState<Boolean> existState;

    @Override
    public void open(Configuration parameters) throws Exception {
      StateTtlConfig stateTtlConfig = StateTtlConfig.newBuilder(Time.days(1))
        .setStateVisibility(StateVisibility.NeverReturnExpired)
        .setUpdateType(UpdateType.OnCreateAndWrite)
        .cleanupInRocksdbCompactFilter(10000)
        .build();

      ValueStateDescriptor<Boolean> existStateDesc = new ValueStateDescriptor<>(
        "suborder-dedup-state",
        Boolean.class
      );
      existStateDesc.enableTimeToLive(stateTtlConfig);

      existState = this.getRuntimeContext().getState(existStateDesc);
    }

    @Override
    public void processElement(Tuple3<Integer, Long, String> value, Context ctx, Collector<String> out) throws Exception {
      if (existState.value() == null) {
        existState.update(true);
        out.collect(value.f2);
      }
    }
  }

上述代码中设定了状态TTL的相关参数:

  • 过期时间设为1天;
  • 在状态值被创建和被更新时重设TTL;
  • 已经过期的数据不能再被访问到;
  • 在每处理10000条状态记录之后,更新检测过期的时间戳。这个参数要小心设定,更新太频繁会降低compaction的性能,更新过慢会使得compaction不及时,状态空间膨胀。

在实际处理数据时,如果数据的key(即子订单ID)对应的状态不存在,说明它没有出现过,可以更新状态并输出。反之,说明它已经出现过了,直接丢弃,so easy。

最后还需要注意一点,若数据的key占用的空间比较大(如长度可能会很长的字符串类型),也会造成状态膨胀。我们可以将它hash成整型再存储,这样每个key就最多只占用8个字节了。不过任何哈希算法都无法保证不产生冲突,所以还是得根据业务场景自行决定。

引入外部K-V存储去重

如果既不想用布隆过滤器,也不想在Flink作业内维护巨大的状态,就只能用折衷方案了:利用外部K-V数据库(Redis、HBase之类)存储需要去重的键。由于外部存储对内存和磁盘占用同样敏感,所以也得设定相应的TTL,以及对大的键进行压缩。另外,外部K-V存储毕竟是独立于Flink框架之外的,一旦作业出现问题重启,外部存储是不会与作业的checkpoint同步恢复到一致的状态的,也就是说结果仍然会出现偏差,需要注意。

鉴于这种方案对第三方组件有强依赖,要关心的东西太多,所以一般情况下是不用的,我们也没有实操过,所以抱歉没有代码了。

The End
如果有其他更高效的解决方法,欢迎批评指正哈。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/465177.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信号完整性分析基础知识之传输线和反射(四):不连续点和端接

每当信号遇到阻抗变化&#xff0c;就会出现反射现象&#xff0c;反射对信号质量影响很大。信号完整性工作最重要的部分之一就是预测不连续点对信号的影响&#xff0c;以及设计工程可接受的备选方案。 尽管电路板在设计上是可控阻抗互连&#xff0c;但是信号在以下结构中仍然会遇…

如何选择最佳的实时聊天软件

在客户服务和支持领域&#xff0c;实时聊天正在改变游戏规则已不是什么秘密。从推动销售到提升客户体验和提高保留率&#xff0c;实时聊天已成为与客户互动和支持的一种全新的方式。客户和支持专业人员都注意到了这一点。 研究发现&#xff0c;高达41%的消费者更喜欢实时聊天&…

李宏毅 深度学习

目录 深度学习与自然语言处理 | 斯坦福CS224n 课程带学与全套笔记解读&#xff08;NLP通关指南完结&#xff09;pytorch快速入门csdn快速入门OS包PIL包Opencv包Dataset类Tensorboard的使用torchvision.transforms 的使用torchvision中数据集的使用DataLoader的使用(torch.util…

【C++】:想知道如何实现互译字典吗?来看二叉搜索树

二叉搜索树好文&#xff01; 文章目录 前言一、实现搜索二叉树二、二叉搜索树的应用 1.K模型2.KV模型总结 前言 二叉搜索树概念 &#xff1a; 二叉搜索树又称二叉排序树&#xff0c;它或者是一棵空树 &#xff0c;或者是具有以下性质的二叉树 : 若它的左子树不为空&#xff0…

Orcale中OCILogon和OCIServerAttach登录方式的区别分析

参考文档《Oracle Call Interface Programmers Guide》 在Orcale和DM数据库提供的API中&#xff0c;通过OCI方式接口连接数据库的方法有多个&#xff0c;这里只讨论OCILogon和OCIServerAttach的比较。 1、官方描述 根据文档里的描述&#xff1a; OCILogon():This function is…

DJ4-5 路由和选路

目录 一、路由与转发的相互作用 二、路由的基本概念 1. 默认路由器 2. 路由算法 三、网络的抽象模型 1. 节点图 2. 费用 Cost 四、路由算法分类 1. 静态路由算法 2. 动态路由算法 3. 全局路由算法 4. 分布式路由算法 一、路由与转发的相互作用 二、路由的基本概念 …

美团赴抖音之“约”:让本地生活补贴大战来得更猛烈些?

面对抖音在本地生活领域的强势挑战&#xff0c;美团似乎准备好了正面迎战。 近期&#xff0c;美团动作频频。最开始&#xff0c;美团在美团App美食页面下的“特价团购”打出“限时补贴&#xff0c;全网低价”的口号。对此&#xff0c;一位行业人士分析称&#xff0c;“之前美团…

java commons-io 工具类的使用

commons-io是第三方程序员编写的工具类&#xff0c;并不是java本身带的方法。是在java提供的工具类基础上&#xff0c;开发的工具类。简化了代码的用法&#xff0c;可以提升开发效率。 用法 1.下载jar包 2.在程序中新建lib目录&#xff0c;把jar包放进去 3.在jar包上右键&…

learn_C_deep_6 (布尔类型、布尔与“零值“、浮点型与“零值“、指针与“零值“的比较)

目录 语句和表达式的概念 if语句的多种语法结构 注释的便捷方法&#xff08;环境vs&#xff09; if语句执行的过程 逻辑与&& 逻辑或|| 运算关系的顺序 else的匹配原则 C语言有没有布尔类型 C99标准 sizeof(bool)的值为多少&#xff1f; _Bool原码 BOOL…

音视频八股文(6)-- ffmpeg大体介绍和内存模型

播放器框架 常用音视频术语 • 容器&#xff0f;文件&#xff08;Conainer/File&#xff09;&#xff1a;即特定格式的多媒体文件&#xff0c; 比如mp4、flv、mkv等。 • 媒体流&#xff08;Stream&#xff09;&#xff1a;表示时间轴上的一段连续数据&#xff0c;如一 段声音…

dubbogo如何实现路由规则功能

dubbo-go中如何实现路由规则功能 路由规则&#xff08; routing rule &#xff09;是为了改变网络流量所经过的途径而修改路由信息的技术&#xff0c;主要通过改变路由属性&#xff08;包括可达性&#xff09;来实现。在发起一次 RPC 调用前起到过滤目标服务器地址的作用&…

Node第三方包 【node-xlsx】

文章目录 &#x1f31f;前言&#x1f31f;node-xlsx&#x1f31f;安装&#x1f31f;导出xlsx文件&#x1f31f;解析xlsx文件&#x1f31f;另外&#xff1a;其他支持读写Excel的Node.js模块有&#xff1a;&#x1f31f;直接导出excel文件 &#x1f31f;写在最后 &#x1f31f;前…

麒麟信安联合主办 | openEuler Developer Day 2023召开 openEuler全场景走向深入

【中国&#xff0c;上海&#xff0c;2023年4月21日】openEuler Developer Day 2023于4月20-21日在线上和线下同步举办。本次大会由开放原子开源基金会、中国软件行业协会、openEuler社区、边缘计算产业联盟共同主办&#xff0c;以“万涓汇流&#xff0c;奔涌向前”为主题&#…

【FTP工具】- Win10下免费的FTP服务器搭建 - FileZilla 的下载、安装、使用

目录 一、概述二、下载、安装2.1 下载2.2 安装 三、FileZilla服务器的使用3.1 连接服务器3.2 配置用户权限 四、在windows访问该Ftp服务器4.1 查看Ftp服务器IP4.2 访问Ftp服务器 一、概述 FileZilla服务器是一个免费的开源FTP和FTPS服务器&#xff0c;是根据GNU通用公共许可证条…

HTB靶机03-Shocker-WP

Shocker scan 2023-03-30 23:22 ┌──(xavier㉿xavier)-[~/Desktop/Inbox] └─$ sudo nmap -sSV -T4 -F 10.10.10.56 Starting Nmap 7.91 ( https://nmap.org ) at 2023-03-30 23:22 HKT Nmap scan report for 10.10.10.56 Host is up (0.40s latency). Not shown: 99 clos…

QT-day(2)-(常用类、信号与槽.....)

题目&#xff1a;编写一个登录功能的界面&#xff0c;在登录框项目中&#xff0c;将登陆按钮发射的tclicked信号&#xff0c;连接到自定义的槽函数中&#xff0c;在槽函数中&#xff0c;判断u界面输入的用户名是否为"admin ,密码是否为"123456如果验证成功&#xff0…

数据结构_时间复杂度/空间复杂度

目录 1. 数据结构在学什么 2. 数据结构的基本概念 3. 算法和算法评价 3.1 算法的基本概念 3.2 算法的特征 3.3 算法效率的度量 3.3.1 时间复杂度 3.3.2 空间复杂度 1. 数据结构在学什么 常言道&#xff1a;学以致用&#xff1b;学习完基本C语言程序&#xff0c;我们希…

AB数对 码蹄集

题目来源&#xff1a;码蹄集 题目描述&#xff1a; 大致思路&#xff1a; 遍历输入的n个整数&#xff0c;将每个数存入哈希表中&#xff0c;key为数值&#xff0c;value为该数出现的次数。 再次遍历这n个整数&#xff0c;对于每个数x&#xff0c;计算出x-C和xC的值&#xff0…

Go语言基础----Go语言简介

【原文链接】Go语言基础----Go语言简介 一、Go语言简介 Go语言&#xff0c;又称Golang&#xff0c;是Google公司的Robert Griesemer&#xff0c;Rob Pike 及 Ken Thompson开发的一种静态强类型、编译型的语言。Go语言语法和C语言接近&#xff0c;但是功能上内存安全&#xff…

【Python】lambda匿名函数

文章目录 前言lambda匿名函数的定义lambda匿名函数的使用使用lambda匿名函数写一个计算器总结 前言 在Python中,可以使用def 关键字定义函数,使用def定义的关键字是有名称的,在调用时可以重复使用.还有一种是使用lambda关键字进行函数的定义,这个方式定义的函数是匿名函数,只能…