LevelDB源码解析(五)

LevelDB(五)

五、 db

1. skiplist.h

在这里插入图片描述

1. Node(skiplist.h)

跳表中的节点类, 是一个数组。

// 节点的值
Key const key;
// 下一个节点
std::atomic<Node*> next_[1];
// 构造函数  初始化key
Node(const Key& k);
// 返回下n个节点
// 使用std::memory_order_acquire  所有acquire级别的内存操作在执行时是互斥的,且按照执行顺序进行
Node* Next(int n);//将第n个节点设置为x
void SetNext(int n, Node* x);
// 同Next 但是不保证安全  memory_order_relaxed
Node* NoBarrier_Next(int n);
2. Skiplist.h(skiplist.h)

跳表实现类。

// 跳表最大高度为12层
enum { kMaxHeight = 12 };
// 跳表的比较器
Comparator const compare_;
// 跳表的头节点
Node* const head_;
// 跳表最大高度
std::atomic<int> max_height_;
// 随机数对象,用于跳表新增时的插入操作
Random rnd_;
// 快速分配空间
Arena* const arena_;
// 返回当前跳表的最大高度
int GetMaxHeight();
// 使用比较器判断两个key的大小  
bool Equal(const Key& a, const Key& b);// 在arena_中分配一块sizeof(Node)和height - 1个指针的大小
Node* NewNode(const Key& key, int height);// 初始化变量 将head_的每一层都指向null
SkipList(Comparator cmp, Arena* arena);// 如果key小于n 返回true
// 顺序存放的 小于就往下走 
bool KeyIsAfterNode(const Key& key, Node* n);
// 从最高层开始找比key大或相等的节点,遍历到第0层结束 
// 找到每一层中比key大或相等的节点, 放在prev中
Node* FindGreaterOrEqual(const Key& key, Node** prev);
// 从最后一层开始找到第一个比key小的节点
Node* FindLessThan(const Key& key) const;
// 从最后一层开始,先从head跳到最后一层的头节点,再从这个头节点开始找最后一个
Node* FindLast() const;// rnd_对象的随机高度 在1-3中随机
int RandomHeight();// 插入节点 
/*1. 将所有比key大或相等的节点在到prev中2. 获得随机高度如果随机高度比当前最大高度要好大将当前最大高度 到 随机高度 的指针指向head更改当前最大高度3. 在随机高度上创建key节点4. 在0-随机高度上 将节点进行插入
*/
void Insert(const Key& key);// 跳表中有key节点 返回true
bool Contains(const Key& key) const;
3. Iterator(skiplist.h)

跳表的迭代器类。

// 保存跳表指针
const SkipList* list_;
// 保存当前迭代器指向的节点
Node* node_;
// 初始化跳表
explicit Iterator(const SkipList* list);
// 当前迭代器指向的节点不为空 返回true
bool Valid() const;
// 返回迭代器指向节点的key
const Key& key() const;
// 获取迭代器当前指向node_的下一个
void Next();
// 获取迭代器当前指向node_的前一个
void Prev();
// 找到第一个大于等于target的节点
void Seek(const Key& target);
// 找到第一个节点
void SeekToFirst();
// 找到最后一个节点
void SeekToLast();

2. memtable.h

1. KeyComparator(memtable.h)

构造了一个InternalKeyComparator类的仿函数对象。

// 把p到p+5解析成len p+=5 返回数据部分
static Slice GetLengthPrefixedSlice(const char* data);
// 持有的InternalKeyComparator比较器
const InternalKeyComparator comparator;// 初始化比较器
explicit KeyComparator(const InternalKeyComparator& c);// 调用GetLengthPrefixedSlice 比较两个数据部分
int operator()(const char* a, const char* b) const;
2. MemTable(memtable.h)
typedef SkipList<const char*, KeyComparator> Table;// 比较器
KeyComparator comparator_;// 引用计数
int refs_;// 内存分配器
Arena arena_;// 数据存储实体 跳表
Table table_;
// 清空scratch
// 将target的大小 编码到scratch中
// scratch后加上target的数据和大小 
// 返回scratch  ==  encode(target.size) + target.data + target.size
static const char* EncodeKey(std::string* scratch, const Slice& target);
// 初始化比较器
MemTable(const InternalKeyComparator& comparator);
// assert 确保引用计数为0 
~MemTable();// 引用计数++
void Ref();
// 引用计数-- 如果引用计数为0 对MemTable进行析构
void Unref();// 返回已使用的内存
size_t ApproximateMemoryUsage();// 返回一个跳表的迭代器
Iterator* NewIterator();// 在memtable中添加kv
/*1. key的大小2. value的大小3. key + SequenceNumber = 7 + type = 1 = key + 84. encoded_len =  internal_key_size编码后的长度 + internal_key_size  + val_size编码后的长度 + val_size5. 分配encoded_len空间                           6. buf = encode(internal_key_size)7. buf = encode(internal_key_size) + key8. buf = encode(internal_key_size) + key + encode(SequenceNumber)9. buf = encode(internal_key_size) + key + encode(SequenceNumber|type) + encode(valsize)10. buf = encode(internal_key_size) + key + encode(SequenceNumber) + encode(value.size) + value11. 检验12. 插入SkipList
*/
void Add(SequenceNumber seq, ValueType type, const Slice& key, const Slice& value);// 在memtable中获得kv
// LookupKey见dbformat.h 下 LookupKey
/*
1. 拿到 memtable_key
2. 实例化一个跳表的迭代器
3. 查找memtable_key  1. 如果没找到 说明数据不存在 返回false2. 如果数据存在1. 拿到整个entry数据2. 将key的总长度解析到key_length  key_ptr = entry + 53. 检查是同一个user key1. 如果不是 返回false2. 如果是:1. 找到type标志位 = tag2. 判断tag的值 0xff == 2^83. 如果tag的值是kTypeValue 表示值存在 1. 从key_ptr + key_length 就是value的开始 解析到value的数据存放到v中2. 将value设置为v3. 如果tag的值是kTypeDeletion  表示该数据已删除
*/
bool Get(const LookupKey& key, std::string* value, Status* s);
3. MemTableIterator(db/memtable.cc)

MemTableIterator是对memtable的迭代,本质是对skiplist迭代器的封装。

// 持有跳表迭代器
MemTable::Table::Iterator iter_;
// 对一个key生成编码的缓冲区
std::string tmp_;
// 清空scratch
// 将target的大小 编码到scratch中
// scratch后加上target的数据和大小 
// 返回scratch  ==  encode(target.size) + target.data + target.size
static const char* EncodeKey(std::string* scratch, const Slice& target);
// 初始化跳表迭代器
MemTableIterator(MemTable::Table* table);
// iter_  Valid()方法封装
bool Valid() const; 
// iter_  Seek()方法封装
// 返回scratch  ==  encode(target.size) + target.data + target.size
void Seek(const Slice& k); 
// iter_  SeekToFirst()方法封装
void SeekToFirst();
// iter_  SeekToLast()方法封装
void SeekToLast();
// iter_  Next()方法封装
void Next(); 
// iter_  Prev()方法封装
void Prev(); // 返回 iter_ key()方法  的数据部分  GetLengthPrefixedSlice
Slice key();
// 拿到iter_.key()方法 的数据部分 返回key的数据部分
Slice value();
// 返回ok
Status status() const;

3. filename.h

  • kLogFile: dbname/[0-9]+.log

  • kTableFile:dbname/[0-9]+.(sst|ldb)

  • kDBLockFile:dbname/LOCK

  • kDescriptorFile:dbname/MANIFEST-[0-9]+

  • kCurrentFile:dbname/CURRENT

    记载当前的manifest文件名

  • kTempFile:dbname/[0-9]+.dbtmp

    在repair数据库时,会重放wal日志,将这些和已有的sst合并写到临时文件中去,成功之后调用rename原子重命名

  • kInfoLogFile:dbname/LOG.old或者dbname/LOG

std::string LogFileName(const std::string& dbname, uint64_t number);
std::string TableFileName(const std::string& dbname, uint64_t number);
std::string SSTTableFileName(const std::string& dbname, uint64_t number);
std::string DescriptorFileName(const std::string& dbname, uint64_t number);
std::string CurrentFileName(const std::string& dbname);
std::string LockFileName(const std::string& dbname);
std::string TempFileName(const std::string& dbname, uint64_t number);
std::string InfoLogFileName(const std::string& dbname);
std::string OldInfoLogFileName(const std::string& dbname);// 判断filename是否是一种类型的日志文件
bool ParseFileName(const std::string& filename, uint64_t* number, FileType* type);
// 删除开始的dbname添加新的manifest文件名
Status SetCurrentFile(Env* env, const std::string& dbname, uint64_t descriptor_number);

4. dbformat.h

1. config
static const int kNumLevels = 7;// 第0层sstable个数到达这个阈值时触发压缩,默认值为4
static const int kL0_CompactionTrigger = 4;// 第0层sstable到达这个阈值时,延迟写入1ms,将CPU尽可能的移交给压缩线程,默认值为8
static const int kL0_SlowdownWritesTrigger = 8;// 第0层sstable到达这个阈值时将会停止写,等到压缩结束,默认值为12
static const int kL0_StopWritesTrigger = 12;// 新压缩产生的sstable允许最多推送至几层(目标层不允许重叠),默认为2
static const int kMaxMemCompactLevel = 2;// 在数据迭代的过程中,也会检查是否满足压缩条件,该参数控制读取的最大字节数
static const int kReadBytesPeriod = 1048576;
2. ParsedInternalKey

InternalKey的结构体。

user_key + type + sequence

// 空构造
ParsedInternalKey();
// 初始化
ParsedInternalKey(const Slice& u, const SequenceNumber& seq, ValueType t);
  Slice user_key;SequenceNumber sequence;ValueType type;
// 数据的删除标记和数据有效标记
enum ValueType { kTypeDeletion = 0x0, kTypeValue = 0x1 };// 返回InternalKey的长度
inline size_t InternalKeyEncodingLength(const ParsedInternalKey& key);// 返回ValueType添加到seq上 的结果 
static uint64_t PackSequenceAndType(uint64_t seq, ValueType t);// result += user_key.data + user_key.size
// 压缩  添加ValueType的seq 
// result += user_key.data + user_key.size + encode(seq)
void AppendInternalKey(std::string* result, const ParsedInternalKey& key);// 将internal_key解析成ParsedInternalKey格式 
/*1. 判断internal_key的长度  不能小于82. 解析internal_key的后8位 type + sequence3. 取到标志位 给result赋值
*/
bool ParseInternalKey(const Slice& internal_key, ParsedInternalKey* result);// 返回internal_key的前 n - 8 位数据部分
inline Slice ExtractUserKey(const Slice& internal_key);
3. InternalKey
// 保存的InternalKey数据 = user_key + user_key.size + (type + sequence)
std::string rep_;
// 空构造 
InternalKey();
// 构造 InternalKey
InternalKey(const Slice& user_key, SequenceNumber s, ValueType t);
// 将s.data 和 s.size 赋值给rep_
bool DecodeFrom(const Slice& s);
// 返回rep_
Slice Encode();
// 提取rep_的user_key
Slice user_key();
// 将p解析到rep_中
void SetFrom(const ParsedInternalKey& p);
// 清除 rep_
void Clear();
4. InternalKeyComparator

InternalKey比较器类。

const Comparator* user_comparator_;
// 初始化比较器
InternalKeyComparator(const Comparator* c);
// 初始化比较器
const char* Name() const override;
// 比较两个InternalKey的数据部分 小于返回-1 大于返回1
int Compare(const Slice& a, const Slice& b) const override;// start的数据部分 limit的数据部分
// 截取start和user_limit不同的部分
// 如果user_start后面的部分比前面的大 更新start
// 否则不改变
void FindShortestSeparator(std::string* start, const Slice& limit) const override;
//
void FindShortSuccessor(std::string* key) const override;
// 返回比较器对象
const Comparator* user_comparator(); 
// 比较两个InternalKey 小于返回-1 大于返回1
int Compare(const InternalKey& a, const InternalKey& b) const;
5. InternalFilterPolicy

InternalKey的过滤器。

// 过滤器策略
const FilterPolicy* const user_policy_;
// 初始化过滤器策略
InternalFilterPolicy(const FilterPolicy* p);
// 返回过滤器名称
const char* Name();
// 创建n个key的映射 结果存放在dst中
void CreateFilter(const Slice* keys, int n, std::string* dst);
// 在filter中查看key是否存在 
bool KeyMayMatch(const Slice& key, const Slice& filter);
6. LookupKey

在这里插入图片描述

将不同的key进行转换:UserKey InternalKey MemtableKey

// 整个长度的位置
const char* start_;
// user_key的位置
const char* kstart_;
// user_key + 8 的位置
const char* end_;
// 短字符优化  如果字符比较短的话,可以用栈的地址
char space_[200];  
// 将user_key 和 sequence 格式化
/*size_t needed = usize + 13;  4 + 8 + 1(多分配)1. 如果短字符 使用预分配的空间space_2. 否则使用新开辟的空间3. 更新start_4. 将user_key的长度编码到dst后5. 更新kstart_6. 复制user_key的数据部分7. 编码SequenceNumber8. 更新end_
*/
LookupKey(const Slice& user_key, SequenceNumber sequence);
// 清空space_缓存
~LookupKey();
// 返回memtable_key
Slice memtable_key() const;
// 返回internal_key
Slice internal_key() const;
// 返回user_key
Slice user_key() const;

5. table_cache.h

TableCache

用作SST中的缓存。

private:// 封装了双平台的方法,用于读取SST文件Env* const env_;// SST名字const std::string dbname_;// Cache参数配置const Options& options_;// 缓存基类句柄Cache* cache_;
//  查找指定的SSTable,优先去缓存找,找不到则去磁盘读取,读完之后在插入到缓存中
/*1. 先将file_number反序列化2. 如果在缓存中没有找到file_number构造table名fnamefile保存打开fname对应的文件3. 将数据读入缓存中	
*/ 
Status FindTable(uint64_t file_number, uint64_t file_size, Cache::Handle**);// 释放句柄
static void DeleteEntry(const Slice& key, void* value);
static void UnrefEntry(void* arg1, void* arg2);// 构造函数 传入名字,配置和数据的条数
// 初始化ShardedLRUCache缓存
TableCache(const std::string& dbname, const Options& options, int entries);
// 删除ShardedLRUCache缓存
~TableCache();// 支持对sst缓存的遍历
/*1. 查找指定的SSTable2. 新建表的迭代器
*/
Iterator* NewIterator(const ReadOptions& options, uint64_t file_number, uint64_t file_size, Table** tableptr = nullptr);// 获得数据
Status Get(const ReadOptions& options, uint64_t file_number, uint64_t file_size, const Slice& k, void* arg, void (*handle_result)(void*, const Slice&, const Slice&));// 淘汰指定的SST句柄
void Evict(uint64_t file_number);

6. version_edit.h

在leveldb中则使用versionedit来记录每次变更。因此,数据库正常/非正常关闭重新打开后,只需要按顺序把 MANIFEST 文件中的 VersionEdit 执行一遍,就可以把数据恢复到宕机前的最新版本(内存中的数据持久化由 WAL 文件保证,所以 WAL 文件+sstables 文件,就可以保证数据不会丢失)。

Manifest文件可以看作是VersionEdit的日志,为了快速恢复需要将这些变更持久化到磁盘上。

FileMetaData(version_edit.h)

记录一个SST文件中的参数。

FileMetaData();
// 引用计数
int refs;
// 记录已经seek的个数  如果超过一定数量会进行compaction
int allowed_seeks;  // Seeks allowed until compaction
// 唯一标识一个sstable
uint64_t number;
// 文件大小 
uint64_t file_size;    // File size in bytesInternalKey smallest;  // Smallest internal key served by table
InternalKey largest;   // Largest internal key served by table
VersionEdit(version_edit.h)
// 去标识每个操作
enum Tag {kComparator = 1,kLogNumber = 2,kNextFileNumber = 3,kLastSequence = 4,kCompactPointer = 5,kDeletedFile = 6,kNewFile = 7,// 8 was used for large value refskPrevLogNumber = 9
};
typedef std::set<std::pair<int, uint64_t>> DeletedFileSet;
// 比较器名称
std::string comparator_;
// 日志编号,该日志之前的数据均可删除
uint64_t log_number_;
// 已经弃用
uint64_t prev_log_number_;
// 下一个文件编号(ldb、idb、MAINFEST文件共享一个序号空间)
uint64_t next_file_number_;
// 最后的seq_num
SequenceNumber last_sequence_;
// 改动的标志位 便于将改动进行编码
bool has_comparator_;
bool has_log_number_;
bool has_prev_log_number_;
bool has_next_file_number_;
bool has_last_sequence_;
// 相比上次version而言,本次需要压缩的文件有哪些
std::vector<std::pair<int, InternalKey>> compact_pointers_;
// 相比上次version而言,本次需要删除的文件有哪些
DeletedFileSet deleted_files_;
// 相比上次version而言,本次新增的文件有哪些
std::vector<std::pair<int, FileMetaData>> new_files_;
// 清空当前对象的所有数据
VersionEdit(); 
// 清空当前对象的所有数据
void Clear();
//  设置比较器
void SetComparatorName(const Slice& name);
// 设置日志序号
void SetLogNumber(uint64_t num);
// 设置上一个日志序号
void SetPrevLogNumber(uint64_t num);
// 设置下一个文件号
void SetNextFile(uint64_t num);
// 设置最新的SequenceNumber
void SetLastSequence(SequenceNumber seq);
// 设置压缩指针
void SetCompactPointer(int level, const InternalKey& key);
// 添加第level层的文件信息
void AddFile(int level, uint64_t file, uint64_t file_size,const InternalKey& smallest, const InternalKey& largest);// 在删除文件数组中第level层的文件信息
void RemoveFile(int level, uint64_t file);
// 将当前文件信息的改动进行编码
// tag + 操作信息
void EncodeTo(std::string* dst) const;
// 将当前文件信息的改动进行解码
// tag + 操作信息
Status DecodeFrom(const Slice& src);

7. version_set.h

如果某个SST一直在查询,说明该SST查询到数据的概率比较低,就将SST放到高层。

1. Version

针对共享的资源,有三种方式:

  • 悲观锁 ,这是最简单的处理方式。加锁保护,读写互斥。效率低。
  • 乐观锁,它假设多用户并发的事物在处理时不会彼此互相影响,各事务能够在不产生锁的的情况下处理各自影响的那部分数据。在提交数据更新之前,每个事务会先检查在该事务读取数据后,有没有其他事务又修改了该数据。 果其他事务有更新的话,正在提交的事务会进行回滚;这样做不会有锁竞争更不会产生死锁, 但如果数据竞争的概率较高,效率也会受影响 。
  • MVCC,MVCC 是一个数据库常用的概念。Multiversion concurrency control 多版本并发控制。每一个执行操作的用户,看到的都是数据库特定时刻的的快照 (snapshot), writer 的任何未完成的修改都不会被其他的用户所看到;当对数据进行更新的时候并是不直接覆盖,而是先进行标记,然后在其他地方添加新的数据(这些变更存储在versionedit),从而形成一个新版本,此时再来读取的 reader 看到的就是最新的版本了。所以这种处理策略是维护了多个版本的数据的,但只有一个是最新的(versionset中维护着全局最新的seqnum)。具体的事务的并发控制协议理论&MySQL源码分析

Version用于表示某次 compaction 或者打开/恢复的数据库状态。

VersionSet是一个双向链表,其中Version是VersionSet的节点,每个Version都有指向前后Version的指针。

全局数据结构
GetStats
struct GetStats {// 文件信息FileMetaData* seek_file;// 文件层数int seek_file_level;
};
SaverState
// 保存状态码
enum SaverState {// 没有找到kNotFound,// 找到kFound,// 已删除kDeleted,// 出错kCorrupt,
};
Saver
// 保存信息
struct Saver {// 保存状态码SaverState state;// 比较器const Comparator* ucmp;Slice user_key;std::string* value;
};
Version::Get::State
// 保存查找信息
Saver saver;// 保存查找的结果 文件 + 层数
GetStats* stats;
// 读配置
const ReadOptions* options;// key
Slice ikey;// 最后一次读的文件
FileMetaData* last_file_read;
// 最后一次读的文件层数
int last_file_read_level;VersionSet* vset;
// 查找状态
Status s;
// 找到了 和 出错了 会为true
// true表示输出State的状态s
bool found;// 在level层的FileMetaData中查找 arg = State 的ikey值
// 返回值表示是否继续查找
/*1. state->stats->seek_file == nullptr表示没有找到 state->last_file_read != nullptr表示之前读过别的  读过的上一个sst 的allowseek--表示读过多次(2次?) 但是没有找到 更新state 用于对当前sst的allowseek--操作2. 会记录每一次查找的文件信息3. 在VersionSet的表缓存中查找ikey4. 在缓存中查找出错  返回false5. 更新状态s
*/
static bool Match(void* arg, int level, FileMetaData* f);
Version::RecordReadSample::State

leveldb是基于lsm树的思想来实现,因此涉及到压缩,而在leveldb中其压缩有两种模式:minor compaction和major compaction,而major compaction中触发又包括size和seek。因此State类记录了SST文件记录的次数,是否满足触发compaction的条件。

struct State {// 文件信息GetStats stats;  // Holds first matching file// 符合条件的个数int matches;// 对State进行判断  如果是第一次匹配 则对State进行更新 // true 表示State第一将次更新static bool Match(void* arg, int level, FileMetaData* f);
};
全局方法
// 返回配置中文件的最大大小  2kb
static size_t TargetFileSize(const Options* options);// 压缩时的最大大小  10 * 2kb
static int64_t MaxGrandParentOverlapBytes(const Options* options);// 所有压缩文件的大小  25 * 2kb
static int64_t ExpandedCompactionByteSizeLimit(const Options* options);// 第层的最大字节数 10. * 1048576.0 * 10 ^ level
static double MaxBytesForLevel(const Options* options, int level);// 每层中 每个文件的最大 2kb
static uint64_t MaxFileSizeForLevel(const Options* options, int level);// 返回files中所有文件的大小总和
static int64_t TotalFileSize(const std::vector<FileMetaData*>& files);// 使用二分法查找key在files中的下标  返回下标 
int FindFile(const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& files, const Slice& key);// 如果user_key大于FileMetaData中的key 返回true
static bool AfterFile(const Comparator* ucmp, const Slice* user_key, const FileMetaData* f);// 如果user_key小于FileMetaData中的key 返回true
static bool BeforeFile(const Comparator* ucmp, const Slice* user_key, const FileMetaData* f);// 判断参数中的key范围 在文件数组中是否有重叠 
// true表示 当前范围 在文件数组中没有重叠
/*1. 如果没有经过排序  将所有的文件都遍历一次1. 最小的user_key大于文件中最大的key || 最大的user_key小于文件中最小的key 肯定没有重叠2. 有序的1. 如果small key不为空 先使用二分查找找到大于等于small key的文件下标2. 下标不合法 返回false 表示没有重叠 3. largest_user_key比文件中的key小  说明当前范围不在files[index]里 返回false 
*/
bool SomeFileOverlapsRange(const InternalKeyComparator& icmp, bool disjoint_sorted_files, const std::vector<FileMetaData*>& files, const Slice* smallest_user_key, const Slice* largest_user_key);// 返回 arg = TableCache的迭代器
static Iterator* GetFileIterator(void* arg, const ReadOptions& options, const Slice& file_value);// 将kv解析保存在Saver中 = arg
static void SaveValue(void* arg, const Slice& ikey, const Slice& v);// 如果a的序列号大于b  返回true
static bool NewestFirst(FileMetaData* a, FileMetaData* b)
成员变量
// 隶属于哪个versionset
VersionSet* vset_;  
Version* next_;    
Version* prev_;     
// 有多少服务还引用着这个版本
int refs_;  // 当前版本所有数据
// 共7层 7个指针  每个指针指向一个 vector
// 有config::kNumLevels个vector
std::vector<FileMetaData*> files_[config::kNumLevels];// 用于seek次数超过阈值之后需要压缩的文件
FileMetaData* file_to_compact_;
// 用于seek次数超过阈值之后需要压缩的文件所在的level层数
int file_to_compact_level_;// 当产生新版本时,遍历所有的层,比较该层文件总大小与基准大小,得到一个最应当 compact 的层。
// - level 0看文件个数,降低seek的次数,提高读性能,个数/4
// - level >0看文件大小,减少磁盘占用,大小/(10M**level)
// level层的个数 大于 阈值  level层的个数 / N >= 1 
// 需要压缩的最佳level的分值 用于检查size超过阈值之后需要压缩的文件
double compaction_score_;
// 需要压缩的最佳level 用于检查size超过阈值之后需要压缩的文件所在的level
int compaction_level_;
成员方法
// 初始化
explicit Version(VersionSet* vset);// 创建两层迭代器
// 第一层是对files_[level]的迭代器
// 第二层是对vset_->table_cache_表缓存的迭代器
Iterator* NewConcatenatingIterator(const ReadOptions&, int level) const;// 保存每一层的迭代器,其中第0层和非0层创建的迭代器不一样
// 放入 第0层中文件的一级缓存迭代器
// 非0层 放入表和表缓存的二级迭代器
void AddIterators(const ReadOptions&, std::vector<Iterator*>* iters);// 在当前的文件中查找user_key 将结果保存在arg中
/*1. 从最新到最老进行搜索,所以会先搜索第0层2. tmp预留第0层的存储空间 reserve3. 第0层中 将user_key可能存在的文件都记下来 因为存在数据重叠4. 按照sst文件的序号来排序,文件号越大,表示越新  // 排序之后开始搜索,如果查找出现错误 直接返回5. 查找>1的层数6. 对于每一层  使用二分法查找key在files中的下标  返回下标 7. 如果查找出现错误 直接返回
*/
void ForEachOverlapping(Slice user_key, Slice internal_key, void* arg, bool (*func)(void*, int, FileMetaData*));// 用于从SST中读取所需要的数据  返回查找状态
Status Get(const ReadOptions&, const LookupKey& key, std::string* val, GetStats* stats);// 当查找文件而没有查找到时,更新seek次数状态
bool UpdateStats(const GetStats& stats);// 统计读的样本,主要是用在迭代器中
/*1. 解析错误  返回false2. 在当前的文件中查找user_key 将结果保存在state中3. 如果有对同一个sst的多次读取 则allowed_seek更新
*/
bool RecordReadSample(Slice key);// 引用计数++
void Ref();
// 引用计数-- 如果当前Version引用计数为0 进行析构
void Unref();// 将与当前范围有重叠的文件加入到inputs中
// 如果是第0层 则更新最值 扩大搜索范围 
void GetOverlappingInputs(int level,const InternalKey* begin,  // nullptr means before all keysconst InternalKey* end,    // nullptr means after all keysstd::vector<FileMetaData*>* inputs);// 判断是否与当前level层有key重叠
bool OverlapInLevel(int level, const Slice* smallest_user_key, const Slice* largest_user_key);// 选择内存中数据dump到磁盘中的哪一层
/*
由于第0层文件频繁被访问,而且有严格的数量限制,另外多个SST之间还存在重叠,所以为了减少读放大,我们是否可以考虑将内存中的文件dump到磁盘时尽可能送到高层呢?1. 大于 level 0的各层文件间是有序的,如果放到对应的层数会导致文件间不严格有序,会影响读取,则不再尝试。
2. 如果放到 level + 1层,与 level + 2层的文件重叠很大,就会导致 compact 到该文件时,overlap文件过大,则不再尝试。
3. 最大返回 level 2,这应该是个经验值。*/
int PickLevelForMemTableOutput(const Slice& smallest_user_key, const Slice& largest_user_key);//  表示某一层有多少个sst文件
int NumFiles(int level) const;// 删除当前版本中引用计数为0的file
~Version();
2. VersionSet::Builder

Version 和 VersionEdit 之间 的加号Apply 和等号 SaveTo 的操作。

BySmallestKey仿函数比较两个FileMetaData序号的大小。

BySmallestKey{// 持有比较器const InternalKeyComparator* internal_comparator;// 返回两个FileMetaData的大小关系// 先比较两个FileMetaData的最小key,相等再比较seqbool operator()(FileMetaData* f1, FileMetaData* f2);
}
typedef std::set<FileMetaData*, BySmallestKey> FileSet;struct LevelState {// 需要删除的文件std::set<uint64_t> deleted_files;// 新增的文件FileSet* added_files;
};
VersionSet* vset_;// 当前版本节点
Version* base_;// 每一层的新增及删除文件
LevelState levels_[config::kNumLevels];
/*1. Version引用++2. 实例化BySmallestKey的比较器3. 创建7层的set (用比较器实例化set)
*/ 
Builder(VersionSet* vset, Version* base);/*1. 将每层的levels_[level].added_files复制出来 再解引用2. Version引用--
*/ 
~Builder();// 将VersionEdit中所有的变化更新到versionset中
/*1. 将edit中下一次需要压缩的level和最大key更新到versionset中2. 将edit中需要删除的文件(level+文件编号)更新到versionset中3. 针对新增的文件,也需要更新到versionset中,而且需要设置allowed_seeks新增的文件需要在已删除的文件中擦出
*/
void Apply(VersionEdit* edit);// 判断是否应该将当前SST添加进Version版本中
// 如果FileMetaData在删除的数组中  无操作
// FileMetaData添加到Version版本中
void MaybeAddFile(Version* v, int level, FileMetaData* f);// 将当前edit状态应用到version中  edit + version = new version
/*1. 实例化BySmallestKey的比较器2. 拿到Version版本中每层的FileMetaData列表3. 拿到edit中每层的新增文件4. 对Version版本中的文件个数大小进行分配5. 遍历每一个新的文件 比added_file小的和将added_file加入到Version中6. 将剩下的加入到Version中  --added_file后面的
*/
void SaveTo(Version* v);
3. VersionSet::Recover::LogReporter

保存了一个状态。

struct LogReporter : public log::Reader::Reporter {Status* status;void Corruption(size_t bytes, const Status& s) override {if (this->status->ok()) *this->status = s;}
};
4. VersionSet

Versionset是一个双向链表,而且整个db只会有一个versionset。

// 环境封装
Env* const env_;
// 数据库名
const std::string dbname_;
// 配置
const Options* const options_;
// 表缓存
TableCache* const table_cache_;
// 比较器
const InternalKeyComparator icmp_;
// 下一个文件编号
uint64_t next_file_number_;
// manifest文件编号
uint64_t manifest_file_number_;
// 最后一个seq_num
uint64_t last_sequence_;
// 记录当前日志编号
uint64_t log_number_;
// 记录上一个日志编号
uint64_t prev_log_number_; // 用于写manifest文件
WritableFile* descriptor_file_;
// 用于写manifest文件,其中log格式和wal保持一致
log::Writer* descriptor_log_;
// 固定指向双向链表的head,而且其Pre指向最新current
Version dummy_versions_;  
// 指向当前最新版本
Version* current_;        // == dummy_versions_.prev_
// 记录每一层在下一次需要压缩的largest key(InternalKey)
std::string compact_pointer_[config::kNumLevels];
// 找到files中最大的key 保存在largest_key中
// files为空返回false
bool FindLargestKey(const InternalKeyComparator& icmp,const std::vector<FileMetaData*>& files,InternalKey* largest_key);//  找到一个比largest_key最新user_key的FileMetaData
FileMetaData* FindSmallestBoundaryFile(const InternalKeyComparator& icmp,const std::vector<FileMetaData*>& level_files,const InternalKey& largest_key);// 将level_files中最大的InternalKey 所在的文件添加到compaction_files中
void AddBoundaryInputs(const InternalKeyComparator& icmp, const std::vector<FileMetaData*>& level_files, std::vector<FileMetaData*>* compaction_files)
// 在Level+1层获取所有与当前的文件集合有Key重合的文件。
// level n+1 的 sstable 确定   c->inputs_[0]这里已经有了level层的数据了
/*1.  将current_->files_[level]中最大的InternalKey 所在的文件添加到c->inputs_[0]中 处理level层的临界值,因为随着压缩,同一个user_key可能处于两个SST2. 拿到c->inputs_[0]的范围 将level + 1层与 level 层重叠的文件  加入到c->inputs_[1]中3. 拿到level层和level+1层的最大数据范围4. 如果level层和level+1层的数据有重叠, 进行压缩如果level层对应的level+1层的范围 可以在level层进行更多的匹配而且level+1层的个数也不会变化 于是可以进行压缩5. 先找到第level层中最大数据范围的文件 将level层的临界值加入6. 第level层的总大小 第level+1层的总大小 level中 最大数据范围 的大小7. 参与压缩 = level层所有 + level+1层涉及的 扩展后有新增的文件  参与压缩的字节小于limit1. 拿到level层新增的文件的范围 拿到level+1层新增的文件2. 新旧 level+1层的文件个数不变  更新level层的个数和最值8. 更新level+2层 中 前两层最大数据范围内的文件9. 更新下一次要压缩的值 记录当前压缩到哪个key了 
*/
void SetupOtherInputs(Compaction* c);
// 复用Manifest文件 出错返回FALSE
/*1. reuse_logs参数必须要打开2. 解析dscbase出错 获取文件大小出警  文件太大 返回false3. 以追加的方式打开dscname文件4. 保存写日志对象 更新文件的个数
*/
bool ReuseManifest(const std::string& dscname, const std::string& dscbase);// 收尾工作,计算下一次需要压缩的最佳level 保存到Version中
// 当产生新版本时,遍历所有的层,比较该层文件总大小与基准大小,得到一个最应当 compact 的层。
// - level 0看文件个数,降低seek的次数,提高读性能,个数/4
// - level >0看文件大小,减少磁盘占用,大小/(10M**level)
void Finalize(Version* v);// 获取给定input范围的最大值和最小值
void GetRange(const std::vector<FileMetaData*>& inputs, InternalKey* smallest,InternalKey* largest);
// 获取给定input1 + inputs2范围的最大值和最小值
void GetRange2(const std::vector<FileMetaData*>& inputs1,const std::vector<FileMetaData*>& inputs2,InternalKey* smallest, InternalKey* largest);// Save current contents to *log
// 将当前的日志写入快照
/*1. 每一层要压缩的key添加到VersionEdit中2. 将当前Version版本添加到VersionEdit中3. 序列化edit  并记录
*/
Status WriteSnapshot(log::Writer* log);// 将新版本追加到versionset中,并更新current
// 插入双向链表
void AppendVersion(Version* v);// 构造函数,根据参数进行初始化  创建一个空的Version节点
VersionSet(const std::string& dbname, const Options* options, TableCache* table_cache, const InternalKeyComparator*);// 当前Version节点-- 确保是空的 删除manifest创建的资源
~VersionSet();// 将edit应用到当前版本,生成新版本,并将当前状态进行持久化
// 其中在持久化时候不需要加锁
// 用当前的Version和VersionEdit中的改动进行merge,产生新的版本
/*1. 设置VersionEdit基本信息2. 设置下一个文件所使用的编号3. 构建新的版本 current_ + VersionEdit = new Version4. 选择下一次需要压缩的文件5. 第一次时,versionedit的日志文件还没被创建,比如刚打开一全新的数据库时,此时需要将当前的的版本的状态作为base状态写入快照。6. manifest刷盘7. 如果manifest文件有新增,因为CURRENT永远指向的是最新的,此时CURRENT文件也需要进行更新8. 更新新版本AppendVersion,并更新最新的log_number
*/
Status LogAndApply(VersionEdit* edit, port::Mutex* mu); // 从持久化的状态恢复(打开db时候会调用该函数)
/*1. 读取CURRENT文件2. 每次都读文件 保存每一个versionedit3. 生成新版本,并计算下一次待压缩的文件,更新current指针4. 是否复用已有的manifest文件
*/
Status Recover(bool* save_manifest);// 获取当前版本Version
Version* current() const;// 获取manifest文件编号
uint64_t ManifestFileNumber() const; // 分配并返回全局新的文件编号,该编号从manifest读取并且初始化的
uint64_t NewFileNumber(); // 重用FileNum,比如说manifest
// 如果file_number + 1 == 最新的file_number  这个才可以重用
void ReuseFileNumber(uint64_t file_number);// 某一层文件总个数
int NumLevelFiles(int level) const;// 某一层文件总字节
int64_t NumLevelBytes(int level) const;// 返回当前最大的seq
uint64_t LastSequence() const;// 设置最大的Sequence
void SetLastSequence(uint64_t s);// 标记number已经被使用
// 下一个kFileNum从number+1开始
void MarkFileNumberUsed(uint64_t number);// 获取当前日志编号
uint64_t LogNumber() const;// 返回前一个已经压缩的日志编号,目前并未使用,默认0表示没这个文件
uint64_t PrevLogNumber() const;// 选择参与压缩的level和文件
/*1. 判断是否需要进行压缩  NeedsCompaction2. 优先  level层的个数 大于 阈值1. 遍历这一层的每个文件 将符合条件的所有文件加入c->inputs_[0]2. 如果c->inputs_[0]为空,那么就使用第一个3. 使用seek 压缩  将seek=0的文件加入c->inputs_[0]4. 第0层可能存在重叠 所以要遍历所有的文件5. 确定level+1层要压缩的文件6. 返回Compaction
*/ 
Compaction* PickCompaction(); // 手动压缩:
/*1. 将与当前范围有重叠的文件加入到inputs中2. 最多只能压缩MaxFileSizeForLevel大3. 确定level+1层要压缩的文件4. 返回Compaction
*/
Compaction* CompactRange(int level, const InternalKey* begin, const InternalKey* end);// 获取level+1层(level >= 1)重叠部分的字节数
int64_t MaxNextLevelOverlappingBytes();// 为参与压缩的文件创建一个迭代器 创建多层合并迭代器,用于Compaction后的数据进行merge。
// - 第0层需要互相merge,因此可能涉及到文件有多个
// - 其他层,只需要创建一个即可
/*1. 第0层要实例化c->inputs_[0].size()个迭代器  否则只要实例化1个迭代器  迭代器个数+12.   * which = 0 :* level层不为空  *   如果c->level()是第0层    遍历第0层的文件 实例化每个文件的迭代器*   如果c->level()大于第0层  实例化一个LevelFileNumIterator迭代器* * which = 1 :* level + 1层不为空  *   如果c->level() + 1层是大于第0层    实例化一个LevelFileNumIterator迭代器3. 实例化NewMergingIterator迭代器
*/
Iterator* MakeInputIterator(Compaction* c);// 判断是否需要压缩(size / seek触发)
bool NeedsCompaction() const {Version* v = current_;// seek或者size任何一个满足return (v->compaction_score_ >= 1) || (v->file_to_compact_ != nullptr);
}// 读取所有版本的SST number数据 插入到dummy_versions_中
void AddLiveFiles(std::set<uint64_t>* live);// 查询某个key大约需要访问的总字节数(主要是SST的大小)
/*1. 如果该SST中最大的key都比要比较的key要小,那key肯定不再该SST中 大小累加2. 如果最小的key都比ikey要大,针对level>0的SST来说,后面不用比较了3. 定位到SST之后,去内部去查,他大约在什么位置 大小累加
*/
uint64_t ApproximateOffsetOf(Version* v, const InternalKey& key);// 每一行一个level文件元数据,主要是文件的大小(human-readable)
struct LevelSummaryStorage {char buffer[100];
};// 拼接每一层的大小
const char* LevelSummary(LevelSummaryStorage* scratch) const;
5. Compaction

Minor Compaction : immutable memtable持久化为 sst 文件。

检查内存中memtable占用内存大小,一旦超过 options_.write_buffer_size (default 4M),就会尝试 Minor Compaction。

  • 会产生新版本
  • 尽可能放在2层,因为level0层经常被访问,内部又允许重叠,如果文件过多,会放大读性能。

Major Compaction:将不同层级的 sst 的文件进行合并。

  1. ​ Seek Compaction

    Seek compaction主要记录的是某个SST seek次数到达阈值之后,将会参与下一次压缩。

    分散在Get和迭代器迭代中。

  2. Size Compaction

    Size Compaction主要是指某一层SST文件不能太大,这个大对0层来说是SST文件过多,因为level 0层会被频繁访问,而对于其他层表示字节数太大,具体见Builder类的Finalize函数。

  3. Manual Compaction

    Manual Compaction,是指人工触发的Compaction,由外部接口调用产生,实际其内部触发调用的接口是DBImpl中的CompactRange(const Slice begin, const Slice end)。在 Manual Compaction 中会指定的 begin 和 end,它将会一个level 一个level 的分次的Compact 所有level 中与begin 和 end 有重叠(overlap)的 sst 文件。

// 需要压缩的level层数
int level_;// 压缩之后最大的文件大小,等于options->max_file_size
uint64_t max_output_file_size_;// 当前需要操作的版本  -- 链表中的节点
Version* input_version_;// 本次压缩操作对应的文件变更
VersionEdit edit_;// level和level+1两层需要参与压缩的文件元数据
std::vector<FileMetaData*> inputs_[2];  // level_ + 2 元数据
std::vector<FileMetaData*> grandparents_;// 遍历的下标,level_ + 2 层小于某个key
size_t grandparent_index_;  // 一个判断是否是第一个key的标志
bool seen_key_;  // 当前压缩与level_ + 2 files重叠的字节数
int64_t overlapped_bytes_; // 用于记录某个user_key与>=level+2中每一层不重叠的文件个数
size_t level_ptrs_[config::kNumLevels];
// 初始化
Compaction(const Options* options, int level);// 对Version对象unref
~Compaction();// 返回将要压缩的level层数
int level() const;// 返回版本变化
VersionEdit* edit(); // 返回对应层次参与压缩的文件个数 level or level+1
int num_input_files(int which) const;// 获取某一层 第i个文件的SST元数据
FileMetaData* input(int which, int i) const;// 本次压缩产生的最大文件大小
uint64_t MaxOutputFileSize() const;// 表示本次是否可以将本次SST直接移动到上一层
/* 1.level和level+1层没有重叠2.level层的grandparents_重叠度小于阈值(避免后面到level+1层时候阖level+2重叠度太大)
*/
bool IsTrivialMove() const;// 将所有需要删除SST文件添加到*edit
void AddInputDeletions(VersionEdit* edit);// 在>=(level+2)层比user_key小的f个数 
// true表示高层没有这个user_key false表示在某个范围内包含user_key
bool IsBaseLevelForKey(const Slice& user_key);// 为了避免合并到level+1层之后和level+2层重叠太多,导致下次合并level+1时候时间太久,因此需要及时停止输出
// 试想:如果一个internal_key很大  在压缩了level+1层之后还是能与level+2层有很多SST的范围重叠
// 使得合并的level+1层也要和level+2层进行很多个SST进行压缩  
// 如果返回true 表示将这个key写到新的SST中 这样在level层压缩internal_key之前的SST后成了一个SST
// 再将internal_key部分SST压缩就会减少压缩涉及的SST个数/*1. level_ + 2下标 小于level_ + 2层f的个数 &&internal_key 大于 level_ + 2下标f的最大key如果是第一个key(构造函数初始化为false),无需处理如果不是 累加重复大小2. internal_key 小于等于level_ + 2 层下标为grandparent_index_的largest key3. 如果超过limit 返回true 表示要停止
*/
bool ShouldStopBefore(const Slice& internal_key);// 当压缩结束需要释放input_version_
void ReleaseInputs();

8. builder.h

// 构建新的SST
/*1. memtable的迭代器iter指向起始位置2. 构建SST文件名 ...ldb3. 将file指向SST文件4. 用file实例化SST的TableBuilder5. 迭代器的遍历  将key写入文件中  FileMetaData记录当前文件信息meta迭代器指向的第一个key是最小值  最后一个key是最大值6. 完成写入  并将文件大小更新到当前文件信息meta中7. 对文件进行刷盘并关闭8. 构建当前文件的迭代器table_cache9. 如果出错 删除文件
*/
Status BuildTable(const std::string& dbname, Env* env, const Options& options, TableCache* table_cache, Iterator* iter, FileMetaData* meta);

9. snapshot.h

1. SnapshotImpl

双向链表中快照的节点。

SnapshotImpl* prev_;
SnapshotImpl* next_;
// 每个SnapshotImpl与一个sequence number关联
const SequenceNumber sequence_number_;
// 属于哪个快照版本
SnapshotList* list_ = nullptr;
// 返回sequence_number
SequenceNumber sequence_number() const { return sequence_number_; }
2. SnapshotList

用一个双向链表维护快照的信息。

离head越近,说明快照版本越新。

// 快照的空头节点dummy 不存值
SnapshotImpl head_;
SnapshotList();
bool empty() const;
// 返回后一个节点
SnapshotImpl* oldest();
// 返回前一个节点
SnapshotImpl* newest();
// 插入双向链表
SnapshotImpl* New(SequenceNumber sequence_number);
// 删除双向链表
void Delete(const SnapshotImpl* snapshot);

10. log_format.h

enum RecordType {kZeroType = 0,kFullType = 1,kFirstType = 2,kMiddleType = 3,kLastType = 4
};
static const int kMaxRecordType = kLastType;
static const int kBlockSize = 32768;
// Header is checksum (4 bytes), length (2 bytes), type (1 byte).
static const int kHeaderSize = 4 + 2 + 1;

11. log_writer.h

定入日志。

成员变量
// 写文件的对象
WritableFile* dest_;// 当前block的长度
int block_offset_;  // crc
uint32_t type_crc_[kMaxRecordType + 1];
全局函数
// 初始化crc
static void InitTypeCrc(uint32_t* type_crc);
成员函数
// 从ptr写入header + length数据
Status EmitPhysicalRecord(RecordType type, const char* ptr, size_t length);// 写入数据
/*1. 检查还有多少空间可以写2. 如果小于kHeaderSize, 就是日志头信息要占7个字节,小于7字节用0填充开启新的block3. 取到  写入数据大小  和 block中的剩余大小  的较小值4. 计算是否刚好填满这个blockleft == avail: fragment_length = avail  trueleft < avail: fragment_length = left    trueleft > avail: fragment_length = avail   false5. kFullType = 刚好能装下kFirstType = 一个装不下kLastType = 上一份数据到本block  结束kMiddleType = 两个block都放不下这么多数据6. 将数据写入 更新  已经写入一块了 下一块可能也是这份数据
*/
Status AddRecord(const Slice& slice);explicit Writer(WritableFile* dest);// 初始化crc
Writer(WritableFile* dest, uint64_t dest_length);

12. log_reader.h

Reader::Reporter

报告错误的虚基类。

virtual ~Reporter();
virtual void Corruption(size_t bytes, const Status& status) = 0;
Reader
成员变量
enum {kEof = kMaxRecordType + 1,kBadRecord = kMaxRecordType + 2
};// 顺序读文件对象
SequentialFile* const file_;// 报告错误对象
Reporter* const reporter_;// crc校验标志位
bool const checksum_;// buffer_的后备缓存 
char* const backing_store_;// 读数据的缓存
Slice buffer_;// 如果读到的数据小于kBlockSize表示读到了最后一个文件
bool eof_;  // 最后一次读的偏移量
uint64_t last_record_offset_;// 缓冲区结束后第一个位置的偏移
// 记录从initial_offset_开始的那个block起 读到的数据偏移量
uint64_t end_of_buffer_offset_;// 记录开始读取数据的偏移量
uint64_t const initial_offset_;// true 表示要在一次seek后重新同步
bool resyncing_;
成员函数
// 报错
// 如果读到的数据比其偏移量小 报错如果读到的数据比其偏移量小 报错
void ReportDrop(uint64_t bytes, const Status& reason);// ReportDrop的封
void ReportCorruption(uint64_t bytes, const char* reason);// 初始化变量到开始读数据的位置
bool SkipToInitialBlock();// 读取数据部分 返回数据的类型
/*1. 如果buffer长度小于kHeaderSize=7 第一次读1. 如果上次读取了kBlockSize大小,这次也读kBlockSize大小1. 如果读取失败 清空缓存,处理报错 eof置true 2. 如果读取大小小于kBlockSize eof置true 3. 读取正常 continue2. 如果已经读到了最后一块了,直接返回kEof2.  读到最后一块小于kBlockSize的数据  或者  读到kBlockSize大小的数据3. 记录数据开始位置  解析长度,数据类型 拼接长度4. 如果数据长度位置比读到的数据大1. 如果读取的不是最后一个block 报错2. 如果读取的是最后一个block 如果在未读取有效负载的|length|字节的情况下到达文件末尾,则假设写入程序在写入记录的过程中死亡。不报错直接返回5. kZeroType是预分配文件 直接返回6. 如果配置了crc校验  进行检查7. buffer_右移这次读到的数据8. kHeaderSize + length + initial_offset_ == end_of_buffer_offset_如果buffer_.size()为空说明正确9. result保存数据部分 返回数据类型
*/
unsigned int ReadPhysicalRecord(Slice* result);// 初始化
Reader(SequentialFile* file, Reporter* reporter, bool checksum, uint64_t initial_offset);// 释放backing_store_
~Reader();// 读一段数据  scratch存放上次没读完的数据 record存放一段数据结果
bool ReadRecord(Slice* record, std::string* scratch);// 返回最后一次读取数据的位置
uint64_t LastRecordOffset();

13. dumpfile.cc

全局函数
// 检查文件名和type类型的合法性
bool GuessType(const std::string& fname, FileType* type);// 打印日志内容
Status PrintLogContents(Env* env, const std::string& fname,
void (*func)(uint64_t, Slice, WritableFile*), WritableFile* dst);// 写入 pos 和 record
static void WriteBatchPrinter(uint64_t pos, Slice record, WritableFile* dst);// 写入日志
Status DumpLog(Env* env, const std::string& fname, WritableFile* dst) {return PrintLogContents(env, fname, WriteBatchPrinter, dst);
}// 写入VersionEdit信息
static void VersionEditPrinter(uint64_t pos, Slice record, WritableFile* dst);// 写入VersionEdit信息
Status DumpDescriptor(Env* env, const std::string& fname, WritableFile* dst) {return PrintLogContents(env, fname, VersionEditPrinter, dst);
}// 写入table数据
Status DumpTable(Env* env, const std::string& fname, WritableFile* dst);// 根据文件名判断写入数据内容 log日志 VersionEdit信息 table数据
Status DumpFile(Env* env, const std::string& fname, WritableFile* dst);
CorruptionReporter
// 长度报错 写入文件
void Corruption(size_t bytes, const Status& status);
WritableFile* dst_;
WriteBatchItemPrinter

写入kv

  void Put(const Slice& key, const Slice& value) override {std::string r = "  put '";AppendEscapedStringTo(&r, key);r += "' '";AppendEscapedStringTo(&r, value);r += "'\n";dst_->Append(r);}void Delete(const Slice& key) override {std::string r = "  del '";AppendEscapedStringTo(&r, key);r += "'\n";dst_->Append(r);}WritableFile* dst_;

14. db_impl.h

DBImpl::ManualCompaction

手动压缩。

struct ManualCompaction {int level;bool done;const InternalKey* begin;  // null means beginning of key rangeconst InternalKey* end;    // null means end of key rangeInternalKey tmp_storage;   // Used to keep track of compaction progress
};
DBImpl::CompactionStats
struct CompactionStats {CompactionStats() : micros(0), bytes_read(0), bytes_written(0) {}void Add(const CompactionStats& c) {this->micros += c.micros;this->bytes_read += c.bytes_read;this->bytes_written += c.bytes_written;}int64_t micros;int64_t bytes_read;int64_t bytes_written;
};
DBImpl::Writer

WriteBatch和CondVar进行封装,可以进行互斥操作。

struct DBImpl::Writer {explicit Writer(port::Mutex* mu): batch(nullptr), sync(false), done(false), cv(mu) {}Status status;WriteBatch* batch;bool sync;bool done;port::CondVar cv;
};
DBImpl::CompactionState
struct DBImpl::CompactionState {// Files produced by compaction// 记录压缩产生的数据struct Output {uint64_t number;uint64_t file_size;InternalKey smallest, largest;};Output* current_output() { return &outputs[outputs.size() - 1]; }explicit CompactionState(Compaction* c): compaction(c),smallest_snapshot(0),outfile(nullptr),builder(nullptr),total_bytes(0) {}Compaction* const compaction;// 记录最小的seq_num,如果小于等于smallest_snapshot,表示可以删除SequenceNumber smallest_snapshot;// 压缩数据集合std::vector<Output> outputs;// 写文件WritableFile* outfile;// 构建sstTableBuilder* builder;// 大小uint64_t total_bytes;
};
DBImpl::Writer

记录memtable和immemtable。

struct IterState {port::Mutex* const mu;Version* const version GUARDED_BY(mu);MemTable* const mem GUARDED_BY(mu);MemTable* const imm GUARDED_BY(mu);IterState(port::Mutex* mutex, MemTable* mem, MemTable* imm, Version* version): mu(mutex), version(version), mem(mem), imm(imm) {}
};
DBImpl
成员变量
Env* const env_;
const InternalKeyComparator internal_comparator_;
const InternalFilterPolicy internal_filter_policy_;
// 对配置作出限制
const Options options_;  
// 表示是否启用了info_log打印日志,如果启用了后期需要释放内存
const bool owns_info_log_;
// 表示是否启用block缓存
const bool owns_cache_;
// 数据库名字
const std::string dbname_;// table缓存,用于提高性能,缓存了SST的元数据信息
TableCache* const table_cache_;// 文件所保护只有有一个进程打开db
FileLock* db_lock_;// State below is protected by mutex_
port::Mutex mutex_;
// 表示db是否关闭
std::atomic<bool> shutting_down_;
port::CondVar background_work_finished_signal_ GUARDED_BY(mutex_);
// 活跃memtable
MemTable* mem_;
MemTable* imm_ GUARDED_BY(mutex_); 
// 表示是否已经有一个imm_,因为只需要一个线程压缩,也只会有一个imm
std::atomic<bool> has_imm_;        
// wal log句柄
WritableFile* logfile_;
// 当前日志编号
uint64_t logfile_number_ GUARDED_BY(mutex_);
log::Writer* log_;
// 用于采样
uint32_t seed_ GUARDED_BY(mutex_);  // For sampling.// 用于批量写
std::deque<Writer*> writers_ GUARDED_BY(mutex_);
WriteBatch* tmp_batch_ GUARDED_BY(mutex_);
// 快照,leveldb支持从某个快照读取数据
SnapshotList snapshots_ GUARDED_BY(mutex_);// 保护某些SST文件不被删除,主要是在CompactMemTable中
std::set<uint64_t> pending_outputs_ GUARDED_BY(mutex_);// 表示后台压缩线程是否已经被调度或者在运行
bool background_compaction_scheduled_ GUARDED_BY(mutex_);
// 手动压缩句柄
ManualCompaction* manual_compaction_ GUARDED_BY(mutex_);
// 版本管理
VersionSet* const versions_ GUARDED_BY(mutex_);Status bg_error_ GUARDED_BY(mutex_);
// 记录压缩状态信息,用于打印
CompactionStats stats_[config::kNumLevels] GUARDED_BY(mutex_);
全局函数
// 保证*ptr在minvalue,maxvalue的范围内  如果超过了,就设置为边界值
static void ClipToRange(T* ptr, V minvalue, V maxvalue);
// 对Options的参数进行限制 新建日志 新建缓存
Options SanitizeOptions(const std::string& dbname,const InternalKeyComparator* icmp,const InternalFilterPolicy* ipolicy,const Options& src);
// 保留大约10个文件用于其他用途,其余的交给TableCache。
// max_open_files-=10
static int TableCacheSize(const Options& sanitized_options);// 释放IterState持有的对象
static void CleanupIteratorState(void* arg1, void* arg2);
成员函数
// 初始化变量
DBImpl(const Options& options, const std::string& dbname);// 等待后台工作线程执行完成 解锁文件 析构变量
~DBImpl() override;// 新建一个manifest文件 保存在env_中
Status NewDB();// 恢复日志
/*1. 创建目录 锁住 dbname + "/LOCK" 文件2. 如果dbname + "/CURRENT" 文件不存在 如果配置中允许创建 创建文件否则返回错误3. 如果dbname + "/CURRENT" 文件存在  但是文件有错返回错误4. 获取所有文件名 获取所有的在线的sst文件5. 遍历目录下的文件 sst文件和目录下的文件相同的全部删除另存为大于当前编号的日志,其实这部分用于恢复memtable那一部分5. 如果sst文件和目录下的文件有不对应的部分 返回错误6. 排序 恢复memtable日志
*/
Status Recover(VersionEdit* edit, bool* save_manifest);// 忽略当前错误
// 如果当前状态不为ok  写入日志 状态设为ok
void MaybeIgnoreError(Status* s) const;// 删除过时的文件和缓存
/*1, 小于当前的日志编号 或者不等于前一个压缩的日志编号 删除2. 小于当前Manifest文件编号3. 如果在当前所有版本中都没有查到该SST,那肯定直接删除4. 如果在当前所有版本中都没有查到该TempFile,那肯定直接删除
*/
void RemoveObsoleteFiles() EXCLUSIVE_LOCKS_REQUIRED(mutex_);// 把MemTable构建成SST文件FileMetaData
// 如果 Version不为空 选择FileMetaData存放的层数 否则放到每0层
// edit是SST文件的信息 记录压缩状态信息
// 把MemTable构建成SST文件FileMetaData 写到(base中)相应的层数
Status WriteLevel0Table(MemTable* mem, VersionEdit* edit, Version* base) ;// 将第log_number个日志数据恢复成SST文件
/*1.  定义一个LogReporter 表示 出错的struct 继承log::Reader::Reporter 用于记录在当前函数中可能出现的问题2. 打开日志文件 顺序读取日志3. 实例化 LogReporter 出错的struct4. 读文件对象 进行crc校验  从文件开始读5. 循环读取kBlockSize = 32768日志数据 将数据放入record 和 scratch6. 将数据写入WriteBatch对象,便于批量写入7. 实例化一个MemTable对象  把batch数据解析到MemTable中8. 判断数据的max_sequence9. 如果MemTable中使用的大小 大于 配置中的大小 就直接放入第0层10.   	// 如果日志的数据量 > options_.write_buffer_size// 1. 如果日志的数据量 = n * options_.write_buffer_size  mem = nullptr compactions>0// 2. 否则                                              mem != nullptr compactions>0// 3. 如果日志的数据量 < options_.write_buffer_size        mem != nullptr compactions=0  // 4. 当文件存在但是没有数据 mem = nullptr compactions=0 11. 如果配置的重用last_log reuse_logs 并且 日志的数据量 < options_.write_buffer_size获取last_log日志大小  并 更新logfile_ log_12. 只有2,3两种情况  如果MemTable不为空 放入第0层
*/
Status RecoverLogFile(uint64_t log_number, bool last_log, bool* save_manifest, VersionEdit* edit, SequenceNumber* max_sequence);// 返回user_comparator比较器
const Comparator* user_comparator() const {return internal_comparator_.user_comparator();
}// 唤醒所有线程
void RecordBackgroundError(const Status& s);// 压缩MemTable minor压缩
/*1. 构建Version,产生新的版本2. 将imm_中的SST放到base中的相应层数,SST信息更新到edit中 3. 设置edit 前一个日志编号是0, 设置当前的日志编号 构建新版本4. 设置进行压缩的标志 清空无效对象5. 唤醒所有线程
*/
void CompactMemTable() EXCLUSIVE_LOCKS_REQUIRED(mutex_);// 释放CompactionState对象 从pending_outputs_移除
void CleanupCompaction(CompactionState* compact);// 打开压缩文件
/*1. 获取文件编号 将文件编号插入pending_outputs_保护2. 插入压缩信息3. 实例化新的压缩文件指针WritableFile 创建压缩SST对象
*/
Status OpenCompactionOutputFile(CompactionState* compact);// 完成压缩的工作
/*1. 获取最近一个output_number2. 进行SST的收尾工作3. 当获取前数据个数和文件大小 compact更新文件大小4. 刷盘操作5. 确保SST可用
*/
Status FinishCompactionOutputFile(CompactionState* compact, Iterator* input);// 将结果记录到 version,并更新versionedit,最后持久化
Status InstallCompactionResults(CompactionState* compact);// 压缩工作
/*1. 确定可删除的key的seq_num如果没有snapshot,那么就以version最后的为主,否则以快找中的最小的为主2. 构建merge迭代器(本质上归并排序)3. 循环要压缩的数据1. 优先进行minor压缩 优先dump第0层的文件2. 如果合并后的key与grandparent重叠的字节大于阈值,此时需要开启新的sst文件FinishCompactionOutputFile3. 相同的user_key第一次出现时候,不能被删除, 设置了最大值kMaxSequenceNumber4. 如果key的last_sequence小于快照的最小keysmallest_snapshot,要删除,drop = false5. 该key被标记为删除 && 过于古老 && 更高层已经没有该key了,要删除, drop = false6. drop = true 如果不能丢弃,需要将其加入到builder中1. 如果没有创建SST 构建SST文件前置工作OpenCompactionOutputFile()2. 记录当前SST最小的key 添加key3. 文件太大,需要及时刷出,并创建新的FinishCompactionOutputFile7. 在下一轮循环写上一轮的数据,所以最后进行收尾工作FinishCompactionOutputFile8. 将合并产生的新文件应用到新版本中,并删除掉旧文件InstallCompactionResults()*/
Status DoCompactionWork(CompactionState* compact);// 开始后台压缩
/*1. 优先minor压缩CompactMemTable()  将memtable写到level0中2. 判断是否手动压缩1. 如果是手动压缩 挑选本次参与压缩的文件versions_->CompactRange2. 非手动压缩则挑选本次参与压缩的文件PickCompaction3. 非手动压缩:1. 尽可能不压缩,如果可以直接移动,那最好, 直接移动的条件:// 1.第0层只有一个// 2.level和level+1层没有重叠// 3.level层的grandparents_重叠度小于阈值(避免后面到level+1层时候阖level+2重叠度太大)更新元数据:从level层删除,加到level+1层中生成新的版本出错处理2. 否则老老实实进行压缩和清理工作1. 使用CompactionState对挑选的文件进行压缩DoCompactionWork2. 删除无用的文件 CleanupCompaction RemoveObsoleteFiles
*/
void BackgroundCompaction();// 后台线程调用
/*1. 确保后台线程在运行数据库关闭不会有后台线程了,不需要操作出错,不需要操作否则开始后台压缩BackgroundCompaction()2. 执行完后台线程 background_compaction_scheduled_ = false3. 有可能之前产生太多的文件,需要重新调度开启压缩 MaybeScheduleCompaction()
*/
void BackgroundCall();// 本质调用BackgroundCall函数
static void BGWork(void* db);// 可能进行的压缩操作 
// background_compaction_scheduled_ = true;
// env_->Schedule(&DBImpl::BGWork, this);
void MaybeScheduleCompaction();// 对mem进行压缩
// force=false表示允许延迟
/*循环:1. 如果有错误,直接结束2. 如果允许延迟,而且文件个数大于kL0_SlowdownWritesTrigger=8但是没有超过硬限制,那么可以等1s.但是只能等1次3. 如果不能延迟,而且mem又有充足的空间,那么也不不需要进行处理4. 如果已经有一个minior 压缩的线程了 imm_ ,此时就不能在启一个了,等待wait5. 如果第0层文件超过了硬限制kL0_StopWritesTrigger=12,此时需要停止写了。等待着被唤醒,等待wait6. 否则就需要创建新的mem,开始minior 压缩1. 获取new_log_number2. 写文件的对象3. 如果出错,重用new_log_number4. 更新文件 imm_  has_imm_标记压缩任务5. 开始压缩
*/
Status MakeRoomForWrite(bool force /* compact even if there is room? */);// 手动压缩
void TEST_CompactRange(int level, const Slice* begin, const Slice* end);// 如果memtable没有重叠 跳过
Status TEST_CompactMemTable();// 将level>=1的begin到end的范围压缩
void CompactRange(const Slice* begin, const Slice* end) override;// 返回遍历未持久化数据的迭代器
/*1. 当前活跃的memtable和immemtable中的数据插入2. 生成将几个memtable进行合并3. 将这些实例交给CleanupIteratorState进行释放 
*/
Iterator* NewInternalIterator(const ReadOptions&, SequenceNumber* latest_snapshot, uint32_t* seed);// 返回当前快照+未持久化数据的迭代器
Iterator* NewIterator(const ReadOptions&) override;// 查找key匹配的次数,可能触发seek更新
void RecordReadSample(Slice key);// 新建一个快照
const Snapshot* GetSnapshot() override;// 释放一个快照
void ReleaseSnapshot(const Snapshot* snapshot) override;// 获取key对应的value
/*1. 快照判断 如果传进来的options指定了快照,就从指定快照开始读2. 查memtable 查immemtable查immemtable 查SST3. UpdateStats(stats) seek压缩检查
*/
Status Get(const ReadOptions& options, const Slice& key, std::string* value) override;// 要求Writer列表不空且第一个Writer有非空的Writerbatch
// 将writers_中的有条件 大小范围内的  内容写到WriteBatch中
WriteBatch* BuildBatchGroup(Writer** last_writer) ;// 写入数据
Status Write(const WriteOptions& options, WriteBatch* updates) override;// 插入数据
Status Put(const WriteOptions&, const Slice& key, const Slice& value) override;// 删除数据
Status Delete(const WriteOptions&, const Slice& key) override;// 计算某种类型数据的情况,存在value中
bool GetProperty(const Slice& property, std::string* value) override;// 计算range的总偏移量
void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override;

注:

mutex_.AssertHeld();

用在mutex上,assertHeld标识已经持有了锁,意味着下来的代码处理中,就不要再对mute加锁了,可以直接当已经加锁了。

AssertHeld是leveldb基于编译器的线程安全注解(thread safety annotations)实现的一个断言函数,这个是给编译器看的,如果编译器发现执行这里之前没有加锁,就会报错。

15. db_iter.h

DBImpl的迭代器。

成员变量
// kForward=next  kReverse=prev
enum Direction { kForward, kReverse };
// 持有DBImpl对象
DBImpl* db_;
// 比较器
const Comparator* const user_comparator_;
// merge迭代器
Iterator* const iter_;
// SequenceNumber
SequenceNumber const sequence_;
// 出错状态
Status status_;// 记录上一次操作的值
// key
// == current key when direction_==kReverse
std::string saved_key_; 
// value
// == current raw value when direction_==kReverse
std::string saved_value_; // 顺序
Direction direction_;
// 有效性
bool valid_;
// 随机类
Random rnd_;
// 
size_t bytes_until_read_sampling_;
成员函数
// 返回一个随机数
size_t RandomCompactionPeriod();
// dst = k
inline void SaveKey(const Slice& k, std::string* dst);
// 清除保存的value
inline void ClearSavedValue();
// 把iter_->key()解析到ParsedInternalKey  ikey
bool ParseKey(ParsedInternalKey* key);
// 查找下一个有效的key
void FindNextUserEntry(bool skipping, std::string* skip);
// 查找上一个有效的key
void FindPrevUserEntry();// 初始化
DBIter(DBImpl* db, const Comparator* cmp, Iterator* iter, SequenceNumber s, uint32_t seed);
// 析构iter
~DBIter() override; 
// 返回有效性
bool Valid() const override; 
// 如果是kForward返回iter指向的key 否则返回saved_key_
Slice key() const override;
// 如果是kForward返回iter指向的value 否则返回saved_value_
Slice value() const override;
// 返回iter的状态
Status status() const override;
// 下一个key
void Next() override;
// 上一个key
void Prev() override;
// 查找target
void Seek(const Slice& target) override;
// 定位到第一个位置
void SeekToFirst() override;
// 定位到最后一个位置
void SeekToLast() override;

16. repair.cc

Repairer
struct TableInfo {FileMetaData meta;SequenceNumber max_sequence;
};
成员变量
// db名字
const std::string dbname_;
Env* const env_;
InternalKeyComparator const icmp_;
InternalFilterPolicy const ipolicy_;
//
const Options options_;
//info日志是否开启
bool owns_info_log_;
//是否开启block缓存
bool owns_cache_;
// table缓存
TableCache* table_cache_;
//用于做恢复的
VersionEdit edit_;
// manifest文件
std::vector<std::string> manifests_;
//sst文件编号
std::vector<uint64_t> table_numbers_;
// 日志编号
std::vector<uint64_t> logs_;
// sst文件的元数据
std::vector<TableInfo> tables_;
//新的下一个最大的文件编号
uint64_t next_file_number_;
成员函数
// 构造TableCache
Repairer(const std::string& dbname, const Options& options);// 删除TableCache 
~Repairer();// 查找dbname下的不同类型的文件名
// kLogFile=log日志 kDescriptorFile=VersionEdit信息 kTableFile=table数据
Status FindFiles();// 对每个log文件构建SST
void ConvertLogFilesToTables();// 创建fname目录 添加lost路径文件重命名  标记删除
void ArchiveFile(const std::string& fname);// 定义一个报错类LogReporter
// 通过log文件构建SST 参数是log的序号
Status ConvertLogToTable(uint64_t log);// 返回本地配置信息的TableCache的迭代器
Iterator* NewTableIterator(const FileMetaData& meta);//复制src文件中的内容 再打开一个新的src
void RepairTable(const std::string& src, TableInfo t);// 将TableFile 数据插入tables_
void ScanTable(uint64_t number);// 遍历每个table_number 数据插入tables_
void ExtractMetaData();// 将tables_中的所有信息写入日志
Status WriteDescriptor();/*1. 获取所有manifest文件名2. 对每个log文件构建SST3. 遍历每个table_number 数据插入tables_4. 将tables_中的数据写入日志5. 日志记录文件的大小
*/
Status Run();

17. db.h

LevelDB的基类。

// 删除dbname下所有文件
Status DestroyDB(const std::string& name, const Options& options);// 构建repair对象,运行恢复db
Status RepairDB(const std::string& dbname, const Options& options);
// 打开db文件
/*1. 恢复日志2. 新建memtable3. 插入新的日志4. 删除无用的文件 和 进行可能的压缩
*/
// 参数 打开的配置  数据库名 数据库对象
static Status Open(const Options& options, const std::string& name, DB** dbptr);

18. leveldbutil.cc

全局函数
// 输出files中的所有文件信息
bool HandleDumpCommand(Env* env, char** files, int num);//打印信息
static void Usage();
StdoutPrinter
成员函数
// 控制台输出
Status Append(const Slice& data) override {fwrite(data.data(), 1, data.size(), stdout);return Status::OK();
}
Status Close() override { return Status::OK(); }
Status Flush() override { return Status::OK(); }
Status Sync() override { return Status::OK(); }


本文来自互联网用户投稿,文章观点仅代表作者本人,不代表本站立场,不承担相关法律责任。如若转载,请注明出处。 如若内容造成侵权/违法违规/事实不符,请点击【内容举报】进行投诉反馈!

相关文章

立即
投稿

微信公众账号

微信扫一扫加关注

返回
顶部