同学参加CSCC2024数据库系统赛道比赛,我和他一起研究了一些优化的case,最后成功拿到全国2/325。在这里记录一下我们讨论优化过的问题(建议把源码下下来边读边搜代码,否则会晕)
行锁占用内存过大
Q:TPCC测试中行锁占了大量内存,疑似是glibc的malloc的内存管理有问题,生产中换个其他库比如jemalloc就能解决,但是OJ只有glibc
相关代码:
/**
* @description: 申请行级共享锁
* @return {bool} 加锁是否成功
* @param {Transaction*} txn 要申请锁的事务对象指针
* @param {Rid&} rid 加锁的目标记录ID 记录所在的表的fd
* @param {int} tab_fd
*/
bool LockManager::lock_shared_on_record(Transaction *txn, const Rid &rid, int tab_fd) {
std::lock_guard lock(latch_);
if (!check_lock(txn)) {
return false;
}
LockDataId lock_data_id(tab_fd, rid, LockDataType::RECORD);
auto &&it = lock_table_.find(lock_data_id);
if (it == lock_table_.end()) {
it = lock_table_.emplace(std::piecewise_construct, std::forward_as_tuple(lock_data_id),
std::forward_as_tuple()).first;
it->second.oldest_txn_id_ = txn->get_transaction_id();
}
auto &lock_request_queue = it->second;
for (auto &lock_request: lock_request_queue.request_queue_) {
// 如果锁请求队列上该事务已经有共享锁或更高级别的锁(X)了,加锁成功
if (lock_request.txn_id_ == txn->get_transaction_id()) {
// 事务能执行到这里,要么第一次申请,要么等待结束了,拿到锁了
assert(lock_request.granted_);
return true;
}
}
// 如果其他事务有 X 锁,加锁失败(no-wait)
// if (lock_request_queue.group_lock_mode_ == GroupLockMode::X || lock_request_queue.group_lock_mode_ == GroupLockMode::IX || lock_request_queue.group_lock_mode_ == GroupLockMode::SIX) {
// lock_request_queue.cv_.notify_all();
// throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
// }
// 第一次申请,检查锁队列中有没有冲突的事务
// Check for conflicting locks and apply wait-die logic
if (lock_request_queue.group_lock_mode_ == GroupLockMode::X || lock_request_queue.group_lock_mode_ ==
GroupLockMode::IX || lock_request_queue.group_lock_mode_ == GroupLockMode::SIX) {
if (txn->get_transaction_id() > lock_request_queue.oldest_txn_id_) {
throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
}
lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::SHARED);
std::unique_lock<std::mutex> ul(latch_, std::adopt_lock);
auto &&cur = lock_request_queue.request_queue_.begin();
lock_request_queue.cv_.wait(ul, [&lock_request_queue, txn, &cur]() {
for (auto &&it = lock_request_queue.request_queue_.begin(); it != lock_request_queue.request_queue_.end();
++it) {
if (it->txn_id_ != txn->get_transaction_id()) {
if (it->lock_mode_ != LockMode::SHARED || it->granted_) {
return false;
}
} else {
cur = it;
break;
}
}
return true;
});
cur->granted_ = true;
lock_request_queue.group_lock_mode_ = static_cast<GroupLockMode>(std::max(
static_cast<int>(GroupLockMode::S), static_cast<int>(lock_request_queue.group_lock_mode_)));
++lock_request_queue.shared_lock_num_;
txn->get_lock_set()->emplace(lock_data_id);
ul.release();
return true;
}
// 每次事务申请锁都要更新最老事务id
if (txn->get_transaction_id() < lock_request_queue.oldest_txn_id_) {
lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
}
// 将当前事务锁请求加到锁请求队列中
lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::SHARED, true);
// 更新锁请求队列锁模式为共享锁
lock_request_queue.group_lock_mode_ = GroupLockMode::S;
++lock_request_queue.shared_lock_num_;
txn->get_lock_set()->emplace(lock_data_id);
return true;
}
/**
* @description: 申请行级排他锁
* @return {bool} 加锁是否成功
* @param {Transaction*} txn 要申请锁的事务对象指针
* @param {Rid&} rid 加锁的目标记录ID
* @param {int} tab_fd 记录所在的表的fd
*/
bool LockManager::lock_exclusive_on_record(Transaction *txn, const Rid &rid, int tab_fd) {
std::lock_guard lock(latch_);
if (!check_lock(txn)) {
return false;
}
LockDataId lock_data_id(tab_fd, rid, LockDataType::RECORD);
auto &&it = lock_table_.find(lock_data_id);
if (it == lock_table_.end()) {
it = lock_table_.emplace(std::piecewise_construct, std::forward_as_tuple(lock_data_id),
std::forward_as_tuple()).first;
it->second.oldest_txn_id_ = txn->get_transaction_id();
}
auto &lock_request_queue = it->second;
for (auto &lock_request: lock_request_queue.request_queue_) {
// 该事务上的锁请求队列上已经有互斥锁了,加锁成功
if (lock_request.txn_id_ == txn->get_transaction_id()) {
assert(lock_request.granted_);
if (lock_request.lock_mode_ == LockMode::EXCLUSIVE) {
return true;
}
// 如果当前记录没有其他事务在读,升级写锁
if (lock_request.lock_mode_ == LockMode::SHARED && lock_request_queue.request_queue_.size() == 1) {
lock_request.lock_mode_ = LockMode::EXCLUSIVE;
lock_request_queue.group_lock_mode_ = GroupLockMode::X;
lock_request_queue.shared_lock_num_ = 0;
return true;
}
assert(lock_request.lock_mode_ == LockMode::SHARED);
// 整个队列的时间戳不一定严格降序,需比较其中最老的事务id,用一个 oldest_txn_id_ 变量来维护,且等待队列中的处于等待的当前事务不可能还会申请其他锁了(阻塞)
// 无论有没有得到锁都要先进入等待队列,得到锁后 granted_ 置真
if (txn->get_transaction_id() > lock_request_queue.oldest_txn_id_) {
// Younger transaction requests the lock, abort the current transaction
throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
}
lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::EXCLUSIVE);
std::unique_lock<std::mutex> ul(latch_, std::adopt_lock);
auto &&cur = lock_request_queue.request_queue_.begin();
// 通过条件:当前请求之前没有任何已授权的请求
lock_request_queue.cv_.wait(ul, [&lock_request_queue, txn, &cur]() {
for (auto &&it = lock_request_queue.request_queue_.begin();
it != lock_request_queue.request_queue_.end(); ++it) {
if (it->txn_id_ != txn->get_transaction_id()) {
if (it->granted_) {
return false;
}
} else {
cur = it;
break;
}
}
return true;
});
cur->granted_ = true;
lock_request_queue.group_lock_mode_ = GroupLockMode::X;
txn->get_lock_set()->emplace(lock_data_id);
ul.release();
return true;
}
}
// 如果其他事务有其他锁,加锁失败(no-wait)
if (lock_request_queue.group_lock_mode_ != GroupLockMode::NON_LOCK) {
if (txn->get_transaction_id() > lock_request_queue.oldest_txn_id_) {
// Younger transaction requests the lock, abort the current transaction
throw TransactionAbortException(txn->get_transaction_id(), AbortReason::DEADLOCK_PREVENTION);
}
lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::EXCLUSIVE);
std::unique_lock<std::mutex> ul(latch_, std::adopt_lock);
auto &&cur = lock_request_queue.request_queue_.begin();
// 通过条件:当前请求之前没有任何已授权的请求
lock_request_queue.cv_.wait(ul, [&lock_request_queue, txn, &cur]() {
for (auto &&it = lock_request_queue.request_queue_.begin(); it != lock_request_queue.request_queue_.end();
++it) {
if (it->txn_id_ != txn->get_transaction_id()) {
if (it->granted_) {
return false;
}
} else {
cur = it;
break;
}
}
return true;
});
cur->granted_ = true;
lock_request_queue.group_lock_mode_ = GroupLockMode::X;
txn->get_lock_set()->emplace(lock_data_id);
ul.release();
return true;
}
if (txn->get_transaction_id() < lock_request_queue.oldest_txn_id_) {
lock_request_queue.oldest_txn_id_ = txn->get_transaction_id();
}
// 将当前事务锁请求加到锁请求队列中
lock_request_queue.request_queue_.emplace_back(txn->get_transaction_id(), LockMode::EXCLUSIVE, true);
// 更新锁请求队列锁模式为排他锁
lock_request_queue.group_lock_mode_ = GroupLockMode::X;
// 添加行级排他锁
txn->get_lock_set()->emplace(lock_data_id);
return true;
}
A:为什么lock_table_一直在emplace但没释放?
Q:lock_table_的value(LockRequestQueue)里面的request_queue(锁请求队列)是会在unlock时erase掉里面的元素的。但因为考虑同一个行(LockDataId)还可能会加锁,所以没有释放lock_table_的key
相关代码:
/**
* @description: 释放锁
* @return {bool} 返回解锁是否成功
* @param {Transaction*} txn 要释放锁的事务对象指针
* @param {LockDataId} lock_data_id 要释放的锁ID
*/
bool LockManager::unlock(Transaction *txn, const LockDataId &lock_data_id) {
std::lock_guard lock(latch_);
auto &txn_state = txn->get_state();
// 事务结束,不能再解锁
if (txn_state == TransactionState::COMMITTED || txn_state == TransactionState::ABORTED) {
return false;
}
if (txn_state == TransactionState::GROWING) {
txn_state = TransactionState::SHRINKING;
}
std::unordered_map<LockDataId, LockRequestQueue>::iterator it;
std::unordered_map<IndexMeta, std::unordered_map<LockDataId, LockRequestQueue> >::iterator ii;
if (lock_data_id.type_ == LockDataType::GAP) {
ii = gap_lock_table_.find(lock_data_id.index_meta_);
if (ii == gap_lock_table_.end()) {
return true;
}
it = ii->second.find(lock_data_id);
if (it == ii->second.end()) {
return true;
}
} else {
it = lock_table_.find(lock_data_id);
if (it == lock_table_.end()) {
return true;
}
}
auto &lock_request_queue = it->second;
auto &request_queue = lock_request_queue.request_queue_;
auto &&request = request_queue.begin();
for (; request != request_queue.end(); ++request) {
if (request->txn_id_ == txn->get_transaction_id()) {
break;
}
}
if (request == request_queue.end()) {
return true;
}
// 一个事务可能对某个记录持有多个锁,S,IX
do {
// 维护锁请求队列
if (request->lock_mode_ == LockMode::SHARED || request->lock_mode_ == LockMode::S_IX) {
--lock_request_queue.shared_lock_num_;
}
if (request->lock_mode_ == LockMode::INTENTION_EXCLUSIVE || request->lock_mode_ == LockMode::S_IX) {
--lock_request_queue.IX_lock_num_;
}
// 删除该锁请求
request_queue.erase(request);
request = request_queue.begin();
for (; request != request_queue.end(); ++request) {
if (request->txn_id_ == txn->get_transaction_id()) {
break;
}
}
} while (request != request_queue.end());
// 维护队列锁模式,为空则无锁
// TODO 擦除锁表
if (request_queue.empty()) {
lock_request_queue.group_lock_mode_ = GroupLockMode::NON_LOCK;
lock_request_queue.oldest_txn_id_ = INT32_MAX;
// 唤醒等待的事务
lock_request_queue.cv_.notify_all();
// if (lock_data_id.type_ == LockDataType::GAP) {
// // 相交的间隙锁也得唤醒
// for (auto &[data_id, queue]: ii->second) {
// // if (queue.group_lock_mode_ != GroupLockMode::NON_LOCK) {
// if (lock_data_id.gap_.isCoincide(data_id.gap_)) {
// queue.cv_.notify_all();
// }
// // }
// }
// }
return true;
}
// 否则找到级别最高的锁和时间戳最小的事务
auto max_lock_mode = LockMode::INTENTION_SHARED;
for (auto &request: request_queue) {
max_lock_mode = std::max(max_lock_mode, request.lock_mode_);
if (request.txn_id_ < lock_request_queue.oldest_txn_id_) {
lock_request_queue.oldest_txn_id_ = request.txn_id_;
}
}
lock_request_queue.group_lock_mode_ = static_cast<GroupLockMode>(static_cast<int>(max_lock_mode) + 1);
// 唤醒等待的事务
lock_request_queue.cv_.notify_all();
// if (lock_data_id.type_ == LockDataType::GAP) {
// // 相交的锁表也得唤醒
// for (auto &[data_id, queue]: ii->second) {
// // if (queue.group_lock_mode_ != GroupLockMode::NON_LOCK) {
// if (lock_data_id.gap_.isCoincide(data_id.gap_)) {
// queue.cv_.notify_all();
// }
// // }
// }
// }
return true;
}
A:现在问题是占内存,不就应该erase掉lock_table_里的东西吗(否则哈希表过大)?
Q:但erase好像并没有释放内存,只是还给内存池了
A:还给内存池就可以了啊。不释放给OS也只会导致其它程序无法分配,你自己的程序不受影响。但现在是你自己的程序内存占的过多,所以不是链接里的那个问题
实现异步写盘后速度反而更慢
异步写盘代码:
DiskScheduler::DiskScheduler(DiskManager *disk_manager) : disk_manager_(disk_manager) {
// TODO(P1): remove this line after you have implemented the disk scheduler API
// throw NotImplementedException(
// "DiskScheduler is not implemented yet. If you have finished implementing the disk scheduler, please remove the
// " "throw exception line in `disk_scheduler.cpp`.");
// Spawn the background thread
stop_thread_ = false;
background_thread_.emplace([&] { StartWorkerThread(); });
// background_thread_->detach();
}
DiskScheduler::~DiskScheduler() {
// Put a `std::nullopt` in the queue to signal to exit the loop
// for (auto &[_, req] : request_queue_) {
// std::ignore = _;
// req.Put(std::nullopt);
// }
stop_thread_ = true;
if (background_thread_.has_value()) {
if (background_thread_.value().joinable()) {
background_thread_->join();
}
}
}
void DiskScheduler::Schedule(DiskRequest r) { request_queue_[r.page_id_.page_no].Put(std::make_optional(std::move(r))); }
void DiskScheduler::ScheduleRead(Page &page) {
auto e = request_queue_[page.get_page_no()].TryReadFromQueue();
if (e.has_value() && e.value().has_value()) {
auto &last_req = e.value();
memcpy(page.get_data(), last_req->data_, PAGE_SIZE);
} else {
disk_manager_->read_page(page.get_page_id().fd, page.get_page_id().page_no, page.get_data(), PAGE_SIZE);
request_queue_[page.get_page_no()].LoadBuffer(DiskRequest(page.get_page_id(), page.get_data()));
}
}
void DiskScheduler::StartWorkerThread() {
while (!stop_thread_) {
for (auto &[_, req] : request_queue_) {
std::ignore = _;
while (auto e = req.Get()) {
if (!e.has_value()) {
break;
}
auto &r = e.value();
disk_manager_->write_page(r.page_id_.fd, r.page_id_.page_no, r.data_, PAGE_SIZE);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
}
}
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
}
}
外部调度代码:
/**
* @description: 更新页面数据, 如果为脏页则需写入磁盘,再更新为新页面,更新page元数据(data, is_dirty, page_id)和page table
* @param {Page*} page 写回页指针
* @param {PageId} new_page_id 新的page_id
* @param {frame_id_t} new_frame_id 新的帧frame_id
*/
void BufferPoolInstance::update_page(Page *page, PageId new_page_id, frame_id_t new_frame_id) {
// Todo:
// 1 如果是脏页,写回磁盘,并且把dirty置为false
// 2 更新page table
// 3 重置page的data,更新page id
if (page->is_dirty()) {
#ifdef ENABLE_LOGGING
// 置换出脏页且 lsn 大于 persist 时需要刷日志回磁盘
if (log_manager_ != nullptr && page->get_page_lsn() > log_manager_->get_persist_lsn()) {
log_manager_->flush_log_to_disk();
}
#endif
#ifdef ENABLE_ASYNC_DISK
auto it = disk_scheduler_.find(page->get_page_fd());
if (it == disk_scheduler_.end()) {
it = disk_scheduler_.emplace(page->get_page_fd(), disk_manager_).first;
}
it->second.Schedule({page->get_page_id(), page->get_data()});
#else
disk_manager_->write_page(page->get_page_id().fd, page->get_page_id().page_no, page->get_data(), PAGE_SIZE);
#endif
page->is_dirty_ = false;
}
page_table_.erase(page->get_page_id());
page_table_[new_page_id] = new_frame_id;
// TODO new_page 需要 reset,update 不需要,因为会被覆盖
page->reset_memory();
page->id_ = new_page_id;
page->pin_count_ = 0;
}
A:这个disk_scheduler_里大概有多少个元素?
Q:一个表对应一个disk_scheduler_,一个disk_scheduler_是一个线程。现在跑的这个test case是有9个表
Q:TPCC大部分跑的sql都是过索引的,buffer pool页面命中率很高,需要置换的次数很少,也就某个sql会有20次来次,其他基本0次或者个位数
A:那本来(实现异步写盘之前)是偶尔write_page卡一下,现在是九个线程一直sleep 1000,可能对主线程有点影响。建议不要构造DiskScheduler的时候就启动线程,可以等put时候再启动,比如看request数量大于多少之后再创建线程之类的,然后处理完了就关闭。也可以用信号量实现,有任务的时候让系统自动拉起线程,比较轻量,可以参考这个实现:
class AsyncIncreWriteThread : public QThread {
private:
//单例模式,这块不需要懒加载。异步增量写入线程一定需要。
static AsyncIncreWriteThread* asyncIncreWriteThread;
SwitchRecordStore* recordStore;
QSemaphore semaphore;
int32_t batchSize;
atomic_int32_t unInCreWriteCount; //保证并发操作安全 信号量会有强制写入操作,而这个数据可靠
AsyncIncreWriteThread(const int32_t& size){
this->recordStore = new SwitchRecordStore();
this->batchSize = size;
this->unInCreWriteCount = 0;
}
//析构函数里面都是同步释放 仅在最头部控制异步
~AsyncIncreWriteThread(){
MemUtil::clearPtrMem(recordStore);
}
void addComitInfo(const vector<pair<ResourceInfo, vector<Record*>>>& upDateInfo, const vector<pair<ResourceInfo, vector<IndexRecord*>>>& indexUpDateInfo, const vector<pair<ResourceInfo, vector<uint64_t>>>& bitMapUpDateInfo){
const int32_t& recCount = recordStore->addComitInfo(upDateInfo, indexUpDateInfo, bitMapUpDateInfo);
//依据的是record个数
unInCreWriteCount.fetch_add(recCount); //先更新个数再释放信号量
semaphore.release(recCount);
}
void forceIncreWriteToDisk(){
semaphore.release(batchSize); //释放所需的信号量 强制开始异步写入
while(unInCreWriteCount.load() != 0); //非阻塞等待完成 如果本来就为0会立刻返回
}
bool isResUnIncreWriteToDisk(const ResourceInfo& resInfo){
return recordStore->isResUnIncreWriteToDisk(resInfo);
}
protected:
virtual void run(){
while(true){
//这里获取信号量不严格 外界调用相关函数释放所需信号量会强制刷进磁盘
semaphore.acquire(batchSize);
if(unInCreWriteCount.load() == 0){ continue; } //无需异步写入 跳过直接等待下一次异步写
//根据记录写磁盘 并更新同步进度
IOUtil::updateSyncProgress(recordStore->increWriteToDisk() * Record::getRecordSize());
unInCreWriteCount.store(0);
}
}
}
缓冲池分片不均衡
Q:缓冲池在事务执行过程中有几千万次fetchpage的函数调用,目前我用了一把大锁来解决并发,效率比较低,profile发现缓冲池几乎不发生写盘,读盘和等锁占了比较多的时间,所以想用缓冲池分片的方法来缓解等锁的问题(原先只有一个缓冲池/一个锁,现在开成多个)。但是现在通过哈希来分配缓冲池并没有实现很好的负载均衡,比如两个缓冲池,一个缓冲池被占满了,一个缓冲池只用了1/3
相关代码:
Page *BufferPoolManager::new_page(PageId *page_id) {
*page_id = {page_id->fd, disk_manager_->allocate_page(page_id->fd)};
return instances_[get_instance_no(*page_id)]->new_page(page_id);
}
PageId定义:
struct PageId {
int fd; // Page所在的磁盘文件开启后的文件描述符, 来定位打开的文件在内存中的位置
page_id_t page_no = INVALID_PAGE_ID;
friend bool operator==(const PageId &x, const PageId &y) { return x.fd == y.fd && x.page_no == y.page_no; }
friend bool operator!=(const PageId &x, const PageId &y) { return !(x == y); }
bool operator<(const PageId &x) const {
if (fd < x.fd) return true;
return page_no < x.page_no;
}
std::string toString() {
return "{fd: " + std::to_string(fd) + " page_no: " + std::to_string(page_no) + "}";
}
// inline int64_t Get() const {
// return (static_cast<int64_t>(fd << 16) | page_no);
// }
};
// PageId的自定义哈希算法, 用于构建unordered_map<PageId, frame_id_t, PageIdHash>
// struct PageIdHash {
// size_t operator()(const PageId &x) const { return (x.fd << 16) | x.page_no; }
// };
namespace std {
template<>
struct hash<PageId> {
size_t operator()(const PageId &obj) const {
// return (obj.fd << 16) | obj.page_no;
std::size_t h1 = std::hash<int>{}(obj.fd);
std::size_t h2 = std::hash<page_id_t>{}(obj.page_no);
return (h1 << 1) ^ (h2);
}
};
}
Q:我们的场景是,fd变化很小,pageno变化很大。或许应该设计一个和pageno相关性比较大的哈希函数
A:这个是一个表一个文件(fd)的吗?
Q:对,9个表9个fd,表有大有小
A:现在这个哈希函数是h2左移之后都变成偶数了,所以如果%2的话,hash结果奇偶是否均匀就看fd的奇偶是否均匀。如果是少量大表多数小表,且大表的fd是偶数,那就是第一个pool的元素多
Bison ast::SemValue拷贝占用大量时间
A:分析了一下,速度慢是因为这个解析器是所有连接共享的,所以总在等待锁。需要改成一个连接(一个线程)一个解析器。首先对flex和bison的配置做修改,主要是需要加%option reentrant和%param {void *xx}这两个配置:
这样改完之后,生成的解析器函数里都会多出来一个yyscanner参数,不同线程传不同的这个就行了:
每个线程一个scanner之后,parse_tree也需要改成多个:
Q:快了不少,但是火焰图几乎没变?
A:火焰图统计的是次数,不是时间
Q:每隔相同时间抽样一次,抽样数量也可以反映时间吧
A:开了多线程之后单次采样给它加的数量多了,所以实际时间和图里的数量应该是随线程数有个倍数关系的
源码仓库
Kosthi/RMDB-2024: CSCC2024数据库管理系统赛道 一等奖参赛作品 (2/325) (github.com)
同学正在找工作!!哪位正在招数据库系统研发快去私信他嗷!!他狠厉害的