Как кастомизировать Airflow SqlSensor, чтобы он отправил письмо если сенсор выполняется более 1 часа?
То есть надо добавить параметр: timedelta и python_callable для EmailOperator'а (отправку письма), если сенсор ждёт больше таймдельты.
Ответы (1 шт):
Автор решения: asanisimov
→ Ссылка
Вы действительно можете пойти этим путем и переопределить его.
Добавляете необходимые атрибуты (timedelta_ и python_callable) и переопределяете исходный метод execute. Вам необходимо операться на started_at.
def execute(self, context):
started_at = timezone.utcnow()
if self.reschedule:
# If reschedule, use first start date of current try
task_reschedules = TaskReschedule.find_for_task_instance(context['ti'])
if task_reschedules:
started_at = task_reschedules[0].start_date
while not self.poke(context):
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
# If sensor is in soft fail mode but will be retried then
# give it a chance and fail with timeout.
# This gives the ability to set up non-blocking AND soft-fail sensors.
if self.soft_fail and not context['ti'].is_eligible_to_retry():
self._do_skip_downstream_tasks(context)
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
if self.reschedule:
reschedule_date = timezone.utcnow() + timedelta(
seconds=self.poke_interval)
raise AirflowRescheduleException(reschedule_date)
else:
sleep(self.poke_interval)
self.log.info("Success criteria met. Exiting.")
При этом есть более логичный способ решить задачу (на мой взгляд). Просто выставить timeout для сенсора. Тогда он будет падать с ошибкой при превышении таймаута.
source code:
if (timezone.utcnow() - started_at).total_seconds() > self.timeout:
# If sensor is in soft fail mode but will be retried then
# give it a chance and fail with timeout.
# This gives the ability to set up non-blocking AND soft-fail sensors.
if self.soft_fail and not context['ti'].is_eligible_to_retry():
self._do_skip_downstream_tasks(context)
raise AirflowSkipException('Snap. Time is OUT.')
else:
raise AirflowSensorTimeout('Snap. Time is OUT.')
А далее унаследовать задачу с EmailOperator которая бы запускалась в случае ошибки в родительской задаче.