【Flink】需求实现之独立访客数量的计算 和 布隆过滤器的原理及使用

news2024/11/24 17:42:01

文章目录

  • 一 独立访客数量计算
  • 二 布隆过滤器
    • 1 什么是布隆过滤器
    • 2 实现原理
      • (1)HashMap 的问题
      • (2)布隆过滤器数据结构
    • 3 使用布隆过滤器去重

一 独立访客数量计算

public static void main(String[] args) throws Exception{
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);

    env
            .readTextFile("E:\\develop\\MyWork\\flink2022tutorial\\src\\main\\resources\\UserBehavior.csv")
            .map(new MapFunction<String, Example7.UserBehavior>() {
                @Override
                public Example7.UserBehavior map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new Example7.UserBehavior(
                            arr[0],arr[1],arr[2],arr[3],
                            Long.parseLong(arr[4]) * 1000L
                    );
                }
            })
            .keyBy(r -> r.behavior.equals("pv"))
            .assignTimestampsAndWatermarks(
                    WatermarkStrategy.<Example7.UserBehavior>forMonotonousTimestamps()
                    .withTimestampAssigner(new SerializableTimestampAssigner<Example7.UserBehavior>() {
                        @Override
                        public long extractTimestamp(Example7.UserBehavior element, long recordTimestamp) {
                            return element.timestamp;
                        }
                    })
            )
            .keyBy(r -> true)
            .window(TumblingEventTimeWindows.of(Time.hours(1)))
            .aggregate(new CountAgg(),new WindowResult())
            .print();

    env.execute();
}

public static class WindowResult extends ProcessWindowFunction<Long,String,Boolean, TimeWindow>{

    @Override
    public void process(Boolean s, Context context, Iterable<Long> elements, Collector<String> out) throws Exception {
        String windowStart = new Timestamp(context.window().getStart()).toString();
        String windowEnd = new Timestamp(context.window().getEnd()).toString();
        Long count = elements.iterator().next();
        out.collect("窗口" + windowStart + "~" + windowEnd + "的统计值是:" + count);
    }
}

// 使用hashset对用户id进行去重,输出访客数量
public static class CountAgg implements AggregateFunction<Example7.UserBehavior, HashSet<String>,Long>{

    @Override
    public HashSet<String> createAccumulator() {
        return new HashSet<>();
    }

    @Override
    public HashSet<String> add(Example7.UserBehavior value, HashSet<String> accumulator) {
        accumulator.add(value.userId);
        return accumulator;
    }

    @Override
    public Long getResult(HashSet<String> accumulator) {
        return (long)accumulator.size();
    }

    @Override
    public HashSet<String> merge(HashSet<String> a, HashSet<String> b) {
        return null;
    }
}

此程序有一个隐患:现将所有数据keyBy到了同一条流,每一个小时取一次uv,添加到hashSet去重,如果程序的用户向很大,如1亿个独立访客,一个用户的用户id为100个字符,那么一个窗口中的独立访客就要占用10G的内存。

想要优化这种使用了增量聚合与全窗口聚合的程序,就需要使用一种新的数据结构 – 布隆过滤器。

二 布隆过滤器

1 什么是布隆过滤器

本质上布隆过滤器是一种数据结构,是一种比较巧妙的概率型数据结构(probabilistic datastructure),特点是高效地插入和查询,可以用来告诉你“某样东西一定不存在或者可能存在”。

相比于传统的 List、Set、Map 等数据结构,它更高效、占用空间更少,但是缺点是其返回的结果是概率性的,而不是确切的。

2 实现原理

(1)HashMap 的问题

通常我们判断某个元素是否存在用的是什么?

HashMap 可以将值映射到 HashMap 的 Key,然后可以在 O(1) 的时间复杂度内返回结果,效率奇高。但是 HashMap 的实现也有缺点,例如存储容量占比高,考虑到负载因子的存在,通常空间是不能被用满的,而一旦值很多,例如上亿的时候,那 HashMap 占据的内存大小就变得很可观了。

再加入我们的数据集存储在远程服务器上,本地服务接受输入,而数据集非常大不可能一次性读进内存构建 HashMap 的时候,也会存在问题。

(2)布隆过滤器数据结构

布隆过滤器是一个 bit 向量或者说 bit 数组,如下图:

在这里插入图片描述

如果要映射一个值到布隆过滤器中,需要使用多个不同的哈希函数生成多个哈希值,并对每个生成的哈希值指向的 bit 位重置为 1,例如针对值“baidu”和三个不同的哈希函数分别生成了哈希值 1、4、7,之后“baidu”字符串就被丢弃,则上图转变为:

在这里插入图片描述

那么此时现在再存一个值“tencent”,如果哈希函数返回 3、4、8 的话,图继续变为:
在这里插入图片描述

值得注意的是,4 这个 bit 位由于两个值的哈希函数都返回了这个 bit 位,因此它被覆盖了。

现在如果想查询“dianping”这个值是否存在,哈希函数返回了 1、5、8 三个值,结果发现 5 这个 bit 位上的值为 0,说明没有任何一个值映射到这个 bit 位上,因此可以很确定地说“dianping”这个值不存在。

而当需要查询“baidu”这个值是否存在的话,那么哈希函数必然会返回 1、4、7,然后检查发现这三个 bit 位上的
值均为 1,那么不可以说“baidu”存在,只能是“baidu”这个值可能存在,不确定1、4、7这三位没有被其他数据覆盖过。

3 使用布隆过滤器去重

使用org.apache.flink.shaded.guava18.com.google.common.hash.BloomFilter包下的布隆过滤器。

// 使用布隆过滤器对用户id进行去重,输出访客数量
public static class CountAgg implements AggregateFunction<Example7.UserBehavior, Tuple2<Long,BloomFilter<String>>,Long>{

    @Override
    public Tuple2<Long, BloomFilter<String>> createAccumulator() {
        // 参数依次为要去重的元素类型,预估有多少人,误判率
        return Tuple2.of(0L,BloomFilter.create(Funnels.stringFunnel(Charsets.UTF_8),100000,0.01));
    }

    @Override
    public Tuple2<Long, BloomFilter<String>> add(Example7.UserBehavior value, Tuple2<Long, BloomFilter<String>> accumulator) {
        if(!accumulator.f1.mightContain(value.userId)){
            accumulator.f1.put(value.userId);
            accumulator.f0 += 1L;
        }
        return accumulator;
    }

    @Override
    public Long getResult(Tuple2<Long, BloomFilter<String>> accumulator) {
        return accumulator.f0;
    }

    @Override
    public Tuple2<Long, BloomFilter<String>> merge(Tuple2<Long, BloomFilter<String>> a, Tuple2<Long, BloomFilter<String>> b) {
        return null;
    }

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/65432.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Locust学习记录3-用户类属性【host attribute,tasks attribute】

host attribute(主机属性) host属性是家长主机的URL前缀&#xff08;即“https://www.baidu.com”&#xff09;。 这是在Locust的web UI 或命令行上指定的&#xff0c;在Locust【--host】启动时使用该选项 【--host】如果在用户类中声明了一个host属性&#xff0c;那么在命令…

答网友提问 - SAP Business Technology Platform(BTP) 的计费模式

我的知识星球 有朋友向我提问&#xff1a; S4HANA(本地部署或云版)跟SAP家族系统以及非SAP系统的集成&#xff0c;sap的标准/推荐做法是通过BTP还是直接连接&#xff0c;或者是根据目标系统分别选择&#xff1f;有参考链接最好了。 还有BTP的收费模式是什么样的&#xff0c;是不…

业聚医疗通过聆讯:上半年营收6885万美元 钱永勋为实控人

雷递网 雷建平 12月5日血管介入器械公司――业聚医疗集团有限公司&#xff08;OrbusNeich Medical Group Limited&#xff09;&#xff08;简称“业聚医疗”&#xff09;日前通过聆讯&#xff0c;准备在香港上市。上半年营收6885万美元业聚医疗总部位于中国香港&#xff0c;是一…

springboot解决跨域问题

springboot解决跨域问题 文章目录springboot解决跨域问题一、跨域是什么&#xff1f;二、java解决CORS请求的方式1.返回新的CORSFilter&#xff08;全局跨域&#xff09;2.重写WebMvcConfigurer&#xff08;全局跨域&#xff09;3.使用注解CorsOrigin4.手动设置响应头&#xff…

如何安装Ambari集群_大数据培训

注意&#xff1a;以下操作主节点操作即可 1 制作本地源 制作本地源是因为在线安装Ambari太慢。制作本地源只需在主节点上进行。 1.1 配置HTTPD 服务 配置HTTPD 服务到系统层使其随系统自动启动 [roothadoop102 ~]# chkconfig httpd on [roothadoop102 ~]# service httpd …

[附源码]Python计算机毕业设计Django网上电影购票系统

项目运行 环境配置&#xff1a; Pychram社区版 python3.7.7 Mysql5.7 HBuilderXlist pipNavicat11Djangonodejs。 项目技术&#xff1a; django python Vue 等等组成&#xff0c;B/S模式 pychram管理等等。 环境需要 1.运行环境&#xff1a;最好是python3.7.7&#xff0c;…

股票量化分析工具QTYX使用攻略——均线系统多头排列选股

搭建自己的量化系统如果要长期在市场中立于不败之地&#xff01;必须要形成一套自己的交易系统。如何学会搭建自己的量化交易系统&#xff1f;边学习边实战&#xff0c;在实战中学习才是最有效地方式。于是我们分享一个即可以用于学习&#xff0c;也可以用于实战炒股分析的量化…

【软件测试】软件测试工程师职位核心任务?测试人测试职业发展?

目录&#xff1a;导读前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09;前言 你认为软件测试工程…

RDMA测试集:preftest安装与使用

RDMA测试集&#xff1a;preftest安装与使用 perftest是一组基于uverbs编写的测试程序&#xff0c;是RDMA性能相关的benchmark。可用于软硬件调优以及功能测试。 源码位置 linux-rdma/perftest: Infiniband Verbs Performance Tests (github.com) perftest测试软件包含的测试…

111个Python数据分析实战项目,代码已跑通,数据可下载

写在前面&#xff1a; 这里整理了111个数据分析的案例&#xff0c;每一个都进行了严格的筛选&#xff0c;筛选标准如下&#xff1a; 1. 有干货&#xff1a;杜绝纯可视化、统计性分析&#xff0c;有一定比例的讲解性文字 2. 可跑通&#xff1a;所有代码均经过测试&#xff0c;…

框架的优点(SpringBoot VS Servlet)

创建项目比较&#xff1a; 创建Servlet项目&#xff1a;流程复杂&#xff0c;容易出错&#xff0c;这里可以参考之前写的一篇博客&#xff1a; 如何创建一个Servlet项目&#xff08;Maven&#xff09;&#xff1f;_crazy_xieyi的博客-CSDN博客_maven创建servlet 1.创建maven…

【JavaSE】HashMap底层原理、面试题详解

【JavaSE】HashMap底层原理、面试题详解 文章目录【JavaSE】HashMap底层原理、面试题详解一&#xff1a;HashMap的数据结构1&#xff1a;JDK1.72&#xff1a;JDK1.8二&#xff1a;hash 方法的原理三&#xff1a;HashMap的put流程四&#xff1a;HashMap的get流程五&#xff1a;H…

ARM基础(2):模式和特权等级(User/Thread mode和Privileged level)

Cortex-M3处理器支持两种模式和两种特权级别。 如下图所示&#xff0c;当处理器运行于Thread mode时&#xff0c;它可以处于Privileged或User级别&#xff1b;而Handler mode下&#xff0c;只能处于Privileged级别。当处理器复位完毕后&#xff0c;处于Thread mode。 在User级…

SSM(spring+springmvc+mybatis)完全注解开发整合

SSM(springspringmvcmybatis)完全注解开发整合 目录结构如图&#xff1a; 创建数据库 create database mydb; use mydb; create table tbl_users(id int primary key auto_increment,username varchar(20),password varchar(20),age int,birthday date );insert tbl_users(…

[Cortex-M3]-4-如何在内嵌RAM中运行程序

[Cortex-M3]-1-启动流程-启动文件[Cortex-M3]-2-map文件解析[Cortex-M3]-3-分散加载文件解析&#xff08;.sct&#xff09;[Cortex-M3]-4-如何在内嵌RAM中运行程序 1 定义items 在进行项目开发时&#xff0c;可以在project items中创建debug和release,并确定。 平时调试下拉选…

web结课作业的源码——HTML+CSS+JavaScript仿oppo官网手机商城(1页)

常见网页设计作业题材有 个人、 美食、 公司、 学校、 旅游、 电商、 宠物、 电器、 茶叶、 家居、 酒店、 舞蹈、 动漫、 服装、 体育、 化妆品、 物流、 环保、 书籍、 婚纱、 游戏、 节日、 戒烟、 电影、 摄影、 文化、 家乡、 鲜花、 礼品、 汽车、 其他等网页设计题目, A…

15、简单了解Vue

1、vue概述 Vue是一套前端框架&#xff0c;可以免除原生JavaScript中的DOM操作&#xff0c;简化书写。 基于MVVM&#xff08;Model-View-View Model&#xff09;思想&#xff0c;实现数据的双向绑定&#xff0c;将编程的关注点放在数据上 vue的官网&#xff1a;https://cn.v…

电脑数据转移到新电脑?换新电脑如何转移软件

电脑数据转移到新电脑&#xff1f;许多用户在下载游戏的时候&#xff0c;没有更改默认安装位置&#xff0c;直接把游戏安装到了C盘里&#xff0c;结果导致C盘空间不足&#xff0c;于是希望将游戏移动到其他驱动器以释放空间。也有的用户是更换了电脑&#xff0c;不想重新安装游…

常用数据库之sqlite的使用

2.1 介绍 sqlite为关系型数据库&#xff0c;是一款轻型的数据库&#xff0c;是遵守ACID的关系型数据库管理系统&#xff0c;它包含在一个相对小的C库中。它的设计目标是嵌入式的&#xff0c;而且已经在很多嵌入式产品中使用了它&#xff0c;它占用资源非常的低&#xff0c;在嵌…

反序列化__wakeup

简介 __wakeup()&#xff0c;执行unserialize()时&#xff0c;先会调用这个函数。 <?php class c1 {private $argv;private $method;function __construct($argv,$method){$this->argv$argv;$this->method$method;}public function f1(){ech…