The fix for this ticket also addresses failures with the other regex special characters: *, +, |, (, ), ^, ?, [, ], ., /, \, $.
When a database name contains a pipe character, the change stream aggregation on the oplog can return documents for the wrong database, or can invalidate the stream when a collection is created in the same database.
Repro 1 (Python 3.7, pymongo 3.8, MongoDB 4.0 and 4.1.10): creating a new collection on the same database causes an invalidate event on the watched collection. The stream appears to process the implicit create-collection oplog entry shown below, reaching:
operationType = DocumentSourceChangeStream::kInvalidateOpType;
{ "ts" : Timestamp(1557884892, 6), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2,
  "op" : "c", "ns" : "fails_because_of_pipe_char_in_db_name|.$cmd",
  "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"),
  "wall" : ISODate("2019-05-15T01:48:12.445Z"),
  "o" : { "create" : "alt",
          "idIndex" : { "v" : 2, "key" : { "_id" : 1 }, "name" : "_id_",
                        "ns" : "fails_because_of_pipe_char_in_db_name|.alt" } } }
{ "ts" : Timestamp(1557884892, 7), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2,
  "op" : "i", "ns" : "fails_because_of_pipe_char_in_db_name|.alt",
  "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"),
  "wall" : ISODate("2019-05-15T01:48:12.445Z"),
  "o" : { "_id" : 1 } }
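The mechanism can be sketched with Python's re module as a stand-in for the server-side PCRE PartialMatch (which, like re.search, is an unanchored substring match). The pattern shape "^" + db + "\." + coll + "$" is an assumption; only the unescaped insertion of the db name matters:

```python
import re

# Hypothetical reconstruction of the namespace filter, with the
# user-supplied db name inserted unescaped (the exact construction in
# the server is an assumption made for illustration).
db_name = "fails_because_of_pipe_char_in_db_name|"
pattern = "^" + db_name + r"\." + "documents$"

# The unescaped "|" splits the pattern into two alternatives:
#   ^fails_because_of_pipe_char_in_db_name    OR    \.documents$
# The implicit create's $cmd namespace matches the first alternative,
# so the stream sees the create command and invalidates.
cmd_ns = db_name + ".$cmd"
print(bool(re.search(pattern, cmd_ns)))  # True
```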
import pymongo
from pprint import pprint
from pymongo import MongoClient

client = MongoClient('localhost', 28021)
flag_fix_reproduction = False
db_name = 'invalidates_wrong_stream_because_of_pipe_char_in_db_name|'

def dump_queued_watch_events(stream):
    last = {}
    last["operationType"] = "(none seen!)"
    while stream.alive:
        change = stream.try_next()
        if change is not None:
            print("Change event = ")
            pprint(change)
            last["operationType"] = change["operationType"]
        elif stream.alive:
            return last["operationType"]
    return "(stream closed!)"

# assure cleanup between test runs
client.drop_database(db_name)
print("cleaning up test dbs...")

db = client[db_name]
documents = db["documents"]
documents.insert_one({"_id": 1})

# implicit create collection here outside watch avoids issue
if flag_fix_reproduction:
    alt = db["alt"]
    alt.insert_one({"_id": 1})  # IMPLICIT CREATE BEFORE STREAM

with documents.watch() as stream:
    documents.insert_one({"_id": 2})
    assert dump_queued_watch_events(stream) == "insert"
    documents.insert_one({"_id": 2.1})
    assert dump_queued_watch_events(stream) == "insert"
    documents.insert_one({"_id": 2.2})
    assert dump_queued_watch_events(stream) == "insert"
    documents.insert_one({"_id": 2.3})
    assert dump_queued_watch_events(stream) == "insert"

    # implicit create collection here repros issue
    if flag_fix_reproduction:
        alt.insert_one({"_id": 3.5})  # no DDL event IN-STREAM
    else:
        alt = db["alt"]
        alt.insert_one({"_id": 3})  # IMPLICIT CREATE IN-STREAM

    # repro fails here:
    assert dump_queued_watch_events(stream) != "invalidate"

    documents.insert_one({"_id": 3})
    assert dump_queued_watch_events(stream) == "insert"
    print(db_name + " OK to here as expected")

print("done!")
Repro 2 (4.0 and 4.1.10 shell, MongoDB 4.0 and 4.1.10): the final output document shows a change stream on namespace has_a_|pipe.documents match and return an insert from a different database, has_no_pipe.documents:
From changestream on has_a_|pipe.documents
{ "_id" : { "_data" : "825CDC58340000000D2B022C0100296E5A1004EDED18BF4358459FB3468AD4B8630D7E461E5F6964002B040004",
            "_typeBits" : BinData(0,"QA==") },
  "operationType" : "insert",
  "clusterTime" : Timestamp(1557944372, 13),
  "fullDocument" : { "_id" : 2 },
  "ns" : { "db" : "has_no_pipe", "coll" : "documents" },
  "documentKey" : { "_id" : 2 } }
var db1 = db.getSiblingDB("has_no_pipe")
var db2 = db.getSiblingDB("has_a_|pipe")
var db3 = db.getSiblingDB("has_a_database")

print("Drop test databases")
db1.dropDatabase()
db2.dropDatabase()
db3.dropDatabase()

var coll1 = db1.documents
var coll2 = db2.documents
var coll3 = db3.collection

coll1.insertOne({_id: 1})
coll2.insertOne({_id: 1})

var cs1 = coll1.watch()
var cs2 = coll2.watch()

function waitFor(stream) {
    var i = 10
    while (!stream.hasNext() && (i > 0)) {
        i--
    }
}

function printNext() {
    while (cs1.hasNext()) {
        print("From changestream on " + coll1.getFullName())
        printjson(cs1.next())
    }
    while (cs2.hasNext()) {
        print("From changestream on " + coll2.getFullName())
        printjson(cs2.next())
    }
}

print("Insert into " + coll1.getFullName())
coll1.insertOne({_id: 2})
waitFor(cs1)
printNext()

print("Insert into " + coll2.getFullName())
coll2.insertOne({_id: 3})
waitFor(cs2)
printNext()

print("Insert into (create) " + coll3.getFullName())
coll3.insertOne({_id: 4})
waitFor(cs2)
printNext()
Both repros appear to be regex failures caused by unescaped pipes in the namespace regex applied here:
return _nsRegex->PartialMatch(nsField.getString());
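A minimal illustration of the cross-database match and of the escaping fix, again using Python's re as a stand-in for the server-side PCRE match. The helper ns_pattern and the pattern shape "^<db>\.<coll>$" are assumptions made to mirror the observed behavior, not the server's actual code:

```python
import re

def ns_pattern(db, coll, escape=False):
    # Sketch of a namespace regex builder; escaping the user-supplied
    # names (escape=True) is the behavior the fix should produce.
    if escape:
        db, coll = re.escape(db), re.escape(coll)
    return "^" + db + r"\." + coll + "$"

# Repro 2: the unescaped "|" in "has_a_|pipe" turns the pattern into
#   ^has_a_    OR    pipe\.documents$
# and the second alternative matches a namespace in a different database.
buggy = ns_pattern("has_a_|pipe", "documents")
print(bool(re.search(buggy, "has_no_pipe.documents")))   # True: wrong db matches

# With the db name escaped, only the watched namespace matches.
fixed = ns_pattern("has_a_|pipe", "documents", escape=True)
print(bool(re.search(fixed, "has_no_pipe.documents")))   # False
print(bool(re.search(fixed, "has_a_|pipe.documents")))   # True
```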
Using pipes in database names seems like a poor idea, yet customers have tried it, and the risk of documents leaking across databases makes this worth fixing.