Spark Connector / SPARK-382

Incorrectly inferred schema of a change stream

    • Type: Bug
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 10.2.0
    • Affects Version/s: 10.0.5
    • Component/s: None
    • Documentation Changes: Needed

      1. What would you like to communicate to the user about this feature?
      Spark streams now support schema inference when:
      change.stream.publish.full.document.only = true

      2. Would you like the user to see examples of the syntax and/or executable code and its output?
      3. Which versions of the driver/connector does this apply to?
      10.2.0

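A minimal sketch of the fixed behavior in connector 10.2.0, assuming a local MongoDB deployment; the option keys are real MongoDB Spark Connector read options, while the database, collection, and URI values are placeholders:

```python
# Read options for a change stream that publishes only the full document
# for each event, which is the condition under which connector 10.2.0
# can infer a schema. Database/collection/URI values are placeholders.
options = {
    "database": "test",
    "collection": "listingsAndReviews",
    "connection.uri": "mongodb://localhost:27017",
    # Publish only the full document per event; required for inference.
    "change.stream.publish.full.document.only": "true",
}

# With a live Spark session, the stream would then be read without an
# explicit .schema(...) call:
# df = (spark.readStream
#       .format("mongodb")
#       .options(**options)
#       .load())
```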

      What did I do

I loaded some sample data into my MongoDB database, then tried to read the change stream of the collection in Databricks:

      df = (
        spark
        .readStream
        .format("mongodb")
        .option("database", database)
        .option("collection", collection)
        .option("connection.uri", connection_uri)
        .option("change.stream.lookup.full.document", "updateLookup")
        .load()
        .writeStream
        .format("memory")
        .trigger(continuous="1 second")
        .outputMode("append")
        .queryName("test")
        .start())
      

Then I updated some data and tried to read the change stream with select * from test.

      What did I see

      _id,access,accommodates,address,amenities,availability,bathrooms,bed_type,bedrooms,beds,calendar_last_scraped,cancellation_policy,cleaning_fee,description,extra_people,first_review,guests_included,host,house_rules,images,interaction,last_review,last_scraped,listing_url,maximum_nights,minimum_nights,monthly_price,name,neighborhood_overview,notes,number_of_reviews,price,property_type,review_scores,reviews,reviews_per_month,room_type,security_deposit,space,summary,transit,weekly_price
      "{""_data"": ""8263787959000000032B022C0100296E5A1004DDF3C694505147BEBC3FD3E09AAA41F7463C5F6964003C3130303036353436000004""}",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null
      "{""_data"": ""8263787A2A000000032B022C0100296E5A1004DDF3C694505147BEBC3FD3E09AAA41F7463C5F6964003C3130303036353436000004""}",null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null,null
      

It looks like the schema of df is the schema of the collection, not the schema of the change stream.
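A sketch of why almost every column comes back null: a change-stream event has a different top-level shape than the collection's documents, so projecting events through the collection schema matches almost nothing. Field values here are abbreviated from the output above:

```python
# Top-level shape of a change-stream event (values abbreviated).
change_event = {
    "_id": {"_data": "8263787959..."},   # resume token
    "operationType": "update",
    "clusterTime": {"$timestamp": {"t": 1668839978, "i": 3}},
    "fullDocument": {"_id": "10006546", "name": "Ribeira Charming Duplex"},
}

# A few of the collection's fields, which the connector used as the schema.
collection_fields = {"_id", "name", "price", "summary", "bedrooms"}

# Only "_id" exists at the top level of the event, which is why the _id
# column shows the resume token while every other column is null.
overlap = collection_fields & set(change_event)
```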

How did I fix it

I can fix it by providing (part of) the change stream schema:

from pyspark.sql.types import StructType, StringType

df_2 = (
        spark
        .readStream
        .format("mongodb")
        .option("database", database)
        .option("collection", collection)
        .option("connection.uri", connection_uri)
        .option("change.stream.lookup.full.document", "updateLookup")
        .schema(
          StructType()
          .add("_id", StringType())
          .add("clusterTime", StringType())
          .add("fullDocument", StringType())
        )
        .load()
        .writeStream
        .format("memory")
        .trigger(continuous="1 second")
        .outputMode("append")
        .queryName("test_2")
        .start()
      )
      

With the provided schema, the columns of the change stream are saved to the view correctly:

      _id,clusterTime,fullDocument
      "{""_data"": ""8263787A2A000000032B022C0100296E5A1004DDF3C694505147BEBC3FD3E09AAA41F7463C5F6964003C3130303036353436000004""}","{""$timestamp"": {""t"": 1668839978, ""i"": 3}}","{""_id"": ""10006546"", ""listing_url"": ""https://www.airbnb.com/rooms/10006546"", ""name"": ""1_Ribeira Charming Duplex"", ""summary"": ""Fantastic duplex apartment with three bedrooms, located ......."
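Because the workaround maps fullDocument to a StringType, downstream code has to parse it back into a document. A minimal sketch using the standard library, with the value abbreviated from the row above:

```python
import json

# Under the StringType workaround, fullDocument arrives as a JSON string
# (value abbreviated from the output above).
full_document = (
    '{"_id": "10006546",'
    ' "listing_url": "https://www.airbnb.com/rooms/10006546",'
    ' "name": "Ribeira Charming Duplex"}'
)

# Parse the string back into a dict to access individual fields.
doc = json.loads(full_document)
```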
      

      What do I expect

      I should be able to read the change stream events without providing any schema.

            Assignee: Ross Lawley (ross@mongodb.com)
            Reporter: Kit Yam Tse (me@kytse.com)
            Votes: 0
            Watchers: 2
