【实时数仓】CDC简介、实现DWD层业务数据的处理(主要任务、接收kafka数据、动态分流*****)

news2025/4/3 14:44:07

文章目录

  • 一 CDC简介
    • 1 什么是CDC
    • 2 CDC的种类
    • 3 Flink-CDC
  • 二 准备业务数据-DWD层
    • 1 主要任务
      • (1)接收Kafka数据,过滤空值数据
      • (2)实现动态分流功能
      • (3)把分好的流保存到对应表、主题中
    • 2 接收Kafka数据,过滤空值数据
      • (1)代码
      • (2)测试
    • 3 根据MySQL的配置表,进行动态分流
      • (1)准备工作
        • a 引入pom.xml 依赖
        • b 在Mysql中创建数据库
        • c 在gmall2022_realtime库中创建配置表table_process
        • d 创建配置表实体类
        • e 在MySQL Binlog添加对配置数据库的监听,并**重启**MySQL
      • (2)FlinkCDC的使用 -- DataStream
        • a 导入依赖
        • b 代码编写
        • c 测试
        • d 端点续传案例测试
      • (3)FlinkCDC的使用 -- FlinkSQL
        • a 导入依赖
        • b 基础信息配置
        • c 代码编写

一 CDC简介

1 什么是CDC

CDC是Change Data Capture(变更数据获取)的简称。核心思想是,监测并捕获数据库的变动(包括数据或数据表的插入、更新以及删除等),将这些变更按发生的顺序完整记录下来,写入到消息中间件中以供其他服务进行订阅及消费。

2 CDC的种类

CDC主要分为基于查询和基于Binlog两种方式,主要了解一下这两种之间的区别:

基于查询的CDC基于Binlog的CDC
开源产品Sqoop、Kafka JDBC SourceCanal、Maxwell、Debezium
执行模式BatchStreaming
是否可以捕获所有数据变化
延迟性高延迟低延迟
是否增加数据库压力

3 Flink-CDC

Flink社区开发了 flink-cdc-connectors 组件,这是一个可以直接从 MySQL、PostgreSQL 等数据库直接读取全量数据和增量变更数据的 source 组件。目前也已开源,参考网址。

二 准备业务数据-DWD层

业务数据的变化,可以通过Maxwell采集到,但是MaxWell是把全部数据统一写入一个Topic中, 这些数据包括业务数据,也包含维度数据,这样显然不利于日后的数据处理,所以这个功能是从Kafka的业务数据ODS层读取数据,经过处理后,将维度数据保存到Hbase,将事实数据写回Kafka作为业务数据的DWD层。

1 主要任务

(1)接收Kafka数据,过滤空值数据

对Maxwell抓取数据进行ETL,有用的部分保留,没用的过滤掉。

(2)实现动态分流功能

由于MaxWell是把全部数据统一写入一个Topic中,这样显然不利于日后的数据处理。所以需要把各个表拆开处理。但是由于每个表有不同的特点,有些表是维度表,有些表是事实表,有的表在某种情况下既是事实表也是维度表。

在实时计算中一般把维度数据写入存储容器,一般是方便通过主键查询的数据库比如HBase,Redis,MySQL等。一般把事实数据写入流中,进行进一步处理,最终形成宽表。但是作为Flink实时计算任务,如何得知哪些表是维度表,哪些是事实表呢?而这些表又应该采集哪些字段呢?

可以将上面的内容放到某一个地方,集中配置。这样的配置不适合写在配置文件中,因为业务端随着需求变化每增加一张表,就要修改配置重启计算程序。所以这里需要一种动态配置方案,把这种配置长期保存起来,一旦配置有变化,实时计算可以自动感知。

这种可以有两个方案实现

  • 一种是用Zookeeper存储,通过Watch感知数据变化。
  • 另一种是用mysql数据库存储,周期性的同步,使用FlinkCDC读取。

这里选择第二种方案,主要是mysql对于配置数据初始化和维护管理,用sql都比较方便。

所以就有了如下图:

在这里插入图片描述

配置表字段说明:

  • sourceTable:原表名。
  • sinkType:输出的类型。
  • sinkTable:写出到哪个表。
  • sinkpk:主键。
  • sinkcolum:保留哪些字段。
  • ext:建表语句的扩展,如引擎,主键增长方式,编码方式等。
  • operateType:操作类型,不记录数据的删除操作。

(3)把分好的流保存到对应表、主题中

业务数据保存到Kafka的主题中。

维度数据保存到Hbase的表中。

2 接收Kafka数据,过滤空值数据

整体工作流程:

在这里插入图片描述

(1)代码

public class BaseDBApp {
    public static void main(String[] args) throws Exception {
        //TODO 1 基本环境准备
        //流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // 设置并行度
        env.setParallelism(1);

        //TODO 2 检查点设置
        //开启检查点
        env.enableCheckpointing(5000L,CheckpointingMode.EXACTLY_ONCE);
        // 设置检查点超时时间
        env.getCheckpointConfig().setCheckpointTimeout(60000L);
        // 设置重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(3,3000L));
        // 设置job取消后,检查点是否保留
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置状态后端 -- 基于内存 or 文件系统 or RocksDB
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/gmall"));
        // 指定操作HDFS的用户
        System.setProperty("HADOOP_USER_NAME","hzy");

        //TODO 3 从kafka中读取数据
        //声明消费的主题以及消费者组
        String topic = "ods_base_db_m";
        String groupId = "base_db_app_group";
        // 获取消费者对象
        FlinkKafkaConsumer<String> kafkaSource = MyKafkaUtil.getKafkaSource(topic, groupId);
        // 读取数据,封装成流
        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);

        //TODO 4 对数据类型进行转换 String -> JSONObject
        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(JSON::parseObject);

        //TODO 5 简单的ETL
        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter(
                new FilterFunction<JSONObject>() {
                    @Override
                    public boolean filter(JSONObject jsonobj) throws Exception {
                        boolean flag =
                                jsonobj.getString("table") != null &&
                                        jsonobj.getString("table").length() > 0 &&
                                        jsonobj.getJSONObject("data") != null &&
                                        jsonobj.getString("data").length() > 3;
                        return flag;
                    }
                }
        );
        filterDS.print("<<<");

        //TODO 6 动态分流

        //TODO 7 将维度侧输出流的数据写到Hbase中

        //TODO 8 将主流数据写回kafka的dwd层

        env.execute();
    }
}

(2)测试

业务数据总体流程如下:

在这里插入图片描述

开启zookeeper
开启kafka
开启maxwell
开启nm,等待安全模式关闭
开启主程序
模拟生成业务数据,查看主程序输出内容

3 根据MySQL的配置表,进行动态分流

通过FlinkCDC动态监控配置表的变化,以流的形式将配置表的变化读到程序中,并以广播流的形式向下传递,主流从广播流中获取配置信息。

(1)准备工作

a 引入pom.xml 依赖

<!--lomback插件依赖-->
<dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    <scope>provided</scope>
</dependency>
<dependency>
    <groupId>mysql</groupId>
    <artifactId>mysql-connector-java</artifactId>
    <version>5.1.47</version>
</dependency>
<dependency>
    <groupId>com.alibaba.ververica</groupId>
    <artifactId>flink-connector-mysql-cdc</artifactId>
    <version>1.2.0</version>
</dependency>

b 在Mysql中创建数据库

注意和gmall2022业务库区分开

在这里插入图片描述

c 在gmall2022_realtime库中创建配置表table_process

CREATE TABLE `table_process` (
  `source_table` varchar(200) NOT NULL COMMENT '来源表',
  `operate_type` varchar(200) NOT NULL COMMENT '操作类型 insert,update,delete',
   `sink_type` varchar(200) DEFAULT NULL COMMENT '输出类型 hbase kafka',
  `sink_table` varchar(200) DEFAULT NULL COMMENT '输出表(主题)',
  `sink_columns` varchar(2000) DEFAULT NULL COMMENT '输出字段',
  `sink_pk` varchar(200) DEFAULT NULL COMMENT '主键字段',
  `sink_extend` varchar(200) DEFAULT NULL COMMENT '建表扩展',
  PRIMARY KEY (`source_table`,`operate_type`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8

d 创建配置表实体类

@Data
public class TableProcess {
    //动态分流Sink常量,改为小写和脚本一致
    public static final String SINK_TYPE_HBASE = "hbase";
    public static final String SINK_TYPE_KAFKA = "kafka";
    public static final String SINK_TYPE_CK = "clickhouse";
    //来源表
    String sourceTable;
    //操作类型 insert,update,delete
    String operateType;
    //输出类型 hbase kafka
    String sinkType;
    //输出表(主题)
    String sinkTable;
    //输出字段
    String sinkColumns;
    //主键字段
    String sinkPk;
    //建表扩展
    String sinkExtend;
}

e 在MySQL Binlog添加对配置数据库的监听,并重启MySQL

sudo vim /etc/my.cnf
# 添加
binlog-do-db=gmall2022_realtime
# 重启
sudo systemctl restart mysqld

(2)FlinkCDC的使用 – DataStream

新建maven项目gmall2022-cdc。

a 导入依赖

<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-java</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-streaming-java_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.12</artifactId>
        <version>1.12.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.1.3</version>
    </dependency>

    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.48</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.2.0</version>
    </dependency>

    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.75</version>
    </dependency>
</dependencies>
<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-assembly-plugin</artifactId>
            <version>3.0.0</version>
            <configuration>
                <descriptorRefs>
                    <descriptorRef>jar-with-dependencies</descriptorRef>
                </descriptorRefs>
            </configuration>
            <executions>
                <execution>
                    <id>make-assembly</id>
                    <phase>package</phase>
                    <goals>
                        <goal>single</goal>
                    </goals>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

b 代码编写

/**
 * 通过FlinkCDC动态读取MySQL表中的数据 -- DataStreamAPI
 */
public class FlinkCDC01_DS {
    public static void main(String[] args) throws Exception {
        //TODO 1 准备流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        //TODO 2 开启检查点   Flink-CDC将读取binlog的位置信息以状态的方式保存在CK,如果想要做到断点续传,
        // 需要从Checkpoint或者Savepoint启动程序
        // 开启Checkpoint,每隔5秒钟做一次CK,并指定CK的一致性语义
        env.enableCheckpointing(5000L, CheckpointingMode.EXACTLY_ONCE);
        // 设置超时时间为1分钟
        env.getCheckpointConfig().setCheckpointTimeout(60000);
        // 指定从CK自动重启策略
        env.setRestartStrategy(RestartStrategies.fixedDelayRestart(2,2000L));
        // 设置任务关闭的时候保留最后一次CK数据
        env.getCheckpointConfig().enableExternalizedCheckpoints(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
        // 设置状态后端
        env.setStateBackend(new FsStateBackend("hdfs://hadoop101:8020/ck/flinkCDC"));
        // 设置访问HDFS的用户名
        System.setProperty("HADOOP_USER_NAME", "hzy");

        //TODO 3 创建Flink-MySQL-CDC的Source
        Properties props = new Properties();
        props.setProperty("scan.startup.mode","initial");
        SourceFunction<String> sourceFunction = MySQLSource.<String>builder()
                .hostname("hadoop101")
                .port(3306)
                .username("root")
                .password("123456")
                // 可配置多个库
                .databaseList("gmall2022_realtime")
                ///可选配置项,如果不指定该参数,则会读取上一个配置中指定的数据库下的所有表的数据
                //注意:指定的时候需要使用"db.table"的方式
                .tableList("gmall2022_realtime.t_user")
                .debeziumProperties(props)
                .deserializer(new StringDebeziumDeserializationSchema())
                .build();

        //TODO 4 使用CDC Source从MySQL读取数据
        DataStreamSource<String> mysqlDS = env.addSource(sourceFunction);

        //TODO 5 打印输出
        mysqlDS.print();

        //TODO 6 执行任务
        env.execute();
    }
}

c 测试

在gmall2022_realtime添加表,执行程序,添加数据,可以看到以下信息

在这里插入图片描述

d 端点续传案例测试

# 打包并将带依赖的jar包上传至Linux
# 启动HDFS集群
start-dfs.sh
# 启动Flink集群
bin/start-cluster.sh
# 启动程序
bin/flink run -m hadoop101:8081 -c com.hzy.gmall.cdc.FlinkCDC01_DS ./gmall2022-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
# 观察taskManager日志,会从头读取表数据
# 给当前的Flink程序创建Savepoint 
bin/flink savepoint JobId hdfs://hadoop101:8020/flink/save
# 在WebUI中cancelJob
# 在MySQL的gmall2022_realtime.t_user表中添加、修改或者删除数据
# 从Savepoint重启程序
bin/flink run -s hdfs://hadoop101:8020/flink/save/JobId -c com.hzy.gmall.cdc.FlinkCDC01_DS ./gmall2022-cdc-1.0-SNAPSHOT-jar-with-dependencies.jar
# 观察taskManager日志,会从检查点读取表数据

(3)FlinkCDC的使用 – FlinkSQL

使用FlinkCDC通过sql的方式从MySQL中获取数据。

a 导入依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-table-planner-blink_2.12</artifactId>
    <version>1.12.0</version>
</dependency>

b 基础信息配置

修改语言级别

在这里插入图片描述

修改编译级别

在这里插入图片描述

c 代码编写

public class FlinkCDC02_SQL {
    public static void main(String[] args) throws Exception {
        //TODO 1.准备环境
        //1.1流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        //1.2 表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        //TODO 2.创建动态表
        tableEnv.executeSql("CREATE TABLE user_info (" +
                "  id INT," +
                "  name STRING," +
                "  age INT" +
                ") WITH (" +
                "  'connector' = 'mysql-cdc'," +
                "  'hostname' = 'hadoop101'," +
                "  'port' = '3306'," +
                "  'username' = 'root'," +
                "  'password' = '123456'," +
                "  'database-name' = 'gmall2022_realtime'," +
                "  'table-name' = 't_user'" +
                ")");

        tableEnv.executeSql("select * from user_info").print();

        //TODO 6.执行任务
        env.execute();
    }

}

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/87647.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

VCL界面组件DevExpress VCL v22.1 - 支持新的格式标签和矢量图标

DevExpress VCL是Devexpress公司旗下最老牌的用户界面套包&#xff0c;所包含的控件有&#xff1a;数据录入、图表、数据分析、导航、布局等。该控件能帮助您创建优异的用户体验&#xff0c;提供高影响力的业务解决方案&#xff0c;并利用您现有的VCL技能为未来构建下一代应用程…

Altium Designer 22 安装过程

Altium Designer 22 安装过程1、开始安装 Altium Designer&#xff0c;点击 Next 2、选择需要安装的语言&#xff0c;勾选上接受协议&#xff0c;然后点击 Next 3、默认即可&#xff0c;其中 Touch Sensor Support 主要是用于触摸屏&#xff0c;然后点击 Next 4、接着&#xff…

C#语言实例源码系列-实现自己的进程管理器

专栏分享点击跳转>Unity3D特效百例点击跳转>案例项目实战源码点击跳转>游戏脚本-辅助自动化点击跳转>Android控件全解手册 &#x1f449;关于作者 众所周知&#xff0c;人生是一个漫长的流程&#xff0c;不断克服困难&#xff0c;不断反思前进的过程。在这个过程中…

CENTO OS上的网络安全工具(十七)搭建Cascade的Docker开发环境

这一篇&#xff0c;我们继续在Docker上折腾。之前我们已经展示了如何在容器上搭建安全产品的部署环境&#xff0c;这里我们需要更进一步&#xff0c;讨论如何在容器上搭建开发与调试环境。这是学习安全产品并且自己构建安全产品的基础步骤。 一、 构建开发镜像 鉴于前期我们…

如何使用 AWS 和 ChatGPT 创建最智能的多语言虚拟助手

上周ChatGPT发布了&#xff0c;每个人都在尝试令人惊奇的事情。我也开始使用它并想尝试它如何使用AWS的AI 服务进行集成&#xff0c;结果非常棒&#xff01; 在这篇文章中&#xff0c;我将逐步解释我是如何创建这个项目的&#xff0c;这样你也可以做到&#xff01; 最重要的是…

Tuxera NTFS2023免费版Mac电脑系统读写软件

使用 Mac 的巨大痛点之一&#xff1a;移动硬盘只能打开文件&#xff0c;但是无法写入新的资料ntfs。有人说格式化硬盘&#xff0c;改成苹果的 macOS扩展格式&#xff0c;但是原先硬盘的数据要转移&#xff0c;而且拿到 Windows 系统里无法被识别。 有人说格式化硬盘&#xff0…

C++ Reference: Standard C++ Library reference: Containers: list: list: emplace

C官网参考链接&#xff1a;https://cplusplus.com/reference/list/list/emplace/ 公有成员 <list> std::list::emplace template <class... Args> iterator emplace (const_iterator position, Args&&... args);构造并插入元素 通过在position上插入一个…

SpringMVC+SSM整合(完整版)

文章目录一、SpringMVC&#xff08;一&#xff09;SpringMVC简介1、什么是MVC2、什么是SpringMVC3、SpringMVC的特点4、MVC的工作流程&#xff08;二&#xff09;入门案例1、创建maven工程①引入依赖②配置web.xml③扩展配置方式2、总结&#xff08;三&#xff09;RequestMappi…

大学生个人网页模板 简单网页制作作业成品 极简风格个人介绍HTML网页设计代码下载

&#x1f389;精彩专栏推荐&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb;&#x1f447;&#x1f3fb; ✍️ 作者简介: 一个热爱把逻辑思维转变为代码的技术博主 &#x1f482; 作者主页: 【主页——&#x1f680;获取更多优质源码】 &#x1f393; web前端期末大作业…

TZ-PEG-N3 四嗪聚乙二醇叠氮 Tetrazine-PEG-azide

四嗪可用于许多生物成像和生物共轭应用的生物正交反应。目前被广泛应用于蛋白质特定位点功能阐释、亚细胞结构选择性标记、药物靶向传递、活体动物分子影像和生物兼容性材料的制备等。 产品名称 Tetrazine-PEG-N3 四嗪聚乙二醇叠氮 中文名称 四嗪聚乙二醇叠氮 英文名称 …

[附源码]Nodejs计算机毕业设计基于Web的摄影爱好者交流社区Express(程序+LW)

该项目含有源码、文档、程序、数据库、配套开发软件、软件安装教程。欢迎交流 项目运行 环境配置&#xff1a; Node.js Vscode Mysql5.7 HBuilderXNavicat11VueExpress。 项目技术&#xff1a; Express框架 Node.js Vue 等等组成&#xff0c;B/S模式 Vscode管理前后端分…

java计算机毕业设计ssm在线学习交流平台97t28(附源码、数据库)

java计算机毕业设计ssm在线学习交流平台97t28&#xff08;附源码、数据库&#xff09; 项目运行 环境配置&#xff1a; Jdk1.8 Tomcat8.5 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff0…

[附源码]Python计算机毕业设计SSM基于web的教学资源管理系统(程序+LW)

项目运行 环境配置&#xff1a; Jdk1.8 Tomcat7.0 Mysql HBuilderX&#xff08;Webstorm也行&#xff09; Eclispe&#xff08;IntelliJ IDEA,Eclispe,MyEclispe,Sts都支持&#xff09;。 项目技术&#xff1a; SSM mybatis Maven Vue 等等组成&#xff0c;B/S模式 M…

sql错误分析--SQLSyntaxErrorException-

### Error updating database-----指数据库database update错误. Cause: java.sql.SQLSyntaxErrorException--sql语法错误: You have an error in your SQL syntax; check the manual that corresponds to your MySQL server version for the right syntax to use near > 2…

微信公众号开发——向指定用户发送模板消息

&#x1f60a; 作者&#xff1a; 一恍过去&#x1f496; 主页&#xff1a; https://blog.csdn.net/zhuocailing3390&#x1f38a; 社区&#xff1a; Java技术栈交流&#x1f389; 主题&#xff1a; 微信公众号开发——向指定用户发送模板消息⏱️ 创作时间&#xff1a; 20…

ArrayList源码解析与相关知识点

ArrayList源码解析于相关知识点&#xff08;超级详细&#xff09; 文章目录ArrayList源码解析于相关知识点&#xff08;超级详细&#xff09;ArrayList的继承关系Serializable标记接口Cloneable标记接口RandomAccess标记接口AbstractList类属性构造函数无参构造函数指定初始容量…

网络工程毕业设计 SSM疫情期间医院门诊管理系统(源码+论文)

文章目录1 项目简介2 实现效果2.1 界面展示3 设计方案3.1 概述3.2 系统开发流程3.3 系统结构设计4 项目获取1 项目简介 Hi&#xff0c;各位同学好呀&#xff0c;这里是M学姐&#xff01; 今天向大家分享一个今年(2022)最新完成的毕业设计项目作品&#xff0c;【基于SSM的疫情…

测试人生 | 转行测试开发,4年4“跳”年薪涨3倍,我的目标是星辰大海(附大厂面经)!

image1080500 66.1 KB 编者按&#xff1a;本文来自霍格沃兹测试学院优秀学员 TesterC&#xff0c;**从运营岗位转行外包测试&#xff0c;再到测试开发&#xff0c;从待业在家到4年4“跳”进入 BAT 大厂&#xff0c;年薪涨了3倍&#xff01;**他是如何完成如此励志的华丽转身的…

12.4、后渗透测试--内网主机数据包流量嗅探

攻击主机&#xff1a; Kali 192.168.11.106靶机&#xff1a;windows server 2008 r2 192.168.11.134Metasploitable2-Linux&#xff1a; 192.168.11.105当成功获取目标机器的会话后&#xff0c;可以使用嗅探手段获取更多信息。前提&#xff1a;获得 meterpreter shell1、加载s…

centos7 安装 zsh + fzf(历史命令搜索神器)

文章目录zsh 安装用 oh-my-zsh 配置 zshfzf 安装结语zsh 安装 参考 用 yum 自动下载安装 zsh yum install -y zsh 安装完成后查看系统可以用的 shell cat /etc/shells 将 zsh 设置为系统默认 shell chsh -s /bin/zsh 退出终端重新登录 查看当前使用的shell echo $0 用 oh-my-z…