// 测试代码 (test code)
package Data_text
import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector
import java.text.SimpleDateFormat
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
/**
 * Flink job that reads the UserBehavior CSV file and groups the records into 1-hour
 * tumbling event-time windows. For every window it prints the three values of CSV
 * field 1 (tuple field `_2`) that occur most often, together with their counts.
 */
object pv {

  /** Input file used when no path is passed as the first program argument. */
  private val DefaultInputPath = "E:\\idea项目\\untitled4\\src\\main\\resources\\UserBehavior.csv"

  /** Number of (id, count) entries reported per window. */
  private val TopN = 3

  def main(args: Array[String]): Unit = {
    // Create the streaming environment.
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Run everything with a single parallel instance.
    env.setParallelism(1)
    // Windows are driven by timestamps carried in the records, not by wall-clock time.
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // Formats window boundaries (epoch millis) for the output line.
    val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
    // First program argument overrides the default input path.
    val inputPath = args.headOption.getOrElse(DefaultInputPath)

    val stream = env.readTextFile(inputPath)
    val stream2 = stream
      // Unparseable lines (e.g. a header or blank line) are skipped instead of failing the job.
      .flatMap((line: String) => parseRecord(line).toList)
      // Field 5 is multiplied by 1000, i.e. treated as seconds and converted to millis.
      // NOTE(review): assumes records arrive in ascending timestamp order; if they do not,
      // consider BoundedOutOfOrdernessTimestampExtractor — confirm against the data.
      .assignAscendingTimestamps(record => record._5 * 1000L)
      .windowAll(TumblingEventTimeWindows.of(Time.hours(1))) // 1-hour tumbling window
      .process(new ProcessAllWindowFunction[(String, String, String, String, Long, Int), String, TimeWindow] {
        override def process(context: Context,
                             elements: Iterable[(String, String, String, String, Long, Int)],
                             out: Collector[String]): Unit = {
          // NOTE(review): every record is counted regardless of field 4 (tuple `_4`); if only
          // a specific behaviour type should be counted, filter on it here — confirm intent.
          val top = topItems(elements, TopN)
          out.collect(
            s"开始时间${format.format(context.window.getStart)}" +
              s"结束时间${format.format(context.window.getEnd)}" +
              s"数据:$top")
        }
      })
    stream2.print("")
    env.execute("")
  }

  /**
   * Parses one CSV line into the tuple used by the pipeline.
   *
   * The first four comma-separated fields are kept as strings, the fifth is parsed
   * as a Long, and a per-record count of 1 is appended.
   *
   * @param line raw text line
   * @return the parsed tuple, or None when the line has fewer than five fields or
   *         the fifth field is not a valid Long
   */
  private def parseRecord(line: String): Option[(String, String, String, String, Long, Int)] = {
    val fields = line.split(",")
    if (fields.length < 5) None
    else
      scala.util.Try(fields(4).trim.toLong).toOption
        .map(ts => (fields(0), fields(1), fields(2), fields(3), ts, 1))
  }

  /**
   * Sums the per-record count (tuple field `_6`) per id (tuple field `_2`) and returns
   * the `n` ids with the highest totals, highest first.
   *
   * Equal counts are ordered by id, so the result does not depend on hash-map
   * iteration order.
   *
   * @param elements records of one window
   * @param n        maximum number of entries to return
   * @return (id, count) pairs in a ListBuffer, which keeps the printed form unchanged
   */
  private def topItems(elements: Iterable[(String, String, String, String, Long, Int)],
                       n: Int): ListBuffer[(String, Int)] = {
    val counts = mutable.HashMap.empty[String, Int]
    elements.foreach { record =>
      counts(record._2) = counts.getOrElse(record._2, 0) + record._6
    }
    val ranked = counts.toSeq
      .sortBy { case (id, count) => (-count, id) }
      .take(n)
    ListBuffer(ranked: _*)
  }
}
// 运行结果 (run output)