Introduction
本文将介绍如何使用LRU和FIFO实现页面置换的模拟(Python实现),并使用缺页率进行算法的评价。
Requirement
先附上具体的要求:
【实验目的】
(1)了解内存分页管理策略
(2)掌握调页策略
(3)掌握一般常用的调度算法
(4)学会各种存储分配算法的实现方法。
(5)了解页面大小和内存实际容量对命中率的影响。
【实验要求】
(1)采用页式分配存储方案,通过分别计算不同算法的命中率来比较算法的优劣,同时也考虑页面大小及内存实际容量对命中率的影响;
(2)实现LRU 算法 (Least Recently) 、 FIFO 算法 (First IN First Out)的模拟;
【实验原理】
分页存储管理将一个进程的逻辑地址空间分成若干大小相等的片,称为页面或页。
在进程运行过程中,若其所要访问的页面不在内存而需把它们调入内存,但内存已无空闲空间时,为了保证该进程能正常运行,系统必须从内存中调出一页程序或数据,送入磁盘的对换区中。但应将哪 个页面调出,须根据一定的算法来确定。通常,把选择换出页面的算法称为页面置换算法(Page_Replacement Algorithms)。
一个好的页面置换算法,应具有较低的页面更换频率。从理论上讲,应将那些以后不再会访问的页面换出,或将那些在较长时间内不会再访问的页面调出。
一、最佳置换算法OPT(Optimal)
它是由Belady于1966年提出的一种理论上的算法。其所选择的被淘汰页面,将是以后永不使用的或许是在最长(未来)时间内不再被访问的页面。采用最佳置换算法,通常可保证获得最低的缺页率。但由于人目前还无法预知一个进程在内存的若干个页面中,哪一个页面是未来最长时间内不再被访问的,因而该算法是无法实现的,便可以利用此算法来评价其它算法。
二、先进先出(FIFO)页面置换算法
这是最早出现的置换算法。该算法总是淘汰最先进入内存的页面,即选择在内存中驻留时间最久的页面予以淘汰。该算法实现简单只需把一个进程已调入内存的页面,按先后次序链接成一个队列,并设置一个指针,称为替换指针,使它总是指向最老的页面。
三、LRU(Least Recently Used)最近最久未使用置换算法
FIFO置换算法性能之所以较差,是因为它所依据的条件是各个页面调入内存的时间,而页面调入的先后并不能反映页面的使用情况。最近最久未使用(LRU)置换算法,是根据页面调入内存后的使用情况进行决策的。由于无法预测各页面将来的使用情况,只能利用“最近的过去”作为“最近的将来”的近似,因此,LRU置换算法是选择最近最久未使用的页面予以淘汰。该算法赋予每个页面一个访问字段,用来记录一个页面自上次被访问以来所经历的时间t,,当须淘汰一个页面时,选择现有页面中其t值最大的,即最近最久未使用的页面予以淘汰。
Solution
为了更好地处理数据,本例程封装了一个架构,对LRU、FIFO调度算法、内存等对象进行了对象化封装,详细架构如下图所示:
其中,PageList相当于一个页表,本例程中并没有引入多个页表进行调度配置,因为如果引入多个页表,则相当入还要引入PCB的进程调度,这不是本例程的重点,因此,本例程只采用一个页表进行调度,页表事实上就是一个PageList,里面封装的为单个page对象,如下所示。
StorageDispatcherAdapter为内存调度适配器,而LRUDispatcher和FIFODispatcher都是StorageDispatcherAdapter适配器的实现,通过实现LRUDispatcher或者FIFODispatcher算法,可以实现调度算法的无缝切换。
在分页存储管理中,因为引入了置换算法,所以在内存满了之后会存在某一些页面会被调出至外存中,因此在这里的内存模型本人参考了王道考研网课的一张PPT演示图,如下所示。
本例程将存储空间分为内存(本例程中为RAM)和磁盘空间,磁盘空间分为swap和文件区。当某一个页面需要调入内存使用时,先从swap和file中查找是否存在该页面,如果存在该页面,则使用页面置换算法将该页面置换进内存中,如果在swap和file中没有找到,则新建该分页存入内存。
因为本例程是以页面为基础的存储,因此RAM、DiskFile、DiskSwap都继承了StorageMedium类,继承了StorageMedium类之后,页面将以页面为最小单位进行存储介质间的置换和调度。
RAM和Disk都归StorageManger管理,最后由StorageDispatcherAdapter对StorageManger中的存储介质进行调度,StorageDispatcherAdapter中会记录系统运行过程中的调度情况。
最后就是实现LRU和FIFO的调度,StorageDispatcherAdapter中使用put()函数将页面传入StorageMangement管理的存储介质中。
LRU算法实现如下:
class LRUDispatcher(StorageDispatcherAdapter):
""" LRUDispatcher is a dispatch util which can help Memory dispatch page and record dispatch process. """
def __init__(self):
""" initialize some dispatch process info """
self.page_queue: deque[int] = deque()
self.missing_page_num = 0
self.enter_ram_record: List[int] = []
def put(self, storage_manager: StorageManager, page: Page):
""" dispatch StorageManager's storage """
storage_manager.print_ram()
logging.debug("[page_queue] {0}".format(str(self.page_queue)))
# judge page whether in ram
if storage_manager.page_is_in_ram(page):
logging.debug('page {0} in ram'.format(page.page_id))
# fresh page_queue
self.page_queue.remove(page.page_id)
self.page_queue.appendleft(page.page_id)
# not in ram
else:
logging.debug('page {0} not in ram'.format(page.page_id))
# read from disk (swap and file), swap first
if storage_manager.page_is_in_disk_swap(page):
storage_manager.disk.swap.storage.remove(page.page_id)
elif storage_manager.page_is_in_disk_file(page):
storage_manager.disk.file.storage.remove(page.page_id)
else:
# first dispatch, then distribute space in ram
# ram is full then eliminate the last page
if storage_manager.ram_is_full(self.page_queue):
obsolete_page_id = self.page_queue.pop()
block_id = storage_manager.ram.storage.index(obsolete_page_id)
storage_manager.ram.storage[block_id] = page.page_id
self.page_queue.appendleft(page.page_id)
# store in swap
if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
storage_manager.disk.swap.storage.append(obsolete_page_id)
# if swap is full then store in file of disk
else:
storage_manager.disk.file.storage.append(obsolete_page_id)
self._record_enter(page)
else:
# find the first page_id = -1 and store
for index, cur_page_id in enumerate(storage_manager.ram.storage):
if cur_page_id == -1:
storage_manager.ram.storage[index] = page.page_id
self.page_queue.appendleft(page.page_id)
self._record_enter(page)
return
def _record_enter(self, page: Page):
self.missing_page_num += 1
self.enter_ram_record.append(page.page_id)
FIFO算法实现如下:
class FIFODispatcher(StorageDispatcherAdapter):
""" FIFODispatcher is a dispatch util which can help Memory dispatch page and record dispatch process."""
def __init__(self):
""" initialize some dispatch process info """
self.page_queue: deque[int] = deque()
self.missing_page_num = 0
self.enter_ram_record: List[int] = []
def put(self, storage_manager: StorageManager, page: Page):
""" dispatch StorageManager's storage """
storage_manager.print_ram()
logging.debug("[page_queue] {0}".format(str(self.page_queue)))
# judge page whether in ram
if storage_manager.page_is_in_ram(page):
logging.debug('page {0} in ram'.format(page.page_id))
# not in ram
else:
logging.debug('page {0} not in ram'.format(page.page_id))
# read from disk (swap and file), swap first
if storage_manager.page_is_in_disk_swap(page):
storage_manager.disk.swap.storage.remove(page.page_id)
elif storage_manager.page_is_in_disk_file(page):
storage_manager.disk.file.storage.remove(page.page_id)
else:
# first dispatch, then distribute space in ram
# ram is full then eliminate the last page
if storage_manager.ram_is_full(self.page_queue):
obsolete_page_id = self.page_queue.pop()
block_id = storage_manager.ram.storage.index(obsolete_page_id)
storage_manager.ram.storage[block_id] = page.page_id
self.page_queue.appendleft(page.page_id)
# store in swap
if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
storage_manager.disk.swap.storage.append(obsolete_page_id)
# if swap is full then store in file of disk
else:
storage_manager.disk.file.storage.append(obsolete_page_id)
self._record_enter(page)
else:
# find the first page_id = -1 and store
for index, cur_page_id in enumerate(storage_manager.ram.storage):
if cur_page_id == -1:
storage_manager.ram.storage[index] = page.page_id
self.page_queue.appendleft(page.page_id)
self._record_enter(page)
return
def _record_enter(self, page: Page):
self.missing_page_num += 1
self.enter_ram_record.append(page.page_id)
最后随机构建一些数据集,分别模拟测试LRU和FIFO的,结果如下所示。
事实上,在小范围的数据集测试,使用控制变量法,分别控制RAM大小、页表的长度、页面类型的数量测试了很多次之后,如果以缺页率作为评价指标,本人并没有得出明显的差别。本人推测可能是因为测试数据的问题,测试的数据集中页面id使用随机数生成,但是在实际的存储调度中,可能存在某些页面出现的次数特别多,如一次存储调度任务中50%都是在调度page_id=3的页面,这个时候LRU的优势才会逐渐显现出来,而基于随机数的测试集出现某些页面次数特别多的情况比较小,无法很好地测试出LRU的优势。
最终代码如下,不包含详细的数据集测试:
# -*- coding: utf-8 -*-
# @Time : 2022/12/1 18:06
# @Author : Zeeland
# @File : LRU_and_FIFO.py
# @Software: PyCharm
import random
from typing import List
from collections import deque
import logging
import abc
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(filename)s[line:%(lineno)d] - %(levelname)s: %(message)s')
class Page:
def __init__(self, page_id=None, time=None):
self.page_id = page_id # page id
self.enter_time = time # the time stamp of calling into memory
def __str__(self):
return "[Page] page_id:{0} time:{1}".format(self.page_id, self.enter_time)
class StorageMedium:
__meta_class__ = abc.ABCMeta
def __init__(self):
self.max_storage_space = None
self.storage: List[int] = []
class RAM(StorageMedium):
""" RAM can storage memory block and every block can be dispatched by dispatcher. """
def __init__(self, max_storage_space=4):
super().__init__()
self.max_storage_space = max_storage_space
# store memory block
self.storage: List[int] = [-1 for _ in range(self.max_storage_space)]
class DiskFile(StorageMedium):
def __init__(self, max_storage_space=100):
super().__init__()
self.max_storage_space = max_storage_space
# store memory block
self.storage: List[int] = [-1 for _ in range(self.max_storage_space)]
class DiskSwap(StorageMedium):
def __init__(self, max_storage_space=20):
super().__init__()
self.max_storage_space = max_storage_space
# store memory block
self.storage: List[int] = [-1 for _ in range(self.max_storage_space)]
class Disk:
def __init__(self):
self.swap = DiskSwap()
self.file = DiskFile()
class StorageManager:
def __init__(self, ram_max_size=4):
self.ram = None
self.disk = None
self.fresh_memory(ram_max_size)
def fresh_memory(self, ram_max_size=4):
self.ram = RAM(max_storage_space=ram_max_size)
self.disk = Disk()
def page_is_in_ram(self, page: Page) -> bool:
""" return whether the page is in ram """
try:
self.ram.storage.index(page.page_id)
return True
except ValueError as e:
return False
def page_is_in_disk(self, page: Page) -> bool:
if self.page_is_in_disk_file(page) or self.page_is_in_disk_swap(page):
return True
else:
return False
def page_is_in_disk_swap(self, page: Page) -> bool:
try:
self.disk.swap.storage.index(page.page_id)
return True
except ValueError as e:
return False
def page_is_in_disk_file(self, page: Page) -> bool:
try:
self.disk.file.storage.index(page.page_id)
return True
except ValueError as e:
return False
def ram_is_full(self, page_dispatch_queue: deque = None) -> bool:
if not page_dispatch_queue:
for item in self.ram.storage:
if item == -1:
return False
return True
else:
if len(page_dispatch_queue) == self.ram.max_storage_space:
return True
else:
return False
def print_ram(self):
logging.debug('[ram] ' + str(self.ram.storage))
class StorageDispatcherAdapter:
__meta_class__ = abc.ABCMeta
def put(self, *args, **kwargs):
pass
class FIFODispatcher(StorageDispatcherAdapter):
""" FIFODispatcher is a dispatch util which can help Memory dispatch page and record dispatch process."""
def __init__(self):
""" initialize some dispatch process info """
self.page_queue: deque[int] = deque()
self.missing_page_num = 0
self.enter_ram_record: List[int] = []
def put(self, storage_manager: StorageManager, page: Page):
""" dispatch StorageManager's storage """
storage_manager.print_ram()
logging.debug("[page_queue] {0}".format(str(self.page_queue)))
# judge page whether in ram
if storage_manager.page_is_in_ram(page):
logging.debug('page {0} in ram'.format(page.page_id))
# not in ram
else:
logging.debug('page {0} not in ram'.format(page.page_id))
# read from disk (swap and file), swap first
if storage_manager.page_is_in_disk_swap(page):
storage_manager.disk.swap.storage.remove(page.page_id)
self._pop_and_store(storage_manager, page)
elif storage_manager.page_is_in_disk_file(page):
storage_manager.disk.file.storage.remove(page.page_id)
self._pop_and_store(storage_manager, page)
else:
# first dispatch, then distribute space in ram
# ram is full then eliminate the last page
if storage_manager.ram_is_full(self.page_queue):
self._pop_and_store(storage_manager, page)
else:
# find the first page_id = -1 and store
for index, cur_page_id in enumerate(storage_manager.ram.storage):
if cur_page_id == -1:
storage_manager.ram.storage[index] = page.page_id
self.page_queue.appendleft(page.page_id)
self._record_enter(page)
return
def _pop_and_store(self, storage_manager: StorageManager, page: Page):
obsolete_page_id = self.page_queue.pop()
block_id = storage_manager.ram.storage.index(obsolete_page_id)
storage_manager.ram.storage[block_id] = page.page_id
self.page_queue.appendleft(page.page_id)
# store in swap
if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
storage_manager.disk.swap.storage.append(obsolete_page_id)
# if swap is full then store in file of disk
else:
storage_manager.disk.file.storage.append(obsolete_page_id)
self._record_enter(page)
def _record_enter(self, page: Page):
self.missing_page_num += 1
self.enter_ram_record.append(page.page_id)
class LRUDispatcher(StorageDispatcherAdapter):
""" LRUDispatcher is a dispatch util which can help Memory dispatch page and record dispatch process. """
def __init__(self):
""" initialize some dispatch process info """
self.page_queue: deque[int] = deque()
self.missing_page_num = 0
self.enter_ram_record: List[int] = []
def put(self, storage_manager: StorageManager, page: Page):
""" dispatch StorageManager's storage """
storage_manager.print_ram()
logging.debug("[page_queue] {0}".format(str(self.page_queue)))
# judge page whether in ram
if storage_manager.page_is_in_ram(page):
logging.debug('page {0} in ram'.format(page.page_id))
# fresh page_queue
self.page_queue.remove(page.page_id)
self.page_queue.appendleft(page.page_id)
# not in ram
else:
logging.debug('page {0} not in ram'.format(page.page_id))
# read from disk (swap and file), swap first
if storage_manager.page_is_in_disk_swap(page):
storage_manager.disk.swap.storage.remove(page.page_id)
self._pop_and_store(storage_manager, page)
elif storage_manager.page_is_in_disk_file(page):
storage_manager.disk.file.storage.remove(page.page_id)
self._pop_and_store(storage_manager, page)
else:
# first dispatch, then distribute space in ram
# ram is full then eliminate the last page
if storage_manager.ram_is_full(self.page_queue):
self._pop_and_store(storage_manager, page)
else:
# find the first page_id = -1 and store
for index, cur_page_id in enumerate(storage_manager.ram.storage):
if cur_page_id == -1:
storage_manager.ram.storage[index] = page.page_id
self.page_queue.appendleft(page.page_id)
self._record_enter(page)
return
def _pop_and_store(self, storage_manager: StorageManager, page: Page):
obsolete_page_id = self.page_queue.pop()
block_id = storage_manager.ram.storage.index(obsolete_page_id)
storage_manager.ram.storage[block_id] = page.page_id
self.page_queue.appendleft(page.page_id)
# store in swap
if len(storage_manager.disk.swap.storage) < storage_manager.disk.swap.max_storage_space:
storage_manager.disk.swap.storage.append(obsolete_page_id)
# if swap is full then store in file of disk
else:
storage_manager.disk.file.storage.append(obsolete_page_id)
self._record_enter(page)
def _record_enter(self, page: Page):
self.missing_page_num += 1
self.enter_ram_record.append(page.page_id)
class Application:
def __init__(self):
self.page_list = self._page_list_init()
self._print_page_list()
self.storage_manager = StorageManager()
self.storage_dispatcher = None
def run(self, method='LRU', page_list: List[Page] = None):
if page_list is not None:
self.page_list = page_list
self.storage_manager.fresh_memory(ram_max_size=4)
if method == 'LRU':
self.storage_dispatcher = LRUDispatcher()
elif method == 'FIFO':
self.storage_dispatcher = FIFODispatcher()
else:
raise Exception("unknown dispatch method")
# dispatch until end
for i in range(len(self.page_list)):
self.storage_dispatcher.put(self.storage_manager, self.page_list[i])
logging.debug("[missing_page_num] {0}".format(self.storage_dispatcher.missing_page_num))
logging.debug("[missing_page_num] {0}/{1}".format(self.storage_dispatcher.missing_page_num, len(self.page_list)))
return self.storage_dispatcher.missing_page_num / len(self.page_list)
def _page_list_init(self, page_len=17):
# temp_list = [2, 0, 5, 8, 2, 1, 3, 1, 6, 6, 5, 8, 2, 6, 1, 0, 6]
# temp_list = [1, 0, 1, 0, 2, 4, 1, 0, 0, 8, 7, 5, 4, 3, 2, 3, 4]
page_list: List[Page] = [Page(page_id=random.randint(0, 8), time=i + 1) for i in range(page_len)]
# page_list: List[Page] = [Page(page_id=temp_list[i], time=i + 1) for i in range(page_len)]
return page_list
def _print_page_list(self):
logging.debug("[page_id] " + str(list(map(lambda item: item.page_id, self.page_list))))
if __name__ == '__main__':
app = Application()
app.run(method='FIFO')
app.run(method='LRU')
References
- 王道计算机考研 操作系统 页面置换算法 -bilili
- FIFO,LRU,OPT算法及缺页次数计算 -csdn