LevelDB作为一个数据存储引擎,它提供了Put,Delete和Get方法对数据库进行修改和查询操作, 实际上不管是Put操作还是Delete操作, LevelDB都会将其转换成一条记录,然后以顺序写的方式将这条记录追加到log文件的尾部,因为尽管这是一个磁盘的读写操作,但是文件的顺序追加写入效率是很高的,所以并不会导致写入速度降低,如果log文件写入数据成功,那么将这条记录插入到Memtable当中去,本篇文章会介绍LevelDB的一个删除或者写入操作是如何应用到DB当中.
执行Put/Delete操作首先是通过一个WriteBatch来表示的,从下面代码可以看到不管是执行Put还是Delete操作,实际上是在WriteBatch的rep_中修改以及追加一些信息,rep_最开始8个字节用于存储Sequence,表示当前Writebatch中第一个操作所对应的序列号,接下来4个字节用户存储Count,表示当前WriteBatch有多少个Operation,然后紧接着就是count条Operation记录,每一个operation记录就是一个Put操作或者Delete操作.
对于Put和Delete,Operation的表现形式不同,表示Put的Operation第一个字节存储kTypeValue,表示当前这个Operation记录的是一个写入操作,然后依次追加key的长度,key的内容以及value的长度和value的内容,表示要添加的key/value,表示Delete的Operation第一字解存储kTypeDeletion,表示当前这个Operation记录的是一个删除操作,然后依次追加key的长度,key的内容,表示要删除的key.
/*
* ************* WriteBatch rep_ Format **************
* | <Sequence> | <Count> | <Operation 1> | <Operation 2> | ... |
* 8 Bytes 4 Bytes
*
*
* ************** Delete Operation Format *************
* | <kTypeDeletion> | <key size> | <key content> |
* 1 Byte 1 ~ 5 Bytes key.size Bytes
*
*
* ******************************** Put Operation Format ********************************
* | <kTypeValue> | <key size> | <key content> | <value size> | < value content> |
* 1 Byte 1 ~ 5 Bytes key.size Bytes 1 ~ 5 Bytes value.size Bytes
*/
void WriteBatch::Put(const Slice& key, const Slice& value) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeValue));
PutLengthPrefixedSlice(&rep_, key);
PutLengthPrefixedSlice(&rep_, value);
}
void WriteBatch::Delete(const Slice& key) {
WriteBatchInternal::SetCount(this, WriteBatchInternal::Count(this) + 1);
rep_.push_back(static_cast<char>(kTypeDeletion));
PutLengthPrefixedSlice(&rep_, key);
}
在真正的写入操作之前会将WriteBatch封装成一个Write对象, 然后扔到一个队列里面等待消费,我们先来看一下Write对象的定义.
// Information kept for every waiting writer
struct DBImpl::Writer {
Status status; // 本次写入的结果
WriteBatch* batch; // 本次写入对应的WriteBatch
bool sync; // 本次写入数据对应的log是否立即刷盘
bool done; // 本次写入数据是否已经完成
port::CondVar cv; // 条件变量, 如果在此之前有其他Write正在写入,则等待
explicit Writer(port::Mutex* mu) : cv(mu) { }
};
DBimpl::Write()方法中的流程我们分三个阶段来看:
Writer w(&mutex_);
w.batch = my_batch;
w.sync = options.sync;
w.done = false;
// 在这里首先会抢占锁,所以在很多线程进行写入的时候这里会进行互斥,
// 保证每个Writer能够安全的放入writers_队列当中
MutexLock l(&mutex_);
writers_.push_back(&w);
// 当前这个Writer并没有完成并且当前这个Writer并不是writers_
// 队列的第一个(这说明之前还有Writer需要比它先完成),则等待
while (!w.done && &w != writers_.front()) {
w.cv.Wait();
}
// 被唤醒之后发现自己已经被前面的WriteBatch打包一起完成了,
// 则直接返回结果
if (w.done) {
return w.status;
};
// May temporarily unlock and wait.
// 在执行写入之前先检查一下Memtable, 是否还有空间
Status status = MakeRoomForWrite(my_batch == NULL);
uint64_t last_sequence = versions_->LastSequence();
Writer* last_writer = &w;
if (status.ok() && my_batch != NULL) { // NULL batch is for compactions
// 执行完BuildBatchGroup之后db_impl的成员变量tmp_batch_中存储writer_集合
// 中已经被合并的WriteBatch,而last_writer指向writer_集合中最后一个被合并
// 的WriteBatch
WriteBatch* updates = BuildBatchGroup(&last_writer);
// 为当前已经做了合并操作的WriteBatch设置sequence, 这个sequence
// 对应于这个WriteBatch中第一个操作的序列号
WriteBatchInternal::SetSequence(updates, last_sequence + 1);
// 更新last_sequence, 当前合并的WriteBatch中有多少个操作则加上多少
last_sequence += WriteBatchInternal::Count(updates);
// Add to log and apply to memtable. We can release the lock
// during this phase since &w is currently responsible for logging
// and protects against concurrent loggers and concurrent writes
// into mem_.
{
mutex_.Unlock();
status = log_->AddRecord(WriteBatchInternal::Contents(updates));
bool sync_error = false;
if (status.ok() && options.sync) {
status = logfile_->Sync();
if (!status.ok()) {
sync_error = true;
}
}
// 写log成功之后将整个WriteBatch中的所有操作依次插入到memtable当中
if (status.ok()) {
status = WriteBatchInternal::InsertInto(updates, mem_);
}
mutex_.Lock();
if (sync_error) {
// The state of the log file is indeterminate: the log record we
// just added may or may not show up when the DB is re-opened.
// So we force the DB into a mode where all future writes fail.
RecordBackgroundError(status);
}
}
if (updates == tmp_batch_) tmp_batch_->Clear();
versions_->SetLastSequence(last_sequence);
}
// w last_writer
// writer1 -> writer2 -> writer3 -> writer4 -> writer5 -> ...
// done done done x x
//
// 当当前队头Writer完成自身并且顺带完成了后续的几个Writer之后需要更新状态,
// 如上图所示, 当前writer1完成时顺带将writer2, writer3完成, 这时候
// writer2, writer3可能在其他线程里由于不是队头元素而被wait()住了,
// 这时候我们需要将其status和done进行赋值并且pop掉,并且调用条件变
// 量的Signal将其唤醒(第1219行), 唤醒之后发现其done成员变量为ture了
// 这时候就会返回其对应的status了;
while (true) {
Writer* ready = writers_.front();
writers_.pop_front();
if (ready != &w) {
ready->status = status;
ready->done = true;
ready->cv.Signal();
}
// last_writer是本队列中被消费的最后一个条目,也就是上图中的writer3
if (ready == last_writer) break;
}
//
// writer4 -> writer5 -> ...
// x x
//
// 如果队列不为空,调用唤醒对头元素, 也就是writer4
// Notify new head of write queue
if (!writers_.empty()) {
writers_.front()->cv.Signal();
}
return status;
在调用DBImpl::Write()将数据写入到Memtable之前我们会调用MakeRoomForWrite(), 看函数名是提供空间供数据写入,实际上这个函数里面还是做了很多事情,比如在Level 0层的sst文件数量到达软上限或者硬上限的时候分别执行缓写和阻写操作(因为Level0层的sst文件比较特殊,文件和文件之前可能有overlap, 如果文件过多,在迭代器遍历Level0层的时候会有读放大的问题,所以不能放任不管,有必要对其做一个限制), 还有在Memtable大小已经大于writetobuffer上限的时候如果能顺利将其转换成Immutable Memtable并且重新创建一个新的Memtable, 如果当前Immutable Memtable还不为空则可能当前正在做Compact, 这时我们就要等待其完成再做转换了.
// REQUIRES: mutex_ is held
// REQUIRES: this thread is currently at the front of the writer queue
Status DBImpl::MakeRoomForWrite(bool force) {
mutex_.AssertHeld();
assert(!writers_.empty());
bool allow_delay = !force;
Status s;
while (true) {
if (!bg_error_.ok()) {
// Yield previous error
s = bg_error_;
break;
} else if (
allow_delay &&
versions_->NumLevelFiles(0) >= config::kL0_SlowdownWritesTrigger) {
// We are getting close to hitting a hard limit on the number of
// L0 files. Rather than delaying a single write by several
// seconds when we hit the hard limit, start delaying each
// individual write by 1ms to reduce latency variance. Also,
// this delay hands over some CPU to the compaction thread in
// case it is sharing the same core as the writer.
// 如果当前level0层的sst文件已经到达了'软上限',这时候我们要'缓写'
// 所谓的缓写就是sleep 1s之后再写入,第一次allow_delay是true, sleep
// 之后将allow_delay赋值为false, 下次就不sleep了
mutex_.Unlock();
env_->SleepForMicroseconds(1000);
allow_delay = false; // Do not delay a single write more than once
mutex_.Lock();
} else if (!force &&
(mem_->ApproximateMemoryUsage() <= options_.write_buffer_size)) {
// There is room in current memtable
// 如果当前是写入操作, 并且memtable的内存使用量小于write_buffer_size
// 直接break
break;
} else if (imm_ != NULL) {
// We have filled up the current memtable, but the previous
// one is still being compacted, so we wait.
// 如果我们的memetable大小已经达到了write_buffer_size,但是immutable memtable
// 目前不为空,表示可能在执行compact,这时候我们wait等待
Log(options_.info_log, "Current memtable full; waiting...\n");
bg_cv_.Wait();
} else if (versions_->NumLevelFiles(0) >= config::kL0_StopWritesTrigger) {
// There are too many level-0 files.
// 如果我们的level 0层的sst文件已经到达了硬上限,这时候我们执行阻写操作
Log(options_.info_log, "Too many L0 files; waiting...\n");
bg_cv_.Wait();
} else {
// Attempt to switch to a new memtable and trigger compaction of old
assert(versions_->PrevLogNumber() == 0);
// 在执行compact之前先生成一个新的log文件, 如果还没有执行compact成功
// 就把db关闭,那么下次启动db的时候,可以从旧的log文件中恢复数据
uint64_t new_log_number = versions_->NewFileNumber();
WritableFile* lfile = NULL;
s = env_->NewWritableFile(LogFileName(dbname_, new_log_number), &lfile);
if (!s.ok()) {
// Avoid chewing through file number space in a tight loop.
versions_->ReuseFileNumber(new_log_number);
break;
}
delete log_;
delete logfile_;
logfile_ = lfile;
logfile_number_ = new_log_number;
log_ = new log::Writer(lfile);
// 将原先的memtable 转化为immutable memtable(后续执行compaction就是
// 使用的这个immutable memtable), 并且重新创建memtable
imm_ = mem_;
has_imm_.Release_Store(imm_);
mem_ = new MemTable(internal_comparator_);
mem_->Ref();
force = false; // Do not force another compaction if have room
MaybeScheduleCompaction();
}
}
return s;
}
通过梳理数据写入流程我们会发现LevelDB每一个操作(不管是写入还是删除)都有一个序列号,这个序列号是递增的,序列号越大表示该操作越新,再深入学习LevelDB代码后会发现,其快照的概念也是基于这个序列号实现的.