Spark Connector / SPARK-179

On bulk write duplicate key error Spark just WARNs

    • Type: Bug
    • Resolution: Cannot Reproduce
    • Priority: Major - P3
    • Affects Version/s: 2.2.1
    • Component/s: Writes
    • Environment: Apache Spark 2.2.1, YARN

    Description:

      Suppose we have a MongoDB collection requests with a unique index on the field named field (a sketch of that index setup follows the pseudo-code below). Then consider this pseudo-code:

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.WriteConfig

      case class Request(field: String)

      import sparkSession.implicits._
      val ds = sparkSession.createDataset(List(
        Request("one"), Request("one"), Request("two")
      ))

      // db, col and uri stand for the target database name, collection name and connection URI
      val writeConfig = WriteConfig(db, col, connectionString = Some(uri))
        .withOption("maxBatchSize", "5000")

      MongoSpark.save(ds.write.mode("append"), writeConfig)
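
      For completeness, the unique index assumed above could be created as follows. This is only a sketch using the synchronous MongoDB Java driver that ships with the connector; the database and collection names sampledb and requests are taken from the error message below, and the host is a placeholder:

      import com.mongodb.MongoClient
      import com.mongodb.client.model.{IndexOptions, Indexes}

      // Hypothetical setup: the unique index on `field` that the duplicate key
      // error below refers to (sampledb.requests).
      val client = new MongoClient()  // adjust the host/URI to match `uri` above
      client.getDatabase("sampledb")
        .getCollection("requests")
        .createIndex(Indexes.ascending("field"), new IndexOptions().unique(true))
      client.close()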
       

      This code does not fail. However, the following error appears in the logs:

       18/05/03 18:13:32 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 11.0 (TID 276, samplehost.com, executor 5): com.mongodb.MongoBulkWriteException: Bulk write operation error on server samplehost.com:27017. Write errors: [BulkWriteError{index=34, code=11000, message='E11000 duplicate key error collection: sampledb.requests index: field key: { : "one" }', details={ }}]. 
          at com.mongodb.connection.BulkWriteBatchCombiner.getError(BulkWriteBatchCombiner.java:176)
          at com.mongodb.connection.BulkWriteBatchCombiner.throwOnError(BulkWriteBatchCombiner.java:205)
          at com.mongodb.connection.BulkWriteBatchCombiner.getResult(BulkWriteBatchCombiner.java:146)
          at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:188)
          at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
          at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:422)
          at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:413)
          at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
          at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
          at com.mongodb.Mongo.execute(Mongo.java:845)
          at com.mongodb.Mongo$2.execute(Mongo.java:828)
          at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:338)
          at com.mongodb.MongoCollectionImpl.insertMany(MongoCollectionImpl.java:322)
          at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
          at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1$$anonfun$apply$2.apply(MongoSpark.scala:119)
          at scala.collection.Iterator$class.foreach(Iterator.scala:893)
          at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
          at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119)
          at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118)
          at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
          at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
          at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
          at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
          at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
          at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
          at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
          at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118)
          at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117)
          at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
          at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
          at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
          at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
          at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
          at org.apache.spark.scheduler.Task.run(Task.scala:99)
          at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:322)
          at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
          at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
          at java.lang.Thread.run(Thread.java:745)

       

      In the database, the record "two" is not saved (the bulk insert is ordered, so documents after the duplicate "one" in the batch are skipped), yet the whole Spark job does not fail and looks successful. In my opinion, the job should fail.
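
      As a stop-gap on the application side, the silent data loss can at least be detected after the save. The sketch below assumes the same sparkSession, ds, db, col and uri as above, that ReadConfig takes the database, collection and connection string the same way WriteConfig does, and that the collection was empty before the write; it is a verification workaround, not a fix in the connector:

      import com.mongodb.spark.MongoSpark
      import com.mongodb.spark.config.ReadConfig

      // Read the collection back and compare with the number of distinct keys we
      // tried to write; fail the job explicitly if anything was silently dropped.
      val readConfig = ReadConfig(db, col, connectionString = Some(uri))
      val expectedCount = ds.dropDuplicates("field").count()
      val actualCount = MongoSpark.load(sparkSession, readConfig).count()
      require(actualCount == expectedCount,
        s"Bulk write silently dropped ${expectedCount - actualCount} document(s)")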

       

       

            Assignee:
            Ross Lawley (ross@mongodb.com)
            Reporter:
            Peter (jakubco)
            Votes:
            0
            Watchers:
            2

              Created:
              Updated:
              Resolved: