Type: Task
Resolution: Done
Priority: Major - P3
Affects Version/s: 2.4.1
Component/s: Configuration
Environment: Ubuntu 18.04, MongoDB 4.0.6, Spark 2.4.4, Scala 2.11.12, mongo-spark-connector 2.11-2.4.1
Spark hangs for 30 seconds and then times out when I try to connect to MongoDB over SSL (ssl=true). I have successfully imported the server's private key and the CA certificate into the Java key store and trust store. I am using PySpark.
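For context, importing a CA certificate and a client key pair into Java stores is typically done with keytool and openssl along these lines. The paths, aliases, and passwords below are placeholders matching the report, not the reporter's actual setup:

```shell
# Import the CA certificate into the trust store (placeholder paths/aliases)
keytool -importcert -trustcacerts \
  -alias mongo-ca \
  -file /path/to/ca.pem \
  -keystore /path/to/truststore.ks \
  -storepass tspassword -noprompt

# Bundle the client certificate and private key into PKCS#12,
# then import that bundle into the Java key store
openssl pkcs12 -export \
  -in /path/to/client.pem -inkey /path/to/client.key \
  -out client.p12 -name mongo-client -passout pass:kspassword

keytool -importkeystore \
  -srckeystore client.p12 -srcstoretype PKCS12 -srcstorepass kspassword \
  -destkeystore /path/to/keystore.ks -deststorepass kspassword
```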
This is my code for importing a collection into Spark:
from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext('local', 'script')
sqlContext = SQLContext(sc)

mongo_url = 'mongodb://<user>:<pass>@<host>:<port>/<database>.<collection>?replicaSet=replica-set-name&ssl=true&authSource=admin&readPreference=nearest&authMechanism=SCRAM-SHA-1'

mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource') \
    .option('uri', mongo_url) \
    .load()
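Since the connection URI packs credentials and many query options into one long string, a small helper that assembles it from parts can prevent quoting and escaping mistakes. This is an illustrative sketch, not connector API; the function name and fields are made up:

```python
from urllib.parse import quote_plus, urlencode

def build_mongo_uri(user, password, host, port, database, collection, **options):
    """Assemble a MongoDB connection URI, URL-encoding the credentials
    and serializing keyword arguments as query-string options."""
    auth = f"{quote_plus(user)}:{quote_plus(password)}"
    query = urlencode(options)
    return f"mongodb://{auth}@{host}:{port}/{database}.{collection}?{query}"

uri = build_mongo_uri(
    "user", "p@ss", "localhost", 27017, "db", "coll",
    replicaSet="replica-set-name", ssl="true",
    authSource="admin", readPreference="nearest",
    authMechanism="SCRAM-SHA-1",
)
```

Encoding the password with quote_plus matters here because characters such as @ or : in raw credentials would otherwise corrupt the URI.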
This is the command I use to run pyspark:
spark-submit \
  --driver-java-options -Djavax.net.ssl.trustStore=/path/to/truststore.ks \
  --driver-java-options -Djavax.net.ssl.trustStorePassword=tspassword \
  --driver-java-options -Djavax.net.ssl.keyStore=/path/to/keystore.ks \
  --driver-java-options -Djavax.net.ssl.keyStorePassword=kspassword \
  --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks \
  --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStorePassword=tspassword \
  --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStore=/path/to/keystore.ks \
  --conf spark.executor.extraJavaOptions=-Djavax.net.ssl.keyStorePassword=kspassword \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
  script.py
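One thing worth noting about an invocation like this: repeating `--conf spark.executor.extraJavaOptions=...` sets the same configuration key several times, so only the last value survives, and the same generally holds for repeated `--driver-java-options` flags. A sketch of the equivalent command with all four system properties carried in a single quoted value per flag (using the same placeholder paths and passwords as above) would look roughly like:

```shell
spark-submit \
  --driver-java-options "-Djavax.net.ssl.trustStore=/path/to/truststore.ks -Djavax.net.ssl.trustStorePassword=tspassword -Djavax.net.ssl.keyStore=/path/to/keystore.ks -Djavax.net.ssl.keyStorePassword=kspassword" \
  --conf "spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/path/to/truststore.ks -Djavax.net.ssl.trustStorePassword=tspassword -Djavax.net.ssl.keyStore=/path/to/keystore.ks -Djavax.net.ssl.keyStorePassword=kspassword" \
  --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1 \
  script.py
```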
Finally, this is the output I am getting:
19/11/18 14:22:40 INFO SparkContext: Created broadcast 0 from broadcast at MongoSpark.scala:542
19/11/18 14:22:40 INFO cluster: Cluster created with settings {hosts=[<host>:<port>], mode=MULTIPLE, requiredClusterType=REPLICA_SET, serverSelectionTimeout='30000 ms', maxWaitQueueSize=500, requiredReplicaSetName='replica-set-name'}
19/11/18 14:22:40 INFO cluster: Adding discovered server mongodb-data-1.haip.io:27017 to client view of cluster
19/11/18 14:22:40 INFO MongoClientCache: Creating MongoClient: [<host>:<port>]
19/11/18 14:22:40 INFO cluster: No server chosen by com.mongodb.client.internal.MongoClientDelegate$1@5e46f5b5 from cluster description ClusterDescription{type=REPLICA_SET, connectionMode=MULTIPLE, serverDescriptions=[ServerDescription{address=<host>:<port>, type=UNKNOWN, state=CONNECTING}]}. Waiting for 30000 ms before timing out
19/11/18 14:22:41 INFO cluster: Exception in monitor thread while connecting to server <host>:<port>
com.mongodb.MongoSocketReadException: Prematurely reached end of stream
    at com.mongodb.internal.connection.SocketStream.read(SocketStream.java:112)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveResponseBuffers(InternalStreamConnection.java:580)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveMessage(InternalStreamConnection.java:445)
    at com.mongodb.internal.connection.InternalStreamConnection.receiveCommandMessageResponse(InternalStreamConnection.java:299)
    at com.mongodb.internal.connection.InternalStreamConnection.sendAndReceive(InternalStreamConnection.java:259)
    at com.mongodb.internal.connection.CommandHelper.sendAndReceive(CommandHelper.java:83)
    at com.mongodb.internal.connection.CommandHelper.executeCommand(CommandHelper.java:33)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initializeConnectionDescription(InternalStreamConnectionInitializer.java:105)
    at com.mongodb.internal.connection.InternalStreamConnectionInitializer.initialize(InternalStreamConnectionInitializer.java:62)
    at com.mongodb.internal.connection.InternalStreamConnection.open(InternalStreamConnection.java:129)
    at com.mongodb.internal.connection.DefaultServerMonitor$ServerMonitorRunnable.run(DefaultServerMonitor.java:117)
    at java.lang.Thread.run(Thread.java:748)
Traceback (most recent call last):
  File "script.py", line 16, in <module>
    mongo_df = sqlContext.read.format('com.mongodb.spark.sql.DefaultSource').option('uri', mongo_url).load()
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 172, in load
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
  File "/usr/local/spark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 63, in deco
  File "/usr/local/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py", line 328, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o35.load.
: com.mongodb.MongoTimeoutException: Timed out after 30000 ms while waiting for a server that matches com.mongodb.client.internal.MongoClientDelegate$1@5e46f5b5. Client view of cluster state is {type=REPLICA_SET, servers=[{address=<host>:<port>, type=UNKNOWN, state=CONNECTING, exception={com.mongodb.MongoSocketReadException: Prematurely reached end of stream}}]
    at com.mongodb.internal.connection.BaseCluster.createTimeoutException(BaseCluster.java:408)
    at com.mongodb.internal.connection.BaseCluster.selectServer(BaseCluster.java:123)
    at com.mongodb.internal.connection.AbstractMultiServerCluster.selectServer(AbstractMultiServerCluster.java:54)
    at com.mongodb.client.internal.MongoClientDelegate.getConnectedClusterDescription(MongoClientDelegate.java:147)
    at com.mongodb.client.internal.MongoClientDelegate.createClientSession(MongoClientDelegate.java:100)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.getClientSession(MongoClientDelegate.java:277)
    at com.mongodb.client.internal.MongoClientDelegate$DelegateOperationExecutor.execute(MongoClientDelegate.java:181)
    at com.mongodb.client.internal.MongoDatabaseImpl.executeCommand(MongoDatabaseImpl.java:186)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:155)
    at com.mongodb.client.internal.MongoDatabaseImpl.runCommand(MongoDatabaseImpl.java:150)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$1.apply(MongoConnector.scala:237)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector$$anonfun$withDatabaseDo$1.apply(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.withMongoClientDo(MongoConnector.scala:157)
    at com.mongodb.spark.MongoConnector.withDatabaseDo(MongoConnector.scala:174)
    at com.mongodb.spark.MongoConnector.hasSampleAggregateOperator(MongoConnector.scala:237)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator$lzycompute(MongoRDD.scala:221)
    at com.mongodb.spark.rdd.MongoRDD.hasSampleAggregateOperator(MongoRDD.scala:221)
    at com.mongodb.spark.sql.MongoInferSchema$.apply(MongoInferSchema.scala:68)
    at com.mongodb.spark.sql.DefaultSource.constructRelation(DefaultSource.scala:97)
    at com.mongodb.spark.sql.DefaultSource.createRelation(DefaultSource.scala:50)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
19/11/18 14:23:10 INFO SparkContext: Invoking stop() from shutdown hook
19/11/18 14:23:10 INFO MongoClientCache: Closing MongoClient: [<host>:<port>]