多业务聚合查询设计思路与实践

news2025/1/12 6:55:23

文章目录

    • @[toc]
  • 1.需求
  • 2.方案
    • 2.1 方案架构图
    • 2.2 选用flink-cdc的原因
  • 3.实践
    • 3.1 环境准备
    • 3.3 es集群搭建
    • 3.4 flink1.14.0环境搭建
    • 3.5 准备sql和jar包
      • 3.5.1[创建mysql的flink用户并授权](https://ververica.github.io/flink-cdc-connectors/master/content/connectors/mysql-cdc.html)
      • 3.5.2 准备sql
      • 3.5.3准备jar包
      • 3.5.4 mysql数据库中需要准备products、orders两张表
      • 3.5.5 执行sql同步数据
  • 4.总结

1.需求

  最近接到一个需求,需要给客服部做一个把多个业务系统的的数据聚合到es里面做统一的搜索,一期需要把各个业务侧的订单数据同步到es中做聚合搜索,这个需求如果用传统的思维会在各个业务侧写一个xxl-job的定时任务,然后向es的各个业务索引中全量、增量的同步数据,这种传统的方案来说,不断需要各个业务侧写大量的业务代码,并且数据的时效性不高,做不到业务测试的业务侧的订单表的数据修改后,es的业务索引中的数据能立马也同步成最新的数据,如果有数据思维的话,这种需求能不写业务代码搞定就不要写业务代码,但是在传统的开发中采用的传统思维就会去写大量的业务代码,各个业务侧一套代码,由于各个业务侧的开发的水平参差不齐,维护成本高,代码质量层次不齐,时效性不高,还容易写出一些奇奇怪怪的坑,所以该设计一个什么样的方案呢?

2.方案

  方案灵感来源于如下两篇文章:

  基于 Flink CDC 构建 MySQL 和 Postgres 的 Streaming ETL

  基于 Flink CDC 实现海量数据的实时同步和转换

2.1 方案架构图

图片

  该方案使用flink-cdc对mysql的log-bin做监听做全量、增量和实时的数据同步,那来看下常见的cdc的方案的对比:

图片

2.2 选用flink-cdc的原因

1)架构是分布式的,避免了单点故障,提升了计算数据处理的效率。

2)支持全量、增量和实时的数据同步,避免了业务侧写各种各样的同步数据的业务代码。

3)生态丰富、易用、功能强大,社区活跃。

4)数据采集、计算、统计、打宽、转换等强大的功能,可以使用编程式的方式跑jar任务,也可以使用flink-sql强大的sql功能做数据提取统计分析这种实时的事情,同时可以和大数据生态结合使用,也可以单独使用解决业务开发的痛点问题。

5)flink-cdc2.x版本以上做了重大的更新,支持无锁化(不像1.x是有锁的,一个不小心就会锁数据库的表)、并发性能更高,所以2.x以上的版本就不用担心这个锁表的问题了。

3.实践

3.1 环境准备

  首先需要准备一个mysql的数据库,flink集群或者是单机和一个es集群或者es单机,本文都是采用Docker-Desktop来搭建的。

版本如下:

mysql5.7.16

es7.14.0

flink1.14.0

3.2 开启mysql的log-bin

  Mysql5.7.x镜像开启log-bin失效及解决

3.3 es集群搭建

  Docker部署ES集群、kibana、RabbitMq和chrome安装elasticsearch-head插件

3.4 flink1.14.0环境搭建

  flink1.14.0官网

  flink-playgrounds1.14.0官方git

  本文使用flink1.14.0搭建的环境,所以你也可以使用官方最新的稳定版本,可以使用flink1.17.0,flink的每个版本的差异还是有点大的,每个版本的写法都会有点不太一样,这个还有的注意下的。

  使用官方提供的docker-compose的部署文件做了修改:

图片

  部署文件docker-compose.yaml:

version: "2.1"
services:
  jobmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    command: "jobmanager.sh start-foreground"
    ports:
      - 8081:8081
    volumes:
      - ./conf:/opt/flink/conf
      - /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager
  taskmanager:
    image: apache/flink:1.14.4-scala_2.12-java8
    depends_on:
      - jobmanager
    command: "taskmanager.sh start-foreground"
    volumes:
      - ./conf:/opt/flink/conf
      - /tmp/flink-checkpoints-directory:/tmp/flink-checkpoints-directory
      - /tmp/flink-savepoints-directory:/tmp/flink-savepoints-directory
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  这里只部署了jobmanager、taskmanager没有要zookeeper和kafka

  修改配置文件:

  flink-conf.yaml

图片

  在该配置文件中加入如下配置:

classloader.resolve-order: parent-first

  如果不加这个配置会到导致在flink的节点上执行sql-client.sh的窗口执行sql任务的时候会报错:

图片

  这里也是一个大坑,这里也是请教的大佬才搞定的。

  部署命令:

git clone https://github.com/apache/flink-playgrounds.git
cd flink-playgrounds/operations-playground
docker-compose builddocker-compose up -d
# 强制全部重新创建会重新拉取镜像会很慢
docker-compose up --force-recreate -d
# 指定docker-compose文件后重新部署,只会重新部署新增的容器,不会全部删除后创建
docker-compose -f docker-compose.yaml up -d

3.5 准备sql和jar包

3.5.1创建mysql的flink用户并授权

图片

3.5.2 准备sql

cdc任务的sql:

Flink SQL> SET execution.checkpointing.interval = 3s; # 首先,开启 checkpoint,每隔3秒做一次 checkpoint,在执行如下sql前先执行该sql,设置保存点,每3s执行一次同步

-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
     'connector' = 'mysql-cdc',
     'hostname' = 'xxxx',
     'port' = '3306',
     'username' = 'flink',
     'password' = '123456',
     'database-name' = 'mydb',
     'table-name' = 'products'
  );
  
Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
    'connector' = 'mysql-cdc',
     'hostname' = 'ip',
     'port' = '3306',
     'username' = 'flink',
     'password' = '123456',
     'database-name' = 'mydb',
     'table-name' = 'orders'
 );
 
# 最后,创建 enriched_orders 表, 用来将关联后的订单数据写入 Elasticsearch 中
-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://ip:port',
     'index' = 'enriched_orders'
 );
 
#关联订单数据并且将其写入 Elasticsearch 中使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
 SELECT o.*, p.name, p.description
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id;

jdbc任务sql:

# JDBC跑批
-- Flink SQL                   
Flink SQL> SET execution.checkpointing.interval = 3s; # 首先,开启 checkpoint,每隔3秒做一次 checkpoint

-- Flink SQL
Flink SQL> CREATE TABLE products (
    id INT,
    name STRING,
    description STRING,
    PRIMARY KEY (id) NOT ENFORCED
  ) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:3306/mydb',
    'table-name' = 'products',
	'username' = 'flink',
    'password' = '123456'
  );

Flink SQL> CREATE TABLE orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
    'url' = 'jdbc:mysql://ip:3306/mydb',
    'table-name' = 'orders',
	'username' = 'flink',
    'password' = '123456'
 );

-- Flink SQL
Flink SQL> CREATE TABLE enriched_orders (
   order_id INT,
   order_date TIMESTAMP(0),
   customer_name STRING,
   price DECIMAL(10, 5),
   product_id INT,
   order_status BOOLEAN,
   product_name STRING,
   product_description STRING,
   PRIMARY KEY (order_id) NOT ENFORCED
 ) WITH (
     'connector' = 'elasticsearch-7',
     'hosts' = 'http://ip:port',
     'index' = 'enriched_orders'
 );
 
# 关联订单数据并且将其写入 Elasticsearch 中使用 Flink SQL 将订单表 order 与 商品表 products,物流信息表 shipments 关联,并将关联后的订单信息写入 Elasticsearch 中
-- Flink SQL
Flink SQL> INSERT INTO enriched_orders
 SELECT o.*, p.name, p.description
 FROM orders AS o
 LEFT JOIN products AS p ON o.product_id = p.id;

  这里需要注意的是jdbc的sql任务和cdc-sql的任务最大的区别是:jdbc的sql任务执行一次就完结掉了,而cdc-sql的任务是一直在running,之前由于这里搞成jdbc的sql的方式了,然后去修改数据里面的数据,新增、修改和删除发现不会增量和实时同步,只会全量同步,最后是请教了一个大佬才茅塞顿开的,然后换成了cdc的方式就可以了。

3.5.3准备jar包

图片

  jar包需要拷贝的执行的节点上的如下目录里面:

图片

上传jar包然后重启集群的容器。

3.5.4 mysql数据库中需要准备products、orders两张表

图片

sql如下:

-- MySQL
CREATE DATABASE mydb;
USE mydb;
CREATE TABLE products (
  id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  name VARCHAR(255) NOT NULL,
  description VARCHAR(512)
);
ALTER TABLE products AUTO_INCREMENT = 101;

INSERT INTO products
VALUES (default,"scooter","Small 2-wheel scooter"),
       (default,"car battery","12V car battery"),
       (default,"12-pack drill bits","12-pack of drill bits with sizes ranging from #40 to #3"),
       (default,"hammer","12oz carpenter's hammer"),
       (default,"hammer","14oz carpenter's hammer"),
       (default,"hammer","16oz carpenter's hammer"),
       (default,"rocks","box of assorted rocks"),
       (default,"jacket","water resistent black wind breaker"),
       (default,"spare tire","24 inch spare tire");

CREATE TABLE orders (
  order_id INTEGER NOT NULL AUTO_INCREMENT PRIMARY KEY,
  order_date DATETIME NOT NULL,
  customer_name VARCHAR(255) NOT NULL,
  price DECIMAL(10, 5) NOT NULL,
  product_id INTEGER NOT NULL,
  order_status BOOLEAN NOT NULL -- Whether order has been placed
) AUTO_INCREMENT = 10001;

INSERT INTO orders
VALUES (default, '2020-07-30 10:08:22', 'Jark', 50.50, 102, false),
       (default, '2020-07-30 10:11:09', 'Sally', 15.00, 105, false),
       (default, '2020-07-30 12:00:30', 'Edward', 25.25, 106, false);

3.5.5 执行sql同步数据

图片

  进入到/opt/flink/bin路径下,执行如下命令进入sql管理后台:

./sql-client.sh

图片

  然后在这个界面顺序执行上面准备的cdc任务的sql,首先执行:

Flink SQL> SET execution.checkpointing.interval = 3s; 

  开启checkpoint,然后在去顺序执行建表sql和关联insert的sql将数据同步到es的索引中,

  mysql-cdc的insert的任务一直在执行中的:

图片

  es中同步的数据如下:

图片

  在mysql的mydb的数据中执行新增、修改和删除的操作,数据都可以实时更新到es的索引中。

4.总结

  到此实践分享已经完成,生产的话在设计下高可用啥的,设计下各个业务的数据模型,然后就可以用flink-cdc把各个业务的mysql数据库表中的数据全量、增量和实时的数据同步到es中做聚合检索了,es的操作使用开源的easy-es框架,让操作es更简单和高效,希望我的分享能帮助到你,文章虽然是开源,但是创作不易,请不要侵权抄袭,否则直接举报,最起码的版权意识还是要有的,你可以转载,但是请把原文地址放上去,最可恶的那种就是直接复制过去然后就变成他自己的原创了,这种行为真的是很可耻,非常让人讨厌、反感和紫火,请一键三连,么么哒!

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/634673.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【树形DP+直径思想】代码源每日一题div1 三进制循环

三进制循环 - 题目 - Daimayuan Online Judge 题意: 思路: 有点像树的直径 回顾一下我们是怎么求直径的:维护根节点到其子树上的点的最大距离和次大距离,然后答案就是统计所有结点的次大值最大值 的 最大值 这道题也是一样的&a…

C盘爆满时的几个救命无害清理技巧

其实网上也有很多清理C盘的方法 但是很多就是为了弄成空间 不讲原理 也不计后果 很可能坑惨小伙伴 可以看到 我电脑的C盘都已经读红条了 非常危险 对了 可能有些朋友的电脑上没有此电脑的选项 我们可以在桌面上右键选择 个性化 找到 主题 并选择 向下拉 找到 桌面图标设置 …

经验总结:13 条自动化测试框架设计原则!

1.代码规范 测试框架随着业务推进,必然会涉及代码的二次开发,所以代码编写应符合通用规范,代码命名符合业界标准,并且代码层次清晰。特别在大型项目、多人协作型项目中,如果代码没有良好的规范,那么整个框…

微服务之负载均衡

Informal Essay By English I wish the students of the college entrance examination can win the gold toad and win the title of the gold list 参考书籍:“凤凰架构” 负载均衡(load balance) 负载平衡是指在一组后端服务器&#xf…

《计算机网络——自顶向下方法》精炼——4.4.1-4.4.2

敬教劝学,建国之大本;兴贤育才,为政之先务。——《朱舜水集。劝学》 文章目录 IPv4编址接口IP地址子网 无类别域间路由选择(CIDR)获取一块地址获取主机地址网络地址转换 IPv4编址 接口 主机或路由器与物理链路的边界称作接口。一…

深度学习卷积神经网络CNN之ResNet模型网络详解说明(超详细理论篇)

1.ResNet背景 2. ResNet论文 3. ResNet模型结构 4. ResNet优缺点 一、ResNet背景 ResNet 在2015 年由微软研究院提出的一种深度卷积神经网络结构,在ILSVRC(ImageNet Large Scale Visual Recognition Challenge)中取得了冠军(分类…

python - kubernetes中grpc服务健康检查实现

概述 kubernetes本身不支持gRPC健康检查,本文记录使用 ‘grpc-health-probe’ 实现grpc服务的健康检查 ‘grpc-health-probe’,这是 Kubernetes 原生的健康检查 gRPC 应用程序的方法 官方参考文档:https://kubernetes.io/zh-cn/blog/2018/1…

45--Django-项目实战-全栈开发-基于django+drf+vue+elementUI企业级项目开发流程-纯手工安装部署和docker一键部署

前期准备: 购买服务器,公网地址访问 服务器有多种选择,阿里云,腾讯云,华为云(可以免费试用几个月) 买阿里云就当成你去电脑市场组装了一台电脑。阿里云按时间计费(账户余额要大于100)。 购买流程: 搜索云服务器 开始创建你的云服务器 按量付费:用多少扣多少 地域…

ORACLE PL/SQL编程总结(一)

目录 1.1 SQL与PL/SQL 1.2 PL/SQL的优点或特征 1.3 PL/SQL 可用的SQL语句 1.4 运行PL/SQL程序 2.1 PL/SQL块 2.2 PL/SQL结构 2.3 标识符 2.4 PL/SQL 变量类型 2.5 运算符和表达式(数据定义) 2.6 变量赋值 2.7 变量作用范围及可见性 2.8 注释 2.9 简单例子 3.1 条件…

算法刷题-哈希表-两数之和

两数之和 1. 两数之和思路总结其他语言版本 1. 两数之和 力扣题目链接 给定一个整数数组 nums 和一个目标值 target,请你在该数组中找出和为目标值的那 两个 整数,并返回他们的数组下标。 你可以假设每种输入只会对应一个答案。但是,数组中…

【Leetcode】DP | 买卖股票的最佳时机,DP居然还可以用状态机?

带状态的DP君~ 类型总结:买卖一次、买卖无限次、买卖k次、买卖无限次、含冷冻期。 买卖k次的问题需要不断统计、维护买卖i次的最大收益。 状态较多的题可以借助状态机分析状态转移情况。 121 买卖股票的最佳时机 统计第 i i i天之前的股票最低价格,…

性能测试项目实战:应用加载慢该怎么办?

背景 app收到留学push、课堂、资讯,用户点击push消息,进入app,应用加载很慢,容易出现应用假死、app崩溃或提示网络异常等信息。 给用户体验十分不友好,监控阿里云资源tcp连接数飙高,cpu打满&#xff0c…

Nginx运行原理与基本配置文件讲解

文章目录 Nginx基本运行原理Nginx的基本配置文件serverlocationroot 与 alias 的区别server 和 location 中的 rootnginx欢迎页 本文参考文章Nginx相关文章 Nginx基本运行原理 Nginx的进程是使用经典的「Master-Worker」模型,Nginx在启动后,会有一个master进程和多个…

docker-compose编排容器

系列文章目录 文章目录 系列文章目录一、docker-compose1.Docker Compose2.YAML 文件格式及编写注意事项3.安装docker-compose4.Docker Compose配置常用字段 二、创建compose1.准备依赖文件2. 总结 一、docker-compose 1.Docker Compose 如果需要定义多个容器就需要服务编排。…

priority_queue的模拟实现

前言 优先级队列听名字好像与队列有关,但是实际上,与队列没有很多关系,它也是容器适配器,是通过vector来适配的,但是里面又加入了堆的调整算法。跟栈和队列又有一些不同,了解它的实现对于我们更好的掌握它是…

新手上路,安全驾驶,做行车安全的第一责任人

目录 一、生活与汽车二、树立安全意识三、掌握驾驶经验四、参考材料 道路千万条,安全第一条,行车不规范,亲人两行泪。 ——《流浪地球》 一、生活与汽车 开车是为了节省在路途上花费的时间,片面的追求交通效率会引发交通安全问题&…

day8 栈顶的种类与应用

目录 多寄存器访问指令与寻址方式 多寄存器内存访问指令 多寄存器内存访问指令的寻址方式 ​编辑 栈的种类与使用 栈的概念 栈的分类 栈的应用举例 叶子函数的调用过程举例 多寄存器访问指令与寻址方式 多寄存器内存访问指令 MOV R1, #1 MOV R2, #2 MOV R3, #3 MOV R…

Redis 持久化存储机制:RDB 和 AOF

Redis(Remote Dictionary Server)是一个高性能的键值存储系统,它可以将数据存储在内存中以实现快速访问。为了保持数据的持久性,Redis 提供了两种数据持久化方法:RDB 和 AOF。 RDB(Redis Database&#xff…

spring源码-代码的特殊写法

spring源码-代码的特殊写法 前言 在阅读spring源码中,可能会有很多种代码写法在工作中都没遇见过,阅读起来有一定的难度,在本文中,我会把我认为有难度的代码写法拿出来,并举例子说明清楚,方便大家阅读并理…

股价暴涨59%后,美股二手车平台Carvana在短期内还会进一步上涨?

来源:猛兽财经 作者:猛兽财经 Carvana(CVNA)股票在财报发布近一个月后又重新开始出现了上涨。 仅6月9日就上涨了59%。 相对于纳斯达克综合指数的上涨幅度,Carvana今年迄今为止的上涨幅度已经比纳斯达克综合指数高出了约400%。 Carvana最…