  1. Core Server
  2. SERVER-28625

replace ClusterAggregate's use of Strategy::commandOp/ShardConnection with establishCursors/Shard::runCommand

    • Type: Task
    • Resolution: Done
    • Priority: Major - P3
    • 3.5.7
    • Affects Version/s: 3.5.5
    • Component/s: Sharding
    • None
    • Fully Compatible
    • Sharding 2017-04-17, Sharding 2017-05-08

      [update: bumping this ahead of replacing other callers of Strategy::commandOp]

      This ticket will make ClusterAggregate::run replace its use of Strategy::commandOp with a function that:

      • uses establishCursors() with batchSize=0 to establish cursors
      • wraps the use of establishCursors() with stale shardVersion handling logic (that is, targeting shards based on the namespace and query, invalidating the routing table cache on stale shardVersion errors, and re-targeting and retrying on stale shardVersion errors)

      This means shardVersion retries will still be encapsulated within the wrapper, but since it will be a simple loop around establishCursors(), it should be easy to pull this logic up a level to the various callers.

      Approximate code:

      StatusWith<std::vector<AsyncRequestsSender::Response>> establishCursorsWithShardVersioning(
                               OperationContext* opCtx,
                               const string& db,
                               const BSONObj& command,
                               int options,
                               const string& versionedNs,
                               const BSONObj& targetingQuery,
                               const BSONObj& targetingCollation,
                               std::vector<AsyncRequestsSender::Response>* results) {
              // Build readPref by extracting readPref from command and checking options for slaveOk
              auto readPref = ...;
      
              // Determine from 'options' and QueryOption_PartialResults if allowPartialResults should be true
              bool allowPartialResults = ...;
      
              // Note, this will be the new shape of the vector of results returned by Strategy::commandOp(). Each AsyncRequestsSender::Response contains the targeted shard's shardId, exact targeted host, and error status or BSONObj command result. It is a strict superset of what was returned before.
              StatusWith<std::vector<AsyncRequestsSender::Response>> swResponses(
                  (std::vector<AsyncRequestsSender::Response>()));
              int numAttempts = 0;
              do {
                  auto routingInfoStatus =
                      Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(opCtx, versionedNs);
                  if (ErrorCodes::NamespaceNotFound == routingInfoStatus) {
                      // TODO: Does aggregation expect to get an empty vector of responses in this case?
                      break;
                  }
                  auto routingInfo = uassertStatusOK(routingInfoStatus);
      
                  auto requests =
                      buildRequestsForShardsForQuery(opCtx, routingInfo, command, targetingQuery, targetingCollation);
      
                  swResponses = establishCursors(opCtx, Grid::get(opCtx)->getExecutorPool()->getArbitraryExecutor(), versionedNs, readPref, requests, allowPartialResults, nullptr /* TODO: can aggregation ever get CommandOnShardedViewNotSupportedOnMongod? */);
      
                  if (ErrorCodes::isStaleShardingError(swResponses.getStatus().code())) {
                      Grid::get(opCtx)->catalogCache()->onStaleConfigError(std::move(routingInfo));
                  }
      
                  ++numAttempts;
              } while (numAttempts < kMaxNumStaleVersionRetries && !swResponses.getStatus().isOK());
      
              // If we couldn't establish a shardVersion, throw. This will be caught at the top by Strategy::clientCommandOp(), the way shardVersion errors thrown by ParallelSortClusteredCursor are now.
              uassertStatusOK(swResponses.getStatus());
              return swResponses.getValue();
      }
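
      For experimenting with the retry shape outside the server tree, the loop above can be modeled in isolation. The following is only a minimal sketch: Code, Response, Attempt, RoutingCache, EstablishFn and establishWithRetries are hypothetical stand-ins, not MongoDB types or functions. It also assumes that only stale-version errors should trigger a refresh and retry, whereas the approximate code above loops on any non-OK status.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Hypothetical stand-ins for illustration only; none of these are MongoDB types.
enum class Code { OK, StaleConfig, HostUnreachable };

struct Response {
    std::string shardId;
    std::string result;
};

struct Attempt {
    Code code;
    std::vector<Response> responses;
};

// Models the routing table cache: a stale error invalidates the cached
// version, so the next attempt targets with fresher routing information.
struct RoutingCache {
    int version = 0;
    int refreshes = 0;
    void onStaleConfigError() {
        ++version;
        ++refreshes;
    }
};

// Stands in for "target shards, then establish cursors" using the given routing version.
using EstablishFn = std::function<Attempt(int routingVersion)>;

// Simple loop around the establish step: retry only on stale-version errors
// (an assumption of this sketch), up to maxAttempts attempts in total.
Attempt establishWithRetries(RoutingCache& cache, const EstablishFn& establish, int maxAttempts) {
    assert(maxAttempts > 0);
    Attempt last{Code::OK, {}};
    for (int attempt = 0; attempt < maxAttempts; ++attempt) {
        last = establish(cache.version);
        if (last.code != Code::StaleConfig) {
            return last;  // success, or an error that a refresh would not fix
        }
        cache.onStaleConfigError();
    }
    return last;  // still stale after exhausting the attempts
}
```

      Because the wrapper is just a loop around one callable, a caller that wants to own the retry policy could inline the same loop, which is the "pull this logic up a level" step described earlier.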
      

      david.storch

            Assignee:
            esha.maharishi@mongodb.com Esha Maharishi (Inactive)
            Reporter:
            esha.maharishi@mongodb.com Esha Maharishi (Inactive)
            Votes:
            0
            Watchers:
            3
