Как кастомизировать Airflow SqlSensor, чтобы он отправил письмо если сенсор выполняется более 1 часа?

То есть надо добавить параметр: timedelta и python_callable для EmailOperator'а (отправку письма), если сенсор ждёт больше таймдельты.

https://airflow.apache.org/docs/apache-airflow/1.10.12/_modules/airflow/sensors/sql_sensor.html#SqlSensor


Ответы (1 шт):

Автор решения: asanisimov

Вы действительно можете пойти этим путем и переопределить его.

Добавляете необходимые атрибуты (timedelta_ и python_callable) и переопределяете исходный метод execute. Вам необходимо операться на started_at.

def execute(self, context):
        started_at = timezone.utcnow()
        if self.reschedule:
            # If reschedule, use first start date of current try
            task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
            if task_reschedules:
                started_at = task_reschedules[0].start_date
        while not self.poke(context):
            if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                # If sensor is in soft fail mode but will be retried then
                # give it a chance and fail with timeout.
                # This gives the ability to set up non-blocking AND soft-fail sensors.
                if self.soft_fail and not context['ti'].is_eligible_to_retry():
                    self._do_skip_downstream_tasks(context)
                    raise AirflowSkipException('Snap. Time is OUT.')
                else:
                    raise AirflowSensorTimeout('Snap. Time is OUT.')
            if self.reschedule:
                reschedule_date = timezone.utcnow() + timedelta(
                    seconds=self.poke_interval)
                raise AirflowRescheduleException(reschedule_date)
            else:
                sleep(self.poke_interval)
        self.log.info("Success criteria met. Exiting.")

При этом есть более логичный способ решить задачу (на мой взгляд). Просто выставить timeout для сенсора. Тогда он будет падать с ошибкой при превышении таймаута.

source code:

if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
                # If sensor is in soft fail mode but will be retried then
                # give it a chance and fail with timeout.
                # This gives the ability to set up non-blocking AND soft-fail sensors.
                if self.soft_fail and not context['ti'].is_eligible_to_retry():
                    self._do_skip_downstream_tasks(context)
                    raise AirflowSkipException('Snap. Time is OUT.')
                else:
                    raise AirflowSensorTimeout('Snap. Time is OUT.')

А далее унаследовать задачу с EmailOperator которая бы запускалась в случае ошибки в родительской задаче.

→ Ссылка