文章目录
- 前言
- 一、Innodb如何作为MySQL插件的
- 二、page cleaner thread
- 三、Update操作源码梳理
- 1、生成undo log
- 2、更新数据
- 3、生成redo log
- 四、MTR与将脏页添加到Flush List
- 1、MTR
- 2、脏页添加到Flush List
- 五、事务提交
- 1、xa-prepare
- 2、xa-commit
- 2.1、process_flush_stage_queue
- 2.2、process_commit_stage_queue
- 总结
前言
在上一章中我们分析了事务持久性(落盘)的理论知识。
相信很多对技术有一定追求的小伙伴会有疑问是否真的是这样实现的呢,我在查阅相关书籍和博客时也有这样的疑问,于是花了点时间看了下相关源码,现整理如下,与大家分享讨论。
继续放一下这张Innodb架构图,我们就结合它进行落盘源码梳理。
根据图中可知,Innodb落盘主要分三部分:
- redo log落盘(redo log -> redo log buffer->redo log file);
- undo log落盘(undo log->Innodb buffer pool->undo tablespace);
- data落盘(data->Innodb buffer pool->user tablespace【独立表空间】)。
另外,MySQL开启binlog的情况下,binlog的落盘也会参与到事务持久性中,我们就这四种数据落盘进行源码梳理。
ps:基于MySQL8.0.32 Innodb存储引擎
一、Innodb如何作为MySQL插件的
首先看一下MySQL源码目录,见下图:
在include/mysql/plugin.h文件中定义了插件描述结构体,主要内容包括存储引擎类型,名称,描述,初始化等函数,状态变量和系统变量的定义等等
*
Plugin description structure.
*/
struct st_mysql_plugin {
int type; /* the plugin type (a MYSQL_XXX_PLUGIN value) */
void *info; /* pointer to type-specific plugin descriptor */
const char *name; /* plugin name */
const char *author; /* plugin author (for I_S.PLUGINS) */
const char *descr; /* general descriptive text (for I_S.PLUGINS) */
int license; /* the plugin license (PLUGIN_LICENSE_XXX) */
/** Function to invoke when plugin is loaded. */
int (*init)(MYSQL_PLUGIN);
/** Function to invoke when plugin is uninstalled. */
int (*check_uninstall)(MYSQL_PLUGIN);
/** Function to invoke when plugin is unloaded. */
int (*deinit)(MYSQL_PLUGIN);
unsigned int version; /* plugin version (for I_S.PLUGINS) */
SHOW_VAR *status_vars;
SYS_VAR **system_vars;
void *__reserved1; /* reserved for dependency checking */
unsigned long flags; /* flags for plugin */
};
在storge/innobase/handler/ha_innodb.cc文件中有对st_mysql_plugin 定义,其中innodb_init函数就是Innodb的初始化函数
mysql_declare_plugin(innobase){
MYSQL_STORAGE_ENGINE_PLUGIN,
&innobase_storage_engine,
innobase_hton_name,
PLUGIN_AUTHOR_ORACLE,
"Supports transactions, row-level locking, and foreign keys",
PLUGIN_LICENSE_GPL,
innodb_init, /* Plugin Init */
nullptr, /* Plugin Check uninstall */
innodb_deinit, /* Plugin Deinit */
INNODB_VERSION_SHORT,
innodb_status_variables_export, /* status variables */
innobase_system_variables, /* system variables */
nullptr, /* reserved */
0, /* flags */
},
i_s_innodb_trx, i_s_innodb_cmp, i_s_innodb_cmp_reset, i_s_innodb_cmpmem,
i_s_innodb_cmpmem_reset, i_s_innodb_cmp_per_index,
i_s_innodb_cmp_per_index_reset, i_s_innodb_buffer_page,
i_s_innodb_buffer_page_lru, i_s_innodb_buffer_stats,
i_s_innodb_temp_table_info, i_s_innodb_metrics,
i_s_innodb_ft_default_stopword, i_s_innodb_ft_deleted,
i_s_innodb_ft_being_deleted, i_s_innodb_ft_config,
i_s_innodb_ft_index_cache, i_s_innodb_ft_index_table, i_s_innodb_tables,
i_s_innodb_tablestats, i_s_innodb_indexes, i_s_innodb_tablespaces,
i_s_innodb_columns, i_s_innodb_virtual, i_s_innodb_cached_indexes,
i_s_innodb_session_temp_tablespaces
mysql_declare_plugin_end;
nnodb_init函数定义了很多Innodb重要函数,xa相关,文件相关等,这里只列了本文涉及到的几个函数
static int innodb_init(void *p) {
DBUG_TRACE;
acquire_plugin_services();
handlerton *innobase_hton = (handlerton *)p;
innodb_hton_ptr = innobase_hton;
......
//xa事务相关
innobase_hton->commit = innobase_commit;
innobase_hton->rollback = innobase_rollback;
innobase_hton->prepare = innobase_xa_prepare;
innobase_hton->recover = innobase_xa_recover;
innobase_hton->flush_logs = innobase_flush_logs;//flush redo log
//Initialize InnoDB for being used to store the DD tables.
innobase_hton->ddse_dict_init = innobase_ddse_dict_init;
......
}
二、page cleaner thread
Innodb脏页(修改过的data page和undo page,即二者内存中的内容与磁盘上的不一致了)是由page cleaner thread刷盘的,先看一下相关源码
static bool innobase_ddse_dict_init(
dict_init_mode_t dict_init_mode, uint, List<const dd::Object_table> *tables,
List<const Plugin_tablespace> *tablespaces) {
DBUG_TRACE;
......
if (innobase_init_files(dict_init_mode, tablespaces)) {
return true;
}
......
}
//** Open or create InnoDB data files.
tatic int innobase_init_files(dict_init_mode_t dict_init_mode,
List<const Plugin_tablespace> *tablespaces) {
DBUG_TRACE;
......
/* Start the InnoDB server. */
err = srv_start(create);
......
}
dberr_t srv_start(bool create_new_db) {
......
/* Even in read-only mode there could be flush job generated by
intrinsic table operations. */
buf_flush_page_cleaner_init();
......
}
/** Initialize page_cleaner. */
//page cleaner并未和buffer pool绑定,其模型为一个协调线程 + 多个工作线程,协调线程本身也是工作线程,如果innodb_page_cleaners设置为8,那么就是1个协调线程,加7个工作线程
void buf_flush_page_cleaner_init() {
ut_ad(page_cleaner == nullptr);
page_cleaner = ut::make_unique<page_cleaner_t>(UT_NEW_THIS_FILE_PSI_KEY);
mutex_create(LATCH_ID_PAGE_CLEANER, &page_cleaner->mutex);
page_cleaner->is_requested = os_event_create();//创建触发刷盘事件
page_cleaner->is_finished = os_event_create();//创建刷盘结束事件
page_cleaner->n_slots = static_cast<ulint>(srv_buf_pool_instances);//buffer pool实例个数,取决于参数innodb_buffer_pool_instances
page_cleaner->slots = ut::make_unique<page_cleaner_slot_t[]>(
UT_NEW_THIS_FILE_PSI_KEY, page_cleaner->n_slots);
ut_d(page_cleaner->n_disabled_debug = 0);
page_cleaner->is_running = true;
//创建coordinator线程
srv_threads.m_page_cleaner_coordinator = os_thread_create(
page_flush_coordinator_thread_key, 0, buf_flush_page_coordinator_thread);
//把coordinator线程添加到work线程组,意味着coordinator线程也会做刷盘的工作,不仅仅只是协调者
srv_threads.m_page_cleaner_workers[0] =
srv_threads.m_page_cleaner_coordinator;
srv_threads.m_page_cleaner_coordinator.start();
/* Make sure page cleaner is active. */
ut_a(buf_flush_page_cleaner_is_active());
}
/** Thread tasked with flushing dirty pages from the buffer pools.*/
static void buf_flush_page_coordinator_thread() {
......
/* We start from 1 because the coordinator thread is part of the
same set,即开启work线程组,从索引1开始,前面可知0分配给了coordinator线程 */
//srv_threads.m_page_cleaner_workers_n大小取决于innodb_page_cleaners配置
for (size_t i = 1; i < srv_threads.m_page_cleaner_workers_n; ++i) {
srv_threads.m_page_cleaner_workers[i] = os_thread_create(
page_flush_thread_key, i, buf_flush_page_cleaner_thread);
srv_threads.m_page_cleaner_workers[i].start();
}
while (!srv_read_only_mode &&
srv_shutdown_state.load() < SRV_SHUTDOWN_CLEANUP &&
recv_sys->spaces != nullptr) {
/* treat flushing requests during recovery. */
ulint n_flushed_lru = 0;
ulint n_flushed_list = 0;
os_event_wait(recv_sys->flush_start);
if (srv_shutdown_state.load() >= SRV_SHUTDOWN_CLEANUP ||
recv_sys->spaces == nullptr) {
break;
}
switch (recv_sys->flush_type) {
case BUF_FLUSH_LRU:
/* Flush pages from end of LRU if required */
pc_request(0, LSN_MAX);
while (pc_flush_slot() > 0) {
}
pc_wait_finished(&n_flushed_lru, &n_flushed_list);
break;
case BUF_FLUSH_LIST:
/* Flush all pages */
do {
pc_request(ULINT_MAX, LSN_MAX);//触发刷盘事件
while (pc_flush_slot() > 0) {
}
} while (!pc_wait_finished(&n_flushed_lru, &n_flushed_list));
break;
default:
ut_d(ut_error);
}
os_event_reset(recv_sys->flush_start);
os_event_set(recv_sys->flush_end);
}
......
//后面的是支持在crash recovery和shutdown时能够应用多个page cleaner特性的代码,来加快崩溃恢复和关闭实例的速度。
......
}
/** Worker thread of page_cleaner. */
static void buf_flush_page_cleaner_thread() {
#ifdef UNIV_LINUX
/* linux might be able to set different setting for each thread
worth to try to set high priority for page cleaner threads */
if (buf_flush_page_cleaner_set_priority(buf_flush_page_cleaner_priority)) {
ib::info(ER_IB_MSG_129)
<< "page_cleaner worker priority: " << buf_flush_page_cleaner_priority;
}
#endif /* UNIV_LINUX */
for (;;) {
os_event_wait(page_cleaner->is_requested);//等待刷盘事件
ut_d(buf_flush_page_cleaner_disabled_loop());//刷盘
if (!page_cleaner->is_running) {
break;
}
pc_flush_slot();
}
}
至此就能对page clear源码调用链路和工作机制有了大致的了解了。
三、Update操作源码梳理
本小节以update操作来阐明Innodb写数据(update,insert,delete)时对undo,data,redo的处理机制
客户端发送一条Update SQL到MySQL Server,经过语法解析等机制,会调用Innodb的(位于storge/innobase/handler/ha_innodb.cc文件)
int ha_innobase::update_row(const uchar *old_row, uchar *new_row) {
......
error = row_update_for_mysql((byte *)old_row, m_prebuilt);
......
}
/** Does an update or delete of a row for MySQL.*/
//位于storge/innobase/row/row0mysql.cc文件
dberr_t row_update_for_mysql(const byte *mysql_rec, row_prebuilt_t *prebuilt) {
if (prebuilt->table->is_intrinsic()) {
return (row_del_upd_for_mysql_using_cursor(prebuilt));//看注释一些特殊的临时表走这里
} else {
ut_a(prebuilt->template_type == ROW_MYSQL_WHOLE_ROW);
return (row_update_for_mysql_using_upd_graph(mysql_rec, prebuilt));//更新
}
}
static dberr_t row_update_for_mysql_using_upd_graph(const byte *mysql_rec,row_prebuilt_t *prebuilt) {
......
row_upd_step(thr);
......
}
//位于storge/innobase/row/row0upd.cc文件
/** Updates a row in a table. This is a high-level function used in SQLexecution graphs. */
que_thr_t *row_upd_step(que_thr_t *thr) /*!< in: query thread */
{
......
node = static_cast<upd_node_t *>(thr->run_node);
/* DO THE CHECKS OF THE CONSISTENCY CONSTRAINTS HERE */
err = row_upd(node, thr);
......
}
static dberr_t row_upd(upd_node_t *node, /*!< in: row update node */
que_thr_t *thr) /*!< in: query thread */
{
......
switch (node->state) {
case UPD_NODE_UPDATE_CLUSTERED:
case UPD_NODE_INSERT_CLUSTERED:
if (!node->table->is_intrinsic()) {
log_free_check();
}
err = row_upd_clust_step(node, thr);
if (err != DB_SUCCESS) {
return err;
}
}
......
}
/** Updates the clustered index record.*/
static dberr_t row_upd_clust_step(upd_node_t *node, que_thr_t *const thr)
{
......
mtr_t mtr;
mtr_start(&mtr);//开启mtr,MySQL的MTR机制
if (row_upd_changes_ord_field_binary(index, node->update, thr, node->row,node->ext, nullptr)) {
/* Update causes an ordering field (ordering fields within
the B-tree) of the clustered index record to change: perform
the update by delete marking and inserting.
TODO! What to do to the 'Halloween problem', where an update
moves the record forward in index so that it is again
updated when the cursor arrives there? Solution: the
read operation must check the undo record undo number when
choosing records to update. MySQL solves now the problem
externally! */
err =row_upd_clust_rec_by_insert(flags, node, index, thr, referenced, &mtr);//删除(伪删除,只是打个删除标记)再插入
if (err != DB_SUCCESS) {
goto exit_func;
}
node->state = UPD_NODE_UPDATE_ALL_SEC;
} else {
err = row_upd_clust_rec(flags, node, index, offsets, &heap, thr, &mtr);//更新
if (err != DB_SUCCESS) {
goto exit_func;
}
node->state = UPD_NODE_UPDATE_SOME_SEC;
}
node->index = index->next();
......
}
/** Updates a clustered index record of a row when the ordering fields do
not change.
@return DB_SUCCESS if operation successfully completed, else error
code or DB_LOCK_WAIT */
[[nodiscard]] static dberr_t row_upd_clust_rec(
ulint flags, /*!< in: undo logging and locking flags */
upd_node_t *node, /*!< in: row update node */
dict_index_t *index, /*!< in: clustered index */
ulint *offsets, /*!< in: rec_get_offsets() on node->pcur */
mem_heap_t **offsets_heap,
/*!< in/out: memory heap, can be emptied */
que_thr_t *thr, /*!< in: query thread */
mtr_t *mtr) /*!< in: mtr; gets committed here */
{
......
/* Try optimistic updating of the record, keeping changes within
the page; we do not check locks because we assume the x-lock on the
record to update */
if (node->cmpl_info & UPD_NODE_NO_SIZE_CHANGE) {
err = btr_cur_update_in_place(flags | BTR_NO_LOCKING_FLAG, btr_cur, offsets,
node->update, node->cmpl_info, thr,
thr_get_trx(thr)->id, mtr);//更新b-tree
} else {
err = btr_cur_optimistic_update(//该函数处理后依旧调用btr_cur_update_in_place函数
flags | BTR_NO_LOCKING_FLAG, btr_cur, &offsets, offsets_heap,
node->update, node->cmpl_info, thr, thr_get_trx(thr)->id, mtr);
}
if (err == DB_SUCCESS) {
goto success;
}
mtr->commit();//提交mtr
success:
if (dict_index_is_online_ddl(index)) {
dtuple_t *new_v_row = nullptr;
dtuple_t *old_v_row = nullptr;
if (!(node->cmpl_info & UPD_NODE_NO_ORD_CHANGE)) {
new_v_row = node->upd_row;
old_v_row = node->update->old_vrow;
}
row_log_table_update(btr_cur_get_rec(btr_cur), index, offsets,
rebuilt_old_pk, new_v_row, old_v_row);
}
}
mtr->commit();//提交mtr
......
}
//位于storge/innobase/btr/btr0cur.cc文件
//相当重要的函数,前面说了那么久,其实这里才开始处理undo log,data ,redo log
dberr_t btr_cur_update_in_place(ulint flags, btr_cur_t *cursor, ulint *offsets,
const upd_t *update, ulint cmpl_info,
que_thr_t *thr, trx_id_t trx_id, mtr_t *mtr) {
......
roll_ptr_t roll_ptr = 0;//回滚指针,用于生成undo log版本链表的,实现事务回滚和MVCC
/* Do lock checking and undo logging */
err = btr_cur_upd_lock_and_undo(flags, cursor, offsets, update, cmpl_info,
thr, mtr, &roll_ptr);//生成undo log
......
if (block->ahi.index.load() != nullptr) {
......
row_upd_rec_in_place(rec, index, offsets, update, page_zip);//更新data page
btr_cur_update_in_place_log(flags, rec, index, update, trx_id, roll_ptr, mtr);//生成redo log
......
}
......
}
1、生成undo log
在本小节你可以看到undo log会被添加到redo log中,用以保证undo log的安全
/** For an update, checks the locks and does the undo logging.
@return DB_SUCCESS, DB_WAIT_LOCK, or error number */
[[nodiscard]] static inline dberr_t btr_cur_upd_lock_and_undo(
ulint flags, /*!< in: undo logging and locking flags */
btr_cur_t *cursor, /*!< in: cursor on record to update */
const ulint *offsets, /*!< in: rec_get_offsets() on cursor */
const upd_t *update, /*!< in: update vector */
ulint cmpl_info, /*!< in: compiler info on secondary index
updates */
que_thr_t *thr, /*!< in: query thread
(can be NULL if BTR_NO_LOCKING_FLAG) */
mtr_t *mtr, /*!< in/out: mini-transaction */
roll_ptr_t *roll_ptr) /*!< out: roll pointer */
{
......
/* Append the info about the update in the undo log */
return (trx_undo_report_row_operation(flags, TRX_UNDO_MODIFY_OP, thr, index,
nullptr, update, cmpl_info, rec,
offsets, roll_ptr));
}
//该函数是生成undo log的总入口,delete,insert也走这里
dberr_t trx_undo_report_row_operation(
ulint flags, /*!< in: if BTR_NO_UNDO_LOG_FLAG bit is
set, does nothing */
ulint op_type, /*!< in: TRX_UNDO_INSERT_OP or
TRX_UNDO_MODIFY_OP */
que_thr_t *thr, /*!< in: query thread */
dict_index_t *index, /*!< in: clustered index */
const dtuple_t *clust_entry, /*!< in: in the case of an insert,
index entry to insert into the
clustered index, otherwise NULL */
const upd_t *update, /*!< in: in the case of an update,
the update vector, otherwise NULL */
ulint cmpl_info, /*!< in: compiler info on secondary
index updates */
const rec_t *rec, /*!< in: in case of an update or delete
marking, the record in the clustered
index, otherwise NULL */
const ulint *offsets, /*!< in: rec_get_offsets(rec) */
roll_ptr_t *roll_ptr) /*!< out: rollback pointer to the
inserted undo log record,
0 if BTR_NO_UNDO_LOG
flag was specified */
{
......
mtr_t mtr;
mtr_start(&mtr);开启mtr,MySQL的MTR机制,注意这里开启了新的MTR,与row_upd_clust_step函数不是同一个了
if (undo == nullptr) {
err = trx_undo_assign_undo(trx, undo_ptr, TRX_UNDO_INSERT);//如果事务上下文没有insert_undo,就去申请可用undo page
undo = undo_ptr->insert_undo;
.......
}
if (undo == nullptr) {
err = trx_undo_assign_undo(trx, undo_ptr, TRX_UNDO_UPDATE);//如果事务上下文没有update_undo,就去申请可用undo page,细节的话放到undo log源码章节
undo = undo_ptr->update_undo;
.......
}
undo_block = buf_page_get_gen(page_id_t(undo->space, page_no),
undo->page_size, RW_X_LATCH, undo->guess_block,
Page_fetch::NORMAL, UT_LOCATION_HERE, &mtr);
buf_block_dbg_add_level(undo_block, SYNC_TRX_UNDO_PAGE);
do {
page_t *undo_page;
ulint offset;
undo_page = buf_block_get_frame(undo_block);
ut_ad(page_no == undo_block->page.id.page_no());
switch (op_type) {//可以看到undo分两类,insert和update
case TRX_UNDO_INSERT_OP:
offset = trx_undo_page_report_insert(undo_page, trx, index, clust_entry,
&mtr);//写insert undo
break;
default:
ut_ad(op_type == TRX_UNDO_MODIFY_OP);
offset = trx_undo_page_report_modify(undo_page, trx, index, rec, offsets,
update, cmpl_info, clust_entry, &mtr);//写upate undo
}
if (UNIV_UNLIKELY(offset == 0)) {//offset不等于0表示有undo 变更,进入下面的逻辑
/* The record did not fit on the page. We erase the
end segment of the undo log page and write a log
record of it: this is to ensure that in the debug
version the replicate page constructed using the log
records stays identical to the original page */
if (!trx_undo_erase_page_end(undo_page, &mtr)) {//这个函数要特别注意,之前兰MySQL技术内幕一书时,看到undo log由redo log做安全保证,看源码是一直忽略了这个函数,没想到就是它
/* The record did not fit on an empty
undo page. Discard the freshly allocated
page and return an error. */
/* When we remove a page from an undo
log, this is analogous to a
pessimistic insert in a B-tree, and we
must reserve the counterpart of the
tree latch, which is the rseg
mutex. We must commit the mini-transaction
first, because it may be holding lower-level
latches, such as SYNC_FSP and SYNC_FSP_PAGE. */
mtr_commit(&mtr);//mtr提交
}
}
......
}
/** Erases the unused undo log page end.
@return true if the page contained something, false if it was empty */
static bool trx_undo_erase_page_end(
page_t *undo_page, /*!< in/out: undo page whose end to erase */
mtr_t *mtr) /*!< in/out: mini-transaction */
{
ulint first_free;
first_free =
mach_read_from_2(undo_page + TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_FREE);
memset(undo_page + first_free, 0xff,
(UNIV_PAGE_SIZE - FIL_PAGE_DATA_END) - first_free);
mlog_write_initial_log_record(undo_page, MLOG_UNDO_ERASE_END, mtr);//把undo 写到redo
return (first_free != TRX_UNDO_PAGE_HDR + TRX_UNDO_PAGE_HDR_SIZE);
}
/** Writes the initial part of a log record consisting of one-byte item
type and four-byte space and page numbers. Also pushes info
to the mtr memo that a buffer page has been modified. */
void mlog_write_initial_log_record(//本函数生成了undo的redo
const byte *ptr, /*!< in: pointer to (inside) a buffer
frame holding the file page where
modification is made */
mlog_id_t type, /*!< in: log item type: MLOG_1BYTE, ... */
mtr_t *mtr) /*!< in: mini-transaction handle */
{
byte *log_ptr = nullptr;
ut_ad(type <= MLOG_BIGGEST_TYPE);
ut_ad(type > MLOG_8BYTES);
/* If no logging is requested, we may return now */
if (!mlog_open(mtr, REDO_LOG_INITIAL_INFO_SIZE, log_ptr)) {//开启一个redo log对象(在redo log buffer)
return;
}
log_ptr = mlog_write_initial_log_record_fast(ptr, type, log_ptr, mtr);//将undo写到redo
mlog_close(mtr, log_ptr);//关闭redo log对象
}
/** Writes the initial part of a log record (3..11 bytes).
If the implementation of this function is changed, all
size parameters to mlog_open() should be adjusted accordingly!
@return new value of log_ptr */
static inline byte *mlog_write_initial_log_record_fast(
const byte *ptr, /*!< in: pointer to (inside) a buffer
frame holding the file page where
modification is made */
mlog_id_t type, /*!< in: log item type: MLOG_1BYTE, ... */
byte *log_ptr, /*!< in: pointer to mtr log which has
been opened */
mtr_t *mtr) /*!< in/out: mtr */
{
......
return (mlog_write_initial_log_record_low(type, space, offset, log_ptr, mtr));
......
}
/** Writes a log record about an operation.
@param[in] type Redo log record type
@param[in] space_id Tablespace identifier
@param[in] page_no Page number
@param[in,out] log_ptr Current end of mini-transaction log
@param[in,out] mtr Mini-transaction
@return end of mini-transaction log */
static inline byte *mlog_write_initial_log_record_low(mlog_id_t type,
space_id_t space_id,
page_no_t page_no,
byte *log_ptr,
mtr_t *mtr) {
ut_ad(type <= MLOG_BIGGEST_TYPE);
mach_write_to_1(log_ptr, type);
log_ptr++;
log_ptr += mach_write_compressed(log_ptr, space_id);
log_ptr += mach_write_compressed(log_ptr, page_no);
mtr->added_rec();
return (log_ptr);
}
2、更新数据
/** Replaces the new column values stored in the update vector to the
record given. No field size changes are allowed. This function is
usually invoked on a clustered index. The only use case for a
secondary index is row_ins_sec_index_entry_by_modify() or its
counterpart in ibuf_insert_to_index_page(). */
void row_upd_rec_in_place(
rec_t *rec, /*!< in/out: record where replaced */
const dict_index_t *index, /*!< in: the index the record belongs to */
const ulint *offsets, /*!< in: array returned by rec_get_offsets() */
const upd_t *update, /*!< in: update vector */
page_zip_des_t *page_zip) /*!< in: compressed page with enough space
available, or NULL */
{
const upd_field_t *upd_field;
const dfield_t *new_val;
ulint n_fields;
ulint i;
ut_ad(rec_offs_validate(rec, index, offsets));
ut_ad(!index->table->skip_alter_undo);
if (rec_offs_comp(offsets)) {
/* Keep the INSTANT/VERSION bit of prepared physical record */
const bool is_instant = rec_get_instant_flag_new(rec);
const bool is_versioned = rec_new_is_versioned(rec);
/* Set the info_bits from the update vector */
rec_set_info_bits_new(rec, update->info_bits);
/* Set the INSTANT/VERSION bit from the values kept */
if (is_versioned) {
rec_new_set_versioned(rec);
} else if (is_instant) {
ut_ad(index->table->has_instant_cols());
ut_ad(!rec_new_is_versioned(rec));
rec_new_set_instant(rec);
} else {
rec_new_reset_instant_version(rec);
}
/* Only one of the bit (INSTANT or VERSION) could be set */
ut_a(!(rec_get_instant_flag_new(rec) && rec_new_is_versioned(rec)));
} else {
/* INSTANT bit is irrelevant for the record in Redundant format */
bool is_versioned = rec_old_is_versioned(rec);
rec_set_info_bits_old(rec, update->info_bits);
if (is_versioned) {
rec_old_set_versioned(rec, true);
} else {
rec_old_set_versioned(rec, false);
}
}
n_fields = upd_get_n_fields(update);
for (i = 0; i < n_fields; i++) {//依次更新变更的字段
upd_field = upd_get_nth_field(update, i);
/* No need to update virtual columns for non-virtual index */
if (upd_fld_is_virtual_col(upd_field) && !dict_index_has_virtual(index)) {
continue;
}
uint32_t field_no = upd_field->field_no;
new_val = &(upd_field->new_val);
ut_ad(!dfield_is_ext(new_val) ==
!rec_offs_nth_extern(index, offsets, field_no));
/* Updating default value for instantly added columns must not be done
in-place. See also row_upd_changes_field_size_or_external() */
ut_ad(!rec_offs_nth_default(index, offsets, field_no));
rec_set_nth_field(index, rec, offsets, field_no, dfield_get_data(new_val),
dfield_get_len(new_val));
}
if (page_zip) {
page_zip_write_rec(page_zip, rec, index, offsets, 0);
}
}
3、生成redo log
/** Writes a redo log record of updating a record in-place.
@param[in] flags Undo logging and locking flags
@param[in] rec Record
@param[in] index Index of the record
@param[in] update Update vector
@param[in] trx_id Transaction id
@param[in] roll_ptr Roll ptr
@param[in] mtr Mini-transaction */
void btr_cur_update_in_place_log(ulint flags, const rec_t *rec,//生成data的redo log,可以看到除了数据变更,回滚指针也被记录了
dict_index_t *index, const upd_t *update,
trx_id_t trx_id, roll_ptr_t roll_ptr,
mtr_t *mtr) {
byte *log_ptr = nullptr;
ut_d(const page_t *page = page_align(rec));
ut_ad(flags < 256);
ut_ad(page_is_comp(page) == dict_table_is_comp(index->table));
const bool opened = mlog_open_and_write_index(
mtr, rec, index, MLOG_REC_UPDATE_IN_PLACE,
1 + DATA_ROLL_PTR_LEN + 14 + 2 + MLOG_BUF_MARGIN, log_ptr);
if (!opened) {
/* Logging in mtr is switched off during crash recovery */
return;
}
/* For secondary indexes, we could skip writing the dummy system fields
to the redo log but we have to change redo log parsing of
MLOG_REC_UPDATE_IN_PLACE/MLOG_COMP_REC_UPDATE_IN_PLACE or we have to add
new redo log record. For now, just write dummy sys fields to the redo
log if we are updating a secondary index record.
*/
mach_write_to_1(log_ptr, flags);
log_ptr++;
if (index->is_clustered()) {
log_ptr =
row_upd_write_sys_vals_to_log(index, trx_id, roll_ptr, log_ptr, mtr);
} else {
/* Dummy system fields for a secondary index */
/* TRX_ID Position */
log_ptr += mach_write_compressed(log_ptr, 0);
/* ROLL_PTR */
trx_write_roll_ptr(log_ptr, 0);
log_ptr += DATA_ROLL_PTR_LEN;
/* TRX_ID */
log_ptr += mach_u64_write_compressed(log_ptr, 0);
}
mach_write_to_2(log_ptr, page_offset(rec));
log_ptr += 2;
row_upd_index_write_log(index, update, log_ptr, mtr);
}
通过本章源码梳理,可以真切的看到undo,data,redo变更顺序,特别是undo也会被写到redo来确保undo的安全,但是看了这么多都是内存操作,啥时候落盘呢?别急,我们先了解下Innodb的MTR(Mini-Transaction)概念。
四、MTR与将脏页添加到Flush List
1、MTR
Innodb作者把对底层页面的一次原子访问称为一个Mini-Transaction(最小事务),想b-tree插入一条数据就可以称为一个MTR,通过上面源码梳理可知一个update语句包含了多个MTR,一个MTR包含了多个redo,所以有了:
2、脏页添加到Flush List
我们在上一章源码中已经有看到 mtr_commit(&mtr)这样的操作,没错,该操作就会将该MTR中的dirty page添加到Flush List,源码如下:
//位于storge/innobase/include/mtr0mtr.h文件
#define mtr_commit(m) (m)->commit()
/** Mini-transaction handle and buffer */
struct mtr_t {//MTR的结构体
...
}
/** Commit a mini-transaction.
mtr_commit(&mtr)执行的真正方法
*/
//位于storge/innobase/include/mtr0mtr.cc文件
void mtr_t::commit() {
ut_ad(is_active());
ut_ad(!is_inside_ibuf());
ut_ad(m_impl.m_magic_n == MTR_MAGIC_N);
m_impl.m_state = MTR_STATE_COMMITTING;
DBUG_EXECUTE_IF("mtr_commit_crash", DBUG_SUICIDE(););
Command cmd(this);
if (m_impl.m_n_log_recs > 0 ||
(m_impl.m_modifications && m_impl.m_log_mode == MTR_LOG_NO_REDO)) {
ut_ad(!srv_read_only_mode || m_impl.m_log_mode == MTR_LOG_NO_REDO);
cmd.execute();//执行
} else {
cmd.release_all();
cmd.release_resources();
}
#ifndef UNIV_HOTBACKUP
check_nolog_and_unmark();
#endif /* !UNIV_HOTBACKUP */
ut_d(remove_from_debug_list());
}
/** Write the redo log record, add dirty pages to the flush list and release
the resources. */
void mtr_t::Command::execute() {
ut_ad(m_impl->m_log_mode != MTR_LOG_NONE);
#ifndef UNIV_HOTBACKUP
ulint len = prepare_write();
if (len > 0) {
mtr_write_log_t write_log;
write_log.m_left_to_write = len;
auto handle = log_buffer_reserve(*log_sys, len);
write_log.m_handle = handle;
write_log.m_lsn = handle.start_lsn;
m_impl->m_log.for_each_block(write_log);//把MTR包含的redo log添加(mtr_write_log_t里operator方法)到redo log buffer
ut_ad(write_log.m_left_to_write == 0);
ut_ad(write_log.m_lsn == handle.end_lsn);
log_wait_for_space_in_log_recent_closed(*log_sys, handle.start_lsn);
DEBUG_SYNC_C("mtr_redo_before_add_dirty_blocks");
add_dirty_blocks_to_flush_list(handle.start_lsn, handle.end_lsn);//把MTR包含的脏页(数据脏页,undo log脏页)添加到Flush List
log_buffer_close(*log_sys, handle);
m_impl->m_mtr->m_commit_lsn = handle.end_lsn;
} else {
DEBUG_SYNC_C("mtr_noredo_before_add_dirty_blocks");
add_dirty_blocks_to_flush_list(0, 0);
}
#endif /* !UNIV_HOTBACKUP */
release_all();
release_resources();
}
通过本节看到redo log如何被添加到redo log buffer,脏页如何被添加到Flush List。
在文件第二章(page cleaner thread)已知后台page cleaner thread线程组会将Flush List内容刷盘,那么redo log 到了innodb log buffer后合适刷盘呢,见下一章?
五、事务提交
MySQL为了保证主从一致,在开启binlog时,采用xa-2pc模式提交,即我们在执行commit时,MySQL实际执行包含xa-prepare和xa-commit两个阶段。
//binlog 定义,在sql/binlog.cc文件
mysql_declare_plugin(binlog){
MYSQL_STORAGE_ENGINE_PLUGIN,
&binlog_storage_engine,
"binlog",
PLUGIN_AUTHOR_ORACLE,
"This is a pseudo storage engine to represent the binlog in a transaction",
PLUGIN_LICENSE_GPL,
binlog_init, /* Plugin Init */
nullptr, /* Plugin Check uninstall */
binlog_deinit, /* Plugin Deinit */
0x0100 /* 1.0 */,
nullptr, /* status variables */
nullptr, /* system variables */
nullptr, /* config options */
0,
} mysql_declare_plugin_end;
static int binlog_init(void *p) {
binlog_hton = (handlerton *)p;
binlog_hton->state = opt_bin_log ? SHOW_OPTION_YES : SHOW_OPTION_NO;
binlog_hton->db_type = DB_TYPE_BINLOG;
binlog_hton->savepoint_offset = sizeof(my_off_t);
binlog_hton->close_connection = binlog_close_connection;
binlog_hton->savepoint_set = binlog_savepoint_set;
binlog_hton->savepoint_rollback = binlog_savepoint_rollback;
binlog_hton->savepoint_rollback_can_release_mdl =
binlog_savepoint_rollback_can_release_mdl;
binlog_hton->commit = binlog_commit;
binlog_hton->rollback = binlog_rollback;
binlog_hton->prepare = binlog_prepare;
binlog_hton->set_prepared_in_tc = binlog_set_prepared_in_tc;
binlog_hton->recover = binlog_dummy_recover;
binlog_hton->flags = HTON_NOT_USER_SELECTABLE | HTON_HIDDEN;
return 0;
}
MySQL 启动时,init_server_components() 函数按以下规则选择事务协调器,开启了binlog就会选择mysql_bin_log对象
//位于sql/mysqld.cc
#ifdef _WIN32
int win_main(int argc, char **argv)
#else//入口函数
int mysqld_main(int argc, char **argv)
#endif
{
......
if (init_server_components()) unireg_abort(MYSQLD_ABORT_EXIT);
......
}
static int init_server_components() {
DBUG_TRACE;
......
tc_log = &tc_log_dummy;
......
if (total_ha_2pc > 1 || (1 == total_ha_2pc && opt_bin_log)) {
if (opt_bin_log)
tc_log = &mysql_bin_log;//binlog对象,即extern MYSQL_PLUGIN_IMPORT MYSQL_BIN_LOG mysql_bin_log; 位于sql/binlog.h
else
tc_log = &tc_log_mmap;
}
......
}
/*
TODO use mmap instead of IO_CACHE for binlog
(mmap+fsync is two times faster than write+fsync)
*/
//位于sql/binlog.h binlog对象
class MYSQL_BIN_LOG : public TC_LOG {
public:
class Binlog_ofile;
...
}
1、xa-prepare
//位于sql/binlog.cc文件
int MYSQL_BIN_LOG::prepare(THD *thd, bool all) {
DBUG_TRACE;
int error = ha_prepare_low(thd, all);
}
// 位于sql/handler.cc文件
int ha_prepare_low(THD *thd, bool all) {
......
if (ha_list) {
for (auto const &ha_info : ha_list) {
......
auto ht = ha_info.ht();
//如果是Innodb,这里就是调用第一章Innodb暴漏的函数,即innobase_hton->rollback = innobase_rollback;
int err = ht->prepare(ht, thd, all);
......
}
}
......
}
//位于storge/innobase/handler/ha_innodb.cc文件
/** This function is used to prepare an X/Open XA distributed transaction.
@return 0 or error number */
static int innobase_xa_prepare(handlerton *hton, THD *thd, bool prepare_trx)
{
......
dberr_t err = trx_prepare_for_mysql(trx);
......
}
//位于storge/innobase/trx/trx0trx.cc文件
dberr_t trx_prepare_for_mysql(trx_t *trx) {
......
trx->op_info = "preparing";
trx_prepare(trx);
trx->op_info = "";
......
}
/** Prepares a transaction.
@param[in] trx the transction to prepare. */
static void trx_prepare(trx_t *trx) {
lsn_t lsn = 0;
if (trx->rsegs.m_redo.rseg != nullptr && trx_is_redo_rseg_updated(trx)) {//永久表处理
lsn = trx_prepare_low(trx, &trx->rsegs.m_redo, false);
}
if (trx->rsegs.m_noredo.rseg != nullptr && trx_is_temp_rseg_updated(trx)) {//临时表的处理
trx_prepare_low(trx, &trx->rsegs.m_noredo, true);
}
trx_sys_mutex_enter();
trx->state.store(TRX_STATE_PREPARED, std::memory_order_relaxed);//设置事务状态为TRX_STATE_PREPARED
trx_sys->n_prepared_trx++;
trx_sys_mutex_exit();
if (lsn > 0) {
trx_flush_logs(trx, lsn);//将redo log flush到os cache
}
}
static lsn_t trx_prepare_low( trx_t *trx, trx_undo_ptr_t *undo_ptr, bool noredo_logging)
{
if (undo_ptr->insert_undo != nullptr || undo_ptr->update_undo != nullptr) {
if (noredo_logging) {
mtr_set_log_mode(&mtr, MTR_LOG_NO_REDO);//设置临时表的redo log不刷盘
}
if (undo_ptr->insert_undo != nullptr) {
//该函数位于storge/innobase/trx/trx0undo.cc文件
trx_undo_set_state_at_prepare(trx, undo_ptr->insert_undo, false, &mtr);//设置update undo状态为准备状态
}
if (undo_ptr->update_undo != nullptr) {
if (!noredo_logging) {
trx_undo_gtid_set(trx, undo_ptr->update_undo, true);
}
trx_undo_set_state_at_prepare(trx, undo_ptr->update_undo, false, &mtr);//设置update undo状态为准备状态
}
}
}
经过本小节的源码梳理可知在开启binlog模式下MySQL事务在prepare阶段如何工作的,可知该阶段binlog什么也没做,只是通过调用Innodb的innobase_xa_prepare函数让Innodb准备好,而Innodb对redo,undo做了相应准备工作,特别注意的是这一步的redo只是写到os cache,并非磁盘。
2、xa-commit
//位于sql/binlog.cc文件
TC_LOG::enum_result MYSQL_BIN_LOG::commit(THD *thd, bool all) {
bool skip_commit = is_loggable_xa_prepare(thd);
binlog_cache_mngr *cache_mngr = thd_get_cache_mngr(thd);
/*
No cache manager means nothing to log, but we still have to commit
the transaction.
*/
if (cache_mngr == nullptr) {
if (!skip_commit && trx_coordinator::commit_in_engines(thd, all))//提交事务到存储引擎
return RESULT_ABORTED;
return RESULT_SUCCESS;
}
if (stmt_stuff_logged || trx_stuff_logged) {
if (ordered_commit(thd, all, skip_commit)) //事务顺序提交
return RESULT_INCONSISTENT;
}else if (!skip_commit) {
if (trx_coordinator::commit_in_engines(thd, all))//提交事务到存储引擎
return RESULT_INCONSISTENT;
}
}
//这个函数是很重要的,它包含了binlog提交三步曲,
int MYSQL_BIN_LOG::ordered_commit(THD *thd, bool all, bool skip_commit) {
//1:Flush Stag
change_stage(thd, Commit_stage_manager::BINLOG_FLUSH_STAGE, thd, nullptr,&LOCK_log);//获取Lock_log mutex
leave_mutex_before_commit_stage = &LOCK_log;
flush_error = process_flush_stage_queue(&total_bytes, &do_rotate, &wait_queue);//binlog写到os cache,并向刷redo log到磁盘
//2: Sync Stage
change_stage(thd, Commit_stage_manager::SYNC_STAGE, wait_queue, &LOCK_log, &LOCK_sync));//释放Lock_log mutex,获取LOCK_sync mutex
leave_mutex_before_commit_stage = &LOCK_sync;
if (flush_error == 0 && total_bytes > 0) {
DEBUG_SYNC(thd, "before_sync_binlog_file");
std::pair<bool, bool> result = sync_binlog_file(false);//binlog fsync 磁盘
sync_error = result.first;
}
//3: Commit Stage 提交事务
change_stage(thd, Commit_stage_manager::COMMIT_STAGE, final_queue,leave_mutex_before_commit_stage, &LOCK_commit);//释放LOCK_sync mutex,获取LOCK_commit mutex
process_commit_stage_queue(thd, commit_queue);//提交事务
mysql_mutex_unlock(&LOCK_commit);//释放LOCK_commit mutex
}
2.1、process_flush_stage_queue
该函数主要任务:binlog写到os cache和通知Innodb刷redo log到磁盘
//位于sql/binlog.cc文件
int MYSQL_BIN_LOG::process_flush_stage_queue(my_off_t *total_bytes_var,
bool *rotate_var,
THD **out_queue_var) {
......
THD *first_seen = fetch_and_process_flush_stage_queue();//获取一组事务头节点,并通知Innodb刷redo log到磁盘
......
for (THD *head = first_seen; head; head = head->next_to_commit) {
Thd_backup_and_restore switch_thd(current_thd, head);
std::pair<int, my_off_t> result = flush_thread_caches(head);//binlog写到os cache,
total_bytes += result.second;
if (flush_error == 1) flush_error = result.first;
......
}
THD *MYSQL_BIN_LOG::fetch_and_process_flush_stage_queue(const bool check_and_skip_flush_logs) {
THD *first_seen =
Commit_stage_manager::get_instance().fetch_queue_skip_acquire_lock(
Commit_stage_manager::BINLOG_FLUSH_STAGE);获取一组事务头节点,
/*
We flush prepared records of transactions to the log of storage
engine (for example, InnoDB redo log) in a group right before
flushing them to binary log.
*/
ha_flush_logs(true);//通知Innodb刷redo log到磁盘
}
//位于sql/handler.cc文件
bool ha_flush_logs(bool binlog_group_flush) {
if (plugin_foreach(nullptr, flush_handlerton, MYSQL_STORAGE_ENGINE_PLUGIN,
static_cast<void *>(&binlog_group_flush))) {
return true;
}
return false;
}
//通过调hton->flush_logs(即innobase_flush_logs函数)通知Innodb刷redo log到磁盘
static bool flush_handlerton(THD *, plugin_ref plugin, void *arg) {
handlerton *hton = plugin_data<handlerton *>(plugin);
if (hton->state == SHOW_OPTION_YES && hton->flush_logs &&
hton->flush_logs(hton, *(static_cast<bool *>(arg))))
return true;
return false;
}
2.2、process_commit_stage_queue
/**
Commit a sequence of sessions.
This function commit an entire queue of sessions starting with the
session in @c first. If there were an error in the flushing part of
the ordered commit, the error code is passed in and all the threads
are marked accordingly (but not committed).
*/
void MYSQL_BIN_LOG::process_commit_stage_queue(THD *thd, THD *first) {
for (THD *head = first; head; head = head->next_to_commit) {
/*
Flush/Sync error should be ignored and continue
to commit phase. And thd->commit_error cannot be
COMMIT_ERROR at this moment.
*/
finish_transaction_in_engines(head, all, false);//通知Innodb提交事务
}
}
void finish_transaction_in_engines(THD *thd, bool all, bool run_after_commit) {
if (thd->get_transaction()->m_flags.commit_low) {
if (trx_coordinator::commit_in_engines(thd, all, run_after_commit))//通知Innodb提交事务
thd->commit_error = THD::CE_COMMIT_ERROR;
} else if (is_xa_rollback(thd)) {
if (trx_coordinator::rollback_in_engines(thd, all))//通知Innodb回滚事务
thd->commit_error = THD::CE_COMMIT_ERROR;
}
}
//位于sql/tc_log.cc文件
bool trx_coordinator::commit_in_engines(THD *thd, bool all,bool run_after_commit) {
return ha_commit_low(thd, all, run_after_commit);//通知Innodb提交事务
}
int ha_commit_low(THD *thd, bool all, bool run_after_commit) {
for (auto &ha_info : ha_list) {
int err;
auto ht = ha_info.ht();
if ((err = ht->commit(ht, thd, all))) {//通过调ht->commit(即innobase_commit函数)通知Innodb提交事务
char errbuf[MYSQL_ERRMSG_SIZE];
my_error(ER_ERROR_DURING_COMMIT, MYF(0), err,
my_strerror(errbuf, MYSQL_ERRMSG_SIZE, err));
error = 1;
}
assert(!thd->status_var_aggregated);
thd->status_var.ha_commit_count++;
ha_info.reset(); /* keep it conveniently zero-filled */
}
}
总结
本文通过大量源码分析MySQL ,Innodb落盘相关细节,比如通过update操作源码分析undo,redo何时生成的,通过binlog对象分析我们执行commit操作时MySQL基于xa-2pc来保证binlog与redo log的一致性。最终得到如下流程图。