Spark Connector / SPARK-307

irregular nested data - com.mongodb.spark.exceptions.MongoTypeConversionException

    • Type: Bug
    • Resolution: Works as Designed
    • Priority: Unknown
    • Affects Version/s: None
    • Component/s: Reads, Schema

      I'm working with heavily irregular nested data, so creating a schema by hand is not an option. Reading the collection and performing an action like df.count()
      throws com.mongodb.spark.exceptions.MongoTypeConversionException in 8 out of 10 executions. Several attempts to fix the schema manually (fix_spark_schema, see below)
      did not work. Setting sampleSize and samplePoolSize manually did not resolve the problem either (the collection consists of ~11,800,000 rows).

      From my research I came to the conclusion that when the schema is inferred through sampling, only a fraction of the data is looked at,
      and if a sampled field only ever appears as null, NullType is inferred for it. What I can't get my head around is that setting sampleSize and samplePoolSize
      to a number higher than the actual collection size can still cause the error.
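      For illustration, this is roughly how I check which fields the sampler turned into NullType (a minimal sketch; spark is an existing SparkSession and "mydb" / "mycollection" are placeholders):

      from pyspark.sql.types import NullType

      # Infer the schema the same way the connector does on load, then list every
      # top-level field that came back as NullType (nested fields left out for brevity).
      sampled_schema = spark.read \
          .format("mongo") \
          .option("database", "mydb") \
          .option("collection", "mycollection") \
          .load() \
          .schema

      null_fields = [f.name for f in sampled_schema.fields if isinstance(f.dataType, NullType)]
      print(null_fields)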

      A workaround would be to re-run the process until it randomly works and save that "working" schema, then reuse it whenever the collection is read in the future. Unfortunately, with a collection that grows by several thousand rows every day, this is not an option I feel comfortable with.
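      For reference, the save-and-reload part of that workaround would look roughly like this (a sketch only; the file path and database/collection names are placeholders):

      import json
      from pyspark.sql.types import StructType

      # df is a DataFrame from a run where inference happened to succeed;
      # persist its schema as JSON (hypothetical path).
      with open("/tmp/collection_schema.json", "w") as f:
          f.write(df.schema.json())

      # Later: reload the saved schema and read the collection without inferring it again.
      with open("/tmp/collection_schema.json") as f:
          saved_schema = StructType.fromJson(json.loads(f.read()))

      df = spark.read \
          .format("mongo") \
          .option("database", "mydb") \
          .option("collection", "mycollection") \
          .load(schema=saved_schema)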

      How do I consistently read my collection? Is there a way to use the whole collection as the sample for inferring the schema?

      from pyspark.sql.types import StructType, StructField, StringType, NullType

      def fix_spark_schema(schema):
          # Recursively replace every NullType in the inferred schema with StringType
          if isinstance(schema, StructType):
              return StructType([fix_spark_schema(f) for f in schema.fields])
          if isinstance(schema, StructField):
              return StructField(schema.name, fix_spark_schema(schema.dataType), schema.nullable)
          if isinstance(schema, NullType):
              return StringType()
          return schema
      
      def read_mongo(collection, database):
          # First pass: infer the schema from a large sample, then patch its NullType fields
          collection_schema = spark.read \
              .format("mongo") \
              .option("sampleSize", 1200000) \
              .option("samplePoolSize", 1200000) \
              .option("database", database) \
              .option("collection", collection) \
              .load() \
              .schema

          # Second pass: read the collection with the patched schema instead of inferring it again
          df = spark.read \
              .format("mongo") \
              .option("database", database) \
              .option("collection", collection) \
              .load(schema=fix_spark_schema(collection_schema), inferSchema=False)

          return df
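
      Called like this (database and collection names are placeholders); df.count() is what intermittently raises the exception:

      df = read_mongo("mycollection", "mydb")
      df.count()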
      

       

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Moritz (mo@buffl.co)
            Votes: 0
            Watchers: 2
