What did I do
I loaded some sample data into my MongoDB database. Then I tried to read the change stream of the collection in Databricks:
```python
df = (
    spark.readStream
    .format("mongodb")
    .option("database", database)
    .option("collection", collection)
    .option("connection.uri", connection_uri)
    .option("change.stream.lookup.full.document", "updateLookup")
    .load()
    .writeStream
    .format("memory")
    .trigger(continuous="1 second")
    .outputMode("append")
    .queryName("test")
    .start()
)
```
Then I updated some data and tried to read the change stream with `select * from test`.
What did I see
```
_id,access,accommodates,address,amenities,availability,bathrooms,bed_type,bedrooms,beds,calendar_last_scraped,cancellation_policy,cleaning_fee,description,extra_people,first_review,guests_included,host,house_rules,images,interaction,last_review,last_scraped,listing_url,maximum_nights,minimum_nights,monthly_price,name,neighborhood_overview,notes,number_of_reviews,price,property_type,review_scores,reviews,reviews_per_month,room_type,security_deposit,space,summary,transit,weekly_price
"{""_data"": ""8263787959000000032B022C0100296E5A1004DDF3C694505147BEBC3FD3E09AAA41F7463C5F6964003C3130303036353436000004""}",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null
"{""_data"": ""8263787A2A000000032B022C0100296E5A1004DDF3C694505147BEBC3FD3E09AAA41F7463C5F6964003C3130303036353436000004""}",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null
```
It looks like the schema of df is the schema of the collection, not the schema of the change stream events.
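A minimal sketch in plain Python (with made-up documents) of why every column comes back null: the connector infers the collection's schema, but a change stream event only carries fields like _id, operationType, clusterTime and fullDocument at its top level, so none of the collection's columns are found when the rows are materialized:

```python
import json

# A change stream event (shape per MongoDB's change event documents;
# the concrete values here are invented for illustration).
event = {
    "_id": {"_data": "82637879590000..."},  # resume token, not a document _id
    "operationType": "update",
    "clusterTime": {"$timestamp": {"t": 1668839978, "i": 3}},
    "fullDocument": {"_id": "10006546", "name": "Ribeira Charming Duplex"},
}

# A few of the columns the connector inferred from the collection itself.
collection_columns = ["access", "accommodates", "address", "name", "summary"]

# None of the collection's columns exist at the top level of the event,
# so each of them is read back as null.
missing = [c for c in collection_columns if c not in event]
print(missing == collection_columns)
```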
How do I fix it
I can work around it by providing (part of) the change stream schema:
```python
df_2 = (
    spark.readStream
    .format("mongodb")
    .option("database", database)
    .option("collection", collection)
    .option("connection.uri", connection_uri)
    .option("change.stream.lookup.full.document", "updateLookup")
    .schema(
        StructType()
        .add("_id", StringType())
        .add("clusterTime", StringType())
        .add("fullDocument", StringType())
    )
    .load()
    .writeStream
    .format("memory")
    .trigger(continuous="1 second")
    .outputMode("append")
    .queryName("test_2")
    .start()
)
```
With the provided schema, those columns of the change stream events are saved to the in-memory table correctly:
```
_id,clusterTime,fullDocument
"{""_data"": ""8263787A2A000000032B022C0100296E5A1004DDF3C694505147BEBC3FD3E09AAA41F7463C5F6964003C3130303036353436000004""}","{""$timestamp"": {""t"": 1668839978, ""i"": 3}}","{""_id"": ""10006546"", ""listing_url"": ""https://www.airbnb.com/rooms/10006546"", ""name"": ""1_Ribeira Charming Duplex"", ""summary"": ""Fantastic duplex apartment with three bedrooms, located .......
```
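As a sanity check, the clusterTime above can be recovered from the resume token itself: in the _data hex string, the leading byte 0x82 tags a BSON timestamp and the next four bytes are its seconds field, big-endian. This is MongoDB's internal KeyString encoding, not a stable public format, so treat this as an observation about this particular data rather than something to rely on:

```python
# Resume token from the change event above (the _data hex string).
token = (
    "8263787A2A000000032B022C0100296E5A1004DDF3C694505147BEBC"
    "3FD3E09AAA41F7463C5F6964003C3130303036353436000004"
)

# Byte 0 (0x82) tags a BSON timestamp; bytes 1-4 hold the timestamp's
# seconds field, big-endian.
assert token[:2] == "82"
seconds = int(token[2:10], 16)
print(seconds)  # matches clusterTime's "t" value of 1668839978 above
```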
What do I expect
I should be able to read the change stream events without providing any schema.