Type: Bug
Resolution: Fixed
Priority: Unknown
Affects Version/s: 1.5.0
When using the BSON timestamp type, the message we receive from the Kafka topic differs from the document written to MongoDB.
Connector config:
name=mongo-source
connector.class=com.mongodb.kafka.connect.MongoSourceConnector
tasks.max=1

# Connection and source configuration
connection.uri=mongodb://localhost:27018
database=foo
collection=baz

# Output configuration
output.schema.infer.value=true
output.format.value=schema
topic.prefix=
topic.suffix=
poll.max.batch.size=1000
poll.await.time.ms=5000

# Change stream options
pipeline=[]
batch.size=0
change.stream.full.document=updateLookup
collation=
Input:
rs1:PRIMARY> db.baz.insert({"ts" : Timestamp(1620742106, 2)})
Output:
{
  "schema": {
    "type": "struct",
    "fields": [
      { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "_data" } ], "optional": true, "name": "_id", "field": "_id" },
      { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "clusterTime" },
      { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "_id" } ], "optional": true, "name": "documentKey", "field": "documentKey" },
      { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "_id" }, { "type": "int64", "optional": true, "name": "org.apache.kafka.connect.data.Timestamp", "version": 1, "field": "ts" } ], "optional": true, "name": "fullDocument", "field": "fullDocument" },
      { "type": "struct", "fields": [ { "type": "string", "optional": true, "field": "coll" }, { "type": "string", "optional": true, "field": "db" } ], "optional": true, "name": "ns", "field": "ns" },
      { "type": "string", "optional": true, "field": "operationType" }
    ],
    "optional": false,
    "name": "default"
  },
  "payload": {
    "_id": { "_data": "82609AA32D000000012B022C0100296E5A10047DB96B417494423E8B1A351D894EDFC246645F69640064609AA32D485F85468421F0F30004" },
    "clusterTime": 1544382408,
    "documentKey": { "_id": "609aa32d485f85468421f0f3" },
    "fullDocument": { "_id": "609aa32d485f85468421f0f3", "ts": 1539435408 },
    "ns": { "coll": "baz", "db": "foo" },
    "operationType": "insert"
  }
}
The value of `ts` we're expecting is `1620742106000`, but we're getting `1539435408` instead.
After further investigation, the timestamp value overflows at this line: https://github.com/mongodb/mongo-kafka/blob/621394f2197e31e0b6b07d8390bf6ee40e8cd501/src/main/java/com/mongodb/kafka/connect/source/schema/BsonValueToSchemaAndValue.java#L141
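The observed value is consistent with 32-bit wraparound: the BSON timestamp's seconds component is an `int`, and multiplying it by the `int` literal `1000` overflows before the result is widened to `long`. A minimal standalone demonstration (not the connector's actual code):

```java
public class OverflowDemo {
    public static void main(String[] args) {
        int seconds = 1620742106;      // seconds component of Timestamp(1620742106, 2)

        long wrong = seconds * 1000;   // int * int overflows, then widens to long
        long right = seconds * 1000L;  // 1000L promotes the multiplication to long

        System.out.println(wrong);     // 1539435408 == 1620742106000 mod 2^32
        System.out.println(right);     // 1620742106000
    }
}
```

Note that `1539435408` here is exactly the `ts` value seen in the payload above.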
Testing with this function in `SchemaAndValueProducerTest.java`:
@Test
@DisplayName("test infer schema and value producer")
void testInferSchemaAndValueProducer2() {
  Schema expectedSchema =
      SchemaBuilder.struct()
          .name(DEFAULT_FIELD_NAME)
          .field("timestamp", Timestamp.builder().optional().build())
          .build();
  SchemaAndValue expectedSchemaAndValue =
      new SchemaAndValue(
          expectedSchema,
          new Struct(expectedSchema).put("timestamp", new Date(1620742106000L)));
  SchemaAndValueProducer valueProducer =
      new InferSchemaAndValueProducer(SIMPLE_JSON_WRITER_SETTINGS);
  final String FULL_DOCUMENT_JSON =
      "{" + "\"timestamp\": {\"$timestamp\": {\"t\": 1620742106, \"i\": 2}} " + "}";
  assertSchemaAndValueEquals(
      expectedSchemaAndValue, valueProducer.get(BsonDocument.parse(FULL_DOCUMENT_JSON)));
}
yields a mismatch between the expected and actual timestamp values.
Changing the multiplier to `1000L` produces the expected epoch millis.
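For reference, a standalone sketch of the corrected conversion (the class and method names here are illustrative, not the connector's actual code):

```java
import java.util.Date;

public class TimestampFix {
    // The BSON timestamp's seconds component arrives as an int; using the
    // long literal 1000L keeps the multiplication in long arithmetic,
    // avoiding the int overflow that produced the wrong value.
    static Date secondsToDate(int seconds) {
        return new Date(seconds * 1000L);
    }

    public static void main(String[] args) {
        System.out.println(secondsToDate(1620742106).getTime()); // 1620742106000
    }
}
```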