Консюмер Кафки не пропускает часть сообщения
Консюмер Кафки пропускает часть сообщения. У меня есть консюмер Кафки:
my_consumer = KafkaConsumer(
'events.taxonomy',
bootstrap_servers=['localhost:9092'],
auto_offset_reset='earliest',
enable_auto_commit=True,
group_id='my-group',
value_deserializer=lambda x: loads(x.decode('utf-8'))
)
он читает сообщение из топика Кафки и записывает его в базу данных с помощью SQlAlchemy:
for msg in my_consumer:
kafka_record = msg.value
res = json.loads(kafka_record)
print(res)
p = schemas.EventCreate(event_id=res['event_id'], type=res['type'],
team_1=res['team_1'], team_2=res['team_1'], event_date=res['event_date'], score=res['score'], state=res['state'])
Session.add(p)
Session.commit()
Но когда я передаю значение в базу мне возвращает ошибку:
pydantic.error_wrappers.ValidationError: 1 validation error for EventCreate
event_id
field required (type=value_error.missing)
Когда я посылаю сообщение Кафки в словарь оно содержит значение ''user_id'':
{"_sa_instance_state": "<not serializable>", "event_id": 1101, "type": "string", "team_1": "string", "team_2": "string", "event_date": 0, "score": "string", "state": "string"}
Но когда консюмер читает сообщение почему то ключа со значением ''user_id'' нет:
{'_sa_instance_state': '<not serializable>', 'type': 'string', 'team_1': 'string', 'team_2': 'string', 'event_date': 0, 'score': 'string', 'state': 'string'}
В чем может быть ошибка и как ее исправить? подскажите пожалуйста