尚硅谷 Big Data Project "Real-Time Data Warehouse for Online Education", Notes 004


Video: 尚硅谷大数据项目《在线教育之实时数仓》 (Bilibili)

Contents

Chapter 8 Data Warehouse Development: the DIM Layer

P024

P025

P026

P027

P028

P029

P030


Chapter 8 Data Warehouse Development: the DIM Layer

P024

package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Config-table snapshot loaded in open(), used as a fallback before broadcast state arrives
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
        );

        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value the change-record JSON emitted directly by Flink CDC for the config table
     * @param ctx   broadcast context with write access to the broadcast state
     * @param out   collector (unused here; dimension records are emitted in processElement)
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 parse the config-table change record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // a config row was deleted: remove the table from broadcast state
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // and from the configMap fallback as well
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            //TODO 3 write the config into broadcast state so every subtask sees it
            tableConfigState.put(after.getSourceTable(), after);
            //TODO 2 create the Phoenix table if it does not exist yet
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            // checkTable is implemented in the next step (P025)
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    /**
     * @param value a Maxwell-generated JSON record read from Kafka
     * @param ctx   read-only broadcast context
     * @param out   collector for the filtered dimension records
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 read the broadcast config

        //TODO 2 keep only the configured dimension columns

        //TODO 3 add the output metadata fields
    }
}
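
For orientation, a function like this runs on a connected broadcast stream. The sketch below shows the usual wiring; the class and stream names (DimWiringSketch, dimStream, configStream) are assumptions for illustration, not the course's actual main class:

package com.atguigu.edu.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;

public class DimWiringSketch {
    public static DataStream<JSONObject> wire(DataStream<JSONObject> dimStream,
                                              DataStream<String> configStream) {
        // the same descriptor is shared by the broadcast side and the process function
        MapStateDescriptor<String, DimTableProcess> tableProcessState =
                new MapStateDescriptor<>("table-process-state", String.class, DimTableProcess.class);

        // broadcast the Flink CDC config stream to every parallel subtask
        BroadcastStream<String> broadcastStream = configStream.broadcast(tableProcessState);

        // connect the Maxwell dimension stream with the broadcast config
        return dimStream
                .connect(broadcastStream)
                .process(new DimBroadcastProcessFunction(tableProcessState));
    }
}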

P025

8.3.2 Splitting the stream dynamically according to the MySQL config table

6) Create the connection-pool utility class
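
The DruidDSUtil class referenced by PhoenixUtil is not reproduced in these notes. A minimal sketch follows; the Phoenix driver class and the node001:2181 quorum match the rest of this project, while the pool-sizing numbers are illustrative assumptions:

package com.atguigu.edu.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;

public class DruidDSUtil {
    private static DruidDataSource druidDataSource;

    public static DruidDataSource getDruidDataSource() {
        if (druidDataSource == null) {
            druidDataSource = new DruidDataSource();
            // Phoenix thick-client driver and the ZooKeeper quorum used elsewhere in these notes
            druidDataSource.setDriverClassName("org.apache.phoenix.jdbc.PhoenixDriver");
            druidDataSource.setUrl("jdbc:phoenix:node001:2181");
            // pool sizing: assumed defaults, tune for your load
            druidDataSource.setInitialSize(5);
            druidDataSource.setMaxActive(20);
            druidDataSource.setMinIdle(1);
            druidDataSource.setMaxWait(-1);
        }
        return druidDataSource;
    }
}

With the pool in place, the full DimBroadcastProcessFunction (now including the checkTable method) looks like this: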

package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Config-table snapshot loaded in open(), used as a fallback before broadcast state arrives
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
        );

        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value the change-record JSON emitted directly by Flink CDC for the config table
     * @param ctx   broadcast context with write access to the broadcast state
     * @param out   collector (unused here; dimension records are emitted in processElement)
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 parse the config-table change record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // a config row was deleted: remove the table from broadcast state
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // and from the configMap fallback as well
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            //TODO 3 write the config into broadcast state so every subtask sees it
            tableConfigState.put(after.getSourceTable(), after);
            //TODO 2 create the Phoenix table if it does not exist yet
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        // create table if not exists table (id varchar primary key, name varchar, ...)
        // assemble the DDL
        StringBuilder sql = new StringBuilder();
        sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");

        // if no primary key is configured, default to id
        // (without this, no column would match and the DDL would have no primary key)
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }

        // append each column; the configured primary-key column gets "primary key"
        String[] split = sinkColumns.split(",");
        for (int i = 0; i < split.length; i++) {
            sql.append(split[i] + " varchar");
            if (split[i].equals(sinkPk)) {
                sql.append(" primary key");
            }
            if (i < split.length - 1) {
                sql.append(",\n");
            }
        }
        sql.append(") ");
        sql.append(sinkExtend);

        PhoenixUtil.executeDDL(sql.toString());
    }

    /**
     * @param value a Maxwell-generated JSON record read from Kafka
     * @param ctx   read-only broadcast context
     * @param out   collector for the filtered dimension records
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 read the broadcast config

        //TODO 2 keep only the configured dimension columns

        //TODO 3 add the output metadata fields
    }
}
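
As a concrete example: a hypothetical config row with sink_table=dim_base_province, sink_columns=id,name,region_id, sink_pk=id and an empty sink_extend would make checkTable issue the following DDL (the column names are assumed for illustration; the table name matches the Phoenix listing in P027):

create table if not exists EDU_REALTIME.dim_base_province(
id varchar primary key,
name varchar,
region_id varchar)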

P026

P027

Start Hadoop, ZooKeeper, Kafka, and HBase. (p41)

[atguigu@node001 ~]$ myhadoop.sh start
 ================ 启动 hadoop集群 ================
 ---------------- 启动 hdfs ----------------
Starting namenodes on [node001]
Starting datanodes
Starting secondary namenodes [node003]
 --------------- 启动 yarn ---------------
Starting resourcemanager
Starting nodemanagers
 --------------- 启动 historyserver ---------------
[atguigu@node001 ~]$ zookeeper.sh start
---------- zookeeper node001 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node002 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
---------- zookeeper node003 启动 ----------
ZooKeeper JMX enabled by default
Using config: /opt/module/zookeeper/zookeeper-3.5.7/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ kafka.sh start
--------------- node001 Kafka 启动 ---------------
--------------- node002 Kafka 启动 ---------------
--------------- node003 Kafka 启动 ---------------
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ 
[atguigu@node001 ~]$ start-hbase.sh
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase/hbase-2.0.5/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node001.out
node002: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node002.out
node003: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node003.out
node001: running regionserver, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-regionserver-node001.out
node002: running master, logging to /opt/module/hbase/hbase-2.0.5/logs/hbase-atguigu-master-node002.out
[atguigu@node001 ~]$ jpsall 
================ node001 ================
4880 HMaster
4615 Kafka
4183 QuorumPeerMain
3017 DataNode
2858 NameNode
5083 HRegionServer
3676 JobHistoryServer
3454 NodeManager
5406 Jps
================ node002 ================
2080 ResourceManager
4050 Jps
3397 Kafka
3574 HRegionServer
2966 QuorumPeerMain
3719 HMaster
1833 DataNode
2233 NodeManager
================ node003 ================
3265 Kafka
2833 QuorumPeerMain
3634 Jps
2067 SecondaryNameNode
2245 NodeManager
1941 DataNode
3430 HRegionServer
[atguigu@node001 ~]$ cd /opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/
[atguigu@node001 apache-phoenix-5.0.0-HBase-2.0-bin]$ cd bin
[atguigu@node001 bin]$ pwd
/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/bin
[atguigu@node001 bin]$ ./sqlline.py node001:2181
Setting property: [incremental, false]
Setting property: [isolation, TRANSACTION_READ_COMMITTED]
issuing: !connect jdbc:phoenix:node001:2181 none none org.apache.phoenix.jdbc.PhoenixDriver
Connecting to jdbc:phoenix:node001:2181
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/opt/module/hbase/apache-phoenix-5.0.0-HBase-2.0-bin/phoenix-5.0.0-HBase-2.0-client.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/opt/module/hadoop/hadoop-3.1.3/share/hadoop/common/lib/slf4j-log4j12-1.7.25.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
23/10/26 20:07:57 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Connected to: Phoenix (version 5.0)
Driver: PhoenixEmbeddedDriver (version 5.0)
Autocommit status: true
Transaction isolation: TRANSACTION_READ_COMMITTED
Building list of tables and columns for tab-completion (set fastconnect to true to skip)...
133/133 (100%) Done
Done
sqlline version 1.2.0
0: jdbc:phoenix:node001:2181> create schema EDU_REALTIME;
No rows affected (3.039 seconds)
0: jdbc:phoenix:node001:2181> !tables 
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
| TABLE_CAT  | TABLE_SCHEM  | TABLE_NAME  |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  | IMMUTABLE_RO |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
|            | SYSTEM       | CATALOG     | SYSTEM TABLE  |          |            |                            |                 |              | false        |
|            | SYSTEM       | FUNCTION    | SYSTEM TABLE  |          |            |                            |                 |              | false        |
|            | SYSTEM       | LOG         | SYSTEM TABLE  |          |            |                            |                 |              | true         |
|            | SYSTEM       | SEQUENCE    | SYSTEM TABLE  |          |            |                            |                 |              | false        |
|            | SYSTEM       | STATS       | SYSTEM TABLE  |          |            |                            |                 |              | false        |
+------------+--------------+-------------+---------------+----------+------------+----------------------------+-----------------+--------------+--------------+
0: jdbc:phoenix:node001:2181> !tables 
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
| TABLE_CAT  |  TABLE_SCHEM  |        TABLE_NAME         |  TABLE_TYPE   | REMARKS  | TYPE_NAME  | SELF_REFERENCING_COL_NAME  | REF_GENERATION  | INDEX_STATE  |
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
|            | SYSTEM        | CATALOG                   | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | FUNCTION                  | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | LOG                       | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | SEQUENCE                  | SYSTEM TABLE  |          |            |                            |                 |              |
|            | SYSTEM        | STATS                     | SYSTEM TABLE  |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_CATEGORY_INFO    | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_PROVINCE         | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_SOURCE           | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_BASE_SUBJECT_INFO     | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_CHAPTER_INFO          | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_COURSE_INFO           | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_KNOWLEDGE_POINT       | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_PAPER            | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_PAPER_QUESTION   | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_POINT_QUESTION   | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_QUESTION_INFO    | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_TEST_QUESTION_OPTION  | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_USER_INFO             | TABLE         |          |            |                            |                 |              |
|            | EDU_REALTIME  | DIM_VIDEO_INFO            | TABLE         |          |            |                            |                 |              |
+------------+---------------+---------------------------+---------------+----------+------------+----------------------------+-----------------+--------------+
0: jdbc:phoenix:node001:2181> 

P028

package com.atguigu.edu.realtime.app.func;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.bean.DimTableProcess;
import com.atguigu.edu.realtime.common.EduConfig;
import com.atguigu.edu.realtime.util.DruidDSUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.api.common.state.BroadcastState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ReadOnlyBroadcastState;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.*;
import java.util.*;

public class DimBroadcastProcessFunction extends BroadcastProcessFunction<JSONObject, String, JSONObject> {
    private MapStateDescriptor<String, DimTableProcess> tableProcessState;

    // Config-table snapshot loaded in open(), used as a fallback before broadcast state arrives
    private HashMap<String, DimTableProcess> configMap = new HashMap<>();

    public DimBroadcastProcessFunction(MapStateDescriptor<String, DimTableProcess> tableProcessState) {
        this.tableProcessState = tableProcessState;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        super.open(parameters);

        Connection connection = DriverManager.getConnection("jdbc:mysql://node001:3306/edu_config?" +
                "user=root&password=123456&useUnicode=true&" +
                "characterEncoding=utf8&serverTimeZone=Asia/Shanghai&useSSL=false"
        );

        PreparedStatement preparedStatement = connection.prepareStatement("select * from edu_config.table_process");
        ResultSet resultSet = preparedStatement.executeQuery();
        ResultSetMetaData metaData = resultSet.getMetaData();
        while (resultSet.next()) {
            JSONObject jsonObject = new JSONObject();
            for (int i = 1; i <= metaData.getColumnCount(); i++) {
                String columnName = metaData.getColumnName(i);
                String columnValue = resultSet.getString(i);
                jsonObject.put(columnName, columnValue);
            }
            DimTableProcess dimTableProcess = jsonObject.toJavaObject(DimTableProcess.class);
            configMap.put(dimTableProcess.getSourceTable(), dimTableProcess);
        }
        resultSet.close();
        preparedStatement.close();
        connection.close();
    }

    /**
     * @param value the change-record JSON emitted directly by Flink CDC for the config table
     * @param ctx   broadcast context with write access to the broadcast state
     * @param out   collector (unused here; dimension records are emitted in processElement)
     * @throws Exception
     */
    @Override
    public void processBroadcastElement(String value, Context ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 parse the config-table change record
        JSONObject jsonObject = JSON.parseObject(value);
        String type = jsonObject.getString("op");
        BroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        if ("d".equals(type)) {
            // a config row was deleted: remove the table from broadcast state
            DimTableProcess before = jsonObject.getObject("before", DimTableProcess.class);
            tableConfigState.remove(before.getSourceTable());
            // and from the configMap fallback as well
            configMap.remove(before.getSourceTable());
        } else {
            DimTableProcess after = jsonObject.getObject("after", DimTableProcess.class);
            //TODO 3 write the config into broadcast state so every subtask sees it
            tableConfigState.put(after.getSourceTable(), after);
            //TODO 2 create the Phoenix table if it does not exist yet
            String sinkTable = after.getSinkTable();
            String sinkColumns = after.getSinkColumns();
            String sinkPk = after.getSinkPk();
            String sinkExtend = after.getSinkExtend();
            checkTable(sinkTable, sinkColumns, sinkPk, sinkExtend);
        }
    }

    private void checkTable(String sinkTable, String sinkColumns, String sinkPk, String sinkExtend) {
        // create table if not exists table (id varchar primary key, name varchar, ...)
        // assemble the DDL
        StringBuilder sql = new StringBuilder();
        sql.append("create table if not exists " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(\n");

        // if no primary key is configured, default to id
        // (without this, no column would match and the DDL would have no primary key)
        if (sinkPk == null) {
            sinkPk = "id";
        }
        if (sinkExtend == null) {
            sinkExtend = "";
        }

        // append each column; the configured primary-key column gets "primary key"
        String[] split = sinkColumns.split(",");
        for (int i = 0; i < split.length; i++) {
            sql.append(split[i] + " varchar");
            if (split[i].equals(sinkPk)) {
                sql.append(" primary key");
            }
            if (i < split.length - 1) {
                sql.append(",\n");
            }
        }
        sql.append(") ");
        sql.append(sinkExtend);

        PhoenixUtil.executeDDL(sql.toString());
    }

    /**
     * @param value a Maxwell-generated JSON record read from Kafka
     * @param ctx   read-only broadcast context
     * @param out   collector for the filtered dimension records
     * @throws Exception
     */
    @Override
    public void processElement(JSONObject value, ReadOnlyContext ctx, Collector<JSONObject> out) throws Exception {
        //TODO 1 read the broadcast config
        ReadOnlyBroadcastState<String, DimTableProcess> tableConfigState = ctx.getBroadcastState(tableProcessState);
        DimTableProcess tableProcess = tableConfigState.get(value.getString("table"));
        // fallback: Kafka data can arrive before the broadcast state does, so
        // fall back to the snapshot loaded in open() to avoid losing records
        if (tableProcess == null) {
            tableProcess = configMap.get(value.getString("table"));
        }
        if (tableProcess != null) {
            String type = value.getString("type");
            if (type == null) {
                System.out.println("maxwell采集的数据不完整...");
            } else {
                JSONObject data = value.getJSONObject("data");
                //TODO 2 keep only the configured dimension columns
                String sinkColumns = tableProcess.getSinkColumns();
                filterColumns(data, sinkColumns);
                //TODO 3 add the output metadata fields
                data.put("sink_table", tableProcess.getSinkTable());
                // carry the change type along (note: added after filtering, so it
                // reaches the sink; see the Phoenix error in P030)
                data.put("type", type);
                out.collect(data);
            }
        }
    }

    private void filterColumns(JSONObject data, String sinkColumns) {
        // drop every field that is not listed in sink_columns
        Set<Map.Entry<String, Object>> entries = data.entrySet();
        List<String> stringList = Arrays.asList(sinkColumns.split(","));
        entries.removeIf(entry -> !stringList.contains(entry.getKey()));
    }
}
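
To trace the data flow end to end, suppose a hypothetical Maxwell insert for base_province arrives as

{"database":"edu","table":"base_province","type":"insert","data":{"id":"1","name":"北京","region_id":"1","create_time":"2023-10-26 20:00:00"}}

With sink_columns=id,name,region_id, filterColumns drops create_time, and processElement emits

{"id":"1","name":"北京","region_id":"1","sink_table":"dim_base_province","type":"insert"}

The field names here are assumptions for illustration; the sink_table and type fields are exactly what the downstream sink function consumes.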

P029

package com.atguigu.edu.realtime.util;

import com.alibaba.druid.pool.DruidDataSource;
import com.alibaba.druid.pool.DruidPooledConnection;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.common.EduConfig;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.commons.lang3.StringUtils;

import java.lang.reflect.InvocationTargetException;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class PhoenixUtil {
    private static DruidDataSource druidDataSource = DruidDSUtil.getDruidDataSource();

    public static void executeDDL(String sqlString) {
        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = druidDataSource.getConnection();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("连接池获取连接异常...");
        }

        try {
            preparedStatement = connection.prepareStatement(sqlString);
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("编译sql异常...");
        }

        try {
            preparedStatement.execute();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("建表语句错误...");
        }

        // release resources
        try {
            preparedStatement.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }

        try {
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    public static void executeDML(String sinkTable, JSONObject jsonObject) {
        // TODO 2 build the upsert statement
        StringBuilder sql = new StringBuilder();
        Set<Map.Entry<String, Object>> entries = jsonObject.entrySet();
        ArrayList<String> columns = new ArrayList<>();
        ArrayList<Object> values = new ArrayList<>();
        StringBuilder symbols = new StringBuilder();
        for (Map.Entry<String, Object> entry : entries) {
            columns.add(entry.getKey());
            values.add(entry.getValue());
            symbols.append("?,");
        }

        sql.append("upsert into " + EduConfig.HBASE_SCHEMA + "." + sinkTable + "(");

        // join the column names
        String columnsStrings = StringUtils.join(columns, ",");
        String symbolStr = symbols.substring(0, symbols.length() - 1);
        sql.append(columnsStrings)
                .append(")values(")
                .append(symbolStr)
                .append(")");

        DruidPooledConnection connection = null;
        try {
            connection = druidDataSource.getConnection();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("连接池获取连接异常...");
        }

        PreparedStatement preparedStatement = null;
        try {
            preparedStatement = connection.prepareStatement(sql.toString());
            // bind the parameters (every value is written as a string)
            for (int i = 0; i < values.size(); i++) {
                preparedStatement.setObject(i + 1, values.get(i) + "");
            }
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("编译sql异常...");
        }

        try {
            preparedStatement.executeUpdate();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
            System.out.println("写入phoenix错误...");
        }

        try {
            preparedStatement.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }

        try {
            connection.close();
        } catch (SQLException throwables) {
            throwables.printStackTrace();
        }
    }

    // query Phoenix and map each row onto a new instance of clazz via reflection
    public static <T> List<T> queryList(String sql, Class<T> clazz) {
        ArrayList<T> resultList = new ArrayList<>();
        DruidPooledConnection connection = null;
        PreparedStatement preparedStatement = null;
        try {
            connection = druidDataSource.getConnection();
            preparedStatement = connection.prepareStatement(sql);
            ResultSet resultSet = preparedStatement.executeQuery();
            ResultSetMetaData metaData = resultSet.getMetaData();
            while (resultSet.next()) {
                T obj = clazz.newInstance();
                for (int i = 1; i <= metaData.getColumnCount(); i++) {
                    String columnName = metaData.getColumnName(i);
                    Object columnValue = resultSet.getObject(i);
                    BeanUtils.setProperty(obj, columnName, columnValue);
                }
                resultList.add(obj);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }

        if (preparedStatement != null) {
            try {
                preparedStatement.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }

        if (connection != null) {
            try {
                connection.close();
            } catch (SQLException throwables) {
                throwables.printStackTrace();
            }
        }
        return resultList;
    }
}
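
A quick usage sketch for executeDML (the row contents are hypothetical): the JSON keys become the upsert column list and every value is bound as a string.

JSONObject row = new JSONObject();
row.put("id", "1");
row.put("name", "北京");
// generates, up to JSON key order: upsert into EDU_REALTIME.dim_base_province(id,name)values(?,?)
PhoenixUtil.executeDML("dim_base_province", row);

Next comes the sink function that feeds each dimension record to this utility: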
package com.atguigu.edu.realtime.app.func;

import com.alibaba.fastjson.JSONObject;
import com.atguigu.edu.realtime.util.DimUtil;
import com.atguigu.edu.realtime.util.PhoenixUtil;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;

public class DimPhoenixSinkFunc implements SinkFunction<JSONObject> {
    @Override
    public void invoke(JSONObject jsonObject, Context context) throws Exception {
        // TODO 1 read the target table name
        String sinkTable = jsonObject.getString("sink_table");

//        String type = jsonObject.getString("type");
//        String id = jsonObject.getString("id");
        jsonObject.remove("sink_table");
//        jsonObject.remove("type");

        // TODO 2 write the record out via the utility class
        // note: "type" is still in the payload at this point, which triggers the
        // Undefined column TYPE error shown in P030
        PhoenixUtil.executeDML(sinkTable, jsonObject);

        // TODO 3 if the change type is update, invalidate the Redis cache entry
//        if ("update".equals(type)) {
//            DimUtil.deleteCached(sinkTable, id);
//        }
    }
}
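
For reference, the sink attaches to the output of DimBroadcastProcessFunction in the usual way (the stream variable name is an assumption):

// dimDataStream: DataStream<JSONObject> produced by DimBroadcastProcessFunction
dimDataStream.addSink(new DimPhoenixSinkFunc());

Implementing plain SinkFunction (rather than RichSinkFunction) works here because the Druid pool lives in PhoenixUtil as a static field, so no per-subtask open() is needed.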

P030

Start Hadoop, ZooKeeper, Kafka, HBase, and Maxwell.

org.apache.phoenix.schema.ColumnNotFoundException: ERROR 504 (42703): Undefined column. columnName=EDU_REALTIME.DIM_BASE_PROVINCE.TYPE
	at org.apache.phoenix.schema.PTableImpl.getColumnForColumnName(PTableImpl.java:828)
	at org.apache.phoenix.compile.FromCompiler$SingleTableColumnResolver.resolveColumn(FromCompiler.java:477)
	at org.apache.phoenix.compile.UpsertCompiler.compile(UpsertCompiler.java:452)
	at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:784)
	at org.apache.phoenix.jdbc.PhoenixStatement$ExecutableUpsertStatement.compilePlan(PhoenixStatement.java:770)
	at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:401)
	at org.apache.phoenix.jdbc.PhoenixStatement$2.call(PhoenixStatement.java:391)
	at org.apache.phoenix.call.CallRunner.run(CallRunner.java:53)
	at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:390)
	at org.apache.phoenix.jdbc.PhoenixStatement.executeMutation(PhoenixStatement.java:378)
	at org.apache.phoenix.jdbc.PhoenixPreparedStatement.executeUpdate(PhoenixPreparedStatement.java:206)
	at com.alibaba.druid.pool.DruidPooledPreparedStatement.executeUpdate(DruidPooledPreparedStatement.java:255)
	at com.atguigu.edu.realtime.util.PhoenixUtil.executeDML(PhoenixUtil.java:105)
	at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:19)
	at com.atguigu.edu.realtime.app.func.DimPhoenixSinkFunc.invoke(DimPhoenixSinkFunc.java:7)
	at org.apache.flink.streaming.api.operators.StreamSink.processElement(StreamSink.java:54)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.pushToOperator(CopyingChainingOutput.java:82)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:57)
	at org.apache.flink.streaming.runtime.tasks.CopyingChainingOutput.collect(CopyingChainingOutput.java:29)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:56)
	at org.apache.flink.streaming.api.operators.CountingOutput.collect(CountingOutput.java:29)
	at org.apache.flink.streaming.api.operators.TimestampedCollector.collect(TimestampedCollector.java:51)
	at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:148)
	at com.atguigu.edu.realtime.app.func.DimBroadcastProcessFunction.processElement(DimBroadcastProcessFunction.java:21)
	at org.apache.flink.streaming.api.operators.co.CoBroadcastWithNonKeyedOperator.processElement1(CoBroadcastWithNonKeyedOperator.java:110)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.processRecord1(StreamTwoInputProcessorFactory.java:217)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory.lambda$create$0(StreamTwoInputProcessorFactory.java:183)
	at org.apache.flink.streaming.runtime.io.StreamTwoInputProcessorFactory$StreamTaskNetworkOutput.emitRecord(StreamTwoInputProcessorFactory.java:266)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:134)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:105)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:65)
	at org.apache.flink.streaming.runtime.io.StreamMultipleInputProcessor.processInput(StreamMultipleInputProcessor.java:85)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:542)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:831)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:780)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:935)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:914)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:728)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:550)
	at java.lang.Thread.run(Thread.java:748)
写入phoenix错误...

The cause: processElement puts a type field into each record after filterColumns runs, and DimPhoenixSinkFunc only removes sink_table before the upsert (its jsonObject.remove("type") line is commented out), so the generated upsert references a TYPE column that EDU_REALTIME.DIM_BASE_PROVINCE does not have. Either remove the type field in the sink before calling executeDML, or stop adding it in processElement.
