Uploaded image for project: 'Motor'
  1. Motor
  2. MOTOR-1102

motor raises Runtime Error : Event loop is closed

    • Type: Icon: Bug Bug
    • Resolution: Gone away
    • Priority: Icon: Major - P3 Major - P3
    • None
    • Affects Version/s: 3.1.1
    • Component/s: asyncio, tornado
    • None

      When launching my tests with pytest I get a RuntimeError : Event loop is closed coming from I call to the database. It was working in python 3.8 with motor 2.5.

      Here is my traceback:

      request = <starlette.requests.Request object at 0x0000019ADCCEEE10>
      
          async def call_next(request: Request) -> Response:
              app_exc: typing.Optional[Exception] = None
              send_stream, recv_stream = anyio.create_memory_object_stream()
          
              async def receive_or_disconnect() -> Message:
                  if response_sent.is_set():
                      return \{"type": "http.disconnect"}
          
                  async with anyio.create_task_group() as task_group:
          
                      async def wrap(func: typing.Callable[[], typing.Awaitable[T]]) -> T:
                          result = await func()
                          task_group.cancel_scope.cancel()
                          return result
          
                      task_group.start_soon(wrap, response_sent.wait)
                      message = await wrap(request.receive)
          
                  if response_sent.is_set():
                      return \{"type": "http.disconnect"}
          
                  return message
          
              async def close_recv_stream_on_response_sent() -> None:
                  await response_sent.wait()
                  recv_stream.close()
          
              async def send_no_error(message: Message) -> None:
                  try:
                      await send_stream.send(message)
                  except anyio.BrokenResourceError:
                      # recv_stream has been closed, i.e. response_sent has been set.
                      return
          
              async def coro() -> None:
                  nonlocal app_exc
          
                  async with send_stream:
                      try:
                          await self.app(scope, receive_or_disconnect, send_no_error)
                      except Exception as exc:
                          app_exc = exc
          
              task_group.start_soon(close_recv_stream_on_response_sent)
              task_group.start_soon(coro)
          
              try:
      >           message = await recv_stream.receive()
      
      ..\venv\Lib\site-packages\starlette\middleware\base.py:78: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = MemoryObjectReceiveStream(_state=MemoryObjectStreamState(max_buffer_size=0, buffer=deque([]), open_send_channels=0, open_receive_channels=1, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=False)
      
          async def receive(self) -> T_Item:
              await checkpoint()
              try:
      >           return self.receive_nowait()
      
      ..\venv\Lib\site-packages\anyio\streams\memory.py:94: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = MemoryObjectReceiveStream(_state=MemoryObjectStreamState(max_buffer_size=0, buffer=deque([]), open_send_channels=0, open_receive_channels=1, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=False)
      
          def receive_nowait(self) -> T_Item:
              """
              Receive the next item if it can be done without waiting.
          
              :return: the received item
              :raises ~anyio.ClosedResourceError: if this send stream has been closed
              :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been
                  closed from the sending end
              :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks
                  waiting to send
          
              """
              if self._closed:
                  raise ClosedResourceError
          
              if self._state.waiting_senders:
                  # Get the item from the next sender
                  send_event, item = self._state.waiting_senders.popitem(last=False)
                  self._state.buffer.append(item)
                  send_event.set()
          
              if self._state.buffer:
                  return self._state.buffer.popleft()
              elif not self._state.open_send_channels:
      >           raise EndOfStream
      E           anyio.EndOfStream
      
      ..\venv\Lib\site-packages\anyio\streams\memory.py:87: EndOfStream
      
      During handling of the above exception, another exception occurred:
      
      get_headers = \{'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ2aWFubmV5Lm1peHR1ci5kZXZAZ21haWwuY29tIiwiZXhwIjoxNjc4Mjk3MTYyfQ.B1kMU8g01bQ7FzwirCv4pMam0b-e__xfUjkStKfY_Tk'}
      
          def test_get_brands(get_headers:dict):
              """Testes the get brands static page"""
          
      >       response = client.get("/static/sitemap/brands", headers=get_headers)
      
      test_static.py:27: 
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      ..\venv\Lib\site-packages\starlette\testclient.py:492: in get
          return super().get(
      ..\venv\Lib\site-packages\httpx\_client.py:1045: in get
          return self.request(
      ..\venv\Lib\site-packages\starlette\testclient.py:458: in request
          return super().request(
      ..\venv\Lib\site-packages\httpx\_client.py:821: in request
          return self.send(request, auth=auth, follow_redirects=follow_redirects)
      ..\venv\Lib\site-packages\httpx\_client.py:908: in send
          response = self._send_handling_auth(
      ..\venv\Lib\site-packages\httpx\_client.py:936: in _send_handling_auth
          response = self._send_handling_redirects(
      ..\venv\Lib\site-packages\httpx\_client.py:973: in _send_handling_redirects
          response = self._send_single_request(request)
      ..\venv\Lib\site-packages\httpx\_client.py:1009: in _send_single_request
          response = transport.handle_request(request)
      ..\venv\Lib\site-packages\starlette\testclient.py:337: in handle_request
          raise exc
      ..\venv\Lib\site-packages\starlette\testclient.py:334: in handle_request
          portal.call(self.app, scope, receive, send)
      ..\venv\Lib\site-packages\anyio\from_thread.py:283: in call
          return cast(T_Retval, self.start_task_soon(func, *args).result())
      C:\Python311\Lib\concurrent\futures\_base.py:449: in result
          return self.__get_result()
      C:\Python311\Lib\concurrent\futures\_base.py:401: in __get_result
          raise self._exception
      ..\venv\Lib\site-packages\anyio\from_thread.py:219: in _call_func
          retval = await retval
      ..\venv\Lib\site-packages\fastapi\applications.py:274: in __call__
          await super().__call__(scope, receive, send)
      ..\venv\Lib\site-packages\starlette\applications.py:118: in __call__
          await self.middleware_stack(scope, receive, send)
      ..\venv\Lib\site-packages\starlette\middleware\errors.py:184: in __call__
          raise exc
      ..\venv\Lib\site-packages\starlette\middleware\errors.py:162: in __call__
          await self.app(scope, receive, _send)
      ..\venv\Lib\site-packages\starlette\middleware\cors.py:84: in __call__
          await self.app(scope, receive, send)
      ..\venv\Lib\site-packages\starlette\middleware\base.py:108: in __call__
          response = await self.dispatch_func(request, call_next)
      ..\venv\Lib\site-packages\slowapi\middleware.py:136: in dispatch
          response = await call_next(request)
      ..\venv\Lib\site-packages\starlette\middleware\base.py:84: in call_next
          raise app_exc
      ..\venv\Lib\site-packages\starlette\middleware\base.py:70: in coro
          await self.app(scope, receive_or_disconnect, send_no_error)
      ..\venv\Lib\site-packages\starlette\middleware\exceptions.py:79: in __call__
          raise exc
      ..\venv\Lib\site-packages\starlette\middleware\exceptions.py:68: in __call__
          await self.app(scope, receive, sender)
      ..\venv\Lib\site-packages\fastapi\middleware\asyncexitstack.py:21: in __call__
          raise e
      ..\venv\Lib\site-packages\fastapi\middleware\asyncexitstack.py:18: in __call__
          await self.app(scope, receive, send)
      ..\venv\Lib\site-packages\starlette\routing.py:706: in __call__
          await route.handle(scope, receive, send)
      ..\venv\Lib\site-packages\starlette\routing.py:276: in handle
          await self.app(scope, receive, send)
      ..\venv\Lib\site-packages\starlette\routing.py:66: in app
          response = await func(request)
      ..\venv\Lib\site-packages\fastapi\routing.py:238: in app
          raw_response = await run_endpoint_function(
      ..\venv\Lib\site-packages\fastapi\routing.py:164: in run_endpoint_function
          return await dependant.call(**values)
      ..\app\routers\static.py:56: in get_sitemap_brands
          return await controller.get_sitemap_brands(request.base_url.__str__())
      ..\app\controllers\static.py:210: in get_sitemap_brands
          array, _ = await queries.industry.select_brands(projection=\{"codename", "updated_at"})
      ..\app\models\queries\industry.py:155: in select_brands
          array, total = await select("brands", model=Brand, **kwargs)
      ..\app\models\queries\utils.py:143: in select
          documents = await cursor.to_list(length=CURSOR_LIMIT)
      ..\venv\Lib\site-packages\motor\core.py:1528: in to_list
          self.get_io_loop(), self._get_more(), self._to_list, length, the_list, future
      ..\venv\Lib\site-packages\motor\core.py:1291: in _get_more
          return self._refresh()
      ..\venv\Lib\site-packages\motor\metaprogramming.py:73: in method
          return framework.run_on_executor(
      ..\venv\Lib\site-packages\motor\frameworks\asyncio\__init__.py:74: in run_on_executor
          return loop.run_in_executor(_EXECUTOR, functools.partial(fn, *args, **kwargs))
      C:\Python311\Lib\asyncio\base_events.py:813: in run_in_executor
          self._check_closed()
      _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 
      
      self = <ProactorEventLoop running=False closed=True debug=False>
      
          def _check_closed(self):
              if self._closed:
      >           raise RuntimeError('Event loop is closed')
      E           RuntimeError: Event loop is closed
      
      C:\Python311\Lib\asyncio\base_events.py:519: RuntimeError
      

      The stackstrace is quite long. But the first exception is raised when to_list is called on a cursor.

      It is a concurrency issue as some of the tests that fail when launching all my tests suite don't fail when launching a subset of the tests.

            Assignee:
            julius.park@mongodb.com Julius Park (Inactive)
            Reporter:
            vianney@kompozite.io Vianney Mixtur
            Votes:
            0 Vote for this issue
            Watchers:
            4 Start watching this issue

              Created:
              Updated:
              Resolved: