LevelDB源码分析-Compact
Compaction
compact由背景线程完成,代码中触发背景线程的函数为:
void DBImpl::MaybeScheduleCompaction() { mutex_.AssertHeld(); if (background_compaction_scheduled_) { // Already scheduled } else if (shutting_down_.Acquire_Load()) { // DB is being deleted; no more background compactions } else if (!bg_error_.ok()) { // Already got an error; no more changes } else if (imm_ == nullptr && manual_compaction_ == nullptr && !versions_->NeedsCompaction()) { // No work to be done } else { background_compaction_scheduled_ = true; env_->Schedule(&DBImpl::BGWork, this); } }
背景线程调用的函数为:
// Entry point passed to env_->Schedule(): casts the opaque pointer back to
// the DBImpl instance and runs BackgroundCall() on it.
void DBImpl::BGWork(void *db) { reinterpret_cast<DBImpl *>(db)->BackgroundCall(); }
这个函数是调用BackgroundCall函数实现的:
void DBImpl::BackgroundCall() { MutexLock l(&mutex_); assert(background_compaction_scheduled_); if (shutting_down_.Acquire_Load()) { // No more background work when shutting down. } else if (!bg_error_.ok()) { // No more background work after a background error. } else { BackgroundCompaction(); } background_compaction_scheduled_ = false; // Previous compaction may have produced too many files in a level, // so reschedule another compaction if needed. MaybeScheduleCompaction(); background_work_finished_signal_.SignalAll(); }
这个函数主要调用BackgroundCompaction实现相应逻辑:
void DBImpl::BackgroundCompaction()
如果immutable table存在则先调用CompactMemTable函数:
mutex_.AssertHeld(); if (imm_ != nullptr) { CompactMemTable(); return; }
然后调用PickCompaction函数,PickCompaction函数选取合适的level以及其中需要compact的文件,并封装成一个Compaction对象:
else { c = versions_->PickCompaction(); }
通过IsTrivialMove判断是否可以直接移动文件:如果选出的sstable只有一个,下一level中没有需要与它合并的文件,并且该sstable与下下一level(level+2)不会有太多的overlap,则不需要重写数据,只需简单地将这个sstable移动到下一level。
Status status; if (c == nullptr) { // Nothing to do } else if (!is_manual && c->IsTrivialMove()) { // Move file to next level assert(c->num_input_files(0) == 1); FileMetaData *f = c->input(0, 0); c->edit()->DeleteFile(c->level(), f->number); c->edit()->AddFile(c->level() + 1, f->number, f->file_size, f->smallest, f->largest); status = versions_->LogAndApply(c->edit(), &mutex_); if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "Moved #%lld to level-%d %lld bytes %s: %s\n", static_cast<unsigned long long>(f->number), c->level() + 1, static_cast<unsigned long long>(f->file_size), status.ToString().c_str(), versions_->LevelSummary(&tmp)); }
否则(即不满足上面trivial move的条件时),调用DoCompactionWork进行实际的compact,然后调用CleanupCompaction清除CompactionState对象,调用ReleaseInputs释放compaction的输入,最后调用DeleteObsoleteFiles删除旧文件,回收内存和磁盘空间:
else { CompactionState *compact = new CompactionState(c); status = DoCompactionWork(compact); if (!status.ok()) { RecordBackgroundError(status); } CleanupCompaction(compact); c->ReleaseInputs(); DeleteObsoleteFiles(); } delete c;
其中调用的PickCompaction函数为:
Compaction *VersionSet::PickCompaction()
如果有level达到compact的阈值,优先将这个level进行compact。将最大的key大于上一次compact的最大的key的sstable加入inputs_[0]的数组中作为compact的sstable,如果compact_pointer_[level]中没有记录,则从头开始取sstable,如果不存在最大的key大于上一次compact的最大的key的sstable,则将key最小的sstable作为需要compact的sstable:
Compaction *c; int level; // We prefer compactions triggered by too much data in a level over // the compactions triggered by seeks. const bool size_compaction = (current_->compaction_score_ >= 1); const bool seek_compaction = (current_->file_to_compact_ != nullptr); if (size_compaction) { level = current_->compaction_level_; assert(level >= 0); assert(level + 1 < config::kNumLevels); c = new Compaction(options_, level); // Pick the first file that comes after compact_pointer_[level] for (size_t i = 0; i < current_->files_[level].size(); i++) { FileMetaData *f = current_->files_[level][i]; if (compact_pointer_[level].empty() || icmp_.Compare(f->largest.Encode(), compact_pointer_[level]) > 0) { c->inputs_[0].push_back(f); break; } } if (c->inputs_[0].empty()) { // Wrap-around to the beginning of the key space c->inputs_[0].push_back(current_->files_[level][0]); } }
如果没有level达到需要compact的阈值,但是有文件因为seek的命中率不够而达到了compact的阈值,则将这个文件compact:
else if (seek_compaction) { level = current_->file_to_compact_level_; c = new Compaction(options_, level); c->inputs_[0].push_back(current_->file_to_compact_); }
如果compact的是level0,则还需要调用GetOverlappingInputs函数求出有overlap的文件,然后再调用SetupOtherInputs函数完成compact对象的构建:
c->input_version_ = current_; c->input_version_->Ref(); // Files in level 0 may overlap each other, so pick up all overlapping ones if (level == 0) { InternalKey smallest, largest; GetRange(c->inputs_[0], &smallest, &largest); // Note that the next call will discard the file we placed in // c->inputs_[0] earlier and replace it with an overlapping set // which will include the picked file. current_->GetOverlappingInputs(0, &smallest, &largest, &c->inputs_[0]); assert(!c->inputs_[0].empty()); } SetupOtherInputs(c); return c;
CompactMemTable函数
void DBImpl::CompactMemTable()
调用WriteLevel0Table将immutable memtable写入sstable:
mutex_.AssertHeld(); assert(imm_ != nullptr); // Save the contents of the memtable as a new Table VersionEdit edit; Version *base = versions_->current(); base->Ref(); Status s = WriteLevel0Table(imm_, &edit, base); base->Unref(); if (s.ok() && shutting_down_.Acquire_Load()) { s = Status::IOError("Deleting DB during memtable compaction"); }
更新当前lognumber,旧日志不再需要,生效新的version:
// Replace immutable memtable with the generated Table if (s.ok()) { edit.SetPrevLogNumber(0); edit.SetLogNumber(logfile_number_); // Earlier logs no longer needed s = versions_->LogAndApply(&edit, &mutex_); }
删除旧文件:
if (s.ok()) { // Commit to the new state imm_->Unref(); imm_ = nullptr; has_imm_.Release_Store(nullptr); DeleteObsoleteFiles(); } else { RecordBackgroundError(s); }
WriteLevel0Table函数为:
Status DBImpl::WriteLevel0Table(MemTable *mem, VersionEdit *edit, Version *base)
调用BuildTable接口将immutable memtable写入sstable:
mutex_.AssertHeld(); const uint64_t start_micros = env_->NowMicros(); FileMetaData meta; meta.number = versions_->NewFileNumber(); pending_outputs_.insert(meta.number); Iterator *iter = mem->NewIterator(); Log(options_.info_log, "Level-0 table #%llu: started", (unsigned long long)meta.number); Status s; { mutex_.Unlock(); s = BuildTable(dbname_, env_, options_, table_cache_, iter, &meta); mutex_.Lock(); } Log(options_.info_log, "Level-0 table #%llu: %lld bytes %s", (unsigned long long)meta.number, (unsigned long long)meta.file_size, s.ToString().c_str()); delete iter; pending_outputs_.erase(meta.number);
调用PickLevelForMemTableOutput函数选择合适的level将新的sstable加入:
// Note that if file_size is zero, the file has been deleted and // should not be added to the manifest. int level = 0; if (s.ok() && meta.file_size > 0) { const Slice min_user_key = meta.smallest.user_key(); const Slice max_user_key = meta.largest.user_key(); if (base != nullptr) { level = base->PickLevelForMemTableOutput(min_user_key, max_user_key); } edit->AddFile(level, meta.number, meta.file_size, meta.smallest, meta.largest); } CompactionStats stats; stats.micros = env_->NowMicros() - start_micros; stats.bytes_written = meta.file_size; stats_[level].Add(stats); return s;
PickLevelForMemTableOutput函数:
int Version::PickLevelForMemTableOutput( const Slice &smallest_user_key, const Slice &largest_user_key) { int level = 0; if (!OverlapInLevel(0, &smallest_user_key, &largest_user_key)) { // Push to next level if there is no overlap in next level, // and the #bytes overlapping in the level after that are limited. InternalKey start(smallest_user_key, kMaxSequenceNumber, kValueTypeForSeek); InternalKey limit(largest_user_key, 0, static_cast<ValueType>(0)); std::vector<FileMetaData *> overlaps; while (level < config::kMaxMemCompactLevel) { if (OverlapInLevel(level + 1, &smallest_user_key, &largest_user_key)) { break; } if (level + 2 < config::kNumLevels) { // Check that file does not overlap too many grandparent bytes. GetOverlappingInputs(level + 2, &start, &limit, &overlaps); const int64_t sum = TotalFileSize(overlaps); if (sum > MaxGrandParentOverlapBytes(vset_->options_)) { break; } } level++; } } return level; }
选择level的策略为:如果level0和sstable的key有重合,则插入level0;否则循环搜索之后的level(最多到kMaxMemCompactLevel):如果下一level和sstable的key有重合,则停止并返回当前level;如果没有重合,再考虑下下个level中与sstable的key范围重合的文件的总大小,如果超过了设定的阈值,也停止并返回当前level(为了保证之后选取该sstable与下一level进行compact的时候,涉及下一level的文件不会太多,减小开销)。
DoCompactionWork函数
Status DBImpl::DoCompactionWork(CompactionState *compact)
将smallest_snapshot赋值为最老的snapshots的sequencenumber,如果不存在snapshots则赋值为当前数据库的sequencenumber:
const uint64_t start_micros = env_->NowMicros(); int64_t imm_micros = 0; // Micros spent doing imm_ compactions Log(options_.info_log, "Compacting %d@%d + %d@%d files", compact->compaction->num_input_files(0), compact->compaction->level(), compact->compaction->num_input_files(1), compact->compaction->level() + 1); assert(versions_->NumLevelFiles(compact->compaction->level()) > 0); assert(compact->builder == nullptr); assert(compact->outfile == nullptr); if (snapshots_.empty()) { compact->smallest_snapshot = versions_->LastSequence(); } else { compact->smallest_snapshot = snapshots_.oldest()->sequence_number(); }
调用MakeInputIterator获取需要compact的文件上的迭代器,可以通过这个迭代器直接遍历所有需要compact的文件。
// Release mutex while we're actually doing the compaction work mutex_.Unlock(); Iterator *input = versions_->MakeInputIterator(compact->compaction); input->SeekToFirst(); Status status; ParsedInternalKey ikey; std::string current_user_key; bool has_current_user_key = false; SequenceNumber last_sequence_for_key = kMaxSequenceNumber;
开始通过迭代器遍历需要compact的文件。在每次循环中,如果有immutable memtable存在,则优先调用CompactMemTable函数将其compact。
for (; input->Valid() && !shutting_down_.Acquire_Load();) { // Prioritize immutable compaction work if (has_imm_.NoBarrier_Load() != nullptr) { const uint64_t imm_start = env_->NowMicros(); mutex_.Lock(); if (imm_ != nullptr) { CompactMemTable(); // Wake up MakeRoomForWrite() if necessary. background_work_finished_signal_.SignalAll(); } mutex_.Unlock(); imm_micros += (env_->NowMicros() - imm_start); }
获取迭代器当前指向的entry的key值,调用ShouldStopBefore函数判断是否需要停止当前sstable的构建,如果需要,则调用FinishCompactionOutputFile函数将包含当前sstable的文件输出。ShouldStopBefore函数主要是判断当前的sstable中的key是否与下下一level中的key的overlap过大,如果过大则停止当前sstable的构建:
Slice key = input->key(); if (compact->compaction->ShouldStopBefore(key) && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } }
将key解码。如果当前key与之前的key的user key相同,且前一个entry的sequencenumber小于等于smallest_snapshot,则丢弃当前key:因为前一个entry更新,并且它对所有snapshot都可见,当前这个更旧的entry已经被它遮盖,不会再被读到。如果key被标记为删除,其sequencenumber小于等于smallest_snapshot,并且更高的level中不存在这个user key(也就是说丢弃这个删除标记不会影响到之后的读取),则也丢弃这个key。
// Handle key/value, add to state, etc. bool drop = false; if (!ParseInternalKey(key, &ikey)) { // Do not hide error keys current_user_key.clear(); has_current_user_key = false; last_sequence_for_key = kMaxSequenceNumber; } else { if (!has_current_user_key || user_comparator()->Compare(ikey.user_key, Slice(current_user_key)) != 0) { // First occurrence of this user key current_user_key.assign(ikey.user_key.data(), ikey.user_key.size()); has_current_user_key = true; last_sequence_for_key = kMaxSequenceNumber; } if (last_sequence_for_key <= compact->smallest_snapshot) { // Hidden by an newer entry for same user key drop = true; // (A) } else if (ikey.type == kTypeDeletion && ikey.sequence <= compact->smallest_snapshot && compact->compaction->IsBaseLevelForKey(ikey.user_key)) { // For this user key: // (1) there is no data in higher levels // (2) data in lower levels will have larger sequence numbers // (3) data in layers that are being compacted here and have // smaller sequence numbers will be dropped in the next // few iterations of this loop (by rule (A) above). // Therefore this deletion marker is obsolete and can be dropped. drop = true; } last_sequence_for_key = ikey.sequence; }
如果不丢弃key的话,当前的写入sstable的文件不存在则调用OpenCompactionOutputFile函数创建一个文件,然后将KV写入,如果写入后sstable大小达到阈值则调用FinishCompactionOutputFile函数写出这个文件。
if (!drop) { // Open output file if necessary if (compact->builder == nullptr) { status = OpenCompactionOutputFile(compact); if (!status.ok()) { break; } } if (compact->builder->NumEntries() == 0) { compact->current_output()->smallest.DecodeFrom(key); } compact->current_output()->largest.DecodeFrom(key); compact->builder->Add(key, input->value()); // Close output file if it is big enough if (compact->builder->FileSize() >= compact->compaction->MaxOutputFileSize()) { status = FinishCompactionOutputFile(compact, input); if (!status.ok()) { break; } } }
循环以上过程:
input->Next(); }
结束遍历之后,将没有写出的文件写出:
if (status.ok() && shutting_down_.Acquire_Load()) { status = Status::IOError("Deleting DB during compaction"); } if (status.ok() && compact->builder != nullptr) { status = FinishCompactionOutputFile(compact, input); } if (status.ok()) { status = input->status(); } delete input; input = nullptr;
记录统计信息:
CompactionStats stats; stats.micros = env_->NowMicros() - start_micros - imm_micros; for (int which = 0; which < 2; which++) { for (int i = 0; i < compact->compaction->num_input_files(which); i++) { stats.bytes_read += compact->compaction->input(which, i)->file_size; } } for (size_t i = 0; i < compact->outputs.size(); i++) { stats.bytes_written += compact->outputs[i].file_size; } mutex_.Lock(); stats_[compact->compaction->level() + 1].Add(stats);
将此次compaction生效:
if (status.ok()) { status = InstallCompactionResults(compact); } if (!status.ok()) { RecordBackgroundError(status); } VersionSet::LevelSummaryStorage tmp; Log(options_.info_log, "compacted to: %s", versions_->LevelSummary(&tmp)); return status;
230 Love u
低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。
持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。
转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。
- 上一篇
LevelDB源码分析-TableBuilder生成sstable
TableBuilder生成sstable(include/table_builder.h table/table_builder.cc) LevelDB使用TableBuilder来构建sstable,并基于TableBuilder封装了一个BuildTable接口,用于将memtable转换为sstable。 sstable的格式为: datablock1 | datablock2 | ... | metablock1 | metablock2 | ... | metaindexblock | indexblock | footer datablock即为存储KV数据的块,metablock为相应datablock的元信息的块(并未实现),metaindexblock为metablock的索引块(并未实现),indexblock为datablock的索引块。 footer的数据格式为: metablockhandle | indexblockhandle | padding | magic metablockhandle为metaindexblock的索引,indexblockha...
- 下一篇
LevelDB源码分析-Get
Get LevelDB提供了Get接口用于给定key的查找: Status DBImpl::Get(const ReadOptions &options, const Slice &key, std::string *value) Get操作可以指定在某个snapshot的情况下进行,如果指定了snapshot,则获取该snapshot的sequencenumber,如果没有指定snapshot,就取当前最新的sequencenumber: Status s; MutexLock l(&mutex_); SequenceNumber snapshot; if (options.snapshot != nullptr) { snapshot = static_cast<const SnapshotImpl *>(options.snapshot)->sequence_number(); } else { snapshot = versions_->LastSequence(); } MemTable *mem = mem_; MemTabl...
相关文章
文章评论
共有0条评论来说两句吧...