Объединение микросервисов через kafka
Пытаюсь написать проект на микросервисной архитектуре на FastAPI + aiokafka, но т.к это как моя первая практика написание микросервисов, и кафку я знаю довольно поверхностно, я не понимаю как их связать между собой. У меня есть 2 микросервиса:
- auth - Для авторизации/аутентификации(написан с помощью fastapi users)
- tracker - "Основной", если так можно сказать.
Дело в том, что при создании объекта в tracker, мне необходимо автоматически присвоить ему user_id, который я должен получать из id текущего пользователя. Но дело в том, что я не до конца понимаю, как мне реализовать эту связь между auth и tracker. Я написал producer, который отправляет текущий user_id в tracker
producer.py
async def send_one(user_id: int):
producer = AIOKafkaProducer(
bootstrap_servers="localhost:9092",
value_serializer=lambda v: json.dumps(v).encode("utf-8")
)
await producer.start()
try:
await producer.send_and_wait("tracker_to_auth", value={"user_id": user_id})
finally:
await producer.stop()
и consumer, который получает этот id
consumer.py
async def consume():
consumer = AIOKafkaConsumer(
"tracker_to_auth",
bootstrap_servers="localhost:9092",
value_deserializer=lambda v: json.loads(v.decode("utf-8"))
)
await consumer.start()
try:
async for msg in consumer:
user_id = msg.value['user_id']
return user_id
finally:
await consumer.stop()
Можете ли подсказать, как мне сделать так, что send_one активировался в момент создании объекта в tracker и автоматически присваил user_id, или что типо того