Flink项目实战篇 基于Flink的城市交通监控平台(上)

news2025/1/21 0:51:14

系列文章目录

Flink项目实战篇 基于Flink的城市交通监控平台(上)
Flink项目实战篇 基于Flink的城市交通监控平台(下)


文章目录

  • 系列文章目录
  • 1. 项目整体介绍
    • 1.1 项目架构
    • 1.2 项目数据流
    • 1.3 项目主要模块
  • 2. 项目数据字典
    • 2.1 卡口车辆采集数据
    • 2.2 城市交通管理数据表
    • 2.3 车辆轨迹数据表
  • 3. 实时卡口监控分析
    • 3.1 创建Maven项目
    • 3.2 准备数据
    • 3.3 实时车辆超速监控
    • 3.4 实时卡口拥堵情况监控
    • 3.5 实时最通畅的TopN卡口


1. 项目整体介绍

近几年来,随着国内经济的快速发展,高速公路建设步伐不断加快,全国机动车辆、驾驶员数量迅速增长,交通管理工作日益繁重,压力与日俱增。为了提高公安交通管理工作的科学化、现代化水平,缓解警力不足,加强和保障道路交通的安全、有序和畅通,减少道路交通违法和事故的发生,全国各地建设和使用了大量的“电子警察”、“高清卡口”、“固定式测速”、“区间测速”、“便携式测速”、“视频监控”、“预警系统”、“能见度天气监测系统”、“LED信息发布系统”等交通监控系统设备。尽管修建了大量的交通设施,增加了诸多前端监控设备,但交通拥挤阻塞、交通安全状况仍然十分严重。由于道路上交通监测设备种类和生产厂家繁多,目前还没有一个统一的数据采集和交换标准,无法对所有的设备、数据进行统一、高效的管理和应用,造成各种设备和管理软件混用的局面,给使用单位带来了很多不便,使得国家大量的基础建设投资未达到预期的效果。各交警支队的设备大都采用本地的数据库管理,交警总队无法看到各支队的监测设备及监测信息,严重影响对全省交通监测的宏观管理;目前网络状况为设备专网、互联网、公安网并存的复杂情况,需要充分考虑公安网的安全性,同时要保证数据的集中式管理;监控数据需要与“六合一”平台、全国机动车稽查布控系统等的数据对接,迫切需要一个全盘考虑面向交警交通行业的智慧交通管控指挥平台系统。

智慧交通管控指挥平台建成后,达到了以下效果目标:

  • 交通监视和疏导:通过系统将监视区域内的现场图像传回指挥中心,使管理人员直接掌握车辆排队、堵塞、信号灯等交通状况,及时调整信号配时或通过其他手段来疏导交通,改变交通流的分布,以达到缓解交通堵塞的目的。
  • 交通警卫:通过突发事件的跟踪,提高处置突发事件的能力。
  • 建立公路事故、事件预警系统的指标体系及多类分析预警模型,实现对高速公路通行环境、交通运输对象、交通运输行为的综合分析和预警,建立真正意义上的分析及预警体系。
  • 及时准确地掌握所监视路口、路段周围的车辆、行人的流量、交通治安情况等,为指挥人员提供迅速直观的信息从而对交通事故和交通堵塞做出准确判断并及时响应。
  • 收集、处理各类公路网动静态交通安全信息,分析研判交通安全态势和事故隐患,并进行可视化展示和预警提示。
  • 提供接口与其他平台信息共享和关联应用,基于各类动静态信息的大数据分析处理,实现交通违法信息的互联互通、源头监管等功能。

1.1 项目架构

本项目是与公安交通管理综合应用平台、机动车缉查布控系统等对接的,并且基于交通部门现有的数据平台上,进行的数据实时分析项目。
在这里插入图片描述

  • 卡口:道路上用于监控的某个点,可能是十字路口,也可能是高速出口等。
    在这里插入图片描述

  • 通道:每个卡口上有多个摄像头,每个摄像头有拍摄的方向。这些摄像头也叫通道。

  • “违法王“车辆: 该车辆违法未处理超过50次以上的车。

  • 摄像头拍照识别:
    (1)一次拍照识别:经过卡口摄像头进行的识别,识别对象的车辆号牌信息、车辆号牌颜色信息等,基于车辆号牌和车辆颜色信息,能够实现基本的违法行为辨识、车辆黑白名单比对报警等功能。
    (2)二次拍照识别:可以通过时间差和距离自动计算出车辆的速度。

1.2 项目数据流

在这里插入图片描述
实时处理流程如下:
http请求 -->数据采集接口–>数据目录–> flume监控目录[监控的目录下的文件是按照日期分的] -->Kafka -->Flink分析数据 --> Mysql[实时监控数据保存]

1.3 项目主要模块

在这里插入图片描述

2. 项目数据字典

2.1 卡口车辆采集数据

卡口数据通过Flume采集过来之后存入Kafka中,其中数据的格式为:

(
 `action_time` long  --摄像头拍摄时间戳,精确到秒, 
 `monitor_id` string  --卡口号, 
 `camera_id` string   --摄像头编号, 
 `car` string  --车牌号码, 
 `speed` double  --通过卡扣的速度, 
 `road_id` string  --道路id, 
 `area_id` string  --区域id, 
)

其中每个字段之间使用逗号隔开。
区域ID代表:一个城市的行政区域。
摄像头编号:一个卡口往往会有多个摄像头,每个摄像头都有一个唯一编号。
道路ID:城市中每一条道路都有名字,比如:蔡锷路。交通部门会给蔡锷路一个唯一编号。

2.2 城市交通管理数据表

Mysql数据库中有两张表是由城市交通管理平台提供的,本项目需要读取这两张表的数据来进行分析计算。
(1)城市区域表: t_area_info

DROP TABLE IF EXISTS `t_area_info`;
CREATE TABLE `area_info` (
  `area_id` varchar(255) DEFAULT NULL,
  `area_name` varchar(255) DEFAULT NULL
)
--导入数据
INSERT INTO `t_area_info` VALUES ('01', '海淀区');
INSERT INTO `t_area_info` VALUES ('02', '昌平区');
INSERT INTO `t_area_info` VALUES ('03', '朝阳区');
INSERT INTO `t_area_info` VALUES ('04', '顺义区');
INSERT INTO `t_area_info` VALUES ('05', '西城区');
INSERT INTO `t_area_info` VALUES ('06', '东城区');
INSERT INTO `t_area_info` VALUES ('07', '大兴区');
INSERT INTO `t_area_info` VALUES ('08', '石景山');

(2)城市“违法”车辆列表:
城市“违法”车辆,一般是指需要进行实时布控的违法车辆。

DROP TABLE IF EXISTS `t_violation_list`;
CREATE TABLE `t_violation_list` (
	`id` int(11) NOT NULL AUTO_INCREMENT,
	`car` varchar(255) DEFAULT NULL,
	`violation` varchar(1000) DEFAULT NULL,
	`create_time` bigint(20) DEFAULT NULL,
	`detail` varchar(1000) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

(3)城市卡口限速信息表:
城市中有些卡口有限制设置,一般超过当前限速的10%要扣分。

DROP TABLE IF EXISTS `t_monitor_info`;
CREATE TABLE `t_monitor_info` (
  `area_id` varchar(255) DEFAULT NULL,
  `road_id` varchar(255) NOT NULL,
  `monitor_id` varchar(255) NOT NULL,
  `speed_limit` int(11) DEFAULT NULL,
  PRIMARY KEY (`area_id`,`road_id`,`monitor_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
--导入数据
INSERT INTO `t_monitor_info` VALUES ('01','10','0000','60');
INSERT INTO `t_monitor_info` VALUES ('02','11','0001','60');
INSERT INTO `t_monitor_info` VALUES ('01','12','0002','80');
INSERT INTO `t_monitor_info` VALUES ('03','13','0003','100');

2.3 车辆轨迹数据表

在智能车辆布控模块中,需要保存一些车辆的实时行驶轨迹,为了方便其他部门和项目方便查询获取,我们在Mysql数据库设计一张车辆实时轨迹表。如果数据量太多,需要设置在HBase中。

DROP TABLE IF EXISTS `t_track_info`;
CREATE TABLE `t_track_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `car` varchar(255) DEFAULT NULL,
  `action_time` bigint(20) DEFAULT NULL,
  `monitor_id` varchar(255) DEFAULT NULL,
  `road_id` varchar(255) DEFAULT NULL,
  `area_id` varchar(255) DEFAULT NULL,
  `speed` double DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

3. 实时卡口监控分析

首先要实现的是实时卡口监控分析,由于前面课程项目中已经讲解了数据的ETL,本项目我们省略数据采集等ETL操作。我们将读取Kafka中的数据集来进行分析。
项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。首先我们需要搭建项目框架。

3.1 创建Maven项目

打开IDEA,创建一个maven项目,我们整个项目需要的工具的不同版本可能会对程序运行造成影响,所以应该在porm.xml文件的最上面声明所有工具的版本信息。

在pom.xml中加入以下配置:

<properties>
        <flink.version>1.9.1</flink.version>
        <scala.binary.version>2.11</scala.binary.version>
        <kafka.version>0.11.0.0</kafka.version>
</properties>

(1)添加项目依赖
对于整个项目而言,所有模块都会用到flink相关的组件,添加Flink相关组件依赖:

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
   </dependency>
   <dependency>
        <groupId>org.apache.flink</groupId>       <artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
        <version>${flink.version}</version>
   </dependency>
   <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_${scala.binary.version}</artifactId>
        <version>${kafka.version}</version>
   </dependency>
   <dependency>
       <groupId>org.apache.flink</groupId>          <artifactId>flink-connector-kafka_${scala.binary.version}</artifactId>
      <version>${flink.version}</version>
   </dependency>
   <dependency>
       <groupId>redis.clients</groupId>
       <artifactId>jedis</artifactId>
       <version>2.8.1</version>
   </dependency>
   <dependency>
       <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep-scala_${scala.binary.version}</artifactId>
       <version>${flink.version}</version>
   </dependency>
</dependencies>

(2)添加Scala和打包插件

<build>
<plugins>
    <!-- 该插件用于将Scala代码编译成class文件 -->
    <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.4.6</version>
        <executions>
            <execution>
                <!-- 声明绑定到maven的compile阶段 -->
                <goals>
                    <goal>testCompile</goal>
                </goals>
            </execution>
        </executions>
    </plugin>

    <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>3.0.0</version>
        <configuration>
            <descriptorRefs>
                <descriptorRef>
                    jar-with-dependencies
                </descriptorRef>
            </descriptorRefs>
        </configuration>
        <executions>
            <execution>
                <id>make-assembly</id>
                <phase>package</phase>
                <goals>
                    <goal>single</goal>
                </goals>
            </execution>
        </executions>
    </plugin>
</plugins>
</build>

3.2 准备数据

由于在前面的课程中已经学过数据的采集和ETL,本项目不再赘述,现在我们直接随机生成数据到文件中(方便测试),同时也写入Kafka。

项目中模拟车辆速度数据和车辆经过卡扣个数使用到了高斯分布,高斯分布就是正态分布。“正态分布”(Normal Distribution)可以描述所有常见的事物和现象:正常人群的身高、体重、考试成绩、家庭收入等等。这里的描述是什么意思呢?就是说这些指标背后的数据都会呈现一种中间密集、两边稀疏的特征。以身高为例,服从正态分布意味着大部分人的身高都会在人群的平均身高上下波动,特别矮和特别高的都比较少见,正态分布非常常见。
在这里插入图片描述
基于以上所以需要在pom.xml中导入高斯分布需要的依赖包:

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-math3</artifactId>
    <version>3.6.1</version>
</dependency>

生成高斯标准分布的代码如下:

//获取随机数生成器
val generator: JDKRandomGenerator = new JDKRandomGenerator()
//随机生成高斯分布的数据
val grg: GaussianRandomGenerator = new GaussianRandomGenerator(generator)
//获取标准正态分布的数据
println(s"随机生成数据为:${grg.nextNormalizedDouble()}")

模拟生成数据的代码如下:

/**
  * 模拟生成数据,这里将数据生产到Kafka中,同时生成到文件中
  */
object GeneratorData {
  def main(args: Array[String]): Unit = {
    //创建文件流
    val pw = new PrintWriter("./data/traffic_data")

    //创建Kafka 连接properties
    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val random = new Random()

    //创建Kafka produer
    val producer = new KafkaProducer[String,String](props)

    //车牌号使用的地区
    val locations = Array[String]("京","津","京","鲁","京","京","冀","京","京","粤","京","京")

    //模拟车辆个数,这里假设每日有30万辆车信息
    for(i <- 1 to 30000){
      //模拟每辆车的车牌号,"%05d".format(100000) %05d,d代表数字,5d代表数字长度为5位,不足位数前面补0 。 例如:京A88888
      val car =locations(random.nextInt(12))+(65+random.nextInt(26)).toChar+"%05d".format(random.nextInt(100000))

      //模拟车辆经过的卡扣数,使用高斯分布,假设正常每辆车每日经过卡扣有30个
      val generator = new GaussianRandomGenerator(new JDKRandomGenerator())
      val monitorThreshold: Int = 1+(generator.nextNormalizedDouble()*30).abs.toInt //generator.nextNormalizedDouble() 处于-1 ~ 1 之间
      //模拟拍摄时间
      val day = DateUtils.getTodayDate()
      var hour = DateUtils.getHour()
      var flag = 0

      for(j <- 1 to monitorThreshold){
        flag+=1

        //模拟monitor_id ,4位长度
        val monitorId = "%04d".format(random.nextInt(9))

        //模拟camear_id ,5为长度
        val camearId = "%05d".format(random.nextInt(100000))

        //模拟road_id ,2为长度
        val roadId = "%02d".format(random.nextInt(50))

        //模拟area_id ,2为长度
        val areaId = "%02d".format(random.nextInt(8))

        //模拟速度 ,使用高斯分布,速度大多位于90 左右
        val speed = "%.1f".format(60 + (generator.nextNormalizedDouble()*30).abs)

        //模拟action_time
        if(flag % 30 == 0 && flag != 0 ){
          hour = (hour.toInt+1).toString
        }
        val currentTime = day+" "+hour+":"+DateUtils.getMinutesOrSeconds()+":"+DateUtils.getMinutesOrSeconds()
        //获取action_time 时间戳
        val actionTime: Long = DateUtils.getTimeStamp(currentTime)


        var oneInfo = s"$actionTime,$monitorId,$camearId,$car,$speed,$roadId,$areaId"
        println(s"oneInfo = $oneInfo")

        //写入文件:
        pw.write(oneInfo)
        pw.println()

        //写入kafka:
        producer.send(new ProducerRecord[String,String]("traffic-topic",oneInfo))
      }
    }

    pw.flush()
    pw.close()
    producer.close()
  }
}

3.3 实时车辆超速监控

在城市交通管理数据库中,存储了每个卡口的限速信息,但是不是所有卡口都有限速信息,其中有一些卡口有限制。Flink中有广播状态流,JobManger统一管理,TaskManger中正在运行的Task不可以修改这个广播状态。只能定时更新(自定义Source)。

我们通过实时计算,需要把所有超速超过10%的车辆找出来,并写入关系型数据库中。超速结果表如下:

DROP TABLE IF EXISTS `t_speeding_info`;
CREATE TABLE `t_speeding_info` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `car` varchar(255) NOT NULL,
  `monitor_id` varchar(255) DEFAULT NULL,
  `road_id` varchar(255) DEFAULT NULL,
  `real_speed` double DEFAULT NULL,
  `limit_speed` int(11) DEFAULT NULL,
`action_time` bigint(20) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

在当前需求中,需要不定时的从数据库表中查询所有限速的卡口,再根据限速的卡口列表来实时的判断是否存在超速的车辆,如果找到超速的车辆,把这些车辆超速的信息保存到Mysql数据库的超速违章记录表中t_speeding_info。

我们把查询限速卡口列表数据作为一个事件流,车辆通行日志数据作为第二个事件流。广播状态可以用于通过一个特定的方式来组合并共同处理两个事件流。第一个流的事件被广播到另一个operator的所有并发实例,这些事件将被保存为状态。另一个流的事件不会被广播,而是发送给同一个operator的各个实例,并与广播流的事件一起处理。广播状态非常适合两个流中一个吞吐大,一个吞吐小,或者需要动态修改处理逻辑的情况。

我们对两个流使用了connect()方法,并在连接之后调用BroadcastProcessFunction接口处理两个流:

  • processBroadcastElement()方法:每次收到广播流的记录时会调用。将接收到的卡口限速记录放入广播状态中;
  • processElement()方法:接受到车辆通行日志流的每条消息时会调用。并能够对广播状态进行只读操作,以防止导致跨越类中多个并发实例的不同广播状态的修改。

代码如下:

/**
  *   监控超速的车辆信息
  *   思路:从mysql中读取卡扣下的限速信息,通过广播流进行广播,然后与从kafka中读取的车流量监控事件流进行connect处理
  *     广播状态操作步骤:
  *       1).读取广播流的DStream数据
  *       2).将以上DStream数据广播出去
  *       3).主流与广播流进行Connect关联,调用 process 底层API处理
  *       4).实现process方法中 BroadcastProcessFunction 类下的两个方法进行数据处理
  */
object OutOfSpeedMonitor {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    env.setParallelism(1)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","testgroup1")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("auto.offset.reset","latest")

    //读取Kafka中的监控车辆事件流
    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      })

    //广播状态流 - 卡扣限速信息
    val broadCastStream: BroadcastStream[MonitorLimitSpeedInfo] = env.addSource(new JdbcReadSource("MonitorLimitSpeedInfo")).map(
      one => {
        one.asInstanceOf[MonitorLimitSpeedInfo]
      }
    ).broadcast(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)

    val outOfSpeedCarInfoDStream: DataStream[OutOfSpeedCarInfo] = mainDStream.connect(broadCastStream)
      .process(new BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo] {
        //当有车辆监控事件时会被调用
        override def processElement(trafficLog: TrafficLog, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#ReadOnlyContext, out: Collector[OutOfSpeedCarInfo]): Unit = {
          //道路_卡扣
          val roadMonitor = trafficLog.roadId+"_"+trafficLog.monitorId
          val info: MonitorLimitSpeedInfo = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR).get(roadMonitor)
          if (info != null) {
            //获取当前车辆真实的速度
            val realSpeed: Double = trafficLog.speed
            //获取当前卡扣限速信息
            val limitSpeed: Int = info.speedLimit
            //速度超过限速10% 就是超速车辆
            if (realSpeed > limitSpeed * 1.1) {
              out.collect(OutOfSpeedCarInfo(trafficLog.car, trafficLog.monitorId, trafficLog.roadId, realSpeed, limitSpeed, trafficLog.actionTime))
            }
          }
        }

        //每次收到广播流数据时,都会被调用,将接收到的卡扣限速记录放入到广播状态中
        override def processBroadcastElement(monitorLimitSpeedInfo: MonitorLimitSpeedInfo, ctx: BroadcastProcessFunction[TrafficLog, MonitorLimitSpeedInfo, OutOfSpeedCarInfo]#Context, out: Collector[OutOfSpeedCarInfo]): Unit = {
          val bcState: BroadcastState[String, MonitorLimitSpeedInfo] = ctx.getBroadcastState(GlobalConstant.MONITOR_LIMIT_SPEED_DESCRIPTOR)
          //key : 道路_卡扣 value :monitorLimitSpeedInfo
          bcState.put(monitorLimitSpeedInfo.roadId+"_"+monitorLimitSpeedInfo.monitorId, monitorLimitSpeedInfo)
        }
      })

    //将超速车辆的结果保存到 mysql 表 t_speeding_info 中。
    val sink: JdbcWriteSink[OutOfSpeedCarInfo] = new JdbcWriteSink("OutOfSpeedCarInfo")
    outOfSpeedCarInfoDStream.addSink(sink)

    env.execute()

  }
}

3.4 实时卡口拥堵情况监控

卡口的实时拥堵情况,其实就是通过卡口的车辆平均车速,为了统计实时的平均车速,这里设定一个滑动窗口,窗口长度是为5分钟,滑动步长为1分钟。平均车速=当前窗口内通过车辆的车速之和 / 当前窗口内通过的车辆数量 ;并且在Flume采集数据的时候,我们发现数据可能出现时间乱序问题,最长迟到5秒。

实时卡口平均速度需要保存到Mysql数据库中,结果表设计为:

DROP TABLE IF EXISTS `t_average_speed`;
CREATE TABLE `t_average_speed` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `start_time` bigint(20) DEFAULT NULL,
  `end_time` bigint(20) DEFAULT NULL,
  `monitor_id` varchar(255) DEFAULT NULL,
  `avg_speed` double DEFAULT NULL,
  `car_count` int(11) DEFAULT NULL,
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

完整的代码:

object MonitorAvgSpeedMonitor {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","testgroup2")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    //使用时间为 事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //设置线程为1
    env.setParallelism(1)

//    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props))
    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        val actionTime = arr(0).toLong
        val monitorId = arr(1)
        val cameraId = arr(2)
        val car = arr(3)
        val speed = arr(4).toDouble
        val roadId = arr(5)
        val areaId = arr(6)
        TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    mainDStream.keyBy(_.monitorId)
      .timeWindow(Time.minutes(5),Time.minutes(1))
      //统计每个卡扣通过车辆数,统计每个卡扣下的车辆总速度和,使用增量函数
      .aggregate(
        new AggregateFunction[TrafficLog,(Int,Double),(Int,Double)] {
          override def createAccumulator(): (Int, Double) = (0,0.0)

          override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) = (accumulator._1+1,accumulator._2+value.speed)

          override def getResult(accumulator: (Int, Double)): (Int, Double) = accumulator

          override def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1+b._1,a._2+b._2)
        },
        new ProcessWindowFunction[(Int,Double),MonitorAvgSpeedInfo,String,TimeWindow] {
          override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {
            val monitorId  = key
            val avgSpeed = (elements.last._2/elements.last._1).formatted("%.2f").toDouble
            out.collect(new MonitorAvgSpeedInfo(context.window.getStart,context.window.getEnd,monitorId,avgSpeed,elements.last._1))
          }
        }
      )
      .addSink(new JdbcWriteSink[MonitorAvgSpeedInfo]("MonitorAvgSpeedInfo"))

    env.execute()

  }

3.5 实时最通畅的TopN卡口

所谓的最通畅的卡口,其实就是当时的车辆数量最少的卡口。这里有两种实现方式,一种是基于上一个功能的基础上再次开启第二个窗口操作,然后使用AllWindowFunction实现一个自定义的TopN函数Top来计算车速排名前3名的卡口,并将排名结果格式化成字符串,便于后续输出。另外一种是使用窗口函数,对滑动窗口内的数据全量计算并排序计算。

(1)基于上个功能基础上,完整的代码:

/**
  *  基于 "实时卡扣拥堵情况业务" 基础之上进行统计
  */
object FindTop5MonitorInfo2 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","testgroup2")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)

    //使用时间为 事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    //设置线程为1
    env.setParallelism(1)

        val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        val actionTime = arr(0).toLong
        val monitorId = arr(1)
        val cameraId = arr(2)
        val car = arr(3)
        val speed = arr(4).toDouble
        val roadId = arr(5)
        val areaId = arr(6)
        TrafficLog(actionTime, monitorId, cameraId, car, speed, roadId, areaId)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    val monitorAvgSpeedDStream: DataStream[MonitorAvgSpeedInfo] = mainDStream.keyBy(_.monitorId)
      .timeWindow(Time.minutes(5), Time.minutes(1))
      //统计每个卡扣通过车辆数,统计每个卡扣下的车辆总速度和,使用增量函数
      .aggregate(
      new AggregateFunction[TrafficLog, (Int, Double), (Int, Double)] {
        override def createAccumulator(): (Int, Double) = (0, 0.0)

        override def add(value: TrafficLog, accumulator: (Int, Double)): (Int, Double) = (accumulator._1 + 1, accumulator._2 + value.speed)

        override def getResult(accumulator: (Int, Double)): (Int, Double) = accumulator

        override def merge(a: (Int, Double), b: (Int, Double)): (Int, Double) = (a._1 + b._1, a._2 + b._2)
      },
      new ProcessWindowFunction[(Int, Double), MonitorAvgSpeedInfo, String, TimeWindow] {
        override def process(key: String, context: Context, elements: Iterable[(Int, Double)], out: Collector[MonitorAvgSpeedInfo]): Unit = {
          val monitorId = key
          val avgSpeed = (elements.last._2 / elements.last._1).formatted("%.2f").toDouble
          out.collect(new MonitorAvgSpeedInfo(context.window.getStart, context.window.getEnd, monitorId, avgSpeed, elements.last._1))
        }
      }
    ).assignAscendingTimestamps(masi => {
      masi.endTime
    })//设置下一个窗口的时间

    //这里设置一个滚动窗口,每隔1分钟,对以上所有卡扣对应的平均速度进行排序,得到对应的结果
    monitorAvgSpeedDStream.timeWindowAll(Time.minutes(1))
        .process(new ProcessAllWindowFunction[MonitorAvgSpeedInfo,String,TimeWindow] {
          override def process(context: Context, elements: Iterable[MonitorAvgSpeedInfo], out: Collector[String]): Unit = {
            val builder = new StringBuilder(s"窗口起始时间:${context.window.getStart} - ${context.window.getEnd},最拥堵的前3个卡扣信息如下:")
            val infoes: List[MonitorAvgSpeedInfo] = elements.toList.sortWith((masi1,masi2)=>{masi1.avgSpeed > masi2.avgSpeed}).take(3)
            for(masi <- infoes){
              builder.append(s"monitorId : ${masi.monitorId},avgSpeed : ${masi.avgSpeed} |")
            }
            out.collect(builder.toString())
          }
        }).print()

    env.execute()
  }
}

(2)滑动窗口全量计算:

object FindTop5MonitorInfo1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment

    //导入隐式转换
    import org.apache.flink.streaming.api.scala._

    //设置并行度为1
    env.setParallelism(1)

    //设置事件时间
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)

    val props = new Properties()
    props.setProperty("bootstrap.servers","mynode1:9092,mynode2:9092,mynode3:9092")
    props.setProperty("group.id","testgroup3")
    props.setProperty("key.deserializer",classOf[StringDeserializer].getName)
    props.setProperty("value.deserializer",classOf[StringDeserializer].getName)
    val mainDStream: DataStream[TrafficLog] = env.addSource(new FlinkKafkaConsumer[String]("traffic-topic", new SimpleStringSchema(), props).setStartFromEarliest())
//    val mainDStream: DataStream[TrafficLog] = env.socketTextStream("mynode5",9999)
      .map(line => {
        val arr: Array[String] = line.split(",")
        TrafficLog(arr(0).toLong, arr(1), arr(2), arr(3), arr(4).toDouble, arr(5), arr(6))
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[TrafficLog](Time.seconds(5)) {
      override def extractTimestamp(element: TrafficLog): Long = element.actionTime
    })

    mainDStream
      .timeWindowAll(Time.minutes(1))
      .aggregate(
        //返回数据为 Map[String,Double] => Map[卡扣,平均速度]
        new AggregateFunction[TrafficLog,Map[String,(Int,Double)],Map[String,Double]]{
          //初始化一个Map[卡扣,(当前卡扣对应总车辆数,当前卡扣下所有车辆总速度和)]
          override def createAccumulator(): Map[String, (Int, Double)] = Map()

          override def add(value: TrafficLog, accMap: Map[String, (Int, Double)]): Map[String, (Int, Double)] = {
            //获取当前一条数据的monitorID
            val monitorId: String = value.monitorId
            if(accMap.contains(monitorId)){//当前map中包含此卡扣
              accMap.put(monitorId,(accMap.get(monitorId).get._1+1,accMap.get(monitorId).get._2+value.speed))
            }else{
              accMap.put(monitorId,(1,value.speed))
            }
            accMap
          }

          override def getResult(accumulator: Map[String,(Int, Double)]): Map[String, Double] = {
            accumulator.map(tp=>{
              val monitorId: String = tp._1
              val totalCarCount: Int = tp._2._1
              val totalSpeed: Double = tp._2._2
              (monitorId,(totalSpeed/totalCarCount).formatted("%.2f").toDouble)
            })
          }

          //合并不同线程处理的数据
          override def merge(a: Map[String, (Int, Double)], b: Map[String, (Int, Double)]): Map[String, (Int, Double)] = {
            b.foreach(tp=>{
              val monitorId: String = tp._1
              val carCount: Int = tp._2._1
              val totalSpeed: Double = tp._2._2
              if(a.contains(monitorId)){//第一个map中包含当前卡扣数据
                a.put(monitorId,(a.get(monitorId).get._1 + carCount,a.get(monitorId).get._2+totalSpeed))
              }else{
                //第一个map中不包含当前卡扣数据
                a.put(monitorId,tp._2)
              }
            })
            a
          }
        },
        new AllWindowFunction[Map[String, Double],String,TimeWindow] {
          override def apply(window: TimeWindow, input: scala.Iterable[mutable.Map[String, Double]], out: Collector[String]): Unit = {
            val tuples: List[(String, Double)] = input.last.toList.sortWith((tp1,tp2)=>{tp1._2 > tp2._2}).take(3)
            val returnStr = new StringBuilder(s"窗口起始时间:${window.getStart} - ${window.getEnd} ,最拥堵前3个卡扣信息 :")
            for(tp <- tuples){
              returnStr.append(s"monitorId = ${tp._1} ,avgSpeed = ${tp._2} |")
            }
            out.collect(returnStr.toString())
          }
        }
      ).print()

    env.execute()

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1339693.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

信息网络协议基础-IPv6协议

文章目录 概述为什么引入IP服务模型IPv4的可扩展性问题解决方法***CIDR(Classless Inter-Domain Routing, 无类别域间寻路)前缀汇聚***前缀最长匹配***NAT(网络地址转换)存在的问题解决方案路由表配置***局限性IPv6协议头标IPv6地址表示前缀类型单播地址链路局部地址(Link-Loca…

RabbitMq知识概述

本文来说下RabbitMq相关的知识与概念 文章目录 概述AMQP协议Exchange 消息如何保证100&#xff05;投递什么是生产端的可靠性投递可靠性投递保障方案 消息幂等性高并发的情况下如何避免消息重复消费confirm 确认消息、Return返回消息如何实现confirm确认消息return消息机制 消费…

构建高效数据中台:集群规划与搭建的最佳实践指南

架构设计 Rack(机架)配置建议 大数据集群规划 安装细节见配套文档 两地三中心 两地三中心是一种信息技术架构模式,通常用于灾难恢复和业务连续性计划。这种模式设计有两个物理位置(两地),在这两个位置上部署了三个数据中心(三中心):一个主数据中心和两个备份数据中心…

电子邮件过滤软件SpamSieve mac高级功能

SpamSieve mac是一款电子邮件过滤软件&#xff0c;旨在帮助用户有效地识别和阻止垃圾邮件。该软件可通过机器学习算法自动学习您的邮箱中哪些邮件是垃圾邮件&#xff0c;哪些是正常邮件&#xff0c;并根据您的反馈不断优化过滤效果。 使用SpamSieve非常简单&#xff0c;只需将其…

How to Develop Word Embeddings in Python with Gensim

https://machinelearningmastery.com/develop-word-embeddings-python-gensim/ 本教程分为 6 个部分;他们是&#xff1a; 词嵌入 Gensim 库 开发 Word2Vec 嵌入 可视化单词嵌入 加载 Google 的 Word2Vec 嵌入 加载斯坦福大学的 GloVe 嵌入 词嵌入 单词嵌入是一种提供单词的…

HTML的学习记录

<br /> 标签在 HTML 页面中创换行符。 <hr /> 标签在 HTML 页面中创建水平线。 段落是通过 <p> 标签定义的。 浏览器会自动地在段落的前后添加空行。&#xff08;<p> 是块级元素&#xff09; 文本格式 <b>This text is bold</b>字体加粗 …

腾讯云轻量应用服务器购买流程(两种方式)

腾讯云轻量应用服务器购买指南&#xff0c;有两个入口&#xff0c;一个是在特价活动上购买&#xff0c;一个是在轻量应用服务器官方页面购买&#xff0c;特价活动上购买价格更便宜&#xff0c;轻量2核2G3M带宽服务器62元一年起&#xff0c;阿腾云atengyun.com分享腾讯云轻量应用…

VS配置PCO相机SDK环境

VS配置PCO相机SDK环境 概述:最近要用到一款PCO相机,需要协调其他部件实现一些独特的功能。因此需要用到PCO相机的SDK,并正确配置环境。良好的环境是成功的一半。其SDK可以在官网下载,选择对应版本的安装即可。这里用的是pco.cpp.1.2.0 Windows,VS 2022 专业版。 链接: P…

阿里云 ACK 云上大规模 Kubernetes 集群高可靠性保障实战

作者&#xff1a;贤维 马建波 古九 五花 刘佳旭 引言 2023 年 7 月&#xff0c;阿里云容器服务 ACK 成为首批通过中国信通院“云服务稳定运行能力-容器集群稳定性”评估的产品&#xff0c; 并荣获“先进级”认证。随着 ACK 在生产环境中的采用率越来越高&#xff0c;稳定性保…

【python 的各种模块】(8) 在python使用matplotlib和wordcloud库来画wordcloud词云图

目录 目标&#xff1a;用python画出&#xff0c;网上流行的wordcloud词云图 1 准备工作 1.1环境准备 1.1.1安装步骤 1.2 资源准备 1.2.1 文本文件内容如下 1.2.2 图片资源 2 代码测试 2.1 第一版代码和效果 2.1.1 代码和效果 2.1.2 一般plt里解决中文乱码问题 2.1…

StackOverflowError的JVM处理方式

背景&#xff1a; 事情来源于生产的一个异常日志 Caused by: java.lang.StackOverflowError: null at java.util.stream.Collectors.lambda$groupingBy$45(Collectors.java:908) at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) at java.util.ArrayL…

基于飞浆OCR的文本框box及坐标中心点检测JSON格式保存文本

OCR的文本框box及JSON数据保存 需求说明 一、借助飞浆框出OCR识别的文本框 二、以圆圈形式标出每个框的中心点位置 三、以JSON及文本格式保存OCR识别的文本 四、以文本格式保存必要的文本信息 解决方法 一、文本的坐标来自飞浆的COR识别 二、借助paddleocr的draw_ocr画出…

【数据库系统概论】第4章-数据库安全性

复习用&#xff0c;别看了 文章目录 4.1 计算机安全性概述4.2 数据库安全性控制4.2.1 用户标识和鉴定4.2.2 存取控制4.2.3 自主存取控制方法4.2.4 数据库角色4.2.5 强制存取控制 4.3 视图机制4.4 审计4.5 数据加密4.6 其他安全性保护 4.1 计算机安全性概述 不安全因素 4.2 …

项目管理计划(word版21页)

本计划的主要目的是通过本方案明确本项目的项目管理体系。方案的主要内容包括&#xff1a;明确项目的目标及工作范围&#xff0c;明确项目的组织结构和人员分工&#xff0c;确立项目的沟通环境&#xff0c;确立项目进度管理方法&#xff0c;明确项目跟踪和监控方式&#xff0c;…

代理模式:中间者的故事

代理模式&#xff1a;中间者的故事 介绍需求分析代理模式代码实现代理模式整理和用途第一种用途第二种用途第三种用途第四种用途 总结 介绍 本文引用《大话设计模式》第七章节的内容进行学习分析&#xff0c;仅供学习使用 需求&#xff1a;小明拜托自己好朋友小王给他朋友小美…

同化的题解

时间限制: 1000ms 空间限制: 524288kB 题目描述 古人云&#xff1a;“近朱者赤近墨者黑”。这句话是很有道理的。这不鱼大大和一群苦命打工仔被安排进厂拧螺丝了。 进厂第一天&#xff0c;每个人拧螺丝的动力k都是不同且十分高涨的。但是当大家坐在一起后会聊天偷懒&#xf…

python3处理docx并flask显示

前言&#xff1a; 最近有需求处理docx文件&#xff0c;并讲内容显示到页面&#xff0c;对world进行在线的阅读&#xff0c;这样我这里就使用flaskDocument对docx文件进行处理并显示&#xff0c;下面直接上代码&#xff1a; Document处理&#xff1a; 首先下载Document的库文…

【论文解读】Learning based fast H.264 to H.265 transcoding

时间&#xff1a; 2015 年 级别&#xff1a; APSIPA 机构&#xff1a; 上海电力大学 摘要 新提出的视频编码标准HEVC (High Efficiency video coding)以其比H.264/AVC更好的编码效率&#xff0c;被工业界和学术界广泛接受和采用。在HEVC实现了约40%的编码效率提升的同时&…

源码补丁神器—patch-package

一、背景 vue项目中使用 vue-pdf 第三方插件预览pdf&#xff0c;书写业务代码完美运行&#xff0c;pdf文件内容正常预览无问题。后期需求有变&#xff0c;业务需求增加电子签章功能。这个时候pdf文件的内容可以显示出来&#xff0c;但是公司的电子签章无法显示。这令人沮丧&am…

B/S架构云端SaaS服务的医院云HIS系统源码,自主研发,支持电子病历4级

医院云HIS系统源码&#xff0c;自主研发&#xff0c;自主版权&#xff0c;电子病历病历4级 系统概述&#xff1a; 一款满足基层医院各类业务需要的云HIS系统。该系统能帮助基层医院完成日常各类业务&#xff0c;提供病患挂号支持、病患问诊、电子病历、开药发药、会员管理、统…