- Type: Bug
- Resolution: Works as Designed
- Priority: Major - P3
- Affects Version/s: 2.4.1
- Environment: AWS EMR 5.29.0, Spark application; Python 3.6.8 (default, Oct 14 2019, 21:22:53) [GCC 4.8.5 20150623 (Red Hat 4.8.5-28)] on linux
Despite reading with samplingRatio 1.0 and even setting sampleSize to the total number of documents matched by the pipeline, the MongoDB Spark Connector infers the schema incorrectly and throws a cast exception.
The error does not even indicate which field or document is at fault.
How I launch the pyspark shell on AWS EMR 5.29.0 with the Spark application:
pyspark --packages org.mongodb.spark:mongo-spark-connector_2.11:2.4.1
How I am reading data:
sqlContext.sql("set spark.sql.caseSensitive=true")

pipeline = "{'$match': {'event_date_time': {$gte: ISODate('2019-12-16T00:00:00Z'), $lt: ISODate('2019-12-17T00:00:00Z')}}}"

df = spark.read \
    .option("samplingRatio", "1.0") \
    .format("mongo") \
    .option("uri", "mongodb://<connection-uri>") \
    .option("sampleSize", 1898597) \
    .option("pipeline", pipeline) \
    .load()
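As a side note, the schema the connector actually inferred can be inspected before triggering any action; comparing it against the stored documents may show which field was given an unexpected type:

df.printSchema()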
After that, a simple
df.count()
throws this error.
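Since the exception does not name the offending field or document, one way to narrow it down is to audit the BSON types actually stored for a suspected field directly in MongoDB. This is only a sketch: it assumes pymongo access to the source collection, MongoDB 3.4+ (for the $type aggregation operator), and placeholder database/collection/field names.

from datetime import datetime
from pymongo import MongoClient

client = MongoClient("mongodb://<connection-uri>")   # placeholder URI
coll = client["<database>"]["<collection>"]          # placeholder names

audit = coll.aggregate([
    # Same date window as the Spark pipeline above.
    {"$match": {"event_date_time": {"$gte": datetime(2019, 12, 16),
                                    "$lt": datetime(2019, 12, 17)}}},
    # Group by the BSON type of the suspected field to see whether it is
    # stored inconsistently (e.g. int in some documents, double or string in others).
    {"$group": {"_id": {"$type": "$some.nested.intField"},   # hypothetical field path
                "count": {"$sum": 1}}},
])
for row in audit:
    print(row)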
In this case, the error was in an integer field inside a nested structure, so I flattened the DataFrame and dropped all integer columns like this:
from pyspark.sql import functions as F

def flatten_df(nested_df):
    flat_cols = [c[0] for c in nested_df.dtypes if c[1][:6] != 'struct']
    nested_cols = [c[0] for c in nested_df.dtypes if c[1][:6] == 'struct']
    flat_df = nested_df.select(flat_cols +
                               [F.col(nc + '.' + c).alias(nc + '__' + c)
                                for nc in nested_cols
                                for c in nested_df.select(nc + '.*').columns])
    return flat_df

df = flatten_df(df)

for colName, colType in df.dtypes:
    if colType.startswith("int"):
        print(colName, colType)
        df = df.drop(df[colName])
It still throws the same error.
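A possible workaround, not verified against this dataset: supplying an explicit schema to the reader bypasses inference entirely, so a field that mixes numeric BSON types can be declared as LongType up front. A minimal sketch, using hypothetical field names (event_details / event_count stand in for the real nested structure):

from pyspark.sql.types import StructType, StructField, LongType, TimestampType

# Hypothetical field names for illustration only; replace with the real ones.
explicit_schema = StructType([
    StructField("event_date_time", TimestampType(), True),
    StructField("event_details", StructType([              # nested document
        StructField("event_count", LongType(), True),      # LongType, in case the source mixes int32/int64
    ]), True),
])

df = spark.read \
    .format("mongo") \
    .option("uri", "mongodb://<connection-uri>") \
    .option("pipeline", pipeline) \
    .schema(explicit_schema) \
    .load()

Fields not listed in the explicit schema are simply not loaded, which would also sidestep the flattening and column-dropping step above.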