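Flink Basic Concepts
Flink is an open-source distributed computing framework for both stream processing and batch processing.
What kind of data does each mode handle? That comes down to two concepts: unbounded streams and bounded streams.
Unbounded Streams vs. Bounded Streams
Any type of data can form a stream: user interaction records, sensor readings, event logs, and so on.
Apache Flink excels at processing both unbounded and bounded data sets. Precise control over time and stateful computation allow Flink to run any kind of application on unbounded streams.
Stream data is divided into unbounded streams and bounded streams.
- 1) Unbounded stream: has a defined start but no defined end and keeps producing data indefinitely. Unbounded streams are handled with stream processing.
- 2) Bounded stream: has both a defined start and a defined end, so the computation can run after all the data has been ingested. Bounded streams are handled with batch processing.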
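The difference between the two processing styles can be sketched in plain Java, without any Flink dependency. This is only an illustration: the class name `BoundedVsUnbounded` and its members are hypothetical and are not part of the Flink API. The batch method sees the whole bounded input and returns one final result, while the streaming variant keeps state and produces an updated result for each arriving element.

```java
import java.util.Arrays;
import java.util.List;

public class BoundedVsUnbounded {

    /** Batch style: the whole bounded input is available, so one final result is computed. */
    static int batchSum(List<Integer> boundedInput) {
        int sum = 0;
        for (int value : boundedInput) {
            sum += value;
        }
        return sum;
    }

    /** Stream style: keep running state and return an updated result for every element. */
    static final class RunningSum {
        private int sum = 0;

        int onElement(int value) {
            sum += value;
            return sum;
        }
    }

    public static void main(String[] args) {
        List<Integer> data = Arrays.asList(3, 5, 7);
        System.out.println("batch result: " + batchSum(data));

        // A real unbounded source never ends; here we only feed three elements for illustration.
        RunningSum running = new RunningSum();
        for (int value : data) {
            System.out.println("running result: " + running.onElement(value));
        }
    }
}
```

The running state in `RunningSum` is a very small stand-in for what the text calls stateful computation: the result so far has to be remembered between elements because the input never "finishes".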
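Component Structure
The DataSet API is generally used to process bounded stream data.
The DataStream API is generally used to process unbounded stream data.
Flink Basic Demo
1. Basic environment setup
Core pom.xml configuration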
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.11.2</flink.version>
<java.version>1.8</java.version>
<scala.binary.version>2.11</scala.binary.version>
<maven.compiler.source>${java.version}</maven.compiler.source>
<maven.compiler.target>${java.version}</maven.compiler.target>
<log4j.version>2.12.1</log4j.version>
<spring.boot.version>2.1.6.RELEASE</spring.boot.version>
<mysql.jdbc.version>5.1.47</mysql.jdbc.version>
</properties>
<dependencies>
<!-- Flink walkthrough examples and common utility classes -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink core library for stream processing -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Flink client library, used for submitting jobs and similar tasks -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_${scala.binary.version}</artifactId>
<version>${flink.version}</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.8</version>
</dependency>
</dependencies>
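2. Batch processing demo
This demo uses batch processing to count the entries in a log file by log level, which shows how many warnings and errors the file contains.
Prepare the file order_info.log with the following content: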
2019-08-25 16:32:55,626 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e1b53a9c] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:56,918 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:32:57,829 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,834 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,847 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 5ms. Found 0 repository interfaces.
2019-08-25 16:32:57,858 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,859 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,870 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:32:57,908 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:57,928 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:58,144 [main] INFO [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:32:58,155 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:32:58,162 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:32:58,176 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:32:58,212 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:58,267 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:32:58,269 [main] WARN [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:32:58,280 [main] INFO [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.CGLIB$globalTransactionScanner$0(<generated>)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba$$FastClassBySpringCGLIB$$ec5dcab5.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.globalTransactionScanner(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
at io.seata.config.ConfigType.getType(ConfigType.java:62)
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
... 35 common frames omitted
2019-08-25 16:36:05,248 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$ed668c12] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:06,559 [main] INFO [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:36:07,554 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,555 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,568 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 6ms. Found 0 repository interfaces.
2019-08-25 16:36:07,581 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,583 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,595 [main] INFO [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:36:07,639 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,661 [main] WARN [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,919 [main] INFO [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:36:07,930 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:36:07,937 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:36:07,944 [main] INFO [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:36:07,987 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:10,247 [configOperate_1_2] WARN [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:36:14,137 [main] WARN [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:36:14,150 [main] INFO [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:36:14,161 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.CGLIB$globalTransactionScanner$0(<generated>)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830$$FastClassBySpringCGLIB$$691b278a.invoke(<generated>)
at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.globalTransactionScanner(<generated>)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
at io.seata.config.ConfigType.getType(ConfigType.java:62)
at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
... 35 common frames omitted
2019-08-25 16:36:21,421 [main] INFO [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean
Code implementation
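import java.util.regex.Matcher;
import java.util.regex.Pattern;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.operators.DataSource;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.util.Collector;

public class BatchProcessorApplication {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink batch execution environment
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
        // 2. Read the data source (the log file)
        DataSource<String> logData = env.readTextFile("./data/order_info.log");
        // 3. Clean and transform the data
        logData.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 1) Use a regular expression to extract the log level of each line
                Pattern pattern = Pattern.compile("\\[main\\](.*?)\\[");
                Matcher matcher = pattern.matcher(value);
                if (matcher.find()) {
                    // 2) On a match, emit a (level, 1) tuple
                    collector.collect(new Tuple2<>(matcher.group(1).trim(), 1));
                }
            }
            // groupBy(0) groups the emitted tuples by field 0 (the level); sum(1) adds up field 1 (the Integer).
            // In the DataSet API, print() also triggers execution, so no env.execute() call is needed.
        }).groupBy(0).sum(1).print();
    }
}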
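To see what the regular expression in the batch job extracts, the same pattern can be tried on individual lines in plain Java. The sketch below is only an illustration: the class `LogLevelExtractor` and its method `extractLevel` are hypothetical helper names, and the sample lines are copied or shortened from order_info.log. It shows that only lines logged by the `[main]` thread match, so entries from other threads and stack-trace lines are not counted.

```java
import java.util.Optional;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class LogLevelExtractor {

    // Same pattern as in the batch job: the text between "[main]" and the next "[".
    private static final Pattern LEVEL_PATTERN = Pattern.compile("\\[main\\](.*?)\\[");

    /** Returns the trimmed log level, or an empty Optional when the line does not match. */
    static Optional<String> extractLevel(String line) {
        Matcher matcher = LEVEL_PATTERN.matcher(line);
        return matcher.find() ? Optional.of(matcher.group(1).trim()) : Optional.empty();
    }

    public static void main(String[] args) {
        String mainThreadLine = "2019-08-25 16:32:58,289 [main] ERROR "
                + "[org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed";
        String otherThreadLine = "2019-08-25 16:32:58,267 [configOperate_1_2] WARN "
                + "[io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type";
        String stackFrameLine = "at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)";

        System.out.println(extractLevel(mainThreadLine));   // level found
        System.out.println(extractLevel(otherThreadLine));  // no "[main]" marker, so no match
        System.out.println(extractLevel(stackFrameLine));   // stack frame, so no match
    }
}
```

If entries from every thread should be counted, the pattern would have to accept any thread name instead of the literal `[main]`; that is a change to the demo, not something the original job does.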
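3. Stream processing demo
Simulate a socket data source locally (the job below connects to 127.0.0.1 on port 9999)
Code implementation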
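import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class StreamProcessorApplication {
    public static void main(String[] args) throws Exception {
        // 1. Create the Flink streaming execution environment
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();
        // 2. Read real-time data from the socket stream
        DataStreamSource<String> dataStreamSource = streamEnv.socketTextStream("127.0.0.1", 9999);
        dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // Each line is tab-separated; emit (first field, 1)
                String[] split = line.split("\t");
                collector.collect(new Tuple2<>(split[0], 1));
            }
            // keyBy(0) groups by the first tuple field; timeWindow(Time.seconds(5)) counts per 5-second window.
            // setParallelism(1) sets the parallelism of the print sink, so a single thread writes the output.
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);
        // 3. Streaming jobs only start running when execute() is called
        streamEnv.execute();
    }
}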
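What the 5-second window in the streaming job does can be approximated in plain Java. The sketch below is a simplification under stated assumptions, not Flink's implementation: the class `TumblingWindowSketch`, its methods, and the sample timestamps are all hypothetical, and timestamps are taken to be non-negative arrival times in milliseconds. Each event is assigned to the window whose start is its timestamp rounded down to a multiple of the window size, and events are then counted per window and key.

```java
import java.util.Map;
import java.util.TreeMap;

public class TumblingWindowSketch {

    /** Start of the fixed-size window that contains the given (non-negative) timestamp. */
    static long windowStart(long timestampMillis, long windowSizeMillis) {
        return timestampMillis - (timestampMillis % windowSizeMillis);
    }

    /** Counts events per (window start, key); the map key has the form "windowStart|key". */
    static Map<String, Integer> countPerWindow(String[] keys, long[] timestamps, long windowSizeMillis) {
        Map<String, Integer> counts = new TreeMap<>();
        for (int i = 0; i < keys.length; i++) {
            String bucket = windowStart(timestamps[i], windowSizeMillis) + "|" + keys[i];
            counts.merge(bucket, 1, Integer::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        // Hypothetical events: the first tab-separated field of each line and its arrival time.
        String[] keys = {"apple", "apple", "pear", "apple"};
        long[] timestamps = {1_000L, 4_999L, 2_000L, 5_000L};

        // With 5-second windows, the event at 5000 ms falls into the second window.
        System.out.println(countPerWindow(keys, timestamps, 5_000L));
    }
}
```

In the sample data, the first window holds two "apple" events and one "pear" event, and the last "apple" event opens a new window. This mirrors how the job emits a separate count per key for every 5-second interval instead of one ever-growing total.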
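Flink Deployment, Installation, and Configuration
First make sure a JDK is installed on your Linux system.
Extract the Flink installation package
tar -zxvf flink-1.11.2-bin-scala_2.11.tgz
Go to the Flink configuration directory (conf)
The masters file specifies the address of the master node (JobManager) of the Flink cluster. It tells the other nodes in the cluster where the master is, so that they can connect to it and communicate with it.
The flink-conf.yaml file is Flink's main configuration file and holds the parameters of the Flink cluster. Its options range from basic cluster settings to advanced performance tuning.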
Common options:
- jobmanager.rpc.address
  - Description: the hostname or IP address of the JobManager.
  - Example: jobmanager.rpc.address: localhost
- jobmanager.rpc.port
  - Description: the RPC port of the JobManager.
  - Example: jobmanager.rpc.port: 6123
- jobmanager.memory.process.size
  - Description: the total memory size of the JobManager process.
  - Example: jobmanager.memory.process.size: 1600m
- taskmanager.memory.process.size
  - Description: the total memory size of each TaskManager process.
  - Example: taskmanager.memory.process.size: 1600m
- taskmanager.numberOfTaskSlots
  - Description: the number of task slots per TaskManager.
  - Example: taskmanager.numberOfTaskSlots: 4
- parallelism.default
  - Description: the default parallelism.
  - Example: parallelism.default: 4
- state.backend
  - Description: the state backend to use: MemoryStateBackend, FsStateBackend, or RocksDBStateBackend (configured as jobmanager, filesystem, or rocksdb).
  - Example: state.backend: rocksdb
- rest.address
  - Description: the hostname or IP address of the REST API.
  - Example: rest.address: localhost
- rest.port
  - Description: the port of the REST API.
  - Example: rest.port: 8081
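Two of the options above, taskmanager.numberOfTaskSlots and parallelism.default, interact: the cluster offers a fixed number of slots, and a job can only be scheduled if enough of them are free. The plain-Java sketch below illustrates that arithmetic under the assumption of Flink's default slot sharing, where a job needs as many slots as its highest operator parallelism. The class `SlotCapacity` and its methods are hypothetical names used only for this illustration.

```java
public class SlotCapacity {

    /** Total slots offered by the cluster: number of TaskManagers times slots per TaskManager. */
    static int totalSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /** Assumes default slot sharing: the job needs one slot per parallel instance of its widest operator. */
    static boolean fits(int highestOperatorParallelism, int availableSlots) {
        return highestOperatorParallelism <= availableSlots;
    }

    public static void main(String[] args) {
        // One TaskManager configured with taskmanager.numberOfTaskSlots: 4
        int slots = totalSlots(1, 4);
        System.out.println("total slots = " + slots);
        // parallelism.default: 4 fits exactly; a job with parallelism 8 would wait for more slots.
        System.out.println("parallelism 4 fits = " + fits(4, slots));
        System.out.println("parallelism 8 fits = " + fits(8, slots));
    }
}
```

With the example values from the list (4 slots, default parallelism 4), a single TaskManager is just large enough for one job at the default parallelism.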
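Go to the bin directory and start Flink (for a standalone cluster, with the start-cluster.sh script)
Open port 8081 in a browser to reach the web UI
Flink is now installed.
Flink Job Submission
Configure the packaging plugins in pom.xml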
<build>
<plugins> <!-- Compiler plugin -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.5.1</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<!--<encoding>${project.build.sourceEncoding} </encoding>-->
</configuration>
</plugin>
<!-- Shade plugin: builds a JAR that bundles all dependencies -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<filters>
<filter>
<artifact>*:*</artifact>
<excludes>
<!-- zip -d learn_spark.jar META-INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
<exclude>META-INF/*.SF</exclude>
<exclude>META-INF/*.DSA</exclude>
<exclude>META-INF/*.RSA</exclude>
</excludes>
</filter>
</filters>
<transformers>
<transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!-- Optional: sets the entry-point (main) class of the JAR -->
<mainClass>com.demo.flink.usage.stream.StreamProcessorApplication</mainClass>
</transformer>
</transformers>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
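Click package to build the JAR
Submitting through the web UI
Upload the JAR, check that the details are correct, and click Submit
The output can then be viewed in the web UI
Submitting from the command line
Upload the JAR to the Linux host
Go to Flink's bin directory
Run the command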
./flink run -c com.demo.flink.usage.stream.StreamProcessorApplication /usr/local/flink-usage-1.0-SNAPSHOT.jar