From e057d4307c8e75be2475eca0beec662fa9ea41ff Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Micha=C5=82=20G=C3=B3rny?= Date: Wed, 18 Jun 2025 07:46:37 +0200 Subject: [PATCH] test: Replace deprecated `event_loop` fixture Replace the deprecated `event_loop` fixture with an explicit call to `asyncio.get_running_loop()`, as suggested by the deprecation warnings: ``` test/test_asyncredis.py:61 test/test_asyncredis.py:61: PytestDeprecationWarning: test_pubsub[fake] is asynchronous and explicitly requests the "event_loop" fixture. Asynchronous fixtures and test functions should use "asyncio.get_running_loop()" instead. async def test_pubsub(async_redis, event_loop): ``` This fixes compatibility with `pytest-asyncio >= 1.0.0`. --- test/test_asyncredis.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/test/test_asyncredis.py b/test/test_asyncredis.py index 47e5f180..a824e962 100644 --- a/test/test_asyncredis.py +++ b/test/test_asyncredis.py @@ -58,7 +58,7 @@ async def test_transaction_fail(async_redis: redis.asyncio.Redis): await tr.execute() -async def test_pubsub(async_redis, event_loop): +async def test_pubsub(async_redis): queue = asyncio.Queue() async def reader(ps): @@ -71,7 +71,7 @@ async def reader(ps): async with async_timeout(5), async_redis.pubsub() as ps: await ps.subscribe("channel") - task = event_loop.create_task(reader(ps)) + task = asyncio.get_running_loop().create_task(reader(ps)) await async_redis.publish("channel", "message1") await async_redis.publish("channel", "message2") result1 = await queue.get() @@ -117,14 +117,14 @@ async def test_blocking_timeout(conn): @pytest.mark.slow -async def test_blocking_unblock(async_redis, conn, event_loop): +async def test_blocking_unblock(async_redis, conn): """Blocking command that gets unblocked after some time.""" async def unblock(): await asyncio.sleep(0.1) await async_redis.rpush("list", "y") - task = event_loop.create_task(unblock()) + task = asyncio.get_running_loop().create_task(unblock()) result = await conn.blpop("list", timeout=1) assert result == (b"list", b"y") await task