大数据-玩转数据-Flink 海量数据实时去重

news2025/1/11 22:58:10

一、海量数据实时去重说明

借助redis的Set,需要频繁连接Redis,如果数据量过大, 对redis的内存也是一种压力;使用Flink的MapState,如果数据量过大, 状态后端最好选择 RocksDBStateBackend; 使用布隆过滤器,布隆过滤器可以大大减少存储的数据的数据量。

二、海里书实时去重为什么需要布隆过滤器

如果想判断一个元素是不是在一个集合里,一般想到的是将集合中所有元素保存起来,然后通过比较确定。链表、树、散列表(又叫哈希表,Hash table)等等数据结构都是这种思路。
但是随着集合中元素的增加,我们需要的存储空间越来越大。同时检索速度也越来越慢,上述三种结构的检索时间复杂度分别为。
布隆过滤器即可以解决存储空间的问题, 又可以解决时间复杂度的问题.
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。

三、布隆过滤基本概念

布隆过滤器(Bloom Filter,下文简称BF)由Burton Howard Bloom在1970年提出,是一种空间效率高的概率型数据结构。它专门用来检测集合中是否存在特定的元素。
它实际上是一个很长的二进制向量和一系列随机映射函数。

实现原理
布隆过滤器的原理是,当一个元素被加入集合时,通过K个散列函数将这个元素映射成一个位数组中的K个点,把它们置为1。检索时,我们只要看看这些点是不是都是1就(大约)知道集合中有没有它了:如果这些点有任何一个0,则被检元素一定不在;如果都是1,则被检元素很可能在。这就是布隆过滤器的基本思想。
BF是由一个长度为m比特的位数组(bit array)与k个哈希函数(hash function)组成的数据结构。位数组均初始化为0,所有哈希函数都可以分别把输入数据尽量均匀地散列。
当要插入一个元素时,将其数据分别输入k个哈希函数,产生k个哈希值。以哈希值作为位数组中的下标,将所有k个对应的比特置为1。
当要查询(即判断是否存在)一个元素时,同样将其数据输入哈希函数,然后检查对应的k个比特。如果有任意一个比特为0,表明该元素一定不在集合中。如果所有比特均为1,表明该集合有(较大的)可能性在集合中。为什么不是一定在集合中呢?因为一个比特被置为1有可能会受到其他元素的影响(hash碰撞),这就是所谓“假阳性”(false positive)。相对地,“假阴性”(false negative)在BF中是绝不会出现的。
下图示出一个m=18, k=3的BF示例。集合中的x、y、z三个元素通过3个不同的哈希函数散列到位数组中。当查询元素w时,因为有一个比特为0,因此w不在该集合中。
在这里插入图片描述

优点
1.不需要存储数据本身,只用比特表示,因此空间占用相对于传统方式有巨大的优势,并且能够保密数据;
2.时间效率也较高,插入和查询的时间复杂度均为, 所以他的时间复杂度实际是
3.哈希函数之间相互独立,可以在硬件指令层面并行计算。
缺点
1.存在假阳性的概率,不适用于任何要求100%准确率的情境;
2.只能插入和查询元素,不能删除元素,这与产生假阳性的原因是相同的。我们可以简单地想到通过计数(即将一个比特扩展为计数值)来记录元素数,但仍然无法保证删除的元素一定在集合中。
使用场景
所以,BF在对查准度要求没有那么苛刻,而对时间、空间效率要求较高的场合非常合适.
另外,由于它不存在假阴性问题,所以用作“不存在”逻辑的处理时有奇效,比如可以用来作为缓存系统(如Redis)的缓冲,防止缓存穿透。
假阳性概率的计算
假阳性的概率其实就是一个不在的元素,被k个函数函数散列到的k个位置全部都是1的概率。可以按照如下的步骤进行计算: p = f(m,n,k)
其中各个字母的含义:
1.n :放入BF中的元素的总个数;
2.m:BF的总长度,也就是bit数组的个数
3.k:哈希函数的个数;
4.p:表示BF将一个不在其中的元素错判为在其中的概率,也就是false positive的概率;
A.BF中的任何一个bit在第一个元素的第一个hash函数执行完之后为 0的概率是:

B.BF中的任何一个bit在第一个元素的k个hash函数执行完之后为 0的概率是:

C.BF中的任何一个bit在所有的n元素都添加完之后为 0的概率是:

D.BF中的任何一个bit在所有的n元素都添加完之后为 1的概率是:

E.一个不存在的元素被k个hash函数映射后k个bit都是1的概率是:

结论:在哈数函数个数k一定的情况下
1.比特数组m长度越大, p越小, 表示假阳性率越低
2.已插入的元素个数n越大, p越大, 表示假阳性率越大
经过各种数学推导:
对于给定的m和n,使得假阳性率(误判率)最小的k通过如下公式定义:

四、使用布隆过滤器实现去重

Flink已经内置了布隆过滤器的实现(使用的是google的Guava)

package com.lyh.flink12;

import com.atguigu.flink.java.chapter_6.UserBehavior;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter;
import org.apache.flink.shaded.guava18.com.google.common.hash.Funnels;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.time.Duration;

public class Flink02_UV_BoomFilter {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建WatermarkStrategy
        WatermarkStrategy<UserBehavior> wms = WatermarkStrategy
            .<UserBehavior>forBoundedOutOfOrderness(Duration.ofSeconds(5))
            .withTimestampAssigner(new SerializableTimestampAssigner<UserBehavior>() {
                @Override
                public long extractTimestamp(UserBehavior element, long recordTimestamp) {
                    return element.getTimestamp() * 1000L;
                }
            });

        env
            .readTextFile("input/UserBehavior.csv")
            .map(line -> { // 对数据切割, 然后封装到POJO中
                String[] split = line.split(",");
                return new UserBehavior(Long.valueOf(split[0]), Long.valueOf(split[1]), Integer.valueOf(split[2]), split[3], Long.valueOf(split[4]));
            })
            .filter(behavior -> "pv".equals(behavior.getBehavior())) //过滤出pv行为
            .assignTimestampsAndWatermarks(wms)
            .keyBy(UserBehavior::getBehavior)
            .window(TumblingEventTimeWindows.of(Time.minutes(60)))
            .process(new ProcessWindowFunction<UserBehavior, String, String, TimeWindow>() {

                private ValueState<Long> countState;
                private ValueState<BloomFilter<Long>> bfState;

                @Override
                public void open(Configuration parameters) throws Exception {
                    countState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("countState", Long.class));

                    bfState = getRuntimeContext()
                        .getState(new ValueStateDescriptor<BloomFilter<Long>>("bfState", TypeInformation.of(new TypeHint<BloomFilter<Long>>() {})
                                  )
                        );

                }

                @Override
                public void process(String key,
                                    Context context,
                                    Iterable<UserBehavior> elements, Collector<String> out) throws Exception {
                    countState.update(0L);

                    // 在状态中初始化一个布隆过滤器
                    // 参数1: 漏斗, 存储的类型
                    // 参数2: 期望插入的元素总个数
                    // 参数3: 期望的误判率(假阳性率)
                    BloomFilter<Long> bf = BloomFilter.create(Funnels.longFunnel(), 1000000, 0.001);
                    bfState.update(bf);
                    
                    for (UserBehavior behavior : elements) {
                        // 查布隆
                        if (!bfState.value().mightContain(behavior.getUserId())) {
                            // 不存在 计数+1
                            countState.update(countState.value() + 1L);
                            // 记录这个用户di, 表示来过
                            bfState.value().put(behavior.getUserId());
                        }
                    }
                    out.collect("窗口: " + context.window() + " 的uv是: " + countState.value());
                }
            })
            .print();
        env.execute();
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1058518.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

快速选择排序

"你经过我每个灿烂时刻&#xff0c;我才真正学会如你般自由" 前些天有些无聊&#xff0c;想试试自己写的快排能否过leetcode上的排序算法题。结果是&#xff0c;不用截图可想而知&#xff0c;肯定是没过的&#xff0c;否则也不会有这篇文章的产出。 这份快排算法代码…

请求转发与请求作用域

创建input.jsp页面&#xff0c;通过表单输入学号、姓名后&#xff0c;单击登录按钮&#xff0c;控制转发到FirstServlet对其进行处理&#xff0c;然后通过请求对象的getRequestDispartcher()获得RequestDispartcher对象&#xff0c;将请求转发至SecondServlet&#xff0c;在Sec…

辅助驾驶功能开发-测试篇(2)-真值系统介绍

1 真值系统概述 1.1 真值评测系统核心应用 快速构建有效感知真值,快速完成感知性能评估,快速分析感知性能缺陷。 主要应用场景包括: 1. 感知算法开发验证: 在算法开发周期中,评测结果可以作为测试报告的一部分,体现算法性能的提升。 2. 遴选供应…

九、2023.10.3.Linux(end).9

文章目录 33、简述mmap的原理和使用场景&#xff1f;34、互斥量能不能在进程中使用&#xff1f;35、协程是轻量级线程&#xff0c;轻量级表现在哪里&#xff1f;36、说说常见信号有哪些&#xff0c;表示什么含义&#xff1f;37、说说线程间通信的方式有哪些&#xff1f;38、说说…

基于matlab创作简易表白代码

一、程序 以下是一个基于MATLAB的简单表白代码&#xff1a; % 表白代码 clc; % 清除命令行窗口 clear; % 清除所有变量 close all; % 关闭所有图形窗口 % 输入被表白者的名字 name input(请输入被表白者的名字&#xff1a;, s); % 显示表白信息 fprintf(\n); fprintf(亲爱的…

深入浅出的算法设计与分析技巧解读(软件设计师笔记)

&#x1f600;前言 在计算机科学的庞大体系中&#xff0c;算法始终占据着核心的地位&#xff0c;充当着解决问题的“钥匙”。本章主要深入探讨了算法设计与分析这一主题&#xff0c;旨在通过具体的问题解析和代码实现&#xff0c;引导读者深入理解各种经典算法的设计原理和应用…

ElasticSearch第四讲:ES详解:ElasticSearch和Kibana安装

ElasticSearch第四讲&#xff1a;ES详解&#xff1a;ElasticSearch和Kibana安装 本文是ElasticSearch第四讲&#xff1a;ElasticSearch和Kibana安装&#xff0c;主要介绍ElasticSearch和Kibana的安装。了解完ElasticSearch基础和Elastic Stack生态后&#xff0c;我们便可以开始…

力扣 -- 322. 零钱兑换(完全背包问题)

参考代码&#xff1a; 未优化代码&#xff1a; class Solution { public:int coinChange(vector<int>& coins, int amount) {int n coins.size();const int INF 0x3f3f3f3f;//多开一行&#xff0c;多开一列vector<vector<int>> dp(n 1, vector<i…

cadence - 解决orcad无响应的有效方法

文章目录 解决orcad无响应的有效方法概述笔记备注补充好像必须要在英文(美国)语言的主环境下运行才行补充 - orcad无响应的可能原因补充 - 英文模式也不好使补充 - orcad无响应的真实原因解决orcad无响应的有效方法END 解决orcad无响应的有效方法 概述 在画H7的飞达控制底板.…

复习C语言数组的用法

实验内容 1.1设计一个函数fun&#xff0c;功能是有N*N的矩阵&#xff0c;根据给定的m值&#xff0c;m<N,将每行元素中的值&#xff0c;均往右移m个位置&#xff0c;左边置0 #include<stdio.h> void fun(int (*a)[3],int m){int n,j,i,k,num;int p2;//右移位置列数nu…

Linux:minishell

目录 1.实现逻辑 2.代码及效果展示 1.打印字符串提示用户输入指令 2.父进程拆解指令 3.子进程执行指令,父进程等待结果 4.效果 3.实现过程中遇到的问题 1.打印字符串的时候不显示 2.多换了一行 3.cd路径无效 4.优化 1.ll指令 2.给文件或目录加上颜色 代码链接 模…

MySQL进阶 —— 超详细操作演示!!!(下)

MySQL进阶 —— 超详细操作演示&#xff01;&#xff01;&#xff01;&#xff08;下&#xff09; 五、锁5.1 概述5.2 全局锁5.3 表级锁5.4 行级锁 六、InnoDB 引擎6.1 逻辑存储结构6.2 架构6.3 事务原理6.4 MVCC 七、MySQL 管理7.1 系统数据库7.2 常用工具 MySQL— 基础语法大…

【管理运筹学】第 8 章 | 动态规划(5,设备更新问题)

系列文章 【管理运筹学】第 8 章 | 动态规划&#xff08;1&#xff0c;多阶段决策过程与动态规划基本概念&#xff09; 【管理运筹学】第 8 章 | 动态规划&#xff08;2&#xff0c;动态规划的基本思想与模型求解&#xff09; 【管理运筹学】第 8 章 | 动态规划&#xff08;3&…

Vscode 如何创建java项目,并添加包

创建java项目 添加包 先打开这个资源管理器中的javaProject&#xff0c;然后打开这个javaProject&#xff0c;点击里面的Reference Libraries,然后点击加号 选择要添加的包然后进行确认即可

C/C++学习 -- 分组密算法(3DES算法)

1. 3DES算法概述 3DES&#xff08;Triple Data Encryption Standard&#xff09;&#xff0c;又称为TDEA&#xff08;Triple Data Encryption Algorithm&#xff09;&#xff0c;是一种对称加密算法&#xff0c;是DES&#xff08;Data Encryption Standard&#xff09;的加强版…

k8s全栈-笔记6-Prometheus+Alertmanager构建监控系统

k8s全栈-笔记6-PrometheusAlertmanager构建监控系统 实验环境: Pormetheusgrafanaalertmanager安装在k8s集群,k8s环境如下 K8S集群角色IP主机名安装的组件控制节点(master)172.20.252.181k8s-master01apiserver,controller-manager,schedule,kubelet,etcd,kube-proxy,容器运…

vcruntime140.dll如何修复,快速修复vcruntime140.dll丢失的三种方法

vcruntime140.dll是Visual C 2015运行库的一个组件&#xff0c;它包含了许多运行时函数&#xff0c;用于支持各种程序的正常运行。当vcruntime140.dll文件丢失时&#xff0c;可能会导致一些程序无法正常运行。本文将详细介绍vcruntime140.dll的作用、丢失原因以及三种修复方法。…

STM32Cubemx新建F429基础工程

配置STM32CubeMX 配置KEY 配置USART1 配置RCC Project Manager Toolchain 选择 MDK-ARM Code Generator 配置如下 GENERATE CODE 即可 配置Keil5 魔术棒配置 – Target – 勾选 Use MicroLIB – Debug – Flash Download – 勾选Reset and Run 基础代码 /* Private incl…

linux——信号

✅<1>主页&#xff1a;&#xff1a;我的代码爱吃辣 &#x1f4c3;<2>知识讲解&#xff1a;Linux——进程等待 ☂️<3>开发环境&#xff1a;Centos7 &#x1f4ac;<4>前言&#xff1a;生活中处处有信号&#xff0c;linux中也有很多信号&#xff0c;OS使…

算法强训:第三十四天

文章目录 收件人列表养兔子一、收件人列表OJ链接 本题思路:先接收到一个数字,代表接下来是多少组数据 ,逐个接收每个名字,如果名字中没有,或者 则直接输出,否则在改名字前后拼接"\""再输出,除最后一个名字外,每个名字之后都有一个", " ,该组用例…