ceph-rgw zipper的设计理念(2)

news2025/1/14 1:00:03

本文简介

书接上文。本文以CreateBucket为例进行详细讲述设计理念以及接口变化趋势。

1、接收请求和协议处理请求

 rgw_asio_frontend.cc

主要功能:回调函数注册和请求处理

void handle_connection(boost::asio::io_context& context,
                       RGWProcessEnv& env, Stream& stream,
                       timeout_timer& timeout, size_t header_limit,
                       parse_buffer& buffer, bool is_ssl,
                       SharedMutex& pause_mutex,
                       rgw::dmclock::Scheduler *scheduler,
                       const std::string& uri_prefix,
                       boost::system::error_code& ec,
                       spawn::yield_context yield)
{
  // don't impose a limit on the body, since we read it in pieces
  static constexpr size_t body_limit = std::numeric_limits<size_t>::max();

  auto cct = env.driver->ctx();

  // read messages from the stream until eof
  for (;;) {
    // configure the parser
    rgw::asio::parser_type parser;
    parser.header_limit(header_limit);
    parser.body_limit(body_limit);
    timeout.start();
    // parse the header
    http::async_read_header(stream, buffer, parser, yield[ec]);
    timeout.cancel();
    if (ec == boost::asio::error::connection_reset ||
        ec == boost::asio::error::bad_descriptor ||
        ec == boost::asio::error::operation_aborted ||
#ifdef WITH_RADOSGW_BEAST_OPENSSL
        ec == ssl::error::stream_truncated ||
#endif
        ec == http::error::end_of_stream) {
      ldout(cct, 20) << "failed to read header: " << ec.message() << dendl;
      return;
    }
    auto& message = parser.get();

    bool expect_continue = (message[http::field::expect] == "100-continue");

    {
      // process the request
      RGWRequest req{env.driver->get_new_req_id()};

      StreamIO real_client{cct, stream, timeout, parser, yield, buffer,
                           is_ssl, local_endpoint, remote_endpoint};

      auto real_client_io = rgw::io::add_reordering(
                              rgw::io::add_buffering(cct,
                                rgw::io::add_chunking(
                                  rgw::io::add_conlen_controlling(
                                    &real_client))));
      RGWRestfulIO client(cct, &real_client_io);
      optional_yield y = null_yield;

      process_request(env, &req, uri_prefix, &client, y,
                      scheduler, &user, &latency, &http_ret);
    }

    if (!parser.keep_alive()) {
      return;
    }

    // if we failed before reading the entire message, discard any remaining
    // bytes before reading the next
    while (!expect_continue && !parser.is_done()) {
      static std::array<char, 1024> discard_buffer;

      auto& body = parser.get().body();
      body.size = discard_buffer.size();
      body.data = discard_buffer.data();

      timeout.start();
      http::async_read_some(stream, buffer, parser, yield[ec]);
      timeout.cancel();
      if (ec == http::error::need_buffer) {
        continue;
      }
      if (ec == boost::asio::error::connection_reset) {
        return;
      }
      if (ec) {
        ldout(cct, 5) << "failed to discard unread message: "
            << ec.message() << dendl;
        return;
      }
    }
  }
}
 

rgw_process.cc

主要功能:请求处理,包括身份认证、请求处理,函数调用返回等。

int process_request(const RGWProcessEnv& penv,
                    RGWRequest* const req,
                    const std::string& frontend_prefix,
                    RGWRestfulIO* const client_io,
                    optional_yield yield,
            rgw::dmclock::Scheduler *scheduler,
                    string* user,
                    ceph::coarse_real_clock::duration* latency,
                    int* http_ret)
{
  int ret = client_io->init(g_ceph_context);
  dout(1) << "====== starting new request req=" << hex << req << dec
      << " =====" << dendl;
  perfcounter->inc(l_rgw_req);

  RGWEnv& rgw_env = client_io->get_env();

  req_state rstate(g_ceph_context, penv, &rgw_env, req->id);
  req_state *s = &rstate;
  rgw::sal::Driver* driver = penv.driver;


  RGWHandler_REST *handler = rest->get_handler(driver, s,
                                               *penv.auth_registry,
                                               frontend_prefix,
                                               client_io, &mgr, &init_error);

  ldpp_dout(s, 2) << "getting op " << s->op << dendl;
  op = handler->get_op();

  std::tie(ret,c) = schedule_request(scheduler, s, op);

  req->op = op;
  ldpp_dout(op, 10) << "op=" << typeid(*op).name() << dendl;
  s->op_type = op->get_type();

  try {
    ldpp_dout(op, 2) << "verifying requester" << dendl;
    ret = op->verify_requester(*penv.auth_registry, yield);
    ldpp_dout(op, 2) << "normalizing buckets and tenants" << dendl;
    ret = handler->postauth_init(yield);

    ret = rgw_process_authenticated(handler, op, req, s, yield, driver);

  } catch (const ceph::crypto::DigestException& e) {
    dout(0) << "authentication failed" << e.what() << dendl;
    abort_early(s, op, -ERR_INVALID_SECRET_KEY, handler, yield);
  }

done:
  try {
    client_io->complete_request();
  } catch (rgw::io::Exception& e) {
    dout(0) << "ERROR: client_io->complete_request() returned "
            << e.what() << dendl;
  }

  if (handler)
    handler->put_op(op);
  rest->put_handler(handler);

  const auto lat = s->time_elapsed();
  if (latency) {
    *latency = lat;
  }
  dout(1) << "====== req done req=" << hex << req << dec
      << " op status=" << op_ret
      << " http_status=" << s->err.http_ret
      << " latency=" << lat
      << " ======"
      << dendl;

  return (ret < 0 ? ret : s->err.ret);
} /* process_request */

在rgw_process_authenticated函数中进行OP的详细处理。包括身份认证、pre-exec、exec、complete等函数。

int rgw_process_authenticated(RGWHandler_REST * const handler,
                              RGWOp *& op,
                              RGWRequest * const req,
                              req_state * const s,
                                    optional_yield y,
                              rgw::sal::Driver* driver,
                              const bool skip_retarget)
{
  ldpp_dout(op, 2) << "init permissions" << dendl;
  int ret = handler->init_permissions(op, y);
  ldpp_dout(op, 2) << "init op" << dendl;
  ret = op->init_processing(y);

  ldpp_dout(op, 2) << "verifying op mask" << dendl;
  ret = op->verify_op_mask();

  ldpp_dout(op, 2) << "verifying op permissions" << dendl;
  {
    auto span = tracing::rgw::tracer.add_span("verify_permission", s->trace);
    std::swap(span, s->trace);
    ret = op->verify_permission(y);
    std::swap(span, s->trace);
  }

  ldpp_dout(op, 2) << "verifying op params" << dendl;
  ret = op->verify_params();
  ldpp_dout(op, 2) << "executing" << dendl;
  {
    auto span = tracing::rgw::tracer.add_span("execute", s->trace);
    std::swap(span, s->trace);
    op->execute(y);
    std::swap(span, s->trace);
  }

  ldpp_dout(op, 2) << "completing" << dendl;
  op->complete();

  return 0;
}

rgw_op.cc

此处忽略rest或者swift中的协议处理过程,直接到RGWOP::createBucket()中

void RGWCreateBucket::execute(optional_yield y)
{
  const rgw::SiteConfig& site = *s->penv.site;
  const std::optional<RGWPeriod>& period = site.get_period();
  const RGWZoneGroup& my_zonegroup = site.get_zonegroup();

  /*步骤1:处理zonegroup信息,确定桶的placement、storage_class等信息,以及是否是主站点存储*/
  /*步骤2:读取桶的信息,如果存在则进行一些处理*/
  // read the bucket info if it exists
  op_ret = driver->load_bucket(this, rgw_bucket(s->bucket_tenant, s->bucket_name),
                               &s->bucket, y);

  /*步骤3:如果桶不存在,则初始化各种信息,*/
  s->bucket_owner.id = s->user->get_id();
  s->bucket_owner.display_name = s->user->get_display_name();
  createparams.owner = s->user->get_id();

  buffer::list aclbl;
  policy.encode(aclbl);
  createparams.attrs[RGW_ATTR_ACL] = std::move(aclbl);

  if (has_cors) {
    buffer::list corsbl;
    cors_config.encode(corsbl);
    createparams.attrs[RGW_ATTR_CORS] = std::move(corsbl);
  }

  /*步骤4:创建桶*/
  ldpp_dout(this, 10) << "user=" << s->user << " bucket=" << s->bucket << dendl;
  op_ret = s->bucket->create(this, createparams, y);
  /*步骤5:如果失败,则回退处理*/
  .....
}

2、store层处理和rados中的处理

int RadosBucket::create(const DoutPrefixProvider* dpp,
                        const CreateParams& params,
                        optional_yield y)
{
  rgw_bucket key = get_key();
  key.marker = params.marker;
  key.bucket_id = params.bucket_id;

  /*创建桶,此处调用rados.cc中的处理流程*/
  int ret = store->getRados()->create_bucket(
      dpp, y, key, params.owner, params.zonegroup_id,
      params.placement_rule, params.zone_placement, params.attrs,
      params.obj_lock_enabled, params.swift_ver_location,
      params.quota, params.creation_time, &bucket_version, info);

  /*link处理*/
  ret = link(dpp, params.owner, y, false);
  if (ret && !existed && ret != -EEXIST) {
    /* if it exists (or previously existed), don't remove it! */
    ret = unlink(dpp, params.owner, y);
    if (ret < 0) {
      ldpp_dout(dpp, 0) << "WARNING: failed to unlink bucket: ret=" << ret
               << dendl;
    }
  } else if (ret == -EEXIST || (ret == 0 && existed)) {
    ret = -ERR_BUCKET_EXISTS;
  }

  return ret;
}
int RGWRados::create_bucket(const DoutPrefixProvider* dpp,
                            optional_yield y,
                            const rgw_bucket& bucket,
                            const rgw_user& owner,
                            const std::string& zonegroup_id,
                            const rgw_placement_rule& placement_rule,
                            const RGWZonePlacementInfo* zone_placement,
                            const std::map<std::string, bufferlist>& attrs,
                            bool obj_lock_enabled,
                            const std::optional<std::string>& swift_ver_location,
                            const std::optional<RGWQuotaInfo>& quota,
                            std::optional<ceph::real_time> creation_time,
                            obj_version* pep_objv,
                            RGWBucketInfo& info)
{
  int ret = 0;

#define MAX_CREATE_RETRIES 20 /* need to bound retries */
  for (int i = 0; i < MAX_CREATE_RETRIES; i++) {
    /*步骤1:初始化bucket的ver_id和quota、time等初始化信息*/
    /*步骤2:bucket_index 初始化*/
    if (zone_placement) {
      ret = svc.bi->init_index(dpp, info, info.layout.current_index);
      if (ret < 0) {
        return ret;
      }
    }

    /*步骤3:linkbucket_info信息*/
    constexpr bool exclusive = true;
    ret = put_linked_bucket_info(info, exclusive, ceph::real_time(), pep_objv, &attrs, true, dpp, y);
    if (ret == -ECANCELED) {
      ret = -EEXIST;
    }
:
    return ret;
  }

  /* this is highly unlikely */
  ldpp_dout(dpp, 0) << "ERROR: could not create bucket, continuously raced with bucket creation and removal" << dendl;
  return -ENOENT;
}
put_linked_bucket_info函数

int RGWRados::put_linked_bucket_info(RGWBucketInfo& info, bool exclusive, real_time mtime, obj_version *pep_objv,
                                     const map<string, bufferlist> *pattrs, bool create_entry_point,
                                     const DoutPrefixProvider *dpp, optional_yield y)
{
  bool create_head = !info.has_instance_obj || create_entry_point;

  /*步骤1:写bucket_instance*/
  int ret = put_bucket_instance_info(info, exclusive, mtime, pattrs, dpp, y);

  RGWBucketEntryPoint entry_point;
  entry_point.bucket = info.bucket;
  entry_point.owner = info.owner;
  entry_point.creation_time = info.creation_time;
  entry_point.linked = true;
  /*存储bucket_entrypoint实体信息*/
  ret = ctl.bucket->store_bucket_entrypoint_info(info.bucket, entry_point, y, dpp, RGWBucketCtl::Bucket::PutParams()
                                                  .set_exclusive(exclusive)
                                      .set_objv_tracker(&ot)
                                      .set_mtime(mtime));
  if (ret < 0)
    return ret;

  return 0;
}

3、SVC中的处理:bucket index的创建

int RGWSI_BucketIndex_RADOS::init_index(const DoutPrefixProvider *dpp,RGWBucketInfo& bucket_info, const rgw::bucket_index_layout_generation& idx_layout)
{
  librados::IoCtx index_pool;

  string dir_oid = dir_oid_prefix;
  int r = open_bucket_index_pool(dpp, bucket_info, &index_pool);
  if (r < 0) {
    return r;
  }

  dir_oid.append(bucket_info.bucket.bucket_id);

  map<int, string> bucket_objs;
  get_bucket_index_objects(dir_oid, idx_layout.layout.normal.num_shards, idx_layout.gen, &bucket_objs);

  return CLSRGWIssueBucketIndexInit(index_pool,
                    bucket_objs,
                    cct->_conf->rgw_bucket_index_max_aio)();
}

4、总结

至此,一个bucket创建完毕,其他的op类似于此,整体结构变化不大。下图是rgw_rados.h、rgw_sal_rados.h、rgw_service.h和svc_module***.h的相关关系,比较粗糙,仅供参考。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2084334.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

如何使用IDEA搭建Mybatis框架环境(详细教程)

文章目录 ☕前言为什么学习框架技术Mybatis框架简介 &#x1f379;一、如何配置Mybatis框架环境1.1下载需要MyBatis的jar文件1.2部署jar文件1.3创建MyBatis核心配置文件configuration.xml1.4.创建持久类(POJO)和SQL映射文件1.5.创建测试类 &#x1f9cb;二、 MyBatis框架的优缺…

GAN Inversion(GAN 反演)

什么是Inversion&#xff1f; 来龙去脉&#xff1a; 在生成过程中&#xff0c;我们通过将z输入G&#xff0c;然后得到图像&#xff0c;但是你这个Z是不定的&#xff08;随机的高斯分布噪声&#xff09;&#xff0c;所以即使你得到了质量好的生成图像&#xff0c;但是依然无法…

页面间对象传递的几种方法

页面间对象传递的几种方法 1. 使用request对象传递2. 使用session对象传递3. 使用application对象传递4. 使用cookie传递 &#x1f496;The Begin&#x1f496;点点关注&#xff0c;收藏不迷路&#x1f496; 在Web开发中&#xff0c;页面间的数据传递是一个常见的需求。本文将…

Java | Leetcode Java题解之第381题O(1)时间插入、删除和获取随机元素-允许重复

题目&#xff1a; 题解&#xff1a; class RandomizedCollection {Map<Integer, Set<Integer>> idx;List<Integer> nums;/** Initialize your data structure here. */public RandomizedCollection() {idx new HashMap<Integer, Set<Integer>>…

搜索引擎通过分析网页标题中的关键词来判断内容的相关性

在网站链接上的标题&#xff0c;‌写关键词对SEO更适合&#xff0c;‌这一观点是基于多个方面的考虑。‌以下是对这一观点的详细讲解&#xff1a;‌ 关键词优化与SEO相关性 首先&#xff0c;‌搜索引擎的工作原理是通过抓取和分析网页内容来确定其在搜索结果中的排名。‌在这个…

显示中文字体问题解决:ImportError: The _imagingft C module is not installed

使用opencv写入中文时&#xff0c;用以下代码会导致乱码&#xff1a; cv2.putText(im0, f"{label}:{score}", (xmin, ymin), cv2.FONT_HERSHEY_SIMPLEX, 2, (0,255,0), 3)因此需要借助PIL库写入中文字符&#xff0c;用法如下&#xff1a; import cv2 from PIL impo…

Java 输入与输出之 NIO【非阻塞式IO】【NIO网络编程】探索之【二】

上一篇博客我们介绍了NIO的核心原理、FileChannel和Buffer, 对Buffer的用法有了清晰的了解。上篇博客&#xff1a; Java 输入与输出之 NIO【非阻塞式IO】【NIO核心原理】探索之【一】 本篇博客我们将继续来探索NIO&#xff0c;介绍如何使用SocketChannel和ServerSocketChannel来…

完全自由的栏目设计

亮点功能&#xff1a; 可以将任一栏目拖动到其它栏目下 被拖动的栏目其包含的子栏目和文章将一起拖过去。 快来试试吧&#xff01;

原来这么多行业都可以转行大模型,大模型从入门到精通,非常详细收藏我这一篇就够了

转行到大模型&#xff08;Large Model&#xff09;领域已经成为当前科技发展的一大趋势。所谓“大模型”&#xff0c;通常指的是那些包含数亿甚至数十亿参数的深度学习模型&#xff0c;例如自然语言处理中的GPT系列、BERT等模型&#xff0c;以及计算机视觉领域的EfficientNet、…

[Pyplot]设置图中字体为TimesNewRoman

一、简介 本文介绍了如何在linux环境下在python中使用matplotlib.pyplot 绘制图表时&#xff0c;令其中的文字字体为Times New Roman。 二、设置步骤 1. Linux下安装Times New Roman字体 $ sudo apt install ttf-mscorefonts-installer # 安装字体 $ sudo fc-cache # 使新安…

Python与Biome-BGC:生态模型分析的未来趋势

近年来&#xff0c;Python编程语言受到越来越多科研人员的喜爱&#xff0c;在多个编程语言排行榜中持续夺冠。同时&#xff0c;伴随着深度学习的快速发展&#xff0c;人工智能技术在各个领域中的应用越来越广泛。机器学习是人工智能的基础&#xff0c;因此&#xff0c;掌握常用…

扩博智能× Milvus:图像检索助力零售商品图像高效标注

大家好&#xff0c;我是上海扩博智能技术有限公司的Frank&#xff0c;负责算法工程相关的工作。很高兴能在 Milvus 社区和大家分享我们在图像检索方面的经验。 01 扩博智能公司简介 扩博智能 Clobotics 成立于 2016 年&#xff0c;总部位于上海长宁。我们聚焦计算机视觉和机器学…

SOMEIP_ETS_071: Union_Length_too_long

测试目的&#xff1a; 验证当设备&#xff08;DUT&#xff09;接收到一个联合&#xff08;union&#xff09;长度超出实际联合长度的SOME/IP消息时&#xff0c;是否能够返回错误消息。 描述 本测试用例旨在检查DUT在处理一个echoUNION方法的SOME/IP消息时&#xff0c;如果消…

基于DashScope+Streamlit构建你的机器学习助手(入门级)

前言 在LLM&#xff08;大语言模型&#xff09;盛行的今天&#xff0c;博主越来越感觉到AI&#xff08;人工智能&#xff09;的潜力被“无限”激发了。它为什么会突然间完成“鱼跃龙门”呢&#xff1f; 博主认为基础设施&#xff08;也可以称为算力&#xff09;的完善和“天才…

Java-异常处理try catch finally throw和throws

在 Java 中,异常处理机制是通过 try, catch, finally, throw和 throws 这几个关键字来实现的。以下 是这些关键字的基本用途和它们之间的区别: public class ExceptionHandlingExample {public static void main(String[] args) {try {processSomething();} catch (Exceptio…

【视频讲解】SMOTEBoost、RBBoost和RUSBoost不平衡数据集的集成分类酵母数据集、治癌候选药物|数据分享...

全文链接&#xff1a;https://tecdat.cn/?p37502 分析师&#xff1a;Zilin Wu 在当今的大数据时代&#xff0c;科研和实际应用中常常面临着海量数据的处理挑战。在本项目中&#xff0c;我们拥有上万条数据&#xff0c;这既是宝贵的资源&#xff0c;也带来了诸多难题。一方面&a…

RFFT:数据与代码已开源,京东推出广告图生成新方法 | ECCV 2024

论文将多模态可靠反馈网络&#xff08;RFNet&#xff09;结合到一个循环生成图片过程中&#xff0c;可以增加可用的广告图片数量。为了进一步提高生产效率&#xff0c;利用RFNet反馈进行创新的一致条件正则化&#xff0c;对扩散模型进行微调&#xff08;RFFT&#xff09;&#…

行业机遇!程序员:如何选择适合自己的就业方向?

随着科技的不断进步和发展&#xff0c;程序员的就业前景也越来越广阔。而在这个快速发展的行业中&#xff0c; 在各个领域都有着广泛的应用&#xff0c;信息技术的迅猛发展使得程序员在现代社会中占据了举足轻重的地位。从软件开发到网络安全&#xff0c;再到人工智能&#xf…

超越Text2Video-Zero|无需额外训练,条件生成、专门生成和指令引导的视频编辑全搞定!

论文链接&#xff1a;https://arxiv.org/pdf/2407.21475 github链接&#xff1a; https://densechen.github.io/zss/ 亮点直击 本文提出了一种新颖的zero-shot视频采样算法&#xff0c;该算法能够直接从预训练的图像扩散模型中采样高质量的视频片段。 本文提出了一个依赖噪声模…

青岛实训day33(8/21)

1、配置一主二从mysql 1. mycat对mysql8不完全支持 2. mysql8主从问题不大get_pub_key1 3. gtids事务复制 4. 删除/etc/my.cnf 5. 同步data文件需要先停用mysql服务,删除data目录中的auto.cnf 6. gtid模式以及经典模式都需要锁表 flush tables with read lock;unlock tables;…