-
Type: Task
-
Resolution: Won't Fix
-
Priority: Major - P3
-
None
-
Affects Version/s: 2.0.0
-
Component/s: Performance
-
Environment:spark 2.1.0
mongodb 3.4.1
I want to implement batch update mongodb through spark mongodb connector.
I change the method save[D](dataset: Dataset[D], writeConfig: WriteConfig) of the object com.mongodb.spark.MongoSpark. In my issue, I need update operation, so I change the worj "case Some(_id) => new ReplaceOneModel[BsonDocument](new BsonDocument("_id", _id), doc, updateOptions)" into UpdateOneModel with upsert option "true".
My dataframe has fields "_id", "merchant_code","name" ... But I get an error listed as below.
Caused by: java.lang.IllegalArgumentException: Invalid BSON field name merchant_code
at org.bson.AbstractBsonWriter.writeName(AbstractBsonWriter.java:516)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:114)
at org.bson.codecs.BsonDocumentCodec.encode(BsonDocumentCodec.java:41)
at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:85)
at com.mongodb.connection.UpdateCommandMessage.writeTheWrites(UpdateCommandMessage.java:43)
at com.mongodb.connection.BaseWriteCommandMessage.encodeMessageBodyWithMetadata(BaseWriteCommandMessage.java:129)
at com.mongodb.connection.RequestMessage.encodeWithMetadata(RequestMessage.java:160)
at com.mongodb.connection.WriteCommandProtocol.sendMessage(WriteCommandProtocol.java:220)
at com.mongodb.connection.WriteCommandProtocol.execute(WriteCommandProtocol.java:101)
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:64)
at com.mongodb.connection.UpdateCommandProtocol.execute(UpdateCommandProtocol.java:37)
at com.mongodb.connection.DefaultServer$DefaultServerProtocolExecutor.execute(DefaultServer.java:168)
at com.mongodb.connection.DefaultServerConnection.executeProtocol(DefaultServerConnection.java:289)
at com.mongodb.connection.DefaultServerConnection.updateCommand(DefaultServerConnection.java:143)
at com.mongodb.operation.MixedBulkWriteOperation$Run$3.executeWriteCommandProtocol(MixedBulkWriteOperation.java:490)
at com.mongodb.operation.MixedBulkWriteOperation$Run$RunExecutor.execute(MixedBulkWriteOperation.java:656)
at com.mongodb.operation.MixedBulkWriteOperation$Run.execute(MixedBulkWriteOperation.java:409)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:177)
at com.mongodb.operation.MixedBulkWriteOperation$1.call(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.OperationHelper.withConnectionSource(OperationHelper.java:422)
at com.mongodb.operation.OperationHelper.withConnection(OperationHelper.java:413)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:168)
at com.mongodb.operation.MixedBulkWriteOperation.execute(MixedBulkWriteOperation.java:74)
at com.mongodb.Mongo.execute(Mongo.java:845)
at com.mongodb.Mongo$2.execute(Mongo.java:828)
at com.mongodb.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:301)
at com.mongodb.MongoCollectionImpl.bulkWrite(MongoCollectionImpl.java:248)
at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3$$anonfun$apply$4.apply(MongoSpark.scala:175)
at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3$$anonfun$apply$4.apply(MongoSpark.scala:167)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:167)
at com.mongodb.spark.MongoSpark$$anonfun$save$3$$anonfun$apply$3.apply(MongoSpark.scala:166)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186)
at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154)
at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171)
at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184)
at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:166)
at com.mongodb.spark.MongoSpark$$anonfun$save$3.apply(MongoSpark.scala:165)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$29.apply(RDD.scala:925)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I don't understand what happened.