Как запустить асинхронный код в pytest?
Есть класс, который подключается в rabbitmq и читает очередь. В самом классе есть ассинхронные методы, и стоит задача в pytest держать запущенным этот класс, пока другие тесты отработают.
class RPCServer:
def __init__(self,
uri: str):
self.connection = None
self.channel = None
self.rpc = None
self.uri = uri
self.queue = None
async def start(self):
timeout = 3
for i in range(100):
try:
logger.info(self.uri)
self.connection = await ap.connect_robust(
url=self.uri,
client_properties={"connection_name": "callee"},
)
logger.info(f'Got connection')
break
except ConnectionError:
time.sleep(timeout)
logger.info(f'Sleep timeout {timeout}')
if not self.connection:
error = f'Cannot connect to AMQP URI.'
logger.error(error)
raise RuntimeError(error)
logger.info(self.uri)
self.channel = await self.connection.channel()
self.rpc = await ap.patterns.RPC.create(self.channel)
await self.rpc.register(
'request',
request,
durable=True
)
return self.connection
async def stop(self):
await self.channel.close()
await self.connection.close()
Как сделать чтобы можно было использовать этот класс вот таким образом
@pytest.fixture(scope='session')
async def rpc_server():
server = RPCServer('amqp://rabbitmq:rabbitmq@localhost:5673/')
await server.start()
yield server
Ну или на крайний случай - запустить скрипт, ждать окончания всех тестов - закрыть процесс.