零基础入门Flink,掌握基本使用方法

news2024/11/22 0:23:48

Flink基本概念

首先来讲,Flink是一个面向数据流处理批处理的分布式开源计算框架。

那么,流处理和批处理分别处理什么样的数据呢,这就涉及两个概念-无界流和有界流

无界流VS有界流

任何类型的数据都可以形成流数据,比如用户交互记录, 传感器数据,事件日志等等。

Apache Flink 擅长处理无界和有界数据集。 精确的时间控制和有状态的计算,使得 Flink能够运行
任何处理无界流的应用

流数据分为无界流和有界流。

  • 1) 无界流:有定义流的开始,但没有定义流的结束, 会不停地产生数据,无界流采用的是流处理方式。
  • 2) 有界流:有定义流的开始, 也有定义流的结束, 需要在获取所有数据后再进行计算,有界流采用的是批处理方式。

组件结构

DataSet 一般用来处理有界流数据。
DataStream一般用来处理无界流数据。

Flink基础Demo案例

1、基本环境搭建

pom.xml核心配置

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.11.2</flink.version>
        <java.version>1.8</java.version>
        <scala.binary.version>2.11</scala.binary.version>
        <maven.compiler.source>${java.version}</maven.compiler.source>
        <maven.compiler.target>${java.version}</maven.compiler.target>
        <log4j.version>2.12.1</log4j.version>
        <spring.boot.version>2.1.6.RELEASE</spring.boot.version>
        <mysql.jdbc.version>5.1.47</mysql.jdbc.version>
    </properties>

    <dependencies>
        <!-- Flink 的示例和常用工具类 -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-walkthrough-common_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--Flink 的流处理核心库-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <!--Flink 的客户端库,用于提交作业等-->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_${scala.binary.version}</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!--lombok-->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <version>1.18.8</version>
        </dependency>
    </dependencies>

2、批处理Demo实现

这个demo实现-----通过批处理方式,统计日志文件中的异常数量。

文件准备order_info.log,文件内容如下

2019-08-25 16:32:55,626 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$e1b53a9c] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:56,918 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:32:57,829 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,834 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,847 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 5ms. Found 0 repository interfaces.
2019-08-25 16:32:57,858 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:32:57,859 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:32:57,870 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:32:57,908 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:57,928 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:32:58,144 [main] INFO  [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:32:58,155 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:32:58,162 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:32:58,176 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:32:58,212 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:32:58,267 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:32:58,269 [main] WARN  [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:32:58,280 [main] INFO  [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:32:58,289 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
	at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
	at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
	at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
	... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
	at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
	at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.CGLIB$globalTransactionScanner$0(<generated>)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba$$FastClassBySpringCGLIB$$ec5dcab5.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$1c7426ba.globalTransactionScanner(<generated>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
	at io.seata.config.ConfigType.getType(ConfigType.java:62)
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
	... 35 common frames omitted
2019-08-25 16:36:05,248 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration' of type [org.springframework.cloud.autoconfigure.ConfigurationPropertiesRebinderAutoConfiguration$$EnhancerBySpringCGLIB$$ed668c12] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:06,559 [main] INFO  [c.i.t.s.pending.startup.StockPendingApplication] SpringApplication.java:650 - The following profiles are active: dev
2019-08-25 16:36:07,554 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,555 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,568 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 6ms. Found 0 repository interfaces.
2019-08-25 16:36:07,581 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:244 - Multiple Spring Data modules found, entering strict repository configuration mode!
2019-08-25 16:36:07,583 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:126 - Bootstrapping Spring Data repositories in DEFAULT mode.
2019-08-25 16:36:07,595 [main] INFO  [o.s.d.r.config.RepositoryConfigurationDelegate] RepositoryConfigurationDelegate.java:182 - Finished Spring Data repository scanning in 2ms. Found 0 repository interfaces.
2019-08-25 16:36:07,639 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'nacos-discovery' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,661 [main] WARN  [o.springframework.boot.actuate.endpoint.EndpointId] EndpointId.java:131 - Endpoint ID 'service-registry' contains invalid characters, please migrate to a valid format.
2019-08-25 16:36:07,919 [main] INFO  [o.springframework.cloud.context.scope.GenericScope] GenericScope.java:295 - BeanFactory id=62078519-08a5-3bd1-9959-a8e32b7d3ccd
2019-08-25 16:36:07,930 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:193 - No bean named 'errorChannel' has been explicitly defined. Therefore, a default PublishSubscribeChannel will be created.
2019-08-25 16:36:07,937 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:280 - No bean named 'taskScheduler' has been explicitly defined. Therefore, a default ThreadPoolTaskScheduler will be created.
2019-08-25 16:36:07,944 [main] INFO  [o.s.i.c.DefaultConfiguringBeanFactoryPostProcessor] DefaultConfiguringBeanFactoryPostProcessor.java:431 - No bean named 'integrationHeaderChannelRegistry' has been explicitly defined. Therefore, a default DefaultHeaderChannelRegistry will be created.
2019-08-25 16:36:07,987 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean 'seataConfiguration' of type [com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830] is not eligible for getting processed by all BeanPostProcessors (for example: not eligible for auto-proxying)
2019-08-25 16:36:10,247 [configOperate_1_2] WARN  [io.seata.config.FileConfiguration] FileConfiguration.java:207 - Could not found property config.type, try to use default value instead.
2019-08-25 16:36:14,137 [main] WARN  [o.s.b.w.s.c.AnnotationConfigServletWebServerApplicationContext] AbstractApplicationContext.java:557 - Exception encountered during context initialization - cancelling refresh attempt: org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
2019-08-25 16:36:14,150 [main] INFO  [o.s.b.a.l.ConditionEvaluationReportLoggingListener] ConditionEvaluationReportLoggingListener.java:135 -

Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-08-25 16:36:14,161 [main] ERROR [org.springframework.boot.SpringApplication] SpringApplication.java:821 - Application run failed
org.springframework.beans.factory.BeanCreationException: Error creating bean with name 'globalTransactionScanner' defined in class path resource [com/itcast/trade/stock/pending/config/SeataConfiguration.class]: Bean instantiation via factory method failed; nested exception is org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:627)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiateUsingFactoryMethod(ConstructorResolver.java:456)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.instantiateUsingFactoryMethod(AbstractAutowireCapableBeanFactory.java:1321)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBeanInstance(AbstractAutowireCapableBeanFactory.java:1160)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.doCreateBean(AbstractAutowireCapableBeanFactory.java:555)
	at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.createBean(AbstractAutowireCapableBeanFactory.java:515)
	at org.springframework.beans.factory.support.AbstractBeanFactory.lambda$doGetBean$0(AbstractBeanFactory.java:320)
	at org.springframework.beans.factory.support.DefaultSingletonBeanRegistry.getSingleton(DefaultSingletonBeanRegistry.java:222)
	at org.springframework.beans.factory.support.AbstractBeanFactory.doGetBean(AbstractBeanFactory.java:318)
	at org.springframework.beans.factory.support.AbstractBeanFactory.getBean(AbstractBeanFactory.java:204)
	at org.springframework.context.support.PostProcessorRegistrationDelegate.registerBeanPostProcessors(PostProcessorRegistrationDelegate.java:228)
	at org.springframework.context.support.AbstractApplicationContext.registerBeanPostProcessors(AbstractApplicationContext.java:721)
	at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:534)
	at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140)
	at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:742)
	at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:389)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:311)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1213)
	at org.springframework.boot.SpringApplication.run(SpringApplication.java:1202)
	at com.itcast.trade.stock.pending.startup.StockPendingApplication.main(StockPendingApplication.java:23)
Caused by: org.springframework.beans.BeanInstantiationException: Failed to instantiate [io.seata.spring.annotation.GlobalTransactionScanner]: Factory method 'globalTransactionScanner' threw exception; nested exception is io.seata.common.exception.NotSupportYetException: not support register type: null
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:185)
	at org.springframework.beans.factory.support.ConstructorResolver.instantiate(ConstructorResolver.java:622)
	... 19 common frames omitted
Caused by: io.seata.common.exception.NotSupportYetException: not support register type: null
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:84)
	at io.seata.config.ConfigurationFactory.getInstance(ConfigurationFactory.java:68)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:81)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:124)
	at io.seata.spring.annotation.GlobalTransactionScanner.<init>(GlobalTransactionScanner.java:113)
	at com.itcast.trade.stock.pending.config.SeataConfiguration.globalTransactionScanner(SeataConfiguration.java:38)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.CGLIB$globalTransactionScanner$0(<generated>)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830$$FastClassBySpringCGLIB$$691b278a.invoke(<generated>)
	at org.springframework.cglib.proxy.MethodProxy.invokeSuper(MethodProxy.java:244)
	at org.springframework.context.annotation.ConfigurationClassEnhancer$BeanMethodInterceptor.intercept(ConfigurationClassEnhancer.java:363)
	at com.itcast.trade.stock.pending.config.SeataConfiguration$$EnhancerBySpringCGLIB$$28257830.globalTransactionScanner(<generated>)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.beans.factory.support.SimpleInstantiationStrategy.instantiate(SimpleInstantiationStrategy.java:154)
	... 20 common frames omitted
Caused by: java.lang.IllegalArgumentException: illegal type:null
	at io.seata.config.ConfigType.getType(ConfigType.java:62)
	at io.seata.config.ConfigurationFactory.buildConfiguration(ConfigurationFactory.java:82)
	... 35 common frames omitted
2019-08-25 16:36:21,421 [main] INFO  [o.s.c.s.PostProcessorRegistrationDelegate$BeanPostProcessorChecker] PostProcessorRegistrationDelegate.java:330 - Bean

 代码实现

public class BatchProcessorApplication {
    public static void main(String[] args) throws Exception{
        //1.定义Flink运行环境
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        //2.读取数据源(日志文件信息)
        DataSource<String> logData = env.readTextFile("./data/order_info.log");

        //3.清洗转换数据
        logData.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> collector) throws Exception {
                // 1) 根据正则,提取每行日志的级别
                Pattern pattern = Pattern.compile("\\[main\\](.*?)\\[");
                Matcher matcher = pattern.matcher(value);
                if (matcher.find()){
                    // 2) 如果匹配符合规则,放置元组,输出数据
                    collector.collect(new Tuple2<>(matcher.group(1).trim(),1));
                }
            }
            //groupBy(0)代表对collector中每个tuple2的进行分组,sum(1)代表对tuple2中的Integer进行求和
        }).groupBy(0).sum(1).print();
    }
}

3、流处理Demo实现

本地模拟socket请求

代码实现

public class StreamProcessorApplication {

    public static void main(String[] args) throws Exception{
        //1.定义Flink执行环境
        StreamExecutionEnvironment streamEnv = StreamExecutionEnvironment.getExecutionEnvironment();

        //2.从socket数据流中读取实时流数据
        DataStreamSource<String> dataStreamSource = streamEnv.socketTextStream("127.0.0.1", 9999);

        dataStreamSource.flatMap(new FlatMapFunction<String, Tuple2<String,Integer>>() {
            @Override
            public void flatMap(String line, Collector<Tuple2<String, Integer>> collector) throws Exception {
                String[] split = line.split("\t");
                collector.collect(new Tuple2<>(split[0],1));
            }
            // setParallelism设置并行流计算有多少个线程
        }).keyBy(0).timeWindow(Time.seconds(5)).sum(1).print().setParallelism(1);

        streamEnv.execute();
    }

}

Flink部署安装配置

首先确保你的Linux系统已经安装好了JDK

解压flink安装包

tar -zxvf flink-1.11.2-bin-scala_2.11.tgz

进入flink配置目录

masters 文件用于指定 Flink 集群中的主节点(JobManager)地址。它帮助集群中的各个节点知道主节点的位置,从而能够正确地连接和通信。

 flink-conf.yaml 文件是 Flink 的主要配置文件,用于配置 Flink 集群的各种参数。这个文件包含了丰富的配置选项,涵盖了从基本的集群设置到高级的性能调优。

其中的常见配置:

  • jobmanager.rpc.address:

    • 描述: 指定 JobManager 的主机名或IP地址。
    • 示例jobmanager.rpc.address: localhost
  • jobmanager.rpc.port:

    • 描述: 指定 JobManager 的RPC端口号。
    • 示例jobmanager.rpc.port: 6123
  • jobmanager.memory.process.size:

    • 描述: 指定 JobManager 的总内存大小。
    • 示例jobmanager.memory.process.size: 1600m
  • taskmanager.memory.process.size:

    • 描述: 指定 TaskManager 的总内存大小。
    • 示例taskmanager.memory.process.size: 1600m
  • taskmanager.numberOfTaskSlots:

    • 描述: 指定每个 TaskManager 的任务槽位数。
    • 示例taskmanager.numberOfTaskSlots: 4
  • parallelism.default:

    • 描述: 指定默认的并行度。
    • 示例parallelism.default: 4
  • state.backend:

    • 描述: 指定状态后端,可以选择 MemoryStateBackendFsStateBackend 或 RocksDBStateBackend
    • 示例state.backend: rocksdb
  • rest.address:

    • 描述: 指定 REST API 的主机名或IP地址。
    • 示例rest.address: localhost
  • rest.port:

    • 描述: 指定 REST API 的端口号。
    • 示例rest.port: 8081

启动flink,进入到bin目录

 

 

访问8081端口

到此flink安装完毕 

Flink任务提交

在pom.xml文件中配置打包插件

<build>
        <plugins> <!-- 编译插件 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <!--<encoding>${project.build.sourceEncoding} </encoding>-->
                </configuration>
            </plugin>
            <!-- 打jar包插件(会包含所有依赖) -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>2.3</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <filters>
                                <filter>
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <!-- zip -d learn_spark.jar META- INF/*.RSA META-INF/*.DSA META-INF/*.SF -->
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <!-- 可以设置jar包的入口类(可选) -->
                                    <mainClass>com.demo.flink.usage.stream.StreamProcessorApplication</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

点击package打成jar包

界面提交

上传jar包,信息无误后,点击提交

在这里就能看到输出结果了

命令行提交

上传jar包至linux

进入flink的bin目录

执行命令

./flink run -c com.demo.flink.usage.stream.StreamProcessorApplication /usr/local/flink-usage-1.0-SNAPSHOT.jar

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2244981.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Linux设置以及软件的安装(hadoop集群安装02)

一、Linux的常见设置 1、设置静态IP vi /etc/sysconfig/network-scripts/ifcfg-ens33 如何查看自己的虚拟机的网关&#xff1a; 完整的配置&#xff08;不要拷贝我的&#xff09;&#xff1a; TYPE"Ethernet" PROXY_METHOD"none" BROWSER_ONLY"no&…

数据中台方法论:数据汇聚

文章目录 一、数据汇聚概述二、 汇聚数据类型2.1 结构化数据2.2 半结构化数据2.3 非结构化数据 三、汇聚数据模式四、汇聚数据方法四、数据汇聚工具五、数据汇聚使用经验 数据小伙伴们&#xff0c;之前咱们长篇大论的聊聊过【数据中台建设方法论从0到1】&#xff0c;从数据中台…

【Maven】nexus 配置私有仓库配置【转】

介绍&#xff1a;【Maven】Nexus几个仓库的介绍-CSDN博客 一、仓库类型 proxy 远程仓库的代理&#xff0c;比如说nexus配置了一个central repository的proxy,当用户向这个proxy请求一个artifact的时候&#xff0c;会现在本地查找&#xff0c;如果找不到&#xff0c;则会从远程…

3C产品说明书电子化转变:用户体验、环保与商业机遇的共赢

在科技日新月异的当代社会&#xff0c;3C产品&#xff08;涵盖计算机类、通信类和消费类电子产品&#xff09;已成为我们日常生活中不可或缺的重要元素。与此同时&#xff0c;这些产品的配套说明书也经历了一场从纸质到电子化的深刻变革。这一转变不仅体现了技术的飞速进步&…

【YOLOv8】安卓端部署-2-项目实战

文章目录 1 准备Android项目文件1.1 解压文件1.2 放置ncnn模型文件1.3 放置ncnn和opencv的android文件1.4 修改CMakeLists.txt文件 2 手机连接电脑并编译软件2.1 编译软件2.2 更新配置及布局2.3 编译2.4 连接手机 3 自己数据集训练模型的部署4 参考 1 准备Android项目文件 1.1…

虚拟网卡驱动和DM9000C移植

网卡驱动程序框架 网卡驱动程序“收发功能”&#xff1a; 只要把上层的数据发给网卡&#xff0c;从网卡来的数据构造成包给上层即可。网卡只需要 “socket”编程&#xff0c;不需要打开某设备。 驱动程序都是以面向对象的思想写的&#xff0c;都有相关的结构体。 编程步骤 …

Vue3 + Vite 项目引入 Typescript

文章目录 一、TypeScript简介二、TypeScript 开发环境搭建三、编译方式1. 自动编译单个文件2. 自动编译整个项目 四、配置文件1. compilerOptions基本选项严格模式相关选项&#xff08;启用 strict 后自动包含这些&#xff09;模块与导入相关选项 2. include 和 excludeinclude…

Cyberchef使用功能之-多种压缩/解压缩操作对比

cyberchef的compression操作大类中有大量的压缩和解压缩操作&#xff0c;每种操作的功能和区别是什么&#xff0c;本章将进行讲解&#xff0c;作为我的专栏《Cyberchef 从入门到精通教程》中的一篇&#xff0c;详见这里。 关于文件格式和压缩算法的理论部分在之前的文章《压缩…

Istio分布式链路监控搭建:Jaeger与Zipkin

分布式追踪定义 分布式追踪是一种用来跟踪分布式系统中请求的方法&#xff0c;它可以帮助用户更好地理解、控制和优化分布式系统。分布式追踪中用到了两个概念&#xff1a;TraceID 和 SpanID。 TraceID 是一个全局唯一的 ID&#xff0c;用来标识一个请求的追踪信息。一个请求…

Linux修改/etc/hosts不起作用(ping: xxx: Name or service not known)的解决方法——开启NSCD

​ 问题描述 起因是我在实验室云资源池的一台虚拟机&#xff08;CentOS 8.5&#xff09;上的/etc/hosts文件中为Fabric网络节点的域名指定了IP&#xff1a; IP可以ping通&#xff0c;但是ping域名时提示ping: xxx: Name or service not known。 问题本身应该是Linux通用的&a…

Python中Tushare(金融数据库)入门详解

文章目录 Python中Tushare&#xff08;金融数据库&#xff09;入门详解一、引言二、安装与注册1、安装Tushare2、注册与获取Token 三、Tushare基本使用1、设置Token2、获取数据2.1、获取股票基础信息2.2、获取交易日历2.3、获取A股日线行情2.4、获取沪股通和深股通成份股2.5、获…

【网络】网络抓包与协议分析

网络抓包与协议分析 一. 以太网帧格式分析 这是以太网数据帧的基本格式&#xff0c;包含目的地址(6 Byte)、源地址(6 Byte)、类型(2 Byte)、数据(46~1500 Byte)、FCS(4 Byte)。 Mac 地址类型 分为单播地址、组播地址、广播地址。 单播地址&#xff1a;是指第一个字节的最低位…

RabbitMQ的工作队列在Spring Boot中实现(详解常⽤的⼯作模式)

上文着重介绍RabbitMQ 七种工作模式介绍RabbitMQ 七种工作模式介绍_rabbitmq 工作模式-CSDN博客 本篇讲解如何在Spring环境下进⾏RabbitMQ的开发.&#xff08;只演⽰部分常⽤的⼯作模式&#xff09; 目录 引⼊依赖 一.工作队列模式 二.Publish/Subscribe(发布订阅模式) …

python学习_3.正则表达式

来源:B站/麦叔编程 1. 正则表达式的7个境界 假设有一段文字&#xff1a; text 身高:178&#xff0c;体重&#xff1a;168&#xff0c;学号&#xff1a;123456&#xff0c;密码:9527要确定文本中是否包含数字123456&#xff0c;我们可以用in运算符&#xff0c;也可以使用inde…

Python学习------第十天

数据容器-----元组 定义格式&#xff0c;特点&#xff0c;相关操作 元组一旦定义&#xff0c;就无法修改 元组内只有一个数据&#xff0c;后面必须加逗号 """ #元组 (1,"hello",True) #定义元组 t1 (1,"hello") t2 () t3 tuple() prin…

nodejs基于微信小程序的云校园的设计与实现

摘 要 相比于传统的校园管理方式&#xff0c;智能化的管理方式可以大幅提高校园的管理效率&#xff0c;实现了云校园管理的标准化、制度化、程序化的管理&#xff0c;有效地防止了云校园信息的不规范管理&#xff0c;提高了信息的处理速度和精确度&#xff0c;能够及时、准确地…

Excel——宏教程(精简版)

一、宏的简介 1、什么是宏&#xff1f; Excel宏是一种自动化工具&#xff0c;它允许用户录制一系列操作并将其转换为VBA(Visual Basic for Applications)代码。这样&#xff0c;用户可以在需要时执行这些操作&#xff0c;以自动化Excel任务。 2、宏的优点 我们可以利用宏来…

绿光一字线激光模组:工业制造与科技创新的得力助手

在现代工业制造和科技创新领域&#xff0c;绿光一字线激光模组以其独特的性能和广泛的应用前景&#xff0c;成为了不可或缺的关键设备。这种激光模组能够发射出一条明亮且精确的绿色激光线&#xff0c;具有高精度、高稳定性和长寿命的特点&#xff0c;为各种精密加工和测量需求…

Python Turtle绘图:重现汤姆劈树的经典瞬间

Python Turtle绘图&#xff1a;重现汤姆劈树的经典瞬间 &#x1f980; 前言 &#x1f980;&#x1f41e;往期绘画&#x1f41e;&#x1f40b; 效果图 &#x1f40b;&#x1f409; 代码 &#x1f409; &#x1f980; 前言 &#x1f980; 《汤姆与杰瑞》&#xff08;Tom and Jerr…

Oracle - 多区间按权重取值逻辑 ,分时区-多层级-取配置方案(二)

Oracle - 多区间按权重取值逻辑 &#xff0c;分时区-多层级-取配置方案https://blog.csdn.net/shijianduan1/article/details/133386281 某业务配置表&#xff0c;按配置的时间区间及组织层级取方案&#xff0c;形成报表展示出所有部门方案的取值&#xff1b; 例如&#xff0…