Python Driver / PYTHON-4745

Document and Test Behavior when User Cancels Async Operation

    • Type: Task
    • Resolution: Unresolved
    • Priority: Unknown
    • 4.12
    • Affects Version/s: None
    • Component/s: None
    • None
    • Python Drivers

      1. What would you like to communicate to the user about this feature?
      2. Would you like the user to see examples of the syntax and/or executable code and its output?
      3. Which versions of the driver/connector does this apply to?

      Context

      In PYTHON-4695 we noted that when an async operation is canceled, the operation can surface an asyncio.TimeoutError instead of the NetworkTimeout that the test expects. We should decide what the behavior should be, then test and document it.
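
To make the two cases concrete, here is a minimal sketch that uses only asyncio and no driver code. It assumes a stand-in coroutine, slow_send, which is hypothetical and only simulates a send that never finishes in time. A deadline enforced by asyncio.wait_for surfaces asyncio.TimeoutError, while a cancel requested by the caller surfaces asyncio.CancelledError.

```python
import asyncio


async def slow_send() -> str:
    """Hypothetical stand-in for a network send that never finishes in time."""
    await asyncio.sleep(10)
    return "sent"


async def timed_out() -> str:
    """wait_for cancels the inner task, then raises asyncio.TimeoutError."""
    try:
        await asyncio.wait_for(slow_send(), timeout=0.05)
    except asyncio.TimeoutError:
        return "TimeoutError"
    return "completed"


async def user_cancelled() -> str:
    """A task cancelled by the caller surfaces CancelledError instead."""
    task = asyncio.ensure_future(asyncio.wait_for(slow_send(), timeout=5))
    await asyncio.sleep(0.05)  # give the task time to start waiting
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        return "CancelledError"
    return "completed"


def run_demo() -> tuple:
    """Run both scenarios and report which exception each one surfaced."""
    return asyncio.run(timed_out()), asyncio.run(user_cancelled())
```

Calling run_demo() returns the name of the exception seen in each scenario, which is the distinction the documentation would need to spell out.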

      Definition of done

      Document and test the behavior.
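
As a rough sketch of what such a test could look like, the example below assumes one possible decision: a deadline expiry is translated into a driver-level timeout error, while a user cancel propagates unchanged. SketchNetworkTimeout and send_with_deadline are hypothetical names introduced only for illustration. They are not PyMongo APIs, and the ticket has not settled on this behavior.

```python
import asyncio
import unittest


class SketchNetworkTimeout(Exception):
    """Hypothetical stand-in for a driver-level timeout error."""


async def send_with_deadline(coro, timeout: float):
    """Run coro under a deadline, translating asyncio's timeout error."""
    try:
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError as exc:
        # Assumed policy: callers see one timeout type, chained to the original.
        raise SketchNetworkTimeout("send timed out") from exc


class TestCancelBehavior(unittest.IsolatedAsyncioTestCase):
    async def test_timeout_is_translated(self):
        with self.assertRaises(SketchNetworkTimeout) as ctx:
            await send_with_deadline(asyncio.sleep(10), timeout=0.05)
        self.assertIsInstance(ctx.exception.__cause__, asyncio.TimeoutError)

    async def test_user_cancel_is_not_translated(self):
        task = asyncio.ensure_future(
            send_with_deadline(asyncio.sleep(10), timeout=5)
        )
        await asyncio.sleep(0.05)  # give the task time to start waiting
        task.cancel()
        with self.assertRaises(asyncio.CancelledError):
            await task


def run_tests() -> bool:
    """Run the sketch test case and report whether every test passed."""
    suite = unittest.defaultTestLoader.loadTestsFromTestCase(TestCancelBehavior)
    return unittest.TextTestRunner(verbosity=0).run(suite).wasSuccessful()
```

If a different behavior is chosen, only the except clause in send_with_deadline and the two assertions would need to change.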

      Pitfalls

      N/A

      Example traceback:

      [2024/09/05 12:12:33.097] FAILURE: asyncio.exceptions.TimeoutError ()
       [2024/09/05 12:12:33.097] sock = <ssl.SSLSocket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
       [2024/09/05 12:12:33.097] buf = b'\xd2\xfd\xff\x01\x7f\xdfV\xd8\x00\x00\x00\x00\xdd\x07\x00\x00\x00\x00\x00\x00\x00\t\x01\x00\x00\x10bulkWrite\x00\x01...9\xe6rp\xe8\x15x\xdb\xb3>?\x00\x00\x01 \x00\x00\x00nsInfo\x00\x15\x00\x00\x00\x02ns\x00\x08\x00\x00\x00db.coll\x00\x00'
       [2024/09/05 12:12:33.097]     async def _async_sendall_ssl_windows(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
       [2024/09/05 12:12:33.097]         view = memoryview(buf)
       [2024/09/05 12:12:33.097]         total_length = len(buf)
       [2024/09/05 12:12:33.097]         total_sent = 0
       [2024/09/05 12:12:33.097]         while total_sent < total_length:
       [2024/09/05 12:12:33.097]             try:
       [2024/09/05 12:12:33.097] >               sent = sock.send(view[total_sent:])
       [2024/09/05 12:12:33.097] pymongo\network_layer.py:121: 
       [2024/09/05 12:12:33.097] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
       [2024/09/05 12:12:33.097] self = <ssl.SSLSocket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
       [2024/09/05 12:12:33.097] data = <memory at 0x000001D272C0E580>, flags = 0
       [2024/09/05 12:12:33.097]     def send(self, data, flags=0):
       [2024/09/05 12:12:33.097]         self._checkClosed()
       [2024/09/05 12:12:33.097]         if self._sslobj is not None:
       [2024/09/05 12:12:33.097]             if flags != 0:
       [2024/09/05 12:12:33.097]                 raise ValueError(
       [2024/09/05 12:12:33.097]                     "non-zero flags not allowed in calls to send() on %s" %
       [2024/09/05 12:12:33.097]                     self.__class__)
       [2024/09/05 12:12:33.097] >           return self._sslobj.write(data)
       [2024/09/05 12:12:33.097] E           ssl.SSLWantWriteError: The operation did not complete (write) (_ssl.c:2483)
       [2024/09/05 12:12:33.097] C:\python\Python39\lib\ssl.py:1174: SSLWantWriteError
       [2024/09/05 12:12:33.097] During handling of the above exception, another exception occurred:
       [2024/09/05 12:12:33.097] sock = <ssl.SSLSocket [closed] fd=-1, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0>
       [2024/09/05 12:12:33.097] buf = b'\xd2\xfd\xff\x01\x7f\xdfV\xd8\x00\x00\x00\x00\xdd\x07\x00\x00\x00\x00\x00\x00\x00\t\x01\x00\x00\x10bulkWrite\x00\x01...9\xe6rp\xe8\x15x\xdb\xb3>?\x00\x00\x01 \x00\x00\x00nsInfo\x00\x15\x00\x00\x00\x02ns\x00\x08\x00\x00\x00db.coll\x00\x00'
       [2024/09/05 12:12:33.097]     async def _async_sendall_ssl_windows(sock: Union[socket.socket, _sslConn], buf: bytes) -> None:
       [2024/09/05 12:12:33.097]         view = memoryview(buf)
       [2024/09/05 12:12:33.097]         total_length = len(buf)
       [2024/09/05 12:12:33.097]         total_sent = 0
       [2024/09/05 12:12:33.097]         while total_sent < total_length:
       [2024/09/05 12:12:33.097]             try:
       [2024/09/05 12:12:33.097]                 sent = sock.send(view[total_sent:])
       [2024/09/05 12:12:33.097]             except BLOCKING_IO_ERRORS:
       [2024/09/05 12:12:33.097] >               await asyncio.sleep(0.5)
       [2024/09/05 12:12:33.097] pymongo\network_layer.py:123: 
       [2024/09/05 12:12:33.097] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
       [2024/09/05 12:12:33.097] delay = 0.5, result = None
       [2024/09/05 12:12:33.097]     async def sleep(delay, result=None, *, loop=None):
       [2024/09/05 12:12:33.097]         """Coroutine that completes after a given time (in seconds)."""
       [2024/09/05 12:12:33.097]         if loop is not None:
       [2024/09/05 12:12:33.097]             warnings.warn("The loop argument is deprecated since Python 3.8, "
       [2024/09/05 12:12:33.097]                           "and scheduled for removal in Python 3.10.",
       [2024/09/05 12:12:33.097]                           DeprecationWarning, stacklevel=2)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         if delay <= 0:
       [2024/09/05 12:12:33.097]             await __sleep0()
       [2024/09/05 12:12:33.097]             return result
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         if loop is None:
       [2024/09/05 12:12:33.097]             loop = events.get_running_loop()
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         future = loop.create_future()
       [2024/09/05 12:12:33.097]         h = loop.call_later(delay,
       [2024/09/05 12:12:33.097]                             futures._set_result_unless_cancelled,
       [2024/09/05 12:12:33.097]                             future, result)
       [2024/09/05 12:12:33.097]         try:
       [2024/09/05 12:12:33.097] >           return await future
       [2024/09/05 12:12:33.097] E           asyncio.exceptions.CancelledError
       [2024/09/05 12:12:33.097] C:\python\Python39\lib\asyncio\tasks.py:652: CancelledError
       [2024/09/05 12:12:33.097] During handling of the above exception, another exception occurred:
       [2024/09/05 12:12:33.097] fut = <Task cancelled name='Task-72' coro=<_async_sendall_ssl_windows() done, defined at C:\data\mci\924010f6415f1f701d744cbf1ed49800\src\pymongo\network_layer.py:115> created at C:\python\Python39\lib\asyncio\tasks.py:460>
       [2024/09/05 12:12:33.097] timeout = 2.0
       [2024/09/05 12:12:33.097]     async def wait_for(fut, timeout, *, loop=None):
       [2024/09/05 12:12:33.097]         """Wait for the single Future or coroutine to complete, with timeout.
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         Coroutine will be wrapped in Task.
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         Returns result of the Future or coroutine.  When a timeout occurs,
       [2024/09/05 12:12:33.097]         it cancels the task and raises TimeoutError.  To avoid the task
       [2024/09/05 12:12:33.097]         cancellation, wrap it in shield().
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         If the wait is cancelled, the task is also cancelled.
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         This function is a coroutine.
       [2024/09/05 12:12:33.097]         """
       [2024/09/05 12:12:33.097]         if loop is None:
       [2024/09/05 12:12:33.097]             loop = events.get_running_loop()
       [2024/09/05 12:12:33.097]         else:
       [2024/09/05 12:12:33.097]             warnings.warn("The loop argument is deprecated since Python 3.8, "
       [2024/09/05 12:12:33.097]                           "and scheduled for removal in Python 3.10.",
       [2024/09/05 12:12:33.097]                           DeprecationWarning, stacklevel=2)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         if timeout is None:
       [2024/09/05 12:12:33.097]             return await fut
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         if timeout <= 0:
       [2024/09/05 12:12:33.097]             fut = ensure_future(fut, loop=loop)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             if fut.done():
       [2024/09/05 12:12:33.097]                 return fut.result()
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             await _cancel_and_wait(fut, loop=loop)
       [2024/09/05 12:12:33.097]             try:
       [2024/09/05 12:12:33.097]                 return fut.result()
       [2024/09/05 12:12:33.097]             except exceptions.CancelledError as exc:
       [2024/09/05 12:12:33.097]                 raise exceptions.TimeoutError() from exc
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         waiter = loop.create_future()
       [2024/09/05 12:12:33.097]         timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
       [2024/09/05 12:12:33.097]         cb = functools.partial(_release_waiter, waiter)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         fut = ensure_future(fut, loop=loop)
       [2024/09/05 12:12:33.097]         fut.add_done_callback(cb)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         try:
       [2024/09/05 12:12:33.097]             # wait until the future completes or the timeout
       [2024/09/05 12:12:33.097]             try:
       [2024/09/05 12:12:33.097]                 await waiter
       [2024/09/05 12:12:33.097]             except exceptions.CancelledError:
       [2024/09/05 12:12:33.097]                 if fut.done():
       [2024/09/05 12:12:33.097]                     return fut.result()
       [2024/09/05 12:12:33.097]                 else:
       [2024/09/05 12:12:33.097]                     fut.remove_done_callback(cb)
       [2024/09/05 12:12:33.097]                     # We must ensure that the task is not running
       [2024/09/05 12:12:33.097]                     # after wait_for() returns.
       [2024/09/05 12:12:33.097]                     # See https://bugs.python.org/issue32751
       [2024/09/05 12:12:33.097]                     await _cancel_and_wait(fut, loop=loop)
       [2024/09/05 12:12:33.097]                     raise
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             if fut.done():
       [2024/09/05 12:12:33.097]                 return fut.result()
       [2024/09/05 12:12:33.097]             else:
       [2024/09/05 12:12:33.097]                 fut.remove_done_callback(cb)
       [2024/09/05 12:12:33.097]                 # We must ensure that the task is not running
       [2024/09/05 12:12:33.097]                 # after wait_for() returns.
       [2024/09/05 12:12:33.097]                 # See https://bugs.python.org/issue32751
       [2024/09/05 12:12:33.097]                 await _cancel_and_wait(fut, loop=loop)
       [2024/09/05 12:12:33.097]                 # In case task cancellation failed with some
       [2024/09/05 12:12:33.097]                 # exception, we should re-raise it
       [2024/09/05 12:12:33.097]                 # See https://bugs.python.org/issue40607
       [2024/09/05 12:12:33.097]                 try:
       [2024/09/05 12:12:33.097] >                   return fut.result()
       [2024/09/05 12:12:33.097] E                   asyncio.exceptions.CancelledError
       [2024/09/05 12:12:33.097] C:\python\Python39\lib\asyncio\tasks.py:490: CancelledError
       [2024/09/05 12:12:33.097] The above exception was the direct cause of the following exception:
       [2024/09/05 12:12:33.097] self = <test.asynchronous.test_client_bulk_write.TestClientBulkWriteCSOT testMethod=test_timeout_in_multi_batch_bulk_write>
       [2024/09/05 12:12:33.097]     @async_client_context.require_version_min(8, 0, 0, -24)
       [2024/09/05 12:12:33.097]     @async_client_context.require_no_serverless
       [2024/09/05 12:12:33.097]     @async_client_context.require_failCommand_fail_point
       [2024/09/05 12:12:33.097]     async def test_timeout_in_multi_batch_bulk_write(self):
       [2024/09/05 12:12:33.097]         _OVERHEAD = 500
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         internal_client = await async_rs_or_single_client(timeoutMS=None)
       [2024/09/05 12:12:33.097]         self.addAsyncCleanup(internal_client.close)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         collection = internal_client.db["coll"]
       [2024/09/05 12:12:33.097]         self.addAsyncCleanup(collection.drop)
       [2024/09/05 12:12:33.097]         await collection.drop()
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         fail_command = {
       [2024/09/05 12:12:33.097]             "configureFailPoint": "failCommand",
       [2024/09/05 12:12:33.097]             "mode": {"times": 2},
       [2024/09/05 12:12:33.097]             "data": {"failCommands": ["bulkWrite"], "blockConnection": True, "blockTimeMS": 1010},
       [2024/09/05 12:12:33.097]         }
       [2024/09/05 12:12:33.097]         async with self.fail_point(fail_command):
       [2024/09/05 12:12:33.097]             models = []
       [2024/09/05 12:12:33.097]             num_models = int(self.max_message_size_bytes / self.max_bson_object_size + 1)
       [2024/09/05 12:12:33.097]             b_repeated = "b" * (self.max_bson_object_size - _OVERHEAD)
       [2024/09/05 12:12:33.097]             for _ in range(num_models):
       [2024/09/05 12:12:33.097]                 models.append(
       [2024/09/05 12:12:33.097]                     InsertOne(
       [2024/09/05 12:12:33.097]                         namespace="db.coll",
       [2024/09/05 12:12:33.097]                         document={"a": b_repeated},
       [2024/09/05 12:12:33.097]                     )
       [2024/09/05 12:12:33.097]                 )
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             listener = OvertCommandListener()
       [2024/09/05 12:12:33.097]             client = await async_rs_or_single_client(
       [2024/09/05 12:12:33.097]                 event_listeners=[listener],
       [2024/09/05 12:12:33.097]                 readConcernLevel="majority",
       [2024/09/05 12:12:33.097]                 readPreference="primary",
       [2024/09/05 12:12:33.097]                 timeoutMS=2000,
       [2024/09/05 12:12:33.097]                 w="majority",
       [2024/09/05 12:12:33.097]             )
       [2024/09/05 12:12:33.097]             self.addAsyncCleanup(client.close)
       [2024/09/05 12:12:33.097]             await client.admin.command("ping")  # Init the client first.
       [2024/09/05 12:12:33.097]             with self.assertRaises(ClientBulkWriteException) as context:
       [2024/09/05 12:12:33.097]                 await client.bulk_write(models=models)
       [2024/09/05 12:12:33.097]             if not isinstance(context.exception.error, NetworkTimeout):
       [2024/09/05 12:12:33.097] >               raise context.exception.error
       [2024/09/05 12:12:33.097] test\asynchronous\test_client_bulk_write.py:620: 
       [2024/09/05 12:12:33.097] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
       [2024/09/05 12:12:33.097] pymongo\asynchronous\client_bulk.py:265: in write_command
       [2024/09/05 12:12:33.097]     reply = await bwc.conn.write_command(request_id, msg, bwc.codec)  # type: ignore[misc, arg-type]
       [2024/09/05 12:12:33.097] pymongo\asynchronous\pool.py:623: in write_command
       [2024/09/05 12:12:33.097]     await self.send_message(msg, 0)
       [2024/09/05 12:12:33.097] pymongo\asynchronous\pool.py:582: in send_message
       [2024/09/05 12:12:33.097]     self._raise_connection_failure(error)
       [2024/09/05 12:12:33.097] pymongo\asynchronous\pool.py:580: in send_message
       [2024/09/05 12:12:33.097]     await async_sendall(self.conn, message)
       [2024/09/05 12:12:33.097] pymongo\network_layer.py:68: in async_sendall
       [2024/09/05 12:12:33.097]     await asyncio.wait_for(_async_sendall_ssl_windows(sock, buf), timeout=timeout)
       [2024/09/05 12:12:33.097] _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _
       [2024/09/05 12:12:33.097] fut = <Task cancelled name='Task-72' coro=<_async_sendall_ssl_windows() done, defined at C:\data\mci\924010f6415f1f701d744cbf1ed49800\src\pymongo\network_layer.py:115> created at C:\python\Python39\lib\asyncio\tasks.py:460>
       [2024/09/05 12:12:33.097] timeout = 2.0
       [2024/09/05 12:12:33.097]     async def wait_for(fut, timeout, *, loop=None):
       [2024/09/05 12:12:33.097]         """Wait for the single Future or coroutine to complete, with timeout.
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         Coroutine will be wrapped in Task.
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         Returns result of the Future or coroutine.  When a timeout occurs,
       [2024/09/05 12:12:33.097]         it cancels the task and raises TimeoutError.  To avoid the task
       [2024/09/05 12:12:33.097]         cancellation, wrap it in shield().
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         If the wait is cancelled, the task is also cancelled.
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         This function is a coroutine.
       [2024/09/05 12:12:33.097]         """
       [2024/09/05 12:12:33.097]         if loop is None:
       [2024/09/05 12:12:33.097]             loop = events.get_running_loop()
       [2024/09/05 12:12:33.097]         else:
       [2024/09/05 12:12:33.097]             warnings.warn("The loop argument is deprecated since Python 3.8, "
       [2024/09/05 12:12:33.097]                           "and scheduled for removal in Python 3.10.",
       [2024/09/05 12:12:33.097]                           DeprecationWarning, stacklevel=2)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         if timeout is None:
       [2024/09/05 12:12:33.097]             return await fut
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         if timeout <= 0:
       [2024/09/05 12:12:33.097]             fut = ensure_future(fut, loop=loop)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             if fut.done():
       [2024/09/05 12:12:33.097]                 return fut.result()
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             await _cancel_and_wait(fut, loop=loop)
       [2024/09/05 12:12:33.097]             try:
       [2024/09/05 12:12:33.097]                 return fut.result()
       [2024/09/05 12:12:33.097]             except exceptions.CancelledError as exc:
       [2024/09/05 12:12:33.097]                 raise exceptions.TimeoutError() from exc
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         waiter = loop.create_future()
       [2024/09/05 12:12:33.097]         timeout_handle = loop.call_later(timeout, _release_waiter, waiter)
       [2024/09/05 12:12:33.097]         cb = functools.partial(_release_waiter, waiter)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         fut = ensure_future(fut, loop=loop)
       [2024/09/05 12:12:33.097]         fut.add_done_callback(cb)
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]         try:
       [2024/09/05 12:12:33.097]             # wait until the future completes or the timeout
       [2024/09/05 12:12:33.097]             try:
       [2024/09/05 12:12:33.097]                 await waiter
       [2024/09/05 12:12:33.097]             except exceptions.CancelledError:
       [2024/09/05 12:12:33.097]                 if fut.done():
       [2024/09/05 12:12:33.097]                     return fut.result()
       [2024/09/05 12:12:33.097]                 else:
       [2024/09/05 12:12:33.097]                     fut.remove_done_callback(cb)
       [2024/09/05 12:12:33.097]                     # We must ensure that the task is not running
       [2024/09/05 12:12:33.097]                     # after wait_for() returns.
       [2024/09/05 12:12:33.097]                     # See https://bugs.python.org/issue32751
       [2024/09/05 12:12:33.097]                     await _cancel_and_wait(fut, loop=loop)
       [2024/09/05 12:12:33.097]                     raise
       [2024/09/05 12:12:33.097]     
       [2024/09/05 12:12:33.097]             if fut.done():
       [2024/09/05 12:12:33.097]                 return fut.result()
       [2024/09/05 12:12:33.097]             else:
       [2024/09/05 12:12:33.097]                 fut.remove_done_callback(cb)
       [2024/09/05 12:12:33.097]                 # We must ensure that the task is not running
       [2024/09/05 12:12:33.097]                 # after wait_for() returns.
       [2024/09/05 12:12:33.097]                 # See https://bugs.python.org/issue32751
       [2024/09/05 12:12:33.097]                 await _cancel_and_wait(fut, loop=loop)
       [2024/09/05 12:12:33.097]                 # In case task cancellation failed with some
       [2024/09/05 12:12:33.097]                 # exception, we should re-raise it
       [2024/09/05 12:12:33.097]                 # See https://bugs.python.org/issue40607
       [2024/09/05 12:12:33.097]                 try:
       [2024/09/05 12:12:33.097]                     return fut.result()
       [2024/09/05 12:12:33.097]                 except exceptions.CancelledError as exc:
       [2024/09/05 12:12:33.097] >                   raise exceptions.TimeoutError() from exc
       [2024/09/05 12:12:33.097] E                   asyncio.exceptions.TimeoutError
       [2024/09/05 12:12:33.097] C:\python\Python39\lib\asyncio\tasks.py:492: TimeoutError
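
The chain in the traceback above can be approximated without a server or TLS. The sketch below assumes a fake socket, AlwaysBlockingSocket, whose send() always raises BlockingIOError, and a retry loop, sendall_retry, shaped like the one in the traceback. Both names are hypothetical and the delays are shortened so the example runs quickly. The retry loop is cancelled while sleeping, and wait_for re-raises that cancellation as a timeout.

```python
import asyncio


class AlwaysBlockingSocket:
    """Hypothetical fake socket whose send() never becomes writable."""

    def send(self, data) -> int:
        raise BlockingIOError("would block")


async def sendall_retry(sock, buf: bytes, retry_delay: float) -> None:
    """Retry loop shaped like the one in the traceback: sleep, then retry."""
    view = memoryview(buf)
    total_sent = 0
    while total_sent < len(buf):
        try:
            total_sent += sock.send(view[total_sent:])
        except BlockingIOError:
            # The deadline expires while this sleep is pending.
            await asyncio.sleep(retry_delay)


async def reproduce() -> tuple:
    """Return the surfaced exception name and the name of its direct cause."""
    try:
        await asyncio.wait_for(
            sendall_retry(AlwaysBlockingSocket(), b"payload", retry_delay=0.01),
            timeout=0.05,
        )
    except asyncio.TimeoutError as exc:
        return type(exc).__name__, type(exc.__cause__).__name__
    return ("completed", "None")


def run_reproduction() -> tuple:
    return asyncio.run(reproduce())
```

The caller only sees the timeout error, with the cancellation attached as its cause, so any translation into a driver error has to happen at the point where wait_for is awaited.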
      

            Assignee: Unassigned
            Reporter: Steve Silvester (steve.silvester@mongodb.com)
            Votes: 0
            Watchers: 3

              Created:
              Updated: