k8s 搭建基于session模式的flink集群

news2024/12/26 21:29:37

1.flink集群搭建

不废话直接上代码,都是基于官网的,在此记录一下 Kubernetes | Apache Flink

flink-configuration-configmap.yaml

apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    jobmanager.memory.process.size: 1600m
    taskmanager.memory.process.size: 1728m
    parallelism.default: 2    
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.pekko.name = org.apache.pekko
    logger.pekko.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF    

jobmanager-service.yaml Optional service, which is only necessary for non-HA mode.

apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager

Session cluster resource definitions #

jobmanager-session-deployment-non-ha.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: apache/flink:latest
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

taskmanager-session-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: apache/flink:latest
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/flink/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties

 kubectl apply -f xxx.yaml 或者 kubectl apply -f ./flink  flink为文件夹,存放的是以上这几个.yaml文件

为flink的ui界面添加nodeport即可外部访问

2. demo代码测试

创建一个maven工程,pom.xml引入依赖:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>test-platform</artifactId>
        <groupId>com.test</groupId>
        <version>2.0.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>flink-demo</artifactId>

    <properties>
        <maven.compiler.source>11</maven.compiler.source>
        <maven.compiler.target>11</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.17.0</flink.version>
        <log4j.version>2.20.0</log4j.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <scope>compile</scope>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>

</project>

log4j2.xml:

<?xml version="1.0" encoding="UTF-8"?>
<configuration monitorInterval="5">
    <Properties>
        <property name="LOG_PATTERN" value="%date{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n" />
        <!-- LOG_LEVEL 配置你需要的日志输出级别       -->
        <property name="LOG_LEVEL" value="INFO" />
    </Properties>

    <appenders>
        <console name="Console" target="SYSTEM_OUT">
            <PatternLayout pattern="${LOG_PATTERN}"/>
            <ThresholdFilter level="${LOG_LEVEL}" onMatch="ACCEPT" onMismatch="DENY"/>
        </console>
    </appenders>

    <loggers>
        <root level="${LOG_LEVEL}">
            <appender-ref ref="Console"/>
        </root>
    </loggers>

</configuration>

计数代码:

package com.test.flink;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;

public class WordCountUnboundStreamDemo {

    public static void main(String[] args) throws Exception {
        // TODO 1.创建执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
//        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(
//                3, // 尝试重启的次数
//                Time.of(10, TimeUnit.SECONDS) // 间隔
//        ));
        // TODO 2.读取数据
        DataStreamSource<String> lineDS = env.socketTextStream("192.168.0.28", 7777);

        // TODO 3.处理数据: 切分、转换、分组、聚合
        // TODO 3.1 切分、转换
        SingleOutputStreamOperator<Tuple2<String, Integer>> wordAndOneDS = lineDS //<输入类型, 输出类型>
                .flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                        // 按照 空格 切分
                        String[] words = value.split(" ");
                        for (String word : words) {
                            // 转换成 二元组 (word,1)
                            Tuple2<String, Integer> wordsAndOne = Tuple2.of(word, 1);
                            // 通过 采集器 向下游发送数据
                            out.collect(wordsAndOne);
                        }
                    }
                });
        // TODO 3.2 分组
        KeyedStream<Tuple2<String, Integer>, String> wordAndOneKS = wordAndOneDS.keyBy(
                new KeySelector<Tuple2<String, Integer>, String>() {
                    @Override
                    public String getKey(Tuple2<String, Integer> value) throws Exception {
                        return value.f0;
                    }
                }
        );
        // TODO 3.3 聚合
        SingleOutputStreamOperator<Tuple2<String, Integer>> sumDS = wordAndOneKS.sum(1);

        // TODO 4.输出数据
        sumDS.print("接收到的数据=======").setParallelism(1);

        // TODO 5.执行:类似 sparkstreaming最后 ssc.start()
        env.execute(sumDS.getClass().getSimpleName());
    }

}

打成jar包导入flink dashboard:

在另一台机器上运行 nc -lk -p 7777,如果出现连接拒绝,查看是否放开端口号

k8s查看读取到的数据

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/974267.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

sa-token学习

官方文档地址&#xff1a;sa-token

解决六大痛点促进企业更好使用生成式AI,亚马逊云科技顾凡采访分享可用方案

亚马逊云科技大中华区战略业务发展部总经理顾凡在接受21世纪经济报道记者专访时表示&#xff0c;生成式人工智能将从四个方面为企业带来机遇&#xff1a;第一是创造全新的客户体验&#xff1b;第二是提高企业内部员工的生产力&#xff1b;第三是帮助企业提升业务运营效率&#…

苹果“嘴硬”?下载超出预期,否认开发者对 Vision Pro 兴趣不高

据报道&#xff0c;苹果于上个月在全球多个城市开设了Vision Pro开发者实验室&#xff0c;旨在让开发者尽早体验并研发这款令人期待的头显技术。这一为期一天的实验室活动邀请了一些开发人员前来测试和上手Vision Pro头显&#xff0c;并亲身体验其应用的真实效果。 在活动中&am…

GE HYDRAN M2气体监测系统

气体监测&#xff1a; HYDRAN M2系统能够监测变压器或油冷却电缆系统中的气体&#xff0c;包括氢气、乙炔、甲烷、乙烷和乙烯等。这些气体的生成和积累可能是绝缘材料故障的迹象。 实时监测&#xff1a; 该系统能够实时监测气体浓度的变化&#xff0c;以及油的温度和压力等参数…

软考·系统架构师——导学

文章目录 考试简介考试安排考试科目《综合知识》考点分布历年案例分析考点历年论文考点 最新消息&#xff1a;自2023年下半年起&#xff0c;计算机软件资格考试的考试方式均由纸笔考试改革为计算机化考试。 考试简介 计算机技术与软件专业技术资格&#xff08;水平&#xff09;…

前端 -- 基础 常用标签 ( 标题标签、段落标签、换行标签 )

标题标签 <h1> - <h6> HTML 提供了 6 个等级的网页标题&#xff0c;即 <h1> - <h6> <h1> 我是一级标题 </h1> 单词 head 的缩写&#xff0c;意味 头部&#xff0c;标题 。 上示&#xff0c;即 你在 <h1> </h1> 两个…

【java】【项目实战】[外卖十二]【完结】项目优化(前后端分离开发)

目录 一、问题说明 二、前后端分离开发 1、介绍 2、开发流程 3、前端技术栈 三、Yapi 1、介绍 2、部署 3、使用 3.1 添加项目​编辑 3.2 添加分类​编辑 3.3 添加接口 3.4 运行 3.5 导出接口 3.6 导入数据 四、Swagger 1、介绍 2、使用方式 2.1 pom 2.2 导入…

加餐2|面试问题:古人怎么向别人推荐自己?

好诗相伴&#xff0c;千金不换&#xff0c;你好&#xff0c;我是天博。 这一讲的加餐&#xff0c;我想和你聊一聊古人是怎么求职的。现代人求职一般都是面向心仪的公司&#xff0c;先投简历再面试&#xff0c;核心就是向用工单位推荐自己。古人也差不多。我们之前在第十二讲里…

项目管理:项目经理如何做好时间管理?

在项目执行过程中&#xff0c;往往会因突发问题&#xff0c;导致项目的延期。任何一个项目都无法回避最终交付日期&#xff0c;并且所有的活动都严格围绕时间坐标进行。因此&#xff0c;对项目进行合理的时间管理&#xff0c;才是保证项目顺利交付的关键。那么&#xff0c;如何…

js中call、apply和bind:

文章目录 一、区别:二、案例&#xff1a;三、实现&#xff1a;【1】call实现【2】apply实现【3】bind实现 一、区别: call、apply、bind相同点&#xff1a;都是改变this的指向&#xff0c;传入的第一个参数都是绑定this的指向&#xff0c;在非严格模式中&#xff0c;如果第一个…

深入协议栈了解TCP的三次握手、四次挥手、CLOSE-WAIT、TIME-WAIT。

TCP网络编程的代码网上很多&#xff0c;这里就不再赘述&#xff0c;简单用一个图展示一下tcp网络编程的流程&#xff1a; 1、深入connect、listen、accept系统调用&#xff0c;进一步理解TCP的三次握手 这三个函数都是系统调用&#xff0c;我们可以分为请求连接方和被…

文案生成器-文章生成器

你是否曾经面对过频繁写作文案的问题&#xff1f;是不是觉得文案的撰写过程既繁琐又耗时&#xff1f;那么今天我要向你介绍一项令人兴奋的技术——文案自动生成。是的&#xff0c;现在我们可以借助人工智能技术&#xff0c;自动生成高质量的文案&#xff0c;为你在营销和推广方…

C++笔记基础全部完整版

01 面向对象 基本概念 面向对象程序设计&#xff08;Object-Oriented Programming&#xff0c;OOP&#xff09;是一种新的程序设计范型。程序设计范型是指设计程序的规范、模型和风格&#xff0c;它是一类程序设计语言的基础。 面向过程 面向过程程序设计范型是使用较广泛的…

【AIGC专题】Stable Diffusion 从入门到企业级实战0401

一、概述 本章是《Stable Diffusion 从入门到企业级实战》系列的第四部分能力进阶篇《Stable Diffusion ControlNet v1.1 图像精准控制》第01节&#xff0c; 利用Stable Diffusion ControlNet Inpaint模型精准控制图像生成。本部分内容&#xff0c;位于整个Stable Diffusion生…

遥感指数数据库

目前遥感指数多种多样&#xff0c;那怎么针对不同的应用领域选择合适的植被指数&#xff1f;不同卫星又有哪些可以反演的指数&#xff1f; Henrich等人开发了Index Database(网址&#xff1a;https://www.indexdatabase.de/)&#xff0c;总结了目前主流的遥感指数&#xff0c;…

EMERSON 1X00781H01L电源模块

应用领域&#xff1a; EMERSON 1X00781H01L 电源模块广泛用于工业自动化、电力系统、通信设备、机器人技术和其他领域&#xff0c;以确保设备和系统的正常运行。 电源输出&#xff1a; 该电源模块通常提供不同的电压和电流输出选项&#xff0c;以满足不同设备和系统的电源需求…

Python(Web时代)—— Django管理工具

介绍 前言 通过模型我们可以进行数据库的操作&#xff0c;如果我们想要对数据表中的数据进行操作还需要单独的开发接口&#xff0c;颇有不方便&#xff0c;今天我们来通过Django提供的管理工具来编辑模型数据。 Django 为我们提供了强大的工具&#xff0c;可以全自动地根据模…

go语言--堆栈

根据隔离适应策略&#xff0c;使用内存时的最小单位为mspan 每个mspan为N个相同大小的“格子 Go中一共有67种mspan

算法:图解递归算法的应用场景和使用途径

文章目录 什么是递归&#xff1f;使用递归的原因&#xff1f;如何理解递归&#xff1f;递归的使用写法典型例题和分析汉诺塔问题合并两个有序链表反转链表两两交换链表中的节点pow 总结 什么是递归&#xff1f; 递归就是函数自己调用自己的情况&#xff0c;在二叉树&#xff0…

Spring Bean 别名处理原理分析

今天来和小伙伴们聊一聊 Spring 中关于 Bean 别名的处理逻辑。 1. Alias 别名&#xff0c;顾名思义就是给一个 Bean 去两个甚至多个名字。整体上来说&#xff0c;在 Spring 中&#xff0c;有两种不同的别名定义方式&#xff1a; 定义 Bean 的 name 属性&#xff0c;name 属性…