Type: Bug
Resolution: Duplicate
Priority: Major - P3
Affects Version/s: 1.1.0
Component/s: Performance
Environment: Spark 1.6, Scala 2.11.8, Linux
Sorting in an aggregation pipeline throws an exception, and there is no way to pass allowDiskUse to avoid it.
This is similar to https://jira.mongodb.org/browse/SPARK-81?jql=text%20~%20%22allowDiskUse%22
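For reference, the mongo shell and the drivers expose this flag directly on the aggregate command; the 1.1.0 connector does not. A minimal sketch of the raw command document MongoDB expects, showing where allowDiskUse would have to be set — `build_aggregate_command` is an illustrative helper, not a connector or driver API:

```python
def build_aggregate_command(collection, pipeline, allow_disk_use=False):
    """Shape of the raw 'aggregate' command document sent to MongoDB.

    The server only spills large $sort stages to disk when the command
    carries allowDiskUse: true; without it, sorts over the in-memory
    limit abort with error 16819, as seen in the stack trace below.
    """
    return {
        "aggregate": collection,
        "pipeline": pipeline,
        "allowDiskUse": allow_disk_use,  # the flag the connector cannot pass
        "cursor": {},
    }

cmd = build_aggregate_command(
    "davinciEvents", [{"$sort": {"eventType": 1}}], allow_disk_use=True
)
```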
Running from the pyspark shell with the following command line:
pyspark --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/eventsdb.davinciEvents?readPreference=primaryPreferred" --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/eventsdb.davinciEvents" --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0
The script:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import functions as F

conf = SparkConf().setAppName("SparkMongoTest")
conf = conf.setMaster("local[*]")
conf.set('spark.mongodb.input.database', 'eventsdb')
conf.set('spark.mongodb.input.collection', 'davinciEvents')

sc = SparkContext.getOrCreate(conf=conf)
sqlContext = SQLContext(sc)  # used below but missing from the original script

pipeline = "{'$sort': {'eventType': 1} }"
df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").option("pipeline", pipeline).load()
returns the following stack trace:
17/03/22 16:09:18 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
com.mongodb.MongoCommandException: Command failed with error 16819: 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.' on server 127.0.0.1:27017. The full response is { "ok" : 0.0, "errmsg" : "Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.", "code" : 16819, "codeName" : "Location16819" }
    at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:115)
    at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:114)
    at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
    at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
    at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:173)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:215)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:206)
    at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:112)
    at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:227)
    at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:223)
    at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:239)
    at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:212)
    at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:223)
    at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:65)
    at com.mongodb.Mongo.execute(Mongo.java:772)
    at com.mongodb.Mongo$2.execute(Mongo.java:759)
    at com.mongodb.OperationIterable.iterator(OperationIterable.java:47)
    at com.mongodb.AggregateIterableImpl.iterator(AggregateIterableImpl.java:102)
    at com.mongodb.spark.rdd.MongoRDD.getCursor(MongoRDD.scala:166)
    at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:142)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
17/03/22 16:09:18 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): com.mongodb.MongoCommandException: Command failed with error 16819: 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.' on server 127.0.0.1:27017. The full response is { "ok" : 0.0, "errmsg" : "Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.", "code" : 16819, "codeName" : "Location16819" }
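A possible workaround until the connector exposes allowDiskUse, assuming the sort does not have to run server-side: remove the $sort stage from the pipeline and sort the loaded DataFrame in Spark instead (e.g. `df.sort("eventType")`). A sketch of stripping the stage — `strip_sort_stages` is an illustrative helper, and the pipeline here uses strict JSON quoting rather than the relaxed quoting in the report:

```python
import json

def strip_sort_stages(pipeline_json):
    """Remove $sort stages from an aggregation pipeline string so the
    sort can be performed by Spark on the resulting DataFrame instead
    of by MongoDB, avoiding the 16819 in-memory sort limit."""
    stages = json.loads(pipeline_json)
    if isinstance(stages, dict):  # single-stage pipeline, as in the report
        stages = [stages]
    return [s for s in stages if "$sort" not in s]

# The remaining stages can be passed as the connector's "pipeline"
# option, followed by df.sort("eventType") on the loaded DataFrame.
remaining = strip_sort_stages(
    '[{"$match": {"eventType": "click"}}, {"$sort": {"eventType": 1}}]'
)
# remaining == [{"$match": {"eventType": "click"}}]
```

This trades the server-side sort for a Spark-side shuffle, which is acceptable here since the data is being loaded into Spark anyway.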