Core Server / SERVER-81298

Streams: "field not found, expected type array" during network failure in MergeOperator

    • Atlas Streams
    • Fully Compatible
    • Sprint 32

      mstreams: Invalid bytes in UTF8 string

       

      Caused by a message like:

       

      "{\"$set\":{\"$schema\":\"/mediawiki/recentchange/1.0.0\",\"meta\":{\"uri\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"request_id\":\"c6d88f7f-9a3c-4b7e-a7cd-421b90ed4a46\",\"id\":\"48472b65-6635-4b65-990c-fcc5e66310e6\",\"dt\":\"2023-09-21T03:05:04Z\",\"domain\":\"it.wikipedia.org\",\"stream\":\"mediawiki.recentchange\",\"topic\":\"codfw.mediawiki.recentchange\",\"partition\":{\"$numberInt\":\"0\"},\"offset\":{\"$numberInt\":\"628263911\"}},\"id\":{\"$numberInt\":\"326192030\"},\"type\":\"edit\",\"namespace\":{\"$numberInt\":\"3\"},\"title\":\"Discussioni utente:Valepert\",\"title_url\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"comment\":\"\355\240\274\355\275\270\",\"timestamp\":{\"$numberInt\":\"1695265504\"},\"user\":\"Valepert\",\"bot\":false,\"notify_url\":\"https://it.wikipedia.org/w/index.php?diff=135583987&oldid=135530929&rcid=326192030\",\"minor\":false,\"patrolled\":true,\"length\":{\"old\":{\"$numberInt\":\"48232\"},\"new\":{\"$numberInt\":\"48253\"}},\"revision\":{\"old\":{\"$numberInt\":\"135530929\"},\"new\":{\"$numberInt\":\"135583987\"}},\"server_url\":\"https://it.wikipedia.org\",\"server_name\":\"it.wikipedia.org\",\"server_script_path\":\"/w\",\"wiki\":\"itwiki\",\"parsedcomment\":\"\355\240\274\355\275\270\",\"_ts\":{\"$date\":{\"$numberLong\":\"1695272056821\"}},\"_stream_meta\":{\"sourceType\":\"kafka\",\"sourcePartition\":{\"$numberInt\":\"0\"},\"sourceOffset\":{\"$numberLong\":\"10882591\"},\"timestamp\":{\"$date\":{\"$numberLong\":\"1695272056821\"}}},\"_id\":{\"$oid\":\"650c82f98c47efea135f73a3\"}}}"
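The only non-ASCII bytes in the message above are the octal escapes \355\240\274\355\275\270 in "comment" and "parsedcomment". A minimal sketch follows, assuming those six bytes are what the parser rejected; the logs do not say so explicitly. It shows that a strict UTF-8 decoder refuses them because they encode a UTF-16 surrogate pair as two 3-byte sequences (CESU-8 style) rather than one 4-byte sequence. The helper names `isStrictUtf8` and `decodeThreeByteUnit` are illustrative and not part of mstreams.

```javascript
// Bytes of the "comment" field, transcribed from the octal escapes in the message.
const commentBytes = Uint8Array.from([0o355, 0o240, 0o274, 0o355, 0o275, 0o270]);

// Returns true when `bytes` decodes cleanly with a strict (fatal) UTF-8 decoder.
function isStrictUtf8(bytes) {
  try {
    new TextDecoder("utf-8", { fatal: true }).decode(bytes);
    return true;
  } catch (err) {
    return false;
  }
}

// Reads one 3-byte sequence (1110xxxx 10xxxxxx 10xxxxxx) as a 16-bit value,
// deliberately skipping the surrogate check a real decoder performs.
function decodeThreeByteUnit(bytes, start) {
  return ((bytes[start] & 0x0f) << 12) |
         ((bytes[start + 1] & 0x3f) << 6) |
         (bytes[start + 2] & 0x3f);
}

const high = decodeThreeByteUnit(commentBytes, 0); // 0xD83C, a high surrogate
const low = decodeThreeByteUnit(commentBytes, 3);  // 0xDF78, a low surrogate

// Combine the surrogate pair into the code point it was meant to represent.
const codePoint = 0x10000 + ((high - 0xd800) << 10) + (low - 0xdc00); // 0x1F378

// The same character in well-formed UTF-8 is a single 4-byte sequence.
const wellFormed = new TextEncoder().encode(String.fromCodePoint(codePoint));

console.log("strict UTF-8?", isStrictUtf8(commentBytes));
console.log("surrogates:", high.toString(16), low.toString(16));
console.log("code point:", codePoint.toString(16));
console.log("well-formed bytes:", Array.from(wellFormed, (b) => b.toString(16)).join(" "));
```

If this reading is right, the producer emitted surrogate halves individually, and any consumer that validates UTF-8 strictly will reject the document.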

       

      This kafka:

      kafkacat -C -b kafka.0x2f8.io:9093 -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=PLAIN -X sasl.username=mongo -X sasl.password=mongodata_123 -C -t “topic name”

       

      This streamProcessor:

        {
          id: '650bb29b49d37c600ee82da1',
          name: 'kafkaToCollection',
          lastModified: ISODate("2023-09-21T03:03:55.814Z"),
          state: 'FAILED',
          errorMsg: 'resource has no heartbeat',
          pipeline: [
            { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } },
            {
              '$merge': {
                into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }
              }
            }
          ]
        }

       

      Happened after { partitions: [ { partition: 0, offset: 10806944 } ] }

      Happened again after offset 10821979

       

      Splunk

      https://splunk.corp.mongodb.com/en-US/app/cloud/search?earliest=-60m%40m&latest=now&q=search%20index%3Dmhouse-dev%20650bb29b49d37c600ee82da1%20c%3DSTREAMS&display.page.search.mode=smart&dispatch.sample_ratio=1&sid=1695266377.8807063#

      {
         _p: F
         attr: {
           context: {
             streamProcessorId: 650bb29b49d37c600ee82da1
             streamProcessorName: kafkaToCollection
             tenantId: 650761da7df3a953fbe8390c
           }
           error: invalid bytes in UTF8 string: could not parse JSON document
         }
         c: STREAMS
         ctx: thread373
         id: 75897
         kube: { ... }
         msg: encountered exception, exiting runLoop(): {error}
         s: E
         stream: stdout
         t: { ... }
         time: 2023-09-21T03:11:07.151782139Z
      }


      mstreams Error: “field not found, expected type array” fails streamProcessor using $merge

      Works:

      sp.process([ { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } } ])

       

      Fails:

      sp.process([ { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } }, { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' } } } ])

       

      sp.createStreamProcessor('kafkaToMongoAttempt6', [ { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } }, { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' } } } ])

       

      Starting at:

      state: { partitions: [ { partition: 0, offset: 10878743 } ] }

       

      https://splunk.corp.mongodb.com/en-US/app/cloud/search?earliest=1695268260&latest=1695271863&q=search%20index%3Dmhouse-dev%20kafkaToMongoAttempt6%20c%3DSTREAMS&display.page.search.mode=smart&dispatch.sample_ratio=1&sid=1695271930.8816799

       

      {
         _p: F
         attr: {
           context: { ... }
           error: field not found, expected type array
           errorCode: 13111
           reason: field not found, expected type array
         }
         c: STREAMS
         ctx: thread1582
         id: 75899
         kube: { ... }
         msg: encountered exception, exiting runLoop(): {error}
         s: W
         stream: stdout
         t: { ... }
         time: 2023-09-21T04:50:41.381178725Z
      }

       

      Also reproduces with the pipeline below, around offset 10854764:

      sp.process([
        { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } },
        { '$tumblingWindow': {
            interval: { size: 5, unit: 'second' },
            pipeline: [
              { '$group': { _id: '$title_url', count: { '$count': {} } } },
              { '$sort': { count: -1 } },
              { '$limit': 5 },
              { '$group': {
                  _id: null,
                  top_urls: { $push: { top_urls: { title_url: "$_id", count: "$count" } } }
              } }
            ]
        } },
        { '$project': { _id: 0 } },
        { '$merge': { into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1Top5PerHour' } } }
      ])
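To make clear what document shape this pipeline hands to $merge, here is a rough plain-JavaScript approximation of the window's inner stages applied to the events of one 5-second window. It is only a sketch: `topUrlsForWindow` and its `docs` argument are hypothetical names, tie-breaking in the sort may differ from the server's, and it does not attempt to reproduce the failure itself.

```javascript
// Approximates the inner pipeline of the $tumblingWindow stage for one window.
// `docs` stands in for the Kafka events that fell into that window.
function topUrlsForWindow(docs, limit = 5) {
  // A $group with _id: null emits nothing when it receives no input documents.
  if (docs.length === 0) {
    return null;
  }

  // First $group: count events per title_url.
  const counts = new Map();
  for (const doc of docs) {
    counts.set(doc.title_url, (counts.get(doc.title_url) || 0) + 1);
  }

  // $sort { count: -1 } followed by $limit.
  const top = [...counts.entries()]
    .map(([_id, count]) => ({ _id, count }))
    .sort((a, b) => b.count - a.count)
    .slice(0, limit);

  // Second $group pushes one { top_urls: {...} } object per URL into an array;
  // the outer $project { _id: 0 } then drops the null _id.
  return {
    top_urls: top.map(({ _id, count }) => ({
      top_urls: { title_url: _id, count },
    })),
  };
}

// Usage with made-up events:
const sample = [
  { title_url: "https://example.org/a" },
  { title_url: "https://example.org/b" },
  { title_url: "https://example.org/a" },
];
console.log(JSON.stringify(topUrlsForWindow(sample), null, 2));
```

Note that each element of the outer `top_urls` array is itself wrapped in a `top_urls` key, because the $push expression in the pipeline is an object with that field name.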

       

      Field not found, expecting array

      https://splunk.corp.mongodb.com/en-US/app/cloud/search?earliest=1695269414&latest=1695270314&q=search%20index%3Dmhouse-dev%20650bc54849d37c600ee82df8%20c%3DSTREAMS&display.page.search.mode=smart&dispatch.sample_ratio=1&sid=1695270378.8814113

       

           error: field not found, expected type array

           errorCode: 13111

           reason: field not found, expected type array

      mstreams async failure just reports “resource has no heartbeat”; we should supply the error message from mstreams

        {
          id: '650bb57f49d37c600ee82dbc',
          name: 'kafkaToCollectionAttempt2',
          lastModified: ISODate("2023-09-21T03:16:15.924Z"),
          state: 'FAILED',
          errorMsg: 'resource has no heartbeat',
          pipeline: [
            { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } },
            {
              '$merge': {
                into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }
              }
            }
          ]
        },


            Assignee:
            matthew.normyle@mongodb.com Matthew Normyle
            Reporter:
            matthew.normyle@mongodb.com Matthew Normyle
            Votes:
            0
            Watchers:
            3
