When reading a collection with the MongoPaginateBySizePartitioner, some documents are duplicated in the resulting dataset.
Versions used:
MongoDB               | 3.4.10
Spark                 | 2.4.0
Mongo Spark connector | 2.4.1
Code to reproduce the problem:
First, insert 2 million documents into a collection:
import org.apache.spark.sql.{SaveMode, SparkSession}

case class Data(_id: String, i: Double)

object insert {
  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._

    // Generate ids 1..2000000; _id is the stringified counter.
    val twoMillion = spark.range(1L, 2000001L).map { i =>
      Data(i.toString, i.toDouble)
    }

    twoMillion
      .write
      .mode(SaveMode.Overwrite)
      .format("com.mongodb.spark.sql")
      .options(Map(
        "uri" -> "mongodb://******:27017/******",
        "collection" -> "test"))
      .save()
  }
}
This correctly inserts exactly 2 million documents:
db.getCollection('test').count({})       // 2000000
db.getCollection('test').find().limit(1) // { "_id" : "500001", "i" : 500001.0 }
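As an extra isolation step, the count can also be cross-checked from Spark without the paginate-by-size partitioner. This is a minimal sketch, not part of the original reproduction; it assumes the same masked URI as the insert job and relies on the connector falling back to its default MongoSamplePartitioner when no partitioner option is given:

import org.apache.spark.sql.SparkSession
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

object baselineCount {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[4]").getOrCreate()

    // No "partitioner" option: the connector uses its default
    // MongoSamplePartitioner, which serves as a baseline for comparison.
    val baseline = MongoSpark.load(spark, ReadConfig(Map(
      "uri" -> "mongodb://******:27017/******", // same masked URI as the insert job
      "collection" -> "test")))

    println("baseline count -> " + baseline.count())
  }
}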
Then, read the collection using the MongoPaginateBySizePartitioner:
import org.apache.spark.sql.SparkSession
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig

object read {
  case class Data(_id: String, i: Double)

  def main(args: Array[String]): Unit = {
    val spark: SparkSession = SparkSession
      .builder()
      .master("local[4]")
      .getOrCreate()

    import spark.implicits._

    val ds = MongoSpark.load(spark, ReadConfig(Map(
      "uri" -> "mongodb://recload-mongo01.hon.2i.internal:27017/test-favi",
      "partitioner" -> "MongoPaginateBySizePartitioner",
      "partitionerOptions.partitionSizeMB" -> "1",
      "collection" -> "test"))).as[Data]

    ds.createOrReplaceTempView("ds")

    println("count -> " + ds.count())
    // List every _id that occurs more than once.
    spark.sql("select _id, count(*) from ds group by _id having count(*) >= 2").show()
  }
}
This prints:
count -> 3081344

+-------+--------+
|    _id|count(1)|
+-------+--------+
| 100010|       2|
|1000240|       2|
|1000280|       2|
|1000665|       2|
|1000795|       2|
|1000839|       2|
|1000888|       2|
| 100140|       2|
|1001866|       2|
|1002011|       2|
|1002185|       2|
| 100227|       2|
|1002442|       2|
| 100263|       2|
|1002783|       2|
|1002883|       2|
|1002887|       2|
| 100320|       2|
|1003202|       2|
|1003366|       2|
+-------+--------+
only showing top 20 rows
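The count of 3,081,344 exceeds the expected 2,000,000 by 1,081,344 rows. A minimal sketch to check that the surplus consists entirely of duplicated _id values (a hypothetical follow-up, appended to the read job above and reusing its ds dataset):

// Appended to read.main after the ds.count() call above.
// dropDuplicates keeps one row per _id, so a result of 2000000 would
// mean every surplus row duplicates an _id that is already present.
println("distinct _id count -> " + ds.dropDuplicates("_id").count())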