Uploaded image for project: 'Kafka Connector'
  1. Kafka Connector
  2. KAFKA-175

Inferring schema should support variable types for uses with Json with Schema.

    • Type: Icon: Improvement Improvement
    • Resolution: Works as Designed
    • Priority: Icon: Major - P3 Major - P3
    • None
    • Affects Version/s: None
    • Component/s: None
    • None

      Schema inference uses the base type when determining the schema for arrays. So when sourcing the following document structure:

      {
          "L1": {
            "L2": {
              "L3": [ {"V2": {"K1": 0},"K1": 0},  {"V5": ["A1", "A2"], "V11": 1} ]
            }
          }
        }
      

      The type of L3 is Array with a value type of Schema.STRING:

        "fullDocument": {
          "_id": "5fb67d988f8729ab566e4f6b",
          "L1": {
            "L2": {
              "L3": [ "{\"V2\": {\"K1\": 0}, \"K1\": 0}","{\"V5\": [\"A1\", \"A2\"], \"V11\": 1}" ]
            }
          }
        },
      

      Configuration:

      {
        "key.converter.schemas.enable": "false",
        "value.converter.schemas.enable": "false",
        "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
        "tasks.max": "1",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "errors.log.enable": "true",
        "errors.log.include.messages": "true",
        "connection.uri":"CONECTIONSTRING",
        "database": "testdb",
        "collection": "testcol",
        "topic.prefix": "test-prefix",
        "output.format.key": "json",
        "output.format.value": "schema",
        "output.schema.infer.value": "true",
        "output.json.formatter": "com.mongodb.kafka.connect.source.json.formatter.SimplifiedJson",
        "copy.existing": "true"
      }
      

      Json schemas do allow variable object types for Structs and Arrays: Array Compatibility. So when output.schema.infer.value=true then when providing schema for Json with schema then there should be no use of a Base type. Note this will require an extra configuration eg: "output.schema.infer.compatibility:[none|all]" - default to all compatibility to keep the current behaviour.

      For reference see:
      https://developer.mongodb.com/community/forums/t/array-of-objects-become-array-of-string-during-upload-to-kafka/11509/3

            Assignee:
            ross@mongodb.com Ross Lawley
            Reporter:
            robert.walters@mongodb.com Robert Walters (Inactive)
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: