diff options
-rw-r--r-- | db/compaction/compaction_job_test.cc | 4 | ||||
-rw-r--r-- | db/db_impl/db_impl.cc | 14 | ||||
-rw-r--r-- | db/db_wal_test.cc | 2 | ||||
-rw-r--r-- | db/flush_job_test.cc | 2 | ||||
-rw-r--r-- | db/memtable_list_test.cc | 4 | ||||
-rw-r--r-- | db/repair.cc | 2 | ||||
-rw-r--r-- | db/version_set.cc | 59 | ||||
-rw-r--r-- | db/version_set.h | 11 | ||||
-rw-r--r-- | db/version_set_test.cc | 57 | ||||
-rw-r--r-- | db/version_util.h | 3 | ||||
-rw-r--r-- | db/wal_manager_test.cc | 2 | ||||
-rw-r--r-- | include/rocksdb/listener.h | 3 | ||||
-rw-r--r-- | tools/ldb_cmd.cc | 6 | ||||
-rw-r--r-- | tools/ldb_cmd_test.cc | 3 |
14 files changed, 142 insertions, 30 deletions
diff --git a/db/compaction/compaction_job_test.cc b/db/compaction/compaction_job_test.cc index a16891110..886bcb6e1 100644 --- a/db/compaction/compaction_job_test.cc +++ b/db/compaction/compaction_job_test.cc @@ -217,7 +217,7 @@ class CompactionJobTestBase : public testing::Test { /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)), + /*error_handler=*/nullptr, /*read_only=*/false)), shutting_down_(false), mock_table_factory_(new mock::MockTableFactory()), error_handler_(nullptr, db_options_, &mutex_), @@ -547,7 +547,7 @@ class CompactionJobTestBase : public testing::Test { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); compaction_job_stats_.Reset(); ASSERT_OK(SetIdentityFile(env_, dbname_)); diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc index 297c6aceb..8084f02cf 100644 --- a/db/db_impl/db_impl.cc +++ b/db/db_impl/db_impl.cc @@ -280,7 +280,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname, dbname_, &immutable_db_options_, file_options_, table_cache_.get(), write_buffer_manager_, &write_controller_, &block_cache_tracer_, io_tracer_, db_id_, db_session_id_, options.daily_offpeak_time_utc, - &error_handler_)); + &error_handler_, read_only)); column_family_memtables_.reset( new ColumnFamilyMemTablesImpl(versions_->GetColumnFamilySet())); @@ -657,6 +657,18 @@ Status DBImpl::CloseHelper() { // versions need to be destroyed before table_cache since it can hold // references to table_cache. + { + Status s = versions_->Close(directories_.GetDbDir(), &mutex_); + if (!s.ok()) { + ROCKS_LOG_ERROR(immutable_db_options_.info_log, + "Unable to close MANIFEST with error -- %s", + s.ToString().c_str()); + if (ret.ok()) { + ret = s; + } + } + } + versions_.reset(); mutex_.Unlock(); if (db_lock_ != nullptr) { diff --git a/db/db_wal_test.cc b/db/db_wal_test.cc index fbc01131e..88c6d1aac 100644 --- a/db/db_wal_test.cc +++ b/db/db_wal_test.cc @@ -1543,7 +1543,7 @@ class RecoveryTestHelper { /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", options.daily_offpeak_time_utc, - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); wal_manager.reset( new WalManager(db_options, file_options, /*io_tracer=*/nullptr)); diff --git a/db/flush_job_test.cc b/db/flush_job_test.cc index 21d1571a0..95cde2d4d 100644 --- a/db/flush_job_test.cc +++ b/db/flush_job_test.cc @@ -132,7 +132,7 @@ class FlushJobTestBase : public testing::Test { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); EXPECT_OK(versions_->Recover(column_families, false)); } diff --git a/db/memtable_list_test.cc b/db/memtable_list_test.cc index 9a5b7557f..f32f6e1e0 100644 --- a/db/memtable_list_test.cc +++ b/db/memtable_list_test.cc @@ -108,7 +108,7 @@ class MemTableListTest : public testing::Test { &write_controller, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr); + /*error_handler=*/nullptr, /*read_only=*/false); std::vector<ColumnFamilyDescriptor> cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); @@ -160,7 +160,7 @@ class MemTableListTest : public testing::Test { &write_controller, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr); + /*error_handler=*/nullptr, /*read_only=*/false); std::vector<ColumnFamilyDescriptor> cf_descs; cf_descs.emplace_back(kDefaultColumnFamilyName, ColumnFamilyOptions()); cf_descs.emplace_back("one", ColumnFamilyOptions()); diff --git a/db/repair.cc b/db/repair.cc index ef21f7ea6..7585d9758 100644 --- a/db/repair.cc +++ b/db/repair.cc @@ -123,7 +123,7 @@ class Repairer { raw_table_cache_.get(), &wb_, &wc_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", db_session_id_, db_options.daily_offpeak_time_utc, - /*error_handler=*/nullptr), + /*error_handler=*/nullptr, /*read_only=*/false), next_file_number_(1), db_lock_(nullptr), closed_(false) { diff --git a/db/version_set.cc b/db/version_set.cc index 93a21cc05..72febac90 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -5073,7 +5073,7 @@ VersionSet::VersionSet( BlockCacheTracer* const block_cache_tracer, const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id, const std::string& db_session_id, const std::string& daily_offpeak_time_utc, - ErrorHandler* const error_handler) + ErrorHandler* const error_handler, const bool read_only) : column_family_set_(new ColumnFamilySet( dbname, _db_options, storage_options, table_cache, write_buffer_manager, write_controller, block_cache_tracer, io_tracer, @@ -5100,7 +5100,58 @@ VersionSet::VersionSet( io_tracer_(io_tracer), db_session_id_(db_session_id), offpeak_time_option_(OffpeakTimeOption(daily_offpeak_time_utc)), - error_handler_(error_handler) {} + error_handler_(error_handler), + read_only_(read_only), + closed_(false) {} + +Status VersionSet::Close(FSDirectory* db_dir, InstrumentedMutex* mu) { + Status s; + if (closed_ || read_only_ || !manifest_file_number_ || !descriptor_log_) { + return s; + } + + std::string manifest_file_name = + DescriptorFileName(dbname_, manifest_file_number_); + uint64_t size = 0; + IOStatus io_s = descriptor_log_->Close(); + descriptor_log_.reset(); + TEST_SYNC_POINT("VersionSet::Close:AfterClose"); + if (io_s.ok()) { + io_s = fs_->GetFileSize(manifest_file_name, IOOptions(), &size, nullptr); + } + if (!io_s.ok() || size != manifest_file_size_) { + if (io_s.ok()) { + // This means the size is not as expected. So we treat it as a + // corruption and set io_s appropriately + io_s = IOStatus::Corruption(); + } + ColumnFamilyData* cfd = GetColumnFamilySet()->GetDefault(); + const ImmutableOptions* ioptions = cfd->ioptions(); + IOErrorInfo io_error_info(io_s, FileOperationType::kVerify, + manifest_file_name, /*length=*/size, + /*offset=*/0); + + for (auto& listener : ioptions->listeners) { + listener->OnIOError(io_error_info); + } + io_s.PermitUncheckedError(); + io_error_info.io_status.PermitUncheckedError(); + ROCKS_LOG_ERROR(db_options_->info_log, + "MANIFEST verification on Close, " + "filename %s, expected size %" PRIu64 + " failed with status %s and " + "actual size %" PRIu64 "\n", + manifest_file_name.c_str(), manifest_file_size_, + io_s.ToString().c_str(), size); + VersionEdit edit; + assert(cfd); + const MutableCFOptions& cf_opts = *cfd->GetLatestMutableCFOptions(); + s = LogAndApply(cfd, cf_opts, ReadOptions(), &edit, mu, db_dir); + } + + closed_ = true; + return s; +} VersionSet::~VersionSet() { // we need to delete column_family_set_ because its destructor depends on @@ -6244,7 +6295,7 @@ Status VersionSet::ReduceNumberOfLevels(const std::string& dbname, nullptr /*BlockCacheTracer*/, nullptr /*IOTracer*/, /*db_id*/ "", /*db_session_id*/ "", options->daily_offpeak_time_utc, - /*error_handler_*/ nullptr); + /*error_handler_*/ nullptr, /*read_only=*/false); Status status; std::vector<ColumnFamilyDescriptor> dummy; @@ -7286,7 +7337,7 @@ ReactiveVersionSet::ReactiveVersionSet( write_buffer_manager, write_controller, /*block_cache_tracer=*/nullptr, io_tracer, /*db_id*/ "", /*db_session_id*/ "", /*daily_offpeak_time_utc*/ "", - /*error_handler=*/nullptr) {} + /*error_handler=*/nullptr, /*read_only=*/true) {} ReactiveVersionSet::~ReactiveVersionSet() = default; diff --git a/db/version_set.h b/db/version_set.h index 5ccb69771..d99edfd6c 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -1160,13 +1160,15 @@ class VersionSet { const std::shared_ptr<IOTracer>& io_tracer, const std::string& db_id, const std::string& db_session_id, const std::string& daily_offpeak_time_utc, - ErrorHandler* const error_handler); + ErrorHandler* const error_handler, const bool read_only); // No copying allowed VersionSet(const VersionSet&) = delete; void operator=(const VersionSet&) = delete; virtual ~VersionSet(); + virtual Status Close(FSDirectory* db_dir, InstrumentedMutex* mu); + Status LogAndApplyToDefaultColumnFamily( const ReadOptions& read_options, VersionEdit* edit, InstrumentedMutex* mu, FSDirectory* dir_contains_current_file, bool new_descriptor_log = false, @@ -1693,6 +1695,9 @@ class VersionSet { Status LogAndApplyHelper(ColumnFamilyData* cfd, VersionBuilder* b, VersionEdit* edit, SequenceNumber* max_last_sequence, InstrumentedMutex* mu); + + const bool read_only_; + bool closed_; }; // ReactiveVersionSet represents a collection of versions of the column @@ -1710,6 +1715,10 @@ class ReactiveVersionSet : public VersionSet { ~ReactiveVersionSet() override; + Status Close(FSDirectory* /*db_dir*/, InstrumentedMutex* /*mu*/) override { + return Status::OK(); + } + Status ReadAndApply( InstrumentedMutex* mu, std::unique_ptr<log::FragmentBufferedReader>* manifest_reader, diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 5def22925..390d355e4 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -1255,7 +1255,7 @@ class VersionSetTestBase { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); reactive_versions_ = std::make_shared<ReactiveVersionSet>( dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, nullptr); @@ -1354,16 +1354,31 @@ class VersionSetTestBase { versions_->GetColumnFamilySet()->NumberOfColumnFamilies()); } + void CloseDB() { + mutex_.Lock(); + versions_->Close(nullptr, &mutex_).PermitUncheckedError(); + versions_.reset(); + mutex_.Unlock(); + } + void ReopenDB() { versions_.reset(new VersionSet( dbname_, &db_options_, env_options_, table_cache_.get(), &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); EXPECT_OK(versions_->Recover(column_families_, false)); } + void GetManifestPath(std::string* manifest_path) const { + assert(manifest_path != nullptr); + uint64_t manifest_file_number = 0; + Status s = versions_->GetCurrentManifestPath( + dbname_, fs_.get(), manifest_path, &manifest_file_number); + ASSERT_OK(s); + } + void VerifyManifest(std::string* manifest_path) const { assert(manifest_path != nullptr); uint64_t manifest_file_number = 0; @@ -1873,7 +1888,7 @@ TEST_F(VersionSetTest, WalAddition) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, /*read_only=*/false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -1941,7 +1956,7 @@ TEST_F(VersionSetTest, WalCloseWithoutSync) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 2); @@ -1995,7 +2010,7 @@ TEST_F(VersionSetTest, WalDeletion) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2034,7 +2049,7 @@ TEST_F(VersionSetTest, WalDeletion) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2155,7 +2170,7 @@ TEST_F(VersionSetTest, DeleteWalsBeforeNonExistingWalNumber) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 1); @@ -2192,7 +2207,7 @@ TEST_F(VersionSetTest, DeleteAllWals) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(new_versions->Recover(column_families_, false)); const auto& wals = new_versions->GetWalSet().GetWals(); ASSERT_EQ(wals.size(), 0); @@ -2235,7 +2250,7 @@ TEST_F(VersionSetTest, AtomicGroupWithWalEdits) { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); std::string db_id; ASSERT_OK( new_versions->Recover(column_families_, /*read_only=*/false, &db_id)); @@ -2343,6 +2358,28 @@ TEST_F(VersionSetTest, OffpeakTimeInfoTest) { versions_->offpeak_time_option().GetOffpeakTimeInfo(now).is_now_offpeak); } +TEST_F(VersionSetTest, ManifestTruncateAfterClose) { + std::string manifest_path; + VersionEdit edit; + + NewDB(); + ASSERT_OK(LogAndApplyToDefaultCF(edit)); + SyncPoint::GetInstance()->SetCallBack( + "VersionSet::Close:AfterClose", [&](void*) { + GetManifestPath(&manifest_path); + std::unique_ptr<WritableFile> manifest_file; + EXPECT_OK(env_->ReopenWritableFile(manifest_path, &manifest_file, + EnvOptions())); + EXPECT_OK(manifest_file->Truncate(0)); + EXPECT_OK(manifest_file->Close()); + }); + SyncPoint::GetInstance()->EnableProcessing(); + CloseDB(); + SyncPoint::GetInstance()->DisableProcessing(); + + ReopenDB(); +} + TEST_F(VersionStorageInfoTest, AddRangeDeletionCompensatedFileSize) { // Tests that compensated range deletion size is added to compensated file // size. @@ -2394,7 +2431,7 @@ class VersionSetWithTimestampTest : public VersionSetTest { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); ASSERT_OK(vset->Recover(column_families_, /*read_only=*/false, /*db_id=*/nullptr)); for (auto* cfd : *(vset->GetColumnFamilySet())) { diff --git a/db/version_util.h b/db/version_util.h index acb27749b..ca2e7a377 100644 --- a/db/version_util.h +++ b/db/version_util.h @@ -27,7 +27,8 @@ class OfflineManifestWriter { /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", options.daily_offpeak_time_utc, - /*error_handler=*/nullptr) {} + /*error_handler=*/nullptr, + /*read_only=*/false) {} Status Recover(const std::vector<ColumnFamilyDescriptor>& column_families) { return versions_.Recover(column_families, /*read_only*/ false, diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc index 0ead57ae8..3f47c2901 100644 --- a/db/wal_manager_test.cc +++ b/db/wal_manager_test.cc @@ -55,7 +55,7 @@ class WalManagerTest : public testing::Test { &write_buffer_manager_, &write_controller_, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", /*daily_offpeak_time_utc=*/"", - /*error_handler=*/nullptr)); + /*error_handler=*/nullptr, /*read_only=*/false)); wal_manager_.reset( new WalManager(db_options_, env_options_, nullptr /*IOTracer*/)); diff --git a/include/rocksdb/listener.h b/include/rocksdb/listener.h index 2cc30d871..452ae54cd 100644 --- a/include/rocksdb/listener.h +++ b/include/rocksdb/listener.h @@ -251,7 +251,8 @@ enum class FileOperationType { kRangeSync, kAppend, kPositionedAppend, - kOpen + kOpen, + kVerify }; struct FileOperationInfo { diff --git a/tools/ldb_cmd.cc b/tools/ldb_cmd.cc index 4b57c84f1..81e946bbc 100644 --- a/tools/ldb_cmd.cc +++ b/tools/ldb_cmd.cc @@ -1370,7 +1370,7 @@ void DumpManifestFile(Options options, std::string file, bool verbose, bool hex, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", options.daily_offpeak_time_utc, - /*error_handler=*/nullptr); + /*error_handler=*/nullptr, /*read_only=*/true); Status s = versions.DumpManifest(options, file, verbose, hex, json, cf_descs); if (!s.ok()) { fprintf(stderr, "Error in processing file %s %s\n", file.c_str(), @@ -1514,7 +1514,7 @@ Status GetLiveFilesChecksumInfoFromVersionSet(Options options, /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", options.daily_offpeak_time_utc, - /*error_handler=*/nullptr); + /*error_handler=*/nullptr, /*read_only=*/true); std::vector<std::string> cf_name_list; s = versions.ListColumnFamilies(&cf_name_list, db_path, immutable_db_options.fs.get()); @@ -2348,7 +2348,7 @@ Status ReduceDBLevelsCommand::GetOldNumOfLevels(Options& opt, int* levels) { /*block_cache_tracer=*/nullptr, /*io_tracer=*/nullptr, /*db_id=*/"", /*db_session_id=*/"", opt.daily_offpeak_time_utc, - /*error_handler=*/nullptr); + /*error_handler=*/nullptr, /*read_only=*/true); std::vector<ColumnFamilyDescriptor> dummy; ColumnFamilyDescriptor dummy_descriptor(kDefaultColumnFamilyName, ColumnFamilyOptions(opt)); diff --git a/tools/ldb_cmd_test.cc b/tools/ldb_cmd_test.cc index 145a738eb..af8d58742 100644 --- a/tools/ldb_cmd_test.cc +++ b/tools/ldb_cmd_test.cc @@ -209,7 +209,8 @@ class FileChecksumTestHelper { ImmutableDBOptions immutable_db_options(options_); VersionSet versions(dbname_, &immutable_db_options, sopt, tc.get(), &wb, &wc, nullptr, nullptr, "", "", - options_.daily_offpeak_time_utc, nullptr); + options_.daily_offpeak_time_utc, nullptr, + /*read_only=*/false); std::vector<std::string> cf_name_list; Status s; s = versions.ListColumnFamilies(&cf_name_list, dbname_, |