CDH:6.3.2
原来的Flink:1.12
要升级的Flink:1.17.1
操作系统:CentOS Linux 7
一、Flink1.17编译
build.sh文件:
#!/bin/bash
set -x
set -e
set -v
FLINK_URL=`sed '/^FLINK_URL=/!d;s/.*=//' flink-parcel.properties`
FLINK_VERSION=`sed '/^FLINK_VERSION=/!d;s/.*=//' flink-parcel.properties`
EXTENS_VERSION=`sed '/^EXTENS_VERSION=/!d;s/.*=//' flink-parcel.properties`
OS_VERSION=`sed '/^OS_VERSION=/!d;s/.*=//' flink-parcel.properties`
CDH_MIN_FULL=`sed '/^CDH_MIN_FULL=/!d;s/.*=//' flink-parcel.properties`
CDH_MIN=`sed '/^CDH_MIN=/!d;s/.*=//' flink-parcel.properties`
CDH_MAX_FULL=`sed '/^CDH_MAX_FULL=/!d;s/.*=//' flink-parcel.properties`
CDH_MAX=`sed '/^CDH_MAX=/!d;s/.*=//' flink-parcel.properties`
flink_service_name="FLINK"
flink_service_name_lower="$( echo $flink_service_name | tr '[:upper:]' '[:lower:]' )"
flink_archive="$( basename $FLINK_URL )"
flink_unzip_folder="${flink_service_name_lower}-${FLINK_VERSION}"
flink_folder_lower="$( basename $flink_archive .tgz )"
flink_parcel_folder="$( echo $flink_folder_lower | tr '[:lower:]' '[:upper:]')"
flink_parcel_name="$flink_parcel_folder-el${OS_VERSION}.parcel"
flink_built_folder="${flink_parcel_folder}_build"
flink_csd_build_folder="flink_csd_build"
function build_cm_ext { #Checkout if dir does not exist
if [ ! -d cm_ext ]; then
git clone https://github.com/cloudera/cm_ext.git
fi
if [ ! -f cm_ext/validator/target/validator.jar ]; then
cd cm_ext
#git checkout "$CM_EXT_BRANCH"
mvn install -Dmaven.test.skip=true
cd ..
fi
}
function get_flink {
if [ ! -f "$flink_archive" ]; then
wget $FLINK_URL
fi
#flink_md5="$( md5sum $flink_archive | cut -d' ' -f1 )"
#if [ "$flink_md5" != "$FLINK_MD5" ]; then
# echo ERROR: md5 of $flink_archive is not correct
#exit 1
#fi
if [ ! -d "$flink_unzip_foleder" ]; then
tar -xvf $flink_archive
fi
}
function build_flink_parcel {
if [ -f "$flink_built_folder/$flink_parcel_name" ] && [ -f "$flink_built_folder/manifest.json" ]; then
return
fi
if [ ! -d $flink_parcel_folder ]; then
get_flink
mkdir -p $flink_parcel_folder/lib
sleep 3
echo ${flink_unzip_folder}
mv ${flink_unzip_folder} ${flink_parcel_folder}/lib/${flink_service_name_lower}
fi
cp -r flink-parcel-src/meta $flink_parcel_folder/
chmod 755 flink-parcel-src/flink*
cp -r flink-parcel-src/flink-master.sh ${flink_parcel_folder}/lib/${flink_service_name_lower}/bin/
cp -r flink-parcel-src/flink-worker.sh ${flink_parcel_folder}/lib/${flink_service_name_lower}/bin/
cp -r flink-parcel-src/flink-yarn.sh ${flink_parcel_folder}/lib/${flink_service_name_lower}/bin/
sed -i -e "s/%flink_version%/$flink_parcel_folder/" ./$flink_parcel_folder/meta/flink_env.sh
sed -i -e "s/%VERSION%/$FLINK_VERSION/" ./$flink_parcel_folder/meta/parcel.json
sed -i -e "s/%EXTENS_VERSION%/$EXTENS_VERSION/" ./$flink_parcel_folder/meta/parcel.json
sed -i -e "s/%CDH_MAX_FULL%/$CDH_MAX_FULL/" ./$flink_parcel_folder/meta/parcel.json
sed -i -e "s/%CDH_MIN_FULL%/$CDH_MIN_FULL/" ./$flink_parcel_folder/meta/parcel.json
sed -i -e "s/%SERVICENAME%/$flink_service_name/" ./$flink_parcel_folder/meta/parcel.json
sed -i -e "s/%SERVICENAMELOWER%/$flink_service_name_lower/" ./$flink_parcel_folder/meta/parcel.json
java -jar cm_ext/validator/target/validator.jar -d ./$flink_parcel_folder
mkdir -p $flink_built_folder
tar zcvhf ./$flink_built_folder/$flink_parcel_name $flink_parcel_folder --owner=root --group=root
java -jar cm_ext/validator/target/validator.jar -f ./$flink_built_folder/$flink_parcel_name
python cm_ext/make_manifest/make_manifest.py ./$flink_built_folder
sha1sum ./$flink_built_folder/$flink_parcel_name |awk '{print $1}' > ./$flink_built_folder/${flink_parcel_name}.sha
}
function build_flink_csd_on_yarn {
JARNAME=${flink_service_name}_ON_YARN-${FLINK_VERSION}.jar
if [ -f "$JARNAME" ]; then
return
fi
rm -rf ${flink_csd_build_folder}
cp -rf ./flink-csd-on-yarn-src ${flink_csd_build_folder}
sed -i -e "s/%SERVICENAME%/$livy_service_name/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%SERVICENAMELOWER%/$flink_service_name_lower/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%VERSION%/$FLINK_VERSION/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%CDH_MIN%/$CDH_MIN/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%CDH_MAX%/$CDH_MAX/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%SERVICENAMELOWER%/$flink_service_name_lower/" ${flink_csd_build_folder}/scripts/control.sh
java -jar cm_ext/validator/target/validator.jar -s ${flink_csd_build_folder}/descriptor/service.sdl -l "SPARK_ON_YARN SPARK2_ON_YARN"
jar -cvf ./$JARNAME -C ${flink_csd_build_folder} .
}
function build_flink_csd_standalone {
JARNAME=${flink_service_name}-${FLINK_VERSION}.jar
if [ -f "$JARNAME" ]; then
return
fi
rm -rf ${flink_csd_build_folder}
cp -rf ./flink-csd-standalone-src ${flink_csd_build_folder}
sed -i -e "s/%VERSIONS%/$FLINK_VERSION/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%CDH_MIN%/$CDH_MIN/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%CDH_MAX%/$CDH_MAX/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%SERVICENAME%/$livy_service_name/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%SERVICENAMELOWER%/$flink_service_name_lower/" ${flink_csd_build_folder}/descriptor/service.sdl
sed -i -e "s/%SERVICENAMELOWER%/$flink_service_name_lower/" ${flink_csd_build_folder}/scripts/control.sh
java -jar cm_ext/validator/target/validator.jar -s ${flink_csd_build_folder}/descriptor/service.sdl -l "SPARK_ON_YARN SPARK2_ON_YARN"
jar -cvf ./$JARNAME -C ${flink_csd_build_folder} .
}
case $1 in
parcel)
build_cm_ext
build_flink_parcel
;;
csd_on_yarn)
build_flink_csd_on_yarn
;;
csd_standalone)
build_flink_csd_standalone
;;
*)
echo "Usage: $0 [parcel|csd_on_yarn|csd_standalone]"
;;
esac
flink-parcel.properties文件:
#FLINK 下载地址
FLINK_URL=https://archive.apache.org/dist/flink/flink-1.17.1/flink-1.17.1-bin-scala_2.12.tgz
#flink版本号
FLINK_VERSION=1.17.1
#扩展版本号
EXTENS_VERSION=BIN-SCALA_2.12
#操作系统版本,以centos为例
OS_VERSION=7
#CDH 小版本
CDH_MIN_FULL=5.5
CDH_MAX_FULL=6.3.3
#CDH大版本
CDH_MIN=5
CDH_MAX=6
还有LICENSE、README.md、flink-parcel-src、cm_ext、flink_csd_build、flink-csd-on-yarn-src 。git下载
1.1 进行编译
./build.sh parcel
#下载并打包完成后会在当前目录生成FLINK-1.17.1-BIN-SCALA_2.12_build文件
#构建flink-yarn csd包
./build.sh csd_on_yarn
#执行完成后会生成FLINK_ON_YARN-1.17.1.jar
1.2 .将FLINK-1.17.1-BIN-SCALA_2.12_build 下面的所有文件复制到/var/www/html/cloudera-repos/flink
1.3 将FLINK_ON_YARN-1.17.1.jar 复制到/var/www/html/cloudera-repos/flink
FlINK-1.12.1 属于旧的版本的东西,做备份用。
1.4 将FLINK-1.17.1-BIN-SCALA_2.12_build 所有的文件复制 到/opt/cloudera/parcel-repo
1.5 将FLINK_ON_YARN-1.17.1.jar 复制到/opt/cloudera/csd/
注意:1.4 和1.5 是所有的服务器节点都要这样做(防止后面哪台服务没启起来)。
二、CDH上配置
2.1 zookeeper 必须升级到3.5.5+ ,原来的zookeeper是3.4.5.
2.2 配置parcel
点击检查新parcel,会发现FLNIK-1.17.1---->停用FLNIK-1.12.1—>分配FLINLK-1.17.1 ---->激活
2.3 将这两项配置security.kerberos.login.keytab、security.kerberos.login.principal设置为空
2.4 将flink-shaded-hadoop-3-uber-3.1.1.7.2.9.0-173-9.0.jar 复制到/opt/cloudera/parcels/FLINK/lib/flink/lib/下
2.5 将commons-cli-1.4.jar 复制到/opt/cloudera/parcels/FLINK/lib/flink/lib/下
[root@cdh01 bin]# cp /opt/cloudera/parcels/CDH-6.3.2-1.cdh6.3.2.p0.1605554/jars/commons-cli-1.4.jar /opt/cloudera/parcels/FLINK-1.17.1-BIN-SCALA_2.12/lib/flink/lib/
2.6 检验一下是否可以启动起来
cd 到/opt/cloudera/parcels/FLINK-1.17.1-BIN-SCALA_2.12/lib/flink/bin目录下,执行./yarn-session.sh -jm 1024 -tm 1024 -s 2 -d命令
2.7 测试yarn-cluster模式
[root@cdh02 flink]# bin/flink run -m yarn-cluster ./examples/batch/WordCount.jar
[root@cdh02 flink]# bin/yarn-session.sh -m yarn-cluster
[root@cdh01 bin]# ./yarn-session.sh -jm 1024 -tm 1024 -s 2 -d
Setting HADOOP_CONF_DIR=/etc/hadoop/conf because no HADOOP_CONF_DIR or HADOOP_CLASSPATH was set.
Setting HBASE_CONF_DIR=/etc/hbase/conf because no HBASE_CONF_DIR was set.
2023-09-26 19:19:38,911 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.memory.process.size, 1728m
2023-09-26 19:19:38,914 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: jobmanager.bind-host, localhost
2023-09-26 19:19:38,915 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.bind-host, localhost
2023-09-26 19:19:38,915 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: taskmanager.host, localhost
2023-09-26 19:19:38,915 INFO org.apache.flink.configuration.GlobalConfiguration [] - Loading configuration property: parallelism.default, 1
2023-09-26 19:19:38,915 INFO org.apache.flink.configuration.GlobalConfiguration
.........
出现这样是已经启动起来了。
2.7 先关掉,再起CDH上启动
[root@cdh01 bin]# yarn application -kill application_1695293473417_4380
WARNING: YARN_OPTS has been replaced by HADOOP_OPTS. Using value of YARN_OPTS.
Killing application application_1695293473417_4380
23/09/26 19:23:32 INFO impl.YarnClientImpl: Killed application application_1695293473417_4380
[root@cdh01 bin]#
三、其他问题解决
按照上面的这些操作,在CDH界面启动Flink-yarn,从结果来看,在CDH主页,Flink是起来了的。
只是点击进去的时候,发现有2个是出于良好的状态,其他的5台服务器是属于关闭状态。
点击进去,这5台服务器的状态又显示已启动。不知道问题在哪里,还得继续排查。