您现在的位置是:首页 > 文章详情

LevelDB源码分析-TableBuilder生成sstable

日期:2019-04-05点击:527

TableBuilder生成sstable(include/table_builder.h table/table_builder.cc)

LevelDB使用TableBuilder来构建sstable,并基于TableBuilder封装了一个BuildTable接口,用于将memtable转换为sstable。

sstable的格式为:

datablock1 | datablock2 | ... | metablock1 | metablock2 | ... | metaindexblock | indexblock | footer

datablock即为存储KV数据的块,metablock为相应datablock的元信息的块(并未实现),metaindexblock为metablock的索引块(并未实现),indexblock为datablock的索引块。

footer的数据格式为:

metablockhandle | indexblockhandle | padding | magic

metablockhandle为metaindexblock的索引,indexblockhandle为indexblock的索引,padding为了补全定长,magic为8字节的校验码。

TableBuilder类

TableBuilder类的定义为:

class LEVELDB_EXPORT TableBuilder { public: // ... // Add key,value to the table being constructed. // REQUIRES: key is after any previously added key according to comparator. // REQUIRES: Finish(), Abandon() have not been called void Add(const Slice &key, const Slice &value); // Advanced operation: flush any buffered key/value pairs to file. // Can be used to ensure that two adjacent entries never live in // the same data block. Most clients should not need to use this method. // REQUIRES: Finish(), Abandon() have not been called void Flush(); // ... // Finish building the table. Stops using the file passed to the // constructor after this function returns. // REQUIRES: Finish(), Abandon() have not been called Status Finish(); // ... private: // .. void WriteBlock(BlockBuilder *block, BlockHandle *handle); void WriteRawBlock(const Slice &data, CompressionType, BlockHandle *handle); struct Rep; Rep *rep_; }; 
TableBuilder的成员变量

TableBuilder只有一个类型为Rep *的成员变量rep_。

Rep结构

Rep对TableBuilder构建sstable的过程中的相关数据进行了封装:

struct TableBuilder::Rep { Options options; Options index_block_options; WritableFile *file; uint64_t offset; Status status; BlockBuilder data_block; BlockBuilder index_block; std::string last_key; int64_t num_entries; bool closed; // Either Finish() or Abandon() has been called. FilterBlockBuilder *filter_block; // We do not emit the index entry for a block until we have seen the // first key for the next data block. This allows us to use shorter // keys in the index block. For example, consider a block boundary // between the keys "the quick brown fox" and "the who". We can use // "the r" as the key for the index block entry since it is >= all // entries in the first block and < all entries in subsequent // blocks. // // Invariant: r->pending_index_entry is true only if data_block is empty. bool pending_index_entry; BlockHandle pending_handle; // Handle to add to index block std::string compressed_output; // ... }; 

其中,file为保存sstable的文件,offset为文件中当前偏移的位置,data_block用于构建datablock,index_block用于构建indexblock,last_key为当前最后一个key,num_entries为sstable中entry数量,filter_block用于构建bloom filter,pending_handle用于构建block的索引(存入index_block中),compressed_output为压缩后的sstable。

这里用到了其他的一些辅助的结构,BlockBuilder在LevelDB源码分析-TableBuilder生成sstable中已经分析过了。FilterBlockBuilder将在以后分析。

WritableFile类

WritableFile类为文件操作定义了接口:

class LEVELDB_EXPORT WritableFile { public: WritableFile() = default; WritableFile(const WritableFile &) = delete; WritableFile &operator=(const WritableFile &) = delete; virtual ~WritableFile(); virtual Status Append(const Slice &data) = 0; virtual Status Close() = 0; virtual Status Flush() = 0; virtual Status Sync() = 0; }; 

具体实现在PosixWritableFile类中:

class PosixWritableFile : public WritableFile { private: // buf_[0, pos_-1] contains data to be written to fd_. std::string filename_; int fd_; char buf_[kBufSize]; size_t pos_; public: // ... virtual Status Append(const Slice &data) { // 见下文分析 } virtual Status Close() { // ... } virtual Status Flush() { return FlushBuffered(); } Status SyncDirIfManifest() { const char *f = filename_.c_str(); const char *sep = strrchr(f, '/'); Slice basename; std::string dir; if (sep == nullptr) { dir = "."; basename = f; } else { dir = std::string(f, sep - f); basename = sep + 1; } Status s; if (basename.starts_with("MANIFEST")) { int fd = open(dir.c_str(), O_RDONLY); if (fd < 0) { s = PosixError(dir, errno); } else { if (fsync(fd) < 0) { s = PosixError(dir, errno); } close(fd); } } return s; } virtual Status Sync() { // 见下文分析 } private: Status FlushBuffered() { // ... } Status WriteRaw(const char *p, size_t n) { // ... } }; 

其中filename_为文件名,int fd_为文件句柄,char buf_[kBufSize]为缓冲区,size_t pos_为当前缓冲区剩余空间起始位置的偏移。

Append函数用于写文件,如果缓冲区空间足够,则先将数据写入缓冲区:

 size_t n = data.size(); const char *p = data.data(); // Fit as much as possible into buffer. size_t copy = std::min(n, kBufSize - pos_); memcpy(buf_ + pos_, p, copy); p += copy; n -= copy; pos_ += copy; if (n == 0) { return Status::OK(); } 

如果空间不够,则将当前缓冲区的内容刷入文件:

 // Can't fit in buffer, so need to do at least one write. Status s = FlushBuffered(); if (!s.ok()) { return s; } 

然后看写入的数据是否大于缓冲区,如果大于缓冲区,则直接写入文件,否则也写入缓冲区。WriteRaw(const char *p, size_t n)调用write函数将数据写入文件:

 // Small writes go to buffer, large writes are written directly. if (n < kBufSize) { memcpy(buf_, p, n); pos_ = n; return Status::OK(); } return WriteRaw(p, n); 

Sync函数用于将file中没有被同步到硬盘的部分强制同步到硬盘上,而不是驻留在内存中,保证了持久化。这个函数首先调用SyncDirIfManifest函数将manifest文件写入硬盘(通过fsync函数):

 // Ensure new files referred to by the manifest are in the filesystem. Status s = SyncDirIfManifest(); if (!s.ok()) { return s; } 

然后将缓冲区中的数据刷入文件:

 s = FlushBuffered(); 

最后将文件写入硬盘(通过fsyncdata函数):

 if (s.ok()) { if (fdatasync(fd_) != 0) { s = PosixError(filename_, errno); } } return s; 
BlockHandle类

BlockHandle类封装了sstable中block的索引:

class BlockHandle { public: BlockHandle(); // The offset of the block in the file. uint64_t offset() const { return offset_; } void set_offset(uint64_t offset) { offset_ = offset; } // The size of the stored block uint64_t size() const { return size_; } void set_size(uint64_t size) { size_ = size; } void EncodeTo(std::string *dst) const; Status DecodeFrom(Slice *input); // Maximum encoding length of a BlockHandle enum { kMaxEncodedLength = 10 + 10 }; private: uint64_t offset_; uint64_t size_; }; 

其中offset_为block的偏移,size_为block的大小。EncodeTo函数将BlockHandle编码为字符串,DecodeFrom函数将Slice封装的数据解码为BlockHandle,以便于处理。

TableBuilder的成员函数

首先是Add函数:

void TableBuilder::Add(const Slice &key, const Slice &value) 

首先判断是否需要向index_block中添加索引,如果需要,则调用FindShortestSeparator函数根据last_key计算这个block的索引key值,并将pending_handle编码为字符串作为value值,然后存入index_block中:

 Rep *r = rep_; assert(!r->closed); if (!ok()) return; if (r->num_entries > 0) { assert(r->options.comparator->Compare(key, Slice(r->last_key)) > 0); } if (r->pending_index_entry) { assert(r->data_block.empty()); r->options.comparator->FindShortestSeparator(&r->last_key, key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } 

接下来向filter_block中添加key的映射:

 if (r->filter_block != nullptr) { r->filter_block->AddKey(key); } 

接着讲实际的KV值存入data_block:

 r->last_key.assign(key.data(), key.size()); r->num_entries++; r->data_block.Add(key, value); 

最后判断当前index_block的大小是否超过设定值,若超过则调用Flush函数写入文件对象:

 const size_t estimated_block_size = r->data_block.CurrentSizeEstimate(); if (estimated_block_size >= r->options.block_size) { Flush(); } 

这里值得注意的是FindShortestSeparator函数,LevelDB为了节省内存空间,在这里选取的key并不是last_key的key,而是只需要能够区分两个block就可以了。比如block和blrck之间只需要blp就可以区分。

Add函数调用了Flush函数:

void TableBuilder::Flush() 

Flush函数首先调用WriteBlock函数将当前还未写入文件对象的block写入文件对象:

 Rep *r = rep_; assert(!r->closed); if (!ok()) return; if (r->data_block.empty()) return; assert(!r->pending_index_entry); WriteBlock(&r->data_block, &r->pending_handle); 

然后使用r->file->Flush将文件缓冲区(由WritableFile封装)中的内容写入文件:

 if (ok()) { r->pending_index_entry = true; r->status = r->file->Flush(); } if (r->filter_block != nullptr) { r->filter_block->StartBlock(r->offset); } 

Flush函数调用了WriteBlock函数:

void TableBuilder::WriteBlock(BlockBuilder *block, BlockHandle *handle) 

WriteBlock函数首先通过block->Finish()向block中加入restarts和num_of_restarts:

 // File format contains a sequence of blocks where each block has: // block_data: uint8[n] // type: uint8 // crc: uint32 assert(ok()); Rep *r = rep_; Slice raw = block->Finish(); 

再根据设定选项对block进行压缩:

 Slice block_contents; CompressionType type = r->options.compression; // TODO(postrelease): Support more compression options: zlib? switch (type) { case kNoCompression: block_contents = raw; break; case kSnappyCompression: { std::string *compressed = &r->compressed_output; if (port::Snappy_Compress(raw.data(), raw.size(), compressed) && compressed->size() < raw.size() - (raw.size() / 8u)) { block_contents = *compressed; } else { // Snappy not supported, or compressed less than 12.5%, so just // store uncompressed form block_contents = raw; type = kNoCompression; } break; } } 

最后通过WriteRawBlock函数写入文件对象:

 WriteRawBlock(block_contents, type, handle); r->compressed_output.clear(); block->Reset(); 

WriteBlock函数调用了WriteRawBlock函数:

void TableBuilder::WriteRawBlock(const Slice &block_contents, CompressionType type, BlockHandle *handle) 

WriteRawBlock函数通过r->file->Append将传入的除了type和crc之外的block中的数据存入文件对象:

 Rep *r = rep_; handle->set_offset(r->offset); handle->set_size(block_contents.size()); r->status = r->file->Append(block_contents); 

然后计算type和crc,属于同一批:

 if (r->status.ok()) { char trailer[kBlockTrailerSize]; trailer[0] = type; uint32_t crc = crc32c::Value(block_contents.data(), block_contents.size()); crc = crc32c::Extend(crc, trailer, 1); // Extend crc to cover block type EncodeFixed32(trailer + 1, crc32c::Mask(crc)); r->status = r->file->Append(Slice(trailer, kBlockTrailerSize)); if (r->status.ok()) { r->offset += block_contents.size() + kBlockTrailerSize; } } 

其次是Finish函数:

Status TableBuilder::Finish() { Rep *r = rep_; Flush(); assert(!r->closed); r->closed = true; BlockHandle filter_block_handle, metaindex_block_handle, index_block_handle; // Write filter block if (ok() && r->filter_block != nullptr) { WriteRawBlock(r->filter_block->Finish(), kNoCompression, &filter_block_handle); } // Write metaindex block if (ok()) { BlockBuilder meta_index_block(&r->options); if (r->filter_block != nullptr) { // Add mapping from "filter.Name" to location of filter data std::string key = "filter."; key.append(r->options.filter_policy->Name()); std::string handle_encoding; filter_block_handle.EncodeTo(&handle_encoding); meta_index_block.Add(key, handle_encoding); } // TODO(postrelease): Add stats and other meta blocks WriteBlock(&meta_index_block, &metaindex_block_handle); } // Write index block if (ok()) { if (r->pending_index_entry) { r->options.comparator->FindShortSuccessor(&r->last_key); std::string handle_encoding; r->pending_handle.EncodeTo(&handle_encoding); r->index_block.Add(r->last_key, Slice(handle_encoding)); r->pending_index_entry = false; } WriteBlock(&r->index_block, &index_block_handle); } // Write footer if (ok()) { Footer footer; footer.set_metaindex_handle(metaindex_block_handle); footer.set_index_handle(index_block_handle); std::string footer_encoding; footer.EncodeTo(&footer_encoding); r->status = r->file->Append(footer_encoding); if (r->status.ok()) { r->offset += footer_encoding.size(); } } return r->status; } 

Finish函数首先写入metaindexblock,然后又写入indexblock,最后写入footer。

BuildTable接口

LevelDB调用BuildTable接口进行sstable文件的构建:

Status BuildTable(const std::string &dbname, Env *env, const Options &options, TableCache *table_cache, Iterator *iter, FileMetaData *meta) 

创建一个文件对象用于存储sstable:

 Status s; meta->file_size = 0; iter->SeekToFirst(); std::string fname = TableFileName(dbname, meta->number); if (iter->Valid()) { WritableFile *file; s = env->NewWritableFile(fname, &file); if (!s.ok()) { return s; } 

创建一个TableBuilder对象用于构建sstable,并通过迭代器遍历memtable将值依次添加进sstable:

 TableBuilder *builder = new TableBuilder(options, file); meta->smallest.DecodeFrom(iter->key()); for (; iter->Valid(); iter->Next()) { Slice key = iter->key(); meta->largest.DecodeFrom(key); builder->Add(key, iter->value()); } // Finish and check for builder errors s = builder->Finish(); if (s.ok()) { meta->file_size = builder->FileSize(); assert(meta->file_size > 0); } delete builder; 

将文件强制写入磁盘:

 // Finish and check for file errors if (s.ok()) { s = file->Sync(); } if (s.ok()) { s = file->Close(); } delete file; file = nullptr; 

228 Love u

原文链接:https://my.oschina.net/yunanlong/blog/3032915
关注公众号

低调大师中文资讯倾力打造互联网数据资讯、行业资源、电子商务、移动互联网、网络营销平台。

持续更新报道IT业界、互联网、市场资讯、驱动更新,是最及时权威的产业资讯及硬件资讯报道平台。

转载内容版权归作者及来源网站所有,本站原创内容转载请注明来源。

文章评论

共有0条评论来说两句吧...

文章二维码

扫描即可查看该文章

点击排行

推荐阅读

最新文章