主要目标
实现mvcc和2pc, Percolator
partA
将存储分为三个独立的部分,lock(管理锁记录) default(存储数据),write(提交的记录),提高并行性
对于lock存储,只要存储一份(因为一个行同时只能有一个锁)。lock这个结构包含了lockttl,超时时间,lockts(应该是事务开始时间), writekind也许是用来区别读锁写锁吧。
对于数据的话,我们就用key + startTs作为key,来mvcc做
对于write,我们也用key + commotTs作为key,开始时间作为value,mvcc来做
同时我们要注意write后的话,是最先读到的,也就是我们遍历的时候要按timestamp倒序遍历。也就是说我们对key + ts这种键做排序的话,我们就是把这个key按从小到大,然后ts从大到小,实现简单,并且快(可以看看这部分比较是怎么写的(todo))
然后就是currentWrite,就是当前事务为startTs的写入
代码
func (txn *MvccTxn) CurrentWrite(key []byte) (*Write, uint64, error) {
// Your Code Here (4A).
iter := txn.Reader.IterCF(engine_util.CfWrite)
key_ts := EncodeKey(key, TsMax)
iter.Seek(key_ts)
for ; iter.Valid(); iter.Next() {
key_raw := iter.Item().Key()
ts := decodeTimestamp(key_raw)
key_1 := DecodeUserKey(key_raw)
if !bytes.Equal(key, key_1) {
return nil, 0, nil
}
if ts < txn.StartTS {
return nil, 0, nil
}
val, err := iter.Item().Value()
if err != nil {
panic("val err")
}
write, err_p := ParseWrite(val)
if err_p != nil {
panic("parse err")
}
if write.StartTS == txn.StartTS {
return write, ts, nil
}
}
return nil, 0, nil
}
然后MostRecentWrite,就是以最大version读的write,也就是当前时刻的write
代码
func (txn *MvccTxn) MostRecentWrite(key []byte) (*Write, uint64, error) {
// Your Code Here (4A).
iter := txn.Reader.IterCF(engine_util.CfWrite)
key_ts := EncodeKey(key, TsMax)
iter.Seek(key_ts)
if iter.Valid() {
key_raw := iter.Item().Key()
ts := decodeTimestamp(key_raw)
key_1 := DecodeUserKey(key_raw)
if !bytes.Equal(key, key_1) {
return nil, 0, nil
}
val, err := iter.Item().Value()
if err != nil {
panic("val err")
}
write, err_p := ParseWrite(val)
if err_p != nil {
panic("parse err")
}
return write, ts, nil
}
return nil, 0, nil
}
PartB
需要上真锁latch,因为可能检查完,变了,然后变了还应用的话,就出错了。
KVPreWrite:针对每个 key,首先检验是否存在写写冲突(就是有没有startTS后有没有commit的,用currentWrite来查就行了)(可以阅读参考资料:有证明的),再检查是否存在行锁,如存在则需要根据所属事务是否一致来决定是否返回 KeyError,最后将 key 添加到 CFDefault 和 CFLock 即可。感觉顺序一般都是要先default,lock这样来做的
KVCommit:针对每个 key,首先检查是否存在行锁,如不存在则已经 commit 或 rollback,如存在则需要根据 CFWrite 中的当前事务状态来判断是否返回 KeyError(1.不是当前事务的锁,重试2.如果没锁的话, 已经write的是rollback的话,就放回abort),最后将 key 添加到 CFWrite 中并在 CFLock 中删除即可(注意有一些已经commit了)
KvGet.首先检查行锁,如为当前事务所锁,则返回 Error(可能是因为正在更新,lock解开后,才有write commit),否则调用 mvcc 模块的 GetValue 获得快照读即可。(之前更新的commitversion < startVersion都能读到)
partC
KvCheckTxnStatus:以primaykey做标识,别的key都要有指针来指向他,作为事务状态的标识:1.timeout,回滚改行事务(主要tinykv是混合时钟,所以我们要拿出physical部分来做) 2.lock,那么我们可以等待重试,返回locktll。3.如果已经超时回滚的话,也就是没lock,同时write为空的话,我们就写入write里面标记rollback。4.剩下的是已经commit的
KvBatchRollback:针对每个 key,首先检查是否存在行锁,如果存在则删除 key 在 CFLock 和 CFValue 中的数并且在 CFWrite 中写入一条 rollback 即可(Rollback是以startTs为writeVersion的)[参考文末的的图片].如果不存在或者不归当前事务锁定,则从 CFWrite 中获取当前事务的提交信息,如果不存在则向 CFWrite 写入一条 rollback。存在write,如果不是rollback的的话,我们就要返回abort错误。
KvResolveLock:针对每个 key,根据请求中的参数决定来 commit 或者 rollback 即可。
KvResolveLock代码如下:
func (server *Server) KvResolveLock(_ context.Context, req *kvrpcpb.ResolveLockRequest) (*kvrpcpb.ResolveLockResponse, error) {
// Your Code Here (4C).
resp := new(kvrpcpb.ResolveLockResponse)
txn := server.mvccGenerate(req.StartVersion, req.Context)
iter := txn.Reader.IterCF(engine_util.CfLock)
for {
if !iter.Valid() {
break
}
key := iter.Item().Key()
value, err := iter.Item().Value()
iter.Next()
if err != nil {
panic(err)
}
lock, err_lock := mvcc.ParseLock(value)
if err_lock != nil {
panic(err_lock)
}
write, _, _ := txn.CurrentWrite(key)
if lock != nil && lock.Ts != txn.StartTS {
// other transaction locked the key
continue
}
if write == nil {
if req.StartVersion < req.CommitVersion {
// commit
txn.DeleteLock(key)
txn.PutWrite(key, req.CommitVersion, &mvcc.Write{StartTS: req.StartVersion, Kind: mvcc.WriteKindPut})
}
if req.StartVersion > req.CommitVersion {
// rollback
txn.DeleteValue(key)
txn.DeleteLock(key)
txn.PutWrite(key, req.StartVersion, &mvcc.Write{StartTS: req.StartVersion, Kind: mvcc.WriteKindRollback})
}
}
}
server.storage.Write(req.Context, txn.Writes())
return resp, nil
}
Scanner 扫描到没有 key 或达到 limit 阈值即可。针对 scanner,需要注意不能读有锁的 key,不能读未来的版本(也就是生产了个快照),不能读已删除或者已 rollback 的 key
首先我们前面的封装了一下这个必要的细节,所以,我们要再写个新的定制分装
scanner
结构如下
type Scanner struct {
// Your Data Here (4C).
txn *MvccTxn
preKey []byte
iter engine_util.DBIterator
}
我们要mvccTxn来分装一下事务操作,这是必要的。然后preKey就是防止有多个key version,但是只要返回一个最新的key就行,用这个来防止放回重复,然后就是default列的iter,当然iter是用来查next key的,我们还要拿这个key,去到writecf里面找第一个不是version,且这个write不是回滚操作的.同时scan有错误,不停在操作,记录错误在kvpair里面
代码如下(注意key有没有ts)
func (scan *Scanner) Next() ([]byte, []byte, error) {
// Your Code Here (4C).
for {
if !scan.iter.Valid() {
return nil, nil, ErrEnd{}
}
key := scan.iter.Item().KeyCopy(nil)
// value, err := scan.iter.Item().ValueCopy(nil)
// ts := decodeTimestamp(key)
user_key := DecodeUserKey(key)
scan.iter.Next()
// the same key with preKey should continue
if bytes.Equal(user_key, scan.preKey) {
continue
}
iter := scan.txn.Reader.IterCF(engine_util.CfWrite)
key_ts := EncodeKey(user_key, scan.txn.StartTS)
iter.Seek(key_ts)
for {
if !iter.Valid() {
break
}
key_raw := iter.Item().Key()
ts := decodeTimestamp(key_raw)
key_1 := DecodeUserKey(key_raw)
if !bytes.Equal(user_key, key_1) {
// no write on the key
break
}
scan.preKey = user_key
val, err := iter.Item().Value()
if err != nil {
panic("val err")
}
iter.Next()
write, err_p := ParseWrite(val)
if err_p != nil {
panic("parse err")
}
if write.Kind == WriteKindPut && ts <= scan.txn.StartTS {
// find the latest
log.Infof("ts{%v} timestamp{%v}", ts, scan.txn.StartTS)
scan.iter.Seek(EncodeKey(user_key, write.StartTS))
value, err := scan.iter.Item().ValueCopy(nil)
if err != nil {
return nil, nil, err
}
return user_key, value, nil
}
if (write.Kind == WriteKindRollback || write.Kind == WriteKindDelete) && ts <= scan.txn.StartTS {
// if deleted, impossible
if write.Kind == WriteKindDelete {
break
}
// if rollback, look back
continue
}
}
}
return nil, nil, nil
}
Ref:注意本文只总结了api的细节,接下来要做大局的探讨很深入(todo)
TiKV 源码解析系列文章(十二)分布式事务 - 知乎
TiKV 事务模型概览,Google Spanner 开源实现 | PingCAP
PolarDB 数据库内核月报
Async Commit 原理介绍丨 TiDB 5.0 新特性 | PingCAP
我在lab实现了这个判断