Spark Streaming可以通过Socket端口监听并接收数据,然后进行相应处理。
新建NetworkWordCount.scala代码文件,请在该文件中输入如下内容:
package org.apache.spark.examples.streaming
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.storage.StorageLevel
object NetworkWordCount {
def main(args: Array[String]) {
if (args.length < 2) {
System.err.println("Usage: NetworkWordCount <hostname> <port>")
System.exit(1)
}
StreamingExamples.setStreamingLogLevels()
// Create the context with a 1 second batch size
val sparkConf = new SparkConf().setAppName("NetworkWordCount").setMaster("local[2]")
val ssc = new StreamingContext(sparkConf, Seconds(1))
// Create a socket stream on target ip:port and count the
// words in input stream of \n delimited text (eg. generated by 'nc')
// Note that no duplication in storage level only for running locally.
// Replication necessary in distributed scenario for fault tolerance.
val lines = ssc.socketTextStream(args(0), args(1).toInt, StorageLevel.MEMORY_AND_DISK_SER)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
}
}
退出vim编辑器。上面的代码,不能直接拿去sbt打包编辑,因为,里面有个 StreamingExamples.setStreamingLogLevels(),StreamingExamples来自另外一个代码文件,请在相同目录下再新建另外一个代码文件StreamingExamples.scala,文件内容如下:
package org.apache.spark.examples.streaming
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
/** Set reasonable logging levels for streaming if the user has not configured log4j. */
def setStreamingLogLevels() {
val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
if (!log4jInitialized) {
// We first log something to initialize Spark's default logging, then we override the
// logging level.
logInfo("Setting log level to [WARN] for streaming example." +
" To override add a custom log4j.properties to the classpath.")
Logger.getRootLogger.setLevel(Level.WARN)
}
}
}
可以看出,StreamingExamples.scala文件主要是用来对输出的日志信息进行格式设置。
打包成功以后,就可以输入以下命令启动这个程序:
spark2-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /home/songxitang/jars/simple-project_2.11-1.0.jar localhost 9999
执行上面命令后,就进入了监听状态(我们把运行这个监听程序的窗口称为监听窗口),这时,你就可以像刚才一样,新打开一个窗口作为nc窗口,启动nc程序:
nc -lk 9999
这样,就可以在nc窗口中随意输入一些单词,监听窗口就会自动获得单词数据流信息,在监听窗口每隔1秒就会打印出词频统计信息,大概会再屏幕上出现类似如下的结果:
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
(hello,1)
(world,1)
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
(hadoop,1)
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------
(spark,1)
如果要停止运行上述程序,只要按键盘Ctrl+Z键就可以了。
注意,如果你的电脑屏幕上,不是显示上面这样非常干净的信息,而是夹杂了很多乱七八糟的信息,如下:
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431100000 ms
-------------------------------------------
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431120000 ms
-------------------------------------------
//这里省略若干屏幕信息,干扰你的视线
-------------------------------------------
Time: 1479431140000 ms
-------------------------------------------
遇到上面这问题,不要紧,这是和log4j的设置有关的。Log4j是Apache的一个开源项目,通过使用Log4j,我们可以控制日志信息输送的目的地是控制台、文件、GUI组件,甚至是套接口服务器、NT的事件记录器、UNIX Syslog守护进程等;我们也可以控制每一条日志的输出格式;通过定义每一条日志信息的级别,我们能够更加细致地控制日志的生成过程。最令人感兴趣的就是,这些可以通过一个配置文件来灵活地进行配置,而不需要修改应用的代码。
那么如何修改log4j的设置,把这些乱七八糟的信息给过滤掉,不要显示到屏幕上面呢?方法如下:
请新打开一个Shell窗口,进入Shell命令提示符状态,然后执行下面命令:
cd /usr/local/spark/conf
ls
这时,你可以看到一个名字为log4j.properties.template的文件,请执行如下命令复制一份到当前目录:
cp log4j.properties.template log4j.properties
这样就得到一个log4j.properties文件,然后,请使用vim编辑器打开这个文件,修改里面的一个配置选项(只修改log4j.rootCategor,其他配置选项不要修改):
//原来的原始配置如下
log4j.rootCategory=INFO,console
//请修改为下面格式
log4j.rootCategory=WARN,console
修改后,保存退出vim编辑器,就可以关闭当前终端窗口。然后,再次重新运行NetworkWordCount词频统计程序,就可以看到屏幕会输出比较纯净的信息。
在Cloudera Manager中设置Spark2 Log信息:
下面我们再前进一步,这回,我们把数据源头的产生方式修改一下,不要使用nc程序,而是采用自己编写的程序产生Socket数据源。
新建一个名称为DataSourceSocket.scala的代码文件,用来产生Socket数据源,请在该代码文件中输入下面代码:
package org.apache.spark.examples.streaming
import java.io.{PrintWriter}
import java.net.ServerSocket
import scala.io.Source
object DataSourceSocket {
def index(length: Int) = {
val rdm = new java.util.Random
rdm.nextInt(length)
}
def main(args: Array[String]) {
if (args.length != 3) {
System.err.println("Usage: <filename> <port> <millisecond>")
System.exit(1)
}
val fileName = args(0)
val lines = Source.fromFile(fileName).getLines.toList
val rowCount = lines.length
val listener = new ServerSocket(args(1).toInt)
while (true) {
val socket = listener.accept()
new Thread() {
override def run = {
println("Got client connected from: " + socket.getInetAddress)
val out = new PrintWriter(socket.getOutputStream(), true)
while (true) {
Thread.sleep(args(2).toLong)
val content = lines(index(rowCount))
println(content)
out.write(content + '\n')
out.flush()
}
socket.close()
}
}.start()
}
}
}
注意,实际上,这个时候,我们的程序目录下,就有了三个代码文件,分别是NetworkWordCount.scala
、StreamingExamples.scala
和DataSourceSocket.scala
。sbt打包编译是同时对这三个代码文件打包编译。打包成功以后,就可以输入命令启动数据源程序和监听程序。
下面首先启动用来生成数据源的DataSourceSocket
程序,不过,DataSourceSocket
程序需要把一个文本文件作为输入参数,所以,在启动这个程序之前,需要首先创建一个文本文件:
cd /home/songxitang/spark/mycode/streaming/
vim word.txt
在word.txt中随便输入几行英文语句,然后保存并退出vim编辑器。
下面就启动DataSourceSocket程序,这个程序需要三个参数,第一个参数是文本文件路径,第二个参数是端口地址,第三个参数是时间间隔(单位是毫秒,也就是每隔多少毫秒发送一次信息),请执行下面命令启动这个程序:
spark2-submit --class "org.apache.spark.examples.streaming.DataSourceSocket" /home/songxitang/spark/jars/simple-project_2.11-1.0.jar /home/songxitang/spark/mycode/streaming/word.txt 9999 1000
然后,你就会看到,这个窗口会不断打印出一些随机读取到的文本信息,这些信息也是Socket数据源,会被监听程序捕捉到。所以,下面,我们就在另外一个窗口启动监听程序:
spark2-submit --class "org.apache.spark.examples.streaming.NetworkWordCount" /home/songxitang/spark/jars/simple-project_2.11-1.0.jar localhost 9999
启动成功后,你就会看到,屏幕上不断打印出词频统计信息。成功完成实验。