- Type: Bug
- Resolution: Fixed
- Priority: Major - P3
- Affects Version/s: 1.3.1, 2.0
- Component/s: None
I want to get all documents currently in a collection that match a query, and then start watching for any new matching documents inserted from that moment on.
It is critical to avoid the race condition in which documents inserted or updated between the find()/aggregate() and the watch() commands are missed.
This is straightforward to implement with pymongo:
import pymongo

client = pymongo.MongoClient()
client.testdb.drop_collection('foo')
coll = client.testdb.foo
coll.insert_one({'foo': 1})

with coll.watch() as stream:
    for doc in coll.find():
        print(doc)
    coll.insert_one({'bar': 1})
    for event in stream:
        print(event)
Output:
{'_id': ObjectId('5c8a558f22f3390f5b030219'), 'foo': 1}
{'_id': {'_data': '825C8A55900000000129295A1004F96DAC4F71B04A9E9C732D2ED1EA1E9946645F696400645C8A559022F3390F5B03021A0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1552569744, 1), 'fullDocument': {'_id': ObjectId('5c8a559022f3390f5b03021a'), 'bar': 1}, 'ns': {'db': 'testdb', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5c8a559022f3390f5b03021a')}}
The same code with Motor 2.0, however, misses the insert event:
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})

async with coll.watch() as stream:
    async for doc in coll.find():
        print(doc)
    await coll.insert_one({'bar': 1})
    async for doc in stream:
        print(doc)
Output:
{'_id': ObjectId('5c8a559822f3390f5b03021c'), 'foo': 1}
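The missed event suggests (my assumption, not confirmed in this ticket) that Motor's watch() does not send the initial aggregate until the stream is first iterated, so the change stream effectively starts only after the second insert_one(). On Motor versions that expose MotorChangeStream.try_next() (mirroring PyMongo's ChangeStream.try_next(); availability is an assumption), a minimal sketch that forces the stream to open before reading the collection would be:

import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})

async with coll.watch() as stream:
    # Assumption: try_next() runs the initial aggregate immediately and
    # returns None when no change is available yet, pinning the stream's
    # start point before the find() below.
    first = await stream.try_next()
    if first is not None:
        print(first)
    async for doc in coll.find():
        print(doc)
    await coll.insert_one({'bar': 1})
    async for event in stream:
        print(event)

If try_next() is not available, the operation-time workaround below gives the same ordering guarantee.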
There is a workaround, albeit cumbersome:
import asyncio
import bson
import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})
await asyncio.sleep(1)  # make sure we don't accidentally start watching before the drop

server_status = await client.testdb.command({'serverStatus': 1})
ts = bson.Timestamp(server_status['localTime'], 0)

seen = set()
async for doc in coll.find():
    seen.add(doc['_id'])
    print(doc)

await coll.insert_one({'bar': 1})

async with coll.watch(start_at_operation_time=ts) as stream:
    async for event in stream:
        if event['fullDocument']['_id'] not in seen:
            print(event)
Output:
{'_id': ObjectId('5c8a58ca22f33910561ecbaa'), 'foo': 1}
{'_id': {'_data': '825C8A58CB0000000129295A1004A1D923FC5DFB453D808A3B410DE286C246645F696400645C8A58CB22F33910561ECBAB0004'}, 'operationType': 'insert', 'clusterTime': Timestamp(1552570571, 1), 'fullDocument': {'_id': ObjectId('5c8a58cb22f33910561ecbab'), 'bar': 1}, 'ns': {'db': 'testdb', 'coll': 'foo'}, 'documentKey': {'_id': ObjectId('5c8a58cb22f33910561ecbab')}}
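A possible simplification of the workaround, as a sketch under assumptions rather than something verified here: against a MongoDB 3.6+ replica set, command replies should include an operationTime Timestamp, which can seed start_at_operation_time directly and avoid both the wall-clock localTime and the sleep().

import motor.motor_asyncio

client = motor.motor_asyncio.AsyncIOMotorClient()
await client.testdb.drop_collection('foo')
coll = client.testdb.foo
await coll.insert_one({'foo': 1})

# Assumption: on a replica set, the reply to any command carries an
# 'operationTime' field (a bson.Timestamp) marking the current point
# in the oplog, which is causally after the drop and insert above.
reply = await client.testdb.command({'ping': 1})
ts = reply['operationTime']

seen = set()
async for doc in coll.find():
    seen.add(doc['_id'])
    print(doc)

await coll.insert_one({'bar': 1})

async with coll.watch(start_at_operation_time=ts) as stream:
    async for event in stream:
        if event['fullDocument']['_id'] not in seen:
            print(event)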
- is related to: PYTHON-4655 Decide whether to make Async Methods that Use Aggregate Sync Instead (Closed)