您现在的位置是:首页 > 文章详情



MemTable(db/memtable.h db/memtable.cc db/skiplist.h)




class MemTable { public: // ... // Increase reference count. void Ref() { ++refs_; } // Drop reference count. Delete if no more references exist. void Unref() { --refs_; assert(refs_ >= 0); if (refs_ <= 0) { delete this; } } // ... // Return an iterator that yields the contents of the memtable. // // The caller must ensure that the underlying MemTable remains live // while the returned iterator is live. The keys returned by this // iterator are internal keys encoded by AppendInternalKey in the // db/format.{h,cc} module. Iterator *NewIterator(); // Add an entry into memtable that maps key to value at the // specified sequence number and with the specified type. // Typically value will be empty if type==kTypeDeletion. void Add(SequenceNumber seq, ValueType type, const Slice &key, const Slice &value); // If memtable contains a value for key, store it in *value and return true. // If memtable contains a deletion for key, store a NotFound() error // in *status and return true. // Else, return false. bool Get(const LookupKey &key, std::string *value, Status *s); private: // ... struct KeyComparator { const InternalKeyComparator comparator; explicit KeyComparator(const InternalKeyComparator &c) : comparator(c) {} int operator()(const char *a, const char *b) const; }; friend class MemTableIterator; friend class MemTableBackwardIterator; typedef SkipList<const char *, KeyComparator> Table; KeyComparator comparator_; int refs_; Arena arena_; Table table_; // ... }; 






class Arena { public: // ... // Return a pointer to a newly allocated memory block of "bytes" bytes. char *Allocate(size_t bytes); // Allocate memory with the normal alignment guarantees provided by malloc char *AllocateAligned(size_t bytes); // ... private: char *AllocateFallback(size_t bytes); char *AllocateNewBlock(size_t block_bytes); // Allocation state char *alloc_ptr_; size_t alloc_bytes_remaining_; // Array of new[] allocated memory blocks std::vector<char *> blocks_; // Total memory usage of the arena. port::AtomicPointer memory_usage_; // ... }; 


Arena类提供的方法主要有两个,一个是Allocate(size_t bytes):

inline char *Arena::Allocate(size_t bytes) char *Arena::AllocateFallback(size_t bytes) char *Arena::AllocateNewBlock(size_t block_bytes) { char *result = new char[block_bytes]; blocks_.push_back(result); memory_usage_.NoBarrier_Store( reinterpret_cast<void *>(MemoryUsage() + block_bytes + sizeof(char *))); return result; } 

Allocate(size_t bytes)函数分配的内存不保证对齐。首先检查当前可以被分配的内存空间是否足够,如果够的话,直接将从alloc_ptr_指向的位置开始的bytes个字节分配给调用者,并移动alloc_ptr_指向新的位置,调整alloc_bytes_remaining_:

 // The semantics of what to return are a bit messy if we allow // 0-byte allocations, so we disallow them here (we don't need // them for our internal use). assert(bytes > 0); if (bytes <= alloc_bytes_remaining_) { char *result = alloc_ptr_; alloc_ptr_ += bytes; alloc_bytes_remaining_ -= bytes; return result; } 

如果不够,就调用AllocateFallback(size_t bytes)函数:

 return AllocateFallback(bytes); 

AllocateFallback(size_t bytes)函数首先检查bytes大小是否超过1/4个block,如果超过,则直接调用AllocateNewBlock(size_t block_bytes)函数分配一个新的block给调用者,并且这个新的block中除了当前的bytes个字节,其余部分将不会被使用,而alloc_ptr_的指向却不变,也就是说当前可以被分配的内存保持不变,因为此时剩余空间至少为1/4个block,这样设计就可以减少大块的内存碎片:

 if (bytes > kBlockSize / 4) { // Object is more than a quarter of our block size. Allocate it separately // to avoid wasting too much space in leftover bytes. char *result = AllocateNewBlock(bytes); return result; } 


 // We waste the remaining space in the current block. alloc_ptr_ = AllocateNewBlock(kBlockSize); alloc_bytes_remaining_ = kBlockSize; char *result = alloc_ptr_; alloc_ptr_ += bytes; alloc_bytes_remaining_ -= bytes; return result; 

还有一个内存分配函数是AllocateAligned(size_t bytes):

char *Arena::AllocateAligned(size_t bytes) 


 const int align = (sizeof(void *) > 8) ? sizeof(void *) : 8; assert((align & (align - 1)) == 0); // Pointer size should be a power of 2 size_t current_mod = reinterpret_cast<uintptr_t>(alloc_ptr_) & (align - 1); size_t slop = (current_mod == 0 ? 0 : align - current_mod); size_t needed = bytes + slop; char *result; 

剩下的代码和Allocate(size_t bytes)中实现一致:

 if (needed <= alloc_bytes_remaining_) { result = alloc_ptr_ + slop; alloc_ptr_ += needed; alloc_bytes_remaining_ -= needed; } else { // AllocateFallback always returned aligned memory result = AllocateFallback(bytes); } assert((reinterpret_cast<uintptr_t>(result) & (align - 1)) == 0); return result; 


template <typename Key, class Comparator> class SkipList { private: struct Node; public: // ... // Insert key into the list. // REQUIRES: nothing that compares equal to key is currently in the list. void Insert(const Key &key); // Returns true iff an entry that compares equal to key is in the list. bool Contains(const Key &key) const; // Iteration over the contents of a skip list class Iterator { // ... }; private: // ... // Immutable after construction Comparator const compare_; Arena *const arena_; // Arena used for allocations of nodes Node *const head_; // Modified only by Insert(). Read racily by readers, but stale // values are ok. port::AtomicPointer max_height_; // Height of the entire list // ... Node *NewNode(const Key &key, int height); // ... // Return true if key is greater than the data stored in "n" bool KeyIsAfterNode(const Key &key, Node *n) const; // Return the earliest node that comes at or after key. // Return nullptr if there is no such node. // // If prev is non-null, fills prev[level] with pointer to previous // node at "level" for every level in [0..max_height_-1]. Node *FindGreaterOrEqual(const Key &key, Node **prev) const; // Return the latest node with a key < key. // Return head_ if there is no such node. Node *FindLessThan(const Key &key) const; // Return the last node in the list. // Return head_ if list is empty. Node *FindLast() const; // ... }; 





template <typename Key, class Comparator> struct SkipList<Key, Comparator>::Node { // ... Key const key; // Accessors/mutators for links. Wrapped in methods so we can // add the appropriate barriers as necessary. Node *Next(int n) // ... void SetNext(int n, Node *x) // ... // No-barrier variants that can be safely used in a few locations. Node *NoBarrier_Next(int n) // ... void NoBarrier_SetNext(int n, Node *x) // ... private: // Array of length equal to the node height. next_[0] is lowest level link. port::AtomicPointer next_[1]; }; 



template <typename Key, class Comparator> typename SkipList<Key, Comparator>::Node * SkipList<Key, Comparator>::NewNode(const Key &key, int height) { char *mem = arena_->AllocateAligned( sizeof(Node) + sizeof(port::AtomicPointer) * (height - 1)); return new (mem) Node(key); } 

这个函数首先分配了一块对齐的内存,然后通过new (mem) Node(key)这个语句创建一个节点,我也不是很懂。



 // Iteration over the contents of a skip list class Iterator { public: // ... // Returns true iff the iterator is positioned at a valid node. bool Valid() const; // Returns the key at the current position. // REQUIRES: Valid() const Key &key() const; // Advances to the next position. // REQUIRES: Valid() void Next(); // Advances to the previous position. // REQUIRES: Valid() void Prev(); // Advance to the first entry with a key >= target void Seek(const Key &target); // Position at the first entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToFirst(); // Position at the last entry in list. // Final state of iterator is Valid() iff list is not empty. void SeekToLast(); private: const SkipList *list_; Node *node_; // ... }; 




template <typename Key, class Comparator> typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindGreaterOrEqual(const Key &key, Node **prev) const { Node *x = head_; int level = GetMaxHeight() - 1; while (true) { Node *next = x->Next(level); if (KeyIsAfterNode(key, next)) { // Keep searching in this list x = next; } else { if (prev != nullptr) prev[level] = x; if (level == 0) { return next; } else { // Switch to next list level--; } } } } template <typename Key, class Comparator> typename SkipList<Key, Comparator>::Node * SkipList<Key, Comparator>::FindLessThan(const Key &key) const { Node *x = head_; int level = GetMaxHeight() - 1; while (true) { assert(x == head_ || compare_(x->key, key) < 0); Node *next = x->Next(level); if (next == nullptr || compare_(next->key, key) >= 0) { if (level == 0) { return x; } else { // Switch to next list level--; } } else { x = next; } } } template <typename Key, class Comparator> typename SkipList<Key, Comparator>::Node *SkipList<Key, Comparator>::FindLast() const { Node *x = head_; int level = GetMaxHeight() - 1; while (true) { Node *next = x->Next(level); if (next == nullptr) { if (level == 0) { return x; } else { // Switch to next list level--; } } else { x = next; } } } 



template <typename Key, class Comparator> void SkipList<Key, Comparator>::Insert(const Key &key) template <typename Key, class Comparator> bool SkipList<Key, Comparator>::Contains(const Key &key) const 



void MemTable::Add(SequenceNumber s, ValueType type, const Slice &key, const Slice &value) { // Format of an entry is concatenation of: // key_size : varint32 of internal_key.size() // key bytes : char[internal_key.size()] // value_size : varint32 of value.size() // value bytes : char[value.size()] size_t key_size = key.size(); size_t val_size = value.size(); size_t internal_key_size = key_size + 8; const size_t encoded_len = VarintLength(internal_key_size) + internal_key_size + VarintLength(val_size) + val_size; char *buf = arena_.Allocate(encoded_len); char *p = EncodeVarint32(buf, internal_key_size); memcpy(p, key.data(), key_size); p += key_size; EncodeFixed64(p, (s << 8) | type); p += 8; p = EncodeVarint32(p, val_size); memcpy(p, value.data(), val_size); assert(p + val_size == buf + encoded_len); table_.Insert(buf); } 

Add函数将key和value组成key_size | sequencenumber | type | key | value_size | value这样的形式,然后调用SkipList类提供的Insert函数插入跳表中。

bool MemTable::Get(const LookupKey &key, std::string *value, Status *s) 


 Slice memkey = key.memtable_key(); Table::Iterator iter(&table_); iter.Seek(memkey.data()); 


 if (iter.Valid()) { // entry format is: // klength varint32 // userkey char[klength] // tag uint64 // vlength varint32 // value char[vlength] // Check that it belongs to same user key. We do not check the // sequence number since the Seek() call above should have skipped // all entries with overly large sequence numbers. const char *entry = iter.key(); uint32_t key_length; const char *key_ptr = GetVarint32Ptr(entry, entry + 5, &key_length); if (comparator_.comparator.user_comparator()->Compare( Slice(key_ptr, key_length - 8), key.user_key()) == 0) { // Correct user key const uint64_t tag = DecodeFixed64(key_ptr + key_length - 8); switch (static_cast<ValueType>(tag & 0xff)) { case kTypeValue: { Slice v = GetLengthPrefixedSlice(key_ptr + key_length); value->assign(v.data(), v.size()); return true; } case kTypeDeletion: *s = Status::NotFound(Slice()); return true; } } } return false; 


class MemTableIterator : public Iterator { public: explicit MemTableIterator(MemTable::Table *table) : iter_(table) {} virtual bool Valid() const { return iter_.Valid(); } virtual void Seek(const Slice &k) { iter_.Seek(EncodeKey(&tmp_, k)); } virtual void SeekToFirst() { iter_.SeekToFirst(); } virtual void SeekToLast() { iter_.SeekToLast(); } virtual void Next() { iter_.Next(); } virtual void Prev() { iter_.Prev(); } virtual Slice key() const { return GetLengthPrefixedSlice(iter_.key()); } virtual Slice value() const { Slice key_slice = GetLengthPrefixedSlice(iter_.key()); return GetLengthPrefixedSlice(key_slice.data() + key_slice.size()); } virtual Status status() const { return Status::OK(); } private: MemTable::Table::Iterator iter_; std::string tmp_; // For passing to EncodeKey // No copying allowed MemTableIterator(const MemTableIterator &); void operator=(const MemTableIterator &); }; 


227 Love u










