I am testing the MongoDB Kafka source and sink connectors. After the source connector completes the initial sync and starts reading from the oplog using change streams, I get the failure below and it stops copying new changes from the source. Please take a look.
SourceConnector config:
curl -X POST -H "Accept:application/json" -H "Content-Type: application/json" localhost:9083/connectors/ --data '{
"name":"mongo-source-assets-shard1oplog2",
"config": {
"connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"connection.uri":"mongodb://xxx.xxx.xxx.xxx:27017",
"database":"oz_next",
"collection":"assets",
"publish.full.document.only":"true",
"topic.prefix":"oplog.oz_mongo",
"batch.size":"5000",
"copy.existing":"true",
"copy.existing.max.threads":"3",
"copy.existing.queue.size":"64000"}
}'
SinkConnector config:
curl -X POST -H "Accept:application/json" -H "Content-Type: application/json" localhost:9083/connectors/ --data '{
"name":"mongo-sink-assets-shard1oplog2",
"config": {
"topics":"oplog.oz_mongo.oz_next.assets",
"connector.class":"com.mongodb.kafka.connect.MongoSinkConnector",
"tasks.max":"1",
"key.converter":"org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable":"false",
"value.converter":"org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable":"false",
"connection.uri":"mongodb://10.74.3.104:27017",
"database":"poc_oz_next",
"collection":"poc_assets",
"max.num.retries":"3",
"retries.defer.timeout":"5000",
"session.timeout.ms":"25000"}
}'
connector log:
[2020-05-29 08:40:55,565] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Finished commitOffsets successfully in 8872 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-05-29 08:41:05,566] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-05-29 08:41:05,566] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} flushing 4873 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-05-29 08:41:13,881] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Finished commitOffsets successfully in 8315 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-05-29 08:41:23,881] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:416)
[2020-05-29 08:41:23,881] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} flushing 4604 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:433)
[2020-05-29 08:41:31,322] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:31,326] INFO Resuming the change stream after the previous offset (com.mongodb.kafka.connect.source.MongoSourceTask:234)
[2020-05-29 08:41:31,328] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:31,331] INFO Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask:237)
[2020-05-29 08:41:31,333] INFO Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: "52b8348a4b0b1571cbf199af87458512", copyingData: true} 40647 (com.mongodb.kafka.connect.source.MongoSourceTask:253)
[2020-05-29 08:41:32,954] INFO WorkerSourceTask{id=mongo-source-assets-shard1oplog2-0} Finished commitOffsets successfully in 9073 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:515)
[2020-05-29 08:41:35,328] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:35,331] INFO Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask:237)
[2020-05-29 08:41:35,333] INFO Failed to resume change stream: Bad resume token: _data of missing or of wrong type{_id: "7ed0cc7a09af100edc4db27f968231f9", copyingData: true} 40647 (com.mongodb.kafka.connect.source.MongoSourceTask:253)
[2020-05-29 08:41:36,330] INFO Watching for collection changes on 'oz_next.assets' (com.mongodb.kafka.connect.source.MongoSourceTask:374)
[2020-05-29 08:41:36,334] INFO Resuming the change stream after the previous offset using resumeAfter (com.mongodb.kafka.connect.source.MongoSourceTask:237)
... change stream: Bad resume token: _data of missing or of wrong type{_id: "7ed0cc7a09af100edc4db27f96823... com.mongodb.kafka.connect.source.MongoSourceTask:253)
Connection to 10.74.3.79 closed by remote host.
Connection to 10.74.3.79 closed.
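As a quick check on the log above, here is a minimal Python sketch (not connector code) that pulls the rejected offset document out of a "Failed to resume change stream" line and lists its field names. The helper names `extract_offset_fields` and `looks_like_resume_token` are made up for illustration, and the parsing assumes the flat `name: value` layout seen in these log lines. It only shows that the offset handed to resumeAfter carries `_id` and `copyingData` but no `_data` field, which is what the 40647 error text complains about.

```python
import re

# Matches the offset document printed after "wrong type" in the connector's
# "Failed to resume change stream" log lines (layout assumed from the log above).
TOKEN_RE = re.compile(r"Bad resume token: .*?(\{.*?\})")


def extract_offset_fields(log_line):
    """Return the field names of the rejected offset document, or None."""
    match = TOKEN_RE.search(log_line)
    if match is None:
        return None
    body = match.group(1).strip("{}")
    # Fields are printed as `name: value` pairs separated by commas.
    return [pair.split(":", 1)[0].strip() for pair in body.split(",")]


def looks_like_resume_token(fields):
    """A change stream resume token carries its payload in `_data`."""
    return "_data" in fields


line = ('[2020-05-29 08:41:31,333] INFO Failed to resume change stream: '
        'Bad resume token: _data of missing or of wrong type'
        '{_id: "52b8348a4b0b1571cbf199af87458512", copyingData: true} 40647 '
        '(com.mongodb.kafka.connect.source.MongoSourceTask:253)')

fields = extract_offset_fields(line)
print(fields)                           # ['_id', 'copyingData']
print(looks_like_resume_token(fields))  # False
```

For the 08:41:31,333 line this lists only `_id` and `copyingData`, so the stored offset still looks like a copy-phase offset rather than a change stream resume token.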
backports: KAFKA-311 Failed to resume change stream - Bad resume token (Closed)