mysql8 用C++源码角度看客户端发起sql网络请求,并处理sql命令

news2025/2/13 2:54:32

MySQL 8 的 C++ 源码中,处理网络请求和 SQL 命令的流程涉及多个函数和类。以下是关键的函数和类,以及它们的作用:

1. do_command 函数

do_command 函数是 MySQL 服务器中处理客户端命令的核心函数。它从客户端读取一个命令并执行。这个函数在 sql/sql_parse.cc 文件中定义。

函数原型

cpp复制

bool do_command(THD *thd);
主要步骤
  1. 初始化

    • 清除可能的错误状态。

    • 设置线程的当前查询块为空。

    • 清除错误消息和诊断区域。

  2. 读取命令

    • 设置网络读取超时。

    • 调用 thd->get_protocol()->get_command 从客户端读取命令。

  3. 处理命令

    • 如果读取命令失败,处理错误并返回。

    • 否则,调用 dispatch_command 处理具体的命令。

  4. 清理

    • 无论成功还是失败,清理线程的资源和状态。

2. dispatch_command 函数

dispatch_command 函数根据客户端发送的命令类型(如 COM_QUERYCOM_STMT_PREPARE 等)调用相应的处理函数。这个函数也在 sql/sql_parse.cc 文件中定义。

函数原型

cpp复制

bool dispatch_command(THD *thd, COM_DATA *com_data, enum enum_server_command command);
主要步骤
  1. 检查命令类型

    • 根据命令类型调用相应的处理函数。例如:

      • COM_QUERY:调用 mysql_parse 解析和执行 SQL 查询。

      • COM_STMT_PREPARE:调用 mysql_parse 解析和准备预处理语句。

      • COM_QUIT:处理客户端断开连接。

  2. 执行命令

    • 调用相应的处理函数执行命令。

  3. 返回结果

    • 返回命令执行的结果。

3. handle_connection 函数

handle_connection 函数是处理新连接的线程函数。它在 sql/sql_connect.cc 文件中定义。这个函数负责初始化新连接的线程,并调用 do_command 处理客户端命令。

函数原型

cpp复制

extern "C" void *handle_connection(void *arg);
主要步骤
  1. 初始化线程

    • 初始化线程的资源和状态。

    • 设置线程的 TLS(线程局部存储)。

  2. 处理连接

    • 调用 do_command 处理客户端命令。

    • 如果客户端断开连接,清理线程的资源。

  3. 清理线程

    • 释放线程的资源。

    • 退出线程。

4. Connection_handler_manager 类

Connection_handler_manager 类负责管理连接处理程序。它在 sql/sql_connect.cc 文件中定义。这个类的主要职责包括:

  • 接受新的连接。

  • 创建新的线程来处理连接。

  • 管理连接的生命周期。

主要方法
  • process_new_connection:处理新的连接。

  • add_connection:将新的连接添加到连接处理程序中。

##

/**
  Read one command from connection and execute it (query or simple command).
  This function is called in loop from thread function.

  For profiling to work, it must never be called recursively.

  @retval
    0  success
  @retval
    1  request of thread shutdown (see dispatch_command() description)
*/

bool do_command(THD *thd) {
  bool return_value;
  int rc;
  NET *net = nullptr;
  enum enum_server_command command = COM_SLEEP;
  COM_DATA com_data;
  DBUG_TRACE;
  assert(thd->is_classic_protocol());

  /*
    indicator of uninitialized lex => normal flow of errors handling
    (see my_message_sql)
  */
  thd->lex->set_current_query_block(nullptr);

  /*
    XXX: this code is here only to clear possible errors of init_connect.
    Consider moving to prepare_new_connection_state() instead.
    That requires making sure the DA is cleared before non-parsing statements
    such as COM_QUIT.
  */
  thd->clear_error();  // Clear error message
  thd->get_stmt_da()->reset_diagnostics_area();

  /*
    This thread will do a blocking read from the client which
    will be interrupted when the next command is received from
    the client, the connection is closed or "net_wait_timeout"
    number of seconds has passed.
  */
  net = thd->get_protocol_classic()->get_net();
  my_net_set_read_timeout(net, thd->variables.net_wait_timeout);
  net_new_transaction(net);

  /*
    WL#15369 : to make connections threads sleep to test if
    ER_THREAD_STILL_ALIVE, and ER_NUM_THREADS_STILL_ALIVE are
    being logged in the intended way, i.e. when connection threads
    are still alive, even after forcefully disconnecting them in
    close_connections().
 */
  DBUG_EXECUTE_IF("simulate_connection_thread_hang", sleep(15););

  /*
    Synchronization point for testing of KILL_CONNECTION.
    This sync point can wait here, to simulate slow code execution
    between the last test of thd->killed and blocking in read().

    The goal of this test is to verify that a connection does not
    hang, if it is killed at this point of execution.
    (Bug#37780 - main.kill fails randomly)

    Note that the sync point wait itself will be terminated by a
    kill. In this case it consumes a condition broadcast, but does
    not change anything else. The consumed broadcast should not
    matter here, because the read/recv() below doesn't use it.
  */
  DEBUG_SYNC(thd, "before_do_command_net_read");

  /* For per-query performance counters with log_slow_statement */
  struct System_status_var query_start_status;
  thd->clear_copy_status_var();
  if (opt_log_slow_extra) {
    thd->copy_status_var(&query_start_status);
  }

  rc = thd->m_mem_cnt.reset();
  if (rc)
    thd->m_mem_cnt.set_thd_error_status();
  else {
    /*
      Because of networking layer callbacks in place,
      this call will maintain the following instrumentation:
      - IDLE events
      - SOCKET events
      - STATEMENT events
      - STAGE events
      when reading a new network packet.
      In particular, a new instrumented statement is started.
      See init_net_server_extension()
    */
    thd->m_server_idle = true;
    rc = thd->get_protocol()->get_command(&com_data, &command);
    thd->m_server_idle = false;
  }

  if (rc) {
#ifndef NDEBUG
    char desc[VIO_DESCRIPTION_SIZE];
    vio_description(net->vio, desc);
    DBUG_PRINT("info", ("Got error %d reading command from socket %s",
                        net->error, desc));
#endif  // NDEBUG

    MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);

    /* Instrument this broken statement as "statement/com/error" */
    thd->m_statement_psi = MYSQL_REFINE_STATEMENT(
        thd->m_statement_psi, com_statement_info[COM_END].m_key);

    /* Check if we can continue without closing the connection */

    /* The error must be set. */
    assert(thd->is_error());
    thd->send_statement_status();

    /* Mark the statement completed. */
    MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
    thd->m_statement_psi = nullptr;
    thd->m_digest = nullptr;

    if (rc < 0) {
      return_value = true;  // We have to close it.
      goto out;
    }
    net->error = NET_ERROR_UNSET;
    return_value = false;
    goto out;
  }

#ifndef NDEBUG
  char desc[VIO_DESCRIPTION_SIZE];
  vio_description(net->vio, desc);
  DBUG_PRINT("info", ("Command on %s = %d (%s)", desc, command,
                      Command_names::str_notranslate(command).c_str()));
  expected_from_debug_flag = TDM::ANY;
  DBUG_EXECUTE_IF("tdon", { expected_from_debug_flag = TDM::ON; });
  DBUG_EXECUTE_IF("tdzero", { expected_from_debug_flag = TDM::ZERO; });
  DBUG_EXECUTE_IF("tdna", { expected_from_debug_flag = TDM::NOT_AVAILABLE; });
#endif  // NDEBUG
  DBUG_PRINT("info", ("packet: '%*.s'; command: %d",
                      (int)thd->get_protocol_classic()->get_packet_length(),
                      thd->get_protocol_classic()->get_raw_packet(), command));
  if (thd->get_protocol_classic()->bad_packet)
    assert(0);  // Should be caught earlier

  // Reclaim some memory
  thd->get_protocol_classic()->get_output_packet()->shrink(
      thd->variables.net_buffer_length);
  /* Restore read timeout value */
  my_net_set_read_timeout(net, thd->variables.net_read_timeout);

  DEBUG_SYNC(thd, "before_command_dispatch");

  return_value = dispatch_command(thd, &com_data, command);
  thd->get_protocol_classic()->get_output_packet()->shrink(
      thd->variables.net_buffer_length);

out:
  /* The statement instrumentation must be closed in all cases. */
  assert(thd->m_digest == nullptr);
  assert(thd->m_statement_psi == nullptr);
  return return_value;
}

##

/**
  Perform one connection-level (COM_XXXX) command.

  @param thd             connection handle
  @param command         type of command to perform
  @param com_data        com_data union to store the generated command

  @todo
    set thd->lex->sql_command to SQLCOM_END here.
  @todo
    The following has to be changed to an 8 byte integer

  @retval
    0   ok
  @retval
    1   request of thread shutdown, i. e. if command is
        COM_QUIT
*/
bool dispatch_command(THD *thd, const COM_DATA *com_data,
                      enum enum_server_command command) {
  assert(thd->lex->m_IS_table_stats.is_valid() == false);
  assert(thd->lex->m_IS_tablespace_stats.is_valid() == false);
#ifndef NDEBUG
  auto tabstat_grd = create_scope_guard([&]() {
    assert(thd->lex->m_IS_table_stats.is_valid() == false);
    assert(thd->lex->m_IS_tablespace_stats.is_valid() == false);
  });
#endif /* NDEBUG */
  bool error = false;
  Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
  DBUG_TRACE;
  DBUG_PRINT("info", ("command: %d", command));

  Sql_cmd_clone *clone_cmd = nullptr;

  /* SHOW PROFILE instrumentation, begin */
#if defined(ENABLED_PROFILING)
  thd->profiling->start_new_query();
#endif

  /* Performance Schema Interface instrumentation, begin */
  thd->m_statement_psi = MYSQL_REFINE_STATEMENT(
      thd->m_statement_psi, com_statement_info[command].m_key);

  thd->set_command(command);
  /*
    Commands which always take a long time are logged into
    the slow log only if opt_log_slow_admin_statements is set.
  */
  thd->enable_slow_log = true;
  thd->lex->sql_command = SQLCOM_END; /* to avoid confusing VIEW detectors */
  /*
    KILL QUERY may come after cleanup in mysql_execute_command(). Next query
    execution is interrupted due to this. So resetting THD::killed here.

    THD::killed value can not be KILL_TIMEOUT here as timer used for statement
    max execution time is disarmed in the cleanup stage of
    mysql_execute_command. KILL CONNECTION should terminate the connection.
    Hence resetting THD::killed only for KILL QUERY case here.
  */
  if (thd->killed == THD::KILL_QUERY) thd->killed = THD::NOT_KILLED;
  thd->set_time();
  if (is_time_t_valid_for_timestamp(thd->query_start_in_secs()) == false) {
    /*
      If the time has gone past end of epoch we need to shutdown the server. But
      there is possibility of getting invalid time value on some platforms.
      For example, gettimeofday() might return incorrect value on solaris
      platform. Hence validating the current time with 5 iterations before
      initiating the normal server shutdown process because of time getting
      past 2038.
    */
    const int max_tries = 5;
    LogErr(WARNING_LEVEL, ER_CONFIRMING_THE_FUTURE, max_tries);

    int tries = 0;
    while (++tries <= max_tries) {
      thd->set_time();
      if (is_time_t_valid_for_timestamp(thd->query_start_in_secs()) == true) {
        LogErr(WARNING_LEVEL, ER_BACK_IN_TIME, tries);
        break;
      }
      LogErr(WARNING_LEVEL, ER_FUTURE_DATE, tries);
    }
    if (tries > max_tries) {
      /*
        If the time has got past epoch, we need to shut this server down.
        We do this by making sure every command is a shutdown and we
        have enough privileges to shut the server down

        TODO: remove this when we have full 64 bit my_time_t support
      */
      LogErr(ERROR_LEVEL, ER_UNSUPPORTED_DATE);
      const ulong master_access = thd->security_context()->master_access();
      thd->security_context()->set_master_access(master_access | SHUTDOWN_ACL);
      error = true;
      kill_mysql();
    }
  }
  thd->set_query_id(next_query_id());
  thd->reset_rewritten_query();
  thd_manager->inc_thread_running();

  if (!(server_command_flags[command] & CF_SKIP_QUESTIONS))
    thd->status_var.questions++;

  /**
    Clear the set of flags that are expected to be cleared at the
    beginning of each command.
  */
  thd->server_status &= ~SERVER_STATUS_CLEAR_SET;

  if (thd->get_protocol()->type() == Protocol::PROTOCOL_PLUGIN &&
      !(server_command_flags[command] & CF_ALLOW_PROTOCOL_PLUGIN)) {
    MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
    my_error(ER_PLUGGABLE_PROTOCOL_COMMAND_NOT_SUPPORTED, MYF(0));
    thd->killed = THD::KILL_CONNECTION;
    error = true;
    goto done;
  }

  /**
    Enforce password expiration for all RPC commands, except the
    following:

    COM_QUERY/COM_STMT_PREPARE and COM_STMT_EXECUTE do a more
    fine-grained check later.
    COM_STMT_CLOSE and COM_STMT_SEND_LONG_DATA don't return anything.
    COM_PING only discloses information that the server is running,
       and that's available through other means.
    COM_QUIT should work even for expired statements.
  */
  if (unlikely(thd->security_context()->password_expired() &&
               command != COM_QUERY && command != COM_STMT_CLOSE &&
               command != COM_STMT_SEND_LONG_DATA && command != COM_PING &&
               command != COM_QUIT && command != COM_STMT_PREPARE &&
               command != COM_STMT_EXECUTE)) {
    MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
    my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
    goto done;
  }

  if (mysql_event_tracking_command_notify(
          thd, AUDIT_EVENT(EVENT_TRACKING_COMMAND_START), command,
          Command_names::str_global(command).c_str())) {
    MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
    goto done;
  }

  /*
    For COM_QUERY,
    wait until query attributes are extracted.
  */
  if (command != COM_QUERY) {
    MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
  }

  switch (command) {
    case COM_INIT_DB: {
      LEX_STRING tmp;
      thd->status_var.com_stat[SQLCOM_CHANGE_DB]++;
      thd->convert_string(&tmp, system_charset_info,
                          com_data->com_init_db.db_name,
                          com_data->com_init_db.length, thd->charset());

      const LEX_CSTRING tmp_cstr = {tmp.str, tmp.length};
      if (!mysql_change_db(thd, tmp_cstr, false)) {
        query_logger.general_log_write(thd, command, thd->db().str,
                                       thd->db().length);
        my_ok(thd);
      }
      break;
    }
    case COM_REGISTER_SLAVE: {
      // TODO: access of protocol_classic should be removed
      if (!register_replica(thd, thd->get_protocol_classic()->get_raw_packet(),
                            thd->get_protocol_classic()->get_packet_length()))
        my_ok(thd);
      break;
    }
    case COM_RESET_CONNECTION: {
      thd->status_var.com_other++;
      thd->cleanup_connection();
      my_ok(thd);
      break;
    }
    case COM_CLONE: {
      thd->status_var.com_other++;

      /* Try loading clone plugin */
      clone_cmd = new (thd->mem_root) Sql_cmd_clone();

      if (clone_cmd && clone_cmd->load(thd)) {
        clone_cmd = nullptr;
      }

      thd->lex->m_sql_cmd = clone_cmd;
      thd->lex->sql_command = SQLCOM_CLONE;

      break;
    }
    case COM_SUBSCRIBE_GROUP_REPLICATION_STREAM: {
      Security_context *sctx = thd->security_context();
      if (!sctx->has_global_grant(STRING_WITH_LEN("GROUP_REPLICATION_STREAM"))
               .first) {
        my_error(ER_SPECIFIC_ACCESS_DENIED_ERROR, MYF(0), "");
        error = true;
        break;
      }

      if (!error && call_gr_incoming_connection_cb(
                        thd, thd->active_vio->mysql_socket.fd,
                        thd->active_vio->ssl_arg
                            ? static_cast<SSL *>(thd->active_vio->ssl_arg)
                            : nullptr)) {
        my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
        error = true;
        break;
      }

      my_ok(thd);

      break;
    }
    case COM_CHANGE_USER: {
      int auth_rc;
      thd->status_var.com_other++;

      thd->cleanup_connection();
      USER_CONN *save_user_connect =
          const_cast<USER_CONN *>(thd->get_user_connect());
      LEX_CSTRING save_db = thd->db();

      /*
        LOCK_thd_security_ctx protects the THD's security-context from
        inspection by SHOW PROCESSLIST while we're updating it. However,
        there is no need to protect this context while we're reading it,
        sinceother threads are not supposed to modify it.
        Nested acquiring of LOCK_thd_data is fine (see below).
      */
      const Security_context save_security_ctx(*(thd->security_context()));

      MUTEX_LOCK(grd_secctx, &thd->LOCK_thd_security_ctx);

      auth_rc = acl_authenticate(thd, COM_CHANGE_USER);
      auth_rc |= mysql_event_tracking_connection_notify(
          thd, AUDIT_EVENT(EVENT_TRACKING_CONNECTION_CHANGE_USER));
      if (auth_rc) {
        *thd->security_context() = save_security_ctx;
        thd->set_user_connect(save_user_connect);
        thd->reset_db(save_db);

        my_error(ER_ACCESS_DENIED_CHANGE_USER_ERROR, MYF(0),
                 thd->security_context()->user().str,
                 thd->security_context()->host_or_ip().str,
                 (thd->password ? ER_THD(thd, ER_YES) : ER_THD(thd, ER_NO)));
        thd->killed = THD::KILL_CONNECTION;
        error = true;
      } else {
#ifdef HAVE_PSI_THREAD_INTERFACE
        /* we've authenticated new user */
        PSI_THREAD_CALL(notify_session_change_user)(thd->get_psi());
#endif /* HAVE_PSI_THREAD_INTERFACE */

        if (save_user_connect) decrease_user_connections(save_user_connect);
        mysql_mutex_lock(&thd->LOCK_thd_data);
        my_free(const_cast<char *>(save_db.str));
        save_db = NULL_CSTR;
        mysql_mutex_unlock(&thd->LOCK_thd_data);
      }
      break;
    }
    case COM_STMT_EXECUTE: {
      /* Clear possible warnings from the previous command */
      thd->reset_for_next_command();

      Prepared_statement *stmt = nullptr;
      if (!mysql_stmt_precheck(thd, com_data, command, &stmt)) {
        PS_PARAM *parameters = com_data->com_stmt_execute.parameters;
        copy_bind_parameter_values(thd, parameters,
                                   com_data->com_stmt_execute.parameter_count);

        mysqld_stmt_execute(thd, stmt, com_data->com_stmt_execute.has_new_types,
                            com_data->com_stmt_execute.open_cursor, parameters);
        thd->bind_parameter_values = nullptr;
        thd->bind_parameter_values_count = 0;
      }
      break;
    }
    case COM_STMT_FETCH: {
      /* Clear possible warnings from the previous command */
      thd->reset_for_next_command();

      Prepared_statement *stmt = nullptr;
      if (!mysql_stmt_precheck(thd, com_data, command, &stmt))
        mysqld_stmt_fetch(thd, stmt, com_data->com_stmt_fetch.num_rows);

      break;
    }
    case COM_STMT_SEND_LONG_DATA: {
      Prepared_statement *stmt;
      thd->get_stmt_da()->disable_status();

      if (!mysql_stmt_precheck(thd, com_data, command, &stmt))
        mysql_stmt_get_longdata(thd, stmt,
                                com_data->com_stmt_send_long_data.param_number,
                                com_data->com_stmt_send_long_data.longdata,
                                com_data->com_stmt_send_long_data.length);
      break;
    }
    case COM_STMT_PREPARE: {
      /* Clear possible warnings from the previous command */
      thd->reset_for_next_command();
      Prepared_statement *stmt = nullptr;

      DBUG_EXECUTE_IF("parser_stmt_to_error_log", {
        LogErr(INFORMATION_LEVEL, ER_PARSER_TRACE,
               com_data->com_stmt_prepare.query);
      });
      DBUG_EXECUTE_IF("parser_stmt_to_error_log_with_system_prio", {
        LogErr(SYSTEM_LEVEL, ER_PARSER_TRACE, com_data->com_stmt_prepare.query);
      });

      if (!mysql_stmt_precheck(thd, com_data, command, &stmt))
        mysqld_stmt_prepare(thd, com_data->com_stmt_prepare.query,
                            com_data->com_stmt_prepare.length, stmt);
      break;
    }
    case COM_STMT_CLOSE: {
      Prepared_statement *stmt = nullptr;
      thd->get_stmt_da()->disable_status();

      if (!mysql_stmt_precheck(thd, com_data, command, &stmt))
        mysqld_stmt_close(thd, stmt);
      break;
    }
    case COM_STMT_RESET: {
      /* Clear possible warnings from the previous command */
      thd->reset_for_next_command();

      Prepared_statement *stmt = nullptr;
      if (!mysql_stmt_precheck(thd, com_data, command, &stmt))
        mysqld_stmt_reset(thd, stmt);
      break;
    }
    case COM_QUERY: {
      /*
        IMPORTANT NOTE:

        Every execution path for COM_QUERY should call once
          MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES()
      */
      assert(thd->m_digest == nullptr);
      thd->m_digest = &thd->m_digest_state;
      thd->m_digest->reset(thd->m_token_array, max_digest_length);

      if (alloc_query(thd, com_data->com_query.query,
                      com_data->com_query.length)) {
        MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
        break;  // fatal error is set
      }

      const char *packet_end = thd->query().str + thd->query().length;

      if (opt_general_log_raw)
        query_logger.general_log_write(thd, command, thd->query().str,
                                       thd->query().length);

      DBUG_PRINT("query", ("%-.4096s", thd->query().str));

#if defined(ENABLED_PROFILING)
      thd->profiling->set_query_source(thd->query().str, thd->query().length);
#endif

      const LEX_CSTRING orig_query = thd->query();

      Parser_state parser_state;
      if (parser_state.init(thd, thd->query().str, thd->query().length)) {
        MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi, false);
        break;
      }

      parser_state.m_input.m_has_digest = true;

      // we produce digest if it's not explicitly turned off
      // by setting maximum digest length to zero
      if (get_max_digest_length() != 0)
        parser_state.m_input.m_compute_digest = true;

      // Initially, prepare and optimize the statement for the primary
      // storage engine. If an eligible secondary storage engine is
      // found, the statement may be reprepared for the secondary
      // storage engine later.
      const auto saved_secondary_engine = thd->secondary_engine_optimization();
      thd->set_secondary_engine_optimization(
          Secondary_engine_optimization::PRIMARY_TENTATIVELY);

      copy_bind_parameter_values(thd, com_data->com_query.parameters,
                                 com_data->com_query.parameter_count);

      /* This will call MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES() */
      dispatch_sql_command(thd, &parser_state);

      // Check if the statement failed and needs to be restarted in
      // another storage engine.
      check_secondary_engine_statement(thd, &parser_state, orig_query.str,
                                       orig_query.length);

      thd->set_secondary_engine_optimization(saved_secondary_engine);

      DBUG_EXECUTE_IF("parser_stmt_to_error_log", {
        LogErr(INFORMATION_LEVEL, ER_PARSER_TRACE, thd->query().str);
      });
      DBUG_EXECUTE_IF("parser_stmt_to_error_log_with_system_prio", {
        LogErr(SYSTEM_LEVEL, ER_PARSER_TRACE, thd->query().str);
      });

      while (!thd->killed && (parser_state.m_lip.found_semicolon != nullptr) &&
             !thd->is_error()) {
        /*
          Multiple queries exits, execute them individually
        */
        const char *beginning_of_next_stmt = parser_state.m_lip.found_semicolon;

        /* Finalize server status flags after executing a statement. */
        thd->update_slow_query_status();
        thd->send_statement_status();

        const std::string &cn = Command_names::str_global(command);
        mysql_event_tracking_general_notify(
            thd, AUDIT_EVENT(EVENT_TRACKING_GENERAL_STATUS),
            thd->get_stmt_da()->is_error() ? thd->get_stmt_da()->mysql_errno()
                                           : 0,
            cn.c_str(), cn.length());

        size_t length =
            static_cast<size_t>(packet_end - beginning_of_next_stmt);

        log_slow_statement(thd);

        thd->reset_copy_status_var();

        /* Remove garbage at start of query */
        while (length > 0 &&
               my_isspace(thd->charset(), *beginning_of_next_stmt)) {
          beginning_of_next_stmt++;
          length--;
        }

        /* PSI end */
        MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
        thd->m_statement_psi = nullptr;
        thd->m_digest = nullptr;

/* SHOW PROFILE end */
#if defined(ENABLED_PROFILING)
        thd->profiling->finish_current_query();
#endif

/* SHOW PROFILE begin */
#if defined(ENABLED_PROFILING)
        thd->profiling->start_new_query("continuing");
        thd->profiling->set_query_source(beginning_of_next_stmt, length);
#endif

        mysql_thread_set_secondary_engine(false);

        /* PSI begin */
        thd->m_digest = &thd->m_digest_state;
        thd->m_digest->reset(thd->m_token_array, max_digest_length);

        thd->m_statement_psi = MYSQL_START_STATEMENT(
            &thd->m_statement_state, com_statement_info[command].m_key,
            thd->db().str, thd->db().length, thd->charset(), nullptr);
        THD_STAGE_INFO(thd, stage_starting);

        thd->set_query(beginning_of_next_stmt, length);
        thd->set_query_id(next_query_id());

        /*
          Count each statement from the client.
        */
        thd->status_var.questions++;
        thd->set_time(); /* Reset the query start time. */
        parser_state.reset(beginning_of_next_stmt, length);
        thd->set_secondary_engine_optimization(
            Secondary_engine_optimization::PRIMARY_TENTATIVELY);
        /* TODO: set thd->lex->sql_command to SQLCOM_END here */
        dispatch_sql_command(thd, &parser_state);

        check_secondary_engine_statement(thd, &parser_state,
                                         beginning_of_next_stmt, length);

        thd->set_secondary_engine_optimization(saved_secondary_engine);
      }

      thd->bind_parameter_values = nullptr;
      thd->bind_parameter_values_count = 0;

      /* Need to set error to true for graceful shutdown */
      if ((thd->lex->sql_command == SQLCOM_SHUTDOWN) &&
          (thd->get_stmt_da()->is_ok()))
        error = true;

      DBUG_PRINT("info", ("query ready"));
      break;
    }
    case COM_FIELD_LIST:  // This isn't actually needed
    {
      char *fields;
      /* Locked closure of all tables */
      LEX_STRING table_name;
      LEX_STRING db;
      push_deprecated_warn(thd, "COM_FIELD_LIST",
                           "SHOW COLUMNS FROM statement");
      /*
        SHOW statements should not add the used tables to the list of tables
        used in a transaction.
      */
      MDL_savepoint mdl_savepoint = thd->mdl_context.mdl_savepoint();

      thd->status_var.com_stat[SQLCOM_SHOW_FIELDS]++;
      if (thd->copy_db_to(&db.str, &db.length)) break;
      thd->convert_string(&table_name, system_charset_info,
                          (char *)com_data->com_field_list.table_name,
                          com_data->com_field_list.table_name_length,
                          thd->charset());
      const Ident_name_check ident_check_status =
          check_table_name(table_name.str, table_name.length);
      if (ident_check_status == Ident_name_check::WRONG) {
        /* this is OK due to convert_string() null-terminating the string */
        my_error(ER_WRONG_TABLE_NAME, MYF(0), table_name.str);
        break;
      } else if (ident_check_status == Ident_name_check::TOO_LONG) {
        my_error(ER_TOO_LONG_IDENT, MYF(0), table_name.str);
        break;
      }
      mysql_reset_thd_for_next_command(thd);
      lex_start(thd);
      /* Must be before we init the table list. */
      if (lower_case_table_names && !is_infoschema_db(db.str, db.length))
        table_name.length = my_casedn_str(files_charset_info, table_name.str);
      Table_ref table_list(db.str, db.length, table_name.str, table_name.length,
                           table_name.str, TL_READ);
      /*
        Init Table_ref members necessary when the undelrying
        table is view.
      */
      table_list.query_block = thd->lex->query_block;
      thd->lex->query_block->m_table_list.link_in_list(&table_list,
                                                       &table_list.next_local);
      thd->lex->add_to_query_tables(&table_list);

      if (is_infoschema_db(table_list.db, table_list.db_length)) {
        ST_SCHEMA_TABLE *schema_table =
            find_schema_table(thd, table_list.alias);
        if (schema_table) table_list.schema_table = schema_table;
      }

      if (!(fields =
                (char *)thd->memdup(com_data->com_field_list.query,
                                    com_data->com_field_list.query_length)))
        break;
      // Don't count end \0
      thd->set_query(fields, com_data->com_field_list.query_length - 1);
      query_logger.general_log_print(thd, command, "%s %s",
                                     table_list.table_name, fields);

      if (open_temporary_tables(thd, &table_list)) break;

      if (check_table_access(thd, SELECT_ACL, &table_list, true, UINT_MAX,
                             false))
        break;

      thd->lex->sql_command = SQLCOM_SHOW_FIELDS;
      // See comment in opt_trace_disable_if_no_security_context_access()
      const Opt_trace_start ots(thd, &table_list, thd->lex->sql_command,
                                nullptr, nullptr, 0, nullptr, nullptr);

      mysqld_list_fields(thd, &table_list, fields);

      thd->lex->cleanup(true);
      /* No need to rollback statement transaction, it's not started. */
      assert(thd->get_transaction()->is_empty(Transaction_ctx::STMT));
      close_thread_tables(thd);
      thd->mdl_context.rollback_to_savepoint(mdl_savepoint);

      if (thd->transaction_rollback_request) {
        /*
          Transaction rollback was requested since MDL deadlock was
          discovered while trying to open tables. Rollback transaction
          in all storage engines including binary log and release all
          locks.
        */
        trans_rollback_implicit(thd);
        thd->mdl_context.release_transactional_locks();
      }

      thd->cleanup_after_query();
      thd->lex->destroy();
      break;
    }
    case COM_QUIT:
      /* Prevent results of the form, "n>0 rows sent, 0 bytes sent" */
      thd->set_sent_row_count(0);
      /* We don't calculate statistics for this command */
      query_logger.general_log_print(thd, command, NullS);
      // Don't give 'abort' message
      // TODO: access of protocol_classic should be removed
      if (thd->is_classic_protocol())
        thd->get_protocol_classic()->get_net()->error = NET_ERROR_UNSET;
      thd->get_stmt_da()->disable_status();  // Don't send anything back
      error = true;                          // End server
      break;
    case COM_BINLOG_DUMP_GTID:
      // TODO: access of protocol_classic should be removed
      error = com_binlog_dump_gtid(
          thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
          thd->get_protocol_classic()->get_packet_length());
      break;
    case COM_BINLOG_DUMP:
      // TODO: access of protocol_classic should be removed
      error = com_binlog_dump(
          thd, (char *)thd->get_protocol_classic()->get_raw_packet(),
          thd->get_protocol_classic()->get_packet_length());
      break;
    case COM_REFRESH: {
      int not_used;
      push_deprecated_warn(thd, "COM_REFRESH", "FLUSH statement");
      /*
        Initialize thd->lex since it's used in many base functions, such as
        open_tables(). Otherwise, it remains uninitialized and may cause crash
        during execution of COM_REFRESH.
      */
      lex_start(thd);

      thd->status_var.com_stat[SQLCOM_FLUSH]++;
      const ulong options = (ulong)com_data->com_refresh.options;
      if (trans_commit_implicit(thd)) break;
      thd->mdl_context.release_transactional_locks();
      if (check_global_access(thd, RELOAD_ACL)) break;
      query_logger.general_log_print(thd, command, NullS);
#ifndef NDEBUG
      bool debug_simulate = false;
      DBUG_EXECUTE_IF("simulate_detached_thread_refresh",
                      debug_simulate = true;);
      if (debug_simulate) {
        /*
          Simulate a reload without a attached thread session.
          Provides a environment similar to that of when the
          server receives a SIGHUP signal and reloads caches
          and flushes tables.
        */
        bool res;
        current_thd = nullptr;
        res = handle_reload_request(nullptr, options | REFRESH_FAST, nullptr,
                                    &not_used);
        current_thd = thd;
        if (res) break;
      } else
#endif
          if (handle_reload_request(thd, options, (Table_ref *)nullptr,
                                    &not_used))
        break;
      if (trans_commit_implicit(thd)) break;
      close_thread_tables(thd);
      thd->mdl_context.release_transactional_locks();
      thd->lex->destroy();
      my_ok(thd);
      break;
    }
    case COM_STATISTICS: {
      System_status_var current_global_status_var;
      ulong uptime;
      size_t length [[maybe_unused]];
      ulonglong queries_per_second1000;
      char buff[250];
      const size_t buff_len = sizeof(buff);

      query_logger.general_log_print(thd, command, NullS);
      thd->status_var.com_stat[SQLCOM_SHOW_STATUS]++;
      mysql_mutex_lock(&LOCK_status);
      calc_sum_of_all_status(&current_global_status_var);
      mysql_mutex_unlock(&LOCK_status);
      if (!(uptime = (ulong)(thd->query_start_in_secs() - server_start_time)))
        queries_per_second1000 = 0;
      else
        queries_per_second1000 = thd->query_id * 1000LL / uptime;

      length = snprintf(buff, buff_len - 1,
                        "Uptime: %lu  Threads: %d  Questions: %lu  "
                        "Slow queries: %llu  Opens: %llu  Flush tables: %lu  "
                        "Open tables: %u  Queries per second avg: %u.%03u",
                        uptime, (int)thd_manager->get_thd_count(),
                        (ulong)thd->query_id,
                        current_global_status_var.long_query_count,
                        current_global_status_var.opened_tables,
                        refresh_version, table_cache_manager.cached_tables(),
                        (uint)(queries_per_second1000 / 1000),
                        (uint)(queries_per_second1000 % 1000));
      // TODO: access of protocol_classic should be removed.
      // should be rewritten using store functions
      if (thd->get_protocol_classic()->write(pointer_cast<const uchar *>(buff),
                                             length))
        break;
      if (thd->get_protocol()->flush()) break;
      thd->get_stmt_da()->disable_status();
      break;
    }
    case COM_PING:
      thd->status_var.com_other++;
      my_ok(thd);  // Tell client we are alive
      break;
    case COM_PROCESS_INFO:
      bool global_access;
      LEX_CSTRING db_saved;
      thd->status_var.com_stat[SQLCOM_SHOW_PROCESSLIST]++;
      push_deprecated_warn(thd, "COM_PROCESS_INFO",
                           "SHOW PROCESSLIST statement");
      global_access = (check_global_access(thd, PROCESS_ACL) == 0);
      if (!thd->security_context()->priv_user().str[0] && !global_access) break;
      query_logger.general_log_print(thd, command, NullS);
      db_saved = thd->db();

      DBUG_EXECUTE_IF("force_db_name_to_null", thd->reset_db(NULL_CSTR););

      mysqld_list_processes(
          thd, global_access ? NullS : thd->security_context()->priv_user().str,
          false, false);

      DBUG_EXECUTE_IF("force_db_name_to_null", thd->reset_db(db_saved););
      break;
    case COM_PROCESS_KILL: {
      push_deprecated_warn(thd, "COM_PROCESS_KILL",
                           "KILL CONNECTION/QUERY statement");
      if (thd_manager->get_thread_id() & (~0xfffffffful))
        my_error(ER_DATA_OUT_OF_RANGE, MYF(0), "thread_id", "mysql_kill()");
      else {
        thd->status_var.com_stat[SQLCOM_KILL]++;
        sql_kill(thd, com_data->com_kill.id, false);
      }
      break;
    }
    case COM_SET_OPTION: {
      thd->status_var.com_stat[SQLCOM_SET_OPTION]++;

      switch (com_data->com_set_option.opt_command) {
        case (int)MYSQL_OPTION_MULTI_STATEMENTS_ON:
          // TODO: access of protocol_classic should be removed
          thd->get_protocol_classic()->add_client_capability(
              CLIENT_MULTI_STATEMENTS);
          my_eof(thd);
          break;
        case (int)MYSQL_OPTION_MULTI_STATEMENTS_OFF:
          thd->get_protocol_classic()->remove_client_capability(
              CLIENT_MULTI_STATEMENTS);
          my_eof(thd);
          break;
        default:
          my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
          break;
      }
      break;
    }
    case COM_DEBUG:
      thd->status_var.com_other++;
      if (check_global_access(thd, SUPER_ACL)) break; /* purecov: inspected */
      query_logger.general_log_print(thd, command, NullS);
      my_eof(thd);
#ifdef WITH_LOCK_ORDER
      LO_dump();
#endif /* WITH_LOCK_ORDER */
      break;
    case COM_SLEEP:
    case COM_CONNECT:         // Impossible here
    case COM_TIME:            // Impossible from client
    case COM_DELAYED_INSERT:  // INSERT DELAYED has been removed.
    case COM_END:
    default:
      my_error(ER_UNKNOWN_COM_ERROR, MYF(0));
      break;
  }

done:
  assert(thd->open_tables == nullptr ||
         (thd->locked_tables_mode == LTM_LOCK_TABLES));

  /* Finalize server status flags after executing a command. */
  thd->update_slow_query_status();
  if (thd->killed) thd->send_kill_message();
  thd->send_statement_status();

  /* After sending response, switch to clone protocol */
  if (clone_cmd != nullptr) {
    assert(command == COM_CLONE);
    error = clone_cmd->execute_server(thd);
  }

  if (command == COM_SUBSCRIBE_GROUP_REPLICATION_STREAM && !error) {
    wait_for_gr_connection_end(thd);
  }

  thd->rpl_thd_ctx.session_gtids_ctx().notify_after_response_packet(thd);

  if (!thd->is_error() && !thd->killed)
    mysql_event_tracking_general_notify(
        thd, AUDIT_EVENT(EVENT_TRACKING_GENERAL_RESULT), 0, nullptr, 0);

  const std::string &cn = Command_names::str_global(command);
  mysql_event_tracking_general_notify(
      thd, AUDIT_EVENT(EVENT_TRACKING_GENERAL_STATUS),
      thd->get_stmt_da()->is_error() ? thd->get_stmt_da()->mysql_errno() : 0,
      cn.c_str(), cn.length());

  /* command_end is informational only. The plugin cannot abort
     execution of the command at this point. */
  mysql_event_tracking_command_notify(
      thd, AUDIT_EVENT(EVENT_TRACKING_COMMAND_END), command, cn.c_str());

  log_slow_statement(thd);

  THD_STAGE_INFO(thd, stage_cleaning_up);

  thd->reset_query();
  thd->set_command(COM_SLEEP);
  thd->set_proc_info(nullptr);
  thd->lex->sql_command = SQLCOM_END;

  /* Performance Schema Interface instrumentation, end */
  MYSQL_END_STATEMENT(thd->m_statement_psi, thd->get_stmt_da());
  thd->m_statement_psi = nullptr;
  thd->m_digest = nullptr;
  thd->reset_query_for_display();

  /* Prevent rewritten query from getting "stuck" in SHOW PROCESSLIST. */
  thd->reset_rewritten_query();

  thd_manager->dec_thread_running();

  /* Freeing the memroot will leave the THD::work_part_info invalid. */
  thd->work_part_info = nullptr;

  /*
    If we've allocated a lot of memory (compared to the default preallocation
    size = 8192; note that we don't actually preallocate anymore), free
    it so that one big query won't cause us to hold on to a lot of RAM forever.
    If not, keep the last block so that the next query will hopefully be able to
    run without allocating memory from the OS.

    The factor 5 is pretty much arbitrary, but ends up allowing three
    allocations (1 + 1.5 + 1.5²) under the current allocation policy.
  */
  constexpr size_t kPreallocSz = 40960;
  if (thd->mem_root->allocated_size() < kPreallocSz)
    thd->mem_root->ClearForReuse();
  else
    thd->mem_root->Clear();

    /* SHOW PROFILE instrumentation, end */
#if defined(ENABLED_PROFILING)
  thd->profiling->finish_current_query();
#endif

  return error;
}

##

/**
  Parse an SQL command from a text string and pass the resulting AST to the
  query executor.

  @param thd          Current session.
  @param parser_state Parser state.
*/

void dispatch_sql_command(THD *thd, Parser_state *parser_state) {
  DBUG_TRACE;
  DBUG_PRINT("dispatch_sql_command", ("query: '%s'", thd->query().str));
  statement_id_to_session(thd);
  DBUG_EXECUTE_IF("parser_debug", turn_parser_debug_on(););

  mysql_reset_thd_for_next_command(thd);
  // It is possible that rewritten query may not be empty (in case of
  // multiqueries). So reset it.
  thd->reset_rewritten_query();
  lex_start(thd);

  thd->m_parser_state = parser_state;
  invoke_pre_parse_rewrite_plugins(thd);
  thd->m_parser_state = nullptr;

  // we produce digest if it's not explicitly turned off
  // by setting maximum digest length to zero
  if (get_max_digest_length() != 0)
    parser_state->m_input.m_compute_digest = true;

  LEX *lex = thd->lex;
  const char *found_semicolon = nullptr;

  bool err = thd->get_stmt_da()->is_error();
  size_t qlen = 0;

  if (!err) {
    err = parse_sql(thd, parser_state, nullptr);
    if (!err) err = invoke_post_parse_rewrite_plugins(thd, false);

    found_semicolon = parser_state->m_lip.found_semicolon;
    qlen = found_semicolon ? (found_semicolon - thd->query().str)
                           : thd->query().length;
    /*
      We set thd->query_length correctly to not log several queries, when we
      execute only first. We set it to not see the ';' otherwise it would get
      into binlog and Query_log_event::print() would give ';;' output.
    */

    if (!thd->is_error() && found_semicolon && (ulong)(qlen)) {
      thd->set_query(thd->query().str, qlen - 1);
    }
  }

  DEBUG_SYNC_C("sql_parse_before_rewrite");

  if (!err) {
    /*
      Rewrite the query for logging and for the Performance Schema
      statement tables. (Raw logging happened earlier.)

      Sub-routines of mysql_rewrite_query() should try to only rewrite when
      necessary (e.g. not do password obfuscation when query contains no
      password).

      If rewriting does not happen here, thd->m_rewritten_query is still
      empty from being reset in alloc_query().
    */
    if (thd->rewritten_query().length() == 0) mysql_rewrite_query(thd);

    if (thd->rewritten_query().length()) {
      lex->safe_to_cache_query = false;  // see comments below

      thd->set_query_for_display(thd->rewritten_query().ptr(),
                                 thd->rewritten_query().length());
    } else if (thd->slave_thread) {
      /*
        In the slave, we add the information to pfs.events_statements_history,
        but not to pfs.threads, as that is what the test suite expects.
      */
      MYSQL_SET_STATEMENT_TEXT(thd->m_statement_psi, thd->query().str,
                               thd->query().length);
    } else {
      thd->set_query_for_display(thd->query().str, thd->query().length);
    }

    if (!(opt_general_log_raw || thd->slave_thread)) {
      if (thd->rewritten_query().length())
        query_logger.general_log_write(thd, COM_QUERY,
                                       thd->rewritten_query().ptr(),
                                       thd->rewritten_query().length());
      else {
        query_logger.general_log_write(thd, COM_QUERY, thd->query().str, qlen);
      }
    }
  }

  const bool with_query_attributes = (thd->bind_parameter_values_count > 0);
  MYSQL_NOTIFY_STATEMENT_QUERY_ATTRIBUTES(thd->m_statement_psi,
                                          with_query_attributes);

  DEBUG_SYNC_C("sql_parse_after_rewrite");

  if (!err) {
    thd->m_statement_psi = MYSQL_REFINE_STATEMENT(
        thd->m_statement_psi, sql_statement_info[thd->lex->sql_command].m_key);

    if (mqh_used && thd->get_user_connect() &&
        check_mqh(thd, lex->sql_command)) {
      if (thd->is_classic_protocol())
        thd->get_protocol_classic()->get_net()->error = NET_ERROR_UNSET;
    } else {
      if (!thd->is_error()) {
        /* Actually execute the query */
        if (found_semicolon) {
          lex->safe_to_cache_query = false;
          thd->server_status |= SERVER_MORE_RESULTS_EXISTS;
        }
        lex->set_trg_event_type_for_tables();

        int error [[maybe_unused]];
        if (unlikely(
                (thd->security_context()->password_expired() ||
                 thd->security_context()->is_in_registration_sandbox_mode()) &&
                lex->sql_command != SQLCOM_SET_PASSWORD &&
                lex->sql_command != SQLCOM_ALTER_USER)) {
          if (thd->security_context()->is_in_registration_sandbox_mode())
            my_error(ER_PLUGIN_REQUIRES_REGISTRATION, MYF(0));
          else
            my_error(ER_MUST_CHANGE_PASSWORD, MYF(0));
          error = 1;
        } else {
          resourcegroups::Resource_group *src_res_grp = nullptr;
          resourcegroups::Resource_group *dest_res_grp = nullptr;
          MDL_ticket *ticket = nullptr;
          MDL_ticket *cur_ticket = nullptr;
          auto mgr_ptr = resourcegroups::Resource_group_mgr::instance();
          const bool switched = mgr_ptr->switch_resource_group_if_needed(
              thd, &src_res_grp, &dest_res_grp, &ticket, &cur_ticket);

          error = mysql_execute_command(thd, true);

          if (switched)
            mgr_ptr->restore_original_resource_group(thd, src_res_grp,
                                                     dest_res_grp);
          thd->resource_group_ctx()->m_switch_resource_group_str[0] = '\0';
          if (ticket != nullptr)
            mgr_ptr->release_shared_mdl_for_resource_group(thd, ticket);
          if (cur_ticket != nullptr)
            mgr_ptr->release_shared_mdl_for_resource_group(thd, cur_ticket);
        }
      }
    }
  } else {
    /*
      Log the failed raw query in the Performance Schema. This statement did
      not parse, so there is no way to tell if it may contain a password of not.

      The tradeoff is:
        a) If we do log the query, a user typing by accident a broken query
           containing a password will have the password exposed. This is very
           unlikely, and this behavior can be documented. Remediation is to
           use a new password when retyping the corrected query.

        b) If we do not log the query, finding broken queries in the client
           application will be much more difficult. This is much more likely.

      Considering that broken queries can typically be generated by attempts at
      SQL injection, finding the source of the SQL injection is critical, so the
      design choice is to log the query text of broken queries (a).
    */
    thd->set_query_for_display(thd->query().str, thd->query().length);

    /* Instrument this broken statement as "statement/sql/error" */
    thd->m_statement_psi = MYSQL_REFINE_STATEMENT(
        thd->m_statement_psi, sql_statement_info[SQLCOM_END].m_key);

    assert(thd->is_error());
    DBUG_PRINT("info",
               ("Command aborted. Fatal_error: %d", thd->is_fatal_error()));
  }

  THD_STAGE_INFO(thd, stage_freeing_items);
  sp_cache_enforce_limit(thd->sp_proc_cache, stored_program_cache_size);
  sp_cache_enforce_limit(thd->sp_func_cache, stored_program_cache_size);
  thd->lex->destroy();
  thd->end_statement();
  thd->cleanup_after_query();
  assert(thd->change_list.is_empty());

  DEBUG_SYNC(thd, "query_rewritten");
}

##

extern "C" {
static void *handle_connection(void *arg) {
  Global_THD_manager *thd_manager = Global_THD_manager::get_instance();
  Connection_handler_manager *handler_manager =
      Connection_handler_manager::get_instance();
  Channel_info *channel_info = static_cast<Channel_info *>(arg);
  bool pthread_reused [[maybe_unused]] = false;

  if (my_thread_init()) {
    connection_errors_internal++;
    channel_info->send_error_and_close_channel(ER_OUT_OF_RESOURCES, 0, false);
    handler_manager->inc_aborted_connects();
    Connection_handler_manager::dec_connection_count();
    delete channel_info;
    my_thread_exit(nullptr);
    return nullptr;
  }

  for (;;) {
    THD *thd = init_new_thd(channel_info);
    if (thd == nullptr) {
      connection_errors_internal++;
      handler_manager->inc_aborted_connects();
      Connection_handler_manager::dec_connection_count();
      break;  // We are out of resources, no sense in continuing.
    }

#ifdef HAVE_PSI_THREAD_INTERFACE
    if (pthread_reused) {
      /*
        Reusing existing pthread:
        Create new instrumentation for the new THD job,
        and attach it to this running pthread.
      */
      PSI_thread *psi = PSI_THREAD_CALL(new_thread)(key_thread_one_connection,
                                                    0 /* no sequence number */,
                                                    thd, thd->thread_id());
      PSI_THREAD_CALL(set_thread_os_id)(psi);
      PSI_THREAD_CALL(set_thread)(psi);
    }
#endif

#ifdef HAVE_PSI_THREAD_INTERFACE
    /* Find the instrumented thread */
    PSI_thread *psi = PSI_THREAD_CALL(get_thread)();
    /* Save it within THD, so it can be inspected */
    thd->set_psi(psi);
#endif /* HAVE_PSI_THREAD_INTERFACE */
    mysql_thread_set_psi_id(thd->thread_id());
    mysql_thread_set_psi_THD(thd);
    const MYSQL_SOCKET socket =
        thd->get_protocol_classic()->get_vio()->mysql_socket;
    mysql_socket_set_thread_owner(socket);
    thd_manager->add_thd(thd);

    if (thd_prepare_connection(thd))
      handler_manager->inc_aborted_connects();
    else {
      while (thd_connection_alive(thd)) {
        if (do_command(thd)) break;
      }
      end_connection(thd);
    }
    close_connection(thd, 0, false, false);

    thd->get_stmt_da()->reset_diagnostics_area();
    thd->release_resources();

    // Clean up errors now, before possibly waiting for a new connection.
#if OPENSSL_VERSION_NUMBER < 0x10100000L
    ERR_remove_thread_state(0);
#endif /* OPENSSL_VERSION_NUMBER < 0x10100000L */
    thd_manager->remove_thd(thd);
    Connection_handler_manager::dec_connection_count();

#ifdef HAVE_PSI_THREAD_INTERFACE
    /* Stop telemetry, while THD is still available. */
    if (psi != nullptr) {
      PSI_THREAD_CALL(abort_telemetry)(psi);
    }

    /* Decouple THD and the thread instrumentation. */
    thd->set_psi(nullptr);
    mysql_thread_set_psi_THD(nullptr);
#endif /* HAVE_PSI_THREAD_INTERFACE */

    delete thd;

#ifdef HAVE_PSI_THREAD_INTERFACE
    /* Delete the instrumentation for the job that just completed. */
    PSI_THREAD_CALL(delete_current_thread)();
#endif /* HAVE_PSI_THREAD_INTERFACE */

    // Server is shutting down so end the pthread.
    if (connection_events_loop_aborted()) break;

    channel_info = Per_thread_connection_handler::block_until_new_connection();
    if (channel_info == nullptr) break;
    pthread_reused = true;
    if (connection_events_loop_aborted()) {
      // Close the channel and exit as server is undergoing shutdown.
      channel_info->send_error_and_close_channel(ER_SERVER_SHUTDOWN, 0, false);
      delete channel_info;
      channel_info = nullptr;
      Connection_handler_manager::dec_connection_count();
      break;
    }
  }

  my_thread_end();
  my_thread_exit(nullptr);
  return nullptr;
}
}  // extern "C"

##

bool Per_thread_connection_handler::add_connection(Channel_info *channel_info) {
  int error = 0;
  my_thread_handle id;

  DBUG_TRACE;

  // Simulate thread creation for test case before we check thread cache
  DBUG_EXECUTE_IF("fail_thread_create", error = 1; goto handle_error;);

  if (!check_idle_thread_and_enqueue_connection(channel_info)) return false;

  /*
    There are no idle threads available to take up the new
    connection. Create a new thread to handle the connection
  */
  channel_info->set_prior_thr_create_utime();
  error =
      mysql_thread_create(key_thread_one_connection, &id, &connection_attrib,
                          handle_connection, (void *)channel_info);
#ifndef NDEBUG
handle_error:
#endif  // !NDEBUG

  if (error) {
    connection_errors_internal++;
    if (!create_thd_err_log_throttle.log())
      LogErr(ERROR_LEVEL, ER_CONN_PER_THREAD_NO_THREAD, error);
    channel_info->send_error_and_close_channel(ER_CANT_CREATE_THREAD, error,
                                               true);
    Connection_handler_manager::dec_connection_count();
    return true;
  }

  Global_THD_manager::get_instance()->inc_thread_created();
  DBUG_PRINT("info", ("Thread created"));
  return false;
}

##gdb调试栈,处理客户端连接

(gdb) bt
#0  my_thread_create (thread=0x7ffde2e84658, attr=0x57cc6d05fd80 <connection_attrib>, func=0x57cc69d4da88 <pfs_spawn_thread(void*)>, arg=0x57cca0dc1ea0)
    at /home/yym/mysql8/mysql-8.1.0/mysys/my_thread.cc:80
#1  0x000057cc69d4dd3a in pfs_spawn_thread_vc (key=5, seqnum=0, thread=0x7ffde2e84658, attr=0x57cc6d05fd80 <connection_attrib>,
    start_routine=0x57cc67e0e643 <handle_connection(void*)>, arg=0x57cca10e5ca0) at /home/yym/mysql8/mysql-8.1.0/storage/perfschema/pfs.cc:3089
#2  0x000057cc67e0e010 in inline_mysql_thread_create (key=5, sequence_number=0, thread=0x7ffde2e84658, attr=0x57cc6d05fd80 <connection_attrib>,
    start_routine=0x57cc67e0e643 <handle_connection(void*)>, arg=0x57cca10e5ca0) at /home/yym/mysql8/mysql-8.1.0/include/mysql/psi/mysql_thread.h:139
#3  0x000057cc67e0ecaf in Per_thread_connection_handler::add_connection (this=0x57cca0c26e10, channel_info=0x57cca10e5ca0)
    at /home/yym/mysql8/mysql-8.1.0/sql/conn_handler/connection_handler_per_thread.cc:420
#4  0x000057cc67f73289 in Connection_handler_manager::process_new_connection (this=0x57cca028d4d0, channel_info=0x57cca10e5ca0)
    at /home/yym/mysql8/mysql-8.1.0/sql/conn_handler/connection_handler_manager.cc:260
#5  0x000057cc679ceea4 in Connection_acceptor<Mysqld_socket_listener>::connection_event_loop (this=0x57cca0d1f290)
    at /home/yym/mysql8/mysql-8.1.0/sql/conn_handler/connection_acceptor.h:65
#6  0x000057cc679bf5a3 in mysqld_main (argc=22, argv=0x57cc9ea287b8) at /home/yym/mysql8/mysql-8.1.0/sql/mysqld.cc:8355
#7  0x000057cc679aa4cd in main (argc=9, argv=0x7ffde2e84d68) at /home/yym/mysql8/mysql-8.1.0/sql/main.cc:25




(gdb) bt
#0  handle_connection (arg=0x57cca10e5ca0) at /home/yym/mysql8/mysql-8.1.0/sql/conn_handler/connection_handler_per_thread.cc:252
#1  0x000057cc69d4dbdc in pfs_spawn_thread (arg=0x57cca0dc1ea0) at /home/yym/mysql8/mysql-8.1.0/storage/perfschema/pfs.cc:3043
#2  0x00007834cd694ac3 in start_thread (arg=<optimized out>) at ./nptl/pthread_create.c:442
#3  0x00007834cd726850 in clone3 () at ../sysdeps/unix/sysv/linux/x86_64/clone3.S:81

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/2297118.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

四、OSG学习笔记-基础图元

前一章节&#xff1a; 三、OSG学习笔记-应用基础-CSDN博客https://blog.csdn.net/weixin_36323170/article/details/145514021 代码&#xff1a;CuiQingCheng/OsgStudy - Gitee.com 一、绘制盒子模型 下面一个简单的 demo #include<windows.h> #include<osg/Node&…

window 安装GitLab服务器笔记

目录 视频&#xff1a; 资源&#xff1a; Linux CeneOS7&#xff1a; VMware&#xff1a; Linux无法安装 yum install vim -y 1.手动创建目录 2.下载repo PS 补充视频不可复制的代码 安装GitLab *修改root用户密码相关&#xff08;我卡在第一步就直接放弃了这个操作&…

MySQL数据库入门到大蛇尚硅谷宋红康老师笔记 基础篇 part 10

第10章_创建和管理表 DDL&#xff1a;数据定义语言。CREATE \ALTER\ DROP \RENAME TRUNCATE DML&#xff1a;数据操作语言。INSERT \DELETE \UPDATE \SELECT&#xff08;重中之重&#xff09; DCL&#xff1a;数据控制语言。COMMIT \…

前端如何判断浏览器 AdBlock/AdBlock Plus(最新版)广告屏蔽插件已开启拦截

2个月前AdBlock/AdBlock Plus疑似升级了一次 因为自己主要负责面对海外的用户项目&#xff0c;发现以前的检测AdBlock/AdBlock Plus开启状态方法已失效了&#xff0c;于是专门研究了一下。并尝试了很多方法。 已失效的老方法 // 定义一个检测 AdBlock 的函数 function chec…

html文件怎么转换成pdf文件,2025最新教程

将HTML文件转换成PDF文件&#xff0c;可以采取以下几种方法&#xff1a; 一、使用浏览器内置功能 打开HTML文件&#xff1a;在Chrome、Firefox、IE等浏览器中打开需要转换的HTML文件。打印对话框&#xff1a;按下CtrlP&#xff08;Windows&#xff09;或CommandP&#xff08;M…

科技查新过不了怎么办

“科技查新过不了怎么办&#xff1f;” “科技查新不通过的原因是什么&#xff1f;” 想必这些问题一直困扰着各位科研和学术的朋友们&#xff0c;尤其是对于查新经验不够多的小伙伴&#xff0c;在历经千难万险&#xff0c;从选择查新机构、填写线上委托单到付费&#xff0c;…

超详细的数据结构3(初阶C语言版)栈和队列。

文章目录 栈和队列1.栈1.1 概念与结构1.2 栈的实现 2. 队列2.1 概念与结构2.2 队列的实现 总结 栈和队列 1.栈 1.1 概念与结构 栈&#xff1a;⼀种特殊的线性表&#xff0c;其只允许在固定的⼀端进行插⼊和删除元素操作。进⾏数据插⼊和删除操作的⼀端称为栈顶&#xff0c;另…

centos 7 关于引用stdatomic.h的问题

问题&#xff1a;/tmp/tmp4usxmdso/main.c:6:23: fatal error: stdatomic.h: No such file or directory #include <stdatomic.h> 解决步骤&#xff1a; 1.这个错误是因为缺少C编译器的标准原子操作头文件 stdatomic.h。在Linux系统中&#xff0c;我们需要安装开发工具…

Unity WebGL包体压缩

最近在开发webgl&#xff0c;踩了很多坑&#xff0c;先来说下包体的问题。 开发完之后发现unity将文件都合并到一个文件了&#xff0c;一共有接近100m。 这对网页端的体验来说是可怕的&#xff0c;因为玩家必须要加载完所有的文件才能进入&#xff0c;这样体验特别差。 于是想…

【对比测评】 .NET 应用的 Web 视图控件:DotNetBrowser 或 EO.WebBrowser

您是否需要 .NET 应用的 Web 视图控件&#xff1f;.NET 生态系统提供了很多东西&#xff0c;有免费的 Web 视图控件&#xff0c;既有开源的&#xff0c;也有专有的。还有一些商业 Web 视图 控件&#xff0c;也是企业经常选择的一种选项。 在这篇博文中&#xff0c;我们比较了商…

Redis 数据类型 String 字符串

Redis 中的 String 数据类型 是最基础且使用最广泛的数据类型之一。它本质上是一个字节序列&#xff0c;可以存储各种类型的数据&#xff0c;如字符串、整数、浮点数等&#xff0c;其字符串类型的值包含⼀般格式的字符串或者类似 JSON、XML 格式的字符串&#xff1b;还可以存储…

查询语句来提取 detail 字段中包含 xxx 的 URL 里的 commodity/ 后面的数字串

您可以使用以下 SQL 查询语句来提取 detail 字段中包含 oss.kxlist.com 的 URL 里的 commodity/ 后面的数字串&#xff1a; <p><img style"max-width:100%;" src"https://oss.kxlist.com//8a989a0c55e4a7900155e7fd7971000b/commodity/20170925/20170…

业务开发 | 基础知识 | Maven 快速入门

Maven 快速入门 1.Maven 全面概述 Apache Maven 是一种软件项目管理和理解工具。基于项目对象模型的概念&#xff08;POM&#xff09;&#xff0c;Maven 可以从中央信息中管理项目的构建&#xff0c;报告和文档。 2.Maven 基本功能 因此实际上 Maven 的基本功能就是作为 Ja…

机器学习 - 词袋模型(Bag of Words)实现文本情感分类的详细示例

为了简单直观的理解模型训练&#xff0c;我这里搜集了两个简单的实现文本情感分类的例子&#xff0c;第一个例子基于朴素贝叶斯分类器&#xff0c;第二个例子基于逻辑回归&#xff0c;通过这两个例子&#xff0c;掌握词袋模型&#xff08;Bag of Words&#xff09;实现文本情感…

【Android开发】Android Studio汉化

前言 该插件是官方支持插件&#xff0c;未对任何软件进行修改和破解 Android Studio 是基于 IntelliJ IDEA 社区版开发的集成开发环境&#xff08;IDE&#xff09;&#xff0c;专门用于Android应用程序的开发。以下是为什么 Android Studio 能使用 IntelliJ IDEA 插件的原因&am…

后端java工程师经验之谈,工作7年,mysql使用心得

mysql 工作7年&#xff0c;mysql使用心得 mysql1.创建变量2.创建存储过程2.1&#xff1a;WHILE循环2.2&#xff1a;repeat循环2.3&#xff1a;loop循环2.4&#xff1a;存储过程&#xff0c;游标2.5&#xff1a;存储过程&#xff0c;有输入参数和输出参数 3.三种注释写法4.case …

ArcGIS Pro批量创建离线服务sd包

背景&#xff1a; 主要针对一个工程内有多个地图框项&#xff1a; 处理方法&#xff1a;通过Python脚本处理打包。 运行环境 在Pro的Python环境中去运行编写的Python脚本。 Python 脚本参考 import arcpy import os# Set output file names outdir r"d:\data\out&…

基于DeepSeek API和VSCode的自动化网页生成流程

1.创建API key 访问官网DeepSeek &#xff0c;点击API开放平台。 在开放平台界面左侧点击API keys&#xff0c;进入API keys管理界面&#xff0c;点击创建API key按钮创建API key&#xff0c;名称自定义。 2.下载并安装配置编辑器VSCode 官网Visual Studio Code - Code Editing…

信创领域的PostgreSQL管理员认证

信创产业&#xff0c;全称为信息技术应用创新产业&#xff0c;是中国为应对国际技术竞争、保障信息安全、实现科技自立而重点发展的战略性新兴产业。其核心目标是通过自主研发和生态构建&#xff0c;逐步替代国外信息技术产品&#xff0c;形成自主可控的国产化信息技术体系。 发…

使用 Visual Studio Code (VS Code) 开发 Python 图形界面程序

安装Python、VS Code Documentation for Visual Studio Code Python Releases for Windows | Python.org 更新pip >python.exe -m pip install --upgrade pip Requirement already satisfied: pip in c:\users\xxx\appdata\local\programs\python\python312\lib\site-pa…