Uploaded image for project: 'Motor'
  1. Motor
  2. MOTOR-467

AgnosticBaseCursor error when killing an aggregation query in process.

    • Type: Icon: Bug Bug
    • Resolution: Fixed
    • Priority: Icon: Major - P3 Major - P3
    • 2.1
    • Affects Version/s: 2.0
    • Component/s: None
    • None
    • Environment:
      Python 3.6

      PyMongo 3.9.0
      Motor 2.0.0

      Ubuntu 16.04
      2.7.12 (default, Oct 8 2019, 14:14:10)
      [GCC 5.4.0 20160609]

       Hello,

      My team and I are currently working on developing a backend which is written in Python 3.6+ with asyncio, using the aiohttp framework.

      We are implementing an endpoint which makes a query using an aggregation pipeline. Depending on the time frame provided by the user through the frontend, the query can last several minutes executing. Since the queries can take that long, we're looking for a way to cancel the queries in case the result is not required anymore by the frontend, this way the MongoDB Server is not overloaded by queries that are not required anymore.

      So the idea is to abort the queries in the server when a CancelledError exception is raised in the backend. For this, first, the operation id of the query is obtained using the currentOp pipeline stage, and then the operation is killed using the killOp mongo command.

      The code looks like this:

      async def endpoint_name(self):
          # Aggregation pipeline stages
          pipeline = [<pipeline stages>]
          
          try:
               # Aggregate using the pipeline
               cursor = self._mongodb_db[collection].aggregate(
                   pipeline=pipeline,
                   allowDiskUse=True,
               )           
               docs = await cursor.to_list(length=None)
          except CancelledError:
               # Kill query operation
               await abort_mongo_query(self._mongodb_admin, pipeline)
               raise        
          else:
               return test_list
      
      

       

      When a CancelledError exception is raised, the queries are effectively killed on the server. However, as soon as the queries are killed on the server, the backend shows this error for every endpoint whose query was aborted:

       

       It seems that Motor library internally starts a thread to wait for the query response, and since the query are killed this error happens. We tried closing the cursor before killing the query like below, but the a_wait cursor.close()_ raises a NotImplemented error.

      async def endpoint_name(self):
          # Aggregation pipeline stages
          pipeline = [<pipeline stages>]
       
          try: 
              # Aggregate using the pipeline
          cursor = self._mongodb_db[collection].aggregate(
              pipeline=pipeline, 
              allowDiskUse=True,
          )
              docs = await cursor.to_list(length=None)
          except CancelledError:
              await cursor.close()
              # Kill query operation 
              await abort_mongo_query(self._mongodb_admin, pipeline) 
              raise 
          else: 
              return test_list
      

       

            Assignee:
            shane.harvey@mongodb.com Shane Harvey
            Reporter:
            jonnaro14@gmail.com Jonathan Rojas
            Votes:
            0 Vote for this issue
            Watchers:
            2 Start watching this issue

              Created:
              Updated:
              Resolved: