Spark Connector / SPARK-356

Support compound keys

    • Type: New Feature
    • Resolution: Fixed
    • Priority: Trivial - P5
    • Fix Version/s: 10.4.0
    • Affects Version/s: None
    • Component/s: Reads
    • Labels: None
    • Documentation Changes: Needed

      1. What would you like to communicate to the user about this feature?

      Added a new partitioner: Auto Bucket Partitioner

          A new `$sample`-based partitioner that provides support for all collection types. It supports partitioning across single or multiple fields, including nested fields.

          The logic for the partitioner is as follows:

          - Calculates the number of documents per partition. Runs a `$collStats` aggregation to get the
            average document size and divides the configured chunk size by it (a worked sizing sketch
            follows the pipeline below).
          - Determines the total count of documents. Uses the `$collStats` count, or runs a `countDocuments`
            query if the user supplies their own `aggregation.pipeline` configuration.
          - Determines the number of partitions. Calculated as: `count / number of documents per partition`.
          - Determines the number of documents to `$sample`. Calculated as: `samples per partition * number of partitions`.
          - Creates the aggregation pipeline to generate the partitions.
            ```
            [{$match: <the $match stage of the user's aggregation pipeline - iff the first stage is a $match>},
             {$sample: <number of documents to $sample>},
             {$addFields: {<partition key projection field>: {<'i': '$fieldList[i]', ...>}}}, // Only added iff fieldList.size() > 1
             {$bucketAuto: {
                 groupBy: <partition key projection field>,
                 buckets: <number of partitions>
             }}
            ]
            ```
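
          As a rough, hypothetical illustration (the figures below are made up, not taken from this ticket), the
          sizing inputs come from `$collStats` and the partition counts follow from the arithmetic above:

          ```
          // Hypothetical sizing sketch: illustrative figures only.
          // The average document size and document count come from a $collStats aggregation:
          db.test.aggregate([{ $collStats: { storageStats: {} } }])
          // Suppose storageStats.avgObjSize = 65536 (64 KB) and storageStats.count = 100000.
          // With chunkSize = 64 (MB) and samplesPerPartition = 100:
          //   documents per partition = (64 * 1024 * 1024) / 65536 = 1024
          //   number of partitions    = 100000 / 1024             ~= 98
          //   documents to $sample    = 100 * 98                   = 9800
          ```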

          Configurations:

          - `fieldList`: The field list to be used for partitioning.
            Either a single field name or a comma-separated list of field names.
            Defaults to: "_id".
          - `chunkSize`: The average size (MB) for each partition.
            Note: the average document size is used to determine the number of documents per partition,
            so partitions may not be of equal size.
            Defaults to: 64.
          - `samplesPerPartition`: The number of samples to take per partition.
            Defaults to: 100.
          - `partitionKeyProjectionField`: The field name to use for a projected field that contains all the
            fields used to partition the collection.
            Defaults to: "__idx".
            Recommended to change this only if the documents already contain a "__idx" field.

          Partitions are calculated as logical ranges. On sharded clusters these map closely to ranged chunks.
          When used with hashed shard keys, these logical ranges require broadcast operations.
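
          For example (a hypothetical sketch, not from this ticket), hashing does not preserve range order,
          so a logical range filter cannot be routed to a single shard:

          ```
          // Hypothetical: suppose the collection were sharded on a hashed key, e.g.
          //   sh.shardCollection("test.test", { a: "hashed" })
          // Hashed values do not preserve range order, so a logical range filter such as:
          db.test.find({ a: { $gte: 1, $lt: 2 } })
          // cannot be targeted at a single shard and is broadcast to all shards.
          ```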

          Similar to the SamplePartitioner, but it uses the `$bucketAuto` aggregation stage to generate the partition bounds.
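
          For illustration (a minimal shell sketch, not connector output), `$bucketAuto` groups the sampled
          values into evenly sized buckets whose `_id.min` / `_id.max` values become the partition bounds:

          ```
          // Minimal sketch of how $bucketAuto yields partition bounds from a sample.
          db.test.aggregate([
            { $sample: { size: 100 } },
            { $bucketAuto: { groupBy: "$_id", buckets: 3 } }
          ])
          // Each output document has the form { _id: { min: <lower>, max: <upper> }, count: <n> };
          // the min/max pairs become the logical partition ranges.
          ```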

      2. Would you like the user to see examples of the syntax and/or executable code and its output?

      3. Which versions of the driver/connector does this apply to? - 10.4.0

       


      Add support for compound / shard keys:

      Given the following example:

      db.test.drop();
      db.test.createIndex( { "a": 1, "b": 1 })
      
      db.test.insertMany([
        {_id: 11, a: 1, b: 1},
        {_id: 12, a: 1, b: 2},
        {_id: 13, a: 1, b: 3},
        {_id: 14, a: 2, b: 1},
        {_id: 15, a: 2, b: 2},
        {_id: 16, a: 2, b: 3},
        {_id: 17, a: 2, b: 4}]);
      

      If the partitioner were based on fields a and b and the generated partition ranges looked like:

      {a: 1, b: 1}, {a: 1, b: 3}
      {a: 1, b: 3}, {a: 2, b: 2}
      {a: 2, b: 2}, {a: 2, b: 3}
      

      Then, to match against the range {a: 1, b: 3} to {a: 2, b: 2}, you would need to project out the compound fields so that a full BSON comparison can be made, rather than just per-key value comparisons.

      e.g.:

      db.test.aggregate([
        {"$addFields": {"__idx": {"a": "$a", "b": "$b"}}},
        {"$match": {"__idx": {"$gte": {"a": 1, "b": 3}, "$lt": {"a": 2, "b": 2}}}}])
      

      A compound key partitioner should be added to generate valid partitions against multiple keys.

      Given the changes to chunk sizes post MongoDB 6.0, it is recommended that this be a new partitioner that samples the database and provides the ranges and pipelines for the partitioner.

      Was:

      Support hashed shard keys and compound keys

      SPARK-345 disabled compound shard key support, but it appears compound keys can be supported along with hashed shard keys.

      Using logic similar to the shard repartitioner's, an aggregation pipeline could be constructed to look up chunk ranges.

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            ross@mongodb.com Ross Lawley
            Votes:
            0
            Watchers:
            3

              Created:
              Updated:
              Resolved: