flume整合数据到kafka,sparkStreaming消费数据,并存储到hbase和redis中

news2025/1/16 4:48:20

目录

1、模拟数据生成

2、flume采集数据 

1、node01配置flume的conf文件 

2、node02开发flume的配置文件

3、node03开发flume的配置文件

4、开发flume启动停止脚本 

5、node01执行以下命令创建kafka的topic

6、启动并查看kafka的数据

3、SparkStreaming消费kafka中的数据 

1、第一步sparkStreaming的连接

2、第二步从kafka中获取数据信息,写了一个自定义方法getStreamingContextFromHBase

3、第三步、消费数据,解析数据并将数据存入hbase不同的表中和redis中

4、第四步提交消费的kafka的偏移量到 hbase的表中进行管理


使用flume采集我们的日志数据,然后将数据放入到kafka当中去,通过sparkStreaming消费我们的kafka当中的数据,然后将数据保存到hbase,并且将海口数据保存到redis当中去,实现实时轨迹监控以及历史轨迹回放的功能

 为了模拟数据的实时生成,我们可以通过数据回放程序来实现订单数据的回放功能

1、模拟数据生成

1、海口订单数据上传

            将海口数据上传到node01服务器的/kkb/datas/sourcefile这个路径下,node01执行以下命令创建文件夹,然后上传数据

mkdir -p /kkb/datas/sourcefile

2、成都轨迹日志数据

成都数据上传到node02服务器的/kkb/datas/sourcefile这个路径下node02执行以下命令创建文件夹,然后上传数据

mkdir -p /kkb/datas/sourcefile

3、因为没有实际应用,所以写一个脚本,对数据不断复制追加

在node01服务器的/home/hadoop/bin路径下创建shell脚本,用于数据的回放

cd /home/hadoop/bin

vim start_stop_generate_data.sh

#!/bin/bash

scp /home/hadoop/FileOperate-1.0-SNAPSHOT-jar-with-dependencies.jar  node02:/home/hadoop/

#休眠时间控制

sleepTime=1000

if [ ! -n "$2" ];then

 echo ""

 else

 sleepTime=$2

fi

case $1 in

"start" ){

 for i in  node01 node02

  do

    echo "-----------$i启动数据回放--------------"

      ssh $i "source /etc/profile;nohup java -jar /home/hadoop/FileOperate-1.0-SNAPSHOT-jar-with-dependencies.jar /kkb/datas/sourcefile /kkb/datas/destfile $2   > /dev/null 2>&1 & "

   

  done

};;

"stop"){

  for i in node02 node01

    do

      echo "-----------停止 $i 数据回放-------------"

      ssh $i "source /etc/profile; ps -ef | grep FileOperate-1.0-SNAPSHOT-jar | grep -v grep |awk '{print \$2}' | xargs kill"

    done

};;

esac

脚本赋权 

cd /home/hadoop/bin

chmod 777 start_stop_generate_data.sh

启动脚本

sh start_stop_generate_data.sh start  3000 

停止脚本

sh start_stop_generate_data.sh stop 

2、flume采集数据 

逻辑机构如下

1、node01配置flume的conf文件 

cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/

vim flume_client.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

#配置source

a1.sources.r1.type = taildir

a1.sources.r1.positionFile = /kkb/datas/flume_temp/flume_posit/haikou.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /kkb/datas/destfile/part.+

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

##  static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = hai_kou_gps_topic

#flume监听轨迹文件内容的变化 tuch gps

#配置sink

#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

#flume监听的文件数据发送到此kafka的主题当中

#a1.sinks.k1.topic = hai_kou_gps_topic

#a1.sinks.k1.brokerList= node01:9092,node02:9092,node03:9092

#a1.sinks.k1.batchSize = 20

#a1.sinks.k1.requiredAcks = 1

#a1.sinks.k1.producer.linger.ms = 1

#配置sink

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node03

a1.sinks.k1.port = 41414

#配置channel

a1.channels.c1.type = file

#检查点文件目录

a1.channels.c1.checkpointDir=/kkb/datas/flume_temp/flume_check

#缓存数据文件夹

a1.channels.c1.dataDirs=/kkb/datas/flume_temp/flume_cache

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

 sources:文件数据来源

channels: 文件数据通道

 sinks :文件数据输出

其中关键的就是在sources配置过滤器,方便node03数据集中处理的时候将不同的数据,分配给不同的kafka的topic

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

##  static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = hai_kou_gps_topic

2、node02开发flume的配置文件

cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/

vim flume_client.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

# Describe/configure the source

#配置source

a1.sources.r1.type = taildir

a1.sources.r1.positionFile = /kkb/datas/flume_temp/flume_posit/chengdu.json

a1.sources.r1.filegroups = f1

a1.sources.r1.filegroups.f1 = /kkb/datas/destfile/part.+

a1.sources.r1.fileHeader = true

a1.sources.r1.channels = c1

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

##  static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = cheng_du_gps_topic

#flume监听轨迹文件内容的变化 tuch gps

#配置sink

#a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

#flume监听的文件数据发送到此kafka的主题当中

#a1.sinks.k1.topic = cheng_du_gps_topic

#a1.sinks.k1.brokerList= node01:9092,node02:9092,node03:9092

#a1.sinks.k1.batchSize = 20

#a1.sinks.k1.requiredAcks = 1

#a1.sinks.k1.producer.linger.ms = 1

a1.sinks.k1.type = avro

a1.sinks.k1.hostname = node03

a1.sinks.k1.port = 41414

#配置channel

a1.channels.c1.type = file

#检查点文件目录

a1.channels.c1.checkpointDir=/kkb/datas/flume_temp/flume_check

#缓存数据文件夹

a1.channels.c1.dataDirs=/kkb/datas/flume_temp/flume_cache

# Bind the source and sink to the channel

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

关键配置信息

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = static

##  static拦截器的功能就是往采集到的数据的header中插入自己定## 义的key-value对

a1.sources.r1.interceptors.i1.key = type

a1.sources.r1.interceptors.i1.value = cheng_du_gps_topic

3、node03开发flume的配置文件

cd /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/

vim flume2kafka.conf

a1.sources = r1

a1.sinks = k1

a1.channels = c1

#定义source

a1.sources.r1.type = avro

a1.sources.r1.bind = node03

a1.sources.r1.port =41414

#添加时间拦截器

a1.sources.r1.interceptors = i1

a1.sources.r1.interceptors.i1.type = org.apache.flume.interceptor.TimestampInterceptor$Builder

#定义channels

#配置channel

a1.channels.c1.type = file

#检查点文件目录

a1.channels.c1.checkpointDir=/kkb/datas/flume_temp/flume_check

#缓存数据文件夹

a1.channels.c1.dataDirs=/kkb/datas/flume_temp/flume_cache

#定义sink

#flume监听轨迹文件内容的变化 tuch gps

#配置sink

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

flume监听的文件数据发送到此kafka的主题当中

a1.sinks.k1.topic = %{type}

a1.sinks.k1.brokerList= node01:9092,node02:9092,node03:9092

a1.sinks.k1.batchSize = 20

a1.sinks.k1.requiredAcks = 1

a1.sinks.k1.producer.linger.ms = 1

#组装source、channel、sink

a1.sources.r1.channels = c1

a1.sinks.k1.channel = c1

将 传入信息的key为type的value值,作为sink数据输出端kafka的topic

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink

flume监听的文件数据发送到此kafka的主题当中

a1.sinks.k1.topic = %{type}

4、开发flume启动停止脚本 

cd /home/hadoop/bin/

vim flume_start_stop.sh

#!/bin/bash

case $1 in

"start" ){

 for i in node03 node02 node01

  do

    echo "-----------启动 $i 采集flume-------------"

    if [ "node03" = $i ];then

      ssh $i "source /etc/profile;nohup /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/bin/flume-ng agent -n a1 -c /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf -f /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/flume2kafka.conf -Dflume.root.logger=info,console > /dev/null 2>&1 & "

    else

      ssh $i "source /etc/profile;nohup /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/bin/flume-ng agent -n a1 -c /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf -f /kkb/install/apache-flume-1.6.0-cdh5.14.2-bin/conf/flume_client.conf -Dflume.root.logger=info,console > /dev/null 2>&1 &  "

    fi

  done

};;

"stop"){

  for i in node03 node02 node01

    do

      echo "-----------停止 $i 采集flume-------------"

      ssh $i "source /etc/profile; ps -ef | grep flume | grep -v grep |awk '{print \$2}' | xargs kill"

    done

};;

esac

chmod 777  flume_start_stop.sh 

开启flume脚本

sh flume_start_stop.sh start 

停止flume脚本

sh flume_start_stop.sh stop 

5、node01执行以下命令创建kafka的topic

cd /kkb/install/kafka/

bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 9 --topic cheng_du_gps_topic

bin/kafka-topics.sh --create --zookeeper node01:2181 --replication-factor 2 --partitions 9 --topic hai_kou_gps_topic

6、启动并查看kafka的数据

node01执行以下命令启动订单回放脚本

cd /home/hadoop/bin/

sh start_stop_generate_data.sh start 3000

node01启动flume采集数据脚本

cd /home/hadoop/bin/

sh flume_start_stop.sh start

消费数据 

cd /kkb/install/kafka/

bin/kafka-console-consumer.sh --topic cheng_du_gps_topic  --zookeeper node01:2181,node02:2181,node03:2181

成都kafka消费数据 

bin/kafka-console-consumer.sh --topic hai_kou_gps_topic  --zookeeper node01:2181,node02:2181,node03:2181

海口kafka消费数据 

3、SparkStreaming消费kafka中的数据 

主要程序如下 

package com.travel.programApp

import java.util.regex.Pattern

import com.travel.common.{ConfigUtil, Constants, HBaseUtil, JedisUtil}
import com.travel.utils.HbaseTools
import org.apache.hadoop.hbase.{Cell, CellUtil, HColumnDescriptor, HTableDescriptor, TableName}
import org.apache.hadoop.hbase.client.{Admin, Connection, Get, Result, Table}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.kafka010.{CanCommitOffsets, ConsumerStrategies, ConsumerStrategy, HasOffsetRanges, KafkaUtils, LocationStrategies, OffsetRange}
import redis.clients.jedis.Jedis

import scala.collection.mutable

object StreamingKafka {
  def main(args: Array[String]): Unit = {

    val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)
    //KAFKA_BOOTSTRAP_SERVERS=node01:9092,node02:9092,node03:9092
    val topics = Array(ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC), ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))
    
    val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")
    val group: String = "gps_consum_group"
    //   "bootstrap.servers" -> brokers,
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> brokers,

      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> group,
      "auto.offset.reset" -> "latest", // earliest,latest,和none
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
    val context: SparkContext = sparkSession.sparkContext
    context.setLogLevel("WARN")
    // val streamingContext = new StreamingContext(conf,Seconds(5))
    //获取streamingContext
    val streamingContext: StreamingContext = new StreamingContext(context, Seconds(1))



    //sparkStreaming消费kafka的数据,然后将offset维护保存到hbase里面去
    
    //第一步从kafak当中获取数据
    val result: InputDStream[ConsumerRecord[String, String]] = HbaseTools.getStreamingContextFromHBase(streamingContext, kafkaParams, topics, group, "(.*)gps_topic")
    //第二步:将数据保存到hbase以及redis里面去
    result.foreachRDD(eachRdd => {
      if (!eachRdd.isEmpty()) {
        eachRdd.foreachPartition(eachPartition => {
          val connection: Connection = HBaseUtil.getConnection
          val jedis: Jedis = JedisUtil.getJedis
          //判断表是否存在,如果不存在就进行创建
          val admin: Admin = connection.getAdmin
          if (!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))) {
            /**
             * 一般Hbase创建表,代码结构如下
             * TableName myuser = TableName.valueOf("myuser");
             * HTableDescriptor hTableDescriptor = new HTableDescriptor(myuser);
             * //指定一个列族
             * HColumnDescriptor f1 = new HColumnDescriptor("f1");
             *  hTableDescriptor.addFamily(f1);
             *   admin.createTable(hTableDescriptor);
             */
            val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))
            htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
            admin.createTable(htabgps)
          }
          //判断海口的GPS表是否存在,如果不存在则创建表
          if (!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))) {

            val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))
            htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
            admin.createTable(htabgps)
          }
          //通过循环遍历分区的数据,将每个分区当中的每一条数据都获取出来
          eachPartition.foreach(record => {
            val consumerRecord: ConsumerRecord[String, String] = HbaseTools.saveToHBaseAndRedis(connection, jedis, record)
          })
          JedisUtil.returnJedis(jedis)
          connection.close()
        })
        //获取到消费完成的offset的偏移量
        val offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges
        //将offset保存到hbase里面去,默认可以手动提交保存到kafak的一个topic里面去
        //将offset保存到kafak里面去
        // result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
        for (eachrange <- offsetRanges) {
          val startOffset: Long = eachrange.fromOffset
          val endOffset: Long = eachrange.untilOffset
          val topic: String = eachrange.topic
          val partition: Int = eachrange.partition
          //将offset保存到hbase里面去
          HbaseTools.saveBatchOffset(group, topic, partition + "", endOffset)
        }

      }
    })

    streamingContext.start()
    streamingContext.awaitTermination()


  }

}

1、第一步sparkStreaming的连接

标准的基础连接,kafkaParams 为kafka的消费基础信息

val brokers = ConfigUtil.getConfig(Constants.KAFKA_BOOTSTRAP_SERVERS)
//KAFKA_BOOTSTRAP_SERVERS=node01:9092,node02:9092,node03:9092
val topics = Array(ConfigUtil.getConfig(Constants.CHENG_DU_GPS_TOPIC), ConfigUtil.getConfig(Constants.HAI_KOU_GPS_TOPIC))

val conf = new SparkConf().setMaster("local[1]").setAppName("sparkKafka")
val group: String = "gps_consum_group"

val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> brokers,

  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> group,
  "auto.offset.reset" -> "latest", // earliest,latest,和none
  "enable.auto.commit" -> (false: java.lang.Boolean)
)
val sparkSession: SparkSession = SparkSession.builder().config(conf).getOrCreate()
val context: SparkContext = sparkSession.sparkContext
context.setLogLevel("WARN")
// val streamingContext = new StreamingContext(conf,Seconds(5))
//获取streamingContext
val streamingContext: StreamingContext = new StreamingContext(context, Seconds(1))

2、第二步从kafka中获取数据信息,写了一个自定义方法getStreamingContextFromHBase

val result: InputDStream[ConsumerRecord[String, String]] = HbaseTools.getStreamingContextFromHBase(streamingContext, kafkaParams, topics, group, "(.*)gps_topic")

 1、获取hbase中存储的偏移量信息

2、对应的偏移量,获取对应的value数据信息

def getStreamingContextFromHBase(streamingContext: StreamingContext, kafkaParams: Map[String, Object], topics: Array[String], group: String, matchPattern: String): InputDStream[ConsumerRecord[String, String]] = {
  val connection: Connection = getHbaseConn
  val admin: Admin = connection.getAdmin
  //拿取到HBASE的存偏移量的表hbase_offset_store的偏移量数据,TopicPartition, Long组成的hashMap集合,Long表示偏移量位置
  //TopicPartition里面封装的有参构造器封装的 topic 主题和partition分区
  var getOffset: collection.Map[TopicPartition, Long] = HbaseTools.getOffsetFromHBase(connection, admin, topics, group)
  //如果偏移量数组大于0,则证明是以前被消费过的,所以多传一个参数,传入偏移量的值
  val result = if (getOffset.size > 0) {
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(matchPattern), kafkaParams, getOffset)
    val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent, consumerStrategy)
    //streamingContext streaming上下文对象
    //LocationStrategies.PreferConsistent:数据本地性策略
    //consumerStrategy消费策略
    value
    //返回streaming获取到kafka的真实value值
  } else {
    val consumerStrategy: ConsumerStrategy[String, String] = ConsumerStrategies.SubscribePattern[String, String](Pattern.compile(matchPattern), kafkaParams)
    val value: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(streamingContext, LocationStrategies.PreferConsistent, consumerStrategy)
    value
  }
  admin.close()
  connection.close()
  result
}

getHbaseConn方法,hbase连接的方法

def getHbaseConn: Connection = {
  try {
// GlobalConfigUtils.getProp("hbase.master")自定义的配置信息
    val config: Configuration = HBaseConfiguration.create()
    config.set("hbase.zookeeper.quorum", GlobalConfigUtils.getProp("hbase.zookeeper.quorum"))
    //   config.set("hbase.master" , GlobalConfigUtils.getProp("hbase.master"))
    config.set("hbase.zookeeper.property.clientPort", GlobalConfigUtils.getProp("hbase.zookeeper.property.clientPort"))
    //      config.set("hbase.rpc.timeout" , GlobalConfigUtils.rpcTimeout)
    //      config.set("hbase.client.operator.timeout" , GlobalConfigUtils.operatorTimeout)
    //      config.set("hbase.client.scanner.timeout.period" , GlobalConfigUtils.scannTimeout)
    //      config.set("hbase.client.ipc.pool.size","200");
    val connection = ConnectionFactory.createConnection(config)
    connection

  } catch {
    case exception: Exception =>
      error(exception.getMessage)
      error("HBase获取连接失败")
      null
  }
}

3、第三步、消费数据,解析数据并将数据存入hbase不同的表中和redis中

//将数据保存到hbase以及redis里面去
result.foreachRDD(eachRdd => {
  if (!eachRdd.isEmpty()) {
    eachRdd.foreachPartition(eachPartition => {
      val connection: Connection = HBaseUtil.getConnection
      val jedis: Jedis = JedisUtil.getJedis
      //判断表是否存在,如果不存在就进行创建
      val admin: Admin = connection.getAdmin
      if (!admin.tableExists(TableName.valueOf(Constants.HTAB_GPS))) {
        /**
         * 一般Hbase创建表,代码结构如下
         * TableName myuser = TableName.valueOf("myuser");
         * HTableDescriptor hTableDescriptor = new HTableDescriptor(myuser);
         * //指定一个列族
         * HColumnDescriptor f1 = new HColumnDescriptor("f1");
         *  hTableDescriptor.addFamily(f1);
         *   admin.createTable(hTableDescriptor);
         */
        val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_GPS))
        htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
        admin.createTable(htabgps)
      }
      //判断海口的GPS表是否存在,如果不存在则创建表
      if (!admin.tableExists(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))) {

        val htabgps = new HTableDescriptor(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))
        htabgps.addFamily(new HColumnDescriptor(Constants.DEFAULT_FAMILY))
        admin.createTable(htabgps)
      }
      //通过循环遍历分区的数据,将每个分区当中的每一条数据都获取出来
      eachPartition.foreach(record => {
        val consumerRecord: ConsumerRecord[String, String] = HbaseTools.saveToHBaseAndRedis(connection, jedis, record)
      })
      JedisUtil.returnJedis(jedis)
      connection.close()
    })

存入redis或者hbase的方法

通过逗号隔开判断数据长度,进而判断数据是成都GPS日志数据还是海口订单数据

//    成都的数据如下
//    18901e5f392c5ad98d24c296dcb0afe4,0003a1ceaf2979d0bdeff58da7665a41,1475976805,104.05368,30.70332
//    海口的数据如下
//    dwv_order_make_haikou_1.order_id dwv_order_make_haikou_1.product_id dwv_order_make_haikou_1.city_id  dwv_order_make_haikou_1.district dwv_order_make_haikou_1.county dwv_order_make_haikou_1.type dwv_order_make_haikou_1.combo_type dwv_order_make_haikou_1.traffic_type dwv_order_make_haikou_1.passenger_count  dwv_order_make_haikou_1.driver_product_id  dwv_order_make_haikou_1.start_dest_distance  dwv_order_make_haikou_1.arrive_time  dwv_order_make_haikou_1.departure_time dwv_order_make_haikou_1.pre_total_fee  dwv_order_make_haikou_1.normal_time  dwv_order_make_haikou_1.bubble_trace_id  dwv_order_make_haikou_1.product_1level dwv_order_make_haikou_1.dest_lng dwv_order_make_haikou_1.dest_lat dwv_order_make_haikou_1.starting_lng dwv_order_make_haikou_1.starting_lat dwv_order_make_haikou_1.year dwv_order_make_haikou_1.month  dwv_order_make_haikou_1.day
//    17592719043682 3  83 0898 460107 0  0  0  4  3  4361 2017-05-19 01:09:12  2017-05-19 01:05:19  13 11 10466d3f609cb938dd153738103b0303 3  110.3645 20.0353  110.3665 20.0059  2017 05 19
//    下面是成都的数据,成都的数据才有逗号,才能分组
def saveToHBaseAndRedis(connection: Connection, jedis: Jedis, eachLine: ConsumerRecord[String, String]): ConsumerRecord[String, String] = {

  var rowkey = ""
  //司机ID
  var driverId = ""
  //订单ID
  var orderId = ""
  //经度
  var lng = ""
  //维度
  var lat = ""
  //时间戳
  var timestamp = ""

  val topic: String = eachLine.topic()
  val line: String = eachLine.value()
  //成都数据
  if (line.split(",").size > 4) {
    if (!line.contains("end")) {
      //非结束数据,保存到hbase里面去
      //成都数据
      val strings: Array[String] = line.split(",")
      val split: Array[String] = line.split(",")
      driverId = split(0)
      orderId = split(1)
      timestamp = split(2)
      lng = split(3)
      lat = split(4)
      rowkey = orderId + "_" + timestamp
      val put = new Put(rowkey.getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "CITYCODE".getBytes(), Constants.CITY_CODE_CHENG_DU.getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DRIVERID".getBytes(), driverId.getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "ORDERID".getBytes(), orderId.getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "TIMESTAMP".getBytes(), (timestamp + "").getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "TIME".getBytes(), DateUtils.formateDate(new Date((timestamp + "000").toLong), "yyyy-MM-dd HH:mm:ss").getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "LNG".getBytes(), lng.getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "LAT".getBytes(), lat.getBytes())
      val table: Table = connection.getTable(TableName.valueOf(Constants.HTAB_GPS))
      table.put(put)
      table.close()
    }
    //数据保存到redis里面去
    if (line.split(",").size == 5 || line.contains("end")) {
      JedisUtil.saveChengDuJedis(line)
    }
    //无论如何,成都数据都需要往下传递
  } else {
    //海口数据
    /**
     * 17595848583981 3 83 0898 460108 1 0 5 0 0 1642 0000-00-00 00:00:00 2017-09-20 03:20:00 14 NULL 2932979a59c14a3200007183013897db 3 110.4613 19.9425 110.462 19.9398 2017 09 20
     */
    var rowkey: String = ""
    val fields: Array[String] = line.split("\t")
    //println(fields.length)
    if (fields.length == 24 && !line.contains("dwv_order_make_haikou")) {
      //订单ID+出发时间作为hbase表的rowkey
      rowkey = fields(0) + "_" + fields(13).replaceAll("-", "") + fields(14).replaceAll(":", "")
      val put = new Put(rowkey.getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "ORDER_ID".getBytes(), fields(0).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "PRODUCT_ID".getBytes(), fields(1).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "CITY_ID".getBytes(), fields(2).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DISTRICT".getBytes(), fields(3).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "COUNTY".getBytes(), fields(4).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "TYPE".getBytes(), fields(5).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "COMBO_TYPE".getBytes(), fields(6).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "TRAFFIC_TYPE".getBytes(), fields(7).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "PASSENGER_COUNT".getBytes(), fields(8).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DRIVER_PRODUCT_ID".getBytes(), fields(9).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "START_DEST_DISTANCE".getBytes(), fields(10).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "ARRIVE_TIME".getBytes(), fields(11).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DEPARTURE_TIME".getBytes(), fields(12).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "PRE_TOTAL_FEE".getBytes(), fields(13).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "NORMAL_TIME".getBytes(), fields(14).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "BUBBLE_TRACE_ID".getBytes(), fields(15).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "PRODUCT_1LEVEL".getBytes(), fields(16).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DEST_LNG".getBytes(), fields(17).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DEST_LAT".getBytes(), fields(18).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "STARTING_LNG".getBytes(), fields(19).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "STARTING_LAT".getBytes(), fields(20).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "YEAR".getBytes(), fields(21).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "MONTH".getBytes(), fields(22).getBytes())
      put.addColumn(Constants.DEFAULT_FAMILY.getBytes(), "DAY".getBytes(), fields(23).getBytes())
      val table: Table = connection.getTable(TableName.valueOf(Constants.HTAB_HAIKOU_ORDER))
      table.put(put)
      table.close()
    }
  }
  eachLine

}

 

将GPS数据存入redis中

public static void saveChengDuJedis(String line) throws ParseException {
    Jedis jedis = getJedis();
    String[] split = line.split(",");
    String orderId = split[1];
    if (line.startsWith("end") && line.contains(",")) {
     //   System.out.println("我终于接受到了一条结束标识了,结束订单id为" + orderId);
        jedis.lpush(Constants.CITY_CODE_CHENG_DU + "_" + orderId, "end");
        //发现了结束标识,将订单从实时订单列表里面移除掉
        jedis.srem(Constants.REALTIME_ORDERS,Constants.CITY_CODE_CHENG_DU + "_" + orderId);
    } else {
        String driverId = split[0];
        String timestamp = split[2];
        String lng = split[3];
        String lat = split[4];
        //1.存入实时订单单号  使用redis的Set集合,自动对相同的订单id进行去重
        jedis.sadd(Constants.REALTIME_ORDERS, Constants.CITY_CODE_CHENG_DU + "_" + orderId);
        //2.存入实时订单的经纬度信息,使用set集合自动对经纬度信息进行去重操作
        jedis.lpush(Constants.CITY_CODE_CHENG_DU + "_" + orderId, lng + "," + lat);
        //3.存入订单的开始结束时间信息
        Order order = new Order();
        String hget = jedis.hget(Constants.ORDER_START_ENT_TIME, orderId);
        if(StringUtils.isNotEmpty(hget)){
            //已经有了数据了
            //将获取的数据与已经存在的数据比较,如果时间大于起始时间,更新结束时间和结束经纬度
            Order parseOrder = JSONObject.parseObject(hget, Order.class);
          //  System.out.println(parseOrder.toString());
            //当前数据时间比redis当中的结束时间更大,需要更新结束时间和结束经纬度
            if(Long.parseLong(timestamp) * 1000 > parseOrder.getEndTime()){
                parseOrder.setEndTime(Long.parseLong(timestamp) * 1000);
                parseOrder.setGetOfLat(lat);
                parseOrder.setGetOfLng(lng);
                jedis.hset(Constants.ORDER_START_ENT_TIME, orderId, JSONObject.toJSONString(parseOrder));
            }else if(Long.parseLong(timestamp) * 1000 < parseOrder.getStartTime()){
                parseOrder.setStartTime(Long.parseLong(timestamp) * 1000);
                parseOrder.setGetOnLat(lat);
                parseOrder.setGetOnLng(lng);
                jedis.hset(Constants.ORDER_START_ENT_TIME, orderId, JSONObject.toJSONString(parseOrder));
            }
        }else{
            //没有数据,将起始和结束时间设置成为一样的
            //上车经纬度和下车经纬度设置成为一样的
            order.setGetOnLat(lat);
            order.setGetOnLng(lng);
            order.setCityCode(Constants.CITY_CODE_CHENG_DU);
            order.setGetOfLng(lng);
            order.setGetOfLat(lat);
            order.setEndTime(Long.parseLong((timestamp + "000")));
            order.setStartTime(Long.parseLong((timestamp + "000")));
            order.setOrderId(orderId);
            //将对象转换成为字符串,存入到redis当中去
            jedis.hset(Constants.ORDER_START_ENT_TIME, orderId, JSONObject.toJSONString(order));
        }
        //每小时订单统计
        hourOrderCount(orderId, timestamp);
    }
    JedisUtil.returnJedis(jedis);

}

 

 

4、第四步提交消费的kafka的偏移量到 hbase的表中进行管理

//获取到消费完成的offset的偏移量
val offsetRanges: Array[OffsetRange] = eachRdd.asInstanceOf[HasOffsetRanges].offsetRanges
//将offset保存到hbase里面去,默认可以手动提交保存到kafak的一个topic里面去
//将offset保存到kafak里面去
// result.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
for (eachrange <- offsetRanges) {
  val startOffset: Long = eachrange.fromOffset
  val endOffset: Long = eachrange.untilOffset
  val topic: String = eachrange.topic
  val partition: Int = eachrange.partition
  //将offset保存到hbase里面去
  HbaseTools.saveBatchOffset(group, topic, partition + "", endOffset)
}
def saveBatchOffset(group: String, topic: String, partition: String, offset: Long): Unit = {
  val conn: Connection = HbaseTools.getHbaseConn
  val table: Table = conn.getTable(TableName.valueOf(Constants.HBASE_OFFSET_STORE_TABLE))
  val rowkey = group + ":" + topic
  val columName = group + ":" + topic + ":" + partition
  val put = new Put(rowkey.getBytes())
  put.addColumn(Constants.HBASE_OFFSET_FAMILY_NAME.getBytes(), columName.getBytes(), offset.toString.getBytes())
  table.put(put)
  table.close()
  conn.close()
}

hbase_offset_store的存储格式如下 

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/169270.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

第二章.线性回归以及非线性回归—LASSO算法

第二章.线性回归以及非线性回归 2.13 LASSO算法 1.前期导入&#xff1a; 通过构造一个一阶惩罚函数获得一个精炼的模型&#xff1b;通过最终确定一些指标(变量)的系数为零&#xff0c;解释力很强 岭回归估计系数等于0的机会微乎其微&#xff0c;造成筛选变量困难 擅长处理具有…

如何实现根据环境切换不同配置?

在企业开发中&#xff0c;系统的配置信息往往会分不同的环境&#xff0c;如开发环境、测试环境、生产环境。当我们使用nacos做为配置中心时&#xff0c;一定会遇到的问题就是在应用中配置nacos的server-addr时测试环境的nacos地址和线上nacos地址如何区分的问题 拿开发环境和正…

4.4 可迭代对象(Iterable)与迭代器(Iterator)

4.4 可迭代对象(Iterable)与迭代器&#xff08;Iterator&#xff09; 4.4.1 可迭代&#xff08;Iterable&#xff09;对象 如果一个对象实现了__iter__方法&#xff0c;那么这个对象就是可迭代&#xff08;Iterable&#xff09;对象>>> #如何知道一个对象实现了那些…

STM32MP157内核移植相关bug

STM32MP157 官方Linux5.15内核移植相关bug一、主频问题二、驱动开发时的头文件缺失问题三、结语一、主频问题 在初学STM32MP157驱动开发时&#xff0c;笔者曾对官方最新版的Linux内核进行了移植&#xff0c;但是因为一些问题&#xff0c;导致移植后的系统存在一些bug。最近笔者…

Java学习之抽象类

目录 一、抽象类引出 二、抽象类的介绍 三、抽象类的细节 第一条 第二点 第三点 第四点 第五点 第六点 第七点 第八点 四、练习 第一题 第二题 一、抽象类引出 当父类的一些方法不能确定时,可以用abstract关键字来修饰该方法&#xff0c;这个方法就是抽象方法,用…

【设计模式】创建者模式·建造者模式

学习汇总入口【23种设计模式】学习汇总(数万字讲解体系思维导图) 写作不易&#xff0c;如果您觉得写的不错&#xff0c;欢迎给博主来一波点赞、收藏~让博主更有动力吧&#xff01; 一.概述 将一个复杂对象的构建与表示分离&#xff0c;使得同样的构建过程可以创建不同的表示。 …

Python实现哈里斯鹰优化算法(HHO)优化支持向量机回归模型(SVR算法)项目实战

说明&#xff1a;这是一个机器学习实战项目&#xff08;附带数据代码文档视频讲解&#xff09;&#xff0c;如需数据代码文档视频讲解可以直接到文章最后获取。 1.项目背景 2019年Heidari等人提出哈里斯鹰优化算法(Harris Hawk Optimization, HHO)&#xff0c;该算法有较强的全…

目标检测——day66 Scaled-YOLOv4: Scaling Cross Stage Partial Network

Scaled-Yolov4:可伸缩跨级部分网络 Scaled-YOLOv41. Introduction2. Related work2.1. Real-time object detection2.2. Model scaling&#xff08;模型缩放&#xff09;3. Principles of model scaling4. Scaled-YOLOv44.1. CSP-ized YOLOv44.2. YOLOv4-tiny4.3. YOLOv4-large…

上海亚商投顾:沪指缩量小幅调整 半导体与旅游股领涨

上海亚商投顾前言&#xff1a;无惧大盘涨跌&#xff0c;解密龙虎榜资金&#xff0c;跟踪一线游资和机构资金动向&#xff0c;识别短期热点和强势个股。市场情绪沪指今日缩量小幅调整&#xff0c;创业板指稍显强势&#xff0c;多数时间红盘运行&#xff0c;科创50指数涨超1%。半…

springMVC讲解(上)

SpringMvc 1&#xff0c;简介 springmvc是spring的一个后续产品&#xff0c;是spring的一个子项目&#xff0c;是spring为表示层所开发的一整套完备的解决方案&#xff0c;在表示层框架经历了struct、webWork、struct2等诸多产品的历代更迭之后&#xff0c;目前业界普遍选择了…

JAVA就业课程,面试题大全

面试整体流程1.1 简单的自我介绍我是xxxx,工作xxx年.我先后在xxxx公司、yyyy公司工作。先后做个xxxx项目、yyyy项目。1.2 你简单介绍一下xxxx项目为了解决xxxx问题&#xff0c;开发了一套xxxx系统&#xff0c;该系统主要有那些部分组成。简单介绍项目的整体架构。参与某个模块的…

【Python学习】输入和输出

前言 往期文章 【Python学习】列表和元组 【Python学习】字典和集合 【Python学习】条件和循环 在很多时候&#xff0c;你会想要让你的程序与用户&#xff08;可能是你自己&#xff09;交互。你会从用户那里得到输入&#xff0c;然后打印一些结果。我们可以使用iinput和pr…

MATLAB趣味绘图-内接正六边形旋转

MATLAB趣味绘图-内接正六边形旋转 观察一下内部的正六边形大概在外部的正六边形边的四等分点的位置&#xff0c;通过数学平面几何知识可得边和角度的迭代关系式为&#xff1a; an134an−1θnθn−1arctan⁡36a_n \frac{\sqrt{13}}{4} a_{n-1} \\ \theta_n \theta_{n-1}\arcta…

金融行业数据库场景下,SmartX 超融合表现如何|性能验证与落地实践

在金融行业&#xff0c;数据库是一项至关重要的 IT 基础设施。作为交易和数据的主要载体&#xff0c;数据库往往需要在短时间内处理大量的业务数据&#xff0c;其可靠性、稳定性和性能将直接影响业务系统的运行状态。而在进行数据库基础架构选型时&#xff0c;一些客户对于超融…

C++学习笔记——类和对象

1.面向对象的三大特性&#xff1a;封装、继承、多态 2.对象有其属性和行为 3.具有相同性质的对象&#xff0c;可被抽象为类 1.封装 1.封装是C面向对象三大特性之一 2.封装的意义&#xff1a; &#xff08;1&#xff09;将属性&#xff08;变量&#xff09;和行为&#xff…

python学习 --- 字典基础

目录 一、什么是字典&#xff1f; 1、字典示意图-无序说明 2、字典实现原理 二、字典的创建 1、使用花括号 2、使用内置函数dict() 三、字典常用操作 1、字典中元素的获取 2、key的判断&#xff08;存在与否&#xff09; 3、字典元素的删除 4、字典元素的新增 5、获…

计讯物联智慧水务解决方案:用“智水”捍卫生命之泉

项目背景 水是生命的源泉&#xff0c;是城市的灵魂&#xff0c;是农业的命脉&#xff0c;是工业的基石。2022年以来&#xff0c;基于国家政策的引导、科技革新的驱动与供排水需求增长&#xff0c;智慧水务的发展突飞猛进&#xff0c;从信息化到数字化&#xff0c;再到智能化&a…

若依 ruoyi vue el-switch 列表开关状态显示有误 全部关闭的问题

后台使用int类型传状态status的值但是前端列表展示的开关状态是未开启&#xff0c;实际上&#xff0c;后台传的都是开启的状态结果应该是这样确定后台传的status值 在 el-switch 标签中是否使用了正确的值判断&#xff0c;比如 后台用的是字符串、布尔 或者是 数值类型&#xf…

android判断文件是否存在跳转不同activity

android studio版本&#xff1a;2021.2.1Patch 2例程名称&#xff1a;ActivityJump完成日期&#xff1a;2023.1.17一直在完善一个小东西&#xff0c;也是不断的在学习。之前做的那个桌面日历天气&#xff08;老旧安卓手机发挥余热做桌面时钟摆件使用&#xff09;&#xff0c;有…

java 数列排序

试题 基础练习 数列排序提交此题 评测记录 资源限制内存限制&#xff1a;512.0MB C/C时间限制&#xff1a;1.0s Java时间限制&#xff1a;3.0s Python时间限制&#xff1a;5.0s问题描述给定一个长度为n的数列&#xff0c;将这个数列按从小到大的顺序排列。1<n<200输入格式…