summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authormrambacher <mrambach@gmail.com>2021-03-15 04:32:24 -0700
committerFacebook GitHub Bot <facebook-github-bot@users.noreply.github.com>2021-03-15 04:34:11 -0700
commit3dff28cf9b12d5bb2ac8bf339c2f635d57c77a1c (patch)
tree0b462116b1d9fe0208bd093a621f40404648b322
parentb8f40f7f7beafb70195b1600011e704d67f592c3 (diff)
Use SystemClock* instead of std::shared_ptr<SystemClock> in lower level routines (#8033)
Summary: For performance purposes, the lower level routines were changed to use a SystemClock* instead of a std::shared_ptr<SystemClock>. The shared ptr has some performance degradation on certain hardware classes. For most of the system, there is no risk of the pointer being deleted/invalid because the shared_ptr will be stored elsewhere. For example, the ImmutableDBOptions stores the Env which has a std::shared_ptr<SystemClock> in it. The SystemClock* within the ImmutableDBOptions is essentially a "short cut" to gain access to this constant resource. There were a few classes (PeriodicWorkScheduler?) where the "short cut" property did not hold. In those cases, the shared pointer was preserved. Using db_bench readrandom perf_level=3 on my EC2 box, this change performed as well or better than 6.17: 6.17: readrandom : 28.046 micros/op 854902 ops/sec; 61.3 MB/s (355999 of 355999 found) 6.18: readrandom : 32.615 micros/op 735306 ops/sec; 52.7 MB/s (290999 of 290999 found) PR: readrandom : 27.500 micros/op 871909 ops/sec; 62.5 MB/s (367999 of 367999 found) (Note that the times for 6.18 are prior to revert of the SystemClock). Pull Request resolved: https://github.com/facebook/rocksdb/pull/8033 Reviewed By: pdillinger Differential Revision: D27014563 Pulled By: mrambacher fbshipit-source-id: ad0459eba03182e454391b5926bf5cdd45657b67
-rw-r--r--db/blob/blob_file_builder.cc25
-rw-r--r--db/blob/blob_file_builder.h5
-rw-r--r--db/blob/blob_file_builder_test.cc81
-rw-r--r--db/blob/blob_file_cache_test.cc8
-rw-r--r--db/blob/blob_file_reader.cc3
-rw-r--r--db/blob/blob_file_reader_test.cc16
-rw-r--r--db/blob/blob_log_sequential_reader.cc4
-rw-r--r--db/blob/blob_log_sequential_reader.h5
-rw-r--r--db/blob/blob_log_writer.cc6
-rw-r--r--db/blob/blob_log_writer.h5
-rw-r--r--db/builder.cc14
-rw-r--r--db/column_family.cc4
-rw-r--r--db/compaction/compaction_iterator.cc2
-rw-r--r--db/compaction/compaction_iterator.h2
-rw-r--r--db/compaction/compaction_job.cc21
-rw-r--r--db/compaction/compaction_job.h1
-rw-r--r--db/compaction/compaction_picker_fifo.cc2
-rw-r--r--db/db_impl/db_impl.cc57
-rw-r--r--db/db_impl/db_impl.h2
-rw-r--r--db/db_impl/db_impl_compaction_flush.cc7
-rw-r--r--db/db_impl/db_impl_files.cc2
-rw-r--r--db/db_impl/db_impl_open.cc20
-rw-r--r--db/db_impl/db_impl_secondary.cc4
-rw-r--r--db/db_impl/db_impl_write.cc24
-rw-r--r--db/db_iter.cc2
-rw-r--r--db/db_iter.h2
-rw-r--r--db/error_handler.cc2
-rw-r--r--db/external_sst_file_ingestion_job.h10
-rw-r--r--db/flush_job.cc2
-rw-r--r--db/flush_job.h2
-rw-r--r--db/import_column_family_job.h7
-rw-r--r--db/internal_stats.h8
-rw-r--r--db/memtable.cc4
-rw-r--r--db/memtable.h2
-rw-r--r--db/merge_helper.cc14
-rw-r--r--db/merge_helper.h14
-rw-r--r--db/perf_context_test.cc20
-rw-r--r--db/periodic_work_scheduler.cc4
-rw-r--r--db/prefix_test.cc7
-rw-r--r--db/range_del_aggregator_bench.cc6
-rw-r--r--db/repair.cc3
-rw-r--r--db/table_cache.cc12
-rw-r--r--db/version_set.cc12
-rw-r--r--db/version_set.h4
-rw-r--r--db/version_set_test.cc2
-rw-r--r--db/wal_manager.cc5
-rw-r--r--db/wal_manager_test.cc1
-rw-r--r--db/write_batch.cc3
-rw-r--r--db/write_controller.cc6
-rw-r--r--db/write_controller.h5
-rw-r--r--db/write_controller_test.cc48
-rw-r--r--db_stress_tool/db_stress_driver.cc21
-rw-r--r--db_stress_tool/db_stress_stat.h8
-rw-r--r--db_stress_tool/db_stress_test_base.cc34
-rw-r--r--db_stress_tool/db_stress_test_base.h2
-rw-r--r--env/env_encryption.cc3
-rw-r--r--env/env_test.cc2
-rw-r--r--env/file_system_tracer.h20
-rw-r--r--env/fs_posix.cc4
-rw-r--r--env/mock_env.cc50
-rw-r--r--env/mock_env.h6
-rw-r--r--file/delete_scheduler.cc5
-rw-r--r--file/delete_scheduler.h4
-rw-r--r--file/file_util.h5
-rw-r--r--file/filename.cc6
-rw-r--r--file/filename.h3
-rw-r--r--file/random_access_file_reader.cc4
-rw-r--r--file/random_access_file_reader.h4
-rw-r--r--file/random_access_file_reader_test.cc2
-rw-r--r--file/sst_file_manager_impl.cc5
-rw-r--r--file/writable_file_writer.h5
-rw-r--r--include/rocksdb/metadata.h6
-rw-r--r--logging/env_logger.h12
-rw-r--r--memtable/memtablerep_bench.cc2
-rw-r--r--monitoring/instrumented_mutex.cc5
-rw-r--r--monitoring/instrumented_mutex.h10
-rw-r--r--monitoring/iostats_context_imp.h2
-rw-r--r--monitoring/perf_context_imp.h6
-rw-r--r--options/cf_options.cc1
-rw-r--r--options/cf_options.h2
-rw-r--r--options/db_options.cc6
-rw-r--r--options/db_options.h2
-rw-r--r--port/win/env_win.cc2
-rw-r--r--port/win/win_logger.cc3
-rw-r--r--port/win/win_logger.h5
-rw-r--r--table/block_based/block_based_table_builder.cc4
-rw-r--r--table/block_based/block_based_table_reader.cc11
-rw-r--r--table/block_based/block_based_table_reader.h3
-rw-r--r--table/block_based/block_based_table_reader_test.cc4
-rw-r--r--table/block_fetcher_test.cc4
-rw-r--r--table/format.cc5
-rw-r--r--table/get_context.cc9
-rw-r--r--table/get_context.h8
-rw-r--r--table/sst_file_writer.cc5
-rw-r--r--table/table_reader_bench.cc5
-rw-r--r--table/table_test.cc3
-rw-r--r--tools/block_cache_analyzer/block_cache_trace_analyzer.cc8
-rw-r--r--tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc8
-rw-r--r--tools/db_bench_tool.cc36
-rw-r--r--trace_replay/block_cache_tracer.cc10
-rw-r--r--trace_replay/block_cache_tracer.h9
-rw-r--r--trace_replay/block_cache_tracer_test.cc20
-rw-r--r--trace_replay/io_tracer.cc4
-rw-r--r--trace_replay/io_tracer.h8
-rw-r--r--trace_replay/io_tracer_test.cc14
-rw-r--r--trace_replay/trace_replay.cc3
-rw-r--r--trace_replay/trace_replay.h5
-rw-r--r--util/dynamic_bloom_test.cc8
-rw-r--r--util/filter_bench.cc4
-rw-r--r--util/rate_limiter.cc12
-rw-r--r--util/rate_limiter.h4
-rw-r--r--util/repeatable_thread.h8
-rw-r--r--util/repeatable_thread_test.cc4
-rw-r--r--util/ribbon_test.cc6
-rw-r--r--util/stop_watch.h9
-rw-r--r--util/timer.h4
-rw-r--r--util/timer_test.cc22
-rw-r--r--utilities/blob_db/blob_compaction_filter.cc9
-rw-r--r--utilities/blob_db/blob_compaction_filter.h21
-rw-r--r--utilities/blob_db/blob_db_impl.cc6
-rw-r--r--utilities/blob_db/blob_db_impl.h2
-rw-r--r--utilities/blob_db/blob_db_iterator.h4
-rw-r--r--utilities/persistent_cache/block_cache_tier_file.cc2
-rw-r--r--utilities/persistent_cache/persistent_cache_bench.cc6
-rw-r--r--utilities/persistent_cache/persistent_cache_tier.h5
-rw-r--r--utilities/transactions/pessimistic_transaction.cc2
-rw-r--r--utilities/transactions/transaction_base.cc6
-rw-r--r--utilities/ttl/db_ttl_impl.cc38
-rw-r--r--utilities/ttl/db_ttl_impl.h39
-rw-r--r--utilities/write_batch_with_index/write_batch_with_index_internal.cc10
130 files changed, 621 insertions, 580 deletions
diff --git a/db/blob/blob_file_builder.cc b/db/blob/blob_file_builder.cc
index 0e023bb77..6f085feff 100644
--- a/db/blob/blob_file_builder.cc
+++ b/db/blob/blob_file_builder.cc
@@ -27,7 +27,7 @@
namespace ROCKSDB_NAMESPACE {
BlobFileBuilder::BlobFileBuilder(
- VersionSet* versions, Env* env, FileSystem* fs,
+ VersionSet* versions, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
int job_id, uint32_t column_family_id,
@@ -36,14 +36,14 @@ BlobFileBuilder::BlobFileBuilder(
const std::shared_ptr<IOTracer>& io_tracer,
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions)
- : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, env,
- fs, immutable_cf_options, mutable_cf_options,
- file_options, job_id, column_family_id,
- column_family_name, io_priority, write_hint, io_tracer,
- blob_file_paths, blob_file_additions) {}
+ : BlobFileBuilder([versions]() { return versions->NewFileNumber(); }, fs,
+ immutable_cf_options, mutable_cf_options, file_options,
+ job_id, column_family_id, column_family_name, io_priority,
+ write_hint, io_tracer, blob_file_paths,
+ blob_file_additions) {}
BlobFileBuilder::BlobFileBuilder(
- std::function<uint64_t()> file_number_generator, Env* env, FileSystem* fs,
+ std::function<uint64_t()> file_number_generator, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options, const FileOptions* file_options,
int job_id, uint32_t column_family_id,
@@ -70,7 +70,6 @@ BlobFileBuilder::BlobFileBuilder(
blob_count_(0),
blob_bytes_(0) {
assert(file_number_generator_);
- assert(env);
assert(fs_);
assert(immutable_cf_options_);
assert(file_options_);
@@ -78,7 +77,6 @@ BlobFileBuilder::BlobFileBuilder(
assert(blob_file_paths_->empty());
assert(blob_file_additions_);
assert(blob_file_additions_->empty());
- clock_ = env->GetSystemClock();
}
BlobFileBuilder::~BlobFileBuilder() = default;
@@ -185,16 +183,17 @@ Status BlobFileBuilder::OpenBlobFileIfNeeded() {
FileTypeSet tmp_set = immutable_cf_options_->checksum_handoff_file_types;
Statistics* const statistics = immutable_cf_options_->statistics;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(file), blob_file_paths_->back(), *file_options_, clock_,
- io_tracer_, statistics, immutable_cf_options_->listeners,
+ std::move(file), blob_file_paths_->back(), *file_options_,
+ immutable_cf_options_->clock, io_tracer_, statistics,
+ immutable_cf_options_->listeners,
immutable_cf_options_->file_checksum_gen_factory,
tmp_set.Contains(FileType::kBlobFile)));
constexpr bool do_flush = false;
std::unique_ptr<BlobLogWriter> blob_log_writer(new BlobLogWriter(
- std::move(file_writer), clock_, statistics, blob_file_number,
- immutable_cf_options_->use_fsync, do_flush));
+ std::move(file_writer), immutable_cf_options_->clock, statistics,
+ blob_file_number, immutable_cf_options_->use_fsync, do_flush));
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
diff --git a/db/blob/blob_file_builder.h b/db/blob/blob_file_builder.h
index 70a268c4f..7ee7064e8 100644
--- a/db/blob/blob_file_builder.h
+++ b/db/blob/blob_file_builder.h
@@ -30,7 +30,7 @@ class IOTracer;
class BlobFileBuilder {
public:
- BlobFileBuilder(VersionSet* versions, Env* env, FileSystem* fs,
+ BlobFileBuilder(VersionSet* versions, FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options,
const FileOptions* file_options, int job_id,
@@ -42,7 +42,7 @@ class BlobFileBuilder {
std::vector<std::string>* blob_file_paths,
std::vector<BlobFileAddition>* blob_file_additions);
- BlobFileBuilder(std::function<uint64_t()> file_number_generator, Env* env,
+ BlobFileBuilder(std::function<uint64_t()> file_number_generator,
FileSystem* fs,
const ImmutableCFOptions* immutable_cf_options,
const MutableCFOptions* mutable_cf_options,
@@ -74,7 +74,6 @@ class BlobFileBuilder {
std::function<uint64_t()> file_number_generator_;
FileSystem* fs_;
- std::shared_ptr<SystemClock> clock_;
const ImmutableCFOptions* immutable_cf_options_;
uint64_t min_blob_size_;
uint64_t blob_file_size_;
diff --git a/db/blob/blob_file_builder_test.cc b/db/blob/blob_file_builder_test.cc
index 06f556aaa..11af00e5d 100644
--- a/db/blob/blob_file_builder_test.cc
+++ b/db/blob/blob_file_builder_test.cc
@@ -41,7 +41,7 @@ class BlobFileBuilderTest : public testing::Test {
protected:
BlobFileBuilderTest() : mock_env_(Env::Default()) {
fs_ = mock_env_.GetFileSystem().get();
- clock_ = mock_env_.GetSystemClock();
+ clock_ = mock_env_.GetSystemClock().get();
}
void VerifyBlobFile(uint64_t blob_file_number,
@@ -110,7 +110,7 @@ class BlobFileBuilderTest : public testing::Test {
MockEnv mock_env_;
FileSystem* fs_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
FileOptions file_options_;
};
@@ -127,6 +127,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
"BlobFileBuilderTest_BuildAndCheckOneFile"),
0);
options.enable_blob_files = true;
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -140,11 +141,11 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckOneFile) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@@ -210,6 +211,7 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
0);
options.enable_blob_files = true;
options.blob_file_size = value_size;
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -223,11 +225,11 @@ TEST_F(BlobFileBuilderTest, BuildAndCheckMultipleFiles) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
std::vector<std::pair<std::string, std::string>> expected_key_value_pairs(
number_of_blobs);
@@ -295,6 +297,7 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
0);
options.enable_blob_files = true;
options.min_blob_size = 1024;
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -308,11 +311,11 @@ TEST_F(BlobFileBuilderTest, InlinedValues) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
for (size_t i = 0; i < number_of_blobs; ++i) {
const std::string key = std::to_string(i);
@@ -347,6 +350,7 @@ TEST_F(BlobFileBuilderTest, Compression) {
test::PerThreadDBPath(&mock_env_, "BlobFileBuilderTest_Compression"), 0);
options.enable_blob_files = true;
options.blob_compression_type = kSnappyCompression;
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -360,11 +364,11 @@ TEST_F(BlobFileBuilderTest, Compression) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
const std::string key("1");
const std::string uncompressed_value(value_size, 'x');
@@ -429,7 +433,7 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
0);
options.enable_blob_files = true;
options.blob_compression_type = kSnappyCompression;
-
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -442,11 +446,11 @@ TEST_F(BlobFileBuilderTest, CompressionError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack("CompressData:TamperWithReturnValue",
[](void* arg) {
@@ -506,6 +510,7 @@ TEST_F(BlobFileBuilderTest, Checksum) {
options.enable_blob_files = true;
options.file_checksum_gen_factory =
std::make_shared<DummyFileChecksumGenFactory>();
+ options.env = &mock_env_;
ImmutableCFOptions immutable_cf_options(options);
MutableCFOptions mutable_cf_options(options);
@@ -519,11 +524,11 @@ TEST_F(BlobFileBuilderTest, Checksum) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
const std::string key("1");
const std::string value("deadbeef");
@@ -615,11 +620,11 @@ TEST_P(BlobFileBuilderIOErrorTest, IOError) {
std::vector<std::string> blob_file_paths;
std::vector<BlobFileAddition> blob_file_additions;
- BlobFileBuilder builder(
- TestFileNumberGenerator(), &mock_env_, fs_, &immutable_cf_options,
- &mutable_cf_options, &file_options_, job_id, column_family_id,
- column_family_name, io_priority, write_hint, nullptr /*IOTracer*/,
- &blob_file_paths, &blob_file_additions);
+ BlobFileBuilder builder(TestFileNumberGenerator(), fs_, &immutable_cf_options,
+ &mutable_cf_options, &file_options_, job_id,
+ column_family_id, column_family_name, io_priority,
+ write_hint, nullptr /*IOTracer*/, &blob_file_paths,
+ &blob_file_additions);
SyncPoint::GetInstance()->SetCallBack(sync_point_, [this](void* arg) {
Status* const s = static_cast<Status*>(arg);
diff --git a/db/blob/blob_file_cache_test.cc b/db/blob/blob_file_cache_test.cc
index 15378ca90..bcb6d048a 100644
--- a/db/blob/blob_file_cache_test.cc
+++ b/db/blob/blob_file_cache_test.cc
@@ -42,15 +42,15 @@ void WriteBlobFile(uint32_t column_family_id,
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
- immutable_cf_options.env->GetSystemClock()));
+ immutable_cf_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
- BlobLogWriter blob_log_writer(
- std::move(file_writer), immutable_cf_options.env->GetSystemClock(),
- statistics, blob_file_number, use_fsync, do_flush);
+ BlobLogWriter blob_log_writer(std::move(file_writer),
+ immutable_cf_options.clock, statistics,
+ blob_file_number, use_fsync, do_flush);
constexpr bool has_ttl = false;
constexpr ExpirationRange expiration_range;
diff --git a/db/blob/blob_file_reader.cc b/db/blob/blob_file_reader.cc
index f1414e8cb..6e1f78d5a 100644
--- a/db/blob/blob_file_reader.cc
+++ b/db/blob/blob_file_reader.cc
@@ -118,8 +118,7 @@ Status BlobFileReader::OpenFile(
}
file_reader->reset(new RandomAccessFileReader(
- std::move(file), blob_file_path,
- immutable_cf_options.env->GetSystemClock(), io_tracer,
+ std::move(file), blob_file_path, immutable_cf_options.clock, io_tracer,
immutable_cf_options.statistics, BLOB_DB_BLOB_FILE_READ_MICROS,
blob_file_read_hist, immutable_cf_options.rate_limiter,
immutable_cf_options.listeners));
diff --git a/db/blob/blob_file_reader_test.cc b/db/blob/blob_file_reader_test.cc
index 04fea58d6..058e6c4e5 100644
--- a/db/blob/blob_file_reader_test.cc
+++ b/db/blob/blob_file_reader_test.cc
@@ -50,15 +50,15 @@ void WriteBlobFile(const ImmutableCFOptions& immutable_cf_options,
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
- immutable_cf_options.env->GetSystemClock()));
+ immutable_cf_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
- BlobLogWriter blob_log_writer(
- std::move(file_writer), immutable_cf_options.env->GetSystemClock(),
- statistics, blob_file_number, use_fsync, do_flush);
+ BlobLogWriter blob_log_writer(std::move(file_writer),
+ immutable_cf_options.clock, statistics,
+ blob_file_number, use_fsync, do_flush);
BlobLogHeader header(column_family_id, compression_type, has_ttl,
expiration_range_header);
@@ -280,15 +280,15 @@ TEST_F(BlobFileReaderTest, Malformed) {
std::unique_ptr<WritableFileWriter> file_writer(
new WritableFileWriter(std::move(file), blob_file_path, FileOptions(),
- immutable_cf_options.env->GetSystemClock()));
+ immutable_cf_options.clock));
constexpr Statistics* statistics = nullptr;
constexpr bool use_fsync = false;
constexpr bool do_flush = false;
- BlobLogWriter blob_log_writer(
- std::move(file_writer), immutable_cf_options.env->GetSystemClock(),
- statistics, blob_file_number, use_fsync, do_flush);
+ BlobLogWriter blob_log_writer(std::move(file_writer),
+ immutable_cf_options.clock, statistics,
+ blob_file_number, use_fsync, do_flush);
BlobLogHeader header(column_family_id, kNoCompression, has_ttl,
expiration_range);
diff --git a/db/blob/blob_log_sequential_reader.cc b/db/blob/blob_log_sequential_reader.cc
index cb328ee56..448b3b6f7 100644
--- a/db/blob/blob_log_sequential_reader.cc
+++ b/db/blob/blob_log_sequential_reader.cc
@@ -13,8 +13,8 @@
namespace ROCKSDB_NAMESPACE {
BlobLogSequentialReader::BlobLogSequentialReader(
- std::unique_ptr<RandomAccessFileReader>&& file_reader,
- const std::shared_ptr<SystemClock>& clock, Statistics* statistics)
+ std::unique_ptr<RandomAccessFileReader>&& file_reader, SystemClock* clock,
+ Statistics* statistics)
: file_(std::move(file_reader)),
clock_(clock),
statistics_(statistics),
diff --git a/db/blob/blob_log_sequential_reader.h b/db/blob/blob_log_sequential_reader.h
index f7aada62d..f8e1c02bd 100644
--- a/db/blob/blob_log_sequential_reader.h
+++ b/db/blob/blob_log_sequential_reader.h
@@ -36,8 +36,7 @@ class BlobLogSequentialReader {
// Create a reader that will return log records from "*file_reader".
BlobLogSequentialReader(std::unique_ptr<RandomAccessFileReader>&& file_reader,
- const std::shared_ptr<SystemClock>& clock,
- Statistics* statistics);
+ SystemClock* clock, Statistics* statistics);
// No copying allowed
BlobLogSequentialReader(const BlobLogSequentialReader&) = delete;
@@ -65,7 +64,7 @@ class BlobLogSequentialReader {
Status ReadSlice(uint64_t size, Slice* slice, char* buf);
const std::unique_ptr<RandomAccessFileReader> file_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
Statistics* statistics_;
diff --git a/db/blob/blob_log_writer.cc b/db/blob/blob_log_writer.cc
index ce31bafdb..2dabc98e8 100644
--- a/db/blob/blob_log_writer.cc
+++ b/db/blob/blob_log_writer.cc
@@ -19,9 +19,9 @@
namespace ROCKSDB_NAMESPACE {
BlobLogWriter::BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
- const std::shared_ptr<SystemClock>& clock,
- Statistics* statistics, uint64_t log_number,
- bool use_fs, bool do_flush, uint64_t boffset)
+ SystemClock* clock, Statistics* statistics,
+ uint64_t log_number, bool use_fs, bool do_flush,
+ uint64_t boffset)
: dest_(std::move(dest)),
clock_(clock),
statistics_(statistics),
diff --git a/db/blob/blob_log_writer.h b/db/blob/blob_log_writer.h
index 17d4e06f0..c1f9f31ad 100644
--- a/db/blob/blob_log_writer.h
+++ b/db/blob/blob_log_writer.h
@@ -31,8 +31,7 @@ class BlobLogWriter {
// Create a writer that will append data to "*dest".
// "*dest" must be initially empty.
// "*dest" must remain live while this BlobLogWriter is in use.
- BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest,
- const std::shared_ptr<SystemClock>& clock,
+ BlobLogWriter(std::unique_ptr<WritableFileWriter>&& dest, SystemClock* clock,
Statistics* statistics, uint64_t log_number, bool use_fsync,
bool do_flush, uint64_t boffset = 0);
// No copying allowed
@@ -69,7 +68,7 @@ class BlobLogWriter {
private:
std::unique_ptr<WritableFileWriter> dest_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
Statistics* statistics_;
uint64_t log_number_;
uint64_t block_offset_; // Current offset in block
diff --git a/db/builder.cc b/db/builder.cc
index 3065aac04..3a713432b 100644
--- a/db/builder.cc
+++ b/db/builder.cc
@@ -125,7 +125,6 @@ Status BuildTable(
assert(env);
FileSystem* fs = db_options.fs.get();
assert(fs);
- const auto& clock = env->GetSystemClock();
TableProperties tp;
if (iter->Valid() || !range_del_agg->IsEmpty()) {
@@ -154,7 +153,7 @@ Status BuildTable(
file->SetIOPriority(io_priority);
file->SetWriteLifeTimeHint(write_hint);
file_writer.reset(new WritableFileWriter(
- std::move(file), fname, file_options, clock, io_tracer,
+ std::move(file), fname, file_options, ioptions.clock, io_tracer,
ioptions.statistics, ioptions.listeners,
ioptions.file_checksum_gen_factory,
tmp_set.Contains(FileType::kTableFile)));
@@ -176,11 +175,10 @@ Status BuildTable(
std::unique_ptr<BlobFileBuilder> blob_file_builder(
(mutable_cf_options.enable_blob_files && blob_file_additions)
- ? new BlobFileBuilder(versions, env, fs, &ioptions,
- &mutable_cf_options, &file_options, job_id,
- column_family_id, column_family_name,
- io_priority, write_hint, io_tracer,
- &blob_file_paths, blob_file_additions)
+ ? new BlobFileBuilder(
+ versions, fs, &ioptions, &mutable_cf_options, &file_options,
+ job_id, column_family_id, column_family_name, io_priority,
+ write_hint, io_tracer, &blob_file_paths, blob_file_additions)
: nullptr);
CompactionIterator c_iter(
@@ -258,7 +256,7 @@ Status BuildTable(
// Finish and check for file errors
TEST_SYNC_POINT("BuildTable:BeforeSyncTable");
if (s.ok() && !empty) {
- StopWatch sw(clock, ioptions.statistics, TABLE_SYNC_MICROS);
+ StopWatch sw(ioptions.clock, ioptions.statistics, TABLE_SYNC_MICROS);
*io_status = file_writer->Sync(ioptions.use_fsync);
}
TEST_SYNC_POINT("BuildTable:BeforeCloseTableFile");
diff --git a/db/column_family.cc b/db/column_family.cc
index 2f3613144..80c3c9817 100644
--- a/db/column_family.cc
+++ b/db/column_family.cc
@@ -557,8 +557,8 @@ ColumnFamilyData::ColumnFamilyData(
// if _dummy_versions is nullptr, then this is a dummy column family.
if (_dummy_versions != nullptr) {
- internal_stats_.reset(new InternalStats(
- ioptions_.num_levels, db_options.env->GetSystemClock(), this));
+ internal_stats_.reset(
+ new InternalStats(ioptions_.num_levels, ioptions_.clock, this));
table_cache_.reset(new TableCache(ioptions_, file_options, _table_cache,
block_cache_tracer, io_tracer));
blob_file_cache_.reset(
diff --git a/db/compaction/compaction_iterator.cc b/db/compaction/compaction_iterator.cc
index ef50e1365..1e8ffd1f6 100644
--- a/db/compaction/compaction_iterator.cc
+++ b/db/compaction/compaction_iterator.cc
@@ -80,7 +80,7 @@ CompactionIterator::CompactionIterator(
earliest_write_conflict_snapshot_(earliest_write_conflict_snapshot),
snapshot_checker_(snapshot_checker),
env_(env),
- clock_(env_->GetSystemClock()),
+ clock_(env_->GetSystemClock().get()),
report_detailed_time_(report_detailed_time),
expect_valid_internal_key_(expect_valid_internal_key),
range_del_agg_(range_del_agg),
diff --git a/db/compaction/compaction_iterator.h b/db/compaction/compaction_iterator.h
index a3073ee2e..cce2afa2f 100644
--- a/db/compaction/compaction_iterator.h
+++ b/db/compaction/compaction_iterator.h
@@ -248,7 +248,7 @@ class CompactionIterator {
const SequenceNumber earliest_write_conflict_snapshot_;
const SnapshotChecker* const snapshot_checker_;
Env* env_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
bool report_detailed_time_;
bool expect_valid_internal_key_;
CompactionRangeDelAggregator* range_del_agg_;
diff --git a/db/compaction/compaction_job.cc b/db/compaction/compaction_job.cc
index cbfc996f2..df9e029f9 100644
--- a/db/compaction/compaction_job.cc
+++ b/db/compaction/compaction_job.cc
@@ -321,7 +321,6 @@ CompactionJob::CompactionJob(
db_options_(db_options),
file_options_(file_options),
env_(db_options.env),
- clock_(env_->GetSystemClock()),
io_tracer_(io_tracer),
fs_(db_options.fs, io_tracer),
file_options_for_read_(
@@ -421,7 +420,7 @@ void CompactionJob::Prepare() {
if (c->ShouldFormSubcompactions()) {
{
- StopWatch sw(clock_, stats_, SUBCOMPACTION_SETUP_TIME);
+ StopWatch sw(db_options_.clock, stats_, SUBCOMPACTION_SETUP_TIME);
GenSubcompactionBoundaries();
}
assert(sizes_.size() == boundaries_.size() + 1);
@@ -587,7 +586,7 @@ Status CompactionJob::Run() {
const size_t num_threads = compact_->sub_compact_states.size();
assert(num_threads > 0);
- const uint64_t start_micros = clock_->NowMicros();
+ const uint64_t start_micros = db_options_.clock->NowMicros();
// Launch a thread for each of subcompactions 1...num_threads-1
std::vector<port::Thread> thread_pool;
@@ -606,7 +605,7 @@ Status CompactionJob::Run() {
thread.join();
}
- compaction_stats_.micros = clock_->NowMicros() - start_micros;
+ compaction_stats_.micros = db_options_.clock->NowMicros() - start_micros;
compaction_stats_.cpu_micros = 0;
for (size_t i = 0; i < compact_->sub_compact_states.size(); i++) {
compaction_stats_.cpu_micros +=
@@ -902,7 +901,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
assert(sub_compact);
assert(sub_compact->compaction);
- uint64_t prev_cpu_micros = clock_->CPUNanos() / 1000;
+ uint64_t prev_cpu_micros = db_options_.clock->CPUNanos() / 1000;
ColumnFamilyData* cfd = sub_compact->compaction->column_family_data();
@@ -980,7 +979,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
std::unique_ptr<BlobFileBuilder> blob_file_builder(
mutable_cf_options->enable_blob_files
? new BlobFileBuilder(
- versions_, env_, fs_.get(),
+ versions_, fs_.get(),
sub_compact->compaction->immutable_cf_options(),
mutable_cf_options, &file_options_, job_id_, cfd->GetID(),
cfd->GetName(), Env::IOPriority::IO_LOW, write_hint_,
@@ -1196,7 +1195,7 @@ void CompactionJob::ProcessKeyValueCompaction(SubcompactionState* sub_compact) {
}
sub_compact->compaction_job_stats.cpu_micros =
- clock_->CPUNanos() / 1000 - prev_cpu_micros;
+ db_options_.clock->CPUNanos() / 1000 - prev_cpu_micros;
if (measure_io_stats_) {
sub_compact->compaction_job_stats.file_write_nanos +=
@@ -1475,7 +1474,7 @@ Status CompactionJob::FinishCompactionOutputFile(
// Finish and check for file errors
if (s.ok()) {
- StopWatch sw(clock_, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
+ StopWatch sw(db_options_.clock, stats_, COMPACTION_OUTFILE_SYNC_MICROS);
io_s = sub_compact->outfile->Sync(db_options_.use_fsync);
}
if (s.ok() && io_s.ok()) {
@@ -1714,7 +1713,7 @@ Status CompactionJob::OpenCompactionOutputFile(
// Try to figure out the output file's oldest ancester time.
int64_t temp_current_time = 0;
- auto get_time_status = env_->GetCurrentTime(&temp_current_time);
+ auto get_time_status = db_options_.clock->GetCurrentTime(&temp_current_time);
// Safe to proceed even if GetCurrentTime fails. So, log and proceed.
if (!get_time_status.ok()) {
ROCKS_LOG_WARN(db_options_.info_log,
@@ -1751,8 +1750,8 @@ Status CompactionJob::OpenCompactionOutputFile(
const auto& listeners =
sub_compact->compaction->immutable_cf_options()->listeners;
sub_compact->outfile.reset(new WritableFileWriter(
- std::move(writable_file), fname, file_options_, clock_, io_tracer_,
- db_options_.statistics.get(), listeners,
+ std::move(writable_file), fname, file_options_, db_options_.clock,
+ io_tracer_, db_options_.statistics.get(), listeners,
db_options_.file_checksum_gen_factory.get(),
tmp_set.Contains(FileType::kTableFile)));
diff --git a/db/compaction/compaction_job.h b/db/compaction/compaction_job.h
index 24449d45e..17937b541 100644
--- a/db/compaction/compaction_job.h
+++ b/db/compaction/compaction_job.h
@@ -160,7 +160,6 @@ class CompactionJob {
const FileOptions file_options_;
Env* env_;
- std::shared_ptr<SystemClock> clock_;
std::shared_ptr<IOTracer> io_tracer_;
FileSystemPtr fs_;
// env_option optimized for compaction table reads
diff --git a/db/compaction/compaction_picker_fifo.cc b/db/compaction/compaction_picker_fifo.cc
index 59213aec9..3e2129def 100644
--- a/db/compaction/compaction_picker_fifo.cc
+++ b/db/compaction/compaction_picker_fifo.cc
@@ -45,7 +45,7 @@ Compaction* FIFOCompactionPicker::PickTTLCompaction(
uint64_t total_size = GetTotalFilesSize(level_files);
int64_t _current_time;
- auto status = ioptions_.env->GetCurrentTime(&_current_time);
+ auto status = ioptions_.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
ROCKS_LOG_BUFFER(log_buffer,
"[%s] FIFO compaction: Couldn't get current time: %s. "
diff --git a/db/db_impl/db_impl.cc b/db/db_impl/db_impl.cc
index 1a5cdecd3..fc3cb1e65 100644
--- a/db/db_impl/db_impl.cc
+++ b/db/db_impl/db_impl.cc
@@ -151,13 +151,12 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
own_info_log_(options.info_log == nullptr),
initial_db_options_(SanitizeOptions(dbname, options)),
env_(initial_db_options_.env),
- clock_(initial_db_options_.env->GetSystemClock()),
io_tracer_(std::make_shared<IOTracer>()),
immutable_db_options_(initial_db_options_),
fs_(immutable_db_options_.fs, io_tracer_),
mutable_db_options_(initial_db_options_),
stats_(immutable_db_options_.statistics.get()),
- mutex_(stats_, clock_, DB_MUTEX_WAIT_MICROS,
+ mutex_(stats_, immutable_db_options_.clock, DB_MUTEX_WAIT_MICROS,
immutable_db_options_.use_adaptive_mutex),
default_cf_handle_(nullptr),
max_total_in_memory_state_(0),
@@ -192,7 +191,7 @@ DBImpl::DBImpl(const DBOptions& options, const std::string& dbname,
bg_purge_scheduled_(0),
disable_delete_obsolete_files_(0),
pending_purge_obsolete_files_(0),
- delete_obsolete_files_last_run_(clock_->NowMicros()),
+ delete_obsolete_files_last_run_(immutable_db_options_.clock->NowMicros()),
last_stats_dump_time_microsec_(0),
next_job_id_(1),
has_unpersisted_data_(false),
@@ -753,7 +752,8 @@ void DBImpl::PersistStats() {
return;
}
TEST_SYNC_POINT("DBImpl::PersistStats:StartRunning");
- uint64_t now_seconds = clock_->NowMicros() / kMicrosInSecond;
+ uint64_t now_seconds =
+ immutable_db_options_.clock->NowMicros() / kMicrosInSecond;
Statistics* statistics = immutable_db_options_.statistics.get();
if (!statistics) {
@@ -1654,8 +1654,8 @@ Status DBImpl::GetImpl(const ReadOptions& read_options, const Slice& key,
}
#endif // NDEBUG
- PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
- StopWatch sw(clock_, stats_, DB_GET);
+ PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
+ StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(
@@ -1845,8 +1845,8 @@ std::vector<Status> DBImpl::MultiGet(
const std::vector<ColumnFamilyHandle*>& column_family,
const std::vector<Slice>& keys, std::vector<std::string>* values,
std::vector<std::string>* timestamps) {
- PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
- StopWatch sw(clock_, stats_, DB_MULTIGET);
+ PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
+ StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
PERF_TIMER_GUARD(get_snapshot_time);
#ifndef NDEBUG
@@ -1975,9 +1975,8 @@ std::vector<Status> DBImpl::MultiGet(
break;
}
}
-
if (read_options.deadline.count() &&
- clock_->NowMicros() >
+ immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
break;
}
@@ -1986,7 +1985,7 @@ std::vector<Status> DBImpl::MultiGet(
if (keys_read < num_keys) {
// The only reason to break out of the loop is when the deadline is
// exceeded
- assert(clock_->NowMicros() >
+ assert(immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count()));
for (++keys_read; keys_read < num_keys; ++keys_read) {
stat_list[keys_read] = Status::TimedOut();
@@ -2426,8 +2425,8 @@ Status DBImpl::MultiGetImpl(
autovector<KeyContext*, MultiGetContext::MAX_BATCH_SIZE>* sorted_keys,
SuperVersion* super_version, SequenceNumber snapshot,
ReadCallback* callback) {
- PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
- StopWatch sw(clock_, stats_, DB_MULTIGET);
+ PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
+ StopWatch sw(immutable_db_options_.clock, stats_, DB_MULTIGET);
// For each of the given keys, apply the entire "get" process as follows:
// First look in the memtable, then in the immutable memtable (if any).
@@ -2438,7 +2437,7 @@ Status DBImpl::MultiGetImpl(
uint64_t curr_value_size = 0;
while (keys_left) {
if (read_options.deadline.count() &&
- clock_->NowMicros() >
+ immutable_db_options_.clock->NowMicros() >
static_cast<uint64_t>(read_options.deadline.count())) {
s = Status::TimedOut();
break;
@@ -2991,7 +2990,8 @@ const Snapshot* DBImpl::GetSnapshotForWriteConflictBoundary() {
SnapshotImpl* DBImpl::GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock) {
int64_t unix_time = 0;
- env_->GetCurrentTime(&unix_time).PermitUncheckedError(); // Ignore error
+ immutable_db_options_.clock->GetCurrentTime(&unix_time)
+ .PermitUncheckedError(); // Ignore error
SnapshotImpl* s = new SnapshotImpl;
if (lock) {
@@ -3136,12 +3136,16 @@ FileSystem* DBImpl::GetFileSystem() const {
return immutable_db_options_.fs.get();
}
+SystemClock* DBImpl::GetSystemClock() const {
+ return immutable_db_options_.clock;
+}
+
#ifndef ROCKSDB_LITE
-Status DBImpl::StartIOTrace(Env* env, const TraceOptions& trace_options,
+Status DBImpl::StartIOTrace(Env* /*env*/, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
assert(trace_writer != nullptr);
- return io_tracer_->StartIOTrace(env->GetSystemClock(), trace_options,
+ return io_tracer_->StartIOTrace(GetSystemClock(), trace_options,
std::move(trace_writer));
}
@@ -4422,9 +4426,9 @@ Status DBImpl::IngestExternalFiles(
std::vector<ExternalSstFileIngestionJob> ingestion_jobs;
for (const auto& arg : args) {
auto* cfd = static_cast<ColumnFamilyHandleImpl*>(arg.column_family)->cfd();
- ingestion_jobs.emplace_back(
- clock_, versions_.get(), cfd, immutable_db_options_, file_options_,
- &snapshots_, arg.options, &directories_, &event_logger_, io_tracer_);
+ ingestion_jobs.emplace_back(versions_.get(), cfd, immutable_db_options_,
+ file_options_, &snapshots_, arg.options,
+ &directories_, &event_logger_, io_tracer_);
}
// TODO(yanqin) maybe make jobs run in parallel
@@ -4691,9 +4695,9 @@ Status DBImpl::CreateColumnFamilyWithImport(
// Import sst files from metadata.
auto cfh = static_cast_with_check<ColumnFamilyHandleImpl>(*handle);
auto cfd = cfh->cfd();
- ImportColumnFamilyJob import_job(clock_, versions_.get(), cfd,
- immutable_db_options_, file_options_,
- import_options, metadata.files, io_tracer_);
+ ImportColumnFamilyJob import_job(versions_.get(), cfd, immutable_db_options_,
+ file_options_, import_options,
+ metadata.files, io_tracer_);
SuperVersionContext dummy_sv_ctx(/* create_superversion */ true);
VersionEdit dummy_edit;
@@ -4969,7 +4973,8 @@ void DBImpl::WaitForIngestFile() {
Status DBImpl::StartTrace(const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock(&trace_mutex_);
- tracer_.reset(new Tracer(clock_, trace_options, std::move(trace_writer)));
+ tracer_.reset(new Tracer(immutable_db_options_.clock, trace_options,
+ std::move(trace_writer)));
return Status::OK();
}
@@ -4988,8 +4993,8 @@ Status DBImpl::EndTrace() {
Status DBImpl::StartBlockCacheTrace(
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
- return block_cache_tracer_.StartTrace(env_, trace_options,
- std::move(trace_writer));
+ return block_cache_tracer_.StartTrace(immutable_db_options_.clock,
+ trace_options, std::move(trace_writer));
}
Status DBImpl::EndBlockCacheTrace() {
diff --git a/db/db_impl/db_impl.h b/db/db_impl/db_impl.h
index 9b732703c..97d8a4d22 100644
--- a/db/db_impl/db_impl.h
+++ b/db/db_impl/db_impl.h
@@ -486,6 +486,7 @@ class DBImpl : public DB {
#endif // ROCKSDB_LITE
// ---- End of implementations of the DB interface ----
+ SystemClock* GetSystemClock() const;
struct GetImplOptions {
ColumnFamilyHandle* column_family = nullptr;
@@ -1057,7 +1058,6 @@ class DBImpl : public DB {
bool own_info_log_;
const DBOptions initial_db_options_;
Env* const env_;
- std::shared_ptr<SystemClock> clock_;
std::shared_ptr<IOTracer> io_tracer_;
const ImmutableDBOptions immutable_db_options_;
FileSystemPtr fs_;
diff --git a/db/db_impl/db_impl_compaction_flush.cc b/db/db_impl/db_impl_compaction_flush.cc
index b025eb08f..a3ed3636c 100644
--- a/db/db_impl/db_impl_compaction_flush.cc
+++ b/db/db_impl/db_impl_compaction_flush.cc
@@ -2564,7 +2564,7 @@ void DBImpl::BackgroundCallFlush(Env::Priority thread_pri) {
s.ToString().c_str(), error_cnt);
log_buffer.FlushBufferToLog();
LogFlush(immutable_db_options_.info_log);
- clock_->SleepForMicroseconds(1000000);
+ immutable_db_options_.clock->SleepForMicroseconds(1000000);
mutex_.Lock();
}
@@ -2637,7 +2637,8 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
if (s.IsBusy()) {
bg_cv_.SignalAll(); // In case a waiter can proceed despite the error
mutex_.Unlock();
- clock_->SleepForMicroseconds(10000); // prevent hot loop
+ immutable_db_options_.clock->SleepForMicroseconds(
+ 10000); // prevent hot loop
mutex_.Lock();
} else if (!s.ok() && !s.IsShutdownInProgress() &&
!s.IsManualCompactionPaused() && !s.IsColumnFamilyDropped()) {
@@ -2655,7 +2656,7 @@ void DBImpl::BackgroundCallCompaction(PrepickedCompaction* prepicked_compaction,
"Accumulated background error counts: %" PRIu64,
s.ToString().c_str(), error_cnt);
LogFlush(immutable_db_options_.info_log);
- clock_->SleepForMicroseconds(1000000);
+ immutable_db_options_.clock->SleepForMicroseconds(1000000);
mutex_.Lock();
} else if (s.IsManualCompactionPaused()) {
ManualCompactionState* m = prepicked_compaction->manual_compaction_state;
diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc
index 791549a23..e3afde03b 100644
--- a/db/db_impl/db_impl_files.cc
+++ b/db/db_impl/db_impl_files.cc
@@ -120,7 +120,7 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
mutable_db_options_.delete_obsolete_files_period_micros == 0) {
doing_the_full_scan = true;
} else {
- const uint64_t now_micros = clock_->NowMicros();
+ const uint64_t now_micros = immutable_db_options_.clock->NowMicros();
if ((delete_obsolete_files_last_run_ +
mutable_db_options_.delete_obsolete_files_period_micros) <
now_micros) {
diff --git a/db/db_impl/db_impl_open.cc b/db/db_impl/db_impl_open.cc
index 01b65a4b1..8e50668a6 100644
--- a/db/db_impl/db_impl_open.cc
+++ b/db/db_impl/db_impl_open.cc
@@ -293,15 +293,15 @@ Status DBImpl::NewDB(std::vector<std::string>* new_filenames) {
file->SetPreallocationBlockSize(
immutable_db_options_.manifest_preallocation_size);
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(file), manifest, file_options, clock_, io_tracer_,
- nullptr /* stats */, immutable_db_options_.listeners, nullptr,
- tmp_set.Contains(FileType::kDescriptorFile)));
+ std::move(file), manifest, file_options, immutable_db_options_.clock,
+ io_tracer_, nullptr /* stats */, immutable_db_options_.listeners,
+ nullptr, tmp_set.Contains(FileType::kDescriptorFile)));
log::Writer log(std::move(file_writer), 0, false);
std::string record;
new_db.EncodeTo(&record);
s = log.AddRecord(record);
if (s.ok()) {
- s = SyncManifest(clock_, &immutable_db_options_, log.file());
+ s = SyncManifest(&immutable_db_options_, log.file());
}
}
if (s.ok()) {
@@ -1297,7 +1297,7 @@ Status DBImpl::RestoreAliveLogFiles(const std::vector<uint64_t>& wal_numbers) {
Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
MemTable* mem, VersionEdit* edit) {
mutex_.AssertHeld();
- const uint64_t start_micros = clock_->NowMicros();
+ const uint64_t start_micros = immutable_db_options_.clock->NowMicros();
FileMetaData meta;
std::vector<BlobFileAddition> blob_file_additions;
@@ -1325,7 +1325,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
cfd->GetLatestMutableCFOptions()->paranoid_file_checks;
int64_t _current_time = 0;
- env_->GetCurrentTime(&_current_time)
+ immutable_db_options_.clock->GetCurrentTime(&_current_time)
.PermitUncheckedError(); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
meta.oldest_ancester_time = current_time;
@@ -1399,7 +1399,7 @@ Status DBImpl::WriteLevel0TableForRecovery(int job_id, ColumnFamilyData* cfd,
}
InternalStats::CompactionStats stats(CompactionReason::kFlush, 1);
- stats.micros = clock_->NowMicros() - start_micros;
+ stats.micros = immutable_db_options_.clock->NowMicros() - start_micros;
if (has_output) {
stats.bytes_written = meta.fd.GetFileSize();
@@ -1492,9 +1492,9 @@ IOStatus DBImpl::CreateWAL(uint64_t log_file_num, uint64_t recycle_log_number,
const auto& listeners = immutable_db_options_.listeners;
FileTypeSet tmp_set = immutable_db_options_.checksum_handoff_file_types;
std::unique_ptr<WritableFileWriter> file_writer(new WritableFileWriter(
- std::move(lfile), log_fname, opt_file_options, clock_, io_tracer_,
- nullptr /* stats */, listeners, nullptr,
- tmp_set.Contains(FileType::kWalFile)));
+ std::move(lfile), log_fname, opt_file_options,
+ immutable_db_options_.clock, io_tracer_, nullptr /* stats */, listeners,
+ nullptr, tmp_set.Contains(FileType::kWalFile)));
*new_log = new log::Writer(std::move(file_writer), log_file_num,
immutable_db_options_.recycle_log_file_num > 0,
immutable_db_options_.manual_wal_flush);
diff --git a/db/db_impl/db_impl_secondary.cc b/db/db_impl/db_impl_secondary.cc
index e9bd19004..161187692 100644
--- a/db/db_impl/db_impl_secondary.cc
+++ b/db/db_impl/db_impl_secondary.cc
@@ -327,8 +327,8 @@ Status DBImplSecondary::GetImpl(const ReadOptions& read_options,
ColumnFamilyHandle* column_family,
const Slice& key, PinnableSlice* pinnable_val) {
assert(pinnable_val != nullptr);
- PERF_CPU_TIMER_GUARD(get_cpu_nanos, clock_);
- StopWatch sw(clock_, stats_, DB_GET);
+ PERF_CPU_TIMER_GUARD(get_cpu_nanos, immutable_db_options_.clock);
+ StopWatch sw(immutable_db_options_.clock, stats_, DB_GET);
PERF_TIMER_GUARD(get_snapshot_time);
auto cfh = static_cast<ColumnFamilyHandleImpl*>(column_family);
diff --git a/db/db_impl/db_impl_write.cc b/db/db_impl/db_impl_write.cc
index 083cd92d1..4afc08217 100644
--- a/db/db_impl/db_impl_write.cc
+++ b/db/db_impl/db_impl_write.cc
@@ -160,7 +160,8 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
RecordTick(stats_, WRITE_WITH_WAL);
}
- StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
+ StopWatch write_sw(immutable_db_options_.clock,
+ immutable_db_options_.statistics.get(), DB_WRITE);
write_thread_.JoinBatchGroup(&w);
if (w.state == WriteThread::STATE_PARALLEL_MEMTABLE_WRITER) {
@@ -465,7 +466,8 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,
uint64_t* log_used, uint64_t log_ref,
bool disable_memtable, uint64_t* seq_used) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
- StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
+ StopWatch write_sw(immutable_db_options_.clock,
+ immutable_db_options_.statistics.get(), DB_WRITE);
WriteContext write_context;
@@ -621,7 +623,8 @@ Status DBImpl::UnorderedWriteMemtable(const WriteOptions& write_options,
SequenceNumber seq,
const size_t sub_batch_cnt) {
PERF_TIMER_GUARD(write_pre_and_post_process_time);
- StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
+ StopWatch write_sw(immutable_db_options_.clock,
+ immutable_db_options_.statistics.get(), DB_WRITE);
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
false /*disable_memtable*/);
@@ -676,7 +679,8 @@ Status DBImpl::WriteImplWALOnly(
WriteThread::Writer w(write_options, my_batch, callback, log_ref,
disable_memtable, sub_batch_cnt, pre_release_callback);
RecordTick(stats_, WRITE_WITH_WAL);
- StopWatch write_sw(clock_, immutable_db_options_.statistics.get(), DB_WRITE);
+ StopWatch write_sw(immutable_db_options_.clock,
+ immutable_db_options_.statistics.get(), DB_WRITE);
write_thread->JoinBatchGroup(&w);
assert(w.state != WriteThread::STATE_PARALLEL_MEMTABLE_WRITER);
@@ -1093,7 +1097,7 @@ IOStatus DBImpl::WriteToWAL(const WriteThread::WriteGroup& write_group,
}
if (io_s.ok() && need_log_sync) {
- StopWatch sw(clock_, stats_, WAL_FILE_SYNC_MICROS);
+ StopWatch sw(immutable_db_options_.clock, stats_, WAL_FILE_SYNC_MICROS);
// It's safe to access logs_ with unlocked mutex_ here because:
// - we've set getting_synced=true for all logs,
// so other threads won't pop from logs_ while we're here,
@@ -1457,8 +1461,10 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
uint64_t time_delayed = 0;
bool delayed = false;
{
- StopWatch sw(clock_, stats_, WRITE_STALL, &time_delayed);
- uint64_t delay = write_controller_.GetDelay(clock_, num_bytes);
+ StopWatch sw(immutable_db_options_.clock, stats_, WRITE_STALL,
+ &time_delayed);
+ uint64_t delay =
+ write_controller_.GetDelay(immutable_db_options_.clock, num_bytes);
if (delay > 0) {
if (write_options.no_slowdown) {
return Status::Incomplete("Write stall");
@@ -1475,14 +1481,14 @@ Status DBImpl::DelayWrite(uint64_t num_bytes,
const uint64_t kDelayInterval = 1000;
uint64_t stall_end = sw.start_time() + delay;
while (write_controller_.NeedsDelay()) {
- if (clock_->NowMicros() >= stall_end) {
+ if (immutable_db_options_.clock->NowMicros() >= stall_end) {
// We already delayed this write `delay` microseconds
break;
}
delayed = true;
// Sleep for 0.001 seconds
- clock_->SleepForMicroseconds(kDelayInterval);
+ immutable_db_options_.clock->SleepForMicroseconds(kDelayInterval);
}
mutex_.Lock();
write_thread_.EndWriteStall();
diff --git a/db/db_iter.cc b/db/db_iter.cc
index e9480adb8..71ae91bb0 100644
--- a/db/db_iter.cc
+++ b/db/db_iter.cc
@@ -45,7 +45,7 @@ DBIter::DBIter(Env* _env, const ReadOptions& read_options,
ColumnFamilyData* cfd, bool expose_blob_index)
: prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
env_(_env),
- clock_(_env->GetSystemClock()),
+ clock_(_env->GetSystemClock().get()),
logger_(cf_options.info_log),
user_comparator_(cmp),
merge_operator_(cf_options.merge_operator),
diff --git a/db/db_iter.h b/db/db_iter.h
index 9b049ffbb..284d9c12d 100644
--- a/db/db_iter.h
+++ b/db/db_iter.h
@@ -300,7 +300,7 @@ class DBIter final : public Iterator {
const SliceTransform* prefix_extractor_;
Env* const env_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
Logger* logger_;
UserComparatorWrapper user_comparator_;
const MergeOperator* const merge_operator_;
diff --git a/db/error_handler.cc b/db/error_handler.cc
index f8d878e7c..80c503c4d 100644
--- a/db/error_handler.cc
+++ b/db/error_handler.cc
@@ -670,7 +670,7 @@ void ErrorHandler::RecoverFromRetryableBGIOError() {
// a period of time and redo auto resume if it is allowed.
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeWait0");
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:BeforeWait1");
- int64_t wait_until = db_->clock_->NowMicros() + wait_interval;
+ int64_t wait_until = db_options_.clock->NowMicros() + wait_interval;
cv_.TimedWait(wait_until);
TEST_SYNC_POINT("RecoverFromRetryableBGIOError:AfterWait0");
} else {
diff --git a/db/external_sst_file_ingestion_job.h b/db/external_sst_file_ingestion_job.h
index f46e7356c..c669089d9 100644
--- a/db/external_sst_file_ingestion_job.h
+++ b/db/external_sst_file_ingestion_job.h
@@ -74,13 +74,13 @@ struct IngestedFileInfo {
class ExternalSstFileIngestionJob {
public:
ExternalSstFileIngestionJob(
- const std::shared_ptr<SystemClock>& clock, VersionSet* versions,
- ColumnFamilyData* cfd, const ImmutableDBOptions& db_options,
- const EnvOptions& env_options, SnapshotList* db_snapshots,
+ VersionSet* versions, ColumnFamilyData* cfd,
+ const ImmutableDBOptions& db_options, const EnvOptions& env_options,
+ SnapshotList* db_snapshots,
const IngestExternalFileOptions& ingestion_options,
Directories* directories, EventLogger* event_logger,
const std::shared_ptr<IOTracer>& io_tracer)
- : clock_(clock),
+ : clock_(db_options.clock),
fs_(db_options.fs, io_tracer),
versions_(versions),
cfd_(cfd),
@@ -170,7 +170,7 @@ class ExternalSstFileIngestionJob {
template <typename TWritableFile>
Status SyncIngestedFile(TWritableFile* file);
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
FileSystemPtr fs_;
VersionSet* versions_;
ColumnFamilyData* cfd_;
diff --git a/db/flush_job.cc b/db/flush_job.cc
index ececba81b..b73dc8ce0 100644
--- a/db/flush_job.cc
+++ b/db/flush_job.cc
@@ -127,7 +127,7 @@ FlushJob::FlushJob(const std::string& dbname, ColumnFamilyData* cfd,
pick_memtable_called(false),
thread_pri_(thread_pri),
io_tracer_(io_tracer),
- clock_(db_options_.env->GetSystemClock()),
+ clock_(db_options_.clock),
full_history_ts_low_(std::move(full_history_ts_low)) {
// Update the thread status to indicate flush.
ReportStartedFlush();
diff --git a/db/flush_job.h b/db/flush_job.h
index 35ef4a00c..78b5c7613 100644
--- a/db/flush_job.h
+++ b/db/flush_job.h
@@ -162,7 +162,7 @@ class FlushJob {
IOStatus io_status_;
const std::shared_ptr<IOTracer> io_tracer_;
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
const std::string full_history_ts_low_;
};
diff --git a/db/import_column_family_job.h b/db/import_column_family_job.h
index 0896b4fad..3cf4eb56e 100644
--- a/db/import_column_family_job.h
+++ b/db/import_column_family_job.h
@@ -21,14 +21,13 @@ class SystemClock;
// to ExternalSstFileIngestionJob.
class ImportColumnFamilyJob {
public:
- ImportColumnFamilyJob(const std::shared_ptr<SystemClock>& clock,
- VersionSet* versions, ColumnFamilyData* cfd,
+ ImportColumnFamilyJob(VersionSet* versions, ColumnFamilyData* cfd,
const ImmutableDBOptions& db_options,
const EnvOptions& env_options,
const ImportColumnFamilyOptions& import_options,
const std::vector<LiveFileMetaData>& metadata,
const std::shared_ptr<IOTracer>& io_tracer)
- : clock_(clock),
+ : clock_(db_options.clock),
versions_(versions),
cfd_(cfd),
db_options_(db_options),
@@ -61,7 +60,7 @@ class ImportColumnFamilyJob {
IngestedFileInfo* file_to_import,
SuperVersion* sv);
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
VersionSet* versions_;
ColumnFamilyData* cfd_;
const ImmutableDBOptions& db_options_;
diff --git a/db/internal_stats.h b/db/internal_stats.h
index 6f449aa3d..c77481d8a 100644
--- a/db/internal_stats.h
+++ b/db/internal_stats.h
@@ -125,8 +125,7 @@ class InternalStats {
kIntStatsNumMax,
};
- InternalStats(int num_levels, const std::shared_ptr<SystemClock>& clock,
- ColumnFamilyData* cfd)
+ InternalStats(int num_levels, SystemClock* clock, ColumnFamilyData* cfd)
: db_stats_{},
cf_stats_value_{},
cf_stats_count_{},
@@ -638,7 +637,7 @@ class InternalStats {
uint64_t bg_error_count_;
const int number_levels_;
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
ColumnFamilyData* cfd_;
uint64_t started_at_;
};
@@ -677,8 +676,7 @@ class InternalStats {
kIntStatsNumMax,
};
- InternalStats(int /*num_levels*/,
- const std::shared_ptr<SystemClock>& /*clock*/,
+ InternalStats(int /*num_levels*/, SystemClock* /*clock*/,
ColumnFamilyData* /*cfd*/) {}
struct CompactionStats {
diff --git a/db/memtable.cc b/db/memtable.cc
index 8283a5c6b..5789956fa 100644
--- a/db/memtable.cc
+++ b/db/memtable.cc
@@ -104,7 +104,7 @@ MemTable::MemTable(const InternalKeyComparator& cmp,
: 0),
prefix_extractor_(mutable_cf_options.prefix_extractor.get()),
flush_state_(FLUSH_NOT_REQUESTED),
- clock_(ioptions.env->GetSystemClock()),
+ clock_(ioptions.clock),
insert_with_hint_prefix_extractor_(
ioptions.memtable_insert_with_hint_prefix_extractor),
oldest_key_time_(std::numeric_limits<uint64_t>::max()),
@@ -684,7 +684,7 @@ struct Saver {
Statistics* statistics;
bool inplace_update_support;
bool do_merge;
- std::shared_ptr<SystemClock> clock;
+ SystemClock* clock;
ReadCallback* callback_;
bool* is_blob_index;
diff --git a/db/memtable.h b/db/memtable.h
index 67010d867..81d40f077 100644
--- a/db/memtable.h
+++ b/db/memtable.h
@@ -512,7 +512,7 @@ class MemTable {
std::atomic<FlushStateEnum> flush_state_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// Extract sequential insert prefixes.
const SliceTransform* insert_with_hint_prefix_extractor_;
diff --git a/db/merge_helper.cc b/db/merge_helper.cc
index dd9e1c4d3..31cd3b6c5 100644
--- a/db/merge_helper.cc
+++ b/db/merge_helper.cc
@@ -29,7 +29,7 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
Statistics* stats,
const std::atomic<bool>* shutting_down)
: env_(env),
- clock_(env->GetSystemClock()),
+ clock_(env->GetSystemClock().get()),
user_comparator_(user_comparator),
user_merge_operator_(user_merge_operator),
compaction_filter_(compaction_filter),
@@ -50,11 +50,13 @@ MergeHelper::MergeHelper(Env* env, const Comparator* user_comparator,
}
}
-Status MergeHelper::TimedFullMerge(
- const MergeOperator* merge_operator, const Slice& key, const Slice* value,
- const std::vector<Slice>& operands, std::string* result, Logger* logger,
- Statistics* statistics, const std::shared_ptr<SystemClock>& clock,
- Slice* result_operand, bool update_num_ops_stats) {
+Status MergeHelper::TimedFullMerge(const MergeOperator* merge_operator,
+ const Slice& key, const Slice* value,
+ const std::vector<Slice>& operands,
+ std::string* result, Logger* logger,
+ Statistics* statistics, SystemClock* clock,
+ Slice* result_operand,
+ bool update_num_ops_stats) {
assert(merge_operator != nullptr);
if (operands.size() == 0) {
diff --git a/db/merge_helper.h b/db/merge_helper.h
index 1d6d59585..f3bcd948b 100644
--- a/db/merge_helper.h
+++ b/db/merge_helper.h
@@ -45,11 +45,13 @@ class MergeHelper {
// Returns one of the following statuses:
// - OK: Entries were successfully merged.
// - Corruption: Merge operator reported unsuccessful merge.
- static Status TimedFullMerge(
- const MergeOperator* merge_operator, const Slice& key, const Slice* value,
- const std::vector<Slice>& operands, std::string* result, Logger* logger,
- Statistics* statistics, const std::shared_ptr<SystemClock>& clock,
- Slice* result_operand = nullptr, bool update_num_ops_stats = false);
+ static Status TimedFullMerge(const MergeOperator* merge_operator,
+ const Slice& key, const Slice* value,
+ const std::vector<Slice>& operands,
+ std::string* result, Logger* logger,
+ Statistics* statistics, SystemClock* clock,
+ Slice* result_operand = nullptr,
+ bool update_num_ops_stats = false);
// Merge entries until we hit
// - a corrupted key
@@ -139,7 +141,7 @@ class MergeHelper {
private:
Env* env_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
const Comparator* user_comparator_;
const MergeOperator* user_merge_operator_;
const CompactionFilter* compaction_filter_;
diff --git a/db/perf_context_test.cc b/db/perf_context_test.cc
index 95da75e64..908e684f7 100644
--- a/db/perf_context_test.cc
+++ b/db/perf_context_test.cc
@@ -93,7 +93,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
std::string value;
get_perf_context()->Reset();
- StopWatchNano timer(SystemClock::Default());
+ StopWatchNano timer(SystemClock::Default().get());
timer.Start();
auto status = db->Get(read_options, key, &value);
auto elapsed_nanos = timer.ElapsedNanos();
@@ -112,7 +112,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
std::unique_ptr<Iterator> iter(db->NewIterator(read_options));
get_perf_context()->Reset();
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
iter->SeekToFirst();
hist_seek_to_first.Add(get_perf_context()->user_key_comparison_count);
auto elapsed_nanos = timer.ElapsedNanos();
@@ -133,7 +133,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
std::string key = "k" + ToString(i);
get_perf_context()->Reset();
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
iter->Seek(key);
auto elapsed_nanos = timer.ElapsedNanos();
hist_seek.Add(get_perf_context()->user_key_comparison_count);
@@ -147,7 +147,7 @@ TEST_F(PerfContextTest, SeekIntoDeletion) {
get_perf_context()->Reset();
ASSERT_TRUE(iter->Valid());
- StopWatchNano timer2(SystemClock::Default(), true);
+ StopWatchNano timer2(SystemClock::Default().get(), true);
iter->Next();
auto elapsed_nanos2 = timer2.ElapsedNanos();
if (FLAGS_verbose) {
@@ -166,7 +166,7 @@ TEST_F(PerfContextTest, StopWatchNanoOverhead) {
const int kTotalIterations = 1000000;
std::vector<uint64_t> timings(kTotalIterations);
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
for (auto& timing : timings) {
timing = timer.ElapsedNanos(true /* reset */);
}
@@ -187,7 +187,7 @@ TEST_F(PerfContextTest, StopWatchOverhead) {
uint64_t elapsed = 0;
std::vector<uint64_t> timings(kTotalIterations);
- StopWatch timer(SystemClock::Default(), nullptr, 0, &elapsed);
+ StopWatch timer(SystemClock::Default().get(), nullptr, 0, &elapsed);
for (auto& timing : timings) {
timing = elapsed;
}
@@ -541,7 +541,7 @@ TEST_F(PerfContextTest, SeekKeyComparison) {
HistogramImpl hist_time_diff;
SetPerfLevel(kEnableTime);
- StopWatchNano timer(SystemClock::Default());
+ StopWatchNano timer(SystemClock::Default().get());
for (const int i : keys) {
std::string key = "k" + ToString(i);
std::string value = "v" + ToString(i);
@@ -594,7 +594,8 @@ TEST_F(PerfContextTest, DBMutexLockCounter) {
for (PerfLevel perf_level_test :
{PerfLevel::kEnableTimeExceptForMutex, PerfLevel::kEnableTime}) {
for (int c = 0; c < 2; ++c) {
- InstrumentedMutex mutex(nullptr, SystemClock::Default(), stats_code[c]);
+ InstrumentedMutex mutex(nullptr, SystemClock::Default().get(),
+ stats_code[c]);
mutex.Lock();
ROCKSDB_NAMESPACE::port::Thread child_thread([&] {
SetPerfLevel(perf_level_test);
@@ -621,7 +622,8 @@ TEST_F(PerfContextTest, FalseDBMutexWait) {
SetPerfLevel(kEnableTime);
int stats_code[] = {0, static_cast<int>(DB_MUTEX_WAIT_MICROS)};
for (int c = 0; c < 2; ++c) {
- InstrumentedMutex mutex(nullptr, SystemClock::Default(), stats_code[c]);
+ InstrumentedMutex mutex(nullptr, SystemClock::Default().get(),
+ stats_code[c]);
InstrumentedCondVar lock(&mutex);
get_perf_context()->Reset();
mutex.Lock();
diff --git a/db/periodic_work_scheduler.cc b/db/periodic_work_scheduler.cc
index ebca23d51..677eec90c 100644
--- a/db/periodic_work_scheduler.cc
+++ b/db/periodic_work_scheduler.cc
@@ -13,7 +13,7 @@ namespace ROCKSDB_NAMESPACE {
PeriodicWorkScheduler::PeriodicWorkScheduler(
const std::shared_ptr<SystemClock>& clock) {
- timer = std::unique_ptr<Timer>(new Timer(clock));
+ timer = std::unique_ptr<Timer>(new Timer(clock.get()));
}
void PeriodicWorkScheduler::Register(DBImpl* dbi,
@@ -87,7 +87,7 @@ PeriodicWorkTestScheduler* PeriodicWorkTestScheduler::Default(
MutexLock timer_mu_guard(&scheduler.timer_mu_);
scheduler.timer->Shutdown();
}
- scheduler.timer.reset(new Timer(clock));
+ scheduler.timer.reset(new Timer(clock.get()));
}
}
return &scheduler;
diff --git a/db/prefix_test.cc b/db/prefix_test.cc
index 24b153575..37673eb8c 100644
--- a/db/prefix_test.cc
+++ b/db/prefix_test.cc
@@ -598,7 +598,6 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
HistogramImpl hist_put_time;
HistogramImpl hist_put_comparison;
-
// insert x random prefix, each with y continuous element.
for (auto prefix : prefixes) {
for (uint64_t sorted = 0; sorted < FLAGS_items_per_prefix; sorted++) {
@@ -609,7 +608,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
std::string value(FLAGS_value_size, 0);
get_perf_context()->Reset();
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
ASSERT_OK(db->Put(write_options, key, value));
hist_put_time.Add(timer.ElapsedNanos());
hist_put_comparison.Add(get_perf_context()->user_key_comparison_count);
@@ -632,7 +631,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
std::string value = "v" + ToString(0);
get_perf_context()->Reset();
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
auto key_prefix = options.prefix_extractor->Transform(key);
uint64_t total_keys = 0;
for (iter->Seek(key);
@@ -666,7 +665,7 @@ TEST_F(PrefixTest, DynamicPrefixIterator) {
Slice key = TestKeyToSlice(s, test_key);
get_perf_context()->Reset();
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
iter->Seek(key);
hist_no_seek_time.Add(timer.ElapsedNanos());
hist_no_seek_comparison.Add(get_perf_context()->user_key_comparison_count);
diff --git a/db/range_del_aggregator_bench.cc b/db/range_del_aggregator_bench.cc
index a92efa132..061232f99 100644
--- a/db/range_del_aggregator_bench.cc
+++ b/db/range_del_aggregator_bench.cc
@@ -171,6 +171,8 @@ int main(int argc, char** argv) {
ParseCommandLineFlags(&argc, &argv, true);
Stats stats;
+ ROCKSDB_NAMESPACE::SystemClock* clock =
+ ROCKSDB_NAMESPACE::SystemClock::Default().get();
ROCKSDB_NAMESPACE::Random64 rnd(FLAGS_seed);
std::default_random_engine random_gen(FLAGS_seed);
std::normal_distribution<double> normal_dist(FLAGS_tombstone_width_mean,
@@ -219,7 +221,7 @@ int main(int argc, char** argv) {
ROCKSDB_NAMESPACE::kMaxSequenceNumber));
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_add_tombstones(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true /* auto_start */);
+ clock, true /* auto_start */);
range_del_agg.AddTombstones(std::move(fragmented_range_del_iter));
stats.time_add_tombstones += stop_watch_add_tombstones.ElapsedNanos();
}
@@ -236,7 +238,7 @@ int main(int argc, char** argv) {
parsed_key.user_key = key_string;
ROCKSDB_NAMESPACE::StopWatchNano stop_watch_should_delete(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true /* auto_start */);
+ clock, true /* auto_start */);
range_del_agg.ShouldDelete(parsed_key, mode);
uint64_t call_time = stop_watch_should_delete.ElapsedNanos();
diff --git a/db/repair.cc b/db/repair.cc
index 94c31cac3..df41719df 100644
--- a/db/repair.cc
+++ b/db/repair.cc
@@ -425,7 +425,8 @@ class Repairer {
Arena arena;
ScopedArenaIterator iter(mem->NewIterator(ro, &arena));
int64_t _current_time = 0;
- status = env_->GetCurrentTime(&_current_time); // ignore error
+ immutable_db_options_.clock->GetCurrentTime(&_current_time)
+ .PermitUncheckedError(); // ignore error
const uint64_t current_time = static_cast<uint64_t>(_current_time);
SnapshotChecker* snapshot_checker = DisableGCSnapshotChecker::Instance();
diff --git a/db/table_cache.cc b/db/table_cache.cc
index b78b5b1e1..1cd3d4fcf 100644
--- a/db/table_cache.cc
+++ b/db/table_cache.cc
@@ -106,15 +106,14 @@ Status TableCache::GetTableReader(
TableFileName(ioptions_.cf_paths, fd.GetNumber(), fd.GetPathId());
std::unique_ptr<FSRandomAccessFile> file;
FileOptions fopts = file_options;
- const auto& clock = ioptions_.env->GetSystemClock();
- Status s = PrepareIOFromReadOptions(ro, clock, fopts.io_options);
+ Status s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
if (s.ok()) {
s = ioptions_.fs->NewRandomAccessFile(fname, fopts, &file, nullptr);
}
RecordTick(ioptions_.statistics, NO_FILE_OPENS);
if (s.IsPathNotFound()) {
fname = Rocks2LevelTableFileName(fname);
- s = PrepareIOFromReadOptions(ro, clock, fopts.io_options);
+ s = PrepareIOFromReadOptions(ro, ioptions_.clock, fopts.io_options);
if (s.ok()) {
s = ioptions_.fs->NewRandomAccessFile(fname, file_options, &file,
nullptr);
@@ -126,10 +125,10 @@ Status TableCache::GetTableReader(
if (!sequential_mode && ioptions_.advise_random_on_open) {
file->Hint(FSRandomAccessFile::kRandom);
}
- StopWatch sw(clock, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
+ StopWatch sw(ioptions_.clock, ioptions_.statistics, TABLE_OPEN_IO_MICROS);
std::unique_ptr<RandomAccessFileReader> file_reader(
new RandomAccessFileReader(
- std::move(file), fname, clock, io_tracer_,
+ std::move(file), fname, ioptions_.clock, io_tracer_,
record_read_stats ? ioptions_.statistics : nullptr, SST_READ_MICROS,
file_read_hist, ioptions_.rate_limiter, ioptions_.listeners));
s = ioptions_.table_factory->NewTableReader(
@@ -162,8 +161,7 @@ Status TableCache::FindTable(const ReadOptions& ro,
HistogramImpl* file_read_hist, bool skip_filters,
int level, bool prefetch_index_and_filter_in_cache,
size_t max_file_size_for_l0_meta_pin) {
- PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos,
- ioptions_.env->GetSystemClock());
+ PERF_TIMER_GUARD_WITH_CLOCK(find_table_nanos, ioptions_.clock);
uint64_t number = fd.GetNumber();
Slice key = GetSliceForFileNumber(&number);
*handle = cache_->Lookup(key);
diff --git a/db/version_set.cc b/db/version_set.cc
index 12430a8a6..e2ecb8b0a 100644
--- a/db/version_set.cc
+++ b/db/version_set.cc
@@ -1762,7 +1762,7 @@ Version::Version(ColumnFamilyData* column_family_data, VersionSet* vset,
const std::shared_ptr<IOTracer>& io_tracer,
uint64_t version_number)
: env_(vset->env_),
- clock_(env_->GetSystemClock()),
+ clock_(vset->clock_),
cfd_(column_family_data),
info_log_((cfd_ == nullptr) ? nullptr : cfd_->ioptions()->info_log),
db_statistics_((cfd_ == nullptr) ? nullptr
@@ -2534,7 +2534,7 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions,
uint32_t ttl_expired_files_count = 0;
int64_t _current_time;
- auto status = ioptions.env->GetCurrentTime(&_current_time);
+ auto status = ioptions.clock->GetCurrentTime(&_current_time);
if (status.ok()) {
const uint64_t current_time = static_cast<uint64_t>(_current_time);
for (FileMetaData* f : files) {
@@ -2703,7 +2703,7 @@ void VersionStorageInfo::ComputeExpiredTtlFiles(
expired_ttl_files_.clear();
int64_t _current_time;
- auto status = ioptions.env->GetCurrentTime(&_current_time);
+ auto status = ioptions.clock->GetCurrentTime(&_current_time);
if (!status.ok()) {
return;
}
@@ -2730,7 +2730,7 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction(
files_marked_for_periodic_compaction_.clear();
int64_t temp_current_time;
- auto status = ioptions.env->GetCurrentTime(&temp_current_time);
+ auto status = ioptions.clock->GetCurrentTime(&temp_current_time);
if (!status.ok()) {
return;
}
@@ -3789,7 +3789,7 @@ VersionSet::VersionSet(const std::string& dbname,
table_cache_(table_cache),
env_(_db_options->env),
fs_(_db_options->fs, io_tracer),
- clock_(env_->GetSystemClock()),
+ clock_(_db_options->clock),
dbname_(dbname),
db_options_(_db_options),
next_file_number_(2),
@@ -4176,7 +4176,7 @@ Status VersionSet::ProcessManifestWrites(
}
}
if (s.ok()) {
- io_s = SyncManifest(clock_, db_options_, descriptor_log_->file());
+ io_s = SyncManifest(db_options_, descriptor_log_->file());
TEST_SYNC_POINT_CALLBACK(
"VersionSet::ProcessManifestWrites:AfterSyncManifest", &io_s);
}
diff --git a/db/version_set.h b/db/version_set.h
index 99fcbf4d1..93f8c8abd 100644
--- a/db/version_set.h
+++ b/db/version_set.h
@@ -794,7 +794,7 @@ class Version {
private:
Env* env_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
friend class ReactiveVersionSet;
friend class VersionSet;
@@ -1345,7 +1345,7 @@ class VersionSet {
Cache* table_cache_;
Env* const env_;
FileSystemPtr const fs_;
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* const clock_;
const std::string dbname_;
std::string db_id_;
const ImmutableDBOptions* const db_options_;
diff --git a/db/version_set_test.cc b/db/version_set_test.cc
index 2bfbf03d0..c91b87293 100644
--- a/db/version_set_test.cc
+++ b/db/version_set_test.cc
@@ -2773,7 +2773,7 @@ class VersionSetTestMissingFiles : public VersionSetTestBase,
Status s = fs_->NewWritableFile(fname, FileOptions(), &file, nullptr);
ASSERT_OK(s);
std::unique_ptr<WritableFileWriter> fwriter(new WritableFileWriter(
- std::move(file), fname, FileOptions(), env_->GetSystemClock()));
+ std::move(file), fname, FileOptions(), env_->GetSystemClock().get()));
std::vector<std::unique_ptr<IntTblPropCollectorFactory>>
int_tbl_prop_collector_factories;
diff --git a/db/wal_manager.cc b/db/wal_manager.cc
index 7e77e0361..7c7c27c12 100644
--- a/db/wal_manager.cc
+++ b/db/wal_manager.cc
@@ -140,8 +140,8 @@ void WalManager::PurgeObsoleteWALFiles() {
return;
}
- int64_t current_time;
- Status s = env_->GetCurrentTime(&current_time);
+ int64_t current_time = 0;
+ Status s = db_options_.clock->GetCurrentTime(&current_time);
if (!s.ok()) {
ROCKS_LOG_ERROR(db_options_.info_log, "Can't get current time: %s",
s.ToString().c_str());
@@ -171,7 +171,6 @@ void WalManager::PurgeObsoleteWALFiles() {
size_t log_files_num = 0;
uint64_t log_file_size = 0;
-
for (auto& f : files) {
uint64_t number;
FileType type;
diff --git a/db/wal_manager_test.cc b/db/wal_manager_test.cc
index 8dc05b982..5ea5d0c58 100644
--- a/db/wal_manager_test.cc
+++ b/db/wal_manager_test.cc
@@ -49,6 +49,7 @@ class WalManagerTest : public testing::Test {
db_options_.wal_dir = dbname_;
db_options_.env = env_.get();
db_options_.fs = env_->GetFileSystem();
+ db_options_.clock = env_->GetSystemClock().get();
versions_.reset(
new VersionSet(dbname_, &db_options_, env_options_, table_cache_.get(),
diff --git a/db/write_batch.cc b/db/write_batch.cc
index 2095b1ca4..1d9423e0d 100644
--- a/db/write_batch.cc
+++ b/db/write_batch.cc
@@ -2041,7 +2041,8 @@ class MemTableInserter : public WriteBatch::Handler {
std::string new_value;
Status merge_status = MergeHelper::TimedFullMerge(
merge_operator, key, &get_value_slice, {value}, &new_value,
- moptions->info_log, moptions->statistics, SystemClock::Default());
+ moptions->info_log, moptions->statistics,
+ SystemClock::Default().get());
if (!merge_status.ok()) {
// Failed to merge!
diff --git a/db/write_controller.cc b/db/write_controller.cc
index a10e98973..73f4f33eb 100644
--- a/db/write_controller.cc
+++ b/db/write_controller.cc
@@ -43,8 +43,7 @@ bool WriteController::IsStopped() const {
// If it turns out to be a performance issue, we can redesign the thread
// synchronization model here.
// The function trust caller will sleep micros returned.
-uint64_t WriteController::GetDelay(const std::shared_ptr<SystemClock>& clock,
- uint64_t num_bytes) {
+uint64_t WriteController::GetDelay(SystemClock* clock, uint64_t num_bytes) {
if (total_stopped_.load(std::memory_order_relaxed) > 0) {
return 0;
}
@@ -108,8 +107,7 @@ uint64_t WriteController::GetDelay(const std::shared_ptr<SystemClock>& clock,
return sleep_amount;
}
-uint64_t WriteController::NowMicrosMonotonic(
- const std::shared_ptr<SystemClock>& clock) {
+uint64_t WriteController::NowMicrosMonotonic(SystemClock* clock) {
return clock->NowNanos() / std::milli::den;
}
diff --git a/db/write_controller.h b/db/write_controller.h
index 9a661d13e..dfd47cee2 100644
--- a/db/write_controller.h
+++ b/db/write_controller.h
@@ -57,8 +57,7 @@ class WriteController {
// return how many microseconds the caller needs to sleep after the call
// num_bytes: how many number of bytes to put into the DB.
// Prerequisite: DB mutex held.
- uint64_t GetDelay(const std::shared_ptr<SystemClock>& clock,
- uint64_t num_bytes);
+ uint64_t GetDelay(SystemClock* clock, uint64_t num_bytes);
void set_delayed_write_rate(uint64_t write_rate) {
// avoid divide 0
if (write_rate == 0) {
@@ -86,7 +85,7 @@ class WriteController {
RateLimiter* low_pri_rate_limiter() { return low_pri_rate_limiter_.get(); }
private:
- uint64_t NowMicrosMonotonic(const std::shared_ptr<SystemClock>& clock);
+ uint64_t NowMicrosMonotonic(SystemClock* clock);
friend class WriteControllerToken;
friend class StopWriteToken;
diff --git a/db/write_controller_test.cc b/db/write_controller_test.cc
index af984befd..f24c291ee 100644
--- a/db/write_controller_test.cc
+++ b/db/write_controller_test.cc
@@ -32,21 +32,21 @@ TEST_F(WriteControllerTest, ChangeDelayRateTest) {
auto delay_token_0 =
controller.GetDelayToken(controller.delayed_write_rate());
ASSERT_EQ(static_cast<uint64_t>(2000000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
auto delay_token_1 = controller.GetDelayToken(2000000u);
ASSERT_EQ(static_cast<uint64_t>(10000000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
auto delay_token_2 = controller.GetDelayToken(1000000u);
ASSERT_EQ(static_cast<uint64_t>(20000000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
auto delay_token_3 = controller.GetDelayToken(20000000u);
ASSERT_EQ(static_cast<uint64_t>(1000000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
// This is more than max rate. Max delayed rate will be used.
auto delay_token_4 =
controller.GetDelayToken(controller.delayed_write_rate() * 3);
ASSERT_EQ(static_cast<uint64_t>(500000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
}
TEST_F(WriteControllerTest, SanityTest) {
@@ -62,69 +62,77 @@ TEST_F(WriteControllerTest, SanityTest) {
auto delay_token_1 = controller.GetDelayToken(10000000u);
ASSERT_EQ(static_cast<uint64_t>(2000000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
clock_->now_micros_ += 1999900u; // sleep debt 1000
auto delay_token_2 = controller.GetDelayToken(10000000u);
// Rate reset after changing the token.
ASSERT_EQ(static_cast<uint64_t>(2000000),
- controller.GetDelay(clock_, 20000000u));
+ controller.GetDelay(clock_.get(), 20000000u));
clock_->now_micros_ += 1999900u; // sleep debt 1000
// One refill: 10240 bytes allowed, 1000 used, 9240 left
- ASSERT_EQ(static_cast<uint64_t>(1124), controller.GetDelay(clock_, 1000u));
+ ASSERT_EQ(static_cast<uint64_t>(1124),
+ controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 1124u; // sleep debt 0
delay_token_2.reset();
// 1000 used, 8240 left
- ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 1000u));
+ ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 100u; // sleep credit 100
// 1000 used, 7240 left
- ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 1000u));
+ ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 100u; // sleep credit 200
// One refill: 10240 fileed, sleep credit generates 2000. 8000 used
// 7240 + 10240 + 2000 - 8000 = 11480 left
- ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(clock_, 8000u));
+ ASSERT_EQ(static_cast<uint64_t>(1024u),
+ controller.GetDelay(clock_.get(), 8000u));
clock_->now_micros_ += 200u; // sleep debt 824
// 1000 used, 10480 left.
- ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 1000u));
+ ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 1000u));
clock_->now_micros_ += 200u; // sleep debt 624
// Out of bound sleep, still 10480 left
ASSERT_EQ(static_cast<uint64_t>(3000624u),
- controller.GetDelay(clock_, 30000000u));
+ controller.GetDelay(clock_.get(), 30000000u));
clock_->now_micros_ += 3000724u; // sleep credit 100
// 6000 used, 4480 left.
- ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 6000u));
+ ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_.get(), 6000u));
clock_->now_micros_ += 200u; // sleep credit 300
// One refill, credit 4480 balance + 3000 credit + 10240 refill
// Use 8000, 9720 left
- ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(clock_, 8000u));
+ ASSERT_EQ(static_cast<uint64_t>(1024u),
+ controller.GetDelay(clock_.get(), 8000u));
clock_->now_micros_ += 3024u; // sleep credit 2000
// 1720 left
- ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(clock_, 8000u));
+ ASSERT_EQ(static_cast<uint64_t>(0u),
+ controller.GetDelay(clock_.get(), 8000u));
// 1720 balance + 20000 credit = 20170 left
// Use 8000, 12170 left
- ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(clock_, 8000u));
+ ASSERT_EQ(static_cast<uint64_t>(0u),
+ controller.GetDelay(clock_.get(), 8000u));
// 4170 left
- ASSERT_EQ(static_cast<uint64_t>(0u), controller.GetDelay(clock_, 8000u));
+ ASSERT_EQ(static_cast<uint64_t>(0u),
+ controller.GetDelay(clock_.get(), 8000u));
// Need a refill
- ASSERT_EQ(static_cast<uint64_t>(1024u), controller.GetDelay(clock_, 9000u));
+ ASSERT_EQ(static_cast<uint64_t>(1024u),
+ controller.GetDelay(clock_.get(), 9000u));
delay_token_1.reset();
- ASSERT_EQ(static_cast<uint64_t>(0), controller.GetDelay(clock_, 30000000u));
+ ASSERT_EQ(static_cast<uint64_t>(0),
+ controller.GetDelay(clock_.get(), 30000000u));
delay_token_1.reset();
ASSERT_FALSE(controller.IsStopped());
}
diff --git a/db_stress_tool/db_stress_driver.cc b/db_stress_tool/db_stress_driver.cc
index 69411aa29..b944214ec 100644
--- a/db_stress_tool/db_stress_driver.cc
+++ b/db_stress_tool/db_stress_driver.cc
@@ -57,6 +57,7 @@ void ThreadBody(void* v) {
}
bool RunStressTest(StressTest* stress) {
+ SystemClock* clock = db_stress_env->GetSystemClock().get();
stress->InitDb();
SharedState shared(db_stress_env, stress);
stress->FinishInitDb(&shared);
@@ -69,9 +70,9 @@ bool RunStressTest(StressTest* stress) {
uint32_t n = shared.GetNumThreads();
- uint64_t now = db_stress_env->NowMicros();
+ uint64_t now = clock->NowMicros();
fprintf(stdout, "%s Initializing worker threads\n",
- db_stress_env->TimeToString(now / 1000000).c_str());
+ clock->TimeToString(now / 1000000).c_str());
std::vector<ThreadState*> threads(n);
for (uint32_t i = 0; i < n; i++) {
threads[i] = new ThreadState(i, &shared);
@@ -104,9 +105,9 @@ bool RunStressTest(StressTest* stress) {
}
}
- now = db_stress_env->NowMicros();
+ now = clock->NowMicros();
fprintf(stdout, "%s Starting database operations\n",
- db_stress_env->TimeToString(now / 1000000).c_str());
+ clock->TimeToString(now / 1000000).c_str());
shared.SetStart();
shared.GetCondVar()->SignalAll();
@@ -114,16 +115,16 @@ bool RunStressTest(StressTest* stress) {
shared.GetCondVar()->Wait();
}
- now = db_stress_env->NowMicros();
+ now = clock->NowMicros();
if (FLAGS_test_batches_snapshots) {
fprintf(stdout, "%s Limited verification already done during gets\n",
- db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
+ clock->TimeToString((uint64_t)now / 1000000).c_str());
} else if (FLAGS_skip_verifydb) {
fprintf(stdout, "%s Verification skipped\n",
- db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
+ clock->TimeToString((uint64_t)now / 1000000).c_str());
} else {
fprintf(stdout, "%s Starting verification\n",
- db_stress_env->TimeToString((uint64_t)now / 1000000).c_str());
+ clock->TimeToString((uint64_t)now / 1000000).c_str());
}
shared.SetStartVerify();
@@ -142,11 +143,11 @@ bool RunStressTest(StressTest* stress) {
delete threads[i];
threads[i] = nullptr;
}
- now = db_stress_env->NowMicros();
+ now = clock->NowMicros();
if (!FLAGS_skip_verifydb && !FLAGS_test_batches_snapshots &&
!shared.HasVerificationFailedYet()) {
fprintf(stdout, "%s Verification successful\n",
- db_stress_env->TimeToString(now / 1000000).c_str());
+ clock->TimeToString(now / 1000000).c_str());
}
stress->PrintStatistics();
diff --git a/db_stress_tool/db_stress_stat.h b/db_stress_tool/db_stress_stat.h
index e8bc0986a..429cf3b2a 100644
--- a/db_stress_tool/db_stress_stat.h
+++ b/db_stress_tool/db_stress_stat.h
@@ -11,9 +11,9 @@
#include "monitoring/histogram.h"
#include "port/port.h"
-#include "rocksdb/env.h"
#include "rocksdb/snapshot.h"
#include "rocksdb/statistics.h"
+#include "rocksdb/system_clock.h"
#include "util/gflags_compat.h"
#include "util/random.h"
@@ -73,7 +73,7 @@ class Stats {
seconds_ = 0;
num_compact_files_succeed_ = 0;
num_compact_files_failed_ = 0;
- start_ = Env::Default()->NowMicros();
+ start_ = SystemClock::Default()->NowMicros();
last_op_finish_ = start_;
finish_ = start_;
}
@@ -102,13 +102,13 @@ class Stats {
}
void Stop() {
- finish_ = Env::Default()->NowMicros();
+ finish_ = SystemClock::Default()->NowMicros();
seconds_ = (finish_ - start_) * 1e-6;
}
void FinishedSingleOp() {
if (FLAGS_histogram) {
- auto now = Env::Default()->NowMicros();
+ auto now = SystemClock::Default()->NowMicros();
auto micros = now - last_op_finish_;
hist_.Add(micros);
if (micros > 20000) {
diff --git a/db_stress_tool/db_stress_test_base.cc b/db_stress_tool/db_stress_test_base.cc
index b66ee8955..e506bf73d 100644
--- a/db_stress_tool/db_stress_test_base.cc
+++ b/db_stress_tool/db_stress_test_base.cc
@@ -35,6 +35,7 @@ StressTest::StressTest()
#ifndef ROCKSDB_LITE
txn_db_(nullptr),
#endif
+ clock_(db_stress_env->GetSystemClock().get()),
new_column_family_name_(1),
num_times_reopened_(0),
db_preload_finished_(false),
@@ -226,9 +227,9 @@ bool StressTest::BuildOptionsTable() {
}
void StressTest::InitDb() {
- uint64_t now = db_stress_env->NowMicros();
+ uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Initializing db_stress\n",
- db_stress_env->TimeToString(now / 1000000).c_str());
+ clock_->TimeToString(now / 1000000).c_str());
PrintEnv();
Open();
BuildOptionsTable();
@@ -236,9 +237,9 @@ void StressTest::InitDb() {
void StressTest::FinishInitDb(SharedState* shared) {
if (FLAGS_read_only) {
- uint64_t now = db_stress_env->NowMicros();
+ uint64_t now = clock_->NowMicros();
fprintf(stdout, "%s Preloading db with %" PRIu64 " KVs\n",
- db_stress_env->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
+ clock_->TimeToString(now / 1000000).c_str(), FLAGS_max_key);
PreloadDbAndReopenAsReadOnly(FLAGS_max_key, shared);
}
if (FLAGS_enable_compaction_filter) {
@@ -255,10 +256,9 @@ void StressTest::FinishInitDb(SharedState* shared) {
bool StressTest::VerifySecondaries() {
#ifndef ROCKSDB_LITE
if (FLAGS_test_secondary) {
- uint64_t now = db_stress_env->NowMicros();
- fprintf(
- stdout, "%s Start to verify secondaries against primary\n",
- db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
+ uint64_t now = clock_->NowMicros();
+ fprintf(stdout, "%s Start to verify secondaries against primary\n",
+ clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
}
for (size_t k = 0; k != secondaries_.size(); ++k) {
Status s = secondaries_[k]->TryCatchUpWithPrimary();
@@ -300,10 +300,9 @@ bool StressTest::VerifySecondaries() {
}
}
if (FLAGS_test_secondary) {
- uint64_t now = db_stress_env->NowMicros();
- fprintf(
- stdout, "%s Verification of secondaries succeeded\n",
- db_stress_env->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
+ uint64_t now = clock_->NowMicros();
+ fprintf(stdout, "%s Verification of secondaries succeeded\n",
+ clock_->TimeToString(static_cast<uint64_t>(now) / 1000000).c_str());
}
#endif // ROCKSDB_LITE
return true;
@@ -462,9 +461,9 @@ void StressTest::PreloadDbAndReopenAsReadOnly(int64_t number_of_keys,
#endif
db_preload_finished_.store(true);
- auto now = db_stress_env->NowMicros();
+ auto now = clock_->NowMicros();
fprintf(stdout, "%s Reopening database in read-only\n",
- db_stress_env->TimeToString(now / 1000000).c_str());
+ clock_->TimeToString(now / 1000000).c_str());
// Reopen as read-only, can ignore all options related to updates
Open();
} else {
@@ -1724,7 +1723,7 @@ Status StressTest::TestPauseBackground(ThreadState* thread) {
// 1 chance in 625 of pausing full 16s.)
int pwr2_micros =
std::min(thread->rand.Uniform(25), thread->rand.Uniform(25));
- db_stress_env->SleepForMicroseconds(1 << pwr2_micros);
+ clock_->SleepForMicroseconds(1 << pwr2_micros);
return db_->ContinueBackgroundWork();
}
@@ -2487,10 +2486,9 @@ void StressTest::Reopen(ThreadState* thread) {
secondaries_.clear();
num_times_reopened_++;
- auto now = db_stress_env->NowMicros();
+ auto now = clock_->NowMicros();
fprintf(stdout, "%s Reopening database for the %dth time\n",
- db_stress_env->TimeToString(now / 1000000).c_str(),
- num_times_reopened_);
+ clock_->TimeToString(now / 1000000).c_str(), num_times_reopened_);
Open();
}
} // namespace ROCKSDB_NAMESPACE
diff --git a/db_stress_tool/db_stress_test_base.h b/db_stress_tool/db_stress_test_base.h
index 8fc15def1..484cc1439 100644
--- a/db_stress_tool/db_stress_test_base.h
+++ b/db_stress_tool/db_stress_test_base.h
@@ -13,6 +13,7 @@
#include "db_stress_tool/db_stress_shared_state.h"
namespace ROCKSDB_NAMESPACE {
+class SystemClock;
class Transaction;
class TransactionDB;
@@ -218,6 +219,7 @@ class StressTest {
TransactionDB* txn_db_;
#endif
Options options_;
+ SystemClock* clock_;
std::vector<ColumnFamilyHandle*> column_families_;
std::vector<std::string> column_family_names_;
std::atomic<int> new_column_family_name_;
diff --git a/env/env_encryption.cc b/env/env_encryption.cc
index 8eec392ae..b94ac0e3b 100644
--- a/env/env_encryption.cc
+++ b/env/env_encryption.cc
@@ -15,6 +15,7 @@
#include "env/env_encryption_ctr.h"
#include "monitoring/perf_context_imp.h"
#include "rocksdb/convenience.h"
+#include "rocksdb/system_clock.h"
#include "util/aligned_buffer.h"
#include "util/coding.h"
#include "util/random.h"
@@ -1063,7 +1064,7 @@ Status CTREncryptionProvider::CreateNewPrefix(const std::string& /*fname*/,
return Status::InvalidArgument("Encryption Cipher is missing");
}
// Create & seed rnd.
- Random rnd((uint32_t)Env::Default()->NowMicros());
+ Random rnd((uint32_t)SystemClock::Default()->NowMicros());
// Fill entire prefix block with random values.
for (size_t i = 0; i < prefixLength; i++) {
prefix[i] = rnd.Uniform(256) & 0xFF;
diff --git a/env/env_test.cc b/env/env_test.cc
index ab5ec6649..a943a67fb 100644
--- a/env/env_test.cc
+++ b/env/env_test.cc
@@ -2215,7 +2215,7 @@ TEST_F(EnvTest, IsDirectory) {
std::unique_ptr<WritableFileWriter> fwriter;
fwriter.reset(new WritableFileWriter(std::move(wfile), test_file_path,
FileOptions(),
- SystemClock::Default()));
+ SystemClock::Default().get()));
constexpr char buf[] = "test";
s = fwriter->Append(buf);
ASSERT_OK(s);
diff --git a/env/file_system_tracer.h b/env/file_system_tracer.h
index faa99ec32..da87797d3 100644
--- a/env/file_system_tracer.h
+++ b/env/file_system_tracer.h
@@ -23,7 +23,7 @@ class FileSystemTracingWrapper : public FileSystemWrapper {
const std::shared_ptr<IOTracer>& io_tracer)
: FileSystemWrapper(t),
io_tracer_(io_tracer),
- clock_(SystemClock::Default()) {}
+ clock_(SystemClock::Default().get()) {}
~FileSystemTracingWrapper() override {}
@@ -86,7 +86,7 @@ class FileSystemTracingWrapper : public FileSystemWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
};
// The FileSystemPtr is a wrapper class that takes pointer to storage systems
@@ -138,7 +138,7 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
const std::string& file_name)
: FSSequentialFileWrapper(t),
io_tracer_(io_tracer),
- clock_(SystemClock::Default()),
+ clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSSequentialFileTracingWrapper() override {}
@@ -154,7 +154,7 @@ class FSSequentialFileTracingWrapper : public FSSequentialFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
std::string file_name_;
};
@@ -210,7 +210,7 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
const std::string& file_name)
: FSRandomAccessFileWrapper(t),
io_tracer_(io_tracer),
- clock_(SystemClock::Default()),
+ clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSRandomAccessFileTracingWrapper() override {}
@@ -229,7 +229,7 @@ class FSRandomAccessFileTracingWrapper : public FSRandomAccessFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
};
@@ -285,7 +285,7 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
const std::string& file_name)
: FSWritableFileWrapper(t),
io_tracer_(io_tracer),
- clock_(SystemClock::Default()),
+ clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSWritableFileTracingWrapper() override {}
@@ -319,7 +319,7 @@ class FSWritableFileTracingWrapper : public FSWritableFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
};
@@ -382,7 +382,7 @@ class FSRandomRWFileTracingWrapper : public FSRandomRWFileWrapper {
const std::string& file_name)
: FSRandomRWFileWrapper(t),
io_tracer_(io_tracer),
- clock_(SystemClock::Default()),
+ clock_(SystemClock::Default().get()),
file_name_(file_name) {}
~FSRandomRWFileTracingWrapper() override {}
@@ -404,7 +404,7 @@ class FSRandomRWFileTracingWrapper : public FSRandomRWFileWrapper {
private:
std::shared_ptr<IOTracer> io_tracer_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// Stores file name instead of full path.
std::string file_name_;
};
diff --git a/env/fs_posix.cc b/env/fs_posix.cc
index 2b76088af..452131083 100644
--- a/env/fs_posix.cc
+++ b/env/fs_posix.cc
@@ -774,7 +774,9 @@ class PosixFileSystem : public FileSystem {
LockHoldingInfo lhi;
int64_t current_time = 0;
// Ignore status code as the time is only used for error message.
- Env::Default()->GetCurrentTime(&current_time).PermitUncheckedError();
+ SystemClock::Default()
+ ->GetCurrentTime(&current_time)
+ .PermitUncheckedError();
lhi.acquire_time = current_time;
lhi.acquiring_thread = Env::Default()->GetThreadID();
diff --git a/env/mock_env.cc b/env/mock_env.cc
index 9a917e21a..539f20e93 100644
--- a/env/mock_env.cc
+++ b/env/mock_env.cc
@@ -1033,26 +1033,43 @@ Status MockFileSystem::CorruptBuffer(const std::string& fname) {
iter->second->CorruptBuffer();
return Status::OK();
}
+namespace {
+class MockSystemClock : public SystemClockWrapper {
+ public:
+ explicit MockSystemClock(const std::shared_ptr<SystemClock>& c)
+ : SystemClockWrapper(c), fake_sleep_micros_(0) {}
-MockEnv::MockEnv(Env* base_env)
- : CompositeEnvWrapper(base_env, std::make_shared<MockFileSystem>(this)),
- fake_sleep_micros_(0) {}
+ void FakeSleepForMicroseconds(int64_t micros) {
+ fake_sleep_micros_.fetch_add(micros);
+ }
+
+ const char* Name() const override { return "MockSystemClock"; }
-Status MockEnv::GetCurrentTime(int64_t* unix_time) {
- auto s = CompositeEnvWrapper::GetCurrentTime(unix_time);
- if (s.ok()) {
- *unix_time += fake_sleep_micros_.load() / (1000 * 1000);
+ Status GetCurrentTime(int64_t* unix_time) override {
+ auto s = SystemClockWrapper::GetCurrentTime(unix_time);
+ if (s.ok()) {
+ auto fake_time = fake_sleep_micros_.load() / (1000 * 1000);
+ *unix_time += fake_time;
+ }
+ return s;
}
- return s;
-}
-uint64_t MockEnv::NowMicros() {
- return CompositeEnvWrapper::NowMicros() + fake_sleep_micros_.load();
-}
+ uint64_t NowMicros() override {
+ return SystemClockWrapper::NowMicros() + fake_sleep_micros_.load();
+ }
-uint64_t MockEnv::NowNanos() {
- return CompositeEnvWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
-}
+ uint64_t NowNanos() override {
+ return SystemClockWrapper::NowNanos() + fake_sleep_micros_.load() * 1000;
+ }
+
+ private:
+ std::atomic<int64_t> fake_sleep_micros_;
+};
+} // namespace
+MockEnv::MockEnv(Env* base_env)
+ : CompositeEnvWrapper(
+ base_env, std::make_shared<MockFileSystem>(this),
+ std::make_shared<MockSystemClock>(base_env->GetSystemClock())) {}
Status MockEnv::CorruptBuffer(const std::string& fname) {
auto mock = static_cast_with_check<MockFileSystem>(GetFileSystem().get());
@@ -1060,7 +1077,8 @@ Status MockEnv::CorruptBuffer(const std::string& fname) {
}
void MockEnv::FakeSleepForMicroseconds(int64_t micros) {
- fake_sleep_micros_.fetch_add(micros);
+ auto mock = static_cast_with_check<MockSystemClock>(GetSystemClock().get());
+ mock->FakeSleepForMicroseconds(micros);
}
#ifndef ROCKSDB_LITE
diff --git a/env/mock_env.h b/env/mock_env.h
index 24965849d..5e7faf55b 100644
--- a/env/mock_env.h
+++ b/env/mock_env.h
@@ -23,11 +23,6 @@ class MockEnv : public CompositeEnvWrapper {
public:
explicit MockEnv(Env* base_env);
- // Results of these can be affected by FakeSleepForMicroseconds()
- Status GetCurrentTime(int64_t* unix_time) override;
- uint64_t NowMicros() override;
- uint64_t NowNanos() override;
-
Status CorruptBuffer(const std::string& fname);
// Doesn't really sleep, just affects output of GetCurrentTime(), NowMicros()
@@ -35,7 +30,6 @@ class MockEnv : public CompositeEnvWrapper {
void FakeSleepForMicroseconds(int64_t micros);
private:
- std::atomic<int64_t> fake_sleep_micros_;
};
} // namespace ROCKSDB_NAMESPACE
diff --git a/file/delete_scheduler.cc b/file/delete_scheduler.cc
index 003bd2cc9..5f9994074 100644
--- a/file/delete_scheduler.cc
+++ b/file/delete_scheduler.cc
@@ -22,9 +22,8 @@
namespace ROCKSDB_NAMESPACE {
-DeleteScheduler::DeleteScheduler(const std::shared_ptr<SystemClock>& clock,
- FileSystem* fs, int64_t rate_bytes_per_sec,
- Logger* info_log,
+DeleteScheduler::DeleteScheduler(SystemClock* clock, FileSystem* fs,
+ int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager,
double max_trash_db_ratio,
uint64_t bytes_max_delete_chunk)
diff --git a/file/delete_scheduler.h b/file/delete_scheduler.h
index 9981d13bf..6d3f6b4a4 100644
--- a/file/delete_scheduler.h
+++ b/file/delete_scheduler.h
@@ -34,7 +34,7 @@ class SystemClock;
// case DeleteScheduler will delete files immediately.
class DeleteScheduler {
public:
- DeleteScheduler(const std::shared_ptr<SystemClock>& clock, FileSystem* fs,
+ DeleteScheduler(SystemClock* clock, FileSystem* fs,
int64_t rate_bytes_per_sec, Logger* info_log,
SstFileManagerImpl* sst_file_manager,
double max_trash_db_ratio, uint64_t bytes_max_delete_chunk);
@@ -101,7 +101,7 @@ class DeleteScheduler {
void MaybeCreateBackgroundThread();
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
FileSystem* fs_;
// total size of trash files
diff --git a/file/file_util.h b/file/file_util.h
index b015e81e5..48878833f 100644
--- a/file/file_util.h
+++ b/file/file_util.h
@@ -68,9 +68,8 @@ inline IOStatus GenerateOneFileChecksum(
allow_mmap_reads, io_tracer);
}
-inline IOStatus PrepareIOFromReadOptions(
- const ReadOptions& ro, const std::shared_ptr<SystemClock>& clock,
- IOOptions& opts) {
+inline IOStatus PrepareIOFromReadOptions(const ReadOptions& ro,
+ SystemClock* clock, IOOptions& opts) {
if (ro.deadline.count()) {
std::chrono::microseconds now =
std::chrono::microseconds(clock->NowMicros());
diff --git a/file/filename.cc b/file/filename.cc
index f1b943619..86aaba252 100644
--- a/file/filename.cc
+++ b/file/filename.cc
@@ -419,11 +419,11 @@ Status SetIdentityFile(Env* env, const std::string& dbname,
return s;
}
-IOStatus SyncManifest(const std::shared_ptr<SystemClock>& clock,
- const ImmutableDBOptions* db_options,
+IOStatus SyncManifest(const ImmutableDBOptions* db_options,
WritableFileWriter* file) {
TEST_KILL_RANDOM("SyncManifest:0", rocksdb_kill_odds * REDUCE_ODDS2);
- StopWatch sw(clock, db_options->statistics.get(), MANIFEST_FILE_SYNC_MICROS);
+ StopWatch sw(db_options->clock, db_options->statistics.get(),
+ MANIFEST_FILE_SYNC_MICROS);
return file->Sync(db_options->use_fsync);
}
diff --git a/file/filename.h b/file/filename.h
index 912ef6735..7f34ade28 100644
--- a/file/filename.h
+++ b/file/filename.h
@@ -167,8 +167,7 @@ extern Status SetIdentityFile(Env* env, const std::string& dbname,
const std::string& db_id = {});
// Sync manifest file `file`.
-extern IOStatus SyncManifest(const std::shared_ptr<SystemClock>& clock,
- const ImmutableDBOptions* db_options,
+extern IOStatus SyncManifest(const ImmutableDBOptions* db_options,
WritableFileWriter* file);
// Return list of file names of info logs in `file_names`.
diff --git a/file/random_access_file_reader.cc b/file/random_access_file_reader.cc
index 21d471f09..458b1bd00 100644
--- a/file/random_access_file_reader.cc
+++ b/file/random_access_file_reader.cc
@@ -326,10 +326,10 @@ Status RandomAccessFileReader::MultiRead(const IOOptions& opts,
IOStatus RandomAccessFileReader::PrepareIOOptions(const ReadOptions& ro,
IOOptions& opts) {
- if (clock_.get() != nullptr) {
+ if (clock_ != nullptr) {
return PrepareIOFromReadOptions(ro, clock_, opts);
} else {
- return PrepareIOFromReadOptions(ro, SystemClock::Default(), opts);
+ return PrepareIOFromReadOptions(ro, SystemClock::Default().get(), opts);
}
}
} // namespace ROCKSDB_NAMESPACE
diff --git a/file/random_access_file_reader.h b/file/random_access_file_reader.h
index 29106b5fc..cc931122b 100644
--- a/file/random_access_file_reader.h
+++ b/file/random_access_file_reader.h
@@ -67,7 +67,7 @@ class RandomAccessFileReader {
FSRandomAccessFilePtr file_;
std::string file_name_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
Statistics* stats_;
uint32_t hist_type_;
HistogramImpl* file_read_hist_;
@@ -77,7 +77,7 @@ class RandomAccessFileReader {
public:
explicit RandomAccessFileReader(
std::unique_ptr<FSRandomAccessFile>&& raf, const std::string& _file_name,
- const std::shared_ptr<SystemClock>& clock = nullptr,
+ SystemClock* clock = nullptr,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr, uint32_t hist_type = 0,
HistogramImpl* file_read_hist = nullptr,
diff --git a/file/random_access_file_reader_test.cc b/file/random_access_file_reader_test.cc
index 22857f8ef..b2d13b265 100644
--- a/file/random_access_file_reader_test.cc
+++ b/file/random_access_file_reader_test.cc
@@ -43,7 +43,7 @@ class RandomAccessFileReaderTest : public testing::Test {
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(fpath, opts, &f, nullptr));
(*reader).reset(new RandomAccessFileReader(std::move(f), fpath,
- env_->GetSystemClock()));
+ env_->GetSystemClock().get()));
}
void AssertResult(const std::string& content,
diff --git a/file/sst_file_manager_impl.cc b/file/sst_file_manager_impl.cc
index c53160e80..df037842b 100644
--- a/file/sst_file_manager_impl.cc
+++ b/file/sst_file_manager_impl.cc
@@ -31,8 +31,9 @@ SstFileManagerImpl::SstFileManagerImpl(
compaction_buffer_size_(0),
cur_compactions_reserved_size_(0),
max_allowed_space_(0),
- delete_scheduler_(clock_, fs_.get(), rate_bytes_per_sec, logger.get(),
- this, max_trash_db_ratio, bytes_max_delete_chunk),
+ delete_scheduler_(clock_.get(), fs_.get(), rate_bytes_per_sec,
+ logger.get(), this, max_trash_db_ratio,
+ bytes_max_delete_chunk),
cv_(&mu_),
closing_(false),
bg_thread_(nullptr),
diff --git a/file/writable_file_writer.h b/file/writable_file_writer.h
index 0556f7c77..e6894281c 100644
--- a/file/writable_file_writer.h
+++ b/file/writable_file_writer.h
@@ -123,7 +123,7 @@ class WritableFileWriter {
std::string file_name_;
FSWritableFilePtr writable_file_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
AlignedBuffer buf_;
size_t max_buffer_size_;
// Actually written data size can be used for truncate
@@ -148,8 +148,7 @@ class WritableFileWriter {
public:
WritableFileWriter(
std::unique_ptr<FSWritableFile>&& file, const std::string& _file_name,
- const FileOptions& options,
- const std::shared_ptr<SystemClock>& clock = nullptr,
+ const FileOptions& options, SystemClock* clock = nullptr,
const std::shared_ptr<IOTracer>& io_tracer = nullptr,
Statistics* stats = nullptr,
const std::vector<std::shared_ptr<EventListener>>& listeners = {},
diff --git a/include/rocksdb/metadata.h b/include/rocksdb/metadata.h
index 9a64a7a8f..ea7049842 100644
--- a/include/rocksdb/metadata.h
+++ b/include/rocksdb/metadata.h
@@ -117,13 +117,13 @@ struct SstFileMetaData {
// An SST file may be generated by compactions whose input files may
// in turn be generated by earlier compactions. The creation time of the
// oldest SST file that is the compaction ancester of this file.
- // The timestamp is provided Env::GetCurrentTime().
+ // The timestamp is provided SystemClock::GetCurrentTime().
// 0 if the information is not available.
//
// Note: for TTL blob files, it contains the start of the expiration range.
uint64_t oldest_ancester_time;
- // Timestamp when the SST file is created, provided by Env::GetCurrentTime().
- // 0 if the information is not available.
+ // Timestamp when the SST file is created, provided by
+ // SystemClock::GetCurrentTime(). 0 if the information is not available.
uint64_t file_creation_time;
// The checksum of a SST file, the value is decided by the file content and
diff --git a/logging/env_logger.h b/logging/env_logger.h
index eb0e55df8..e8e9f1abe 100644
--- a/logging/env_logger.h
+++ b/logging/env_logger.h
@@ -31,9 +31,10 @@ class EnvLogger : public Logger {
const std::string& fname, const EnvOptions& options, Env* env,
InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL)
: Logger(log_level),
- file_(std::move(writable_file), fname, options, env->GetSystemClock()),
- last_flush_micros_(0),
env_(env),
+ clock_(env_->GetSystemClock().get()),
+ file_(std::move(writable_file), fname, options, clock_),
+ last_flush_micros_(0),
flush_pending_(false) {}
~EnvLogger() {
@@ -50,7 +51,7 @@ class EnvLogger : public Logger {
flush_pending_ = false;
file_.Flush().PermitUncheckedError();
}
- last_flush_micros_ = env_->NowMicros();
+ last_flush_micros_ = clock_->NowMicros();
}
void Flush() override {
@@ -136,7 +137,7 @@ class EnvLogger : public Logger {
// We will ignore any error returned by Append().
file_.Append(Slice(base, p - base)).PermitUncheckedError();
flush_pending_ = true;
- const uint64_t now_micros = env_->NowMicros();
+ const uint64_t now_micros = clock_->NowMicros();
if (now_micros - last_flush_micros_ >= flush_every_seconds_ * 1000000) {
FlushLocked();
}
@@ -154,11 +155,12 @@ class EnvLogger : public Logger {
}
private:
+ Env* env_;
+ SystemClock* clock_;
WritableFileWriter file_;
mutable port::Mutex mutex_; // Mutex to protect the shared variables below.
const static uint64_t flush_every_seconds_ = 5;
std::atomic_uint_fast64_t last_flush_micros_;
- Env* env_;
std::atomic<bool> flush_pending_;
};
diff --git a/memtable/memtablerep_bench.cc b/memtable/memtablerep_bench.cc
index 68d6727fa..d476d03fb 100644
--- a/memtable/memtablerep_bench.cc
+++ b/memtable/memtablerep_bench.cc
@@ -418,7 +418,7 @@ class Benchmark {
uint64_t bytes_written = 0;
uint64_t bytes_read = 0;
uint64_t read_hits = 0;
- StopWatchNano timer(SystemClock::Default(), true);
+ StopWatchNano timer(SystemClock::Default().get(), true);
RunThreads(&threads, &bytes_written, &bytes_read, true, &read_hits);
auto elapsed_time = static_cast<double>(timer.ElapsedNanos() / 1000);
std::cout << "Elapsed time: " << static_cast<int>(elapsed_time) << " us"
diff --git a/monitoring/instrumented_mutex.cc b/monitoring/instrumented_mutex.cc
index 989c091c3..adca63f26 100644
--- a/monitoring/instrumented_mutex.cc
+++ b/monitoring/instrumented_mutex.cc
@@ -13,9 +13,8 @@
namespace ROCKSDB_NAMESPACE {
namespace {
#ifndef NPERF_CONTEXT
-Statistics* stats_for_report(const std::shared_ptr<SystemClock>& clock,
- Statistics* stats) {
- if (clock.get() != nullptr && stats != nullptr &&
+Statistics* stats_for_report(SystemClock* clock, Statistics* stats) {
+ if (clock != nullptr && stats != nullptr &&
stats->get_stats_level() > kExceptTimeForMutex) {
return stats;
} else {
diff --git a/monitoring/instrumented_mutex.h b/monitoring/instrumented_mutex.h
index bd68825cc..19af1b473 100644
--- a/monitoring/instrumented_mutex.h
+++ b/monitoring/instrumented_mutex.h
@@ -22,12 +22,10 @@ class InstrumentedMutex {
explicit InstrumentedMutex(bool adaptive = false)
: mutex_(adaptive), stats_(nullptr), clock_(nullptr), stats_code_(0) {}
- explicit InstrumentedMutex(const std::shared_ptr<SystemClock>& clock,
- bool adaptive = false)
+ explicit InstrumentedMutex(SystemClock* clock, bool adaptive = false)
: mutex_(adaptive), stats_(nullptr), clock_(clock), stats_code_(0) {}
- InstrumentedMutex(Statistics* stats,
- const std::shared_ptr<SystemClock>& clock, int stats_code,
+ InstrumentedMutex(Statistics* stats, SystemClock* clock, int stats_code,
bool adaptive = false)
: mutex_(adaptive),
stats_(stats),
@@ -49,7 +47,7 @@ class InstrumentedMutex {
friend class InstrumentedCondVar;
port::Mutex mutex_;
Statistics* stats_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
int stats_code_;
};
@@ -96,7 +94,7 @@ class InstrumentedCondVar {
bool TimedWaitInternal(uint64_t abs_time_us);
port::CondVar cond_;
Statistics* stats_;
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
int stats_code_;
};
diff --git a/monitoring/iostats_context_imp.h b/monitoring/iostats_context_imp.h
index c3a863eb5..92a3f7126 100644
--- a/monitoring/iostats_context_imp.h
+++ b/monitoring/iostats_context_imp.h
@@ -40,7 +40,7 @@ extern __thread IOStatsContext iostats_context;
// Declare and set start time of the timer
#define IOSTATS_CPU_TIMER_GUARD(metric, clock) \
PerfStepTimer iostats_step_timer_##metric( \
- &(iostats_context.metric), clock.get(), true, \
+ &(iostats_context.metric), clock, true, \
PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
iostats_step_timer_##metric.Start();
diff --git a/monitoring/perf_context_imp.h b/monitoring/perf_context_imp.h
index 7af09193f..b7a56adef 100644
--- a/monitoring/perf_context_imp.h
+++ b/monitoring/perf_context_imp.h
@@ -46,14 +46,14 @@ extern thread_local PerfContext perf_context;
perf_step_timer_##metric.Start();
// Declare and set start time of the timer
-#define PERF_TIMER_GUARD_WITH_CLOCK(metric, clock) \
- PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), clock.get()); \
+#define PERF_TIMER_GUARD_WITH_CLOCK(metric, clock) \
+ PerfStepTimer perf_step_timer_##metric(&(perf_context.metric), clock); \
perf_step_timer_##metric.Start();
// Declare and set start time of the timer
#define PERF_CPU_TIMER_GUARD(metric, clock) \
PerfStepTimer perf_step_timer_##metric( \
- &(perf_context.metric), clock.get(), true, \
+ &(perf_context.metric), clock, true, \
PerfLevel::kEnableTimeAndCPUTimeExceptForMutex); \
perf_step_timer_##metric.Start();
diff --git a/options/cf_options.cc b/options/cf_options.cc
index bde569b33..96ac51541 100644
--- a/options/cf_options.cc
+++ b/options/cf_options.cc
@@ -818,6 +818,7 @@ ImmutableCFOptions::ImmutableCFOptions(const ImmutableDBOptions& db_options,
info_log_level(db_options.info_log_level),
env(db_options.env),
fs(db_options.fs.get()),
+ clock(db_options.clock),
allow_mmap_reads(db_options.allow_mmap_reads),
allow_mmap_writes(db_options.allow_mmap_writes),
db_paths(db_options.db_paths),
diff --git a/options/cf_options.h b/options/cf_options.h
index 7bbea71e4..3aae6c8c4 100644
--- a/options/cf_options.h
+++ b/options/cf_options.h
@@ -64,6 +64,8 @@ struct ImmutableCFOptions {
FileSystem* fs;
+ SystemClock* clock;
+
// Allow the OS to mmap file for reading sst tables. Default: false
bool allow_mmap_reads;
diff --git a/options/db_options.cc b/options/db_options.cc
index 9eb5f8f0b..69596bf1f 100644
--- a/options/db_options.cc
+++ b/options/db_options.cc
@@ -17,6 +17,7 @@
#include "rocksdb/file_system.h"
#include "rocksdb/rate_limiter.h"
#include "rocksdb/sst_file_manager.h"
+#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/options_type.h"
#include "rocksdb/wal_filter.h"
#include "util/string_util.h"
@@ -582,6 +583,11 @@ ImmutableDBOptions::ImmutableDBOptions(const DBOptions& options)
allow_data_in_errors(options.allow_data_in_errors),
db_host_id(options.db_host_id),
checksum_handoff_file_types(options.checksum_handoff_file_types) {
+ if (env != nullptr) {
+ clock = env->GetSystemClock().get();
+ } else {
+ clock = SystemClock::Default().get();
+ }
}
void ImmutableDBOptions::Dump(Logger* log) const {
diff --git a/options/db_options.h b/options/db_options.h
index a454e42b0..b7979dc86 100644
--- a/options/db_options.h
+++ b/options/db_options.h
@@ -11,6 +11,7 @@
#include "rocksdb/options.h"
namespace ROCKSDB_NAMESPACE {
+class SystemClock;
struct ImmutableDBOptions {
static const char* kName() { return "ImmutableDBOptions"; }
@@ -26,6 +27,7 @@ struct ImmutableDBOptions {
bool track_and_verify_wals_in_manifest;
Env* env;
std::shared_ptr<FileSystem> fs;
+ SystemClock* clock;
std::shared_ptr<RateLimiter> rate_limiter;
std::shared_ptr<SstFileManager> sst_file_manager;
std::shared_ptr<Logger> info_log;
diff --git a/port/win/env_win.cc b/port/win/env_win.cc
index c28144938..408a84aae 100644
--- a/port/win/env_win.cc
+++ b/port/win/env_win.cc
@@ -1059,7 +1059,7 @@ IOStatus WinFileSystem::NewLogger(const std::string& fname,
// Set creation, last access and last write time to the same value
SetFileTime(hFile, &ft, &ft, &ft);
}
- result->reset(new WinLogger(&WinEnvThreads::gettid, clock_, hFile));
+ result->reset(new WinLogger(&WinEnvThreads::gettid, clock_.get(), hFile));
}
return s;
}
diff --git a/port/win/win_logger.cc b/port/win/win_logger.cc
index c1ca97717..a45f3c6d4 100644
--- a/port/win/win_logger.cc
+++ b/port/win/win_logger.cc
@@ -32,8 +32,7 @@ namespace ROCKSDB_NAMESPACE {
namespace port {
-WinLogger::WinLogger(uint64_t (*gettid)(),
- const std::shared_ptr<SystemClock>& clock, HANDLE file,
+WinLogger::WinLogger(uint64_t (*gettid)(), SystemClock* clock, HANDLE file,
const InfoLogLevel log_level)
: Logger(log_level),
file_(file),
diff --git a/port/win/win_logger.h b/port/win/win_logger.h
index 7cd39f6c9..809c7d5a2 100644
--- a/port/win/win_logger.h
+++ b/port/win/win_logger.h
@@ -26,8 +26,7 @@ class SystemClock;
namespace port {
class WinLogger : public ROCKSDB_NAMESPACE::Logger {
public:
- WinLogger(uint64_t (*gettid)(), const std::shared_ptr<SystemClock>& clock,
- HANDLE file,
+ WinLogger(uint64_t (*gettid)(), SystemClock* clock, HANDLE file,
const InfoLogLevel log_level = InfoLogLevel::ERROR_LEVEL);
virtual ~WinLogger();
@@ -54,7 +53,7 @@ protected:
uint64_t (*gettid_)(); // Return the thread id for the current thread
std::atomic_size_t log_size_;
std::atomic_uint_fast64_t last_flush_micros_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
bool flush_pending_;
Status CloseInternal();
diff --git a/table/block_based/block_based_table_builder.cc b/table/block_based/block_based_table_builder.cc
index bc7c113c6..c84444923 100644
--- a/table/block_based/block_based_table_builder.cc
+++ b/table/block_based/block_based_table_builder.cc
@@ -1083,7 +1083,7 @@ void BlockBasedTableBuilder::CompressAndVerifyBlock(
bool abort_compression = false;
StopWatchNano timer(
- r->ioptions.env->GetSystemClock(),
+ r->ioptions.clock,
ShouldReportDetailedTime(r->ioptions.env, r->ioptions.statistics));
if (is_status_ok && raw_block_contents.size() < kCompressionSizeLimit) {
@@ -1177,7 +1177,7 @@ void BlockBasedTableBuilder::WriteRawBlock(const Slice& block_contents,
Rep* r = rep_;
Status s = Status::OK();
IOStatus io_s = IOStatus::OK();
- StopWatch sw(r->ioptions.env->GetSystemClock(), r->ioptions.statistics,
+ StopWatch sw(r->ioptions.clock, r->ioptions.statistics,
WRITE_RAW_BLOCK_MICROS);
handle->set_offset(r->get_offset());
handle->set_size(block_contents.size());
diff --git a/table/block_based/block_based_table_reader.cc b/table/block_based/block_based_table_reader.cc
index e210455fa..30593aaa2 100644
--- a/table/block_based/block_based_table_reader.cc
+++ b/table/block_based/block_based_table_reader.cc
@@ -1501,7 +1501,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
CompressionType raw_block_comp_type;
BlockContents raw_block_contents;
if (!contents) {
- StopWatch sw(rep_->clock, statistics, READ_BLOCK_GET_MICROS);
+ StopWatch sw(rep_->ioptions.clock, statistics, READ_BLOCK_GET_MICROS);
BlockFetcher block_fetcher(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle,
&raw_block_contents, rep_->ioptions, do_uncompress,
@@ -1590,7 +1590,7 @@ Status BlockBasedTable::MaybeReadBlockAndLoadToCache(
// Avoid making copy of block_key and cf_name when constructing the access
// record.
BlockCacheTraceRecord access_record(
- rep_->clock->NowMicros(),
+ rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", trace_block_type,
/*block_size=*/usage, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
@@ -1935,7 +1935,8 @@ Status BlockBasedTable::RetrieveBlock(
std::unique_ptr<TBlocklike> block;
{
- StopWatch sw(rep_->clock, rep_->ioptions.statistics, READ_BLOCK_GET_MICROS);
+ StopWatch sw(rep_->ioptions.clock, rep_->ioptions.statistics,
+ READ_BLOCK_GET_MICROS);
s = ReadBlockFromFile(
rep_->file.get(), prefetch_buffer, rep_->footer, ro, handle, &block,
rep_->ioptions, do_uncompress, maybe_compressed, block_type,
@@ -2427,7 +2428,7 @@ Status BlockBasedTable::Get(const ReadOptions& read_options, const Slice& key,
referenced_key = key;
}
BlockCacheTraceRecord access_record(
- rep_->clock->NowMicros(),
+ rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", lookup_data_block_context.block_type,
lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
@@ -2763,7 +2764,7 @@ void BlockBasedTable::MultiGet(const ReadOptions& read_options,
referenced_key = key;
}
BlockCacheTraceRecord access_record(
- rep_->clock->NowMicros(),
+ rep_->ioptions.clock->NowMicros(),
/*block_key=*/"", lookup_data_block_context.block_type,
lookup_data_block_context.block_size, rep_->cf_id_for_tracing(),
/*cf_name=*/"", rep_->level_for_tracing(),
diff --git a/table/block_based/block_based_table_reader.h b/table/block_based/block_based_table_reader.h
index 1cb978e0d..a5e563b34 100644
--- a/table/block_based/block_based_table_reader.h
+++ b/table/block_based/block_based_table_reader.h
@@ -32,7 +32,6 @@ class Footer;
class InternalKeyComparator;
class Iterator;
class FSRandomAccessFile;
-class SystemClock;
class TableCache;
class TableReader;
class WritableFile;
@@ -521,7 +520,6 @@ struct BlockBasedTable::Rep {
file_size(_file_size),
level(_level),
immortal_table(_immortal_table) {
- clock = ioptions.env->GetSystemClock();
}
~Rep() { status.PermitUncheckedError(); }
const ImmutableCFOptions& ioptions;
@@ -529,7 +527,6 @@ struct BlockBasedTable::Rep {
const BlockBasedTableOptions table_options;
const FilterPolicy* const filter_policy;
const InternalKeyComparator& internal_comparator;
- std::shared_ptr<SystemClock> clock;
Status status;
std::unique_ptr<RandomAccessFileReader> file;
char cache_key_prefix[kMaxCacheKeyPrefixSize];
diff --git a/table/block_based/block_based_table_reader_test.cc b/table/block_based/block_based_table_reader_test.cc
index 36ec3c4d3..30b979d38 100644
--- a/table/block_based/block_based_table_reader_test.cc
+++ b/table/block_based/block_based_table_reader_test.cc
@@ -135,8 +135,8 @@ class BlockBasedTableReaderTest
std::string path = Path(filename);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
- reader->reset(
- new RandomAccessFileReader(std::move(f), path, env_->GetSystemClock()));
+ reader->reset(new RandomAccessFileReader(std::move(f), path,
+ env_->GetSystemClock().get()));
}
std::string ToInternalKey(const std::string& key) {
diff --git a/table/block_fetcher_test.cc b/table/block_fetcher_test.cc
index ed52a0c99..9eba26beb 100644
--- a/table/block_fetcher_test.cc
+++ b/table/block_fetcher_test.cc
@@ -267,8 +267,8 @@ class BlockFetcherTest : public testing::Test {
std::string path = Path(filename);
std::unique_ptr<FSRandomAccessFile> f;
ASSERT_OK(fs_->NewRandomAccessFile(path, opt, &f, nullptr));
- reader->reset(
- new RandomAccessFileReader(std::move(f), path, env_->GetSystemClock()));
+ reader->reset(new RandomAccessFileReader(std::move(f), path,
+ env_->GetSystemClock().get()));
}
void NewTableReader(const ImmutableCFOptions& ioptions,
diff --git a/table/format.cc b/table/format.cc
index 66acb8837..4bfadac7e 100644
--- a/table/format.cc
+++ b/table/format.cc
@@ -353,9 +353,8 @@ Status UncompressBlockContentsForCompressionType(
assert(uncompression_info.type() != kNoCompression &&
"Invalid compression type");
- StopWatchNano timer(
- ioptions.env->GetSystemClock(),
- ShouldReportDetailedTime(ioptions.env, ioptions.statistics));
+ StopWatchNano timer(ioptions.clock, ShouldReportDetailedTime(
+ ioptions.env, ioptions.statistics));
size_t uncompressed_size = 0;
CacheAllocationPtr ubuf =
UncompressData(uncompression_info, data, n, &uncompressed_size,
diff --git a/table/get_context.cc b/table/get_context.cc
index 1c5e12c83..948c21b36 100644
--- a/table/get_context.cc
+++ b/table/get_context.cc
@@ -44,10 +44,9 @@ GetContext::GetContext(
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, std::string* timestamp, bool* value_found,
MergeContext* merge_context, bool do_merge,
- SequenceNumber* _max_covering_tombstone_seq,
- const std::shared_ptr<SystemClock>& clock, SequenceNumber* seq,
- PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
- bool* is_blob_index, uint64_t tracing_get_id)
+ SequenceNumber* _max_covering_tombstone_seq, SystemClock* clock,
+ SequenceNumber* seq, PinnedIteratorsManager* _pinned_iters_mgr,
+ ReadCallback* callback, bool* is_blob_index, uint64_t tracing_get_id)
: ucmp_(ucmp),
merge_operator_(merge_operator),
logger_(logger),
@@ -78,7 +77,7 @@ GetContext::GetContext(
Statistics* statistics, GetState init_state, const Slice& user_key,
PinnableSlice* pinnable_val, bool* value_found, MergeContext* merge_context,
bool do_merge, SequenceNumber* _max_covering_tombstone_seq,
- const std::shared_ptr<SystemClock>& clock, SequenceNumber* seq,
+ SystemClock* clock, SequenceNumber* seq,
PinnedIteratorsManager* _pinned_iters_mgr, ReadCallback* callback,
bool* is_blob_index, uint64_t tracing_get_id)
: GetContext(ucmp, merge_operator, logger, statistics, init_state, user_key,
diff --git a/table/get_context.h b/table/get_context.h
index 34434e10d..97ee9d751 100644
--- a/table/get_context.h
+++ b/table/get_context.h
@@ -99,8 +99,7 @@ class GetContext {
Logger* logger, Statistics* statistics, GetState init_state,
const Slice& user_key, PinnableSlice* value, bool* value_found,
MergeContext* merge_context, bool do_merge,
- SequenceNumber* max_covering_tombstone_seq,
- const std::shared_ptr<SystemClock>& clock,
+ SequenceNumber* max_covering_tombstone_seq, SystemClock* clock,
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
@@ -110,8 +109,7 @@ class GetContext {
const Slice& user_key, PinnableSlice* value,
std::string* timestamp, bool* value_found,
MergeContext* merge_context, bool do_merge,
- SequenceNumber* max_covering_tombstone_seq,
- const std::shared_ptr<SystemClock>& clock,
+ SequenceNumber* max_covering_tombstone_seq, SystemClock* clock,
SequenceNumber* seq = nullptr,
PinnedIteratorsManager* _pinned_iters_mgr = nullptr,
ReadCallback* callback = nullptr, bool* is_blob_index = nullptr,
@@ -185,7 +183,7 @@ class GetContext {
bool* value_found_; // Is value set correctly? Used by KeyMayExist
MergeContext* merge_context_;
SequenceNumber* max_covering_tombstone_seq_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// If a key is found, seq_ will be set to the SequenceNumber of most recent
// write to the key or kMaxSequenceNumber if unknown
SequenceNumber* seq_;
diff --git a/table/sst_file_writer.cc b/table/sst_file_writer.cc
index b4d021ec3..bc4361aeb 100644
--- a/table/sst_file_writer.cc
+++ b/table/sst_file_writer.cc
@@ -259,9 +259,8 @@ Status SstFileWriter::Open(const std::string& file_path) {
0 /* file_creation_time */, "SST Writer" /* db_id */, db_session_id);
FileTypeSet tmp_set = r->ioptions.checksum_handoff_file_types;
r->file_writer.reset(new WritableFileWriter(
- std::move(sst_file), file_path, r->env_options,
- r->ioptions.env->GetSystemClock(), nullptr /* io_tracer */,
- nullptr /* stats */, r->ioptions.listeners,
+ std::move(sst_file), file_path, r->env_options, r->ioptions.clock,
+ nullptr /* io_tracer */, nullptr /* stats */, r->ioptions.listeners,
r->ioptions.file_checksum_gen_factory,
tmp_set.Contains(FileType::kTableFile)));
diff --git a/table/table_reader_bench.cc b/table/table_reader_bench.cc
index 9982c5748..079029108 100644
--- a/table/table_reader_bench.cc
+++ b/table/table_reader_bench.cc
@@ -51,8 +51,7 @@ static std::string MakeKey(int i, int j, bool through_db) {
return key.Encode().ToString();
}
-uint64_t Now(const std::shared_ptr<SystemClock>& clock,
- bool measured_by_nanosecond) {
+uint64_t Now(SystemClock* clock, bool measured_by_nanosecond) {
return measured_by_nanosecond ? clock->NowNanos() : clock->NowMicros();
}
} // namespace
@@ -83,7 +82,7 @@ void TableReaderBenchmark(Options& opts, EnvOptions& env_options,
std::string dbname = test::PerThreadDBPath("rocksdb_table_reader_bench_db");
WriteOptions wo;
Env* env = Env::Default();
- const auto& clock = env->GetSystemClock();
+ auto* clock = env->GetSystemClock().get();
TableBuilder* tb = nullptr;
DB* db = nullptr;
Status s;
diff --git a/table/table_test.cc b/table/table_test.cc
index e1184f31b..74147ecbb 100644
--- a/table/table_test.cc
+++ b/table/table_test.cc
@@ -1140,7 +1140,8 @@ class BlockBasedTableTest
&trace_writer));
// Always return Status::OK().
assert(c->block_cache_tracer_
- .StartTrace(env_, trace_opt, std::move(trace_writer))
+ .StartTrace(env_->GetSystemClock().get(), trace_opt,
+ std::move(trace_writer))
.ok());
{
std::string user_key = "k01";
diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc
index bffd480b4..29ec8cb91 100644
--- a/tools/block_cache_analyzer/block_cache_trace_analyzer.cc
+++ b/tools/block_cache_analyzer/block_cache_trace_analyzer.cc
@@ -19,6 +19,7 @@
#include <sstream>
#include "monitoring/histogram.h"
+#include "rocksdb/system_clock.h"
#include "util/gflags_compat.h"
#include "util/string_util.h"
@@ -1519,6 +1520,7 @@ Status BlockCacheTraceAnalyzer::RecordAccess(
}
Status BlockCacheTraceAnalyzer::Analyze() {
+ SystemClock* clock = env_->GetSystemClock().get();
std::unique_ptr<BlockCacheTraceReader> reader;
Status s = Status::OK();
if (is_human_readable_trace_file_) {
@@ -1542,7 +1544,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
return s;
}
}
- uint64_t start = env_->NowMicros();
+ uint64_t start = clock->NowMicros();
uint64_t time_interval = 0;
while (s.ok()) {
BlockCacheTraceRecord access;
@@ -1568,7 +1570,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
cache_simulator_->Access(access);
}
access_sequence_number_++;
- uint64_t now = env_->NowMicros();
+ uint64_t now = clock->NowMicros();
uint64_t duration = (now - start) / kMicrosInSecond;
if (duration > 10 * time_interval) {
uint64_t trace_duration =
@@ -1582,7 +1584,7 @@ Status BlockCacheTraceAnalyzer::Analyze() {
time_interval++;
}
}
- uint64_t now = env_->NowMicros();
+ uint64_t now = clock->NowMicros();
uint64_t duration = (now - start) / kMicrosInSecond;
uint64_t trace_duration =
trace_end_timestamp_in_seconds_ - trace_start_timestamp_in_seconds_;
diff --git a/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
index 442a6370e..84e2fe521 100644
--- a/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
+++ b/tools/block_cache_analyzer/block_cache_trace_analyzer_test.cc
@@ -225,7 +225,9 @@ TEST_F(BlockCacheTracerTest, BlockCacheAnalyzer) {
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
- BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
+ const auto& clock = env_->GetSystemClock();
+ BlockCacheTraceWriter writer(clock.get(), trace_opt,
+ std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceDataBlock, 50);
ASSERT_OK(env_->FileExists(trace_file_path_));
@@ -610,9 +612,11 @@ TEST_F(BlockCacheTracerTest, MixedBlocks) {
// kSSTStoringEvenKeys.
TraceOptions trace_opt;
std::unique_ptr<TraceWriter> trace_writer;
+ const auto& clock = env_->GetSystemClock();
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
- BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
+ BlockCacheTraceWriter writer(clock.get(), trace_opt,
+ std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
// Write blocks of different types.
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock,
diff --git a/tools/db_bench_tool.cc b/tools/db_bench_tool.cc
index 871269521..9630b779c 100644
--- a/tools/db_bench_tool.cc
+++ b/tools/db_bench_tool.cc
@@ -1854,7 +1854,8 @@ class ReporterAgent {
private:
std::string Header() const { return "secs_elapsed,interval_qps"; }
void SleepAndReport() {
- auto time_started = env_->NowMicros();
+ auto* clock = env_->GetSystemClock().get();
+ auto time_started = clock->NowMicros();
while (true) {
{
std::unique_lock<std::mutex> lk(mutex_);
@@ -1869,7 +1870,7 @@ class ReporterAgent {
auto total_ops_done_snapshot = total_ops_done_.load();
// round the seconds elapsed
auto secs_elapsed =
- (env_->NowMicros() - time_started + kMicrosInSecond / 2) /
+ (clock->NowMicros() - time_started + kMicrosInSecond / 2) /
kMicrosInSecond;
std::string report = ToString(secs_elapsed) + "," +
ToString(total_ops_done_snapshot - last_report_) +
@@ -1932,6 +1933,7 @@ static std::unordered_map<OperationType, std::string, std::hash<unsigned char>>
class CombinedStats;
class Stats {
private:
+ SystemClock* clock_;
int id_;
uint64_t start_ = 0;
uint64_t sine_interval_;
@@ -1951,7 +1953,7 @@ class Stats {
friend class CombinedStats;
public:
- Stats() { Start(-1); }
+ Stats() : clock_(FLAGS_env->GetSystemClock().get()) { Start(-1); }
void SetReporterAgent(ReporterAgent* reporter_agent) {
reporter_agent_ = reporter_agent;
@@ -1966,8 +1968,8 @@ class Stats {
last_report_done_ = 0;
bytes_ = 0;
seconds_ = 0;
- start_ = FLAGS_env->NowMicros();
- sine_interval_ = FLAGS_env->NowMicros();
+ start_ = clock_->NowMicros();
+ sine_interval_ = clock_->NowMicros();
finish_ = start_;
last_report_finish_ = start_;
message_.clear();
@@ -1999,7 +2001,7 @@ class Stats {
}
void Stop() {
- finish_ = FLAGS_env->NowMicros();
+ finish_ = clock_->NowMicros();
seconds_ = (finish_ - start_) * 1e-6;
}
@@ -2019,7 +2021,7 @@ class Stats {
"ElapsedTime", "Stage", "State", "OperationProperties");
int64_t current_time = 0;
- FLAGS_env->GetCurrentTime(&current_time);
+ clock_->GetCurrentTime(&current_time).PermitUncheckedError();
for (auto ts : thread_list) {
fprintf(stderr, "%18" PRIu64 " %10s %12s %20s %13s %45s %12s",
ts.thread_id,
@@ -2040,9 +2042,7 @@ class Stats {
}
}
- void ResetSineInterval() {
- sine_interval_ = FLAGS_env->NowMicros();
- }
+ void ResetSineInterval() { sine_interval_ = clock_->NowMicros(); }
uint64_t GetSineInterval() {
return sine_interval_;
@@ -2054,7 +2054,7 @@ class Stats {
void ResetLastOpTime() {
// Set to now to avoid latency from calls to SleepForMicroseconds
- last_op_finish_ = FLAGS_env->NowMicros();
+ last_op_finish_ = clock_->NowMicros();
}
void FinishedOps(DBWithColumnFamilies* db_with_cfh, DB* db, int64_t num_ops,
@@ -2063,7 +2063,7 @@ class Stats {
reporter_agent_->ReportFinishedOps(num_ops);
}
if (FLAGS_histogram) {
- uint64_t now = FLAGS_env->NowMicros();
+ uint64_t now = clock_->NowMicros();
uint64_t micros = now - last_op_finish_;
if (hist_.find(op_type) == hist_.end())
@@ -2092,7 +2092,7 @@ class Stats {
else next_report_ += 100000;
fprintf(stderr, "... finished %" PRIu64 " ops%30s\r", done_, "");
} else {
- uint64_t now = FLAGS_env->NowMicros();
+ uint64_t now = clock_->NowMicros();
int64_t usecs_since_last = now - last_report_finish_;
// Determine whether to print status where interval is either
@@ -2104,15 +2104,13 @@ class Stats {
next_report_ += FLAGS_stats_interval;
} else {
-
fprintf(stderr,
- "%s ... thread %d: (%" PRIu64 ",%" PRIu64 ") ops and "
+ "%s ... thread %d: (%" PRIu64 ",%" PRIu64
+ ") ops and "
"(%.1f,%.1f) ops/second in (%.6f,%.6f) seconds\n",
- FLAGS_env->TimeToString(now/1000000).c_str(),
- id_,
+ clock_->TimeToString(now / 1000000).c_str(), id_,
done_ - last_report_done_, done_,
- (done_ - last_report_done_) /
- (usecs_since_last / 1000000.0),
+ (done_ - last_report_done_) / (usecs_since_last / 1000000.0),
done_ / ((now - start_) / 1000000.0),
(now - last_report_finish_) / 1000000.0,
(now - start_) / 1000000.0);
diff --git a/trace_replay/block_cache_tracer.cc b/trace_replay/block_cache_tracer.cc
index a7bd39278..b9c7477fd 100644
--- a/trace_replay/block_cache_tracer.cc
+++ b/trace_replay/block_cache_tracer.cc
@@ -99,9 +99,9 @@ uint64_t BlockCacheTraceHelper::GetBlockOffsetInFile(
}
BlockCacheTraceWriter::BlockCacheTraceWriter(
- Env* env, const TraceOptions& trace_options,
+ SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
- : env_(env),
+ : clock_(clock),
trace_options_(trace_options),
trace_writer_(std::move(trace_writer)) {}
@@ -142,7 +142,7 @@ Status BlockCacheTraceWriter::WriteBlockAccess(
Status BlockCacheTraceWriter::WriteHeader() {
Trace trace;
- trace.ts = env_->NowMicros();
+ trace.ts = clock_->NowMicros();
trace.type = TraceType::kTraceBegin;
PutLengthPrefixedSlice(&trace.payload, kTraceMagic);
PutFixed32(&trace.payload, kMajorVersion);
@@ -444,7 +444,7 @@ BlockCacheTracer::BlockCacheTracer() { writer_.store(nullptr); }
BlockCacheTracer::~BlockCacheTracer() { EndTrace(); }
Status BlockCacheTracer::StartTrace(
- Env* env, const TraceOptions& trace_options,
+ SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
if (writer_.load()) {
@@ -453,7 +453,7 @@ Status BlockCacheTracer::StartTrace(
get_id_counter_.store(1);
trace_options_ = trace_options;
writer_.store(
- new BlockCacheTraceWriter(env, trace_options, std::move(trace_writer)));
+ new BlockCacheTraceWriter(clock, trace_options, std::move(trace_writer)));
return writer_.load()->WriteHeader();
}
diff --git a/trace_replay/block_cache_tracer.h b/trace_replay/block_cache_tracer.h
index 9dc62b7fb..23672e1df 100644
--- a/trace_replay/block_cache_tracer.h
+++ b/trace_replay/block_cache_tracer.h
@@ -9,13 +9,14 @@
#include <fstream>
#include "monitoring/instrumented_mutex.h"
-#include "rocksdb/env.h"
#include "rocksdb/options.h"
#include "rocksdb/trace_reader_writer.h"
#include "table/table_reader_caller.h"
#include "trace_replay/trace_replay.h"
namespace ROCKSDB_NAMESPACE {
+class Env;
+class SystemClock;
extern const uint64_t kMicrosInSecond;
extern const uint64_t kSecondInMinute;
@@ -172,7 +173,7 @@ struct BlockCacheTraceHeader {
// payload.
class BlockCacheTraceWriter {
public:
- BlockCacheTraceWriter(Env* env, const TraceOptions& trace_options,
+ BlockCacheTraceWriter(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
~BlockCacheTraceWriter() = default;
// No copy and move.
@@ -191,7 +192,7 @@ class BlockCacheTraceWriter {
Status WriteHeader();
private:
- Env* env_;
+ SystemClock* clock_;
TraceOptions trace_options_;
std::unique_ptr<TraceWriter> trace_writer_;
};
@@ -266,7 +267,7 @@ class BlockCacheTracer {
BlockCacheTracer& operator=(BlockCacheTracer&&) = delete;
// Start writing block cache accesses to the trace_writer.
- Status StartTrace(Env* env, const TraceOptions& trace_options,
+ Status StartTrace(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
// Stop writing block cache accesses to the trace_writer.
diff --git a/trace_replay/block_cache_tracer_test.cc b/trace_replay/block_cache_tracer_test.cc
index bccf6ef61..01b834ed0 100644
--- a/trace_replay/block_cache_tracer_test.cc
+++ b/trace_replay/block_cache_tracer_test.cc
@@ -28,6 +28,7 @@ class BlockCacheTracerTest : public testing::Test {
BlockCacheTracerTest() {
test_path_ = test::PerThreadDBPath("block_cache_tracer_test");
env_ = ROCKSDB_NAMESPACE::Env::Default();
+ clock_ = env_->GetSystemClock().get();
EXPECT_OK(env_->CreateDir(test_path_));
trace_file_path_ = test_path_ + "/block_cache_trace";
}
@@ -64,7 +65,7 @@ class BlockCacheTracerTest : public testing::Test {
record.block_type = block_type;
record.block_size = kBlockSize + key_id;
record.block_key = (kBlockKeyPrefix + std::to_string(key_id));
- record.access_timestamp = env_->NowMicros();
+ record.access_timestamp = clock_->NowMicros();
record.cf_id = kCFId;
record.cf_name = kDefaultColumnFamilyName;
record.caller = GetCaller(key_id);
@@ -94,7 +95,7 @@ class BlockCacheTracerTest : public testing::Test {
record.block_type = TraceType::kBlockTraceDataBlock;
record.block_size = kBlockSize;
record.block_key = kBlockKeyPrefix + std::to_string(key_id);
- record.access_timestamp = env_->NowMicros();
+ record.access_timestamp = clock_->NowMicros();
record.cf_id = kCFId;
record.cf_name = kDefaultColumnFamilyName;
record.caller = GetCaller(key_id);
@@ -151,6 +152,7 @@ class BlockCacheTracerTest : public testing::Test {
}
Env* env_;
+ SystemClock* clock_;
EnvOptions env_options_;
std::string trace_file_path_;
std::string test_path_;
@@ -188,7 +190,7 @@ TEST_F(BlockCacheTracerTest, AtomicWrite) {
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTracer writer;
- ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+ ASSERT_OK(writer.StartTrace(clock_, trace_opt, std::move(trace_writer)));
ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
record.referenced_key));
ASSERT_OK(env_->FileExists(trace_file_path_));
@@ -214,8 +216,8 @@ TEST_F(BlockCacheTracerTest, ConsecutiveStartTrace) {
ASSERT_OK(
NewFileTraceWriter(env_, env_options_, trace_file_path_, &trace_writer));
BlockCacheTracer writer;
- ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
- ASSERT_NOK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+ ASSERT_OK(writer.StartTrace(clock_, trace_opt, std::move(trace_writer)));
+ ASSERT_NOK(writer.StartTrace(clock_, trace_opt, std::move(trace_writer)));
ASSERT_OK(env_->FileExists(trace_file_path_));
}
@@ -227,7 +229,7 @@ TEST_F(BlockCacheTracerTest, AtomicNoWriteAfterEndTrace) {
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
BlockCacheTracer writer;
- ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+ ASSERT_OK(writer.StartTrace(clock_, trace_opt, std::move(trace_writer)));
ASSERT_OK(writer.WriteBlockAccess(record, record.block_key, record.cf_name,
record.referenced_key));
writer.EndTrace();
@@ -262,7 +264,7 @@ TEST_F(BlockCacheTracerTest, NextGetId) {
// next get id should always return 0 before we call StartTrace.
ASSERT_EQ(0, writer.NextGetId());
ASSERT_EQ(0, writer.NextGetId());
- ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+ ASSERT_OK(writer.StartTrace(clock_, trace_opt, std::move(trace_writer)));
ASSERT_EQ(1, writer.NextGetId());
ASSERT_EQ(2, writer.NextGetId());
writer.EndTrace();
@@ -276,7 +278,7 @@ TEST_F(BlockCacheTracerTest, NextGetId) {
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
- ASSERT_OK(writer.StartTrace(env_, trace_opt, std::move(trace_writer)));
+ ASSERT_OK(writer.StartTrace(clock_, trace_opt, std::move(trace_writer)));
ASSERT_EQ(1, writer.NextGetId());
}
}
@@ -288,7 +290,7 @@ TEST_F(BlockCacheTracerTest, MixedBlocks) {
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
- BlockCacheTraceWriter writer(env_, trace_opt, std::move(trace_writer));
+ BlockCacheTraceWriter writer(clock_, trace_opt, std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
// Write blocks of different types.
WriteBlockAccess(&writer, 0, TraceType::kBlockTraceUncompressionDictBlock,
diff --git a/trace_replay/io_tracer.cc b/trace_replay/io_tracer.cc
index c9eda4b82..5d57b8396 100644
--- a/trace_replay/io_tracer.cc
+++ b/trace_replay/io_tracer.cc
@@ -19,7 +19,7 @@
#include "util/string_util.h"
namespace ROCKSDB_NAMESPACE {
-IOTraceWriter::IOTraceWriter(const std::shared_ptr<SystemClock>& clock,
+IOTraceWriter::IOTraceWriter(SystemClock* clock,
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: clock_(clock),
@@ -220,7 +220,7 @@ IOTracer::IOTracer() : tracing_enabled(false) { writer_.store(nullptr); }
IOTracer::~IOTracer() { EndIOTrace(); }
-Status IOTracer::StartIOTrace(const std::shared_ptr<SystemClock>& clock,
+Status IOTracer::StartIOTrace(SystemClock* clock,
const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer) {
InstrumentedMutexLock lock_guard(&trace_writer_mutex_);
diff --git a/trace_replay/io_tracer.h b/trace_replay/io_tracer.h
index bdf346814..07355201d 100644
--- a/trace_replay/io_tracer.h
+++ b/trace_replay/io_tracer.h
@@ -93,8 +93,7 @@ struct IOTraceHeader {
// timestamp and type, followed by the trace payload.
class IOTraceWriter {
public:
- IOTraceWriter(const std::shared_ptr<SystemClock>& clock,
- const TraceOptions& trace_options,
+ IOTraceWriter(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
~IOTraceWriter() = default;
// No copy and move.
@@ -110,7 +109,7 @@ class IOTraceWriter {
Status WriteHeader();
private:
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
TraceOptions trace_options_;
std::unique_ptr<TraceWriter> trace_writer_;
};
@@ -168,8 +167,7 @@ class IOTracer {
// Start writing IO operations to the trace_writer.
TSAN_SUPPRESSION Status
- StartIOTrace(const std::shared_ptr<SystemClock>& clock,
- const TraceOptions& trace_options,
+ StartIOTrace(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
// Stop writing IO operations to the trace_writer.
diff --git a/trace_replay/io_tracer_test.cc b/trace_replay/io_tracer_test.cc
index 78e14969d..66ae9b2ad 100644
--- a/trace_replay/io_tracer_test.cc
+++ b/trace_replay/io_tracer_test.cc
@@ -23,6 +23,7 @@ class IOTracerTest : public testing::Test {
IOTracerTest() {
test_path_ = test::PerThreadDBPath("io_tracer_test");
env_ = ROCKSDB_NAMESPACE::Env::Default();
+ clock_ = env_->GetSystemClock().get();
EXPECT_OK(env_->CreateDir(test_path_));
trace_file_path_ = test_path_ + "/io_trace";
}
@@ -79,6 +80,7 @@ class IOTracerTest : public testing::Test {
}
Env* env_;
+ SystemClock* clock_;
EnvOptions env_options_;
std::string trace_file_path_;
std::string test_path_;
@@ -92,8 +94,7 @@ TEST_F(IOTracerTest, MultipleRecordsWithDifferentIOOpOptions) {
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
IOTracer writer;
- ASSERT_OK(writer.StartIOTrace(env_->GetSystemClock(), trace_opt,
- std::move(trace_writer)));
+ ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer)));
// Write general record.
IOTraceRecord record0(0, TraceType::kIOTracer, 0 /*io_op_data*/,
@@ -202,8 +203,7 @@ TEST_F(IOTracerTest, AtomicWrite) {
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
IOTracer writer;
- ASSERT_OK(writer.StartIOTrace(env_->GetSystemClock(), trace_opt,
- std::move(trace_writer)));
+ ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer)));
writer.WriteIOOp(record);
ASSERT_OK(env_->FileExists(trace_file_path_));
}
@@ -266,8 +266,7 @@ TEST_F(IOTracerTest, AtomicNoWriteAfterEndTrace) {
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
IOTracer writer;
- ASSERT_OK(writer.StartIOTrace(env_->GetSystemClock(), trace_opt,
- std::move(trace_writer)));
+ ASSERT_OK(writer.StartIOTrace(clock_, trace_opt, std::move(trace_writer)));
writer.WriteIOOp(record);
writer.EndIOTrace();
// Write the record again. This time the record should not be written since
@@ -302,8 +301,7 @@ TEST_F(IOTracerTest, AtomicMultipleWrites) {
std::unique_ptr<TraceWriter> trace_writer;
ASSERT_OK(NewFileTraceWriter(env_, env_options_, trace_file_path_,
&trace_writer));
- IOTraceWriter writer(env_->GetSystemClock(), trace_opt,
- std::move(trace_writer));
+ IOTraceWriter writer(clock_, trace_opt, std::move(trace_writer));
ASSERT_OK(writer.WriteHeader());
// Write 10 records
WriteIOOp(&writer, 10);
diff --git a/trace_replay/trace_replay.cc b/trace_replay/trace_replay.cc
index 37b9a9416..bf00f0468 100644
--- a/trace_replay/trace_replay.cc
+++ b/trace_replay/trace_replay.cc
@@ -185,8 +185,7 @@ void TracerHelper::DecodeIterPayload(Trace* trace, IterPayload* iter_payload) {
}
}
-Tracer::Tracer(const std::shared_ptr<SystemClock>& clock,
- const TraceOptions& trace_options,
+Tracer::Tracer(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer)
: clock_(clock),
trace_options_(trace_options),
diff --git a/trace_replay/trace_replay.h b/trace_replay/trace_replay.h
index f0afcc45a..a42ec8fbb 100644
--- a/trace_replay/trace_replay.h
+++ b/trace_replay/trace_replay.h
@@ -150,8 +150,7 @@ class TracerHelper {
// timestamp and type, followed by the trace payload.
class Tracer {
public:
- Tracer(const std::shared_ptr<SystemClock>& clock,
- const TraceOptions& trace_options,
+ Tracer(SystemClock* clock, const TraceOptions& trace_options,
std::unique_ptr<TraceWriter>&& trace_writer);
~Tracer();
@@ -191,7 +190,7 @@ class Tracer {
// Returns true if a trace should be skipped, false otherwise.
bool ShouldSkipTrace(const TraceType& type);
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
TraceOptions trace_options_;
std::unique_ptr<TraceWriter> trace_writer_;
uint64_t trace_request_count_;
diff --git a/util/dynamic_bloom_test.cc b/util/dynamic_bloom_test.cc
index 0954f7239..9f032befd 100644
--- a/util/dynamic_bloom_test.cc
+++ b/util/dynamic_bloom_test.cc
@@ -178,7 +178,7 @@ TEST_F(DynamicBloomTest, VaryingLengths) {
TEST_F(DynamicBloomTest, perf) {
KeyMaker km;
- StopWatchNano timer(SystemClock::Default());
+ StopWatchNano timer(SystemClock::Default().get());
uint32_t num_probes = static_cast<uint32_t>(FLAGS_num_probes);
if (!FLAGS_enable_perf) {
@@ -238,7 +238,7 @@ TEST_F(DynamicBloomTest, concurrent_with_perf) {
std::function<void(size_t)> adder([&](size_t t) {
KeyMaker km;
- StopWatchNano timer(SystemClock::Default());
+ StopWatchNano timer(SystemClock::Default().get());
timer.Start();
for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) {
std_bloom.AddConcurrently(km.Seq(i));
@@ -261,7 +261,7 @@ TEST_F(DynamicBloomTest, concurrent_with_perf) {
elapsed = 0;
std::function<void(size_t)> hitter([&](size_t t) {
KeyMaker km;
- StopWatchNano timer(SystemClock::Default());
+ StopWatchNano timer(SystemClock::Default().get());
timer.Start();
for (uint64_t i = 1 + t; i <= num_keys; i += num_threads) {
bool f = std_bloom.MayContain(km.Seq(i));
@@ -286,7 +286,7 @@ TEST_F(DynamicBloomTest, concurrent_with_perf) {
std::atomic<uint32_t> false_positives(0);
std::function<void(size_t)> misser([&](size_t t) {
KeyMaker km;
- StopWatchNano timer(SystemClock::Default());
+ StopWatchNano timer(SystemClock::Default().get());
timer.Start();
for (uint64_t i = num_keys + 1 + t; i <= 2 * num_keys; i += num_threads) {
bool f = std_bloom.MayContain(km.Seq(i));
diff --git a/util/filter_bench.cc b/util/filter_bench.cc
index 461b24dbd..da9ae49b9 100644
--- a/util/filter_bench.cc
+++ b/util/filter_bench.cc
@@ -360,7 +360,7 @@ void FilterBench::Go() {
}
ROCKSDB_NAMESPACE::StopWatchNano timer(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true);
+ ROCKSDB_NAMESPACE::SystemClock::Default().get(), true);
infos_.clear();
while ((working_mem_size_mb == 0 || total_size < max_mem) &&
@@ -600,7 +600,7 @@ double FilterBench::RandomQueryTest(uint32_t inside_threshold, bool dry_run,
}
ROCKSDB_NAMESPACE::StopWatchNano timer(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true);
+ ROCKSDB_NAMESPACE::SystemClock::Default().get(), true);
for (uint64_t q = 0; q < max_queries; q += batch_size) {
bool inside_this_time = random_.Next() <= inside_threshold;
diff --git a/util/rate_limiter.cc b/util/rate_limiter.cc
index 600e44f9e..cefd9e299 100644
--- a/util/rate_limiter.cc
+++ b/util/rate_limiter.cc
@@ -59,7 +59,7 @@ GenericRateLimiter::GenericRateLimiter(
exit_cv_(&request_mutex_),
requests_to_wait_(0),
available_bytes_(0),
- next_refill_us_(NowMicrosMonotonic(clock_)),
+ next_refill_us_(NowMicrosMonotonic()),
fairness_(fairness > 100 ? 100 : fairness),
rnd_((uint32_t)time(nullptr)),
leader_(nullptr),
@@ -67,7 +67,7 @@ GenericRateLimiter::GenericRateLimiter(
num_drains_(0),
prev_num_drains_(0),
max_bytes_per_sec_(rate_bytes_per_sec),
- tuned_time_(NowMicrosMonotonic(clock_)) {
+ tuned_time_(NowMicrosMonotonic()) {
total_requests_[0] = 0;
total_requests_[1] = 0;
total_bytes_through_[0] = 0;
@@ -109,7 +109,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
if (auto_tuned_) {
static const int kRefillsPerTune = 100;
- std::chrono::microseconds now(NowMicrosMonotonic(clock_));
+ std::chrono::microseconds now(NowMicrosMonotonic());
if (now - tuned_time_ >=
kRefillsPerTune * std::chrono::microseconds(refill_period_us_)) {
Status s = Tune();
@@ -150,7 +150,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
(!queue_[Env::IO_LOW].empty() &&
&r == queue_[Env::IO_LOW].front()))) {
leader_ = &r;
- int64_t delta = next_refill_us_ - NowMicrosMonotonic(clock_);
+ int64_t delta = next_refill_us_ - NowMicrosMonotonic();
delta = delta > 0 ? delta : 0;
if (delta == 0) {
timedout = true;
@@ -230,7 +230,7 @@ void GenericRateLimiter::Request(int64_t bytes, const Env::IOPriority pri,
void GenericRateLimiter::Refill() {
TEST_SYNC_POINT("GenericRateLimiter::Refill");
- next_refill_us_ = NowMicrosMonotonic(clock_) + refill_period_us_;
+ next_refill_us_ = NowMicrosMonotonic() + refill_period_us_;
// Carry over the left over quota from the last period
auto refill_bytes_per_period =
refill_bytes_per_period_.load(std::memory_order_relaxed);
@@ -285,7 +285,7 @@ Status GenericRateLimiter::Tune() {
const int kAllowedRangeFactor = 20;
std::chrono::microseconds prev_tuned_time = tuned_time_;
- tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic(clock_));
+ tuned_time_ = std::chrono::microseconds(NowMicrosMonotonic());
int64_t elapsed_intervals = (tuned_time_ - prev_tuned_time +
std::chrono::microseconds(refill_period_us_) -
diff --git a/util/rate_limiter.h b/util/rate_limiter.h
index bfb4f3502..ec391162b 100644
--- a/util/rate_limiter.h
+++ b/util/rate_limiter.h
@@ -74,9 +74,7 @@ class GenericRateLimiter : public RateLimiter {
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Status Tune();
- uint64_t NowMicrosMonotonic(const std::shared_ptr<SystemClock>& clock) {
- return clock->NowNanos() / std::milli::den;
- }
+ uint64_t NowMicrosMonotonic() { return clock_->NowNanos() / std::milli::den; }
// This mutex guard all internal states
mutable port::Mutex request_mutex_;
diff --git a/util/repeatable_thread.h b/util/repeatable_thread.h
index 1e12c4840..c75ad7c49 100644
--- a/util/repeatable_thread.h
+++ b/util/repeatable_thread.h
@@ -21,9 +21,8 @@ namespace ROCKSDB_NAMESPACE {
class RepeatableThread {
public:
RepeatableThread(std::function<void()> function,
- const std::string& thread_name,
- const std::shared_ptr<SystemClock>& clock, uint64_t delay_us,
- uint64_t initial_delay_us = 0)
+ const std::string& thread_name, SystemClock* clock,
+ uint64_t delay_us, uint64_t initial_delay_us = 0)
: function_(function),
thread_name_("rocksdb:" + thread_name),
clock_(clock),
@@ -129,8 +128,7 @@ class RepeatableThread {
const std::function<void()> function_;
const std::string thread_name_;
- const std::shared_ptr<SystemClock> clock_;
- ;
+ SystemClock* clock_;
const uint64_t delay_us_;
const uint64_t initial_delay_us_;
diff --git a/util/repeatable_thread_test.cc b/util/repeatable_thread_test.cc
index 5aa6b6c37..19f22d726 100644
--- a/util/repeatable_thread_test.cc
+++ b/util/repeatable_thread_test.cc
@@ -42,7 +42,7 @@ TEST_F(RepeatableThreadTest, TimedTest) {
test_cv.SignalAll();
}
},
- "rt_test", clock, 1 * kSecond);
+ "rt_test", clock.get(), 1 * kSecond);
// Wait for execution finish.
{
ROCKSDB_NAMESPACE::MutexLock l(&mutex);
@@ -91,7 +91,7 @@ TEST_F(RepeatableThreadTest, MockEnvTest) {
#endif // OS_MACOSX && !NDEBUG
ROCKSDB_NAMESPACE::RepeatableThread thread(
- [&] { count++; }, "rt_test", mock_clock_, 1 * kSecond, 1 * kSecond);
+ [&] { count++; }, "rt_test", mock_clock_.get(), 1 * kSecond, 1 * kSecond);
for (int i = 1; i <= kIteration; i++) {
// Bump current time
thread.TEST_WaitForRun([&] { mock_clock_->SetCurrentTime(i); });
diff --git a/util/ribbon_test.cc b/util/ribbon_test.cc
index ba40efcb0..c668cb576 100644
--- a/util/ribbon_test.cc
+++ b/util/ribbon_test.cc
@@ -705,7 +705,7 @@ TYPED_TEST(RibbonTypeParamTest, CompactnessAndBacktrackAndFpRate) {
cur = other_keys_begin;
{
ROCKSDB_NAMESPACE::StopWatchNano timer(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true);
+ ROCKSDB_NAMESPACE::SystemClock::Default().get(), true);
while (cur != other_keys_end) {
bool fp = soln.FilterQuery(*cur, hasher);
fp_count += fp ? 1 : 0;
@@ -734,7 +734,7 @@ TYPED_TEST(RibbonTypeParamTest, CompactnessAndBacktrackAndFpRate) {
Index ifp_count = 0;
cur = other_keys_begin;
ROCKSDB_NAMESPACE::StopWatchNano timer(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true);
+ ROCKSDB_NAMESPACE::SystemClock::Default().get(), true);
while (cur != other_keys_end) {
ifp_count += isoln.FilterQuery(*cur, hasher) ? 1 : 0;
++cur;
@@ -768,7 +768,7 @@ TYPED_TEST(RibbonTypeParamTest, CompactnessAndBacktrackAndFpRate) {
Index bfp_count = 0;
cur = other_keys_begin;
ROCKSDB_NAMESPACE::StopWatchNano timer(
- ROCKSDB_NAMESPACE::SystemClock::Default(), true);
+ ROCKSDB_NAMESPACE::SystemClock::Default().get(), true);
while (cur != other_keys_end) {
uint64_t h = hasher.GetHash(*cur);
uint32_t h1 = ROCKSDB_NAMESPACE::Lower32of64(h);
diff --git a/util/stop_watch.h b/util/stop_watch.h
index b6421c631..e26380d97 100644
--- a/util/stop_watch.h
+++ b/util/stop_watch.h
@@ -14,7 +14,7 @@ namespace ROCKSDB_NAMESPACE {
// and overwrite is true, it will be added to *elapsed if overwrite is false.
class StopWatch {
public:
- StopWatch(const std::shared_ptr<SystemClock>& clock, Statistics* statistics,
+ StopWatch(SystemClock* clock, Statistics* statistics,
const uint32_t hist_type, uint64_t* elapsed = nullptr,
bool overwrite = true, bool delay_enabled = false)
: clock_(clock),
@@ -73,7 +73,7 @@ class StopWatch {
uint64_t start_time() const { return start_time_; }
private:
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
Statistics* statistics_;
const uint32_t hist_type_;
uint64_t* elapsed_;
@@ -88,8 +88,7 @@ class StopWatch {
// a nano second precision stopwatch
class StopWatchNano {
public:
- explicit StopWatchNano(const std::shared_ptr<SystemClock>& clock,
- bool auto_start = false)
+ explicit StopWatchNano(SystemClock* clock, bool auto_start = false)
: clock_(clock), start_(0) {
if (auto_start) {
Start();
@@ -112,7 +111,7 @@ class StopWatchNano {
}
private:
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
uint64_t start_;
};
diff --git a/util/timer.h b/util/timer.h
index 15520ec2f..736d0bf0a 100644
--- a/util/timer.h
+++ b/util/timer.h
@@ -38,7 +38,7 @@ namespace ROCKSDB_NAMESPACE {
// A map from a function name to the function keeps track of all the functions.
class Timer {
public:
- explicit Timer(const std::shared_ptr<SystemClock>& clock)
+ explicit Timer(SystemClock* clock)
: clock_(clock),
mutex_(clock),
cond_var_(&mutex_),
@@ -310,7 +310,7 @@ class Timer {
}
};
- const std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// This mutex controls both the heap_ and the map_. It needs to be held for
// making any changes in them.
mutable InstrumentedMutex mutex_;
diff --git a/util/timer_test.cc b/util/timer_test.cc
index 3158d7516..3407fe9ee 100644
--- a/util/timer_test.cc
+++ b/util/timer_test.cc
@@ -26,7 +26,7 @@ class TimerTest : public testing::Test {
TEST_F(TimerTest, SingleScheduleOnce) {
const int kInitDelayUs = 1 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
int count = 0;
timer.Add([&] { count++; }, "fn_sch_test", kInitDelayUs, 0);
@@ -45,7 +45,7 @@ TEST_F(TimerTest, SingleScheduleOnce) {
TEST_F(TimerTest, MultipleScheduleOnce) {
const int kInitDelay1Us = 1 * kUsPerSec;
const int kInitDelay2Us = 3 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
int count1 = 0;
timer.Add([&] { count1++; }, "fn_sch_test1", kInitDelay1Us, 0);
@@ -78,7 +78,7 @@ TEST_F(TimerTest, SingleScheduleRepeatedly) {
const int kInitDelayUs = 1 * kUsPerSec;
const int kRepeatUs = 1 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
int count = 0;
timer.Add([&] { count++; }, "fn_sch_test", kInitDelayUs, kRepeatUs);
@@ -108,7 +108,7 @@ TEST_F(TimerTest, MultipleScheduleRepeatedly) {
const int kRepeatUs = 2 * kUsPerSec;
const int kLargeRepeatUs = 100 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
int count1 = 0;
timer.Add([&] { count1++; }, "fn_sch_test1", kInitDelay1Us, kRepeatUs);
@@ -168,7 +168,7 @@ TEST_F(TimerTest, AddAfterStartTest) {
{{"Timer::Run::Waiting", "TimerTest:AddAfterStartTest:1"}});
SyncPoint::GetInstance()->EnableProcessing();
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
ASSERT_TRUE(timer.Start());
@@ -193,7 +193,7 @@ TEST_F(TimerTest, AddAfterStartTest) {
TEST_F(TimerTest, CancelRunningTask) {
static constexpr char kTestFuncName[] = "test_func";
const int kRepeatUs = 1 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
ASSERT_TRUE(timer.Start());
int* value = new int;
*value = 0;
@@ -229,7 +229,7 @@ TEST_F(TimerTest, ShutdownRunningTask) {
const int kRepeatUs = 1 * kUsPerSec;
constexpr char kTestFunc1Name[] = "test_func1";
constexpr char kTestFunc2Name[] = "test_func2";
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
@@ -268,7 +268,7 @@ TEST_F(TimerTest, AddSameFuncName) {
const int kRepeat1Us = 5 * kUsPerSec;
const int kRepeat2Us = 4 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
ASSERT_TRUE(timer.Start());
int func_counter1 = 0;
@@ -309,7 +309,7 @@ TEST_F(TimerTest, RepeatIntervalWithFuncRunningTime) {
const int kRepeatUs = 5 * kUsPerSec;
const int kFuncRunningTimeUs = 1 * kUsPerSec;
- Timer timer(mock_clock_);
+ Timer timer(mock_clock_.get());
ASSERT_TRUE(timer.Start());
int func_counter = 0;
@@ -348,7 +348,7 @@ TEST_F(TimerTest, DestroyRunningTimer) {
const int kInitDelayUs = 1 * kUsPerSec;
const int kRepeatUs = 1 * kUsPerSec;
- auto timer_ptr = new Timer(mock_clock_);
+ auto timer_ptr = new Timer(mock_clock_.get());
int count = 0;
timer_ptr->Add([&] { count++; }, "fn_sch_test", kInitDelayUs, kRepeatUs);
@@ -363,7 +363,7 @@ TEST_F(TimerTest, DestroyRunningTimer) {
TEST_F(TimerTest, DestroyTimerWithRunningFunc) {
const int kRepeatUs = 1 * kUsPerSec;
- auto timer_ptr = new Timer(mock_clock_);
+ auto timer_ptr = new Timer(mock_clock_.get());
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({
diff --git a/utilities/blob_db/blob_compaction_filter.cc b/utilities/blob_db/blob_compaction_filter.cc
index 9dde831c9..fb4c4466b 100644
--- a/utilities/blob_db/blob_compaction_filter.cc
+++ b/utilities/blob_db/blob_compaction_filter.cc
@@ -10,6 +10,7 @@
#include <cinttypes>
#include "db/dbformat.h"
+#include "rocksdb/system_clock.h"
#include "test_util/sync_point.h"
namespace ROCKSDB_NAMESPACE {
@@ -435,10 +436,10 @@ BlobIndexCompactionFilterFactoryBase::CreateUserCompactionFilterFromFactory(
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactory::CreateCompactionFilter(
const CompactionFilter::Context& _context) {
- assert(env());
+ assert(clock());
int64_t current_time = 0;
- Status s = env()->GetCurrentTime(&current_time);
+ Status s = clock()->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
@@ -460,10 +461,10 @@ BlobIndexCompactionFilterFactory::CreateCompactionFilter(
std::unique_ptr<CompactionFilter>
BlobIndexCompactionFilterFactoryGC::CreateCompactionFilter(
const CompactionFilter::Context& _context) {
- assert(env());
+ assert(clock());
int64_t current_time = 0;
- Status s = env()->GetCurrentTime(&current_time);
+ Status s = clock()->GetCurrentTime(&current_time);
if (!s.ok()) {
return nullptr;
}
diff --git a/utilities/blob_db/blob_compaction_filter.h b/utilities/blob_db/blob_compaction_filter.h
index ae6a0bc8e..1493cfc1a 100644
--- a/utilities/blob_db/blob_compaction_filter.h
+++ b/utilities/blob_db/blob_compaction_filter.h
@@ -10,12 +10,12 @@
#include "db/blob/blob_index.h"
#include "monitoring/statistics.h"
#include "rocksdb/compaction_filter.h"
-#include "rocksdb/env.h"
#include "utilities/blob_db/blob_db_gc_stats.h"
#include "utilities/blob_db/blob_db_impl.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
namespace ROCKSDB_NAMESPACE {
+class SystemClock;
namespace blob_db {
struct BlobCompactionContext {
@@ -136,11 +136,12 @@ class BlobIndexCompactionFilterGC : public BlobIndexCompactionFilterBase {
// that creates non-GC filters.
class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
public:
- BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl, Env* _env,
+ BlobIndexCompactionFilterFactoryBase(BlobDBImpl* _blob_db_impl,
+ SystemClock* _clock,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
: blob_db_impl_(_blob_db_impl),
- env_(_env),
+ clock_(_clock),
statistics_(_statistics),
user_comp_filter_(_cf_options.compaction_filter),
user_comp_filter_factory_(_cf_options.compaction_filter_factory) {}
@@ -150,13 +151,13 @@ class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
const CompactionFilter::Context& context) const;
BlobDBImpl* blob_db_impl() const { return blob_db_impl_; }
- Env* env() const { return env_; }
+ SystemClock* clock() const { return clock_; }
Statistics* statistics() const { return statistics_; }
const CompactionFilter* user_comp_filter() const { return user_comp_filter_; }
private:
BlobDBImpl* blob_db_impl_;
- Env* env_;
+ SystemClock* clock_;
Statistics* statistics_;
const CompactionFilter* user_comp_filter_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
@@ -165,10 +166,11 @@ class BlobIndexCompactionFilterFactoryBase : public CompactionFilterFactory {
class BlobIndexCompactionFilterFactory
: public BlobIndexCompactionFilterFactoryBase {
public:
- BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl, Env* _env,
+ BlobIndexCompactionFilterFactory(BlobDBImpl* _blob_db_impl,
+ SystemClock* _clock,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
- : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
_statistics) {}
const char* Name() const override {
@@ -182,10 +184,11 @@ class BlobIndexCompactionFilterFactory
class BlobIndexCompactionFilterFactoryGC
: public BlobIndexCompactionFilterFactoryBase {
public:
- BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl, Env* _env,
+ BlobIndexCompactionFilterFactoryGC(BlobDBImpl* _blob_db_impl,
+ SystemClock* _clock,
const ColumnFamilyOptions& _cf_options,
Statistics* _statistics)
- : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _env, _cf_options,
+ : BlobIndexCompactionFilterFactoryBase(_blob_db_impl, _clock, _cf_options,
_statistics) {}
const char* Name() const override {
diff --git a/utilities/blob_db/blob_db_impl.cc b/utilities/blob_db/blob_db_impl.cc
index 777f54887..e3e0befe4 100644
--- a/utilities/blob_db/blob_db_impl.cc
+++ b/utilities/blob_db/blob_db_impl.cc
@@ -91,7 +91,7 @@ BlobDBImpl::BlobDBImpl(const std::string& dbname,
fifo_eviction_seq_(0),
evict_expiration_up_to_(0),
debug_level_(0) {
- clock_ = env_->GetSystemClock();
+ clock_ = env_->GetSystemClock().get();
blob_dir_ = (bdb_options_.path_relative)
? dbname + "/" + bdb_options_.blob_dir
: bdb_options_.blob_dir;
@@ -196,12 +196,12 @@ Status BlobDBImpl::Open(std::vector<ColumnFamilyHandle*>* handles) {
db_options_.listeners.push_back(std::make_shared<BlobDBListenerGC>(this));
cf_options_.compaction_filter_factory =
std::make_shared<BlobIndexCompactionFilterFactoryGC>(
- this, env_, cf_options_, statistics_);
+ this, clock_, cf_options_, statistics_);
} else {
db_options_.listeners.push_back(std::make_shared<BlobDBListener>(this));
cf_options_.compaction_filter_factory =
std::make_shared<BlobIndexCompactionFilterFactory>(
- this, env_, cf_options_, statistics_);
+ this, clock_, cf_options_, statistics_);
}
// Reset user compaction filter after building into compaction factory.
diff --git a/utilities/blob_db/blob_db_impl.h b/utilities/blob_db/blob_db_impl.h
index 14f45e5b6..ea7554e8f 100644
--- a/utilities/blob_db/blob_db_impl.h
+++ b/utilities/blob_db/blob_db_impl.h
@@ -403,7 +403,7 @@ class BlobDBImpl : public BlobDB {
// the base DB
DBImpl* db_impl_;
Env* env_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
// the options that govern the behavior of Blob Storage
BlobDBOptions bdb_options_;
DBOptions db_options_;
diff --git a/utilities/blob_db/blob_db_iterator.h b/utilities/blob_db/blob_db_iterator.h
index 8d474f393..fd2b2f8f5 100644
--- a/utilities/blob_db/blob_db_iterator.h
+++ b/utilities/blob_db/blob_db_iterator.h
@@ -22,7 +22,7 @@ using ROCKSDB_NAMESPACE::ManagedSnapshot;
class BlobDBIterator : public Iterator {
public:
BlobDBIterator(ManagedSnapshot* snapshot, ArenaWrappedDBIter* iter,
- BlobDBImpl* blob_db, const std::shared_ptr<SystemClock>& clock,
+ BlobDBImpl* blob_db, SystemClock* clock,
Statistics* statistics)
: snapshot_(snapshot),
iter_(iter),
@@ -140,7 +140,7 @@ class BlobDBIterator : public Iterator {
std::unique_ptr<ManagedSnapshot> snapshot_;
std::unique_ptr<ArenaWrappedDBIter> iter_;
BlobDBImpl* blob_db_;
- std::shared_ptr<SystemClock> clock_;
+ SystemClock* clock_;
Statistics* statistics_;
Status status_;
PinnableSlice value_;
diff --git a/utilities/persistent_cache/block_cache_tier_file.cc b/utilities/persistent_cache/block_cache_tier_file.cc
index 2dcabc850..149275fb2 100644
--- a/utilities/persistent_cache/block_cache_tier_file.cc
+++ b/utilities/persistent_cache/block_cache_tier_file.cc
@@ -221,7 +221,7 @@ bool RandomAccessCacheFile::OpenImpl(const bool enable_direct_reads) {
return false;
}
freader_.reset(new RandomAccessFileReader(std::move(file), Path(),
- env_->GetSystemClock()));
+ env_->GetSystemClock().get()));
return true;
}
diff --git a/utilities/persistent_cache/persistent_cache_bench.cc b/utilities/persistent_cache/persistent_cache_bench.cc
index 082010c34..ea41a83b8 100644
--- a/utilities/persistent_cache/persistent_cache_bench.cc
+++ b/utilities/persistent_cache/persistent_cache_bench.cc
@@ -127,7 +127,7 @@ class CacheTierBenchmark {
std::bind(&CacheTierBenchmark::Read, this));
// Wait till FLAGS_nsec and then signal to quit
- StopWatchNano t(SystemClock::Default(), /*auto_start=*/true);
+ StopWatchNano t(SystemClock::Default().get(), /*auto_start=*/true);
size_t sec = t.ElapsedNanos() / 1000000000ULL;
while (!quit_) {
sec = t.ElapsedNanos() / 1000000000ULL;
@@ -194,7 +194,7 @@ class CacheTierBenchmark {
auto block = NewBlock(key);
// insert
- StopWatchNano timer(SystemClock::Default(), /*auto_start=*/true);
+ StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
while (true) {
Status status = cache_->Insert(block_key, block.get(), FLAGS_iosize);
if (status.ok()) {
@@ -226,7 +226,7 @@ class CacheTierBenchmark {
Slice key = FillKey(k, val);
// Lookup in cache
- StopWatchNano timer(SystemClock::Default(), /*auto_start=*/true);
+ StopWatchNano timer(SystemClock::Default().get(), /*auto_start=*/true);
std::unique_ptr<char[]> block;
size_t size;
Status status = cache_->Lookup(key, &block, &size);
diff --git a/utilities/persistent_cache/persistent_cache_tier.h b/utilities/persistent_cache/persistent_cache_tier.h
index 5e8b76209..286436da0 100644
--- a/utilities/persistent_cache/persistent_cache_tier.h
+++ b/utilities/persistent_cache/persistent_cache_tier.h
@@ -87,7 +87,8 @@ struct PersistentCacheConfig {
const std::shared_ptr<Logger>& _log,
const uint32_t _write_buffer_size = 1 * 1024 * 1024 /*1MB*/) {
env = _env;
- clock = (env != nullptr) ? env->GetSystemClock() : SystemClock::Default();
+ clock = (env != nullptr) ? env->GetSystemClock().get()
+ : SystemClock::Default().get();
path = _path;
log = _log;
cache_size = _cache_size;
@@ -129,7 +130,7 @@ struct PersistentCacheConfig {
// Env abstraction to use for system level operations
//
Env* env;
- std::shared_ptr<SystemClock> clock;
+ SystemClock* clock;
//
// Path for the block cache where blocks are persisted
//
diff --git a/utilities/transactions/pessimistic_transaction.cc b/utilities/transactions/pessimistic_transaction.cc
index 17ddd37ac..7d4276972 100644
--- a/utilities/transactions/pessimistic_transaction.cc
+++ b/utilities/transactions/pessimistic_transaction.cc
@@ -120,7 +120,7 @@ void PessimisticTransaction::Reinitialize(
bool PessimisticTransaction::IsExpired() const {
if (expiration_time_ > 0) {
- if (db_->GetEnv()->NowMicros() >= expiration_time_) {
+ if (dbimpl_->GetSystemClock()->NowMicros() >= expiration_time_) {
// Transaction is expired.
return true;
}
diff --git a/utilities/transactions/transaction_base.cc b/utilities/transactions/transaction_base.cc
index 1482dc961..49fa99d7d 100644
--- a/utilities/transactions/transaction_base.cc
+++ b/utilities/transactions/transaction_base.cc
@@ -28,7 +28,7 @@ TransactionBaseImpl::TransactionBaseImpl(
write_options_(write_options),
cmp_(GetColumnFamilyUserComparator(db->DefaultColumnFamily())),
lock_tracker_factory_(lock_tracker_factory),
- start_time_(db_->GetEnv()->NowMicros()),
+ start_time_(dbimpl_->GetSystemClock()->NowMicros()),
write_batch_(cmp_, 0, true, 0),
tracked_locks_(lock_tracker_factory_.Create()),
indexing_enabled_(true) {
@@ -67,7 +67,7 @@ void TransactionBaseImpl::Reinitialize(DB* db,
name_.clear();
log_number_ = 0;
write_options_ = write_options;
- start_time_ = db_->GetEnv()->NowMicros();
+ start_time_ = dbimpl_->GetSystemClock()->NowMicros();
indexing_enabled_ = true;
cmp_ = GetColumnFamilyUserComparator(db_->DefaultColumnFamily());
}
@@ -540,7 +540,7 @@ WriteBatchWithIndex* TransactionBaseImpl::GetWriteBatch() {
}
uint64_t TransactionBaseImpl::GetElapsedTime() const {
- return (db_->GetEnv()->NowMicros() - start_time_) / 1000;
+ return (dbimpl_->GetSystemClock()->NowMicros() - start_time_) / 1000;
}
uint64_t TransactionBaseImpl::GetNumPuts() const { return num_puts_; }
diff --git a/utilities/ttl/db_ttl_impl.cc b/utilities/ttl/db_ttl_impl.cc
index 0e0222e40..0463025f8 100644
--- a/utilities/ttl/db_ttl_impl.cc
+++ b/utilities/ttl/db_ttl_impl.cc
@@ -11,25 +11,26 @@
#include "rocksdb/convenience.h"
#include "rocksdb/env.h"
#include "rocksdb/iterator.h"
+#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/db_ttl.h"
#include "util/coding.h"
namespace ROCKSDB_NAMESPACE {
void DBWithTTLImpl::SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
- Env* env) {
+ SystemClock* clock) {
if (options->compaction_filter) {
options->compaction_filter =
- new TtlCompactionFilter(ttl, env, options->compaction_filter);
+ new TtlCompactionFilter(ttl, clock, options->compaction_filter);
} else {
options->compaction_filter_factory =
std::shared_ptr<CompactionFilterFactory>(new TtlCompactionFilterFactory(
- ttl, env, options->compaction_filter_factory));
+ ttl, clock, options->compaction_filter_factory));
}
if (options->merge_operator) {
options->merge_operator.reset(
- new TtlMergeOperator(options->merge_operator, env));
+ new TtlMergeOperator(options->merge_operator, clock));
}
}
@@ -97,12 +98,15 @@ Status DBWithTTL::Open(
"ttls size has to be the same as number of column families");
}
+ SystemClock* clock = (db_options.env == nullptr)
+ ? SystemClock::Default().get()
+ : db_options.env->GetSystemClock().get();
+
std::vector<ColumnFamilyDescriptor> column_families_sanitized =
column_families;
for (size_t i = 0; i < column_families_sanitized.size(); ++i) {
DBWithTTLImpl::SanitizeOptions(
- ttls[i], &column_families_sanitized[i].options,
- db_options.env == nullptr ? Env::Default() : db_options.env);
+ ttls[i], &column_families_sanitized[i].options, clock);
}
DB* db;
@@ -125,7 +129,8 @@ Status DBWithTTLImpl::CreateColumnFamilyWithTtl(
const ColumnFamilyOptions& options, const std::string& column_family_name,
ColumnFamilyHandle** handle, int ttl) {
ColumnFamilyOptions sanitized_options = options;
- DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options, GetEnv());
+ DBWithTTLImpl::SanitizeOptions(ttl, &sanitized_options,
+ GetEnv()->GetSystemClock().get());
return DBWithTTL::CreateColumnFamily(sanitized_options, column_family_name,
handle);
@@ -140,11 +145,11 @@ Status DBWithTTLImpl::CreateColumnFamily(const ColumnFamilyOptions& options,
// Appends the current timestamp to the string.
// Returns false if could not get the current_time, true if append succeeds
Status DBWithTTLImpl::AppendTS(const Slice& val, std::string* val_with_ts,
- Env* env) {
+ SystemClock* clock) {
val_with_ts->reserve(kTSLength + val.size());
char ts_string[kTSLength];
int64_t curtime;
- Status st = env->GetCurrentTime(&curtime);
+ Status st = clock->GetCurrentTime(&curtime);
if (!st.ok()) {
return st;
}
@@ -170,12 +175,13 @@ Status DBWithTTLImpl::SanityCheckTimestamp(const Slice& str) {
}
// Checks if the string is stale or not according to TTl provided
-bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl, Env* env) {
+bool DBWithTTLImpl::IsStale(const Slice& value, int32_t ttl,
+ SystemClock* clock) {
if (ttl <= 0) { // Data is fresh if TTL is non-positive
return false;
}
int64_t curtime;
- if (!env->GetCurrentTime(&curtime).ok()) {
+ if (!clock->GetCurrentTime(&curtime).ok()) {
return false; // Treat the data as fresh if could not get current time
}
int32_t timestamp_value =
@@ -273,12 +279,12 @@ Status DBWithTTLImpl::Merge(const WriteOptions& options,
Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
class Handler : public WriteBatch::Handler {
public:
- explicit Handler(Env* env) : env_(env) {}
+ explicit Handler(SystemClock* clock) : clock_(clock) {}
WriteBatch updates_ttl;
Status PutCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
std::string value_with_ts;
- Status st = AppendTS(value, &value_with_ts, env_);
+ Status st = AppendTS(value, &value_with_ts, clock_);
if (!st.ok()) {
return st;
}
@@ -288,7 +294,7 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
Status MergeCF(uint32_t column_family_id, const Slice& key,
const Slice& value) override {
std::string value_with_ts;
- Status st = AppendTS(value, &value_with_ts, env_);
+ Status st = AppendTS(value, &value_with_ts, clock_);
if (!st.ok()) {
return st;
}
@@ -301,9 +307,9 @@ Status DBWithTTLImpl::Write(const WriteOptions& opts, WriteBatch* updates) {
void LogData(const Slice& blob) override { updates_ttl.PutLogData(blob); }
private:
- Env* env_;
+ SystemClock* clock_;
};
- Handler handler(GetEnv());
+ Handler handler(GetEnv()->GetSystemClock().get());
Status st = updates->Iterate(&handler);
if (!st.ok()) {
return st;
diff --git a/utilities/ttl/db_ttl_impl.h b/utilities/ttl/db_ttl_impl.h
index 7ef32e83d..ab3ff3729 100644
--- a/utilities/ttl/db_ttl_impl.h
+++ b/utilities/ttl/db_ttl_impl.h
@@ -13,8 +13,8 @@
#include "db/db_impl/db_impl.h"
#include "rocksdb/compaction_filter.h"
#include "rocksdb/db.h"
-#include "rocksdb/env.h"
#include "rocksdb/merge_operator.h"
+#include "rocksdb/system_clock.h"
#include "rocksdb/utilities/db_ttl.h"
#include "rocksdb/utilities/utility_db.h"
#include "utilities/compaction_filters/layered_compaction_filter_base.h"
@@ -29,7 +29,7 @@ namespace ROCKSDB_NAMESPACE {
class DBWithTTLImpl : public DBWithTTL {
public:
static void SanitizeOptions(int32_t ttl, ColumnFamilyOptions* options,
- Env* env);
+ SystemClock* clock);
explicit DBWithTTLImpl(DB* db);
@@ -82,9 +82,10 @@ class DBWithTTLImpl : public DBWithTTL {
virtual DB* GetBaseDB() override { return db_; }
- static bool IsStale(const Slice& value, int32_t ttl, Env* env);
+ static bool IsStale(const Slice& value, int32_t ttl, SystemClock* clock);
- static Status AppendTS(const Slice& val, std::string* val_with_ts, Env* env);
+ static Status AppendTS(const Slice& val, std::string* val_with_ts,
+ SystemClock* clock);
static Status SanityCheckTimestamp(const Slice& str);
@@ -151,19 +152,19 @@ class TtlIterator : public Iterator {
class TtlCompactionFilter : public LayeredCompactionFilterBase {
public:
- TtlCompactionFilter(int32_t ttl, Env* env,
+ TtlCompactionFilter(int32_t ttl, SystemClock* clock,
const CompactionFilter* _user_comp_filter,
std::unique_ptr<const CompactionFilter>
_user_comp_filter_from_factory = nullptr)
: LayeredCompactionFilterBase(_user_comp_filter,
std::move(_user_comp_filter_from_factory)),
ttl_(ttl),
- env_(env) {}
+ clock_(clock) {}
virtual bool Filter(int level, const Slice& key, const Slice& old_val,
std::string* new_val, bool* value_changed) const
override {
- if (DBWithTTLImpl::IsStale(old_val, ttl_, env_)) {
+ if (DBWithTTLImpl::IsStale(old_val, ttl_, clock_)) {
return true;
}
if (user_comp_filter() == nullptr) {
@@ -188,15 +189,17 @@ class TtlCompactionFilter : public LayeredCompactionFilterBase {
private:
int32_t ttl_;
- Env* env_;
+ SystemClock* clock_;
};
class TtlCompactionFilterFactory : public CompactionFilterFactory {
public:
TtlCompactionFilterFactory(
- int32_t ttl, Env* env,
+ int32_t ttl, SystemClock* clock,
std::shared_ptr<CompactionFilterFactory> comp_filter_factory)
- : ttl_(ttl), env_(env), user_comp_filter_factory_(comp_filter_factory) {}
+ : ttl_(ttl),
+ clock_(clock),
+ user_comp_filter_factory_(comp_filter_factory) {}
virtual std::unique_ptr<CompactionFilter> CreateCompactionFilter(
const CompactionFilter::Context& context) override {
@@ -208,7 +211,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
}
return std::unique_ptr<TtlCompactionFilter>(new TtlCompactionFilter(
- ttl_, env_, nullptr, std::move(user_comp_filter_from_factory)));
+ ttl_, clock_, nullptr, std::move(user_comp_filter_from_factory)));
}
void SetTtl(int32_t ttl) {
@@ -221,7 +224,7 @@ class TtlCompactionFilterFactory : public CompactionFilterFactory {
private:
int32_t ttl_;
- Env* env_;
+ SystemClock* clock_;
std::shared_ptr<CompactionFilterFactory> user_comp_filter_factory_;
};
@@ -229,10 +232,10 @@ class TtlMergeOperator : public MergeOperator {
public:
explicit TtlMergeOperator(const std::shared_ptr<MergeOperator>& merge_op,
- Env* env)
- : user_merge_op_(merge_op), env_(env) {
+ SystemClock* clock)
+ : user_merge_op_(merge_op), clock_(clock) {
assert(merge_op);
- assert(env);
+ assert(clock);
}
virtual bool FullMergeV2(const MergeOperationInput& merge_in,
@@ -288,7 +291,7 @@ class TtlMergeOperator : public MergeOperator {
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
- if (!env_->GetCurrentTime(&curtime).ok()) {
+ if (!clock_->GetCurrentTime(&curtime).ok()) {
ROCKS_LOG_ERROR(
merge_in.logger,
"Error: Could not get current time to be attached internally "
@@ -329,7 +332,7 @@ class TtlMergeOperator : public MergeOperator {
// Augment the *new_value with the ttl time-stamp
int64_t curtime;
- if (!env_->GetCurrentTime(&curtime).ok()) {
+ if (!clock_->GetCurrentTime(&curtime).ok()) {
ROCKS_LOG_ERROR(
logger,
"Error: Could not get current time to be attached internally "
@@ -347,7 +350,7 @@ class TtlMergeOperator : public MergeOperator {
private:
std::shared_ptr<MergeOperator> user_merge_op_;
- Env* env_;
+ SystemClock* clock_;
};
} // namespace ROCKSDB_NAMESPACE
#endif // ROCKSDB_LITE
diff --git a/utilities/write_batch_with_index/write_batch_with_index_internal.cc b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
index f54379924..c41ccd2fa 100644
--- a/utilities/write_batch_with_index/write_batch_with_index_internal.cc
+++ b/utilities/write_batch_with_index/write_batch_with_index_internal.cc
@@ -208,23 +208,23 @@ Status WriteBatchWithIndexInternal::MergeKey(const Slice& key,
static_cast_with_check<DBImpl>(db_->GetRootDB())
->immutable_db_options();
Statistics* statistics = immutable_db_options.statistics.get();
- Env* env = immutable_db_options.env;
Logger* logger = immutable_db_options.info_log.get();
-
+ SystemClock* clock = immutable_db_options.clock;
return MergeHelper::TimedFullMerge(
merge_operator, key, value, merge_context.GetOperands(), result,
- logger, statistics, env->GetSystemClock(), result_operand);
+ logger, statistics, clock, result_operand);
} else if (db_options_ != nullptr) {
Statistics* statistics = db_options_->statistics.get();
Env* env = db_options_->env;
Logger* logger = db_options_->info_log.get();
+ SystemClock* clock = env->GetSystemClock().get();
return MergeHelper::TimedFullMerge(
merge_operator, key, value, merge_context.GetOperands(), result,
- logger, statistics, env->GetSystemClock(), result_operand);
+ logger, statistics, clock, result_operand);
} else {
return MergeHelper::TimedFullMerge(
merge_operator, key, value, merge_context.GetOperands(), result,
- nullptr, nullptr, SystemClock::Default(), result_operand);
+ nullptr, nullptr, SystemClock::Default().get(), result_operand);
}
} else {
return Status::InvalidArgument("Must provide a column_family");