Spark Connector / SPARK-61

Can't change between database servers with ReadConfig

    • Type: Bug
    • Resolution: Done
    • Priority: Major - P3
    • Fix Version/s: 0.4
    • Affects Version/s: None
    • Component/s: Configuration
    • Labels: None

      I need to change databases in my code, but the value of uri in ReadConfig is ignored; only the collection is changed. Do I have to rebuild the SparkContext every time I want to change databases in code? I also looked at the wiki on your GitHub for an answer, but it pointed me to a MongoClientProvider that does not exist.
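
      For reference, this is the per-read override pattern the connector docs describe and that I would expect to work. A minimal sketch only: host names and credentials are placeholders, and whether the uri key is actually honoured per read is exactly what this ticket is about.

      import com.mongodb.spark.config.ReadConfig
      import com.mongodb.spark.sql._

      // Build a ReadConfig from the SparkContext defaults, overriding only
      // the keys that change for this read (placeholder servers/credentials).
      val clusterReadConfig = ReadConfig(
        Map(
          "uri"        -> "mongodb://user:password@server1,server2,server3/database?replicaSet=rs0",
          "collection" -> "claims"
        ),
        Some(ReadConfig(sqlContext.sparkContext))
      )

      // Load through the explicit config instead of the boot-time defaults.
      val clusterDf = sqlContext.read.mongo(clusterReadConfig)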

      // Boot spark-shell with this config
      bin/spark-shell --master local[4] --jars /Users/rwilliams/bin/spark/lib/mysql-connector-java-5.1.39-bin.jar \
                        --conf "spark.mongodb.input.uri=mongodb://localhost/billing" \
                        --conf "spark.mongodb.input.collection=payment" \
                        --packages org.mongodb.spark:mongo-spark-connector_2.10:0.3
      
      scala> val paymentsDf = MongoSpark.load(sqlContext)
      paymentsDf: org.apache.spark.sql.DataFrame = [_id: struct<oid:string>, checkAmount: double, checkDate: timestamp, checkNumber: string, checkType: string, claimCount: int, groupHeader: struct<idCode:string,sendersCode:string,receiversCode:string,groupDate:timestamp,groupTime:timestamp,controlNumber:string,releaseCode:string,isValid:boolean>, header: struct<receiverId:string,interchangeDate:timestamp,interchangeTime:timestamp,repetitionSeparator:string,versionNumber:string,controlNumber:string,usageIndicator:string,componentSeparator:string,isValid:boolean>, payeeId: string, payeeName: string, payerId: string, payerName: string, remitId: null, transactionSet: struct<claimDetails:array<struct<claimPayments:array<struct<amounts:array<struct<amount:double,qualifier:string>>,claimAdjustments:...
      
      // Works great (schema shortened for clarity)
      scala> paymentsDf.printSchema()
      root
       |-- _id: struct (nullable = true)
       |    |-- oid: string (nullable = true)
       |-- checkAmount: double (nullable = true)
       |-- checkDate: timestamp (nullable = true)
       |-- checkNumber: string (nullable = true)
       |-- checkType: string (nullable = true)
       |-- claimCount: integer (nullable = true)
       |-- groupHeader: struct (nullable = true)
       |    |-- idCode: string (nullable = true)
       |    |-- sendersCode: string (nullable = true)
      
      // Now try to change databases (switching the order between boot and this statement didn't help either)
      
      scala> val claimsReadConfig = ReadConfig(Map(
           |                            "uri"->"mongodb://userWithClusterMangerRole:password@server1,server2,server3/database?replicaSet=rs0",
           |                            "collection" -> "claims"
           |                        ))
      claimsReadConfig: com.mongodb.spark.config.ReadConfig.Self = ReadConfig(database,claims,Some(mongodb://userWithClusterManagerRole:password@server1,server2,server3/database?replicaSet=rs0),1000,64,_id,15,ReadPreferenceConfig(primary,None),ReadConcernConfig(None))
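
      // Note: the ReadConfig echoed above does include the new uri
      // (Some(mongodb://...)) and even parses the database name out of it,
      // so the config object itself looks correct.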
      
      scala> val claimsDf = sqlContext.read.mongo(claimsReadConfig)
      claimsDf: org.apache.spark.sql.DataFrame = []
      
      // Shows up empty because there is no claims collection on localhost, but there is one on the server1,2,3 cluster
      scala> claimsDf.printSchema()
      root
      
      // Expected result, shown below after restarting spark-shell with the cluster uri at boot
      bin/spark-shell --master local[4] --jars /Users/rwilliams/bin/spark/lib/mysql-connector-java-5.1.39-bin.jar \
                        --conf "spark.mongodb.input.uri=mongodb://userWithClusterManagerRole:password@server1,server2,server3/database?replicaSet=rs0" \
                        --conf "spark.mongodb.input.collection=claims" \
                        --packages org.mongodb.spark:mongo-spark-connector_2.10:0.3
      
      scala> val claimsDf = MongoSpark.load(sqlContext)
      claimsDf: org.apache.spark.sql.DataFrame = [__v: int, _id: struct<oid:string>, billingSourceId: int, dateCollected: timestamp, dateCompleted: timestamp, diagCodes: array<string>, id: int, insuranceCode: string, insuranceName: string, insuranceProviders: array<struct<__v:int,_id:struct<oid:string>,active:boolean,address:string,address2:string,benefitPriority:int,city:string,claimsEnabled:boolean,code:string,contact:string,email:string,faxNumber:string,financialClass:string,name:string,notAcknowledged:boolean,payerGroup:string,phoneNumber:string,phoneNumber2:string,requiresPreauthorization:boolean,state:string,zip:string>>, panels: array<struct<code:string,name:string,specimenType:string,status:string,testType:string,tests:array<struct<__v:int,_id:struct<oid:string>,code:string,name:strin...
      scala> claimsDf.printSchema()
      root
       |-- __v: integer (nullable = true)
       |-- _id: struct (nullable = true)
       |    |-- oid: string (nullable = true)
       |-- billingSourceId: integer (nullable = true)
       |-- dateCollected: timestamp (nullable = true)
       |-- dateCompleted: timestamp (nullable = true)
       |-- diagCodes: array (nullable = true)
       |    |-- element: string (containsNull = true)
       |-- id: integer (nullable = true)
       |-- insuranceCode: string (nullable = true)
       |-- insuranceName: string (nullable = true)
       |-- insuranceProviders: array (nullable = true)
       |    |-- element: struct (containsNull = true)
       |    |    |-- __v: integer (nullable = true)
       |    |    |-- _id: struct (nullable = true)
       |    |    |    |-- oid: string (nullable = true)
       |    |    |-- active: boolean (nullable = true)
       |    |    |-- address: string (nullable = true)
       |    |    |-- address2: string (nullable = true)
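
      One more thing worth sketching (untested on 0.3): the documented uri format allows mongodb://host/database.collection, so the target database and collection could be encoded in the connection string itself rather than passed as separate options. Hosts and credentials below are placeholders.

      // Possible workaround to avoid a full shell restart: put the database
      // and collection directly in the connection string, matching the
      // documented mongodb://host/database.collection uri format.
      val claimsReadConfig = ReadConfig(Map(
        "uri" -> "mongodb://userWithClusterManagerRole:password@server1,server2,server3/database.claims?replicaSet=rs0"
      ))
      val claimsDf = sqlContext.read.mongo(claimsReadConfig)

      Given this ticket's fix version (0.4), upgrading the connector once that release ships may be the proper fix.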
      

            Assignee: Unassigned
            Reporter: Richard Williams (rmwilliams)
            Votes: 0
            Watchers: 2
