From c0f66bc07c83b1a47a01983438603fa956db1666 Mon Sep 17 00:00:00 2001 From: Jordi Serra Torrens Date: Thu, 25 Mar 2021 16:21:16 +0000 Subject: [PATCH] SERVER-55114: Bump the minor version of the newest chunk on change allowMigrations and reshardingFields --- .../sharding/move_chunk_allowMigrations.js | 4 +- src/mongo/db/s/SConscript | 2 +- .../s/config/config_server_test_fixture.cpp | 32 ++- .../db/s/config/config_server_test_fixture.h | 9 +- ...tion_version_and_change_metadata_test.cpp} | 225 +++++------------- ...rding_catalog_manager_chunk_operations.cpp | 152 ++++++------ .../db/s/drop_collection_coordinator.cpp | 2 +- .../resharding_coordinator_test.cpp | 83 ++++--- 8 files changed, 211 insertions(+), 298 deletions(-) rename src/mongo/db/s/config/{sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp => sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp} (55%) diff --git a/jstests/sharding/move_chunk_allowMigrations.js b/jstests/sharding/move_chunk_allowMigrations.js index 33b4067a96..cace9a749d 100644 --- a/jstests/sharding/move_chunk_allowMigrations.js +++ b/jstests/sharding/move_chunk_allowMigrations.js @@ -160,7 +160,7 @@ const testConfigsvrSetAllowMigrationsCommand = function() { assert.eq(false, configDB.collections.findOne({_id: ns}).allowMigrations); // Check that the collection version has been bumped and the shard has refreshed. - ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(2, 0)); + ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(1, 1)); // Use _configsvrSetAllowMigrations to allow migrations to happen assert.commandWorked(st.configRS.getPrimary().adminCommand( @@ -171,7 +171,7 @@ const testConfigsvrSetAllowMigrationsCommand = function() { assert.eq(undefined, configDB.collections.findOne({_id: ns}).allowMigrations); // Check that the collection version has been bumped and the shard has refreshed. - ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(3, 0)); + ShardVersioningUtil.assertCollectionVersionEquals(st.shard0, ns, Timestamp(1, 2)); }; // Test cases that should disable the balancer. diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 3066728123..3b00fbdef3 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -553,7 +553,7 @@ env.CppUnitTest( 'config/sharding_catalog_manager_add_shard_test.cpp', 'config/sharding_catalog_manager_add_shard_to_zone_test.cpp', 'config/sharding_catalog_manager_assign_key_range_to_zone_test.cpp', - 'config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp', + 'config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp', 'config/sharding_catalog_manager_clear_jumbo_flag_test.cpp', 'config/sharding_catalog_manager_commit_chunk_migration_test.cpp', 'config/sharding_catalog_manager_config_initialization_test.cpp', diff --git a/src/mongo/db/s/config/config_server_test_fixture.cpp b/src/mongo/db/s/config/config_server_test_fixture.cpp index 2497a53815..97541f8c84 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.cpp +++ b/src/mongo/db/s/config/config_server_test_fixture.cpp @@ -268,12 +268,13 @@ Status ConfigServerTestFixture::deleteToConfigCollection(OperationContext* opCtx StatusWith ConfigServerTestFixture::findOneOnConfigCollection(OperationContext* opCtx, const NamespaceString& ns, - const BSONObj& filter) { + const BSONObj& filter, + const BSONObj& sort) { auto config = getConfigShard(); invariant(config); auto findStatus = config->exhaustiveFindOnConfig( - opCtx, kReadPref, repl::ReadConcernLevel::kMajorityReadConcern, ns, filter, BSONObj(), 1); + opCtx, kReadPref, repl::ReadConcernLevel::kMajorityReadConcern, ns, filter, sort, 1); if (!findStatus.isOK()) { return findStatus.getStatus(); } @@ -355,6 +356,33 @@ StatusWith ConfigServerTestFixture::getChunkDoc(OperationContext* opC return ChunkType::fromConfigBSON(doc.getValue()); } +StatusWith ConfigServerTestFixture::getCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss) { + auto collectionDoc = findOneOnConfigCollection( + opCtx, CollectionType::ConfigNS, BSON(CollectionType::kNssFieldName << nss.ns())); + if (!collectionDoc.isOK()) + return collectionDoc.getStatus(); + + const CollectionType coll(collectionDoc.getValue()); + + auto chunkDoc = findOneOnConfigCollection( + opCtx, + ChunkType::ConfigNS, + coll.getTimestamp() ? BSON(ChunkType::collectionUUID << coll.getUuid()) + : BSON(ChunkType::ns << coll.getNss().ns()) /* query */, + BSON(ChunkType::lastmod << -1) /* sort */); + + if (!chunkDoc.isOK()) + return chunkDoc.getStatus(); + + const auto chunkType = ChunkType::fromConfigBSON(chunkDoc.getValue()); + if (!chunkType.isOK()) { + return chunkType.getStatus(); + } + + return chunkType.getValue().getVersion(); +} + void ConfigServerTestFixture::setupDatabase(const std::string& dbName, const ShardId primaryShard, const bool sharded) { diff --git a/src/mongo/db/s/config/config_server_test_fixture.h b/src/mongo/db/s/config/config_server_test_fixture.h index e19c528485..d56b47ea64 100644 --- a/src/mongo/db/s/config/config_server_test_fixture.h +++ b/src/mongo/db/s/config/config_server_test_fixture.h @@ -85,7 +85,8 @@ protected: */ StatusWith findOneOnConfigCollection(OperationContext* opCtx, const NamespaceString& ns, - const BSONObj& filter); + const BSONObj& filter, + const BSONObj& sort = {}); /** * Setup the config.shards collection to contain the given shards. @@ -110,6 +111,12 @@ protected: */ StatusWith getChunkDoc(OperationContext* opCtx, const BSONObj& minKey); + /** + * Returns the collection version. + */ + StatusWith getCollectionVersion(OperationContext* opCtx, + const NamespaceString& nss); + /** * Inserts a document for the database into the config.databases collection. */ diff --git a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp similarity index 55% rename from src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp rename to src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp index 9ff72e16ff..d3348dbce2 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_bump_shard_versions_and_change_metadata_test.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_bump_collection_version_and_change_metadata_test.cpp @@ -50,7 +50,7 @@ const KeyPattern kKeyPattern(BSON("a" << 1)); const ShardType kShard0("shard0000", "shard0000:1234"); const ShardType kShard1("shard0001", "shard0001:1234"); -class ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest +class ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest : public ConfigServerTestFixture { void setUp() { ConfigServerTestFixture::setUp(); @@ -89,90 +89,24 @@ protected: return chunkType; } - /** - * Determines if the chunk's version has been bumped to the targetChunkVersion. - */ - bool chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - const ChunkType& chunkTypeBefore, - const StatusWith swChunkTypeAfter, - const ChunkVersion& targetChunkVersion) { - ASSERT_OK(swChunkTypeAfter.getStatus()); - auto chunkTypeAfter = swChunkTypeAfter.getValue(); - - // Regardless of whether the major version was bumped, the chunk's other fields should be - // unchanged. - ASSERT_EQ(chunkTypeBefore.getName(), chunkTypeAfter.getName()); - ASSERT_EQ(chunkTypeBefore.getNS(), chunkTypeAfter.getNS()); - ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMin(), chunkTypeAfter.getMin()); - ASSERT_BSONOBJ_EQ(chunkTypeBefore.getMax(), chunkTypeAfter.getMax()); - ASSERT(chunkTypeBefore.getHistory() == chunkTypeAfter.getHistory()); - - return chunkTypeAfter.getVersion().majorVersion() == targetChunkVersion.majorVersion(); + void assertChunkUnchanged(const ChunkType& chunkTypeBefore) { + const auto chunkTypeNow = + uassertStatusOK(getChunkDoc(operationContext(), chunkTypeBefore.getMin())); + ASSERT_BSONOBJ_EQ(chunkTypeBefore.toConfigBSON(), chunkTypeNow.toConfigBSON()); } - /** - * If there are multiple chunks per shard, the chunk whose version gets bumped is not - * deterministic. - * - * Asserts that only chunk per shard has its major version increased. - */ - void assertOnlyOneChunkVersionBumped(OperationContext* opCtx, - std::vector originalChunkTypes, - const ChunkVersion& targetChunkVersion) { - auto aChunkVersionWasBumped = false; - for (auto originalChunkType : originalChunkTypes) { - auto swChunkTypeAfter = getChunkDoc(opCtx, originalChunkType.getMin()); - auto wasBumped = chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - originalChunkType, swChunkTypeAfter, targetChunkVersion); - if (aChunkVersionWasBumped) { - ASSERT_FALSE(wasBumped); - } else { - aChunkVersionWasBumped = wasBumped; - } - } - - ASSERT_TRUE(aChunkVersionWasBumped); + void assertChunkVersionChangedAndOtherFieldsUnchanged(const ChunkType& chunkTypeBefore, + const ChunkVersion& targetChunkVersion) { + const auto chunkTypeNow = + uassertStatusOK(getChunkDoc(operationContext(), chunkTypeBefore.getMin())); + ASSERT_BSONOBJ_EQ(chunkTypeBefore.toConfigBSON().removeField(ChunkType::lastmod()), + chunkTypeNow.toConfigBSON().removeField(ChunkType::lastmod())); + ASSERT_EQ(targetChunkVersion, chunkTypeNow.getVersion()); } }; -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, - BumpChunkVersionOneChunkPerShard) { - const auto epoch = OID::gen(); - const auto shard0Chunk0 = - generateChunkType(kNss, - ChunkVersion(10, 1, epoch, boost::none /* timestamp */), - kShard0.getName(), - BSON("a" << 1), - BSON("a" << 10)); - const auto shard1Chunk0 = - generateChunkType(kNss, - ChunkVersion(11, 2, epoch, boost::none /* timestamp */), - kShard1.getName(), - BSON("a" << 11), - BSON("a" << 20)); - - const auto collectionVersion = shard1Chunk0.getVersion(); - ChunkVersion targetChunkVersion(collectionVersion.majorVersion() + 1, - 0, - collectionVersion.epoch(), - collectionVersion.getTimestamp()); - - setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard1Chunk0}); - - auto opCtx = operationContext(); - - ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( - opCtx, kNss, [&](OperationContext*, TxnNumber) {}); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion)); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion)); -} - -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, - BumpChunkVersionTwoChunksOnOneShard) { +TEST_F(ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest, + BumpsOnlyMinorVersionOfNewestChunk) { const auto epoch = OID::gen(); const auto shard0Chunk0 = generateChunkType(kNss, @@ -193,27 +127,33 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, BSON("a" << 21), BSON("a" << 100)); - const auto collectionVersion = shard0Chunk1.getVersion(); - ChunkVersion targetChunkVersion(collectionVersion.majorVersion() + 1, - 0, - collectionVersion.epoch(), - collectionVersion.getTimestamp()); - setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard0Chunk1, shard1Chunk0}); + const auto collectionVersion = shard0Chunk1.getVersion(); + auto targetCollectionVersion = collectionVersion; + targetCollectionVersion.incMinor(); + auto opCtx = operationContext(); + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( opCtx, kNss, [&](OperationContext*, TxnNumber) {}); - assertOnlyOneChunkVersionBumped( - operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion); + assertChunkUnchanged(shard0Chunk0); + assertChunkVersionChangedAndOtherFieldsUnchanged(shard0Chunk1, targetCollectionVersion); + assertChunkUnchanged(shard1Chunk0); +} - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion)); +TEST_F(ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest, InexistentCollection) { + auto opCtx = operationContext(); + + ASSERT_THROWS_CODE( + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, kNss, [&](OperationContext*, TxnNumber) {}), + DBException, + ErrorCodes::NamespaceNotFound); } -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, - BumpChunkVersionTwoChunksOnTwoShards) { +TEST_F(ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest, NoChunks) { const auto epoch = OID::gen(); const auto shard0Chunk0 = generateChunkType(kNss, @@ -221,45 +161,22 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, kShard0.getName(), BSON("a" << 1), BSON("a" << 10)); - const auto shard0Chunk1 = - generateChunkType(kNss, - ChunkVersion(11, 2, epoch, boost::none /* timestamp */), - kShard0.getName(), - BSON("a" << 11), - BSON("a" << 20)); - const auto shard1Chunk0 = - generateChunkType(kNss, - ChunkVersion(8, 1, epoch, boost::none /* timestamp */), - kShard1.getName(), - BSON("a" << 21), - BSON("a" << 100)); - const auto shard1Chunk1 = - generateChunkType(kNss, - ChunkVersion(12, 1, epoch, boost::none /* timestamp */), - kShard1.getName(), - BSON("a" << 101), - BSON("a" << 200)); - - const auto collectionVersion = shard1Chunk1.getVersion(); - ChunkVersion targetChunkVersion(collectionVersion.majorVersion() + 1, - 0, - collectionVersion.epoch(), - collectionVersion.getTimestamp()); - setupCollection(kNss, kKeyPattern, {shard0Chunk0, shard0Chunk1, shard1Chunk0, shard1Chunk1}); + setupCollection(kNss, kKeyPattern, {shard0Chunk0}); auto opCtx = operationContext(); - ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( - opCtx, kNss, [&](OperationContext*, TxnNumber) {}); - assertOnlyOneChunkVersionBumped( - operationContext(), {shard0Chunk0, shard0Chunk1}, targetChunkVersion); + ASSERT_OK(deleteToConfigCollection( + opCtx, ChunkType::ConfigNS, BSON(ChunkType::name << shard0Chunk0.getName()), false)); - assertOnlyOneChunkVersionBumped( - operationContext(), {shard1Chunk0, shard1Chunk1}, targetChunkVersion); + ASSERT_THROWS_CODE( + ShardingCatalogManager::get(opCtx)->bumpCollectionVersionAndChangeMetadataInTxn( + opCtx, kNss, [&](OperationContext*, TxnNumber) {}), + DBException, + ErrorCodes::IncompatibleShardingMetadata); } -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, +TEST_F(ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest, SucceedsInThePresenceOfTransientTransactionErrors) { const auto epoch = OID::gen(); const auto shard0Chunk0 = @@ -288,16 +205,11 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, } }); - auto targetChunkVersion = ChunkVersion{initialCollectionVersion.majorVersion() + 1, - 0, - initialCollectionVersion.epoch(), - initialCollectionVersion.getTimestamp()}; - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion)); + auto targetCollectionVersion = initialCollectionVersion; + targetCollectionVersion.incMinor(); - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion)); + assertChunkUnchanged(shard0Chunk0); + assertChunkVersionChangedAndOtherFieldsUnchanged(shard1Chunk0, targetCollectionVersion); ASSERT_EQ(numCalls, 5) << "transaction succeeded after unexpected number of attempts"; @@ -317,21 +229,15 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, } }); - targetChunkVersion = ChunkVersion{initialCollectionVersion.majorVersion() + 2, - 0, - initialCollectionVersion.epoch(), - initialCollectionVersion.getTimestamp()}; - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, getChunkDoc(operationContext(), shard0Chunk0.getMin()), targetChunkVersion)); + targetCollectionVersion.incMinor(); - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, getChunkDoc(operationContext(), shard1Chunk0.getMin()), targetChunkVersion)); + assertChunkUnchanged(shard0Chunk0); + assertChunkVersionChangedAndOtherFieldsUnchanged(shard1Chunk0, targetCollectionVersion); ASSERT_EQ(numCalls, 5) << "transaction succeeded after unexpected number of attempts"; } -TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, +TEST_F(ShardingCatalogManagerBumpCollectionVersionAndChangeMetadataTest, StopsRetryingOnPermanentServerErrors) { const auto epoch = OID::gen(); const auto shard0Chunk0 = @@ -362,15 +268,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, DBException, ErrorCodes::ShutdownInProgress); - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, - getChunkDoc(operationContext(), shard0Chunk0.getMin()), - shard0Chunk0.getVersion())); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, - getChunkDoc(operationContext(), shard1Chunk0.getMin()), - shard1Chunk0.getVersion())); + assertChunkUnchanged(shard0Chunk0); + assertChunkUnchanged(shard1Chunk0); ASSERT_EQ(numCalls, 1) << "transaction failed after unexpected number of attempts"; @@ -387,15 +286,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, DBException, ErrorCodes::NotWritablePrimary); - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, - getChunkDoc(operationContext(), shard0Chunk0.getMin()), - shard0Chunk0.getVersion())); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, - getChunkDoc(operationContext(), shard1Chunk0.getMin()), - shard1Chunk0.getVersion())); + assertChunkUnchanged(shard0Chunk0); + assertChunkUnchanged(shard1Chunk0); ASSERT_EQ(numCalls, 1) << "transaction failed after unexpected number of attempts"; @@ -417,15 +309,8 @@ TEST_F(ShardingCatalogManagerBumpShardVersionsAndChangeMetadataTest, DBException, ErrorCodes::Interrupted); - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard0Chunk0, - getChunkDoc(operationContext(), shard0Chunk0.getMin()), - shard0Chunk0.getVersion())); - - ASSERT_TRUE(chunkMajorVersionWasBumpedAndOtherFieldsAreUnchanged( - shard1Chunk0, - getChunkDoc(operationContext(), shard1Chunk0.getMin()), - shard1Chunk0.getVersion())); + assertChunkUnchanged(shard0Chunk0); + assertChunkUnchanged(shard1Chunk0); ASSERT_EQ(numCalls, 1) << "transaction failed after unexpected number of attempts"; } diff --git a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp index 223573752e..7df85a9735 100644 --- a/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp +++ b/src/mongo/db/s/config/sharding_catalog_manager_chunk_operations.cpp @@ -404,18 +404,20 @@ BSONObj getShardAndCollectionVersion(OperationContext* opCtx, return result.obj(); } -void bumpMajorVersionOneChunkPerShard(OperationContext* opCtx, - const NamespaceString& nss, - TxnNumber txnNumber, - const std::vector& shardIds) { - auto curCollectionVersion = uassertStatusOK(getCollectionVersion(opCtx, nss)); - ChunkVersion targetChunkVersion(curCollectionVersion.majorVersion() + 1, - 0, - curCollectionVersion.epoch(), - curCollectionVersion.getTimestamp()); +NamespaceStringOrUUID getNsOrUUIDForChunkTargeting(const CollectionType& coll) { + if (coll.getTimestamp()) { + return {coll.getNss().db().toString(), coll.getUuid()}; + } else { + return {coll.getNss()}; + } +} +void bumpCollectionMinorVersion(OperationContext* opCtx, + const NamespaceString& nss, + TxnNumber txnNumber) { auto const configShard = Grid::get(opCtx)->shardRegistry()->getConfigShard(); - auto findCollResponse = uassertStatusOK( + + const auto findCollResponse = uassertStatusOK( configShard->exhaustiveFindOnConfig(opCtx, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, repl::ReadConcernLevel::kLocalReadConcern, @@ -423,58 +425,59 @@ void bumpMajorVersionOneChunkPerShard(OperationContext* opCtx, BSON(CollectionType::kNssFieldName << nss.ns()), {}, 1)); - uassert(ErrorCodes::ConflictingOperationInProgress, - "Collection does not exist", - !findCollResponse.docs.empty()); + uassert( + ErrorCodes::NamespaceNotFound, "Collection does not exist", !findCollResponse.docs.empty()); const CollectionType coll(findCollResponse.docs[0]); + const auto nsOrUUID = getNsOrUUIDForChunkTargeting(coll); - for (const auto& shardId : shardIds) { - BSONObjBuilder updateBuilder; - BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set")); - targetChunkVersion.appendLegacyWithField(&updateVersionClause, ChunkType::lastmod()); - updateVersionClause.doneFast(); - auto chunkUpdate = updateBuilder.obj(); - - const auto query = [&]() { - if (coll.getTimestamp()) { - return BSON(ChunkType::collectionUUID << coll.getUuid() - << ChunkType::shard(shardId.toString())); - } else { - return BSON(ChunkType::ns(coll.getNss().ns()) - << ChunkType::shard(shardId.toString())); - } - }(); - auto request = BatchedCommandRequest::buildUpdateOp(ChunkType::ConfigNS, - query, // query - chunkUpdate, // update - false, // upsert - false // multi - ); - - auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( - opCtx, ChunkType::ConfigNS, request, txnNumber); - - auto numDocsExpectedModified = 1; - auto numDocsModified = res.getIntField("n"); - - uassert(5030400, - str::stream() << "Expected to match " << numDocsExpectedModified - << " docs, but only matched " << numDocsModified - << " for write request " << request.toString(), - numDocsExpectedModified == numDocsModified); - - // There exists a constraint that a chunk version must be unique for a given namespace, - // so the minor version is incremented for each chunk placed. - targetChunkVersion.incMinor(); - } -} + // Find the newest chunk + const auto findChunkResponse = uassertStatusOK(configShard->exhaustiveFindOnConfig( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + repl::ReadConcernLevel::kLocalReadConcern, + ChunkType::ConfigNS, + nsOrUUID.uuid() ? BSON(ChunkType::collectionUUID << *nsOrUUID.uuid()) + : BSON(ChunkType::ns() << nsOrUUID.nss()->toString()) /* query */, + BSON(ChunkType::lastmod << -1) /* sort */, + 1 /* limit */)); -NamespaceStringOrUUID getNsOrUUIDForChunkTargeting(const CollectionType& coll) { - if (coll.getTimestamp()) { - return {coll.getNss().db().toString(), coll.getUuid()}; - } else { - return {coll.getNss()}; - } + uassert(ErrorCodes::IncompatibleShardingMetadata, + str::stream() << "Tried to find max chunk version for collection '" << nss.ns() + << ", but found no chunks", + !findChunkResponse.docs.empty()); + + const auto newestChunk = uassertStatusOK(ChunkType::fromConfigBSON(findChunkResponse.docs[0])); + const auto targetVersion = [&]() { + ChunkVersion version = newestChunk.getVersion(); + version.incMinor(); + return version; + }(); + + // Update the newest chunk to have the new (bumped) version + BSONObjBuilder updateBuilder; + BSONObjBuilder updateVersionClause(updateBuilder.subobjStart("$set")); + targetVersion.appendLegacyWithField(&updateVersionClause, ChunkType::lastmod()); + updateVersionClause.doneFast(); + const auto chunkUpdate = updateBuilder.obj(); + const auto request = BatchedCommandRequest::buildUpdateOp( + ChunkType::ConfigNS, + BSON(ChunkType::name << newestChunk.getName()), // query + chunkUpdate, // update + false, // upsert + false // multi + ); + + const auto res = ShardingCatalogManager::get(opCtx)->writeToConfigDocumentInTxn( + opCtx, ChunkType::ConfigNS, request, txnNumber); + + auto numDocsExpectedModified = 1; + auto numDocsModified = res.getIntField("n"); + + uassert(5511400, + str::stream() << "Expected to match " << numDocsExpectedModified + << " docs, but only matched " << numDocsModified << " for write request " + << request.toString(), + numDocsExpectedModified == numDocsModified); } std::vector getShardsOwningChunksForCollection(OperationContext* opCtx, @@ -489,7 +492,7 @@ std::vector getShardsOwningChunksForCollection(OperationContext* opCtx, {}, 1)); uassert( - ErrorCodes::Error(5514600), "Collection does not exist", !findCollResponse.docs.empty()); + ErrorCodes::NamespaceNotFound, "Collection does not exist", !findCollResponse.docs.empty()); const CollectionType coll(findCollResponse.docs[0]); const auto nsOrUUID = getNsOrUUIDForChunkTargeting(coll); @@ -1415,19 +1418,11 @@ void ShardingCatalogManager::bumpMultipleCollectionVersionsAndChangeMetadataInTx // migrations Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); - using NssAndShardIds = std::pair>; - std::vector nssAndShardIds; - for (const auto& nss : collNames) { - auto shardIds = getShardsOwningChunksForCollection(opCtx, nss); - nssAndShardIds.emplace_back(nss, std::move(shardIds)); - } - withTransaction(opCtx, NamespaceString::kConfigReshardingOperationsNamespace, [&](OperationContext* opCtx, TxnNumber txnNumber) { - for (const auto& nssAndShardId : nssAndShardIds) { - bumpMajorVersionOneChunkPerShard( - opCtx, nssAndShardId.first, txnNumber, nssAndShardId.second); + for (const auto& nss : collNames) { + bumpCollectionMinorVersion(opCtx, nss, txnNumber); } changeMetadataFunc(opCtx, txnNumber); }); @@ -1520,16 +1515,13 @@ void ShardingCatalogManager::splitOrMarkJumbo(OperationContext* opCtx, void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(OperationContext* opCtx, const NamespaceString& nss, bool allowMigrations) { - std::set shardsIds; + std::vector shardIds; { // Take _kChunkOpLock in exclusive mode to prevent concurrent chunk splits, merges, and // migrations Lock::ExclusiveLock lk(opCtx->lockState(), _kChunkOpLock); - const auto cm = uassertStatusOK( - Grid::get(opCtx)->catalogCache()->getShardedCollectionRoutingInfoWithRefresh(opCtx, - nss)); - cm.getAllShardIds(&shardsIds); + shardIds = getShardsOwningChunksForCollection(opCtx, nss); withTransaction( opCtx, CollectionType::ConfigNS, [&](OperationContext* opCtx, TxnNumber txnNumber) { // Update the 'allowMigrations' field. An unset 'allowMigrations' field implies @@ -1550,9 +1542,7 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(OperationContext* false /* multi */), txnNumber); - // Bump the chunk version for one single chunk - invariant(!shardsIds.empty()); - bumpMajorVersionOneChunkPerShard(opCtx, nss, txnNumber, {*shardsIds.begin()}); + bumpCollectionMinorVersion(opCtx, nss, txnNumber); }); // From now on migrations are not allowed anymore, so it is not possible that new shards @@ -1561,11 +1551,7 @@ void ShardingCatalogManager::setAllowMigrationsAndBumpOneChunk(OperationContext* // Trigger a refresh on each shard containing chunks for this collection. const auto executor = Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(); - sharding_util::tellShardsToRefreshCollection( - opCtx, - {std::make_move_iterator(shardsIds.begin()), std::make_move_iterator(shardsIds.end())}, - nss, - executor); + sharding_util::tellShardsToRefreshCollection(opCtx, shardIds, nss, executor); } } // namespace mongo diff --git a/src/mongo/db/s/drop_collection_coordinator.cpp b/src/mongo/db/s/drop_collection_coordinator.cpp index eb583e8168..e55282204c 100644 --- a/src/mongo/db/s/drop_collection_coordinator.cpp +++ b/src/mongo/db/s/drop_collection_coordinator.cpp @@ -125,7 +125,7 @@ ExecutorFuture DropCollectionCoordinator::_runImpl( sharding_ddl_util::stopMigrations(opCtx, nss()); auto coll = Grid::get(opCtx)->catalogClient()->getCollection(opCtx, nss()); _doc.setCollInfo(std::move(coll)); - } catch (ExceptionFor&) { + } catch (ExceptionFor&) { // The collection is not sharded or doesn't exist. _doc.setCollInfo(boost::none); } diff --git a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp index c021a6c947..a6e992850c 100644 --- a/src/mongo/db/s/resharding/resharding_coordinator_test.cpp +++ b/src/mongo/db/s/resharding/resharding_coordinator_test.cpp @@ -52,6 +52,8 @@ namespace mongo { namespace { +using unittest::assertGet; + class ReshardingCoordinatorPersistenceTest : public ConfigServerTestFixture { protected: void setUp() override { @@ -645,22 +647,6 @@ protected: opCtx, expectedOriginalCollType, expectedCoordinatorDoc); } - void assertChunkVersionDidNotIncreaseAfterStateTransition( - const ChunkType& chunk, const ChunkVersion& collectionVersion) { - auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin()); - ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK()); - ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion()); - } - - void assertChunkVersionIncreasedAfterStateTransition(const ChunkType& chunk, - const ChunkVersion& collectionVersion) { - auto chunkAfterTransition = getChunkDoc(operationContext(), chunk.getMin()); - ASSERT_EQ(chunkAfterTransition.getStatus(), Status::OK()); - ASSERT_EQ(chunkAfterTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion() + 1); - } - NamespaceString _originalNss = NamespaceString("db.foo"); UUID _originalUUID = UUID::gen(); OID _originalEpoch = OID::gen(); @@ -723,10 +709,9 @@ TEST_F(ReshardingCoordinatorPersistenceTest, WriteInitialInfoSucceeds) { // bumped twice in 'writeInitialStateAndCatalogUpdatesExpectSuccess': once when reshardingFields // is inserted to the collection doc, and once again when the state transitions to // kPreparingToDonate. - auto donorChunkPostTransition = getChunkDoc(operationContext(), donorChunk.getMin()); - ASSERT_EQ(donorChunkPostTransition.getStatus(), Status::OK()); - ASSERT_EQ(donorChunkPostTransition.getValue().getVersion().majorVersion(), - collectionVersion.majorVersion() + 2); + const auto postTransitionCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); + ASSERT_TRUE(collectionVersion.isOlderThan(postTransitionCollectionVersion)); } TEST_F(ReshardingCoordinatorPersistenceTest, BasicStateTransitionSucceeds) { @@ -735,21 +720,28 @@ TEST_F(ReshardingCoordinatorPersistenceTest, BasicStateTransitionSucceeds) { // Ensure the chunks for the original and temporary namespaces exist since they will be bumped // as a product of the state transition to kBlockingWrites. - auto donorChunk = makeAndInsertChunksForDonorShard( + makeAndInsertChunksForDonorShard( _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); - auto donorCollectionVersion = donorChunk.getVersion(); + auto initialOriginalCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); - auto recipientChunk = makeAndInsertChunksForRecipientShard( + makeAndInsertChunksForRecipientShard( _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); - auto recipientCollectionVersion = donorChunk.getVersion(); + auto initialTempCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _tempNss)); // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; expectedCoordinatorDoc.setState(CoordinatorStateEnum::kBlockingWrites); writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion); - assertChunkVersionIncreasedAfterStateTransition(recipientChunk, recipientCollectionVersion); + + auto finalOriginalCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); + ASSERT_TRUE(initialOriginalCollectionVersion.isOlderThan(finalOriginalCollectionVersion)); + + auto finalTempCollectionVersion = assertGet(getCollectionVersion(operationContext(), _tempNss)); + ASSERT_TRUE(initialTempCollectionVersion.isOlderThan(finalTempCollectionVersion)); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSucceeds) { @@ -758,13 +750,11 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSu // Ensure the chunks for the original and temporary namespaces exist since they will be bumped // as a product of the state transition to kCloning. - auto donorChunk = makeAndInsertChunksForDonorShard( + makeAndInsertChunksForDonorShard( _originalNss, _originalEpoch, _oldShardKey, std::vector{OID::gen(), OID::gen()}); - auto donorCollectionVersion = donorChunk.getVersion(); - auto recipientChunk = makeAndInsertChunksForRecipientShard( + makeAndInsertChunksForRecipientShard( _tempNss, _tempEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); - auto recipientCollectionVersion = recipientChunk.getVersion(); // Persist the updates on disk auto expectedCoordinatorDoc = coordinatorDoc; @@ -777,9 +767,19 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWithFetchTimestampSu return approxCopySize; }()); + auto initialOriginalCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); + auto initialTempCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _tempNss)); + writeStateTransitionUpdateExpectSuccess(operationContext(), expectedCoordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(donorChunk, donorCollectionVersion); - assertChunkVersionIncreasedAfterStateTransition(recipientChunk, recipientCollectionVersion); + + auto finalOriginalCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); + ASSERT_TRUE(initialOriginalCollectionVersion.isOlderThan(finalOriginalCollectionVersion)); + + auto finalTempCollectionVersion = assertGet(getCollectionVersion(operationContext(), _tempNss)); + ASSERT_TRUE(initialTempCollectionVersion.isOlderThan(finalTempCollectionVersion)); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTranstionToDecisionPersistedSucceeds) { @@ -805,12 +805,15 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTranstionToDecisionPersistedSu auto updatedChunks = makeChunks(_originalNss, _finalEpoch, _newShardKey, initialChunksIds); auto updatedZones = makeZones(_originalNss, _newShardKey); + auto initialCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); + writeDecisionPersistedStateExpectSuccess( operationContext(), expectedCoordinatorDoc, fetchTimestamp, updatedChunks, updatedZones); // Since the epoch is changed, there is no need to bump the chunk versions with the transition. - assertChunkVersionDidNotIncreaseAfterStateTransition(recipientChunk, - recipientChunk.getVersion()); + auto finalCollectionVersion = assertGet(getCollectionVersion(operationContext(), _originalNss)); + ASSERT_EQ(initialCollectionVersion.toLong(), finalCollectionVersion.toLong()); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToErrorSucceeds) { @@ -841,12 +844,16 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionToDoneSucceeds) { // Ensure the chunks for the original namespace exist since they will be bumped as a product of // the state transition to kDone. - auto finalChunk = makeAndInsertChunksForRecipientShard( + makeAndInsertChunksForRecipientShard( _originalNss, _finalEpoch, _newShardKey, std::vector{OID::gen(), OID::gen()}); - auto collectionVersion = finalChunk.getVersion(); + auto initialOriginalCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); removeCoordinatorDocAndReshardingFieldsExpectSuccess(operationContext(), coordinatorDoc); - assertChunkVersionIncreasedAfterStateTransition(finalChunk, collectionVersion); + + auto finalOriginalCollectionVersion = + assertGet(getCollectionVersion(operationContext(), _originalNss)); + ASSERT_TRUE(initialOriginalCollectionVersion.isOlderThan(finalOriginalCollectionVersion)); } TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWhenCoordinatorDocDoesNotExistFails) { @@ -858,7 +865,7 @@ TEST_F(ReshardingCoordinatorPersistenceTest, StateTransitionWhenCoordinatorDocDo .writeStateTransitionAndCatalogUpdatesThenBumpShardVersions( operationContext(), coordinatorDoc), AssertionException, - 5514600); + ErrorCodes::NamespaceNotFound); } TEST_F(ReshardingCoordinatorPersistenceTest, -- 2.17.1