亿万级海量数据去重软方法

news2024/12/24 1:01:07

文章目录

  • 原理
  • 案例一
        • 需求:
        • 方法
  • 案例二
        • 需求:
        • 方法:
  • 参考

原理

在大数据分布式计算框架生态下,提升计算效率的方法是尽可能的把计算分布式话、并行化,避免单节点计算过载,把计算分摊到各个节点。这样解释小白能够听懂:比如你有5个桶,怎样轻松地把A池子的水倒入B池子里?

  • 最大并行化,5个桶同时利用,避免count distinct只用一个桶的方法
  • 重复利用化,一次提不动那么多水,不要打肿脸充胖子,一不小心oom,为什么不分几次呢
  • 数据均衡化,5个桶的水不要一个多一个少的,第一个提水的次数变多,第二个某些桶扛不住,俗称数据倾斜

案例一

需求:

计算day_num维度下的uv,自己脑补出海量数据,这里为方便说明,只列举了day_num,一个维度用桶来描绘计算模型,假设数据都是按字典顺序分桶

> select * from event;
+----------------+------------+
| event.day_num  | event.uid  |
+----------------+------------+
| day1           | a          |
| day1           | a          |
| day1           | a          |
| day1           | a          |
| day1           | bb         |
| day1           | bb         |
| day1           | bbb        |
| day1           | ccc        |
| day1           | ccc        |
| day1           | dddd       |
| day1           | eeee       |
| day1           | eeeee      |
| day1           | eeeee      |
| day1           | eeeee      |
+----------------+------------+

方法

  • 原始方法:count(distinct)
select count(distinct(uid))as uv from event group by day_num;

在这里插入图片描述
可以看到所有数据装到一个桶里面,桶已经快装不下了,明显最差

  • 优化一
select size(collect_set(uid)) as uv 
from (select day_num,uid from event group by day_num,uid) tmp 
group by day_num;

在这里插入图片描述
充分利用了桶,最大的实现了并行化,执行虽然分为了两部,但是大大减轻了第一步的负担,面向海量数据的场景去重方面拥有绝对的优势,假如第二步的结果集还是太大了呢?一样会oom扛不住

  • 优化二(推荐👍)

简单说就是转化计算,在一个jvm里面,硬去重的方法都逃不开把所有字符或字符的映射放一个对象里面,通过一定的逻辑获取去重集合,对于分布式海量数据的场景下,这种硬去重的计算仍然会花大量的时间在上图的最后单点去重的步骤,我们可以把去重的逻辑按照一定的规则分桶计算完成,每个桶之间分的数据都不重复,所有桶计算完桶内数据去重的集合大小,最后一步再相加。

创建临时表,其中length(uid) as len_uid是映射字段,uid的长度

create table event_tmp as select *,length(uid) as len_uid from event;
select sum(uv_tmp) as uv 
from
  (
      select day_num,size(collect_set(uid)) as uv_tmp 
       from event_tmp 
       group by len_uid,day_num
  ) tmp group by day_num

在这里插入图片描述
这里使用uid长度映射字段,实际开发中,你也可以选择首字母、末字母或者其它能想到的属性作为映射字段,分桶分步预聚合的方法,巧妙的把一个集合去重问题最终转化为相加问题,避开了单个jvm去重承受的压力,在海量数据的场景下,这个方法最为使用,推荐用在生产上


案例二

需求:

商品 product 每日总销售记录量级亿 级别起,去重 product 量大概 万 级别。每个商品有一个 state 标识其状态,该状态共3个值,分别为 “0”, “1”,“2”。
统计:
(1) 三个 state 下 product 的总量 pv
(2) 对应 state 下 product 去重后的量 uv
第二个统计每个 state 下有亿级别的 value ,去重时有严重的数据倾斜且数据去重规模很大,亿级别去重至万亿级别

方法:

  • GroupBy + RandomIndex + ToSet
    val re = sc.textFile(input).map(line => {
      val info = line.split("\t")
      val state = info(0)
      val productId = info(1)
      // 全局计数
      countMap(state).add(1L)
      // 构建 state + randomIndex + product 的 PairRDD
      (state + "_" + random.nextInt(100) , productId)
    }).groupBy(_._1).map(info => {
      val state = info._1.split("_")(0)
      // 分治
      val productSet = info._2.map(kv => {
        val productId = kv._2
        productId
      }).toArray.toSet
      (state, productSet)
    }).groupBy(_._1).map(info => {
      val state = info._1
      val tmpSet = mutable.HashSet[String]()
      // 合并
      info._2.foreach(kv => {
        tmpSet ++= kv._2
      })
      state + ":" + tmpSet.size
    }).collect()

因为 state 只有 0,1,2 三种可能,所以最后全部压力分摊在 3 个节点上,构造 PairRDD 时可以给 state 加上随机索引,从而将任务分散,获得多个小的 Set 再合并成大 Set 。相当于分治,该方法会将原始数据分为 3 x 100 份,缩减了每个 key 要处理的 productId 的量,最后再去除随机索引再 groupBy 一次,汇总得到结果,执行时间 5 min,优化效果显著。

  • Distinct + GroupBy (推荐👍 )
    上一步方案通过 randomIndex 将数据量分治,减少的百分比和 random 的数值成正比,但是在数据量很大的情况下,分治的每个 key 对应的 value 量还是很大,所以简单的去重执行 5min +,这次将 groupBy 改为 distinct先去重得到 万 级别数据量,再 GroupBy,此时的数据量本机也可轻松完成
def main(args: Array[String]): Unit = {

    val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getSimpleName)
    val sc: SparkContext = SparkContext.getOrCreate(conf)
    val rdd1: RDD[String] = sc.parallelize(List(
      "1,spark",
      "0,flink",
      "1,kafka",
      "1,spark",
      "0,hadoop",
    ), 4)

    val myAccumulator = new MyAccumulator
    sc.register(myAccumulator, "myAcc")
    val rdd2 = rdd1.map(str => {
      val info: Array[String] = str.split(",")
      val state: String = info(0)
      val productId: String = info(1)
      //累加器 求pv
      myAccumulator.add(state)
      state + "_" + productId
    }).distinct()
      .map(info => {
      val str: Array[String] = info.split("_")
      val state: String = str(0)
      val productId: String = str(1)
      (state, productId)
    }).groupBy(_._1) //(1,CompactBuffer((1,kafka), (1,spark)))
      .map(f => {
        val state: String = f._1
        val num: Int = f._2.map(_._2).toSet.size
        (state, num)
      })

    rdd2.foreach(println(_))
    //输出累加器值(注意在action后)
    val sentMap: mutable.HashMap[String, Long] = myAccumulator.value
    println(sentMap.toString())
  }
}

//自定义累加器
class MyAccumulator extends AccumulatorV2[String, mutable.HashMap[String, Long]] {
  private val hashMap = new mutable.HashMap[String, Long]()

  override def isZero: Boolean = hashMap.isEmpty

  override def copy(): AccumulatorV2[String, mutable.HashMap[String, Long]] = new MyAccumulator

  override def reset(): Unit = hashMap.clear()

  override def add(v: String): Unit = {
    val l: Long = hashMap.getOrElse(v, 0L)
    hashMap.update(v, l + 1)
  }

  override def merge(other: AccumulatorV2[String, mutable.HashMap[String, Long]]): Unit = {
    val hashMap1: mutable.HashMap[String, Long] = this.hashMap
    val hashMap2: mutable.HashMap[String, Long] = other.value
    hashMap2.foreach {
      case (k, v) => {
        val l: Long = hashMap1.getOrElse(k, 0L)
        hashMap1.update(k, l + v)
      }
    }
  }

  override def value: mutable.HashMap[String, Long] = this.hashMap
}
输出:
 	 (1,2)
     (0,2)
     Map(1 -> 3, 0 -> 2)
  • distinct源码
  def distinct(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
    def removeDuplicatesInPartition(partition: Iterator[T]) ....
        ...
    partitioner match {
      case Some(_) if numPartitions == partitions.length =>
        mapPartitions(removeDuplicatesInPartition, preservesPartitioning = true)
      case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
    }
  }

partitioner源码是这样声明的:val partitioner: Option[Partitioner] = None
case Some(_) //这句是匹配partitioner不为None
所以最终执行的代码是:
case _ => map(x => (x, null)).reduceByKey((x, ) => x, numPartitions).map(._1)

case _ => map(x => (x, null)).reduceByKey((x, _) => x, numPartitions).map(_._1)
主要是用到了 reduceByKey ,这个算子会在MapSide进行预聚合的操作。将聚合后的结果传递到reduce端。

参考

https://www.jianshu.com/p/1cdc943bb649

https://blog.csdn.net/BIT_666/article/details/121672715

reduceByKey详见

累加器详见

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/386570.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

最新|移动机器人导航定位技术概述

前言目前工业界广泛落地使用的移动机器人,除了应用场景在餐厅、酒店、超市等小范围室内送餐机器人和消毒机器人外,另外一个“大赛道”应用场景就是在工厂、制造装配车间、电站或车站的物流搬运机器人和巡检机器人了。而在最开始,一切都得从AG…

spring cloud gateway (五)

Gateway简介 Spring Cloud Gateway是Spring公司基于Spring 5.0,Spring Boot 2.0 和 Project Reactor 等技术开发的网关,它旨在为微服务架构提供一种简单有效的统一的 API 路由管理方式。它的目标是替代Netflix Zuul,其不仅提供统一的路由方式…

java 字典

java 字典 数据结构总览 Map Map 描述的是一种映射关系,一个 key 对应一个 value,可以添加,删除,修改和获取 key/value,util 提供了多种 Map HashMap: hash 表实现的 map,插入删除查找性能都是 O(1)&…

MySQL跨服务器数据映射

MySQL跨服务器数据映射环境准备1. 首先是要查看数据库的federated引擎 开启/关闭 状态2. 打开任务管理器,并重启mysql服务3. 再次查看FEDERATED引擎状态,引擎已启动映射实现问题总结在日常的开发中经常进行跨数据库进行查询数据。 同服务器下跨数据库进…

【SpringCloud】SpringCloud详解之Feign实战

目录前言SpringCloud Feign远程服务调用一.需求二.两个服务的yml配置和访问路径三.使用RestTemplate远程调用(order服务内编写)四.使用Feign远程调用(order服务内配置)五.自定义Feign配置(order服务内配置)六.Feign配置日志(oder服务内配置)七.Feign调优(order服务内配置)八.抽…

STM32之 串口

串口通信串行接口简称串口,也称串行通信接口或串行通讯接口(通常指COM接口),是采用串行通信方 式的扩展接口。串行接口(Serial Interface)是指数据一位一位地顺序传送。其特点是通信线路简 单,只…

Vue基础入门讲义(三)-指令

文章目录1.什么是指令?2.插值表达式2.1.花括号2.2.插值闪烁2.3.v-text和v-html3.v-model4.v-on4.1.基本用法4.2.事件修饰5.v-for5.1.遍历数组5.2.数组角标5.3.遍历对象6.key7.v-if和v-show7.1.基本使用7.2.与v-for结合7.3.v-else7.4.v-show8.v-bind8.1. 属性上使用v…

服务器处理发生异常:java.text.ParseException: Unparseable date

测试上传报文的时候遇见报错 服务器处理发生异常:java.text.ParseException: Unparseable date: “2023/03/03” 错误报文 实际需要的报文 错误原因 上传时间字段,与Date字段数据位数不匹配,Java类型:Date默认带有年月日 时分秒yyyy-mm-dd…

十年业务开发总结,如何做好高效高质量的价值交付

作者:杨博林 阿里大淘宝场景金融团队 软件交付是一个非常复杂的过程和体系,需要保障好每个阶段的质量和效率才能保障最终的质量和效率。本文将尝试从需求交付的前、中、后三个环节来阐述一下如何做高效高质量的价值交付。 一、背景 转眼间已经做了十年的…

JavaScript基础内容

日升时奋斗,日落时自省 目录 1、基础语法 2、DOM 2.1、选中页面元素 2.2、获取/修改元素内容 3、JS案例 3.1、网页版本猜数字 3.2、网页版表白墙 JS最初只是为了进行前端页面的开发后来JS也被赋予了更多的功能,可以用来开发桌面程序,手…

RHCSA-重置root密码(3.3)

方法1:rd.break (1)首先重启系统,在此页面按e键,在屏幕上显示内核启动参数 (2)知道linux这行,末尾空格后输入rd.break,然后按ctrlx (3)查看&#…

电脑桌面上的图标不见了怎么办?5个完美的解决技巧

案例:电脑桌面不显示任何东西? “救命!电脑打开后,只有桌面,任何图标都没有怎么办?心急,不知道该怎么解决?” 电脑桌面上的图标消失是一个比较常见的问题,许多用户都会…

Hadoop集群启动从节点没有DataNode

目录 一、问题背景 二、解决思路 三、解决办法: 一、问题背景 之前启动hadoop集群的时候都没有问题,今天启动hadoop集群的时候,从节点的DataNode没有启动起来。 二、解决思路 遇见节点起不来的情况,可以去看看当前节点的日志…

各大加密算法对比(原理、性能、安全、运用)

原理按加密可逆可以分为:加密可逆算法和加密不可逆算法。加密可逆算法又可以分为:对称加密和非对称加密。1、加密不可逆算法:一般采用hash算法加密,其原理一般是将原文长度补位成64的倍数,接着初始化固定长度的缓存值&…

大数据框架之Hadoop:MapReduce(五)Yarn资源调度器

Apache YARN (Yet Another Resource Negotiator) 是 hadoop 2.0 引入的集群资源管理系统。用户可以将各种服务框架部署在 YARN 上,由 YARN 进行统一地管理和资源分配。 简言之,Yarn是一个资源调度平台,负责为运算程序提供服务器运算资源&…

Windows Cannot Initialize Data Bindings 问题的解决方法

前言 拿到一个调试程序, 怎么折腾都打不开, 在客户那边, 尝试了几个系统版本, 发现Windows 10 21H2 版本可以正常运行。 尝试 系统篇 系统结果公司电脑 Windows 8有问题…下载安装 Windows10 22H2问题依旧下载安装 Windows10 21H2问题依旧家里的 笔记本Window 11正常 网上…

第三章 opengl之着色器

OpenGL着色器GLSLGLSL的数据类型向量输入与输出Uniform更多属性自己的着色器类着色器 着色器是运行在GPU上的小程序。着色器只是一种把输入转化为输出的程序。着色器也是一种非常独立的程序,因为它们之间不能相互通信;它们之间唯一的沟通只有通过输入和…

DC-DC模块电源隔离直流升压高压稳压输出5v12v24v转60v100v110v150v220v250v300v400v500v

特点效率高达80%以上1*1英寸标准封装单电压输出稳压输出工作温度: -40℃~85℃阻燃封装,满足UL94-V0 要求温度特性好可直接焊在PCB 上应用HRB 0.2~10W 系列模块电源是一种DC-DC升压变换器。该模块电源的输入电压分为:4.5~9V、9~18V、及18~36VDC标准&#…

软工第二次个人作业——软件案例分析

软工第二次个人作业——软件案例分析 项目内容这个作业属于哪个课程2023北航敏捷软件工程这个作业的要求在哪里个人作业-软件案例分析我在这个课程的目标是体验敏捷开发过程,掌握一些开发技能,为进一步发展作铺垫这个作业在哪个具体方面帮助我实现目标通…

四川大学软件学院|系统级编程期末复习

概述 选择题 50 分(原题率 80%):http://tieba.baidu.com/p/1250021454?&share9105&frsharewise&unique967707B1DAECEF4A785B61D29AF36950&st1639102957&client_type1&client_version12.10.1&sfccopy&share…