Spark Connector / SPARK-151

Issues with the SamplePartitioner

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 2.1.1, 2.2.1
    • Affects Version/s: 2.2.0
    • Component/s: None

      Environment:
      Spark Connector 2.2.0
      Spark 2.2
      MongoDB 3.4.6

      The default SamplePartitioner gives unexpected behavior on an aggregation with:

      • A $match clause that returns N objects from the collection, and
      • N * avgObjSize > partitionSize
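
      Concretely, with the collection created in the steps below, the matched data just exceeds the 64 MB default (a quick back-of-the-envelope check using the collStats numbers from step 2):

        public final class PartitionSizeCheck {
          public static void main(final String[] args) {
            long matchedDocs = 65536L;   // N: documents returned by the $match (count from collStats)
            long avgObjSize = 1030L;     // bytes per document, from collStats
            long partitionSizeMB = 64L;  // default partition size (64 MB)

            double matchedMB = matchedDocs * avgObjSize / (1024.0 * 1024.0);
            System.out.printf("matched: %.1f MB vs partition size: %d MB -> condition %s%n",
                matchedMB, partitionSizeMB, matchedMB > partitionSizeMB ? "holds" : "does not hold");
            // With these numbers: matched: 64.4 MB vs partition size: 64 MB -> condition holds
          }
        }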

      Steps to reproduce:

      1. Create a standalone MongoDB instance
      2. Create a test collection with dataSize greater than the default partitionSize of 64 MB
        use test
        var tdoc = 'a'.repeat(1000)
        for(ii=0;ii<1024*64;ii++) { db.over64mb.insert({"a": tdoc}); }
        // count = 65536
        // avgObjSize = 1030 
        
      3. Run the following aggregation in Spark
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.SparkSession;
        import org.bson.Document;
        
        import com.mongodb.spark.MongoSpark;
        import com.mongodb.spark.config.ReadConfig;
        import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
        
        import static java.util.Collections.singletonList;
        
        public final class DoAggregation {
        
          public static void main(final String[] args) throws InterruptedException {
        
            SparkSession spark = SparkSession.builder()
              .master("local")
              .appName("DoAggregation")
              .config("spark.mongodb.input.uri", "mongodb://localhost:24000/test.over64mb")
              .config("spark.mongodb.output.uri", "mongodb://localhost:24000/test.over64mb")
              .config("spark.mongodb.input.partitionerOptions.samplesPerPartition", "200")
              .getOrCreate();
        
            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());
        
            ReadConfig readConfig = ReadConfig.create(jsc);
        
            JavaMongoRDD<Document> rdd = MongoSpark.load(jsc, readConfig).withPipeline(
              singletonList(
                Document.parse("{ $match: { \"a\": {\"$ne\": \"b\"} }}")));
        
            System.out.println("rdd.count(): " + rdd.count());
        
            jsc.close();
        
          }
        }
        

      The $match clause here should find all 65536 documents (the behavior is the same if only a subset is matched, as long as the result set is larger than the partition size).

      The unexpected behavior is that a different, incorrect count is produced on each run, for example:

      rdd.count(): 64781
      rdd.count(): 64689
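
      To confirm how the data is being partitioned, the partitions the connector builds can be inspected directly. This is a diagnostic sketch only: it assumes the partitions are com.mongodb.spark.rdd.partitioner.MongoPartition instances exposing their _id query bounds via index() and queryBounds(); the class name InspectPartitions is mine.

        import java.util.List;

        import org.apache.spark.Partition;
        import org.apache.spark.api.java.JavaSparkContext;
        import org.apache.spark.sql.SparkSession;
        import org.bson.Document;

        import com.mongodb.spark.MongoSpark;
        import com.mongodb.spark.config.ReadConfig;
        import com.mongodb.spark.rdd.api.java.JavaMongoRDD;
        import com.mongodb.spark.rdd.partitioner.MongoPartition;

        import static java.util.Collections.singletonList;

        public final class InspectPartitions {

          public static void main(final String[] args) {

            SparkSession spark = SparkSession.builder()
              .master("local")
              .appName("InspectPartitions")
              .config("spark.mongodb.input.uri", "mongodb://localhost:24000/test.over64mb")
              .config("spark.mongodb.output.uri", "mongodb://localhost:24000/test.over64mb")
              .config("spark.mongodb.input.partitionerOptions.samplesPerPartition", "200")
              .getOrCreate();

            JavaSparkContext jsc = new JavaSparkContext(spark.sparkContext());

            JavaMongoRDD<Document> rdd = MongoSpark.load(jsc, ReadConfig.create(jsc)).withPipeline(
              singletonList(Document.parse("{ $match: { \"a\": {\"$ne\": \"b\"} }}")));

            // Computing the partitions triggers the same partitioner logic as rdd.count()
            List<Partition> partitions = rdd.partitions();
            System.out.println("number of partitions: " + partitions.size());
            for (Partition p : partitions) {
              MongoPartition mp = (MongoPartition) p;
              System.out.println("partition " + mp.index() + ": " + mp.queryBounds().toJson());
            }

            jsc.close();
          }
        }

      With the data above I would expect this to print a single partition whose bounds match the $gte/$lte ObjectId range seen in the mongod log below.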
      

      With all queries logged in the mongod log, it appears that only one partition was used, and it does not cover the full result set:

      2017-10-24T16:19:16.759-0700 I COMMAND  [conn6] command test.over64mb command: collStats { collStats: "over64mb" } numYields:0 reslen:11363 locks:{ Global: { acquireCount: { r: 2 } }, Database: { acquireCount: { r: 1 } }, Collection: { acquireCount: { r: 1 } } } protocol:op_query 1ms
      2017-10-24T16:19:16.813-0700 I COMMAND  [conn6] command test.over64mb command: count { count: "over64mb", query: { a: { $ne: "b" } } } planSummary: COLLSCAN keysExamined:0 docsExamined:65536 numYields:512 reslen:44 locks:{ Global: { acquireCount: { r: 1026 } }, Database: { acquireCount: { r: 513 } }, Collection: { acquireCount: { r: 513 } } } protocol:op_query 34ms
      2017-10-24T16:19:16.883-0700 I COMMAND  [conn6] command test.over64mb command: aggregate { aggregate: "over64mb", pipeline: [ { $match: { a: { $ne: "b" } } }, { $sample: { size: 201 } }, { $project: { _id: 1 } }, { $sort: { _id: 1 } } ], cursor: {}, allowDiskUse: true } planSummary: COLLSCAN cursorid:20327439006 keysExamined:0 docsExamined:65536 hasSortStage:1 numYields:513 nreturned:101 reslen:2718 locks:{ Global: { acquireCount: { r: 1038 } }, Database: { acquireCount: { r: 519 } }, Collection: { acquireCount: { r: 518 } } } protocol:op_query 59ms
      2017-10-24T16:19:16.892-0700 I COMMAND  [conn6] command test.over64mb command: getMore { getMore: 20327439006, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { a: { $ne: "b" } } }, { $sample: { size: 201 } }, { $project: { _id: 1 } }, { $sort: { _id: 1 } } ], cursor: {}, allowDiskUse: true } planSummary: COLLSCAN cursorid:20327439006 keysExamined:0 docsExamined:0 hasSortStage:1 cursorExhausted:1 numYields:0 nreturned:100 reslen:2690 locks:{ Global: { acquireCount: { r: 4 } }, Database: { acquireCount: { r: 2 } }, Collection: { acquireCount: { r: 2 } } } protocol:op_query 1ms
      2017-10-24T16:19:17.217-0700 I COMMAND  [conn6] command test.over64mb command: aggregate { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:3450 docsExamined:3450 numYields:26 nreturned:101 reslen:104526 locks:{ Global: { acquireCount: { r: 60 } }, Database: { acquireCount: { r: 30 } }, Collection: { acquireCount: { r: 29 } } } protocol:op_query 6ms
      2017-10-24T16:19:17.256-0700 I COMMAND  [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:13800 docsExamined:13800 numYields:108 nreturned:16189 reslen:16776983 locks:{ Global: { acquireCount: { r: 228 } }, Database: { acquireCount: { r: 114 } }, Collection: { acquireCount: { r: 114 } } } protocol:op_query 35ms
      2017-10-24T16:19:17.348-0700 I COMMAND  [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:17250 docsExamined:17250 numYields:136 nreturned:16189 reslen:16776983 locks:{ Global: { acquireCount: { r: 286 } }, Database: { acquireCount: { r: 143 } }, Collection: { acquireCount: { r: 143 } } } protocol:op_query 39ms
      2017-10-24T16:19:17.553-0700 I COMMAND  [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:17250 docsExamined:17250 numYields:135 nreturned:16189 reslen:16776983 locks:{ Global: { acquireCount: { r: 284 } }, Database: { acquireCount: { r: 142 } }, Collection: { acquireCount: { r: 142 } } } protocol:op_query 39ms
      2017-10-24T16:19:17.615-0700 I COMMAND  [conn6] command test.over64mb command: getMore { getMore: 18114994321, collection: "over64mb" } originatingCommand: { aggregate: "over64mb", pipeline: [ { $match: { _id: { $gte: ObjectId('59ee2c389de662bb1e05a9fc'), $lte: ObjectId('59ee2c4c9de662bb1e06a6ac') } } }, { $match: { a: { $ne: "b" } } } ], cursor: {}, allowDiskUse: true } planSummary: IXSCAN { _id: 1 } cursorid:18114994321 keysExamined:12939 docsExamined:12939 cursorExhausted:1 numYields:101 nreturned:16021 reslen:16602767 locks:{ Global: { acquireCount: { r: 214 } }, Database: { acquireCount: { r: 107 } }, Collection: { acquireCount: { r: 107 } } } protocol:op_query 33ms
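
      The first aggregation in that log is the partitioner's own sampling pipeline ($match, $sample, $project _id, $sort _id), and the single partition query that follows is bounded by a $gte/$lte ObjectId range. Running an equivalent sampling pipeline by hand with the plain MongoDB Java driver shows how many matched documents can fall outside such a range. This only mirrors the pipeline shape from the log, not the connector's exact boundary logic, and the class and variable names are mine:

        import java.util.ArrayList;
        import java.util.Arrays;
        import java.util.List;

        import org.bson.Document;
        import org.bson.types.ObjectId;

        import com.mongodb.MongoClient;
        import com.mongodb.client.MongoCollection;
        import com.mongodb.client.model.Aggregates;
        import com.mongodb.client.model.Filters;
        import com.mongodb.client.model.Projections;
        import com.mongodb.client.model.Sorts;

        public final class SampleBounds {

          public static void main(final String[] args) {

            MongoClient client = new MongoClient("localhost", 24000);
            MongoCollection<Document> coll = client.getDatabase("test").getCollection("over64mb");

            // Same shape as the logged pipeline: $match, $sample, $project _id, $sort _id
            List<Document> sampled = coll.aggregate(Arrays.asList(
                Aggregates.match(Filters.ne("a", "b")),
                Aggregates.sample(201),
                Aggregates.project(Projections.include("_id")),
                Aggregates.sort(Sorts.ascending("_id"))))
              .allowDiskUse(true)
              .into(new ArrayList<Document>());

            ObjectId minSampled = sampled.get(0).getObjectId("_id");
            ObjectId maxSampled = sampled.get(sampled.size() - 1).getObjectId("_id");

            // Matched documents whose _id is outside [minSampled, maxSampled] are not
            // covered by a partition bounded the way the logged query is
            long outside = coll.count(Filters.and(
                Filters.ne("a", "b"),
                Filters.or(Filters.lt("_id", minSampled), Filters.gt("_id", maxSampled))));
            System.out.println("matched documents outside the sampled _id range: " + outside);

            client.close();
          }
        }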
      

      Also, the SamplePartitioner starts by running a full count of the $match query, which seems unexpected to me and may be expensive in some cases.
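
      For reference, that count is the second command in the log above and is a COLLSCAN here since "a" is not indexed; the driver-level equivalent would be something like (sketch, class name is mine):

        import com.mongodb.MongoClient;
        import com.mongodb.client.model.Filters;

        public final class MatchCount {
          public static void main(final String[] args) {
            MongoClient client = new MongoClient("localhost", 24000);
            // Equivalent of: count { count: "over64mb", query: { a: { $ne: "b" } } }
            long matched = client.getDatabase("test").getCollection("over64mb")
                .count(Filters.ne("a", "b"));
            System.out.println("matched documents: " + matched);  // 65536 with the data above
            client.close();
          }
        }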

      In my tests, if a lower sample size such as the default of 10 is used with the example code above, the Spark run aborts with:

      Exception in thread "main" java.util.NoSuchElementException: next on empty iterator
      	at scala.collection.Iterator$$anon$2.next(Iterator.scala:39)
      	at scala.collection.Iterator$$anon$2.next(Iterator.scala:37)
      	at scala.collection.IndexedSeqLike$Elements.next(IndexedSeqLike.scala:63)
      	at scala.collection.IterableLike$class.head(IterableLike.scala:107)
      	at scala.collection.mutable.ArrayOps$ofRef.scala$collection$IndexedSeqOptimized$$super$head(ArrayOps.scala:186)
      	at scala.collection.IndexedSeqOptimized$class.head(IndexedSeqOptimized.scala:126)
      	at scala.collection.mutable.ArrayOps$ofRef.head(ArrayOps.scala:186)
      	at com.mongodb.spark.rdd.partitioner.PartitionerHelper$.setLastBoundaryToLessThanOrEqualTo(PartitionerHelper.scala:127)
      	at com.mongodb.spark.rdd.partitioner.MongoSamplePartitioner.partitions(MongoSamplePartitioner.scala:111)
      	at com.mongodb.spark.rdd.partitioner.DefaultMongoPartitioner.partitions(DefaultMongoPartitioner.scala:34)
      	at com.mongodb.spark.rdd.MongoRDD.getPartitions(MongoRDD.scala:137)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      	at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      	at scala.Option.getOrElse(Option.scala:121)
      	at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
      	at org.apache.spark.rdd.RDD.count(RDD.scala:1158)
      	at org.apache.spark.api.java.JavaRDDLike$class.count(JavaRDDLike.scala:455)
      	at org.apache.spark.api.java.AbstractJavaRDDLike.count(JavaRDDLike.scala:45)
      	at DoAggregation.main(DoAggregation.java:33)
      	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
      	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      	at java.lang.reflect.Method.invoke(Method.java:483)
      	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:755)
      	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:180)
      	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:205)
      	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:119)
      	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
      

      The behavior is the same with 2 or 4 workers.
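
      As a stopgap, raising samplesPerPartition (as in the repro above) avoids the crash but not the wrong counts; configuring a different partitioner sidesteps the SamplePartitioner entirely. The snippet below is only a workaround sketch, with MongoPaginateByCountPartitioner used as an example of one of the other documented partitioner names:

        import org.apache.spark.sql.SparkSession;

        public final class PartitionerWorkaround {
          public static void main(final String[] args) {
            SparkSession spark = SparkSession.builder()
              .master("local")
              .appName("PartitionerWorkaround")
              .config("spark.mongodb.input.uri", "mongodb://localhost:24000/test.over64mb")
              // Bypass the default SamplePartitioner
              .config("spark.mongodb.input.partitioner", "MongoPaginateByCountPartitioner")
              .getOrCreate();
            // ... load the RDD and run the same pipeline as in the repro above ...
            spark.stop();
          }
        }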

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Roger McCoy (alan.mccoy)
            Votes: 1
            Watchers: 3
