When launching my tests with pytest I get a RuntimeError : Event loop is closed coming from I call to the database. It was working in python 3.8 with motor 2.5.
Here is my traceback:
request = <starlette.requests.Request object at 0x0000019ADCCEEE10> async def call_next(request: Request) -> Response: app_exc: typing.Optional[Exception] = None send_stream, recv_stream = anyio.create_memory_object_stream() async def receive_or_disconnect() -> Message: if response_sent.is_set(): return \{"type": "http.disconnect"} async with anyio.create_task_group() as task_group: async def wrap(func: typing.Callable[[], typing.Awaitable[T]]) -> T: result = await func() task_group.cancel_scope.cancel() return result task_group.start_soon(wrap, response_sent.wait) message = await wrap(request.receive) if response_sent.is_set(): return \{"type": "http.disconnect"} return message async def close_recv_stream_on_response_sent() -> None: await response_sent.wait() recv_stream.close() async def send_no_error(message: Message) -> None: try: await send_stream.send(message) except anyio.BrokenResourceError: # recv_stream has been closed, i.e. response_sent has been set. return async def coro() -> None: nonlocal app_exc async with send_stream: try: await self.app(scope, receive_or_disconnect, send_no_error) except Exception as exc: app_exc = exc task_group.start_soon(close_recv_stream_on_response_sent) task_group.start_soon(coro) try: > message = await recv_stream.receive() ..\venv\Lib\site-packages\starlette\middleware\base.py:78: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = MemoryObjectReceiveStream(_state=MemoryObjectStreamState(max_buffer_size=0, buffer=deque([]), open_send_channels=0, open_receive_channels=1, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=False) async def receive(self) -> T_Item: await checkpoint() try: > return self.receive_nowait() ..\venv\Lib\site-packages\anyio\streams\memory.py:94: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = MemoryObjectReceiveStream(_state=MemoryObjectStreamState(max_buffer_size=0, buffer=deque([]), open_send_channels=0, open_receive_channels=1, waiting_receivers=OrderedDict(), waiting_senders=OrderedDict()), _closed=False) def receive_nowait(self) -> T_Item: """ Receive the next item if it can be done without waiting. :return: the received item :raises ~anyio.ClosedResourceError: if this send stream has been closed :raises ~anyio.EndOfStream: if the buffer is empty and this stream has been closed from the sending end :raises ~anyio.WouldBlock: if there are no items in the buffer and no tasks waiting to send """ if self._closed: raise ClosedResourceError if self._state.waiting_senders: # Get the item from the next sender send_event, item = self._state.waiting_senders.popitem(last=False) self._state.buffer.append(item) send_event.set() if self._state.buffer: return self._state.buffer.popleft() elif not self._state.open_send_channels: > raise EndOfStream E anyio.EndOfStream ..\venv\Lib\site-packages\anyio\streams\memory.py:87: EndOfStream During handling of the above exception, another exception occurred: get_headers = \{'Authorization': 'Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzdWIiOiJ2aWFubmV5Lm1peHR1ci5kZXZAZ21haWwuY29tIiwiZXhwIjoxNjc4Mjk3MTYyfQ.B1kMU8g01bQ7FzwirCv4pMam0b-e__xfUjkStKfY_Tk'} def test_get_brands(get_headers:dict): """Testes the get brands static page""" > response = client.get("/static/sitemap/brands", headers=get_headers) test_static.py:27: _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ ..\venv\Lib\site-packages\starlette\testclient.py:492: in get return super().get( ..\venv\Lib\site-packages\httpx\_client.py:1045: in get return self.request( ..\venv\Lib\site-packages\starlette\testclient.py:458: in request return super().request( ..\venv\Lib\site-packages\httpx\_client.py:821: in request return self.send(request, auth=auth, follow_redirects=follow_redirects) ..\venv\Lib\site-packages\httpx\_client.py:908: in send response = self._send_handling_auth( ..\venv\Lib\site-packages\httpx\_client.py:936: in _send_handling_auth response = self._send_handling_redirects( ..\venv\Lib\site-packages\httpx\_client.py:973: in _send_handling_redirects response = self._send_single_request(request) ..\venv\Lib\site-packages\httpx\_client.py:1009: in _send_single_request response = transport.handle_request(request) ..\venv\Lib\site-packages\starlette\testclient.py:337: in handle_request raise exc ..\venv\Lib\site-packages\starlette\testclient.py:334: in handle_request portal.call(self.app, scope, receive, send) ..\venv\Lib\site-packages\anyio\from_thread.py:283: in call return cast(T_Retval, self.start_task_soon(func, *args).result()) C:\Python311\Lib\concurrent\futures\_base.py:449: in result return self.__get_result() C:\Python311\Lib\concurrent\futures\_base.py:401: in __get_result raise self._exception ..\venv\Lib\site-packages\anyio\from_thread.py:219: in _call_func retval = await retval ..\venv\Lib\site-packages\fastapi\applications.py:274: in __call__ await super().__call__(scope, receive, send) ..\venv\Lib\site-packages\starlette\applications.py:118: in __call__ await self.middleware_stack(scope, receive, send) ..\venv\Lib\site-packages\starlette\middleware\errors.py:184: in __call__ raise exc ..\venv\Lib\site-packages\starlette\middleware\errors.py:162: in __call__ await self.app(scope, receive, _send) ..\venv\Lib\site-packages\starlette\middleware\cors.py:84: in __call__ await self.app(scope, receive, send) ..\venv\Lib\site-packages\starlette\middleware\base.py:108: in __call__ response = await self.dispatch_func(request, call_next) ..\venv\Lib\site-packages\slowapi\middleware.py:136: in dispatch response = await call_next(request) ..\venv\Lib\site-packages\starlette\middleware\base.py:84: in call_next raise app_exc ..\venv\Lib\site-packages\starlette\middleware\base.py:70: in coro await self.app(scope, receive_or_disconnect, send_no_error) ..\venv\Lib\site-packages\starlette\middleware\exceptions.py:79: in __call__ raise exc ..\venv\Lib\site-packages\starlette\middleware\exceptions.py:68: in __call__ await self.app(scope, receive, sender) ..\venv\Lib\site-packages\fastapi\middleware\asyncexitstack.py:21: in __call__ raise e ..\venv\Lib\site-packages\fastapi\middleware\asyncexitstack.py:18: in __call__ await self.app(scope, receive, send) ..\venv\Lib\site-packages\starlette\routing.py:706: in __call__ await route.handle(scope, receive, send) ..\venv\Lib\site-packages\starlette\routing.py:276: in handle await self.app(scope, receive, send) ..\venv\Lib\site-packages\starlette\routing.py:66: in app response = await func(request) ..\venv\Lib\site-packages\fastapi\routing.py:238: in app raw_response = await run_endpoint_function( ..\venv\Lib\site-packages\fastapi\routing.py:164: in run_endpoint_function return await dependant.call(**values) ..\app\routers\static.py:56: in get_sitemap_brands return await controller.get_sitemap_brands(request.base_url.__str__()) ..\app\controllers\static.py:210: in get_sitemap_brands array, _ = await queries.industry.select_brands(projection=\{"codename", "updated_at"}) ..\app\models\queries\industry.py:155: in select_brands array, total = await select("brands", model=Brand, **kwargs) ..\app\models\queries\utils.py:143: in select documents = await cursor.to_list(length=CURSOR_LIMIT) ..\venv\Lib\site-packages\motor\core.py:1528: in to_list self.get_io_loop(), self._get_more(), self._to_list, length, the_list, future ..\venv\Lib\site-packages\motor\core.py:1291: in _get_more return self._refresh() ..\venv\Lib\site-packages\motor\metaprogramming.py:73: in method return framework.run_on_executor( ..\venv\Lib\site-packages\motor\frameworks\asyncio\__init__.py:74: in run_on_executor return loop.run_in_executor(_EXECUTOR, functools.partial(fn, *args, **kwargs)) C:\Python311\Lib\asyncio\base_events.py:813: in run_in_executor self._check_closed() _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ self = <ProactorEventLoop running=False closed=True debug=False> def _check_closed(self): if self._closed: > raise RuntimeError('Event loop is closed') E RuntimeError: Event loop is closed C:\Python311\Lib\asyncio\base_events.py:519: RuntimeError
The stackstrace is quite long. But the first exception is raised when to_list is called on a cursor.
It is a concurrency issue as some of the tests that fail when launching all my tests suite don't fail when launching a subset of the tests.
- backported by
-
MOTOR-1054 motor.motor_asyncio is broken for Python 3.11
- Closed