Type: Bug
Resolution: Works as Designed
Priority: Major - P3
Affects Version/s: 2.2.0
Component/s: API
Environment: artifact: org.mongodb.spark:mongo-spark-connector_2.11:2.2.0
the issue was reproduced on:
1) AWS t2.micro, ubuntu 16.04, java 1.8.0_144, mongodb 3.4.7, python 3.5.2, pyspark 2.2.0 (pip)
2) i7 16Gb RAM desktop, Windows 10, java 1.8.0_31, mongodb 3.4.2, python 3.6.2 (win32), pyspark 2.2.0 (pip)
In both cases, pyspark was shipped with spark-*-2.11-2.2.0 and scala-*-2.11.8 jars.
Case description
When using the MongoDB Spark connector, the aggregation pipeline sometimes returns incorrect results at the $limit stage, regardless of the preceding stages: the number of returned documents exceeds the $limit value. This behavior was only observed on relatively large collections with "fat" documents.
Code example
Please consider the code snippet below, which consistently reproduces the issue for a single-document limit ($limit: 1).
spark.createDataFrame([(i, [k for k in range(100)]) for i in range(100000)], ["seq", "data"]) \
    .write.format("com.mongodb.spark.sql.DefaultSource") \
    .mode("overwrite").save()

test2 = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("sampleSize", 100) \
    .option("pipeline", [{'$limit': 1}]) \
    .load()

print('Test 2: Expected 1 row, got', test2.count(), 'rows:')
test2.show()
The following output is produced by the code:
Test 2: Expected 1 row, got 3 rows:
+--------------------+--------------------+-----+
|                 _id|                data|  seq|
+--------------------+--------------------+-----+
|[598ceed7a751cc6b...|[0, 1, 2, 3, 4, 5...|    0|
|[598ceed9a751cc6b...|[0, 1, 2, 3, 4, 5...|31697|
|[598ceedaa751cc6b...|[0, 1, 2, 3, 4, 5...|66686|
+--------------------+--------------------+-----+
Obviously, the DataFrame is expected to have a single row.
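As a side note (not part of the original report), the extra rows may be easier to investigate by checking how many partitions the connector creates for the read and by applying the limit on the Spark side instead of through the pipeline option. The sketch below is only an illustration and assumes the same SparkSession and connector configuration as the snippet above.
# Hypothetical sketch: read the same collection without a pipeline, then
# inspect the partitioning and apply the limit with DataFrame.limit(),
# which is enforced across all partitions.
df = spark.read.format("com.mongodb.spark.sql.DefaultSource") \
    .option("sampleSize", 100) \
    .load()

print("Read partitions:", df.rdd.getNumPartitions())           # how the collection was split
print("Rows after df.limit(1).count():", df.limit(1).count())  # expected to be exactly 1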
Please consider the complete working example attached as app.py.
The example consists of two tests: one for a small dataset (that passes) and one for a larger dataset (that fails). Both tests execute queries against the same schema; only the number of documents differs.
The example was launched locally in standalone mode using the command:
spark-submit --packages org.mongodb.spark:mongo-spark-connector_2.11:2.2.0 app.py
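For completeness, the example assumes a SparkSession already pointed at the MongoDB deployment. A minimal setup sketch is shown below; the URI, database, and collection names are placeholders, not values from the original report.
# Hypothetical SparkSession setup for the snippets above; the connection URI,
# database, and collection names are placeholders.
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("mongo-limit-repro") \
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/test.coll") \
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/test.coll") \
    .getOrCreate()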