Cloud Native Deep Dive: Flink on k8s Run Modes and Hands-On Practice


I. Overview

  • At its core, Flink is a streaming dataflow execution engine that supports both stream-processing and batch-processing applications on the same runtime. For distributed computation over data streams it provides data distribution, data exchange, and fault-tolerance mechanisms.
  • Flink official website
  • Documentation for each release
  • Flink on k8s official documentation
  • GitHub repository

II. Flink Run Modes

  • Official documentation
  • Flink on YARN has three run modes:
    • yarn-session mode (Session Mode)
    • yarn-cluster mode (Per-Job Mode)
    • Application mode (Application Mode)


  • Note: Per-Job mode is deprecated. It is only supported by YARN, was deprecated in Flink 1.15, and will be removed under FLINK-26000. The sketch below shows how each mode is typically launched.
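
  • For reference, a sketch of how each mode is typically launched on YARN with the Flink CLI (assumes an unpacked Flink distribution with a configured YARN client; the example jar ships with Flink):

# Session mode: start a long-running YARN session, then submit jobs into it
./bin/yarn-session.sh -d
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar

# Per-Job mode (deprecated since Flink 1.15): a dedicated cluster per job
./bin/flink run -t yarn-per-job ./examples/streaming/TopSpeedWindowing.jar

# Application mode: main() runs on the JobManager inside the cluster
./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar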

III. Flink on k8s Hands-On Practice

① Download Flink

  • Download the distribution:
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz
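
  • Optionally verify the download against the published checksum and unpack it (a sketch; assumes the .sha512 file on the Apache mirror follows the usual sha512sum format):
wget https://dlcdn.apache.org/flink/flink-1.14.6/flink-1.14.6-bin-scala_2.12.tgz.sha512
sha512sum -c flink-1.14.6-bin-scala_2.12.tgz.sha512

# Unpack and enter the distribution; later commands are run from here
tar -xzf flink-1.14.6-bin-scala_2.12.tgz
cd flink-1.14.6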

② Build the base image

docker pull apache/flink:1.14.6-scala_2.12
docker tag apache/flink:1.14.6-scala_2.12 myharbor.com/bigdata/flink:1.14.6-scala_2.12
docker push myharbor.com/bigdata/flink:1.14.6-scala_2.12
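
# If the registry requires authentication (myharbor.com is this article's
# example private Harbor registry), log in before pushing
docker login myharbor.com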

③ Session mode

  • A Flink Session cluster runs as a long-lived Kubernetes Deployment. Multiple Flink jobs can run on one session cluster; each job is submitted after the cluster has been deployed.
  • A Flink Session cluster deployment on Kubernetes has at least three components (see the kubectl sketch after this list):
    • a Deployment that runs the JobManager;
    • a Deployment for a pool of TaskManagers;
    • a Service exposing the JobManager's REST and UI ports.
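
  • Once a session cluster is up (with either variant below), a quick way to check these components (a sketch; the flink namespace is created in the steps that follow):
kubectl get deployments -n flink   # JobManager / TaskManager workloads
kubectl get services -n flink      # Service exposing the REST and UI ports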

(A) Native Kubernetes mode

  • Configuration options:
https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#kubernetes-namespace
  • Image-build Dockerfile:
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# 'RUN export' does not persist across layers; set the locale via ENV instead
ENV LANG zh_CN.UTF-8
  • Build the image:
docker build -t myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-session:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
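
  • Binding the edit ClusterRole cluster-wide is convenient but broad; a namespaced RoleBinding is a tighter alternative (a sketch, not part of the original setup):
kubectl create rolebinding flink-role-binding-flink \
--clusterrole=edit \
--serviceaccount=flink:flink-service-account \
-n flink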
  • Create the Flink cluster:
./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster  \
-Dkubernetes.container.image=myharbor.com/bigdata/flink-session:1.14.6-scala_2.12 \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
-Dkubernetes.rest-service.exposed.type=NodePort
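
# With kubernetes.rest-service.exposed.type=NodePort, native mode creates a
# <cluster-id>-rest Service; read back the assigned NodePort like this
kubectl get svc my-first-flink-cluster-rest -n flink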


  • Submit a job (mind the JDK version; JDK 8 currently works):
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.jobmanager.service-account=flink-service-account \
./examples/streaming/TopSpeedWindowing.jar

# Resource options (pass them before the jar on the submit command):
./bin/flink run \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dkubernetes.taskmanager.cpu=2000m \
-Dexternal-resource.limits.kubernetes.cpu=4000m \
-Dexternal-resource.limits.kubernetes.memory=10Gi \
-Dexternal-resource.requests.kubernetes.cpu=2000m \
-Dexternal-resource.requests.kubernetes.memory=8Gi \
./examples/streaming/WordCount.jar
  • Check the status:
kubectl get pods -n flink
kubectl logs -f my-first-flink-cluster-taskmanager-1-1 -n flink
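
  • Jobs on the session cluster can also be listed from the Flink CLI (a sketch using the cluster created above):
./bin/flink list \
--target kubernetes-session \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink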


  • Delete the Flink cluster:
kubectl delete deployment/my-first-flink-cluster -n flink
kubectl delete ns flink --force
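
  • Force-deleting the namespace works, but the Flink docs also describe a graceful shutdown that lets the session clean up the Kubernetes resources it created:
echo 'stop' | ./bin/kubernetes-session.sh \
-Dkubernetes.cluster-id=my-first-flink-cluster \
-Dkubernetes.namespace=flink \
-Dexecution.attached=true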

(B) Standalone mode

  • Build the image. The default user in the official image is flink; here it is changed to admin (adjust to your organization's needs). The scripts below can be copied out of a pod running the image above. Startup script docker-entrypoint.sh:
#!/usr/bin/env bash

###############################################################################
#  Licensed to the Apache Software Foundation (ASF) under one
#  or more contributor license agreements.  See the NOTICE file
#  distributed with this work for additional information
#  regarding copyright ownership.  The ASF licenses this file
#  to you under the Apache License, Version 2.0 (the
#  "License"); you may not use this file except in compliance
#  with the License.  You may obtain a copy of the License at
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
#  Unless required by applicable law or agreed to in writing, software
#  distributed under the License is distributed on an "AS IS" BASIS,
#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
#  See the License for the specific language governing permissions and
# limitations under the License.
###############################################################################

COMMAND_STANDALONE="standalone-job"
COMMAND_HISTORY_SERVER="history-server"

# If unspecified, the hostname of the container is taken as the JobManager address
JOB_MANAGER_RPC_ADDRESS=${JOB_MANAGER_RPC_ADDRESS:-$(hostname -f)}
CONF_FILE="${FLINK_HOME}/conf/flink-conf.yaml"

drop_privs_cmd() {
    if [ $(id -u) != 0 ]; then
        # Don't need to drop privs if EUID != 0
        return
    elif [ -x /sbin/su-exec ]; then
        # Alpine
        echo su-exec admin
    else
        # Others
        echo gosu admin
    fi
}

copy_plugins_if_required() {
  if [ -z "$ENABLE_BUILT_IN_PLUGINS" ]; then
    return 0
  fi

  echo "Enabling required built-in plugins"
  for target_plugin in $(echo "$ENABLE_BUILT_IN_PLUGINS" | tr ';' ' '); do
    echo "Linking ${target_plugin} to plugin directory"
    plugin_name=${target_plugin%.jar}

    mkdir -p "${FLINK_HOME}/plugins/${plugin_name}"
    if [ ! -e "${FLINK_HOME}/opt/${target_plugin}" ]; then
      echo "Plugin ${target_plugin} does not exist. Exiting."
      exit 1
    else
      ln -fs "${FLINK_HOME}/opt/${target_plugin}" "${FLINK_HOME}/plugins/${plugin_name}"
      echo "Successfully enabled ${target_plugin}"
    fi
  done
}

set_config_option() {
  local option=$1
  local value=$2

  # escape periods for usage in regular expressions
  local escaped_option=$(echo ${option} | sed -e "s/\./\\\./g")

  # either override an existing entry, or append a new one
  if grep -E "^${escaped_option}:.*" "${CONF_FILE}" > /dev/null; then
        sed -i -e "s/${escaped_option}:.*/$option: $value/g" "${CONF_FILE}"
  else
        echo "${option}: ${value}" >> "${CONF_FILE}"
  fi
}

prepare_configuration() {
    set_config_option jobmanager.rpc.address ${JOB_MANAGER_RPC_ADDRESS}
    set_config_option blob.server.port 6124
    set_config_option query.server.port 6125

    if [ -n "${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}" ]; then
        set_config_option taskmanager.numberOfTaskSlots ${TASK_MANAGER_NUMBER_OF_TASK_SLOTS}
    fi

    if [ -n "${FLINK_PROPERTIES}" ]; then
        echo "${FLINK_PROPERTIES}" >> "${CONF_FILE}"
    fi
    envsubst < "${CONF_FILE}" > "${CONF_FILE}.tmp" && mv "${CONF_FILE}.tmp" "${CONF_FILE}"
}

maybe_enable_jemalloc() {
    if [ "${DISABLE_JEMALLOC:-false}" == "false" ]; then
        JEMALLOC_PATH="/usr/lib/$(uname -m)-linux-gnu/libjemalloc.so"
        JEMALLOC_FALLBACK="/usr/lib/x86_64-linux-gnu/libjemalloc.so"
        if [ -f "$JEMALLOC_PATH" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_PATH
        elif [ -f "$JEMALLOC_FALLBACK" ]; then
            export LD_PRELOAD=$LD_PRELOAD:$JEMALLOC_FALLBACK
        else
            if [ "$JEMALLOC_PATH" = "$JEMALLOC_FALLBACK" ]; then
                MSG_PATH=$JEMALLOC_PATH
            else
                MSG_PATH="$JEMALLOC_PATH and $JEMALLOC_FALLBACK"
            fi
            echo "WARNING: attempted to load jemalloc from $MSG_PATH but the library couldn't be found. glibc will be used instead."
        fi
    fi
}

maybe_enable_jemalloc

copy_plugins_if_required

prepare_configuration

args=("$@")
if [ "$1" = "help" ]; then
    printf "Usage: $(basename "$0") (jobmanager|${COMMAND_STANDALONE}|taskmanager|${COMMAND_HISTORY_SERVER})\n"
    printf "    Or $(basename "$0") help\n\n"
    printf "By default, Flink image adopts jemalloc as default memory allocator. This behavior can be disabled by setting the 'DISABLE_JEMALLOC' environment variable to 'true'.\n"
    exit 0
elif [ "$1" = "jobmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/jobmanager.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_STANDALONE} ]; then
    args=("${args[@]:1}")

    echo "Starting Job Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/standalone-job.sh" start-foreground "${args[@]}"
elif [ "$1" = ${COMMAND_HISTORY_SERVER} ]; then
    args=("${args[@]:1}")

    echo "Starting History Server"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/historyserver.sh" start-foreground "${args[@]}"
elif [ "$1" = "taskmanager" ]; then
    args=("${args[@]:1}")

    echo "Starting Task Manager"

    exec $(drop_privs_cmd) "$FLINK_HOME/bin/taskmanager.sh" start-foreground "${args[@]}"
fi

args=("${args[@]}")

# Running command in pass-through mode
exec $(drop_privs_cmd) "${args[@]}"
  • Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (UTC by default)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz  /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/
RUN chmod +x /opt/apache/docker-entrypoint.sh

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# Entrypoint script; it runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
  • Build the image:
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image (crictl for containerd nodes)
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Author the YAML manifests:
    • flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    • jobmanager-service.yaml — optional Service, required only in non-HA mode:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
    • jobmanager-rest-service.yaml — optional Service that exposes the JobManager REST port as a NodePort on the Kubernetes nodes:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
    • taskmanager-query-state-service.yaml — optional Service that exposes the TaskManager queryable-state port as a NodePort:
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
  • The session-mode deployment manifests are as follows:
    • jobmanager-session-deployment-non-ha.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-jobmanager
spec:
  replicas: 1
  selector:
    matchLabels:
      app: flink
      component: jobmanager
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      containers:
      - name: jobmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["jobmanager"]
        ports:
        - containerPort: 6123
          name: rpc
        - containerPort: 6124
          name: blob-server
        - containerPort: 8081
          name: webui
        livenessProbe:
          tcpSocket:
            port: 6123
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
    • taskmanager-session-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf/
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
  • Create the Flink cluster:
    • Apply the manifests:
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl create -f taskmanager-session-deployment.yaml -n flink
    • Reverse-engineering a Dockerfile from an image (optional):
alias whaler="docker run -t --rm -v /var/run/docker.sock:/var/run/docker.sock:ro pegleg/whaler"
whaler flink:1.14.6-scala_2.12
    • Check the status:
kubectl get pods,svc -n flink -owide


    • The Web UI is reachable via the NodePort Service (see "Access the Flink Web UI" below).
  • Submit a job:
./bin/flink run -m local-168-182-110:30081 ./examples/streaming/WordCount.jar
kubectl logs flink-taskmanager-54649bf96c-zjtkh -n flink
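
  • If no node address is reachable from the client, one alternative (a sketch) is to port-forward the REST Service and submit against localhost:
kubectl port-forward svc/flink-jobmanager-rest 8081:8081 -n flink &
./bin/flink run -m localhost:8081 ./examples/streaming/WordCount.jar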


  • Delete the Flink cluster:
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f taskmanager-session-deployment.yaml -n flink
kubectl delete -f jobmanager-session-deployment-non-ha.yaml -n flink
kubectl delete ns flink --force
  • Access the Flink Web UI:
    • The port is the NodePort defined in jobmanager-rest-service.yaml:
http://192.168.182.110:30081/#/overview


④ Application mode (recommended)

  • A basic Flink Application cluster deployment on Kubernetes has three components:
    • an Application that runs the JobManager;
    • a Deployment for a pool of TaskManagers;
    • a Service exposing the JobManager's REST and UI ports.

(A) Native Kubernetes mode (commonly used)

  • Image-build Dockerfile:
FROM myharbor.com/bigdata/flink:1.14.6-scala_2.12
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone
# 'RUN export' does not persist across layers; set the locale via ENV instead
ENV LANG zh_CN.UTF-8
RUN mkdir -p $FLINK_HOME/usrlib
COPY  TopSpeedWindowing.jar $FLINK_HOME/usrlib/
  • Build the image:
docker build -t myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-application:1.14.6-scala_2.12

# Remove the image (crictl for containerd nodes)
docker rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
crictl rmi myharbor.com/bigdata/flink-application:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Create the Flink cluster and submit the job:
./bin/flink run-application \
    --target kubernetes-application \
    -Dkubernetes.cluster-id=my-first-application-cluster \
    -Dkubernetes.container.image=myharbor.com/bigdata/flink-application:1.14.6-scala_2.12 \
    -Dkubernetes.jobmanager.replicas=1 \
    -Dkubernetes.namespace=flink \
    -Dkubernetes.jobmanager.service-account=flink-service-account \
    -Dexternal-resource.limits.kubernetes.cpu=2000m \
    -Dexternal-resource.limits.kubernetes.memory=2Gi \
    -Dexternal-resource.requests.kubernetes.cpu=1000m \
    -Dexternal-resource.requests.kubernetes.memory=1Gi \
    -Dkubernetes.rest-service.exposed.type=NodePort \
    local:///opt/flink/usrlib/TopSpeedWindowing.jar
  • local:// is the only scheme supported in application mode; it refers to a path inside the pod/container image, not on the host that submits the job.
  • Check the status:
kubectl get pods,svc -n flink


kubectl logs -f my-first-application-cluster-taskmanager-1-1 -n flink
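
  • Jobs on the application cluster can be listed and cancelled from the CLI as well (a sketch; <jobId> comes from the list output):
./bin/flink list --target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.namespace=flink
./bin/flink cancel --target kubernetes-application \
-Dkubernetes.cluster-id=my-first-application-cluster \
-Dkubernetes.namespace=flink <jobId>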


  • Delete the Flink cluster:

kubectl delete deployment/my-first-application-cluster -n flink
kubectl delete ns flink --force

(B) Standalone mode

  • Build the image. The startup script docker-entrypoint.sh is identical to the one listed under ③ (B) Standalone mode above, so it is not repeated here.
  • Dockerfile:
FROM myharbor.com/bigdata/centos:7.9.2009

USER root

# Install common tools
RUN yum install -y vim tar wget curl rsync bzip2 iptables tcpdump less telnet net-tools lsof

# Set the time zone (UTC by default)
RUN rm -f /etc/localtime && ln -sv /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && echo "Asia/Shanghai" > /etc/timezone

RUN mkdir -p /opt/apache

ADD jdk-8u212-linux-x64.tar.gz /opt/apache/

ADD flink-1.14.6-bin-scala_2.12.tgz  /opt/apache/

ENV FLINK_HOME /opt/apache/flink-1.14.6
ENV JAVA_HOME /opt/apache/jdk1.8.0_212
ENV PATH $JAVA_HOME/bin:$PATH

# Directory for user application jars
RUN mkdir $FLINK_HOME/usrlib/

COPY docker-entrypoint.sh /opt/apache/

RUN groupadd --system --gid=9999 admin && useradd --system --home-dir $FLINK_HOME --uid=9999 --gid=admin admin

RUN chown -R admin:admin /opt/apache
# The script was copied to /opt/apache, not $FLINK_HOME, so chmod it there
RUN chmod +x /opt/apache/docker-entrypoint.sh

# Working directory
WORKDIR $FLINK_HOME

# Exposed ports
EXPOSE 6123 8081

# Entrypoint script; it runs when a container starts, not at build time
ENTRYPOINT ["/opt/apache/docker-entrypoint.sh"]
CMD ["help"]
  • Build the image:
docker build -t myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12 . --no-cache

# Push the image
docker push myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12

# Remove the image
docker rmi myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
  • Create the namespace and serviceaccount:
# Create the namespace
kubectl create ns flink
# Create the serviceaccount
kubectl create serviceaccount flink-service-account -n flink
# Grant permissions
kubectl create clusterrolebinding flink-role-binding-flink --clusterrole=edit --serviceaccount=flink:flink-service-account
  • Author the YAML manifests:
    • flink-configuration-configmap.yaml:
apiVersion: v1
kind: ConfigMap
metadata:
  name: flink-config
  labels:
    app: flink
data:
  flink-conf.yaml: |+
    jobmanager.rpc.address: flink-jobmanager
    taskmanager.numberOfTaskSlots: 2
    blob.server.port: 6124
    jobmanager.rpc.port: 6123
    taskmanager.rpc.port: 6122
    queryable-state.proxy.ports: 6125
    jobmanager.memory.process.size: 3200m
    taskmanager.memory.process.size: 2728m
    taskmanager.memory.flink.size: 2280m
    parallelism.default: 2
  log4j-console.properties: |+
    # This affects logging for both user code and Flink
    rootLogger.level = INFO
    rootLogger.appenderRef.console.ref = ConsoleAppender
    rootLogger.appenderRef.rolling.ref = RollingFileAppender

    # Uncomment this if you want to _only_ change Flink's logging
    #logger.flink.name = org.apache.flink
    #logger.flink.level = INFO

    # The following lines keep the log level of common libraries/connectors on
    # log level INFO. The root logger does not override this. You have to manually
    # change the log levels here.
    logger.akka.name = akka
    logger.akka.level = INFO
    logger.kafka.name= org.apache.kafka
    logger.kafka.level = INFO
    logger.hadoop.name = org.apache.hadoop
    logger.hadoop.level = INFO
    logger.zookeeper.name = org.apache.zookeeper
    logger.zookeeper.level = INFO

    # Log all infos to the console
    appender.console.name = ConsoleAppender
    appender.console.type = CONSOLE
    appender.console.layout.type = PatternLayout
    appender.console.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n

    # Log all infos in the given rolling file
    appender.rolling.name = RollingFileAppender
    appender.rolling.type = RollingFile
    appender.rolling.append = false
    appender.rolling.fileName = ${sys:log.file}
    appender.rolling.filePattern = ${sys:log.file}.%i
    appender.rolling.layout.type = PatternLayout
    appender.rolling.layout.pattern = %d{yyyy-MM-dd HH:mm:ss,SSS} %-5p %-60c %x - %m%n
    appender.rolling.policies.type = Policies
    appender.rolling.policies.size.type = SizeBasedTriggeringPolicy
    appender.rolling.policies.size.size=100MB
    appender.rolling.strategy.type = DefaultRolloverStrategy
    appender.rolling.strategy.max = 10

    # Suppress the irrelevant (wrong) warnings from the Netty channel handler
    logger.netty.name = org.jboss.netty.channel.DefaultChannelPipeline
    logger.netty.level = OFF
    • jobmanager-service.yaml — optional Service, required only in non-HA mode:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager
spec:
  type: ClusterIP
  ports:
  - name: rpc
    port: 6123
  - name: blob-server
    port: 6124
  - name: webui
    port: 8081
  selector:
    app: flink
    component: jobmanager
    • jobmanager-rest-service.yaml — optional Service that exposes the JobManager REST port as a NodePort on the Kubernetes nodes:
apiVersion: v1
kind: Service
metadata:
  name: flink-jobmanager-rest
spec:
  type: NodePort
  ports:
  - name: rest
    port: 8081
    targetPort: 8081
    nodePort: 30081
  selector:
    app: flink
    component: jobmanager
    • taskmanager-query-state-service.yaml — optional Service that exposes the TaskManager queryable-state port as a NodePort:
apiVersion: v1
kind: Service
metadata:
  name: flink-taskmanager-query-state
spec:
  type: NodePort
  ports:
  - name: query-state
    port: 6125
    targetPort: 6125
    nodePort: 30025
  selector:
    app: flink
    component: taskmanager
    • jobmanager-application-non-ha.yaml — non-HA (note the hostPath mount /mnt/nfsdata/flink/application/job-artifacts used below; a shared directory such as NFS is strongly recommended — see the preparation sketch after these manifests):
apiVersion: batch/v1
kind: Job
metadata:
  name: flink-jobmanager
spec:
  template:
    metadata:
      labels:
        app: flink
        component: jobmanager
    spec:
      restartPolicy: OnFailure
      containers:
        - name: jobmanager
          image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
          env:
          args: ["standalone-job", "--job-classname", "org.apache.flink.examples.java.wordcount.WordCount","--output","/tmp/result"]
          ports:
            - containerPort: 6123
              name: rpc
            - containerPort: 6124
              name: blob-server
            - containerPort: 8081
              name: webui
          livenessProbe:
            tcpSocket:
              port: 6123
            initialDelaySeconds: 30
            periodSeconds: 60
          volumeMounts:
            - name: flink-config-volume
              mountPath: /opt/apache/flink-1.14.6/conf
            - name: job-artifacts-volume
              mountPath: /opt/apache/flink-1.14.6/usrlib
          securityContext:
            runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
        - name: flink-config-volume
          configMap:
            name: flink-config
            items:
              - key: flink-conf.yaml
                path: flink-conf.yaml
              - key: log4j-console.properties
                path: log4j-console.properties
        - name: job-artifacts-volume
          hostPath:
            path: /mnt/nfsdata/flink/application/job-artifacts
    • taskmanager-job-deployment.yaml:
apiVersion: apps/v1
kind: Deployment
metadata:
  name: flink-taskmanager
spec:
  replicas: 2
  selector:
    matchLabels:
      app: flink
      component: taskmanager
  template:
    metadata:
      labels:
        app: flink
        component: taskmanager
    spec:
      containers:
      - name: taskmanager
        image: myharbor.com/bigdata/flink-centos-admin:1.14.6-scala_2.12
        env:
        args: ["taskmanager"]
        ports:
        - containerPort: 6122
          name: rpc
        - containerPort: 6125
          name: query-state
        livenessProbe:
          tcpSocket:
            port: 6122
          initialDelaySeconds: 30
          periodSeconds: 60
        volumeMounts:
        - name: flink-config-volume
          mountPath: /opt/apache/flink-1.14.6/conf
        - name: job-artifacts-volume
          mountPath: /opt/apache/flink-1.14.6/usrlib
        securityContext:
          runAsUser: 9999  # refers to user _flink_ from official flink image, change if necessary
      volumes:
      - name: flink-config-volume
        configMap:
          name: flink-config
          items:
          - key: flink-conf.yaml
            path: flink-conf.yaml
          - key: log4j-console.properties
            path: log4j-console.properties
      - name: job-artifacts-volume
        hostPath:
          path: /mnt/nfsdata/flink/application/job-artifacts
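
  • As noted above, the hostPath must exist and contain the job jar on every node that can schedule these pods (an NFS-backed mount or PVC avoids per-node copies). A sketch to run on each node; the jar path is an assumption based on the WordCount class used above:
mkdir -p /mnt/nfsdata/flink/application/job-artifacts
cp flink-1.14.6/examples/batch/WordCount.jar /mnt/nfsdata/flink/application/job-artifacts/
# uid/gid 9999 matches the admin user baked into the image
chown -R 9999:9999 /mnt/nfsdata/flink/application/job-artifacts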
  • Create the Flink cluster and submit the job:
kubectl create ns flink
# Configuration and service definition
kubectl create -f flink-configuration-configmap.yaml -n flink

# service
kubectl create -f jobmanager-service.yaml -n flink
kubectl create -f jobmanager-rest-service.yaml -n flink
kubectl create -f taskmanager-query-state-service.yaml -n flink

# Create the deployments for the cluster
kubectl create -f  jobmanager-application-non-ha.yaml -n flink
kubectl create -f  taskmanager-job-deployment.yaml -n flink
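
  • The WordCount job runs to completion, and the JobManager is a Kubernetes Job, so you can block until it finishes (a sketch):
kubectl wait --for=condition=complete job/flink-jobmanager -n flink --timeout=300s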
  • Check the status:
kubectl get pods,svc -n flink


  • Delete the Flink cluster:
kubectl delete -f flink-configuration-configmap.yaml -n flink
kubectl delete -f jobmanager-service.yaml -n flink
kubectl delete -f jobmanager-rest-service.yaml -n flink
kubectl delete -f taskmanager-query-state-service.yaml -n flink
kubectl delete -f jobmanager-application-non-ha.yaml -n flink
kubectl delete -f taskmanager-job-deployment.yaml -n flink

kubectl delete ns flink --force
  • Check the pods and open a shell in a TaskManager:
kubectl get pods,svc -n flink
kubectl exec -it flink-taskmanager-54cb7fc57c-g484q -n flink -- bash

