Spark Connector / SPARK-98

MongoShardedPartitioner and hashed shard keys not working correctly

    • Type: Bug
    • Resolution: Won't Fix
    • Priority: Major - P3
    • Affects Version/s: None
    • Component/s: API, Performance

      Hi,

      While skimming through the partitioner package, especially MongoShardedPartitioner, I ran into something odd.
      As far as I can tell, the MongoShardedPartitioner does the following:

      1. Queries config.chunks.find({ns: "dbName.collectionName"}), projecting only the fields min, max, and shard.
      2. Generates a MongoPartition for each chunk, containing the index and a boundary query.
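
The two steps above can be sketched in plain JavaScript as follows. This only illustrates the behaviour as described in this report, not the connector's actual code. The function name chunksToPartitions, the sample chunk documents (including their shard assignments), and the use of null to stand in for MinKey/MaxKey are assumptions made for the example.

```javascript
// Hypothetical sketch: turn config.chunks-style documents into partitions,
// each carrying an index and a boundary query on the shard key.
// null stands in for MinKey / MaxKey (an assumption for this example);
// an unbounded side is simply left out of the range.
function chunksToPartitions(chunks, shardKey) {
  return chunks.map((chunk, index) => {
    const range = {};
    const min = chunk.min[shardKey];
    const max = chunk.max[shardKey];
    if (min !== null) range.$gte = min; // lower bound is inclusive
    if (max !== null) range.$lt = max;  // upper bound is exclusive
    return { index, shard: chunk.shard, query: { [shardKey]: range } };
  });
}

// Sample chunk documents; boundaries are BigInt so 64-bit values stay exact.
const sampleChunks = [
  { min: { sh_key: null }, max: { sh_key: -9218136772714079893n }, shard: "rs0" },
  { min: { sh_key: -8016443680476223100n }, max: { sh_key: -7983401451479346525n }, shard: "rs1" },
];

const partitions = chunksToPartitions(sampleChunks, "sh_key");
for (const p of partitions) {
  console.log(p.index, p.shard, p.query.sh_key);
}
```

Each partition's query is what would then be combined with the user's own $match stage, as in the log entry below.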

      When the MongoRDD is accessed, the boundary query is attached to every query sent to MongoDB. Taken from a mongod log file:

      2016-11-17T11:22:33.347+0100 I COMMAND  [conn31] getmore fleetdata.data query: { aggregate: "data", pipeline: [ { $match: { $and: [ { sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42920592668 ntoreturn:0 cursorExhausted:1 keyUpdates:0 writeConflicts:0 numYields:21431 nreturned:0 reslen:20 locks:{ Global: { acquireCount: { r: 42868 } }, Database: { acquireCount: { r: 21434 } }, Collection: { acquireCount: { r: 21434 } } } 42765ms
      

      However, this operation is very slow: the getmore above took 42765ms and returned no documents (nreturned:0).
      Running db.data.find({ sh_key: { $gte: MinKey, $lt: -9218136772714079893 } }).count() returns 0, and I'm not sure that is supposed to happen. I also ran the same query with the boundaries of a different chunk, and the result was also 0: db.data.find({ sh_key: { $gte: NumberLong("-8016443680476223100"), $lt: NumberLong("-7983401451479346525") } }).count()
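
One detail that may matter when reproducing these counts: in a JavaScript shell, a bare literal such as -9218136772714079893 is parsed as a double and cannot hold all 19 digits, whereas NumberLong("...") with a string argument keeps the exact 64-bit value. The sketch below shows the effect in plain JavaScript, using BigInt as a stand-in for NumberLong (an assumption for the example; it does not talk to MongoDB).

```javascript
// The boundary as written in the log, kept exact as a BigInt.
const exact = BigInt("-9218136772714079893");

// The same digits parsed the way a bare numeric literal would be: as a double.
const asDouble = Number("-9218136772714079893");

// Convert the double back to an integer to see which value it really holds.
const roundTripped = BigInt(asDouble);

console.log("exact value:    ", exact.toString());
console.log("value as double:", roundTripped.toString());
console.log("values differ:  ", roundTripped !== exact);
```

If the boundary has to be exact, passing it as a string to NumberLong, as in the second query above, avoids the rounding.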

      Running explain() on the first query gives this output:

      {
              "queryPlanner" : {
                      "mongosPlannerVersion" : 1,
                      "winningPlan" : {
                              "stage" : "SHARD_MERGE",
                              "shards" : [
                                      {
                                              "shardName" : "rs0",
                                              "connectionString" : "rs0/hadoopb19:27018,hadoopb24:27018",
                                              "serverInfo" : {
                                                      "host" : "Hadoopb24",
                                                      "port" : 27018,
                                                      "version" : "3.2.9",
                                                      "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
                                              },
                                              "plannerVersion" : 1,
                                              "namespace" : "fleetdata.data",
                                              "indexFilterSet" : false,
                                              "parsedQuery" : {
                                                      "$and" : [
                                                              {
                                                                      "sh_key" : {
                                                                              "$lt" : -9218136772714080000
                                                                      }
                                                              },
                                                              {
                                                                      "sh_key" : {
                                                                              "$gte" : { "$minKey" : 1 }
                                                                      }
                                                              }
                                                      ]
                                              },
                                              "winningPlan" : {
                                                      "stage" : "SHARDING_FILTER",
                                                      "inputStage" : {
                                                              "stage" : "COLLSCAN",
                                                              "filter" : {
                                                                      "$and" : [
                                                                              {
                                                                                      "sh_key" : {
                                                                                              "$lt" : -9218136772714080000
                                                                                      }
                                                                              },
                                                                              {
                                                                                      "sh_key" : {
                                                                                              "$gte" : { "$minKey" : 1 }
                                                                                      }
                                                                              }
                                                                      ]
                                                              },
                                                              "direction" : "forward"
                                                      }
                                              },
                                              "rejectedPlans" : [ ]
                                      },
                                      {
                                              "shardName" : "rs1",
                                              "connectionString" : "rs1/hadoopb28:27018,hadoopb30:27018",
                                              "serverInfo" : {
                                                      "host" : "hadoopb28",
                                                      "port" : 27018,
                                                      "version" : "3.2.9",
                                                      "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
                                              },
                                              "plannerVersion" : 1,
                                              "namespace" : "fleetdata.data",
                                              "indexFilterSet" : false,
                                              "parsedQuery" : {
                                                      "$and" : [
                                                              {
                                                                      "sh_key" : {
                                                                              "$lt" : -9218136772714080000
                                                                      }
                                                              },
                                                              {
                                                                      "sh_key" : {
                                                                              "$gte" : { "$minKey" : 1 }
                                                                      }
                                                              }
                                                      ]
                                              },
                                              "winningPlan" : {
                                                      "stage" : "SHARDING_FILTER",
                                                      "inputStage" : {
                                                              "stage" : "COLLSCAN",
                                                              "filter" : {
                                                                      "$and" : [
                                                                              {
                                                                                      "sh_key" : {
                                                                                              "$lt" : -9218136772714080000
                                                                                      }
                                                                              },
                                                                              {
                                                                                      "sh_key" : {
                                                                                              "$gte" : { "$minKey" : 1 }
                                                                                      }
                                                                              }
                                                                      ]
                                                              },
                                                              "direction" : "forward"
                                                      }
                                              },
                                              "rejectedPlans" : [ ]
                                      },
                                      {
                                              "shardName" : "rs2",
                                              "connectionString" : "rs2/Hadoopb32:27018,hadoopb36:27018",
                                              "serverInfo" : {
                                                      "host" : "Hadoopb36",
                                                      "port" : 27018,
                                                      "version" : "3.2.9",
                                                      "gitVersion" : "22ec9e93b40c85fc7cae7d56e7d6a02fd811088c"
                                              },
                                              "plannerVersion" : 1,
                                              "namespace" : "fleetdata.data",
                                              "indexFilterSet" : false,
                                              "parsedQuery" : {
                                                      "$and" : [
                                                              {
                                                                      "sh_key" : {
                                                                              "$lt" : -9218136772714080000
                                                                      }
                                                              },
                                                              {
                                                                      "sh_key" : {
                                                                              "$gte" : { "$minKey" : 1 }
                                                                      }
                                                              }
                                                      ]
                                              },
                                              "winningPlan" : {
                                                      "stage" : "SHARDING_FILTER",
                                                      "inputStage" : {
                                                              "stage" : "COLLSCAN",
                                                              "filter" : {
                                                                      "$and" : [
                                                                              {
                                                                                      "sh_key" : {
                                                                                              "$lt" : -9218136772714080000
                                                                                      }
                                                                              },
                                                                              {
                                                                                      "sh_key" : {
                                                                                              "$gte" : { "$minKey" : 1 }
                                                                                      }
                                                                              }
                                                                      ]
                                                              },
                                                              "direction" : "forward"
                                                      }
                                              },
                                              "rejectedPlans" : [ ]
                                      }
                              ]
                      }
              },
              "ok" : 1
      }
      

      So apparently a COLLSCAN is performed on every shard, even though there is an index on sh_key:

      db.data.getIndexes()
      [
              {
                      "v" : 1,
                      "key" : {
                              "_id" : 1
                      },
                      "name" : "_id_",
                      "ns" : "fleetdata.data"
              },
              {
                      "v" : 1,
                      "key" : {
                              "sh_key" : "hashed"
                      },
                      "name" : "sh_key_hashed",
                      "ns" : "fleetdata.data"
              },
              {
                      "v" : 1,
                      "key" : {
                              "ts" : 1
                      },
                      "name" : "ts_1",
                      "ns" : "fleetdata.data"
              },
              {
                      "v" : 1,
                      "key" : {
                              "location" : "2dsphere"
                      },
                      "name" : "location_2dsphere",
                      "ns" : "fleetdata.data",
                      "2dsphereIndexVersion" : 3
              },
              {
                      "v" : 1,
                      "key" : {
                              "signals.signal" : 1
                      },
                      "name" : "signals.signal_1",
                      "ns" : "fleetdata.data"
              }
      ]
      

      Output of sh.status():

              {  "_id" : "fleetdata",  "primary" : "rs0",  "partitioned" : true }
                      fleetdata.data
                              shard key: { "sh_key" : "hashed" }
                              unique: false
                              balancing: true
                              chunks:
                                      rs0     389
                                      rs1     388
                                      rs2     392
                              too many chunks to print, use verbose if you want to force print
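
A possible reading of the zero counts above, offered as an assumption rather than something this report confirms: with a hashed shard key, chunk boundaries are expressed as hash values, while documents store the raw sh_key value, so a range filter that compares the raw field against chunk boundaries would not select the documents that actually live in that chunk. The sketch below simulates this in plain JavaScript. The function standInHash (64-bit FNV-1a) is a stand-in and is not the hash MongoDB uses; the documents and chunk boundaries are made up for illustration.

```javascript
// Stand-in 64-bit hash (FNV-1a); NOT the hash function MongoDB uses.
function standInHash(value) {
  let h = 0xcbf29ce484222325n;
  for (const ch of String(value)) {
    h ^= BigInt(ch.codePointAt(0));
    h = (h * 0x100000001b3n) & 0xffffffffffffffffn;
  }
  return BigInt.asIntN(64, h); // reinterpret as a signed 64-bit value
}

// True when min <= v < max; null means unbounded (MinKey / MaxKey).
function inRange(v, min, max) {
  return (min === null || v >= min) && (max === null || v < max);
}

// Made-up documents whose raw shard key values are small integers.
const docs = Array.from({ length: 1000 }, (_, i) => ({ sh_key: BigInt(i + 1) }));

// Made-up chunk boundaries that split the signed 64-bit hash space in four.
const bounds = [null, -4611686018427387904n, 0n, 4611686018427387904n, null];

const rawCounts = [];
const hashedCounts = [];
for (let i = 0; i + 1 < bounds.length; i++) {
  const min = bounds[i];
  const max = bounds[i + 1];
  // Filter on the raw field value, as the boundary query in the log does.
  rawCounts.push(docs.filter((d) => inRange(d.sh_key, min, max)).length);
  // Filter on the hashed value, which is what the chunk ranges describe.
  hashedCounts.push(docs.filter((d) => inRange(standInHash(d.sh_key), min, max)).length);
}

console.log("raw-value filter per chunk:   ", rawCounts);
console.log("hashed-value filter per chunk:", hashedCounts);
```

In this simulation the raw-value filter finds nothing in the chunks with negative boundaries, while the hashed-value filter assigns every document to exactly one chunk.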
      

      When I remove the .set("spark.mongodb.input.partitionerOptions.shardkey", "sh_key") line from my SparkConf, I see entries like this in the log file:

      2016-11-17T13:59:28.282+0100 I COMMAND  [conn13] command fleetdata.data command: aggregate { aggregate: "data", pipeline: [ { $match: { $and: [ { _id: { $gte: ObjectId('58073d6fe5a82e03c315f340'), $lt: ObjectId('58073de7e5a82e03c3195bf4') } }, { signals: { $elemMatch: { signal: "SomeSignal", value: { $gt: 0, $lte: 100 } } } } ] } }, { $group: { _id: "$root_document", firstTimestamp: { $min: "$ts" }, lastTimestamp: { $max: "$ts" }, count: { $sum: { $const: 1 } } } } ], fromRouter: true, cursor: { batchSize: 0 } } cursorid:42428915938 keyUpdates:0 writeConflicts:0 numYields:435 reslen:180 locks:{ Global: { acquireCount: { r: 874 } }, Database: { acquireCount: { r: 437 } }, Collection: { acquireCount: { r: 437 } } } protocol:op_query 141ms
      

      This slows down my entire aggregation pipeline considerably. When I use the default "shardkey" (which resolves to _id), execution times are fine (141ms in the log entry above). However, when I set the configuration property to the shard key of my collection ("sh_key"), execution times explode (42765ms in the first log entry).

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            j9dy F H
            Votes:
            2
            Watchers:
            8

              Created:
              Updated:
              Resolved: