在上一篇中,介绍了etcd底层存储的内容,包括wal、raft.MemoryStorage以及backend。在介绍backend时提到了backend只是etcd kv存储的一部分,负责持久化存储,backend加内存化treeIndex才构成etcd完整的支持mvcc的kv存储。所以这篇就来介绍etcd mvcc的实现。
文章目录
- 整体架构
- index
- keyIndex
- treeIndex
- kvstore
整体架构
在看mvcc的具体实现之前,我们先从整体上介绍mvcc。
etcd维护了一个全局递增的版本号,每次变更都会产生新的版本号(版本号实际是main_sub的格式,会在kvstore部分详细介绍)。同时,etcd在内存中维护key => 版本号之间的映射关系,而持久化存储Backend中存储的是版本号 => value之间的映射关系。
查询时,首先根据key在查询到版本号,然后再根据版本号去backend中查询value;增加或者修改时,先在内存中增加key和版本号之间的映射,再将版本号和value存储至backend。
整体架构图如下。index模块在内存中维护了key => 版本号的映射。backend模块是kv存储,负责存储版本号 => value,前面已经详细介绍过。KV在index和backend上封装了支持mvcc的kv存储,其支持读写事务。
index
etcd使用了google开源的B树(btree package)在内存中维护了key和相应版本之间的关系。
treeIndex的结构如下,其使用泛型的B树来维护key和版本之间的关系。B树传入的类型为keyIndex,可以认为其是一个key为etcd key,value为key index的B树。
type treeIndex struct {
sync.RWMutex
tree *btree.BTreeG[*keyIndex] //泛型实现
lg *zap.Logger
}
keyIndex
key index是etcd中记录版本信息的数据结构,其结构如下。key是etcd的key,用来构建B树;modified表示最近一次修改的版本号;generations中记录着该key的修改历史。
type keyIndex struct {
key []byte
modified revision // the main rev of the last modification
generations []generation
}
etcd使用revision表示版本,revision有两个字段。main表示事务操作的主版本号,同一事务中发生变更的key共享同一main版本号,sub表示同一事务中发生变更的次数。
// A revision indicates modification of the key-value space.
// The set of changes that share same main revision changes the key-value space atomically.
type revision struct {
// main is the main revision of a set of changes that happen atomically.
main int64
// sub is the sub revision of a change in a set of changes that happen
// atomically. Each change has different increasing sub revision in that
// set.
sub int64
}
etcd中使用generations来记录历史修改数据,其是由generation组成的数组。generation是代的意思,一个key从创建到删除是一个generation,generation记录key从创建到删除中间所有变更的版本信息。当key被删除后再次被操作,会创建新的generation来记录版本信息。
type generation struct {
ver int64
created revision // when the generation is created (put in first revision).
revs []revision
}
在此基础上,keyIndex提供了一系列增删查的方法,包括追加版本、基于版本的压缩、基于版本的匹配查询等,都很简单,不一一描述。这里提一下tombstone方法,当etcd的删除key时,会显式调用相应keyIndex的tombstone方法,结束当前generation并开启新的空generation。
func (ki *keyIndex) tombstone(lg *zap.Logger, main int64, sub int64) error {
if ki.isEmpty() {
lg.Panic(
"'tombstone' got an unexpected empty keyIndex",
zap.String("key", string(ki.key)),
)
}
if ki.generations[len(ki.generations)-1].isEmpty() {
return ErrRevisionNotFound
}
ki.put(lg, main, sub)
ki.generations = append(ki.generations, generation{})
keysGauge.Dec()
return nil
}
treeIndex
理解了keyIndex,再回过头来看index。index的接口定义如下,可以看到index提供了一系列基于key和rev的增删查方法。key的部分由B树来实现,rev的部分由keyIndex来实现。也就是说treeIndex就是在B树上面做了一层封装,封装的内容就是对B树的值keyIndex的操作。
type index interface {
Get(key []byte, atRev int64) (rev, created revision, ver int64, err error)
Range(key, end []byte, atRev int64) ([][]byte, []revision)
Revisions(key, end []byte, atRev int64, limit int) ([]revision, int)
CountRevisions(key, end []byte, atRev int64) int
Put(key []byte, rev revision)
Tombstone(key []byte, rev revision) error
Compact(rev int64) map[revision]struct{}
Keep(rev int64) map[revision]struct{}
Equal(b index) bool
Insert(ki *keyIndex)
KeyIndex(ki *keyIndex) *keyIndex
}
kvstore
kvstore是真正意义的支持mvcc的kv存储。store的结构体如下。
ReadView及WriteView是抽象出的读写方法。mu读写锁,但该读写锁并非用来做读写事务的并发保护,而且是将事务操作和非事务操作隔离。
backend以及index构建kv存储的两大部分。currentRev是全局递增的版本号,已经保护该版本号的revMu。
另外还有一些压缩相关的内容,我们先略过,暂时只关心读写相关的内容。
type store struct {
ReadView
WriteView
cfg StoreConfig
// mu read locks for txns and write locks for non-txn store changes.
mu sync.RWMutex
b backend.Backend
kvindex index
le lease.Lessor
// revMuLock protects currentRev and compactMainRev.
// Locked at end of write txn and released after write txn unlock lock.
// Locked before locking read txn and released after locking.
revMu sync.RWMutex
// currentRev is the revision of the last completed transaction.
currentRev int64
// compactMainRev is the main revision of the last compaction.
compactMainRev int64
fifoSched schedule.Scheduler
stopc chan struct{}
lg *zap.Logger
hashes HashStorage
}
调用Read方法得到一个读事务storeTxnRead,其是在backend.ReadTx上的封装。创建时需要对store.mu和tx.rwmu分别加锁,解锁需要显式调用End方法。backend.ReadTx的提交是由backend在batchInterval是统一提交。
storeTxnRead同时还提供了读方法(range方法),比较简单,先根据key查版本号,再根据版本号查value。其通过版本号保证不会读到更新的值。
type storeTxnRead struct {
s *store
tx backend.ReadTx
firstRev int64
rev int64
trace *traceutil.Trace
}
func (s *store) Read(mode ReadTxMode, trace *traceutil.Trace) TxnRead {
s.mu.RLock()
s.revMu.RLock()
// For read-only workloads, we use shared buffer by copying transaction read buffer
// for higher concurrency with ongoing blocking writes.
// For write/write-read transactions, we use the shared buffer
// rather than duplicating transaction read buffer to avoid transaction overhead.
var tx backend.ReadTx
if mode == ConcurrentReadTxMode {
tx = s.b.ConcurrentReadTx()
} else {
tx = s.b.ReadTx()
}
tx.RLock() // RLock is no-op. concurrentReadTx does not need to be locked after it is created.
firstRev, rev := s.compactMainRev, s.currentRev
s.revMu.RUnlock()
return newMetricsTxnRead(&storeTxnRead{s, tx, firstRev, rev, trace})
}
func (tr *storeTxnRead) End() {
tr.tx.RUnlock() // RUnlock signals the end of concurrentReadTx.
tr.s.mu.RUnlock()
}
调用Write方法得到一个写事务storeTxnWrite,写事务是在读事务及backend.BatchTx的封装。在写事务的range方法中,会使用最新的版本号进行读,所以可以读到最新的修改。写事务结束时,会将全局的版本号递增。
type storeTxnWrite struct {
storeTxnRead
tx backend.BatchTx
// beginRev is the revision where the txn begins; it will write to the next revision.
beginRev int64
changes []mvccpb.KeyValue
}
func (s *store) Write(trace *traceutil.Trace) TxnWrite {
s.mu.RLock()
tx := s.b.BatchTx()
tx.LockInsideApply()
tw := &storeTxnWrite{
storeTxnRead: storeTxnRead{s, tx, 0, 0, trace},
tx: tx,
beginRev: s.currentRev,
changes: make([]mvccpb.KeyValue, 0, 4),
}
return newMetricsTxnWrite(tw)
}
func (tw *storeTxnWrite) Range(ctx context.Context, key, end []byte, ro RangeOptions) (r *RangeResult, err error) {
rev := tw.beginRev
if len(tw.changes) > 0 {
rev++
}
return tw.rangeKeys(ctx, key, end, rev, ro)
}
func (tw *storeTxnWrite) End() {
// only update index if the txn modifies the mvcc state.
if len(tw.changes) != 0 {
// hold revMu lock to prevent new read txns from opening until writeback.
tw.s.revMu.Lock()
tw.s.currentRev++
}
tw.tx.Unlock()
if len(tw.changes) != 0 {
tw.s.revMu.Unlock()
}
tw.s.mu.RUnlock()
}
以上就是etcd mvcc模块的介绍,后续会介绍etcd如何在mvcc的基础上实现事务。