This article is a high-level introduction to using Flink's Java client and flink-sql; see the documentation pages for details.
Java client usage
Documentation
Apache Flink Documentation
Data processing model (the diagram from the original post is omitted here)
Maven dependencies
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.example</groupId>
<artifactId>flink_test</artifactId>
<version>1.0-SNAPSHOT</version>
<dependencies>
<!-- Apache Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>1.15.4</version>
</dependency>
<!-- Flink Kafka connector -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>1.15.4</version>
</dependency>
<!-- Kafka Client -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.1</version>
</dependency>
<!-- JSON parsing -->
<dependency>
<groupId>org.json</groupId>
<artifactId>json</artifactId>
<version>20210307</version>
</dependency>
<!-- Fixes "No ExecutorFactory found to execute the application" -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-jdbc</artifactId>
<version>1.15.4</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.25</version>
</dependency>
</dependencies>
<!-- Build a fat jar -->
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<archive>
<manifest>
<mainClass>com.KafkaDataProcessor</mainClass>
</manifest>
<manifestEntries>
<Encoding>UTF-8</Encoding>
</manifestEntries>
</archive>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
</configuration>
<executions>
<execution>
<id>make-assembly</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<compilerArgs>
<arg>-Xlint:unchecked</arg>
<arg>-Xlint:deprecation</arg>
</compilerArgs>
</configuration>
</plugin>
</plugins>
</build>
</project>
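When the jar is later submitted to a cluster (see "Cluster execution" below), the core Flink artifacts (flink-java, flink-streaming-java, flink-clients) are already on the Flink runtime's classpath, so they can be marked provided to keep the fat jar small and avoid classpath conflicts. A sketch for one artifact (to keep running locally afterwards, the IDE must be set to add provided dependencies to the classpath):
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-streaming-java</artifactId>
    <version>1.15.4</version>
    <!-- already provided by the cluster; not bundled into the fat jar -->
    <scope>provided</scope>
</dependency>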
Code samples
Read from Kafka and print the results
KafkaFlinkExample
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class KafkaFlinkExample {
public static void main(String[] args) throws Exception {
// Set up the Flink execution environment
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // for cluster submission
final StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.10.153:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
env.addSource(consumer)
.map(data -> "Received: " + data)
.print();
env.execute("Kafka Flink Example");
}
}
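To see output, produce a few messages to input-topic with Kafka's console producer (same broker address as in the code); each line you type, e.g. hello, is printed by the job as "Received: hello":
./bin/kafka-console-producer.sh --bootstrap-server 192.168.10.153:9092 --topic input-topic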
Process the Kafka data and write the results to a new topic
KafkaDataProcessor
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.json.JSONObject;
import java.util.Properties;
public class KafkaDataProcessor {
public static void main(String[] args) throws Exception {
// Set up the Flink execution environment
// StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); // for cluster submission
// Create a local stream execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
// Kafka connection settings
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "192.168.10.153:9092");
properties.setProperty("group.id", "flink-consumer-group");
// Create a Kafka consumer that reads from the given topic
FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), properties);
DataStream<String> kafkaDataStream = env.addSource(kafkaConsumer);
// Parse the JSON payload and add a gender field
DataStream<String> processedDataStream = kafkaDataStream.map(new MapFunction<String, String>() {
@Override
public String map(String value) throws Exception {
// Parse the JSON record
JSONObject jsonObject = new JSONObject(value);
String name = jsonObject.getString("name");
int id = jsonObject.getInt("id");
int age = jsonObject.getInt("age");
// Derive the gender from the name
String gender;
if (name.equals("jack")) {
gender = "male_xxx";
} else {
gender = "female_xxx";
}
// Build the enriched JSON record
JSONObject newJsonObject = new JSONObject();
newJsonObject.put("name", name);
newJsonObject.put("id", id);
newJsonObject.put("age", age);
newJsonObject.put("gender", gender);
return newJsonObject.toString();
}
});
// Create a Kafka producer and write the new records to the output topic
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<>("output-topic", new SimpleStringSchema(), properties);
processedDataStream.addSink(kafkaProducer);
// Run the job
env.execute("Kafka Data Processor");
}
}
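For example, a hypothetical input record is transformed as shown below (field order in the output may differ, since org.json.JSONObject does not guarantee key order); the output topic can be checked with Kafka's console consumer:
input:  {"name":"jack","id":1,"age":20}
output: {"name":"jack","id":1,"age":20,"gender":"male_xxx"}
./bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.153:9092 --topic output-topic --from-beginning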
Setting the execution parallelism
LocalWebUI
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import java.util.Properties;
public class LocalWebUI {
public static void main(String[] args) throws Exception {
//StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration configuration = new Configuration();
// Create a local execution environment with the web UI enabled
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(configuration);
int parallelism = env.getParallelism();
Properties props = new Properties();
props.setProperty("bootstrap.servers", "192.168.10.153:9092");
props.setProperty("group.id", "test");
FlinkKafkaConsumer<String> consumer = new FlinkKafkaConsumer<>("input-topic", new SimpleStringSchema(), props);
System.out.println("执行环境的并行度:" + parallelism);
// DataStreamSource<String> lines = env.socketTextStream("localhost", 8888);
DataStreamSource<String> lines = env.addSource(consumer);
int parallelism1 = lines.getParallelism();
System.out.println("socketTextStream创建的DataStreamSource的并行度:" + parallelism1);
SingleOutputStreamOperator<String> uppered = lines.map(line -> line.toUpperCase());
int parallelism2 = uppered.getParallelism();
System.out.println("调用完map方法得到的DataStream的并行度:" + parallelism2);
DataStreamSink<String> print = uppered.print();
int parallelism3 = print.getTransformation().getParallelism();
System.out.println("调用完print方法得到的DataStreamSink的并行度:" + parallelism3);
env.execute();
}
}
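Note: createLocalEnvironmentWithWebUI only serves the web UI (by default at http://localhost:8081) when flink-runtime-web is on the classpath, which the POM above does not include. Assuming the suffix-free artifact of the 1.15 line, add:
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-runtime-web</artifactId>
    <version>1.15.4</version>
</dependency>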
Local execution
Like Spark, Flink jobs can be run locally during development. Two things are required:
1. The flink-clients dependency must be on the classpath, otherwise the job fails with "No ExecutorFactory found to execute the application":
<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-clients</artifactId>
    <version>1.15.4</version>
</dependency>
2. Create a local execution environment:
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
Cluster execution
Packaging
mvn clean package
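Before packaging for the cluster, make sure the job does not hard-code a local environment as the samples above do; getExecutionEnvironment() picks the right environment automatically:
// Local mini cluster when started from the IDE, the real cluster when submitted with "flink run"
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();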
Submit the job
flink run -c com.KafkaDataProcessor /root/flink_test-1.0-SNAPSHOT-jar-with-dependencies.jar
Monitor the job status
Web UI: Jobs -> Running Jobs
Cancel the job
Web UI: Jobs -> Running Jobs -> click the job -> Cancel Job
flink-sql
Documentation
https://nightlies.apache.org/flink/flink-docs-release-1.15/zh/docs/dev/table/common/
Start the SQL client
cd /root/flink-1.15.4/bin
./sql-client.sh
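The client can also run a script non-interactively with the -f option (available since Flink 1.13); the file name here is just an example:
./sql-client.sh -f /root/sync_test_1.sql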
Example use case
Aggregate the Kafka data and write the result to MySQL.
Prepare the dependencies
MySQL is 8.0.25 and Flink is 1.15.4; the connector versions must match the Flink version. Place the jars below in the lib directory of every cluster node (keep the lib directories identical across nodes), then restart the cluster.
The jars can be downloaded from the Maven Central Repository:
mysql-connector-java-8.0.25.jar
flink-connector-jdbc-1.15.4.jar
kafka-clients-2.8.1.jar
flink-connector-kafka-1.15.4.jar
Create the result table in MySQL
CREATE TABLE sync_test_1 (
`day_time` varchar(64) NOT NULL,
`total_gmv` bigint(11) DEFAULT NULL,
PRIMARY KEY (`day_time`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
Flink SQL statements
create table flink_test_1 (
id BIGINT,
day_time VARCHAR,
amount BIGINT,
proctime AS PROCTIME()
)
with (
'connector' = 'kafka',
'topic' = 'flink_test',
'properties.bootstrap.servers' = '192.168.10.153:9092',
'properties.group.id' = 'flink_gp_test1',
'scan.startup.mode' = 'earliest-offset',
'format' = 'json',
'json.fail-on-missing-field' = 'false',
'json.ignore-parse-errors' = 'true',
'properties.zookeeper.connect' = '192.168.10.153:2181/kafka'
);
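After creating the source table, it can be sanity-checked from the SQL client before wiring up the sink (assuming the topic already contains data):
SELECT * FROM flink_test_1;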
CREATE TABLE sync_test_1 (
day_time string,
total_gmv bigint,
PRIMARY KEY (day_time) NOT ENFORCED
) WITH (
'connector' = 'jdbc',
'url' = 'jdbc:mysql://192.168.10.151:3306/flink_web?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=false',
'table-name' = 'sync_test_1',
'username' = 'root',
'password' = '123456'
);
INSERT INTO sync_test_1
SELECT day_time, SUM(amount) AS total_gmv
FROM flink_test_1
GROUP BY day_time;
Test data
./bin/kafka-console-producer.sh --bootstrap-server 192.168.10.153:9092 --topic flink_test
{"day_time": "20201009","id": 7,"amnount":20}
Check the results
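In MySQL, the aggregate should now reflect both records (20 + 30):
select day_time, total_gmv from flink_web.sync_test_1;
Expected: day_time = 20201009, total_gmv = 50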
Sources:
docs/sql_demo/demo_1.md from the flink-streaming-platform-web project on Gitee
"flink-sql大量使用案例" on the 第一片心意 CSDN blog