Ошибка: "Unexpected EOF while reading bytes"

Пытаюсь через Airflow произвести манипуляцию c данными в ClickHouse. В ДАГ импортируется кастомный оператор для КликХауса, который, в свою очередь, импортирует Hook и пытается его execute'нуть, что сделать не удаётся и возвращается ошибка:

[2024-10-17, 13:58:20 UTC] {clickhouse.py:79} INFO - TRUNCATE TABLE
reporting.table [2024-10-17, 13:58:50 UTC] {taskinstance.py:2905}
ERROR - Task failed with exception  [2024-10-17, 13:58:50 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 18068 for task clickhouse_issues (Unexpected EOF while reading bytes; 3238).

В Интернете находил информацию, что могут помочь параметры connection_timeout и send_receive_timeout. Поскольку в имеющемся Хуке этих параметров нет, а доступа к исходному коду этого Хука у меня нет, я написал свой, где и добавил эти два параметра — безрезультатно.

Находил использование sleep — то же самое.

Собственно, сам Таск в Даге выглядит так:

load_media_in_clickhouse = ClickHouseOperator(
        task_id='clickhouse_issues',
        clickhouse_conn_id='clickhouse',
        db='reporting',
        pre_execute_sql='TRUNCATE TABLE reporting.issues',
        sql='INSERT INTO reporting.issues SELECT * FROM _postgres_khd.issues'
)

А сам ClickHouseOperator выглядит так:

class ClickHouseOperator(BaseOperator):
    @apply_defaults
    def __init__(self, clickhouse_conn_id: str, sql: str, db: str, pre_execute_sql=None, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.clickhouse_conn_id = clickhouse_conn_id
        self.pre_execute_sql = pre_execute_sql
        self.db = db
        self.sql = sql

    def pre_execute(self, context: Any):
        if self.pre_execute_sql:
            hook = ClickHouseHook(clickhouse_conn_id=self.clickhouse_conn_id, database=self.db)
            hook.execute(self.pre_execute_sql)

    def execute(self, context):
        hook = ClickHouseHook(clickhouse_conn_id=self.clickhouse_conn_id, database=self.db)

        self.log.info(f"Executing query: {self.sql}")
        hook.execute(self.sql)

Порты: использовал 8123, 8124, 9000.

Первый возвращает ошибку:

(Unexpected EOF while reading bytes; 3238)

второй и третий:

connection refused.

Если подключаться напрямую к базе (например, через Dbeaver), то порт — 8123.

UPD: в Хуке есть 2 метода: метод get_conn (где создаётся сущность Client) и метод execute, которые тянутся из библиотеки clickhouse_driver. Проверял по логам — коннект создаётся, а ошибка возникает на Execute.


Ответы (0 шт):