Airflow Worker не работает при использовании CeleryExecutor

Пытаюсь перейти с LocalExecutor на CeleryExecutor в Airflow 2.1.3 при помощи Docker. Очередь - redis. Сделал отдельные контейнеры для webserver, scheduler, worker, redis и БД. Проблема: задачи становятся в очередь, но не выполняются.

docker-compose.yml:

version: "3.3"
 
services:
  redis:
    image: redis:6.0.9-alpine
    healthcheck:
      test: ["CMD", "redis-cli", "ping"]
      interval: 5s
      timeout: 30s
      retries: 50
    restart: always
    ports:
      - "6793:6793"
  
  database:
    image: postgres:12-alpine
    restart: always
    environment:
      POSTGRES_DB: airflow
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
    volumes:
      - airflow_database:/var/lib/postgresql/data/
 
  webserver:
    image: airflow:latest
    restart: always
    depends_on:
      - database
      - redis
    environment:
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@database/airflow
      GUNICORN_CMD_ARGS: --log-level WARNING
      EXECUTOR: Celery
    volumes:
      - airflow_logs:/var/log/airflow
      - airflow_data:/var/spool/airflow
      - ./airflow/dags:/usr/local/airflow/dags
      - ./airflow/plugins:/usr/local/airflow/plugins
    ports:
      - 8080:8080
      - 8888:8888
    command: webserver
    healthcheck:
      test: ["CMD-SHELL", "[ -f /usr/local/airflow/airflow-webserver.pid ]"]
      interval: 30s
      timeout: 30s
      retries: 3
  
  flower:
    image: airflow:latest
    restart: always
    depends_on:
      - redis
    environment:
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@database/airflow
      EXECUTOR: Celery
    ports:
      - "5555:5555"
    command: celery flower -b "redis://redis:6379/1"
  
  scheduler:
    image: airflow:latest
    restart: always
    depends_on:
      - webserver
    volumes:
      - airflow_logs:/var/log/airflow
      - airflow_data:/var/spool/airflow
      - ./airflow/dags:/usr/local/airflow/dags
      - ./airflow/plugins:/usr/local/airflow/plugins
    environment:
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@database/airflow
      EXECUTOR: Celery
    command: scheduler
  
  worker:
    image: airflow:latest
    restart: always
    depends_on:
      - scheduler
    volumes:
      - airflow_logs:/var/log/airflow
      - airflow_data:/var/spool/airflow
      - ./airflow/dags:/usr/local/airflow/dags
      - ./airflow/plugins:/usr/local/airflow/plugins
    environment:
      AIRFLOW__CORE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@database/airflow
      EXECUTOR: Celery
    healthcheck:
      test:
        - "CMD-SHELL"
        - 'celery --app airflow.executors.celery_executor.app inspect ping -d "celery@$${HOSTNAME}"'
      interval: 10s
      timeout: 10s
      retries: 5
    command: celery worker -b "redis://redis:6379/1" --result-backend "db+postgresql://airflow:airflow@database/airflow"

volumes:
  airflow_database:
  airflow_data:
  airflow_logs:
  staging_database:

entrypoint.sh

#!/bin/sh

: "${AIRFLOW_HOME:="/usr/local/airflow"}"
: "${AIRFLOW__CORE__EXECUTOR:=${EXECUTOR:-Celery}Executor}"

export \
  AIRFLOW_HOME \
  AIRFLOW__CORE__EXECUTOR \
  AIRFLOW__CORE__SQL_ALCHEMY_CONN

DATABASE=$(echo "$AIRFLOW__CORE__SQL_ALCHEMY_CONN" | cut -d '/' -f 3 | cut -d '@' -f 2)
while ! nc -z $DATABASE 5432; do
  sleep 3
done

EMPTY=$(python -c "\
import sqlalchemy as sa;\
engine = sa.create_engine('$AIRFLOW__CORE__SQL_ALCHEMY_CONN');\
tables = sa.inspect(engine).get_table_names();
if tables == []: print('empty')\
")

case "$1" in
  webserver)
    if [ $EMPTY ]; then
      echo "Init database"
      airflow db init
      airflow users create --username admin --firstname admin --lastname admin --role Admin --email [email protected] --password admin
    fi
    exec airflow webserver
    ;;
  worker|scheduler)
    sleep 10
    exec airflow "$@"
    ;;
  flower)
    sleep 10
    exec airflow "$@"
    ;;
  version)
    exec airflow "$@"
    ;;
  *)
    exec "$@"
    ;;
esac

python ./plugins/utils/converter.py

Dockerfile и airflow.cfg

Все контейнеры запускаются нормально. Пробовал сделать celery_result_backend == broker_url == 'redis://redis:6379/1' (как вот здесь), но безрезультатно. Flower показывает, что сам worker создан, но контейнер worker не показывает ни одной строки логов. Пробовал также запустить контейнер worker отдельно - не помогло.Flower


Ответы (0 шт):