VM层通过VersionManager
,向上层提供api接口以及各种功能,对于VM上层的模块(是使用了VM层接口的上层模块),那么操作的都是Entry
结构
而VM依赖于DM,所以VM视角里(在自我实现里面),操作的是DataItem
VersionManager
定义
public interface VersionManager {
byte[] read(long xid, long uid) throws Exception;
long insert(long xid, byte[] data) throws Exception;
boolean delete(long xid, long uid) throws Exception;
long begin(int level);
void commit(long xid) throws Exception;
void abort(long xid);
}
同时,可以看到其中有一个CacheManager
,说明也是实现了抽象缓存的,用到了模板方法设计模式,这个在介绍日志模块和数据管理器时也见到过。
VM 的实现类还被设计为 **Entry**
** **的缓存,需要继承** AbstractCache<Entry>**
。需要实现的获取到缓存和从缓存释放的方法很简单:
抽象缓存的实现
GetForCache
@verride
protected Entry getForCache(long uid) throws Exception {
// 核心还是调用dm.read()方法
Entry entry = Entry.loadEntry(this, uid);
if (entry == null) {
throw Error.NullEntryException;
}
return entry;
}
ReleaseForCache
释放缓存
@Override
protected void releaseForCache(Entry entry) {
entry.remove();
}
开启事务Begin
//创建事务,将事务添加到活动事务的映射中,并返回新事务的ID
@Override
public long begin(int level) {
lock.lock(); // 获取锁,防止并发问题
try {
long xid = tm.begin(); // 调用事务管理器的begin方法,开始一个新的事务,并获取事务ID
Transaction t = Transaction.newTransaction(xid, level, activeTransaction); // 创建一个新的事务对象
activeTransaction.put(xid, t); // 将新的事务对象添加到活动事务的映射中
return xid; // 返回新的事务ID
} finally {
lock.unlock(); // 释放锁
}
}
// 创建一个新的事务:设置事务的隔离级别、隔离级别是可重复读时创建快照
public static Transaction newTransaction(long xid, int level, Map<Long, Transaction> active) {
Transaction t = new Transaction();
// 设置事务ID
t.xid = xid;
// 设置事务隔离级别
t.level = level;
// 如果隔离级别不为0,创建快照
if (level != 0) {
t.snapshot = new HashMap<>();
// 将活跃事务的ID添加到快照中
for (Long x : active.keySet()) {
t.snapshot.put(x, true);
}
}
// 返回新创建的事务
return t;
}
提交事务Commit
主要就是 free 掉相关的结构,并且释放持有的锁,并修改事务状态
// Commit 公开的commit方法,用于提交一个事务
@Override
public void commit(long xid) throws Exception {
lock.lock(); // 获取锁,防止并发问题
Transaction t = activeTransaction.get(xid); // 从活动事务中获取事务对象
lock.unlock(); // 释放锁
try {
if (t.err != null) { // 如果事务已经出错,那么抛出错误
throw t.err;
}
} catch (NullPointerException n) { // 如果事务对象为null,打印事务ID和活动事务的键集,然后抛出异常
System.out.println(xid);
System.out.println(activeTransaction.keySet());
Panic.panic(n);
}
lock.lock(); // 获取锁,防止并发问题
activeTransaction.remove(xid); // 1、从活动事务中移除这个事务
lock.unlock(); // 释放锁
lt.remove(xid); // 2、从锁表中移除这个事务的锁
tm.commit(xid); // 3、调用事务管理器的commit方法,进行事务的提交操作
}
中止事务Abort
中止事务的方法则有两种,手动和自动。手动指的是调用 abort() 方法,而自动,则是在事务被检测出出现死锁时,会自动撤销回滚事务;或者出现版本跳跃时,也会自动回滚:
@Override
// 公开的abort方法,用于中止一个事务
public void abort(long xid) {
// 调用内部的abort方法,autoAborted参数为false表示这不是一个自动中止的事务
internAbort(xid, false);
}
// 内部的abort方法,处理事务的中止
private void internAbort(long xid, boolean autoAborted) {
// 获取锁,防止并发问题
lock.lock();
// 从活动事务中获取事务对象
Transaction t = activeTransaction.get(xid);
// 如果这不是一个自动中止的事务,那么从活动事务中移除这个事务
if (!autoAborted) {
activeTransaction.remove(xid);
}
// 释放锁
lock.unlock();
// 如果事务已经被自动中止,那么直接返回,不做任何处理
if (t.autoAborted) return;
// 从锁表中移除这个事务的锁
lt.remove(xid);
// 调用事务管理器的abort方法,进行事务的中止操作
tm.abort(xid);
}
读取数据Read
read()
方法读取一个 entry
,需要注意判断可见性
//读取一个entry,注意判断可见性
@Override
public byte[] read(long xid, long uid) throws Exception {
lock.lock(); // 获取锁,防止并发问题
Transaction t = activeTransaction.get(xid); // 从活动事务中获取事务对象
lock.unlock(); // 释放锁
if (t.err != null) { // 如果事务已经出错,那么抛出错误
throw t.err;
}
Entry entry = null;
try {
entry = super.get(uid); // 尝试获取数据项
} catch (Exception e) {
if (e == Error.NullEntryException) { // 如果数据项不存在,那么返回null
return null;
} else { // 如果出现其他错误,那么抛出错误
throw e;
}
}
try {
// 在事务隔离级别中讲解了该方法
if (Visibility.isVisible(tm, t, entry)) { // 如果数据项对当前事务可见,那么返回数据项的数据
return entry.data();
} else { // 如果数据项对当前事务不可见,那么返回null
return null;
}
} finally {
entry.release(); // 释放数据项
}
}
插入数据Insert
将数据包裹成 Entry
,然后交给 DM 插入即可
// Insert 将数据包裹成Entry,然后交给DM插入即可
@Override
public long insert(long xid, byte[] data) throws Exception {
lock.lock(); // 获取锁,防止并发问题
Transaction t = activeTransaction.get(xid); // 从活动事务中获取事务对象
lock.unlock(); // 释放锁
if (t.err != null) { // 如果事务已经出错,那么抛出错误
throw t.err;
}
byte[] raw = Entry.wrapEntryRaw(xid, data); // 将事务ID和数据包装成一个新的数据项
return dm.insert(xid, raw); // 调用数据管理器的insert方法,插入新的数据项,并返回数据项的唯一标识符
}
删除数据Delete
// Delete 删除一个数据项
@Override
// 删除一个数据项的方法
public boolean delete(long xid, long uid) throws Exception {
// 获取锁,防止并发问题
lock.lock();
// 从活动事务中获取事务对象
Transaction t = activeTransaction.get(xid);
// 释放锁
lock.unlock();
// 如果事务已经出错,那么抛出错误
if (t.err != null) {
throw t.err;
}
Entry entry = null;
try {
// 尝试获取数据项
entry = super.get(uid);
} catch (Exception e) {
// 如果数据项不存在,那么返回false
if (e == Error.NullEntryException) {
return false;
} else {
// 如果出现其他错误,那么抛出错误
throw e;
}
}
try {
// 如果数据项对当前事务不可见,那么返回false
if (!Visibility.isVisible(tm, t, entry)) {
return false;
}
Lock l = null;
try {
// 尝试为数据项添加锁
l = lt.add(xid, uid);
} catch (Exception e) {
// 如果出现并发更新的错误,那么中止事务,并抛出错误
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}
// 如果成功获取到锁,那么锁定并立即解锁
if (l != null) {
l.lock();
l.unlock();
}
// 如果数据项已经被当前事务删除,那么返回false
if (entry.getXmax() == xid) {
return false;
}
// 如果数据项的版本被跳过,那么中止事务,并抛出错误
if (Visibility.isVersionSkip(tm, t, entry)) {
t.err = Error.ConcurrentUpdateException;
internAbort(xid, true);
t.autoAborted = true;
throw t.err;
}
// 设置数据项的xmax为当前事务的ID,表示数据项被当前事务删除
entry.setXmax(xid);
// 返回true,表示删除操作成功
return true;
} finally {
// 释放数据项
entry.release();
}
}