From 6d08ac27c43ae9eace83cff19fee7934d11a2b32 Mon Sep 17 00:00:00 2001 From: sdong Date: Fri, 17 Oct 2014 09:03:09 -0700 Subject: [PATCH] SERVER-15705 (rocksdb) numRecords and numEntries to work with recovery unit --- src/mongo/db/storage/rocks/rocks_engine.cpp | 12 +++++-- src/mongo/db/storage/rocks/rocks_engine.h | 2 ++ src/mongo/db/storage/rocks/rocks_record_store.cpp | 40 ++++++++++++---------- src/mongo/db/storage/rocks/rocks_record_store.h | 10 +++--- src/mongo/db/storage/rocks/rocks_recovery_unit.cpp | 40 +++++++++++++++++++++- src/mongo/db/storage/rocks/rocks_recovery_unit.h | 18 ++++++++++ .../db/storage/rocks/rocks_sorted_data_impl.cpp | 28 ++++++++++----- .../db/storage/rocks/rocks_sorted_data_impl.h | 23 ++----------- .../storage/rocks/rocks_sorted_data_impl_test.cpp | 9 +++-- 9 files changed, 122 insertions(+), 60 deletions(-) diff --git a/src/mongo/db/storage/rocks/rocks_engine.cpp b/src/mongo/db/storage/rocks/rocks_engine.cpp index 96c0f80..860eca2 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.cpp +++ b/src/mongo/db/storage/rocks/rocks_engine.cpp @@ -89,7 +89,7 @@ namespace mongo { for (const auto& cf : columnFamilyNames) { if (cf == rocksdb::kDefaultColumnFamilyName) { - columnFamilies.emplace_back(); + columnFamilies.emplace_back(cf, _defaultCFOptions()); continue; } auto orderings_iter = orderings.find(cf); @@ -179,7 +179,7 @@ namespace mongo { SortedDataInterface* RocksEngine::getSortedDataInterface(OperationContext* opCtx, const StringData& ident, const IndexDescriptor* desc) { - return new RocksSortedDataImpl(_db.get(), _getColumnFamily(ident), + return new RocksSortedDataImpl(_db.get(), _getColumnFamily(ident), ident, Ordering::make(desc->keyPattern())); } @@ -285,7 +285,7 @@ namespace mongo { } rocksdb::Options RocksEngine::dbOptions() { - rocksdb::Options options; + rocksdb::Options options(rocksdb::DBOptions(), _defaultCFOptions()); // Optimize RocksDB. This is the easiest way to get RocksDB to perform well options.IncreaseParallelism(); @@ -298,6 +298,12 @@ namespace mongo { return options; } + rocksdb::ColumnFamilyOptions RocksEngine::_defaultCFOptions() { + rocksdb::ColumnFamilyOptions options; + // TODO pass or set appropriate options for default CF. + return options; + } + rocksdb::ColumnFamilyOptions RocksEngine::_collectionOptions() const { rocksdb::ColumnFamilyOptions options; invariant( _collectionComparator.get() ); diff --git a/src/mongo/db/storage/rocks/rocks_engine.h b/src/mongo/db/storage/rocks/rocks_engine.h index 3386cd5..709cf85 100644 --- a/src/mongo/db/storage/rocks/rocks_engine.h +++ b/src/mongo/db/storage/rocks/rocks_engine.h @@ -116,6 +116,8 @@ namespace mongo { rocksdb::ColumnFamilyOptions _collectionOptions() const; rocksdb::ColumnFamilyOptions _indexOptions(const Ordering& order) const; + static rocksdb::ColumnFamilyOptions _defaultCFOptions(); + std::string _path; boost::scoped_ptr _db; boost::scoped_ptr _collectionComparator; diff --git a/src/mongo/db/storage/rocks/rocks_record_store.cpp b/src/mongo/db/storage/rocks/rocks_record_store.cpp index 26f894c..5c400ac 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.cpp +++ b/src/mongo/db/storage/rocks/rocks_record_store.cpp @@ -93,11 +93,13 @@ namespace mongo { bool metadataPresent = true; // XXX not using a Snapshot here if (!_db->Get(_readOptions(), rocksdb::Slice(_numRecordsKey), &value).ok()) { - _numRecords = 0; + _numRecords.store(0); metadataPresent = false; } else { - memcpy( &_numRecords, value.data(), sizeof( _numRecords )); + long long numRecords = 0; + memcpy( &numRecords, value.data(), sizeof(numRecords)); + _numRecords.store(numRecords); } // XXX not using a Snapshot here @@ -137,21 +139,27 @@ namespace mongo { _increaseDataSize(txn, -oldLength); } - bool RocksRecordStore::cappedAndNeedDelete() const { + long long RocksRecordStore::numRecords(OperationContext* txn) const { + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); + return _numRecords.load(std::memory_order::memory_order_relaxed) + + ru->getDeltaCounter(_numRecordsKey); + } + + bool RocksRecordStore::cappedAndNeedDelete(OperationContext* txn) const { if (!_isCapped) return false; if (_dataSize > _cappedMaxSize) return true; - if ((_cappedMaxDocs != -1) && (_numRecords > _cappedMaxDocs)) + if ((_cappedMaxDocs != -1) && (numRecords(txn) > _cappedMaxDocs)) return true; return false; } void RocksRecordStore::cappedDeleteAsNeeded(OperationContext* txn) { - if (!cappedAndNeedDelete()) + if (!cappedAndNeedDelete(txn)) return; auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); boost::scoped_ptr iter(ru->NewIterator(_columnFamily.get())); @@ -165,8 +173,8 @@ namespace mongo { // XXX PROBLEMS // 2 threads could delete the same document // multiple inserts using the same snapshot will delete the same document - while ( cappedAndNeedDelete() && iter->Valid() ) { - invariant(_numRecords > 0); + while ( cappedAndNeedDelete(txn) && iter->Valid() ) { + invariant(numRecords(txn) > 0); rocksdb::Slice slice = iter->key(); DiskLoc oldest = _makeDiskLoc( slice ); @@ -350,7 +358,7 @@ namespace mongo { output->appendNumber("nrecords", numRecords); } else - output->appendNumber("nrecords", _numRecords); + output->appendNumber("nrecords", numRecords(txn)); return Status::OK(); } @@ -456,8 +464,9 @@ namespace mongo { boost::mutex::scoped_lock dataSizeLk( _dataSizeLock ); ru->writeBatch()->Delete(_dataSizeKey); - boost::mutex::scoped_lock numRecordsLk( _numRecordsLock ); ru->writeBatch()->Delete(_numRecordsKey); + ru->incrementCounter(_numRecordsKey, &_numRecords, + -ru->getDeltaCounter(_numRecordsKey)); } rocksdb::ReadOptions RocksRecordStore::_readOptions(OperationContext* opCtx) { @@ -519,22 +528,15 @@ namespace mongo { // XXX make sure these work with rollbacks (I don't think they will) void RocksRecordStore::_changeNumRecords( OperationContext* txn, bool insert ) { - boost::mutex::scoped_lock lk( _numRecordsLock ); - + RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); if ( insert ) { - _numRecords++; + ru->incrementCounter(_numRecordsKey, &_numRecords, 1); } else { - _numRecords--; + ru->incrementCounter(_numRecordsKey, &_numRecords, -1); } - RocksRecoveryUnit* ru = RocksRecoveryUnit::getRocksRecoveryUnit( txn ); - const char* nr_ptr = reinterpret_cast( &_numRecords ); - - ru->writeBatch()->Put(rocksdb::Slice(_numRecordsKey), - rocksdb::Slice(nr_ptr, sizeof(long long))); } - void RocksRecordStore::_increaseDataSize( OperationContext* txn, int amount ) { boost::mutex::scoped_lock lk( _dataSizeLock ); diff --git a/src/mongo/db/storage/rocks/rocks_record_store.h b/src/mongo/db/storage/rocks/rocks_record_store.h index 746da39..42612a4 100644 --- a/src/mongo/db/storage/rocks/rocks_record_store.h +++ b/src/mongo/db/storage/rocks/rocks_record_store.h @@ -31,6 +31,7 @@ #pragma once +#include #include #include @@ -66,7 +67,7 @@ namespace mongo { virtual long long dataSize( OperationContext* txn ) const { return _dataSize; } - virtual long long numRecords( OperationContext* txn ) const { return _numRecords; } + virtual long long numRecords( OperationContext* txn ) const; virtual bool isCapped() const { return _isCapped; } @@ -201,7 +202,7 @@ namespace mongo { OperationContext* txn, const DiskLoc& loc); DiskLoc _nextId(); - bool cappedAndNeedDelete() const; + bool cappedAndNeedDelete(OperationContext* txn) const; void cappedDeleteAsNeeded(OperationContext* txn); // The use of this function requires that the passed in DiskLoc outlives the returned Slice @@ -220,15 +221,12 @@ namespace mongo { AtomicUInt64 _nextIdNum; long long _dataSize; - long long _numRecords; + std::atomic _numRecords; const string _dataSizeKey; const string _numRecordsKey; // locks - // TODO I think that when you get one of these, you generally need to acquire the other. - // These could probably be moved into a single lock. - boost::mutex _numRecordsLock; boost::mutex _dataSizeLock; }; } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp index a110312..b837a40 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.cpp @@ -67,6 +67,16 @@ namespace mongo { return; } + for (auto pair : _deltaCounters) { + auto& counter = pair.second; + counter._value->fetch_add(counter._delta, std::memory_order::memory_order_relaxed); + long long newValue = counter._value->load(std::memory_order::memory_order_relaxed); + + // TODO: make the encoding platform indepdent. + const char* nr_ptr = reinterpret_cast(&newValue); + writeBatch()->Put(pair.first, rocksdb::Slice(nr_ptr, sizeof(long long))); + } + rocksdb::Status status = _db->Write(rocksdb::WriteOptions(), _writeBatch->GetWriteBatch()); if ( !status.ok() ) { log() << "uh oh: " << status.ToString(); @@ -78,7 +88,7 @@ namespace mongo { delete change; } _changes.clear(); - + _deltaCounters.clear(); _writeBatch.reset(); if ( _snapshot ) { @@ -121,6 +131,8 @@ namespace mongo { void RocksRecoveryUnit::destroy() { if (_defaultCommit) { commitUnitOfWork(); + } else { + _deltaCounters.clear(); } if (_writeBatch && _writeBatch->GetWriteBatch()->Count() > 0) { @@ -176,6 +188,8 @@ namespace mongo { } rocksdb::Iterator* RocksRecoveryUnit::NewIterator(rocksdb::ColumnFamilyHandle* columnFamily) { + invariant(columnFamily != _db->DefaultColumnFamily()); + rocksdb::ReadOptions options; options.snapshot = snapshot(); auto iterator = _db->NewIterator(options, columnFamily); @@ -185,6 +199,30 @@ namespace mongo { return iterator; } + void RocksRecoveryUnit::incrementCounter(const rocksdb::Slice& counterKey, + std::atomic* counter, long long delta) { + if (delta == 0) { + return; + } + + auto pair = _deltaCounters.find(counterKey.ToString()); + if (pair == _deltaCounters.end()) { + _deltaCounters[counterKey.ToString()] = + mongo::RocksRecoveryUnit::Counter(counter, delta); + } else { + pair->second._delta += delta; + } + } + + long long RocksRecoveryUnit::getDeltaCounter(const rocksdb::Slice& counterKey) { + auto counter = _deltaCounters.find(counterKey.ToString()); + if (counter == _deltaCounters.end()) { + return 0; + } else { + return counter->second._delta; + } + } + RocksRecoveryUnit* RocksRecoveryUnit::getRocksRecoveryUnit(OperationContext* opCtx) { return dynamic_cast(opCtx->recoveryUnit()); } diff --git a/src/mongo/db/storage/rocks/rocks_recovery_unit.h b/src/mongo/db/storage/rocks/rocks_recovery_unit.h index 3e6ce6c..521931e 100644 --- a/src/mongo/db/storage/rocks/rocks_recovery_unit.h +++ b/src/mongo/db/storage/rocks/rocks_recovery_unit.h @@ -30,10 +30,12 @@ #pragma once +#include #include #include #include #include +#include #include #include @@ -93,6 +95,20 @@ namespace mongo { rocksdb::Iterator* NewIterator(rocksdb::ColumnFamilyHandle* columnFamily); + void incrementCounter(const rocksdb::Slice& counterKey, + std::atomic* counter, long long delta); + + long long getDeltaCounter(const rocksdb::Slice& counterKey); + + struct Counter { + std::atomic* _value; + long long _delta; + Counter() : Counter(nullptr, 0) {} + Counter(std::atomic* value, long long delta) : _value(value), _delta(delta) {} + }; + + typedef std::unordered_map CounterMap; + static RocksRecoveryUnit* getRocksRecoveryUnit(OperationContext* opCtx); private: @@ -106,6 +122,8 @@ namespace mongo { // bare because we need to call ReleaseSnapshot when we're done with this const rocksdb::Snapshot* _snapshot; // owned + CounterMap _deltaCounters; + std::vector _changes; bool _destroyed; diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp index afb5bdd..e18f7dc 100644 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp +++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.cpp @@ -436,14 +436,20 @@ namespace mongo { RocksSortedDataImpl::RocksSortedDataImpl(rocksdb::DB* db, boost::shared_ptr cf, - Ordering order) - : _db(db), _columnFamily(cf), _order(order) { + const StringData& ident, Ordering order) + : _db(db), _columnFamily(cf), _order(order), + _numEntriesKey("numentries-" + ident.toString()) { invariant( _db ); invariant(_columnFamily.get()); + _numEntries = 0; string value; - bool ok = _db->GetProperty(_columnFamily.get(), "rocksdb.estimate-num-keys", &value); - invariant( ok ); - _numEntries.store(std::atoll(value.c_str())); + if (_db->Get(rocksdb::ReadOptions(), rocksdb::Slice(_numEntriesKey), &value).ok()) { + long long numEntries; + memcpy(&numEntries, value.data(), sizeof(numEntries)); + _numEntries.store(numEntries); + } else { + _numEntries.store(0); + } } SortedDataBuilderInterface* RocksSortedDataImpl::getBulkBuilder(OperationContext* txn, @@ -473,7 +479,8 @@ namespace mongo { } } - ru->registerChange(new ChangeNumEntries(&_numEntries, true)); + ru->incrementCounter(_numEntriesKey, &_numEntries, 1); + ru->writeBatch()->Put(_columnFamily.get(), makeString(key, loc), emptyByteSlice); return Status::OK(); @@ -492,7 +499,8 @@ namespace mongo { return; } - ru->registerChange(new ChangeNumEntries(&_numEntries, false)); + ru->incrementCounter(_numEntriesKey, &_numEntries, -1); + ru->writeBatch()->Delete(_columnFamily.get(), keyData); } @@ -541,7 +549,11 @@ namespace mongo { return Status::OK(); } - long long RocksSortedDataImpl::numEntries(OperationContext* txn) const { return _numEntries.load(); } + long long RocksSortedDataImpl::numEntries(OperationContext* txn) const { + auto ru = RocksRecoveryUnit::getRocksRecoveryUnit(txn); + return _numEntries.load(std::memory_order::memory_order_relaxed) + + ru->getDeltaCounter(_numEntriesKey); + } SortedDataInterface::Cursor* RocksSortedDataImpl::newCursor(OperationContext* txn, int direction) const { diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h index ae34cb0..1895538 100644 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h +++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl.h @@ -65,7 +65,7 @@ namespace mongo { MONGO_DISALLOW_COPYING( RocksSortedDataImpl ); public: RocksSortedDataImpl(rocksdb::DB* db, boost::shared_ptr cf, - Ordering order); + const StringData& ident, Ordering order); virtual SortedDataBuilderInterface* getBulkBuilder(OperationContext* txn, bool dupsAllowed); @@ -113,25 +113,8 @@ namespace mongo { std::atomic _numEntries; - class ChangeNumEntries : public RecoveryUnit::Change { - public: - ChangeNumEntries(std::atomic* numEntries, bool increase) - : _numEntries(numEntries), _increase(increase) {} - - void commit() { - if (_increase) { - _numEntries->fetch_add(1); - } else { - _numEntries->fetch_sub(1); - } - } - - void rollback() {} - - private: - std::atomic* _numEntries; - bool _increase; - }; + const std::string _numEntriesKey; + }; } // namespace mongo diff --git a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp index 174d9c7..517bfb5 100644 --- a/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp +++ b/src/mongo/db/storage/rocks/rocks_sorted_data_impl_test.cpp @@ -50,18 +50,21 @@ namespace mongo { rocksdb::DB* db; std::vector cfs; cfs.emplace_back(); - cfs[0].options.comparator = RocksSortedDataImpl::newRocksComparator(_order); + cfs.emplace_back("sroted_data_impl", rocksdb::ColumnFamilyOptions()); + cfs[1].options.comparator = RocksSortedDataImpl::newRocksComparator(_order); rocksdb::DBOptions db_options; db_options.create_if_missing = true; + db_options.create_missing_column_families = true; std::vector handles; auto s = rocksdb::DB::Open(db_options, _tempDir.path(), cfs, &handles, &db); ASSERT(s.ok()); _db.reset(db); - _cf.reset(handles[0]); + _cf.reset(handles[1]); } virtual SortedDataInterface* newSortedDataInterface(bool unique) { - return new RocksSortedDataImpl(_db.get(), _cf, _order); + return new RocksSortedDataImpl(_db.get(), _cf, rocksdb::kDefaultColumnFamilyName, + _order); } virtual RecoveryUnit* newRecoveryUnit() { return new RocksRecoveryUnit(_db.get()); } -- 1.8.1