Type: Bug
Resolution: Works as Designed
Priority: Unknown
Affects Version/s: None
I'm working with heavily irregular nested data, so creating a schema by hand is not an option. Reading the collection and performing an action like df.count()
throws com.mongodb.spark.exceptions.MongoTypeConversionException in 8 out of 10 executions. Several attempts to fix the schema manually (fix_spark_schema, see below)
did not work. Setting the sampleSize and samplePoolSize options manually did not resolve the problem either (the collection consists of roughly 11,800,000 rows).
From my research I concluded that when the schema is inferred through sampling, only a fraction of the data is looked at, and if a field is null in every sampled
document, NullType is inferred for it. What I can't get my head around is that setting sampleSize and samplePoolSize
to a number higher than the actual collection size can still cause the error.
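
To see which fields are affected, a minimal diagnostic sketch using stock PySpark types (the helper name find_null_type_fields is mine, not part of the connector) that walks an inferred schema and lists the paths that came back as NullType, including ones nested inside arrays:

    from pyspark.sql.types import ArrayType, NullType, StructType

    def find_null_type_fields(dtype, path=""):
        # Recursively collect the paths of all fields inferred as NullType.
        paths = []
        if isinstance(dtype, StructType):
            for field in dtype.fields:
                child = f"{path}.{field.name}" if path else field.name
                paths += find_null_type_fields(field.dataType, child)
        elif isinstance(dtype, ArrayType):
            paths += find_null_type_fields(dtype.elementType, path + "[]")
        elif isinstance(dtype, NullType):
            paths.append(path)
        return paths

    # Example: inspect the schema the connector inferred
    # print(find_null_type_fields(df.schema))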
A workaround would be to re-run the job until it randomly succeeds and save the resulting "working" schema to reuse when reading the collection in the future (a sketch of that follows below). Unfortunately, with a collection that grows by several thousand rows every day, this is not an option I feel comfortable with.
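
For completeness, persisting a known-good schema works with stock PySpark APIs (the file path below is just an example):

    import json
    from pyspark.sql.types import StructType

    # Save a schema that worked once (path is illustrative)
    with open("/tmp/collection_schema.json", "w") as f:
        f.write(df.schema.json())

    # Reload it later and pass it explicitly when reading
    with open("/tmp/collection_schema.json") as f:
        saved_schema = StructType.fromJson(json.load(f))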
How do I consistently read my collection? Is there a way to use the whole collection as the sample for inferring the schema?
from pyspark.sql.types import NullType, StringType, StructField, StructType

def fix_spark_schema(schema):
    # Recursively replace NullType with StringType in a (nested) struct schema.
    # NOTE: this does not descend into ArrayType/MapType element types, so
    # NullTypes nested inside arrays are left untouched.
    if schema.__class__ == StructType:
        return StructType([fix_spark_schema(f) for f in schema.fields])
    if schema.__class__ == StructField:
        return StructField(schema.name, fix_spark_schema(schema.dataType), schema.nullable)
    if schema.__class__ == NullType:
        return StringType()
    return schema

def read_mongo(collection, database):
    # First pass: let the connector infer a schema from a sample.
    collection_schema = spark.read \
        .format("mongo") \
        .option("sampleSize", 1200000) \
        .option("samplePoolSize", 1200000) \
        .option("database", database) \
        .option("collection", collection) \
        .load() \
        .schema

    # Second pass: read again with the patched schema instead of re-inferring.
    df = spark.read \
        .format("mongo") \
        .option("database", database) \
        .option("collection", collection) \
        .load(schema=fix_spark_schema(collection_schema), inferSchema=False)
    return df
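
In case it helps, here is a sketch of a variant that also recurses into array and map element types (my own extension, assuming the NullTypes may be nested inside arrays of structs; not verified against the connector):

    from pyspark.sql.types import (
        ArrayType, MapType, NullType, StringType, StructField, StructType,
    )

    def fix_spark_schema_deep(schema):
        # Like fix_spark_schema, but also descends into array and map types,
        # so NullTypes nested inside arrays of structs get replaced too.
        if isinstance(schema, StructType):
            return StructType([fix_spark_schema_deep(f) for f in schema.fields])
        if isinstance(schema, StructField):
            return StructField(schema.name,
                               fix_spark_schema_deep(schema.dataType),
                               schema.nullable)
        if isinstance(schema, ArrayType):
            return ArrayType(fix_spark_schema_deep(schema.elementType),
                             schema.containsNull)
        if isinstance(schema, MapType):
            return MapType(fix_spark_schema_deep(schema.keyType),
                           fix_spark_schema_deep(schema.valueType),
                           schema.valueContainsNull)
        if isinstance(schema, NullType):
            return StringType()
        return schema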