A Study of the OceanBase Upgrade Process (4.2.1.6 to 4.2.1.8)

2024/11/16 13:30:39

Simulated Workload

A benchmark tool was used to load 10 warehouses of data to simulate a business workload.

Upgrade Method

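The cluster was upgraded with a rolling upgrade. This method requires the OceanBase cluster to follow the officially required high-availability architecture: with fewer than 3 zones, the remaining replicas cannot form a majority during a rolling upgrade. A rolling upgrade works by upgrading one zone at a time. Because OceanBase is a distributed cluster, while one zone is being upgraded the remaining zones keep serving requests, so the workload is not interrupted.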
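The three-zone requirement comes down to Paxos majority arithmetic. The Python sketch below is a simplification, not OceanBase code: it assumes each zone holds exactly one voting replica and that upgrading a zone takes that replica offline, then checks whether the remaining zones still form a majority.

```python
def majority(replica_count: int) -> int:
    """Smallest number of voting replicas that forms a Paxos majority."""
    return replica_count // 2 + 1


def can_upgrade_one_zone(zone_count: int) -> bool:
    """Assumes one voting replica per zone; upgrading a zone takes its replica offline."""
    remaining = zone_count - 1
    return remaining >= majority(zone_count)


if __name__ == "__main__":
    for zones in (1, 2, 3, 5):
        print(zones, "zones ->", "OK" if can_upgrade_one_zone(zones) else "no majority")
```

With 2 zones, upgrading one leaves 1 of 2 replicas, which is not a majority; with 3 zones, 2 of 3 remain, which is.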

Task Types

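An upgrade creates one main task and several subtasks. For each step of the upgrade path that requires replacing the binary, the main task generates a subtask; together, the subtasks replace the binaries that need upgrading one step at a time.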

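  • Main task: turns every step of the upgrade path that requires a binary replacement into a subtask, so that the binaries are replaced step by step. As shown in the figure below, Submit cluster upgrade task generates a subtask and Wait dag success waits for that subtask to finish. The Submit cluster upgrade task / Wait dag success pair may run several times; the number of runs equals the number of binary-replacement steps in the upgrade path.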

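  • Subtask: performs the actual OceanBase cluster upgrade for its step. It mainly runs the upgrade scripts: the pre-check script, the upgrade pre script, replacing the OBServer nodes, the upgrade post script, the version check, and so on.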
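The relationship between the main task and its subtasks can be sketched as a loop. This is a hypothetical model, not the upgrade tool's code: `PathStep`, `run_subtask` and `run_main_task` are illustrative names, the subtask stages are only recorded rather than executed, and the intermediate version in the example path is made up.

```python
from dataclasses import dataclass
from typing import List

# Subtask stages in the order the article lists them.
SUBTASK_STAGES = [
    "pre-check script",
    "upgrade pre script",
    "replace OBServer binaries",
    "upgrade post script",
    "version check",
]


@dataclass
class PathStep:
    version: str           # version reached by this step (illustrative label)
    replaces_binary: bool  # only steps that replace the binary become subtasks


def run_subtask(step: PathStep, log: List[str]) -> None:
    # Hypothetical: stages are only recorded; a real subtask runs scripts here.
    for stage in SUBTASK_STAGES:
        log.append(f"[{step.version}] {stage}")


def run_main_task(path: List[PathStep]) -> List[str]:
    log: List[str] = []
    for step in path:
        if not step.replaces_binary:
            continue
        log.append(f"Submit cluster upgrade task ({step.version})")
        run_subtask(step, log)  # runs inline here; the real tool waits on it
        log.append("Wait dag success")
    return log


if __name__ == "__main__":
    # Illustrative path with one hypothetical intermediate step.
    path = [PathStep("intermediate (hypothetical)", True), PathStep("4.2.1.8", True)]
    for line in run_main_task(path):
        print(line)
```

Each binary-replacing step yields exactly one Submit / Wait pair, which is why the pair repeats as many times as there are such steps.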

Upgrade Tasks (Subtask)

Because this is a rolling upgrade, the number of upgrade tasks inside a subtask depends on the number of zones.

Pre-upgrade Checks

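All tasks before the Set zone context task are pre-upgrade checks on the cluster: they verify the tenant credentials and run the upgrade check scripts in the etc directory of the installation directory to determine whether the cluster can be upgraded. Because these are check operations, a failed check task can be rolled back. Once these checks succeed, the zone upgrades begin.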

upgrade_checker.py performs the pre-upgrade checks; if the script succeeds, the upgrade can continue.
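upgrade_checker.py does not stop at the first failed check. Each check_* function appends a reason to the module-level fail_list, and check_fail_list() raises a single MyError listing every reason once all checks have run. A minimal Python 3 sketch of this collect-then-fail pattern follows; the function and exception names are hypothetical, and the message format is slightly simplified.

```python
from typing import Callable, List, Optional


class UpgradeCheckError(Exception):
    """Raised once, after all checks have run, with every failure reason."""


# A check returns a failure reason, or None when it passes.
Check = Callable[[], Optional[str]]


def collect_failures(checks: List[Check]) -> List[str]:
    failures: List[str] = []
    for check in checks:
        reason = check()
        if reason is not None:
            failures.append(reason)  # keep going so every problem is reported
    return failures


def format_failures(failures: List[str]) -> str:
    return "upgrade checker failed with {} reasons: {}".format(
        len(failures), ", ".join("[" + f + "]" for f in failures))


def run_checks(checks: List[Check]) -> None:
    failures = collect_failures(checks)
    if failures:
        raise UpgradeCheckError(format_failures(failures))


if __name__ == "__main__":
    checks: List[Check] = [
        lambda: None,                                 # passing check
        lambda: "1 tenant is merging, please check",  # failing check
        lambda: "There are DDL task in progress",     # failing check
    ]
    try:
        run_checks(checks)
    except UpgradeCheckError as err:
        print(err)
```

Because do_check() calls modify_server_permanent_offline_time() only after check_fail_list(), server_permanent_offline_time is set to 72h only when every check has passed.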

upgrade_pre.py mainly performs a series of preparation steps before the upgrade, making sure the cluster is in a state suitable for upgrading.

upgrade_health_checker.py mainly performs cluster-level health checks, making sure the cluster is healthy before the upgrade.

Contents of upgrade_checker.py

[root@sdw1 etc]# cat upgrade_checker.py
#!/usr/bin/env python
# -*- coding: utf-8 -*-


import sys
import os
import mysql.connector
from mysql.connector import errorcode
import logging
import getopt
import time


class UpgradeParams:
  log_filename = 'upgrade_checker.log'
  old_version = '4.0.0.0'
#### --------------start : my_error.py --------------
class MyError(Exception):
  def __init__(self, value):
    self.value = value
  def __str__(self):
    return repr(self.value)
#### --------------start : actions.py------------
class Cursor:
  __cursor = None
  def __init__(self, cursor):
    self.__cursor = cursor
  def exec_sql(self, sql, print_when_succ = True):
    try:
      self.__cursor.execute(sql)
      rowcount = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute sql: %s, rowcount = %d', sql, rowcount)
      return rowcount
    except mysql.connector.Error, e:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise e
    except Exception, e:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise e
  def exec_query(self, sql, print_when_succ = True):
    try:
      self.__cursor.execute(sql)
      results = self.__cursor.fetchall()
      rowcount = self.__cursor.rowcount
      if True == print_when_succ:
        logging.info('succeed to execute query: %s, rowcount = %d', sql, rowcount)
      return (self.__cursor.description, results)
    except mysql.connector.Error, e:
      logging.exception('mysql connector error, fail to execute sql: %s', sql)
      raise e
    except Exception, e:
      logging.exception('normal error, fail to execute sql: %s', sql)
      raise e


def set_parameter(cur, parameter, value):
  sql = """alter system set {0} = '{1}'""".format(parameter, value)
  logging.info(sql)
  cur.execute(sql)
  wait_parameter_sync(cur, parameter, value)


def wait_parameter_sync(cur, key, value):
  sql = """select count(*) as cnt from oceanbase.__all_virtual_sys_parameter_stat
           where name = '{0}' and value != '{1}'""".format(key, value)
  times = 10
  while times > 0:
    logging.info(sql)
    cur.execute(sql)
    result = cur.fetchall()
    if len(result) != 1 or len(result[0]) != 1:
      logging.exception('result cnt not match')
      raise MyError('result cnt not match')
    elif result[0][0] == 0:
      logging.info("""{0} is sync, value is {1}""".format(key, value))
      break
    else:
      logging.info("""{0} is not sync, value should be {1}""".format(key, value))


    times -= 1
    if times == 0:
      logging.exception("""check {0}:{1} sync timeout""".format(key, value))
      raise MyError('check {0}:{1} sync timeout'.format(key, value))
    time.sleep(5)


#### --------------start :  opt.py --------------
help_str = \
"""
Help:
""" +\
sys.argv[0] + """ [OPTIONS]""" +\
'\n\n' +\
'-I, --help          Display this help and exit.\n' +\
'-V, --version       Output version information and exit.\n' +\
'-h, --host=name     Connect to host.\n' +\
'-P, --port=name     Port number to use for connection.\n' +\
'-u, --user=name     User for login.\n' +\
'-t, --timeout=name  Cmd/Query/Inspection execute timeout(s).\n' +\
'-p, --password=name Password to use when connecting to server. If password is\n' +\
'                    not given it\'s empty string "".\n' +\
'-m, --module=name   Modules to run. Modules should be a string combined by some of\n' +\
'                    the following strings: ddl, normal_dml, each_tenant_dml,\n' +\
'                    system_variable_dml, special_action, all. "all" represents\n' +\
'                    that all modules should be run. They are splitted by ",".\n' +\
'                    For example: -m all, or --module=ddl,normal_dml,special_action\n' +\
'-l, --log-file=name Log file path. If log file path is not given it\'s ' + os.path.splitext(sys.argv[0])[0] + '.log\n' +\
'\n\n' +\
'Maybe you want to run cmd like that:\n' +\
sys.argv[0] + ' -h 127.0.0.1 -P 3306 -u admin -p admin\n'


version_str = """version 1.0.0"""


class Option:
  __g_short_name_set = set([])
  __g_long_name_set = set([])
  __short_name = None
  __long_name = None
  __is_with_param = None
  __is_local_opt = None
  __has_value = None
  __value = None
  def __init__(self, short_name, long_name, is_with_param, is_local_opt, default_value = None):
    if short_name in Option.__g_short_name_set:
      raise MyError('duplicate option short name: {0}'.format(short_name))
    elif long_name in Option.__g_long_name_set:
      raise MyError('duplicate option long name: {0}'.format(long_name))
    Option.__g_short_name_set.add(short_name)
    Option.__g_long_name_set.add(long_name)
    self.__short_name = short_name
    self.__long_name = long_name
    self.__is_with_param = is_with_param
    self.__is_local_opt = is_local_opt
    self.__has_value = False
    if None != default_value:
      self.set_value(default_value)
  def is_with_param(self):
    return self.__is_with_param
  def get_short_name(self):
    return self.__short_name
  def get_long_name(self):
    return self.__long_name
  def has_value(self):
    return self.__has_value
  def get_value(self):
    return self.__value
  def set_value(self, value):
    self.__value = value
    self.__has_value = True
  def is_local_opt(self):
    return self.__is_local_opt
  def is_valid(self):
    return None != self.__short_name and None != self.__long_name and True == self.__has_value and None != self.__value


g_opts =\
[\
Option('I', 'help', False, True),\
Option('V', 'version', False, True),\
Option('h', 'host', True, False),\
Option('P', 'port', True, False),\
Option('u', 'user', True, False),\
Option('t', 'timeout', True, False, 0),\
Option('p', 'password', True, False, ''),\
# Which modules to run; all modules are run by default
Option('m', 'module', True, False, 'all'),\
# Log file path; the main function of each script changes this to its own default
Option('l', 'log-file', True, False)
]\


def change_opt_defult_value(opt_long_name, opt_default_val):
  global g_opts
  for opt in g_opts:
    if opt.get_long_name() == opt_long_name:
      opt.set_value(opt_default_val)
      return


def has_no_local_opts():
  global g_opts
  no_local_opts = True
  for opt in g_opts:
    if opt.is_local_opt() and opt.has_value():
      no_local_opts = False
  return no_local_opts


def check_db_client_opts():
  global g_opts
  for opt in g_opts:
    if not opt.is_local_opt() and not opt.has_value():
      raise MyError('option "-{0}" has not been specified, maybe you should run "{1} --help" for help'\
          .format(opt.get_short_name(), sys.argv[0]))


def parse_option(opt_name, opt_val):
  global g_opts
  for opt in g_opts:
    if opt_name in (('-' + opt.get_short_name()), ('--' + opt.get_long_name())):
      opt.set_value(opt_val)


def parse_options(argv):
  global g_opts
  short_opt_str = ''
  long_opt_list = []
  for opt in g_opts:
    if opt.is_with_param():
      short_opt_str += opt.get_short_name() + ':'
    else:
      short_opt_str += opt.get_short_name()
  for opt in g_opts:
    if opt.is_with_param():
      long_opt_list.append(opt.get_long_name() + '=')
    else:
      long_opt_list.append(opt.get_long_name())
  (opts, args) = getopt.getopt(argv, short_opt_str, long_opt_list)
  for (opt_name, opt_val) in opts:
    parse_option(opt_name, opt_val)
  if has_no_local_opts():
    check_db_client_opts()


def deal_with_local_opt(opt):
  if 'help' == opt.get_long_name():
    global help_str
    print help_str
  elif 'version' == opt.get_long_name():
    global version_str
    print version_str


def deal_with_local_opts():
  global g_opts
  if has_no_local_opts():
    raise MyError('no local options, can not deal with local options')
  else:
    for opt in g_opts:
      if opt.is_local_opt() and opt.has_value():
        deal_with_local_opt(opt)
        # Handle only one local option
        return


def get_opt_host():
  global g_opts
  for opt in g_opts:
    if 'host' == opt.get_long_name():
      return opt.get_value()


def get_opt_port():
  global g_opts
  for opt in g_opts:
    if 'port' == opt.get_long_name():
      return opt.get_value()


def get_opt_user():
  global g_opts
  for opt in g_opts:
    if 'user' == opt.get_long_name():
      return opt.get_value()


def get_opt_password():
  global g_opts
  for opt in g_opts:
    if 'password' == opt.get_long_name():
      return opt.get_value()


def get_opt_timeout():
  global g_opts
  for opt in g_opts:
    if 'timeout' == opt.get_long_name():
      return opt.get_value()


def get_opt_module():
  global g_opts
  for opt in g_opts:
    if 'module' == opt.get_long_name():
      return opt.get_value()


def get_opt_log_file():
  global g_opts
  for opt in g_opts:
    if 'log-file' == opt.get_long_name():
      return opt.get_value()
#### ---------------end----------------------


#### --------------start :  do_upgrade_pre.py--------------
def config_logging_module(log_filenamme):
  logging.basicConfig(level=logging.INFO,\
      format='[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s',\
      datefmt='%Y-%m-%d %H:%M:%S',\
      filename=log_filenamme,\
      filemode='w')
  # Define the log message format
  formatter = logging.Formatter('[%(asctime)s] %(levelname)s %(filename)s:%(lineno)d %(message)s', '%Y-%m-%d %H:%M:%S')
  #######################################
  # Define a handler that writes INFO-and-above log records to sys.stdout
  stdout_handler = logging.StreamHandler(sys.stdout)
  stdout_handler.setLevel(logging.INFO)
  # Apply the log format
  stdout_handler.setFormatter(formatter)
  # Add stdout_handler to the root logger
  logging.getLogger('').addHandler(stdout_handler)
#### ---------------end----------------------




fail_list=[]


def get_version(version_str):
  versions = version_str.split(".")


  if len(versions) != 4:
    logging.exception("""version:{0} is invalid""".format(version_str))
    raise MyError('version:{0} is invalid'.format(version_str))


  major = int(versions[0])
  minor = int(versions[1])
  major_patch = int(versions[2])
  minor_patch = int(versions[3])


  if major > 0xffffffff or minor > 0xffff or major_patch > 0xff or minor_patch > 0xff:
    logging.exception("""version:{0} is invalid""".format(version_str))
    raise MyError('version:{0} is invalid'.format(version_str))


  version = (major << 32) | (minor << 16) | (major_patch << 8) | (minor_patch)
  return version


#### START ####
# 1. Check the prerequisite (minimum source) version
def check_observer_version(query_cur, upgrade_params):
  (desc, results) = query_cur.exec_query("""select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'""")
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif cmp(results[0][0], upgrade_params.old_version) < 0 :
    fail_list.append('old observer version is expected equal or higher then: {0}, actual version:{1}'.format(upgrade_params.old_version, results[0][0]))
  logging.info('check observer version success, version = {0}'.format(results[0][0]))


def check_data_version(query_cur, input_min_cluster_version):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    input_min_cluster_version[0] = min_cluster_version


    # check data version
    if min_cluster_version < get_version("4.1.0.0"):
      # last barrier cluster version should be 4.1.0.0
      fail_list.append('last barrier cluster version is 4.1.0.0. prohibit cluster upgrade from cluster version less than 4.1.0.0')
    else:
      data_version_str = ''
      data_version = 0
      # check compatible is same
      sql = """select distinct value from oceanbase.__all_virtual_tenant_parameter_info where name='compatible'"""
      (desc, results) = query_cur.exec_query(sql)
      if len(results) != 1:
        fail_list.append('compatible is not sync')
      elif len(results[0]) != 1:
        fail_list.append('column cnt not match')
      else:
        data_version_str = results[0][0]
        data_version = get_version(results[0][0])


        if data_version < get_version("4.1.0.0"):
          # last barrier data version should be 4.1.0.0
          fail_list.append('last barrier data version is 4.1.0.0. prohibit cluster upgrade from data version less than 4.1.0.0')
        else:
          # check target_data_version/current_data_version
          sql = "select count(*) from oceanbase.__all_tenant"
          (desc, results) = query_cur.exec_query(sql)
          if len(results) != 1 or len(results[0]) != 1:
            fail_list.append('result cnt not match')
          else:
            tenant_count = results[0][0]


            sql = "select count(*) from __all_virtual_core_table where column_name in ('target_data_version', 'current_data_version') and column_value = {0}".format(data_version)
            (desc, results) = query_cur.exec_query(sql)
            if len(results) != 1 or len(results[0]) != 1:
              fail_list.append('result cnt not match')
            elif 2 * tenant_count != results[0][0]:
              fail_list.append('target_data_version/current_data_version not match with {0}, tenant_cnt:{1}, result_cnt:{2}'.format(data_version_str, tenant_count, results[0][0]))
            else:
              logging.info("check data version success, all tenant's compatible/target_data_version/current_data_version is {0}".format(data_version_str))


# 2. Check whether Paxos replicas are in sync and whether any Paxos replica is missing
def check_paxos_replica(query_cur):
  # 2.1 Check whether Paxos replicas are in sync
  (desc, results) = query_cur.exec_query("""select count(1) as unsync_cnt from GV$OB_LOG_STAT where in_sync = 'NO'""")
  if results[0][0] > 0 :
    fail_list.append('{0} replicas unsync, please check'.format(results[0][0]))
  # 2.2 Check whether any Paxos replica is missing (TODO)
  logging.info('check paxos replica success')


# 3. Check whether rebalancing or a locality change is in progress
def check_rebalance_task(query_cur):
  # 3.1 Check whether a locality change is in progress
  (desc, results) = query_cur.exec_query("""select count(1) as cnt from DBA_OB_TENANT_JOBS where job_status='INPROGRESS' and result_code is null""")
  if results[0][0] > 0 :
    fail_list.append('{0} locality tasks is doing, please check'.format(results[0][0]))
  # 3.2 Check whether rebalancing is in progress
  (desc, results) = query_cur.exec_query("""select count(1) as rebalance_task_cnt from CDB_OB_LS_REPLICA_TASKS""")
  if results[0][0] > 0 :
    fail_list.append('{0} rebalance tasks is doing, please check'.format(results[0][0]))
  logging.info('check rebalance task success')


# 4. Check the cluster status
def check_cluster_status(query_cur, min_cluster_version):
  # 4.1 Check that no major compaction (merge) is in progress
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_MAJOR_COMPACTION where (GLOBAL_BROADCAST_SCN > LAST_SCN or STATUS != 'IDLE')""")
  if results[0][0] > 0 :
    fail_list.append('{0} tenant is merging, please check'.format(results[0][0]))
  if len(min_cluster_version) > 0 and min_cluster_version[0] < get_version("4.2.0.0"):
    (desc, results) = query_cur.exec_query("""select /*+ query_timeout(1000000000) */ count(1) from __all_virtual_tablet_compaction_info where max_received_scn > finished_scn and max_received_scn > 0""")
    if results[0][0] > 0 :
      fail_list.append('{0} tablet is merging, please check'.format(results[0][0]))
    logging.info('check cluster tablet major status success, cluster_version={0}'.format(min_cluster_version))
  else:
    logging.info('skip check cluster tablet major status, cluster_version={0}'.format(min_cluster_version))


# 5. Check for abnormal tenants (being created, pending delayed deletion, or being restored)
def check_tenant_status(query_cur):


  # check tenant schema
  (desc, results) = query_cur.exec_query("""select count(*) as count from DBA_OB_TENANTS where status != 'NORMAL'""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('results len not match')
  elif 0 != results[0][0]:
    fail_list.append('has abnormal tenant, should stop')
  else:
    logging.info('check tenant status success')


  # check tenant info
  # don't support restore tenant upgrade
  (desc, results) = query_cur.exec_query("""select count(*) as count from oceanbase.__all_virtual_tenant_info where tenant_role != 'PRIMARY' and tenant_role != 'STANDBY'""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('results len not match')
  elif 0 != results[0][0]:
    fail_list.append('has abnormal tenant info, should stop')
  else:
    logging.info('check tenant info success')


# 6. Check that there are no restore jobs
def check_restore_job_exist(query_cur):
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_RESTORE_PROGRESS""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to restore job cnt')
  elif results[0][0] != 0:
      fail_list.append("""still has restore job, upgrade is not allowed temporarily""")
  logging.info('check restore job success')


def check_is_primary_zone_distributed(primary_zone_str):
  semicolon_pos = len(primary_zone_str)
  for i in range(len(primary_zone_str)):
    if primary_zone_str[i] == ';':
      semicolon_pos = i
      break
  comma_pos = len(primary_zone_str)
  for j in range(len(primary_zone_str)):
    if primary_zone_str[j] == ',':
      comma_pos = j
      break
  if comma_pos < semicolon_pos:
    return True
  else:
    return False


# 7. Before the upgrade, each tenant's primary zone must contain a single zone
def check_tenant_primary_zone(query_cur):
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select tenant_name,primary_zone from DBA_OB_TENANTS where  tenant_id != 1""");
      for item in results:
        if cmp(item[1], "RANDOM") == 0:
          fail_list.append('{0} tenant primary zone random before update not allowed'.format(item[0]))
        elif check_is_primary_zone_distributed(item[1]):
          fail_list.append('{0} tenant primary zone distributed before update not allowed'.format(item[0]))
      logging.info('check tenant primary zone success')


# 8. Extend the permanent offline time to avoid missing replicas during the upgrade
def modify_server_permanent_offline_time(cur):
  set_parameter(cur, 'server_permanent_offline_time', '72h')


# 9. Check whether any DDL task is running
def check_ddl_task_execute(query_cur):
  (desc, results) = query_cur.exec_query("""select count(1) from __all_virtual_ddl_task_status""")
  if 0 != results[0][0]:
    fail_list.append("There are DDL task in progress")
  logging.info('check ddl task execut status success')


# 10. Check that there are no backup jobs
def check_backup_job_exist(query_cur):
  # Backup jobs cannot be in-progress during upgrade.
  (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_JOBS""")
  if len(results) != 1 or len(results[0]) != 1:
    fail_list.append('failed to backup job cnt')
  elif results[0][0] != 0:
    fail_list.append("""still has backup job, upgrade is not allowed temporarily""")
  else:
    logging.info('check backup job success')


# 11. Check that there are no archive jobs
def check_archive_job_exist(query_cur):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])


    # Archive jobs cannot be in-progress before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVELOG where status!='STOP'""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to archive job cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has archive job, upgrade is not allowed temporarily""")
      else:
        logging.info('check archive job success')


# 12. Check that the archive destination has been cleared
def check_archive_dest_exist(query_cur):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    # archive dest need to be cleaned before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_ARCHIVE_DEST""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to archive dest cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has archive destination, upgrade is not allowed temporarily""")
      else:
        logging.info('check archive destination success')


# 13. Check that the backup destination has been cleared
def check_backup_dest_exist(query_cur):
  min_cluster_version = 0
  sql = """select distinct value from GV$OB_PARAMETERS  where name='min_observer_version'"""
  (desc, results) = query_cur.exec_query(sql)
  if len(results) != 1:
    fail_list.append('min_observer_version is not sync')
  elif len(results[0]) != 1:
    fail_list.append('column cnt not match')
  else:
    min_cluster_version = get_version(results[0][0])
    # backup dest need to be cleaned before upgrade from 4.0.
    if min_cluster_version < get_version("4.1.0.0"):
      (desc, results) = query_cur.exec_query("""select count(1) from CDB_OB_BACKUP_PARAMETER where name='data_backup_dest' and (value!=NULL or value!='')""")
      if len(results) != 1 or len(results[0]) != 1:
        fail_list.append('failed to data backup dest cnt')
      elif results[0][0] != 0:
        fail_list.append("""still has backup destination, upgrade is not allowed temporarily""")
      else:
        logging.info('check backup destination success')


def check_server_version(query_cur):
    sql = """select distinct(substring_index(build_version, '_', 1)) from __all_server""";
    (desc, results) = query_cur.exec_query(sql);
    if len(results) != 1:
      fail_list.append("servers build_version not match")
    else:
      logging.info("check server version success")


# 14. Check that every server is available for service
def check_observer_status(query_cur):
  (desc, results) = query_cur.exec_query("""select count(*) from oceanbase.__all_server where (start_service_time <= 0 or status != "active")""")
  if results[0][0] > 0 :
    fail_list.append('{0} observer not available , please check'.format(results[0][0]))
  logging.info('check observer status success')


# 15. Check that the schema has been refreshed successfully
def check_schema_status(query_cur):
  (desc, results) = query_cur.exec_query("""select if (a.cnt = b.cnt, 1, 0) as passed from (select count(*) as cnt from oceanbase.__all_virtual_server_schema_info where refreshed_schema_version > 1 and refreshed_schema_version % 8 = 0) as a join (select count(*) as cnt from oceanbase.__all_server join oceanbase.__all_tenant) as b""")
  if results[0][0] != 1 :
    fail_list.append('{0} schema not available, please check'.format(results[0][0]))
  logging.info('check schema status success')


# 16. Check for tenants named all/all_user/all_meta
def check_not_supported_tenant_name(query_cur):
  names = ["all", "all_user", "all_meta"]
  (desc, results) = query_cur.exec_query("""select tenant_name from oceanbase.DBA_OB_TENANTS""")
  for i in range(len(results)):
    if results[i][0].lower() in names:
      fail_list.append('a tenant named all/all_user/all_meta (case insensitive) cannot exist in the cluster, please rename the tenant')
      break
  logging.info('check special tenant name success')


# last check of do_check, make sure no function execute after check_fail_list
def check_fail_list():
  if len(fail_list) != 0 :
     error_msg ="upgrade checker failed with " + str(len(fail_list)) + " reasons: " + ", ".join(['['+x+"] " for x in fail_list])
     raise MyError(error_msg)


def set_query_timeout(query_cur, timeout):
  if timeout != 0:
    sql = """set @@session.ob_query_timeout = {0}""".format(timeout * 1000 * 1000)
    query_cur.exec_sql(sql)


# Run the pre-upgrade checks
def do_check(my_host, my_port, my_user, my_passwd, timeout, upgrade_params):
  try:
    conn = mysql.connector.connect(user = my_user,
                                   password = my_passwd,
                                   host = my_host,
                                   port = my_port,
                                   database = 'oceanbase',
                                   raise_on_warnings = True)
    conn.autocommit = True
    cur = conn.cursor(buffered=True)
    min_cluster_version = [0]
    try:
      query_cur = Cursor(cur)
      set_query_timeout(query_cur, timeout)
      check_observer_version(query_cur, upgrade_params)
      check_data_version(query_cur, min_cluster_version)
      check_paxos_replica(query_cur)
      check_rebalance_task(query_cur)
      check_cluster_status(query_cur, min_cluster_version)
      check_tenant_status(query_cur)
      check_restore_job_exist(query_cur)
      check_tenant_primary_zone(query_cur)
      check_ddl_task_execute(query_cur)
      check_backup_job_exist(query_cur)
      check_archive_job_exist(query_cur)
      check_archive_dest_exist(query_cur)
      check_backup_dest_exist(query_cur)
      check_observer_status(query_cur)
      check_schema_status(query_cur)
      check_server_version(query_cur)
      check_not_supported_tenant_name(query_cur)
      # all check func should execute before check_fail_list
      check_fail_list()
      modify_server_permanent_offline_time(cur)
    except Exception, e:
      logging.exception('run error')
      raise e
    finally:
      cur.close()
      conn.close()
  except mysql.connector.Error, e:
    logging.exception('connection error')
    raise e
  except Exception, e:
    logging.exception('normal error')
    raise e


if __name__ == '__main__':
  upgrade_params = UpgradeParams()
  change_opt_defult_value('log-file', upgrade_params.log_filename)
  parse_options(sys.argv[1:])
  if not has_no_local_opts():
    deal_with_local_opts()
  else:
    check_db_client_opts()
    log_filename = get_opt_log_file()
    upgrade_params.log_filename = log_filename
    # Logging is configured here so that the earlier steps do not overwrite the log file
    config_logging_module(upgrade_params.log_filename)
    try:
      host = get_opt_host()
      port = int(get_opt_port())
      user = get_opt_user()
      password = get_opt_password()
      timeout = int(get_opt_timeout())
      logging.info('parameters from cmd: host=\"%s\", port=%s, user=\"%s\", password=\"%s\", timeout=\"%s\", log-file=\"%s\"',\
          host, port, user, password, timeout, log_filename)
      do_check(host, port, user, password, timeout, upgrade_params)
    except mysql.connector.Error, e:
      logging.exception('mysql connctor error')
      raise e
    except Exception, e:
      logging.exception('normal error')
      raise e
[root@sdw1 etc]#
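In upgrade_checker.py, set_parameter() issues ALTER SYSTEM and wait_parameter_sync() then polls __all_virtual_sys_parameter_stat until no server reports a different value, querying at most 10 times with 5-second sleeps in between. The Python 3 sketch below reproduces that retry logic with the SQL abstracted into a callback; `count_unsynced` is a hypothetical stand-in for the query, and the sketch raises TimeoutError when the attempts run out.

```python
import time
from typing import Callable


def wait_until_synced(count_unsynced: Callable[[], int],
                      attempts: int = 10,
                      interval_s: float = 5.0,
                      sleep: Callable[[float], None] = time.sleep) -> int:
    """Poll until count_unsynced() returns 0 and return the attempt that succeeded.

    count_unsynced is a hypothetical stand-in for the script's query:
      select count(*) from oceanbase.__all_virtual_sys_parameter_stat
      where name = '<key>' and value != '<value>'
    """
    for attempt in range(1, attempts + 1):
        if count_unsynced() == 0:
            return attempt
        if attempt < attempts:  # no sleep after the final attempt
            sleep(interval_s)
    raise TimeoutError("parameter not synced after {} attempts".format(attempts))


if __name__ == "__main__":
    # Simulated servers: 2 still differ, then 1, then all agree.
    pending = iter([2, 1, 0])
    print(wait_until_synced(lambda: next(pending), sleep=lambda s: None))
```

With the defaults, the worst case is 10 queries and 9 sleeps, about 45 seconds plus query time, before giving up.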

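get_version() in upgrade_checker.py packs a four-part version string into one integer (major shifted into the bits above 32, then 16 bits of minor and 8 bits each for the two patch numbers), so versions can be compared numerically against barriers such as 4.1.0.0. A Python 3 port, raising ValueError for invalid input, shows the values involved in this upgrade:

```python
def get_version(version_str: str) -> int:
    """Pack 'major.minor.major_patch.minor_patch' into one int, like the script."""
    parts = version_str.split(".")
    if len(parts) != 4:
        raise ValueError("version:{} is invalid".format(version_str))
    major, minor, major_patch, minor_patch = (int(p) for p in parts)
    # Field limits: 32 bits major, 16 bits minor, 8 bits per patch number.
    if major > 0xFFFFFFFF or minor > 0xFFFF or major_patch > 0xFF or minor_patch > 0xFF:
        raise ValueError("version:{} is invalid".format(version_str))
    return (major << 32) | (minor << 16) | (major_patch << 8) | minor_patch


if __name__ == "__main__":
    for v in ("4.1.0.0", "4.2.1.6", "4.2.1.8"):
        print(v, hex(get_version(v)))
    # 4.1.0.0 0x400010000
    # 4.2.1.6 0x400020106
    # 4.2.1.8 0x400020108
```

Both 4.2.1.6 and 4.2.1.8 are above the 4.1.0.0 barrier, so they pass the barrier test in check_data_version(). Packing also avoids a pitfall of plain string comparison, which check_observer_version() uses via cmp(): as strings, "4.10.0.0" sorts before "4.9.0.0".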

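check_data_version() relies on a small counting argument: the query expects each tenant to contribute one target_data_version row and one current_data_version row to __all_virtual_core_table, so if every tenant is on the same data version, exactly 2 * tenant_count rows match it. The sketch below replays that logic on in-memory data; the dictionary layout is an assumption made for illustration, versions are given as already-packed integers, and the 4.1.0.0 barrier checks are omitted.

```python
from typing import Dict, Set, Tuple


def data_version_ok(compatible: Set[int],
                    core_rows: Dict[int, Tuple[int, int]],
                    tenant_count: int) -> bool:
    """Replay the script's data-version consistency check on in-memory data.

    compatible: distinct 'compatible' values across all tenants (packed ints)
    core_rows:  tenant_id -> (target_data_version, current_data_version)
    """
    if len(compatible) != 1:  # the script reports 'compatible is not sync'
        return False
    (data_version,) = compatible
    # Count matching rows the way the SQL does: each tenant contributes up to 2.
    matching = sum(int(target == data_version) + int(current == data_version)
                   for target, current in core_rows.values())
    return matching == 2 * tenant_count


if __name__ == "__main__":
    v = 0x400020100  # an illustrative packed data version
    print(data_version_ok({v}, {1: (v, v), 2: (v, v)}, tenant_count=2))
    print(data_version_ok({v}, {1: (v, v), 2: (v + 1, v)}, tenant_count=2))
```

A single tenant whose target or current data version lags behind is enough to make the count fall short of 2 * tenant_count.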

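check_is_primary_zone_distributed() finds the first ';' and the first ',' in a tenant's primary_zone and reports a distributed primary zone when the comma comes first. Assuming OceanBase's primary_zone syntax, where ';' separates priority levels and ',' separates zones of equal priority, that means the top-priority group names more than one zone. An equivalent, shorter Python 3 version:

```python
def is_primary_zone_distributed(primary_zone: str) -> bool:
    """True when the highest-priority group of primary_zone names more than one zone.

    Assumes primary_zone syntax where ';' separates priority levels and ','
    separates zones of equal priority, e.g. 'zone1,zone2;zone3'.
    """
    top_priority_group = primary_zone.split(";", 1)[0]
    return "," in top_priority_group


if __name__ == "__main__":
    for pz in ("zone1", "zone1;zone2,zone3", "zone1,zone2;zone3"):
        print(pz, is_primary_zone_distributed(pz))
```

In this script the primary zone check, like the archive job, archive destination and backup destination checks, only runs when min_observer_version is below 4.1.0.0, so it is skipped for a 4.2.1.6 to 4.2.1.8 upgrade.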
Zone Upgrade Process

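Because a rolling upgrade is used, the upgrade task keeps the workload stable during the upgrade by upgrading one zone first and moving on to the next zone only after that zone has finished. Even while one zone is being upgraded, or if its upgrade fails, the other zones continue to serve requests, so the business remains available.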

Each zone upgrade goes through the following stages:

Set zone context (start) -> Check zone (end)

Zone Upgrade Tasks

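Before a zone is upgraded, the zone is stopped and its observer processes are shut down. The task then downloads and installs the dependencies of the target version (Install dependencies) and the OceanBase package (Install OB rpm).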

After the dependencies and the package are installed, the observer process is started and the task waits for the observer version to be refreshed (Wait observer version refreshed).

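Once the observer has started, the task checks the clog status (Check clog stat), checks that the observer has refreshed its schema (Check observer refresh schema), and runs the cluster health check (Execute upgrade health checker script). Finally, the zone is started again.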

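Note: once the current zone has been started successfully, that zone's upgrade is complete. The steps above are then repeated for each remaining zone, so the number of upgrade tasks depends on the number of zones and tenants.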
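The zone-by-zone flow can be modelled as follows. This is an illustrative sketch, not the upgrade tool's implementation: the step order follows the description above, the stop/start entries are plain descriptions because the article does not give their task names, `execute` is a hypothetical callback, and the tenant-dependent part of the task count is not modelled.

```python
from typing import Callable, List

# Per-zone steps in the order the article describes them. Entries in lower case
# are plain descriptions; the article does not give those tasks' exact names.
ZONE_STEPS = [
    "Set zone context",
    "stop zone",
    "stop observer",
    "Install dependencies",
    "Install OB rpm",
    "start observer",
    "Wait observer version refreshed",
    "Check clog stat",
    "Check observer refresh schema",
    "Execute upgrade health checker script",
    "start zone",
    "Check zone",
]


def rolling_upgrade_plan(zones: List[str]) -> List[str]:
    """List every step, finishing one zone completely before the next begins."""
    return [f"{zone}: {step}" for zone in zones for step in ZONE_STEPS]


def run_rolling_upgrade(zones: List[str],
                        execute: Callable[[str, str], bool]) -> List[str]:
    """Run the plan, stopping at the first failed step.

    execute(zone, step) is a hypothetical callback that returns True on success.
    Zones after the failing one are never touched, so they keep serving.
    """
    upgraded: List[str] = []
    for zone in zones:
        for step in ZONE_STEPS:
            if not execute(zone, step):
                raise RuntimeError(f"{zone}: '{step}' failed; upgraded so far: {upgraded}")
        upgraded.append(zone)
    return upgraded


if __name__ == "__main__":
    zones = ["zone1", "zone2", "zone3"]
    print(len(rolling_upgrade_plan(zones)), "steps")  # 3 zones x 12 steps
    print(run_rolling_upgrade(zones, lambda zone, step: True))
```

Under this model the step count grows linearly with the number of zones, and a failure in one zone leaves the zones after it untouched.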

Checking the Workload

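The benchmark data load completed successfully, indicating that in this test the rolling upgrade did not interrupt the workload.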

Summary

1. Before the upgrade, the cluster is checked to confirm that the current OceanBase cluster meets the upgrade requirements.

2. The number of upgrade tasks depends on the number of zones.

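3. The upgrade stops zones and observers, and these operations are not rolled back. Therefore, if an earlier check task fails, investigate the cause; do not skip the check.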

This article was contributed by an Internet user and represents only the author's views. Source: http://www.coloradmin.cn/o/2241529.html
