Консюмер Кафки не пропускает часть сообщения

Консюмер Кафки пропускает часть сообщения. У меня есть консюмер Кафки:

my_consumer = KafkaConsumer(
'events.taxonomy',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)

он читает сообщение из топика Кафки и записывает его в базу данных с помощью SQlAlchemy:

for msg in my_consumer:
kafka_record = msg.value
res = json.loads(kafka_record)
print(res)
p = schemas.EventCreate(event_id=res['event_id'], type=res['type'],  
team_1=res['team_1'], team_2=res['team_1'], event_date=res['event_date'], score=res['score'], state=res['state'])
Session.add(p)
Session.commit()

Но когда я передаю значение в базу мне возвращает ошибку:

pydantic.error_wrappers.ValidationError: 1 validation error for EventCreate
event_id
field required (type=value_error.missing)

Когда я посылаю сообщение Кафки в словарь оно содержит значение ''user_id'':

{"_sa_instance_state": "<not serializable>", "event_id": 1101, "type": "string", "team_1": "string", "team_2": "string", "event_date": 0, "score": "string", "state": "string"}

Но когда консюмер читает сообщение почему то ключа со значением ''user_id'' нет:

{'_sa_instance_state': '<not serializable>', 'type': 'string', 'team_1': 'string', 'team_2': 'string', 'event_date': 0, 'score': 'string', 'state': 'string'}

В чем может быть ошибка и как ее исправить? подскажите пожалуйста


Ответы (0 шт):