Spark Connector / SPARK-114

Aggregation pipeline: cannot sort more than 100MB (unable to pass allowDiskUse)

    • Type: Bug
    • Resolution: Duplicate
    • Priority: Major - P3
    • Affects Version/s: 1.1.0
    • Component/s: Performance
    • Environment: Spark 1.6, Scala 2.11.8, Linux

      Sorting in an aggregation pipeline throws an exception, and there is no way to pass allowDiskUse through the connector to avoid it.

      This is similar to https://jira.mongodb.org/browse/SPARK-81?jql=text%20~%20%22allowDiskUse%22
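
      For comparison, the same $sort succeeds when allowDiskUse is passed to the server directly, e.g. through PyMongo (a minimal sketch, assuming PyMongo is installed; the database and collection names are the ones used in this report):

      from pymongo import MongoClient

      # The same $sort stage, but with allowDiskUse=True so the server may
      # spill to disk instead of hitting the 100 MB in-memory sort limit.
      client = MongoClient("mongodb://127.0.0.1")
      coll = client["eventsdb"]["davinciEvents"]
      cursor = coll.aggregate([{"$sort": {"eventType": 1}}], allowDiskUse=True)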

      Running from the pyspark shell with the following command line:

      pyspark \
          --conf "spark.mongodb.input.uri=mongodb://127.0.0.1/eventsdb.davinciEvents?readPreference=primaryPreferred" \
          --conf "spark.mongodb.output.uri=mongodb://127.0.0.1/eventsdb.davinciEvents" \
          --packages org.mongodb.spark:mongo-spark-connector_2.10:1.1.0

      The script:

      from pyspark import SparkConf, SparkContext
      from pyspark.sql import SQLContext

      conf = SparkConf().setAppName("SparkMongoTest")
      conf = conf.setMaster("local[*]")

      conf.set('spark.mongodb.input.database', 'eventsdb')
      conf.set('spark.mongodb.input.collection', 'davinciEvents')

      sc = SparkContext.getOrCreate(conf=conf)
      sqlContext = SQLContext(sc)

      # A $sort stage pushed down to MongoDB; the connector offers no way
      # to attach allowDiskUse to this pipeline.
      pipeline = "{'$sort': {'eventType': 1} }"

      df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource") \
          .option("pipeline", pipeline).load()

      Running this returns the following stack trace:

      17/03/22 16:09:18 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
      com.mongodb.MongoCommandException: Command failed with error 16819: 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.' on server 127.0.0.1:27017. The full response is { "ok" : 0.0, "errmsg" : "Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.", "code" : 16819, "codeName" : "Location16819" }
      	at com.mongodb.connection.ProtocolHelper.getCommandFailureException(ProtocolHelper.java:115)
      	at com.mongodb.connection.CommandProtocol.execute(CommandProtocol.java:114)
      	at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:159)
      	at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:286)
      	at com.mongodb.connection.DefaultServerConnection.command(DefaultServerConnection.java:173)
      	at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:215)
      	at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:206)
      	at com.mongodb.operation.CommandOperationHelper.executeWrappedCommandProtocol(CommandOperationHelper.java:112)
      	at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:227)
      	at com.mongodb.operation.AggregateOperation$1.call(AggregateOperation.java:223)
      	at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:239)
      	at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:212)
      	at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:223)
      	at com.mongodb.operation.AggregateOperation.execute(AggregateOperation.java:65)
      	at com.mongodb.Mongo.execute(Mongo.java:772)
      	at com.mongodb.Mongo$2.execute(Mongo.java:759)
      	at com.mongodb.OperationIterable.iterator(OperationIterable.java:47)
      	at com.mongodb.AggregateIterableImpl.iterator(AggregateIterableImpl.java:102)
      	at com.mongodb.spark.rdd.MongoRDD.getCursor(MongoRDD.scala:166)
      	at com.mongodb.spark.rdd.MongoRDD.compute(MongoRDD.scala:142)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
      	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:306)
      	at org.apache.spark.rdd.RDD.iterator(RDD.scala:270)
      	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
      	at org.apache.spark.scheduler.Task.run(Task.scala:89)
      	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:227)
      	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      	at java.lang.Thread.run(Thread.java:745)
      17/03/22 16:09:18 WARN TaskSetManager: Lost task 0.0 in stage 1.0 (TID 1, localhost): com.mongodb.MongoCommandException: Command failed with error 16819: 'Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.' on server 127.0.0.1:27017. The full response is { "ok" : 0.0, "errmsg" : "Sort exceeded memory limit of 104857600 bytes, but did not opt in to external sorting. Aborting operation. Pass allowDiskUse:true to opt in.", "code" : 16819, "codeName" : "Location16819" }
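
      A possible workaround until the connector can pass allowDiskUse: drop the $sort stage from the Mongo pipeline and sort on the Spark side instead, so the server-side 100 MB sort limit never applies (a sketch building on the script above; Spark performs the sort with its own spill-to-disk machinery):

      # Load without a pipeline, then let Spark do the sort.
      df = sqlContext.read.format("com.mongodb.spark.sql.DefaultSource").load()
      df_sorted = df.sort("eventType")
      df_sorted.show(5)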
      

            Assignee: Unassigned
            Reporter: Jorge Imperial-Sosa (jorge.imperial@mongodb.com) (Inactive)
            Votes: 0
            Watchers: 2
