Как вызвать async функцию с другого месте кода?
Я меня есть роут и функция: Вот они
@app.get('/key_hire_stream')
async def message_stream(
request: Request,
message: str,
user_uuid: str,
session_id: int):
session_str = str(session_id)
context = memcached_client.get(session_str)
if not context:
messages = [{"role": "system", "content": system_info}]
logger.info(f"Initiated a new session. Session: {session_id}. User UUID: {user_uuid}")
else:
messages = ast.literal_eval(context.decode('utf-8'))
def new_messages():
str_response = ""
messages.append({"role": "user", "content": message})
status = False
while not status:
try:
completion = openai.ChatCompletion.create(
model=model,
messages=messages,
temperature=0.03,
stream=True,
request_timeout=3)
for num, line in enumerate(completion):
token = line['choices'][0]['delta'].get('content', "")
yield num, token
str_response += token
status = True
except (openai.error.Timeout, openai.error.ServiceUnavailableError) as e:
logger.warning(f"Session {session_id} with ChatGPT error {e} for the user {user_uuid}")
time.sleep(1)
messages.append({"role": "assistant", "content": str_response})
memcached_client.set(session_str, json.dumps(messages, ensure_ascii=False).encode('utf-8'))
if "++++" in str_response:
rows_to_insert = [
{u"session_id": session_id, u"user_uuid": user_uuid, u"messages": json.dumps(messages)}
]
bq_client.insert_rows_json(table_id, rows_to_insert)
memcached_client.delete(session_str)
logger.info(f"Session {session_id} for the user {user_uuid} "
f"removed from cache and added to BigQueryTable {table_id}")
async def event_generator():
stream_gen = new_messages()
while True:
if await request.is_disconnected():
break
try:
item = next(stream_gen)
yield Item(
user_uuid=user_uuid,
session_id=session_id,
message_id=item[0],
token_id=item[0],
message=item[1])
except StopIteration:
return
await asyncio.sleep(STREAM_DELAY)
return EventSourceResponse(event_generator())
Роут вызывает асинхронную функцию. И этот подход работает.
Я хочу эту функцию message_stream задать идин раз и потом вызывать ее в нескольких роутах, меняя только параметры функции.
Вот что я делаю:
async def message_stream(
request: Request,
message: str,
user_uuid: str,
session_id: int):
session_str = str(session_id)
context = memcached_client.get(session_str)
if not context:
messages = [{"role": "system", "content": system_info}]
logger.info(f"Initiated a new session. Session: {session_id}. User UUID: {user_uuid}")
else:
messages = ast.literal_eval(context.decode('utf-8'))
def new_messages():
str_response = ""
messages.append({"role": "user", "content": message})
status = False
while not status:
try:
completion = openai.ChatCompletion.create(
model=model,
messages=messages,
temperature=0.03,
stream=True,
request_timeout=3)
for num, line in enumerate(completion):
token = line['choices'][0]['delta'].get('content', "")
yield num, token
str_response += token
status = True
except (openai.error.Timeout, openai.error.ServiceUnavailableError) as e:
logger.warning(f"Session {session_id} with ChatGPT error {e} for the user {user_uuid}")
time.sleep(1)
messages.append({"role": "assistant", "content": str_response})
memcached_client.set(session_str, json.dumps(messages, ensure_ascii=False).encode('utf-8'))
if "++++" in str_response:
rows_to_insert = [
{u"session_id": session_id, u"user_uuid": user_uuid, u"messages": json.dumps(messages)}
]
bq_client.insert_rows_json(table_id, rows_to_insert)
memcached_client.delete(session_str)
logger.info(f"Session {session_id} for the user {user_uuid} "
f"removed from cache and added to BigQueryTable {table_id}")
async def event_generator():
stream_gen = new_messages()
while True:
if await request.is_disconnected():
break
try:
item = next(stream_gen)
yield Item(
user_uuid=user_uuid,
session_id=session_id,
message_id=item[0],
token_id=item[0],
message=item[1])
except StopIteration:
return
await asyncio.sleep(STREAM_DELAY)
return EventSourceResponse(event_generator())
@app.get('/key_hire_stream')
async def key_hire(request: Request,
message: str,
user_uuid: str,
session_id: int):
await message_stream(request=request, message=message, user_uuid=user_uuid, session_id=session_id)
Т. е. я сначала задаю асинхронную функцию, а только потом ии вызываю.
Но, почему то такой подход не работает.
Когда я отправляю реквест на роут, я получаю 200 ответ, и роут мне возвращает null.
Вопрос: почему в первом случае я получаю нужный респонс, а во втором нет? И как исправить вторую реализацию, чтобы я мог использовать асинхронную функцию в нескольких местах меняя только параметры функции? Благодарю