ceph-mon运行原理分析

news2025/1/6 20:25:16

一、流程:ceph-deploy部署ceph-mon组建集群

1.ceph-deploy部署ceph-mon的工作流程及首次启动

1)通过命令创建ceph-mon,命令为:ceph-deploy create mon keyring

def mon(args):
    if args.subcommand == 'create':
        mon_create(args)
    elif args.subcommand == 'add':
        mon_add(args)
    elif args.subcommand == 'destroy':
        mon_destroy(args)
    elif args.subcommand == 'create-initial':
        mon_create_initial(args)
    else:
        LOG.error('subcommand %s not implemented', args.subcommand)

2)在创建mon时,会根据传入的args参数生成配置文件ceph.conf。

def mon_create(distro, args, monitor_keyring):
    hostname = distro.conn.remote_module.shortname()
    logger = distro.conn.logger
    logger.debug('remote hostname: %s' % hostname)
    path = paths.mon.path(args.cluster, hostname)
    uid = distro.conn.remote_module.path_getuid(constants.base_path)
    gid = distro.conn.remote_module.path_getgid(constants.base_path)
    done_path = paths.mon.done(args.cluster, hostname)
    init_path = paths.mon.init(args.cluster, hostname, distro.init)
 
    conf_data = conf.ceph.load_raw(args)
 
    # write the configuration file
    distro.conn.remote_module.write_conf(    #写入配置/etc/ceph/ceph.conf
        args.cluster,
        conf_data,
        args.overwrite_conf,
    )
 
def write_conf(cluster, conf, overwrite):  #写入配置/etc/ceph/ceph.conf
    """ write cluster configuration to /etc/ceph/{cluster}.conf """
    import os
 
    path = '/etc/ceph/{cluster}.conf'.format(cluster=cluster)
    tmp = '{path}.{pid}.tmp'.format(path=path, pid=os.getpid())
 
    if os.path.exists(path):
        with open(path) as f:
            old = f.read()
            if old != conf and not overwrite:
                raise RuntimeError('config file %s exists with different content; use --overwrite-conf to overwrite' % path)
    with open(tmp, 'w') as f:
        f.write(conf)
        f.flush()
        os.fsync(f)
    os.rename(tmp, path)

3)检查ceph-mon组件工作目录(/var/lib/ceph/mon/mycluster-myhostname)是否存在,不存在就创建,除了创建该目录外,还需要在该路径下创建keyring秘钥。然后执行命令"ceph-mon --cluster args.cluster --mkfs -i hostname --keyring --setuser  uid  --setgroup gid"启动ceph-mon进程,此时也是第一次启动ceph-mon。然后它会创建done文件并启动cepn-mon服务。

# if the mon path does not exist, create it
    distro.conn.remote_module.create_mon_path(path, uid, gid)  #path为/var/lib/ceph/mon/mycluster-myhostname
    if not distro.conn.remote_module.path_exists(done_path):
        logger.debug('done path does not exist: %s' % done_path)
        if not distro.conn.remote_module.path_exists(paths.mon.constants.tmp_path):   #如果路径不存在还需要创建keyring
            logger.info('creating tmp path: %s' % paths.mon.constants.tmp_path)
            distro.conn.remote_module.makedir(paths.mon.constants.tmp_path)
        keyring = paths.mon.keyring(args.cluster, hostname)
 
        logger.info('creating keyring file: %s' % keyring)
        distro.conn.remote_module.write_monitor_keyring(  #创建keyring
            keyring,
            monitor_keyring,
            uid, gid,
        )
        user_args = []
        if uid != 0:
            user_args = user_args + [ '--setuser', str(uid) ]
        if gid != 0:
            user_args = user_args + [ '--setgroup', str(gid) ]
 
        remoto.process.run(   #第一次运行时需要执行的命令
            distro.conn,
            [
                'ceph-mon',
                '--cluster', args.cluster,
                '--mkfs',  
                '-i', hostname,
                '--keyring', keyring,
            ] + user_args
        )
    # create the done file 创建done文件
    distro.conn.remote_module.create_done_path(done_path, uid, gid)
 
    # create init path
    distro.conn.remote_module.create_init_path(init_path, uid, gid)
    # start mon service 启动服务
    start_mon_service(distro, args.cluster, hostname) 
 
 
def create_mon_path(path, uid=-1, gid=-1):
    """create the mon path if it does not exist"""
    if not os.path.exists(path):
        os.makedirs(path)
        os.chown(path, uid, gid);

4)启动之后,需要将ceph-mon加入到mon_in_quorum里面,这是一个set的数据结构,这里面包含着集群的所有ceph-mon。该mon_in_quorum里面包含着leader,其他全都是peon(普通成员)。

def mon_create_initial(args):
 # create them normally through mon_create
 args.mon = mon_initial_members
 mon_create(args)
 
 # make the sets to be able to compare late
 mon_in_quorum = set([])  
 for host in mon_initial_members:
        mon_name = 'mon.%s' % host
        LOG.info('processing monitor %s', mon_name)
        sleeps = [20, 20, 15, 10, 10, 5]
        tries = 5
        rlogger = logging.getLogger(host)
        distro = hosts.get(
            host,
            username=args.username,
            callbacks=[packages.ceph_is_installed]
        )
 
        while tries:
            status = mon_status_check(distro.conn, rlogger, host, args)
            has_reached_quorum = status.get('state', '') in ['peon', 'leader']
            if not has_reached_quorum:
                LOG.warning('%s monitor is not yet in quorum, tries left: %s' % (mon_name, tries))
                tries -= 1
                sleep_seconds = sleeps.pop()
                LOG.warning('waiting %s seconds before retrying', sleep_seconds)
                time.sleep(sleep_seconds)  # Magic number
            else:
                mon_in_quorum.add(host)  //添加进mon_in_quorum
                LOG.info('%s monitor has reached quorum!', mon_name)
                break
        distro.conn.exit()


2.ceph-mon数据存储方式

1)存储方式:mon它的数据可以通过两种方式来进行存储,一种是rocksDB存储、一种是leveldb存储,在ceph中具体使用哪一种存储方式取决于/var/lib/ceph/mon/$ceph-id目录下的kv_backend文件的内容,如果kv_backend中为rocksdb,则使用rocksdb存储,若为空或读取错误时,使用leveldb存储,它们都是一个key/value类型的数据库,区别在于rocksdb配置更灵活,支持的压缩算法比较多,除了snappy压缩外还支持zstd压缩,并且压缩比也更高。

int open(ostream &out) {
    string kv_type;
    int r = read_meta("kv_backend", &kv_type); //读取kv_backend文件,获取存储类型kv_type
    if (r < 0 || kv_type.empty()) { 
      // assume old monitors that did not mark the type were leveldb.
      kv_type = "leveldb";
      r = write_meta("kv_backend", kv_type);
      if (r < 0)
    return r;
    }
    _open(kv_type);
    r = db->open(out);
    if (r < 0)
      return r;
.....
}

2)存储位置:mon的数据存储在一个可配置的路径mon_data下面,mon_data默认位置为/var/lib/ceph/mon/$ceph-id目录下,该目录存放了mon的keyring秘钥、kv存储引擎名称(rocksdb)、mon支持的版本(octopus)、以及RocksDB的存储文件store.db。

 Option("mon_data", Option::TYPE_STR, Option::LEVEL_ADVANCED)
  .set_flag(Option::FLAG_NO_MON_UPDATE)
  .set_default("/var/lib/ceph/mon/$cluster-$id") //默认mon_data配置路径为/var/lib/ceph/mon/$cluster-$id
  .add_service("mon")
  .set_description("path to mon database")
 
MonitorDBStore *store = new MonitorDBStore(g_conf()->mon_data);

ceph3:/var/lib/ceph/mon/ceph-ceph3# ls

done  keyring  kv_backend  min_mon_release  store.db  systemd

ceph3:/var/lib/ceph/mon/ceph-ceph3# cat kv_backend

rocksdb

ceph3:/var/lib/ceph/mon/ceph-ceph3#

3)ceph-mon数据主要包括集群健康状态、配置、osd是否存活和Paxos等数据,而存储在Rocksdb中的也正是这些数据,存储方式主要是采用SSTable(Sorted String Table)的方式存储。通过encode_pending将数据编码后存入rocksdb。

MonitorDBStore::TransactionRef t = paxos->get_pending_transaction();
 
if (should_stash_full())
  encode_full(t);
 
encode_pending(t);
have_pending = false;
 
if (format_version > 0) {
  t->put(get_service_name(), "format_version", format_version);
}

二、流程:ceph-mon加入集群后二次启动

1.启动流程

1.在ceph_mon.cc文件的main函数中,首先判断linxdfs序列号是否正确,然后设置线程名ceph-mon;接着读取启动时传入的命令行参数“/usr/bin/ceph-mon -f --cluster ceph --id ceph1 --setuser root --setgroup root”,并检验命令行参数。

int main(int argc, const char **argv)
{
  //检查序列号
  char* const linxdfspath = "/etc/linxsn/linxdfs_sn.conf";
.....
  ceph_pthread_setname(pthread_self(), "ceph-mon");
......
//解析命令行参数
 std::string val;
  for (std::vector<const char*>::iterator i = args.begin(); i != args.end(); ) {
    if (ceph_argparse_double_dash(args, i)) {
      break;
    } else if (ceph_argparse_flag(args, i, "--mkfs", (char*)NULL)) { //若命令行参数中有mkfs参数,则会进行370的mkfs操作
      mkfs = true;
    } else if (ceph_argparse_flag(args, i, "--compact", (char*)NULL)) {
      compact = true;
    } else if (ceph_argparse_flag(args, i, "--force-sync", (char*)NULL)) {
      force_sync = true;
    } else if (ceph_argparse_flag(args, i, "--yes-i-really-mean-it", (char*)NULL)) {
      yes_really = true;
    } else if (ceph_argparse_witharg(args, i, &val, "--osdmap", (char*)NULL)) {
      osdmapfn = val;
    } else if (ceph_argparse_witharg(args, i, &val, "--inject_monmap", (char*)NULL)) {
      inject_monmap = val;
    } else if (ceph_argparse_witharg(args, i, &val, "--extract-monmap", (char*)NULL)) {
      extract_monmap = val;
    } else {
      ++i;
    }
  }

2.然后进行mkfs流程,该流程里面会检查并创建/var/lib/ceph/mon/$ceph_id目录,该目录包括以下几个文件:done keyring kv_backend min_mon_release systemd和子目录 store.db 。

// -- mkfs --
  if (mkfs) { //第一次启动时,mkfs一定会为true,并进入该if内部创建/var/lib/ceph/$ceph_id目录,同时会为该目录填充内容
 
    int err = check_mon_data_exists(); //当mkfs为true时,第一次启动会检查mon_data存在,不存在会mkdir创建
    if (err == -ENOENT) {
      if (::mkdir(g_conf()->mon_data.c_str(), 0755)) {
    derr << "mkdir(" << g_conf()->mon_data << ") : "
         << cpp_strerror(errno) << dendl;
    exit(1);
      }
    } else if (err < 0) {
      derr << "error opening '" << g_conf()->mon_data << "': "
           << cpp_strerror(-err) << dendl;
      exit(-err);
    }

3.构建monmap,将mon_data中的数据(store.db)decode解码到bufflist中,再写入到文件,以此来构建monmap。

......
  MonMap monmap;  //构建monmap
  {
    // note that even if we don't find a viable monmap, we should go ahead
    // and try to build it up in the next if-else block.
    bufferlist mapbl;
    int err = obtain_monmap(*store, mapbl);   //从store.db中获取monmap信息并构建monmap
    if (err >= 0) {
      try {
        monmap.decode(mapbl);
      } catch (const buffer::error& e) {
        derr << "can't decode monmap: " << e.what() << dendl;
      }
    } else {
      derr << "unable to obtain a monmap: " << cpp_strerror(err) << dendl;
    }
 
    dout(10) << __func__ << " monmap:\n";
    JSONFormatter jf(true);
    jf.dump_object("monmap", monmap);
    jf.flush(*_dout);
    *_dout << dendl;
 
    if (!extract_monmap.empty()) {
      int r = mapbl.write_file(extract_monmap.c_str());

4.创建Messager对象msgr,从monmap中获取rank并绑定到msgr上面,设置msgr信息、绑定地址等

//创建msgr
  Messenger *msgr = Messenger::create(g_ceph_context, public_msgr_type,
                      entity_name_t::MON(rank), "mon",
                      0,  // zero nonce
                      Messenger::HAS_MANY_CONNECTIONS);
  msgr->set_cluster_protocol(CEPH_MON_PROTOCOL);
  msgr->set_default_send_priority(CEPH_MSG_PRIO_HIGH);
 
  msgr->set_default_policy(Messenger::Policy::stateless_server(0));
  msgr->set_policy(entity_name_t::TYPE_MON,
                   Messenger::Policy::lossless_peer_reuse(
             CEPH_FEATURE_SERVER_LUMINOUS));
  msgr->set_policy(entity_name_t::TYPE_OSD,
                   Messenger::Policy::stateless_server(
             CEPH_FEATURE_SERVER_LUMINOUS));
  msgr->set_policy(entity_name_t::TYPE_CLIENT,
                   Messenger::Policy::stateless_server(0));
  msgr->set_policy(entity_name_t::TYPE_MDS,
                   Messenger::Policy::stateless_server(0));
  // bind
  err = msgr->bindv(bind_addrs);
  if (public_addrs != bind_addrs) {
    msgr->set_addrs(public_addrs);
  }

5.创建Monitor对象mon,设置传入的cmd信息,调用preinit进行预初始化(预初始化里面主要包括对paxos、msgr对应的服务端,客户端初始化)。

//创建mon对象
  mon = new Monitor(g_ceph_context, g_conf()->name.get_id(), store,
            msgr, mgr_msgr, &monmap);  //创建mon对象
  mon->orig_argc = argc;
  mon->orig_argv = argv;
  err = mon->preinit();  //预初始化
int Monitor::preinit()
{
  paxos->init_logger();
  init_paxos();
  messenger->set_auth_client(this);
  messenger->set_auth_server(this);
  mgr_messenger->set_auth_client(this);
....
}

6.启动msgr,然后调用init对mon进行初始化同时启动mon。

msgr->start();
mgr_msgr->start();
 
mon->init(); //初始化mon

7.当触发SIGINT、SIGTERM信号时就会释放所有mon、msgr等资源。

register_async_signal_handler_oneshot(SIGINT, handle_mon_signal);
  register_async_signal_handler_oneshot(SIGTERM, handle_mon_signal);
 
  if (g_conf()->inject_early_sigterm)
    kill(getpid(), SIGTERM);
 
  msgr->wait();
  mgr_msgr->wait();
 
  store->close();
 
  shutdown_async_signal_handler();
 
  delete mon;
  delete store;
  delete msgr;
 
.....

3.加入集群

ceph-mon需要与其他监视器节点进行通信以构建监视器集群。它会尝试连接到其他已知的监视器节点,并通过消息交换建立集群中的监视器之间的通信。

3.1)建立通信连接(绑定地址、端口等)

 

 ceph-mon模块通信依赖于AsyncMessager的异步通信,在ceph-mon.cc里面创建mon和Messenger对象(由于继承关系实质上是创建的AsyncMessenger对象),并且在初始化mon和AsyncMessager时,服务端会绑定本机ip和端口(通过配置获取),然后再调用_init_local_connection函数建立连接。

//创建Messenger对象
Messenger *Messenger::create(CephContext *cct, const string &type,
                 entity_name_t name, string lname,
                 uint64_t nonce, uint64_t cflags)
{
  int r = -1;
  if (type == "random") {
    r = 0;
    //r = ceph::util::generate_random_number(0, 1);
  }
  if (r == 0 || type.find("async") != std::string::npos)
    return new AsyncMessenger(cct, name, type, std::move(lname), nonce);  //异步对象
  lderr(cct) << "unrecognized ms_type '" << type << "'" << dendl;
  return nullptr;
} 
 
// bind
  err = msgr->bindv(bind_addrs);
  if (err < 0) {
    derr << "unable to bind monitor to " << bind_addrs << dendl;
    prefork.exit(1);
  }
//绑定socket具体实现
int AsyncMessenger::bindv(const entity_addrvec_t &bind_addrs)
{
  lock.lock();
 
  if (!pending_bind && started) {
    ldout(cct,10) << __func__ << " already started" << dendl;
    lock.unlock();
    return -1;
  }
 
  ldout(cct,10) << __func__ << " " << bind_addrs << dendl;
 
  if (!stack->is_ready()) {
    ldout(cct, 10) << __func__ << " Network Stack is not ready for bind yet - postponed" << dendl;
    pending_bind_addrs = bind_addrs;
    pending_bind = true;
    lock.unlock();
    return 0;
  }
 
  lock.unlock();
 
  // bind to a socket
  set<int> avoid_ports;
  entity_addrvec_t bound_addrs;
  unsigned i = 0;
  for (auto &&p : processors) {
    int r = p->bind(bind_addrs, avoid_ports, &bound_addrs);
    if (r) {
      // Note: this is related to local tcp listen table problem.
      // Posix(default kernel implementation) backend shares listen table
      // in the kernel, so all threads can use the same listen table naturally
      // and only one thread need to bind. But other backends(like dpdk) uses local
      // listen table, we need to bind/listen tcp port for each worker. So if the
      // first worker failed to bind, it could be think the normal error then handle
      // it, like port is used case. But if the first worker successfully to bind
      // but the second worker failed, it's not expected and we need to assert
      // here
      ceph_assert(i == 0);
      return r;
    }
    ++i;
  }
  _finish_bind(bind_addrs, bound_addrs);
  return 0;
}
//启动AsyncMessenger
int AsyncMessenger::start()
{
  std::scoped_lock l{lock};
  ldout(cct,1) << __func__ << " start" << dendl;
 
  // register at least one entity, first!
  ceph_assert(my_name.type() >= 0);
 
  ceph_assert(!started);
  started = true;
  stopped = false;
 
  if (!did_bind) {
    entity_addrvec_t newaddrs = *my_addrs;
    for (auto& a : newaddrs.v) {
      a.nonce = nonce;
    }
    set_myaddrs(newaddrs);
    _init_local_connection();  //建立连接
  }
 
  return 0;
}
3.2)加入集群

ceph-mon在与其他ceph-mon建立起链接过后会进入STATE_PROBING状态,然后发送OP_PROBE消息给各个节点,等待其他节点同步完成后开始插入到集群中。

void Monitor::bootstrap() 
{
.....
  // probe monitors
  dout(10) << "probing other monitors" << dendl;
  for (unsigned i = 0; i < monmap->size(); i++) {
    if ((int)i != rank)
      send_mon_message(
    new MMonProbe(monmap->fsid, MMonProbe::OP_PROBE, name, has_ever_joined,  //发送probe消息给其他节点
              ceph_release()),
    i);
......
  dout(10) << "bootstrap" << dendl;
  wait_for_paxos_write();  //等待其他节点同步完成
......
  if (monmap->contains(name))
    quorum.insert(name);  //插入集群中
....
}

4.mon选举

当mon增加或减少时,ceph-mon进程会触发回调函数call_async里start_election开始进行选举,在该函数里主要做了以下几件事:

1)如果Paxos正在STATE_WRITING或者STATE_WRITING_PREVIOUS状态,则等待paxos更新完成。

2)调用_reset()重置monitor中的服务,包括probe timeout事件、health检查事件、scrub事件等,并且restart paxos以及所有的paxos service服务。

3)设置自己进入STATE_ELECTING状态,并增加l_mon_num_elections和l_mon_election_call这些统计数据。

4)调用elector的call_election()进行选举。

 

void Monitor::start_election()
{
  dout(10) << "start_election" << dendl;
  wait_for_paxos_write(); //等待paxos的更新完成
  _reset();
  state = STATE_ELECTING;  //设置自身状态
 
  logger->inc(l_mon_num_elections);
  logger->inc(l_mon_election_call);
 
  clog->info() << "mon." << name << " calling monitor election";
  elector.call_election();
}
 
// called by bootstrap(), or on leader|peon -> electing
void Monitor::_reset()
{
  dout(10) << __func__ << dendl;
 
  // disable authentication
  {
    std::lock_guard l(auth_lock);
    authmon()->_set_mon_num_rank(0, 0);
  }
 
  cancel_probe_timeout();
  timecheck_finish();
  health_events_cleanup();  //重置健康服务
  health_check_log_times.clear();
  scrub_event_cancel();
 
  leader_since = utime_t();
  quorum_since = {};
  if (!quorum.empty()) {
    exited_quorum = ceph_clock_now();
  }
  quorum.clear();
  outside_quorum.clear();  //重置选举服务
  quorum_feature_map.clear();
 
  scrub_reset();
 
  paxos->restart();
 
  for (auto& svc : paxos_service) {
    svc->restart();
  }
}

5)Elector::call_election (),在这里主要做了以下几件事:

5.1)从Mon store中读出mon的election_epoch存储在epoch中,更新epoch的值使其变为奇数,表明进入了选举cycle。epoch为偶数,表明已经形成了稳定的quorum。epoch为偶数时表示为稳定状态,奇数为还在选举中。

5.2)把自己加入到acked_me map中,并设置electing_me为true,希望大家选自己当leader。

5.3)向monmap中的成员发送MMonElection::OP_PROPOSE消息。

void ElectionLogic::start()
{
  if (!participating) {
    ldout(cct, 0) << "not starting new election -- not participating" << dendl;
    return;
  }
  ldout(cct, 5) << "start -- can i be leader?" << dendl;
 
  acked_me.clear();
  init();
   
  // start by trying to elect me
  if (epoch % 2 == 0) {
    bump_epoch(epoch+1);  // odd == election cycle·更新epoch值为奇数
  } else {
    elector->validate_store();
  }
  electing_me = true;
  acked_me.insert(elector->get_my_rank());  //加入acked_me
  leader_acked = -1;
 
  elector->propose_to_peers(epoch);  //发送OP_PROPOSE消息
  elector->_start();
}

6)其它的Monitor收到消息后,经过dispatch逻辑,即Monitor:: ms_dispatch() --> Monitor::_ms_dispatch() --> Monitor::dispatch_op()--> Elector::dispatch(),之后进入消息处理流程。dispatch()中调用Elector::handle_propose(),首先确保收到消息的epoch版本是处于选举的版本(奇数)并且满足对feature的要求,接着判断将自己的选举epoch设置为和消息中包含的epoch的值,最后调用ElectionLogic::receive_propose比对rank值,如果其他的Monitor它们自己的rank值更小,则自己不去确认此次选举,而是重新发起一轮选举,如果它们自己的rank值更大,则进入Elector::defer()流程,发送MMonElection::OP_ACK消息,确认该轮选举为最小的那个Monitor,这样经过rank值小的Monitor多次选举后,最终选出了rank值最小的那个Monitor,选它为leader。

bool ms_dispatch(Message *m) override {
    std::lock_guard l{lock};
    _ms_dispatch(m); //
    return true;
  }
 
void Monitor::_ms_dispatch(Message *m)
{
......
  if ((is_synchronizing() ||
       (!s->authenticated && !exited_quorum.is_zero())) &&
      !src_is_mon &&
      m->get_type() != CEPH_MSG_PING) {
    waitlist_or_zap_client(op);
  } else {
    dispatch_op(op);  //
  }
  return;
}
void Monitor::dispatch_op(MonOpRequestRef op)
{
......
    // elector messages
    case MSG_MON_ELECTION:
      op->set_type_election();
      //check privileges here for simplicity
      if (!op->get_session()->is_capable("mon", MON_CAP_X)) {
        dout(0) << "MMonElection received from entity without enough caps!"
          << op->get_session()->caps << dendl;
        return;;
      }
      if (!is_probing() && !is_synchronizing()) {
        elector.dispatch(op);  //
      }
......
}
 
void Elector::dispatch(MonOpRequestRef op)
{
  op->mark_event("elector:dispatch");
  ceph_assert(op->is_type_election());
 
  switch (op->get_req()->get_type()) {
     
  case MSG_MON_ELECTION:
......
      switch (em->op) {
      case MMonElection::OP_PROPOSE:  //处理OP_PROPOSE消息
    handle_propose(op);
    return;
......
}
 
void Elector::handle_propose(MonOpRequestRef op)
{
  op->mark_event("elector:handle_propose");
  auto m = op->get_req<MMonElection>();
  dout(5) << "handle_propose from " << m->get_source() << dendl;
  int from = m->get_source().num();
 
  ceph_assert(m->epoch % 2 == 1); // election  确保选举epoch为奇数
  uint64_t required_features = mon->get_required_features();
  mon_feature_t required_mon_features = mon->get_required_mon_features();
 
  dout(10) << __func__ << " required features " << required_features
           << " " << required_mon_features
           << ", peer features " << m->get_connection()->get_features()
           << " " << m->mon_features
           << dendl;
 
  if ((required_features ^ m->get_connection()->get_features()) &
      required_features) {
    dout(5) << " ignoring propose from mon" << from
        << " without required features" << dendl;
    nak_old_peer(op);
    return;
  } else if (mon->monmap->min_mon_release > m->mon_release) {
    dout(5) << " ignoring propose from mon" << from
        << " release " << (int)m->mon_release
        << " < min_mon_release " << (int)mon->monmap->min_mon_release
        << dendl;
    nak_old_peer(op);
    return;
  } else if (!m->mon_features.contains_all(required_mon_features)) {
    // all the features in 'required_mon_features' not in 'm->mon_features'
    mon_feature_t missing = required_mon_features.diff(m->mon_features);
    dout(5) << " ignoring propose from mon." << from
            << " without required mon_features " << missing
            << dendl;
    nak_old_peer(op);
  }
  logic.receive_propose(from, m->epoch);  //比对rank值,决定选举权
}
 
void ElectionLogic::receive_propose(int from, epoch_t mepoch)
{
......
  if (elector->get_my_rank() < from) {
    // i would win over them.
    if (leader_acked >= 0) {        // we already acked someone
      ceph_assert(leader_acked < from);  // and they still win, of course
      ldout(cct, 5) << "no, we already acked " << leader_acked << dendl;
    } else {
      // wait, i should win!
      if (!electing_me) {
    elector->trigger_new_election();
      }
    }
  } else {   //自身rank值更大
    // they would win over me
    if (leader_acked < 0 ||      // haven't acked anyone yet, or
    leader_acked > from ||   // they would win over who you did ack, or
    leader_acked == from) {  // this is the guy we're already deferring to
      defer(from);  //确认选举
    } else {
      // ignore them!
      ldout(cct, 5) << "no, we already acked " << leader_acked << dendl;
    }
  }......}

5.同步数据

选举完成后,ceph-mon需要同步leader节点数据,触发MSG_MON_SYNC事件类型,经过调用栈dispatch_op->handle_sync->handle_sync_chunk→sync_finish调用apply_transaction进行数据同步。

void Monitor::sync_finish(version_t last_committed)
{
......
 if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
                     last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);
 
    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;
 
    store->apply_transaction(tx);
  }
......
}

6.健康检查

当其他节点传入的消息op类型为CEPH_MSG_PING时,mon会执行handle_ping流程去处理,处理过程是先通过op获取到请求的消息,然后构造reply消息进行回复,reply消息的内容是通过mon内置的healthMonitor获取到的状态信息。

void Monitor::dispatch_op(MonOpRequestRef op)
{
.......
    case CEPH_MSG_PING:
      handle_ping(op); 
      return;
......
}
 
void Monitor::handle_ping(MonOpRequestRef op)
{
  auto m = op->get_req<MPing>();
  dout(10) << __func__ << " " << *m << dendl;
  MPing *reply = new MPing;
  bufferlist payload;
  boost::scoped_ptr<Formatter> f(new JSONFormatter(true));
  f->open_object_section("pong");
 
  healthmon()->get_health_status(false, f.get(), nullptr);
  get_mon_status(f.get());
 
  f->close_section();
  stringstream ss;
  f->flush(ss);
  encode(ss.str(), payload);
  reply->set_payload(payload);  //设置发送内容,即健康信息
  dout(10) << __func__ << " reply payload len " << reply->get_payload().length() << dendl;
  m->get_connection()->send_message(reply);  //发送回复
}

三、ceph-mon集群正常工作时的工作流程

ceph-mon集群正常运行情况下,mon数量和状态并没有发生变化,因此不会触发重新选举leader的行为,所以此时的ceph-mon更多的是监控和维护集群的状态,它会执行一些监控流程,比如监控集群状态情况、记录日志等。

1.记录日志

ceph-mon通过dout宏来将日志输出到指定文件中,日志路径可通过配置写入log_file变量中,当需要打印日志时,可通过如下方式写入日志到文件中(需要将ceph.conf中对应模块日志级别debug mgr、debug mon等调至20 dout(20)才能生效):

void LogMonitor::update_from_paxos(bool *need_bootstrap)
{
.......
      if (g_conf()->mon_cluster_log_to_file) {  //获取配置中的log_file变量,该变量存放日志位置
    string log_file = channels.get_log_file(channel);
    dout(20) << __func__ << " logging for channel '" << channel
         << "' to file '" << log_file << "'" << dendl;
......
}

2.监控集群状态

2.1)ceph-mon定期进行对集群其他节点进行状态收集,状态收集的周期默认为30s,可通过mon_data_avail_warn进行配置更改周期长度,状态收集的过程实质是更新monmap、osdmap和pgmap这些表来监控集群的状态。

Option("mon_data_avail_warn", Option::TYPE_INT, Option::LEVEL_ADVANCED)
.set_default(30)  //配置默认30s
.add_service("mon")
.set_description("issue MON_DISK_LOW health warning when mon available space below this percentage"),

2.2)每个节点的ceph-mon都会收集自身的节点状态,然后互相通信来同步各自节点的状态。

2.2.1)ceph-mon 在处理同步的流程中,根据ceph-mon发出的同步请求MMonSync::OP_CHUNK给leader进行处理,调用Monitor::handle_sync_chunk(MonOpRequestRef op)将数据发送给集群leader节点。

void Monitor::handle_sync(MonOpRequestRef op)
{
  auto m = op->get_req<MMonSync>();
  dout(10) << __func__ << " " << *m << dendl;
  switch (m->op) {
 
    // provider ---------
 
  case MMonSync::OP_CHUNK:  //同步
  case MMonSync::OP_LAST_CHUNK:
    handle_sync_chunk(op); 
    break;
......
}
 
void Monitor::handle_sync_chunk(MonOpRequestRef op)
{
......
  if (m->op == MMonSync::OP_CHUNK) {
    sync_reset_timeout();
    sync_get_next_chunk();
  } else if (m->op == MMonSync::OP_LAST_CHUNK) {
    sync_finish(m->last_committed);
  }
......
}

2.2.2)选举完成后,ceph-mon需要同步leader节点数据,触发MSG_MON_SYNC事件类型,经过调用栈dispatch_op->handle_sync->handle_sync_chunk→sync_finish调用apply_transaction进行数据同步。

void Monitor::sync_finish(version_t last_committed)
{
......
 if (sync_full) {
    // finalize the paxos commits
    auto tx(std::make_shared<MonitorDBStore::Transaction>());
    paxos->read_and_prepare_transactions(tx, sync_start_version,
                     last_committed);
    tx->put(paxos->get_name(), "last_committed", last_committed);
 
    dout(30) << __func__ << " final tx dump:\n";
    JSONFormatter f(true);
    tx->dump(&f);
    f.flush(*_dout);
    *_dout << dendl;
 
    store->apply_transaction(tx);
  }
......
}

其他:ceph-mon通信方式分析

1)vip迁移到另外节点,ceph-mon恢复需要同步哪些数据

当vip发生迁移时,需要同步迁移ceph-mon的节点的/var/lib/ceph/mon/$cluster-$ceph-id/目录内的所有数据,因为该目录存储了ceph-mon的所有数据。可参考:https://www.bookstack.cn/read/ceph-handbook/Advance_usage-mon_backup.mdhttps://www.bookstack.cn/read/ceph-handbook/Advance_usage-mon_backup.md

2)数据通信

建立通信连接后,AsyncMessenger对象中的NetworkStack成员会默认创建三个worker(可配置),每个worker线程被创建时都会被命名为msgr-worker-0/1/2以此类推,这些线程是真正被用来进行通信的,具体通信方式是:每个线程中包含一个EventCenter去获取发生的事件,通过EventCenter内置的EpollDriver对象来获取并处理这些事件,该对象使用epoll网络模型,当某个socket有事件到来时,会被该epoll对象监测到并根据不同的事件类型进行处理,EventCenter中支持的事件类型有file事件和timer事件,主要包含事件的创建、删除以及处理超时事件。

NetworkStack::NetworkStack(CephContext *c, const string &t): type(t), started(false), cct(c)
{
  const uint64_t InitEventNumber = 5000;
  num_workers = cct->_conf->ms_async_op_threads;        // cct->_conf->ms_async_op_threads默认配置为3
  for (unsigned i = 0; i < num_workers; ++i) {
    Worker *w = create_worker(cct, type, i);
    w->center.init(InitEventNumber, i, type);
    workers.push_back(w);
  }
  cct->register_fork_watcher(this);
}
//线程命名为msgr-worker-%u
std::function<void ()> NetworkStack::add_thread(unsigned worker_id)
{
  Worker *w = workers[worker_id];  //worker线程
  return [this, w]() {
      char tp_name[16];
      sprintf(tp_name, "msgr-worker-%u", w->id);
      ceph_pthread_setname(pthread_self(), tp_name);
      const unsigned EventMaxWaitUs = 30000000;
      w->center.set_owner();  //创建CenterDriver
      ldout(cct, 10) << __func__ << " starting" << dendl;
      w->initialize();
      w->init_done();
      while (!w->done) {
        ldout(cct, 30) << __func__ << " calling event process" << dendl;
 
//创建worker如下
Worker* NetworkStack::create_worker(CephContext *c, const string &type, unsigned i)
{
  if (type == "posix")
    return new PosixWorker(c, i);
  ...
}
//EventCenter
class Worker : public Thread {
  ...
  EventCenter center;
  ...
}
 
//初始化EventCenter
int EventCenter::init(int nevent, unsigned center_id, const std::string &type)
{
  // can't init multi times
  ceph_assert(this->nevent == 0);
 
  this->type = type;
  this->center_id = center_id;
 
  if (type == "dpdk") {
#ifdef HAVE_DPDK
    driver = new DPDKDriver(cct);
#endif
  } else {
#ifdef HAVE_EPOLL
  driver = new EpollDriver(cct);  //使用epoll模型
#else
#ifdef HAVE_KQUEUE
  driver = new KqueueDriver(cct);
#else
  driver = new SelectDriver(cct);
#endif
#endif
  }
......
 
  int fds[2];
  if (pipe_cloexec(fds, 0) < 0) {  //创建管道
    int e = errno;
    lderr(cct) << __func__ << " can't create notify pipe: " << cpp_strerror(e) << dendl;
    return -e;
  }
 
  notify_receive_fd = fds[0];
  notify_send_fd = fds[1];
  r = net.set_nonblock(notify_receive_fd); //设置非阻塞socket
  if (r < 0) {
    return r;
  }
  r = net.set_nonblock(notify_send_fd);
  if (r < 0) {
    return r;
  }
 
  return r;
}
}  // Used by internal thread
  int create_file_event(int fd, int mask, EventCallbackRef ctxt);  //创建file事件
  uint64_t create_time_event(uint64_t milliseconds, EventCallbackRef ctxt); //创建timer事件
  void delete_file_event(int fd, int mask);
  void delete_time_event(uint64_t id);
  int process_events(unsigned timeout_microseconds, ceph::timespan *working_dur = nullptr);  //处理超时事件


本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/796816.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

【Jenkins系列】-Pipeline语法全集

Jenkins为您提供了两种开发Pipeline的方式&#xff1a;脚本式和声明式。 脚本式流水线&#xff08;也称为“传统”流水线&#xff09;基于Groovy作为其特定于域的语言。而声明式流水线提供了简化且更友好的语法&#xff0c;并带有用于定义它们的特定语句&#xff0c;而无需学习…

金融行业软件测试面试题及其答案

下面是一些常见的金融行业软件测试面试题及其答案&#xff1a; 1. 什么是金融行业软件测试&#xff1f; 金融行业软件测试是针对金融领域的软件系统进行验证和确认的过程&#xff0c;旨在确保软件在安全、稳定、可靠和符合法规要求的条件下运行。 2. 解释一下金融软件中的风险…

golang,gin框架的请求参数(一)--推荐

golang&#xff0c;gin框架的请求参数&#xff08;一&#xff09; gin框架的重要性不过多的强调&#xff0c;重点就gin使用中的参数传递&#xff0c;获取进行梳理文件&#xff0c;满足使用需求。 获取前端请求参数的几种方法&#xff1a; 一、获取参数【浏览器地址获取参数】…

linux的一些基本指令第二期

rm rm -r 你要删除的目录名 加了-r 之后表示递归删除你要删除的目录 使用这个命名之后&#xff0c;他会一边递归到叶子节点&#xff0c;一边询问你是否要删除&#xff0c;然后会递归回来&#xff0c;在回来的途中删除&#xff1a; rm -rf 你要删除的目录名 强制执行删除操作…

权威认证!伙伴云入选亿欧2023AIGC应用场景创新TOP50榜单

近日, 知名科技与产业创新服务平台【亿欧】发布2023年度AIGC应用场景创新TOP50榜单。 伙伴云作为行业领先的零代码应用搭建平台&#xff0c;率先推出AI智能搭建系统功能&#xff0c;并将Chat GPT应用嵌入伙伴云应用在解决方案层&#xff0c;最终凭借前沿的技术创新力和突出的数…

【NVMe2.0d 17 - 1】Reservation 核心命令

文章目录 Reservation RegisterReservation AcquireReservation ReportReservation Release Reservation Register Reservation Register命令用于注册、取消注册或替换reservation key。 该命令使用Command Dword 10和内存中的Reservation Register data structure 如果该命…

基于seaborn.countplot的柱状图显示Y值及填充形状

参考链接&#xff1a; seaborn 柱状图上显示y值的方法 python柱形图填充 figs,ax plt.subplots(figsize(10,4)) fig sns.countplot(x"状态",hue"等级", datadatas)// 不可通过设置hatch参数来设置 marks ["o","---","***&quo…

IDEA批量启动多个微服务

注&#xff1a;现在盛行微服务开发&#xff0c;通常需要通过idea启动多个项目&#xff0c;每次都一个一个的启动&#xff0c;太麻烦了&#xff0c;不过被担心idea帮我们考虑到这个了&#xff08;不用安装插件哦&#xff09; 配置步骤&#xff1a; 将需要一次性全部启动的微服务…

[STL]详解vector模拟实现

[STL]vector模拟实现 文章目录 [STL]vector模拟实现1. 整体结构总览2. 成员变量解析3. 默认成员函数构造函数1构造函数2构造函数3拷贝构造函数析构函数 4. 迭代器相关函数begin函数end函数begin函数const版本end函数const版本 5.容量相关函数size函数capacity函数reserve函数re…

【Nodejs】操作mongodb数据库

1.简介 Mongoose是一个让我们可以通过Node来操作MongoDB的模块。Mongoose是一个对象文档模型(ODM)库,它对Node原生的MongoDB模块进行了进一步的优化封装&#xff0c;并提供了更多的功能。在大多数情况下&#xff0c;它被用来把结构化的模式应用到一个MongoDB集合&#xff0c;并…

【每日一题】2500. 删除每行中的最大值

【每日一题】2500. 删除每行中的最大值 2500. 删除每行中的最大值题目描述解题思路 2500. 删除每行中的最大值 题目描述 给你一个 m x n 大小的矩阵 grid &#xff0c;由若干正整数组成。 执行下述操作&#xff0c;直到 grid 变为空矩阵&#xff1a; 从每一行删除值最大的元…

阿里Java开发手册~建表规约

1. 【强制】表达是与否概念的字段&#xff0c;必须使用 is _ xxx 的方式命名&#xff0c;数据类型是 unsigned tinyint &#xff08; 1 表示是&#xff0c; 0 表示否 &#xff09; 。 说明&#xff1a; 任何字段如果为非负数&#xff0c;必须是 unsigned 。 正例&am…

Jenkins+Docker+Docker-Compose自动部署,SpringCloud架构公共包一个任务配置

前言 Jenkins和docker的安装&#xff0c;随便百度吧&#xff0c;实际场景中我们很多微服务的架构&#xff0c;都是有公共包&#xff0c;肯定是希望一个任务能够把公共包的配置加进去&#xff0c;一并构建&#xff0c;ok&#xff0c;直接上干货。 Jenkins 全局环境安装 pwd e…

建造者模式——复杂对象的组装与创建

1、简介 1.1、概述 建造者模式又称为生成器模式&#xff0c;它是一种较为复杂、使用频率也相对较低的创建型模式。建造者模式向客户端返回的不是一个简单的产品&#xff0c;而是一个由多个部件组成的复杂产品。 建造者模式是较为复杂的创建型模式&#xff0c;它将客户端与包…

【chatGpt】关于websocket连接中对未授权的捕捉问题

目录 问题 有效提问 有效的细节提问 问题 一路上&#xff0c;通过简单的error进行判断弹出授权&#xff0c;会有很多乱弹的现象&#xff1a; &#xff08;1&#xff09;链路正常切换会断 &#xff08;2&#xff09;服务器没有启动会连接不上 &#xff08;3&#xff09;没…

Pytorch深度学习-----DataLoader的用法

系列文章目录 PyTorch深度学习——Anaconda和PyTorch安装 Pytorch深度学习-----数据模块Dataset类 Pytorch深度学习------TensorBoard的使用 Pytorch深度学习------Torchvision中Transforms的使用&#xff08;ToTensor&#xff0c;Normalize&#xff0c;Resize &#xff0c;Co…

【Golang】基于OAuth2.0微信扫码实现客户端用户登录(原理+代码实现+视频讲解)

前言: 细心汇总,包括原理+配置+代码详细实现 文章目录 原理讲解什么是OAuth2.0解决方案授权码模式讲解认证流程Go语言实现微信扫码登录1. 内网穿透配置2. 微信测试账号申请3. 验证和微信服务器连接二维码生成回调地址测试原理讲解 什么是OAuth2.0 OAuth 2.0是一种授权协议,…

JavaScript学习 --消息摘要算法

消息摘要算法&#xff08;也称哈希算法&#xff09;是一种将任意大小的数据转换为一个固定大小的数据序列的算法。在JavaScript中&#xff0c;常见的消息摘要算法包括MD5、SHA-1、SHA-256等。它们适用于安全传输敏感数据、防篡改数据等场景。在本篇博客中&#xff0c;我们将介绍…

slurm/sbatch/srun 多步骤串行运行多个依赖性任务

在slurm系统下&#xff0c;有时候需要按步骤运行A、B、C三个任务&#xff0c;但是直接写在脚本里会同时提交&#xff0c;所以需要建立依赖关系。 错误做法&#xff1a; 搜索网上做法及slurm串行教程&#xff0c;做法多为如下&#xff0c;使用bash或python来按顺序/循环内来串…

顺序表详解

&#x1f493;博主个人主页:不是笨小孩&#x1f440; ⏩专栏分类:数据结构与算法&#x1f440; &#x1f69a;代码仓库:笨小孩的代码库&#x1f440; ⏩社区&#xff1a;不是笨小孩&#x1f440; &#x1f339;欢迎大家三连关注&#xff0c;一起学习&#xff0c;一起进步&#…