Momo Comprehensive Case Study


Today I'll continue by sharing the Momo comprehensive case study.

Table of Contents

  • Preface
  • I. Apache Flume
  • II. Kafka
  • III. Momo case: consuming messages and writing to HBase
    • 1. Create the table in HBase
    • 2. Momo rowkey design
    • 3. Create a Maven project and add the dependencies
  • IV. Momo case: integrating with Phoenix
  • V. Momo case: integrating with Hive
  • VI. Momo case: real-time aggregation with Flink
    • 1. Create a Maven project and add the dependencies
    • 2. Define the POJO class
    • 3. Define the MySQL sink class
    • 4. Implement the job and write to MySQL
  • VII. Momo case: MySQL tables
  • VIII. FineBI real-time integration
  • Summary


Preface

[Figure: project architecture]
This is a realistic end-to-end Momo case study; the project architecture is shown in the figure above.
1. Offline path: Flume + Kafka + HBase + Hive/Phoenix
2. Real-time path: Flume + Kafka + Flink + MySQL + FineBI


I. Apache Flume

Flume is currently a top-level open-source project under Apache. It was originally developed by Cloudera and later donated to Apache. Flume is a tool dedicated to data collection; its main purpose is to move data from one end to the other.
Once started, Flume runs as an agent instance, and an agent is generally composed of three components:

    1. Source: connects to the data source and collects data from it; Flume ships with many source implementations.
    2. Sink: the destination; it takes the collected data from the channel and delivers it to the target system; Flume ships with many sink implementations.
    3. Channel: a buffer between the two; the source writes data into the channel and the sink reads it back out before delivering it downstream; Flume ships with many channel implementations.

    Collection requirement:
    Monitor the file /export/data/momo_data/MOMO_DATA.dat; whenever new content appears in it, write the data to Kafka. The setup must also support future expansion: it should tail a file today and be extensible to monitoring a directory later.
vim momo_tailDir_kafka.conf

Add the following content:
a1.sources = r1
a1.sinks = k1
a1.channels = c1

a1.sources.r1.type = TAILDIR
a1.sources.r1.positionFile = /export/data/flume/taildir_position.json
a1.sources.r1.filegroups = f1
a1.sources.r1.filegroups.f1 = /export/data/momo_data/MOMO_DATA.dat
a1.sources.r1.maxBatchCount = 10

a1.channels.c1.type = memory

a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.kafka.topic = MOMO_MSG
a1.sinks.k1.kafka.bootstrap.servers = node1.itcast.cn:9092,node2.itcast.cn:9092,node3.itcast.cn:9092
a1.sinks.k1.kafka.flumeBatchSize = 10
a1.sinks.k1.kafka.producer.acks = 1
a1.sinks.k1.kafka.producer.linger.ms = 1

a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
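
The memory channel above runs with Flume's default sizing; in practice you may want to size it explicitly. The two properties below are standard Flume memory-channel settings, with illustrative values:

a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 1000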

II. Kafka

The data collected by Flume enters Kafka on the producer side. Consumer 1 writes the messages into HBase; consumer 2 is the Flink job.
Create the MOMO_MSG topic in Kafka.

Create the topic:
./kafka-topics.sh --create --zookeeper node1:2181,node2:2181,node3:2181 --topic MOMO_MSG --partitions 6 --replication-factor 2
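
To confirm the topic was created with the expected partition and replica counts, the standard --describe flag can be used:

./kafka-topics.sh --describe --zookeeper node1:2181,node2:2181,node3:2181 --topic MOMO_MSG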

Start the Flume agent to begin collecting data.

Start Flume:
cd /export/server/flume/bin
./flume-ng agent -n a1  -c ../conf  -f ../conf/momo_tailDir_kafka.conf  -Dflume.root.logger=INFO,console 

Test that data is being collected:

cd /export/server/kafka/bin/
./kafka-console-consumer.sh --bootstrap-server node1:9092,node2:9092,node3:9092 --topic MOMO_MSG
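
If nothing shows up, you can trigger the pipeline by hand by appending records to the tailed file. The backup path below is hypothetical; any lines in the Momo log format (20 fields separated by \001) will do:

# append sample records to the tailed file (assumes a backup copy exists at this path)
cat /export/data/momo_data/MOMO_DATA.dat.bak >> /export/data/momo_data/MOMO_DATA.dat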

III. Momo case: consuming messages and writing to HBase

1. Create the table in HBase

create_namespace 'MOMO_CHAT'
create 'MOMO_CHAT:MOMO_MSG',{NAME=>'C1',COMPRESSION=>'GZ'},{NUMREGIONS=>6,SPLITALGO=>'HexStringSplit'}

The table is pre-split into 6 regions with HexStringSplit so that writes are spread across region servers from the start, and GZ compression is enabled on the C1 column family.

2. Momo rowkey design

MD5 hash prefix_sender account_receiver account_message time (epoch timestamp)
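
For example, a message from account 13800000000 to 13900000000 sent at 2021-11-01 10:00:00 would produce a rowkey like the following (the accounts, the time, and the 8-character hash prefix are all illustrative, not computed from real data):

5f2a9c1e_13800000000_13900000000_1635732000000

The hash prefix spreads writes evenly across the pre-split regions, while all messages of the same sender/receiver pair stay adjacent and ordered by time.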

3. Create a Maven project and add the dependencies:

    <repositories><!-- repositories -->
        <repository>
            <id>aliyun</id>
            <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
            <releases><enabled>true</enabled></releases>
            <snapshots>
                <enabled>false</enabled>
                <updatePolicy>never</updatePolicy>
            </snapshots>
        </repository>
    </repositories>


    <dependencies>

        <!-- HBase client -->
        <dependency>
            <groupId>org.apache.hbase</groupId>
            <artifactId>hbase-client</artifactId>
            <version>2.1.0</version>
        </dependency>
        <!-- Kafka client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.1</version>
                <configuration>
                    <target>1.8</target>
                    <source>1.8</source>
                </configuration>
            </plugin>
        </plugins>
    </build>

Code implementation:

package com.itheima.momo_chat;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;

public class MomoChatConsumerToHBase {

    private static Connection hbaseConn;
    private static Table table;

    static{
        try {
            // 2.1 Create the HBase connection object via the connection factory
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
            hbaseConn = ConnectionFactory.createConnection(conf);

            // 2.2 Obtain the HBase table handle
            table = hbaseConn.getTable(TableName.valueOf("MOMO_CHAT:MOMO_MSG"));
        }catch (Exception e){
            e.printStackTrace();
        }
    }

    public static void main(String[] args) throws Exception {

        //1. Consume message data from Kafka: topic MOMO_MSG
        //1.1 Create the Kafka consumer
        Properties props = new Properties();
        props.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id","MOMO_G1");
        props.setProperty("enable.auto.commit","true");
        props.setProperty("auto.commit.interval.ms","1000");
        props.setProperty("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props);

        //1.2 Subscribe to the topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));

        // 1.3 Poll message data from Kafka
        while(true){

            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));

            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {

                String msg = consumerRecord.value();

                System.out.println(msg);

                //2. Write to HBase
                if(msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20){


                    // 2.3 Perform the write
                    // 2.3.1 Build the rowkey: MD5 hash prefix_sender account_receiver account_message time (timestamp)
                    byte[] rowkey = getRowkey(msg);

                    // 2.3.2 Build the Put for one row
                    String[] fields = msg.split("\001");

                    Put put = new Put(rowkey);

                    put.addColumn("C1".getBytes(),"msg_time".getBytes(),fields[0].getBytes());
                    put.addColumn("C1".getBytes(),"sender_nickyname".getBytes(),fields[1].getBytes());
                    put.addColumn("C1".getBytes(),"sender_account".getBytes(),fields[2].getBytes());
                    put.addColumn("C1".getBytes(),"sender_sex".getBytes(),fields[3].getBytes());
                    put.addColumn("C1".getBytes(),"sender_ip".getBytes(),fields[4].getBytes());
                    put.addColumn("C1".getBytes(),"sender_os".getBytes(),fields[5].getBytes());
                    put.addColumn("C1".getBytes(),"sender_phone_type".getBytes(),fields[6].getBytes());
                    put.addColumn("C1".getBytes(),"sender_network".getBytes(),fields[7].getBytes());
                    put.addColumn("C1".getBytes(),"sender_gps".getBytes(),fields[8].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_nickyname".getBytes(),fields[9].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_ip".getBytes(),fields[10].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_account".getBytes(),fields[11].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_os".getBytes(),fields[12].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_phone_type".getBytes(),fields[13].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_network".getBytes(),fields[14].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_gps".getBytes(),fields[15].getBytes());
                    put.addColumn("C1".getBytes(),"receiver_sex".getBytes(),fields[16].getBytes());
                    put.addColumn("C1".getBytes(),"msg_type".getBytes(),fields[17].getBytes());
                    put.addColumn("C1".getBytes(),"distance".getBytes(),fields[18].getBytes());
                    put.addColumn("C1".getBytes(),"message".getBytes(),fields[19].getBytes());

                    table.put(put);
                }

            }

        }

    }
    private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // Builds the rowkey: MD5 hash prefix_sender account_receiver account_message time (timestamp)
    private static byte[] getRowkey(String msg) throws Exception{

        //1. Split the record
        String[] fields = msg.split("\001");

        //2. Extract the sender account, receiver account, and message time
        String msgTime = fields[0];
        String sender_account = fields[2];
        String receiver_account = fields[11];

        // 3. Assemble the rowkey
        // generate the MD5 hash prefix
        String md5Hash = MD5Hash.getMD5AsHex((sender_account+"_"+receiver_account).getBytes()).substring(0,8);

        // convert the time string to an epoch timestamp
        long time = format.parse(msgTime).getTime();

        return (md5Hash+"_"+sender_account+"_"+receiver_account +"_"+time).getBytes();
    }
}
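
A note on the write path: table.put(put) issues one RPC per message. If throughput becomes a concern, the HBase client's BufferedMutator (imports org.apache.hadoop.hbase.client.BufferedMutator and BufferedMutatorParams) can batch mutations client-side. A minimal sketch, with an arbitrary buffer size:

// create once, e.g. in the static block, and reuse across records
BufferedMutator mutator = hbaseConn.getBufferedMutator(
        new BufferedMutatorParams(TableName.valueOf("MOMO_CHAT:MOMO_MSG"))
                .writeBufferSize(4 * 1024 * 1024)); // flush threshold, value illustrative

// in the poll loop, instead of table.put(put):
mutator.mutate(put); // buffered client-side and flushed in batches
// call mutator.flush() / mutator.close() on shutdown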

IV. Momo case: integrating with Phoenix

-- create a view over the HBase table

create view MOMO_CHAT.MOMO_MSG(
    "id" varchar primary key,
    C1."msg_time" varchar,
    C1."sender_nickyname" varchar,
    C1."sender_account" varchar,
    C1."sender_sex" varchar,
    C1."sender_ip" varchar,
    C1."sender_os" varchar,
    C1."sender_phone_type" varchar,
    C1."sender_network" varchar,
    C1."sender_gps" varchar,
    C1."receiver_nickyname" varchar,
    C1."receiver_ip" varchar,
    C1."receiver_account" varchar,
    C1."receiver_os" varchar,
    C1."receiver_phone_type" varchar,
    C1."receiver_network" varchar,
    C1."receiver_gps" varchar,
    C1."receiver_sex" varchar,
    C1."msg_type" varchar,
    C1."distance" varchar,
    C1."message" varchar
);
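
Once the view exists, the HBase data can be queried with SQL from the Phoenix shell. A quick sanity check (query shape illustrative; note that the lowercase column names must stay double-quoted in Phoenix):

-- count the rows and peek at a few messages through the view
SELECT COUNT(1) FROM MOMO_CHAT.MOMO_MSG;
SELECT "sender_account", "receiver_account", "message" FROM MOMO_CHAT.MOMO_MSG LIMIT 10;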

V. Momo case: integrating with Hive

create database if not exists MOMO_CHAT;
use MOMO_CHAT;
create external table MOMO_CHAT.MOMO_MSG (
    id string,
    msg_time string,
    sender_nickyname string,
    sender_account string,
    sender_sex string,
    sender_ip string,
    sender_os string,
    sender_phone_type string,
    sender_network string,
    sender_gps string,
    receiver_nickyname string,
    receiver_ip string,
    receiver_account string,
    receiver_os string,
    receiver_phone_type string,
    receiver_network string,
    receiver_gps string,
    receiver_sex string,
    msg_type string,
    distance string,
    message string
)
stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,
C1:sender_nickyname,
C1:sender_account,
C1:sender_sex,
C1:sender_ip,
C1:sender_os,
C1:sender_phone_type,
C1:sender_network,
C1:sender_gps,
C1:receiver_nickyname,
C1:receiver_ip,
C1:receiver_account,
C1:receiver_os,
C1:receiver_phone_type,
C1:receiver_network,
C1:receiver_gps,
C1:receiver_sex,
C1:msg_type,
C1:distance,
C1:message')
tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
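
With the storage handler mapping in place, Hive can run offline analysis directly over the HBase table. For example (an illustrative query over the columns defined above):

-- offline example: top senders by message count
SELECT sender_account, COUNT(*) AS msg_cnt
FROM MOMO_CHAT.MOMO_MSG
GROUP BY sender_account
ORDER BY msg_cnt DESC
LIMIT 10;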

VI. Momo case: real-time aggregation with Flink

1- Total message count, in real time
2- Messages sent per region, in real time
3- Messages received per region, in real time
4- Messages sent per user, in real time
5- Messages received per user, in real time

1. Create a Maven project and add the dependencies:

		<dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime-web_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-shaded-hadoop-2-uber</artifactId>
            <version>2.7.5-10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>
		<dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_2.11</artifactId>
            <version>1.10.0</version>
        </dependency>

        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.36</version>
        </dependency>
        
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.41</version>
        </dependency>

2. Define the POJO class:

package com.itheima.momo_chat.pojo;

public class MoMoCountBean {

    private Integer id ;
    private  Long moMoTotalCount ;
    private String moMoProvince ;
    private String moMoUsername ;
    private Long moMo_MsgCount ;
    private String groupType ;

    public String getGroupType() {
        return groupType;
    }

    public void setGroupType(String groupType) {
        this.groupType = groupType;
    }

    public Integer getId() {
        return id;
    }

    public void setId(Integer id) {
        this.id = id;
    }

    public Long getMoMoTotalCount() {
        return moMoTotalCount;
    }

    public void setMoMoTotalCount(Long moMoTotalCount) {
        this.moMoTotalCount = moMoTotalCount;
    }

    public String getMoMoProvince() {
        return moMoProvince;
    }

    public void setMoMoProvince(String moMoProvince) {
        this.moMoProvince = moMoProvince;
    }

    public String getMoMoUsername() {
        return moMoUsername;
    }

    public void setMoMoUsername(String moMoUsername) {
        this.moMoUsername = moMoUsername;
    }

    public Long getMoMo_MsgCount() {
        return moMo_MsgCount;
    }

    public void setMoMo_MsgCount(Long moMo_MsgCount) {
        this.moMo_MsgCount = moMo_MsgCount;
    }
}

3. Define the MySQL sink class:

package com.itheima.momo_chat.steam;


import com.itheima.momo_chat.pojo.MoMoCountBean;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.*;


public class MysqlSink extends RichSinkFunction<MoMoCountBean> {
    private Statement stat;
    private Connection connection;

    //private String sql;
    private String status;

    public MysqlSink() {
    }

    public MysqlSink(String status) {
        this.status = status;
    }

    /**
     * Establish the connection in open() so it is not created and released on every invoke()
     *
     * @param parameters
     * @throws Exception
     */
    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);
        connection = getConnection();

        stat = connection.createStatement();
    }

    @Override
    public void close() throws Exception {
        super.close();
        //close resources: statement first, then connection
        if (stat != null) {
            stat.close();
        }
        if (connection != null) {
            connection.close();
        }
    }

    /**
     * invoke() is called once for every record to be inserted
     *
     * @param value
     * @param context
     * @throws Exception
     */
    @Override
    public void invoke(MoMoCountBean value, Context context) throws Exception {

        if(status.equals("1")){
            String sql = "select * from momo_count where momo_groupType = '1'";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();

            if(flag) {
                sql = "update momo_count set momo_totalcount= '"+value.getMoMoTotalCount()+ "' where momo_groupType = '1'";
            }else {
                sql = "insert into momo_count( momo_totalcount,momo_groupType) values ("+value.getMoMoTotalCount()+",'1') ";
            }
            stat.executeUpdate(sql);


        }else if (status.equals("2")){
            String sql = "select * from momo_count where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();

            if(flag) {
                sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '2' and momo_province= '"+value.getMoMoProvince()+"' ";
            }else {
                sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"+value.getMoMoProvince()+"',"+value.getMoMo_MsgCount()+",'2') ";
            }
            stat.executeUpdate(sql);
        }else if (status.equals("3")){
            String sql = "select * from momo_count where momo_groupType = '3' and momo_province= '"+value.getMoMoProvince()+"' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();

            if(flag) {
                sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '3' and momo_province= '"+value.getMoMoProvince()+"' ";
            }else {
                sql = "insert into momo_count( momo_province,momo_msgcount,momo_groupType) values ('"+value.getMoMoProvince()+"',"+value.getMoMo_MsgCount()+",'3') ";
            }
            stat.executeUpdate(sql);

        }else if (status.equals("4")){
            String sql = "select * from momo_count where momo_groupType = '4' and momo_username= '"+value.getMoMoUsername()+"' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();

            if(flag) {
                sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '4' and momo_username= '"+value.getMoMoUsername()+"' ";
            }else {
                sql = "insert into momo_count( momo_username,momo_msgcount,momo_groupType) values ('"+value.getMoMoUsername()+"',"+value.getMoMo_MsgCount()+",'4') ";
            }
            stat.executeUpdate(sql);


        }else if (status.equals("5")){

            String sql = "select * from momo_count where momo_groupType = '5' and momo_username= '"+value.getMoMoUsername()+"' ";
            ResultSet resultSet = stat.executeQuery(sql);
            boolean flag = resultSet.next();

            if(flag) {
                sql = "update momo_count set momo_msgcount= '"+value.getMoMo_MsgCount()+ "' where momo_groupType = '5' and momo_username= '"+value.getMoMoUsername()+"' ";
            }else {
                sql = "insert into momo_count( momo_username,momo_msgcount,momo_groupType) values ('"+value.getMoMoUsername()+"',"+value.getMoMo_MsgCount()+",'5') ";
            }
            stat.executeUpdate(sql);

        }

    }

    private static Connection getConnection() {
        Connection con = null;
        try {
            Class.forName("com.mysql.jdbc.Driver");
            con = DriverManager.getConnection("jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=UTF-8", "root", "123456");
        } catch (Exception e) {
            System.out.println("-----------mysql get connection has exception , msg = "+ e.getMessage());
        }
        return con;
    }
}
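
Design note: invoke() assembles SQL by string concatenation, which is fragile (quoting bugs, and SQL injection if any value ever comes from user input). A hedged sketch of the status "1" branch rewritten with java.sql.PreparedStatement (same table and columns as the DDL in section VII; resource cleanup elided for brevity):

// parameterized version of the status "1" branch
PreparedStatement check = connection.prepareStatement(
        "select id from momo_count where momo_grouptype = '1'");
ResultSet rs = check.executeQuery();
if (rs.next()) {
    PreparedStatement upd = connection.prepareStatement(
            "update momo_count set momo_totalcount = ? where momo_grouptype = '1'");
    upd.setLong(1, value.getMoMoTotalCount());
    upd.executeUpdate();
} else {
    PreparedStatement ins = connection.prepareStatement(
            "insert into momo_count(momo_totalcount, momo_grouptype) values (?, '1')");
    ins.setLong(1, value.getMoMoTotalCount());
    ins.executeUpdate();
}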

4. Implement the job and write to MySQL

package com.itheima.momo_chat.steam;
import com.itheima.momo_chat.pojo.MoMoCountBean;
import com.itheima.momo_chat.utils.HttpClientUtils;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.Tuple1;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class MomoFlinkSteam {
    public static void main(String[] args) throws Exception {
        //1. Create the Flink streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //2. Wire up the three components
        // 2.1 Source: consume from Kafka
        Properties props = new Properties();
        props.setProperty("bootstrap.servers","node1:9092,node2:9092,node3:9092");
        props.setProperty("group.id","MOMO_G2");
        FlinkKafkaConsumer<String> kafkaConsumer =
                new FlinkKafkaConsumer<String>("MOMO_MSG", new SimpleStringSchema(), props);
        DataStreamSource<String> streamSource = env.addSource(kafkaConsumer);

        // 2.2 Transformations: process the messages in real time
        // Metric 1: total message count, in real time
        totalMsgCount(streamSource);

        // Metric 2: messages sent per region, in real time
        totalProvinceSenderMsgCount(streamSource);

        // Metric 3: messages received per region, in real time
        totalProvinceReceiverMsgCount(streamSource);

        // Metric 4: messages sent per user, in real time
        totalUserSenderMsgCount(streamSource);

        // Metric 5: messages received per user, in real time
        totalUserReceiverMsgCount(streamSource);

        //3. Launch the Flink job
        env.execute("FlinkMoMo");
    }


    private static void totalMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<Tuple1<Long>> streamOperator = streamSource.map(new MapFunction<String, Tuple1<Long>>() {
            @Override
            public Tuple1<Long> map(String msg) throws Exception {
                return new Tuple1<>(1L);
            }
        }).keyBy(0).sum(0);

        SingleOutputStreamOperator<MoMoCountBean> operator = streamOperator.map(new MapFunction<Tuple1<Long>, MoMoCountBean>() {
            @Override
            public MoMoCountBean map(Tuple1<Long> tuple1) throws Exception {
                Long totalMsgCount = tuple1.f0;
                MoMoCountBean moMoCountBean = new MoMoCountBean();
                moMoCountBean.setMoMoTotalCount(totalMsgCount);
                return moMoCountBean;
            }
        });

        // 2.3 Sink: write the results to MySQL
        operator.addSink(new MysqlSink("1"));
    }

    private static void totalProvinceSenderMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String msg) throws Exception {
                return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String msg) throws Exception {
                String[] fields = msg.split("\001");
                String[] latAndLng = fields[8].split(",");
                String lng = latAndLng[0].trim();
                String lat = latAndLng[1].trim();

                String province = HttpClientUtils.findByLatAndLng(lat, lng);

                return new Tuple2<>(province, 1L);
            }
        }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator.map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
            @Override
            public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                String province = tuple2.f0;
                Long msgCount = tuple2.f1;
                MoMoCountBean moMoCountBean = new MoMoCountBean();
                moMoCountBean.setMoMoProvince(province);
                moMoCountBean.setMoMo_MsgCount(msgCount);
                return moMoCountBean;
            }
        });

        operator.addSink(new MysqlSink("2"));
    }

    private static void totalProvinceReceiverMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String msg) throws Exception {
                return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String msg) throws Exception {
                String[] fields = msg.split("\001");
                String[] latAndLng = fields[15].split(",");
                String lng = latAndLng[0].trim();
                String lat = latAndLng[1].trim();

                String province = HttpClientUtils.findByLatAndLng(lat, lng);

                return new Tuple2<>(province, 1L);
            }
        }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator.map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
            @Override
            public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                String province = tuple2.f0;
                Long msgCount = tuple2.f1;
                MoMoCountBean moMoCountBean = new MoMoCountBean();
                moMoCountBean.setMoMoProvince(province);
                moMoCountBean.setMoMo_MsgCount(msgCount);
                return moMoCountBean;
            }
        });

        operator.addSink(new MysqlSink("3"));
    }

    private static void totalUserSenderMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String msg) throws Exception {
                return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String msg) throws Exception {
                String[] fields = msg.split("\001");
                String senderName = fields[1];

                return new Tuple2<>(senderName, 1L);
            }
        }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator.map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
            @Override
            public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                String senderName = tuple2.f0;
                Long msgCount = tuple2.f1;
                MoMoCountBean moMoCountBean = new MoMoCountBean();
                moMoCountBean.setMoMoUsername(senderName);
                moMoCountBean.setMoMo_MsgCount(msgCount);
                return moMoCountBean;
            }
        });

        operator.addSink(new MysqlSink("4"));
    }

    private static void totalUserReceiverMsgCount(DataStreamSource<String> streamSource) {
        SingleOutputStreamOperator<String> filterOperator = streamSource.filter(new FilterFunction<String>() {
            @Override
            public boolean filter(String msg) throws Exception {
                return msg != null && !"".equals(msg.trim()) && msg.split("\001").length == 20;
            }
        });
        SingleOutputStreamOperator<Tuple2<String, Long>> sumOperator = filterOperator.map(new MapFunction<String, Tuple2<String, Long>>() {
            @Override
            public Tuple2<String, Long> map(String msg) throws Exception {
                String[] fields = msg.split("\001");
                String receiverName = fields[9];

                return new Tuple2<>(receiverName, 1L);
            }
        }).keyBy(0).sum(1);

        SingleOutputStreamOperator<MoMoCountBean> operator = sumOperator.map(new MapFunction<Tuple2<String, Long>, MoMoCountBean>() {
            @Override
            public MoMoCountBean map(Tuple2<String, Long> tuple2) throws Exception {
                String receiverName = tuple2.f0;
                Long msgCount = tuple2.f1;
                MoMoCountBean moMoCountBean = new MoMoCountBean();
                moMoCountBean.setMoMoUsername(receiverName);
                moMoCountBean.setMoMo_MsgCount(msgCount);
                return moMoCountBean;
            }
        });

        operator.addSink(new MysqlSink("5"));
    }
}
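
The job above imports com.itheima.momo_chat.utils.HttpClientUtils, which is not shown in this article. A minimal sketch of findByLatAndLng follows; it assumes a reverse-geocoding HTTP endpoint that returns JSON containing a province field (the URL, key, and response field names are placeholders to adapt to your geocoding provider), and it uses only the httpclient and fastjson dependencies already declared in the pom:

package com.itheima.momo_chat.utils;

import com.alibaba.fastjson.JSONObject;
import org.apache.http.client.methods.CloseableHttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

// Hedged sketch: resolves (lat, lng) to a province name via a reverse-geocoding API.
// The endpoint, API key, and JSON field names below are placeholders, not the original course code.
public class HttpClientUtils {

    public static String findByLatAndLng(String lat, String lng) throws Exception {
        String url = "https://geocode.example.com/reverse?key=YOUR_KEY&location=" + lng + "," + lat;
        try (CloseableHttpClient client = HttpClients.createDefault();
             CloseableHttpResponse response = client.execute(new HttpGet(url))) {
            String body = EntityUtils.toString(response.getEntity(), "UTF-8");
            JSONObject json = JSONObject.parseObject(body);
            // assumed response shape: {"regeocode": {"addressComponent": {"province": "..."}}}
            return json.getJSONObject("regeocode")
                       .getJSONObject("addressComponent")
                       .getString("province");
        }
    }
}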

VII. Momo case: MySQL tables

Before running the Flink job from section VI, first create in MySQL the table that backs all five metrics:

CREATE DATABASE /*!32312 IF NOT EXISTS*/`momo` /*!40100 DEFAULT CHARACTER SET utf8mb4 */;

USE `momo`;

/*Table structure for table `momo_count` */

CREATE TABLE `momo_count` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `momo_totalcount` bigint(20) DEFAULT '0' COMMENT '总消息量',
  `momo_province` varchar(20) DEFAULT '-1' COMMENT '省份',
  `momo_username` varchar(20) DEFAULT '-1' COMMENT '用户名',
  `momo_msgcount` bigint(20) DEFAULT '0' COMMENT '消息量',
  `momo_grouptype` varchar(20) DEFAULT '-1' COMMENT '统计类型:1 总消息量 2 各省份发送量 3 各省份接收量 4 各用户发送量 5各用户接收量',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
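
Once the Flink job is running, the counters can be checked directly in MySQL (group types as documented in the momo_grouptype column comment):

-- e.g. the running total and the per-province send counts
SELECT momo_totalcount FROM momo_count WHERE momo_grouptype = '1';
SELECT momo_province, momo_msgcount FROM momo_count WHERE momo_grouptype = '2' ORDER BY momo_msgcount DESC;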


VIII. FineBI real-time integration

Finally, connect FineBI to the MySQL database to build real-time dashboards over the aggregated results.


Summary

This case study walks the full path from data collection to front-end visualization:
1. Offline path: Flume + Kafka + HBase + Hive/Phoenix
2. Real-time path: Flume + Kafka + Flink + MySQL + FineBI
