基于Flume+Kafka+Hbase+Flink+FineBI的实时综合案例(五)FineBI可视化

news2024/12/27 16:44:30

文章目录

    • 22:FineBI配置数据集
    • 23:FineBI构建报表
    • 24:FineBI实时配置测试
    • 附录二:离线消费者完整代码

22:FineBI配置数据集

  • 目标实现FineBI访问MySQL结果数据集的配置

  • 实施

    • 安装FineBI

      • 参考《FineBI Windows版本安装手册.docx》安装FineBI

        image-20210906214702837

    • 配置连接

      image-20210906214908806

      image-20210906214943267

      image-20210906215001069

      数据连接名称:Momo
      用户名:root
      密码:自己MySQL的密码
      数据连接URL:jdbc:mysql://node1:3306/momo?useUnicode=true&characterEncoding=utf8
      

      image-20210906215136987

      image-20210906215313596

    • 数据准备

      image-20210906233741527

      image-20210906215517834

      image-20210906215600395

      SELECT  
       id, momo_totalcount,momo_province,momo_username,momo_msgcount,
       CASE momo_grouptype WHEN '1' THEN '总消息量' WHEN '2' THEN '各省份发送量'  WHEN '3' THEN '各省份接收量'
      	WHEN '4' THEN '各用户发送量' WHEN '5' THEN '各用户接收量' END AS momo_grouptype
      FROM  momo_count
      
  • 小结

    • 实现FineBI访问MySQL结果数据集的配置

23:FineBI构建报表

  • 目标实现FineBI实时报表构建

  • 路径

    • step1:实时报表构建
    • step2:实时报表配置
    • step3:实时刷新测试
  • 实施

    • 实时报表构建

      • 新建仪表盘

        image-20210906221339838

        image-20210906221410591

      • 添加标题

        image-20210906221452201

        image-20210906221633739

      • 实时总消息数

        image-20210906225231210

      • 发送消息最多的Top10用户

        image-20210906221821438

        image-20210906222156861

        image-20210906222225524

        image-20210906222300546

        image-20210906222336466

        image-20210906222405217

        image-20210906222544774

        image-20210906222815956

      • 接受消息最多的Top10用户

        image-20210906224107608

        image-20210906224155452

        image-20210906224301084

        image-20210906224422220

      • 各省份发送消息Top10

        image-20210906224657081

        image-20210906224806298

        image-20210906224850783

      • 各省份接收消息Top10

        image-20210906224548114

        image-20210906223310186

        image-20210906223414046

        image-20210906223433477

        image-20210906223453710

        image-20210906223805626

      • 各省份总消息量

        image-20210906225451414

        image-20210906225508401

        image-20210906225557658

        image-20210906230243869

  • 小结

    • 实现FineBI实时报表构建

24:FineBI实时配置测试

  • 目标:实现实时报表测试

  • 实施

    • 实时报表配置

      • 官方文档:https://help.fanruan.com/finebi/doc-view-363.html

      • 添加jar包:将jar包放入FineBI安装目录的 webapps\webroot\WEB-INF\lib目录下

        image-20210906230548177

        • 注意:如果提示已存在,就选择覆盖
      • 添加JS文件

        • 创建js文件:refresh.js

          setTimeout(function () {
           var b =document.title;
           var a =BI.designConfigure.reportId;//获取仪表板id
           //这里要指定自己仪表盘的id
           if (a=="d574631848bd4e33acae54f986d34e69") {
            setInterval(function () {
             BI.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
             //Data.SharingPool.put("controlFilters", BI.Utils.getControlCalculations());
             BI.Utils.broadcastAllWidgets2Refresh(true);
            }, 3000);//5000000为定时刷新的频率,单位ms
           }
          }, 2000)
          
        • 将创建好的refresh.js文件放至 FineBI 安装目录%FineBI%/webapps/webroot中

          image-20210906231356346

        • 关闭FineBI缓存,然后关闭FineBI

          image-20210906231254734

        • 修改jar包,添加js

          image-20210906231519478

          image-20210906231626750

          image-20210906231721464

          image-20210906231735007

          <!-- 增加刷新功能 --> 
          <script type="text/javascript" src="/webroot/refresh.js"></script>
          
        
        
      • 重启FineBI

  • 实时刷新测试

    • 清空MySQL结果表

    • 启动Flink程序:运行MoMoFlinkCount

    • 启动Flume程序

      cd /export/server/flume-1.9.0-bin
      bin/flume-ng agent -c conf/ -n a1 -f usercase/momo_mem_kafka.properties -Dflume.root.logger=INFO,console
      
    • 启动模拟数据

      java -jar /export/data/momo_init/MoMo_DataGen.jar \
      /export/data/momo_init/MoMo_Data.xlsx \
      /export/data/momo_data/ \
      10
      
      
    - 观察报表
    
    

image-20210906235752933

image-20210906235808012

  • 小结

    • 实现FineBI实时测试




## 附录一:Maven依赖

​```xml
  <!--远程仓库-->
  <repositories>
      <repository>
          <id>aliyun</id>
          <url>http://maven.aliyun.com/nexus/content/groups/public/</url>
          <releases><enabled>true</enabled></releases>
          <snapshots>
              <enabled>false</enabled>
              <updatePolicy>never</updatePolicy>
          </snapshots>
      </repository>
  </repositories>
  <dependencies>
      <!--Hbase 客户端-->
      <dependency>
          <groupId>org.apache.hbase</groupId>
          <artifactId>hbase-client</artifactId>
          <version>2.1.0</version>
      </dependency>
      <!--kafka 客户端-->
      <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>2.4.1</version>
      </dependency>
      <!--JSON解析工具包-->
      <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>1.2.62</version>
      </dependency>
      <!--Flink依赖-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-runtime-web_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <!-- flink操作hdfs、Kafka、MySQL、Redis,所需要导入该包-->
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-shaded-hadoop-2-uber</artifactId>
          <version>2.7.5-10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-connector-kafka_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-jdbc_2.11</artifactId>
          <version>1.10.0</version>
      </dependency>
      <dependency>
          <groupId>org.apache.bahir</groupId>
          <artifactId>flink-connector-redis_2.11</artifactId>
          <version>1.0</version>
      </dependency>
      <!--HTTP请求的的依赖-->
      <dependency>
          <groupId>org.apache.httpcomponents</groupId>
          <artifactId>httpclient</artifactId>
          <version>4.5.4</version>
      </dependency>
      <!--MySQL连接驱动-->
      <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>5.1.38</version>
      </dependency>
  </dependencies>

  <build>
      <plugins>
          <plugin>
              <groupId>org.apache.maven.plugins</groupId>
              <artifactId>maven-compiler-plugin</artifactId>
              <version>3.1</version>
              <configuration>
                  <target>1.8</target>
                  <source>1.8</source>
              </configuration>
          </plugin>
      </plugins>
  </build>

附录二:离线消费者完整代码

package bigdata.itcast.cn.momo.offline;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Connection;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.MD5Hash;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

import java.io.IOException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.time.Duration;
import java.util.*;

/**
 * @ClassName MomoKafkaToHbase
 * @Description TODO 离线场景:消费Kafka的数据写入Hbase
 * @Create By     Maynor
 */
public class MomoKafkaToHbase {

    private  static SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private static Connection conn;
    private static Table table;
    private static TableName tableName = TableName.valueOf("MOMO_CHAT:MOMO_MSG");//表名
    private static byte[] family = Bytes.toBytes("C1");//列族

    //todo:2-构建Hbase连接
    //静态代码块: 随着类的加载而加载,一般只会加载一次,避免构建多个连接影响性能
    static{
        try {
            //构建配置对象
            Configuration conf = HBaseConfiguration.create();
            conf.set("hbase.zookeeper.quorum","node1:2181,node2:2181,node3:2181");
            //构建连接
            conn = ConnectionFactory.createConnection(conf);
            //获取表对象
            table = conn.getTable(tableName);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }


    public static void main(String[] args) throws Exception {
        //todo:1-构建消费者,获取数据
        consumerKafkaToHbase();
//        String momoRowkey = getMomoRowkey("2020-08-13 12:30:00", "13071949728", "17719988692");
//        System.out.println(momoRowkey);
    }

    /**
     * 用于消费Kafka的数据,将合法数据写入Hbase
     */
    private static void consumerKafkaToHbase() throws Exception {
        //构建配置对象
        Properties props = new Properties();
        //指定服务端地址
        props.setProperty("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        //指定消费者组的id
        props.setProperty("group.id", "momo1");
        //关闭自动提交
        props.setProperty("enable.auto.commit", "false");
        //指定K和V反序列化的类型
        props.setProperty("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        //构建消费者的连接
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        //指定订阅哪些Topic
        consumer.subscribe(Arrays.asList("MOMO_MSG"));
        //持续拉取数据
        while (true) {
            //向Kafka请求拉取数据,等待Kafka响应,在100ms以内如果响应,就拉取数据,如果100ms内没有响应,就提交下一次请求: 100ms为等待Kafka响应时间
            //拉取到的所有数据:多条KV数据都在ConsumerRecords对象,类似于一个集合
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            //todo:3-处理拉取到的数据:打印
            //取出每个分区的数据进行处理
            Set<TopicPartition> partitions = records.partitions();//获取本次数据中所有分区
            //对每个分区的数据做处理
            for (TopicPartition partition : partitions) {
                List<ConsumerRecord<String, String>> partRecords = records.records(partition);//取出这个分区的所有数据
                //处理这个分区的数据
                long offset = 0;
                for (ConsumerRecord<String, String> record : partRecords) {
                    //获取Topic
                    String topic = record.topic();
                    //获取分区
                    int part = record.partition();
                    //获取offset
                    offset = record.offset();
                    //获取Key
                    String key = record.key();
                    //获取Value
                    String value = record.value();
                    System.out.println(topic + "\t" + part + "\t" + offset + "\t" + key + "\t" + value);
                    //将Value数据写入Hbase
                    if(value != null && !"".equals(value) && value.split("\001").length == 20 ){
                        writeToHbase(value);
                    }
                }
                //手动提交分区的commit offset
                Map<TopicPartition, OffsetAndMetadata> offsets = Collections.singletonMap(partition,new OffsetAndMetadata(offset+1));
                consumer.commitSync(offsets);
            }
        }
    }

    /**
     * 用于实现具体的写入Hbase的方法
     * @param value
     */
    private static void writeToHbase(String value) throws Exception {
        //todo:3-写入Hbase
        //切分数据
        String[] items = value.split("\001");
        String stime = items[0];
        String sender_accounter = items[2];
        String receiver_accounter = items[11];
        //构建rowkey
        String rowkey = getMomoRowkey(stime,sender_accounter,receiver_accounter);
        //构建Put
        Put put = new Put(Bytes.toBytes(rowkey));
        //添加列
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_time"),Bytes.toBytes(items[0]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_nickyname"),Bytes.toBytes(items[1]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_account"),Bytes.toBytes(items[2]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_sex"),Bytes.toBytes(items[3]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_ip"),Bytes.toBytes(items[4]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_os"),Bytes.toBytes(items[5]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_phone_type"),Bytes.toBytes(items[6]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_network"),Bytes.toBytes(items[7]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("sender_gps"),Bytes.toBytes(items[8]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_nickyname"),Bytes.toBytes(items[9]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_ip"),Bytes.toBytes(items[10]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_account"),Bytes.toBytes(items[11]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_os"),Bytes.toBytes(items[12]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_phone_type"),Bytes.toBytes(items[13]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_network"),Bytes.toBytes(items[14]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_gps"),Bytes.toBytes(items[15]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("receiver_sex"),Bytes.toBytes(items[16]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("msg_type"),Bytes.toBytes(items[17]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("distance"),Bytes.toBytes(items[18]));
        put.addColumn(Bytes.toBytes("C1"),Bytes.toBytes("message"),Bytes.toBytes(items[19]));
        //执行写入
        table.put(put);
    }

    /**
     * 基于消息时间、发送人id、接受人id构建rowkey
     * @param stime
     * @param sender_accounter
     * @param receiver_accounter
     * @return
     * @throws Exception
     */
    private static String getMomoRowkey(String stime, String sender_accounter, String receiver_accounter) throws Exception {
        //转换时间戳
        long time = format.parse(stime).getTime();
        String suffix = sender_accounter+"_"+receiver_accounter+"_"+time;
        //构建MD5
        String prefix = MD5Hash.getMD5AsHex(Bytes.toBytes(suffix)).substring(0,8);
        //合并返回
        return prefix+"_"+suffix;
    }
}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1110405.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

使用Nginx可视化管理工具+Cpolar在本地搭建服务器并实现远程访问【内网穿透】

文章目录 前言1. docker 一键安装2. 本地访问3. Linux 安装cpolar4. 配置公网访问地址5. 公网远程访问6. 固定公网地址 前言 Nginx Proxy Manager 是一个开源的反向代理工具&#xff0c;不需要了解太多 Nginx 或 Letsencrypt 的相关知识&#xff0c;即可快速将你的服务暴露到外…

【MyBatis】mvc模式以及Mapper文件中的namespace以及ORM思想

目录 什么是MVC三层架构&#xff0c;初步了解&#xff1f; namespace的作用是什么&#xff1f; Mapper文件中的namespace&#xff1f; ORM思想&#xff08;对象关系映射思想&#xff09; 其中提供了一套映射规则和API 什么是MVC三层架构&#xff0c;初步了解&#xff1f; 三…

谷歌浏览器多版本切换测试兼容性

谷歌浏览器多版本切换测试兼容性 在开发过程中&#xff0c;我们常常会出现浏览器兼容问题&#xff0c;客户的浏览器版本参差不齐&#xff0c;只有对应版本的浏览器才会出现对应的问题&#xff0c;所以我们需要在本地通过切换不同的浏览器来测试对应的问题。本篇内容就是介绍不用…

投资理财:增额终身寿的优点和缺点

大家好&#xff0c;我是财富智星&#xff0c;今天跟大家继续探讨一下最近理财爆火的增额终身寿&#xff0c;你是真的了解增额终身寿的本质吗&#xff1f; 一、增额寿3.0%的利率真的吸引人吗&#xff1f; 身边有很多富有的成功人士以及财经博主都开始购买增额终身寿保险&#xf…

中国人民大学与加拿大女王大学金融硕士庞雪雨:行学之道,在自律、在勤勉、在止于至善

庞雪雨 中国人民大学-加拿大女王大学金融硕士2022-2023级行业高管班 光大保德信资产管理有限公司董事总经理 当我进入到人大校园的那一刻&#xff0c;映入眼帘的是明德楼&#xff0c;由此我想到了《大学》&#xff0c;大学开篇中讲到&#xff0c;大学之道&#xff0c;在明明德…

Maven 基础教程系列

Maven是一个项目开发管理和理解工具。基于项目对象模型的概念&#xff1a;构建、依赖关系管理、文档创建、站点发布和分发发布都由pom.xml声明性文件控制。Maven可以通过插件进行扩展&#xff0c;以使用许多其他开发工具来报告或构建过程。 一、Maven 使用教程-CSDN博客 二、…

电源芯片测试规范是什么?如何测试电源芯片输入电压范围?

电源芯片测试贯穿着研发、设计、生产过程的始终&#xff0c;目的就是为了通过反复检测来确保电源芯片的性能、质量和可靠性&#xff0c;保证正常工作运行。电源芯片测试涉及到许多测试项目&#xff0c;并且有着具体的测试规范标准和方法。本文纳米软件将介绍电源芯片输入电压范…

Flutter视图原理之StatefulWidget,InheritedWidget

目录 StatefulElement1. 构造函数2. build3. _firstBuild3. didChangeDependencies4. setState InheritedElement1. Element类2. _updateInheritance3. InheritedWidget数据向下传递3.1 dependOnInheritedWidgetOfExactType 4. InheritedWidget的状态绑定4.1. ProxyElement 在f…

为中小企业的网络推广策略解析:扩大品牌知名度和曝光度

目前网络推广已经成为企业获取潜在客户和提升品牌知名度的重要手段。对于中小企业而言&#xff0c;网络推广是一个具有巨大潜力和可行性的营销策略。在本文中&#xff0c;我们将探讨中小企业为什么有必要进行网络推广&#xff0c;并分享一些实用的网络推广策略。 一、扩大品牌知…

【Go之道】探索Go语言之旅:基础与进阶指南

在这个数字化快速发展的时代&#xff0c;掌握一门编程语言已成为必备技能。今天&#xff0c;我将带你踏上【Go之道】&#xff0c;探索Go语言的魅力&#xff0c;为你的编程之旅助力。 一、Go语言概述 Go&#xff0c;又称为Golang&#xff0c;是由Google设计和开发的一种静态类型…

如何提高嵌入式软件工程师的技术深度?

今日话题&#xff0c;如何提高嵌入式软件工程师的技术深度&#xff1f;建立坚实的基础知识是深入研究的关键。只有深入理解基础知识&#xff0c;才能在理论指导下不断深化和扩展自己的技术。没有坚实的基础&#xff0c;深入研究就显得空中楼阁。如果你有兴趣进入嵌入式行业我可…

WebDAV之π-Disk派盘 + Autosync

Autosync如何设置可以让各种云储存软件能够自动的同步备份手机上面的各种内容,让你手机当中的重要文件内容能够得到保存,让你的重要文件不会在手机上面丢失,随时都能够在各种云储存当中找到?快来试下Autosync自动同步工具吧。 Autosync是一款自动文件同步和备份工具。 它能…

StyleCLIP global direction详解

StyleCLIP中global direction的实现原理 前言第一阶段:预计算生成latents计算均值与标准差计算 Δ i \Delta i Δi 第二阶段:计算与文本的对应关系 前言 基于的假设&#xff1a; CLIP中虽然图像特征与文本特征不存在一一对应的关系&#xff0c;但相同的语义下&#xff0c;图像…

iOS上架App Store的全攻略

​ 第一步&#xff1a;申请开发者账号 在开始将应用上架到App Store之前&#xff0c;你需要申请一个开发者账号。 1.1 打开苹果开发者中心网站&#xff1a;Apple Developer 1.2 使用Apple ID和密码登录&#xff08;如果没有账号则需要注册&#xff09;&#xff0c;要确保使用…

会议OA项目-其它页面->自定义组件应用,其它界面的布局

1.自定义组件应用 文档参考:https://developers.weixin.qq.com/miniprogram/dev/framework/custom-component/ //oamin\project.config.json {"description": "项目配置文件","packOptions": {"ignore": [],"include": []},…

聊聊BOM的基础概念、管理难点

物料清单&#xff08;Bill of Materials&#xff0c;简称BOM&#xff09;是描述产品组成结构的信息数据。BOM信息是制造信息化/数字化管理的最核心的基础数据&#xff0c;BOM信息贯穿从产品设计、生产计划制定、物料采购和销售服务等制造全业务流程&#xff0c;是开展生产活动的…

vue 插槽 - 具名插槽

vue 插槽 - 具名插槽 **创建 工程&#xff1a; H:\java_work\java_springboot\vue_study ctrl按住不放 右键 悬着 powershell H:\java_work\java_springboot\js_study\Vue2_3入门到实战-配套资料\01-随堂代码素材\day05\准备代码\09-插槽-具名插槽 vue --version vue create…

兼容支付宝抖音小程序的工具还能把他们迁移到自己的app

事情的起因是这样的。 之前在微信、支付宝和抖音开放平台都上架了自己的小程序&#xff0c;虽然几个平台有自己的开发标准&#xff0c;但是都是基于 JavaScript 这种已经被广泛使用的编程语言进行开发的&#xff0c;对于开发者而言学习的门槛并不高&#xff0c;也很容易进行开…

标准的Gabor滤波器及Log_Gabor滤波器的实现、解析、速度优化及其和Halcon中gen_gabor的比较。

最近有朋友在研究Halcon中gen_gabor的函数&#xff0c;和我探讨&#xff0c;因为我之前也没有怎么去关注这个函数&#xff0c;因此&#xff0c;前前后后大概也折腾了有一个星期去模拟实现这个东西&#xff0c;虽然最终没有实现这个函数&#xff0c;但是也是有所收获&#xff0c…