-
Type: Bug
-
Resolution: Done
-
Priority: Major - P3
-
Affects Version/s: None
-
Component/s: Configuration
-
None
I need to change databases in my code, but the value of `uri` in ReadConfig is ignored; only the collection is changed. Do I have to rebuild the SparkContext every time I want to change databases in the code? I also looked at the wiki on your GitHub for the answer, but it pointed me to a MongoClientProvider that does not exist.
// Boot spark-shell with this config bin/spark-shell --master local[4] --jars /Users/rwilliams/bin/spark/lib/mysql-connector-java-5.1.39-bin.jar \ --conf "spark.mongodb.input.uri=mongodb://localhost/billing" \ --conf "spark.mongodb.input.collection=payment" \ --packages org.mongodb.spark:mongo-spark-connector_2.10:0.3 scala> val paymentsDf = MongoSpark.load(sqlContext) paymentsDf: org.apache.spark.sql.DataFrame = [_id: struct<oid:string>, checkAmount: double, checkDate: timestamp, checkNumber: string, checkType: string, claimCount: int, groupHeader: struct<idCode:string,sendersCode:string,receiversCode:string,groupDate:timestamp,groupTime:timestamp,controlNumber:string,releaseCode:string,isValid:boolean>, header: struct<receiverId:string,interchangeDate:timestamp,interchangeTime:timestamp,repetitionSeparator:string,versionNumber:string,controlNumber:string,usageIndicator:string,componentSeparator:string,isValid:boolean>, payeeId: string, payeeName: string, payerId: string, payerName: string, remitId: null, transactionSet: struct<claimDetails:array<struct<claimPayments:array<struct<amounts:array<struct<amount:double,qualifier:string>>,claimAdjustments:... 
// Works great (schema shortened for clarity) scala> paymentsDf.printSchema() root |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- checkAmount: double (nullable = true) |-- checkDate: timestamp (nullable = true) |-- checkNumber: string (nullable = true) |-- checkType: string (nullable = true) |-- claimCount: integer (nullable = true) |-- groupHeader: struct (nullable = true) | |-- idCode: string (nullable = true) | |-- sendersCode: string (nullable = true) // Now try to change databases (switching the order between boot and this statement didn't help either) scala> val claimsReadConfig = ReadConfig(Map( | "uri"->"mongodb://userWithClusterMangerRole:password@server1,server2,server3/database?replicaSet=rs0", | "collection" -> "claims" | )) claimsReadConfig: com.mongodb.spark.config.ReadConfig.Self = ReadConfig(database,claims,Some(mongodb://userWithClusterMangerRole:password@server1,server2,server3/database?replicaSet=rs0),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None)) scala> val claimsDf = sqlContext.read.mongo(claimsReadConfig) claimsDf: org.apache.spark.sql.DataFrame = [] // Shows up empty because there is no claims collection on localhost, but there is one on the server1,2,3 cluster scala> claimsDf.printSchema() root // It should look like the output below, which is what I get when I boot spark-shell with the cluster URI instead bin/spark-shell --master local[4] --jars /Users/rwilliams/bin/spark/lib/mysql-connector-java-5.1.39-bin.jar \ --conf "spark.mongodb.input.uri=mongodb://userWithClusterManagerRole:password@server1,server2,server3/database?replicaSet=rs0" \ --conf "spark.mongodb.input.collection=claims" \ --packages org.mongodb.spark:mongo-spark-connector_2.10:0.3 scala> val claimsDf = MongoSpark.load(sqlContext) claimsDf: org.apache.spark.sql.DataFrame = [__v: int, _id: struct<oid:string>, billingSourceId: int, dateCollected: timestamp, dateCompleted: timestamp, diagCodes: array<string>, id: int, insuranceCode: string, insuranceName: string, insuranceProviders: 
array<struct<__v:int,_id:struct<oid:string>,active:boolean,address:string,address2:string,benefitPriority:int,city:string,claimsEnabled:boolean,code:string,contact:string,email:string,faxNumber:string,financialClass:string,name:string,notAcknowledged:boolean,payerGroup:string,phoneNumber:string,phoneNumber2:string,requiresPreauthorization:boolean,state:string,zip:string>>, panels: array<struct<code:string,name:string,specimenType:string,status:string,testType:string,tests:array<struct<__v:int,_id:struct<oid:string>,code:string,name:strin... scala> claimsDf.printSchema() root |-- __v: integer (nullable = true) |-- _id: struct (nullable = true) | |-- oid: string (nullable = true) |-- billingSourceId: integer (nullable = true) |-- dateCollected: timestamp (nullable = true) |-- dateCompleted: timestamp (nullable = true) |-- diagCodes: array (nullable = true) | |-- element: string (containsNull = true) |-- id: integer (nullable = true) |-- insuranceCode: string (nullable = true) |-- insuranceName: string (nullable = true) |-- insuranceProviders: array (nullable = true) | |-- element: struct (containsNull = true) | | |-- __v: integer (nullable = true) | | |-- _id: struct (nullable = true) | | | |-- oid: string (nullable = true) | | |-- active: boolean (nullable = true) | | |-- address: string (nullable = true) | | |-- address2: string (nullable = true)