leveldb Source Code Analysis, Part 5: Opening the DB, Write and Read Paths, Snapshots, and Iterating the DB
创始人
2024-05-25 05:13:20

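Opening the DB

The first step in using leveldb is to call the Open interface, which opens a new db or reopens an existing one and returns a DB*. All later operations on the db go through this DB*.

  static Status Open(const Options& options, const std::string& name, DB** dbptr);

The open process consists of the following steps:
1. If an existing db is being reopened, recover the database.
2. If a new db is being opened (in the code, recovery left mem_ as nullptr), create the WAL and the memtable.
3. Save the current Manifest file.
4. Remove obsolete files and try to schedule a compaction.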

Status DB::Open(const Options& options, const std::string& dbname, DB** dbptr) {
  *dbptr = nullptr;

  DBImpl* impl = new DBImpl(options, dbname);
  impl->mutex_.Lock();
  VersionEdit edit;
  // Recover handles create_if_missing, error_if_exists
  bool save_manifest = false;
  // 1. If an existing db is being reopened, recover the database
  Status s = impl->Recover(&edit, &save_manifest);
  if (s.ok() && impl->mem_ == nullptr) {
    // Create new log and a corresponding memtable.
    uint64_t new_log_number = impl->versions_->NewFileNumber();
    WritableFile* lfile;
    // 2. If a new db is being opened, create the WAL and the memtable
    s = options.env->NewWritableFile(LogFileName(dbname, new_log_number),
                                     &lfile);
    if (s.ok()) {
      edit.SetLogNumber(new_log_number);
      impl->logfile_ = lfile;
      impl->logfile_number_ = new_log_number;
      impl->log_ = new log::Writer(lfile);
      impl->mem_ = new MemTable(impl->internal_comparator_);
      impl->mem_->Ref();
    }
  }
  // 3. Save the current Manifest file
  if (s.ok() && save_manifest) {
    edit.SetPrevLogNumber(0);  // No older logs needed after recovery.
    edit.SetLogNumber(impl->logfile_number_);
    s = impl->versions_->LogAndApply(&edit, &impl->mutex_);
  }
  // 4. Remove obsolete files and try to schedule a compaction
  if (s.ok()) {
    impl->RemoveObsoleteFiles();
    impl->MaybeScheduleCompaction();
  }
  impl->mutex_.Unlock();
  if (s.ok()) {
    assert(impl->mem_ != nullptr);
    *dbptr = impl;
  } else {
    delete impl;
  }
  return s;
}

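Recovering the DB

1. First check whether this is a newly opened DB; if so, call NewDB to create it.
2. If an existing DB is being reopened, read the version edits recorded in the Manifest file and replay them to rebuild the current version.
3. Recover the memtable from the WAL.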

Status DBImpl::Recover(VersionEdit* edit, bool* save_manifest) {
  ...
  if (!env_->FileExists(CurrentFileName(dbname_))) {
    if (options_.create_if_missing) {
      Log(options_.info_log, "Creating DB %s since it was missing.",
          dbname_.c_str());
      // 1. The DB does not exist yet, so this is a newly opened DB
      s = NewDB();
      if (!s.ok()) {
        return s;
      }
    }
    ...
  // 2. If an existing DB is being reopened, replay the version edits
  //    recorded in the Manifest file to rebuild the current version
  s = versions_->Recover(save_manifest);
  ...
  // 3. Recover the memtable from the WAL
  std::sort(logs.begin(), logs.end());
  for (size_t i = 0; i < logs.size(); i++) {
    s = RecoverLogFile(logs[i], (i == logs.size() - 1), save_manifest, edit,
                       &max_sequence);
    if (!s.ok()) {
      return s;
    }

    // The previous incarnation may not have written any MANIFEST
    // records after allocating this log number.  So we manually
    // update the file number allocation counter in VersionSet.
    versions_->MarkFileNumberUsed(logs[i]);
  }
  return Status::OK();
}
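To make step 3 concrete, here is a minimal sketch of replaying log files in ascending file-number order. It is a toy model, not leveldb code: ToyLogRecord, ToyLogFile and ReplayLogs are hypothetical names, a std::map stands in for the memtable, and each record carries a single key instead of a whole write batch.

```cpp
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy types; this is not leveldb's log format.
struct ToyLogRecord {
  uint64_t seq;
  std::string key;
  std::string value;
};

struct ToyLogFile {
  uint64_t number;  // log file number, older logs have smaller numbers
  std::vector<ToyLogRecord> records;
};

// Replays the logs in ascending file-number order into `memtable` and
// returns the largest sequence number seen (0 if there were no records).
uint64_t ReplayLogs(std::vector<ToyLogFile> logs,
                    std::map<std::string, std::string>* memtable) {
  assert(memtable != nullptr);
  std::sort(logs.begin(), logs.end(),
            [](const ToyLogFile& a, const ToyLogFile& b) {
              return a.number < b.number;
            });
  uint64_t max_sequence = 0;
  for (const ToyLogFile& log : logs) {
    for (const ToyLogRecord& r : log.records) {
      (*memtable)[r.key] = r.value;  // later records overwrite earlier ones
      max_sequence = std::max(max_sequence, r.seq);
    }
  }
  return max_sequence;
}
```

Sorting first matters: if a newer log were replayed before an older one, an old value could overwrite a newer one for the same key.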

Write Path

There are three interfaces for writing KV pairs:

  // Set the database entry for "key" to "value".  Returns OK on success,
  // and a non-OK status on error.
  // Note: consider setting options.sync = true.
  // Write a single KV pair
  virtual Status Put(const WriteOptions& options, const Slice& key,
                     const Slice& value) = 0;

  // Remove the database entry (if any) for "key".  Returns OK on
  // success, and a non-OK status on error.  It is not an error if "key"
  // did not exist in the database.
  // Note: consider setting options.sync = true.
  // Delete a key
  virtual Status Delete(const WriteOptions& options, const Slice& key) = 0;

  // Apply the specified updates to the database.
  // Returns OK on success, non-OK on failure.
  // Note: consider setting options.sync = true.
  // Batched write
  virtual Status Write(const WriteOptions& options, WriteBatch* updates) = 0;

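All three interfaces ultimately call the batched write interface, which guarantees that a batch is applied atomically.
The write path is as follows:
1. Enqueue the request, then wait until it reaches the head of the queue or until it has been merged into another request and completed.
2. When it is this request's turn, check whether the DB is writable and run the pre-write processing.
3. Merge the pending write requests.
4. Write the WAL.
5. Write the memtable.
6. Wake up the writers whose requests were merged and handled, and finally wake up the first request that has not started yet.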

Status DBImpl::Write(const WriteOptions& options, WriteBatch* updates) {
  Writer w(&mutex_);
  w.batch = updates;
  w.sync = options.sync;
  w.done = false;

  // 1. Enqueue the request, then wait until it reaches the head of the
  //    queue or until it has been merged into another request and completed
  MutexLock l(&mutex_);
  writers_.push_back(&w);
  while (!w.done && &w != writers_.front()) {
    w.cv.Wait();
  }
  if (w.done) {
    return w.status;
  }

  // May temporarily unlock and wait.
  // 2. When it is this request's turn, check whether the DB is writable
  //    and run the pre-write processing
  Status status = MakeRoomForWrite(updates == nullptr);
  uint64_t last_sequence = versions_->LastSequence();
  Writer* last_writer = &w;
  if (status.ok() && updates != nullptr) {  // nullptr batch is for compactions
    // 3. Merge the pending write requests
    WriteBatch* write_batch = BuildBatchGroup(&last_writer);
    WriteBatchInternal::SetSequence(write_batch, last_sequence + 1);
    last_sequence += WriteBatchInternal::Count(write_batch);

    // Add to log and apply to memtable.  We can release the lock
    // during this phase since &w is currently responsible for logging
    // and protects against concurrent loggers and concurrent writes
    // into mem_.
    {
      mutex_.Unlock();
      // 4. Write the WAL
      status = log_->AddRecord(WriteBatchInternal::Contents(write_batch));
      bool sync_error = false;
      if (status.ok() && options.sync) {
        status = logfile_->Sync();
        if (!status.ok()) {
          sync_error = true;
        }
      }
      if (status.ok()) {
        // 5. Write the memtable
        status = WriteBatchInternal::InsertInto(write_batch, mem_);
      }
      mutex_.Lock();
      if (sync_error) {
        // The state of the log file is indeterminate: the log record we
        // just added may or may not show up when the DB is re-opened.
        // So we force the DB into a mode where all future writes fail.
        RecordBackgroundError(status);
      }
    }
    if (write_batch == tmp_batch_) tmp_batch_->Clear();

    versions_->SetLastSequence(last_sequence);
  }

  // 6. Wake up the writers whose requests were merged and handled, and
  //    finally wake up the first request that has not started yet
  while (true) {
    Writer* ready = writers_.front();
    writers_.pop_front();
    if (ready != &w) {
      ready->status = status;
      ready->done = true;
      ready->cv.Signal();
    }
    if (ready == last_writer) break;
  }

  // Notify new head of write queue
  if (!writers_.empty()) {
    writers_.front()->cv.Signal();
  }

  return status;
}
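The sequence-number bookkeeping around BuildBatchGroup is easy to miss in the code above, so here is a small single-threaded sketch of it. This is an illustration under assumptions, not leveldb code: ToyBatch, ToyEntry and ApplyGroup are hypothetical names, the writer queue and locking are left out, and a vector stands in for the memtable.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical toy types standing in for WriteBatch and the memtable.
typedef std::vector<std::pair<std::string, std::string> > ToyBatch;

struct ToyEntry {
  uint64_t seq;
  std::string key;
  std::string value;
};

// Concatenates the queued batches into one group (step 3), then applies the
// group with consecutive sequence numbers starting at last_sequence + 1
// (step 5). Returns the new last sequence number.
uint64_t ApplyGroup(const std::vector<ToyBatch>& queued, uint64_t last_sequence,
                    std::vector<ToyEntry>* memtable) {
  assert(memtable != nullptr);
  ToyBatch group;
  for (const ToyBatch& b : queued) {
    group.insert(group.end(), b.begin(), b.end());
  }
  uint64_t seq = last_sequence + 1;
  for (const auto& kv : group) {
    memtable->push_back(ToyEntry{seq++, kv.first, kv.second});
  }
  return last_sequence + group.size();
}
```

Every entry in the merged group gets its own sequence number, which is why the real code advances last_sequence by the entry count of the group rather than by one.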

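The pre-write processing consists of the following steps:
1. Decide whether writes need to be delayed or stopped.
2. If the memtable is full, turn it into the immutable memtable and wait for a background compaction.
3. Create a new WAL.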

Status DBImpl::MakeRoomForWrite(bool force) {
  mutex_.AssertHeld();
  assert(!writers_.empty());
  bool allow_delay = !force;
  Status s;
  while (true) {
    if (!bg_error_.ok()) {
      // Yield previous error
      s = bg_error_;
      break;
    } else if (allow_delay && versions_->NumLevelFiles(0) >=
                                  config::kL0_SlowdownWritesTrigger) {
      // We are getting close to hitting a hard limit on the number of
      // L0 files.  Rather than delaying a single write by several
      // seconds when we hit the hard limit, start delaying each
      // individual write by 1ms to reduce latency variance.  Also,
      // this delay hands over some CPU to the compaction thread in
      // case it is sharing the same core as the writer.
      // Level 0 has too many files; pause for 1ms before writing
      mutex_.Unlock();
      env_->SleepForMicroseconds(1000);
      allow_delay = false;  // Do not delay a single write more than once
      mutex_.Lock();
    } else if (!force &&
               (mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
      // There is room in current memtable
      // The memtable still has enough space
      break;
    } else if (imm_ != nullptr) {
      // We have filled up the current memtable, but the previous
      // one is still being compacted, so we wait.
      // The immutable memtable has not been compacted yet; wait for it
      Log(options_.info_log, "Current memtable full; waiting...\n");
      background_work_finished_signal_.Wait();
    } else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
      // There are too many level-0 files.
      Log(options_.info_log, "Too many L0 files; waiting...\n");
      // Too many level-0 files; wait for the background compaction
      background_work_finished_signal_.Wait();
    } else {
      // Attempt to switch to a new memtable and trigger compaction of old
      assert(versions_->PrevLogNumber() == 0);
      uint64_t new_log_number = versions_->NewFileNumber();
      WritableFile* lfile = nullptr;
      // Create a new WAL
      s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
      if (!s.ok()) {
        // Avoid chewing through file number space in a tight loop.
        versions_->ReuseFileNumber(new_log_number);
        break;
      }
      delete log_;
      delete logfile_;
      logfile_ = lfile;
      logfile_number_ = new_log_number;
      log_ = new log::Writer(lfile);
      imm_ = mem_;
      // The memtable becomes the immutable memtable; create a new memtable
      has_imm_.store(true, std::memory_order_release);
      mem_ = new MemTable(internal_comparator_);
      mem_->Ref();
      force = false;  // Do not force another compaction if have room
      MaybeScheduleCompaction();
    }
  }
  return s;
}
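The branch order in MakeRoomForWrite is the whole policy, so it can help to see it as a pure decision function. The sketch below is only an illustration: WriteState, Action and NextAction are hypothetical names, and the three constants are assumed values that I believe match leveldb's defaults (slowdown at 8 level-0 files, stop at 12, 4MB write buffer) but should be checked against your version.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical names; these types do not exist in leveldb.
enum class Action {
  kFail,            // a background error is pending
  kDelay1ms,        // soft limit on level-0 files reached
  kProceed,         // memtable has room, the write can go ahead
  kWaitForImm,      // previous memtable is still being compacted
  kWaitForL0,       // hard limit on level-0 files reached
  kSwitchMemtable   // new WAL + new memtable, schedule compaction
};

struct WriteState {
  bool bg_error = false;
  bool allow_delay = true;
  bool force = false;
  int level0_files = 0;
  std::size_t mem_usage = 0;
  bool has_imm = false;
};

// Assumed values, believed to match leveldb's defaults.
const int kL0SlowdownWritesTrigger = 8;
const int kL0StopWritesTrigger = 12;
const std::size_t kWriteBufferSize = 4 * 1024 * 1024;

// One iteration of the loop: which branch would be taken for this state?
Action NextAction(const WriteState& s) {
  if (s.bg_error) return Action::kFail;
  if (s.allow_delay && s.level0_files >= kL0SlowdownWritesTrigger) {
    return Action::kDelay1ms;
  }
  if (!s.force && s.mem_usage <= kWriteBufferSize) return Action::kProceed;
  if (s.has_imm) return Action::kWaitForImm;
  if (s.level0_files >= kL0StopWritesTrigger) return Action::kWaitForL0;
  return Action::kSwitchMemtable;
}
```

Note that the 1ms delay is checked before the memtable-space check, so a write can be slowed down even when the memtable still has room.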

Read Path

The interface related to reads:

  // If the database contains an entry for "key" store the
  // corresponding value in *value and return OK.
  //
  // If there is no entry for "key" leave *value unchanged and return
  // a status for which Status::IsNotFound() returns true.
  //
  // May return some other Status on an error.
  virtual Status Get(const ReadOptions& options, const Slice& key,
                     std::string* value) = 0;

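Start with the simple case of getting a single KV. The Get interface can read the value as of a specified version (a snapshot); if none is specified, it reads the latest version. Get has three steps:
1. Look in the memtable.
2. Look in the immutable memtable.
3. Look in the sstables.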

Status DBImpl::Get(const ReadOptions& options, const Slice& key,
                   std::string* value) {
  Status s;
  MutexLock l(&mutex_);
  // Pick the sequence number (version) to read at
  SequenceNumber snapshot;
  if (options.snapshot != nullptr) {
    snapshot =
        static_cast<const SnapshotImpl*>(options.snapshot)->sequence_number();
  } else {
    snapshot = versions_->LastSequence();
  }

  // Take a reference on each component so that a compaction cannot
  // release it while the lookup is in progress
  MemTable* mem = mem_;
  MemTable* imm = imm_;
  Version* current = versions_->current();
  mem->Ref();
  if (imm != nullptr) imm->Ref();
  current->Ref();

  bool have_stat_update = false;
  Version::GetStats stats;

  // Unlock while reading from files and memtables
  {
    mutex_.Unlock();
    // First look in the memtable, then in the immutable memtable (if any).
    LookupKey lkey(key, snapshot);
    // 1. Look in the memtable
    if (mem->Get(lkey, value, &s)) {
      // Done
    } else if (imm != nullptr && imm->Get(lkey, value, &s)) {
      // 2. Look in the immutable memtable
      // Done
    } else {
      // 3. Look in the sstables
      s = current->Get(options, lkey, value, &stats);
      have_stat_update = true;
    }
    mutex_.Lock();
  }

  // Statistics about the sstables are collected during the lookup; after
  // they are updated, a compaction may be needed
  if (have_stat_update && current->UpdateStats(stats)) {
    MaybeScheduleCompaction();
  }
  mem->Unref();
  if (imm != nullptr) imm->Unref();
  current->Unref();
  return s;
}
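The lookup order can be sketched as a search through layers ordered from newest to oldest, where the first layer that knows the key decides the result. This is a toy model with hypothetical names (ToyValue, Layer, LayeredGet); sequence numbers are left out, and a deletion marker is modelled as a flag on the stored value.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <vector>

// Hypothetical toy types; `deleted` models a deletion marker (tombstone).
struct ToyValue {
  bool deleted;
  std::string value;
};
typedef std::map<std::string, ToyValue> Layer;

// `layers` is ordered newest first: memtable, immutable memtable, sstables.
// A null entry models a missing immutable memtable. Returns true and fills
// *value only if the newest entry for `key` is a live value.
bool LayeredGet(const std::vector<const Layer*>& layers, const std::string& key,
                std::string* value) {
  assert(value != nullptr);
  for (const Layer* layer : layers) {
    if (layer == nullptr) continue;
    Layer::const_iterator it = layer->find(key);
    if (it == layer->end()) continue;    // this layer knows nothing, go older
    if (it->second.deleted) return false;  // tombstone hides older layers
    *value = it->second.value;
    return true;
  }
  return false;
}
```

The tombstone case is the reason a hit in the memtable ends the search even when the answer is "not found": an older value in an sstable must stay hidden.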

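Let us focus on step 3, looking up the KV in the sstables. Every level is visited in turn. In level 0 the sstables may overlap, so every sstable whose key range covers the key has to be checked, from newest to oldest. In levels above 0 the sstables do not overlap and are sorted, so a binary search finds the one sstable that may contain the key, and then that sstable is searched: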

void Version::ForEachOverlapping(Slice user_key, Slice internal_key, void* arg,
                                 bool (*func)(void*, int, FileMetaData*)) {
  const Comparator* ucmp = vset_->icmp_.user_comparator();

  // Search level-0 in order from newest to oldest.
  std::vector<FileMetaData*> tmp;
  tmp.reserve(files_[0].size());
  for (uint32_t i = 0; i < files_[0].size(); i++) {
    FileMetaData* f = files_[0][i];
    if (ucmp->Compare(user_key, f->smallest.user_key()) >= 0 &&
        ucmp->Compare(user_key, f->largest.user_key()) <= 0) {
      tmp.push_back(f);
    }
  }
  if (!tmp.empty()) {
    std::sort(tmp.begin(), tmp.end(), NewestFirst);
    for (uint32_t i = 0; i < tmp.size(); i++) {
      if (!(*func)(arg, 0, tmp[i])) {
        return;
      }
    }
  }

  // Search other levels.
  for (int level = 1; level < config::kNumLevels; level++) {
    size_t num_files = files_[level].size();
    if (num_files == 0) continue;

    // Binary search to find earliest index whose largest key >= internal_key.
    uint32_t index = FindFile(vset_->icmp_, files_[level], internal_key);
    if (index < num_files) {
      FileMetaData* f = files_[level][index];
      if (ucmp->Compare(user_key, f->smallest.user_key()) < 0) {
        // All of "f" is past any data for user_key
      } else {
        if (!(*func)(arg, level, f)) {
          return;
        }
      }
    }
  }
}
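The two search strategies can be sketched on plain string key ranges. This is a simplified model, not the leveldb implementation: ToyFile and the three functions are hypothetical, keys are compared as plain strings instead of internal keys, and "newest first" is assumed to mean "largest file number first".

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>
#include <vector>

// Hypothetical toy file metadata: a file number and its key range.
struct ToyFile {
  int number;
  std::string smallest;
  std::string largest;
};

// Levels > 0: files are sorted and do not overlap. Binary search for the
// first file whose largest key is >= key; returns files.size() if none.
std::size_t FindFile(const std::vector<ToyFile>& files, const std::string& key) {
  std::size_t left = 0, right = files.size();
  while (left < right) {
    std::size_t mid = (left + right) / 2;
    if (files[mid].largest < key) {
      left = mid + 1;  // everything at or before mid ends before key
    } else {
      right = mid;
    }
  }
  return right;
}

// Levels > 0: at most one file can contain the key.
const ToyFile* CandidateInSortedLevel(const std::vector<ToyFile>& files,
                                      const std::string& key) {
  std::size_t index = FindFile(files, key);
  if (index < files.size() && key >= files[index].smallest) {
    return &files[index];
  }
  return nullptr;  // key falls in a gap between files, or past the last one
}

// Level 0: files may overlap, so collect every file whose range covers the
// key and return their numbers newest first.
std::vector<int> CandidatesInLevel0(const std::vector<ToyFile>& files,
                                    const std::string& key) {
  std::vector<int> hits;
  for (const ToyFile& f : files) {
    if (key >= f.smallest && key <= f.largest) hits.push_back(f.number);
  }
  std::sort(hits.begin(), hits.end(), std::greater<int>());
  return hits;
}
```

The cost difference is the point: level 0 is a linear scan over all files, while each deeper level costs one binary search and at most one file read.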

Snapshots

The interfaces for acquiring and releasing a snapshot:

  // Return a handle to the current DB state.  Iterators created with
  // this handle will all observe a stable snapshot of the current DB
  // state.  The caller must call ReleaseSnapshot(result) when the
  // snapshot is no longer needed.
  virtual const Snapshot* GetSnapshot() = 0;

  // Release a previously acquired snapshot.  The caller must not
  // use "snapshot" after this call.
  virtual void ReleaseSnapshot(const Snapshot* snapshot) = 0;

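Snapshots are managed internally in a linked list; acquiring or releasing a snapshot adds or removes a node in that list. In essence, a snapshot records the largest seq at the moment it is taken. When data is read through the snapshot, the entry that belongs to the snapshot is, for each key, the one with the largest seq that is <= the snapshot's seq.
leveldb also guarantees that, until the snapshot is released, such an entry is not merged away by compaction even if an entry with a higher seq for the same key exists in the sstables, so the snapshot can still read it.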
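The visibility rule can be illustrated with a small in-memory model that keeps every version of a key, ordered by descending seq. ToySnapshotDB is a hypothetical class for illustration only; in particular it represents a snapshot as a bare sequence number and never drops old versions, so it does not model the linked list or compaction.

```cpp
#include <cassert>
#include <cstdint>
#include <functional>
#include <map>
#include <string>

// Hypothetical toy store that keeps all versions of every key.
class ToySnapshotDB {
 public:
  ToySnapshotDB() : last_seq_(0) {}

  void Put(const std::string& key, const std::string& value) {
    versions_[key][++last_seq_] = ToyVersion{false, value};
  }
  void Delete(const std::string& key) {
    versions_[key][++last_seq_] = ToyVersion{true, std::string()};
  }

  // A snapshot is just the largest sequence number at this moment.
  uint64_t GetSnapshot() const { return last_seq_; }

  // Reads `key` as of `snapshot`: the version with the largest seq that is
  // <= snapshot decides the result.
  bool Get(const std::string& key, uint64_t snapshot,
           std::string* value) const {
    auto k = versions_.find(key);
    if (k == versions_.end()) return false;
    // The map is sorted by descending seq, so lower_bound returns the first
    // version whose seq is <= snapshot.
    auto v = k->second.lower_bound(snapshot);
    if (v == k->second.end() || v->second.deleted) return false;
    *value = v->second.value;
    return true;
  }

 private:
  struct ToyVersion {
    bool deleted;
    std::string value;
  };
  typedef std::map<uint64_t, ToyVersion, std::greater<uint64_t> > VersionMap;

  std::map<std::string, VersionMap> versions_;
  uint64_t last_seq_;
};
```

A reader holding an old snapshot keeps seeing the old value after later overwrites and deletes, which only works as long as the old version is still stored. That is the reason compaction has to respect live snapshots.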

Iterating the DB

The iteration interface returns an Iterator, which is used to traverse the DB:

  // Return a heap-allocated iterator over the contents of the database.
  // The result of NewIterator() is initially invalid (caller must
  // call one of the Seek methods on the iterator before using it).
  //
  // Caller should delete the iterator when it is no longer needed.
  // The returned iterator should be deleted before this db is deleted.
  virtual Iterator* NewIterator(const ReadOptions& options) = 0;

Iterating over the whole DB means iterating over the memtable, the immutable memtable and all sstables:

Iterator* DBImpl::NewIterator(const ReadOptions& options) {
  SequenceNumber latest_snapshot;
  uint32_t seed;
  Iterator* iter = NewInternalIterator(options, &latest_snapshot, &seed);
  return NewDBIterator(this, user_comparator(), iter,
                       (options.snapshot != nullptr
                            ? static_cast<const SnapshotImpl*>(options.snapshot)
                                  ->sequence_number()
                            : latest_snapshot),
                       seed);
}

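Two iterators are involved here. NewInternalIterator iterates over the internal memtable, immutable memtable and all sstables, and NewDBIterator is a wrapper around NewInternalIterator.
NewInternalIterator collects the iterators of the memtable, the immutable memtable and all sstables, and finally builds a NewMergingIterator that merge-sorts the keys coming from the different components. For sstables, every sstable in level 0 needs its own iterator, while for the other levels, whose keys do not overlap, one NewConcatenatingIterator per level is enough.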

Iterator* DBImpl::NewInternalIterator(const ReadOptions& options,
                                      SequenceNumber* latest_snapshot,
                                      uint32_t* seed) {
  mutex_.Lock();
  *latest_snapshot = versions_->LastSequence();

  // Collect together all needed child iterators
  std::vector<Iterator*> list;
  list.push_back(mem_->NewIterator());
  mem_->Ref();
  if (imm_ != nullptr) {
    list.push_back(imm_->NewIterator());
    imm_->Ref();
  }
  versions_->current()->AddIterators(options, &list);
  Iterator* internal_iter =
      NewMergingIterator(&internal_comparator_, &list[0], list.size());
  versions_->current()->Ref();

  IterState* cleanup = new IterState(&mutex_, mem_, imm_, versions_->current());
  internal_iter->RegisterCleanup(CleanupIteratorState, cleanup, nullptr);

  *seed = ++seed_;
  mutex_.Unlock();
  return internal_iter;
}

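NewConcatenatingIterator is a two-level iterator: the first level iterates over the sstables of one level, and the second level iterates over the KV pairs inside an sstable: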

Iterator* Version::NewConcatenatingIterator(const ReadOptions& options,
                                            int level) const {
  return NewTwoLevelIterator(
      new LevelFileNumIterator(vset_->icmp_, &files_[level]), &GetFileIterator,
      vset_->table_cache_, options);
}
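As a rough picture of the two-level idea, the sketch below walks a list of "files", each a sorted list of keys, as if it were one long sorted sequence. ToyTwoLevelIterator is a hypothetical class: it is forward-only, all files are already in memory, and there is no lazy opening of files through a table cache as in the real iterator.

```cpp
#include <cassert>
#include <cstddef>
#include <string>
#include <vector>

// Hypothetical forward-only iterator over the files of one sorted level.
class ToyTwoLevelIterator {
 public:
  explicit ToyTwoLevelIterator(
      const std::vector<std::vector<std::string> >* files)
      : files_(files), file_(0), pos_(0) {
    assert(files != nullptr);
    SkipExhaustedFiles();
  }

  bool Valid() const { return file_ < files_->size(); }

  const std::string& key() const {
    assert(Valid());
    return (*files_)[file_][pos_];
  }

  void Next() {
    assert(Valid());
    ++pos_;                // second level: advance inside the current file
    SkipExhaustedFiles();  // first level: move on to the next file if needed
  }

 private:
  // Advances the first-level position past files with nothing left to read.
  void SkipExhaustedFiles() {
    while (file_ < files_->size() && pos_ >= (*files_)[file_].size()) {
      ++file_;
      pos_ = 0;
    }
  }

  const std::vector<std::vector<std::string> >* files_;
  std::size_t file_;  // first level: which file
  std::size_t pos_;   // second level: which entry inside that file
};
```

Because the files of a level above 0 are sorted and do not overlap, simply concatenating them in order already yields sorted output, so no merging is needed inside one level.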

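MergingIterator combines multiple iterators. On each step it goes through all of its child iterators and picks the smallest key, much like the merge step of merge sort: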

  void Seek(const Slice& target) override {
    for (int i = 0; i < n_; i++) {
      // Seek every child iterator
      children_[i].Seek(target);
    }
    // Find the smallest key
    FindSmallest();
    direction_ = kForward;
  }

  void Next() override {
    assert(Valid());

    // Ensure that all children are positioned after key().
    // If we are moving in the forward direction, it is already
    // true for all of the non-current_ children since current_ is
    // the smallest child and key() == current_->key().  Otherwise,
    // we explicitly position the non-current_ children.
    if (direction_ != kForward) {
      for (int i = 0; i < n_; i++) {
        IteratorWrapper* child = &children_[i];
        if (child != current_) {
          child->Seek(key());
          if (child->Valid() &&
              comparator_->Compare(key(), child->key()) == 0) {
            child->Next();
          }
        }
      }
      direction_ = kForward;
    }

    current_->Next();
    FindSmallest();
  }
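The same Seek / Next / FindSmallest pattern can be tried out on sorted in-memory vectors. This is a minimal forward-only sketch with hypothetical names (ToyMergingIterator, Drain, kNoChild); it has no backward iteration or direction switching, and children are plain sorted vectors of strings rather than Iterator objects.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <string>
#include <utility>
#include <vector>

const std::size_t kNoChild = static_cast<std::size_t>(-1);

// Hypothetical forward-only merging iterator over sorted string vectors.
class ToyMergingIterator {
 public:
  explicit ToyMergingIterator(std::vector<std::vector<std::string> > children)
      : children_(std::move(children)),
        pos_(children_.size(), 0),
        current_(kNoChild) {}

  // Positions every child at its first key >= target, then picks the
  // smallest of those keys.
  void Seek(const std::string& target) {
    for (std::size_t i = 0; i < children_.size(); i++) {
      pos_[i] = static_cast<std::size_t>(
          std::lower_bound(children_[i].begin(), children_[i].end(), target) -
          children_[i].begin());
    }
    FindSmallest();
  }

  bool Valid() const { return current_ != kNoChild; }

  const std::string& key() const {
    assert(Valid());
    return children_[current_][pos_[current_]];
  }

  // Advances only the child that produced the current key.
  void Next() {
    assert(Valid());
    ++pos_[current_];
    FindSmallest();
  }

 private:
  // Linear scan over all children for the smallest current key.
  void FindSmallest() {
    current_ = kNoChild;
    for (std::size_t i = 0; i < children_.size(); i++) {
      if (pos_[i] >= children_[i].size()) continue;  // child exhausted
      if (current_ == kNoChild ||
          children_[i][pos_[i]] < children_[current_][pos_[current_]]) {
        current_ = i;
      }
    }
  }

  std::vector<std::vector<std::string> > children_;
  std::vector<std::size_t> pos_;  // current position inside each child
  std::size_t current_;           // index of the child holding key()
};

// Collects all remaining keys from the iterator's current position.
std::vector<std::string> Drain(ToyMergingIterator* it) {
  std::vector<std::string> out;
  for (; it->Valid(); it->Next()) out.push_back(it->key());
  return out;
}
```

In this toy, a key that appears in two children is emitted twice. That does not come up with internal keys, which include the seq and are therefore unique.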

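NewDBIterator wraps MergingIterator, but it also has to handle deleted keys. Deletion markers live in the memtable and the sstables, and the keys they cover must not show up in a scan. So when MergingIterator reaches such an entry, the DB iterator parses the entry's type and skips the entries that have the same key and a smaller seq.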
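The skipping logic can be sketched as a single pass over the merged internal entries. This is a simplified forward-scan model, not the DBIter implementation: InternalEntry and VisibleEntries are hypothetical names, and the input is assumed to be already sorted by user key ascending and, within one key, by seq descending, which is the order the merged internal iterator produces.

```cpp
#include <cassert>
#include <cstdint>
#include <string>
#include <utility>
#include <vector>

// Hypothetical flattened form of an internal key plus its value.
struct InternalEntry {
  std::string user_key;
  uint64_t seq;
  bool is_deletion;
  std::string value;
};

// `sorted` must be ordered by user_key ascending, then seq descending.
// Returns the (key, value) pairs a scan at `snapshot` would show.
std::vector<std::pair<std::string, std::string> > VisibleEntries(
    const std::vector<InternalEntry>& sorted, uint64_t snapshot) {
  std::vector<std::pair<std::string, std::string> > result;
  bool have_skip_key = false;
  std::string skip_key;  // user key whose fate has already been decided
  for (const InternalEntry& e : sorted) {
    if (e.seq > snapshot) continue;  // written after the snapshot: invisible
    if (have_skip_key && e.user_key == skip_key) continue;  // older version
    // First visible entry for this user key: it decides the outcome.
    have_skip_key = true;
    skip_key = e.user_key;
    if (!e.is_deletion) result.push_back(std::make_pair(e.user_key, e.value));
  }
  return result;
}
```

For example, with entries a@5, a@3, b@6 (deletion), b@2 and c@4, a scan at snapshot 10 shows a and c, while a scan at snapshot 3 shows the older a and the not-yet-deleted b.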
