Flink用户自定义连接器(Table API Connectors)学习总结

news2024/12/23 13:41:01

文章目录

    • 前言
    • 背景
    • 官网文档
      • 概述
        • 元数据
        • 解析器
        • 运行时的实现
    • 自定义扩展点
      • 工厂类
      • Source
      • 扩展Sink和编码与解码
    • 自定义flink-http-connector
      • SQL示例
      • 具体代码
        • pom依赖
        • HttpTableFactory
        • HttpTableSource
        • HttpSourceFunction
        • HttpClientUtil
    • 最后
    • 参考资料

前言

结合官网文档和自定义实现一个flink-http-connector,来学习总结Flink用户自定义连接器(Table API Connectors)。

背景

前段时间有个需求:需要Flink查询API接口,将返回的数据转为Flink Table,然后基于Table进行后面的计算。这个需求可以写Flink代码实现:使用HttpClient API请求接口返回数据,然后将返回的数据转为DataStream,最后再将DataStream转为Table。我想了一下是不是可以通过SQL的形式实现这种需求,于是在网上查了一下,还真有。Star比较多的项目:https://github.com/getindata/flink-http-connector.git,但是它要求Java 11,并且它的Http Source只支持Lookup Joins,限制太多,并不能满足我的需求。所以最终又尝试学习了自己写自定义的Table API Connectors,这样可以比较灵活的实现需求。

官网文档

官网文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sourcessinks/,官网文档详细介绍了自定义连接器的概念、扩展点并给出了一个完整的代码实例,实例实现了自定义socket连接器。本文摘录一部分官方文档,便于自己理解。

动态表是 Flink Table & SQL API的核心概念,用于统一有界和无界数据的处理。

动态表只是一个逻辑概念,因此 Flink 并不拥有数据。相应的,动态表的内容存储在外部系统( 如数据库、键值存储、消息队列 )或文件中。

动态 sources 和动态 sinks 可用于从外部系统读取数据和向外部系统写入数据。在文档中,sources 和 sinks 常在术语连接器 下进行总结。

Flink 为 Kafka、Hive 和不同的文件系统提供了预定义的连接器。有关内置 table sources 和 sinks 的更多信息,请参阅连接器部分:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/connectors/table/overview/

概述

实心箭头展示了在转换过程中对象如何从一个阶段到下一个阶段转换为其他对象。

元数据

Table API 和 SQL 都是声明式 API。这包括表的声明。因此,执行 CREATE TABLE 语句会导致目标 catalog 中的元数据更新。

对于大多数 catalog 实现,外部系统中的物理数据不会针对此类操作进行修改。特定于连接器的依赖项不必存在于类路径中。在 WITH 子句中声明的选项既不被验证也不被解释。

动态表的元数据( 通过 DDL 创建或由 catalog 提供 )表示为 CatalogTable 的实例。必要时,表名将在内部解析为 CatalogTable。

解析器

在解析和优化以 table 编写的程序时,需要将 CatalogTable 解析为 DynamicTableSource( 用于在 SELECT 查询中读取 )和 DynamicTableSink( 用于在 INSERT INTO 语句中写入 )。

DynamicTableSourceFactory 和 DynamicTableSinkFactory 提供连接器特定的逻辑,用于将 CatalogTable 的元数据转换为 DynamicTableSource 和 DynamicTableSink 的实例。在大多数情况下,以工厂模式设计的目的是验证选项(例如示例中的 ‘port’ = ‘5022’ ),配置编码解码格式( 如果需要 ),并创建表连接器的参数化实例。

默认情况下,DynamicTableSourceFactory 和 DynamicTableSinkFactory 的实例是使用 Java的 [Service Provider Interfaces (SPI)] (https://docs.oracle.com/javase/tutorial/sound/SPI-intro.html) 发现的。 connector 选项(例如示例中的 ‘connector’ = ‘custom’)必须对应于有效的工厂标识符。

尽管在类命名中可能不明显,但 DynamicTableSource 和 DynamicTableSink 也可以被视为有状态的工厂,它们最终会产生具体的运行时实现来读写实际数据。

规划器使用 source 和 sink 实例来执行连接器特定的双向通信,直到找到最佳逻辑规划。取决于声明可选的接口( 例如 SupportsProjectionPushDown 或 SupportsOverwrite),规划器可能会将更改应用于实例并且改变产生的运行时实现。

运行时的实现

一旦逻辑规划完成,规划器将从表连接器获取 runtime implementation。运行时逻辑在 Flink 的核心连接器接口中实现,例如 InputFormat 或 SourceFunction。

这些接口按另一个抽象级别被分组为 ScanRuntimeProvider、LookupRuntimeProvider 和 SinkRuntimeProvider 的子类。

例如,OutputFormatProvider( 提供 org.apache.flink.api.common.io.OutputFormat )和 SinkFunctionProvider( 提供org.apache.flink.streaming.api.functions.sink.SinkFunction)都是规划器可以处理的 SinkRuntimeProvider 具体实例。

自定义扩展点

工厂类

需要实现 org.apache.flink.table.factories.DynamicTableSourceFactory 接口完成一个工厂类,来生产 DynamicTableSource 类。比如:

public class HttpTableFactory implements DynamicTableSourceFactory {
  ...

我们需要在META-INF/services/org.apache.flink.table.factories.Factory中添加自定义实现的工厂类,如下图:

注意:图中的META-INF.services实际为META-INF/services

默认情况下,Java 的 SPI 机制会自动识别这些工厂类,同时将 connector 配置项作为工厂类的”标识符“。

一个工厂类可以同时实现Source和Sink,比如HoodieTableFactory

public class HoodieTableFactory implements DynamicTableSourceFactory, DynamicTableSinkFactory {
  ...

Source

按照定义,动态表是随时间变化的。

在读取动态表时,表中数据可以是以下情况之一:

  • changelog 流(支持有界或无界),在 changelog 流结束前,所有的改变都会被源源不断地消费,由 ScanTableSource 接口表示。
  • 处于一直变换或数据量很大的外部表,其中的数据一般不会被全量读取,除非是在查询某个值时,由 LookupTableSource 接口表示。
    一个类可以同时实现这两个接口,Planner 会根据查询的 Query 选择相应接口中的方法。
    示例:
public class HttpTableSource implements ScanTableSource {
  ...

可以实现更多的功能接口来优化数据源,比如实现 SupportsProjectionPushDown 接口,这样在运行时在 source 端就处理数据。在 org.apache.flink.table.connector.source.abilities 包下可以找到各种功能接口,更多内容可查看https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sourcessinks/#source-abilities。比如HoodieTableSource

public class HoodieTableSource implements
    ScanTableSource,
    SupportsPartitionPushDown,
    SupportsProjectionPushDown,
    SupportsLimitPushDown,
    SupportsFilterPushDown {
      ...

实现ScanTableSource接口的类必须能够生产Flink内部数据结构,因此每条记录都会按照org.apache.flink.table.data.RowData 的方式进行处理。所以我们可能还要一个类来生产RowData类型的数据,比如实现RichSourceFunction<RowData>

public class HttpSourceFunction extends RichSourceFunction<RowData> {
  ...

扩展Sink和编码与解码

可以参考官网:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sourcessinks/#%E5%8A%A8%E6%80%81%E8%A1%A8%E7%9A%84-sink-%E7%AB%AF

自定义flink-http-connector

自定义flink-http-connector,实现SQL读http接口数据,代码实现参考博文:https://www.cnblogs.com/Springmoon-venn/p/15392511.html,其实大部分和官网实例代码是一样的,不一样的地方是需要HttpSourceFunction中实现通过HttpClient读取接口并返回数据。我在参考博文的基础上添加了两个配置项http.moderead.streaming.enabled分别代表接口的方式、是否流读。接口方式支持post、get两种。

完整代码地址:https://github.com/dongkelun/flink-learning/tree/master/flink-http-connector
Flink版本 1.15

SQL示例

需要将项目打包,并放到$FLINK_HOME/lib下
Jar包地址:https://fast.uc.cn/s/a553136362ac4

create table cust_http_get_source(
    id int,
    name string
)WITH(
 'connector' = 'http',
 'http.url' = 'http://mock.apifox.cn/m1/2518376-0-default/test/flink/get/order',
 'format' = 'json'
);

create table cust_http_post_source(
    id int,
    name string
)WITH(
 'connector' = 'http',
 'http.url' = 'http://mock.apifox.cn/m1/2518376-0-default/test/flink/post/order',
 'http.mode' = 'post',
 'read.streaming.enabled' = 'true',
 'read.streaming.check-interval' = '10',
 'format' = 'json'
);

示例中的接口是我使用apifox在线mock出来的可以直接调用
参数解释:

  • http.mode 调用接口模式、支持post、get两种,默认get
  • read.streaming.enabled 是否流读,默认false
  • read.streaming.check-interval 流读间隔,单位秒,默认60s
    其实read.streaming.enabled和read.streaming.check-interval是拷贝的Hudi的源码配置

结果:

select * from cust_http_post_source;

在这里插入图片描述

具体代码

pom依赖

    <dependencies>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-common</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-table-api-java-bridge</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.httpcomponents</groupId>
            <artifactId>httpclient</artifactId>
            <version>4.5.14</version>
        </dependency>
    </dependencies>

HttpTableFactory

package com.dkl.flink.connector.http;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.ConfigOption;
import org.apache.flink.configuration.ConfigOptions;
import org.apache.flink.configuration.ReadableConfig;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.factories.DeserializationFormatFactory;
import org.apache.flink.table.factories.DynamicTableSourceFactory;
import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.types.DataType;

import java.util.HashSet;
import java.util.Set;

public class HttpTableFactory implements DynamicTableSourceFactory {

    // 定义所有配置项
    // define all options statically
    public static final ConfigOption<String> URL = ConfigOptions.key("http.url")
            .stringType()
            .noDefaultValue();

    public static final ConfigOption<String> MODE = ConfigOptions.key("http.mode")
            .stringType()
            .defaultValue("get");

    public static final ConfigOption<Boolean> READ_AS_STREAMING = ConfigOptions
            .key("read.streaming.enabled")
            .booleanType()
            .defaultValue(false)// default read as batch
            .withDescription("Whether to read as streaming source, default false");

    public static final ConfigOption<Long> READ_STREAMING_CHECK_INTERVAL = ConfigOptions.key("read.streaming.check-interval")
            .longType()
            .defaultValue(60L)// default 1 minute
            .withDescription("Check interval for streaming read of SECOND, default 1 minute");


    @Override
    public String factoryIdentifier() {
        // 用于匹配: `connector = '...'`
        return "http"; // used for matching to `connector = '...'`
    }

    @Override
    public Set<ConfigOption<?>> requiredOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(URL);
        // 解码的格式器使用预先定义的配置项
        options.add(FactoryUtil.FORMAT); // use pre-defined option for format
        return options;
    }

    @Override
    public Set<ConfigOption<?>> optionalOptions() {
        final Set<ConfigOption<?>> options = new HashSet<>();
        options.add(MODE);
        options.add(READ_AS_STREAMING);
        options.add(READ_STREAMING_CHECK_INTERVAL);
        return options;
    }

    @Override
    public DynamicTableSource createDynamicTableSource(Context context) {
        // 使用提供的工具类或实现你自己的逻辑进行校验
        // either implement your custom validation logic here ...
        // or use the provided helper utility
        final FactoryUtil.TableFactoryHelper helper = FactoryUtil.createTableFactoryHelper(this, context);

        // 找到合适的解码器
        // discover a suitable decoding format
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat = helper.discoverDecodingFormat(
                DeserializationFormatFactory.class,
                FactoryUtil.FORMAT);

        // 校验所有的配置项
        // validate all options
        helper.validate();

        // 获取校验完的配置项
        // get the validated options
        final ReadableConfig options = helper.getOptions();
        final String url = options.get(URL);
        final String mode = options.get(MODE);
        final boolean isStreaming = options.get(READ_AS_STREAMING);
        final long interval = options.get(READ_STREAMING_CHECK_INTERVAL);

        // 从 catalog 中抽取要生产的数据类型 (除了需要计算的列)
        // derive the produced data type (excluding computed columns) from the catalog table
        final DataType producedDataType =
                context.getCatalogTable().getResolvedSchema().toPhysicalRowDataType();

        // 创建并返回动态表 source
        // create and return dynamic table source
        return new HttpTableSource(url, mode, isStreaming, interval, decodingFormat, producedDataType);
    }
}

HttpTableSource

package com.dkl.flink.connector.http;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.table.connector.ChangelogMode;
import org.apache.flink.table.connector.format.DecodingFormat;
import org.apache.flink.table.connector.source.DynamicTableSource;
import org.apache.flink.table.connector.source.ScanTableSource;
import org.apache.flink.table.connector.source.SourceFunctionProvider;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.types.DataType;

public class HttpTableSource implements ScanTableSource {

    private final String url;
    private final String mode;
    private final boolean isStreaming;
    private final long interval;
    private final DecodingFormat<DeserializationSchema<RowData>> decodingFormat;
    private final DataType producedDataType;

    public HttpTableSource(
            String hostname,
            String mode,
            boolean isStreaming,
            long interval,
            DecodingFormat<DeserializationSchema<RowData>> decodingFormat,
            DataType producedDataType) {
        this.url = hostname;
        this.mode = mode;
        this.isStreaming = isStreaming;
        this.interval = interval;
        this.decodingFormat = decodingFormat;
        this.producedDataType = producedDataType;
    }

    @Override
    public ChangelogMode getChangelogMode() {
        // 在我们的例子中,由解码器来决定 changelog 支持的模式
        // 但是在 source 端指定也可以
        // in our example the format decides about the changelog mode
        // but it could also be the source itself
        return decodingFormat.getChangelogMode();
    }

    @Override
    public ScanRuntimeProvider getScanRuntimeProvider(ScanContext runtimeProviderContext) {

        // 创建运行时类用于提交给集群
        // create runtime classes that are shipped to the cluster
        final DeserializationSchema<RowData> deserializer = decodingFormat.createRuntimeDecoder(
                runtimeProviderContext,
                producedDataType);

        final SourceFunction<RowData> sourceFunction = new HttpSourceFunction(url, mode, isStreaming, interval, deserializer);

        return SourceFunctionProvider.of(sourceFunction, !isStreaming);
    }

    @Override
    public DynamicTableSource copy() {
        return new HttpTableSource(url, mode, isStreaming, interval, decodingFormat, producedDataType);
    }

    @Override
    public String asSummaryString() {
        return "Http Table Source";
    }
}

HttpSourceFunction

package com.dkl.flink.connector.http;

import org.apache.flink.api.common.serialization.DeserializationSchema;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.SimpleCounter;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.apache.flink.table.data.RowData;

import java.util.concurrent.TimeUnit;

/**
 * http table source
 */
public class HttpSourceFunction extends RichSourceFunction<RowData> {

    private volatile boolean isRunning = true;
    private String url;
    private String mode;
    private boolean isStreaming;
    private long interval;
    private DeserializationSchema<RowData> deserializer;
    // count out event
    private transient Counter counter;

    public HttpSourceFunction(String url, String mode, boolean isStreaming,
                              long interval, DeserializationSchema<RowData> deserializer) {
        this.url = url;
        this.mode = mode;
        this.isStreaming = isStreaming;
        this.interval = interval;
        this.deserializer = deserializer;
    }

    @Override
    public void open(Configuration parameters) {

        counter = new SimpleCounter();
        this.counter = getRuntimeContext()
                .getMetricGroup()
                .counter("http-counter");
    }

    @Override
    public void run(SourceContext<RowData> ctx) {
        if (isStreaming) {
            while (isRunning) {
                try {
                    // 接收http消息
                    // receive http message
                    String message = mode.equalsIgnoreCase("get") ? HttpClientUtil.get(url) : HttpClientUtil.post(url, "");
                    // 解码并处理记录
                    // deserializer message
                    ctx.collect(deserializer.deserialize(message.getBytes()));
                    this.counter.inc();

                    TimeUnit.SECONDS.sleep(interval);
                } catch (Exception e) {
                    throw new RuntimeException(e);
                }
            }
        } else {
            try {
                // receive http message
                String message = mode.equalsIgnoreCase("get") ? HttpClientUtil.get(url) : HttpClientUtil.post(url, "");
                // deserializer message
                ctx.collect(deserializer.deserialize(message.getBytes()));
                this.counter.inc();
            } catch (Exception e) {
                throw new RuntimeException(e);
            }
        }


    }

    @Override
    public void cancel() {
        isRunning = false;
    }
}

HttpClientUtil

package com.dkl.flink.connector.http;

import com.dkl.flink.utils.StringUtils;
import org.apache.http.HttpResponse;
import org.apache.http.client.methods.HttpGet;
import org.apache.http.client.methods.HttpPost;
import org.apache.http.entity.StringEntity;
import org.apache.http.impl.client.CloseableHttpClient;
import org.apache.http.impl.client.HttpClients;
import org.apache.http.util.EntityUtils;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;

public class HttpClientUtil {
    public static String post(String uri, String json) throws Exception {
        HttpPost httpPost = new HttpPost(uri);
        CloseableHttpClient client = HttpClients.createDefault();
        StringEntity entity = StringUtils.isNullOrEmpty(json) ? new StringEntity("utf-8") : new StringEntity(json, "utf-8");//解决中文乱码问题
        entity.setContentEncoding("UTF-8");
        entity.setContentType("application/json");
        httpPost.setEntity(entity);
        HttpResponse resp = client.execute(httpPost);
        String respContent = EntityUtils.toString(resp.getEntity(), "UTF-8");
        if (resp.getStatusLine().getStatusCode() == 200) {
            return respContent;
        } else {
            throw new RuntimeException(String.format("调用接口%s失败:%s", uri, respContent));
        }
    }

    public static String get(String uri) throws Exception {
        HttpGet httpGet = new HttpGet(uri);
        CloseableHttpClient client = HttpClients.createDefault();
        HttpResponse resp = client.execute(httpGet);
        String respContent = EntityUtils.toString(resp.getEntity(), "UTF-8");
        if (resp.getStatusLine().getStatusCode() == 200) {
            return respContent;
        } else {
            throw new RuntimeException(String.format("调用接口%s失败:%s", uri, respContent));
        }
    }
}    

最后

其实我最终还是写代码来实现读取Http接口再转为Flink Table,因为我们的需求比较复杂,一个接口返回的数据需要转化为很多张表,且表的个数、Schema都不固定。但是学会了自定义连接器有利于我们加深对Flink的理解和技术掌握,对于后面有需求的也可以轻松扩展,而且对于理解Flink Hudi Connector源码很有帮助。

参考资料

  • https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/dev/table/sourcessinks/
  • https://www.cnblogs.com/Springmoon-venn/p/15392511.html
  • https://www.cnblogs.com/Ning-Blog/p/16714511.html

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/562637.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Leetcode | 1534 统计好三元组

1534 统计好三元组 文章目录 [1534 统计好三元组](https://leetcode.cn/problems/count-good-triplets/description/)题目解法1 暴力穷举解法2 枚举优化 题目 给你一个整数数组 arr &#xff0c;以及 a、b 、c 三个整数。请你统计其中好三元组的数量。 如果三元组 (arr[i], a…

怎么把录音转文字?推荐你这三款工具

随着科技不断发展&#xff0c;录音转文字的技术也逐渐被广泛应用于各种场景中。其中最常见的一种就是会议记录。在日常工作中&#xff0c;会议是企业和组织中必不可少的一个环节&#xff0c;但在会议过程中的录音和记录往往需要花费大量的时间和精力。这个时候&#xff0c;我们…

【AI聊天丨 ChatGPT应用案例一】— 仅用30分钟,ChatGPT帮你完成专利交底书!

Hi&#xff0c;大家好&#xff0c;我是零点壹客&#xff0c;今天主要也是想和大家一起唠唠ChatGPT&#xff0c; 尤其这两个月&#xff0c;ChatGPT出奇的火&#xff0c;想必各位圈友们或多或少的都已经有些了解。 ChatGPT的出现很大程度上已经改变了我们的工作方式&#xff0c;尤…

GPT4限制被破解!ChatGPT实现超长文本处理的新方法

目录 前言 使用chat-gpt过程中有哪些痛点 1.无法理解人类情感和主观性 2.上下文丢失 3.约定被打断 那如何去解决这个痛点 Transformer&#xff08;RMT&#xff09;怎么去实现的 1.Transformer 模型 2.RMT模型 3.计算推理速率 4.渐进学习能力 总结 写到最后 大家好…

DeepPyramid:在白内障手术视频中实现金字塔视图和可变形金字塔接收的语义分割

文章目录 DeepPyramid: Enabling Pyramid View and Deformable Pyramid Reception for Semantic Segmentation in Cataract Surgery Videos摘要本文方法模块细节 实验结果 DeepPyramid: Enabling Pyramid View and Deformable Pyramid Reception for Semantic Segmentation in …

探秘 | 简说IP地址以及路由器的功能究竟是什么?

我们都知道我们在上网的时候都有一个IP地址&#xff0c;用来和其他人进行通信和数据交换。 其中IP地址又分为内网地址和外网地址&#xff0c;也叫作私有地址和公有地址。 为什么要区分私有地址和公有地址呢&#xff1f;原因很简单&#xff0c;因为公有的IP地址不够使用了&…

UnityVR--组件4--Ray/Raycast/Linecast/OverlapSphere

目录 Ray/Raycast/Linecast//OverlapSphere简介 Ray类 Physics.Raycast方法 应用1&#xff1a;实现鼠标点击出射线并检测物体 应用2&#xff1a;实现鼠标点击拖拽物体 Physics.Linecast和Physics.OverlapSphere 应用3&#xff1a;进入范围时触发攻击 Ray/Raycast/Lineca…

day13 网络编程Tomcat服务器

c/s架构和b/s架构的区别 c/s架构:客户端软件,直观,体验好,界面美观,安全性高 b/s架构:浏览器–>服务器,可移植性好,开发和维护性好 网络访问的三要素:ip,端口,协议 udp协议和tcp协议的区别 udp协议:只管发送,不管发送到哪里,是否能不能接收,一对多,无连接通信协议 ​ …

蓝桥:前端开发笔面必刷题——Day3 数组(三)

文章目录 &#x1f4cb;前言&#x1f3af;两数之和 II&#x1f4da;题目内容✅解答 &#x1f3af;移除元素&#x1f4da;题目内容✅解答 &#x1f3af;有序数组的平方&#x1f4da;题目内容✅解答 &#x1f3af;三数之和&#x1f4da;题目内容✅解答 &#x1f4dd;最后 &#x…

混沌演练实践(二)-支付加挂链路演练 | 京东云技术团队

1. 背景 当前微服务架构下&#xff0c;各个服务间依赖高&#xff0c;调用关系复杂&#xff0c;业务场景很少可以通过一个系统来实现&#xff0c;常见的业务场景实现基本涉及多个上下游系统&#xff0c;要保证整体链路的稳定性&#xff0c;需要尽量减少系统之间的耦合性&#x…

Elasticsearch与Clickhouse数据存储对比 | 京东云技术团队

1 背景 京喜达技术部在社区团购场景下采用JDQFlinkElasticsearch架构来打造实时数据报表。随着业务的发展Elasticsearch开始暴露出一些弊端&#xff0c;不适合大批量的数据查询&#xff0c;高频次分页导出导致宕机、存储成本较高。 Elasticsearch的查询语句维护成本较高、在聚…

CloudBase CMS的开发注意事项

引言 在进行基于云开发的微信小程序开发时为了减轻工作量打算用CloudBase CMS来减轻工作量&#xff0c;随后去了解并体验了CloudBase CMS的使用&#xff0c;总体来说还有些许问题没有解决&#xff0c;对减轻后台管理工作并没有起到很大的作用。 项目情景 使用CloudBase CMS来管…

Flutter 笔记 | Flutter 基础组件

Text Text 用于显示简单样式文本&#xff0c;它包含一些控制文本显示样式的一些属性&#xff0c;一个简单的例子如下&#xff1a; Text("Hello world",textAlign: TextAlign.left, );Text("Hello world! Im Jack. "*4,maxLines: 1,overflow: TextOverflo…

HACKABLE: III

文章目录 HACKABLE: III实战演练一、前期准备1、相关信息 二、信息收集1、端口扫描2、访问网站3、查看网站源码4、扫描目录5、访问网址6、查看并下载7、访问网站8、查看文件9、解密10、访问网站11、访问网站12、查看文件13、解密14、访问网站15、访问网站16、下载图片17、隐写1…

tcp连接阿里云linux服务器失败

原因&#xff1a; 自己程序bind的是127.0.0.1 应该改成 bind 阿里云的私网地址 client连接的是阿里云公网地址 参考&#xff1a; 阿里云服务器&#xff0c;客户端socket无法连接的问题 - 爱码网 排查过程记录&#xff1a; 1&#xff0c;安全组设置&#xff1a;有正常设置…

【C++】 模板(泛型编程、函数模板、类模板)

文章目录 模板泛型编程概念 函数模板常规使用显式指定及默认值多模板参数模板函数的声明和定义用函数模板优化冒泡排序 类模板常规使用显式指定及默认值多模板参数类中成员函数的定义和声明嵌套的类模板1.类和类型都能确定2.类和类型都不能确定3.类能确定&#xff0c;类型不确定…

Unity3D下如何实现跨平台低延迟的RTMP、RTSP播放

技术背景 好多开发者&#xff0c;希望我们能探讨下Unity平台RTMP或RTSP直播流数据播放和录制相关的模块&#xff0c;实际上&#xff0c;这块流程我们已经聊过多次&#xff0c;无非就是通过原生的RTMP或者RTSP模块&#xff0c;先从协议层拉取到数据&#xff0c;并解包解码&…

常用的表格检测识别方法——表格结构识别方法(上)

第三章 常用的表格检测识别方法 3.2表格结构识别方法 表格结构识别是表格区域检测之后的任务&#xff0c;其目标是识别出表格的布局结构、层次结构等&#xff0c;将表格视觉信息转换成可重建表格的结构描述信息。这些表格结构描述信息包括&#xff1a;单元格的具体位置、单元格…

子网掩码计算方法

子网掩码是用来划分网络的一种方式&#xff0c;它是一个32位的二进制数&#xff0c;用于将IP地址分成网络地址和主机地址两部分。子网掩码中的1表示网络地址&#xff0c;0表示主机地址。计算子网掩码的方式取决于需要划分的网络数量和主机数量。 以下是一些计算子网掩码的示例…

【LeetCode热题100】打卡第2天:两数相加

两数相加 ⛅前言 大家好&#xff0c;我是知识汲取者&#xff0c;欢迎来到我们的LeetCode热题100刷题专栏&#xff01; 精选 100 道力扣&#xff08;LeetCode&#xff09;上最热门的题目&#xff0c;适合初识算法与数据结构的新手和想要在短时间内高效提升的人&#xff0c;熟练…