流批一体计算引擎-5-[Flink]的Python Table API和SQL程序

news2025/1/12 3:48:07

参考Flink从入门到入土(详细教程)

参考flink的默认窗口触发机制
参考彻底搞清Flink中的Window
参考官方Python API文档

1 IDEA中运行Flink

从Flink 1.11版本开始, PyFlink 作业支持在 Windows 系统上运行,因此您也可以在 Windows 上开发和调试 PyFlink 作业了。

1.1 环境配置

pip3 install apache-flink==1.15.3
CMD>set PATH查看环境变量
CMD>set JAVA_HOME查看环境变量
JAVA_HOME=D:\Java\jdk

1.2 Python API

官方Python API文档

根据需要的抽象级别的不同,有两种不同的API可以在PyFlink中使用:
(1)PyFlink Table API允许你使用类似于SQL或者在Python中处理表格数据的方式编写强大的关系查询。
(2)PyFlink DataStream API允许你对Flink的核心组件state和time进行细粒度的控制,以便构建更复杂的流处理应用。

2 PyFlink Table API

参考如何在Apache Flink中使用Python API?

2.1 Python Table API程序的基本结构

所有的 Table API 和 SQL 程序,不管批模式,还是流模式,都遵循相同的结构。

1. 创建 TableEnvironment
2. 创建 source3. 创建 sink 表
4. 查询 source 表,同时执行计算
通过 Table API 创建一张表:
或者通过 SQL 查询语句创建一张表:
5. 将计算结果写入给 sink 表
将 Table API 结果表数据写入 sink 表:
或者通过 SQL 查询语句来写入 sink 表:
# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1. 创建 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2. 创建(SQL表)datagen
table_env.execute_sql("""
    CREATE TABLE datagen (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind' = 'sequence',
        'fields.id.start' = '1',
        'fields.id.end' = '10'
    )"""
)

# 3. 创建(SQL表)print
table_env.execute_sql("""
    CREATE TABLE print (
        id INT,
        data STRING
    ) WITH (
        'connector' = 'print'
    )"""
)

# 4、将(SQL表)datagen转换成(Table API表)source_table
# 方式一:
source_table = table_env.from_path("datagen")
# 方式二:通过SQL查询语句创建
# source_table = table_env.sql_query("SELECT * FROM datagen")

# 5、 查询(Table API表)source_table,同时执行计算
result_table = source_table.select(source_table.id + 1, source_table.data)

# 6. 将(Table API表)result_table的数据写入(SQL表)print
# 方式一:
result_table.execute_insert("print").wait()
# 方式二:通过SQL查询语句来写入
# table_env.execute_sql("INSERT INTO print SELECT * FROM datagen").wait()

2.2 第一步:创建TableEnvironment

TableEnvironment 是 Table API 和 SQL 集成的核心概念。

from pyflink.table import EnvironmentSettings, TableEnvironment

# create a streaming TableEnvironment流模式
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# or create a batch TableEnvironment批模式
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

2.3 第二步:创建表

Table是 Python Table API的核心组件。Table对象由一系列数据转换操作构成,但是它不包含数据本身。 相反,它描述了如何从数据源中读取数据,以及如何将最终结果写出到外部存储等。
表可以被打印、优化并最终在集群中执行。
表也可以是有限流或无限流,以支持流式处理和批处理场景。
一个 Table 实例总是与一个特定的 TableEnvironment 相绑定。
注意:不支持在同一个查询中合并来自不同TableEnvironments的表,例如join或者union它们。

2.3.1 通过列表类型的对象创建

(1)使用一个列表对象创建一张表

# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、创建 批 TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
# 2、使用一个列表对象创建一张(Table API表)table
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')])
table.execute().print()

结果如下:
在这里插入图片描述
(2)创建具有指定列名的表

# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、创建 批 TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
# 2、创建具有指定列名的表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table.execute().print()

结果如下:
在这里插入图片描述
(3)手动指定表模式
默认情况下,表结构是从数据中自动提取的。
如果自动生成的表模式不符合你的要求,你也可以手动指定:

# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes

# 1、创建 批 TableEnvironment
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)
# 2、创建具有指定列名的表
table1 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
# 默认情况下,“id” 列的类型是 64 位整型
print("默认情况下id列的类型为{}".format(table1.get_schema().get_field_data_type("id")))

# 3、指定表模式
table2 = table_env.from_elements([(1, 'Hi'), (2, 'Hello')],
    DataTypes.ROW([DataTypes.FIELD("id", DataTypes.TINYINT()),
                DataTypes.FIELD("data", DataTypes.STRING())]))
print("指定id列的类型为{}".format(table2.get_schema().get_field_data_type("id")))

结果如下:
默认情况下id列的类型为BIGINT
指定id列的类型为TINYINT

2.3.2 通过DDL创建

可以通过 DDL 语句创建表,它代表一张从指定的外部存储读取数据的表:

# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 创建 流 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 创建(SQL表)random_source
table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT, 
        data TINYINT 
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='3',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='6'
    )
""")
# (SQL表)random_source转换成(Table API表)table
table = table_env.from_path("random_source")
table.execute().print()

结果如下:
在这里插入图片描述

2.3.3 通过Catalog创建

TableEnvironment 维护了一个使用标识符创建的表的catalogs映射。
Catalog中的表既可以是临时的,并与单个Flink会话生命周期相关联,也可以是永久的,跨多个Flink会话可见。

通过SQL DDL创建的表和视图, 例如 “create table …” 和 “create view …",都存储在catalog中。

可以通过SQL直接访问catalog中的表。
如果你要用 Table API 来使用 catalog 中的表,可以使用 “from_path” 方法来创建 Table API 对象:

# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、创建 流 TableEnvironment
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2、创建Table API表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

# 3、准备 catalog,将 Table API 表注册到 catalog 中
table_env.create_temporary_view('source_table', table)

# 4、从 catalog 中获取 Table API 表
new_table = table_env.from_path('source_table')
new_table.execute().print()

结果如下:
在这里插入图片描述

2.4 第三步:查询

2.4.1 Table API查询

Table对象有许多方法,可以用于进行关系操作。 这些方法返回新的Table对象,表示对输入Table应用关系操作之后的结果。
这些关系操作可以由多个方法调用组成,例如 table.group_by(…).select(…)。

(1)一个简单的Table API查询

# -*- coding:utf-8 -*-
from pyflink.table import EnvironmentSettings, TableEnvironment, DataTypes
from pyflink.table.expressions import col

# 通过 batch table environment 来执行查询
env_settings = EnvironmentSettings.in_batch_mode()
table_env = TableEnvironment.create(env_settings)

# 创建表
orders = table_env.from_elements(
    [('Jack', 'FRANCE', 10),('Rose', 'ENGLAND', 30),('Jack', 'FRANCE', 20)],
    ["name","country","revenue"])


# 查询所有来自法国客户的收入
revenue = orders.select(col("name"), col("country"), col("revenue")) \
    .where(col("country") == 'FRANCE')

revenue.execute().print()

结果如下:
在这里插入图片描述

2.4.2 SQL查询

Flink 的 SQL 基于 Apache Calcite,它实现了标准的 SQL。
SQL 查询语句使用字符串来表达。
SQL 文档描述了 Flink 对流和批处理所支持的 SQL。
下面示例展示了一个简单的 SQL 聚合查询:

from pyflink.table import EnvironmentSettings, TableEnvironment

# 通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


table_env.execute_sql("""
    CREATE TABLE random_source (
        id BIGINT, 
        data TINYINT
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='8',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='11'
    )
""")

table_env.execute_sql("""
    CREATE TABLE print_sink (
        id BIGINT, 
        data_sum TINYINT 
    ) WITH (
        'connector' = 'print'
    )
""")

table_env.execute_sql("""
    INSERT INTO print_sink
        SELECT id, sum(data) as data_sum FROM 
            (SELECT id / 2 as id, data FROM random_source)
        WHERE id > 1
        GROUP BY id
""").wait()

结果如下:
在这里插入图片描述
上述输出展示了 print 结果表所接收到的 change log。
change log 的格式为:

{subtask id}> {消息类型}{值的字符串格式}

例如,“2> +I(4,11)” 表示这条消息来自第二个 subtask,
其中 “+I” 表示这是一条插入的消息,
"(4, 11)” 是这条消息的内容。

“-U” 表示这是一条撤回消息 (即更新前),这意味着应该在 sink 中删除或撤回该消息。 “+U” 表示这是一条更新的记录 (即更新后),这意味着应该在 sink 中更新或插入该消息。
所以,从上面的 change log,我们可以得到如下结果:

(4, 11)
(2, 15) 
(3, 19)

2.4.3 Table API 和 SQL 的混合使用

Table API 中的 Table 对象和 SQL 中的 Table 可以自由地相互转换。

(1)如何在 SQL 中使用 Table 对象
将Table API表转换成SQL表。

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


# 2、创建一张(SQL表)sink来接收结果数据
table_env.execute_sql("""
    CREATE TABLE table_sink (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")

# 3、创建(Table API表)table
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
# 4、将(Table API表)table转换成SQL中的视图
table_env.create_temporary_view('table_api_table', table)

# 5、将(Table API表)table的数据写入结果表
table_env.execute_sql("INSERT INTO table_sink SELECT * FROM table_api_table").wait()

结果如下:
在这里插入图片描述
(2)如何在 Table API 中使用 SQL 表
将SQL表转换成Table API表。

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


# 2、创建一张(SQL表) sql_source
table_env.execute_sql("""
    CREATE TABLE sql_source (
        id BIGINT, 
        data TINYINT 
    ) WITH (
        'connector' = 'datagen',
        'fields.id.kind'='sequence',
        'fields.id.start'='1',
        'fields.id.end'='4',
        'fields.data.kind'='sequence',
        'fields.data.start'='4',
        'fields.data.end'='7'
    )
""")

# 3、将(SQL表)sql_source转换成(Table API表)table
table = table_env.from_path("sql_source")

# 或者通过SQL查询语句创建(Table API表)table
# table = table_env.sql_query("SELECT * FROM sql_source")

# 4、将表中的数据写出
table.execute().print()

结果如下:
在这里插入图片描述

2.5 第四步:结果输出

2.5.1 打印结果

可以通过 TableResult.print方法,将表的结果打印到标准输出中。该方法通常用于预览表的中间结果。

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


# 2、创建(Table API表)source
source = table_env.from_elements([(1, "Hi", "Hello"),
                                  (2, "Hello", "Hello")],
                                 ["a", "b", "c"])

# 3、将(Table API表)source转换成SQL中的视图
table_env.create_temporary_view('table_api_source', source)

# Get TableResult
table_result = table_env.execute_sql("select a + 1, b, c from table_api_source")

# Print the table
table_result.print()

在这里插入图片描述
该方式会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 Table.limit 来限制收集数据的条数是一种很好的做法。

2.5.2 将结果数据收集到客户端

你可以使用TableResult.collect将Table的结果收集到客户端,结果的类型为迭代器类型。
以下代码展示了如何使用TableResult.collect()方法:

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


# 2、创建(Table API表)source
source = table_env.from_elements([(1, "Hi", "Hello"),
                                  (2, "Hello", "Hello")],
                                 ["a", "b", "c"])

# 3、将(Table API表)source转换成SQL中的视图
table_env.create_temporary_view('table_api_source', source)

# Get TableResult
table_result = table_env.execute_sql("select a + 1, b, c from table_api_source")

# 遍历结果
with table_result.collect() as results:
   for result in results:
       print(result)

在这里插入图片描述
注意:该方式会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 Table.limit 来限制收集数据的条数是一种很好的做法。

2.5.3 将结果数据转换为DataFrame

可以调用to_pandas方法来 将一个Table对象转化成pandas DataFrame:

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)


# 2、创建(Table API表)source
# <class 'pyflink.table.table.Table'>
source = table_env.from_elements([(1, "Hi", "Hello"),
                                  (2, "Hello", "Hello")],
                                 ["a", "b", "c"])
print(source.to_pandas())

# 3、将(Table API表)source转换成SQL中的视图
table_env.create_temporary_view('table_api_source', source)

# 4、Get TableResult
# <class 'pyflink.table.table_result.TableResult'>
# 'TableResult' object has no attribute 'to_pandas'
table_result = table_env.execute_sql("select a + 1, b, c from table_api_source")

在这里插入图片描述

注意:该方式会触发表的物化,同时将表的内容收集到客户端内存中,所以通过 Table.limit 来限制收集数据的条数是一种很好的做法。
注意:并不是所有的数据类型都可以转换为 pandas DataFrames。

2.5.4 将结果写入到一张Sink表中

方式一:可以调用 execute_insert方法来将Table对象中的数据写入到一张 sink 表中

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2、创建(SQL表)sink_table
table_env.execute_sql("""
    CREATE TABLE sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")
# 3、创建(Table API表)table
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

# 4、(Table API表)table的数据写入到(SQL表)sink_table中
table.execute_insert("sink_table").wait()

在这里插入图片描述
方式二:可以通过 SQL 来完成:

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 2、创建(SQL表)sink_table
table_env.execute_sql("""
    CREATE TABLE sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")
# 3、创建(Table API表)table
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])

# 4、将(Table API表)table转换为(SQL表)table_source
table_env.create_temporary_view("table_source", table)

# 5、通过SQL语句写入
table_env.execute_sql("INSERT INTO sink_table SELECT * FROM table_source").wait()

在这里插入图片描述

2.5.5 将结果写入多张Sink表中

你也可以使用 StatementSet 在一个作业中将 Table 中的数据写入到多张 sink 表中:

from pyflink.table import EnvironmentSettings, TableEnvironment

# 1、通过 stream table environment 来执行查询
env_settings = EnvironmentSettings.in_streaming_mode()
table_env = TableEnvironment.create(env_settings)

# 准备 source 表和 sink 表
table = table_env.from_elements([(1, 'Hi'), (2, 'Hello')], ['id', 'data'])
table_env.create_temporary_view("simple_source", table)
table_env.execute_sql("""
    CREATE TABLE first_sink_table (
        id BIGINT, 
        data VARCHAR 
    ) WITH (
        'connector' = 'print'
    )
""")
table_env.execute_sql("""
    CREATE TABLE second_sink_table (
        id BIGINT, 
        data VARCHAR
    ) WITH (
        'connector' = 'print'
    )
""")

# 创建 statement set
statement_set = table_env.create_statement_set()

# 将 "table" 的数据写入 "first_sink_table"
statement_set.add_insert("first_sink_table", table)

# 通过一条 sql 插入语句将数据从 "simple_source" 写入到 "second_sink_table"
statement_set.add_insert_sql("INSERT INTO second_sink_table SELECT * FROM simple_source")

# 执行 statement set
statement_set.execute().wait()

在这里插入图片描述

3 Table API示例代码

Apache Flink提供Table API关系型API来统一处理流和批,即查询在无边界的实时流或有边界的批处理数据集上以相同的语义执行,并产生相同的结果。 Flink的Table API易于编写,通常能简化数据分析,数据管道和ETL应用的编码。

读取一个csv文件,处理后,将结果写到一个结果csv文件中。

3.1 基础代码

aa = map(lambda i: (i,), word_count_data)
    for a in aa:
        print(a)
输出如下:
('to be that',)
('to be ',)

#自定义函数
@udtf(result_types=[DataTypes.STRING()])
   def split(line: Row):
       for s in line[0].split():
           yield Row(s)

3.2 整体代码

from pyflink.table import EnvironmentSettings, TableEnvironment
from pyflink.table import DataTypes
from pyflink.table.expressions import lit, col
from pyflink.table.udf import udtf
from pyflink.common import Row

word_count_data = ["to be that",
                   "to be "]


def word_count(input_path, output_path):
    # 1、通过 stream table environment 来执行查询
    # TableEnvironment是Python Table API作业的入口类。
    env_settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(env_settings)
    # 所有数据写入一个文件中
    table_env.get_config().set("parallelism.default", "1")

    # 2、创建源表和结果表
    # 使用TableEnvironment.execute_sql()方法,通过DDL语句来注册源表和结果表
    # 准备 source 表和 sink 表
    if input_path is not None:
        # DDL语句
        my_source_ddl = """
            create table source (
                word STRING
            ) with (
                'connector' = 'filesystem',
                'format' = 'csv',
                'path' = '{}'
            )""".format(input_path)
        # 注册表
        table_env.execute_sql(my_source_ddl)
        # (SQL表)source转化为(Table API表)tab
        tab = table_env.from_path("source")
    else:
        print("Executing word_count example with default input data set.")
        print("Use --input to specify file input.")
        tab = table_env.from_elements(map(lambda i: (i,), word_count_data),
                                  DataTypes.ROW([DataTypes.FIELD('line', DataTypes.STRING())]))
    if output_path is not None:
        # DDL语句
        my_sink_ddl = """
            create table sink (
                word STRING,
                `count` BIGINT
            ) with (
                'connector' = 'filesystem',
                'format' = 'canal-json',
                'path' = '{}'
        )""".format(output_path)
        # 注册表
        table_env.execute_sql(my_sink_ddl)
    else:
        # DDL语句
        my_sink_ddl = """
            create table sink (
                word STRING,
                `count` BIGINT
            ) with (
                'connector' = 'print'
        )"""
        # 注册表
        table_env.execute_sql(my_sink_ddl)

    @udtf(result_types=[DataTypes.STRING()])
    def split(line: Row):
        for s in line[0].split():
            yield Row(s)

    # compute word count
    tab.flat_map(split).alias('word') \
        .group_by(col('word')) \
        .select(col('word'), lit(1).count) \
        .execute_insert('sink') \
        .wait()
    # remove .wait if submitting to a remote cluster

if __name__ == "__main__":
    word_count(None,None)

上面所有的操作,比如创建源表 进行变换以及写入结果表的操作都只是构建作业逻辑图,只有当 execute_insert(sink_name) 被调用的时候, 作业才会被真正提交到集群或者本地进行执行。
在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/179287.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【数据结构】极致详解:树与二叉树(上)——结构与概念

目录 &#x1f6eb;前言&#x1f6eb;&#xff1a; &#x1f680;一、树&#x1f680;&#xff1a; 1.树的概念&#xff1a; 2.树的相关概念&#xff1a; 3.树的表示&#xff1a; 4.树的实际使用场景&#xff1a; &#x1f6f0;️二、二叉树&#x1f6f0;️&#xff1a;…

acwing-Diango项目 (后半)

acwing-Django项目 文章目录acwing-Django项目前言5. 创建账号系统5.1用户名密码登录写登录界面写注册界面写动作 实现三个函数 register login logout5.2 Web端acapp一键登录在django中集成redis(准备工作)首先 pip install django_redis配置一下缓存启动redis-serverredis在d…

特征工程——文本特征

文本特征 expansion编码 consolidation编码 文本长度特征 标点符号特征 词汇属性特征 特殊词汇特征 词频特征 TF-IDF特征 LDA特征 下面的文章主要是梯度提升树模型展开的&#xff0c;抽取的特征主要为帮助梯度提升树模型挖掘其挖掘不到的信息&#xff0c;本文介绍的所…

NodeJS Web 框架 Express 之中间件

NodeJS Web 框架 Express 之中间件参考描述中间件next()一个简单的中间件函数使用全局中间件局部中间件共享注意事项位置next()分类错误级中间件内置中间件express.urlencoded()express.json()第三方中间件参考 项目描述哔哩哔哩黑马程序员搜索引擎Bing 描述 项目描述Edge109…

从0-1开始 测试ZLMediaKit推拉流性能、延时性能

流媒体开发系列文章 文章目录流媒体开发系列文章前言一、环境准备&#xff1f;二、拉流测试过程三、推流测试过程三、延时测试总结前言 目前、比较有名的流媒体服务器有ZLMediaKit、srs、live555、eadydarwin等。因为srs是单线程服务、对于多核服务器的支持需要通过部署多个服…

pytorch深度学习基础(十一)——常用结构化CNN模型构建

结构化CNN模型构建与测试前言GoogLeNet结构Inception块模型构建resNet18模型结构残差块模型构建denseNet模型结构DenseBlocktransition_block模型构建结尾前言 在本专栏的上一篇博客中我们介绍了常用的线性模型&#xff0c;在本文中我们将介绍GoogleNet、resNet、denseNet这类…

APT之木马静态免杀

前言 这篇文章主要是记录手动编写代码进行木马免杀&#xff0c;使用工具也可以免杀&#xff0c;只不过太脚本小子了&#xff0c;而且工具的特征也容易被杀软抓到&#xff0c;指不定哪天就用不了了&#xff0c;所以要学一下手动去免杀木马&#xff0c;也方便以后开发一个只属于…

blender导入骨骼动画方法[psa动作]

先导入女性的psk文件 然后调整缩放大小和人物一样,包括角度朝向. ctrla应用所有改变 然后选择psk文件以及其他人物模型的全部 ,然后 在Layout-物体-父级 -附带空顶相点组 image.png之后会发现所有人物多了修改器,点击其中一个修改器 点添加修改器 -数据传递 勾选顶点数据-选择顶…

人员动作行为AI分析系统 yolov5

人员动作行为AI分析系统通过pythonyolo系列网络学习模型&#xff0c;对现场画面人员行为进行实时分析监测&#xff0c;自动识别出人的各种异常行为动作&#xff0c;立即抓拍存档预警同步回传给后台。 我们使用YOLO算法进行对象检测。YOLO是一个聪明的卷积神经网络(CNN)&#xf…

带滤波器的PID控制仿真-1

采用低通滤波器可有效地滤掉噪声信号&#xff0c;在控制系统的设计中是一种常用的方法。基于低通滤波器的信号处理实例设低通滤波器为&#xff1a;采样时间为1ms&#xff0c;输入信号为带有高频正弦噪声&#xff08; 100Hz&#xff09;的低频&#xff08;0.2Hz)正弦信号。采用低…

离散数学与组合数学-05树

文章目录离散数学与组合数学-05树5.1 认识树5.1.1 树的模型5.1.2 树的应用5.2 无向树5.2.1 定义5.2.2 树的性质5.2.3 性质应用5.3 生成树5.3.1 引入5.3.2 定义5.3.3 算法5.3.4 应用5.4 最小生成树5.4.1 引入5.4.2 定义5.4.3 算法5.5 根树5.5.1 根数定义5.5.2 倒置法5.5.3 树的家…

【编程入门】开源记事本(SwiftUI版)

背景 前面已输出多个系列&#xff1a; 《十余种编程语言做个计算器》 《十余种编程语言写2048小游戏》 《17种编程语言10种排序算法》 《十余种编程语言写博客系统》 《十余种编程语言写云笔记》 本系列对比云笔记&#xff0c;将更为简化&#xff0c;去掉了网络调用&#xff0…

C++模板进阶

这篇文章是对模板初阶的一些补充&#xff0c;让大家在进行深一层的理解。 文章目录1. 非类型模板参数2. 模板的特化2.1 概念2.2 函数模板特化2.3 类模板特化2.3.1 全特化2.3.2 偏特化2.4 类模板特化应用示例3 模板分离编译3.1 什么是分离编译3.2 模板的分离编译3.3 解决方法4.…

【各种**问题系列】什么是 LTS 长期支持

目录 &#x1f341; 什么是长期支持&#xff08;LTS&#xff09;版本&#xff1f; &#x1f342; LTS 版本的优点&#xff1a; &#x1f341; 什么是 Ubuntu LTS&#xff1f; &#x1f342; Ubuntu LTS 软件更新包括什么&#xff1f; 在 Linux 的世界里&#xff0c;特别是谈…

【Java开发】Spring Cloud 08 :链路追踪

任何一个架构难免会出现bug&#xff0c;微服务相比于单体架构日志查询更为困难&#xff0c;因此spring cloud推出了Sleuth等组件的链路追踪技术来实现报错信息的定位及查询。项目源码&#xff1a;尹煜 / coupon-yinyu GitCode1 调用链追踪我们可以想象这样一个场景&#xff0c…

单一数字评估指标、迁移学习、多任务学习、端到端的深度学习

目录1.单一数字评估指标(a single number evaluation metric)有时候要比较那个分类器更好&#xff0c;或者哪个模型更好&#xff0c;有很多指标&#xff0c;很难抉择&#xff0c;这个时候就需要设置一个单一数字评估指标。例1&#xff1a;比较A&#xff0c;B两个分类器的性能&a…

Android MVVM的实现

Android MVVM的实现 前言&#xff1a; 在我们写一些项目的时候&#xff0c;通常会对一些常用的一些常用功能进行抽象封装&#xff0c;简单例子&#xff1a;比如BaseActivity&#xff0c;BaseFragment等等…一般这些Base会去承载一些比如标题栏&#xff0c;主题之类的工作&…

提权漏洞和域渗透历史漏洞整理

Windows提权在线辅助工具 https://i.hacking8.com/tiquan/&#x1f334;Kernel privilege escalation vulnerability collection, with compilation environment, demo GIF map, vulnerability details, executable file (提权漏洞合集) https://github.com/Ascotbe/Kernelhu…

恶意代码分析实战 13 反调试技术

13.1 Lab16-01 首先&#xff0c;将可执行文件拖入IDA中。 我们可以看到有三处都调用了sub_401000函数&#xff0c;并且代码都在哪里停止执行。由于没有一条线从这些方框中引出&#xff0c;这就意味着函数可能终止了程序。 右侧每一个大框中都包含一个检查&#xff0c;这个检查…

Makefile学习②:Makefile基本语法

Makefile学习②&#xff1a;Makefile基本语法 Makefile基本语法 目标&#xff1a; 依赖 &#xff08;Tab&#xff09;命令 目标&#xff1a;一般是指要编译的目标&#xff0c;也可以是一个动作 依赖&#xff1a;指执行当前目标所要依赖的先项&#xff0c;包括其他目标&#xf…