Kafka Connector / KAFKA-174

Make pipeline errors easier to discover

    • Type: Improvement
    • Resolution: Fixed
    • Priority: Major - P3
    • Fix Version/s: 1.5.0
    • Affects Version/s: None
    • Component/s: None
    • Labels: None

      When an incorrect pipeline is set by accident, it's hard to determine the cause of the error, especially when reconfiguring a connector.

      The following logs do not make it clear that the pipeline was incorrect:

       (com.mongodb.kafka.connect.source.MongoSourceTask)
       [2020-11-19 12:09:45,477] INFO WorkerSourceTask{id=mongo-connector-test-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask)
       [2020-11-19 12:09:45,478] INFO WorkerSourceTask{id=mongo-connector-test-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask)
       [2020-11-19 12:09:46,914] INFO Watching for collection changes on 'test.products' (com.mongodb.kafka.connect.source.MongoSourceTask)
       [2020-11-19 12:09:46,916] INFO Resuming the change stream after the previous offset: {"_data": "825FB65F61000000012B022C0100296E5A1004718E568607F245848B25C25D9FD3640E46645F696400645FB65F61926F0CD231104D930004"} (com.mongodb.kafka.connect.source.MongoSourceTask)
       [2020-11-19 12:09:46,917] WARN Failed to resume change stream: A pipeline stage specification object must contain exactly one field. 40323
       
       =====================================================================================
       If the resume token is no longer available then there is the potential for data loss.
       Saved resume tokens are managed by Kafka and stored with the offset data.
       
       To restart the change stream with no resume token either: 
         * Create a new partition name using the `offset.partition.name` configuration.
         * Set `errors.tolerance=all` and ignore the erroring resume token. 
         * Manually remove the old offset from its configured storage.
       
       Resetting the offset will allow for the connector to be resume from the latest resume
       token. Using `copy.existing=true` ensures that all data will be outputted by the
       connector but it will duplicate existing data.
       =====================================================================================
        (com.mongodb.kafka.connect.source.MongoSourceTask)
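      The WARN line does name the real cause (the server message plus error code 40323), but it is reported as a failed resume and followed by offset-reset advice. Below is a minimal sketch of the kind of triage that would surface it, assuming the first line of the warning is available as a plain string. It reads the trailing numeric error code and, if that code is known to mean a bad pipeline, returns a pipeline-specific hint instead. `pipeline_hint`, the regex, and the `PIPELINE_ERROR_CODES` table are hypothetical. The table lists only the one code that appears in this log.

```python
import re
from typing import Optional

# Hypothetical table of server error codes that point at the `pipeline` setting.
# Only 40323 is taken from the log above; other codes would have to be added.
PIPELINE_ERROR_CODES = {
    40323: "a pipeline stage specification object must contain exactly one field",
}

# The server message ends with its numeric error code, e.g. "... exactly one field. 40323"
TRAILING_CODE = re.compile(r"(\d+)\s*$")


def pipeline_hint(message: str) -> Optional[str]:
    """Return a pipeline-specific hint if `message` ends with a known pipeline error code."""
    match = TRAILING_CODE.search(message)
    if match is None:
        return None  # no trailing error code to classify
    code = int(match.group(1))
    reason = PIPELINE_ERROR_CODES.get(code)
    if reason is None:
        return None  # not known to be a pipeline error; keep the resume-token advice
    return (
        f"Invalid `pipeline` configuration (error {code}): {reason}. "
        "Fix the pipeline; resetting the offset will not help."
    )


if __name__ == "__main__":
    warning = ("Failed to resume change stream: A pipeline stage specification "
               "object must contain exactly one field. 40323")
    print(pipeline_hint(warning))
```

      Parsing log text is only for illustration. A check inside the connector would more naturally inspect the server error code carried by the exception itself, before printing the resume-token banner.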
      

      Example pipeline:

      [{ "$addFields": { "myField": "$fullDocument.myField" }, "$project": { "fullDocument": 0 } }]
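      The example pipeline puts `$addFields` and `$project` in the same stage document. The server rejects this with 40323, because each stage has to be its own array element. Below is a minimal sketch of a pre-flight check that would catch the problem before the connector starts. It assumes the `pipeline` value is strict JSON. The connector's own parser may be more lenient, and extended-JSON-specific syntax is not handled. `check_pipeline` is a hypothetical helper, not part of the connector.

```python
import json
from typing import List


def check_pipeline(pipeline: str) -> List[str]:
    """Return every problem found in a `pipeline` config value (empty list if none)."""
    try:
        stages = json.loads(pipeline)
    except json.JSONDecodeError as exc:
        # Strict JSON only: the connector's own parser may accept inputs this rejects.
        return [f"pipeline is not valid JSON: {exc.msg} (char {exc.pos})"]
    if not isinstance(stages, list):
        return ["pipeline must be a JSON array of stage documents"]

    problems = []
    for index, stage in enumerate(stages):
        if not isinstance(stage, dict):
            problems.append(f"stage {index} is not a document")
        elif len(stage) != 1:
            # The rule behind server error 40323: one operator per stage document.
            problems.append(
                f"stage {index} has {len(stage)} fields {sorted(stage)}; "
                "each stage must contain exactly one field"
            )
        else:
            (name,) = stage  # the single key of the stage document
            if not name.startswith("$"):
                problems.append(f"stage {index} key {name!r} is not a stage operator")
    return problems
```

      Consider a pipeline shaped like the one above, with both operators in one document. The check reports that stage 0 has two fields, `['$addFields', '$project']`. If you split them into `[{"$addFields": {...}}, {"$project": {...}}]`, the check returns an empty list. The check collects every problem rather than stopping at the first one, so a single error message can list all bad stages.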

      Assignee: Ross Lawley (ross@mongodb.com)
      Reporter: Ross Lawley (ross@mongodb.com)
      Votes: 0
      Watchers: 2
