基于docker安装flink

news2024/12/24 21:18:54

文章目录

  • 环境准备
    • Flink
      • docker-compose方式
      • 二进制部署
    • Kafka
    • Mysql
  • Flink 执行 SQL命令
    • 进入SQL客户端CLI
    • 执行SQL查询
      • 表格模式
      • 变更日志模式
      • Tableau模式
      • 窗口计算
    • 窗口计算
      • 滚动窗口demo
      • 滑动窗口
  • 踩坑

环境准备

Flink

docker-compose方式

version: "3"
services:
  jobmanager:
    image: flink:latest
    expose:
      - "6123"
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

  taskmanager:
    image: flink:latest
    expose:
      - "6121"
      - "6122"
    depends_on:
      - jobmanager
    command: taskmanager
    links:
      - "jobmanager:jobmanager"
    environment:
      - JOB_MANAGER_RPC_ADDRESS=jobmanager

前端访问地址: http://192.168.56.112:8081/#/overview

二进制部署

wget https://archive.apache.org/dist/flink/flink-1.13.3/flink-1.13.3-bin-scala_2.11.tgz

vim conf/flink-conf.yaml

jobmanager.rpc.address: 192.168.56.112 # 修改为本机ip

./bin/start-cluster.sh

Kafka

version: '2'
services:
  zookeeper:
    image: wurstmeister/zookeeper   ## 镜像
    ports:
      - "2181:2181"                 ## 对外暴露的端口号
  kafka:
    image: wurstmeister/kafka       ## 镜像
    volumes:
        - /etc/localtime:/etc/localtime ## 挂载位置(kafka镜像和宿主机器之间时间保持一直)
    ports:
      - "9092:9092"
    environment:
      KAFKA_ADVERTISED_HOST_NAME: 192.168.56.112    ## 修改:宿主机IP
      KAFKA_ZOOKEEPER_CONNECT: 192.168.56.112:2181       ## 卡夫卡运行是基于zookeeper的
  kafka-manager:
    image: sheepkiller/kafka-manager                ## 镜像:开源的web管理kafka集群的界面
    environment:
        ZK_HOSTS:                    ## 修改:宿主机IP
    ports:
      - "9000:9000"

Mysql

docker run -d -p3306:3306 --name=mysql57 -e MYSQL_ROOT_PASSWORD=111111 mysql:5.7

在这里插入图片描述

Flink 执行 SQL命令

进入SQL客户端CLI

docker exec  -it flink_jobmanager_1  /bin/bash

./bin/sql-client.sh

在这里插入图片描述

执行SQL查询

SELECT 'Hello World';

在这里插入图片描述

表格模式

表格模式(table mode)在内存中物化结果,并将结果用规则的分页表格的形式可视化展示出来。执行如下命令启用:

SET sql-client.execution.result-mode = table;

可以使用如下查询语句查看不同模式的的运行结果:

SELECT name, COUNT(*) AS cnt FROM (VALUES ('Bob'), ('Alice'), ('Greg'), ('Bob')) AS NameTable(name) GROUP BY name;

在这里插入图片描述

变更日志模式

变更日志模式(changelog mode)不会物化结果。可视化展示由插入(+)和撤销(-)组成的持续查询结果流。

SET sql-client.execution.result-mode = changelog;

在这里插入图片描述

Tableau模式

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容取决于作业执行模式(execution.type):

SET sql-client.execution.result-mode = tableau;

在这里插入图片描述

注意:当你在流式查询上使用这种模式时,Flink 会将结果持续的打印在当前的控制台上。如果流式查询的输入是有限数据集,那么 Flink 在处理完所有的输入数据之后,作业会自动停止,同时控制台上的打印也会自动停止。如果你想提前结束这个查询,那么可以直接使用 CTRL-C 按键,这个会停止作业同时停止在控制台上的打印。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

窗口计算

TUMBLE(time_attr, interval) 定义一个滚动窗口。滚动窗口把行分配到有固定持续时间( interval )的不重叠的连续窗口。比如,5 分钟的滚动窗口以 5 分钟为间隔对行进行分组。滚动窗口可以定义在事件时间(批处理、流处理)或处理时间(流处理)上。

滚动窗口demo

根据订单信息使用kafka作为数据源表,JDBC作为数据结果表统计用户在5秒内的订单数量,并根据窗口的订单id和窗口开启时间作为主键,将结果实时统计到JDBC中:

  1. 在MySQL的flink数据库下创建表order_count,创建语句如下:
CREATE TABLE `flink`.`order_count` (
        `user_id` VARCHAR(32) NOT NULL,
        `window_start` TIMESTAMP NOT NULL,
        `window_end` TIMESTAMP NULL,
        `total_num` BIGINT UNSIGNED NULL,
        PRIMARY KEY (`user_id`, `window_start`)
)        ENGINE = InnoDB
        DEFAULT CHARACTER SET = utf8mb4
        COLLATE = utf8mb4_general_ci;
  1. 创建flink opensource sql作业,并提交运行作业
CREATE TABLE orders (
  order_id string,
  order_channel string,
  order_time timestamp(3),
  pay_amount double,
  real_pay double,
  pay_time string,
  user_id string,
  user_name string,
  area_id string,
  watermark for order_time as order_time - INTERVAL '3' SECOND
) WITH (
  'connector' = 'kafka',
  'topic' = 'order_topic',
  'properties.bootstrap.servers' = '192.168.56.112:9092',
  'properties.group.id' = 'order_group',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

CREATE TABLE jdbcSink (
  user_id string,
  window_start timestamp(3),
  window_end timestamp(3),
  total_num BIGINT,
  primary key (user_id, window_start) not enforced
) WITH (
  'connector' = 'jdbc',
  'url' = 'jdbc:mysql://192.168.56.112:3306/flink',
  'table-name' = 'order_count',
  'username' = 'root',
  'password' = '111111',
  'sink.buffer-flush.max-rows' = '1'
);

SELECT 
    'WINDOW',-- window_start,window_end,
    group_key,record_num,create_time,
    SUM(record_num) OVER w AS sum_amount
FROM temp
WINDOW w AS (
  PARTITION BY group_key
  ORDER BY rowtime
  RANGE BETWEEN INTERVAL '10' SECOND PRECEDING AND CURRENT ROW)
  
select 
    user_id,
    TUMBLE_START(order_time, INTERVAL '5' SECOND),
    TUMBLE_END(order_time, INTERVAL '5' SECOND),
    COUNT(*) from orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
    
SELECT 
    'WINDOW',
    user_id,order_id,real_pay,order_time
    COUNT(*) OVER w AS sum_amount
FROM orders
WINDOW w AS (
  PARTITION BY user_id
  ORDER BY order_time
  RANGE BETWEEN INTERVAL '60' SECOND PRECEDING AND CURRENT ROW) 
    

insert into jdbcSink select 
    user_id,
    TUMBLE_START(order_time, INTERVAL '5' SECOND),
    TUMBLE_END(order_time, INTERVAL '5' SECOND),
    COUNT(*) from orders
    GROUP BY user_id, TUMBLE(order_time, INTERVAL '5' SECOND) having count(*) > 3;
  1. Kafka 相关操作
bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --list

bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --create --replication-factor 1 --partitions 1 --topic order_topic

bin/kafka-console-producer.sh --broker-list 192.168.56.112:9092 --topic order_topic

bin/kafka-console-consumer.sh --bootstrap-server 192.168.56.112:9092 --topic order_topic --from-beginning

bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --describe --topic order_topic 

bin/kafka-topics.sh --zookeeper 192.168.56.112:2181 --delete --topic order_topic 

发送数据样例

{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-09-26 15:20:11", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:28:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:29:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}
{"order_id":"202103241000000001", "order_channel":"webShop", "order_time":"2023-08-10 17:30:10", "pay_amount":"100.00", "real_pay":"100.00", "pay_time":"2021-03-24 10:02:03", "user_id":"0001", "user_name":"Alice", "area_id":"330106"}

滑动窗口

SELECT * FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND));

SELECT * FROM TABLE(
    HOP(
      DATA => TABLE orders,
      TIMECOL => DESCRIPTOR(order_time),
      SLIDE => INTERVAL '5' MINUTES,
      SIZE => INTERVAL '10' MINUTES));
      
      
SELECT window_start, window_end, SUM(pay_amount)
  FROM TABLE(
    HOP(TABLE orders, DESCRIPTOR(order_time), INTERVAL '2' SECOND, INTERVAL '10' SECOND))
  GROUP BY window_start, window_end;

踩坑

  1. Could not find any factory for identifier ‘kafka’ that implements ‘org.apache.flink.table.factories.DynamicTableFactory’ in the classpath.

查看flink version

flink-sql-connector-kafka-1.17.1.jar

https://mvnrepository.com/artifact/org.apache.flink/flink-sql-connector-kafka/1.17.1

下载对应版本jar,放到lib目录下,重启

  1. Could not find any factory for identifier ‘jdbc’ that implements 'org.apache.flink.table.factories.DynamicTableFactory
    flink-connector-jdbc-3.1.0-1.17.jar
    https://repo1.maven.org/maven2/org/apache/flink/flink-connector-jdbc/3.1.0-1.17/flink-connector-jdbc-3.1.0-1.17.jar

  2. Caused by: java.lang.ClassNotFoundException: com.mysql.jdbc.Driver

https://mvnrepository.com/artifact/com.mysql/mysql-connector-j/8.0.31

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/1655427.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

与时代同行,Build with AI 2024 线下活动五月再次开放报名

技术开发日新月异,软硬件迭代和应用场景多样化对开发者提出了更多挑战。面对科技发展潮流,GDG (谷歌开发者社区) 一直秉承开放共创的精神,以热忱之心与开发者们一同探索 AI 的广阔发展前景。 在过去的四月里,我们在北京、上海、深…

数据结构之单单单——链表

一.链表 1)链表的概念 链表(Linked List)是一种物理存储结构上非连续,非顺序的储存结构,数据元素的逻辑顺序是通过链表中指针链接次序实现的。要注意,链表也是线性表----->但链表在物理结构上不是线性的…

docker学习笔记(三)搭建NFS服务实验

目录 什么是NFS 简单架构​编辑 一.搭建nfs服务器 二.新建共享目录和网页文件 三.设置共享目录 四:创建使用nfs共享目录的卷 五:创建容器使用nfs-web-1卷 六:测试访问 七:是否同步测试 什么是NFS NFS 服务器:ne…

1688数据分析实操技巧||1688商品数据采集接口 数据分析

今天,聊一聊B2B平台的数据分析,以1688国内站为例。 1688平台数据接口 1688也属于阿里巴巴的体系,跟淘宝天猫运营很像,因此很多淘宝天猫的玩法调整后也适用于1688。数据分析也是如此。 在1688搞数据分析,搞数据化运营可…

【Ansible】ansible-playbook剧本

playbook 是ansible的脚本 playbook的组成 1)Tasks:任务;通过tasks 调用ansible 的模板将多个操作组织在一个playbook中运行 2)Variables:变量 3)Templates:模板 4)Handles&#xf…

Xilinx FPGA底层逻辑资源简介(1):关于LC,CLB,SLICE,LUT,FF的概念

LC:Logic Cell 逻辑单元 Logic Cell是Xilinx定义的一种标准,用于定义不同系列器件的大小。对于7系列芯片,通常在名字中就已经体现了LC的大小,在UG474中原话为: 对于7a75t芯片,LC的大小为75K,6输…

【YOLOv8改进[Backbone]】使用SCINet改进YOLOv8在黑暗环境的目标检测效果

目录 一 SCINet 1 本文方法 ① 权重共享的照明学习 ② 自校准模块 ③ 无监督训练损失 二 使用SCINet助力YOLOv8在黑暗环境的目标检测效果 1 整体修改 2 配置文件 3 训练 其他 一 SCINet 官方论文地址:https://arxiv.org/pdf/2204.10137 官方代码地址&…

01-单片机商业项目编程,从零搭建低功耗系统设计

一、引言 这是关于《单片机商业编程之从零搭建低功耗系统》的第一篇章,个人善忘,平常项目设计当中的一些思路,以前年轻的时候习惯性的录制成视频,也算是当作是自己的笔记,无奈现在喉咙实在扛不住,因此先尝试…

ElementUI Select选择器多选获取选中对象

html <el-form-item label"账户标签&#xff1a;" prop"tags"><el-selectstyle"width: 500px"value-key"tagId"v-model"form.tags"clearablefilterablemultipleplaceholder"请搜索选择账户标签"><…

SQL查询语句(四)模糊查询

前文介绍的查询语句&#xff0c;无论是利用常规的数学运算符&#xff0c;还是IN&#xff0c;BETWEEN和EXISTS等范围查询关键字&#xff0c;本质上都属于精确查询的范围&#xff0c;也就是说&#xff0c;我们在条件中写明了完全限定死的条件。而有些场景&#xff0c;我们的条件并…

《视觉十四讲》例程运行记录(1)—— 课本源码下载和3rdparty文件夹是空的解决办法

提示&#xff1a;文章写完后&#xff0c;目录可以自动生成&#xff0c;如何生成可参考右边的帮助文档 文章目录 前言一、第二版十四讲课本源码下载1. 安装git工具 二、Pangolin下载和安装1. 源码下载2. Pangolin的安装(1) 安装依赖项(2) 源码编译安装(2) 测试是否安装成功 二、…

PHPStudy 访问网页 403 Forbidden禁止访问

涉及靶场 upload-labd sqli-labs pikachu dvwa 以及所有部署在phpstudy中的靶场 注意&#xff1a;一定要安装解压软件 很多同学解压靶场代码以后访问报错的原因是&#xff1a;电脑上没有解压软件。 这个时候压缩包看起来就是黄色公文包的样子&#xff0c;右键只有“全部提取…

基于C语言中的类型转换,C++标准创造出了更加可视化的类型转换

目录 前言 一、 C语言中的类型转换 二、为什么C需要四种类型转换 三、C中新增的四种强制类型转换操作符以及它们的应用场景 1.static_cast 2.reinterpret_cast 3.const_cast 4.dynamic_cast 前言 在C语言中&#xff0c;如果赋值运算符左右两侧的类型不同&#xff0c;或者…

渗透之sql注入实战2(二次注入)

目录 平台地址&#xff1a; 开始&#xff1a; 方法1&#xff1a; 方法二 找提示 这里存在一个文件包含&#xff08;file&#xff09;。 爆源码 index.php源码 confirm.php源码&#xff1a; search.php源码&#xff1a; change.php源码&#xff1a; delete.php源码&…

【论文浅尝】Large Language Models for Generative Information Extraction: A Survey

本文对生成式IE的LLM进行了全面的探索。使用两种分类法对现有的代表性方法进行分类: (1)众多IE子任务的分类法&#xff0c;旨在对可以使用llm单独或统一提取的不同类型的信息进行分类; (2)学习范式分类法&#xff0c;对利用llm生成IE的各种新方法进行分类。 Preliminaries o…

学习Uni-app开发小程序Day3

经过五一长假&#xff0c;回过头在去看学习的东西&#xff0c;发现仍然是一筹莫展的&#xff0c;看来&#xff0c;学习是不能松懈的&#xff0c;得&#xff0c;自己在把以前的从头复习一遍&#xff0c;加深印象。今天在继续听课&#xff0c;但是出现一个问题&#xff0c;是黑码…

实践指南:如何将SpringBoot项目无缝部署到Tomcat服务器

序言 SpringBoot 是一个用来简化 Spring 应用初始搭建以及开发过程的框架&#xff0c;我们可以通过内置的 Tomcat 容器来轻松地运行我们的应用。但在生产环境中&#xff0c;我们可能需要将应用部署到独立的 Tomcat 服务器上。本文给大家介绍 SpringBoot 项目部署到独立 Tomcat…

Springboot+Vue项目-基于Java+MySQL的个人云盘管理系统(附源码+演示视频+LW)

大家好&#xff01;我是程序猿老A&#xff0c;感谢您阅读本文&#xff0c;欢迎一键三连哦。 &#x1f49e;当前专栏&#xff1a;Java毕业设计 精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; &#x1f380; Python毕业设计 &…

连接docker中的MySQL出现2058错误

出错场景&#xff1a;在虚拟机中用docker技术下载最新版本的MySQL&#xff0c;在本地电脑上连接发现出现2058错误。 解决方法&#xff1a; 按照以下步骤 1. 2. ALTER USER root% IDENTIFIED WITH mysql_native_password BY 自己MySQL的密码; 3.成功

如何查看公网IP开放端口?

在计算机网络中&#xff0c;公网IP是指能够直接访问互联网的IP地址&#xff0c;而开放端口则是指外部网络可以访问的服务端口。查看公网IP开放端口可以帮助我们了解当前网络环境中哪些服务可以被外部网络访问&#xff0c;对于网络安全和远程连接非常重要。 天联组网 天联组网是…