Impala3.4源码阅读笔记(三)data-cache的Store实现

news2025/1/23 4:47:01

前言

本文为笔者个人阅读Apache Impala源码时的笔记,仅代表我个人对代码的理解,个人水平有限,文章可能存在理解错误、遗漏或者过时之处。如果有任何错误或者有更好的见解,欢迎指正。

正文

本文顺承前文Impala3.4源码阅读笔记(二) data-cache的Lookup实现继续分析实现Store的具体流程和细节,其中使用的一些类和对象前文有讲不再赘述,因此建议先阅读完前文后再继续。
Store的实现相较于Lookup要复杂得多,因为其中牵扯了缓存条目的插入与逐出、缓存文件的写入和多线程并发写的问题。还是那张图:
在这里插入图片描述

我们还是沿着执行路径进行分析,先从DataCache::Partition::Store入手,首先会查找缓存元数据中缓存键是否存在:

Cache::UniqueHandle handle(meta_cache_->Lookup(key, Cache::EXPECT_IN_CACHE));
if (handle.get() != nullptr) {
    if (HandleExistingEntry(key, handle, buffer, buffer_len)) return false;
}

只有缓存键不存在或者buffer_len大于原来的缓存长度,HandleExistingEntry才会返回true继续之后的流程。之后会进行并发数和重复待缓存的检查:

const bool exceed_concurrency =
    pending_insert_set_.size() >= FLAGS_data_cache_write_concurrency;
if (exceed_concurrency || pending_insert_set_.find(key.ToString()) != pending_insert_set_.end()) {
    ...
    return false;
}

其中pending_insert_set_是一个字符串集合,保存了待插入或正在插入的CacheKey字符串,如果其大小超过data_cache_write_concurrency限制或CacheKey已存在(说明别的线程正在插入该条目)则本次插入会被放弃。通过了以上检查之后,会调用CacheFile::Allocate在当前缓存文件里申请一个插入位置并将key移进pending_insert_set_准备插入:

cache_file = cache_files_.back().get();
insertion_offset = cache_file->Allocate(charge_len, partition_lock);
...
pending_insert_set_.emplace(key.ToString());

然后是一段比较有意思的代码,设置作用域退出触发器:

auto remove_from_pending_set = MakeScopeExitTrigger([this, &key]() {
    std::unique_lock<SpinLock> partition_lock(lock_);
    pending_insert_set_.erase(key.ToString());
});
return InsertIntoCache(key, cache_file, insertion_offset, buffer, buffer_len);

作用域退出触发器工作原理类似于lock_guard,我们可以传递一个函数给MakeScopeExitTrigger来设置作用域退出触发器,当触发器对象被析构时会执行该函数。此处我们传递一个lambda函数来设置触发器,Store执行完时会离开remove_from_pending_set的所属作用域,然后执行该lambda函数,该函数的功能很简单就是将执行完插入的CacheKey移出pending_insert_set_Store最后就是调用InsertIntoCache完成插入,InsertIntoCache首先会向meta_cache_申请一块空间存放Handle

Cache::UniquePendingHandle pending_handle(
    meta_cache_->Allocate(key, sizeof(CacheEntry), charge_len));
if (UNLIKELY(pending_handle.get() == nullptr)) return false;

然后是将数据写入缓存文件,是的,在逐出旧数据之前会先写入新数据,所以缓存分区大小的限制会被暂时忽略:

if (UNLIKELY(!cache_file->Write(insertion_offset, buffer, buffer_len))) {
    return false;
}

此处文件写入的WriteLookup中读文件的Read方法一样最终都调用了系统API完成,因此不再展开,我们继续看接下来的缓存条目插入:

CacheEntry entry(cache_file, insertion_offset, buffer_len, checksum);
memcpy(meta_cache_->MutableValue(&pending_handle), &entry, sizeof(CacheEntry));
Cache::UniqueHandle handle(meta_cache_->Insert(std::move(pending_handle), this));

首先构造了缓存条目,然后直接用memcpy将条目直接写入handle,在Lookup实现的介绍中说过可以将handle理解为键值对,MutableValue则会返回handle中值位置的写入地址。然后就是关键的缓存元数据插入了,Insert会将上文构造好的handle插入缓存元数据,我们先来看Insert方法的定义:

UniqueHandle Insert(UniquePendingHandle handle, Cache::EvictionCallback* eviction_callback) override {
    HandleBase* h_in = reinterpret_cast<HandleBase*>(DCHECK_NOTNULL(handle.release()));
    HandleBase* h_out = shards_[Shard(h_in->hash())]->Insert(h_in, eviction_callback);
    return UniqueHandle(reinterpret_cast<Cache::Handle*>(h_out), Cache::HandleDeleter(this));
}

可以看见该方法还需要传入一个Cache::EvictionCallback对象的指针,EvictionCallback是一个接口类,只定义了一个回调函数,meta_cache_会在逐出缓存条目时调用该回调函数:

class EvictionCallback {
    public:
    virtual void EvictedEntry(Slice key, Slice value) = 0;
    virtual ~EvictionCallback() = default;
};

Partition类继承了该接口并实现了EvictedEntry方法,该方法中包括了缓存文件的打洞操作:

void DataCache::Partition::EvictedEntry(Slice key, Slice value) {
  ...
  entry.file()->PunchHole(entry.offset(), eviction_len);
  ...
}

PunchHole可以将缓存文件中的一段区间挖掉并将存储空间还给操作系统,通过这种方法删除缓存文件中被逐出的数据片段。PunchHole也使用系统API fallocate实现文件打洞功能,此处不再展开。因为逐出缓存条目时需要回调该函数,所以meta_cache_->Insert(std::move(pending_handle), this)还需要传入当前对象(Partition对象)指针this。meta_cache_->Insert又调用了RLCacheShard::InsertRLCacheShard类通过RLHandle实现了一个循环双链表并包括一个哈希表HandleTable,支持FIFO和LRU两种缓存置换策略,RLCacheShard::Insert比较长,我们只看一步步看其关键部分,首先是插入双链表和哈希表:

RLHandle* to_remove_head = nullptr;
RL_Append(handle);
RLHandle* old = static_cast<RLHandle*>(table_.Insert(handle));
if (old != nullptr) {
    RL_Remove(old);
    if (Unref(old)) {
        old->next = to_remove_head;
        to_remove_head = old;
    }
}

将被逐出的条目会先被串成一个链表,to_remove_head作为链表头结点。RL_Append会将条目插入循环双链表,table_.Insert(handle)将条目插入哈希表,如果哈希表已存在该条目则会将其替换然后返回旧条目,否则返回nullptr。如果有返回旧条目需要将其删除,包括通过RL_Remove将其移出循环双链表、Unref将其引用计数减一,若之后引用计数只剩一还需将其链接到to_remove_head准备逐出。然后就是缓存容量的限制部分:

while (usage_ > capacity_ && rl_.next != &rl_) {
    RLHandle* old = rl_.next;
    RL_Remove(old);
    table_.Remove(old->key(), old->hash());
    if (Unref(old)) {
        old->next = to_remove_head;
        to_remove_head = old;
    }
}

其中rl_是循环双链表的虚头结点,其next结点为最旧的结点,prev为最新的结点,新旧根据置换策略FIFO或LRU决定。当使用量超过容量usage_ > capacity_时会一直循环,不断删除旧条目,删除过程与上文相同。最后就是旧条目的逐出:

while (to_remove_head != nullptr) {
    RLHandle* next = to_remove_head->next;
    FreeEntry(to_remove_head);
    to_remove_head = next;
}

此处调用了FreeEntry方法:

void RLCacheShard<policy>::FreeEntry(RLHandle* e) {
    DCHECK_EQ(e->refs.load(std::memory_order_relaxed), 0);
    if (e->eviction_callback) {
    e->eviction_callback->EvictedEntry(e->key(), e->value());
    }
    UpdateMemTracker(-static_cast<int64_t>(e->charge()));
    Free(e);
}

可以发现其调用了先前传入的回调函数EvictedEntry来将数据从缓存文件逐出。完成了旧条目的逐出之后,Store的过程也就结束了。

最后还有一个问题,置换策略FIFO或LRU是如何实现的?实际上这两个策略唯一的区别就是如何定义缓存条目的新旧,FIFO的逻辑是越早插入的越旧,而LRU的逻辑是越早插入且越久没有被Lookup的越旧。RLCacheShard内的循环双链表本身就是按照插入顺序排序的,所以FIFO策略实际上不需要额外的实现,链表第一个结点就是最旧的,而LRU策略则需要在Lookup后将Lookup访问的结点移到链表尾部让它成为最新的,在RLCacheShard::Lookup中可以看到:

e = static_cast<RLHandle*>(table_.Lookup(key, hash));
if (e != nullptr) {
    e->refs.fetch_add(1, std::memory_order_relaxed);
    RL_UpdateAfterLookup(e);
}

在Lookup最后执行了RL_UpdateAfterLookup来更新结点位置,我们来看其实现:

template<>
void RLCacheShard<Cache::EvictionPolicy::FIFO>::RL_UpdateAfterLookup(RLHandle* /* e */) {
}

template<>
void RLCacheShard<Cache::EvictionPolicy::LRU>::RL_UpdateAfterLookup(RLHandle* e) {
    RL_Remove(e);
    RL_Append(e);
}

对应FIFO和LRU,RL_UpdateAfterLookup有两个特化模板,其中FIFO不需要更新结点位置,所以是个空函数,而LRU的RL_UpdateAfterLookup也十分简单,移出结点重新插入到链表尾就完成了结点位置更新。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.coloradmin.cn/o/721934.html

如若内容造成侵权/违法违规/事实不符,请联系多彩编程网进行投诉反馈,一经查实,立即删除!

相关文章

mac电脑上,webm格式怎么转换成mp4?

mac电脑上&#xff0c;webm格式怎么转换成mp4&#xff1f;webm格式的视频也是最近几年也越来越多的&#xff0c;小编最近就不止一次的下载到过webm格式的视频&#xff0c;很多小伙伴肯定对它还并不是很了解&#xff0c;webm是由谷歌公司所提出以及开发出来的视频文件格式&#…

matlab读取STK生成的报告

一、STK 和 Matlab的生成的图片对比 &#xff08;一&#xff09;STK图片 &#xff08;二&#xff09;Matlab图片 &#xff08;三&#xff09;STK生成的报表数据 "Time (UTCG)","Azimuth (deg)","Elevation (deg)","Range (km)" 20 J…

编译Android平台的OpenCV库并启用OpenCL及Contrib

1.下载好OpenCV与OpenCV_Contirb 版本: 4.7 编译主机系统: Ubuntu 20.04 LTS 准备环境与工具: ANDRIOD SDK 与 NDK ,CMAKE ,NINJA ,GCC,G++ ,MAKE 开始编译: ../opencv/platforms/android/build_sdk.py --extra_modules_path=../opencv_contrib/modules --no_samples_bu…

坚固型3DMAG™ A31315LOLATR-XZ-S-SE-10、A31315LOLATR-XY-S-AR-10霍尔效应磁性位置传感器IC

A31315 3D磁性位置传感器IC是完全集成的坚固型3DMAG™ 霍尔效应磁性位置传感器IC&#xff0c;主要用于支持汽车、工业和消费类应用中的各种非接触式旋转和线性位置测量。 A31315传感器IC集成了垂直和平面霍尔效应元件&#xff0c;可检测三个磁场分量&#xff08;X、Y和Z&#x…

spring如何使用junit进行测试

第一步maven的pom.xml引入坐标&#xff1a; <dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version></dependency> 第二步编写测试方法&#xff1a; 第三步 定义scope类型

gpt4实现对摄像头帧缓冲区图像的LAB阈值选择界面(python-opencv)

代码全是GPT4写的&#xff0c;我就提出Prompt和要改的地方而已。 图形界面效果 代码 import cv2 import numpy as np import time from tkinter import * from PIL import Image, ImageTkclass App:def __init__(self, window, window_title, video_source0):self.window wi…

【面试】美团面试真题和答案

文章目录 前言1.线程池有几种实现方式&#xff1f;2.线程池的参数含义&#xff1f;3.锁升级的过程&#xff1f;4.i 如何保证线程安全&#xff1f;5.HashMap和ConcurrentHashMap有什么区别&#xff1f;6.Autowired和Resource区别&#xff1f;7.说说常用的设计模式8.Redis为什么这…

react中使用 websocket

react中使用 websocket&#xff0c;使用socket.io库 参考官网地址&#xff1a; https://socket.io/zh-CN/docs/v4/client-installation/#from-npm 1.安装 npm install socket.io-client2.示例代码 import React, { useEffect, useRef, useState } from "react"; i…

Mysql数据库(四) Mysql命令行客户端数据条件查询、排序、分组、聚合函数

目录 一、where条件查询 ① 查询年龄大于/等于18岁的学生记录。 ② 查询名字以张开头的学生记录。 ③ 范围查询 二、order by 排序 ① 按照name列升序排序 ② 按照name列降序排序 ③ 先按 name 降序&#xff0c;再按 age 升序排序 ④ 可以使用表达式或函数来进行排序 …

【Nginx】Nginx负载均衡

Nginx 负载均衡 1.Nginx 负载均衡1.1 官方文档1.2 默认方式&#xff1a;轮询&#xff08;round-robin&#xff09;1.3 链接最少、空闲&#xff08;least-connected&#xff09;1.4 会话持续&#xff0c;也叫ip 哈希&#xff08;Session persistence&#xff09;1.5 服务器权重&…

前端学习——CSS3

新增长度单位 <!DOCTYPE html> <html lang"zh-CN"> <head><meta charset"UTF-8"><title>Document</title><style>* {margin: 0;padding: 0;}.box1 {width: 200px;height: 200px;background-color: deepskyblue;…

JMeter元件

【测试计划–线程组/Threads(Users)】 模拟大量用户负载的情况&#xff0c;线程组可以设置运行的线程数(多少线程就代表多少用户)&#xff1b; 【测试计划–线程组–取样器/sampler】 用来模拟用户操作&#xff0c;向服务器发出http请求、Webservice请求、java请求等&#xf…

采用VMD按照某一坐标轴旋转坐标结构

关注 M r . m a t e r i a l , \color{Violet} \rm Mr.material\ , Mr.material , 更 \color{red}{更} 更 多 \color{blue}{多} 多 精 \color{orange}{精} 精 彩 \color{green}{彩} 彩&#xff01; 主要专栏内容包括&#xff1a; †《LAMMPS小技巧》&#xff1a; ‾ \textbf…

【设计模式】第七章:代理模式详解及应用案例

系列文章 【设计模式】七大设计原则 【设计模式】第一章&#xff1a;单例模式 【设计模式】第二章&#xff1a;工厂模式 【设计模式】第三章&#xff1a;建造者模式 【设计模式】第四章&#xff1a;原型模式 【设计模式】第五章&#xff1a;适配器模式 【设计模式】第六章&…

二叉树 — 折纸问题

题目&#xff1a; 一道Google真实出现过的面试题 将一段纸条放在桌上&#xff0c;然后从纸条下边向上方对折1次&#xff0c;压出折痕后展开&#xff0c;此时折痕是凹下去的&#xff08;称为凹折痕&#xff09;&#xff0c;即折痕凸起的方向指向纸条的背面。如果从纸条的下边向上…

CodeForces..碰撞字符.[简单].[遍历求最大连续]

题目描述&#xff1a; 题目解读&#xff1a; 输入仅由<和>组成的字符串s&#xff0c;给定数组a&#xff0c;a满足 a[i] 和 a[i1] 之间的比较运算符为 s[i] 给出字符串a中不同字符的最小个数。 解题思路&#xff1a; 判断字符<和>的最大连续次数即可&#xff0c;…

MySQL注入-SQLi-Less1笔记

前置知识点&#xff1a; 1. SELECT 1,2,3 用于查询数据通道的方式 例如Less-1中,Secury数据库中的users表结构如下&#xff0c;可以看到有散列&#xff0c;当用户在页面输入id的时候&#xff0c;会查询到对应的散列数据也就是<id>/<username>/<password>&a…

复健练习1—取模与快速幂

复建练习1—取模与快速幂 A&#xff0c;poj3070 没啥可说的&#xff0c;就是裸的矩阵快速幂 #include <algorithm> #include <iostream> #include <cstring> #include <cmath> #include <iomanip>using namespace std; typedef long long ll;…

微服务: 03-rabbitmq在springboot中如何使用(下篇)

目录 前言: 上文传送 4.六大模式实际操作(续) 4.4 路由模式: ---> 4.4.1 消费者配置类 ---> 4.4.2 消费者代码 --->4.4.3 生产者代码 4.5 主题模式: (路由升级版) ---> 4.5.1 消费者配置类 ---> 4.5.2 消费者代码 ---> 4.5.3 生产者代码 ---&g…

【大语言模型】15分钟快速掌握LangChain以及ChatGLM

10分钟快速掌握LangChain LangChain简介LangChain中的核心概念1. Components and Chains2. Prompt Templates and Values3. Example Selectors4. Output Parsers5. Indexes and Retrievers6. Chat Message History7. Agents and Tookits LangChain的代码结构1. LangChain中提供…