Spark Connector / SPARK-269

com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast STRING into a IntegerType (value: BsonString{value='Success'})

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Major - P3
    • Affects Version/s: 2.4.1
    • Component/s: Reads, Schema
    • Environment:
      aws emr 5.29.0 application spark
      Python 3.6.8 (default, Oct 14 2019, 21:22:53)
      [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux

      Despite reading the schema with samplingRatio 1.0, and even setting sampleSize to the total number of documents matched by the pipeline, the MongoDB Spark connector infers the schema incorrectly and throws a cast exception.

      The error message does not even indicate which field or document caused the failure.
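Since the message omits the offending field and document, a practical first step is to probe the collection for type mismatches directly. A minimal sketch, assuming a hypothetical field name `status` (substitute the field you suspect): matching on the BSON `string` type isolates documents whose value cannot be cast to an inferred `IntegerType`. The same `$match` stage can be run in the mongo shell or passed through the connector's `pipeline` option.

```python
import json

# Hypothetical field name "status" -- substitute the field you suspect.
# $type: "string" matches documents where the value is a BSON string,
# i.e. the documents that break a schema inferred as IntegerType.
probe_pipeline = [
    {"$match": {"status": {"$type": "string"}}},
    {"$limit": 5},
]

print(json.dumps(probe_pipeline))
```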

       

      How I launch the PySpark shell on AWS EMR 5.29.0 with the Spark application:

      pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1

      How I read the data:

      sqlContext.sql("set spark.sql.caseSensitive=true")
      pipeline = "{'$match': {'event_date_time': {$gte: ISODate('2019-12-16T00:00:00Z'), $lt: ISODate('2019-12-17T00:00:00Z')}}}"
      df = spark.read \
          .format("mongo") \
          .option("uri", "mongodb://<connection-uri>") \
          .option("samplingRatio", "1.0") \
          .option("sampleSize", 1898597) \
          .option("pipeline", pipeline) \
          .load()

       

      After that, a simple

      df.count()

      throws this error.
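One workaround (a sketch, not an official connector feature) is to declare the ambiguous field as a string in the schema and normalize it afterwards with a tolerant converter, so that values such as 'Success' become null instead of raising a cast exception. The converter itself is plain Python and could be wrapped in a UDF:

```python
def to_int_or_none(value):
    """Coerce mixed int/string values to int; non-numeric strings
    such as 'Success' (and None) map to None instead of raising."""
    try:
        return int(value)
    except (TypeError, ValueError):
        return None

# In PySpark this could be registered as a UDF (sketch, field name hypothetical):
#   from pyspark.sql import functions as F
#   from pyspark.sql.types import IntegerType
#   to_int_udf = F.udf(to_int_or_none, IntegerType())
#   df = df.withColumn("status", to_int_udf(F.col("status")))
```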

       

       

      In this case, the error was in an integer field inside a nested structure, so I flattened the DataFrame and dropped all integer columns, like this:

       

      from pyspark.sql import functions as F

      def flatten_df(nested_df):
          # Pull one level of struct fields up to the top level.
          flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
          nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
          flat_df = nested_df.select(flat_cols +
                                     [F.col(nc + '.' + c).alias(nc + '__' + c)
                                      for nc in nested_cols
                                      for c in nested_df.select(nc + '.*').columns])
          return flat_df

      df = flatten_df(df)

      # Drop every integer column that survived flattening.
      for colName, colType in df.dtypes:
          if colType.startswith("int"):
              print(colName, colType)
              df = df.drop(colName)
      

      It still throws the same error.
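Dropping columns after `load()` appears not to help because the connector casts each BSON document into the inferred schema while materializing rows, before any later projection applies, which would explain why the error persists. Supplying an explicit schema that declares the ambiguous field as a string avoids the cast entirely. A minimal sketch with hypothetical field names (`payload.result` stands in for the real mixed-type field); Spark's `DataFrameReader.schema()` accepts DDL-formatted strings:

```python
# Hypothetical names: payload.result holds ints in most documents but the
# string 'Success' in some, so it is declared as string in the schema.
ddl_schema = (
    "event_date_time timestamp, "
    "payload struct<result: string>"
)

# df = (spark.read.format("mongo")
#       .option("uri", "mongodb://<connection-uri>")
#       .schema(ddl_schema)
#       .load())
```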

       

       

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Prakhar Jain (prakhar.jain@vogo.in)
            Votes: 0
            Watchers: 3