diff --git a/src/mongo/db/repl/tenant_oplog_applier.cpp b/src/mongo/db/repl/tenant_oplog_applier.cpp index 95c2643513b..9186721c273 100644 --- a/src/mongo/db/repl/tenant_oplog_applier.cpp +++ b/src/mongo/db/repl/tenant_oplog_applier.cpp @@ -604,6 +604,8 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( auto opCtx = cc().makeOperationContext(); tenantMigrationInfo(opCtx.get()) = boost::make_optional(_migrationUuid); + invariant(!_cloneFinishedRecipientOpTime.isNull()); + // Since the client object persists across each noop write call and the same writer thread could // be reused to write noop entries with older optime, we need to clear the lastOp associated // with the client to avoid the invariant in replClientInfo::setLastOp that the optime only goes @@ -617,15 +619,6 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( // track the statements in a retryable write. std::unique_ptr scopedSession; - // Make sure a partial session doesn't escape. - ON_BLOCK_EXIT([this, &scopedSession, &opCtx] { - if (scopedSession) { - auto txnParticipant = TransactionParticipant::get(opCtx.get()); - invariant(txnParticipant); - txnParticipant.invalidate(opCtx.get()); - } - }); - boost::optional prePostImageEntry = boost::none; OpTime originalPrePostImageOpTime; for (auto iter = begin; iter != end; iter++) { @@ -675,12 +668,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( // Check out the session. if (!scopedSession) { auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx.get()); - if (_isResuming) { - scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get()); - } else { - scopedSession = - mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get()); - } + scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get()); } auto txnParticipant = TransactionParticipant::get(opCtx.get()); @@ -690,14 +678,18 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "for transaction " << txnNumber << " on session " << sessionId, txnParticipant); + // We should only write the noop entry for this transaction commit once. - uassert(5351501, - str::stream() << "Tenant oplog application cannot apply transaction " - << txnNumber << " on session " << sessionId - << " because the transaction with txnNumberAndRetryCounter " - << txnParticipant.getActiveTxnNumberAndRetryCounter().toBSON() - << " has already started", - txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() < txnNumber); + // + // Out-of-order processing is not possible, except in failover cases. + if (txnNumber <= txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber()) { + // It's safe to to skip only if the current active transaction number last write + // opTime is after the oplog catchup phase of this migration attempt. Otherwise, + // it indicates some potential data corruption. + invariant(txnParticipant.getLastWriteOpTime() > _cloneFinishedRecipientOpTime); + continue; + } + txnParticipant.beginOrContinueTransactionUnconditionally( opCtx.get(), {txnNumber, optTxnRetryCounter}); @@ -833,12 +825,7 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( if (!scopedSession) { auto mongoDSessionCatalog = MongoDSessionCatalog::get(opCtx.get()); - if (_isResuming) { - scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get()); - } else { - scopedSession = - mongoDSessionCatalog->checkOutSessionWithoutOplogRead(opCtx.get()); - } + scopedSession = mongoDSessionCatalog->checkOutSession(opCtx.get()); } auto txnParticipant = TransactionParticipant::get(opCtx.get()); @@ -848,28 +835,27 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( << txnNumber << " on session " << sessionId, txnParticipant); - // beginOrContinue throws on failure, which will abort the migration. Failure should - // only result from out-of-order processing, which should not happen. - TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber}; - txnParticipant.beginOrContinue(opCtx.get(), - txnNumberAndRetryCounter, - boost::none /* autocommit */, - boost::none /* startTransaction */); - - // We could have an existing lastWriteOpTime for the same retryable write chain from a - // previously aborted migration. This could also happen if the tenant being migrated has - // previously resided in this replica set. So we want to start a new history chain - // instead of linking the newly generated no-op to the existing chain before the current - // migration starts. Otherwise, we could have duplicate entries for the same stmtId. - invariant(!_cloneFinishedRecipientOpTime.isNull()); - if (txnParticipant.getLastWriteOpTime() > _cloneFinishedRecipientOpTime) { - prevWriteOpTime = txnParticipant.getLastWriteOpTime(); - } else { + if (txnParticipant.getLastWriteOpTime() <= _cloneFinishedRecipientOpTime) { + // We can reach here in the following cases: + // 1) LastWriteOpTime is not null. + // - In Back-to-back migration (rs0->rs1->rs0) and migration retry, where + // txnNumber == txnParticipant.o().activeTxnNumber and rs0 happen to contain + // the chain already. This requires chain reset and invalidating the + // in-memory transaction state. Otherwise, we could have duplicate entries + // for the same stmtId. + // 2) LastWriteOpTime is Null. + // - In back-to-back migration (rs0->rs1->rs0), where + // `txnNumber` < txnParticipant.o().activeTxnNumber and stmtId0@activeTxnNumber + // failed previously in rs0 before migrating tenant back into rs0. This requires + // invalidating the in-memory transaction state. Otherwise, we may skip writing + // the history chain for `txnNumber`. + // - New session with no transaction started (This will be a no-op). + + // Reset retryable write history chain. prevWriteOpTime = OpTime(); - // Before we start a new history chain, reset the in-memory retryable write - // state in the txnParticipant so it can be built up from scratch again with - // the new chain. + // Reset the in-memory retryable write state in the txnParticipant about the list of + // committed statements so it can be built up from scratch again with the new chain. LOGV2_DEBUG(5709800, 2, "Tenant oplog applier resetting existing retryable write state", @@ -882,19 +868,42 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( "migrationId"_attr = _migrationUuid); txnParticipant.invalidate(opCtx.get()); txnParticipant.refreshFromStorageIfNeededNoOplogEntryFetch(opCtx.get()); - TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber}; - txnParticipant.beginOrContinue(opCtx.get(), - txnNumberAndRetryCounter, - boost::none /* autocommit */, - boost::none /* startTransaction */); + } + + if (txnNumber < txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber()) { + // It's safe to to skip only if the current active transaction number last update + // happened during oplog catchup phase of this migration attempt. Otherwise, + // it indicates some potential data corruption. + invariant(txnParticipant.getLastWriteOpTime() > _cloneFinishedRecipientOpTime); + continue; + } + + TxnNumberAndRetryCounter txnNumberAndRetryCounter{txnNumber}; + txnParticipant.beginOrContinue(opCtx.get(), + txnNumberAndRetryCounter, + boost::none /* autocommit */, + boost::none /* startTransaction */); + if (!prevWriteOpTime) { + prevWriteOpTime = txnParticipant.getLastWriteOpTime(); } // We should never process the same donor statement twice, except in failover // cases. In the event of a failover, it is possible that we were able to successfully // log the noop but failed to persist progress checkpoint data. As a result, we can end // up re-applying noop entries. We can safely skip the entry in this case. - if (txnParticipant.checkStatementExecutedNoOplogEntryFetch(opCtx.get(), - stmtIds.front())) { + // + // Statement IDs aren't guaranteed to be strictly increasing in sequence number. + // So, check all the statements in the oplog entry are already executed before we skip + // this oplog entry. + auto shouldSkip = [&] { + for (auto&& stmtId : stmtIds) { + if (!txnParticipant.checkStatementExecuted(opCtx.get(), stmtId)) + return false; + } + return true; + }(); + + if (shouldSkip) { LOGV2_DEBUG(7262200, 1, "Tenant Oplog Applier skipping previously processed retryable write", @@ -953,14 +962,9 @@ void TenantOplogApplier::_writeSessionNoOpsForRange( }); prePostImageEntry = boost::none; - // Invalidate in-memory state so that the next time the session is checked out, it - // would reload the transaction state from config.transactions. + // This opCtx can be used to apply later operations in the batch, clean up before reusing. if (opCtx->inMultiDocumentTransaction()) { - auto txnParticipant = TransactionParticipant::get(opCtx.get()); - invariant(txnParticipant); - txnParticipant.invalidate(opCtx.get()); opCtx->resetMultiDocumentTransactionState(); - scopedSession = {}; } } }