27.hadoop系列之50G数据清洗入库秒查询实践

news2024/12/23 18:38:05

1. 项目背景

目前本地有50G的企业年报csv数据, 需要清洗出通信地址,并需要与原有的亿条数据合并以供业务查询最新的企业通信地址

2. 技术选型

Hadoop + ClickHouse

3. Hadoop数据清洗

我们50G的数据无须上传至集群处理,上传目前带宽2M/S, 巨慢,我直接在本地hadoop处理

我们先看下数据格式,以@_@分割,最后一列是杂乱的数据

315@_@102878404@_@91430802MA4PPBWA9Y@_@3@_@2021-03-19 15:29:05@_@2021-03-19 15:29:04@_@-@_@2019@_@<tr> <!--180 285 145--> <td>统一社会信用代码/注册号</td> <td>91430802MA4PPBWA9Y</td> <td>企业名称</td> <td>张家界恒晟广告传媒有限公司</td></tr><tr> <td>企业联系电话</td> <td>15874401535</td> <td>邮政编码</td> <td>427000</td></tr><tr> <td>企业经营状态</td> <td>开业</td> <td>从业人数</td> <td>1人</td></tr><tr> <td>电子邮箱</td> <td>-</td> <td>是否有网站或网店</td> <td>否</td></tr><tr> <td>企业通信地址</td> <td>湖南省张家界市永定区大庸桥办事处大庸桥居委会月亮湾小区金月阁5601号</td> <td>企业是否有投资信息<br>或购买其他公司股权</td> <td>否</td></tr><tr> <td>资产总额</td> <td>企业选择不公示</td> <td>所有者权益合计</td> <td>企业选择不公示</td></tr><tr> <td>销售总额</td> <td>企业选择不公示</td> <td>利润总额</td> <td>企业选择不公示</td></tr><tr> <td>营业总收入中主营业务收入</td> <td>企业选择不公示</td> <td>净利润</td> <td>企业选择不公示</td></tr><tr> <td>纳税总额</td>
public class Company implements Tool {

    private Configuration conf;

    @Override
    public int run(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "company");
        job.setJarByClass(CompanyDriver.class);
        job.setMapperClass(CompanyMapper.class);
        job.setReducerClass(CompanyReducer.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(NullWritable.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        job.setNumReduceTasks(1);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
    }

    @Override
    public void setConf(Configuration conf) {
        this.conf = conf;
    }

    @Override
    public Configuration getConf() {
        return conf;
    }

    public static class CompanyMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
        private Text keyOut = new Text();
        private Text valueOut = new Text();

        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String line = value.toString();
            String[] words = line.split("@_@");
            keyOut.set(key.toString());
            String company_id = words[1];
            String unified_code = words[2];
            String year = words[7];
            String company = StringUtils.substringBetween(words[8], "<td>统一社会信用代码/注册号</td> <td>", "</td> <td>企业名称</td>")
                    .replaceAll("\"", "");
            String mailAddress = StringUtils.substringBetween(words[8], "<td>企业通信地址</td> <td>", "</td> <td>企业是否有投资信息")
                    .replaceAll("\"", "");
            if (!company.contains("td") && !mailAddress.contains("td")) {
                valueOut.set(key.toString() + '@' + company_id + '@' + unified_code + '@' + year + '@' + company + '@' + mailAddress);
                context.write(valueOut, NullWritable.get());
            }
        }
    }

    public static class CompanyReducer extends Reducer<Text, NullWritable, Text, NullWritable> {
        @Override
        protected void reduce(Text key, Iterable<NullWritable> values, Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
            // 防止相同数据丢失
            for (NullWritable value : values) {
                context.write(key, NullWritable.get());
            }
        }
    }
}
public class CompanyDriver {
    private static Tool tool;

    public static void main(String[] args) throws Exception {
        // 1. 创建配置文件
        Configuration conf = new Configuration();

        // 2. 判断是否有 tool 接口
        switch (args[0]) {
            case "company":
                tool = new Company();
                break;
            default:
                throw new RuntimeException(" No such tool: " + args[0]);
        }
        // 3. 用 Tool 执行程序
        // Arrays.copyOfRange 将老数组的元素放到新数组里面
        int run = ToolRunner.run(conf, tool, Arrays.copyOfRange(args, 1, args.length));
        System.exit(run);
    }
}

参数传递运行与先前文章一致,25.hadoop系列之Yarn Tool接口实现动态传参 不在重复,10分钟左右处理完毕,处理后约1.8G

4. ClickHouse ReplaceMergeTree实践

现在我们将处理后数据导入ClickHouse

4.1 创建表company_report及导入处理后的part-r-00000文件
CREATE TABLE etl.company_report (
    id      String,
    company_id      String,
    unified_code      String,
    year  String,
    company    String,
    mail_address   String
) ENGINE MergeTree()
PARTITION BY substring(unified_code, 2, 2) PRIMARY KEY (id) ORDER BY (id);
clickhouse-client --format_csv_delimiter="@" --input_format_with_names_use_header=0 --query="INSERT INTO etl.company_report FORMAT CSV" --host=192.168.0.222 --password=shenjian < part-r-00000
4.2 关联插入dwd_company表

在左连接的子查询中,我们取当前企业最新的年报中的通信地址,如下图所示

# 关联导入,可能DataGrip客户端超时,就在ClickHouse-Client命令行运行即可
INSERT INTO etl.dwd_company(district, ent_name, reg_addr, unified_code, authority, region_code, reg_addr1, province_code, city_code, province_name, city_name, region_name, mail_address)
SELECT district, ent_name, reg_addr, unified_code, authority, region_code, reg_addr1, province_code, city_code, province_name, city_name, region_name, cr.mail_address
FROM etl.dwd_company c
LEFT JOIN (
    SELECT unified_code, argMax(mail_address, year) mail_address, argMax(year, year) new_year FROM etl.company_report GROUP BY unified_code
)  cr ON c.unified_code=cr.unified_code
WHERE cr.mail_address!='' and cr.mail_address is not null;

这插入速度还行吧,插入后,存在两条记录,对于ReplaceMergeTree来说,无妨,看过之前文章的你应该很熟悉为啥了吧

4.3 清洗企业通信地址

新建字段mail_address1,剔除省市区前缀信息,列式存储,全量更新很快,请不要单条那种更新

ALTER TABLE etl.dwd_company update mail_address1=replaceRegexpAll(mail_address, '^(.{2,}(省|自治区))?(.{2,}市)?(.{2,}(区|县))?', '') WHERE 1=1

4.4 手动执行分区合并

如果线上对ClickHouse服务稳定性要求极高不建议这样操作,可能影响服务,可以参考9.ClickHouse系列之数据一致性保证

optimize table etl.dwd_company final;

后面可以将dwd_company中所需字段数据导入数据中间层dwm_company,略

欢迎关注公众号算法小生与我沟通交流

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/600418.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【shiro】shiro整合JWT——1.需要创建的类

前言 shiro整合JWT系列&#xff0c;主要记录核心思路–如何在shiroredis整合JWTToken。 该篇主要讲述整合JWT需要创建那些类&#xff0c;如下&#xff1a; JwtToken &#xff08;JWT实体类&#xff09;JwtUtil &#xff08;JWT工具类&#xff09;JwtFilter &#xff08;JWT拦…

IIS日志分析

一、下载IIS日志分析软件 地址如下&#xff1a; 开放网盘: 寄存一些分享出来的文件之类的东西 其中就是LogParser和LPS两个压缩文件 二、安装软件 1、需要先安装Log Parser 运行安装上面的文件。 2. 运行Log Parser Studio 在解压的LPSV2.D1文件夹中运行LPS.exe 出现下面…

BR 4P3040.00-490 标准PLC采用梯形逻辑编程

B&R 4P3040.00-490 奥地利贝加莱 电源面板 可编程逻辑控制器(Programmable Logic Controller)技术通常与梯形逻辑编程隔离通信——这是B&R迈出的一大步。B&R平台是基于PC的&#xff0c;这意味着您可以使用PLC系统中不常见的编程语言和功能。例如&#xff0c;可以用…

《架构设计》-09-分布式服务架构(注册中心、服务发布、服务调用、服务治理)

文章目录 1. 概述2. 集群容错策略3. 服务路由3.1 直接路由3.2 间接路由和注册中心3.3 路由规则3.4 服务路由/负载均衡/集群容错的关系 4. 服务发布4.1 发布启动器4.2 动态代理4.3 发布管理器4.4 协议服务器 5. 服务调用6. 服务治理 1. 概述 RPC架构的意义 解决了分布式环境下两…

chatgpt赋能python:Python写UDF对于SEO的影响

Python写UDF对于SEO的影响 作为一名有10年python编程经验的工程师&#xff0c;我对Python写UDF的优势深有体会。UDF&#xff08;User-Defined Functions&#xff09;是用户自定义函数的缩写&#xff0c;在数据处理和数据分析的过程中经常用到。下面我将介绍Python写UDF对于SEO…

渲染学生信息表

代码 <!DOCTYPE html> <html lang"en"> <head><meta charset"UTF-8"><meta http-equiv"X-UA-Compatible" content"IEedge"><meta name"viewport" content"widthdevice-width, initi…

MFC(六)框架理论

关键类 ,MFC中关键类有&#xff1a; CMFCAPP:最底层的类&#xff0c;也是最重要的类&#xff0c;统筹全局&#xff0c;管理DOCUMENT TEMPLATE CFRAMEWND:框架窗口&#xff0c;包括菜单栏、工具栏、状态栏等等&#xff0c;主要是负责窗口的布局 CVIEW:负责展示具体的数据 C…

chatgpt赋能python:Python内置变量介绍

Python内置变量介绍 Python是一种高级编程语言&#xff0c;具有简单易学、可读性强、可扩展性强等特点。在Python中&#xff0c;有许多内置变量&#xff08;built-in variables&#xff09;&#xff0c;以方便用户在编写程序时进行使用。本文将会对Python中的内置变量进行介绍…

基于SpringBoot+Vue的逍遥大药房管理系统设计与实现

博主介绍&#xff1a; 大家好&#xff0c;我是一名在Java圈混迹十余年的程序员&#xff0c;精通Java编程语言&#xff0c;同时也熟练掌握微信小程序、Python和Android等技术&#xff0c;能够为大家提供全方位的技术支持和交流。 我擅长在JavaWeb、SSH、SSM、SpringBoot等框架下…

干货,一文弄懂RF检波器那些事

WiFi、4G、蓝牙等各种无线连接技术的普及带动各种终端设备井喷式增长&#xff0c;包括物联网、可穿戴等各种基于无线连接技术的新兴产业迅速成长起来&#xff0c;各种无线信号链解决方案涌现推动这种热潮的持续发展。在无线信号链中&#xff0c;很久没有听到有人提起一个关键的…

快速开发和使用Android串口

一、什么是串口 串口叫做串行接口&#xff0c;也称串行通信接口&#xff0c;也可以叫做COM口&#xff0c;按电气标准及协议来分包括RS-232-C、RS-422、RS485、USB等。串行接口是指数据一位一位地顺序传送&#xff0c;其特点是通信线路简单&#xff0c;只要一对传输线就可以实现…

计算机视觉—YOLO V4

计算机视觉—YOLO V4 1、YOLO V41.1、网络结构1.1.1、BackBone&#xff1a;CSPDarknet531.1.2、Neck&#xff1a;SPP结构1.1.3、Neck&#xff1a;PAN结构1.1.4、YOLO v4整体结构 1.2、优化策略 1、YOLO V4 原论文下载地址&#xff1a;https://arxiv.org/abs/2004.10934 1.1、…

Windows中安装GCC教程

GCC的安装教程 GCC简介 GCC编译器通常在Linux系统下使用&#xff0c;一般来说大部分发行的系统会默认安装&#xff0c;GCC编译器使用gcc指令在终端进行shell操作。 对于新接触Linux的朋友来说&#xff0c;简单的在Windows中练习过渡一下应该就足够了。&#xff08;我就是因为…

Apache IoTDB 荣获国家网信办 2022 年中国开源创新大赛决赛一等奖,三位核心研发荣获表彰!...

项目获得权威认可&#xff01; 2023 年 5 月 15 日&#xff0c;2022 年中国开源创新大赛组委会对外公布“2022 中国互联网发展创新与投资大赛公益项目暨2022年中国开源创新大赛”决赛获奖名单&#xff0c;并于 2023 年 5 月 31 日在北京举办“2022年中国开源创新大赛总结发布活…

chatgpt赋能python:用Python编写FizzBuzz——解析最简单的编程题

用Python编写FizzBuzz——解析最简单的编程题 作为每个程序员的入门题目&#xff0c;FizzBuzz是一个简单但常见的问题。FizzBuzz要求我们用数字1到100来打印输出&#xff0c;但是当数字是3的倍数时&#xff0c;需要输出Fizz&#xff1b;当数字是5的倍数时&#xff0c;需要输出…

力扣高频SQL50题(基础版)——第三天

力扣高频SQL50题(基础版)——第三天 1 产品销售分析Ⅰ 1.1 题目内容 1.1.1 基本题目信息1 1.1.2 基本题目信息2 1.1.3 示例输入输出 1.2 示例sql语句 # Write your MySQL query statement below SELECT p.product_name,s.year,s.price FROM Sales s INNER JOIN Product p …

chatgpt赋能python:Python几次方函数介绍

Python几次方函数介绍 Python作为一门高级编程语言&#xff0c;具有丰富的数学函数库。其中&#xff0c;几次方函数在许多数值计算、数据分析和科学计算中都得到广泛应用。Python中的几次方函数有多种实现方式&#xff0c;包括内置函数pow()、运算符**、NumPy库的numpy.power(…

(3)NUC 980 kenerl编译

解压 用到的配置文件位置&#xff1a; /NUC980-linux-4.4.y-master/arch/arm/configs/nuc980_defconfig 执行&#xff1a; 编译linux内核源码。了解其 配置文件在 arch/arm/configs/nuc980_defconfig (1) make nuc980_defconfig 载入配置文件 (2) make menuconfig --->Devi…

机器龙的制作

1. 功能说明 本文示例将实现R326样机机器龙边张合嘴巴、边煽动翅膀、边摆动尾巴运动的功能。 2. 结构说明 本项目使用的机器龙样机是用可以用探索者零件或者探索者兼容零件制作。样机主要由头部模块、翅膀模块、尾巴模块、四足行走模块四部分组成。其中头部模块由2自由度并联关…

从元宇宙到生成式AI:炒作、现实和未来前景

不久前&#xff0c;科技界充斥着一种被称为元宇宙的未来主义概念。这个相互关联的虚拟现实空间宇宙&#xff0c;个人可以在模拟环境中进行交互&#xff0c;被誉为技术的未来。如今围绕元宇宙的炒作已经彻底失败了。技术重点现在已经转向生成AI&#xff0c;重点是像GPT-4和谷歌的…