Flink使用总结

news2025/1/15 6:58:38

本文主要是为Flink的java客户端使用和flink-sql使用的大致介绍,具体使用查看文档页面。

java client使用

文档

Apache Flink Documentation | Apache Flink

数据处理模型

 maven依赖

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>flink_test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>
        <!-- Apache Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.15.4</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>1.15.4</version>
        </dependency>
        <!-- Apache Flink -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>1.15.4</version>
        </dependency>

        <!-- Kafka Client -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>

        <!--json-->
        <dependency>
            <groupId>org.json</groupId>
            <artifactId>json</artifactId>
            <version>20210307</version>
        </dependency>

        <!-- 解决 No ExecutorFactory found to execute the application-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>1.15.4</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-jdbc</artifactId>
            <version>1.15.4</version>
        </dependency>
        
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>8.0.25</version>
        </dependency>

    </dependencies>

<!--build fat jar-->
    <build>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <version>3.3.0</version>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.KafkaDataProcessor</mainClass>
                        </manifest>
                        <manifestEntries>
                            <Encoding>UTF-8</Encoding>
                        </manifestEntries>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <compilerArgs>
                        <arg>-Xlint:unchecked</arg>
                        <arg>-Xlint:deprecation</arg>
                    </compilerArgs>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

代码样例

读取kafka并打印结果

KafkaFlinkExample

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaFlinkExample {
    public static void main(String[] args) throws Exception {
        // 设置 Flink 程序的执行环境
        // StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.153:9092");
        props.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
        env.addSource(consumer)
                .map(data -> "Received: " + data)
                .print();
        env.execute("Kafka Flink Example");
    }
}

处理kafka数据并保存结果入新的topic

KafkaDataProcessor

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.util.serialization.SimpleStringSchema;
import org.json.JSONObject;

import java.util.Properties;

public class KafkaDataProcessor {
    public static void main(String[] args) throws Exception {
        // 设置 Flink 程序的执行环境
//        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // 创建一个本地流执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

        // 设置 Kafka 的配置信息
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "192.168.10.153:9092");
        properties.setProperty("group.id", "flink-consumer-group");

        // 创建 Kafka 消费者,并从指定的 topic 中读取数据
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
        DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);

        // 将 JSON 数据解析并添加性别字段
        DataStream<String> processedDataStream = kafkaDataStream.map(new MapFunction<String, String>() {
            @Override
            public String map(String value) throws Exception {
                // 解析 JSON 数据
                JSONObject jsonObject = new JSONObject(value);
                String name = jsonObject.getString("name");
                int id = jsonObject.getInt("id");
                int age = jsonObject.getInt("age");

                // 根据姓名判断性别
                String gender;
                if (name.equals("jack")) {
                    gender = "male_xxx";
                } else {
                    gender = "female_xxx";
                }

                // 构造新的 JSON 数据
                JSONObject newJsonObject = new JSONObject();
                newJsonObject.put("name", name);
                newJsonObject.put("id", id);
                newJsonObject.put("age", age);
                newJsonObject.put("gender", gender);
                return newJsonObject.toString();
            }
        });

        // 创建 Kafka 生产者,并将新的数据写入指定的 topic
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties);
        processedDataStream.addSink(kafkaProducer);
        // 执行程序
        env.execute("Kafka Data Processor");
    }
}

设置执行并行度

LocalWebUI 

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

import java.util.Properties;

public class LocalWebUI {
    public static void main(String[] args) throws Exception {
        //StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        Configuration configuration = new Configuration();
        //创建一个带webUI的本地执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
        int parallelism = env.getParallelism();

        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "192.168.10.153:9092");
        props.setProperty("group.id", "test");
        FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);

        System.out.println("执行环境的并行度:" + parallelism);
//        DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);

        DataStreamSource<String> lines = env.addSource(consumer);

        int parallelism1 = lines.getParallelism();
        System.out.println("socketTextStream创建的DataStreamSource的并行度:" + parallelism1);
        SingleOutputStreamOperator<String> uppered = lines.map(line -> line.toUpperCase());
        int parallelism2 = uppered.getParallelism();
        System.out.println("调用完map方法得到的DataStream的并行度:" + parallelism2);
        DataStreamSink<String> print = uppered.print();
        int parallelism3 = print.getTransformation().getParallelism();
        System.out.println("调用完print方法得到的DataStreamSink的并行度:" + parallelism3);
        env.execute();
    }
}

本地执行

Flink可以和Spark类似,开发过程中,在本地临时执行,需要两个条件

1.需要flink-client依赖引入,否则会报No ExecutorFactory found to execute the application

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.4</version>
</dependency>

2.设置flink的执行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();

 集群执行

打包

mvn clean package

提交任务

flink run -c com.KafkaDataProcessor /root/flink_test-1.0-SNAPSHOT-jar-with-dependencies.jar 

观察任务状态

Job---->>Running Jobs

 结束任务

Job---->>Running Jobs--->>点击任务---->>Cannel Job

 flink-sql

文档

https://nightlies.apache.org/flink/flink-docs-release-1.14/zh/docs/dev/table/common/

启动客户端 

cd /root/flink-1.15.4/bin

./sql-client.sh

需求案例

汇总kafka数据,将结果保存入mysql中

依赖准备

mysql版本是8.0.25,flink版本是1.15.4,connector的版本一定要和flink版本保持一致所有集群节点的lib一定要保持一致然后重启

依赖下载位置:Central Repository:

mysql-connector-java-8.0.25.jar
flink-connector-jdbc-1.15.4.jar
kafka-clients-2.8.1.jar
flink-connector-kafka-1.15.4.jar

准备结果表

CREATE TABLE sync_test_1 (
  `day_time` varchar(64) NOT NULL,
  `total_gmv` bigint(11) DEFAULT NULL,
  PRIMARY KEY (`day_time`)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=utf8mb4;

sql配置

create table flink_test_1 ( 
  id BIGINT,
  day_time VARCHAR,
  amnount BIGINT,
  proctime AS PROCTIME ()
)
 with ( 
  'connector' = 'kafka',
  'topic' = 'flink_test',
  'properties.bootstrap.servers' = '192.168.10.153:9092', 
  'properties.group.id' = 'flink_gp_test1',
  'scan.startup.mode' = 'earliest-offset',
  'format' = 'json',
  'json.fail-on-missing-field' = 'false',
  'json.ignore-parse-errors' = 'true',
  'properties.zookeeper.connect' = '192.168.10.153:2181/kafka'
 );

CREATE TABLE sync_test_1 (
                   day_time string,
                   total_gmv bigint,
                   PRIMARY KEY (day_time) NOT ENFORCED
 ) WITH (
   'connector' = 'jdbc',
   'url' = 'jdbc:mysql://192.168.10.151:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
   'table-name' = 'sync_test_1',
   'username' = 'root',
   'password' = '123456'
 );

INSERT INTO sync_test_1 
SELECT day_time,SUM(amnount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;

 测试数据

./bin/kafka-console-producer.sh --bootstrap-server 192.168.10.153:9092 --topic flink_test

{"day_time": "20201009","id": 7,"amnount":20}

查看数据结果

 

 

 来源:

docs/sql_demo/demo_1.md · 无情(朱慧培)/flink-streaming-platform-web - Gitee.com

flink-sql大量使用案例_flink sql使用_第一片心意的博客-CSDN博客

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/735477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【YOLOv8-Seg】实战二:LabVIEW+OpenVINO加速YOLOv8-seg实例分割

‍‍&#x1f3e1;博客主页&#xff1a; virobotics的CSDN博客&#xff1a;LabVIEW深度学习、人工智能博主 &#x1f384;所属专栏&#xff1a;『LabVIEW深度学习实战』 &#x1f37b;上期文章&#xff1a; 【YOLOv8-seg】实战一&#xff1a;手把手教你使用YOLOv8实现实例分割 …

【数据分析 - 基础入门之NumPy⑥】- NumPy案例巩固强化

文章目录 前言一、NumPy基础训练1.1 创建一个长度为10的一维全为0的ndarray对象&#xff0c;并让第5个元素为11.2 创建一个元素为从10到49的ndarray对象1.3 将第2题的所有元素位置反转1.4 创建一个10*10的ndarray对象并打印最大最小元素1.5 创建一个10*10的ndarray对象&#xf…

程序设计相关概念

计算机概念 计算机是根据指令操作数据的设备。具有功能性和可编程性的特点。 功能性&#xff1a;对数据的操作&#xff0c;表现为数据计算、输入输出处理和结果存储等。 可编程性&#xff1a;根据一系列指令自动地、可预测地、准确地完成操作者的意图。 计算机的发展 计算机…

【LVS负载均衡集群】

文章目录 一.什么是集群1.集群的含义 二.集群使用在那个场景三.集群的分类1.负载均衡器集群2.高可用集群3.高性能运算集群 四.负载集群的架构1.第一层&#xff0c;负载调度器2.第二层&#xff0c;服务器池3.第三层&#xff0c;共享存储 五.负载均衡集群的工作模式1.地址转换 &a…

Unity减少等待快速进入运行

我们平时播放时一旦修改了c#的脚本总要加载进行等待&#xff0c;网上也缺乏如何设置&#xff0c;以及为什么&#xff1f;这样做可以达到这样的效果。 ------如何设置&#xff1f;【默认并不会开启】 Edit->Project Settings->Editor->Enter Player Mode Options 这样…

企业为什么要做自动化测试?如何成功实施自动化测试?

目录&#xff1a;导读 前言一、Python编程入门到精通二、接口自动化项目实战三、Web自动化项目实战四、App自动化项目实战五、一线大厂简历六、测试开发DevOps体系七、常用自动化测试工具八、JMeter性能测试九、总结&#xff08;尾部小惊喜&#xff09; 前言 企业为什么需要自…

【LeetCode热题100】打卡第33天:环形链表LRU缓存

文章目录 【LeetCode热题100】打卡第33天&#xff1a;环形链表&LRU缓存⛅前言 环形链表&#x1f512;题目&#x1f511;题解 LRU缓存&#x1f512;题目&#x1f511;题解 【LeetCode热题100】打卡第33天&#xff1a;环形链表&LRU缓存 ⛅前言 大家好&#xff0c;我是知…

C++ 实现生产者消费者模型 (线程同步、互斥锁、条件变量锁)详细注释

代码结构 任务&#xff1a;这里用一个int类型的taskNumber代替任务任务队列类&#xff1a;封装了任务队列&#xff0c;存&#xff0c;取等操作。生产者工作函数&#xff1a;生产者执行的函数&#xff0c;向任务队列中添加任务&#xff0c;每个生产者生产3个任务消费者工作函数…

脱离产品怎么可能完成测试?

“脱离应用场景谈技术毫无意义”。其实很多东西都是如此&#xff0c;这个有点哲理的味道了。我们是做engineering&#xff0c;软件工程也是工程&#xff0c;工程的特点就是不能停留在理论和方法&#xff0c;最后要做出东西来&#xff0c;软的也好&#xff0c;硬的也好。 人有…

爬虫反反爬

目录 为什么要反爬&#xff1f; 经常被反爬的主要人群 常见的反爬策略 通过headers字段来反爬 通过headers中的User-Agent字段来反爬 通过referer字段或者是其他字段来反爬 通过cookie来反爬 通过请求参数来反爬 通过从html静态文件中获取请求数据(github登录数据) 通…

【Go】vscode 安装go环境gopls失败

项目场景&#xff1a; 想要在VSCode安装go环境&#xff0c;但是gopls下载失败&#xff0c;导致vscode无法使用language server 问题描述 自动下载失败&#xff0c;在打开命令面板&#xff08;CtrlshiftP&#xff09;之后&#xff0c;输入go install/update 下载也失败 $ g…

并发编程 - Event Bus 设计模式

文章目录 Pre设计CodeBus接口自定义注解 Subscribe同步EventBus异步EventBusSubscriber注册表RegistryEvent广播Dispatcher 测试简单的Subscriber同步Event Bus异步Event Bus 小结 Pre 我们在日常的工作中&#xff0c;都会使用到MQ这种组件&#xff0c; 某subscriber在消息中间…

PillarNext论文解读

这篇文章是轻舟智航23年的一篇论文&#xff0c;是对pillarNet进行改进。 改进方面&#xff1a; 1.训练更长的时间在检测头增加IOU预测score&#xff0c;这个iou分数预测不太清楚&#xff0c;不知道是不是iouloss 2.扩大感受野&#xff0c;包括Neck部分使用FPN或者BiFPN.使用…

3.zabbix操作二

文章目录 zabbix操作二部署zabbix代理服务器安装zabbix_proxy安装数据库配置代理服务器配置文件web端添加agent代理并连接主机 部署zabbix高可用群集zabbix监控Windows系统zabbix监控java应用zabbix监控SNMP zabbix操作二 部署zabbix代理服务器 分布式监控的作用&#xff1a;…

Flink web UI配置账号密码,权限控制

由于Flink自带的web UI界面没有账号密码&#xff0c;需要通过nginx实现该效果。 1.安装httpd-tools工具 yum install httpd-tools -y 2.生成用户名密码文件 htpasswd -c /usr/local/nginx/conf/flinkuser username passwd flinkuser&#xff1a;为生成的用户名密码文件名称 …

Apache Doris (二十一) :Doris Rollup物化索引创建与操作

目录 1. 创建测试表 2. 创建Rollup物化索引表 3. 查看Rollup物化索引表 4. 删除Rollup物化索引表 5. 验证Rollup物化索引使用 进入正文之前&#xff0c;欢迎订阅专题、对博文点赞、评论、收藏&#xff0c;关注IT贫道&#xff0c;获取高质量博客内容&#xff01; 宝子们点…

open3d 通过vscode+ssh连接远程服务器将可视化界面本地显示

当使用远程服务器时&#xff0c;我们希望能像在本地一样写完代码后能立刻出现一些gui窗口。但是目前网络上的资料都不能很好的解决这个问题。本文尝试尽可能简短地解决这个问题。 步骤 1、在服务器上安装open3d 已经非常简化了&#xff0c;可以使用一行代码完成 pip3 insta…

【Java从入门到大牛】方法详解

&#x1f525; 本文由 程序喵正在路上 原创&#xff0c;CSDN首发&#xff01; &#x1f496; 系列专栏&#xff1a;Java从入门到大牛 &#x1f320; 首发时间&#xff1a;2023年7月9日 &#x1f98b; 欢迎关注&#x1f5b1;点赞&#x1f44d;收藏&#x1f31f;留言&#x1f43e…

【计算机组成与体系结构Ⅰ】实验7 IP核的使用、D触发器

一、实验目的 1&#xff1a;学会设计用IP核和原理图的方式设计电路&#xff0c;完成涉及1位数据的2选1多路选择器。 2&#xff1a;设计带异步置零和写使能端的D触发器。 二、实验环境 软件&#xff1a;Vivado 2015.4操作系统&#xff1a;Windows 10 三、实验内容 2.2.1 多路…

49天精通Java,第38天,类加载器,双亲委派机制

目录 一、类加载器子系统的作用1、加载2、链接3、初始化 二、验证【虚拟机必须保证一个类的<clinit>()方法在多线程下被同步加锁】的代码实例三、类加载器的分类1、启动类加载器&#xff08;引导类加载器&#xff09;2、扩展类加载器3、应用程序类加载器&#xff08;系统…