- Type: Bug
- Resolution: Fixed
- Priority: Major - P3
- Affects Version/s: None
- Component/s: None
- Atlas Streams
- Fully Compatible
- ALL
- Sprint 32
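kafkacat shows the string as:
\ud83c\udf78
This is the JSON escape of the UTF-16 surrogate pair for 🍸 (U+1F378).
After parsing it with fromjson and serializing it with tojson, the result is:
\355\240\274\355\275\270
These bytes encode each surrogate separately as its own 3-byte sequence, which is not valid UTF-8. bsoncxx rejects the string as invalid, and it is correct to do so.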
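The byte-level difference can be reproduced outside the server. The following is a minimal sketch in Python, used only for illustration; `is_valid_utf8` and `repair_surrogate_bytes` are hypothetical helper names, and the repair shown is not necessarily the fix that was applied in the server.

```python
import json

# The octal-escaped bytes from the debugger output, as raw bytes.
bad = b"\355\240\274\355\275\270"


def is_valid_utf8(data: bytes) -> bool:
    """Return True if data decodes as strict UTF-8."""
    try:
        data.decode("utf-8")
        return True
    except UnicodeDecodeError:
        return False


# Encoding each half of the surrogate pair on its own reproduces the bad bytes.
per_surrogate = "\ud83c\udf78".encode("utf-8", "surrogatepass")

# A JSON parser that pairs the surrogates first yields one code point,
# which encodes to a single 4-byte UTF-8 sequence.
cocktail = json.loads('"\\ud83c\\udf78"')
good = cocktail.encode("utf-8")


def repair_surrogate_bytes(data: bytes) -> bytes:
    """Hypothetical repair: recombine separately encoded surrogates."""
    text = data.decode("utf-8", "surrogatepass")
    # A round trip through UTF-16 joins a high/low surrogate pair.
    paired = text.encode("utf-16-le", "surrogatepass").decode("utf-16-le")
    return paired.encode("utf-8")


print(per_surrogate == bad)       # the bad bytes are per-surrogate encodings
print(is_valid_utf8(bad))         # strict UTF-8 rejects them
print(good.hex(" "))              # the well-formed encoding of U+1F378
print(repair_surrogate_bytes(bad) == good)
```

The sketch suggests that the serializer emits each \uXXXX escape independently instead of combining the high and low surrogate into one code point before encoding.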
===========
The following string can cause the error. The internal mongod BSON classes handle it without error.
$6 = 0x7f5edaf49c20 "{\"$set\":{\"$schema\":\"/mediawiki/recentchange/1.0.0\",\"meta\":{\"uri\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"request_id\":\"c6d88f7f-9a3c-4b7e-a7cd-421b90ed4a46\",\"id\":\"48472b65-6635-4b65-990c-fcc5e66310e6\",\"dt\":\"2023-09-21T03:05:04Z\",\"domain\":\"it.wikipedia.org\",\"stream\":\"mediawiki.recentchange\",\"topic\":\"codfw.mediawiki.recentchange\",\"partition\":
{\"$numberInt\":\"0\"},\"offset\":{\"$numberInt\":\"628263911\"}},\"id\":{\"$numberInt\":\"326192030\"},\"type\":\"edit\",\"namespace\":{\"$numberInt\":\"3\"},\"title\":\"Discussioni utente:Valepert\",\"title_url\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"comment\":\"\355\240\274\355\275\270\",\"timestamp\":{\"$numberInt\":\"1695265504\"},\"user\":\"Valepert\",\"bot\":false,\"notify_url\":\"https://it.wikipedia.org/w/index.php?diff=135583987&oldid=135530929&rcid=326192030\",\"minor\":false,\"patrolled\":true,\"length\":{\"old\":
{\"$numberInt\":\"48232\"},\"new\":{\"$numberInt\":\"48253\"}},\"revision\":{\"old\":
{\"$numberInt\":\"135530929\"},\"new\":{\"$numberInt\":\"135583987\"}},\"server_url\":\"https://it.wikipedia.org\",\"server_name\":\"it.wikipedia.org\",\"server_script_path\":\"/w\",\"wiki\":\"itwiki\",\"parsedcomment\":\"\355\240\274\355\275\270\",\"_ts\":{\"$date\":{\"$numberLong\":\"1695272056821\"}},\"_stream_meta\":{\"sourceType\":\"kafka\",\"sourcePartition\":
{\"$numberInt\":\"0\"},\"sourceOffset\":{\"$numberLong\":\"10882591\"},\"timestamp\":{\"$date\":
{\"$numberLong\":\"1695272056821\"}}},\"_id\":{\"$oid\":\"650c82f98c47efea135f73a3\"}}}"
mstreams: Invalid bytes in UTF8 string
Caused by a message like:
"{\"$set\":{\"$schema\":\"/mediawiki/recentchange/1.0.0\",\"meta\":{\"uri\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"request_id\":\"c6d88f7f-9a3c-4b7e-a7cd-421b90ed4a46\",\"id\":\"48472b65-6635-4b65-990c-fcc5e66310e6\",\"dt\":\"2023-09-21T03:05:04Z\",\"domain\":\"it.wikipedia.org\",\"stream\":\"mediawiki.recentchange\",\"topic\":\"codfw.mediawiki.recentchange\",\"partition\":
{\"$numberInt\":\"0\"},\"offset\":{\"$numberInt\":\"628263911\"}},\"id\":{\"$numberInt\":\"326192030\"},\"type\":\"edit\",\"namespace\":{\"$numberInt\":\"3\"},\"title\":\"Discussioni utente:Valepert\",\"title_url\":\"https://it.wikipedia.org/wiki/Discussioni_utente:Valepert\",\"comment\":\"\355\240\274\355\275\270\",\"timestamp\":{\"$numberInt\":\"1695265504\"},\"user\":\"Valepert\",\"bot\":false,\"notify_url\":\"https://it.wikipedia.org/w/index.php?diff=135583987&oldid=135530929&rcid=326192030\",\"minor\":false,\"patrolled\":true,\"length\":{\"old\":
{\"$numberInt\":\"48232\"},\"new\":{\"$numberInt\":\"48253\"}},\"revision\":{\"old\":
{\"$numberInt\":\"135530929\"},\"new\":{\"$numberInt\":\"135583987\"}},\"server_url\":\"https://it.wikipedia.org\",\"server_name\":\"it.wikipedia.org\",\"server_script_path\":\"/w\",\"wiki\":\"itwiki\",\"parsedcomment\":\"\355\240\274\355\275\270\",\"_ts\":{\"$date\":{\"$numberLong\":\"1695272056821\"}},\"_stream_meta\":{\"sourceType\":\"kafka\",\"sourcePartition\":
{\"$numberInt\":\"0\"},\"sourceOffset\":{\"$numberLong\":\"10882591\"},\"timestamp\":{\"$date\":
{\"$numberLong\":\"1695272056821\"}}},\"_id\":{\"$oid\":\"650c82f98c47efea135f73a3\"}}}"
This Kafka source:
kafkacat -C -b kafka.0x2f8.io:9093 -X security.protocol=SASL_PLAINTEXT -X sasl.mechanisms=PLAIN -X sasl.username=mongo -X sasl.password=mongodata_123 -t "topic name"
This streamProcessor:
{
  id: '650bb29b49d37c600ee82da1',
  name: 'kafkaToCollection',
  lastModified: ISODate("2023-09-21T03:03:55.814Z"),
  state: 'FAILED',
  errorMsg: 'resource has no heartbeat',
  pipeline: [
    { '$source': { connectionName: 'TestKafka1', topic: 'wiki1' } },
    {
      '$merge': {
        into: { connectionName: 'TestAtlas1', db: 'test', coll: 'wiki1' }
      }
    }
  ]
}
The failure happened after:
{ partitions: [ { partition: 0, offset: 10806944 } ] }
It happened again after 10821979.
{
  _p: F
  attr: {
    context: {
      streamProcessorId: 650bb29b49d37c600ee82da1
      streamProcessorName: kafkaToCollection
      tenantId: 650761da7df3a953fbe8390c
    }
    error: invalid bytes in UTF8 string: could not parse JSON document
  }
  c: STREAMS
  ctx: thread373
  id: 75897
  kube: { ... }
  msg: encountered exception, exiting runLoop(): {error}
  s: E
  stream: stdout
  t: { ... }
  time: 2023-09-21T03:11:07.151782139Z
}