From: Hiram Chirino Date: Tue, 30 Oct 2012 16:56:52 -0400 Subject: [PATCH] Added a DB:SuspendCompations() and DB:ResumeCompactions() methods. Fixes issue #184 https://code.google.com/p/leveldb/issues/detail?id=184 diff --git a/db/db_impl.cc b/db/db_impl.cc index 1a4e459..ae7b96d 100644 --- a/db/db_impl.cc +++ b/db/db_impl.cc @@ -135,6 +135,9 @@ DBImpl::DBImpl(const Options& raw_options, const std::string& dbname) table_cache_(new TableCache(dbname_, options_, TableCacheSize(options_))), db_lock_(nullptr), shutting_down_(false), + suspend_cv(&suspend_mutex), + suspend_count(0), + suspended(false), background_work_finished_signal_(&mutex_), mem_(nullptr), imm_(nullptr), @@ -1464,6 +1467,39 @@ void DBImpl::GetApproximateSizes(const Range* range, int n, uint64_t* sizes) { v->Unref(); } +void DBImpl::SuspendCompactions() { + MutexLock l(& suspend_mutex); + env_->Schedule(&SuspendWork, this); + suspend_count++; + while( !suspended ) { + suspend_cv.Wait(); + } +} +void DBImpl::SuspendWork(void* db) { + reinterpret_cast(db)->SuspendCallback(); +} +void DBImpl::SuspendCallback() { + MutexLock l(&suspend_mutex); + Log(options_.info_log, "Compactions suspended"); + suspended = true; + suspend_cv.SignalAll(); + while( suspend_count > 0 ) { + suspend_cv.Wait(); + } + suspended = false; + suspend_cv.SignalAll(); + Log(options_.info_log, "Compactions resumed"); +} +void DBImpl::ResumeCompactions() { + MutexLock l(&suspend_mutex); + suspend_count--; + suspend_cv.SignalAll(); + while( suspended ) { + suspend_cv.Wait(); + } +} + + // Default implementations of convenience methods that subclasses of DB // can call if they wish Status DB::Put(const WriteOptions& opt, const Slice& key, const Slice& value) { diff --git a/db/db_impl.h b/db/db_impl.h index c7b0172..d955c2a 100644 --- a/db/db_impl.h +++ b/db/db_impl.h @@ -48,6 +48,8 @@ class DBImpl : public DB { bool GetProperty(const Slice& property, std::string* value) override; void GetApproximateSizes(const Range* range, int n, uint64_t* sizes) override; void CompactRange(const Slice* begin, const Slice* end) override; + void SuspendCompactions() override; + void ResumeCompactions() override; // Extra methods (for testing) that are not in the public DB interface @@ -170,6 +172,13 @@ class DBImpl : public DB { // Lock over the persistent DB state. Non-null iff successfully acquired. FileLock* db_lock_; + port::Mutex suspend_mutex; + port::CondVar suspend_cv; + int suspend_count; + bool suspended; + static void SuspendWork(void* db); + void SuspendCallback(); + // State below is protected by mutex_ port::Mutex mutex_; std::atomic shutting_down_; diff --git a/db/db_test.cc b/db/db_test.cc index 908b41d..2e65370 100644 --- a/db/db_test.cc +++ b/db/db_test.cc @@ -2051,6 +2051,8 @@ class ModelDB : public DB { }; explicit ModelDB(const Options& options) : options_(options) {} + virtual void SuspendCompactions() override {} + virtual void ResumeCompactions() override {} ~ModelDB() override = default; Status Put(const WriteOptions& o, const Slice& k, const Slice& v) override { return DB::Put(o, k, v); diff --git a/include/leveldb/db.h b/include/leveldb/db.h index a13d147..61c29c0 100644 --- a/include/leveldb/db.h +++ b/include/leveldb/db.h @@ -145,6 +145,12 @@ class LEVELDB_EXPORT DB { // Therefore the following call will compact the entire database: // db->CompactRange(nullptr, nullptr); virtual void CompactRange(const Slice* begin, const Slice* end) = 0; + + // Suspends the background compaction thread. This methods + // returns once suspended. + virtual void SuspendCompactions() = 0; + // Resumes a suspended background compation thread. + virtual void ResumeCompactions() = 0; }; // Destroy the contents of the specified database.