Ошибка: "Unexpected EOF while reading bytes"
Пытаюсь через Airflow произвести манипуляцию c данными в ClickHouse. В ДАГ импортируется кастомный оператор для КликХауса, который, в свою очередь, импортирует Hook и пытается его execute'нуть, что сделать не удаётся и возвращается ошибка:
[2024-10-17, 13:58:20 UTC] {clickhouse.py:79} INFO - TRUNCATE TABLE
reporting.table [2024-10-17, 13:58:50 UTC] {taskinstance.py:2905}
ERROR - Task failed with exception [2024-10-17, 13:58:50 UTC] {standard_task_runner.py:110} ERROR - Failed to execute job 18068 for task clickhouse_issues (Unexpected EOF while reading bytes; 3238).
В Интернете находил информацию, что могут помочь параметры connection_timeout
и send_receive_timeout
. Поскольку в имеющемся Хуке этих параметров нет, а доступа к исходному коду этого Хука у меня нет, я написал свой, где и добавил эти два параметра — безрезультатно.
Находил использование sleep
— то же самое.
Собственно, сам Таск в Даге выглядит так:
load_media_in_clickhouse = ClickHouseOperator(
task_id='clickhouse_issues',
clickhouse_conn_id='clickhouse',
db='reporting',
pre_execute_sql='TRUNCATE TABLE reporting.issues',
sql='INSERT INTO reporting.issues SELECT * FROM _postgres_khd.issues'
)
А сам ClickHouseOperator
выглядит так:
class ClickHouseOperator(BaseOperator):
@apply_defaults
def __init__(self, clickhouse_conn_id: str, sql: str, db: str, pre_execute_sql=None, *args, **kwargs):
super().__init__(*args, **kwargs)
self.clickhouse_conn_id = clickhouse_conn_id
self.pre_execute_sql = pre_execute_sql
self.db = db
self.sql = sql
def pre_execute(self, context: Any):
if self.pre_execute_sql:
hook = ClickHouseHook(clickhouse_conn_id=self.clickhouse_conn_id, database=self.db)
hook.execute(self.pre_execute_sql)
def execute(self, context):
hook = ClickHouseHook(clickhouse_conn_id=self.clickhouse_conn_id, database=self.db)
self.log.info(f"Executing query: {self.sql}")
hook.execute(self.sql)
Порты: использовал 8123, 8124, 9000.
Первый возвращает ошибку:
(Unexpected EOF while reading bytes; 3238)
второй и третий:
connection refused.
Если подключаться напрямую к базе (например, через Dbeaver), то порт — 8123.
UPD: в Хуке есть 2 метода: метод get_conn
(где создаётся сущность Client
) и метод execute
, которые тянутся из библиотеки clickhouse_driver
. Проверял по логам — коннект создаётся, а ошибка возникает на Execute
.