Core Server / SERVER-96452

Streams: Unexpected "ConflictingUpdateOperators" error in $merge

    • Type: Bug
    • Resolution: Won't Fix
    • Priority: Major - P3
    • Fix Version/s: None
    • Affects Version/s: None
    • Component/s: Atlas Streams
    • Operating System: ALL
    • Sprint: Sprint 61

      The Kafka-to-$merge test below produces DLQ messages with "ConflictingUpdateOperators" errors in the $merge stage. This is likely a bug in our code. I was able to repro this consistently when running a release build (though it might repro in a debug build as well).

      [j0:prim] dlqMessage: { _stream_meta: { source: { type: "kafka", topic: "outputTopic1", partition: 8, offset: 10919, key: BinData(0, ), headers: [] } }, errInfo: { reason: "Failed to process an input document in the current batch in MergeOperator with error: code = ConflictingUpdateOperators, reason = Updating the path '_..." }, operatorName: "MergeOperator", doc: { a: 2, arr: { i: 918 }, str: "vAU6Eur0OfRtQy7I1mBAleGM4ttjz1jyVMGBRMORlme7WjWDGLPJ5LnWWPZbvE3S8cGisLrh1PZdWnAC4iCrFeQAGYMHjaMde9gGsc9ByLcyFsdszsdVmBCCeT3nKX1V", _ts: new Date(1730334396868), _stream_meta: { source: { type: "kafka", topic: "outputTopic1", partition: 8, offset: 10919 } }, _ts: new Date(1730334396871) }, processorName: "reader", dlqTime: new Date(1730334401320) } 
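      Note that the dumped document above carries two "_ts" fields and two "_stream_meta" entries, and the truncated error reason begins "Updating the path '_...", which points at one of those duplicated metadata paths. For context, the error class itself is easy to trigger outside of streams whenever one update command touches a path and its prefix; a minimal standalone sketch (hypothetical collection and field names, not the streams code path):

      // Two update operators touching the same path, or a path and its prefix,
      // in a single command raise ConflictingUpdateOperators.
      // "conflictDemo" and "meta" are hypothetical names for illustration only.
      assert.throwsWithCode(
          () => db.conflictDemo.updateOne(
              {_id: 1},
              {$set: {"meta.ts": new Date()}, $unset: {meta: ""}},
              {upsert: true}),
          ErrorCodes.ConflictingUpdateOperators);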

      Add this test to kafka.js:

      runKafkaTest(kafka, () => {
          Random.setRandomSeed(42);

          // Write a batch of documents to a 32-partition Kafka topic.
          const numDocsInBatch = 5001;
          let arr = [];
          const str = makeRandomString(128);
          for (let i = 0; i < numDocsInBatch; i += 1) {
              arr.push({i: i});
          }
          let inputData = [];
          const numInput = 10;
          for (let i = 0; i < numInput; i += 1) {
              inputData.push({a: i, arr: arr, str: str});
          }
          let spName = "writer";
          const totalInput = arr.length * inputData.length;
          sp.createStreamProcessor(spName, [
              {$source: {connectionName: '__testMemory'}},
              {$unwind: "$arr"},
              {
                  $emit: {
                      connectionName: kafkaPlaintextName,
                      topic: topicName1,
                  }
              }
          ]);
          sp[spName].start();
          for (const doc of inputData) {
              sp[spName].testInsert(doc);
          }
          assert.soon(() => { return sp[spName].stats().outputMessageCount == totalInput; });
          sp[spName].stop();

          // Now read from that topic into a $merge.
          spName = "reader";
          sp.createStreamProcessor(spName, [
              {$source: {
                  connectionName: kafkaPlaintextName,
                  topic: topicName1,
                  config: {
                      auto_offset_reset: "earliest"
                  }
              }},
              {$project: {_id: 0}},
              {$merge: {into: {connectionName: dbConnName, db: dbName, coll: sinkCollName1}}}
          ]);
          sp[spName].start();
          assert.soon(() => {
              jsTestLog(sp[spName].stats());
              return sp[spName].stats().outputMessageCount == totalInput;
          });
          sp[spName].stop();
      }, 32 /* partitionCount */);
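
      The test assumes a makeRandomString helper is in scope; kafka.js presumably already provides one, but if not, a minimal sketch (hypothetical implementation, reusing the shell's seeded PRNG so the run stays reproducible under Random.setRandomSeed) would be:

      // Hypothetical helper, only needed if kafka.js does not already define one.
      // Uses the shell's seeded Random.rand() so output is deterministic per seed.
      function makeRandomString(len) {
          const chars = "ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789";
          let s = "";
          for (let i = 0; i < len; i += 1) {
              s += chars.charAt(Math.floor(Random.rand() * chars.length));
          }
          return s;
      }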

      Assignee: Mayuresh Kulkarni (mayuresh.kulkarni@mongodb.com)
      Reporter: Matthew Normyle (matthew.normyle@mongodb.com)