如何打造一个流式数据湖

news2024/12/23 4:16:32

Flink将数据写入到 hudi

准备阶段

启动hadoop集群(单机模式)

./sbin/start-all.sh

hdfs离开安全模式

hdfs dfsadmin -safemode leave

启动hive

后台启动元数据

./hive --service metastore &

启动hiveserver2

./hiveserver2 &

执行sql语句之前先设置本地模式,要不然很慢

set hive.exec.mode.local.auto=true;

启动Flink集群(单机模式)

指向hadoop的路径

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

启动集群

./bin/start-cluster.sh

指向hadoop的路径

export HADOOP_CLASSPATH=`$HADOOP_HOME/bin/hadoop classpath`

启动flink sql client

./bin/sql-client.sh embedded

三种模式

表格模式(table mode)在内存中实体化结果,并将结果用规则的分页表格可视化展示出来。执行如下命令启用:

SET sql-client.execution.result-mode=table;

变更日志模式(changelog mode)不会实体化和可视化结果,而是由插入(+)和撤销(-)组成的持续查询产生结果流。

SET sql-client.execution.result-mode=changelog;

Tableau模式(tableau mode)更接近传统的数据库,会将执行的结果以制表的形式直接打在屏幕之上。具体显示的内容会取决于作业 执行模式的不同(execution.type):

SET sql-client.execution.result-mode=tableau;

设置flink sql属性

set  sql-client.execution.result-mode=tableau;

set execution.checkpointing.interval=3sec;

Flink中创建hudi表

创建表

CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://ip:port/tmp/t1',
  'table.type' = 'MERGE_ON_READ' -- this creates a MERGE_ON_READ table, by default is COPY_ON_WRITE
);

插入数据

INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');

这里遇到了问题,提交任务到web ui后一直处在create状态,后来通过查看日志,原来是slot数不够,重新设置并行度为2,taskmanager数位4,taskmanager.numberOfTaskSlots为4,相当于总共16个slot
在这里插入图片描述
执行成功
在这里插入图片描述

Flink CDC(本地模式)

创建Mysql表

create table flink_cdc.users(
    id bigint auto_increment primary key,
    name varchar(20) null,
    birthday timestamp default CURRENT_TIMESTAMP NOT NULL,
    ts timestamp default CURRENT_TIMESTAMP NOT NULL
);

插入数据

INSERT INTO flink_cdc.users(name) VALUES(“测试”);
INSERT INTO flink_cdc.users(name) VALUES(“张三”);
INSERT INTO flink_cdc.users(name) VALUES(“李四”);
INSERT INTO flink_cdc.users(name) VALUES(“王五”);
INSERT INTO flink_cdc.users(name) VALUES(“赵六");

Flink创建输入表,关联mysql表

CREATE TABLE users_source_mysql(
    id BIGINT PRIMARY KEY NOT ENFORCED,
    name STRING,
    age INT,
    birthday TIMESTAMP(3),
    ts TIMESTAMP(3)
)WITH(
    'connector' = 'mysql-cdc-zaj',
    'hostname' = 'localhost',
    'port' = '3306',
    'username' = 'root',
    'password' = '123456',
    'server-time-zone'= 'Asia/Shanghai',
    'debezium.snapshot.mode' = 'initial',
    'database-name' = 'flink_cdc',
    'table-name' = 'users'
);

查询数据

select * from users_source_mysql;

创建视图,查询输入表,字段与输出表相同

创建视图,增加分区列part,方便后续同步hive分区表

创建视图view_users_cdc和user_soure_mysql进行关联

CREATE VIEW view_users_cdc AS
SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') AS part FROM users_source_mysql;

利用视图来进行查询

select * From view_users_cdc;

在这里插入图片描述

有意思的是,我随机修改了两条数据,cdc会自动把修改的数据放在最后,直接会显示出来

在这里插入图片描述

创建CDC Hudi Sink 表,并自动同步hive分区

CREATE TABLE test_sink_hudi_hive(
   id bigint ,
   name string,
   birthday TIMESTAMP(3),
   ts TIMESTAMP(3),
   part VARCHAR(20),
   primary key(id) not enforced
)
PARTITIONED BY (part)
with(
    'connector'='hudi',
    'path'= 'hdfs://ip:port/hudi-warehouse/test_sink_hudi_hive'
    , 'hoodie.datasource.write.recordkey.field'= 'id'-- 主键
    , 'write.precombine.field'= 'ts'-- 自动precombine的字段
    , 'write.tasks'= '1'
    , 'compaction.tasks'= '1'
    , 'write.rate.limit'= '2000'-- 限速
    , 'table.type'= 'MERGE_ON_READ'-- 默认COPY_ON_WRITE,可选MERGE_ON_READ
    , 'compaction.async.enabled'= 'true'-- 是否开启异步压缩
    , 'compaction.trigger.strategy'= 'num_commits'-- 按次数压缩
    , 'compaction.delta_commits'= '1'-- 默认为5
    , 'changelog.enabled'= 'true'-- 开启changelog变更
    , 'read.streaming.enabled'= 'true'-- 开启流读
    , 'read.streaming.check-interval'= '3'-- 检查间隔,默认60s
    , 'hive_sync.enable'= 'true'-- 开启自动同步hive
    , 'hive_sync.mode'= 'hms'-- 自动同步hive模式,默认jdbc模式, hms:hive metastore
    , 'hive_sync.metastore.uris'= 'thrift://ip:9083'-- hive metastore地址
    -- , 'hive_sync.jdbc_url'= 'jdbc:hive2://ip:10000'-- hiveServer地址
    , 'hive_sync.table'= 'test_sink_hudi_hive_sync'-- hive 新建表名
    , 'hive_sync.db'= 'default'-- hive 同步表存在默认数据库中,也可以自动新建数据库
    , 'hive_sync.username'= 'root'-- HMS 用户名
    , 'hive_sync.password'= '123'-- HMS 密码
    , 'hive_sync.support_timestamp'= 'true'-- 兼容hive timestamp类型
);

利用users_sink_hudi_hive来查询

这时候遇到个问题,直接通过users_sink_hudi_hive表来查询,web ui也会一直运行一个任务,来监视数据变化,可是通过测试插入

200条数据,明显感觉速度不如直接查询视图表,而且插入数据是乱序

在这里插入图片描述

视图数据写入hudi表

INSERT INTO users_sink_hudi_hive
SELECT id,name,birthday,ts,part FROM view_users_cdc;

可以看到web ui有一个新任务,一直在运行状态,对视图表进行监视

在这里插入图片描述

会自动生成hudi MOR模式的两张表(这里我没有生成,有点问题)原理是没有开启hiveserver2,这里还是有问题,我启动了hiveserver2还是无法自动创建ro表和rt表

后续:上述问题已解决,其实ro表rt表已经生成了,在hive的db_hudi数据库中,因为我建hudi表时自动创建了一个新的数据库,如果不指定则新的表在default数据库中。

在这里插入图片描述

查询ro表
在这里插入图片描述

users_sink_hudi_hive_ro:ro表全称read optimized table,对于MOR表同步的xxx_ro表,只暴露压缩后的parquet。其查询方式和COW表类似。设置完hiveInputformat之后和普通的hive表一样查询即可。

users_sink_hudi_rt:rt表示增量视图,主要针对增量查询的rt表;ro表只能查parquet文件数据,rt表parquet文件数据和log文件数据都可查。

查看hive表数据

set hive.exec.mode.local.auto = true;
set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
set hive.mapred.mode = nonstrict; 

promethues监控cdc状态下的Hudi表

 create table stu6_binlog_sink_hudi(
  id bigint not null,
  name string,
  `school` string,
  nickname string,
  age int not null,
  class_num int not null,
  phone bigint not null,
  email string,
  ip string,
  primary key (id) not enforced
)
 partitioned by (`school`)
 with (
  'connector' = 'hudi',
  'path' = 'hdfs://ip:9000/tmp/stu6_binlog_sink_hudi',
  'table.type' = 'MERGE_ON_READ',
  'write.option' = 'insert',
  'write.precombine.field' = 'school',
  'hoodie.metrics.on' = 'true',
  'hoodie.metrics.executor.enable' = 'true',
  'hoodie.metrics.reporter.type' = 'PROMETHEUS_PUSHGATEWAY',
  'hoodie.metrics.pushgateway.job.name' = 'hudi-metrics',
  'hoodie.metrics.pushgateway.host' = 'ip',
  'hoodie.metrics.pushgateway.report.period.seconds' = '10',
  'hoodie.metrics.pushgateway.delete.on.shutdown' = 'false',
  'hoodie.metrics.pushgateway.random.job.name.suffix' = 'false'
  );

OSS实现文件的上传、下载、删除、查询

前期准备

其阿里云申请开通OSS

在Sping Boot项目里引入依赖

<dependency>
    <groupId>com.aliyun.oss</groupId>
    <artifactId>aliyun-sdk-oss</artifactId>
    <!--<version>3.10.2</version>-->
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.httpcomponents/httpcore -->
<dependency>
    <groupId>org.apache.httpcomponents</groupId>
    <artifactId>httpcore</artifactId>
    <version>4.4.1</version>
</dependency>
package com.zaj.order.controller;

import com.aliyun.oss.ClientException;
import com.aliyun.oss.OSS;
import com.aliyun.oss.OSSClientBuilder;
import com.aliyun.oss.OSSException;
import com.aliyun.oss.model.OSSObject;
import com.aliyun.oss.model.OSSObjectSummary;
import com.aliyun.oss.model.ObjectListing;

import java.io.*;

public class Demo {

    /**
     * 上传
     * @param args
     * @throws Exception
     */
    OSSConfigure ossConfigure = new OSSConfigure();
    // Endpoint以华东1(杭州)为例,其它Region请按实际情况填写。
    String endpoint = ossConfigure.endpoint();
    // 阿里云账号AccessKey拥有所有API的访问权限,风险很高。强烈建议您创建并使用RAM用户进行API访问或日常运维,请登录RAM控制台创建RAM用户。
    String accessKeyId = ossConfigure.accessKeyId();
    String accessKeySecret = ossConfigure.accessKeySecret();
    // 填写Bucket名称,例如examplebucket。
    String bucketName = ossConfigure.bucketName();
    // 填写Object完整路径,例如exampledir/exampleobject.txt。Object完整路径中不能包含Bucket名称。
    String objectName = "my-first-key";


    OSS ossClient = new OSSClientBuilder().build(endpoint, accessKeyId, accessKeySecret);
    public static void main(String[] args) throws Exception {

        // 创建OSSClient实例。
        Demo demo = new Demo();
        //demo.download();
        //demo.Enum();
        demo.delete();

    }


    /**
     * 上传
     * @return
     */
    public String upload(){
        try {
            String content = "Hello OSS";
            ossClient.putObject(bucketName, objectName, new ByteArrayInputStream(content.getBytes()));
        } catch (OSSException oe) {
            System.out.println("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Caught an ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            System.out.println("Error Message:" + ce.getMessage());
        } finally {
            if (ossClient != null) {
                ossClient.shutdown();
            }
        }
        return "ok";
    }

    /**
     * 下载
     */
    public String download(){
        try {
            // 调用ossClient.getObject返回一个OSSObject实例,该实例包含文件内容及文件元信息。
            OSSObject ossObject = ossClient.getObject(bucketName, objectName);
            // 调用ossObject.getObjectContent获取文件输入流,可读取此输入流获取其内容。
            InputStream content = ossObject.getObjectContent();
            if (content != null) {
                BufferedReader reader = new BufferedReader(new InputStreamReader(content));
                while (true) {
                    String line = reader.readLine();
                    if (line == null) break;
                    System.out.println("\n" + line);
                }
                // 数据读取完成后,获取的流必须关闭,否则会造成连接泄漏,导致请求无连接可用,程序无法正常工作。
                content.close();
            }
        } catch (OSSException oe) {
            System.out.println("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Caught an ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            System.out.println("Error Message:" + ce.getMessage());
        } catch (IOException e) {
            e.printStackTrace();
        } finally {
            if (ossClient != null) {
                ossClient.shutdown();
            }
        }
        return "ok";
    }

    /**
     * 列举
     * @return
     */
    public String Enum(){
        try {
            // ossClient.listObjects返回ObjectListing实例,包含此次listObject请求的返回结果。
            ObjectListing objectListing = ossClient.listObjects(bucketName);
            // objectListing.getObjectSummaries获取所有文件的描述信息。
            for (OSSObjectSummary objectSummary : objectListing.getObjectSummaries()) {
                System.out.println(" - " + objectSummary.getKey() + "  " +
                        "(size = " + objectSummary.getSize() + ")");
            }
        } catch (OSSException oe) {
            System.out.println("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Caught an ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            System.out.println("Error Message:" + ce.getMessage());
        } finally {
            if (ossClient != null) {
                ossClient.shutdown();
            }
        }
        return "ok";
    }

    public String delete(){
        try {
            // 删除文件。
            ossClient.deleteObject(bucketName, objectName);
        } catch (OSSException oe) {
            System.out.println("Caught an OSSException, which means your request made it to OSS, "
                    + "but was rejected with an error response for some reason.");
            System.out.println("Error Message:" + oe.getErrorMessage());
            System.out.println("Error Code:" + oe.getErrorCode());
            System.out.println("Request ID:" + oe.getRequestId());
            System.out.println("Host ID:" + oe.getHostId());
        } catch (ClientException ce) {
            System.out.println("Caught an ClientException, which means the client encountered "
                    + "a serious internal problem while trying to communicate with OSS, "
                    + "such as not being able to access the network.");
            System.out.println("Error Message:" + ce.getMessage());
        } finally {
            if (ossClient != null) {
                ossClient.shutdown();
            }
        }
        return "ok";
    }


}

OSS-HDFS

OSS-HDFS服务(JindoFS服务)是一款云原生数据湖存储产品。基于统一的元数据管理能力,在完全兼容HDFS文件系统接口的同时,提供充分的POSIX能力支持,能更好地满足大数据和AI等领域的数据湖计算场景。

功能优势

通过OSS-HDFS服务,无需对现有的Hadoop、Spark大数据分析应用做任何修改。通过简单的配置即可像在原生HDFS中那样管理和访问数据,同时获得OSS无限容量、弹性扩展、更高的安全性、可靠性和可用性支撑。

作为云原生数据湖基础,OSS-HDFS在满足EB级数据分析、亿级文件管理服务、TB级吞吐量的同时,全面融合大数据存储生态,除提供对象存储扁平命名空间之外,还提供了分层命名空间服务。分层命名空间支持将对象组织到一个目录层次结构中进行管理,并能通过统一元数据管理能力进行内部自动转换。对Hadoop用户而言,无需做数据复制或转换就可以实现像访问本地HDFS一样高效的数据访问,极大提升整体作业性能,降低了维护成本。

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/124477.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

fpga实操训练(ip rom)

【 声明&#xff1a;版权所有&#xff0c;欢迎转载&#xff0c;请勿用于商业用途。 联系信箱&#xff1a;feixiaoxing 163.com】 altera的fpga本身自带了rom的ip&#xff0c;使用起来也十分方便。实际开发中&#xff0c;使用rom的场景也很多&#xff0c;比如一些默认的配置文件…

TensorFlow之回归模型-2

1 基本概念 回归模型 线性 线性模型 非线性模型 线性回归 逻辑回归 Log Loss&#xff08;损失函数&#xff09; 分类临界值 2 效率预测 回归问题是预测一个持续的值&#xff0c;主要是用于解决不确定性的问题&#xff0c;例如&#xff0c;一个商品在未来可能的价格或…

CMAKE_INSTALL_PREFIX

一、定义 CMAKE_INSTALL_PREFIX为cmake的内置变量&#xff0c;用于指定cmake执行install命令时&#xff0c;安装的路径前缀。Linux下的默认路径是/usr/local &#xff0c;Windows下默认路径是 C:/Program Files/${PROJECT_NAME} 二、用…

dcloud如何苹果ios系统真机测试-HBuilderX真机运行ios测试

dcloud如何运行到IOS真机测试 1&#xff0c;下载安装iTunes 安装完毕后重新打开HBuilderX 2&#xff0c;点击运行真机 将iPhone 与电脑进行链接&#xff0c;点信任&#xff0c; 运行-运行到手机或模拟器-运行到IOS APP 基座 安装过itunes就会有显示&#xff0c;但是这里还有…

进程的学习 —— Linux下的进程

目录前言1 认识进程1.1 进程的概念1.2 进程的管理1.3 查看进程的两种方法1.4 getpid、getppid和fork函数2 进程状态2.1 普遍概念下的进程状态2.2 Linux下的进程状态2.2.1 测试Linux的各种进程状态2.2.2 僵尸进程2.3 孤儿进程3 进程切换与进程优先级3.1 并行、并发3.2 进程切换3…

kafka和sparkStreaming

1、Kafka 1、kafka集群架构 producer 消息生产者&#xff0c;发布消息到Kafka集群的终端或服务 broker Kafka集群中包含的服务器&#xff0c;一个borker就表示kafka集群中的一个节点 topic 每条发布到Kafka集群的消息属于的类别&#xff0c;即Kafka是面向 topic 的。 更通俗…

HDFS 常用命令

一、HDFS常用命令 1、查看版本 hdfs version 2、创建 HDFS 文件系统目录。 格式&#xff1a; hdfs dfs -mkdir /user/dir1 3、列出目录下的所有文件 类似 Linux Shell 的 ls 命令。用它可以列出指定目录下的所有文件 hdfs dfs -ls /user/ 4、把本地文件系统文件和目录拷贝…

整合Tkinter GUI界面的古诗词词云生成

Python语言提供的wordcloud词云功能&#xff0c;使文本数据的可视化&#xff0c;简单而美丽。但网上的大多数词云生成功能&#xff0c;多半没有可交互的GUI界面&#xff0c;使用起来稍觉不便。笔者结合网上的中文词云功能&#xff0c;以唐诗三百首&#xff0c;宋词三百首&#…

拟合算法(模型+代码)

拟合的结果是得到一个确定的曲线 最小二乘法的几何解释&#xff1a; argmin 存在参数k&#xff0c;b使括号里的值最小 第一种有绝对值&#xff0c;不易求导&#xff08;求导在求最小值&#xff09;&#xff0c;计算较为复杂&#xff1b;所以我们往往使用第二种定义&#xff0…

什么软件可以录屏?这3款宝藏录屏软件,码住收藏

当我们处理剪辑视频时&#xff0c;我们需要使用到很多素材。有些素材我们可以直接从电脑网上进行下载。但有些素材我们在网上无法进行下载&#xff0c;这个时候就需要使用录屏软件进行录屏。什么软件可以录屏&#xff1f;今天小编向您分享3个宝藏录屏软件&#xff0c;赶紧码住收…

jmeter基础使用方法

文章目录一 配置环境变量二 Jmeter默认语言设置三 启动线程组的创建发送http请求数据报告一 配置环境变量 设置JMETER_HOME,及jemeter解压目录。 设置CLASSPATH,此处分别配置ApacheJMeter_core.jar和jorphan.jar所在位置。 关于环境变量配置多个值&#xff0c;在多个参数中间…

动态规划——状态压缩dp

文章目录概述状态压缩使用条件状压dp位运算棋盘&#xff08;基于连通性&#xff09;类问题概述例题蒙德里安的梦想小国王玉米田炮兵阵地集合类问题概述例题最短Hamilton路径愤怒的小鸟总结概述 状态压缩 状态压缩就是使用某种方法&#xff0c;简明扼要地以最小代价来表示某种…

MySQL 进阶篇2.0 存储过程 触发器 锁 InnoDB引擎

45.存储过程-介绍 46.存储过程-基本语法 -- 查看 select * from information_sc

Python中import语句用法详解

一. 什么是模块&#xff08;module&#xff09;&#xff1f; 在实际应用中&#xff0c;有时程序所要实现功能比较复杂&#xff0c;代码量也很大。若把所有的代码都存储在一个文件中&#xff0c;则不利于代码的复用和维护。一种更好的方式是将实现不同功能的代码分拆到多个文件…

案例丨多元业态管理服务厂商如何走通数字化转型之路

对于多元业态管理服务厂商来说&#xff0c;不同业态客户的使用习惯不一样&#xff0c;从而导致服务过程中的服务有所区别&#xff0c;是这类服务厂商数字化转型的核心需求。下面就以全国领先的阳光智博为例&#xff0c;看下他们是怎样数字化转型的。 一、企业介绍 阳光智博服务…

ASEMI整流二极管A7二极管和M7二极管能代换吗

编辑-Z A7二极管和M7二极管不仅外观封装很像&#xff0c;各项参数也是非常接近的&#xff0c;那么A7二极管和M7二极管能代换吗&#xff1f;我们先来看看他们的详细参数对比&#xff1a; A7二极管参数&#xff1a; 型号&#xff1a;A7二极管 封装&#xff1a;SOD-123 最大重…

Docker- 7.1、跨主机网络-macvlan

一、macvlan介绍 macvlan 本身是 linxu kernel 模块&#xff0c;其功能是允许在同一个物理网卡上配置多个 MAC 地址而实现虚拟多块网卡&#xff0c;即多个 interface&#xff0c;每个 interface 可以配置自己的IP。macvlan 本质上是一种网卡虚拟化技术。macvlan 的最大优点是性…

教你这样找到Mac“其他”文件并删除它

当我们通过「关于本机」>「存储空间」查看硬盘的空间占用情况时。系统会将存储空间根据不同文件类别所占的空间大小显示在条状图上&#xff0c;大部分类型看文字都比较好理解&#xff0c;但对于“其他”这一类很多小伙伴都感觉很困惑&#xff0c;会产生一些问题如&#xff1…

如何在PPT中嵌入交互式图表?LightningChart助力炫酷展示

我们在PPT演示文稿中嵌入图表很容易&#xff0c;但嵌入交互式图表似乎就没听说过了&#xff0c;接下来我们就一起来看看通过交互式图表在PPT中展示病人心跳的效果&#xff1a; PPT中展示病人心跳下方是一个实时地图在PPT中的展现实例 LightningChart2以上在PPT中展示实时交互的…

Nacos服务注册发现、配置管理

Nacos服务注册发现 引入依赖 <dependencyManagement><dependencies><!-- nacos管理依赖 --><dependency><groupId>com.alibaba.cloud</groupId><artifactId>spring-cloud-alibaba-dependencies</artifactId><version>…