Объединение микросервисов через kafka

Пытаюсь написать проект на микросервисной архитектуре на FastAPI + aiokafka, но т.к это как моя первая практика написание микросервисов, и кафку я знаю довольно поверхностно, я не понимаю как их связать между собой. У меня есть 2 микросервиса:

  1. auth - Для авторизации/аутентификации(написан с помощью fastapi users)
  2. tracker - "Основной", если так можно сказать.

Дело в том, что при создании объекта в tracker, мне необходимо автоматически присвоить ему user_id, который я должен получать из id текущего пользователя. Но дело в том, что я не до конца понимаю, как мне реализовать эту связь между auth и tracker. Я написал producer, который отправляет текущий user_id в tracker

producer.py

async def send_one(user_id: int):
   producer = AIOKafkaProducer(
       bootstrap_servers="localhost:9092",
       value_serializer=lambda v: json.dumps(v).encode("utf-8")
   )
   await producer.start()
   try:
       await producer.send_and_wait("tracker_to_auth", value={"user_id": user_id})
   finally:
       await producer.stop()

и consumer, который получает этот id

consumer.py

async def consume():
   consumer = AIOKafkaConsumer(
       "tracker_to_auth",
       bootstrap_servers="localhost:9092",
       value_deserializer=lambda v: json.loads(v.decode("utf-8"))
   )
   await consumer.start()
   try:
       async for msg in consumer:
           user_id = msg.value['user_id']
           return user_id
   finally:
       await consumer.stop()

Можете ли подсказать, как мне сделать так, что send_one активировался в момент создании объекта в tracker и автоматически присваил user_id, или что типо того


Ответы (0 шт):