Clojure 实战(4):编写 Hadoop MapReduce 脚本

news2025/1/12 9:45:31

Hadoop简介

众所周知,我们已经进入了大数据时代,每天都有PB级的数据需要处理、分析,从中提取出有用的信息。Hadoop就是这一时代背景下的产物。它是Apache基金会下的开源项目,受Google两篇论文的启发,采用分布式的文件系统HDFS,以及通用的MapReduce解决方案,能够在数千台物理节点上进行分布式并行计算。

对于Hadoop的介绍这里不再赘述,读者可以访问其官网,或阅读Hadoop权威指南。

Hadoop项目是由Java语言编写的,运行在JVM之上,因此我们可以直接使用Clojure来编写MapReduce脚本,这也是本文的主题。Hadoop集群的搭建不在本文讨论范围内,而且运行MapReduce脚本也无需搭建测试环境。

clojure-hadoop类库

Hadoop提供的API是面向Java语言的,如果不想在Clojure中过多地操作Java对象,那就需要对API进行包装(wrapper),好在已经有人为我们写好了,它就是clojure-hadoop。

从clojure-hadoop的项目介绍中可以看到,它提供了不同级别的包装,你可以选择完全规避对Hadoop类型和对象的操作,使用纯Clojure语言来编写脚本;也可以部分使用Hadoop对象,以提升性能(因为省去了类型转换过程)。这里我们选择前一种,即完全使用Clojure语言。

示例1:Wordcount

Wordcount,统计文本文件中每个单词出现的数量,可以说是数据处理领域的“Hello, world!”。这一节我们就通过它来学习如何编写MapReduce脚本。

Leiningen 2

前几章我们使用的项目管理工具lein是1.7版的,而前不久Leiningen 2已经正式发布了,因此从本章开始我们的示例都会基于新版本。新版lein的安装过程也很简单:

$ cd ~/bin
$ wget https://raw.github.com/technomancy/leiningen/stable/bin/lein
$ chmod 755 lein
$ lein repl
user=>

其中,lein repl这一步会下载lein运行时需要的文件,包括Clojure 1.4。

新建项目

$ lein new cia-hadoop

编辑project.clj文件,添加依赖项clojure-hadoop "1.4.1",尔后执行lein deps

Map和Reduce

MapReduce,简称mapred,是Hadoop的核心概念之一。可以将其理解为处理问题的一种方式,即将大问题拆分成多个小问题来分析和解决,最终合并成一个结果。其中拆分的过程就是Map,合并的过程就是Reduce。

以Wordcount为例,将一段文字划分成一个个单词的过程就是Map。这个过程是可以并行执行的,即将文章拆分成多个段落,每个段落分别在不同的节点上执行划分单词的操作。这个过程结束后,我们便可以统计各个单词出现的次数,这也就是Reduce的过程。同样,Reduce也是可以并发执行的。整个过程如下图所示:

Wordcount
中间Shuffle部分的功能是将Map输出的数据按键排序,交由Reduce处理。整个过程全部由Hadoop把控,开发者只需编写MapReduce函数,这也是Hadoop强大之处。

编写Map函数

在本示例中,我们处理的原始数据是文本文件,Hadoop会逐行读取并调用Map函数。Map函数会接收到两个参数:key是一个长整型,表示该行在整个文件中的偏移量,很少使用;value则是该行的内容。以下是将一行文字拆分成单词的Map函数:

;; src/cia_hadoop/wordcount.clj

(ns cia-hadoop.wordcount
  (:require [clojure-hadoop.wrap :as wrap]
            [clojure-hadoop.defjob :as defjob])
  (:import [java.util StringTokenizer])
  (:use clojure-hadoop.job))

(defn my-map [key value]
  (map (fn [token] [token 1])
       (enumeration-seq (StringTokenizer. value))))

可以看到,这是一个纯粹的Clojure函数,并没有调用Hadoop的API。函数体虽然只有两行,但还是包含了很多知识点的:

(map f coll)函数的作用是将函数f应用到序列coll的每个元素上,并返回一个新的序列。如(map inc [1 2 3])会对每个元素做加1操作(参考(doc inc)),返回[2 3 4]。值得一提的是,map函数返回的是一个惰性序列(lazy sequence),即序列元素不会一次性完全生成,而是在遍历过程中逐个生成,这在处理元素较多的序列时很有优势。

map函数接收的参数自然不会只限于Clojure内部函数,我们可以将自己定义的函数传递给它:

(defn my-inc [x]
  (+ x 1))

(map my-inc [1 2 3]) ; -> [2 3 4]

我们更可以传递一个匿名函数给map。上一章提过,定义匿名函数的方式是使用fn,另外还可使用#(...)简写:

(map (fn [x] (+ x 1)) [1 2 3])
(map #(+ % 1) [1 2 3])

对于含有多个参数的情况:

((fn [x y] (+ x y)) 1 2) ; -> 3
(#(+ %1 %2) 1 2) ; -> 3

my-map中的(fn [token] [token 1])即表示接收参数token,返回一个向量[token 1],其作用等价于#(vector % 1)。为何是[token 1],是因为Hadoop的数据传输都是以键值对的形式进行的,如["apple" 1]即表示“apple”这个单词出现一次。

StringTokenizer则是用来将一行文字按空格拆分成单词的。他的返回值是Enumeration类型,Clojure提供了enumeration-seq函数,可以将其转换成序列进行操作。

所以最终my-map函数的作用就是:将一行文字按空格拆分成单词,返回一个形如[["apple" 1] ["orange" 1] ...]的序列。

编写Reduce函数

从上文的图表中可以看到,Map函数处理完成后,Hadoop会对结果按照键进行排序,并使用key, [value1 value2 ...]的形式调用Reduce函数。在clojure-hadoop中,Reduce函数的第二个参数是一个函数,其返回结果才是值的序列:

(defn my-reduce [key values-fn]
  [[key (reduce + (values-fn))]])

和Map函数相同,Reduce函数的返回值也是一个序列,其元素是一个个[key value]。注意,函数体中的(reduce f coll)是Clojure的内置函数,其作用是:取coll序列的第1、2个元素作为参数执行函数f,将结果和coll序列的第3个元素作为参数执行函数f,依次类推。因此(reduce + [1 2 3])等价于(+ (+ 1 2) 3)

定义脚本

有了Map和Reduce函数,我们就可以定义一个完整的脚本了:

(defjob/defjob job
  :map my-map
  :map-reader wrap/int-string-map-reader
  :reduce my-reduce
  :input-format :text
  :output-format :text
  :compress-output false
  :replace true
  :input "README.md"
  :output "out-wordcount")

简单说明一下这些配置参数::map:reduce分别指定Map和Reduce函数;map-reader表示读取数据文件时采用键为int、值为string的形式;:input-formatcompress-output指定了输入输出的文件格式,这里采用非压缩的文本形式,方便阅览;:replace表示每次执行时覆盖上一次的结果;:input:output则是输入的文件和输出的目录。

执行脚本

我们可以采用Clojure的测试功能来执行脚本:

;; test/cia_hadoop/wordcount_test.clj

(ns cia-hadoop.wordcount-test
  (:use clojure.test
        clojure-hadoop.job
        cia-hadoop.wordcount))

(deftest test-wordcount
  (is (run job)))

尔后执行:

$ lein test cia-hadoop.wordcount-test
...
13/02/14 00:25:52 INFO mapred.JobClient:  map 0% reduce 0%
..
13/02/14 00:25:58 INFO mapred.JobClient:  map 100% reduce 100%
...
$ cat out-wordcount/part-r-00000
...
"java"  1
"lein"	3
"locally"	2
"on"	1
...

如果想要将MapReduce脚本放到Hadoop集群中执行,可以采用以下命令:

$ lein uberjar
$ hadoop jar target/cia-hadoop-0.1.0-SNAPSHOT-standalone.jar clojure_hadoop.job -job cia-hadoop.wordcount/job

示例2:统计浏览器类型

下面我们再来看一个更为实际的示例:从用户的访问日志中统计浏览器类型。

需求概述

用户访问网站时,页面中会有段JS请求,将用户的IP、User-Agent等信息发送回服务器,并记录成文本文件的形式:

{"stamp": "1346376858286", "ip": "58.22.113.189", "agent": "Mozilla/5.0 (iPad; CPU OS 5_0_1 like Mac OS X) AppleWebKit/534.46 (KHTML, like Gecko) Version/5.1 Mobile/9A405 Safari/7534.48.3"}
{"stamp": "1346376858354", "ip": "116.233.51.2", "agent": "Mozilla/5.0 (compatible; MSIE 9.0; Windows NT 6.1; WOW64; Trident/5.0)"}
{"stamp": "1346376858365", "ip": "222.143.28.2", "agent": "Mozilla/4.0 (compatible; MSIE 8.0; Windows NT 5.1; Trident/4.0)"}
{"stamp": "1346376858423", "ip": "123.151.144.40", "agent": "Mozilla/5.0 (Windows NT 5.1) AppleWebKit/536.11 (KHTML, like Gecko) Chrome/20.0.1132.57 Safari/536.11"}

我们要做的是从User-Agent中统计用户使用的浏览器类型所占比例,包括IE、Firefox、Chrome、Opera、Safari、以及其它。

User-Agent中的浏览器类型

由于一些历史原因,User-Agent中的信息是比较凌乱的,浏览器厂商会随意添加信息,甚至仿造其它浏览器的内容。因此在过滤时,我们需要做些额外的处理。Mozilla的这篇文章很好地概括了如何从User-Agent中获取浏览器类型,大致如下:

  • IE: MSIE xyz
  • Firefox: Firefox/xyz
  • Chrome: Chrome/xyz
  • Opera: Opera/xyz
  • Safari: Safari/xyz, 且不包含 Chrome/xyz 和 Chromium/xyz

解析JSON字符串

Clojure除了内置函数之外,周边还有一个名为clojure.contrib的类库,其中囊括了各类常用功能,包括JSON处理。目前clojure.contrib中的各个组件已经分开发行,读者可以到 https://github.com/clojure 中浏览。

处理JSON字符串时,首先在项目声明文件中添加依赖项[org.clojure/data.json "0.2.1"],然后就能使用了:

user=> (require '[clojure.data.json :as json])
user=> (json/read-str "{\"a\":1,\"b\":2}")
{"a" 1, "b" 2}
user=> (json/write-str [1 2 3])
"[1,2,3]"

正则表达式

Clojure提供了一系列的内置函数来使用正则表达式,其实质上是对java.util.regex命名空间的包装。

user=> (def ptrn #"[0-9]+") ; #"..."是定义正则表达式对象的简写形式
user=> (def ptrn (re-pattern "[0-9]+")) ; 和上式等价
user=> (re-matches ptrn "123") ; 完全匹配
"123"
user=> (re-find ptrn "a123") ; 返回第一个匹配项
"123"
user=> (re-seq ptrn "a123b456") ; 返回匹配项序列(惰性序列)
("123" "456")
user=> (re-find #"([a-z]+)/([0-9]+)" "a/1") ; 子模式
["a/1" "a" "1"]
user=> (def m (re-matcher #"([a-z]+)/([0-9]+)" "a/1 b/2")) ; 返回一个Matcher对象
user=> (re-find m) ; 返回第一个匹配
["a/1" "a" "1"]
user=> (re-groups m) ; 获取当前匹配
["a/1" "a" "1"]
user=> (re-find m) ; 返回下一个匹配,或nil
["b/2" "b" "2"]

Map函数

(defn json-decode [s]
  (try
    (json/read-str s)
    (catch Exception e)))

(def rule-set {"ie" (partial re-find #"(?i)MSIE [0-9]+")
               "chrome" (partial re-find #"(?i)Chrome/[0-9]+")
               "firefox" (partial re-find #"(?i)Firefox/[0-9]+")
               "opera" (partial re-find #"(?i)Opera/[0-9]+")
               "safari" #(and (re-find #"(?i)Safari/[0-9]+" %)
                              (not (re-find #"(?i)Chrom(e|ium)/[0-9]+" %)))
               })

(defn get-type [ua]
  (if-let [rule (first (filter #((second %) ua) rule-set))]
    (first rule)
    "other"))

(defn my-map [key value]
  (when-let [ua (get (json-decode value) "agent")]
    [[(get-type ua) 1]]))

json-decode函数是对json/read-str的包装,当JSON字符串无法正确解析时返回nil,而非异常终止。

rule-set是一个map类型,键是浏览器名称,值是一个函数,这里都是匿名函数。partial用于构造新的函数,(partial + 1)#(+ 1 %)(fn [x] (+ 1 x))是等价的,可以将其看做是为函数+的第一个参数定义了默认值。正则表达式中的(?i)表示匹配时不区分大小写。

get-type函数中,(filter #((second %) ua) rule-set)会用rule-set中的正则表达式逐一去和User-Agent字符串进行匹配,并返回第一个匹配项,也就是浏览器类型;没有匹配到的则返回other

单元测试

我们可以编写一组单元测试来检验上述my-map函数是否正确:

;; test/cia_hadoop/browser_test.clj

(ns cia-hadoop.browser-test
  (:use clojure.test
        clojure-hadoop.job
        cia-hadoop.browser))

(deftest test-my-map
  (is (= [["ie" 1]] (my-map 0 "{\"agent\":\"MSIE 6.0\"}")))
  (is (= [["chrome" 1]] (my-map 0 "{\"agent\":\"Chrome/20.0 Safari/6533.2\"}")))
  (is (= [["other" 1]] (my-map 0 "{\"agent\":\"abc\"}")))
  (is (nil? (my-map 0 "{"))))

(deftest test-browser
  (is (run job)))

其中deftestis都是clojure.test命名空间下定义的。

$ lein test cia-hadoop.browser-test

小结

本章我们简单介绍了Hadoop这一用于大数据处理的开源项目,以及如何借助clojure-hadoop类库编写MapReduce脚本,并在本地和集群上运行。Hadoop已经将大数据处理背后的种种细节都包装了起来,用户只需编写Map和Reduce函数,而借助Clojure语言,这一步也变的更为轻松和高效。Apache Hadoop是一个生态圈,其周边有很多开源项目,像Hive、HBase等,这里再推荐一个使用Clojure语言在Hadoop上执行查询的工具:cascalog。它的作者是Nathan Marz,也是我们下一章的主题——Storm实时计算框架——的作者。

本文涉及到的源码可以到 https://github.com/jizhang/blog-demo/tree/master/cia-hadoop 中查看。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1351390.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

算法巡练day03Leetcode203移除链表元素707设计链表206反转链表

今日学习的文章视频链接 https://www.bilibili.com/video/BV1nB4y1i7eL/?vd_source8272bd48fee17396a4a1746c256ab0ae https://programmercarl.com/0707.%E8%AE%BE%E8%AE%A1%E9%93%BE%E8%A1%A8.html#%E7%AE%97%E6%B3%95%E5%85%AC%E5%BC%80%E8%AF%BE 链表理论基础 见我的博…

擎创技术流 |如何使用eBPF监控NAT转换

一、NAT简介 Linux NAT(Network Address Translation)转换是一种网络技术,用于将一个或多个私有网络内的IP地址转换为一个公共的IP地址,以便与互联网通信。 图源于网络 在k8s业务场景中,业务组件之间的关系十分复杂. …

uni-app 前后端调用实例 基于Springboot 上拉分页实现

锋哥原创的uni-app视频教程: 2023版uniapp从入门到上天视频教程(Java后端无废话版),火爆更新中..._哔哩哔哩_bilibili2023版uniapp从入门到上天视频教程(Java后端无废话版),火爆更新中...共计23条视频,包括:第1讲 uni…

音视频通信

文章目录 一、音视频通信流程二、流媒体协议1、RTSP2、RTMP3、HLS4、WebRTC 一、音视频通信流程 音视频通信完整流程有如下几个环节:采集、编码、前后处理、传输、解码、缓冲、渲染等。 每一个细分环节,还有更细分的技术模块。比如,前后处…

【数据结构】二叉搜索(查找/排序)树

一、二叉搜索树基本概念 1、定义 二叉搜索树,又称为二叉排序树,二叉查找树,它满足如下四点性质: 1)空树是二叉搜索树; 2)若它的左子树不为空,则左子树上所有结点的值均小于它根结…

使用宝塔在Linux面板搭建网站,并实现公网远程访问

文章目录 前言1. 环境安装2. 安装cpolar内网穿透3. 内网穿透4. 固定http地址5. 配置二级子域名6. 创建一个测试页面 前言 宝塔面板作为简单好用的服务器运维管理面板,它支持Linux/Windows系统,我们可用它来一键配置LAMP/LNMP环境、网站、数据库、FTP等&…

STL map容器与pair类模板(解决扫雷问题)

CSTL之Map容器 - 数据结构教程 - C语言网 (dotcpp.com)https://www.dotcpp.com/course/118CSTL之Pair类模板 - 数据结构教程 - C语言网 (dotcpp.com)https://www.dotcpp.com/course/119 刷到一个扫雷的题目,之前没有玩怎么过扫雷,于是我就去玩了玩…

【计算机设计大赛作品】豆瓣电影数据挖掘可视化—信息可视化赛道获奖项目深入剖析【可视化项目案例-22】

文章目录 一.【计算机设计大赛作品】豆瓣电影数据挖掘可视化—信息可视化赛道获奖项目深入剖析【可视化项目案例-22】1.1 项目主题:豆瓣电影二.代码剖析2.1 项目效果展示2.2 服务端代码剖析2.3 数据分析2.4 数据评分三.寄语四.本案例完整源码下载一.【计算机设计大赛作品】豆瓣…

Go后端开发 -- main函数 变量 常量 函数

Go后端开发 – main函数 & 变量 & 常量 & 函数 文章目录 Go后端开发 -- main函数 & 变量 & 常量 & 函数一、第一个main函数1.创建工程2.main函数解析 二、变量声明1.单变量声明2.多变量声明 三、常量1.常量的定义2.优雅的常量 iota 四、函数1.函数返回…

2024.1.2 安装JDK和Eclipse,并配置java编译环境

2024.1.2 安装JDK和Eclipse,并配置java编译环境 一直对java一知半解,利用春节前一个月时间补补课。 一、安装jdk 首先在oracle官网上下载jdk,这里选jdk17,选择第二项直接安装,第一项是压缩文件,带有一些…

Noisy DQN 跑 CartPole-v1

gym 0.26.1 CartPole-v1 NoisyNet DQN NoisyNet 就是把原来Linear里的w/b 换成 mu sigma * epsilon, 这是一种非常简单的方法,但是可以显著提升DQN的表现。 和之前最原始的DQN相比就是改了两个地方,一个是Linear改成了NoisyLinear,另外一个是在agent在t…

第二十七章 正则表达式

第二十七章 正则表达式 1.正则快速入门2.正则需求问题3.正则底层实现14.正则底层实现25.正则底层实现36.正则转义符7.正则字符匹配8.字符匹配案例19.字符匹配案例211.选择匹配符(|)12.正则限定符{n}{n,m}(1个或者多个)*(0个或者多…

创建x11vnc系统进程

为方便使用vnc,所以寻找到一个比较好用的vnc服务端那就是x11vnc,索性就创建了一个系统进程 一、环境 系统:银河麒麟v4-sp2-server 软件:x11vnc【linux下】、VNCviewer【win下】 二、安装x11vnc 1、挂载光盘源并修改apt源 mou…

生态系统服务构建生态安全格局中的实践技术应用

生态安全是指生态系统的健康和完整情况。生态安全的内涵可以归纳为:一,保持生态系统活力和内外部组分、结构的稳定与持续性;二,维持生态系统生态功能的完整性;三,面临外来不利因素时,生态系统具…

Linux用shell脚本执行乘法口诀表的两种方式

#!/bin/bash # *********************************************************# # # # * Author : 藻头男 # # * QQ邮箱 : 2322944912qq.com # …

【SpringBoot3】1.SpringBoot入门的第一个完整小项目(新手保姆版+教会打包)

目录 1 SpringBoot简单介绍1.1 SpringBoot是什么1.2 主要优点1.3 术语1.3.1 starter(场景启动器) 1.4 官方文档 2 环境说明3 实现代码3.1 新建工程与模块3.2 加入依赖3.3 主程序文件3.4 业务代码3.5 运行测试3.6 部署打包3.7 命令行运行 1 SpringBoot简单…

[足式机器人]Part2 Dr. CAN学习笔记-自动控制原理Ch1-8Lag Compensator滞后补偿器

本文仅供学习使用 本文参考: B站:DR_CAN Dr. CAN学习笔记-自动控制原理Ch1-8Lag Compensator滞后补偿器 从稳态误差入手(steady state Error) 误差 Error : E ( s ) R ( s ) − X ( s ) R ( s ) − E ( s ) ⋅ K G …

再见2023,你好2024!

大家好,我是老三,本来今天晚上打算出去转一转,陆家嘴打车实在太艰难了,一公里多的路,司机走了四十分钟,还没到,再加上身体不适,咳嗽地比较厉害,所以还是宅在酒店里&#…

.NET Core SkiaSharp 替代 System.Drawing.Common 的一些用法

在.NET 6中,微软官方建议把 System.Drawing.Common 迁移到 SkiaSharp 库。因为System.Drawing.Common 被设计为 Window 技术的精简包装器,因此其跨平台实现欠佳。 SkiaSharp是一个基于谷歌的Skia图形库(Skia.org)的用于.NET平台的…

机器学习与深度学习——使用paddle实现随机梯度下降算法SGD对波士顿房价数据进行线性回归和预测

文章目录 机器学习与深度学习——使用paddle实现随机梯度下降算法SGD对波士顿房价数据进行线性回归和预测一、任务二、流程三、完整代码四、代码解析五、效果截图 机器学习与深度学习——使用paddle实现随机梯度下降算法SGD对波士顿房价数据进行线性回归和预测 随机梯度下降&a…