[BitSail] Connector开发详解系列二:SourceSplitCoordinator

news2024/12/27 19:38:49

更多技术交流、求职机会,欢迎关注字节跳动数据平台微信公众号,回复【1】进入官方交流群

ource Connector

 

本文将主要介绍创建、管理Split的角色SplitCoordinator。

SourceSplitCoordinator

大数据处理框架的核心目的就是将大规模的数据拆分成为多个合理的Split,SplitCoordinator承担这个创建、管理Split的角色。

 

SourceSplitCoordinator接口

public interface SourceSplitCoordinator<SplitT extends SourceSplit, StateT> extends Serializable, AutoCloseable {

  void start();

  void addReader(int subtaskId);

  void addSplitsBack(List<SplitT> splits, int subtaskId);

  void handleSplitRequest(int subtaskId, @Nullable String requesterHostname);

  default void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
  }

  StateT snapshotState() throws Exception;

  default void notifyCheckpointComplete(long checkpointId) throws Exception {
  }

  void close();

  interface Context<SplitT extends SourceSplit, StateT> {

    boolean isRestored();

    /**
     * Return the state to the split coordinator, for the exactly-once.
     */
    StateT getRestoreState();

    /**
     * Return total parallelism of the source reader.
     */
    int totalParallelism();

    /**
     * When Source reader started, it will be registered itself to coordinator.
     */
    Set<Integer> registeredReaders();

    /**
     * Assign splits to reader.
     */
    void assignSplit(int subtaskId, List<SplitT> splits);

    /**
     * Mainly use in boundedness situation, represents there will no more split will send to source reader.
     */
    void signalNoMoreSplits(int subtask);

    /**
     * If split coordinator have any event want to send source reader, use this method.
     * Like send Pause event to Source Reader in CDC2.0.
     */
    void sendEventToSourceReader(int subtaskId, SourceEvent event);

    /**
     * Schedule to run the callable and handler, often used in un-boundedness mode.
     */
    <T> void runAsync(Callable<T> callable,
                      BiConsumer<T, Throwable> handler,
                      int initialDelay,
                      long interval);

    /**
     * Just run callable and handler once, often used in boundedness mode.
     */
    <T> void runAsyncOnce(Callable<T> callable,
                          BiConsumer<T, Throwable> handler);
  }
}

构造方法

开发者在构造方法中一般主要进行一些配置的设置和分片信息存储的容器的创建。

以ClickhouseSourceSplitCoordinator的构造为例:

public ClickhouseSourceSplitCoordinator(SourceSplitCoordinator.Context<ClickhouseSourceSplit, EmptyState> context,
                                  BitSailConfiguration jobConf) {
  this.context = context;
  this.jobConf = jobConf;
  this.splitAssignmentPlan = Maps.newConcurrentMap();
}

在自定义了State的场景中,需要对checkpoint时存储在SourceSplitCoordinator.Context的状态进行保存和恢复。

以RocketMQSourceSplitCoordinator为例:

public RocketMQSourceSplitCoordinator(
    SourceSplitCoordinator.Context<RocketMQSplit, RocketMQState> context,
    BitSailConfiguration jobConfiguration,
    Boundedness boundedness) {
  this.context = context;
  this.jobConfiguration = jobConfiguration;
  this.boundedness = boundedness;
  this.discoveryInternal = jobConfiguration.get(RocketMQSourceOptions.DISCOVERY_INTERNAL);
  this.pendingRocketMQSplitAssignment = Maps.newConcurrentMap();

  this.discoveredPartitions = new HashSet<>();
  if (context.isRestored()) {
    RocketMQState restoreState = context.getRestoreState();
    assignedPartitions = restoreState.getAssignedWithSplits();
    discoveredPartitions.addAll(assignedPartitions.keySet());
  } else {
    assignedPartitions = Maps.newHashMap();
  }

  prepareConsumerProperties();
}

start方法

进行一些数据源所需分片元数据的提取工作,如果有抽象出来的Split Assigner类,一般在这里进行初始化。如果使用的是封装的Split Assign函数,这里会进行待分配切片的初始化工作。

流批一体场景

以RocketMQSourceSplitCoordinator为例:

private void prepareRocketMQConsumer() {
  try {
    consumer = RocketMQUtils.prepareRocketMQConsumer(jobConfiguration,
        String.format(COORDINATOR_INSTANCE_NAME_TEMPLATE,
            cluster, topic, consumerGroup, UUID.randomUUID()));
    consumer.start();
  } catch (Exception e) {
    throw BitSailException.asBitSailException(RocketMQErrorCode.CONSUMER_CREATE_FAILED, e);
  }
}

@Override
public void start() {
  prepareRocketMQConsumer();
  splitAssigner = new FairRocketMQSplitAssigner(jobConfiguration, assignedPartitions);
  if (discoveryInternal > 0) {
    context.runAsync(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged,
        0,
        discoveryInternal
    );
  } else {
    context.runAsyncOnce(
        this::fetchMessageQueues,
        this::handleMessageQueueChanged
    );
  }
}

批式场景

以ClickhouseSourceSplitCoordinator为例:

public void start() {
  List<ClickhouseSourceSplit> splitList;
  try {
    SimpleDivideSplitConstructor constructor = new SimpleDivideSplitConstructor(jobConf);
    splitList = constructor.construct();
  } catch (IOException e) {
    ClickhouseSourceSplit split = new ClickhouseSourceSplit(0);
    split.setReadTable(true);
    splitList = Collections.singletonList(split);
    LOG.error("Failed to construct splits, will directly read the table.", e);
  }

  int readerNum = context.totalParallelism();
  LOG.info("Found {} readers and {} splits.", readerNum, splitList.size());
  if (readerNum > splitList.size()) {
    LOG.error("Reader number {} is larger than split number {}.", readerNum, splitList.size());
  }

  for (ClickhouseSourceSplit split : splitList) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Will assign split {} to the {}-th reader", split.uniqSplitId(), readerIndex);
  }
}

Assigner

将划分好的切片分配给Reader,开发过程中,我们通常让SourceSplitCoordinator专注于处理和Reader 的通讯工作,实际split的分发逻辑一般封装在Assigner进行,这个Assigner可以是一个封装的Split Assign函数,也可以是一个抽象出来的Split Assigner类。

Assign函数示例

以ClickhouseSourceSplitCoordinator为例:

tryAssignSplitsToReader函数将存储在splitAssignmentPlan中的划分好的切片分配给相应的Reader。

private void tryAssignSplitsToReader() {
  Map<Integer, List<ClickhouseSourceSplit>> splitsToAssign = new HashMap<>();

  for (Integer readerIndex : splitAssignmentPlan.keySet()) {
    if (CollectionUtils.isNotEmpty(splitAssignmentPlan.get(readerIndex)) && context.registeredReaders().contains(readerIndex)) {
      splitsToAssign.put(readerIndex, Lists.newArrayList(splitAssignmentPlan.get(readerIndex)));
    }
  }

  for (Integer readerIndex : splitsToAssign.keySet()) {
    LOG.info("Try assigning splits reader {}, splits are: [{}]", readerIndex,
        splitsToAssign.get(readerIndex).stream().map(ClickhouseSourceSplit::uniqSplitId).collect(Collectors.toList()));
    splitAssignmentPlan.remove(readerIndex);
    context.assignSplit(readerIndex, splitsToAssign.get(readerIndex));
    context.signalNoMoreSplits(readerIndex);
    LOG.info("Finish assigning splits reader {}", readerIndex);
  }
}

Assigner方法示例

以RocketMQSourceSplitCoordinator为例:

public class FairRocketMQSplitAssigner implements SplitAssigner<MessageQueue> {

  private BitSailConfiguration readerConfiguration;

  private AtomicInteger atomicInteger;

  public Map<MessageQueue, String> rocketMQSplitIncrementMapping;

  public FairRocketMQSplitAssigner(BitSailConfiguration readerConfiguration,
                                   Map<MessageQueue, String> rocketMQSplitIncrementMapping) {
    this.readerConfiguration = readerConfiguration;
    this.rocketMQSplitIncrementMapping = rocketMQSplitIncrementMapping;
    this.atomicInteger = new AtomicInteger(CollectionUtils
        .size(rocketMQSplitIncrementMapping.keySet()));
  }

  @Override
  public String assignSplitId(MessageQueue messageQueue) {
    if (!rocketMQSplitIncrementMapping.containsKey(messageQueue)) {
      rocketMQSplitIncrementMapping.put(messageQueue, String.valueOf(atomicInteger.getAndIncrement()));
    }
    return rocketMQSplitIncrementMapping.get(messageQueue);
  }

  @Override
  public int assignToReader(String splitId, int totalParallelism) {
    return splitId.hashCode() % totalParallelism;
  }
}

addReader方法

调用Assigner,为Reader添加切片。

批式场景示例

以ClickhouseSourceSplitCoordinator为例:

public void addReader(int subtaskId) {
  LOG.info("Found reader {}", subtaskId);
  tryAssignSplitsToReader();
}

流批一体场景示例

以RocketMQSourceSplitCoordinator为例:

private void notifyReaderAssignmentResult() {
  Map<Integer, List<RocketMQSplit>> tmpRocketMQSplitAssignments = new HashMap<>();

  for (Integer pendingAssignmentReader : pendingRocketMQSplitAssignment.keySet()) {

    if (CollectionUtils.isNotEmpty(pendingRocketMQSplitAssignment.get(pendingAssignmentReader))
        && context.registeredReaders().contains(pendingAssignmentReader)) {

      tmpRocketMQSplitAssignments.put(pendingAssignmentReader, Lists.newArrayList(pendingRocketMQSplitAssignment.get(pendingAssignmentReader)));
    }
  }

  for (Integer pendingAssignmentReader : tmpRocketMQSplitAssignments.keySet()) {

    LOG.info("Assigning splits to reader {}, splits = {}.", pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));

    context.assignSplit(pendingAssignmentReader,
        tmpRocketMQSplitAssignments.get(pendingAssignmentReader));
    Set<RocketMQSplit> removes = pendingRocketMQSplitAssignment.remove(pendingAssignmentReader);
    removes.forEach(removeSplit -> {
      assignedPartitions.put(removeSplit.getMessageQueue(), removeSplit.getSplitId());
    });

    LOG.info("Assigned splits to reader {}", pendingAssignmentReader);

    if (Boundedness.BOUNDEDNESS == boundedness) {
      LOG.info("Signal reader {} no more splits assigned in future.", pendingAssignmentReader);
      context.signalNoMoreSplits(pendingAssignmentReader);
    }
  }
}

@Override
public void addReader(int subtaskId) {
  LOG.info(
      "Adding reader {} to RocketMQ Split Coordinator for consumer group {}.",
      subtaskId,
      consumerGroup);
  notifyReaderAssignmentResult();
}

addSplitsBack方法

对于一些Reader没有处理完的切片,进行重新分配,重新分配的策略可以自己定义,常用的策略是哈希取模,对于返回的Split列表中的所有Split进行重新分配后再Assign给不同的Reader。

批式场景示例

以ClickhouseSourceSplitCoordinator为例:

ReaderSelector使用哈希取模的策略对Split列表进行重分配。

tryAssignSplitsToReader方法将重分配后的Split集合通过Assigner分配给Reader。

public void addSplitsBack(List<ClickhouseSourceSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);

  int readerNum = context.totalParallelism();
  for (ClickhouseSourceSplit split : splits) {
    int readerIndex = ReaderSelector.getReaderIndex(readerNum);
    splitAssignmentPlan.computeIfAbsent(readerIndex, k -> new HashSet<>()).add(split);
    LOG.info("Re-assign split {} to the {}-th reader.", split.uniqSplitId(), readerIndex);
  }

  tryAssignSplitsToReader();
}

流批一体场景示例

以RocketMQSourceSplitCoordinator为例:

addSplitChangeToPendingAssignment使用哈希取模的策略对Split列表进行重分配。

notifyReaderAssignmentResult将重分配后的Split集合通过Assigner分配给Reader。

private synchronized void addSplitChangeToPendingAssignment(Set<RocketMQSplit> newRocketMQSplits) {
  int numReader = context.totalParallelism();
  for (RocketMQSplit split : newRocketMQSplits) {
    int readerIndex = splitAssigner.assignToReader(split.getSplitId(), numReader);
    pendingRocketMQSplitAssignment.computeIfAbsent(readerIndex, r -> new HashSet<>())
        .add(split);
  }
  LOG.debug("RocketMQ splits {} finished assignment.", newRocketMQSplits);
}

@Override
public void addSplitsBack(List<RocketMQSplit> splits, int subtaskId) {
  LOG.info("Source reader {} return splits {}.", subtaskId, splits);
  addSplitChangeToPendingAssignment(new HashSet<>(splits));
  notifyReaderAssignmentResult();
}

snapshotState方法

存储处理切片的快照信息,用于恢复时在构造方法中使用。

public RocketMQState snapshotState() throws Exception {
  return new RocketMQState(assignedPartitions);
}

close方法

关闭在分片过程中与数据源交互读取元数据信息的所有未关闭连接器。

public void close() {
  if (consumer != null) {
    consumer.shutdown();
  }
}

About BitSail:

⭐️ Star 不迷路 https://github.com/bytedance/bitsail

提交问题和建议:https://github.com/bytedance/bitsail/issues

贡献代码:https://github.com/bytedance/bitsail/pulls

BitSail官网:https://bytedance.github.io/bitsail/zh/

订阅邮件列表:bitsail+subscribe@googlegroups.com

加入BitSail技术社群

 

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/723337.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

皂液机低功耗红外测距感应方案 免触碰红外感应模块WTU201F2 B004

近年来&#xff0c;随着卫生意识的提高&#xff0c;自动感应设备在公共场所、家庭和工作场所中变得越来越重要。在这个领域中&#xff0c;皂液机的自动感应功能成为了关键。为了提供更为智能、高效的用户体验&#xff0c;深圳唯创知音推出了全新的皂液机红外测距感应方案——WT…

自动生成的webservice客户端设置请求消息头信息

这里讲的头消息是指发送webservice请求的HTTP头信息&#xff08;MIME 头信息&#xff09;&#xff0c;而不是SOAP报文里面的Header标签内容。 package example;import mypackage.GetOperInfoRequest; import mypackage.GetOperInfoResponse; import mypackage.Webservice11; i…

怎么把MP3文件转换成OPUS,分享这两个方法给大家!

MP3和OPUS是两种常见的音频格式&#xff0c;用于存储和传输音乐、语音等内容。然而&#xff0c;随着技术的进步和需求的变化&#xff0c;有时我们需要将MP3文件转换为OPUS格式&#xff0c;以便在特定场景下获得更好的音频体验。本文将介绍两种简单而有效的方法&#xff0c;以帮…

flutter RepaintBoundary 截屏图片下载,保存图片不清晰的问题

flutter RepaintBoundary 截屏图片下载&#xff0c;保存图片不清晰的问题 前言一、什么是RepaintBoundary二、RepaintBoundary 能干什么三、RepaintBoundary 保存图片模糊的问题四、RepaintBoundary 使用小demo总结 前言 最近工作中&#xff0c;突然遇到截屏保存图片的问题&…

宝塔安装Jenkins-图文小白教程

一、Jenkins包下载 大家可以从Jenkins官网&#xff08;https://www.jenkins.io/&#xff09;根据自己的需要下载最新的版本。 但Jenkins官网下载较慢&#xff0c;容易造成下载失败。可以去国内的开源镜像网站下载Jenkins最新版本。目前博主使用的是清华大学的开源镜像网站&…

这样建立自己的『知识管理系统』,效率翻倍

在移动互联网时代&#xff0c;我们可以轻松获取大量的知识&#xff0c;但这些知识往往是碎片化的&#xff0c;没有系统性&#xff0c;缺乏深度。尽管我们努力学习了很多知识&#xff0c;但能力的提升却变得缓慢。 为了解决这个问题&#xff0c;我们需要建立一个系统化的知识体系…

【Leetcode】27.移除元素

给你一个数组 nums 和一个值 val&#xff0c;你需要 原地 移除所有数值等于 val 的元素&#xff0c;并返回移除后数组的新长度。 不要使用额外的数组空间&#xff0c;你必须仅使用 O(1) 额外空间并原地修改输入数组。 元素的顺序可以改变。你不需要考虑数组中超出新长度后面的…

Gerrit REST API简单介绍

Gerrit是一款开源免费的代码审查工具&#xff0c;如果其它平台想要获取gerrit数据&#xff0c;比如统计仓库代码提交数据等信息&#xff0c;可以使用Gerrit提供的REST API来获取&#xff0c;本文记录一些我使用到的Gerrit API。 目录 准备工作gerrit APIGerrit REST API使用实例…

一例.bat脚本打包样本的分析

样本的基本信息 hosts.exe MD5: 72ddf833fa206326e15c2c97679d323e SHA1: ad148ff4b7f77831b469be8bb19d32d029c23b50banish.exe MD5: 4a43ea617017d5de7d93eb2380634eee SHA1: b0af5aa27cd0e49955f1ab2d18d69f7bc8fd4d21分析过程 查壳 脱掉upx壳&#xff0c;用IDA打开&…

【文件 part 6 - 格式化读写文件函数 随机读写】

格式化读写文件函数 /* 函数调用: */ fprintf ( 文件指针&#xff0c;格式字符串&#xff0c;输出表列&#xff09;&#xff1b; fscanf ( 文件指针&#xff0c;格式字符串&#xff0c;输入表列&#xff09;&#xff1b;/** 函数功能:* 从磁盘文件中读入或输出字符* fprint…

用 ChatGPT 制作中英双语字幕

用 ChatGPT 制作中英双语字幕 0. 背景1. 使用剪映生成英文字幕2. 使用 ChatGPT 的制作中英双语字幕 0. 背景 最近在学习 AI 相关的知识&#xff0c;有很多视频是英文的。 为了提高学习效率&#xff0c;我考虑将这些视频加上中英双语字幕。 效果展示如下&#xff0c; 1. 使用…

如何用logrotate管理每日增长的日志

这篇文章主要介绍了如何用logrotate管理每日增长的日志问题&#xff0c;具有很好的参考价值&#xff0c;希望对大家有所帮助。如有错误或未考虑完全的地方&#xff0c;望不吝赐教&#xff01; logrotate简介 logrotate is designed to ease administration of systems that gen…

数据结构--二叉树的存储结构

数据结构–二叉树的存储结构 二叉树的顺序存储 #define MaxSize 100 struct TreeNode {ElemType value;bool isEmpty; }; TreeNode tree[MaxSize];初始化 void init() {for (int i 0; i < MaxSize; i)tree[i].isEmpty true; }几个重要常考的基本操作: i的左孩子: 2 i 2…

骨传导蓝牙耳机哪个好,推荐几款造诣不错的骨传导耳机

欢迎来到骨传导耳机的奇妙世界&#xff01;这种别具一格的耳机&#xff0c;也被亲切地称为“不入耳式”耳机。它采用了一种神奇的技术&#xff0c;通过颅骨、骨迷路、内耳淋巴液和听神经之间的信号传导&#xff0c;保护我们的听力。不仅如此&#xff0c;这款耳机的开放式设计&a…

Spring七大组件

一、Spring七大组件 1.核心容器(Spring core) Spring-core相当于一个创建并管理bean的容器&#xff1a; 创建&#xff1a;底层使用反射技术创建bean的实例&#xff1b;管理&#xff1a;容器中的每个bean&#xff0c;spring容器默认按照单例模式管理&#xff1b;设计&#xff1…

0501逻辑存储结构和架构-InnoDB引擎-MySQL-数据库

文章目录 1 逻辑结构1.1 表空间1.2 段1.3 区1.4 页1.5 行 2 架构2.1 内存架构2.1.1 Buffer Pool2.1.2 change bufer2.1.3 自适应哈希2.1.4 log buffer 2.2 磁盘架构2.2.1 System Tablespace2.2.2 File-Per-Table Tablespace2.2.3 General Tablespaces2.2.4 Undo Tablespaces2.2…

NC30 缺失的第一个正整数

import java.util.*;public class Solution {/*** 代码中的类名、方法名、参数名已经指定&#xff0c;请勿修改&#xff0c;直接返回方法规定的值即可*** param nums int整型一维数组* return int整型*/public int minNumberDisappeared (int[] nums) {// write code hereHashM…

CCF-CSP真题《202303-3 LDAP》思路+python,c++满分题解

想查看其他题的真题及题解的同学可以前往查看&#xff1a;CCF-CSP真题附题解大全 试题编号&#xff1a;202303-3试题名称&#xff1a;LDAP时间限制&#xff1a;12.0s内存限制&#xff1a;1.0GB问题描述&#xff1a; 题目背景 西西艾弗岛运营公司是一家负责维护和运营岛上基础设…

Android Studio实现内容丰富的安卓电影购票系统

如需源码可以添加q-------3290510686&#xff0c;也有演示视频演示具体功能&#xff0c;源码不免费&#xff0c;尊重创作&#xff0c;尊重劳动。 项目编号043 1.开发环境 android stuido jdk1.8 eclipse mysql tomcat 2.功能介绍 安卓端&#xff1a; 1.注册登录 2.查看电影列表…

datagrip 无法断句

使用datagrip 的时候出现了几个sql 执行会无法断开语句的情况可以在这里设置: When caret outside statement execute: