Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 2.4.0
      /_/

Using Python version 3.6.7 (default, Dec 21 2018 20:31:01)
SparkSession available as 'spark'.
>>> # coding: utf8
... import json
>>> from pyspark.sql import SparkSession
>>>
>>> spark = SparkSession.builder.appName("TestCase").getOrCreate()
>>>
>>> # Raw data
... data = [
...     {
...         "id": 1,
...         "type": "good record",
...         "attributes": {"params": "anyparam", "liste": ["a", "b", "c"]},
...     },
...     {
...         "id": 2,
...         "type": "bad record",
...         "attributes": {"params": "anyparam", "liste": ["d", None, None, "e"]},
...     },
... ]
>>> print(data)
[{'id': 1, 'type': 'good record', 'attributes': {'params': 'anyparam', 'liste': ['a', 'b', 'c']}}, {'id': 2, 'type': 'bad record', 'attributes': {'params': 'anyparam', 'liste': ['d', None, None, 'e']}}]
>>>
>>> # Load the data into an RDD
... rdd_json = sc.parallelize(data)
>>> rdd_json.take(2)
[{'id': 1, 'type': 'good record', 'attributes': {'params': 'anyparam', 'liste': ['a', 'b', 'c']}}, {'id': 2, 'type': 'bad record', 'attributes': {'params': 'anyparam', 'liste': ['d', None, None, 'e']}}]
>>>
>>> # Serialize to JSON strings so that Python None values become JSON null
... rdd_json2 = rdd_json.map(lambda x: json.dumps(x))
>>> rdd_json2.take(2)
['{"id": 1, "type": "good record", "attributes": {"params": "anyparam", "liste": ["a", "b", "c"]}}', '{"id": 2, "type": "bad record", "attributes": {"params": "anyparam", "liste": ["d", null, null, "e"]}}']
>>>
>>> # Turn the RDD into a DataFrame
... df = spark.read.json(rdd_json2, multiLine=True)
>>> df.printSchema()
root
 |-- attributes: struct (nullable = true)
 |    |-- liste: array (nullable = true)
 |    |    |-- element: string (containsNull = true)
 |    |-- params: string (nullable = true)
 |-- id: long (nullable = true)
 |-- type: string (nullable = true)
>>> df.take(2)
[Row(attributes=Row(liste=['a', 'b', 'c'], params='anyparam'), id=1, type='good record'), Row(attributes=Row(liste=['d', None, None, 'e'], params='anyparam'), id=2, type='bad record')]
>>>
>>> # Save it to MongoDB
... df.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option(
...     "database", "mydb"
).option("collection", "test_case").save() [Stage 10:===========================================> (6 + 2) / 8]19/02/27 12:08:33 WARN TaskSetManager: Lost task 7.0 in stage 10.0 (TID 39, host, executor 1): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast null into a StringType at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:87) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$12$$anonfun$apply$9.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$12$$anonfun$apply$9.apply(MapFunctions.scala:158) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at com.mongodb.spark.sql.MapFunctions$$anonfun$12.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$12.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$arrayTypeToBsonValueMapper$1.apply(MapFunctions.scala:161) at com.mongodb.spark.sql.MapFunctions$$anonfun$arrayTypeToBsonValueMapper$1.apply(MapFunctions.scala:161) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$9.apply(MapFunctions.scala:103) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$9.apply(MapFunctions.scala:103) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at scala.util.Try$.apply(Try.scala:192) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:75) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:72) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:72) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:68) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$10.apply(MapFunctions.scala:107) at 
com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$10.apply(MapFunctions.scala:107) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at scala.util.Try$.apply(Try.scala:192) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:75) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:72) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:72) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:68) at com.mongodb.spark.MongoSpark$$anonfun$1.apply(MongoSpark.scala:154) at com.mongodb.spark.MongoSpark$$anonfun$1.apply(MongoSpark.scala:154) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119) at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154) at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184) at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118) at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at 
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) 19/02/27 12:08:34 ERROR TaskSetManager: Task 7 in stage 10.0 failed 4 times; aborting job [Stage 10:==================================================> (7 + 0) / 8]Traceback (most recent call last): File "", line 4, in File "/usr/lib/spark/python/pyspark/sql/readwriter.py", line 734, in save self._jwrite.save() File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__ File "/usr/lib/spark/python/pyspark/sql/utils.py", line 63, in deco return f(*a, **kw) File "/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value py4j.protocol.Py4JJavaError: An error occurred while calling o162.save. : org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 10.0 failed 4 times, most recent failure: Lost task 7.3 in stage 10.0 (TID 42, host, executor 1): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast null into a StringType at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:87) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$12$$anonfun$apply$9.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$12$$anonfun$apply$9.apply(MapFunctions.scala:158) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at com.mongodb.spark.sql.MapFunctions$$anonfun$12.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$12.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$arrayTypeToBsonValueMapper$1.apply(MapFunctions.scala:161) at com.mongodb.spark.sql.MapFunctions$$anonfun$arrayTypeToBsonValueMapper$1.apply(MapFunctions.scala:161) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$9.apply(MapFunctions.scala:103) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$9.apply(MapFunctions.scala:103) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at scala.util.Try$.apply(Try.scala:192) at 
com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:75) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:72) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:72) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:68) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$10.apply(MapFunctions.scala:107) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$10.apply(MapFunctions.scala:107) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at scala.util.Try$.apply(Try.scala:192) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:75) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:72) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:72) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:68) at com.mongodb.spark.MongoSpark$$anonfun$1.apply(MongoSpark.scala:154) at com.mongodb.spark.MongoSpark$$anonfun$1.apply(MongoSpark.scala:154) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119) at 
com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154) at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184) at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118) at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) Driver stacktrace: at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:2039) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2027) at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:2026) at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48) at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2026) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966) at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:966) at scala.Option.foreach(Option.scala:257) at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:966) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2260) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2209) at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2198) at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49) at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2101) at org.apache.spark.SparkContext.runJob(SparkContext.scala:2126) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:935) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1.apply(RDD.scala:933) at 
org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112) at org.apache.spark.rdd.RDD.withScope(RDD.scala:363) at org.apache.spark.rdd.RDD.foreachPartition(RDD.scala:933) at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:117) at com.mongodb.spark.MongoSpark$.save(MongoSpark.scala:159) at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:70) at org.apache.spark.sql.execution.datasources.SaveIntoDataSourceCommand.run(SaveIntoDataSourceCommand.scala:45) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70) at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68) at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:86) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131) at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127) at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:155) at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151) at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152) at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127) at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80) at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:668) at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78) at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125) at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73) at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:668) at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:276) at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:270) at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) at java.lang.reflect.Method.invoke(Method.java:498) at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244) at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357) at py4j.Gateway.invoke(Gateway.java:282) at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132) at py4j.commands.CallCommand.execute(CallCommand.java:79) at py4j.GatewayConnection.run(GatewayConnection.java:238) at java.lang.Thread.run(Thread.java:748) Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast null into a StringType at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:87) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$12$$anonfun$apply$9.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$12$$anonfun$apply$9.apply(MapFunctions.scala:158) 
at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:35) at scala.collection.TraversableLike$class.map(TraversableLike.scala:234) at scala.collection.AbstractTraversable.map(Traversable.scala:104) at com.mongodb.spark.sql.MapFunctions$$anonfun$12.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$12.apply(MapFunctions.scala:158) at com.mongodb.spark.sql.MapFunctions$$anonfun$arrayTypeToBsonValueMapper$1.apply(MapFunctions.scala:161) at com.mongodb.spark.sql.MapFunctions$$anonfun$arrayTypeToBsonValueMapper$1.apply(MapFunctions.scala:161) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$9.apply(MapFunctions.scala:103) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$9.apply(MapFunctions.scala:103) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at scala.util.Try$.apply(Try.scala:192) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:75) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:72) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:72) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:68) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$10.apply(MapFunctions.scala:107) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$dataTypeToBsonValueMapper$10.apply(MapFunctions.scala:107) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1$$anonfun$5.apply(MapFunctions.scala:84) at scala.util.Try$.apply(Try.scala:192) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:84) at com.mongodb.spark.sql.MapFunctions$$anonfun$com$mongodb$spark$sql$MapFunctions$$wrappedDataTypeToBsonValueMapper$1.apply(MapFunctions.scala:83) at 
com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$4$$anonfun$apply$2.apply(MapFunctions.scala:64) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:75) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1$$anonfun$apply$3.apply(MapFunctions.scala:72) at scala.collection.IndexedSeqOptimized$class.foreach(IndexedSeqOptimized.scala:33) at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:186) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:72) at com.mongodb.spark.sql.MapFunctions$$anonfun$rowToDocumentMapper$1.apply(MapFunctions.scala:68) at com.mongodb.spark.MongoSpark$$anonfun$1.apply(MongoSpark.scala:154) at com.mongodb.spark.MongoSpark$$anonfun$1.apply(MongoSpark.scala:154) at scala.collection.Iterator$$anon$11.next(Iterator.scala:410) at scala.collection.Iterator$GroupedIterator.takeDestructively(Iterator.scala:1074) at scala.collection.Iterator$GroupedIterator.go(Iterator.scala:1089) at scala.collection.Iterator$GroupedIterator.fill(Iterator.scala:1126) at scala.collection.Iterator$GroupedIterator.hasNext(Iterator.scala:1130) at scala.collection.Iterator$class.foreach(Iterator.scala:891) at scala.collection.AbstractIterator.foreach(Iterator.scala:1334) at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:119) at com.mongodb.spark.MongoSpark$$anonfun$save$1$$anonfun$apply$1.apply(MongoSpark.scala:118) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:186) at com.mongodb.spark.MongoConnector$$anonfun$withCollectionDo$1.apply(MongoConnector.scala:184) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:154) at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:171) at com.mongodb.spark.MongoConnector.withCollectionDo(MongoConnector.scala:184) at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:118) at com.mongodb.spark.MongoSpark$$anonfun$save$1.apply(MongoSpark.scala:117) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) at org.apache.spark.rdd.RDD$$anonfun$foreachPartition$1$$anonfun$apply$28.apply(RDD.scala:935) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:2101) at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90) at org.apache.spark.scheduler.Task.run(Task.scala:121) at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:402) at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360) at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:408) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) ... 1 more
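
The write fails on the second document because attributes.liste is an array<string> containing null elements, and the connector refuses to map a null element into a StringType. Below is an untested sketch of one possible workaround, not part of the session above: drop the null entries from the array before writing. It assumes Spark 2.4+ (for the filter higher-order SQL function) and reuses the DataFrame, database and collection names from the example.

# Sketch only: clean attributes.liste before writing to MongoDB.
from pyspark.sql import functions as F

# Rebuild the attributes struct with the null elements filtered out of liste.
df_clean = df.withColumn(
    "attributes",
    F.struct(
        F.expr("filter(attributes.liste, x -> x IS NOT NULL)").alias("liste"),
        F.col("attributes.params").alias("params"),
    ),
)

# Same write as above, but on the cleaned DataFrame.
df_clean.write.format("com.mongodb.spark.sql.DefaultSource").mode("append").option(
    "database", "mydb"
).option("collection", "test_case").save()

With the nulls removed from the array, the cast that raises MongoTypeConversionException never happens; if the nulls need to be preserved in MongoDB, a different representation (for example a placeholder string) would be needed instead.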