Spark---DataFrame存储、Spark UDF函数、UDAF函数

news2025/1/12 10:47:36

四、DataFrame存储+Spark UDF函数

1、储存DataFrame

1)、将DataFrame存储为parquet文件

2)、将DataFrame存储到JDBC数据库

3)、将DataFrame存储到Hive表

2、UDF:用户自定义函数

可以自定义类实现UDFX接口

java:

SparkConf conf = new SparkConf();
conf.setMaster("local");
conf.setAppName("udf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
return RowFactory.create(s);
	}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType,true));

StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD,schema);
df.registerTempTable("user");

/**
 * 根据UDF函数参数的个数来决定是实现哪一个UDF  UDF1,UDF2。。。。UDF1xxx
 */
sqlContext.udf().register("StrLen", new UDF1<String,Integer>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Integer call(String t1) throws Exception {
             return t1.length();
	}
}, DataTypes.IntegerType);
sqlContext.sql("select name ,StrLen(name) as length from user").show();

//sqlContext.udf().register("StrLen",new UDF2<String, Integer, Integer>() {
//
//	/**
//	 * 
//	 */
//	private static final long serialVersionUID = 1L;
//
//	@Override
//	public Integer call(String t1, Integer t2) throws Exception {
//return t1.length()+t2;
//	}
//} ,DataTypes.IntegerType );
//sqlContext.sql("select name ,StrLen(name,10) as length from user").show();

sc.stop();	

scala:

1.val spark = SparkSession.builder().master("local").appName("UDF").getOrCreate()
2.val nameList: List[String] = List[String]("zhangsan", "lisi", "wangwu", "zhaoliu", "tianqi")
3.import spark.implicits._
4.val nameDF: DataFrame = nameList.toDF("name")
5.nameDF.createOrReplaceTempView("students")
6.nameDF.show()
7.
8.spark.udf.register("STRLEN",(name:String)=>{
9.name.length
10.})
11.spark.sql("select name ,STRLEN(name) as length from students order by length desc").show(100)

五、UDAF函数

1、UDAF:用户自定义聚合函数

1)、实现UDAF函数如果要自定义类要继承

UserDefinedAggregateFunction类

2)、UDAF原理图

java:

SparkConf conf = new SparkConf();
conf.setMaster("local").setAppName("udaf");
JavaSparkContext sc = new JavaSparkContext(conf);
SQLContext sqlContext = new SQLContext(sc);
JavaRDD<String> parallelize = sc.parallelize(Arrays.asList("zhansan","lisi","wangwu","zhangsan","zhangsan","lisi"));
JavaRDD<Row> rowRDD = parallelize.map(new Function<String, Row>() {

	/**
	 * 
	 */
	private static final long serialVersionUID = 1L;

	@Override
	public Row call(String s) throws Exception {
              return RowFactory.create(s);
	}
});

List<StructField> fields = new ArrayList<StructField>();
fields.add(DataTypes.createStructField("name", DataTypes.StringType, true));
StructType schema = DataTypes.createStructType(fields);
DataFrame df = sqlContext.createDataFrame(rowRDD, schema);
df.registerTempTable("user");
/**
 * 注册一个UDAF函数,实现统计相同值得个数
 * 注意:这里可以自定义一个类继承UserDefinedAggregateFunction类也是可以的
 */
sqlContext.udf().register("StringCount", new UserDefinedAggregateFunction() {
	
   /**
    * 
    */
   private static final long serialVersionUID = 1L;
   /**
    * 更新 可以认为一个一个地将组内的字段值传递进来 实现拼接的逻辑
    * buffer.getInt(0)获取的是上一次聚合后的值
    * 相当于map端的combiner,combiner就是对每一个map task的处理结果进行一次小聚合 
    * 大聚和发生在reduce端.
    * 这里即是:在进行聚合的时候,每当有新的值进来,对分组后的聚合如何进行计算
    */
   @Override
   public void update(MutableAggregationBuffer buffer, Row arg1) {
         buffer.update(0, buffer.getInt(0)+1);

   }
   /**
    * 合并 update操作,可能是针对一个分组内的部分数据,在某个节点上发生的 但是可能一个分组内的数据,会分布在多个节点上处理
    * 此时就要用merge操作,将各个节点上分布式拼接好的串,合并起来
    * buffer1.getInt(0) : 大聚和的时候 上一次聚合后的值       
    * buffer2.getInt(0) : 这次计算传入进来的update的结果
    * 这里即是:最后在分布式节点完成后需要进行全局级别的Merge操作
    */
   @Override
   public void merge(MutableAggregationBuffer buffer1, Row buffer2) {
     buffer1.update(0, buffer1.getInt(0) + buffer2.getInt(0));
   }
   /**
    * 指定输入字段的字段及类型
    */
   @Override
   public StructType inputSchema() {
     return DataTypes.createStructType(
      Arrays.asList(DataTypes.createStructField("name", 
          DataTypes.StringType, true)));
   }
   /**
    * 初始化一个内部的自己定义的值,在Aggregate之前每组数据的初始化结果
    */
   @Override
   public void initialize(MutableAggregationBuffer buffer) {
         buffer.update(0, 0);
   }
   /**
    * 最后返回一个和DataType的类型要一致的类型,返回UDAF最后的计算结果
    */
   @Override
   public Object evaluate(Row row) {
      return row.getInt(0);
   }
   
   @Override
   public boolean deterministic() {
     //设置为true
     return true;
   }
   /**
    * 指定UDAF函数计算后返回的结果类型
    */
   @Override
   public DataType dataType() {
      return DataTypes.IntegerType;
   }
   /**
    * 在进行聚合操作的时候所要处理的数据的结果的类型
    */
   @Override
   public StructType bufferSchema() {
       return 
       DataTypes.createStructType(
   Arrays.asList(DataTypes.createStructField("bf", DataTypes.IntegerType, 
            true)));
   }
   
});

sqlContext.sql("select name ,StringCount(name) from user group by name").show();

sc.stop();

scala:

1.class MyCount extends UserDefinedAggregateFunction{
2.  //输入数据的类型
3.  override def inputSchema: StructType =    StructType(List[StructField](StructField("xx",StringType,true)))
4.
5.  //在聚合过程中处理的数据类型
6.  override def bufferSchema: StructType =   StructType(List[StructField](StructField("xx",IntegerType,true)))
7.
8.  //最终返回值的类型,与evaluate返回的值保持一致
9.  override def dataType: DataType = IntegerType
10.
11.  //多次运行数据是否一致
12.  override def deterministic: Boolean = true
13.
14.  //每个分区中每组key 对应的初始值
15.  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0,0)
16.
17.  //每个分区中,每个分组内进行聚合操作
18.  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
19.    buffer.update(0,buffer.getInt(0) + 1)
20.  }
21.
22.  //不同的分区中相同的key的数据进行聚合
23.  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
24.    buffer1.update(0,buffer1.getInt(0)+buffer2.getInt(0))
25.  }
26.
27.  //聚合之后,每个分组最终返回的值,类型要和dataType 一致
28.  override def evaluate(buffer: Row): Any = buffer.getInt(0)
29.}
30.
31.object Test {
32.  def main(args: Array[String]): Unit = {
33.    val session = SparkSession.builder().appName("jsonData").master("local").getOrCreate()
34.    val list = List[String]("zhangsan","lisi","wangwu","zhangsan","lisi","zhangsan")
35.
36.    import session.implicits._
37.    val frame = list.toDF("name")
38.    frame.createTempView("mytable")
39.
40.    session.udf.register("MyCount",new MyCount())
41.
42.    val result = session.sql("select name,MyCount(name) from mytable group by name")
43.    result.show()
44.
45.  }
46.}
47.

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1294833.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

fastadmin打印页面

如下图选中订单号进行打印 html中增加代码 <div id"toolbar" class"toolbar"><a href"javascript:;" class"btn btn-primary btn-refresh" title"{:__(Refresh)}" ><i class"fa fa-refresh">&l…

喜讯频传|海川润泽获物联网场景应用品牌企业、国家高新技术企业认定

物联网场景应用品牌企业 12月5日至6日&#xff0c;以“物联中国 数智雄安”为主题的“千企雄安行&#xff1a;2023物联网产业品牌大会”在雄安新区成功举办。大会发起了“物联网场景应用品牌企业”的评选活动&#xff0c;面向农业、城市、工业、医疗、园区、建筑、文旅、能源、…

100V耐压 3A峰值电流 60V 72V输入 输出12V 1.5A内置MOS 大电流降压芯片

100V耐压 3A峰值电流 60V 72V输入 输出12V 1.5A 内置MOS 大电流降压芯片

gitee对接使用

1.创建一个文件夹 2.进入Gitee接受对方项目编辑 3.打开终端初始化一开始创建的文件夹 git init 3.1打开终端 3.2输入git.init 4.克隆对方的项目 4.1进入Gitee复制对方项目的路径 4.2在编辑器终端内克隆对方项目 git clone 网址 如此你的编辑器就会出现对方的项目 …

公司敏感数据被上传Github,吓得我赶紧改提交记录

大家好&#xff0c;我是小富&#xff5e; 说个事吧&#xff01;最近公司发生了一个事故&#xff0c;有同事不小心把敏感数据上传到了GitHub上&#xff0c;结果被安全部门扫描出来了。这件事导致公司对所有员工进行了一次数据安全的培训。对于这个事我相信&#xff0c;有点工作…

106.进程控制(结束、孤儿、僵尸进程)以及进程回收

目录 结束进程 孤儿进程 僵尸进程 进程回收 wait() waitpid 进程控制是指在操作系统中对进程进行创建、终止、挂起、唤醒以及进程之间的同步、通信等操作的管理。 结束进程 exit() 和 _exit() 函数都用于终止一个进程&#xff0c;但它们之间有一些重要的区别&#xf…

OpenAI的Sam Altman,获《时代》2023年度最佳CEO

12月7日&#xff0c;《时代》周刊在官网公布了2023年最佳CEO——OpenAI的Sam Altman。 此外&#xff0c;梅西入选了年度最佳运动员&#xff0c;Taylor Swift入选年度最佳人物&#xff0c;Alex Newell获年度突破奖。 《时代》周刊曾在今年的9月8日发布了“2023年AI领域最有影响…

Stable Diffusion XL on diffusers

Stable Diffusion XL on diffusers 翻译自&#xff1a;https://huggingface.co/docs/diffusers/using-diffusers/sdxl v0.24.0 非逐字翻译 Stable Diffusion XL (SDXL) 是一个强大的图像生成模型&#xff0c;其在上一代 Stable Diffusion 的基础上主要做了如下优化&#xff1a;…

超详细介绍Ubuntu系统安装CUDA和cuDNN【一站式服务!!!】

文章目录 简介1.安装显卡驱动查看显卡型号下载并安装NVIDIA驱动使用Ubuntu自带的软件和更新&#xff08;Software&Updates&#xff09;工具安装【博主使用的这种方式&#xff0c;推荐】自行下载使用命令行安装【自由度更高&#xff0c;大佬自行尝试】 2.下载并安装CUDA3.下…

docker容器_自定义上传jenkins镜像(Dockerfile实现)

1.创建jenkins目录&#xff0c;并上传相应的包 mkdir /jenkins/ 2.创建一个Dockerfile文件 FROM daocloud.io/library/centos:7#把当前目录下的jenkins.war包传到内部容器的/ 下 ADD ./jenkins.war /#把当前目录下的jdk传到内部容器的/opt/,并解压 ADD ./jdk-11.0.19_linu…

【软件推荐】文本转语音,语音转wav,导入ue5

文字转语音 在线免费文字转语音 - TTSMaker官网 | 马克配音https://ttsmaker.cn/ 文件转换器 语音转wav Convertio — 文件转换器https://convertio.co/zh/

前端学习微信小程序开发

1.微信小程序项目结构 2.WXML和HTML的区别 3.WXSS与CSS的区别 4.小程序中的.js文件 5.小程序的宿主环境 宿主环境是指程序运行所必须的依赖环境&#xff0c;因此手机微信时小程序的宿主环境。小程序宿主环境包含了通信模型、运行机制、组件、API。 &#xff08;1&#xff09;…

基于jsp+servlet的图书管理系统

基于jspservlet的图书管理系统演示地址为 图书馆后台管理系统 用户名:mr ,密码:123 图书馆管理系统主要的目的是实现图书馆的信息化管理。图书馆的主要业务就是新书的借阅和归还&#xff0c; 因此系统最核心的功能便是实现图书的借阅和归还。此外&#xff0c;还需要提供图书…

小视频怎么做成二维码?视频二维码3步生成

在日常工作和生活中经常会看到各种类型的小视频、短视频&#xff0c;比如网页、抖音等等的视频都是可以下载查看的。当我们想要将下载视频分享给多个人看时&#xff0c;生成二维码的方式会更加的方便&#xff0c;那么视频如何生成二维码呢&#xff1f;下面就将快捷生成二维码的…

基于SpringBoot+Vue学生成绩管理系统前后端分离(源码+数据库)

一、项目简介 本项目是一套基于SpringBootVue学生成绩管理系统&#xff0c;主要针对计算机相关专业的正在做bishe的学生和需要项目实战练习的Java学习者。 包含&#xff1a;项目源码、数据库脚本等&#xff0c;该项目可以直接作为bishe使用。 项目都经过严格调试&#xff0c;确…

【送书活动四期】被GitHub 要求强制开启 2FA 双重身份验证,我该怎么办?

记得是因为fork了OpenZeppelin/openzeppelin-contracts的项目&#xff0c;之后就被GitHub 要求强制开启 2FA 双重身份验证了&#xff0c;一拖再拖&#xff0c;再过几天帐户操作将受到限制了&#xff0c;只能去搞一下了 目录 2FA是什么为什么要开启 2FA 验证GitHub 欲在整个平台…

【每日一题】重新规划路线

文章目录 Tag题目来源题目解读解题思路方法一&#xff1a;深度优先搜索方法二&#xff1a;广度优先搜索 写在最后 Tag 【深搜】【广搜】【树】【2023-12-07】 题目来源 1466. 重新规划路线 题目解读 题目给定一张由 n个点&#xff08;使用 0 到 n−1 编号&#xff09;&#…

优秀案例 | 元宇宙双语财经科技主播“舒望”主持首届粤港澳大湾区元宇宙国际传播论坛

12月6日&#xff0c;由南方财经全媒体集团指导、大湾区元宇宙国际传播实验室(GBA MIC Lab&#xff09;主办、南财国际传播中心和21世纪经济报道共同承办&#xff0c;以“多元共创开放共享”为主题的首届粤港澳大湾区元宇宙国际传播论坛在广州隆重开幕。 “立足湾区&#xff0c;…

当然热门的原创改写改写大全【2023最新】

在信息时代&#xff0c;随着科技的不断发展&#xff0c;改写软件逐渐成为提高文案质量和写作效率的重要工具。本文将专心分享一些好用的改写软件&#xff0c;其中包括百度文心一言智能写作以及147SEO改写软件。这些工具不仅支持批量改写&#xff0c;而且在发布到各大平台后能够…

“福利”还是“陷阱”?公司给员工放假3个月引发劳动权益争议

近日&#xff0c;广东佛山一家玻璃制造公司的长达3个月放假通知引发广泛关注。这一决策引发了社会对员工福利和公司经营平衡的深入思考。公司表示&#xff0c;此次决策是为了维修老化设备&#xff0c;但随之而来的疑虑则主要集中在员工的收入和劳动权益问题上。 公司表示&…