The docs for watch() incorrectly call ChangeStream.close():
try: loop.run_forever() except KeyboardInterrupt: pass finally: if change_stream is not None: change_stream.close()
change_stream.close() is an async method so calling it without await leaks a coroutine. Really it should work via task cancellation, not through KeyboardInterrupt.