-
Type:
Bug
-
Resolution: Unresolved
-
Priority:
Major - P3
-
Affects Version/s: None
-
Component/s: None
-
Atlas Streams
-
ALL
-
3
In kafka_emit_operator.cpp, increment outputMessageCount/outputMessageBytes only after the delivery callback reports success for a message. Today, Kafka $emit increments these stats before delivery is confirmed, so a message whose delivery later fails is still counted.
We should only count successfully delivered messages in stats. That's what we already do in merge_operator and timeseries_emit_operator.
As part of this, we should double-check that flush() waits for all delivery callbacks to finish (and thus all stats to be incremented).
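A rough sketch of the intended ordering follows. It is not the actual kafka_emit_operator.cpp code: EmitStats, FakeProducer, the failedDeliveries field, and runEmitExample are hypothetical names, and the producer is an in-memory stand-in that runs delivery callbacks only inside flush(). The point is that stats change in the callback, not at produce time, and that flush() returns only after every callback has run.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <deque>
#include <functional>
#include <string>
#include <utility>

// Hypothetical stand-in for the operator stats; the first two field names
// follow the ticket, failedDeliveries exists only for this illustration.
struct EmitStats {
    int64_t outputMessageCount = 0;
    int64_t outputMessageBytes = 0;
    int64_t failedDeliveries = 0;
};

// Hypothetical in-memory producer that mimics asynchronous delivery reports.
class FakeProducer {
public:
    using DeliveryCb = std::function<void(bool ok, std::size_t bytes)>;

    explicit FakeProducer(DeliveryCb cb) : _cb(std::move(cb)) {}

    // Enqueue only: no delivery callback runs here, so no stats change yet.
    void produce(std::string payload, bool willSucceed) {
        _pending.push_back({std::move(payload), willSucceed});
    }

    // Returns only after every pending message has had its callback invoked.
    void flush() {
        while (!_pending.empty()) {
            Pending p = std::move(_pending.front());
            _pending.pop_front();
            _cb(p.willSucceed, p.payload.size());
        }
        assert(_pending.empty());
    }

private:
    struct Pending {
        std::string payload;
        bool willSucceed;
    };

    DeliveryCb _cb;
    std::deque<Pending> _pending;
};

// Emits three messages, one of which fails delivery, and returns the stats
// observed after flush().
inline EmitStats runEmitExample() {
    EmitStats stats;
    FakeProducer producer([&stats](bool ok, std::size_t bytes) {
        if (ok) {
            // Count a message only once its delivery has succeeded.
            ++stats.outputMessageCount;
            stats.outputMessageBytes += static_cast<int64_t>(bytes);
        } else {
            ++stats.failedDeliveries;
        }
    });

    producer.produce("abc", true);     // 3 bytes, delivered
    producer.produce("defgh", false);  // 5 bytes, delivery fails
    producer.produce("ij", true);      // 2 bytes, delivered

    producer.flush();
    return stats;
}
```

With this ordering, the failed message contributes nothing to outputMessageCount or outputMessageBytes. In a real librdkafka-based producer, flush() takes a timeout and can return before all delivery reports have been served, so the check in the last point above would likely need to cover the timeout path as well.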