Kafka Connector / KAFKA-223

Write MongoDB errors to the DLQ

    • Done

      Engineer(s): Valentin

      Summary: The MongoDB Kafka Sink Connector should report sink records that it fails to write to the sink to the dead letter queue (DLQ) when the connector is configured to tolerate errors and continue processing other records, instead of failing fast and requiring manual intervention.

      2022-01-11: Setting initial target end date to 2022-01-28

      • Basic writing to DLQ is completed
      • Special handling for unordered bulk operations in progress


      Documentation:

      errors.deadletterqueue.topic.name: Name of topic to use as the dead letter queue.

       

      But:

      When errors.deadletterqueue.topic.name is a valid Kafka topic name
      And an exception happens at com.mongodb.kafka.connect.sink.MongoSinkTask.bulkWriteBatch(MongoSinkTask.java:209)
      Then no message is sent to the DLQ.
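For reference, the "When" condition corresponds to a sink configuration roughly like the one sketched here. The property names are the connector's documented settings; the topic, database, collection, and URI values are placeholders chosen for illustration, and the class and method names are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class DlqSinkConfigSketch {

  // Builds the properties a sink connector instance could be started with.
  // All values except the connector class are illustrative placeholders.
  static Map<String, String> sinkProperties() {
    Map<String, String> props = new LinkedHashMap<>();
    props.put("connector.class", "com.mongodb.kafka.connect.MongoSinkConnector");
    props.put("topics", "orders");
    props.put("connection.uri", "mongodb://localhost:27017");
    props.put("database", "shop");
    props.put("collection", "orders");
    // Tolerate errors and keep processing instead of failing fast.
    props.put("errors.tolerance", "all");
    // Topic expected to receive the records the connector could not write.
    props.put("errors.deadletterqueue.topic.name", "orders.dlq");
    return props;
  }

  public static void main(String[] args) {
    // Print in key=value form, as the settings would appear in a properties file.
    sinkProperties().forEach((key, value) -> System.out.println(key + "=" + value));
  }
}
```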

       

      The cause is probably the catch block, which does not use the ErrantRecordReporter:

      private void bulkWriteBatch(final List<MongoProcessedSinkRecordData> batch) {
        ...
        try {
          ...
        } catch (MongoException e) {
          LOGGER.warn(
              "Writing {} document(s) into collection [{}] failed.",
              writeModels.size(),
              namespace.getFullName());
          handleMongoException(config, e);
        } catch (InterruptedException e) {
          Thread.currentThread().interrupt();
          throw new DataException("Rate limiting was interrupted", e);
        } catch (Exception e) {
          if (config.logErrors()) {
            LOGGER.error("Failed to write mongodb documents", e);
          }
          if (!config.tolerateErrors()) {
            throw new DataException("Failed to write mongodb documents", e);
          }
        }
      }
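As a rough illustration of the behaviour the issue asks for, the tolerant path could hand the failed records to a reporter instead of only logging. Kafka Connect provides ErrantRecordReporter.report(SinkRecord, Throwable) for this purpose. To stay runnable without Kafka on the classpath, the sketch below uses a hypothetical stand-in interface `Reporter` of the same shape; `handleWriteFailure` and the `String` record type are likewise assumptions made for the example, not the connector's actual code.

```java
import java.util.ArrayList;
import java.util.List;

public class DlqReportingSketch {

  // Hypothetical stand-in for Kafka Connect's ErrantRecordReporter.
  interface Reporter<R> {
    void report(R record, Throwable error);
  }

  // Illustrative helper: rethrow when errors are not tolerated, otherwise
  // report the records to the DLQ reporter if one is available.
  static <R> void handleWriteFailure(
      List<R> batch, RuntimeException error, boolean tolerateErrors, Reporter<R> reporter) {
    if (!tolerateErrors) {
      throw error; // fail fast, as before
    }
    if (reporter == null) {
      return; // no DLQ configured: nothing to report to
    }
    // Simplification: report every record in the batch. A real implementation
    // would narrow this down to the records that actually failed.
    for (R record : batch) {
      reporter.report(record, error);
    }
  }

  public static void main(String[] args) {
    List<String> reported = new ArrayList<>();
    Reporter<String> reporter =
        (record, error) -> reported.add(record + " <- " + error.getMessage());

    handleWriteFailure(
        List.of("record-1", "record-2"),
        new IllegalStateException("write failed"),
        true,
        reporter);

    reported.forEach(System.out::println);
  }
}
```

Reporting the whole batch is the simplest choice but can send records to the DLQ that were in fact written, which is presumably why bulk operations need more careful handling.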
      

      Developer notes

      The scope (WRITING-9407) was rewritten, and this section was moved there.

            Assignee:
            valentin.kovalenko@mongodb.com Valentin Kavalenka
            Reporter:
            louis.devergnies@ext.adeo.com Louis DEVERGNIES