Spark Connector / SPARK-118

MongoDB batch update error: Invalid BSON field name merchant_code

    • Type: Task
    • Resolution: Won't Fix
    • Priority: Major - P3
    • Fix Version/s: None
    • Affects Version/s: 2.0.0
    • Component/s: Performance
    • Environment:
      spark 2.1.0
      mongodb 3.4.1

      Description:

      I want to implement batch updates to MongoDB through the Spark MongoDB connector. I changed the method save[D](dataset: Dataset[D], writeConfig: WriteConfig) of the object com.mongodb.spark.MongoSpark: my use case needs an update operation, so I replaced the line "case Some(_id) => new ReplaceOneModel[BsonDocument](new BsonDocument("_id", _id), doc, updateOptions)" with an UpdateOneModel, with the upsert option set to "true".
      My DataFrame has the fields "_id", "merchant_code", "name", ... but I get the error listed below.

      Caused by: java.lang.IllegalArgumentException: Invalid BSON field name merchant_code
      at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:516)
      at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:114)
      at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
      at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:85)
      at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:43)
      at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129)
      at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
      at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:220)
      at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101)
      at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:64)
      at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:37)
      at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
      at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
      at com.mongodb.connection.DefaultServerConnection.updateCommand(DefaultServerConnection.java:143)
      at com.mongodb.operation.MixedBulkWriteOperation$Run$3.executeWriteCommandProtocol(MixedBulkWriteOperation.java:490)
      at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:656)
      at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:409)
      at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:177)
      at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
      at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:422)
      at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:413)
      at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
      at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
      at com.mongodb.Mongo.execute(Mongo.java:845)
      at com.mongodb.Mongo$2.execute(Mongo.java:828)
      at com.mongodb.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:301)
      at com.mongodb.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:248)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3$$anonfun$apply$4.apply(MongoSpark.scala:175)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3$$anonfun$apply$4.apply(MongoSpark.scala:167)
      at scala.collection.Iterator$class.foreach(Iterator.scala:893)
      at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:167)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:166)
      at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
      at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
      at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
      at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
      at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
      at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
      at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:166)
      at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:165)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
      at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
      at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
      at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
      at org.apache.spark.scheduler.Task.run(Task.scala:99)
      at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
      at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
      at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
      at java.lang.Thread.run(Thread.java:745)

      I don't understand what happened.

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: pafvell
            Votes: 0
            Watchers: 2

              Created:
              Updated:
              Resolved: