  1. Core Server
  2. SERVER-41164

Change Stream pipeline regex matches wrong oplog docs when using '|' pipe in db name

    • Type: Bug
    • Resolution: Fixed
    • Priority: Minor - P4
    • Fix Version/s: 4.0.11, 4.2.0-rc2, 4.3.1
    • Affects Version/s: 4.0.9, 4.1.11
    • Component/s: None
    • None
    • Fully Compatible
    • ALL
    • v4.2, v4.0
    • Query 2019-07-01
    • 32

      Fix for this ticket also fixes other issues with the following regex special characters: *,+,|,(,),^,?,[,],.,/,\,$.
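
To illustrate the kind of escaping the note above describes, here is a minimal Python sketch. The helper names `escape_ns` and `ns_pattern` are hypothetical and are not taken from the server code; the sketch only assumes that a namespace is turned into an anchored regex and that each listed character is backslash-escaped first.

```python
import re

# Characters listed in the ticket as affected by the fix.
SPECIAL_CHARS = set("*+|()^?[]./\\$")

def escape_ns(ns):
    """Backslash-escape each listed special character (hypothetical helper)."""
    return "".join("\\" + ch if ch in SPECIAL_CHARS else ch for ch in ns)

def ns_pattern(ns):
    """Anchored pattern meant to match exactly one namespace (assumption)."""
    return re.compile("^" + escape_ns(ns) + "$")

# Each special character, embedded in a db name, should match only itself.
results = {}
for ch in sorted(SPECIAL_CHARS):
    ns = "db" + ch + "name.coll"
    pattern = ns_pattern(ns)
    matches_itself = pattern.search(ns) is not None
    matches_other = pattern.search("dbXname.coll") is not None
    results[ch] = (matches_itself, matches_other)

print(all(r == (True, False) for r in results.values()))  # True
```

With escaping in place, every listed character is treated as a literal, so a pattern built from one namespace cannot match a different one.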

      Our documentation does not disallow the vertical pipe character (0x7C) in database names on Linux.

      When a pipe is used in a db name, the change stream aggregation on the oplog can return documents from the wrong database, or can invalidate the stream when a collection is created in the same database.

      Repro 1 (Python 3.7, pymongo 3.8, MongoDB 4.0 and 4.1.10) causes an invalidate event on the watched collection when a new collection is created in the same db. The stream appears to process the implicit create-collection oplog entry shown below (the first of the two entries) and to reach this line:

       operationType = DocumentSourceChangeStream::kInvalidateOpType;
      
      { "ts" : Timestamp(1557884892, 6), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2, "op" : "c", "ns" : "fails_because_of_pipe_char_in_db_name|.$cmd", "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"), "wall" : ISODate("2019-05-15T01:48:12.445Z"), "o" : { "create" : "alt", "idIndex" : { "v" : 2, "key" : { "_id" : 1 }, "name" : "_id_", "ns" : "fails_because_of_pipe_char_in_db_name|.alt" } } }
      
      { "ts" : Timestamp(1557884892, 7), "t" : NumberLong(1), "h" : NumberLong(0), "v" : 2, "op" : "i", "ns" : "fails_because_of_pipe_char_in_db_name|.alt", "ui" : UUID("9f2275fb-c64f-4ed1-9719-a47cfad8888d"), "wall" : ISODate("2019-05-15T01:48:12.445Z"), "o" : { "_id" : 1 } }
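
The following Python sketch suggests why the create entry above could be picked up by a stream on a different collection in the same db. It assumes, without confirmation from the server source, that the namespace pattern is built by plain concatenation with no escaping, and it uses `re.search` as a stand-in for an unanchored partial match. The watched collection name `documents` is taken from the repro script.

```python
import re

watched_ns = "fails_because_of_pipe_char_in_db_name|.documents"
create_entry_ns = "fails_because_of_pipe_char_in_db_name|.$cmd"

# Assumption: the pattern is "^" + namespace + "$" with nothing escaped,
# so the "|" splits it into two alternatives.
unescaped = re.compile("^" + watched_ns + "$")

match = unescaped.search(create_entry_ns)
# The first alternative, "^fails_because_of_pipe_char_in_db_name",
# matches the start of the $cmd namespace.
print(match.group(0))  # fails_because_of_pipe_char_in_db_name
```

Under that assumption, any namespace beginning with the db name up to the pipe satisfies the pattern, including the `.$cmd` namespace of the create entry.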
      
      import pymongo
      from pprint import pprint
      from pymongo import MongoClient
      client = MongoClient('localhost',28021)
      
      flag_fix_reproduction = False
      db_name = 'invalidates_wrong_stream_because_of_pipe_char_in_db_name|'
      
      def dump_queued_watch_events(stream):
          """Print every queued change event; return the last operationType seen."""
          last_op = "(none seen!)"
          while stream.alive:
              change = stream.try_next()
              if change is not None:
                  print("Change event = ")
                  pprint(change)
                  last_op = change["operationType"]
              elif stream.alive:
                  # queue drained and stream still open
                  return last_op
          return "(stream closed!)"
      
      # ensure cleanup between test runs
      print("cleaning up test dbs...")
      client.drop_database( db_name )
      
      db = client[db_name]
      documents = db["documents"]
      documents.insert_one({"_id": 1})
      
      # implicit create collection here outside watch avoids issue
      
      if ( flag_fix_reproduction ):
          alt = db["alt"]
          alt.insert_one({"_id": 1}) # IMPLICIT CREATE BEFORE STREAM
      
      with documents.watch() as stream:
          documents.insert_one({"_id": 2})
          assert dump_queued_watch_events( stream ) == "insert"
      
          documents.insert_one({"_id": 2.1})
          assert dump_queued_watch_events( stream ) == "insert"
      
          documents.insert_one({"_id": 2.2})
          assert dump_queued_watch_events( stream ) == "insert"
      
          documents.insert_one({"_id": 2.3})
          assert dump_queued_watch_events( stream ) == "insert"
          
          # implicit create collection here repros issue
          
          if ( flag_fix_reproduction ):
              alt.insert_one({"_id": 3.5}) # no DDL event IN-STREAM
          else:
              alt = db["alt"]
              alt.insert_one({"_id": 3}) # IMPLICIT CREATE IN-STREAM
              
          # repro fails here:
      
          assert dump_queued_watch_events( stream ) != "invalidate"
      
          documents.insert_one({"_id": 3})
          assert dump_queued_watch_events( stream ) == "insert"
      
          print(db_name + " OK to here as expected")
      
      print( "done!" )
      

      Repro 2 (4.0 and 4.1.10 shells against MongoDB 4.0 and 4.1.10): the final output document shows a change stream on namespace has_a_|pipe.documents matching and returning an insert from a different db, has_no_pipe.documents:

      From changestream on has_a_|pipe.documents
      {
      	"_id" : {
      		"_data" : "825CDC58340000000D2B022C0100296E5A1004EDED18BF4358459FB3468AD4B8630D7E461E5F6964002B040004",
      		"_typeBits" : BinData(0,"QA==")
      	},
      	"operationType" : "insert",
      	"clusterTime" : Timestamp(1557944372, 13),
      	"fullDocument" : {
      		"_id" : 2
      	},
      	"ns" : {
      		"db" : "has_no_pipe",
      		"coll" : "documents"
      	},
      	"documentKey" : {
      		"_id" : 2
      	}
      }
      
      var db1 = db.getSiblingDB("has_no_pipe")
      var db2 = db.getSiblingDB("has_a_|pipe")
      var db3 = db.getSiblingDB("has_a_database")
      
      print("Drop test databases")
      db1.dropDatabase()
      db2.dropDatabase()
      db3.dropDatabase()
      
      var coll1 = db1.documents
      var coll2 = db2.documents
      var coll3 = db3.collection
      
      coll1.insertOne({_id:1})
      coll2.insertOne({_id:1})
      
      var cs1 = coll1.watch()
      var cs2 = coll2.watch()
      
      function waitFor(stream) {
          var i=10
          while (!stream.hasNext() && (i > 0)) {
              i--
          }
      }
      
      function printNext() {
          while (cs1.hasNext()) {
              print("From changestream on " + coll1.getFullName())
              printjson(cs1.next())
          }
          while (cs2.hasNext()) {
              print("From changestream on " + coll2.getFullName())
              printjson(cs2.next())
          }
      }
      
      print("Insert into " + coll1.getFullName())
      coll1.insertOne({_id:2})
      waitFor(cs1)
      printNext()
      
      print("Insert into " + coll2.getFullName())
      coll2.insertOne({_id:3})
      waitFor(cs2)
      printNext()
      
      print("Insert into (create) " + coll3.getFullName())
      coll3.insertOne({_id:4})
      waitFor(cs2)
      printNext()
      

      Both repros appear to be regex failures caused by unescaped pipes in the namespace regex that is evaluated here:

       return _nsRegex->PartialMatch(nsField.getString());
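
As a rough illustration of the cross-database match in Repro 2, the Python sketch below again assumes the pattern is the unescaped namespace wrapped in `^` and `$`, and treats `re.search` as equivalent to a partial (unanchored) match. Neither assumption is confirmed by the ticket.

```python
import re

stream_ns = "has_a_|pipe.documents"   # namespace the stream watches
other_ns = "has_no_pipe.documents"    # namespace that leaks into the stream

# Assumption: unescaped concatenation, giving ^has_a_|pipe.documents$
unescaped = re.compile("^" + stream_ns + "$")

# The "|" yields two independent alternatives.
branches = unescaped.pattern.split("|")
print(branches)  # ['^has_a_', 'pipe.documents$']

leak = unescaped.search(other_ns)
# The second alternative matches the tail of the other namespace.
print(leak.group(0))  # pipe.documents
```

The `^` anchor binds only to the first alternative and the `$` only to the second, so neither alternative requires the whole namespace to match.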
      

      Using pipes in db names seems like a poor idea, yet customers have tried it, and the risk of documents leaking across databases seems worth fixing.

            Assignee:
            davis.haupt@mongodb.com Davis Haupt (Inactive)
            Reporter:
            scott.kurowski@mongodb.com Scott Kurowski
            Votes:
            0
            Watchers:
            11
