物流实时数仓:数仓搭建(ODS)

news2024/11/16 23:46:27

系列文章目录

物流实时数仓:采集通道搭建
物流实时数仓:数仓搭建


文章目录

  • 系列文章目录
  • 前言
  • 一、IDEA环境准备
    • 1.pom.xml
    • 2.目录创建
  • 二、代码编写
    • 1.log4j.properties
    • 2.CreateEnvUtil.java
    • 3.KafkaUtil.java
    • 4.OdsApp.java
  • 三、代码测试
  • 总结


前言

现在我们开始进行数仓的搭建,我们用Kafka来代替数仓的ods层。
基本流程为使用Flink从MySQL读取数据然后写入Kafka中


一、IDEA环境准备

1.pom.xml

写入项目需要的配置

<properties>
        <maven.compiler.source>8</maven.compiler.source>
        <maven.compiler.target>8</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <java.version>1.8</java.version>
        <flink.version>1.17.0</flink.version>
        <hadoop.version>3.2.3</hadoop.version>
        <flink-cdc.version>2.3.0</flink-cdc.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.68</version>
        </dependency>

        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-client</artifactId>
            <version>${hadoop.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>org.slf4j</groupId>
                    <artifactId>slf4j-reload4j</artifactId>
                </exclusion>
            </exclusions>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>1.7.25</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-to-slf4j</artifactId>
            <version>2.14.0</version>
        </dependency>

        <dependency>
            <groupId>com.ververica</groupId>
            <artifactId>flink-connector-mysql-cdc</artifactId>
            <version>${flink-cdc.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-runtime</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-planner-loader</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>

基本上项目需要的所有jar包都有了,不够以后在加。

2.目录创建

在这里插入图片描述按照以上目录结构进行目录创建

二、代码编写

1.log4j.properties

log4j.rootLogger=error,stdout
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.target=System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n

2.CreateEnvUtil.java

这个文件中有两个方法
创建初始化Flink的env
Flink连接mysql的MySqlSource

package com.atguigu.tms.realtime.utils;


import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import com.ververica.cdc.connectors.mysql.source.MySqlSourceBuilder;
import com.ververica.cdc.connectors.mysql.table.StartupOptions;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import org.apache.flink.api.common.restartstrategy.RestartStrategies;
import org.apache.flink.api.common.time.Time;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.runtime.state.hashmap.HashMapStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.kafka.connect.json.DecimalFormat;
import org.apache.kafka.connect.json.JsonConverterConfig;

import java.util.HashMap;

public class CreateEnvUtil {
    public static StreamExecutionEnvironment getStreamEnv(String[] args) {
        // 1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 2.检查点相关设置
        // 2.1 开启检查点
        env.enableCheckpointing(6000L, CheckpointingMode.EXACTLY_ONCE);
        // 2.2 设置检查点的超时时间
        env.getCheckpointConfig().setCheckpointTimeout(120000L);
        // 2.3 设置job取消之后 检查点是否保留
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 2.4 设置两个检查点之间的最小时间间隔
        env.getCheckpointConfig().setMinPauseBetweenCheckpoints(30000L);
        // 2.5 设置重启策略
        env.setRestartStrategy(RestartStrategies.failureRateRestart(3, Time.days(1), Time.seconds(3)));
        // 2.6 设置状态后端
        env.setStateBackend(new HashMapStateBackend());
        env.getCheckpointConfig().setCheckpointStorage("hdfs://hadoop102:8020/tms/ck");
        // 2.7 设置操作hdfs用户
        // 获取命令行参数
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String hdfsUserName = parameterTool.get("hadoop-user-name", "atguigu");
        System.setProperty("HADOOP_USER_NAME", hdfsUserName);
        return env;
        
    }

    public static MySqlSource<String> getMysqlSource(String option, String serverId, String[] args) {
        ParameterTool parameterTool = ParameterTool.fromArgs(args);
        String mysqlHostname = parameterTool.get("hadoop-user-name", "hadoop102");
        int mysqlPort = Integer.parseInt(parameterTool.get("mysql-port", "3306"));
        String mysqlUsername = parameterTool.get("mysql-username", "root");
        String mysqlPasswd = parameterTool.get("mysql-passwd", "000000");
        option = parameterTool.get("start-up-option", option);
        serverId = parameterTool.get("server-id", serverId);

        // 创建配置信息 Map 集合,将 Decimal 数据类型的解析格式配置 k-v 置于其中
        HashMap config = new HashMap<>();
        config.put(JsonConverterConfig.DECIMAL_FORMAT_CONFIG, DecimalFormat.NUMERIC.name());
        // 将前述 Map 集合中的配置信息传递给 JSON 解析 Schema,该 Schema 将用于 MysqlSource 的初始化
        JsonDebeziumDeserializationSchema jsonDebeziumDeserializationSchema =
                new JsonDebeziumDeserializationSchema(false, config);

        MySqlSourceBuilder<String> builder = MySqlSource.<String>builder()
                .hostname(mysqlHostname)
                .port(mysqlPort)
                .username(mysqlUsername)
                .password(mysqlPasswd)
                .deserializer(jsonDebeziumDeserializationSchema);
        switch (option) {
            // 读取实时数据
            case "dwd":
                String[] dwdTables = new String[]{
                        "tms.order_info",
                        "tms.order_cargo",
                        "tms.transport_task",
                        "tms.order_org_bound"};
                return builder
                        .databaseList("tms")
                        .tableList(dwdTables)
                        .startupOptions(StartupOptions.latest())
                        .serverId(serverId)
                        .build();

            // 读取维度数据
            case "realtime_dim":
                String[] realtimeDimTables = new String[]{
                        "tms.user_info",
                        "tms.user_address",
                        "tms.base_complex",
                        "tms.base_dic",
                        "tms.base_region_info",
                        "tms.base_organ",
                        "tms.express_courier",
                        "tms.express_courier_complex",
                        "tms.employee_info",
                        "tms.line_base_shift",
                        "tms.line_base_info",
                        "tms.truck_driver",
                        "tms.truck_info",
                        "tms.truck_model",
                        "tms.truck_team"};
                return builder
                        .databaseList("tms")
                        .tableList(realtimeDimTables)
                        .startupOptions(StartupOptions.initial())
                        .serverId(serverId)
                        .build();


        }

        Log.error("不支持操作类型");
        return null;

    }
}

3.KafkaUtil.java

该文件中有一个方法,创建Flink连接Kafka需要的Sink

package com.atguigu.tms.realtime.utils;

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.connector.base.DeliveryGuarantee;
import org.apache.flink.connector.kafka.sink.KafkaRecordSerializationSchema;
import org.apache.flink.connector.kafka.sink.KafkaSink;
import org.apache.kafka.clients.producer.ProducerConfig;

public class KafkaUtil {
    private static final String KAFKA_SERVER = "hadoop102:9092,hadoop103:9092,hadoop104:9092";

    public static KafkaSink<String> getKafkaSink(String topic, String transIdPrefix, String[] args) {
        // 将命令行参数对象封装为 ParameterTool 类对象
        ParameterTool parameterTool = ParameterTool.fromArgs(args);

        // 提取命令行传入的 key 为 topic 的配置信息,并将默认值指定为方法参数 topic
        // 当命令行没有指定 topic 时,会采用默认值
        topic = parameterTool.get("topic", topic);
        // 如果命令行没有指定主题名称且默认值为 null 则抛出异常
        if (topic == null) {
            throw new IllegalArgumentException("主题名不可为空:命令行传参为空且没有默认值!");
        }

        // 获取命令行传入的 key 为 bootstrap-servers 的配置信息,并指定默认值
        String bootstrapServers = parameterTool.get("bootstrap-severs", KAFKA_SERVER);
        // 获取命令行传入的 key 为 transaction-timeout 的配置信息,并指定默认值
        String transactionTimeout = parameterTool.get("transaction-timeout", 15 * 60 * 1000 + "");


        return KafkaSink.<String>builder()
                .setBootstrapServers(bootstrapServers)
                .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                        .setTopic(topic)
                        .setValueSerializationSchema(new SimpleStringSchema())
                        .build()
                )
                .setDeliveryGuarantee(DeliveryGuarantee.AT_LEAST_ONCE)
                .setTransactionalIdPrefix(transIdPrefix)
                .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, transactionTimeout)
                .build();

    }

    public static KafkaSink<String> getKafkaSink(String topic, String[] args) {
        return getKafkaSink(topic, topic + "_trans", args);

    }
}

4.OdsApp.java

Ods层的app创建,负责读取和写入数据

package com.atguigu.tms.realtime.app.ods;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import com.atguigu.tms.realtime.utils.CreateEnvUtil;
import com.atguigu.tms.realtime.utils.KafkaUtil;
import com.esotericsoftware.minlog.Log;
import com.ververica.cdc.connectors.mysql.source.MySqlSource;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

public class OdsApp {
    public static void main(String[] args) throws Exception {
        // 1.获取流处理环境并指定检查点
        StreamExecutionEnvironment env = CreateEnvUtil.getStreamEnv(args);
        env.setParallelism(4);


        // 2 使用FlinkCDC从MySQL中读取数据-事实数据
        String dwdOption = "dwd";
        String dwdServerId = "6030";
        String dwdsourceName = "ods_app_dwd_source";

        mysqlToKafka(dwdOption, dwdServerId, dwdsourceName, env, args);

        // 3 使用FlinkCDC从MySQL中读取数据-维度数据
        String realtimeDimOption = "realtime_dim";
        String realtimeDimServerId = "6040";
        String realtimeDimsourceName = "ods_app_realtimeDim_source";

        mysqlToKafka(realtimeDimOption, realtimeDimServerId, realtimeDimsourceName, env, args);

        env.execute();


    }

    public static void mysqlToKafka(String option, String serverId, String sourceName, StreamExecutionEnvironment env, String[] args) {

        MySqlSource<String> MySqlSource = CreateEnvUtil.getMysqlSource(option, serverId, args);

        SingleOutputStreamOperator<String> dwdStrDS = env.fromSource(MySqlSource, WatermarkStrategy.noWatermarks(), sourceName)
                .setParallelism(1)
                .uid(option + sourceName);


        // 3 简单ETL
        SingleOutputStreamOperator<String> processDS = dwdStrDS.process(
                new ProcessFunction<String, String>() {
                    @Override
                    public void processElement(String jsonStr, ProcessFunction<String, String>.Context ctx, Collector<String> out) {
                        try {
                            JSONObject jsonObj = JSONObject.parseObject(jsonStr);
                            if (jsonObj.getJSONObject("after") != null && !"d".equals(jsonObj.getString("op"))) {
//                                System.out.println(jsonObj);
                                Long tsMs = jsonObj.getLong("ts_ms");
                                jsonObj.put("ts", tsMs);
                                jsonObj.remove("ts_ms");
                                String jsonString = jsonObj.toJSONString();
                                out.collect(jsonString);
                            }

                        } catch (Exception e) {
                            Log.error("从Flink-CDC得到的数据不是一个标准的json格式",e);
                        }
                    }
                }
        ).setParallelism(1);
        // 4 按照主键进行分组,避免出现乱序
        KeyedStream<String, String> keyedDS = processDS.keyBy((KeySelector<String, String>) jsonStr -> {
            JSONObject jsonObj = JSON.parseObject(jsonStr);
            return jsonObj.getJSONObject("after").getString("id");
        });

        //将数据写入Kafka

        keyedDS.sinkTo(KafkaUtil.getKafkaSink("tms_ods", sourceName + "_transPre", args))
                .uid(option + "_ods_app_sink");
    }
}

三、代码测试

在虚拟机启动我们需要的组件,目前需要hadoop、zk、kafka和MySQL。
在这里插入图片描述
先开一个消费者进行消费。

bin/kafka-console-consumer.sh --bootstrap-server hadoop102:9092 --topic tms_ods

然后运行OdsApp.java
他会先读取维度数据,因为维度数据需要全量更新之前的数据。
在这里插入图片描述
当他消费结束后,我们运行jar包,获取事实数据。

java -jar tms-mock-2023-01-06.jar 

如果能消费到新数据,代表通道没问题,ODS层创建完成。

在这里插入图片描述


总结

至此ODS搭建完成。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1246319.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

6.显示评论 + 添加评论

1.显示评论 数据层&#xff1a;根据实体查询一页评论数据、根据实体查询评论的数量业务层&#xff1a;处理查询评论的业务、处理查询评论数量的业务表现层&#xff1a;显示帖子详情数据时&#xff0c;同时显示该帖子所有的评论数据 1.1 数据访问层 entity_type&#xff1a;实体…

小红书达人等级有哪些,不同粉丝量有什么区别?

小红书的种草能力和社区加电商的模式&#xff0c;使得越来越多品牌将目光放在小红书笔记上。而在平台的营销与传播&#xff0c;多是借助达人进行的&#xff0c;今天为大家分享下小红书达人等级有哪些&#xff0c;不同粉丝量有什么区别&#xff1f; 一、小红书达人等级划分 虽然…

医院手术麻醉信息系统全套源码,自主版权,支持二次开发

医院手术麻醉信息系统全套商业源码&#xff0c;自主版权&#xff0c;支持二次开发 手术麻醉信息系统是HIS产品的中的一个组成部分&#xff0c;主要应用于医院的麻醉科&#xff0c;属于电子病历类产品。医院麻醉监护的功能覆盖整个手术与麻醉的全过程&#xff0c;包括手术申请与…

uniapp实现多时间段设置

功能说明&#xff1a; 1 点击新增时间&#xff0c;出现一个默认时间段模板&#xff0c;不能提交 2 点击“新增时间文本”&#xff0c;弹出弹窗&#xff0c;选择时间&#xff0c;不允许开始时间和结束时间同时为00:00&#xff0c; <view class"item_cont"> …

JWT 还能这样的去理解嘛??

其他系列文章目录 设计模式合集 多线程合集 分布式合集 ES合集 文章目录 其他系列文章目录 文章目录 一、什么是 JWT? 二、JWT 由哪些部分组成&#xff1f; 2.1Header 2.2Payload 2.3Signature 三、如何基于 JWT 进行身份验证&#xff1f; 四、如何防止 JWT 被篡改&a…

_STORAGE_WRITE_ERROR_ thinkphp报错问题原因

整个报错内容如下 Uncaught exception Think\Exception with message _STORAGE_WRITE_ERROR_:./Runtime/Cache/Home/1338db9dec777aab181d4e74d1bdf964.php in C:\inetpub\wwwroot\ThinkPHP\Common\functions.php:101 Stack trace: #0 C:\inetpub\wwwroot\ThinkPHP\Library\…

“圆柱-计算公式“技术支持网址

该软件可以计算圆柱的底面圆周长、底面积、侧面积和体积。 您在使用中有遇到任何问题都可以和我们联系。我们会在第一时间回复您。 邮箱地址&#xff1a;elmo30zeongmail.com 谢谢&#xff01;

【深度学习】CNN中pooling层的作用

1、pooling是在卷积网络&#xff08;CNN&#xff09;中一般在卷积层&#xff08;conv&#xff09;之后使用的特征提取层&#xff0c;使用pooling技术将卷积层后得到的小邻域内的特征点整合得到新的特征。一方面防止无用参数增加时间复杂度&#xff0c;一方面增加了特征的整合度…

骨传导能保护听力吗?为什么说骨传导耳机可以保护听力?

由于骨传导耳机的特殊传声方式&#xff0c;是可以保护听力的。 首先了解下骨传导耳机的传声方式是什么&#xff1a; 骨传导耳机是通过骨骼震动传导技术&#xff0c;将声音传至颅骨&#xff0c;然后通过颅骨传导到内耳&#xff0c;直接刺激听觉神经&#xff0c;使人感知到声音…

最全的电商API接口|全面淘宝平台数据接口参数和文档说明

淘宝联盟“工具服务商”开放招募 为了支持生态淘宝客业务提效及新业务模式拓展&#xff0c;淘宝联盟针对各工具业务模式招募若干工具服务商团队&#xff0c;仅限符合该招募规则的开发者申请创建工具服务商AppKey&#xff0c;并针对新引入的工具服务商将开放对应模式所需要的“服…

易点易动固定资产管理系统:全生命周期管理的理想选择

在现代企业中&#xff0c;固定资产管理是一项至关重要的任务。为了确保企业的资产安全、提高资产利用率&#xff0c;全面管理固定资产的生命周期至关重要。易点易动固定资产管理系统为企业提供了一种全面的解决方案&#xff0c;实现了从固定资产申购、采购、入库、领用、退库、…

iOS强引用引起的内存泄漏

项目中遇到一个问题&#xff1a; 1.在A页面的ViewDidLoad 方法里写了一个接收通知的方法&#xff0c;如下图&#xff1a; 然后在B页面发送通知 &#xff08;注&#xff1a;下图的NOTI 是 [NSNotificationCenter defaultCenter] 的宏&#xff0c; 考虑一下可能有小白看这篇文章…

打印菱形-第11届蓝桥杯选拔赛Python真题精选

[导读]&#xff1a;超平老师的Scratch蓝桥杯真题解读系列在推出之后&#xff0c;受到了广大老师和家长的好评&#xff0c;非常感谢各位的认可和厚爱。作为回馈&#xff0c;超平老师计划推出《Python蓝桥杯真题解析100讲》&#xff0c;这是解读系列的第9讲。 打印菱形&#xff…

Android笔记(十五):JetPack Compose的附带效应(二)-produceState和derivedStateOf

在本笔记中&#xff0c;将结合实例介绍produceState和derivedStateOf两个可组合函数。它们分别实现状态的转换。 &#xff08;1&#xff09;produceState将非Compose状态转换虫Compose状态 &#xff08;2&#xff09;derivedStateOf将多个状态转换成其他状态。 一、produceSta…

2023年金融信创行业研究报告

第一章 行业概况 1.1 定义 金融信创是指在金融行业中应用的信息技术&#xff0c;特别是那些涉及到金融IT基础设施、基础软件、应用软件和信息安全等方面的技术和产品。这一概念源于更广泛的“信创 (信息技术应用创新)”&#xff0c;即通过中国国产信息技术替换海外信息技术&a…

全球最大生产基地已投产,百年京西借智能悬架谋「新生」

受相关等爆款车型的高配置率及销量带动&#xff0c;空气悬架市场热度不减。 比如&#xff0c;理想在今年的理想魔毯空气悬架技术日上宣布&#xff0c;搭载空气悬架的车型累计交付已突破20万辆&#xff0c;在所有已交付的L9、L8、L7中&#xff0c;配备空气悬架的比例达93%。 作…

php通过curl方式发送接受xml数据

目录 1、php通过curl方式发送xml数据 2、php通过file_get_contents接受curl方式发送xml数据 1、php通过curl方式发送xml数据 <?php function sendXmlData($url, $xmlData) {$ch curl_init();curl_setopt($ch, CURLOPT_URL, $url);curl_setopt($ch, CURLOPT_RETURNTRANSFE…

亚马逊云科技向量数据库助力生成式AI成功落地实践探秘(二)

向量数据库选择哪种近似搜索算法&#xff0c;选择合适的集群规模以及集群设置调优对于知识库的读写性能也十分关键&#xff0c;主要需要考虑以下几个方面&#xff1a; 向量数据库算法选择 在 OpenSearch 里&#xff0c;提供了两种 k-NN 的算法&#xff1a;HNSW (Hierarchical…

代码随想录算法训练营第四十四天【动态规划part06】 | 完全背包、518. 零钱兑换 II、377. 组合总和 Ⅳ

完全背包 有N件物品和一个最多能背重量为W的背包。第i件物品的重量是weight[i]&#xff0c;得到的价值是value[i] 。每件物品都有无限个&#xff08;也就是可以放入背包多次&#xff09;&#xff0c;求解将哪些物品装入背包里物品价值总和最大。 题目链接&#xff1a; 题目页…

android keylayout键值适配

1、通过getevent打印查看当前keyevent数字对应事件和物理码 2、dumpsys input 查看输入事件对应的 KeyLayoutFile: /system/usr/keylayout/Vendor_6080_Product_8060.kl 3、通过物理码修改键值映射&#xff0c;修改/system/usr/keylayout/目录下的文件