大数据-Spark批处理实用广播Broadcast构建一个全局缓存Cache

news2024/11/17 17:28:37

1、broadcast广播

在这里插入图片描述

在Spark中,broadcast是一种优化技术,它可以将一个只读变量缓存到每个节点上,以便在执行任务时使用。这样可以避免在每个任务中重复传输数据。

2、构建缓存

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject

// 定义全局缓存单例对象
object GlobalCache extends Serializable {

  // 广播变量,用于存储缓存数据
  private var cacheData: Broadcast[collection.mutable.Map[String, JSONObject]] = _

  // 设置 SparkSession 和广播变量
  def setSparkSession(spark: SparkSession): Unit = {
    cacheData = spark.sparkContext.broadcast(collection.mutable.Map.empty[String, JSONObject])
  }


  // 按订单ID和用户ID缓存JSONObject对象
  def cacheJSONObject(orderId: String, userId: String, jsonObject: JSONObject): Unit = {
    // 获取广播变量的值并进行修改
    val data = cacheData.value
    data.synchronized {
      data.put(generateKey(orderId, userId), jsonObject)
    }
  }

  // 根据订单ID和用户ID删除缓存的JSONObject对象
  def removeJSONObject(orderId: String, userId: String): Unit = {
    // 获取广播变量的值并进行修改
    val data = cacheData.value
    data.synchronized {
      data.remove(generateKey(orderId, userId))
    }
  }

  // 根据订单ID和用户ID获取缓存的JSONObject对象
  def getJSONObjet(orderId: String, userId: String): JSONObject = {
    // 获取广播变量的值并进行访问
    val data = cacheData.value
    data.synchronized {
      data.get(generateKey(orderId, userId)).orNull
    }
  }

  // 生成缓存键,使用订单ID和用户ID拼接
  private def generateKey(orderId: String, userId: String): String = s"$orderId|$userId"
}

3、缓存测试

import org.apache.spark.sql.SparkSession
import org.apache.spark.broadcast.Broadcast
import com.alibaba.fastjson.JSONObject
import org.apache.log4j.{Level, Logger}

object CacheTest {
  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getRootLogger().setLevel(Level.ERROR) // 设置日志级别


  def addItem(orderId:String, userId:String, name:String): Unit = {
    val jsonObject = new JSONObject()
    jsonObject.put("name", name)

    // 缓存JSONObject对象
    GlobalCache.cacheJSONObject(orderId, userId, jsonObject)
  }


  def getCache(orderId: String, userId: String): JSONObject = {
    // 获取缓存的JSONObject对象
    GlobalCache.getJSONObjet(orderId, userId)
  }

  def delItem(orderId:String, userId:String): Unit = {
    // 删除缓存的JSONObject对象
    GlobalCache.removeJSONObject(orderId, userId)
  }


  def getSparkSession(appName: String, localType: Int): SparkSession = {
    val builder: SparkSession.Builder = SparkSession.builder().appName(appName)
    if (localType == 1) {
      builder.master("local[8]") // 本地模式,启用8个核心
    }

    val spark = builder.getOrCreate() // 获取或创建一个新的SparkSession
    spark.sparkContext.setLogLevel("ERROR") // Spark设置日志级别
    spark
  }

  def main(args: Array[String]): Unit = {
    println("Start CacheTest")
    val spark: SparkSession = getSparkSession("CacheTest", 1)

    GlobalCache.setSparkSession(spark)  // 构造全局缓存

    addItem("001", "456", "苹果")      // 添加元素
    addItem("002", "789", "香蕉")      // 添加元素
    var cachedObject = getCache("001", "456")
    println(s"Cached Object: $cachedObject")

    delItem("001", "456")      // 删除元素
    cachedObject = getCache("001", "456")
    println(s"Cached Object: $cachedObject")
    spark.stop()
  }
}

4、控制台输出

Start CacheTest
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Cached Object: {"name":"苹果"}
Cached Object: null

Process finished with exit code 0

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/809793.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Xshell配置ssh免密码登录-公钥与私钥登录linux服务器

目录 简介 提示 方法步骤 步骤1:生成密钥公钥(Public key)与私钥(Private Key) 方法1:使用xshell工具 方法2:使用命令行 步骤2:放置公钥(Public Key)到服务器 方法1:(我使用的是…

LeetCode551.Student-Attendance-Record-i<学生出勤记录 I>

题目&#xff1a; 思路&#xff1a; 遍历就完事了.连续三天不来return false; 超过两次缺勤 fasle; 代码是&#xff1a; //codeclass Solution { public:bool checkRecord(string s) {int n s.length();int abtimes0,latimes0;for(int i0;i<n;i){switch(s[i]){case(A):l…

总结942

5:40起床 6:00~7:00单词复习300个&#xff0c;记100个 7:15~8:00早读&#xff0c;《love is as strong as death》第一第二段 8:10~9:10三大计算回顾 9:15~10:06 习题880第一章基础选择纠错 10:10~10&#xff1a;30单词默写 10:30~11:40强化第一讲习题 11:40~12:30继续…

【雕爷学编程】MicroPython动手做(16)——掌控板之图片图像显示3

知识点&#xff1a;什么是掌控板&#xff1f; 掌控板是一块普及STEAM创客教育、人工智能教育、机器人编程教育的开源智能硬件。它集成ESP-32高性能双核芯片&#xff0c;支持WiFi和蓝牙双模通信&#xff0c;可作为物联网节点&#xff0c;实现物联网应用。同时掌控板上集成了OLED…

Android 设备兼容性使用详解

和你一起终身学习&#xff0c;这里是程序员Android 经典好文推荐&#xff0c;通过阅读本文&#xff0c;您将收获以下知识点: 一、设备兼容性分类二、硬件设备兼容三、软件 APP 兼容四、兼容不同语言五、兼容不同分辨率六、兼容不同屏幕方向布局七、兼容不同硬件 Feature八、兼容…

TortoiseGit安装与配置

注&#xff1a;在安装TortoiseGit之前我已经安装了git工具。 二、Git的诞生及环境配置_tortoisegit安装包_朱嘉鼎的博客-CSDN博客 1、TortoiseGit简介 TortoiseGit是基于TortoiseSVN的Git版本的Windows Shell界面。它是开源的&#xff0c;可以完全免费使用。 TortoiseGit 支持…

redis突然变慢问题定位

CPU 相关&#xff1a;使用复杂度过高命令、数据的持久化&#xff0c;都与耗费过多的 CPU 资源有关 内存相关&#xff1a;bigkey 内存的申请和释放、数据过期、数据淘汰、碎片整理、内存大页、内存写时复制都与内存息息相关 磁盘相关&#xff1a;数据持久化、AOF 刷盘策略&…

v.sqlflow.cn 上线试用

马哈鱼数据血缘工具从2023年8月开始开通国内云版本的服务&#xff0c;相比国外版本&#xff0c;访问速度有很大的提升&#xff0c;访问域名为 https://v.sqlflow.cn. 2023年8月和9月注册的用户可免费获得价值 3000 元的一年高级帐户&#xff0c;可以使用马哈鱼数据血缘工具全部…

DCGAN对抗网络用于生成动漫卡通人物(Python代码)

DCGAN全称Deep Convolutional Generative Adversarial Networks&#xff0c;中文名深度卷积对抗网络。 1.1 DCGAN的特点 DCGAN除了G网络与CNN不同之外&#xff0c;它还有以下的不同&#xff1a; 1.取消所有pooling层。G网络中使用转置卷积&#xff08;transposed convolutiona…

集合中的数据结构

栈 先进后出入口跟出口在同一侧 队列 先进先出入口跟出口在不同的一层 数组 查询快、增删慢查询快是因为数组的地址是连续的&#xff0c;我们通过数组的首地址就可以找到数组&#xff0c;之后通过数组的下标就可以访问数组的每一个元素。增删慢是因为数组的长度是固定的&…

计算机论文中名词翻译和解释笔记

看论文中一些英文的简写不知道中文啥意思&#xff0c;或者一个名词不知道啥意思。 于是自己做了一个个人总结。 持续更新 目录 SoftmaxDeep Learning(深度学习)循环神经网络(Recurrent Neural Network简称 RNN)损失函数/代价函数(Loss Function)基于手绘草图的三维模型检索(Ske…

区块链 2.0笔记

区块链 2.0 以太坊概述 相对于比特币的几点改进 缩短出块时间至10多秒ghost共识机制mining puzzle BTC:计算密集型ETH&#xff1a;memory-hard(限制ASIC) proof of work->proof of stake对智能合约的支持 BTC&#xff1a;decentralized currencyETH&#xff1a;decentral…

一篇文章搞定克拉美罗界(CRB)

起因&#xff1a; 二郎最近在研究LBL&#xff08;长基线&#xff09;定位&#xff0c;大部分论文都提到了文中算法获得的方差接近CRB&#xff0c;所以自己的算法性能较好。于是二郎就想知道克拉美罗界是什么意思&#xff0c;以及能应用的场景。 经过&#xff1a; 1&#xff…

python整理

Python 整理&#xff08;更新中&#xff09; 一、环境搭建 1- 下载python解析器 下载地址&#xff1a;https://www.python.org/ 2- 安装解析器: 3.pycharm 安装操作 1- 下载pycharm 下载地址: https://www.jetbrains.com/pycharm/ pycharm开发第一个Python程序 在这…

20.0 HTTP 通信

1. web开发 1.1 web开发介绍 Web指的是World Wide Web(万维网), 是一种基于互联网的信息系统. 万维网由一系列通过超文本链接相互连接的页面组成, 这些页面中包含了文本, 图像, 音频, 视频等多媒体内容. 用户可以通过浏览器访问万维网上的网页, 并通过超链接在不同页面之间导…

Flowable-中间事件-消息中间捕获事件

定义 消息中间事件指在流程中将一个消息事件作为独立的节点来运行。它是一种捕获事件&#xff0c;当流程 执行到消息中间事件时就会中断在这里&#xff0c;一直等待被触发&#xff0c;直接到该事件接收到相应的消息后&#xff0c;流 程沿后继路线继续执行。消息事件是一种引用…

网络编程(10) : 从connect到三次握手建立连接,再从close到四次挥手断开连接

1、TCP前置知识 1.1什么是TCP TCP 是面向连接的、可靠的、基于字节流的传输层通信协议。 面向连接&#xff1a;必须是一对一建立连接后才能通信可靠的&#xff1a;无论网络链路出现怎么样的变化&#xff0c;TCP可以保证报文一定能被对端收到字节流&#xff1a;流式协议&#…

QGraphicsView实现简易地图1『加载离线瓦片地图』

最简单粗暴的加载方式&#xff0c;将每一层级的所有瓦片地图全部加载 注&#xff1a;该方式仅能够在瓦片地图层级较低时使用&#xff0c;否则卡顿&#xff01;&#xff01;&#xff01; 瓦片地图数据来源&#xff1a;水经注-高德地图-卫星地图 瓦片地图瓦片大小&#xff1a;25…

损失函数篇 | YOLOv8 更换损失函数之 MPDIoU | 《2023 一种用于高效准确的边界框回归的损失函数》

论文地址:https://arxiv.org/pdf/2307.07662v1.pdf 边界框回归(Bounding Box Regression,BBR)在目标检测和实例分割中得到了广泛应用,是目标定位的重要步骤。然而,对于边界框回归的大多数现有损失函数来说,当预测的边界框与真值边界框具有相同的长宽比,但宽度和高度的…

想学Python高级编程?必须了解这个小技巧:match-case!

大家好&#xff0c;这里是程序员晚枫&#xff0c;小破站/知乎/小红书/抖音都叫这个名字。 上次给大家分享了Python高级编程第一讲&#xff1a;从使用类型提示开始 &#xff1b;今天分享Python高级编程第二讲&#xff1a;深入解析Python中switch case的使用方法。 写在前面 分…