Log collection system: collect application data with Flume, send it to Kafka, then process it and store it in HBase


Contents

Preface: what the system does

Step 1: Flume collects the log data and writes the raw data to Kafka

Flume configuration files:

users:

user_friends_raw:

events:

event_attendees_raw:

train:

Step 2: Processing the raw data in Kafka

Option 1: Kafka Streams

EventAttendStream

UserFriendsStream

Option 2: Spark Streaming

EventAttendeesRaw -> EventAttendees

UserFriendRaw -> UserFriend

Step 3: Sending Kafka topic data to HBase

EventAttendee -> HBase

Events -> HBase

Train -> HBase

UserFriends -> HBase

Users -> HBase

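Preface: what the system does

The source data is log data. Flume collects it and sends it to Kafka, where the raw records are processed before being stored in HBase.

Step 1: Flume collects the log data and writes the raw data to Kafka

Flume configuration files: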

users:

users.sources=usersSource
users.channels=usersChannel
users.sinks=userSink

users.sources.usersSource.type=spooldir
users.sources.usersSource.spoolDir=/opt/flumelogfile/users
users.sources.usersSource.deserializer=LINE
users.sources.usersSource.deserializer.maxLineLength=320000
users.sources.usersSource.includePattern=user_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
users.sources.usersSource.interceptors=head_filter
users.sources.usersSource.interceptors.head_filter.type=regex_filter
users.sources.usersSource.interceptors.head_filter.regex=^user_id*
users.sources.usersSource.interceptors.head_filter.excludeEvents=true

users.channels.usersChannel.type=file
users.channels.usersChannel.checkpointDir=/opt/flumelogfile/checkpoint/users
users.channels.usersChannel.dataDirs=/opt/flumelogfile/data/users

users.sinks.userSink.type=org.apache.flume.sink.kafka.KafkaSink
users.sinks.userSink.batchSize=640
users.sinks.userSink.brokerList=192.168.136.20:9092
users.sinks.userSink.topic=users

users.sources.usersSource.channels=usersChannel
users.sinks.userSink.channel=usersChannel
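
To see what the includePattern and head_filter settings above let through, the two expressions can be tried out in plain Java. This is only a sketch: the class name, sample file names and sample lines are made up, and it assumes that the spooling source tests the whole file name against includePattern while the regex_filter interceptor searches for the pattern anywhere in the event body.

```java
import java.util.regex.Pattern;

// Stand-alone check of the two regular expressions used by the users agent.
class UsersPatternCheck {
    // Same expressions as in the Flume configuration above.
    static final Pattern INCLUDE = Pattern.compile("user_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv");
    static final Pattern HEADER = Pattern.compile("^user_id*");

    // Assumption: the file name has to match the pattern as a whole.
    static boolean isPickedUp(String fileName) {
        return INCLUDE.matcher(fileName).matches();
    }

    // Assumption: with excludeEvents=true a line is dropped when the pattern is found in it.
    static boolean isDropped(String line) {
        return HEADER.matcher(line).find();
    }

    public static void main(String[] args) {
        System.out.println(isPickedUp("user_2025-01-11.csv")); // dated file
        System.out.println(isPickedUp("users.csv"));           // undated file
        System.out.println(isDropped("user_id,locale"));       // header-like line
        System.out.println(isDropped("3197468391,id_ID"));     // data-like line
    }
}
```

Two details are worth noticing. The `*` in `^user_id*` applies only to the final `d`, so the expression also matches a line that starts with `user_i`; `^user_id` states the intent more directly. The unescaped `.` before `csv` matches any character, so the file pattern is slightly looser than it looks.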

user_friends_raw:

userfriends.sources=userfriendsSource
userfriends.channels=userfriendsChannel
userfriends.sinks=userfriendsSink

userfriends.sources.userfriendsSource.type=spooldir
userfriends.sources.userfriendsSource.spoolDir=/opt/flumelogfile/uf
userfriends.sources.userfriendsSource.deserializer=LINE
userfriends.sources.userfriendsSource.deserializer.maxLineLength=320000
userfriends.sources.userfriendsSource.includePattern=uf_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
userfriends.sources.userfriendsSource.interceptors=head_filter
userfriends.sources.userfriendsSource.interceptors.head_filter.type=regex_filter
userfriends.sources.userfriendsSource.interceptors.head_filter.regex=^user*
userfriends.sources.userfriendsSource.interceptors.head_filter.excludeEvents=true

userfriends.channels.userfriendsChannel.type=file
userfriends.channels.userfriendsChannel.checkpointDir=/opt/flumelogfile/checkpoint/uf
userfriends.channels.userfriendsChannel.dataDirs=/opt/flumelogfile/data/uf

userfriends.sinks.userfriendsSink.type=org.apache.flume.sink.kafka.KafkaSink
userfriends.sinks.userfriendsSink.batchSize=640
userfriends.sinks.userfriendsSink.brokerList=192.168.136.20:9092
userfriends.sinks.userfriendsSink.topic=user_friends_raw

userfriends.sources.userfriendsSource.channels=userfriendsChannel
userfriends.sinks.userfriendsSink.channel=userfriendsChannel

events:

events.sources=eventsSource
events.channels=eventsChannel
events.sinks=eventsSink

events.sources.eventsSource.type=spooldir
events.sources.eventsSource.spoolDir=/opt/flumelogfile/events
events.sources.eventsSource.deserializer=LINE
events.sources.eventsSource.deserializer.maxLineLength=320000
events.sources.eventsSource.includePattern=events_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
events.sources.eventsSource.interceptors=head_filter
events.sources.eventsSource.interceptors.head_filter.type=regex_filter
events.sources.eventsSource.interceptors.head_filter.regex=^event_id*
events.sources.eventsSource.interceptors.head_filter.excludeEvents=true

events.channels.eventsChannel.type=file
events.channels.eventsChannel.checkpointDir=/opt/flumelogfile/checkpoint/events
events.channels.eventsChannel.dataDirs=/opt/flumelogfile/data/events

events.sinks.eventsSink.type=org.apache.flume.sink.kafka.KafkaSink
events.sinks.eventsSink.batchSize=640
events.sinks.eventsSink.brokerList=192.168.136.20:9092
events.sinks.eventsSink.topic=events

events.sources.eventsSource.channels=eventsChannel
events.sinks.eventsSink.channel=eventsChannel

event_attendees_raw:

ea.sources=eaSource
ea.channels=eaChannel
ea.sinks=eaSink

ea.sources.eaSource.type=spooldir
ea.sources.eaSource.spoolDir=/opt/flumelogfile/ea
ea.sources.eaSource.deserializer=LINE
ea.sources.eaSource.deserializer.maxLineLength=320000
ea.sources.eaSource.includePattern=ea_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
ea.sources.eaSource.interceptors=head_filter
ea.sources.eaSource.interceptors.head_filter.type=regex_filter
ea.sources.eaSource.interceptors.head_filter.regex=^event*
ea.sources.eaSource.interceptors.head_filter.excludeEvents=true

ea.channels.eaChannel.type=file
ea.channels.eaChannel.checkpointDir=/opt/flumelogfile/checkpoint/ea
ea.channels.eaChannel.dataDirs=/opt/flumelogfile/data/ea

ea.sinks.eaSink.type=org.apache.flume.sink.kafka.KafkaSink
ea.sinks.eaSink.batchSize=640
ea.sinks.eaSink.brokerList=192.168.136.20:9092
ea.sinks.eaSink.topic=event_attendees_raw

ea.sources.eaSource.channels=eaChannel
ea.sinks.eaSink.channel=eaChannel

train:

train.sources=trainSource
train.channels=trainChannel
train.sinks=trainSink

train.sources.trainSource.type=spooldir
train.sources.trainSource.spoolDir=/opt/flumelogfile/train
train.sources.trainSource.deserializer=LINE
train.sources.trainSource.deserializer.maxLineLength=320000
train.sources.trainSource.includePattern=train_[0-9]{4}-[0-9]{2}-[0-9]{2}.csv
train.sources.trainSource.interceptors=head_filter
train.sources.trainSource.interceptors.head_filter.type=regex_filter
train.sources.trainSource.interceptors.head_filter.regex=^user*
train.sources.trainSource.interceptors.head_filter.excludeEvents=true

train.channels.trainChannel.type=file
train.channels.trainChannel.checkpointDir=/opt/flumelogfile/checkpoint/train
train.channels.trainChannel.dataDirs=/opt/flumelogfile/data/train

train.sinks.trainSink.type=org.apache.flume.sink.kafka.KafkaSink
train.sinks.trainSink.batchSize=640
train.sinks.trainSink.brokerList=192.168.136.20:9092
train.sinks.trainSink.topic=train

train.sources.trainSource.channels=trainChannel
train.sinks.trainSink.channel=trainChannel

 

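Step 2: Processing the raw data in Kafka

The raw user_friends and event_attendees data needs further processing, and there are two ways to do it here. One is Kafka Streams, which consumes the data and sends the result to a new topic; the other is Spark Streaming, which consumes the raw topic and writes the result back with a Kafka producer.

Option 1: Kafka Streams

Step 1: initialize the configuration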

Properties properties = new Properties();

// Application ID (also used as the consumer group ID)
properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "userfriend");
// Kafka broker address
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
// Default serde (serializer/deserializer) for keys
properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
// Default serde for values
properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

Step 2: create the stream builder

StreamsBuilder builder = new StreamsBuilder();

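Step 3: read the data from one topic and write it to a new topic

builder.stream("event_attendees_raw").to("event_attendees");

The single line above is equivalent to the following two lines:
KStream<Object, Object> mystreamin = builder.stream("event_attendees_raw");
mystreamin.to("event_attendees");

Step 4: create the KafkaStreams instance from the builder (builder pattern)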

Topology topo = builder.build();
 
KafkaStreams kafkaStreams = new KafkaStreams(topo, properties);

Step 5: start the application

kafkaStreams.start();

EventAttendStream

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import java.util.ArrayList;
import java.util.Properties;

public class EventAttendStream {
    public static void main(String[] args) {

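        // 1. Initialize the configuration
        Properties properties = new Properties();
        // Use an application ID that differs from UserFriendsStream, so the two apps do not share a consumer group
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "eventattend");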
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        // 2. Create the stream builder
        StreamsBuilder builder = new StreamsBuilder();

        // 3. Read from topic event_attendees_raw, process the records and send them to topic event_attendees
        builder.stream("event_attendees_raw")
                .flatMap((key,value)->{
                            ArrayList<KeyValue<String, String>> list = new ArrayList<>();
                            String[] fields = value.toString().split(",");
                            String event = fields[0];
                            if(event.trim().length()>0){
                                if (fields.length>=2){ // is there a value in the "yes" position?
                                    String[] yes = fields[1].split("\\s+");
                                    for (String y : yes) {
                                        System.out.println(event+" "+y+" yes");
                                        KeyValue<String, String> kv = new KeyValue<>(null, event + "," + y+" yes");
                                        list.add(kv);
                                    }
                                }
                                if (fields.length>=3){ // is there a value in the "maybe" position?
                                    String[] maybe = fields[2].split("\\s+");
                                    for (String my : maybe) {
                                        System.out.println(event+" "+my+" maybe");
                                        KeyValue<String, String> kv = new KeyValue<>(null, event + "," + my+" maybe");
                                        list.add(kv);
                                    }
                                }
                                if (fields.length>=4){ // is there a value in the "invited" position?
                                    String[] invited  = fields[3].split("\\s+");
                                    for (String in : invited) {
                                        System.out.println(event+" "+in+" invited");
                                        KeyValue<String, String> kv = new KeyValue<>(null, event + "," + in+" invited");
                                        list.add(kv);
                                    }
                                }
                                if (fields.length>=5){  // is there a value in the "no" position?
                                    String[] no = fields[4].split("\\s+");
                                    for (String n : no) {
                                        System.out.println(event+" "+n+" no");
                                        KeyValue<String, String> kv = new KeyValue<>(null, event + "," + n+" no");
                                        list.add(kv);
                                    }
                                }
                            }
                            return list;
                        })
                .to("event_attendees");

        // 4. KafkaStreams takes two arguments: Topology topology and Properties props.
        // props is the Properties object configured above;
        // the Topology is created with builder.build().

        Topology topo = builder.build();
        KafkaStreams streams = new KafkaStreams(topo, properties);

        // 5. Start the application
        streams.start();
    }
}
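
The flatMap body above is easier to reason about when it is pulled out of the Kafka topology. The following is a minimal sketch of the same transformation as a plain Java method, not the code the job runs: the class and method names are made up, the sample line is invented, and the guard that skips empty attendee lists is an addition of this sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Pure-Java version of the flatMap body, with no Kafka dependency.
class EventAttendeeSplitter {
    // Position i + 1 of the raw line holds the users for STATES[i].
    static final String[] STATES = {"yes", "maybe", "invited", "no"};

    // Turns "event,yes,maybe,invited,no" into one "event,user state" string per user.
    static List<String> split(String rawLine) {
        List<String> out = new ArrayList<>();
        String[] fields = rawLine.split(",");
        if (fields.length == 0 || fields[0].trim().isEmpty()) {
            return out; // no event id, nothing to emit
        }
        String event = fields[0];
        for (int i = 0; i < STATES.length && i + 1 < fields.length; i++) {
            for (String user : fields[i + 1].trim().split("\\s+")) {
                if (!user.isEmpty()) { // hypothetical guard: skip empty attendee lists
                    out.add(event + "," + user + " " + STATES[i]);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample: two "yes" users, no "maybe" users, one "invited" user.
        split("e1,u1 u2,,u3").forEach(System.out::println);
    }
}
```

Without the guard, an empty field such as the "maybe" column in the sample would produce a record with an empty user ID, because splitting an empty string still yields one empty element.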

 

UserFriendsStream

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import java.util.ArrayList;
import java.util.Properties;


public class UserFriendsStream {
    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"userfriend");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092");
        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"false");

        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"earliest");
        properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();

        // Main business logic: start
        builder.stream("user_friends_raw")
                .flatMap((key,value)->{
                    ArrayList<KeyValue<String,String>> list = new ArrayList<>();
                    String[] fields = value.toString().split(",");
                    if(fields.length==2){
                        String userid = fields[0];
                        String[] friends = fields[1].split("\\s+");
                        for (String friendid : friends) {
                            System.out.println(userid + " " + friendid);
                            KeyValue<String, String> kv = new KeyValue<>(null, userid + "," + friendid);
                            list.add(kv);
                        }
                    }

                    return list;
                })
                .to("user_friends");
        // Main business logic: end

        Topology topo = builder.build();
        KafkaStreams streams = new KafkaStreams(topo, properties);

        streams.start();
    }
}
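
One consequence of the `fields.length==2` check above is that a user without friends produces no output at all. A small stand-alone sketch shows this; the class name and the sample lines are hypothetical, and the empty-string check is an addition of the sketch.

```java
import java.util.ArrayList;
import java.util.List;

// Flattens "user,friend1 friend2 ..." into one "user,friend" string per friend.
class UserFriendSplitter {
    static List<String> split(String rawLine) {
        List<String> pairs = new ArrayList<>();
        String[] fields = rawLine.split(",");
        if (fields.length == 2) { // same condition as in UserFriendsStream
            for (String friend : fields[1].trim().split("\\s+")) {
                if (!friend.isEmpty()) {
                    pairs.add(fields[0] + "," + friend);
                }
            }
        }
        return pairs;
    }

    public static void main(String[] args) {
        System.out.println(split("u1,f1 f2 f3")); // three pairs
        System.out.println(split("u2,"));         // no pairs
    }
}
```

The second call returns an empty list because Java's `split` drops trailing empty strings, so `"u2,"` yields a single field and fails the length check.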

Option 2: Spark Streaming

Step 1: configure Spark Streaming

val conf = new SparkConf().setAppName("user_friends_raw").setMaster("local[*]")
val streamingContext = new StreamingContext(conf,Seconds(5))

Step 2: Kafka configuration

val kafkaParams = Map(
    (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
    (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
    (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
    (ConsumerConfig.GROUP_ID_CONFIG -> "ea01")
)

Step 3: create the Kafka direct stream

val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe(Set("user_friends_raw"), kafkaParams)
)

Step 4: process the RDDs

kafkaStream.foreachRDD(
    rdd=>{
        rdd.foreachPartition(x => {
            // process the records of partition x here
        })
    }
)

Step 5: create a Kafka producer and send the records

val props = new util.HashMap[String,Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
val producer = new KafkaProducer[String,String](props)

val record = new ProducerRecord[String,String]("user_friends2",userid+","+friend)
producer.send(record)

Step 6: start the streaming context

streamingContext.start()
streamingContext.awaitTermination()

EventAttendeesRaw -> EventAttendees

import java.util
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}

object SparkStreamEventAttendeesrawToEventAttends {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("user_friends_raw").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf,Seconds(5))

    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "ea01"),
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"earliest")
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("event_attendees_raw"), kafkaParams)
    )

    kafkaStream.foreachRDD(rdd=>{
      rdd.foreachPartition(x=>{
        val props = new util.HashMap[String,Object]()
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
        val producer = new KafkaProducer[String,String](props)

        x.foreach(y=> { // event,yes,maybe,invited,no
          val fields = y.value().split(",")
          if(fields.length>0) {
            val events = fields(0)
            if (fields.length >= 2) {
              val yess = fields(1).split("\\s+")
              for (yes <- yess) {
                val record = new ProducerRecord[String, String]("event_attendees2", events + "," + yes)
                producer.send(record)
              }
            }
            if (fields.length >= 3) {
              val maybes = fields(2).split("\\s+")
              for (maybe <- maybes) {
                val record = new ProducerRecord[String, String]("event_attendees2", events + "," + maybe)
                producer.send(record)
              }
            }
            if (fields.length >= 4) {
              val inviteds = fields(3).split("\\s+")
              for (invited <- inviteds) {
                val record = new ProducerRecord[String, String]("event_attendees2", events + "," + invited)
                producer.send(record)
              }
            }
            if (fields.length >= 5) {
              val nos = fields(4).split("\\s+")
              for (no <- nos) {
                val record = new ProducerRecord[String, String]("event_attendees2", events + "," + no)
                producer.send(record)
              }
            }
          }
          })
        producer.close() // flush buffered records and release this partition's producer
      })
    }
    )

    // Start the streaming context
    streamingContext.start()
    streamingContext.awaitTermination()

  }

}

UserFriendRaw -> UserFriend

import java.util

import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.spark.SparkConf
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.kafka010.{ConsumerStrategies, KafkaUtils, LocationStrategies}
import org.apache.spark.streaming.{Seconds, StreamingContext}

object SparkStreamUserFriendrawToUserFriend {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("user_friends_raw").setMaster("local[*]")
    val streamingContext = new StreamingContext(conf,Seconds(5))

    streamingContext.checkpoint("checkpoint")

    val kafkaParams = Map(
      (ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "192.168.136.20:9092"),
      (ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> "org.apache.kafka.common.serialization.StringDeserializer"),
      (ConsumerConfig.GROUP_ID_CONFIG -> "uf"),
      (ConsumerConfig.AUTO_OFFSET_RESET_CONFIG->"earliest")
    )
    val kafkaStream: InputDStream[ConsumerRecord[String, String]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(Set("user_friends_raw"), kafkaParams)
    )

    kafkaStream.foreachRDD(
      rdd=>{
        rdd.foreachPartition(x=>{
          val props = new util.HashMap[String,Object]()
          props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"192.168.136.20:9092")
          props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
          props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,"org.apache.kafka.common.serialization.StringSerializer")
          val producer = new KafkaProducer[String,String](props)
          x.foreach(y=>{
            val splits = y.value().split(",")
            if(splits.length==2){
              val userid = splits(0)
              val friends = splits(1).split("\\s+")
              for(friend<-friends){
                val record = new ProducerRecord[String,String]("user_friends2",userid+","+friend)
                producer.send(record)
              }

            }
          })
          producer.close() // flush buffered records and release this partition's producer
        })
      }
    )


    streamingContext.start()
    streamingContext.awaitTermination()
  }

}

 

Step 3: Sending Kafka topic data to HBase

Step 1: initialize the configuration

Properties properties = new Properties();

properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);
properties.put(ConsumerConfig.GROUP_ID_CONFIG, "event_attendee");

Step 2: create the Kafka consumer and subscribe to the topic

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
consumer.subscribe(Collections.singleton("event_attendees"));

Step 3: configure HBase and connect to it

Configuration conf = HBaseConfiguration.create();

conf.set(HConstants.HBASE_DIR,"hdfs://192.168.136.20:9000/hbase");
conf.set(HConstants.ZOOKEEPER_QUORUM,"192.168.136.20");
conf.set(HConstants.CLIENT_PORT_STR,"2181");

Connection connection = ConnectionFactory.createConnection(conf);
Table eventAttendTable = connection.getTable(TableName.valueOf("events_db:event_attendee"));

Step 4: poll data with the consumer

ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));

Step 5: insert the polled data into HBase

ArrayList<Put> datas = new ArrayList<>();

for (ConsumerRecord<String, String> record : poll) {

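    // Split the Kafka record on ","
    String[] split = record.value().split(",");

    // Row key
    Put put = new Put(Bytes.toBytes(split[0]));
    // Column family "eu", qualifier "user"
    put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());

    // Each Put is one row; collect it in datas
    datas.add(put);
}

// Write the whole batch to the table
eventAttendTable.put(datas);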
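
The snippet above keys each row on split[0] only. For records from the event_attendees topic, which have the form "eventid,userid state", all three parts are needed to tell rows apart. As a rough sketch of that parsing step on its own (the class, its fields and the null return for malformed records are assumptions made for illustration, not part of the HBase or Kafka APIs):

```java
// Parses one "eventid,userid state" record into the pieces used for the row key and columns.
class EventAttendeeRow {
    final String rowKey, eventId, userId, state;

    EventAttendeeRow(String eventId, String userId, String state) {
        this.eventId = eventId;
        this.userId = userId;
        this.state = state;
        this.rowKey = eventId + userId + state; // plain concatenation of the three parts
    }

    // Returns null for records that do not have the expected shape (hypothetical guard).
    static EventAttendeeRow parse(String value) {
        String[] parts = value.split(",");
        if (parts.length != 2) {
            return null;
        }
        String[] userAndState = parts[1].trim().split("\\s+");
        if (userAndState.length != 2) {
            return null;
        }
        return new EventAttendeeRow(parts[0], userAndState[0], userAndState[1]);
    }

    public static void main(String[] args) {
        EventAttendeeRow row = parse("e1,u1 yes"); // made-up sample record
        System.out.println(row.rowKey + " -> " + row.eventId + " / " + row.userId + " / " + row.state);
        System.out.println(parse("e1, yes") == null); // record with an empty user ID is rejected
    }
}
```

Checking the shape before building the Put avoids an ArrayIndexOutOfBoundsException on a malformed record, which would otherwise stop the whole consumer loop.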

EventAttendee -> HBase

package nj.zb.kb21.kafka.kafkatohb;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class EventAttendeToHB {

    static int num = 0; // counter

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // commit offsets manually
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");   // commit interval; only used when auto-commit is enabled
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "event_attendee");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Configure the Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("event_attendees"));

        // Configure HBase and connect to it
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR,"hdfs://192.168.136.20:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM,"192.168.136.20");
        conf.set(HConstants.CLIENT_PORT_STR,"2181");

        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table eventAttendTable = connection.getTable(TableName.valueOf("events_db:event_attendee"));

            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // eventid,userid state
                    String[] eventattend = record.value().split(",");
                    String[] userAndState = eventattend[1].split("\\s"); // [userid, state]
                    Put put = new Put(Bytes.toBytes(eventattend[0] + userAndState[0] + userAndState[1]));     // rowkey
                    put.addColumn("euat".getBytes(), "eventid".getBytes(), Bytes.toBytes(eventattend[0]));
                    put.addColumn("euat".getBytes(), "friendid".getBytes(), Bytes.toBytes(userAndState[0]));
                    put.addColumn("euat".getBytes(), "state".getBytes(), Bytes.toBytes(userAndState[1]));

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("----------num:" + num);
                if (datas.size() != 0) {
                    eventAttendTable.put(datas);
                    consumer.commitSync(); // auto-commit is off, so commit once the batch is in HBase
                }

                Thread.sleep(10);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

}

Events -> HBase

package nj.zb.kb21.kafka.kafkatohb;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class EventsToHB {
    static int num = 0; // counter

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // commit offsets manually
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");   // commit interval; only used when auto-commit is enabled
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "events");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Configure the Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("events"));

        // Configure HBase and connect to it
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR,"hdfs://192.168.136.20:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM,"192.168.136.20");
        conf.set(HConstants.CLIENT_PORT_STR,"2181");

        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table eventsTable = connection.getTable(TableName.valueOf("events_db:events"));

            while (true){
                ConsumerRecords<String,String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll){
                    System.out.println(record.value()); // eventid,userid,starttime,city,state,zip,country,lat,lng,commonwords
                    String[] event = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(event[0]));     // rowkey
                    put.addColumn("creator".getBytes(),"userid".getBytes(),event[1].getBytes());
                    put.addColumn("schedule".getBytes(),"starttime".getBytes(),event[2].getBytes());
                    put.addColumn("location".getBytes(),"city".getBytes(),event[3].getBytes());
                    put.addColumn("location".getBytes(),"state".getBytes(),event[4].getBytes());
                    put.addColumn("location".getBytes(),"zip".getBytes(),event[5].getBytes());
                    put.addColumn("location".getBytes(),"country".getBytes(),event[6].getBytes());
                    put.addColumn("location".getBytes(),"lat".getBytes(),event[7].getBytes());
                    put.addColumn("location".getBytes(),"lng".getBytes(),event[8].getBytes());
                    put.addColumn("remark".getBytes(),"commonwords".getBytes(),event[9].getBytes());

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("----------num:"+num);
                if (datas.size()!=0) {
                    eventsTable.put(datas);
                    consumer.commitSync(); // auto-commit is off, so commit once the batch is in HBase
                }

                Thread.sleep(10);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

    }
}
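
EventsToHB reads ten fixed positions from every line, so a short line ends the loop with an ArrayIndexOutOfBoundsException. The sketch below shows one way the field-to-column mapping could be expressed with a length check. The class and method names are invented, and the "family:qualifier" strings are only a convenient notation for this example; the layout itself is taken from the put.addColumn calls above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Maps the CSV positions read by EventsToHB to "family:qualifier" names.
class EventColumns {
    // Columns for CSV positions 1..9; position 0 is the row key.
    static final String[] COLUMNS = {
        "creator:userid", "schedule:starttime", "location:city", "location:state",
        "location:zip", "location:country", "location:lat", "location:lng",
        "remark:commonwords"
    };

    // Returns column -> value, or an empty map when the line has too few fields.
    static Map<String, String> toColumns(String line) {
        Map<String, String> cols = new LinkedHashMap<>();
        String[] event = line.split(",", -1); // limit -1 keeps trailing empty fields
        if (event.length <= COLUMNS.length) {
            return cols;
        }
        for (int i = 0; i < COLUMNS.length; i++) {
            cols.put(COLUMNS[i], event[i + 1]);
        }
        return cols;
    }

    public static void main(String[] args) {
        // Made-up sample line whose last field is empty.
        System.out.println(toColumns("e1,u1,2012-10-02,city,st,zip,country,1.0,2.0,"));
        System.out.println(toColumns("e1,u1")); // too short, nothing is mapped
    }
}
```

The `-1` limit matters: plain `split(",")` discards trailing empty strings, so a line whose last columns are empty would come back with fewer than ten fields.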

Train -> HBase

package nj.zb.kb21.kafka.kafkatohb;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class TrainToHB {
    static int num = 0; // counter

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // commit offsets manually
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");   // commit interval; only used when auto-commit is enabled
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "train");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Configure the Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("train"));

        // Configure HBase and connect to it
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR,"hdfs://192.168.136.20:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM,"192.168.136.20");
        conf.set(HConstants.CLIENT_PORT_STR,"2181");

        Connection connection = null;
        try {
            connection = ConnectionFactory.createConnection(conf);
            Table usersTable = connection.getTable(TableName.valueOf("events_db:train"));

            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // userid,friendid
                    String[] split = record.value().split(",");
                    Put put = new Put(Bytes.toBytes(split[0]));     // rowkey
                    put.addColumn("eu".getBytes(), "user".getBytes(), split[0].getBytes());
                    put.addColumn("eu".getBytes(), "event".getBytes(), split[1].getBytes());
                    put.addColumn("eu".getBytes(), "invited".getBytes(), split[2].getBytes());
                    put.addColumn("eu".getBytes(), "timestamp".getBytes(), split[3].getBytes());
                    put.addColumn("eu".getBytes(), "interested".getBytes(), split[4].getBytes());
                    put.addColumn("eu".getBytes(), "not_interested".getBytes(), split[5].getBytes());

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("----------num:" + num);
                if (datas.size() != 0)
                    usersTable.put(datas);

                Thread.sleep(10);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}
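
The loop above relies on every train record having six comma-separated fields and uses the user id alone as the rowkey. The following is a minimal sketch, not part of the original project, of a parser that makes the field check explicit and of one possible composite rowkey. The class name `TrainLineParser`, the `user_event` key layout, and the sample lines are assumptions made for illustration only.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

final class TrainLineParser {
    // Column qualifiers in the order TrainToHB writes them.
    static final String[] QUALIFIERS =
            {"user", "event", "invited", "timestamp", "interested", "not_interested"};

    /** Returns qualifier -> value, or empty if the line does not have exactly six fields. */
    static Optional<Map<String, String>> parse(String line) {
        if (line == null) {
            return Optional.empty();
        }
        String[] parts = line.split(",", -1); // limit -1 keeps trailing empty fields
        if (parts.length != QUALIFIERS.length) {
            return Optional.empty();
        }
        Map<String, String> row = new LinkedHashMap<>();
        for (int i = 0; i < QUALIFIERS.length; i++) {
            row.put(QUALIFIERS[i], parts[i].trim());
        }
        return Optional.of(row);
    }

    /** Hypothetical composite rowkey: one row per (user, event) pair. */
    static String rowKey(Map<String, String> row) {
        return row.get("user") + "_" + row.get("event");
    }

    public static void main(String[] args) {
        // Made-up sample lines, not taken from the real data set.
        System.out.println(parse("1,2,0,2012-10-02 15:53:05,0,1").map(TrainLineParser::rowKey));
        System.out.println(parse("user,event").isPresent());
    }
}
```

With a key of this shape, several events for the same user would land in separate rows. Whether that is wanted depends on how the table is queried later, which the original text does not say.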

UserFriends -> HBase

package nj.zb.kb21.kafka.kafkatohb;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class UserFriendsToHB {

    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // offsets are committed manually
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");   // commit interval; only used when auto-commit is enabled
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "userfriend_group");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Configure the Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("user_friends"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.136.20:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.136.20");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        // try-with-resources closes the table and the connection if the loop fails
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table userFriendTable = connection.getTable(TableName.valueOf("events_db:user_friend"))) {

            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // userid,friendid
                    String[] split = record.value().split(",");
                    if (split.length < 2) {
                        continue; // skip lines without a friend id
                    }
                    Put put = new Put(Bytes.toBytes((split[0] + split[1]).hashCode()));   // rowkey: 4-byte hash of userid+friendid
                    put.addColumn("uf".getBytes(), "userid".getBytes(), split[0].getBytes());
                    put.addColumn("uf".getBytes(), "friendid".getBytes(), split[1].getBytes());
                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("----------num:" + num);
                if (!datas.isEmpty()) {
                    userFriendTable.put(datas);
                }
                if (!poll.isEmpty()) {
                    consumer.commitSync(); // commit offsets only after the batch has been written
                }

                Thread.sleep(10);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
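
The rowkey above is the `int` hash of `split[0] + split[1]`. Two different (userid, friendid) pairs can produce the same key, either because their ids concatenate to the same string or because two different strings share a hash code, and in HBase the later Put then overwrites the earlier row. The sketch below illustrates the first case and one possible alternative. The class name `UserFriendRowKey`, the `_` delimiter, and the sample ids are assumptions for illustration and do not come from the original project.

```java
final class UserFriendRowKey {
    /** Key as computed in UserFriendsToHB: 32-bit hash of the concatenated ids. */
    static int hashedKey(String userId, String friendId) {
        return (userId + friendId).hashCode();
    }

    /** Hypothetical alternative: keep both ids, separated by a character that cannot occur in a numeric id. */
    static String delimitedKey(String userId, String friendId) {
        return userId + "_" + friendId;
    }

    public static void main(String[] args) {
        // ("1", "23") and ("12", "3") both concatenate to "123", so their hashed keys are equal.
        System.out.println(hashedKey("1", "23") == hashedKey("12", "3"));            // true
        // With a delimiter the two pairs stay distinct: "1_23" vs "12_3".
        System.out.println(delimitedKey("1", "23").equals(delimitedKey("12", "3"))); // false
    }
}
```

A string key of this form is longer than four bytes and sorts by user id, so all friends of one user sit next to each other. If evenly spread writes matter more than that locality, a hash prefix in front of the delimited key would be another option.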

Users -> HBase

package nj.zb.kb21.kafka.kafkatohb;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.io.IOException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Properties;

public class UsersToHB {
    static int num = 0; // running count of rows written

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.136.20:9092");
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);

        properties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");  // offsets are committed manually
        properties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "1000");   // commit interval; only used when auto-commit is enabled
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "users");
        properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // Configure the Kafka consumer and subscribe to the topic
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(properties);
        consumer.subscribe(Collections.singleton("users"));

        // Configure the HBase connection
        Configuration conf = HBaseConfiguration.create();
        conf.set(HConstants.HBASE_DIR, "hdfs://192.168.136.20:9000/hbase");
        conf.set(HConstants.ZOOKEEPER_QUORUM, "192.168.136.20");
        conf.set(HConstants.CLIENT_PORT_STR, "2181");

        // try-with-resources closes the table and the connection if the loop fails
        try (Connection connection = ConnectionFactory.createConnection(conf);
             Table usersTable = connection.getTable(TableName.valueOf("events_db:users"))) {

            while (true) {
                ConsumerRecords<String, String> poll = consumer.poll(Duration.ofMillis(100));
                ArrayList<Put> datas = new ArrayList<>();
                for (ConsumerRecord<String, String> record : poll) {
                    System.out.println(record.value()); // userid,locale,birthyear,gender,joinAt,location,timezone
                    String[] user = record.value().split(",");
                    if (user.length < 4) {
                        continue; // the first four fields are read unconditionally below
                    }
                    Put put = new Put(Bytes.toBytes(user[0]));  // rowkey
                    put.addColumn("profile".getBytes(), "birthyear".getBytes(), user[2].getBytes());
                    put.addColumn("profile".getBytes(), "gender".getBytes(), user[3].getBytes());
                    put.addColumn("region".getBytes(), "locale".getBytes(), user[1].getBytes());
                    // The trailing fields may be missing, so each one is checked before it is read
                    if (user.length > 5)
                        put.addColumn("region".getBytes(), "location".getBytes(), user[5].getBytes());
                    if (user.length > 6)
                        put.addColumn("region".getBytes(), "timezone".getBytes(), user[6].getBytes());
                    if (user.length > 4)
                        put.addColumn("registration".getBytes(), "joinAt".getBytes(), user[4].getBytes());

                    datas.add(put);
                }
                num = num + datas.size();
                System.out.println("----------num:" + num);
                if (!datas.isEmpty()) {
                    usersTable.put(datas);
                }
                if (!poll.isEmpty()) {
                    consumer.commitSync(); // commit offsets only after the batch has been written
                }

                Thread.sleep(10);
            }
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}
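
UsersToHB maps each CSV position to a column family and qualifier by hand, with a separate length check for each optional field. As a rough sketch of the same mapping in table-driven form, the helper below lists the (family, qualifier) pairs once and walks the fields in order. The class name `UserColumns`, the `family:qualifier=value` output format, and the sample line are assumptions for illustration. Skipping empty values is a design choice of this sketch, not something the original code does.

```java
import java.util.ArrayList;
import java.util.List;

final class UserColumns {
    // {family, qualifier} for CSV positions 1..6, matching the columns UsersToHB writes.
    static final String[][] COLUMNS = {
            {"region", "locale"},
            {"profile", "birthyear"},
            {"profile", "gender"},
            {"registration", "joinAt"},
            {"region", "location"},
            {"region", "timezone"},
    };

    /** Returns "family:qualifier=value" for every non-empty field after the user id. */
    static List<String> cells(String line) {
        String[] user = line.split(",", -1); // limit -1 keeps empty fields so positions stay aligned
        List<String> out = new ArrayList<>();
        for (int i = 0; i < COLUMNS.length && i + 1 < user.length; i++) {
            String value = user[i + 1];
            if (!value.isEmpty()) {
                out.add(COLUMNS[i][0] + ":" + COLUMNS[i][1] + "=" + value);
            }
        }
        return out;
    }

    public static void main(String[] args) {
        // Made-up sample line with an empty location field.
        for (String cell : cells("42,en_US,1990,male,2012,,480")) {
            System.out.println(cell);
        }
    }
}
```

In the real consumer each entry would become one `put.addColumn(...)` call. Keeping the mapping in a single array means a new column is added in one place rather than as another `if (user.length > n)` branch.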
