When an operator sends a message to the DLQ, it should include an "operatorName" field in the message.
=== Original ticket ===
A user had a problem where we could not easily understand that the problem was coming from the emit stage, we thought it was coming from the source stage. To make this easier the name of the stage that send the DLQ message should be included when possible..
In this below example the error is in $emit because a string timestamp is sent and not a bson timestamp. timeField in source and emit have the same field name leading to confusion.
let source = { "$source": { connectionName: "real-time-tracking-kafka-connection", topic: "real_time_tracking", timeField: { $dateFromString: { dateString: "$timestamp" } } } }; let merge = { $emit: { connectionName: "real-time-tracking-atlas-connection", db: "chilly_trucks", coll: "real_time_tracking", timeseries: { timeField: "timestamp", metaField: "metadata", granularity: "seconds" } } }; // Process and merge sp.process([source, merge]); //sp.createStreamProcessor("rtt_processor", [source, merge]);
{ _dlqMessage: { _stream_meta: { source: { type: 'kafka', topic: 'real_time_tracking', partition: 0, offset: 2198, key: Binary.createFromBase64('', 0), headers: [] } }, errInfo: { reason: "timeField 'timestamp' must be present and contain a valid BSON UTC datetime value" }, doc: { timestamp: '2024-07-12T15:53:33.874913+00:00', location: { type: 'Point', coordinates: [ 9.789905308249763, 29.916388161696958 ] }, temperature: 10.8, speed: 74.6, fuel_level: 74.79343891666663, condenser_on: false, mileage: 162840.56108333336, metadata: { vehicle_info: { vehicle_id: 'yN14Ea75IW15Hw51', model: 'Model Z', year: 2012 }, driver_info: { driver_id: 'Oo77TV94', name: 'Mr. Aaron Hoover Jr.', license_number: 'Bp37NG68Ut85qQ78' } }, _ts: ISODate('2024-07-12T15:53:33.874Z'), _stream_meta: { source: { type: 'kafka', topic: 'real_time_tracking', partition: 0, offset: 2198 } } }, processorName: 'internal-anon-stream-processor-66914fd2d899a9455620f394' } }