Kafka Connector / KAFKA-325

Source Connector Stays Running When Falling Off Oplog

    • Type: Bug
    • Resolution: Gone away
    • Priority: Major - P3
    • Affects Version/s: 1.7.0
    • Component/s: Source

      When the Kafka connector falls off the oplog, it throws an exception indicating that it cannot resume the underlying change stream. With mongo.errors.tolerance set to "none", we would expect this to put the connector into a FAILED state, which would alert admins/operators that manual intervention is required.

      Instead, the connector appears to remain in a RUNNING state, so users will not be aware of the problem until symptoms begin to show up in the downstream system or data pipeline.
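
To illustrate why the RUNNING state matters operationally, here is a minimal sketch of the kind of check an operator might run against the payload returned by Kafka Connect's `GET /connectors/{name}/status` endpoint. The helper names `failed_tasks` and `needs_attention` and the sample payload are assumptions made for illustration; they are not part of the connector or of this report.

```python
from typing import Any, Dict, List


def failed_tasks(status: Dict[str, Any]) -> List[int]:
    """Return the ids of tasks reported as FAILED in a Connect status payload."""
    return [
        task["id"]
        for task in status.get("tasks", [])
        if task.get("state") == "FAILED"
    ]


def needs_attention(status: Dict[str, Any]) -> bool:
    """True if the connector itself or any of its tasks is FAILED."""
    connector_failed = status.get("connector", {}).get("state") == "FAILED"
    return connector_failed or bool(failed_tasks(status))


# Hypothetical payload shaped like the situation described in this issue:
# the change stream cannot resume, but everything still reports RUNNING.
observed = {
    "name": "abor-mongo-ingestion-source",
    "connector": {"state": "RUNNING"},
    "tasks": [{"id": 0, "state": "RUNNING"}],
}

print(needs_attention(observed))  # False
```

A state-based check like this stays silent in the scenario reported here, which is why a transition to FAILED would be the more useful behavior.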

      Example:

      Connector config:

      {
          "connector.class":"com.mongodb.kafka.connect.MongoSourceConnector",
          "connection.uri":"mongodb+srv://abor-development-20211025:<EMAIL redactopus@example.com>/?readPreference=secondary",
          "database":"development",
          "collection":"ingestion",
          "publish.full.document.only":"true",
          "key.converter":"org.apache.kafka.connect.storage.StringConverter",
          "value.converter":"org.apache.kafka.connect.storage.StringConverter",
          "value.converter.schemas.enable":"false",
          "batch.size":"0",
          "offset.partition.name":"abor-ingestion-1",
          "heartbeat.interval.ms":"60000",
          "poll.await.time.ms":"5000",
          "poll.max.batch.size":"1000",
          "tasks.max":"1",
          "pipeline":"[{\"$match\": { \"$and\": [ {\"fullDocument.pk_credit\":{$ne:null}}, {\"fullDocument.pk_debit\":{$ne:null}} ] } }]",
          "topic.namespace.map":"{ \"development.ingestion\": \"cdc.abor.ingestion\" }",
          "topic.prefix":"",
          "topic.suffix":"",
          "errors.log.enable":"true",
          "name":"abor-mongo-ingestion-source"
      } 
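
Note that the config above does not set mongo.errors.tolerance explicitly. The sketch below shows how the effective value could be worked out, assuming (this is an assumption about the connector's documented behavior, not something stated in this report) that mongo.errors.tolerance falls back to errors.tolerance, which in turn defaults to "none". The function name `effective_errors_tolerance` is hypothetical.

```python
import json


def effective_errors_tolerance(config: dict) -> str:
    """Resolve the error tolerance a source connector config would use.

    Assumption: mongo.errors.tolerance overrides errors.tolerance,
    and the fallback default is "none".
    """
    return config.get(
        "mongo.errors.tolerance",
        config.get("errors.tolerance", "none"),
    )


# Only the error-related keys from the config in this report are reproduced.
reported_config = json.loads(
    '{"errors.log.enable": "true", "name": "abor-mongo-ingestion-source"}'
)

print(effective_errors_tolerance(reported_config))  # none
```

Under that assumption the reported config resolves to "none", which is consistent with the expectation that an unrecoverable change stream error should fail the task.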

      1. The connector falls off the oplog, as indicated by the change stream resume failure in the mongod log below:

      {
          "t": {
              "$date": "2022-07-20T08:52:41.428+00:00"
          },
          "s": "I",
          "c": "COMMAND",
          "id": 51803,
          "ctx": "conn302681",
          "msg": "Slow query",
          "attr": {
              "type": "command",
              "ns": "development.ingestion",
              "command": {
                  "getMore": 7867414900284144865,
                  "collection": "ingestion",
                  "maxTimeMS": 1000,
                  "lsid": {
                      "id": {
                          "$uuid": "6cc7d463-0d99-465e-8c08-cf4609766c62"
                      },
                      "uid": {
                          "$binary": {
                              "base64": "wqELVKpJotbJlP9+PF/vEE7da06fWqqQdviC6wP43Ts=",
                              "subType": "0"
                          }
                      }
                  },
                  "$clusterTime": {
                      "clusterTime": {
                          "$timestamp": {
                              "t": 1658307161,
                              "i": 3362
                          }
                      },
                      "signature": {
                          "hash": {
                              "$binary": {
                                  "base64": "LRNoxwZcxRxqwYQuLJ3LFE2sP+0=",
                                  "subType": "0"
                              }
                          },
                          "keyId": 7059853149929996291
                      }
                  },
                  "$audit": {
                      "$impersonatedUsers": [
                          {
                              "user": "abor-development-20211025",
                              "db": "admin"
                          }
                      ],
                      "$impersonatedRoles": [
                          {
                              "role": "readWrite",
                              "db": "development"
                          }
                      ]
                  },
                  "$client": {
                      "driver": {
                          "name": "mongo-java-driver|sync|mongo-kafka|source",
                          "version": "4.5.0|1.7.0"
                      },
                      "os": {
                          "type": "Linux",
                          "name": "Linux",
                          "architecture": "amd64",
                          "version": "5.4.94"
                      },
                      "platform": "Java/Private Build/11.0.15+10-Ubuntu-0ubuntu0.20.04.1",
                      "mongos": {
                          "host": "atlas-1q1s8m-shard-00-02.wzyq3.mongodb.net:27016",
                          "client": "192.168.248.107:37164",
                          "version": "4.4.15"
                      }
                  },
                  "$configServerState": {
                      "opTime": {
                          "ts": {
                              "$timestamp": {
                                  "t": 1658307161,
                                  "i": 496
                              }
                          },
                          "t": 20
                      }
                  },
                  "$db": "development"
              },
              "originatingCommand": {
                  "aggregate": "ingestion",
                  "pipeline": [
                      {
                          "$changeStream": {
                              "fullDocument": "updateLookup",
                              "startAfter": {
                                  "_data": "8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004"
                              }
                          }
                      }
                  ],
                  "fromMongos": true,
                  "needsMerge": true,
                  "collation": {
                      "locale": "simple"
                  },
                  "cursor": {
                      "batchSize": 0
                  },
                  "runtimeConstants": {
                      "localNow": {
                          "$date": "2022-07-20T08:52:41.391Z"
                      },
                      "clusterTime": {
                          "$timestamp": {
                              "t": 1658307161,
                              "i": 3247
                          }
                      }
                  },
                  "use44SortKeys": true,
                  "useNewUpsert": true,
                  "readConcern": {
                      "provenance": "implicitDefault"
                  },
                  "writeConcern": {
                      "w": 1,
                      "wtimeout": 0,
                      "provenance": "implicitDefault"
                  },
                  "clientOperationKey": {
                      "$uuid": "e2974d33-cdac-44ae-bb3d-8ab6e4bb618c"
                  },
                  "lsid": {
                      "id": {
                          "$uuid": "6cc7d463-0d99-465e-8c08-cf4609766c62"
                      },
                      "uid": {
                          "$binary": {
                              "base64": "wqELVKpJotbJlP9+PF/vEE7da06fWqqQdviC6wP43Ts=",
                              "subType": "0"
                          }
                      }
                  },
                  "$readPreference": {
                      "mode": "secondary"
                  },
                  "$clusterTime": {
                      "clusterTime": {
                          "$timestamp": {
                              "t": 1658307161,
                              "i": 3247
                          }
                      },
                      "signature": {
                          "hash": {
                              "$binary": {
                                  "base64": "LRNoxwZcxRxqwYQuLJ3LFE2sP+0=",
                                  "subType": "0"
                              }
                          },
                          "keyId": 7059853149929996291
                      }
                  },
                  "$audit": {
                      "$impersonatedUsers": [
                          {
                              "user": "abor-development-20211025",
                              "db": "admin"
                          }
                      ],
                      "$impersonatedRoles": [
                          {
                              "role": "readWrite",
                              "db": "development"
                          }
                      ]
                  },
                  "$client": {
                      "driver": {
                          "name": "mongo-java-driver|sync|mongo-kafka|source",
                          "version": "4.5.0|1.7.0"
                      },
                      "os": {
                          "type": "Linux",
                          "name": "Linux",
                          "architecture": "amd64",
                          "version": "5.4.94"
                      },
                      "platform": "Java/Private Build/11.0.15+10-Ubuntu-0ubuntu0.20.04.1",
                      "mongos": {
                          "host": "atlas-1q1s8m-shard-00-02.wzyq3.mongodb.net:27016",
                          "client": "192.168.248.107:37164",
                          "version": "4.4.15"
                      }
                  },
                  "$configServerState": {
                      "opTime": {
                          "ts": {
                              "$timestamp": {
                                  "t": 1658307161,
                                  "i": 496
                              }
                          },
                          "t": 20
                      }
                  },
                  "$db": "development"
              },
              "planSummary": "COLLSCAN",
              "cursorid": 7867414900284144865,
              "numYields": 0,
              "ok": 0,
              "errMsg": "Resume of change stream was not possible, as the resume point may no longer be in the oplog.",
              "errName": "ChangeStreamHistoryLost",
              "errCode": 286,
              "reslen": 521,
              "locks": {
                  "FeatureCompatibilityVersion": {
                      "acquireCount": {
                          "r": 1
                      }
                  },
                  "ReplicationStateTransition": {
                      "acquireCount": {
                          "w": 1
                      }
                  },
                  "Global": {
                      "acquireCount": {
                          "r": 1
                      }
                  },
                  "Database": {
                      "acquireCount": {
                          "r": 1
                      }
                  },
                  "Mutex": {
                      "acquireCount": {
                          "r": 1
                      }
                  },
                  "oplog": {
                      "acquireCount": {
                          "r": 1
                      }
                  }
              },
              "readConcern": {
                  "level": "majority"
              },
              "writeConcern": {
                  "w": 1,
                  "wtimeout": 0,
                  "provenance": "implicitDefault"
              },
              "storage": {},
              "protocol": "op_msg",
              "durationMillis": 33
          }
      } 

      See specifically:

      "errMsg": "Resume of change stream was not possible, as the resume point may no longer be in the oplog.",
      "errName": "ChangeStreamHistoryLost",
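
Until the connector surfaces this condition itself, one workaround is to look for it on the server side. The following is a rough sketch that scans mongod structured log lines (one JSON document per line) for the error name and code shown above. The function name `is_history_lost` and the shortened sample line are illustrative assumptions.

```python
import json

# Values taken from the mongod log entry in this report.
HISTORY_LOST_NAME = "ChangeStreamHistoryLost"
HISTORY_LOST_CODE = 286


def is_history_lost(log_line: str) -> bool:
    """True if a mongod JSON log line reports a lost change stream resume point."""
    try:
        entry = json.loads(log_line)
    except json.JSONDecodeError:
        return False  # not a structured log line
    if not isinstance(entry, dict):
        return False
    attr = entry.get("attr")
    if not isinstance(attr, dict):
        return False
    return (
        attr.get("errName") == HISTORY_LOST_NAME
        or attr.get("errCode") == HISTORY_LOST_CODE
    )


# Heavily shortened version of the log entry above, for illustration only.
sample = json.dumps({
    "c": "COMMAND",
    "msg": "Slow query",
    "attr": {"ns": "development.ingestion", "ok": 0,
             "errName": "ChangeStreamHistoryLost", "errCode": 286},
})

print(is_history_lost(sample))  # True
```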

      2. We then see exceptions in the Kafka connector log, yet the connector continues to run and send heartbeats (log entries are listed newest first):

      "2022-07-20T04:34:10.531Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,531] INFO [abor-mongo-ingestion-source|task-0] An exception occurred when trying to get the next item from the Change Stream (com.mongodb.kafka.connect.source.MongoSourceTask:648)"
      "2022-07-20T04:34:10.523Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,523] INFO [abor-mongo-ingestion-source|task-0] Resuming the change stream after the previous offset: {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.MongoSourceTask:430)"
      "2022-07-20T04:34:10.522Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,522] INFO [abor-mongo-ingestion-source|task-0] Watching for collection changes on 'development.ingestion' (com.mongodb.kafka.connect.source.MongoSourceTask:696)"
      "2022-07-20T04:34:10.522Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,522] INFO [abor-mongo-ingestion-source|task-0] Resume token from heartbeat: {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.MongoSourceTask:735)"
      "2022-07-20T04:34:10.519Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,518] INFO [abor-mongo-ingestion-source|task-0] Generating heartbeat event. {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.heartbeat.HeartbeatManager:77)"
      "2022-07-20T04:34:10.518Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,518] INFO [abor-mongo-ingestion-source|task-0] An exception occurred when trying to get the next item from the Change Stream (com.mongodb.kafka.connect.source.MongoSourceTask:648)"
      "2022-07-20T04:34:10.510Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""","[2022-07-20 04:34:10,510] INFO [abor-mongo-ingestion-source|task-0] Resuming the change stream after the previous offset: {""_data"": ""8262D53DA7000011682B022C0100296E5A100409AF5D0483BC4597AB38CE193DD58252463C5F6964003C30663662386533662D386236312D633339382D393063652D3239663962393932383036655F313635313730383830305F30000004""} (com.mongodb.kafka.connect.source.MongoSourceTask:430)" 
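
The log above shows the task retrying the same resume token repeatedly. As a sketch of how that loop could be spotted in a CSV log export like this one, the code below counts "Resuming the change stream" messages per resume token. The function names, the threshold of 2, and the shortened token in the sample are assumptions chosen for illustration.

```python
import csv
import re
from collections import Counter
from typing import Iterable, List

# Matches the resume message emitted by MongoSourceTask in the log above.
RESUME_RE = re.compile(
    r'Resuming the change stream after the previous offset: '
    r'\{"_data": "([0-9A-Fa-f]+)"\}'
)


def resume_attempts(lines: Iterable[str]) -> Counter:
    """Count resume attempts per resume token in CSV-exported log lines."""
    counts: Counter = Counter()
    # Strip leading whitespace so the CSV quoting is parsed correctly.
    for row in csv.reader(line.strip() for line in lines):
        if not row:
            continue
        match = RESUME_RE.search(row[-1])  # the message is the last column
        if match:
            counts[match.group(1)] += 1
    return counts


def stuck_tokens(lines: Iterable[str], threshold: int = 2) -> List[str]:
    """Tokens the task tried to resume from at least `threshold` times."""
    return [tok for tok, n in resume_attempts(lines).items() if n >= threshold]


# Two sample lines in the same format as above; the token is shortened.
sample_lines = [
    '"2022-07-20T04:34:10.523Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""",'
    '"[2022-07-20 04:34:10,523] INFO [abor-mongo-ingestion-source|task-0] Resuming the '
    'change stream after the previous offset: {""_data"": ""8262D53DA7""} '
    '(com.mongodb.kafka.connect.source.MongoSourceTask:430)"',
    '"2022-07-20T04:34:10.510Z","""i-01878ae9b47eaef5e""","""data/kafka-connect""",'
    '"[2022-07-20 04:34:10,510] INFO [abor-mongo-ingestion-source|task-0] Resuming the '
    'change stream after the previous offset: {""_data"": ""8262D53DA7""} '
    '(com.mongodb.kafka.connect.source.MongoSourceTask:430)"',
]

print(stuck_tokens(sample_lines))  # ['8262D53DA7']
```

A token that keeps reappearing in resume attempts, together with the "An exception occurred" messages, is a hint that the task is looping rather than making progress.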

            Assignee:
            maxim.katcharov@mongodb.com Maxim Katcharov
            Reporter:
            errol.kutan@mongodb.com Errol Kutan
            Votes:
            0
            Watchers:
            4
