[update: bumping this ahead of replacing other callers of Strategy::commandOp]
This ticket will make ClusterAggregate::run replace its use of Strategy::commandOp with a function that:
- uses establishCursors() with batchSize=0 to establish cursors
- wraps the use of establishCursors() with stale shardVersion handling logic (that is, it targets shards based on the namespace and query and, on a stale shardVersion error, invalidates the routing table cache, re-targets, and retries)
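Targeting, the first step of that wrapper, amounts to picking the shards whose chunks overlap the query's shard-key range. The following is a minimal, self-contained sketch of that idea on a toy routing table with a single integer shard key. `Chunk`, `targetShards`, `kMinKey` and `kMaxKey` are hypothetical names, not server APIs, and the sketch ignores collation and compound shard keys, which the ticket's `buildRequestsForShardsForQuery()` would have to handle.

```cpp
#include <cassert>
#include <limits>
#include <set>
#include <string>
#include <vector>

// Hypothetical sentinels standing in for MinKey/MaxKey on an integer shard key.
constexpr long long kMinKey = std::numeric_limits<long long>::min();
constexpr long long kMaxKey = std::numeric_limits<long long>::max();

// Hypothetical, simplified routing table entry: a chunk owns the half-open
// shard-key range [min, max) and lives on exactly one shard.
struct Chunk {
    long long min;
    long long max;
    std::string shardId;
};

// Returns the shards owning at least one chunk that overlaps the half-open
// query range [queryMin, queryMax). A query with no shard-key predicate passes
// [kMinKey, kMaxKey) and therefore targets every shard in the table.
std::set<std::string> targetShards(const std::vector<Chunk>& routingTable,
                                   long long queryMin,
                                   long long queryMax) {
    assert(queryMin <= queryMax);
    std::set<std::string> shards;
    for (const auto& chunk : routingTable) {
        // Two half-open ranges overlap iff each one starts before the other ends.
        if (chunk.min < queryMax && queryMin < chunk.max) {
            shards.insert(chunk.shardId);
        }
    }
    return shards;
}
```

An equality predicate such as `{x: 150}` maps to the range [150, 151), so it reaches only the shard owning that chunk; if the routing table is stale, that shard may no longer own the range, which is exactly the case the stale shardVersion handling covers.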
With this design, shardVersion retries will still be encapsulated within the wrapper, but since the wrapper is just a simple loop around establishCursors(), it should be easy to pull this logic up a level into the various callers.
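Because the retry logic is only a bounded loop, it could be written once and handed to each caller. The following is a minimal, self-contained sketch of such a lifted helper under that assumption. `Code`, `Result` and `withStaleVersionRetry` are hypothetical names; the `attempt` callback stands in for "re-target, then call establishCursors()", and `invalidate` stands in for the catalog cache's `onStaleConfigError()`.

```cpp
#include <cassert>

// Hypothetical, simplified status codes and result type.
enum class Code { kOK, kStaleShardVersion, kOther };

template <typename T>
struct Result {
    Code code;
    T value;
};

// Runs 'attempt' (re-target shards from the current routing info, then send
// the requests). On a stale shardVersion result, calls 'invalidate' (drop the
// cached routing info) and tries again, for at most 'maxAttempts' attempts in
// total. Success and any non-stale error are returned immediately.
template <typename AttemptFn, typename InvalidateFn>
auto withStaleVersionRetry(int maxAttempts, AttemptFn attempt, InvalidateFn invalidate) {
    assert(maxAttempts > 0);
    auto result = attempt();
    int attempts = 1;
    while (result.code == Code::kStaleShardVersion) {
        invalidate();
        if (attempts == maxAttempts) {
            break;  // Out of retries: hand the stale error back to the caller.
        }
        result = attempt();
        ++attempts;
    }
    return result;
}
```

A caller such as ClusterAggregate::run could then pass a lambda that reloads routing info and establishes cursors, so the retry policy lives in one place while each caller keeps control of what it does per attempt.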
Approximate code:
```cpp
// Returns the responses from every targeted shard. Stale shardVersion errors are
// retried up to kMaxNumStaleVersionRetries times; any error left after that is thrown.
StatusWith<std::vector<AsyncRequestsSender::Response>> establishCursorsWithShardVersioning(
    OperationContext* opCtx,
    const std::string& db,
    const BSONObj& command,
    int options,
    const std::string& versionedNs,
    const BSONObj& targetingQuery,
    const BSONObj& targetingCollation) {
    // Build readPref by extracting readPref from command and checking options for slaveOk.
    auto readPref = ...;

    // Determine from 'options' and QueryOption_PartialResults if allowPartialResults
    // should be true.
    bool allowPartialResults = ...;

    // Note: this will be the new shape of the vector of results returned by
    // Strategy::commandOp(). Each AsyncRequestsSender::Response contains the targeted
    // shard's shardId, exact targeted host, and error status or BSONObj command result.
    // It is a strict superset of what was returned before.
    StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
        (std::vector<AsyncRequestsSender::Response>()));

    int numAttempts = 0;
    do {
        auto routingInfoStatus =
            Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, versionedNs);
        if (routingInfoStatus.getStatus().code() == ErrorCodes::NamespaceNotFound) {
            // TODO: Does aggregation expect to get an empty vector of responses in this case?
            break;
        }
        auto routingInfo = uassertStatusOK(routingInfoStatus);

        auto requests = buildRequestsForShardsForQuery(
            opCtx, routingInfo, command, targetingQuery, targetingCollation);

        swResponses = establishCursors(
            opCtx,
            Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(),
            versionedNs,
            readPref,
            requests,
            allowPartialResults,
            nullptr /* TODO: can aggregation ever get CommandOnShardedViewNotSupportedOnMongod? */);

        // Invalidate the cached routing table so the next attempt re-targets.
        if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
            Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
        }
        ++numAttempts;
        // Only stale shardVersion errors are retried; other errors fall through.
    } while (numAttempts < kMaxNumStaleVersionRetries &&
             ErrorCodes::isStaleShardingError(swResponses.getStatus().code()));

    // If we couldn't establish a shardVersion, throw. This will be caught at the top by
    // Strategy::clientCommandOp(), the way shardVersion errors thrown by
    // ParallelSortClusteredCursor are now.
    uassertStatusOK(swResponses.getStatus());
    return swResponses.getValue();
}
```
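The two `...` placeholders at the top of the function, `readPref` and `allowPartialResults`, reduce to bit tests on `options`. The following is a minimal sketch, assuming the legacy OP_QUERY flag bits (SlaveOk is bit 2, Partial is bit 7) and assuming that an explicit read preference extracted from the command takes precedence, with slaveOk otherwise mapping to secondaryPreferred. `ReadPrefMode`, `resolveReadPref` and `shouldAllowPartialResults` are hypothetical stand-ins, not the server's `ReadPreferenceSetting` API.

```cpp
#include <cassert>
#include <optional>

// Legacy OP_QUERY flag bits: bit 2 is SlaveOk, bit 7 is Partial.
constexpr int kQueryOptionSlaveOk = 1 << 2;
constexpr int kQueryOptionPartialResults = 1 << 7;

// Hypothetical stand-in for the server's read preference modes.
enum class ReadPrefMode { kPrimaryOnly, kPrimaryPreferred, kSecondaryOnly, kSecondaryPreferred, kNearest };

// Assumed rule: a read preference parsed from the command wins; otherwise the
// slaveOk bit selects secondaryPreferred and its absence selects primary.
ReadPrefMode resolveReadPref(std::optional<ReadPrefMode> fromCommand, int options) {
    if (fromCommand) {
        return *fromCommand;
    }
    return (options & kQueryOptionSlaveOk) ? ReadPrefMode::kSecondaryPreferred
                                           : ReadPrefMode::kPrimaryOnly;
}

// Partial results are allowed only when the Partial bit is set.
bool shouldAllowPartialResults(int options) {
    return (options & kQueryOptionPartialResults) != 0;
}
```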
Is duplicated by:
- SERVER-18814 ClusterPipelineCmd calls find on $cmd collection directly instead of using runCommand (Closed)
- SERVER-28894 replace aggregation's use of ShardConnection with Shard::runCommand (Closed)