Не получается записать и прочитать данные в kafka через airflow
Хотел написать даг который сперва пишет данные, а затем вычитывает их из Airflow. В результате даг выполняется успешно, но данные не записываются и не читаются.
Вот код дага:
from datetime import datetime, timedelta
from confluent_kafka import Producer
from confluent_kafka import Consumer
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
import logging
import json
default_args = {
'owner' : 'observersilent',
'retries' : 1,
'retry_delay' : timedelta(minutes=5)
}
config = {
'bootstrap.servers': 'localhost:9091',
'group.id': 'mygroup',
'auto.offset.reset': 'earliest'
}
topic = 'test'
def _produce_function():
producer = Producer(config)
producer.produce(topic, key="test_key", value = 'test_value')
producer.flush()
def _consume_function():
consumer = Consumer(config)
topics = [topic]
consumer.subscribe(topics)
while True:
msg = consumer.poll(10.0)
if msg is None:
break
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
value = msg.value()
print(f"Received message: {value}")
consumer.close()
with DAG(
default_args=default_args,
dag_id='dag_with_kafka_operator',
description='Our first dag using kafka operator',
start_date=datetime(2021,10,6),
schedule_interval='@once'
) as dag:
task1 = PythonOperator(task_id = "produce_function",
python_callable = _produce_function)
task2 = PythonOperator(task_id = "consume_function",
python_callable = _consume_function)
task1 >> task2
Что я делаю не так? В логе пишет что все успешно, но вывода нету