Structured Streaming Sink
Output Modes
- append: the default mode; only newly added rows are written to the sink, and only simple queries (no aggregation) are supported.
- complete: the entire result table is written on every trigger; supports aggregation and sorting.
- update: only the rows updated since the last trigger are written; aggregation is supported but sorting is not, and without aggregation it behaves the same as append.
The query below uses both aggregation and sorting, so only complete mode can be used:
val ds = df.as[String]
val wordDs = ds.flatMap(_.split(" "))
val result = wordDs
.groupBy('value)
.count()
.orderBy('count)
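By contrast, if the orderBy is dropped, the same aggregation can also run in update mode. A minimal sketch, assuming the same socket-backed df as above:
// Aggregation without sorting: update mode emits only the rows whose counts changed in each trigger
val updateResult = df.as[String]
  .flatMap(_.split(" "))
  .groupBy('value)
  .count()
updateResult.writeStream
  .format("console")
  .outputMode("update")
  .start()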
Sink Locations
Memory sink
The output is stored as a table in memory.
Supports the append and complete output modes.
Intended for testing environments only.
// Output
val query = result.writeStream
.format("memory")
.queryName("result")
.outputMode("complete")
.start()
while (true) {
spark.sql("select * from result").show()
Thread.sleep(3000)
}
Full code
import org.apache.spark.sql.SparkSession

object Sink {
def main(args: Array[String]): Unit = {
// Create the environment
val spark = SparkSession.builder().appName("Operation").master("local[*]")
.config("spark.sql.shuffle.partitions", "4") // 设置分区数
.getOrCreate()
val sc = spark.sparkContext
import spark.implicits._
// Load the data
val df = spark.readStream
.format("socket")
.option("host", "hadoop102")
.option("port", 9999)
.load()
df.printSchema()
// Process the data (DSL)
val ds = df.as[String]
val wordDs = ds.flatMap(_.split(" "))
val result = wordDs
.groupBy('value)
.count()
.orderBy('count)
// Output
val query = result.writeStream
.format("memory")
.queryName("result")
.outputMode("complete")
.start()
while (true) {
spark.sql("select * from result").show()
Thread.sleep(3000)
}
// query.awaitTermination()
spark.stop()
}
}
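To try it out, start a Netcat listener with nc -lk 9999 on hadoop102 (the host assumed in the code above) and type lines of words; the loop then reprints the refreshed in-memory result table every 3 seconds.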
ForeachBatch Sink
The foreach sink lets you run arbitrary computation on the output, one record at a time.
The foreachBatch sink works on each micro-batch of output data.
import org.apache.spark.SparkContext
import org.apache.spark.sql.{DataFrame, Dataset, Row, SaveMode, SparkSession}

object ForeachBatchSink {
def main(args: Array[String]): Unit = {
//TODO 0. Create the environment
val spark: SparkSession = SparkSession.builder().appName("sparksql").master("local[*]")
.config("spark.sql.shuffle.partitions", "4")
.getOrCreate()
val sc: SparkContext = spark.sparkContext
sc.setLogLevel("WARN")
import spark.implicits._
//TODO 1. Load the data
val df: DataFrame = spark.readStream
.format("socket")
.option("host", "node1")
.option("port", 9999)
.load()
df.printSchema()
//TODO 2. Process the data
val ds: Dataset[String] = df.as[String]
val result: Dataset[Row] = ds.flatMap(_.split(" "))
.groupBy('value)
.count()
.orderBy('count.desc)
//TODO 3. Write out the results
result.writeStream
.foreachBatch((ds: Dataset[Row], batchId: Long) => {
// Custom output to the console
println("-------------")
println(s"batchId:${batchId}")
println("-------------")
ds.show()
// Custom output to MySQL via JDBC
ds.coalesce(1)
.write
.mode(SaveMode.Overwrite)
.format("jdbc")
.option("url", "jdbc:mysql://localhost:3306/bigdata?characterEncoding=UTF-8")
.option("user", "root")
.option("password", "root")
.option("dbtable", "bigdata.words")
.save()
})
.outputMode("complete")
//TODO 4. Start and wait for termination
.start()
.awaitTermination()
//TODO 5. Release resources
spark.stop()
}
}
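For the per-record foreach sink mentioned above, the output logic is supplied as a ForeachWriter. A minimal sketch that simply prints each row to the console (the class name is an illustrative assumption):
import org.apache.spark.sql.{ForeachWriter, Row}

// Per-record sink: open/process/close are called for every partition in every epoch
class ConsoleForeachWriter extends ForeachWriter[Row] {
  override def open(partitionId: Long, epochId: Long): Boolean = true // accept every partition
  override def process(row: Row): Unit = println(row)                 // handle a single record
  override def close(errorOrNull: Throwable): Unit = ()               // release any resources here
}

// Usage: result.writeStream.foreach(new ConsoleForeachWriter).outputMode("complete").start()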
Trigger Intervals
With micro-batch processing, a checkpoint can be taken for every batch.
With continuous processing, you must specify the time interval at which checkpoints are taken.
import org.apache.spark.sql.streaming.Trigger

val ds: Dataset[String] = df.as[String]
val result: Dataset[Row] = ds.coalesce(1).flatMap(_.split(" "))
.groupBy('value)
.count()
result.writeStream
.format("console")
.outputMode("complete")
//Trigger options:
//1. Default (nothing specified): run each micro-batch as soon as it can
//2. Specifying 0 seconds also runs as fast as possible
// .trigger(Trigger.ProcessingTime("0 seconds"))
//3. Fixed processing-time interval
//.trigger(Trigger.ProcessingTime("5 seconds"))
//4. Trigger exactly once, then stop
//.trigger(Trigger.Once())
//5. Continuous processing with a checkpoint interval (experimental).
//   Note: continuous mode only supports map-like operations (no aggregations),
//   so with the groupBy/count above one of the micro-batch triggers must be used instead.
.trigger(Trigger.Continuous("1 second"))
.option("checkpointLocation", "./ckp"+System.currentTimeMillis())
//TODO 4. Start and wait for termination
.start()
.awaitTermination()