[BitSail] Connector开发详解系列四:Sink、Writer

news2025/1/11 18:01:50

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

Sink Connector

BitSail Sink Connector交互流程介绍

在这里插入图片描述

  • Sink:数据写入组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。
  • Writer:负责将接收到的数据写到外部存储。
  • WriterCommitter(可选):对数据进行提交操作,来完成两阶段提交的操作;实现exactly-once的语义。

开发者首先需要创建Sink类,实现Sink接口,主要负责数据写入组件的生命周期管理,构架作业。通过configure方法定义writerConfiguration的配置,通过createTypeInfoConverter方法来进行数据类型转换,将内部类型进行转换写到外部系统,同Source部分。之后我们再定义Writer类实现具体的数据写入逻辑,在write方法调用时将BitSail Row类型把数据写到缓存队列中,在flush方法调用时将缓存队列中的数据刷写到目标数据源中。

Sink

数据写入组件的生命周期管理,主要负责和框架的交互,构架作业,它不参与作业真正的执行。

对于每一个Sink任务,我们要实现一个继承Sink接口的类。

在这里插入图片描述

Sink接口

public interface Sink<InputT, CommitT extends Serializable, WriterStateT extends Serializable> extends Serializable {

  /**
*  @return  The name of writer operation.
*/
String getWriterName();

  /**
* Configure writer with user defined options.
*
*  @param  commonConfiguration Common options.
*  @param  writerConfiguration Options for writer.
*/
void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) throws Exception;

  /**
* Create a writer for processing elements.
*
*  @return  An initialized writer.
*/
Writer<InputT, CommitT, WriterStateT> createWriter(Writer.Context<WriterStateT> context) throws IOException;

  /**
*  @return  A converter which supports conversion from BitSail {  @link  TypeInfo}
* and external engine type.
*/
default TypeInfoConverter createTypeInfoConverter() {
    return new BitSailTypeInfoConverter();
  }

  /**
*  @return  A committer for commit committable objects.
*/
default Optional<WriterCommitter<CommitT>> createCommitter() {
    return Optional.empty();
  }

  /**
*  @return  A serializer which convert committable object to byte array.
*/
default BinarySerializer<CommitT> getCommittableSerializer() {
    return new SimpleBinarySerializer<CommitT>();
  }

  /**
*  @return  A serializer which convert state object to byte array.
*/
default BinarySerializer<WriterStateT> getWriteStateSerializer() {
    return new SimpleBinarySerializer<WriterStateT>();
  }
}

configure方法

负责configuration的初始化,通过commonConfiguration中的配置区分流式任务或者批式任务,向Writer类传递writerConfiguration。

示例

ElasticsearchSink:

public void configure(BitSailConfiguration commonConfiguration, BitSailConfiguration writerConfiguration) {
  writerConf = writerConfiguration;
}

createWriter方法

负责生成一个继承自Writer接口的connector Writer类。

createTypeInfoConverter方法

类型转换,将内部类型进行转换写到外部系统,同Source部分。

createCommitter方法

可选方法,书写具体数据提交逻辑,一般用于想要保证数据exactly-once语义的场景,writer在完成数据写入后,committer来完成提交,进而实现二阶段提交,详细可以参考Doris Connector的实现。

Writer

具体的数据写入逻辑

在这里插入图片描述

Writer接口

public interface Writer<InputT, CommT, WriterStateT> extends Serializable, Closeable {

  /**
* Output an element to target source.
*
*  @param  element Input data from upstream.
*/
void write(InputT element) throws IOException;

  /**
* Flush buffered input data to target source.
*
*  @param  endOfInput Flag indicates if all input data are delivered.
*/
void flush(boolean endOfInput) throws IOException;

  /**
* Prepare commit information before snapshotting when checkpoint is triggerred.
*
*  @return  Information to commit in this checkpoint.
*  @throws  IOException Exceptions encountered when preparing committable information.
*/
List<CommT> prepareCommit() throws IOException;

  /**
* Do snapshot for at each checkpoint.
*
*  @param  checkpointId The id of checkpoint when snapshot triggered.
*  @return  The current state of writer.
*  @throws  IOException Exceptions encountered when snapshotting.
*/
default List<WriterStateT> snapshotState(long checkpointId) throws IOException {
    return Collections.emptyList();
  }

  /**
* Closing writer when operator is closed.
*
*  @throws  IOException Exception encountered when closing writer.
*/
default void close() throws IOException {

  }

  interface Context<WriterStateT> extends Serializable {

    TypeInfo<?>[] getTypeInfos();

    int getIndexOfSubTaskId();

    boolean isRestored();

    List<WriterStateT> getRestoreStates();
  }
}

构造方法

根据writerConfiguration配置初始化数据源的连接对象。

示例

public RedisWriter(BitSailConfiguration writerConfiguration) {
  // initialize ttl
  int ttl = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.TTL, -1);
  TtlType ttlType;
  try {
    ttlType = TtlType.valueOf(StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  } catch (IllegalArgumentException e) {
    throw BitSailException.asBitSailException(RedisPluginErrorCode.ILLEGAL_VALUE,
        String.format("unknown ttl type: %s", writerConfiguration.get(RedisWriterOptions.TTL_TYPE)));
  }
  int ttlInSeconds = ttl < 0 ? -1 : ttl * ttlType.getContainSeconds();
  log.info("ttl is {}(s)", ttlInSeconds);

  // initialize commandDescription
  String redisDataType = StringUtils.upperCase(writerConfiguration.get(RedisWriterOptions.REDIS_DATA_TYPE));
  String additionalKey = writerConfiguration.getUnNecessaryOption(RedisWriterOptions.ADDITIONAL_KEY, "default_redis_key");
  this.commandDescription = initJedisCommandDescription(redisDataType, ttlInSeconds, additionalKey);
  this.columnSize = writerConfiguration.get(RedisWriterOptions.COLUMNS).size();

  // initialize jedis pool
  JedisPoolConfig jedisPoolConfig = new JedisPoolConfig();
  jedisPoolConfig.setMaxTotal(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_TOTAL_CONNECTIONS));
  jedisPoolConfig.setMaxIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_IDLE_CONNECTIONS));
  jedisPoolConfig.setMinIdle(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MIN_IDLE_CONNECTIONS));
  jedisPoolConfig.setMaxWait(Duration.ofMillis(writerConfiguration.get(RedisWriterOptions.JEDIS_POOL_MAX_WAIT_TIME_IN_MILLIS)));

  String redisHost = writerConfiguration.getNecessaryOption(RedisWriterOptions.HOST, RedisPluginErrorCode.REQUIRED_VALUE);
  int redisPort = writerConfiguration.getNecessaryOption(RedisWriterOptions.PORT, RedisPluginErrorCode.REQUIRED_VALUE);
  String redisPassword = writerConfiguration.get(RedisWriterOptions.PASSWORD);
  int timeout = writerConfiguration.get(RedisWriterOptions.CLIENT_TIMEOUT_MS);

  if (StringUtils.isEmpty(redisPassword)) {
    this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout);
  } else {
    this.jedisPool = new JedisPool(jedisPoolConfig, redisHost, redisPort, timeout, redisPassword);
  }

  // initialize record queue
  int batchSize = writerConfiguration.get(RedisWriterOptions.WRITE_BATCH_INTERVAL);
  this.recordQueue = new CircularFifoQueue<>(batchSize);

  this.logSampleInterval = writerConfiguration.get(RedisWriterOptions.LOG_SAMPLE_INTERVAL);
  this.jedisFetcher = RetryerBuilder.<Jedis>newBuilder()
      .retryIfResult(Objects::isNull)
      .retryIfRuntimeException()
      .withStopStrategy(StopStrategies.stopAfterAttempt(3))
      .withWaitStrategy(WaitStrategies.exponentialWait(100, 5, TimeUnit.MINUTES))
      .build()
      .wrap(jedisPool::getResource);

  this.maxAttemptCount = writerConfiguration.get(RedisWriterOptions.MAX_ATTEMPT_COUNT);
  this.retryer = RetryerBuilder.<Boolean>newBuilder()
      .retryIfResult(needRetry -> Objects.equals(needRetry, true))
      .retryIfException(e -> !(e instanceof BitSailException))
      .withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
      .withStopStrategy(StopStrategies.stopAfterAttempt(maxAttemptCount))
      .build();
}

write方法

该方法调用时会将BitSail Row类型把数据写到缓存队列中,也可以在这里对Row类型数据进行各种格式预处理。直接存储到缓存队列中,或者进行加工处理。如果这里设定了缓存队列的大小,那么在缓存队列写满后要调用flush进行刷写。

示例

redis:将BitSail Row格式的数据直接存储到一定大小的缓存队列中

public void write(Row record) throws IOException {
  validate(record);
  this.recordQueue.add(record);
  if (recordQueue.isAtFullCapacity()) {
    flush(false);
  }
}

Druid:将BitSail Row格式的数据做格式预处理,转化到StringBuffer中储存起来。

@Override
public void write(final Row element) {
  final StringJoiner joiner = new StringJoiner(DEFAULT_FIELD_DELIMITER, "", "");
  for (int i = 0; i < element.getArity(); i++) {
    final Object v = element.getField(i);
    if (v != null) {
      joiner.add(v.toString());
    }
  }
  // timestamp column is a required field to add in Druid.
  // See https://druid.apache.org/docs/24.0.0/ingestion/data-model.html#primary-timestamp
  joiner.add(String.valueOf(processTime));
  data.append(joiner);
  data.append(DEFAULT_LINE_DELIMITER);
}

flush方法

该方法中主要实现将write方法的缓存中的数据刷写到目标数据源中。

示例

redis:将缓存队列中的BitSail Row格式的数据刷写到目标数据源中。

public void flush(boolean endOfInput) throws IOException {
  processorId++;
  try (PipelineProcessor processor = genPipelineProcessor(recordQueue.size(), this.complexTypeWithTtl)) {
    Row record;
    while ((record = recordQueue.poll()) != null) {

      String key = (String) record.getField(0);
      String value = (String) record.getField(1);
      String scoreOrHashKey = value;
      if (columnSize == SORTED_SET_OR_HASH_COLUMN_SIZE) {
        value = (String) record.getField(2);
        // Replace empty key with additionalKey in sorted set and hash.
        if (key.length() == 0) {
          key = commandDescription.getAdditionalKey();
        }
      }

      if (commandDescription.getJedisCommand() == JedisCommand.ZADD) {
        // sorted set
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), parseScoreFromString(scoreOrHashKey), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HSET) {
        // hash
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), scoreOrHashKey.getBytes(), value.getBytes()));
      } else if (commandDescription.getJedisCommand() == JedisCommand.HMSET) {
        //mhset
        if ((record.getArity() - 1) % 2 != 0) {
          throw new BitSailException(CONVERT_NOT_SUPPORT, "Inconsistent data entry.");
        }
        List<byte[]> datas = Arrays.stream(record.getFields())
            .collect(Collectors.toList()).stream().map(o -> ((String) o).getBytes())
            .collect(Collectors.toList()).subList(1, record.getFields().length);
        Map<byte[], byte[]> map = new HashMap<>((record.getArity() - 1) / 2);
        for (int index = 0; index < datas.size(); index = index + 2) {
          map.put(datas.get(index), datas.get(index + 1));
        }
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), map));
      } else {
        // set and string
        processor.addInitialCommand(new Command(commandDescription, key.getBytes(), value.getBytes()));
      }
    }
    retryer.call(processor::run);
  } catch (ExecutionException | RetryException e) {
    if (e.getCause() instanceof BitSailException) {
      throw (BitSailException) e.getCause();
    } else if (e.getCause() instanceof RedisUnexpectedException) {
      throw (RedisUnexpectedException) e.getCause();
    }
    throw e;
  } catch (IOException e) {
    throw new RuntimeException("Error while init jedis client.", e);
  }
}

Druid:使用HTTP post方式提交sink作业给数据源。

private HttpURLConnection provideHttpURLConnection(final String coordinatorURL) throws IOException {
    final URL url = new URL("http://" + coordinatorURL + DRUID_ENDPOINT);
    final HttpURLConnection con = (HttpURLConnection) url.openConnection();
    con.setRequestMethod("POST");
    con.setRequestProperty("Content-Type", "application/json");
    con.setRequestProperty("Accept", "application/json, text/plain, */*");
    con.setDoOutput(true);
    return con;
  }
  
  public void flush(final boolean endOfInput) throws IOException {
    final ParallelIndexIOConfig ioConfig = provideDruidIOConfig(data);
    final ParallelIndexSupervisorTask indexTask = provideIndexTask(ioConfig);
    final String inputJSON = provideInputJSONString(indexTask);
    final byte[] input = inputJSON.getBytes();
    try (final OutputStream os = httpURLConnection.getOutputStream()) {
      os.write(input, 0, input.length);
    }
    try (final BufferedReader br =
                 new BufferedReader(new InputStreamReader(httpURLConnection.getInputStream(), StandardCharsets.UTF_8))) {
      final StringBuilder response = new StringBuilder();
      String responseLine;
      while ((responseLine = br.readLine()) != null) {
        response.append(responseLine.trim());
      }
      LOG.info("Druid write task has been sent, and the response is {}", response);
    }
  }

close方法

关闭之前创建的各种目标数据源连接对象。

示例

public void close() throws IOException {
  bulkProcessor.close();
  restClient.close();
  checkErrorAndRethrow();
}

【关于BitSail】:

⭐️ Star 不迷路 https://github.com/bytedance/bitsail

提交问题和建议:https://github.com/bytedance/bitsail/issues

贡献代码:https://github.com/bytedance/bitsail/pulls

BitSail官网:https://bytedance.github.io/bitsail/zh/

订阅邮件列表:bitsail+subscribe@googlegroups.com

加入BitSail技术社群

在这里插入图片描述

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/958107.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

Vue2项目练手——通用后台管理项目第四节

Vue2项目练手——通用后台管理项目 数据的请求mock数据模拟实战文件目录src/api/mock.jssrc/api/mockServeData/home.jsmain.js 首页组件布局可视化图表可视化图表布局Home.vue echarts表Home.vue 数据的请求 mock数据模拟实战 mock官方文档 前端用来模拟后端接口的工具&…

3分钟做出的大屏可视化报表,被领导疯狂点赞

3分钟&#xff0c;不仅做出了大屏可视化报表&#xff0c;还被领导疯狂点赞&#xff01;你没看错&#xff0c;这确实是可以实现的。奥威BI数据可视化工具提供大量可视化大屏报表模板&#xff0c;只需一键下载使用&#xff0c;替换数据源&#xff0c;再根据个性化需求进行调整修改…

Windows环境下的Tomcat服务器安装和配置教程,包括外网远程访问的设置方法

文章目录 前言1.本地Tomcat网页搭建1.1 Tomcat安装1.2 配置环境变量1.3 环境配置1.4 Tomcat运行测试1.5 Cpolar安装和注册 2.本地网页发布2.1.Cpolar云端设置2.2 Cpolar本地设置 3.公网访问测试4.结语 前言 Tomcat作为一个轻量级的服务器&#xff0c;不仅名字很有趣&#xff0…

【算法与数据结构】617、LeetCode合并二叉树

文章目录 一、题目二、解法三、完整代码 所有的LeetCode题解索引&#xff0c;可以看这篇文章——【算法和数据结构】LeetCode题解。 一、题目 二、解法 思路分析&#xff1a;采用递归的方式遍历二叉树&#xff0c;【算法与数据结构】144、94、145LeetCode二叉树的前中后遍历&am…

手机提词器怎么开启?这些方法不要错过

随着科技的发展&#xff0c;手机提词器成为了越来越多人写作的好帮手。在很多情况下比如直播、视频会议我们不方便一边看镜头一边看文稿&#xff0c;这种时候我们就需要使用提词器功能来规避麻烦了&#xff0c;如何开启手机提词器&#xff1f;有哪些需要注意的事项呢&#xff1…

常见变频器品牌-修改参数时的密码汇总

常见变频器品牌-修改参数时的密码汇总 1. 艾默生TD3000系列 密码:8888 2. 艾默生TD3300系列 密码:2002 3. 施耐德变频器 在SUP菜单下,找到COD选项进入,输入6969即可, 4. 台达变频器-B系列 密码:57522 5. 台达变频器-H系列 密码:33582 6. 台达S1系列 密码:57522

网络渗透day6-面试01

&#x1f609; 和渗透测试相关的面试问题。 介绍 如果您想自学网络渗透&#xff0c;有许多在线平台和资源可以帮助您获得相关的知识和技能。以下是一些受欢迎的自学网络渗透的平台和资源&#xff1a; Hack The Box: Hack The Box&#xff08;HTB&#xff09;是一个受欢迎的平…

深度学习-4-二维目标检测-YOLOv3模型

单阶段目标检测模型YOLOv3 R-CNN系列算法需要先产生候选区域&#xff0c;再对候选区域做分类和位置坐标的预测&#xff0c;这类算法被称为两阶段目标检测算法。近几年&#xff0c;很多研究人员相继提出一系列单阶段的检测算法&#xff0c;只需要一个网络即可同时产生候选区域并…

简单深度理解c++数论--资料免费分享

本篇博文想分享一个数论资料,是帮助大家简单深度理解c数论. 作者承诺&#xff1a;分享的东西没有病毒&#xff0c;是资料。 分享的东西是关于数论的。 分享的东西免费&#xff01;免费&#xff01;免费&#xff01;欢迎大家下载学习&#xff01; 创作不易&#xff0c;请多加…

django中配置使用websocket终极解决方案

django ASGI/Channels 启动和 ASGI/daphne的区别 Django ASGI/Channels 是 Django 框架的一个扩展&#xff0c;它提供了异步服务器网关接口&#xff08;ASGI&#xff09;协议的支持&#xff0c;以便处理实时应用程序的并发连接。ASGI 是一个用于构建异步 Web 服务器和应用程序…

若依tab-content面板失效、使用load的解决方法(附详细步骤)

【版权所有&#xff0c;文章允许转载&#xff0c;但须以链接方式注明源地址&#xff0c;否则追究法律责任】【创作不易&#xff0c;点个赞就是对我最大的支持】 前言 仅作为学习笔记&#xff0c;供大家参考 总结的不错的话&#xff0c;记得点赞收藏关注哦&#xff01; 思路&…

智慧工地-工地管理系统源码

智慧工地是聚焦工程施工现场&#xff0c;紧紧围绕人、机、料、法、环等关键要素&#xff0c;综合运用物联网、云计算、大数据、移动计算和智能设备等软硬件信息技术&#xff0c;与施工生产过程相融合。 智慧工地管理平台充分运用数字化技术&#xff0c;聚焦施工现场岗位一线&am…

剑指 Offer 14- I. 剪绳子(中等)

题目&#xff1a; class Solution { public:int cuttingRope(int n) {vector<int> dp(n1); //dp[i]表示长度为i的绳子能得到的最大乘积dp[0]0;dp[1]0;dp[2]1; //长度为0和1的绳子不能剪不了for(int i3;i<n;i){for(int j1;j<i/2;j){ //j代表这一次剪…

K8s组件全解析,你需要知道的一切秘密

当你部署完 Kubernetes&#xff0c;便拥有了一个完整的集群。 Kubernetes&#xff0c;作为目前最流行和广泛采用的容器编排和管理平台&#xff0c;背后有一系列强大的组件&#xff0c;共同协作以实现容器化应用的自动化部署、弹性扩展、服务发现和负载均衡等关键功能。本文将介…

环保环卫行业案例 | 燕千云助力高能环境搭建数智化IT服务管理体系及平台

当前环境卫生问题在全球已引起前所未有的关注&#xff0c;而促进健康又成为环境与发展所关注的核心问题。随着数字化时代的到来&#xff0c;环保环卫行业呈现出多个发展趋势&#xff0c;随着业务系统规模的不断扩大&#xff0c;信息系统的运维问题也日益突出&#xff0c;需要得…

Linux6.41 Kubernetes 对外服务之 Ingress

文章目录 计算机系统5G云计算第三章 LINUX Kubernetes 对外服务之 Ingress一、Ingress 简介1.NodePort2.LoadBalancer3.externalIPs4.Ingress 二.Ingress 组成1.ingress2.ingress-controller3.总结 三、Ingress-Nginx 工作原理四、ingress 暴露服务的方式1.DaemonSetHostNetwor…

详解排序算法(附带Java/Python/Js源码)

冒泡算法 依次比较两个相邻的子元素&#xff0c;如果他们的顺序错误就把他们交换过来&#xff0c;重复地进行此过程直到没有相邻元素需要交换&#xff0c;即完成整个冒泡&#xff0c;时间复杂度。 比较相邻的元素。如果第一个比第二个大&#xff0c;就交换它们两个&#xff1b;…

MATLAB中circshift函数转化为C语言

背景 有项目算法使用matlab中circshift函数进行运算&#xff0c;这里需要将转化为C语言&#xff0c;从而模拟算法运行&#xff0c;将算法移植到qt。 MATLAB中circshift简单介绍 circshift是循环移位函数。可以使用于数组和矩阵元素的循环移位。 当A是数组 Bcircshift(A,p);如果…

大模型时代,如何以 SDS 助力 AI 发展?

本文根据 XSKY星辰天合产品总监赵琳在 2023 闪存峰会上的演讲。该峰会主题为“芯存储 AI 未来”&#xff0c;由 DOIT 传媒主办、杭州华澜微电子股份有限公司协办&#xff0c;于 8 月 29-30 日在杭州召开。 星辰天合产品总监赵琳在 2023 闪存峰会上发表演讲 人工智能已经从经典…