Spark Chapter 7: Spark Streaming Examples

news2025/1/16 17:04:48

Series Contents

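Spark Chapter 1: Environment Setup
Spark Chapter 2: Spark Core Examples
Spark Chapter 3: Engineering the Code
Spark Chapter 4: Basic Spark SQL Operations
Spark Chapter 5: Spark SQL Examples
Spark Chapter 6: Basic Spark Streaming Operations
Spark Chapter 7: Spark Streaming Examples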


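Contents

  • Series Contents
  • Preface
  • I. Environment Setup
    • 1. pom changes
    • 2. File preparation
    • 3. Data preparation
  • II. Project Cases
    • 1. Requirement 1: Ad blacklist
    • 2. Requirement 2: Real-time ad click statistics
    • 3. Requirement 3: Ad clicks in the last hour
  • Summary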


Preface

Today we complete the last hands-on Spark case study.


I. Environment Setup

1. pom changes

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-core_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-sql_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>

<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-hive_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.hive</groupId>
    <artifactId>hive-exec</artifactId>
    <version>1.2.1</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>org.apache.spark</groupId>
    <artifactId>spark-streaming-kafka-0-10_2.12</artifactId>
    <version>3.2.3</version>
</dependency>
<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-core</artifactId>
    <version>2.14.2</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.alibaba/druid -->
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>druid</artifactId>
    <version>1.1.10</version>
</dependency>

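This is the full list of dependencies used in this chapter. Merge it into the <dependencies> section of your pom.xml and add any that are missing.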

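2. File preparation

To keep this chapter separate from the earlier projects, I created a new package, com.atguigu.bigdata.spark.streaming.exp.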

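3. Data preparation

A small program generates the data by sending records to Kafka. On the other side, a streaming job consumes the records and analyzes them.
Each record has five space-separated fields:
timestamp (the click time, as a millisecond timestamp)
area
city
user
ad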
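Every record is one line of five space-separated fields. The jobs below simply call `data.split(" ")` and index positions 0 to 4. That throws as soon as a line is malformed.

Here is a minimal, more defensive sketch of the same parsing step. `AdClickParser` is a hypothetical name, not part of the article's code. The case class mirrors the article's `AdClickData`.

```scala
// Mirrors the article's AdClickData: all fields kept as strings.
case class AdClickData(ts: String, area: String, city: String, user: String, ad: String)

// Hypothetical helper: parse one "timestamp area city userid adid" line.
// Returns None instead of throwing when the line is malformed.
object AdClickParser {
  def parse(line: String): Option[AdClickData] = {
    // Split on runs of whitespace so a double space does not create an empty field
    val fields = line.trim.split("\\s+")
    // Require exactly five fields and a numeric timestamp
    if (fields.length == 5 && fields(0).forall(_.isDigit))
      Some(AdClickData(fields(0), fields(1), fields(2), fields(3), fields(4)))
    else
      None
  }
}
```

Inside a job this could replace the `map` step, for example `kafkaDataDS.flatMap(r => AdClickParser.parse(r.value()).toList)`. A bad line is then dropped instead of failing the whole batch.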
MockData.scala

package com.atguigu.bigdata.spark.streaming.exp

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

import java.util.Properties
import scala.collection.mutable.ListBuffer
import scala.util.Random


object MockData {
  def main(args: Array[String]): Unit = {
    //Generate mock data
    //Format : timestamp area city userid adid
    //Meaning: timestamp, region, city, user, ad

    // Create the producer configuration
    val prop = new Properties()
    // Add the settings
    prop.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092")
    prop.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
    prop.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](prop)

    while (true){
      mockdata().foreach(
        (data: String) =>{
          val record = new ProducerRecord[String,String]("atguigu",data)
          producer.send(record)
        }
      )
      Thread.sleep(3000)
    }
  }
  def mockdata(): ListBuffer[String] ={
    val list: ListBuffer[String] = ListBuffer[String]()
    val areaList: ListBuffer[String] = ListBuffer[String]("华北", "华东", "华南")
    val cityList: ListBuffer[String] = ListBuffer[String]("北京", "上海", "深圳")

    for (_ <-1 to 30){
      val area: String = areaList(new Random().nextInt(3))
      val city: String = cityList(new Random().nextInt(3))
      val userid: Int = new Random().nextInt(6)+1
      val adid: Int = new Random().nextInt(6)+1

      list.append(s"${System.currentTimeMillis()} $area $city $userid $adid")
    }
    list
  }
}
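MockData has two properties that make test runs hard to reproduce:

- It creates a new `Random` for every field.
- It stamps each record with the current time.

Below is a hedged sketch of the same generator that uses one shared, optionally seeded `Random` and an injectable clock. `MockGen`, `batch` and the `now` parameter are hypothetical names for illustration only. The record format and value ranges follow `mockdata()` above.

```scala
import scala.util.Random

// Hypothetical variant of MockData.mockdata(): same record format,
// one shared Random (seed it for reproducible runs) and an injectable clock.
object MockGen {
  private val areas  = Vector("华北", "华东", "华南")
  private val cities = Vector("北京", "上海", "深圳")

  def batch(n: Int, rnd: Random, now: () => Long = () => System.currentTimeMillis()): Vector[String] =
    Vector.fill(n) {
      val area   = areas(rnd.nextInt(areas.size))
      val city   = cities(rnd.nextInt(cities.size))
      val userid = rnd.nextInt(6) + 1 // 1..6, as in the original
      val adid   = rnd.nextInt(6) + 1 // 1..6, as in the original
      s"${now()} $area $city $userid $adid"
    }
}
```

In the producer loop, `MockGen.batch(30, new Random())` would replace `mockdata()`. A fixed seed such as `new Random(42)` replays the same sequence of users and ads on every run.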

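This uses the atguigu topic created earlier. If you have deleted it, create it again.

To check the generated data, we first consume it with a minimal job that just prints each record.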
req1.scala

package com.atguigu.bigdata.spark.streaming.exp

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}


object req1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    kafkaDataDS.map(_.value()).print()

    ssc.start()
    ssc.awaitTermination()
  }
}

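Start ZooKeeper and Kafka on the cluster, then run the consumer above.
Once the batch timestamps start appearing in the console, start the producer (MockData).
When records start being printed, the whole pipeline works end to end. Next, we process the data.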

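II. Project Cases

1. Requirement 1: Ad blacklist

Implement a real-time, dynamic blacklist: any user who clicks the same ad more than 100 times in one day is blacklisted. The code below lowers the threshold to 30 (count >= 30) so that the effect shows up quickly during testing.
Note: the blacklist is stored in MySQL.
Create the MySQL tables
I created a new database named spark-streaming for this chapter. The name contains a hyphen, so it must be quoted with backticks in SQL, e.g. CREATE DATABASE `spark-streaming`.
Table definitions
Table holding the blacklisted users:
CREATE TABLE black_list (userid CHAR(1) PRIMARY KEY);

Table holding each user's daily click count per ad: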
CREATE TABLE user_ad_count (
dt varchar(255),
userid CHAR (1),
adid CHAR (1),
count BIGINT,
PRIMARY KEY (dt, userid, adid)
);
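Before the JDBC code, the decision logic of this requirement can be modelled in memory. The sketch below is a hypothetical stand-in, not the article's implementation:

- Two mutable collections play the roles of the `black_list` and `user_ad_count` tables.
- `applyBatch` mirrors the branches of the req1_BlackList job:
  1. drop blacklisted users;
  2. if one batch alone reaches the threshold, blacklist the user directly;
  3. otherwise insert or add to the daily count, then re-check the total.

```scala
import scala.collection.mutable

// Hypothetical in-memory model of the two MySQL tables:
//   userAdCount ~ user_ad_count: (dt, userid, adid) -> count
//   blackList   ~ black_list:    userid
class BlacklistModel(threshold: Int) {
  val userAdCount = mutable.Map.empty[(String, String, String), Long]
  val blackList   = mutable.Set.empty[String]

  // Apply one micro-batch of already-aggregated ((day, user, ad), count) pairs
  def applyBatch(batch: Seq[((String, String, String), Int)]): Unit = {
    // Step 1: skip users that are already blacklisted
    val live = batch.filterNot { case ((_, user, _), _) => blackList.contains(user) }
    live.foreach { case (key @ (_, user, _), count) =>
      if (count >= threshold) {
        // Step 2: this batch alone reaches the threshold, so blacklist directly
        blackList += user
      } else {
        // Step 3: insert or update the daily count, then re-check the total
        val total = userAdCount.getOrElse(key, 0L) + count
        userAdCount(key) = total
        if (total >= threshold) blackList += user
      }
    }
  }
}
```

Note that, like the original job, a single batch at or above the threshold blacklists the user without updating `user_ad_count`.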
Wrap MySQL access in a utility class
JDBCUtil.scala

package com.atguigu.bigdata.spark.streaming.exp.Util

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil { //Initialize the connection pool
  var dataSource: DataSource = init()
  //Build the Druid connection pool
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")

    DruidDataSourceFactory.createDataSource(properties)
  }
  //Get a MySQL connection from the pool
  def getConnection: Connection = {
    dataSource.getConnection
  }
}
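Every caller of `getConnection` has to remember to close the connection and its statements. The jobs in this chapter only do so on the success path.

Below is a minimal loan-pattern sketch that always calls `close()`, even when the body throws. `Loan.using` is a hypothetical helper, not part of the article's JDBCUtil. Scala 2.13 ships `scala.util.Using` for this purpose. The `_2.12` artifacts in the pom, however, mean Scala 2.12, which does not have it.

```scala
// Hypothetical loan-pattern helper: run f with the resource, then always close it.
object Loan {
  def using[R <: AutoCloseable, A](resource: R)(f: R => A): A =
    try {
      f(resource)
    } finally {
      // Runs on success and on exception alike
      if (resource != null) resource.close()
    }
}
```

`java.sql.Connection` and `PreparedStatement` both implement `AutoCloseable`, so calls can be nested. For example: `Loan.using(JDBCUtil.getConnection) { conn => Loan.using(conn.prepareStatement("select userid from black_list")) { ps => ... } }`.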

Implementation
req1_BlackList.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer


object req1_BlackList {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data: String = kafkaData.value()
        val datas: Array[String] = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )
    //Per batch: drop blacklisted users, then count clicks per (day, user, ad)
    val ds: DStream[((String, String, String), Int)] = adClickData.transform(
      (rdd: RDD[AdClickData]) => {
        val blackList: ListBuffer[String] = ListBuffer[String]()
        val conn: Connection = JDBCUtil.getConnection
        val pstat: PreparedStatement = conn.prepareStatement("select userid from black_list")

        val rs: ResultSet = pstat.executeQuery()

        while (rs.next()) {
          blackList.append(rs.getString(1))
        }

        rs.close()
        pstat.close()
        conn.close()

        //Keep only users who are not in the blacklist
        val filterRDD: RDD[AdClickData] = rdd.filter(
          (data: AdClickData) => {
            !blackList.contains(data.user)
          }
        )

        filterRDD.map(
          (data: AdClickData) => {
            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day: String = sdf.format(new Date(data.ts.toLong))
            val user: String = data.user
            val ad: String = data.ad
            ((day, user, ad), 1)
          }
        ).reduceByKey((_: Int) + (_: Int))
      }
    )

    ds.foreachRDD(
      (rdd: RDD[((String, String, String), Int)]) =>{
        rdd.foreach{
          case ((day, user, ad), count)=>{
            println(((day, user, ad), count))
            if (count>=30){
              //This batch alone reaches the threshold (30): add the user to the blacklist
              val conn: Connection = JDBCUtil.getConnection
              val pstat: PreparedStatement = conn.prepareStatement(
                """
                  |insert into black_list (userid) values (?)
                  |on DUPLICATE KEY
                  |UPDATE userid=?
                  |""".stripMargin)
              pstat.setString(1,user)
              pstat.setString(2,user)
              pstat.executeUpdate()
              pstat.close()
              conn.close()
            }else{
              //Below the threshold: update the user's click count for today
              val conn: Connection = JDBCUtil.getConnection
              val pstat: PreparedStatement = conn.prepareStatement(
                """
                  | select *
                  | from user_ad_count
                  | where dt =? and userid=? and adid=?
                  |""".stripMargin)
              pstat.setString(1,day)
              pstat.setString(2,user)
              pstat.setString(3,ad)
              val rs: ResultSet = pstat.executeQuery()

              if (rs.next()){
                //A row already exists: add this batch's count to it
                val pstat1: PreparedStatement = conn.prepareStatement(
                  """
                    | update user_ad_count
                    | set count=count+?
                    | where dt =? and userid=? and adid=?
                    |""".stripMargin)
                pstat1.setInt(1,count)
                pstat1.setString(2,day)
                pstat1.setString(3,user)
                pstat1.setString(4,ad)
                pstat1.executeUpdate()
                pstat1.close()
                //If the updated total reaches the threshold, blacklist the user
                val pstat2: PreparedStatement = conn.prepareStatement(
                  """
                    | select *
                    | from user_ad_count
                    | where dt =? and userid=? and adid=? and count>=30
                    |""".stripMargin)
                pstat2.setString(1,day)
                pstat2.setString(2,user)
                pstat2.setString(3,ad)

                val rs2: ResultSet = pstat2.executeQuery()

                if (rs2.next()){

                  val pstat3: PreparedStatement = conn.prepareStatement(
                    """
                      |insert into black_list (userid) values (?)
                      |on DUPLICATE KEY
                      |UPDATE userid=?
                      |""".stripMargin)
                  pstat3.setString(1,user)
                  pstat3.setString(2,user)
                  pstat3.executeUpdate()
                  pstat3.close()
                }
                rs2.close()
                pstat2.close()

              }else{
                //No row yet: insert one
                val pstat1: PreparedStatement = conn.prepareStatement(
                  """
                    | insert into user_ad_count (dt,userid,adid,count) values (?,?,?,?)
                    |""".stripMargin)
                pstat1.setString(1,day)
                pstat1.setString(2,user)
                pstat1.setString(3,ad)
                pstat1.setInt(4,count)
                pstat1.executeUpdate()
                pstat1.close()
              }


              rs.close()
              pstat.close()
              conn.close()

            }
          }
        }
      }
    )


    ssc.start()
    ssc.awaitTermination()

  }

  //One ad-click record
  case class AdClickData(ts:String,area:String,city:String,user:String,ad:String)

}
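The job turns each timestamp into a `yyyy-MM-dd` day key with `SimpleDateFormat`. `SimpleDateFormat` is not thread-safe, which is why it is created anew for every record inside `map`.

Below is a hedged alternative sketch using `java.time`. `DateTimeFormatter` is immutable and thread-safe, so a single instance can be shared. `DayKey` is a hypothetical name. The time zone is an explicit parameter here as an assumption; the original code uses the JVM default zone.

```scala
import java.time.{Instant, ZoneId}
import java.time.format.DateTimeFormatter

// Hypothetical helper: same "yyyy-MM-dd" day key as the job, built with java.time.
object DayKey {
  // Immutable and thread-safe, so one shared instance is enough
  private val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  def dayOf(epochMillis: Long, zone: ZoneId): String =
    fmt.format(Instant.ofEpochMilli(epochMillis).atZone(zone))
}
```

The zone matters near midnight. The same instant can fall on different days in UTC and in UTC+8, so the day column in `user_ad_count` depends on the zone the job runs in.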

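Now test it. As before, start the consumer first and the producer second, so that any backlog in Kafka is consumed before new data is produced.
If data shows up before the producer has been started, Kafka still had a backlog from earlier runs. Once it has been consumed, clear the MySQL tables and then start producing data.
Refresh the database and you can see the counts change continuously. Eventually a user's count reaches 30 and that user is added to the blacklist.
Code optimization
Refactor the utility class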
JDBCUtil.scala

package com.atguigu.bigdata.spark.streaming.exp.Util

import com.alibaba.druid.pool.DruidDataSourceFactory

import java.sql.{Connection, PreparedStatement}
import java.util.Properties
import javax.sql.DataSource

object JDBCUtil { //Initialize the connection pool
  var dataSource: DataSource = init()
  //Build the Druid connection pool
  def init(): DataSource = {
    val properties = new Properties()
    properties.setProperty("driverClassName", "com.mysql.jdbc.Driver")
    properties.setProperty("url", "jdbc:mysql://hadoop102:3306/spark-streaming?useUnicode=true&characterEncoding=UTF-8&useSSL=false")
    properties.setProperty("username", "root")
    properties.setProperty("password", "000000")
    properties.setProperty("maxActive", "50")

    DruidDataSourceFactory.createDataSource(properties)
  }
  //Get a MySQL connection from the pool
  def getConnection: Connection = {
    dataSource.getConnection
  }
  //Execute one INSERT/UPDATE statement and commit it; returns the affected row count
  def executeUpdate(connection: Connection, sql: String, params: Array[Any]): Int = {
    var rtn = 0
    var pstmt: PreparedStatement = null
    try {
      connection.setAutoCommit(false)
      pstmt = connection.prepareStatement(sql)
      if (params != null && params.length > 0) {
        for (i <- params.indices) {
          pstmt.setObject(i + 1, params(i))
        }
      }
      rtn = pstmt.executeUpdate()
      connection.commit()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      //Close the statement even when the update fails
      if (pstmt != null) pstmt.close()
    }
    rtn
  }


  //Return true if the query returns at least one row
  def isExist(connection: Connection, sql: String, params: Array[Any]): Boolean =
  {
    var flag: Boolean = false
    var pstmt: PreparedStatement = null
    try {
      pstmt = connection.prepareStatement(sql)
      for (i <- params.indices) {
        pstmt.setObject(i + 1, params(i))
      }
      flag = pstmt.executeQuery().next()
    } catch {
      case e: Exception => e.printStackTrace()
    } finally {
      //Closing the statement also closes its ResultSet
      if (pstmt != null) pstmt.close()
    }
    flag
  }
}

req1_BlackList1.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement, ResultSet}
import java.text.SimpleDateFormat
import java.util.Date
import scala.collection.mutable.ListBuffer


object req1_BlackList1 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data: String = kafkaData.value()
        val datas: Array[String] = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )
    //Per batch: drop blacklisted users, then count clicks per (day, user, ad)
    val ds: DStream[((String, String, String), Int)] = adClickData.transform(
      (rdd: RDD[AdClickData]) => {
        val blackList: ListBuffer[String] = ListBuffer[String]()
        val conn: Connection = JDBCUtil.getConnection
        val pstat: PreparedStatement = conn.prepareStatement("select userid from black_list")

        val rs: ResultSet = pstat.executeQuery()

        while (rs.next()) {
          blackList.append(rs.getString(1))
        }

        rs.close()
        pstat.close()
        conn.close()

        //Keep only users who are not in the blacklist
        val filterRDD: RDD[AdClickData] = rdd.filter(
          (data: AdClickData) => {
            !blackList.contains(data.user)
          }
        )

        filterRDD.map(
          (data: AdClickData) => {
            val sdf = new SimpleDateFormat("yyyy-MM-dd")
            val day: String = sdf.format(new Date(data.ts.toLong))
            val user: String = data.user
            val ad: String = data.ad
            ((day, user, ad), 1)
          }
        ).reduceByKey((_: Int) + (_: Int))
      }
    )

    ds.foreachRDD(
      (rdd: RDD[((String, String, String), Int)]) =>{
        //Alternative: create one connection per partition instead of one per record
//        rdd.foreachPartition(
//          iter=>{
//            val conn: Connection = JDBCUtil.getConnection
//            iter.foreach{
//              case ((day, user, ad), count)=>{
//
//              }
//            }
//            conn.close()
//          }
//        )




        rdd.foreach{
          case ((day, user, ad), count)=>{
            println(((day, user, ad), count))
            if (count>=30){
              //This batch alone reaches the threshold (30): add the user to the blacklist
              val conn: Connection = JDBCUtil.getConnection
              val sql: String ="""
                        |  insert into black_list (userid) values (?)
                        |  on DUPLICATE KEY
                        |  UPDATE userid=?
                        |""".stripMargin
              JDBCUtil.executeUpdate(conn,sql,Array(user,user))
              conn.close()
            }else{
              //Below the threshold: update the user's click count for today
              val conn: Connection = JDBCUtil.getConnection
              val sql0: String ="""
                        | select *
                        | from user_ad_count
                        | where dt =? and userid=? and adid=?
                        |""".stripMargin
              val flg: Boolean = JDBCUtil.isExist(conn, sql0, Array(day, user, ad))

              if (flg){
                //A row already exists: add this batch's count to it
                val sql1: String ="""
                          |   update user_ad_count
                          |   set count=count+?
                          |   where dt =? and userid=? and adid=?
                          |""".stripMargin
                JDBCUtil.executeUpdate(conn,sql1,Array(count,day,user,ad))

                //If the updated total reaches the threshold, blacklist the user
                val sql2: String ="""
                          | select *
                          | from user_ad_count
                          | where dt =? and userid=? and adid=? and count>=30
                          |""".stripMargin
                val flg1: Boolean = JDBCUtil.isExist(conn, sql2, Array(day, user, ad))

                if (flg1){
                  val sql3: String ="""
                            |  insert into black_list (userid) values (?)
                            |  on DUPLICATE KEY
                            |  UPDATE userid=?
                            |""".stripMargin
                  JDBCUtil.executeUpdate(conn,sql3,Array(user,user))
                }


              }else{
                //No row yet: insert one
                val sql4: String ="""
                           |insert into user_ad_count (dt,userid,adid,count) values (?,?,?,?)
                           |""".stripMargin
                JDBCUtil.executeUpdate(conn,sql4,Array(day,user,ad,count))

              }
              conn.close()
            }
          }
        }
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }

  //One ad-click record
  case class AdClickData(ts:String,area:String,city:String,user:String,ad:String)
}

The behavior is the same as before, so I won't demonstrate it again.
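The commented-out `foreachPartition` block in req1_BlackList1 points at the usual optimization: open one connection per partition rather than one per record.

Below is a hedged, Spark-free sketch of that pattern. `PartitionWriter.writePartition` is a hypothetical helper. `open` stands in for `JDBCUtil.getConnection`, and `write` stands in for the per-record SQL.

```scala
// Hypothetical helper for the per-partition pattern: one connection per
// non-empty partition, reused for every record, closed even if a write fails.
object PartitionWriter {
  def writePartition[C <: AutoCloseable, T](iter: Iterator[T])(open: () => C)(write: (C, T) => Unit): Int =
    if (!iter.hasNext) 0 // empty partition: no connection needed
    else {
      val conn = open()
      var n = 0
      try {
        iter.foreach { rec => write(conn, rec); n += 1 }
      } finally {
        conn.close()
      }
      n // number of records written
    }
}
```

In the job it would be called as `rdd.foreachPartition(iter => PartitionWriter.writePartition(iter)(() => JDBCUtil.getConnection) { (conn, rec) => ... })`. With this pattern, the number of connections per batch scales with the number of partitions instead of the number of records.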

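2. Requirement 2: Real-time ad click statistics

Description: count, in real time, the total number of clicks per day for each area, city and ad, and store the result in MySQL.
Create the MySQL table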
CREATE TABLE area_city_ad_count (
dt VARCHAR(255),
area VARCHAR(255),
city VARCHAR(255),
adid VARCHAR(255),
count BIGINT,
PRIMARY KEY (dt,area,city,adid)
);
req2.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date


object req2 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(3))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data: String = kafkaData.value()
        val datas: Array[String] = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )

    val reduceDS: DStream[((String, String, String, String), Int)] = adClickData.map(
      (data: AdClickData) => {
        val sdf = new SimpleDateFormat("yyyy-MM-dd")
        val day: String = sdf.format(new Date(data.ts.toLong))
        val area: String = data.area
        val city: String = data.city
        val ad: String = data.ad

        ((day, area, city, ad), 1)
      }
    ).reduceByKey((_: Int) + (_: Int))

    reduceDS.foreachRDD(
      rdd=>{
        rdd.foreachPartition(
          iter=>{
            val conn: Connection = JDBCUtil.getConnection
            val pstat: PreparedStatement = conn.prepareStatement(
              """
                |  insert into area_city_ad_count (dt ,area,city,adid,count)
                |  values (?,?,?,?,?)
                |  on DUPLICATE KEY
                |  UPDATE count=count+?
                |""".stripMargin)
            iter.foreach{
              case ((day, area, city, ad), sum)=>{
                pstat.setString(1,day)
                pstat.setString(2,area)
                pstat.setString(3,city)
                pstat.setString(4,ad)
                pstat.setInt(5,sum)
                pstat.setInt(6,sum)
                pstat.executeUpdate()
              }
            }
            pstat.close()
            conn.close()
          }
        )
      }
    )
    ssc.start()
    ssc.awaitTermination()
  }
  //One ad-click record
  case class AdClickData(ts:String,area:String,city:String,user:String,ad:String)
}
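`reduceByKey` only aggregates within a single 3-second batch. The running daily total lives in MySQL, maintained by `INSERT ... ON DUPLICATE KEY UPDATE count=count+?`.

Below is a minimal sketch of what that upsert does across batches, modelled with an immutable `Map`. `UpsertModel` and `Key` are hypothetical names. The key corresponds to the primary key `(dt, area, city, adid)`.

```scala
// Hypothetical model of area_city_ad_count under
// INSERT ... ON DUPLICATE KEY UPDATE count = count + ?
object UpsertModel {
  type Key = (String, String, String, String) // (dt, area, city, adid)

  def upsert(table: Map[Key, Long], batch: Seq[(Key, Int)]): Map[Key, Long] =
    batch.foldLeft(table) { case (acc, (key, sum)) =>
      // New key: insert sum. Existing key: add sum to the stored count.
      acc.updated(key, acc.getOrElse(key, 0L) + sum)
    }
}
```

Because the database holds the state, the streaming job itself can stay stateless: no `updateStateByKey` and no checkpoint directory are needed for this requirement.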

As before, start the consumer first, then the producer, and check the database.

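3. Requirement 3: Ad clicks in the last hour

An hour is too long for a demo, so we use a 1-minute window that is recomputed every 10 seconds. Both the window length and the slide interval must be multiples of the batch interval, which is 5 seconds here.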
req3.scala

package com.atguigu.bigdata.spark.streaming.exp

import com.atguigu.bigdata.spark.streaming.exp.Util.JDBCUtil
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.{DStream, InputDStream}
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

import java.sql.{Connection, PreparedStatement}
import java.text.SimpleDateFormat
import java.util.Date


object req3 {
  def main(args: Array[String]): Unit = {
    val sparkConf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkStream")
    val ssc = new StreamingContext(sparkConf,Seconds(5))

    val kafkaPara: Map[String, Object] = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "hadoop102:9092,hadoop103:9092,hadoop104:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "atguigu",
      "key.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer",
      "value.deserializer" -> "org.apache.kafka.common.serialization.StringDeserializer"
    )

    val kafkaDataDS: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Set("atguigu"), kafkaPara)
    )

    val adClickData: DStream[AdClickData] = kafkaDataDS.map(
      (kafkaData: ConsumerRecord[String, String]) => {
        val data: String = kafkaData.value()
        val datas: Array[String] = data.split(" ")
        AdClickData(datas(0), datas(1), datas(2), datas(3), datas(4))
      }
    )


    //Last minute, recomputed every 10 seconds; key = start of the record's 10-second bucket
    val reduceDS: DStream[(Long, Int)] = adClickData.map(
      data => {
        val ts: Long = data.ts.toLong
        val newTs: Long = ts / 10000 * 10000
        (newTs, 1)
      }
    ).reduceByKeyAndWindow((_: Int) + (_: Int), Seconds(60), Seconds(10))

    reduceDS.print()


    ssc.start()
    ssc.awaitTermination()

  }
  //One ad-click record
  case class AdClickData(ts:String,area:String,city:String,user:String,ad:String)
}
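The job keys each record by `ts / 10000 * 10000`, which uses integer division to floor a millisecond timestamp to the start of its 10-second bucket. It then counts the buckets over a 60-second window.

Below is a plain-Scala sketch of what one window's output looks like. `WindowModel` and the sample timestamps are hypothetical.

One simplification to keep in mind: Spark Streaming windows are defined by batch (processing) time, not by the timestamp inside each record. This model filters by the record timestamp instead, so it matches the real output only when records arrive promptly.

```scala
// Hypothetical model of req3's bucketing and windowing (no Spark needed).
object WindowModel {
  // Floor a millisecond timestamp to the start of its 10-second bucket
  def bucket(ts: Long): Long = ts / 10000 * 10000

  // Counts per bucket for timestamps in (windowEnd - lengthMs, windowEnd],
  // sorted by bucket start (reduceDS.print() in req3 does not sort)
  def window(timestamps: Seq[Long], windowEnd: Long, lengthMs: Long = 60000L): Seq[(Long, Int)] =
    timestamps
      .filter(ts => ts > windowEnd - lengthMs && ts <= windowEnd)
      .groupBy(ts => bucket(ts))
      .map { case (b, inBucket) => (b, inBucket.size) }
      .toSeq
      .sortBy(_._1)
}
```

Each 60-second window covers six 10-second buckets. A bucket at the window edge can therefore appear with a partial count until the window has slid past it.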

As before, start the consumer first, then the producer.

Summary

That wraps up my Spark study. Next, I will probably tackle Flink.

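This article was contributed by an Internet user. The views expressed are the author's own and do not represent this site. The site only provides storage space for the information, does not own it, and bears no related legal liability. If you repost it, please cite the source: http://www.coloradmin.cn/o/415592.html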
