- Type: Bug
- Resolution: Fixed
- Priority: Major - P3
- Affects Version/s: 2.0
- Component/s: None
- None
- Environment: Python 3.6
PyMongo 3.9.0
Motor 2.0.0
Ubuntu 16.04
2.7.12 (default, Oct 8 2019, 14:14:10)
[GCC 5.4.0 20160609]
Hello,
My team and I are developing a backend in Python 3.6+ with asyncio, using the aiohttp framework.
We are implementing an endpoint that runs a query through an aggregation pipeline. Depending on the time frame the user selects in the frontend, the query can take several minutes to execute. Because of that, we are looking for a way to cancel a query when the frontend no longer needs its result, so that the MongoDB server is not loaded with queries nobody is waiting for.
The idea is to abort the query on the server when a CancelledError exception is raised in the backend. To do this, we first obtain the operation id of the query using the $currentOp aggregation pipeline stage, and then kill the operation using the killOp command.
The code looks like this:
async def endpoint_name(self):
    # Aggregation pipeline stages
    pipeline = [<pipeline stages>]
    try:
        # Aggregate using the pipeline
        cursor = self._mongodb_db[collection].aggregate(
            pipeline=pipeline,
            allowDiskUse=True,
        )
        docs = await cursor.to_list(length=None)
    except CancelledError:
        # Kill query operation
        await abort_mongo_query(self._mongodb_admin, pipeline)
        raise
    else:
        return docs
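The abort_mongo_query helper called above is not shown in this report. A minimal sketch of what it might look like, following the two steps described earlier ($currentOp to find the operation, killOp to stop it), is given here. The parameter name admin_db, the allUsers option, and the choice to match operations on command.pipeline are assumptions for illustration, not the reporter's actual code; admin_db is assumed to be a Motor handle to the admin database.

```python
async def abort_mongo_query(admin_db, pipeline):
    """Kill active operations whose aggregation pipeline equals `pipeline`.

    Hypothetical helper: `admin_db` is assumed to be a Motor database
    handle for the `admin` database. Returns the list of killed opids.
    """
    # $currentOp has to be the first stage and is run against the admin
    # database. Matching on "command.pipeline" is an assumption about how
    # to recognise the running query.
    current_ops = admin_db.aggregate([
        {"$currentOp": {"allUsers": True}},
        {"$match": {"active": True, "command.pipeline": pipeline}},
    ])

    killed = []
    async for op in current_ops:
        # Equivalent to the server command {killOp: 1, op: <opid>}.
        await admin_db.command("killOp", op=op["opid"])
        killed.append(op["opid"])
    return killed
```

Matching on the full pipeline would kill every running operation that uses an identical pipeline, so two users issuing the same query at the same time would both be affected. Attaching a unique comment to each aggregate call and matching on that instead may be a safer way to identify a single operation.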
When a CancelledError exception is raised, the queries are effectively killed on the server. However, as soon as the queries are killed on the server, the backend shows this error for every endpoint whose query was aborted:
It seems that the Motor library internally starts a thread to wait for the query response, and because the queries are killed, this error occurs. We tried closing the cursor before killing the query, as shown below, but await cursor.close() raises a NotImplementedError.
async def endpoint_name(self):
    # Aggregation pipeline stages
    pipeline = [<pipeline stages>]
    try:
        # Aggregate using the pipeline
        cursor = self._mongodb_db[collection].aggregate(
            pipeline=pipeline,
            allowDiskUse=True,
        )
        docs = await cursor.to_list(length=None)
    except CancelledError:
        await cursor.close()
        # Kill query operation
        await abort_mongo_query(self._mongodb_admin, pipeline)
        raise
    else:
        return docs
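One pattern that might be worth trying in the meantime, offered only as a sketch and not as a confirmed fix for the error above, is to keep the cancellation from reaching the driver's future by wrapping the query in asyncio.shield. The handler still sees CancelledError, but the inner query keeps running until the kill takes effect and its outcome is then consumed. The names run_cancellable, query_coro and on_cancel are hypothetical.

```python
import asyncio


async def run_cancellable(query_coro, on_cancel):
    """Await `query_coro`, but on cancellation call `on_cancel()` first.

    Hypothetical wrapper: the query runs in its own task and is shielded,
    so cancelling the caller does not cancel the query's future directly.
    """
    inner = asyncio.ensure_future(query_coro)
    try:
        return await asyncio.shield(inner)
    except asyncio.CancelledError:
        # The caller was cancelled; `inner` is still pending at this point.
        await on_cancel()
        try:
            # Consume the query's outcome (most likely an error once the
            # operation has been killed) so it does not go unretrieved.
            await inner
        except Exception:
            pass
        raise
```

In the endpoint this could be called as run_cancellable(cursor.to_list(length=None), lambda: abort_mongo_query(self._mongodb_admin, pipeline)), assuming abort_mongo_query is the helper used above.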
- is related to: MOTOR-488 InvalidStateError in AgnosticLatentCommandCursor when future is cancelled (Closed)