大数据组件(一)快速入门调度组件Airflow

news2025/1/3 3:04:24

大数据组件(一)快速入门调度组件Airflow

  • DolphinScheduler和 Airflow是数据领域很流行的两款开源任务调度系统。DolphinScheduler 致力于用可视化的方式去完成一个 DAG 工作流,而 Airflow 则想的是用类似于编程的方式完成一个 DAG 工作流。
    • Apache DolphinScheduler 可以直接在页面上完成对 DAG 工作流的开发。
    • 而 Apache Airflow 需要提交一个 Python 文件到后台服务器上,由 Apache Airflow 去解析这个 Python 文件,进而生成一个 DAG 工作流。
  • 我们,今天以几个简单的案例,快速了解下基于python的调度组件Airflow。
    • 官网地址:https://airflow.incubator.apache.org/

1 Airflow的安装(单机版)

这里,我们利用conda进行安装。

首先,创建环境:

conda create -n airflow  python=3.8 -y
conda activate airflow

然后,利用pip进行安装,需要注意的是:安装时候,需要指定约束文件,否则很容易会出现依赖冲突

CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt"
pip install "apache-airflow==2.7.2" --constraint "${CONSTRAINT_URL}"

我们这里利用MySQL数据库进行配置:

# 3、查询版本
(airflow) [root@centos01 ~]# airflow version
2.7.2
(airflow) [root@centos01 airflow]# pwd
/root/airflow

# 4、数据库初始化、改为Mysql数据库
(airflow) [root@centos01 airflow]# pip install mysql-connector-python
(airflow) [root@centos01 airflow]# airflow db init
(airflow) [root@centos01 airflow]# vim airflow.cfg

#sql_alchemy_conn = sqlite:root/airflow/airflow.db
sql_alchemy_conn = mysql+mysqlconnector://root:123456@127.0.0.1:3306/airflow_db

# 再次初始化
(airflow) [root@centos01 airflow]# airflow db init
# 报错如下:
sqlalchemy.exc.ProgrammingError: (mysql.connector.errors.ProgrammingError) 1067 (42000): Invalid default value for 'updated_at'
[SQL: 
CREATE TABLE dataset (
	id INTEGER NOT NULL AUTO_INCREMENT, 
	uri VARCHAR(3000) COLLATE latin1_general_cs NOT NULL, 
	extra JSON NOT NULL, 
	created_at TIMESTAMP(6) NOT NULL, 
	updated_at TIMESTAMP(6) NOT NULL, 
	is_orphaned BOOL NOT NULL DEFAULT '0', 
	CONSTRAINT dataset_pkey PRIMARY KEY (id)
)]
(Background on this error at: https://sqlalche.me/e/14/f405)


# 解决方案
原因:字段 'update_at' 为 timestamp类型,取值范围是:1970-01-01 00:00:00 到 2037-12-31 23:59:59(UTC +8 北京时间从1970-01-01 08:00:00 开始),而这里默认给了空值,所以导致失败。
推荐修改mysql存储时间戳格式:
mysql> set GLOBAL sql_mode ='STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION' 
重启MySQL会造成参数失效,推荐将参数写入到配置文件my.cnf中。

# 在[mysqld]添加下面两行
[root@centos01 apps]# vim /etc/my.cnf
[mysqld]
skip_ssl
sql_mode=STRICT_TRANS_TABLES,NO_AUTO_CREATE_USER,NO_ENGINE_SUBSTITUTION
# 重启mysql
[root@centos01 apps]# systemctl restart mysqld

# 5、添加admin用户
(airflow) [root@centos01 airflow]# airflow users create \
> --username admin \
> --firstname Undo \
> --lastname Undo \
> --role Admin \
> --email undo@163.com
# 输入密码
[2024-12-27T16:28:17.107+0800] {manager.py:555} INFO - Added Permission %s to role %s
Password:



# 6、启动airflow web服务和调度器, 启动后浏览器访问http://centos01:8080
(airflow) [root@centos01 airflow]# airflow webserver -p 8080 -D
(airflow) [root@centos01 airflow]# airflow scheduler -D

在这里插入图片描述

上图是默认安装时候,会出现的两条信息:

  • Airflow使用SQLite数据库,建议改为Mysql数据库,我们已经修改;
  • SequentialExecutor按顺序运行任务实例,不能并行执行任务,我们下面修改;
  • 注意:右上角默认是UTC时间,我们一定要点击修改时区,同时要修改airflow.cfg文件。
# 7、修改airflow的配置文件
(airflow) [root@centos01 airflow]# vim airflow.cfg 
# 可以使用官方推荐的几种执行器,也可以自定义。这里我们选择本地执行器即可。
[core]
# 存放python调度脚本的目录
dags_folder = /root/airflow/dags

# The executor class that airflow should use. Choices include
# ``SequentialExecutor``, ``LocalExecutor``, ``CeleryExecutor``, ``DaskExecutor``,
# ``KubernetesExecutor``, ``CeleryKubernetesExecutor`` or the
# full import path to the class when using a custom executor.
executor = LocalExecutor

# 修改时区信息

# default_ui_timezone = UTC
default_ui_timezone = Asia/Shanghai

# default_timezone = utc
default_timezone = Asia/Shanghai


# 创建目录,用来存放python调度脚本
(airflow) [root@centos01 airflow]# mkdir dags

我们把airflow启动、关闭封装为脚本:

(airflow) [root@centos01 ~]# vim af.sh
(airflow) [root@centos01 ~]# chmod +x af.sh 
#!/bin/bash
case $1 in
"start"){
   
    echo " --------start airflow-------"
    conda activate airflow;airflow webserver -p 8080 -D;airflow scheduler -D;conda deactivate
};;
"stop"){
   
    echo " --------stop airflow-------"
    ps -ef | egrep 'scheduler|airflow-webserver' | grep -v grep | awk '{print $2}' | xargs kill -15
};;
esac

# 然后重启
(airflow) [root@centos01 ~]# ./af.sh stop
 --------stop airflow-------
(airflow) [root@centos01 ~]# ./af.sh start
 --------start airflow-------
 # 查看是否启动成功
 (airflow) [root@centos01 ~]# ps -ef | grep 8080
root       1935      1  1 20:55 ?        00:00:00 /opt/apps/minoconda3/envs/airflow/bin/python /opt/apps/minoconda3/envs/airflow/bin/airflow webserver -p 8080 -D
root       2618   1630  0 20:55 pts/0    00:00:00 grep --color=auto 8080

# 访问http://centos01:8080/页面,页面上会出现很多官方示例
# 当然,我们也能通过命令查看
(airflow) [root@centos01 ~]# airflow dags list
dag_id                                                  | filepath                                                                                                                             | owner   | paused
========================================================+======================================================================================================================================+=========+=======
dataset_consumes_1                                      | /opt/apps/minoconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_datasets.py                               | airflow | True  
dataset_consumes_1_and_2                                | /opt/apps/minoconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_datasets.py                               | airflow | True  
dataset_consumes_1_never_scheduled                      | /opt/apps/minoconda3/envs/airflow/lib/python3.8/site-packages/airflow/example_dags/example_datasets.py                               | airflow | True  
......

2 Airflow入门案例

2.1 BashOperator

  • Airflow中最重要的还是各种Operator,其允许生成特定类型的任务,这个任务在实例化时称为DAG中的任务节点,所有的Operator均派生自BaseOparator,并且继承了许多属性和方法。

在这里插入图片描述

  • BashOperator主要执行bash脚本或命令
  • Operator参考:https://airflow.apache.org/docs/
(airflow) [root@centos01 shell_jobs]# cat first_shell.sh 
#!/bin/bash

dt=$1

echo "==== execute first shell ===="

echo "---- first : time is ${dt}"
(airflow) [root@centos01 shell_jobs]# cat second_shell.sh 
#!/bin/bash
dt=$1
echo "==== execute second shell ===="
echo "---- second : time is ${dt}"
  • python脚本如下(可以用VS Code远程连接),注意:要放在配置的目录下

在这里插入图片描述

#!/usr/bin/python
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator



default_args = {
    'owner':'root',
    'start_date':datetime(2024, 12, 27, 22, 0),   # 调度开始时间
    'retries': 1,                                 # 失败重试次数
    'retry_delay': timedelta(minutes=5)           # 失败重试间隔
}

dag = DAG(
    dag_id='MyShellTest',
    default_args=default_args,
    schedule_interval='*/15 * * * *'    # 每15min运行一次
)

first=BashOperator(
    task_id='first',
    # 脚本路径建议写绝对路径
    bash_command='sh /root/shell_jobs/first_shell.sh %s'%datetime.now().strftime("%Y-%m-%d-%H"),
    dag=dag
)

second=BashOperator(
    task_id='second',
    # 脚本路径建议写绝对路径
    bash_command='sh /root/shell_jobs/second_shell.sh %s'%datetime.now().strftime("%Y-%m-%d-%H"),
    dag=dag
)

first >> second

在这里插入图片描述
在这里插入图片描述

在这里插入图片描述

2.2 SSHOperator

  • 在实际的调度任务中,任务脚本大多分布在不同的机器上,我们可以使用SSHOperator来调用远程机器上的脚本任务。

  • Airflow中提供了很多的providers,需要通过pip安装apache-airflow-providers-ssh包。

    • 安装哪个版本的apache-airflow-providers-ssh包呢?

    • 需要查看我们安装时候的https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt约束文件

    • 在这里插入图片描述

  • 最后,配置SSH Connection连接。登录airflow webui ,选择“Admin”->“Connections”。

在这里插入图片描述

# 首先停止airflow webserver与scheduler

# 我们在https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt找相应的版本安装
(airflow) [root@centos01 airflow]# pip install apache-airflow-providers-ssh==3.7.3

# 然后启动airflow webserver与scheduler
我们在另一台机器上创建两个shell脚本:
#!/bin/bash
# 获取主机名
HOSTNAME=$(hostname)
echo "==== 执行脚本主机是: $HOSTNAME, execute first shell ===="

#!/bin/bash
# 获取主机名
HOSTNAME=$(hostname)
echo "==== 执行脚本主机是: $HOSTNAME, execute second shell ===="

[root@centos02 shell_jobs]# ll
total 8
-rw-r--r--. 1 root root 122 Dec 28 16:34 first_shell.sh
-rw-r--r--. 1 root root 123 Dec 28 16:34 second_shell.sh

然后,创建Python脚本

from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.providers.ssh.operators.ssh import SSHOperator

default_args = {
    'owner':'root',
    'start_date': datetime(2024, 12, 28, 16),   # 开始执行时间
    'retries': 1,                               # 失败重试次数
    'retry_delay': timedelta(minutes=5)         # 失败重试间隔
}

# 声明任务图, Airflow使用的是“前一个周期”来调度 DAG 运行
# 即:在2024-12-28 00:00:00时,Airflow会执行的调度账期是:2024-12-27 00:00:00
# dag = DAG('MyTaskTest', default_args=default_args, schedule_interval=timedelta(days=1))

dag = DAG(
    dag_id = 'Myssh2centos02',
    default_args=default_args,
    schedule_interval=timedelta(minutes=10)  # 10min执行一次
)

first=SSHOperator(
    task_id='first',
    ssh_conn_id='AA-My-ssh-centos02',                  # 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/shell_jobs/first_shell.sh ',     # 注意:带一个空格
    dag = dag
)

second=SSHOperator(
    task_id='second',
    ssh_conn_id='My-ssh-centos02',                     # 配置在Airflow webui Connection中配置的SSH Conn id
    command='sh /root/shell_jobs/second_shell.sh ',    # 注意:带一个空格
    remote_host="192.168.42.102",                      # 如果配置remote_host,将会替换Connection中的SSH配置的host
    dag=dag
)

first >> second

在这里插入图片描述

2.3 PythonOperator

  • PythonOperator可以调用Python函数,由于Python基本可以调用任何类型的任务,如果实在找不到合适的Operator,将任务转为Python函数,使用PythonOperator即可。
import random
from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import pytz

def print__hello1(*a,**b):
    """
        *  关键字参数允许传入0个或任意个参数,这些可变参数在函数调用时自动组装为一个tuple
        ** 关键字参数允许传入0个或任意个含【参数名】的参数,这些关键字参数在函数内部自动组装为一个dict
    """
    print("hello airflow1")
    print(a)
    print(b)
    # 返回的值只会打印到日志中
    return {"sss1":"print__hello1"}

def print__hello2(execution_ds, execution_ts, random_base):
    print("hello airflow2")
    print("Today:{}, 当前执行的账期为:{}, 随机数为:{}".format(execution_ds, execution_ts, random_base))

    # 将字符串解析为带有时区信息的datetime对象
    utc_dt = datetime.strptime(execution_ts, "%Y-%m-%dT%H:%M:%S%z")
    # 定义目标时区(上海时区)
    shanghai_tz = pytz.timezone('Asia/Shanghai')
    # 将UTC时间转换为目标时区的时间
    shanghai_dt = utc_dt.astimezone(shanghai_tz)
    # 格式化输出
    execution_ts_sh = shanghai_dt.strftime('%Y-%m-%d %H:%M:%S')
    print("Today:{}, 当前执行的账期为:{}, 随机数为:{}".format(execution_ds, execution_ts_sh, random_base))

    
    # 返回的值只会打印到日志中
    return {"sss2":"print__hello2"}

default_args = {
    'owner':'root',
    'start_date':  datetime(2024, 12, 28, 14), # 开始执行时间
    'retries': 1,  # 失败重试次数
    'retry_delay': timedelta(minutes=5) # 失败重试间隔
}

dag = DAG(
    dag_id = 'MyPythonOperator',
    default_args=default_args,
    schedule_interval='*/10 * * * *'  # 每10min运行一次
)

first=PythonOperator(
    task_id='MyPython_first',
    #填写print__hello1方法时,不要加上()
    python_callable=print__hello1,
    # op_args对应print_hello1方法中的a参数
    op_args=[1,2,3,"hello","world"],
    # op_kwargs对应print__hello1方法中的b参数,带参数名称
    op_kwargs={"id":"1","name":"zs","age":18},
    dag = dag
)

second=PythonOperator(
    task_id='MyPython_second',
    # 同样,填写print__hello2 方法时,不要加上()
    python_callable=print__hello2,
    
    op_kwargs={
        # {{ ds }} 是一个Airflow Jinja模板变量,表示当前的执行日期(格式为 YYYY-MM-DD)
        'execution_ds': '{{ ds }}', 
        # {{ ts }} 是一个Airflow Jinja模板变量,表示当前的执行时间戳,格式为 YYYY-MM-DDTHH:MM:SS
        # 注意:默认传递是UTC时间,例如:2024-12-28T07:30:00+00:00
        'execution_ts': '{{ ts }}', 
        # random_base参数对应print_hello2方法中参数的random_base
        "random_base": random.randint(0, 9) 
    },
    dag=dag
)

first >> second

在这里插入图片描述

2.4 HiveOperator

  • 可以通过HiveOperator直接操作Hive SQL
  • 在airflow中使用HiveOperator调用Hive任务,首先需要安装以下依赖并配置Hive Metastore
# 在https://raw.githubusercontent.com/apache/airflow/constraints-2.7.2/constraints-3.8.txt找相应的版本安装
pip install apache-airflow-providers-apache-hive==6.1.6
  • 启动HDFS、Hive Metastore,在Hive中创建下面表,并加载文件。
-- 1、创建表
create table ods_person_info(id int,name string,age int, city_id int) 
row format delimited fields terminated by '\t';

create table dim_city_info(city_id int,city_name string)
row format delimited fields terminated by '\t';


-- 2、准备两个文件
ods_person_info.csv
1	John	30	1001
2	Jane	25	1002
3	Mike	40	1001
4	Sarah	35	1003
5	Liam	22	1004

dim_city_info.csv
1001	New York
1002	Los Angeles
1003	Chicago
1004	Houston
1005	Phoenix



-- 3、加载数据
LOAD DATA LOCAL INPATH '/root/input_doris_data/ods_person_info.csv' 
INTO TABLE ods_person_info;

LOAD DATA LOCAL INPATH '/root/input_doris_data/dim_city_info.csv' 
INTO TABLE dim_city_info;
  • 同样,登录Airflow webui并设置Hive Metastore

在这里插入图片描述

  • 编写Python脚本
from datetime import datetime, timedelta
from airflow import DAG
from airflow.providers.apache.hive.operators.hive import HiveOperator
from airflow.operators.python_operator import PythonOperator
import pytz


default_args = {
    'owner': 'root',
    'start_date': datetime(2024, 12, 30, 12),
    'retries': 1,
    'retry_delay': timedelta(minutes=5)
}

dag = DAG(
    dag_id='MyHiveOperator',
    default_args=default_args,
    schedule_interval=timedelta(hours=1)
)

# 1、利用PythonOperator封装分区变量
def get_partition_hour(**context):
    execution_dt = context['execution_date']
    # 转换Asia/Shanghai分区时间
    shanghai_tz = pytz.timezone('Asia/Shanghai')
    shanghai_dt = execution_dt.astimezone(shanghai_tz)
    # 分区时间
    partition_hour = shanghai_dt.strftime('%Y%m%d%H')
    context['ti'].xcom_push(key='partition_hour', value=partition_hour)
    return {'partition_hour': partition_hour}

time_prep = PythonOperator(
    task_id='prepare_times',
    python_callable=get_partition_hour,
    provide_context=True,
    dag=dag
)

# 2、创建结果表
create_table = HiveOperator(
    task_id='create_table',
    hive_cli_conn_id="AA-My-centos01-hive",
    hql="""
    CREATE TABLE IF NOT EXISTS dwd_person_city_info (
        id INT,
        name STRING,
        age INT,
        city_id INT,
        city_name STRING,
        insert_time STRING
    )
    PARTITIONED BY (p_hour STRING)
    ROW FORMAT DELIMITED
    FIELDS TERMINATED BY '\t'
    """,
    dag=dag
)


# 3、利用PythonOperator来预处理HQL语句,并将处理后的HQL语句传递给HiveOperator
def get_insert_time():
    # 获取当前的执行时间
    now = datetime.now()
    formatted_time = now.strftime('%Y-%m-%d %H:%M:%S')
    return formatted_time


def prepare_hql(**context):
    # Airflow中可以XCom功能来共享数据
    # 通过XCom,一个任务可以将数据推送到一个临时存储中,其他任务可以从这个存储中拉取数据
    partition_hour = context['ti'].xcom_pull(task_ids='prepare_times', key='partition_hour')
    # 调用本地方法
    formatted_time = get_insert_time()

    # 组装hive-sql
    hql = f"""
    INSERT OVERWRITE TABLE dwd_person_city_info 
    PARTITION(p_hour='{partition_hour}')
    SELECT 
          a.id
        , a.name
        , a.age
        , a.city_id
        , b.city_name
        , '{formatted_time}' AS insert_time
    FROM 
        ods_person_info a 
    JOIN 
        dim_city_info b 
    ON a.city_id = b.city_id
    """
    return hql


prepare_hql_task = PythonOperator(
    task_id='prepare_hql',
    python_callable=prepare_hql,
    provide_context=True,
    dag=dag
)

# 4、执行hive-sql插入到相应分区
insert_data = HiveOperator(
    task_id='insert_partitioned_data',
    hive_cli_conn_id="AA-My-centos01-hive",
    # 注意,这里使用了task_instance.xcom_pull而不是ti.xcom_pull
    # 因为在HiveOperator的模板上下文中,ti可能不是直接可用的
    # 而task_instance是Airflow在模板渲染时提供的一个全局变量,用于访问当前任务实例。
    hql="{{ task_instance.xcom_pull(task_ids='prepare_hql', key='return_value') }}",
    dag=dag
)

time_prep >> create_table >> prepare_hql_task >> insert_data
0: jdbc:hive2://192.168.42.101:10000> show tables;
+-----------------------+
|       tab_name        |
+-----------------------+
| dim_city_info         |
| dwd_person_city_info  |
| ods_person_info       |
+-----------------------+
3 rows selected (0.931 seconds)
0: jdbc:hive2://192.168.42.101:10000> show partitions dwd_person_city_info;
+--------------------+
|     partition      |
+--------------------+
| p_hour=2024123012  |
| p_hour=2024123013  |
| p_hour=2024123014  |
| p_hour=2024123015  |
| p_hour=2024123016  |
| p_hour=2024123017  |
+--------------------+
6 rows selected (0.307 seconds)
0: jdbc:hive2://192.168.42.101:10000> select * from  dwd_person_city_info a  where p_hour=2024123017;
+-------+---------+--------+------------+--------------+----------------------+-------------+
| a.id  | a.name  | a.age  | a.city_id  | a.city_name  |    a.insert_time     |  a.p_hour   |
+-------+---------+--------+------------+--------------+----------------------+-------------+
| 1     | John    | 30     | 1001       | New York     | 2024-12-30 18:00:10  | 2024123017  |
| 2     | Jane    | 25     | 1002       | Los Angeles  | 2024-12-30 18:00:10  | 2024123017  |
| 3     | Mike    | 40     | 1001       | New York     | 2024-12-30 18:00:10  | 2024123017  |
| 4     | Sarah   | 35     | 1003       | Chicago      | 2024-12-30 18:00:10  | 2024123017  |
| 5     | Liam    | 22     | 1004       | Houston      | 2024-12-30 18:00:10  | 2024123017  |
+-------+---------+--------+------------+--------------+----------------------+-------------+
5 rows selected (2.301 seconds)

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2268400.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

jpeg学习

相关最全的一篇文章链接:https://www.cnblogs.com/wtysos11/p/14089482.html YUV基础知识 Y表示亮度分量:如果只显示Y的话,图像看起来会是一张黑白照。 U(Cb)表示色度分量:是照片蓝色部分去掉亮度&#x…

内部类(3)

大家好,今天我们继续来看看内部类,今天我们来学习一下内部类的分类,我们来看看一共有几种,它们有什么作用,那么话不多说,我们直接开始。 9.1 内部类的分类 先来看下,内部类都可以在一个类的哪些位置进行定…

你还在用rand()生成随机数?

1. rand() 的缺陷 伪随机数生成器使用数学算法来产生具有良好统计特性的数字序列,但这些数字并非真正随机。 C 标准库中的 rand() 函数并不保证所生成的随机序列的质量。某些 rand() 实现生成的数字周期较短,且这些数字是可以预测的。对于有强伪随机数…

基于FPGA的2ASK+帧同步系统verilog开发,包含testbench,高斯信道,误码统计,可设置SNR

目录 1.算法仿真效果 2.算法涉及理论知识概要 2.1 2ASK调制解调 2.2 帧同步 3.Verilog核心程序 4.完整算法代码文件获得 1.算法仿真效果 vivado2019.2仿真结果如下(完整代码运行后无水印): 设置SNR8db 设置SNR20db 整体波形效果&…

RT-Thread中堆和栈怎么跟单片机内存相联系

现在RT-ThreadMCU的应用方式越来越普遍,RT-Thread需要配置MCU中的RAM到的系统中,进入系统内存管理,才能提供给基于实时系统的应用程序使用,比如给应用程序提供malloc、free等函数调用功能。在嵌入式软件开发中,我们经常…

2、Bert论文笔记

Bert论文 1、解决的问题2、预训练微调2.1预训练微调概念2.2深度双向2.3基于特征和微调(预训练下游策略) 3、模型架构4、输入/输出1.输入:2.输出:3.Learned Embeddings(学习嵌入)1. **Token Embedding**2. **Position Embedding**3…

TiDB 的MPP架构概述

MPP架构介绍: 如图,TiDB Server 作为协调者,首先 TiDB Server 会把每个TiFlash 拥有的region 会在TiFlash上做交换,让表连接在一个TiFlash上。另外 TiFlash会作为计算节点,每个TiFlash都负责数据交换,表连接…

3、redis的高可用

主从复制 主从复制:这是redis高可用的基础。哨兵模式和集群都是建立在此基础之上。 主从模式和数据库的主从模式是一样的,主负责写入,然后把写入的数据同步到从,从节点只能读不能写。read only。 不能做高可用的切换&#xff…

【架构-38】如何选择通信协议和数据格式

一、通信协议选择 不同的协议适用于不同的应用场景,关键在于数据传输的需求,如:实时性、带宽、可靠性等。下面是几种常见通信协议的适用场景: WebSocket 适用场景:实时、双向数据传输、低延迟、持久连接 特点&#x…

SpringCloudAlibaba 技术栈—Sentinel

1、什么是sentinel? Sentinel是一个用于微服务架构的流量管理和控制系统,它通过限制和控制进入系统的流量,来保护系统免受过载和故障的影响,确保服务的稳定性。简而言之,它就是一个帮助微服务在高负载情况下也能稳定运行的工具。…

初学STM32 ---高级定时器互补输出带死区控制

互补输出,还带死区控制,什么意思? 带死区控制的互补输出应用之H桥 捕获/比较通道的输出部分(通道1至3) 死区时间计算 举个栗子(F1为例):DTG[7:0]250,250即二进制&#x…

RoboMIND:多体现基准 机器人操纵的智能规范数据

我们介绍了 RoboMIND,这是机器人操纵的多体现智能规范数据的基准,包括 4 个实施例、279 个不同任务和 61 个不同对象类别的 55k 真实世界演示轨迹。 工业机器人企业 埃斯顿自动化 | 埃夫特机器人 | 节卡机器人 | 珞石机器人 | 法奥机器人 | 非夕科技 | C…

Hadoop HA安装配置(容器环境),大数据职业技能竞赛模块A平台搭建,jdk+zookeeper+hadoop HA

HA概述 (1) 所谓HA(High Availablity),即高可用(7*24小时不中断服务)。 (2) 实现高可用最关键的策略是消除单点故障,HA严格来说应该分为各个组件的HA机制,H…

国产文本编辑器EverEdit - 如何让输出窗口的日志具有双击跳转到文件指定行的功能

1 开发参考:编写脚本时如何向输出窗口打印可跳转到文件位置的日志 1.1 应用场景 编写脚本时,有时对文本进行分析,需要将提示信息打印到输出窗口,同时希望将文本的行、列信息也打印在日志中, 最好是双击日志信息可以跳…

《云原生安全攻防》-- K8s安全配置:CIS安全基准与kube-bench工具

在本节课程中,我们来了解一下K8s集群的安全配置,通过对CIS安全基准和kube-bench工具的介绍,可以快速发现K8s集群中不符合最佳实践的配置项,及时进行修复,从而来提高集群的安全性。 在这个课程中,我们将学习…

3、redis的集群模式

主从模式 哨兵模式 集群 主从模式:这是redis高可用的基础,哨兵模式和集群都是建立在此基础之上。 主从模式和数据库的主从模式是一样的,主负责写入,然后把写入的数据同步到从, 从节点只能读不能写,rea…

计算机图形学知识点汇总

一、计算机图形学定义与内容 1.图形 图形分为“图”和“形”两部分。 其中,“形”指形体或形状,存在于客观世界和虚拟世界,它的本质是“表示”;而图则是包含几何信息与属性信息的点、线等基本图元构成的画面,用于表达…

自动化测试模型(一)

8.8.1 自动化测试模型概述 在自动化测试运用于测试工作的过程中,测试人员根据不同自动化测试工具、测试框架等所进行的测试活动进行了抽象,总结出线性测试、模块化驱动测试、数据驱动测试和关键字驱动测试这4种自动化测试模型。 线性测试 首先&#…

医疗数仓数据仓库设计

医疗数仓数据仓库设计 数据仓库构建流程数据调研明确数据域构建业务总线矩阵明确统计指标交易主题医生主题用户主题评价主题 维度模型设计汇总模型设计 数据仓库构建流程 数据仓库分层规划 优秀可靠的数仓体系,需要良好的数据分层结构。合理的分层,能够…

Go-知识 注释

Go-知识 注释 行注释块注释包注释结构体&接口注释函数&方法注释废弃注释文档 在 go 语言中注释有两种,行注释和块注释 行注释 使用双斜线 // 开始,一般后面紧跟一个空格。行注释是Go语言中最常见的注释形式,在标准包中,…