Как вызвать async функцию с другого месте кода?

Я меня есть роут и функция: Вот они

@app.get('/key_hire_stream')
async def message_stream(
        request: Request,
        message: str,
        user_uuid: str,
        session_id: int):

    session_str = str(session_id)
    context = memcached_client.get(session_str)
    if not context:
        messages = [{"role": "system", "content": system_info}]
        logger.info(f"Initiated a new session. Session:  {session_id}. User UUID: {user_uuid}")
    else:
        messages = ast.literal_eval(context.decode('utf-8'))

    def new_messages():
        str_response = ""
        messages.append({"role": "user", "content": message})

        status = False
        while not status:
            try:
                completion = openai.ChatCompletion.create(
                    model=model,
                    messages=messages,
                    temperature=0.03,
                    stream=True,
                    request_timeout=3)
                for num, line in enumerate(completion):
                    token = line['choices'][0]['delta'].get('content', "")
                    yield num, token
                    str_response += token
                status = True

            except (openai.error.Timeout, openai.error.ServiceUnavailableError) as e:
                logger.warning(f"Session {session_id} with ChatGPT error {e} for the user {user_uuid}")
                time.sleep(1)

        messages.append({"role": "assistant", "content": str_response})
        memcached_client.set(session_str, json.dumps(messages, ensure_ascii=False).encode('utf-8'))
        if "++++" in str_response:
            rows_to_insert = [
                {u"session_id": session_id, u"user_uuid": user_uuid, u"messages": json.dumps(messages)}
            ]
            bq_client.insert_rows_json(table_id, rows_to_insert)
            memcached_client.delete(session_str)
            logger.info(f"Session {session_id} for the user {user_uuid} "
                        f"removed from cache and added to BigQueryTable {table_id}")

    async def event_generator():
        stream_gen = new_messages()
        while True:
            if await request.is_disconnected():
                break
            try:
                item = next(stream_gen)
                yield Item(
                    user_uuid=user_uuid,
                    session_id=session_id,
                    message_id=item[0],
                    token_id=item[0],
                    message=item[1])
            except StopIteration:
                return

            await asyncio.sleep(STREAM_DELAY)

    return EventSourceResponse(event_generator())

Роут вызывает асинхронную функцию. И этот подход работает.

Я хочу эту функцию message_stream задать идин раз и потом вызывать ее в нескольких роутах, меняя только параметры функции.

Вот что я делаю:

async def message_stream(
        request: Request,
        message: str,
        user_uuid: str,
        session_id: int):

    session_str = str(session_id)
    context = memcached_client.get(session_str)
    if not context:
        messages = [{"role": "system", "content": system_info}]
        logger.info(f"Initiated a new session. Session:  {session_id}. User UUID: {user_uuid}")
    else:
        messages = ast.literal_eval(context.decode('utf-8'))

    def new_messages():
        str_response = ""
        messages.append({"role": "user", "content": message})

        status = False
        while not status:
            try:
                completion = openai.ChatCompletion.create(
                    model=model,
                    messages=messages,
                    temperature=0.03,
                    stream=True,
                    request_timeout=3)
                for num, line in enumerate(completion):
                    token = line['choices'][0]['delta'].get('content', "")
                    yield num, token
                    str_response += token
                status = True

            except (openai.error.Timeout, openai.error.ServiceUnavailableError) as e:
                logger.warning(f"Session {session_id} with ChatGPT error {e} for the user {user_uuid}")
                time.sleep(1)

        messages.append({"role": "assistant", "content": str_response})
        memcached_client.set(session_str, json.dumps(messages, ensure_ascii=False).encode('utf-8'))
        if "++++" in str_response:
            rows_to_insert = [
                {u"session_id": session_id, u"user_uuid": user_uuid, u"messages": json.dumps(messages)}
            ]
            bq_client.insert_rows_json(table_id, rows_to_insert)
            memcached_client.delete(session_str)
            logger.info(f"Session {session_id} for the user {user_uuid} "
                        f"removed from cache and added to BigQueryTable {table_id}")

    async def event_generator():
        stream_gen = new_messages()
        while True:
            if await request.is_disconnected():
                break
            try:
                item = next(stream_gen)
                yield Item(
                    user_uuid=user_uuid,
                    session_id=session_id,
                    message_id=item[0],
                    token_id=item[0],
                    message=item[1])
            except StopIteration:
                return

            await asyncio.sleep(STREAM_DELAY)

    return EventSourceResponse(event_generator())

@app.get('/key_hire_stream')
async def key_hire(request: Request,
        message: str,
        user_uuid: str,
        session_id: int):
    await message_stream(request=request, message=message, user_uuid=user_uuid, session_id=session_id) 

Т. е. я сначала задаю асинхронную функцию, а только потом ии вызываю. Но, почему то такой подход не работает. Когда я отправляю реквест на роут, я получаю 200 ответ, и роут мне возвращает null.

Вопрос: почему в первом случае я получаю нужный респонс, а во втором нет? И как исправить вторую реализацию, чтобы я мог использовать асинхронную функцию в нескольких местах меняя только параметры функции? Благодарю


Ответы (0 шт):