版本
ceph版本为17.
ceph如何进行读写接口的实现
Ceph的客户端通过librados的接口进行集群的访问,这里的访问包括:
1)对集群的整体访问
2)对象的访问
两类接口,这套接口(API)包括C、C++和Python的实现,接口通过网络实现对Ceph集群的访问。在客户端层面,可以在自己的程序中调用该接口,从而集成Ceph集群的存储功能,或者在监控程序中实现对Ceph集群状态的监控。
初始化一个ObjectWriteOperation对象,并进行初始化,设置参数,然后进行写操作。
例子:在RGW中,客户端会初始化一个OBjectWriteOperation的对象,然后调用librados中的接口进行操作。
ObjectWriteOperation op;
op.create(false);
op.setxattr(RGW_ATTR_ID_TAG, bl);
op.mtime2(&mtime_ts);
op.write_full(*meta.data);
op.rmxattr(name.c_str());
rgw_rados_operate(dpp, ref.pool.ioctx(), ref.obj.oid, &op, null_yield);
ioctx.operate(oid, op, flags);
下面针对每个函数调用进行详细的描述
-
Create:初始化op对象
void librados::ObjectWriteOperation::create(bool exclusive),实际调用的是ObjectOperation::create//librados_cxx.cc void librados::ObjectWriteOperation::create(bool exclusive){ ceph_assert(impl); ::ObjectOperation *o = &impl->o; o->create(exclusive); } //osdc/Object.h void create(bool excl) { OSDOp& o = add_op(CEPH_OSD_OP_CREATE); o.op.flags = (excl ? CEPH_OSD_OP_FLAG_EXCL : 0); } //ops是ObjectOperation中的一个属性,类型为small_vector<OSDOp, osdc_opvec_len>; OSDOp& add_op(int op) { ops.emplace_back(); ops.back().op.op = op; out_bl.push_back(nullptr); ceph_assert(ops.size() == out_bl.size()); out_handler.emplace_back(); ceph_assert(ops.size() == out_handler.size()); out_rval.push_back(nullptr); ceph_assert(ops.size() == out_rval.size()); out_ec.push_back(nullptr); ceph_assert(ops.size() == out_ec.size()); return ops.back(); }
-
setxattr和rmxattr
二者类似,设置op对象的一些必要操作。//librados_cxx.cc void librados::ObjectWriteOperation::setxattr(const char *name, const bufferlist& v) { ceph_assert(impl); ::ObjectOperation *o = &impl->o; o->setxattr(name, v); } //osdc/Object.h void setxattr(const char *name, const ceph::buffer::list& bl) { add_xattr(CEPH_OSD_OP_SETXATTR, name, bl); } //ops是ObjectOperation中的一个属性,类型为small_vector<OSDOp, osdc_opvec_len>; void add_xattr(int op, const char *name, const ceph::buffer::list& data) { OSDOp& osd_op = add_op(op); osd_op.op.xattr.name_len = (name ? strlen(name) : 0); osd_op.op.xattr.value_len = data.length(); if (name) osd_op.indata.append(name, osd_op.op.xattr.name_len); osd_op.indata.append(data); }
-
write_full
关键函数,进行对象的写操作。//librados_cxx.cc void librados::ObjectWriteOperation::write_full(const bufferlist& bl){ ceph_assert(impl); ::ObjectOperation *o = &impl->o; bufferlist c = bl; o->write_full(c); } //osdc/Object.h void write_full(ceph::buffer::list& bl) { add_data(CEPH_OSD_OP_WRITEFULL, 0, bl.length(), bl); } void add_data(int op, uint64_t off, uint64_t len, ceph::buffer::list& bl) { OSDOp& osd_op = add_op(op); osd_op.op.extent.offset = off; osd_op.op.extent.length = len; osd_op.indata.claim_append(bl); }
-
rgw_rados_operate
当op初步处理完成后,即可进行operate操作。交由存储池的ioctx进行处理int rgw_rados_operate(const DoutPrefixProvider *dpp, librados::IoCtx& ioctx, const std::string& oid, librados::ObjectWriteOperation *op, optional_yield y, int flags){ if (y) { auto& context = y.get_io_context(); auto& yield = y.get_yield_context(); boost::system::error_code ec; librados::async_operate(context, ioctx, oid, op, flags, yield[ec]); return -ec.value(); } if (is_asio_thread) { ldpp_dout(dpp, 20) << "WARNING: blocking librados call" << dendl; } return ioctx.operate(oid, op, flags); } int librados::IoCtx::operate(const std::string& oid, librados::ObjectWriteOperation *o, int flags) { object_t obj(oid); if (unlikely(!o->impl)) return -EINVAL; return io_ctx_impl->operate(obj, &o->impl->o, (ceph::real_time *)o->impl->prt, translate_flags(flags)); } //librados/IoctxImpl.cc //其中核心部分是objecter_op和objecter的op_submit函数.objecter是osdc的Object类的对象,后面的操作就涉及到osdc中的操作了。 int librados::IoCtxImpl::operate(const object_t& oid, ::ObjectOperation *o, ceph::real_time *pmtime, int flags) { ceph::real_time ut = (pmtime ? *pmtime : ceph::real_clock::now()); /* can't write to a snapshot */ if (snap_seq != CEPH_NOSNAP) return -EROFS; if (!o->size()) return 0; ceph::mutex mylock = ceph::make_mutex("IoCtxImpl::operate::mylock"); ceph::condition_variable cond; bool done; int r; version_t ver; Context *oncommit = new C_SafeCond(mylock, cond, &done, &r); int op = o->ops[0].op.op; ldout(client->cct, 10) << ceph_osd_op_name(op) << " oid=" << oid << " nspace=" << oloc.nspace << dendl; Objecter::Op *objecter_op = objecter->prepare_mutate_op( oid, oloc, *o, snapc, ut, flags | extra_op_flags, oncommit, &ver); objecter->op_submit(objecter_op); { std::unique_lock l{mylock}; cond.wait(l, [&done] { return done;}); } ldout(client->cct, 10) << "Objecter returned from " << ceph_osd_op_name(op) << " r=" << r << dendl; set_sync_op_version(ver); return r; }
总结
一个op经过初始化create,设置参数,setxattr,后面交给ioctx进行operate,submit给osdc。
下一篇,介绍osdc部分的处理。