基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(三)离线分析

news2024/11/18 22:49:32

文章目录

    • 08:离线分析:Hbase表设计及构建
    • 09:离线分析:Kafka消费者构建
    • 10:离线分析:Hbase连接构建
    • 11:离线分析:Rowkey的构建
    • 12:离线分析:Put数据列构建
    • 13:离线分析:存储运行测试
    • 14:离线分析:Hive关联测试
    • 15:离线分析:Phoenix关联测试

08:离线分析:Hbase表设计及构建

  • 目标掌握Hbase表的设计及创建表的实现

  • 路径

    • step1:基础设计
    • step2:Rowkey设计
    • step3:分区设计
    • step4:建表
  • 实施

    • 基础设计

      • Namespace:MOMO_CHAT

      • Table:MOMO_MSG

      • Family:C1

      • Qualifier:与数据中字段名保持一致

        image-20210905200550740

    • Rowkey设计

      • 查询需求:根据发件人id + 收件人id + 消息日期 查询聊天记录

        • 发件人账号
        • 收件人账号
        • 时间
      • 设计规则:业务、唯一、长度、散列、组合

      • 设计实现

        • 加盐方案:CRC、Hash、MD5、MUR
        • => 8位、16位、32位
        MD5Hash【发件人账号_收件人账号_消息时间 =》 8位】_发件人账号_收件人账号_消息时间
        
    • 分区设计

      • Rowkey前缀:MD5编码,由字母和数字构成
      • 数据并发量:高
      • 分区设计:使用HexSplit16进制划分多个分区
    • 建表

      • 启动Hbase:start-hbase.sh
      • 进入客户端:hbase shell
      #创建NS
      create_namespace 'MOMO_CHAT'
      #建表
      create 'MOMO_CHAT:MOMO_MSG', {NAME => "C1", COMPRESSION => "GZ"}, { NUMREGIONS => 6, SPLITALGO => 'HexStringSplit'}
      

      image-20210905192807020

  • 小结

    • 掌握Hbase表的设计及创建表的实现

09:离线分析:Kafka消费者构建

  • 目标实现离线消费者的开发

  • 路径

    • 整体实现的路径

      //入口:调用实现消费Kafka,将数据写入Hbase
      public void main(){
          //step1:消费Kafka
          consumerKafka();
          
      }
      
      //用于消费Kafka数据
      public void consumerKafka(){
          prop = new Properties()
      	KafkaConsumer consumer = new KafkaConsumer(prop)
          consumer.subscribe("MOMO_MSG")
          ConsumerRecords  records = consumer.poll
          //基于每个分区来消费和处理
              recordTopicPartitionOffsetKeyValue
          	//step2:写入Hbase
              writeToHbase(value)
          //提交这个分区的offset
           commitSycn(offset+1)
      }
      
      
      //用于将value的数据写入Hbase方法
      public void writeToHbase(){
          //step1:构建连接
          //step2:构建Table对象
          //step3:构建Put对象
          //获取rowkey
         rowkey = getRowkey(value)
          Put put = new Put(rowkey)
          put.添加每一列
          table.put()
      }
      
      public String getRowkey(){
          value.getSender
          value.getReceiver
          value.getTime
              rowkey = MD5+sender+receiverId +time
              return rowkey
      }
      
  • 实施

        /**
         * 用于消费Kafka的数据,将合法数据写入Hbase
         */
        private static void consumerKafkaToHbase() throws Exception {
            //构建配置对象
            Properties props = new Properties();
            //指定服务端地址
            props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
            //指定消费者组的id
            props.setProperty("group.id", "momo");
            //关闭自动提交
            props.setProperty("enable.auto.commit", "false");
            //指定K和V反序列化的类型
            props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            //构建消费者的连接
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            //指定订阅哪些Topic
            consumer.subscribe(Arrays.asList("MOMO_MSG"));
            //持续拉取数据
            while (true) {
                //向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
                //拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                //todo:3-处理拉取到的数据:打印
                //取出每个分区的数据进行处理
                Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区
                //对每个分区的数据做处理
                for (TopicPartition partition : partitions) {
                    List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据
                    //处理这个分区的数据
                    long offset = 0;
                    for (ConsumerRecord<String, String> record : partRecords) {
                        //获取Topic
                        String topic = record.topic();
                        //获取分区
                        int part = record.partition();
                        //获取offset
                        offset = record.offset();
                        //获取Key
                        String key = record.key();
                        //获取Value
                        String value = record.value();
                        System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                        //将Value数据写入Hbase
                        if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
                            writeToHbase(value);
                        }
                    }
                    //手动提交分区的commit offset
                    Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                    consumer.commitSync(offsets);
                }
            }
        }
    
  • 小结

    • 实现离线消费者的开发

10:离线分析:Hbase连接构建

  • 目标实现Hbase连接的构建

  • 实施

        private static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    	private static Connection conn;
        private static Table table;
        private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名
        private static byte[] family = Bytes.toBytes("C1");//列族
    
        // 静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能
        static{
            try {
                //构建配置对象
                Configuration conf = HBaseConfiguration.create();
                conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
                //构建连接
                conn = ConnectionFactory.createConnection(conf);
                //获取表对象
                table = conn.getTable(tableName);
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
  • 小结

    • 实现Hbase连接的构建

11:离线分析:Rowkey的构建

  • 目标实现Rowkey的构建

  • 实施

    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
            //转换时间戳
            long time = format.parse(stime).getTime();
            String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
            //构建MD5
            String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
            //合并返回
            return prefix+"_"+suffix;
        }
    
  • 小结

    • 实现Rowkey的构建

12:离线分析:Put数据列构建

  • 目标实现Put数据列的构建

  • 实施

    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
    put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
    
  • 小结

    • 实现Put数据列的构建

13:离线分析:存储运行测试

  • 目标测试运行消费Kafka数据动态写入Hbase

  • 实施

    • 启动消费者程序

    • 启动Flume程序

      cd /export/server/flume-1.9.0-bin
      bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
      
    • 启动模拟数据

      java -jar /export/data/momo_init/MoMo_DataGen.jar \
      /export/data/momo_init/MoMo_Data.xlsx \
      /export/data/momo_data/ \
      10
      
    • 观察Hbase结果

      image-20210905213457245

  • 小结

    • 测试运行消费Kafka数据动态写入Hbase

14:离线分析:Hive关联测试

  • 目标使用Hive关联Hbase实现离线分析

  • 路径

    • step1:关联
    • step2:查询
  • 实施

    • 启动Hive和yarn

      start-yarn.sh
      hive-daemon.sh metastore
      hive-daemon.sh hiveserver2
      start-beeline.sh
      
    • 关联

      create database MOMO_CHAT;
      use MOMO_CHAT;
      create external table if not exists MOMO_CHAT.MOMO_MSG (
        id string,
        msg_time string ,
        sender_nickyname string , 
        sender_account string , 
        sender_sex string , 
        sender_ip string ,
        sender_os string , 
        sender_phone_type string ,
        sender_network string , 
        sender_gps string , 
        receiver_nickyname string ,
        receiver_ip string ,
        receiver_account string ,
        receiver_os string ,
        receiver_phone_type string ,
        receiver_network string ,
        receiver_gps string ,
        receiver_sex string ,
        msg_type string ,
        distance string ,
        message string 
      ) stored by 'org.apache.hadoop.hive.hbase.HBaseStorageHandler' 
      with serdeproperties('hbase.columns.mapping'=':key,C1:msg_time,C1:sender_nickyname, 
      C1:sender_account,C1:sender_sex,C1:sender_ip,C1:sender_os,C1:sender_phone_type,
      C1:sender_network,C1:sender_gps,C1:receiver_nickyname,C1:receiver_ip,C1:receiver_account,
      C1:receiver_os,C1:receiver_phone_type,C1:receiver_network,C1:receiver_gps,C1:receiver_sex,
      C1:msg_type,C1:distance,C1:message ') tblproperties('hbase.table.name'='MOMO_CHAT:MOMO_MSG');
      
    • 分析查询

      --基础查询
      select 
        msg_time,sender_nickyname,receiver_nickyname,distance 
      from momo_msg limit 10;
      
      --查询聊天记录:发送人id + 接收人id + 日期:1f300e5d_13280256412_15260978785_1632888342000
      select 
        * 
      from momo_msg 
      where sender_account='13280256412' 
      and receiver_account='15260978785' 
      and substr(msg_time,0,10) = '2021-09-29';
      
      --统计每个小时的消息数
      select
        substr(msg_time,0,13) as hour,
        count(*) as cnt
      from momo_msg
      group by substr(msg_time,0,13);
      
  • 小结

    • 使用Hive关联Hbase实现离线分析

15:离线分析:Phoenix关联测试

  • 目标使用Phoenix关联Hbase实现即时查询

  • 路径

    • step1:关联
    • step2:查询
  • 实施

    • 启动

      cd /export/server/phoenix-5.0.0-HBase-2.0-bin/
      bin/sqlline.py node1:2181
      
    • 关联

      create view if not exists MOMO_CHAT.MOMO_MSG (
        "id" varchar primary key,
        C1."msg_time" varchar ,
        C1."sender_nickyname" varchar , 
        C1."sender_account" varchar , 
        C1."sender_sex" varchar , 
        C1."sender_ip" varchar ,
        C1."sender_os" varchar , 
        C1."sender_phone_type" varchar ,
        C1."sender_network" varchar , 
        C1."sender_gps" varchar , 
        C1."receiver_nickyname" varchar ,
        C1."receiver_ip" varchar ,
        C1."receiver_account" varchar ,
        C1."receiver_os" varchar ,
        C1."receiver_phone_type" varchar ,
        C1."receiver_network" varchar ,
        C1."receiver_gps" varchar ,
        C1."receiver_sex" varchar ,
        C1."msg_type" varchar ,
        C1."distance" varchar ,
        C1."message" varchar
      );
      
    • 即时查询

      --基础查询
      select 
        "id",c1."sender_account",c1."receiver_account" 
      from momo_chat.momo_msg 
      limit 10;
      
      --查询每个发送人发送的消息数
      select 
        c1."sender_account" ,
        count(*) as cnt 
      from momo_chat.momo_msg 
      group by c1."sender_account";
      
      --查询每个发送人聊天的人数
      select 
        c1."sender_account" ,
        count(distinct c1."receiver_account") as cnt 
      from momo_chat.momo_msg 
      group by c1."sender_account" 
      order by cnt desc;
      
  • 小结

    • 使用Phoenix关联Hbase实现即时查询

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1089135.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Spring自动装配源码分析

写在前面&#xff1a;阅读spring源码需要读者对Java反射和动态代理有一定了解。关于这部分内容&#xff0c;可以参考这篇博客&#xff1a;Spring源码分析准备工作及java知识补充 一、Spring依赖注入的方式 关于spring的依赖注入&#xff0c;可以参考官方文档&#xff1a;Spring…

注意力屏蔽(Attention Masking)在Transformer中的作用 【gpt学习记录】

填充遮挡&#xff08;Padding Masking&#xff09;&#xff1a; 未来遮挡&#xff08;Future Masking&#xff09;&#xff1a;

近地面无人机植被定量遥感与生理参数反演

目录 专题一 近十年近地面无人机植被遥感文献分析、传感器选择、观测方式及质量控制要点 专题二 辐射度量与地物反射特性 专题三 无人机遥感影像辐射与几何处理 专题四 光在植被叶片与冠层中的辐射传输机理及平面模型应用 专题五 植被覆盖度与叶面积指数遥感估算 更多应用…

sylar高性能服务器-日志(P7-P8)代码解析

文章目录 p71.TabFormatItem2.init函数&#xff0c;对于{}内容的解析3.Util.h4.CmakeLists5.优化日志输出-流式输出 p81.优化日志输出-格式化输出2.日志管理器3.单例模型设计 测试(无调试步骤) P7P8两节视频新增内容不多&#xff0c;主要看下优化日志输出使用的宏函数。本次记录…

python获取网口列表(获取网络接口列表、网口表)socket.if_nameindex()

文章目录 获取网口列表测试 获取网口列表 以下python代码将打印系统中所有存在的网络接口列表&#xff1a; import socketdef print_interfaces_list():# 打印所有的网络接口列表available_interfaces socket.if_nameindex()# 转换成字典形式 {if_index: if_name}available_…

6个视频剪辑必备的素材网站,免费下载。

视频剪辑必备的视频资源、音效素材、BGM&#xff0c;这6个网站全部免费下载&#xff0c;赶紧收藏起来吧&#xff01; 1、菜鸟图库 https://www.sucai999.com/video.html?vNTYxMjky 菜鸟图库网素材非常丰富&#xff0c;网站主要还是以设计类素材为主&#xff0c;高清视频素材…

TSINGSEE青犀智慧广场智能监控解决方案,助力广场监控数字化转型

前期和大家说过世界最大的城市公园——合肥市骆岗公园的监控方案&#xff0c;大家都很感兴趣&#xff0c;后台还有粉丝留言想看看广场类场景的智能方案&#xff0c;今天小编就和大家聊一聊。 广场视频监控方案大体和公园场景类似&#xff0c;但由于广场比公园更加空旷&#xf…

centos 里面的service自启动app.jar,出现两个java进程,app是同一个端口

当使用jps -lv查看java虚拟机进程 app.jar启动后&#xff0c;居然出现两个启动进程&#xff0c;而且他们的端口都一样&#xff0c;同一端口&#xff0c;是不允许启动两个相同app的。 使用进程ps查看进程工具 #ps -aux 参数说明&#xff1a; a: 显示跟当前终端关联的所有进…

到2026年,超过80%企业将使用生成式AI

10月12日&#xff0c;全球著名信息咨询调查机构Gartner在官网&#xff0c;公布了一项调查数据&#xff0c;到2026年&#xff0c;超过80%的企业将使用生成式AI API&#xff0c;或部署生成式AI的应用程序。而2023年这一比例还不到5%。 Gartner副总裁兼高级分析师 Arun Chandrase…

【约束布局】ConstraintLayout配合Guideline解决两个子控件其中一个被挤出屏幕的问题

一、需求 屏幕横向显示文本框A和图标B&#xff0c;A在B的左侧&#xff0c;B紧贴在A的右边显示&#xff0c;文本框A的字数不确定&#xff0c;文本框A的字数足够多时&#xff0c;换行显示&#xff0c;并且保证图标B一直在文本框A的右侧&#xff0c;且不被挤出屏幕。 二、问题 本来…

Java Object转String方式

Map<String,Object> map new HashMap<>(); map.put("a1","a"); map.put("a2",""); map.put("a3",1); map.put("a4",null);一、强制转换 value "a"或""可以进行强制转换String…

众佰诚:新手开抖音小店申请流程是什么

抖音小店为抖音平台上的商家提供了一个全新的销售渠道&#xff0c;让更多创业者能够轻松实现线上销售。如果你是一位希望在抖音上开展电商业务的新手&#xff0c;下面将为你详细介绍如何申请开通抖音小店。 一、准备工作 首先&#xff0c;你需要准备好以下材料&#xff1a; 营业…

数学术语之源——代数——(子空间的)直和(direct sum)

1. 关于(子空间的)直和(direct sum)的较正式定义 令 为向量空间 的子空间,若 且 是独立的&#xff0c;则称 是子空间 的直和(direct sum), 记为 &#xff0c; 这种表示在同一个基的前提下是唯一的。 一个直观几何类比理解(个人愚见)&#xff1a;如果我将向量空间V 看…

计算机基础——内存

文章目录 内存一、内存条、总线、DMA二、内存管理1、为什么要有逻辑地址2、逻辑地址和物理地址如何映射3、分页时间和空间优化4、程序内部的内存管理-分段 三、内存相关的系统调用1、用户态和内核态 四、Java内存 内存 提示&#xff1a;这里可以添加本文要记录的大概内容&…

JS+Jquery用法

1. 当存在多个select时&#xff0c;想要获取每一个select的选中的值(使用变量赋值的方法). var Metric "";$(#Metric).change(function () {Metric $(this).children("option:selected").val();console.log("Metric:" Metric);}); 2. 在页面…

海外代理IP与VPN有何区别?哪个更好?

当谈到网络安全和IP变更时&#xff0c;人们会想到VPN和IP代理服务器。很多人很困惑&#xff0c;它们之间有什么区别&#xff0c;应该选择哪一个呢&#xff1f;这取决于您的需求来决定哪一个更好。 一、什么是VPN与IP代理&#xff1f; VPN 是虚拟专用网络 (Virtual Private Net…

ACP.项目管理.5种复盘会议

复盘要怎么做的有水准&#xff0c;让领导满意&#xff0c;方式方法很重要。今天给你们安利5种复盘方法&#xff0c;保准你省事&#xff0c;领导还满意。 一、KPT复盘法 7月份年中一直在做和复盘相关的事&#xff0c;像公司的OKR复盘、年中战略规划&#xff0c;不过日常很多生…

Hadoop 安装教程 (Mac m1/m2版)

安装JDK1.8 这里最好是安装1.8版本的jdk 1. 进入官网Java Downloads | Oracle Hong Kong SAR, PRC,下滑到中间区域找到JDK8 2.选择mac os,下载ARM64 DMG Installer对应版本 注&#xff1a;这里下载需要注册oracle账号&#xff0c;不过很简单&#xff0c;只需要提供邮箱即可&…

【C++】模板进阶 -- 详解

一、非类型模板参数 模板参数 分类类型形参与非类型形参。 类型形参&#xff0c;即出现在模板参数列表中&#xff0c;跟在 class 或者 typename 之类的参数类型名称。 非类型形参&#xff0c;就是用一个常量作为类&#xff08;函数&#xff09;模板的一个参数&#xff0c;在类…

基于SSM的毕业生就业管理平台设计与实现

末尾获取源码 开发语言&#xff1a;Java Java开发工具&#xff1a;JDK1.8 后端框架&#xff1a;SSM 前端&#xff1a;采用JSP技术开发 数据库&#xff1a;MySQL5.7和Navicat管理工具结合 服务器&#xff1a;Tomcat8.5 开发软件&#xff1a;IDEA / Eclipse 是否Maven项目&#x…