Не получается записать и прочитать данные в kafka через airflow

Хотел написать даг который сперва пишет данные, а затем вычитывает их из Airflow. В результате даг выполняется успешно, но данные не записываются и не читаются.

Вот код дага:

from datetime import datetime, timedelta
from confluent_kafka import Producer
from confluent_kafka import Consumer

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.apache.kafka.operators.produce import ProduceToTopicOperator
from airflow.providers.apache.kafka.operators.consume import ConsumeFromTopicOperator
from airflow.providers.apache.kafka.sensors.kafka import AwaitMessageSensor
import logging
import json

default_args = {
    'owner' : 'observersilent',
    'retries' : 1,
    'retry_delay' : timedelta(minutes=5)
}


config = {
    'bootstrap.servers': 'localhost:9091',
    'group.id': 'mygroup',
    'auto.offset.reset': 'earliest'
}

topic = 'test'

def _produce_function():
  producer = Producer(config)
  producer.produce(topic, key="test_key", value = 'test_value')
  producer.flush()
 

def _consume_function():
  consumer = Consumer(config)
  topics = [topic]
  consumer.subscribe(topics)

  while True:
    msg = consumer.poll(10.0) 
    
    if msg is None:
      break
    if msg.error():
      print(f"Consumer error: {msg.error()}")
      continue

    value = msg.value()
    print(f"Received message: {value}")

  consumer.close()


with DAG(
    default_args=default_args,
    dag_id='dag_with_kafka_operator',
    description='Our first dag using kafka operator',
    start_date=datetime(2021,10,6),
    schedule_interval='@once'
) as dag:
    task1 = PythonOperator(task_id = "produce_function",
                               python_callable = _produce_function)
    
    task2 = PythonOperator(task_id = "consume_function",
                   python_callable = _consume_function)
    
    
    task1 >> task2

Что я делаю не так? В логе пишет что все успешно, но вывода нету


Ответы (0 шт):