Summary
Currently the source connector converts all change stream documents to extended JSON strings before publishing them to a topic.
This requires users to use the org.apache.kafka.connect.storage.StringConverter, which has space and efficiency costs in Kafka (especially for storage). Various other converters exist that allow Kafka topic subscribers to receive events in the format of their choice.
The MongoDB Source Connector should allow users to configure a number of converters such as io.confluent.connect.avro.AvroConverter, org.apache.kafka.connect.json.JsonConverter or org.apache.kafka.connect.converters.ByteArrayConverter.
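As a rough sketch of what this could look like from a user's perspective, the configuration below pairs the source connector with the JsonConverter; the converter property keys are standard Kafka Connect settings, while the connector class name and the commented-out Avro / schema registry values are illustrative assumptions rather than a committed design.

    import java.util.HashMap;
    import java.util.Map;

    final class ConverterConfigExample {
        static Map<String, String> jsonConverterProps() {
            Map<String, String> props = new HashMap<>();
            // Assumed source connector class name, for illustration only.
            props.put("connector.class", "com.mongodb.kafka.connect.MongoSourceConnector");
            // Emit structured records with an embedded schema instead of plain strings.
            props.put("value.converter", "org.apache.kafka.connect.json.JsonConverter");
            props.put("value.converter.schemas.enable", "true");
            // Avro is another option, typically paired with a schema registry:
            // props.put("value.converter", "io.confluent.connect.avro.AvroConverter");
            // props.put("value.converter.schema.registry.url", "http://localhost:8081");
            return props;
        }
    }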
Change stream events have a defined schema, although a number of the fields are optional:
    {
        _id : { <BSON Object> },
        "operationType" : "<operation>",
        "fullDocument" : { <document> },
        "ns" : {
            "db" : "<database>",
            "coll" : "<collection>"
        },
        "to" : {
            "db" : "<database>",
            "coll" : "<collection>"
        },
        "documentKey" : { "_id" : <value> },
        "updateDescription" : {
            "updatedFields" : { <document> },
            "removedFields" : [ "<field>", ... ]
        },
        "clusterTime" : <Timestamp>,
        "txnNumber" : <NumberLong>,
        "lsid" : {
            "id" : <UUID>,
            "uid" : <BinData>
        }
    }
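A minimal sketch, assuming the field layout above, of how such an event could be described with Kafka Connect's Schema API follows. The class and constant names are illustrative, and modelling the dynamic fields as optional extended JSON strings is only one possible choice (see the next point).

    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.SchemaBuilder;

    final class ChangeStreamSchemas {
        static final Schema NS_SCHEMA = SchemaBuilder.struct()
                .name("ns")
                .field("db", Schema.STRING_SCHEMA)
                .field("coll", Schema.OPTIONAL_STRING_SCHEMA)
                .optional()
                .build();

        static final Schema UPDATE_DESCRIPTION_SCHEMA = SchemaBuilder.struct()
                .name("updateDescription")
                // updatedFields is dynamic, so it is modelled as extended JSON here.
                .field("updatedFields", Schema.OPTIONAL_STRING_SCHEMA)
                .field("removedFields", SchemaBuilder.array(Schema.STRING_SCHEMA).optional().build())
                .optional()
                .build();

        static final Schema VALUE_SCHEMA = SchemaBuilder.struct()
                .name("changeStreamEvent")
                .field("_id", Schema.STRING_SCHEMA)                    // resume token as extended JSON
                .field("operationType", Schema.STRING_SCHEMA)
                .field("fullDocument", Schema.OPTIONAL_STRING_SCHEMA)  // dynamic data
                .field("ns", NS_SCHEMA)
                .field("to", NS_SCHEMA)
                .field("documentKey", Schema.OPTIONAL_STRING_SCHEMA)   // dynamic data
                .field("updateDescription", UPDATE_DESCRIPTION_SCHEMA)
                .field("clusterTime", Schema.OPTIONAL_INT64_SCHEMA)    // BsonTimestamp value
                .field("txnNumber", Schema.OPTIONAL_INT64_SCHEMA)
                .field("lsid", Schema.OPTIONAL_STRING_SCHEMA)          // extended JSON
                .build();
    }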
The main complexity will be deciding how to handle the dynamic document data: fullDocument, documentKey, and updateDescription.updatedFields.
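One possible approach, not a committed design, is to keep the dynamic parts as extended JSON strings inside an otherwise structured value. The sketch below uses the ChangeStreamDocument accessors from the MongoDB Java driver together with the illustrative schema above; field names and the helper class are assumptions.

    import com.mongodb.client.model.changestream.ChangeStreamDocument;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.bson.BsonDocument;

    final class DynamicFieldHandling {
        // Illustrative only: follows the sketched schema above; error handling omitted.
        static Struct toStruct(ChangeStreamDocument<BsonDocument> event, Schema valueSchema) {
            Struct value = new Struct(valueSchema)
                    .put("_id", event.getResumeToken().toJson())
                    .put("operationType", event.getOperationType().getValue());
            // Dynamic data: the shape is collection specific, so it is serialised
            // as an extended JSON string rather than described field by field.
            if (event.getFullDocument() != null) {
                value.put("fullDocument", event.getFullDocument().toJson());
            }
            if (event.getDocumentKey() != null) {
                value.put("documentKey", event.getDocumentKey().toJson());
            }
            return value;
        }
    }

Keeping the dynamic data as strings preserves compatibility with Avro and JSON schemas at the cost of consumers having to parse those fields themselves; alternatives (for example, schemaless java.util.Map values when using the JsonConverter with schemas disabled) could be weighed as part of this work.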
A secondary aim will be to ensure that users can access change stream data in a way that is easily consumable outside of MongoDB: KAFKA-99