利用谷歌云Pub/Sub 实现多任务并行分发处理方案

news2025/1/13 10:31:19

背景

目前老梁团队负责的Global Data Integration Platform每天有大量文件需要从来自不同地区的上游下载文件并进行处理后再发送到不同下游。老梁的数据集成平台集群有6个服务器节点,老梁希望所有机器的资源都能利用上,提升大量文件并行处理能力,并且不同机器节点的任务必须不能重复,否则可能造成文件下载或处理失败。

原有的服务是使用Quarz集群,通过定时调度去下载,但是Quartz调度框架虽然本身支持负载均衡,但是其Cluster每个节点都不是均衡分配任务,假如某一节点具有竞争资源优势,有机会一直持有任务,导致其他节点空闲下来,服务器可能某天资源消耗过大而导致宕机,这并不是老梁想要的效果。后来也尝试使用生产者消费者模型,通过F5负载均衡+API通知+异步回调方式后,服务多节点并行处理能力有所增强,但由于使用Http方式进行通信导致服务之间存在直接依赖,当消费者服务进行重启或者停机,存在生产者API通知失败的可能,需要做额外的补偿处理。如下图所示:

生产者消费者模型:

解决思路

目前老梁公司已经完成了谷歌云和公司机房的网络搭建,并且公司的自有数据中心跟谷歌云可以直接通过谷歌的Dedicated Interconnect服务,也就是可以通过专线直接进行连接。虽然老梁的数据集成平台还部署在自有数据中心,但相对于文件下载的时间和速度损耗,谷歌云上的服务通过专线进行通信所带来的性能损耗几乎可以忽略(大约几百毫秒),老梁公司的架构战略方向是优先使用云组件,减少On-Premise部署。最后老梁选择采用谷歌云Pub/Sub服务作为事件消息服务,利用Pub/Sub高可用、使用简单并天然支持多消息并行传输的特性,来对现有的数据集成平台进行改造。

Pub/Sub介绍:
Pub/Sub 是一种设计为高度可靠且可伸缩的异步消息传递服务。该服务以十多年来许多 Google 产品都在依赖的核心 Google 基础架构组件为基础而构建。其实可以理解成云上的Kafka。官网:https://cloud.google.com/pubsub/architecture?hl=zh-cn

  • Pub/Sub 是一种可扩缩的异步消息传递服务,可将生成消息的服务与处理这些消息的服务分离开来。
  • Pub/Sub 允许服务异步通信,延迟时间大约为 100 毫秒。
  • Pub/Sub 用于流式分析和数据集成流水线,以注入和分发数据。无论是作为用于消息整合的消息传递中间件,还是作为并行处理任务的队列,它都非常有效。
  • 通过 Pub/Sub,您可以创建事件提供方和使用方的系统,称为发布者和订阅者。发布者通过广播事件而不是同步远程过程调用 (RPC) 与订阅者异步通信。
  • 发布者将事件发送到 Pub/Sub 服务,而不考虑如何或何时处理这些事件。然后,Pub/Sub 会将事件传送到对其做出响应的所有服务。在通过 RPC 进行通信的系统中,发布商必须等待订阅者接收数据。但是,Pub/Sub 中的异步集成可以提高整个系统的灵活性和稳健性。

** 基于Pub/Sub改造后的模型: **
各个消费者节点所拿到的事件都不会重复

大概实施方案

这里只使用模拟场景展示大概思路,具体细节还需要根据各自项目进行优化。

注意事项:

  1. 首先你要创建你应用要使用的TopicSubscription,这里需要注意的是SubscriptionACK截止时间建议设置大点,否者假如你消费者如果消费事件所消耗的时间>ACK截止时间,Pub/Sub将会对消息进行重发,这时候会存在重复事件消息。也就是说,你要确保你的消费节点能在ACK截止时间之前处理好事件并且响应ACKPub/SUb
  2. 建议你服务使用Pull方式从Pub/SubSubscription拉取消息,因为这样可以在你Consumer代码里自由配置你请求所需要的参数,例如setMaxMessages方法可以让你自由定义你每次拉取多少事件,更好地基于你服务器的能力去配置,并且也可以避免在做负载均衡的时候某些机器节点所拿到的任务事件太多导致服务器节点的资源没办法充分利用。

  3. 使用Pub/Sub的自定义Event(事件)必须要自定义一个唯一标识,这样可以在Consumer逻辑加上幂等控制,否则当刚好消费者没有及时处理事件而Pub/Sub因为消费者ACK超时进行补偿重发,这可能会因为重复处理事件给业务带来严重后果。GCP Pub/Sub采用的是至少一次投递的策略,也就是可能对同一消息投递多次,虽然实际应用中不常见,以下官方文档说明了会重复投递的情况,通常就是上面所说的ACK超时导致的

完成流程

这里只截取小部分文件下载的流程作为示范,其他类似需要并行处理的任务都可以参考。

  1. File Watch Dog 从上游远程服务器基于File Pattern去监测有没有新文件
  2. File Watch Dog 把监测到的新文件信息组装成事件分别推送到GCP Pub/Sub Topic
  3. File Process Engine所有节点并行从GCP Pub/Sub Subscription拉取任务,分别拉到不同的事件消息
  4. File Process Engine所有节点分别基于事件消息里的DatafeedId去配置中心查找该Datafeed的连接信息
  5. File Process Engine所有节点分别去上游远程服务器下载自己接收到的事件对应的文件

简单测试

这里使用官方提供的示例代码,简单测试下发布多个消息,看看消费者代码是否会重复消费相同事件。
参考示例:https://cloud.google.com/pubsub/docs/pull#java

import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class SubscribeAsyncExample {
  public static void main(String... args) throws Exception {
    // TODO(developer): Replace these variables before running the sample.
    String projectId = "your-project-id";
    String subscriptionId = "your-subscription-id";

    subscribeAsyncExample(projectId, subscriptionId);
  }

  public static void subscribeAsyncExample(String projectId, String subscriptionId) {
    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(projectId, subscriptionId);

    // Instantiate an asynchronous message receiver.
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> {
          // Handle incoming message, then ack the received message.
          System.out.println("Received MessageId: " + message.getMessageId()+"Data: " + message.getData().toStringUtf8());
          consumer.ack();
          System.out.println("Message has been acknowledge")
        };

    Subscriber subscriber = null;
    try {
      subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();
      // Start the subscriber.
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName.toString());
      // Allow the subscriber to run for 30s unless an unrecoverable error occurs.
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
    } catch (TimeoutException timeoutException) {
      // Shut down the subscriber after 30s. Stop receiving messages.
      subscriber.stopAsync();
    }
  }
}

这里我在Topic发布了5条带有序号的消息,分别是:test:1test:2test:3test:4test:5,然后开了三个进程去监听Subscription,看看会不会每个进程会不会出现重复的消息

进程1

进程2

进程3

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/498410.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

C# Microsoft.ClearScript.V8脚本使用

1、ClearScript支持的功能和适用场景 微软的.net是非常强大和灵活的,除了C#体系脚本扩展,也支持其他流行的脚本扩展,Microsoft.ClearScript.V8就是一个.NET绑定到Google V8的脚本引擎。它允许.NET应用程序直接从JavaScript代码中调用函数&am…

Redis布隆过滤器的原理和应用场景,解决缓存穿透

目录 专栏导读一、布隆过滤器BloomFilter是什么二、布隆过滤器BloomFilter能干嘛?三、布隆过滤器使用场景1、解决缓存穿透问题2、黑名单3、网页爬虫对URL的去重,避免爬取相同的URL地址四、操作布隆过滤器BloomFilter1、使用布隆过滤器2、删除key3、判断是否存在五、代码实例1…

黑客开始使用双 DLL 侧载来逃避检测

一个名为“Dragon Breath”、“Golden Eye Dog”或“APT-Q-27”的 APT 黑客组织正在展示一种新趋势,即使用经典 DLL 旁加载技术的多种复杂变体来逃避检测。 这些攻击变体从一个初始向量开始,该向量利用一个干净的应用程序,最常见的是 Telegr…

vue_组件基础

单文件组件 Vue 单文件组件&#xff08;又名 *.vue 文件&#xff0c;缩写为 SFC&#xff09;是一种特殊的文件格式&#xff0c;它允许将 Vue 组件的模板、逻辑 与 样式封装在单个文件中 <template><h3>单文件组件</h3> </template><script> ex…

asp.net+c#操作系统课程在线教学平台

1&#xff0e;系统登录&#xff1a;系统登录是用户访问系统的路口&#xff0c;设计了系统登录界面&#xff0c;包括用户名、密码和验证码&#xff0c;然后对登录进来的用户判断身份信息&#xff0c;判断是管理员用户还是普通用户。 2&#xff0e;系统用户管理&#xff1a;不管是…

答疑解惑:开发者必须彻底搞懂的 SSL/TLS 协议

简介 本期答疑解惑将和大家一起认识SSL/TLS 协议。请尝试回答以下几个问题&#xff1a; 使用浏览器访问https网站和http网站有什么不同&#xff1f;SSL协议作用于网络模型的哪一层&#xff1f;你知道CSDN&#xff0c;博客园正在使用的是什么类型的SSL证书吗&#xff1f;SSL&a…

汇编实现LED循环点亮(延时子程序模板)

在单片机P2口外接8个发光二极管(低电平驱动)。试编写一个汇编程序&#xff0c;实现LED循环点亮功能:P2.0-P2.1-P2.2-P2.3-…-P2.7-P2.6-P25-…-P2.0的顺序&#xff0c;无限循环。要求采用软件延时方式控制闪烁时间间隔(约50ms)。 首先进行电路设计 电路原理图设计 利用 Prot…

php+vue影视电影视频点播推荐avxhe系统

影视推荐系统的主要使用者分为管理员和用户&#xff0c;实现功能包括管理员&#xff1a;首页、个人中心、用户管理、公告信息管理、电影分类管理、影视推荐管理、付费点播管理、点播信息管理、管理员管理、系统管理&#xff0c;用户&#xff1a;首页、个人中心、付费点播管理、…

数字化转型导师坚鹏:企业数字化领导力提升之道

企业数字化领导力提升之道 ——融合中西智慧&#xff0c;践行知行合一思想&#xff0c;实现知行果合一 课程背景&#xff1a; 很多企业存在以下问题&#xff1a; 不知道如何领导面临的数字化时代&#xff1f; 不清楚企业数字化领导力模型的内涵&#xff1f; 不知道如何…

开关电源基础02:基本开关电源拓扑(1)-BUCK拓扑

说在开头&#xff1a;关于海森堡的矩阵&#xff08;1&#xff09; 我们前面说了&#xff0c;海森堡和泡利到了哥本哈根跟着玻尔混&#xff0c;在哥本哈根海森堡感到了一种竞争的气氛&#xff1a;他在德国少年得志&#xff0c;是出了名的天才&#xff0c;现在突然发现身边的每一…

Python每日一练:圆桌争风吃醋的豚鼠韩信点兵(全一行代码解法)

文章目录 前言一、圆桌二、争风吃醋的豚鼠三、韩信点兵总结 前言 很显然&#xff0c;Python的受众远远大于C&#xff0c;其实笔者本人对Python的理解也是远强于C的&#xff0c;C纯粹是为了假装笔者是个职业选手才随便玩玩的&#xff0c;借着十多年前学的C的功底&#xff0c;强…

01、爬虫js逆向之-七麦数据

目标网址&#xff1a;aHR0cHM6Ly93d3cucWltYWkuY24vcmFuay9pbmRleC9icmFuZC9hbGwvZGV2aWNlL2lwaG9uZS9jb3VudHJ5L2NuL2dlbnJlLzM2 &#xff08;需要进行ba64解码即可获取到参数&#xff09; 需要逆向的加密参数&#xff1a;analysis 1、点击数据接口&#xff0c;触发请求 2、点…

2022年NOC大赛编程马拉松赛道复赛图形化低年级A卷-正式卷,包含答案

目录 选择题: 多选题: 编程题: 下载文档打印做题: 2022年NOC大赛编程马拉松赛道复赛图形化低年级A卷-正式卷 2022NOC-图形化复赛低年级A卷正式卷

天地气运流转,皆在五行生克中

在中国的传统文化里&#xff0c;常讲“气运”二字&#xff0c;把两字分开&#xff0c;便是气数与命运。 在现代人的观念里&#xff0c;气运是个复杂又抽象的概念。 天地五行之气轮流转&#xff0c;一切都在五行生克中。 而古人的方法&#xff0c;是通过五行的变化来描述气运的流…

Promise类方法

这篇主要讲一下Promise的类方法的基本使用&#xff0c;至于Promise的基本使用这里就不赘述了&#xff0c;之前也有手写过Promise、实现了Promise的核心逻辑。其实我们平时用Promise也挺多的&#xff0c;不过又出现了两个新的语法&#xff08;ES11&#xff0c;ES12新增了两个&am…

Gradle使用

下载Gradle Gradle Distributions 配置环境变量 测试是否成功 cmd输入gradle -v 在.gradle目录下创建一个init.gradle allprojects { repositories { maven { url file:///D:/maven/myRepository} ## 这里是本地maven仓库地址,没有就会依次向下设置的地址寻…

wisp5学习日记1

这里写目录标题 编译工程问题一 LSD-FET430UIF仿真器排针方向与所给排针方向示意图不一致&#xff0c;不知怎么方向问题2 拟器或仿真器无法找到连接到计算机的USB FET 编译工程 鼠标右键选择build project 问题一 LSD-FET430UIF仿真器排针方向与所给排针方向示意图不一致&…

【Java基础 2】Java 基础语法

&#x1f34a; Java学习&#xff1a;社区快速通道 文章目录 1 变量与基本数据类型1.1 变量1.2 数据类型1.3 标识符1.4 类型转换1.5 关键字大全 2 二进制概述3 方法4 运算符4.1 算术运算符4.2 赋值运算符4.3 关系运算符4.4 逻辑运算符4.5 字符串连接运算符4.6 三目运算符 5 命名…

配置JDK环境变量

文章目录 查看电脑系统下载及安装JavaSE配置系统环境变量测试环境变量配置是否成功。 查看电脑系统 运行输入框中输入&#xff1a;control 下载及安装JavaSE 这个从网上下载就行&#xff0c;jdk-8u141-windows-x64.exe&#xff0c;不提供下载方式了。 主要讲解安装过程&a…

AI 工具合辑盘点(十二)持续更新 之 面向学生群体的 AI 工具和面向所有人的 AI 工具

面向学生群体的 AI 工具 人工智能在教育领域可以发挥多种作用。例如&#xff0c;它可以用于个性化课程、检测抄袭、转录讲座和促进教师与学生之间的快速沟通等等。 教育面临着许多挑战&#xff0c;这些人工智能工具可以帮助教师和学生。这些 AI 可以替代手动工作、降低人为错…