Airflow Flashcards

1
Q

Что знаешь про архитектуру Airflow? Как устроена связь компонентов ? Как развернуто?

Теги: #okko, #x5

A

Архитектура Apache Airflow и взаимодействие компонентов

Apache Airflow — это платформа для оркестрации рабочих процессов, которая состоит из нескольких ключевых компонентов:

Metadata DB — центральное хранилище состояния DAG'ов, задач, их статусов и метаданных.
Scheduler — отвечает за чтение расписания DAG'ов и определение, какие задачи должны быть запущены и когда. Например, если задача обработки данных настроена на ежедневный запуск в 11:00, планировщик автоматически создаст ее экземпляр.
Webserver — веб-интерфейс для мониторинга и управления DAG'ами, позволяет запускать задачи вручную, отслеживать логи и настраивать параметры.
Trigger — управляет выполнением отложенных задач. В базовой конфигурации Airflow триггеры не используются, но их можно подключить при необходимости.
Worker(ы) — исполняют задачи, назначенные Scheduler’ом. Их количество и способ работы зависят от типа Executor'а:
    В LocalExecutor и SequentialExecutor worker'ы являются частью процесса.
    В CeleryExecutor они могут работать в отдельных процессах или даже на разных серверах.
Executor — определяет, как именно будут выполняться задачи. Airflow поддерживает несколько типов исполнителей, позволяя настраивать распределенное или локальное выполнение.

Как устроена связь между компонентами?

Scheduler взаимодействует с Metadata DB, определяя, какие задачи необходимо запустить.
Webserver получает данные от Metadata DB и отображает их в UI.
Executor забирает задачи из Scheduler и передает их Worker'ам.
Worker'ы исполняют задачи и записывают результат в Metadata DB.
Если используется CeleryExecutor, взаимодействие между Scheduler и Worker'ами происходит через брокер сообщений (например, Redis или RabbitMQ).

Как развернуть Airflow в продакшене?

Обычно в проде используется CeleryExecutor или KubernetesExecutor, что позволяет масштабировать выполнение задач. Компоненты развертываются следующим образом:

Metadata DB — PostgreSQL/MySQL, хранит состояние DAG'ов и задач.
Scheduler — отдельный сервис, который считывает расписание и запускает задачи.
Webserver — развертывается как отдельный сервис (обычно Gunicorn+Flask).
Worker'ы — выполняют задачи, работают в отдельных контейнерах/серверных процессах.
    В случае CeleryExecutor используются Redis/RabbitMQ как брокер сообщений.
    В KubernetesExecutor каждый worker создается как pod в K8s.
Executor — управляет распределением задач между worker'ами.
Triggerer (если используются отложенные задачи) — запускает их при срабатывании условий.
Logs — хранятся либо в локальной ФС worker'ов, либо в S3, GCS, HDFS.

Способы развертывания:

Docker Compose (для небольших кластеров)
Kubernetes (Helm Chart) — самый популярный вариант в проде
Systemd/Supervisor (если развертывание на bare-metal)
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Какие есть виды executors и как они работают?

Теги: #Ярослав #okko, #wildberries, #x5

A

В конфигурационном файле Airflow (airflow.cfg) или через переменные окружения в docker container/compose можно указать, какой executor использовать для планирования и выполнения задач.

  1. SequentialExecutor
    * Суть: Запускает задачи строго по очереди (по одной за раз).
    SequentialExecutor – Как он работает?

1️⃣ Scheduler берет задачу из DAG и исполняет её последовательно.
2️⃣ Одна задача за раз – следующая задача выполняется только после завершения предыдущей.
3️⃣ Все метаданные записываются в PostgreSQL/MySQL.

  • Плюсы:
    o Проще всего настроить.
    o Идеален для отладки и тестирования.
  • Минусы:
    o Нет параллелизма → низкая производительность.
  • Когда использовать:
    o Для локальной разработки и небольших проектов без большого числа задач.
    ________________________________________
    2. LocalExecutor
  • Суть: Параллельное выполнение на одной машине (через потоки или процессы).
  • Как работает LocalExecutor?
    1️⃣ Scheduler анализирует DAG и запускает задачи.
    2️⃣ Для выполнения задач используется локальный пул процессов (multiprocessing Pool).
    3️⃣ Процессы выполняются параллельно, используя ресурсы одного сервера.
    4️⃣ Результаты выполнения записываются в базу данных Airflow.

📌 LocalExecutor отлично подходит для запуска DAG’ов на одном сервере без сложных настроек.
* Плюсы:
o Даёт параллелизм без дополнительных сервисов.
o Простая настройка (нужен только один сервер).
* Минусы:
o Упирается в ресурсы одной машины.
* Когда использовать:
o Средние проекты, где один сервер достаточен по ресурсам.
________________________________________
3. CeleryExecutor
* Суть: Celery Executor — это механизм выполнения задач в Apache Airflow, который позволяет распределять выполнение задач DAG по нескольким воркерам. Это делает его хорошим выбором для масштабируемых и отказоустойчивых систем.

  • Как это работает?o Scheduler добавляет задачи в очередь сообщений (например, Redis или RabbitMQ).
    o Celery Worker’ы подписаны на эту очередь и периодически запрашивают новые задачи.
    o Когда воркер получает задачу, он исполняет её и возвращает результат.
    o Каждая задача выполняется локально на воркере, но воркеров может быть несколько и на разных серверах.
    o Метаданные о статусе задачи хранятся в базе данных Airflow.
    o Scheduler отслеживает состояние задач и триггерит зависимости между ними.
  • Основные компоненты:
    o Scheduler – распределяет задачи по очереди сообщений.
    o Broker (Redis/RabbitMQ) – передаёт задачи воркерам.
    o Workers – исполняют задачи.
    o Metadata Database (PostgreSQL/MySQL) – хранит информацию о DAG’ах, статусах задач.
    o Web Server – UI для мониторинга.
  • Когда использовать:
    o Если нужно масштабируемое распределение нагрузки.
    o Если задачи требуют разных вычислительных мощностей.
    o Если хочется автоматически перераспределять упавшие задачи
    ________________________________________
    4. KubernetesExecutor
  • KubernetesExecutor — это механизм выполнения задач в Apache Airflow, который запускает каждую задачу в отдельном Pod’е в Kubernetes. Это делает его самым масштабируемым и гибким вариантом среди всех Airflow Executors.
  • Как это работает?
    1️⃣ Scheduler создаёт задачу и отправляет её в API Kubernetes.
    2️⃣ Kubernetes создаёт новый Pod, в котором запускается нужный оператор (например, PythonOperator).
    3️⃣ Задача выполняется внутри Pod’а.
    4️⃣ По завершении задачи Pod автоматически удаляется.
    5️⃣ Scheduler получает статус выполнения и триггерит зависимости.

🔥 Главное отличие от CeleryExecutor — нет долгоживущих воркеров! Каждая задача исполняется в новом Pod’е.
* Основные компоненты
o Scheduler – отправляет задачи в Kubernetes API.
o Kubernetes API – создаёт и управляет Pod’ами.
o Worker Pod’ы – выполняют задачи DAG.
o Metadata Database (PostgreSQL/MySQL) – хранит информацию о DAG’ах, статусах задач.
o Web Server – UI для мониторинга.
* Когда использовать:
o Когда нужно горизонтальное масштабирование (лучше, чем CeleryExecutor).
o Когда задачи требуют разного объёма ресурсов (можно задать CPU/RAM на уровне Pod’а).
o Когда нужна высокая отказоустойчивость.
o Когда Airflow уже работает в Kubernetes (иначе проще использовать CeleryExecutor).
________________________________________
5. DaskExecutor
* Суть: DaskExecutor — это механизм выполнения задач в Apache Airflow, который использует кластер Dask для распределённого выполнения задач.

Этот Executor полезен, если у тебя уже есть кластер Dask и ты хочешь использовать его возможности для выполнения DAG’ов.
* Как это работает?
1️⃣ Scheduler отправляет задачу в кластер Dask.
2️⃣ Dask запускает Task на одном из доступных воркеров.
3️⃣ После выполнения результата возвращаются в Airflow.
4️⃣ Метаданные хранятся в БД Airflow.

🔥 В отличие от CeleryExecutor, DaskExecutor не требует очереди сообщений (Redis/RabbitMQ). Он напрямую отправляет задачи в кластер Dask.

  • Основные компоненты
    o Scheduler – отправляет задачи в кластер Dask.
    o Dask Cluster – выполняет задачи DAG’ов.
    o Dask Scheduler – управляет распределением задач по воркерам.
    o Dask Workers – исполняют задачи DAG’ов.
    o Metadata Database (PostgreSQL/MySQL) – хранит информацию о DAG’ах.
    o Web Server – UI для мониторинга DAG’ов.
  • Когда использовать:
    o Если у тебя уже есть кластер Dask.
    o Если задачи DAG’ов требуют многочисленных распределённых вычислений.
    o Если нужно гибкое управление ресурсами (CPU/GPU).
    o Если ты не хочешь использовать CeleryExecutor с Redis/RabbitMQ.
    ________________________________________
    6. DebugExecutor
  • Суть: DebugExecutor — это самый простой и удобный способ отлаживать DAG’и в локальном окружении без запуска Scheduler’а, Worker’ов или БД.

🔥 Главное отличие от других Executors:

Задачи выполняются синхронно и последовательно, прямо в текущем процессе (без фоновых процессов, очередей или Kubernetes).
Результаты сразу выводятся в консоль. *	 Как это работает? 1️⃣ DAG запускается как обычный Python-скрипт. 2️⃣ Scheduler не требуется – задачи исполняются сразу. 3️⃣ БД не нужна – нет записи метаданных. 4️⃣ Результаты сразу видны в консоли (stdout). 🔥 В отличие от LocalExecutor, DebugExecutor не использует многопоточность и выполняет задачи последовательно!
  • Основные компоненты:
    o DAG – исполняется как обычный Python-скрипт.
    o DebugExecutor – запускает задачи в текущем процессе.
    o Логирование – сразу видно в консоли.
    o Нет воркеров, брокеров сообщений или Scheduler’а.
  • Минусы:
    o Нет параллелизма.
    o Не предназначен для продакшна.
  • Когда использовать:
    o Локальная отладка и проверка логики DAG’ов.
    ________________________________________
    7. CeleryKubernetesExecutor
  • Суть: CeleryKubernetesExecutor — это гибридный Executor, который объединяет возможности CeleryExecutor и KubernetesExecutor. Он позволяет гибко распределять задачи DAG’ов:Стандартные задачи исполняются через CeleryExecutor (на постоянных воркерах).
    Ресурсоёмкие задачи запускаются в отдельных Kubernetes Pod’ах (как в KubernetesExecutor).

🔥 Главное преимущество — возможность гибкого выбора способа выполнения для каждой задачи DAG’а.
* Как это работает?
1️⃣ Scheduler анализирует DAG и определяет, как выполнять задачи:

Обычные задачи → Celery (в воркерах Celery).
Тяжёлые задачи → Kubernetes (новый Pod в Kubernetes).

2️⃣ Celery Worker’ы берут задачи из Redis/RabbitMQ и выполняют их.
3️⃣ Kubernetes Pod’ы создаются и уничтожаются для выполнения отдельных задач.
4️⃣ Метаданные (статус задач) хранятся в базе Airflow.

🔥 Celery + Kubernetes = гибкость и масштабируемость
* Основные компоненты
o Scheduler – распределяет задачи между Celery и Kubernetes.
o Broker (Redis/RabbitMQ) – очередь сообщений для Celery.
o Celery Worker’ы – обрабатывают задачи, которые не требуют выделенных ресурсов.
o Kubernetes Cluster – создаёт Pod’ы для задач с особыми требованиями.
o Metadata Database – хранит состояние DAG’ов и задач.
o Web Server – UI для мониторинга.

  • Когда использовать:
    ✅ Если у тебя есть Celery, но иногда нужны временные ресурсы Kubernetes.
    ✅ Если хочешь минимизировать использование Pod’ов (они создаются только для тяжёлых задач).
    ✅ Если нужно комбинировать стабильность Celery с масштабируемостью Kubernetes.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Какие компоненты нужно указать при переходе Airflow из локальной среды в прод? Какие конфигурации необходимо настроить?

Теги: #Cian

A

Итог

При переходе Airflow из локальной среды в прод необходимо:

o Переключиться на полноценную БД (Postgres/MySQL).
o Выбрать Executor, чаще всего Celery или Kubernetes (с настройкой брокера и бэкенда).
o Подготовить окружение для webserver, настроить аутентификацию/авторизацию.
o Настроить scheduler (планировщик) на стабильную работу с нужным параллелизмом.
o Решить вопрос с хранением логов (локально, S3, GCS и т.д.).
o Настроить и защитить Connections и Variables (секреты).
o Организовать безопасность — аутентификация (RBAC, LDAP), HTTPS, доступ по VPN.
o Обеспечить масштабирование и мониторинг (проверять ресурсы, метрики).

  1. Обёртка (Docker/Kubernetes или «голое» развёртывание)В продакшне часто используют Docker или Kubernetes для упаковки Airflow.
    Если используете Helm Chart для Airflow, нужно настроить:
    Переменные окружения (env) для указания подключения к БД, брокеру и т.д.
    Секреты (Secrets) для хранения логинов/паролей.
    Объёмы (Volumes) для шаринга логов, плагинов, файлов DAG-ов.
    Если устанавливается на «голый» сервер, то используют systemd/Supervisor для автозапуска процессов (scheduler, webserver, worker).

1 Тип экзекьютора (Executor)

LocalExecutor или SequentialExecutor — используются в тестовых/локальных средах.
CeleryExecutor — чаще всего для продакшна, так как поддерживает горизонтальное масштабирование, распределение задач между воркерами. Потребуется настроить брокер сообщений (RabbitMQ или Redis) и бэкенд (результаты выполнения задач часто хранятся в той же базе, что и метаданные, либо в Redis).
KubernetesExecutor — если Airflow развёртывается в Kubernetes, и требуется динамическое создание подов для задач.

При переходе на CeleryExecutor или KubernetesExecutor нужно настроить соответствующее окружение и сервисы (брокер, очередь, база данных, и т.д.).

  1. Брокер и бекенд (для CeleryExecutor)

Если выбран CeleryExecutor, то обязательно требуется:

Broker (брокер сообщений): обычно RabbitMQ или Redis.
    Указать параметры подключения к брокеру в airflow.cfg (broker_url).
Result backend (хранилище результатов): может быть Redis или та же PostgreSQL/MySQL.
    Указать result_backend в airflow.cfg.
  1. База данных (Metadata DB)

В локальной среде часто используется SQLite, но в продакшне необходимо использовать полноценную реляционную БД, например:

PostgreSQL
MySQL

Основные моменты:

Указать корректный sql_alchemy_conn в airflow.cfg (или в переменных окружения).
Убедиться, что Airflow имеет права на чтение/запись в эту БД.
Настроить регулярную процедуру бэкапа.
  1. Настройки веб-сервера Airflow (Webserver)Авторизация и аутентификация
    Переключение с открытой анонимной аутентификации на роль-based (RBAC) или другие варианты (LDAP/OAuth/SAML).
    Настройка учетных записей, ролей (Admin, User, Viewer и т.д.) и т.п.
    Настройка порта и хостинга
    Как будет доступен Airflow UI – за прокси (Nginx, Traefik), либо напрямую.
    SSL/HTTPS (через прокси-сервер или «комбинированно»).
    Gunicorn-воркеры, количество процессов, таймауты.
    1. Scheduler (Планировщик)

В продакшне:

Для эффективного управления ресурсами Airflow необходимо правильно настраивать ключевые параметры планировщика (scheduler), которые влияют на обработку DAG-ов и задач:

max_threads — максимальное количество потоков, которые использует Scheduler для параллельной обработки DAG-файлов. По умолчанию, планировщик сканирует папку с DAG-ами и загружает их в память, что может быть ресурсоёмкой операцией. Если DAG-ов много, увеличение max_threads позволит ускорить их обработку. Однако слишком высокое значение может привести к перегрузке процессора, поэтому его следует подбирать с учетом доступных ресурсов.

min_file_process_interval — минимальный интервал в секундах между повторными обходами DAG-файлов. Если, например, установить min_file_process_interval=30, то планировщик не будет проверять файлы DAG-ов чаще, чем раз в 30 секунд. Это позволяет снизить нагрузку на файловую систему, особенно если DAG-ов много и они часто обновляются.

Запуск планировщика как сервиса

Для стабильной работы Airflow важно следить, чтобы Scheduler запускался как системный сервис. Это гарантирует его автоматический перезапуск в случае сбоя и удобное управление процессом. В зависимости от окружения используются разные методы:

systemd — стандартный способ управления сервисами в Linux. Позволяет настроить автоматический рестарт планировщика в случае падения.
supervisord — инструмент мониторинга и управления процессами, который также может автоматически перезапускать Airflow Scheduler.
Docker/Kubernetes — при использовании контейнеризированного окружения Airflow можно настроить Kubernetes Pod или Docker контейнер с политикой перезапуска (restart policy). Например, в Kubernetes можно использовать restartPolicy: Always, чтобы перезапуск происходил автоматически при падении пода.
  1. ЛогированиеПути хранения логов
    В продакшне логи обычно хранят отдельно (S3, GCS, NFS, локальная папка на лог-сервере).
    В airflow.cfg прописать remote_logging и параметры удалённого хранилища, если нужно.
    Уровень логирования (DEBUG/INFO/WARN).
  2. Настройка Connections и VariablesВсе внешние подключение (Connections: БД, API, S3, Hadoop-кластеры и т.п.) переводятся из локальных URI в прод-окружения.
    Хранение чувствительных данных:
    Если необходимо, подключать secret-backend (Vault, AWS Secret Manager, GCP Secret Manager).
    Убедиться, что значения паролей не «зашиты» в простых текститовых airflow.cfg и т.п.
     9. Масштабирование
    При увеличении количества DAG-ов и задач в Apache Airflow важно контролировать ключевые параметры, определяющие уровень параллелизма и управление ресурсами. Параметры parallelism, dag_concurrency и worker_concurrency настраиваются в файле конфигурации airflow.cfg:parallelism — глобальный уровень параллелизма в Airflow. Этот параметр задаёт максимальное количество задач, которые могут выполняться одновременно во всей системе (во всех DAG-ах). Например, если parallelism=32, то в любой момент времени не более 32 задач могут находиться в активном состоянии (running), независимо от того, в каких DAG-ах они находятся.dag_concurrency — ограничение на число задач, которые могут выполняться одновременно в рамках одного DAG-а. Если, например, dag_concurrency=8, это означает, что даже если DAG содержит больше задач, Airflow не запустит более 8 одновременно активных задач для этого DAG-а. Этот параметр важен, если есть DAG-и с большим количеством задач, чтобы они не потребляли слишком много ресурсов.worker_concurrency — количество процессов, которые может запускать один worker в CeleryExecutor. Если используется CeleryExecutor, то каждый worker может параллельно обрабатывать несколько задач, но их общее количество определяется worker_concurrency. Например, если параметр установлен в worker_concurrency=16, то один worker сможет одновременно обрабатывать до 16 задач. Это значение должно быть согласовано с ресурсами хоста (CPU, RAM), чтобы избежать перегрузки.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Как можно передавать данные между тасками в Airflow ? Какие ограничения существуют? Почему не рекомендуется передавать большие объекты (например, DataFrame) через XCom?
Теги: #Cian, #Иннотех, #Rubbles, #x5 #Ярослав

A
  • XCom (Cross-Communication) — механизм, позволяющий таскам обмениваться небольшими данными (например, статусом, путями к файлам, ключами и т.п.).
  • Под капотом данные хранятся в базе Airflow, поэтому есть ограничения по объёму.
  • Рекомендуется передавать только “легковесные” объекты (строки, числа, ссылки).
  • Можно также хранить данные во внешних хранилищах и передавать ссылки (при больших объёмах).

Производится через некоторый внешний сервис. Или нужно сохранить в файл (локально на ВМ/контейнер airflow, S3, hdfs и тд) и в следующем шаге оттуда считать. Или сложить в базу в промежуточную схему и потом читать оттуда.

Ещё раз полезно напомнить, что Airflow это оркестратор, который отправляет задачи внешним системам и следит за их исполнением, а не ETL инструмент.

Как передавать данные через XCom?

Запись данных в XCom
    Используется task_instance.xcom_push внутри Python-оператора.
    Пример: from airflow.decorators import task

@task
def push_xcom():
return “Hello from XCom”

Чтение данных из XCom

Используется task_instance.xcom_pull.
Пример:
	
@task def pull_xcom(ti):
value = ti.xcom_pull(task_ids='push_xcom')
print(f"Received: {value}")

Автоматическая передача (XComArg)

В Airflow 2+ можно передавать результат одной задачи в другую без xcom_pull.
Пример:

pushed_value = push_xcom()
pull_xcom(pushed_value)

TaskFlow API (новый стиль написания DAG’ов в Airflow 2+) — даёт возможность возвращать объекты в декораторе @task и автоматически прокидывать их в другие таски. Но, по сути, данные всё равно сохраняются в XCom. Удобный способ определения последовательности выполнения тасок и передачи мета-данных между ними. Введён в 2.0.0 airflow. Использует декораторы для определения тасок (и дагов). Пример: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
Поясню, что здесь происходит.
После определения тасок указывается их запуск в таком порядке:
order_data = extract() # вызывается экстракт таск и результат передаётся в order_data переменную
order_summary = transform(order_data) # вызывается трансформ таск с параметром, туда передаётся результат выполнения предыдущей таски
load(order_summary[“total_order_value”]) # аналогично предыдущему шагу, но со значением словаря по ключу
По-старинке можно было бы задать порядок выполнения через
extract&raquo_space; transform&raquo_space; load, обрабатывая передачу параметров через XCOM вручную и описывая как минимум task_id.
Или через
extract.set_downstream(transform)
transform.set_downstream(load)
Также можно задавать условия выполнения через BranchPythonOperator и ShortCircuitOperator.

Почему не стоит передавать большие объекты (например, DataFrame) через XCom
Обычно база это постгрес, и в теории в одной ячейке таблицы может и гигабайт храниться, и больше, но это неоптимальный способ + база будет очень сильно раздуваться и замедляться из-за этого. Сам протокол спроектирован именно под мета-данные, какие-то параметры запуска таска, но не данные, над которыми нужно произвести обработку. Поэтому пандас датасеты не стоит передавать “как есть” между тасками.

  1. Перегружаете базу: большой объём данных в XCom = большая нагрузка на мета-базу Airflow.
  2. Проблемы с сериализацией: объекты сериализуются (pickling), что может приводить к ошибкам и замедлять работу.
  3. Плохая масштабируемость: при больших объёмах данных это сильно бьёт по производительности.
    Рекомендация:
    - Хранить большие объёмы данных во внешнем хранилище (S3, PostgreSQL, HDFS и т.д.), а через XCom/TaskFlow передавать только путь или ключ.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Как организовать алертинг об ошибках и падениях тасок/дагов Airflow (например, в Telegram)?

Теги: #wildberries, #x5

A

Через оператор TelegramOperator. Да, так просто.
Если впервые подключаешь новый сервис к взаимодействию с airflow, проверь, может есть уже написанный оператор для этого.

Как настроить алертинг в Airflow через Telegram

  • Создать бота в Telegram через @BotFather, получить TOKEN.
    • Найти chat_id с помощью API:
    • https://api.telegram.org/bot<TOKEN>/getUpdates</TOKEN>
  • Добавить соединение в Airflow (Admin → Connections):
    Conn Id: telegram_default
    Conn Type: HTTP
    Host: https://api.telegram.org/bot<TOKEN>/
    Extra: {"password": "<TOKEN>"}</TOKEN></TOKEN>
  • Установить пакет (если нет):pip install apache-airflow-providers-telegram
  • Добавить TelegramOperator в DAG для отправки сообщений.
  • ## Использовать on_failure_callback в таски, даги для автоматических алертов при сбоях.
  • Стандартные e-mail уведомления
  • В Airflow можно настроить e-mail алерты, указав SMTP-сервер и параметры в airflow.cfg.
  • В DAG или задачах можно прописать email_on_failure=True и email=[‘you@example.com’].
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Что такое параметр catchup в DAG Airflow и для чего он может быть полезен, кроме прогонки исторических данных?

Теги: #wildberries

A

catchup — параметр в Apache Airflow, который определяет, должны ли старые даты выполнения задач быть выполнены при первом запуске DAG. Когда catchup установлен в True, Apache Airflow автоматически запускает пропущенные даты выполнения задач после того, как DAG был включен в работу. Кроме исторического прогона, помогает обеспечить целостность данных, если DAG должен гарантированно отработать за все пропущенные периоды.

Когда catchup=True полезен?

Когда важно обработать всю историю данных.
Для DAG, работающих с инкрементальными загрузками из БД.
Если DAG загружает данные с метками времени, например, записи логов.

Когда catchup=True не полезен?

  1. Оптимизация нагрузки на систему

При наличии DAG, который выполняется часто (например, каждый час или чаще), большое количество пропущенных запусков может перегрузить систему, когда Airflow попытается их все обработать. Установка catchup=False предотвращает лавинообразное выполнение DAG.

Пример:
Вы включаете DAG, который выполняется раз в 5 минут, но он был отключен на 2 недели. Без catchup=False Airflow попытается выполнить 4032 запуска (12 запусков в час × 24 часа × 14 дней), что может привести к перегрузке.

  1. Исключение обработки устаревших данных

Если DAG загружает данные из внешних источников (например, API Wildberries), исторические данные могут быть уже недоступны или устаревшими.

Пример:
Вы загружаете остатки товаров с Wildberries API, но API предоставляет данные только за последние 7 дней. Если DAG был отключен на 2 недели, то запуски за более ранние даты просто не найдут данных.

  1. Исключение проблем с повторным запуском зависимых процессов

Если DAG управляет данными, которые изменяются в реальном времени (например, обновляет агрегированные отчеты в ClickHouse или Greenplum), прогон старых запусков может привести к:

дублированию данных,
пересозданию отчетов с устаревшими значениями,
логическим ошибкам при обработке временных меток.

Пример:
DAG обновляет витрину продаж Wildberries в ClickHouse. Если DAG был отключен на неделю и затем включен, пересоздание витрины за старые даты приведет к конфликту данных.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Какие макросы Airflow существуют и какие вы используете на практике?

Теги: #wildberries

A

В Apache Airflow макросы позволяют использовать динамические значения внутри DAG’ов, шаблонов и операторов (например, BashOperator, PythonOperator, EmailOperator и др.). Они особенно полезны для параметризации задач.

Примеры макросов:
* {{ ds }} / {{ ds_nodash }} Дата, за которую DAG обрабатывает данные (а не дата его запуска). Если DAG выполняется ежедневно, то ds – это вчерашняя дата.),
* {{ execution_date }} (дата/время запуска),
* {{ prev_ds }}, {{ next_ds }} (предыдущая/следующая дата),
* {{ dag_run.conf }} (конфигурация, переданная при триггере).
Чаще всего используют ds/ds_nodash для шаблонизации путей и имён файлов.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Сенсоры и Poke интервал. Что знаешь про это?

Теги: #Ярослав

A

Sensors — это специальные операторы в Airflow, которые ожидают наступления определённого события (например, появления файла, готовности таблицы или завершения другого DAG’а).

По умолчанию сенсоры работают в poke-режиме, при котором они полностью занимают worker slot (ресурс кластера, см. параллелизм), пока ждут событие.

Режимы работы сенсоров

Настроить режим сенсора можно в параметре mode, передаваемом в конструктор сенсора:

Poke (по умолчанию) — сенсор постоянно проверяет условие через poke_interval, занимая worker slot.
Reschedule — сенсор «отпускает» worker slot между проверками, что снижает нагрузку на Airflow. Вместо блокировки он просто перезапускается через poke_interval.
    Устанавливается через mode='reschedule'.
Smart Sensors — оптимизированная система сенсоров в Airflow 2.x, позволяющая сгруппировать несколько сенсоров в один процесс и уменьшить нагрузку на кластер. Smart Sensors тоже используют poke-логику, но делают это оптимизированно. Smart Sensors позволяют объединить несколько сенсоров в один процесс, что снижает нагрузку на scheduler и workers. Работает это так: вместо того чтобы каждый сенсор запускался отдельно, Airflow группирует их в один специальный smart-sensor-процесс, который выполняет проверки централизованно. 🔹 Как включить Smart Sensors

Включить их в конфигурационном файле Airflow (airflow.cfg):
	
	[smart_sensor] use_smart_sensor = true shard_code_upper_limit = 10000

	
	Настройка poke- и reschedule-режимов

Режим сенсора (mode) и интервал между проверками (poke_interval) указываются при его создании. Например:

from airflow.sensors.filesystem import FileSensor

file_sensor = FileSensor(
task_id=”wait_for_file”,
filepath=”/path/to/file”,
poke_interval=60, # Проверка каждую минуту
mode=”poke”, # Можно заменить на ‘reschedule’ для экономии ресурсов
)
Если в mode=’poke’, Airflow будет удерживать worker slot на время ожидания. Если mode=’reschedule’, сенсор будет временно отключаться и ждать следующего запуска.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Какие есть конекторы в Аирфлоу?

Теги: #neoflex

A

Коннекторы в Apache Airflow – это специальные модули, которые позволяют DAG’ам взаимодействовать с различными внешними системами, такими как базы данных, облачные сервисы, API и другие источники данных. Коннекторы реализуются через Hook и Operator.

Hook – низкоуровневый интерфейс для подключения к внешней системе.
Operator – высокоуровневый объект, использующий Hook для выполнения задач.

Пример:

PostgresHook – устанавливает соединение с PostgreSQL.
PostgresOperator – выполняет SQL-запрос в PostgreSQL через этот Hook.

Коннектор (Hook/Operator) Описание
PostgresHook, PostgresOperator Подключение к PostgreSQL
MySqlHook, MySqlOperator Подключение к MySQL
MsSqlHook, MsSqlOperator Подключение к Microsoft SQL Server
OracleHook, OracleOperator Подключение к Oracle
SnowflakeHook, SnowflakeOperator Работа с Snowflake
MongoHook, MongoOperator Работа с MongoDB
ClickHouseHook, ClickHouseOperator Работа с ClickHouse (через кастомные плагины)

HdfsHook, HdfsSensor Подключение к HDFS
HiveServer2Hook, HiveOperator Работа с Apache Hive
SparkSubmitHook, SparkSubmitOperator Запуск Spark-приложений
PrestoHook, PrestoToGCSOperator Работа с Presto
BigQueryHook, BigQueryOperator Подключение к Google BigQuery

пример
from airflow.providers.postgres.hooks.postgres import PostgresHook

def get_postgres_data():
pg_hook = PostgresHook(postgres_conn_id=’my_postgres_conn’)
connection = pg_hook.get_conn()
cursor = connection.cursor()
cursor.execute(“SELECT * FROM my_table LIMIT 10;”)
result = cursor.fetchall()
for row in result:
print(row)

get_postgres_data()

  • Как добавить новое подключение (Connection) в Airflow?
    Через Web UIПерейти в Admin → Connections.
    Нажать “+ Add Connection”.
    Заполнить поля:
    Conn Id: my_postgres_conn
    Conn Type: Postgres
    Host: localhost
    Schema: my_database
    Login: airflow_user
    Password: password
    Port: 5432
    Нажать Save.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

Как хранить секреты? +(connections)?

Теги: #Ярослав

A

Можно в переменных (Admin - Variables), они хранятся в зашифрованном виде (если в airflow.cfg настроен fernet key – фернет это тип симметричного шифрования, этого достаточно).

Но если секреты это параметры подключения (credentials, кредэншиалс) для баз данных и других источников, для этого идеально подходят коннекшены (Admin - Connections). Там задаются все нужные параметры, имя коннекта, и подключение производится по нему.

Через эти же коннекшены можно разделять контура дев-тест-прод: задаётся единое имя коннекта, а в разных инстансах airflow значения могут быть разными, и соответственно вести к разным сервисам. Конечно, ещё нужно чтобы была сетевая связность, т.е. сервер airflow мог обмениваться данными с сервером базы в принципе (подробнее https://t.me/rzv_de/92).

Также можно использовать отдельные сервисы, например hashicorp Vaul

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Зачем нужен самописный оператор и как работает?

Теги: #Ярослав

A

Нужен тогда, когда хочется переиспользовать какой-то пайтон код, а готового оператора нет. Например, для связи с внутренним сервисом компании или непопулярным внешним API. Для google ads точно есть оператор, для яндекс погоды - вряд ли. Наследуется от BaseOperator, должен определить метод execute.

Создание кастомного оператора

Создадим Python-класс PostgresToGCSOperator, который:

Выполняет SQL-запрос в PostgreSQL.
Сохраняет результат в локальный файл.
Загружает файл в GCS.
	
	from airflow.models import BaseOperator from airflow.hooks.postgres_hook import PostgresHook from airflow.providers.google.cloud.hooks.gcs import GCSHook import csv import os

class PostgresToGCSOperator(BaseOperator):
def __init__(self, postgres_conn_id, sql_query, gcs_bucket, gcs_path, gcs_conn_id=’google_cloud_default’, args, **kwargs):
super().__init__(
args, **kwargs)
self.postgres_conn_id = postgres_conn_id
self.sql_query = sql_query
self.gcs_bucket = gcs_bucket
self.gcs_path = gcs_path
self.gcs_conn_id = gcs_conn_id

def execute(self, context):
    self.log.info("Подключаемся к PostgreSQL")
    pg_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id)
    conn = pg_hook.get_conn()
    cursor = conn.cursor()
    
    self.log.info("Выполняем SQL-запрос")
    cursor.execute(self.sql_query)
    rows = cursor.fetchall()
    column_names = [desc[0] for desc in cursor.description]

    local_file = "/tmp/temp_data.csv"
    self.log.info(f"Сохраняем данные в файл {local_file}")
    with open(local_file, "w", newline="") as f:
        writer = csv.writer(f)
        writer.writerow(column_names)
        writer.writerows(rows)

    self.log.info(f"Загружаем файл в GCS: gs://{self.gcs_bucket}/{self.gcs_path}")
    gcs_hook = GCSHook(self.gcs_conn_id)
    gcs_hook.upload(bucket_name=self.gcs_bucket, object_name=self.gcs_path, filename=local_file)

    self.log.info("Удаляем временный файл")
    os.remove(local_file)

    self.log.info("Оператор успешно завершил работу")

Использование кастомного оператора

Теперь добавим его в DAG:

from airflow import DAG
from datetime import datetime
from custom_operators.postgres_to_gcs import PostgresToGCSOperator

default_args = {
“owner”: “airflow”,
“start_date”: datetime(2024, 3, 1),
“retries”: 1,
}

with DAG(“postgres_to_gcs_dag”, default_args=default_args, schedule_interval=”@daily”, catchup=False) as dag:

export_to_gcs = PostgresToGCSOperator(
    task_id="export_postgres_to_gcs",
    postgres_conn_id="my_postgres_conn",
    sql_query="SELECT * FROM my_table;",
    gcs_bucket="my_bucket",
    gcs_path="data/my_table.csv",
)

export_to_gcs
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Что влияет на параллелизм?

Теги: #Ярослав

A

Основные настройки:
* parallelism (глобальный максимум параллельных задач)
Это параметр на уровне всей установки Airflow (настройка в airflow.cfg). Он задает максимальное количество одновременно выполняемых задач (task instances) в рамках всей системы, независимо от количества DAG-ов, worker-ов и других ограничений.
Пример:
Если parallelism = 32, то Airflow одновременно запустит не более 32 задач, даже если DAG-ов много и в каждом из них есть готовые к выполнению задачи.,
* dag_concurrency (максимум одновременно работающих тасок в рамках одного DAG),
* worker_concurrency (сколько задач может одновременно обрабатывать один worker),
* pool (ограничение числа задач по ресурсам/пуллам)
Pools (пулы) в Airflow позволяют ограничивать выполнение задач по определенным ресурсным ограничениям. Например, если задачи обращаются к внешней базе данных или API с ограниченным числом соединений, можно создать pool и указать, сколько задач одновременно могут его использовать.

Пример:
Если создать pool с size=10, то не более 10 задач смогут одновременно использовать этот ресурс, даже если все остальные ограничения (parallelism, dag_concurrency, worker_concurrency) позволяют больше..

Достаточно сказать, что есть Pool’ы, которые ограничивают максимальное количество тасок, работающих параллельно. Стандартный пул содержит 128 слотов.
Их можно настроить через Admin - Pools
Для разных дагов и даже тасок можно задавать разные пулы. Для таски можно указать “вес” для приоритизации внутри пула. Это тонкая настройка для тех дагов, выполнение которых важнее, чем других.
Ну и, конечно, в самих дагах нужно таски выстраивать параллельно, где возможно. Оставляй только те зависимости, которые логически ограничивают таску от запуска, даже если будет выглядеть “некрасиво”. Полезно сказать и про таскгруппы, которые задают шаблон выполнения тасок и позволяют формировать их, например, в цикле.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Как реализовать свой кастомный оператор в Airflow? (Какой метод нужно определить?)

Теги: #x5

A

Чтобы реализовать свой кастомный оператор в Apache Airflow, тебе нужно понимать архитектуру Airflow Operators, какую базу они наследуют, и какие методы и параметры нужно определять, чтобы всё корректно работало внутри DAG.

Вот пошаговое и полное объяснение без самого кода, как ты просил:

🔧 1. Наследование: от какого класса

Любой кастомный оператор в Airflow наследуется от базового класса:

airflow.models.BaseOperator

или от специализированного:

airflow.operators.BaseOperator

Или ты можешь унаследовать от существующего оператора (например, PythonOperator, если ты хочешь просто добавить логику к уже существующему поведению).

📌 2. Какие методы нужно определить

Обязательный метод:

  • def execute(self, context):
    • Это основной метод, который вызывается при выполнении задачи в DAG.
    • Аргумент context — это словарь со всеми переменными окружения Airflow (execution date, task instance, dag, ds, ts и т.п.)
    • Всё, что оператор должен делать — описывается в execute.

(Опционально):

  • \_\_init\_\_(...)
    • Здесь ты задаёшь параметры, которые можно будет передавать при объявлении оператора в DAG.
    • Не забудь вызвать super().\_\_init\_\_(...), передав в него task_id, и другие параметры, поддерживаемые базовым оператором (например, retries, depends_on_past и т.д.).

🧱 3. Где сохранять кастомный оператор

a) Локально в проекте DAG’ов:

  • Создай структуру:
    dags/
      ├── my_operators/
      │   └── my_custom_operator.py
      ├── my_dag_using_operator.py
  • Обязательно добавь \_\_init\_\_.py в каждую папку, если используешь относительный импорт (или хочешь чтобы это была полноценная Python-модель).

b) Airflow Plugins (устаревший способ, но иногда нужен):

  • Регистрируй оператор через систему плагинов, но это избыточно, если не используешь внешние UI-компоненты или хуки.

📦 4. Как импортировать и использовать в DAG

В DAG-файле:

  • Импортируй кастомный оператор как обычный Python-класс.
  • Используй его как обычный оператор, передавая task_id и свои параметры.

```python
from my_operators.my_custom_operator import MyCustomOperator

task = MyCustomOperator(
task_id=’do_something_custom’,
my_param=’value’
)
~~~

🧠 5. Что можно использовать в кастомном операторе

Внутри execute(context) ты можешь:

  • Получать execution date и другие параметры из context.
  • Работать с Airflow hook’ами (например, PostgresHook, HttpHook).
  • Вызывать внешние API, скрипты, команды, и т.д.
  • Возвращать значения (они сохраняются в XCom по умолчанию).

🧪 6. Как тестировать

  • Можешь использовать airflow tasks test <dag_id> <task_id> <execution_date> для запуска конкретной задачи с твоим оператором.
  • Логика в execute будет вызвана в том же процессе.

🏷️ 7. Тег #x5 — как сохранять и документировать

Если у тебя в команде приняты специальные теги (например, #x5), разумно:

  • Добавить комментарий с таким тегом внутри \_\_init\_\_ и execute.
  • Придерживаться Docstring-стиля Google или NumPy для читаемой автогенерации документации.
  • Можно хранить такие кастомные модули в libs/x5_airflow_plugins/operators/ или dags/x5/operators/, в зависимости от структуры проекта.

🧭 8. Рекомендации и нетривиальные кейсы

  • Если хочешь, чтобы кастомный оператор отображал поля в UI (например, в XCom, или кастомный лог), — реализуй логику логирования через self.log.info(...).
  • Чтобы оператор был совместим с TaskFlow API, он должен корректно поддерживать возврат значений из execute.
  • В мультиплатформенных окружениях не забывай указывать зависимости (requirements.txt или constraints.txt), если твой оператор требует сторонние библиотеки.

Если хочешь, могу сразу предложить шаблон класса с комментами и готовой структурой — скажи.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

Использовали ли вы кастомные шаблоны Jinja и XCom в Airflow? Как их настраивали?

Теги: #x5

A

Кастомные шаблоны Jinja
В одном из проектов у нас была потребность подстановки динамических путей в S3 и генерации SQL-запросов с учётом среды (dev, prod) и дат. Вместо того чтобы хардкодить логику в каждом таске, мы вынесли её в кастомные макросы и фильтры Jinja. Например, создавали макрос, который на вход принимал дату и окружение, а возвращал полный путь к файлу; этот макрос объявлялся в user_defined_macros и вызывался в bash_command и SQL, чтобы каждый раз не переписывать шаблон.

Кастомный XCom
Была задача хранить в XCom большие JSON-объекты с метаданными, что в стандартном варианте могло перегружать базу Airflow. Мы реализовали кастомный XCom-класс (наследованный от BaseXCom), переопределили методы сериализации и десериализации, чтобы фактически сохранять данные во внешнее S3-хранилище, а в базе Airflow хранить только ссылку. Так одна задача выгружала метаданные в S3 и пушила их ключ в XCom, а другая задача подтягивала ключ из XCom, загружала данные из S3 и уже с ними работала. Это позволяло эффективно работать с объёмными данными и не перегружать метаданные Airflow.

Jinja-шаблоны в Airflow

Jinja-шаблоны позволяют динамически подставлять значения в параметры операторов (tasks) с помощью переменных, макросов и встроенной логики (например, условных выражений или циклов). Это удобно, когда нужно, например, автоматически подставлять даты выполнения (execution_date), идентификаторы DAG’ов или другие параметры среды.

Для использования шаблонов в Airflow есть два ключевых механизма:

template_fields – список полей в операторе, которые можно шаблонизировать. Например, если у вас есть BashOperator, его аргумент bash_command можно сделать шаблонизируемым:
	
	bash_command="echo Execution date is {{ ds }}"

В момент выполнения DAG’а {{ ds }} автоматически заменится на дату выполнения.

Каждый оператор (например, BashOperator, PythonOperator) имеет атрибут template_fields. Это список полей, которые могут использовать Jinja-шаблоны. У каждого оператора свой template_fields. Например, у BashOperator можно шаблонизировать bash_command, а у PythonOperator – аргументы функции.Посмотреть список template_fields можно так: print(BashOperator.template_fields) # (‘bash_command’, ‘env’) . Он позволяет передавать несколько шаблонизируемых значений в виде словаря. Например:

PythonOperator(
task_id=”my_task”,
python_callable=my_function,
op_kwargs={“param”: “{{ execution_date }}”}
)

Здесь “{{ execution_date }}” заменится реальной датой перед вызовом my_function.

Кастомные шаблонные переменные

Если нужно добавить свои переменные, можно переопределить user_defined_macros:

  • XCom – механизм передачи данных между задачами

XCom (Cross-Communication) – это встроенный механизм Airflow для передачи данных между задачами. Он позволяет одной задаче (task) записывать данные, а другой – их читать.

🔹 Как работает XCom?
Airflow автоматически сохраняет XCom-записи в своей базе данных. Каждая запись содержит:

Ключ (key) – уникальное название записи
Значение (value) – сами данные (может быть строка, число, объект и т. д.)
Идентификаторы DAG'а и задачи (чтобы записи не путались)

🔹 Как записывать и читать XCom?

Запись выполняется через task_instance.xcom_push(key, value)
Чтение – через task_instance.xcom_pull(task_ids, key)

Пример использования в PythonOperator:

def push_value(**kwargs):
kwargs[‘ti’].xcom_push(key=’my_key’, value=42)

def pull_value(**kwargs):
value = kwargs[‘ti’].xcom_pull(task_ids=’push_task’, key=’my_key’)
print(f”Received value: {value}”)

🔹 Как передавать данные удобнее?
Для упрощенной работы с XCom можно использовать TaskFlow API (@task-декораторы). Это избавляет от явного вызова xcom_push/xcom_pull:

@task
def push_value():
return 42 # Значение автоматически записывается в XCom

@task
def pull_value(value):
print(f”Received: {value}”) # Значение автоматически передается из предыдущей задачи

Кастомизация XCom

Настройка сериализации: по умолчанию XCom сохраняет данные в виде pickle-объектов. В airflow.cfg можно изменить формат, например, на JSON (xcom_serializer = airflow.utils.json).
Хранение больших объектов: для передачи больших объемов данных можно использовать XCom backend (например, Google Cloud Storage, S3).
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

Динамическая генерация тасок. Partial, expand. Что знаешь про это?

Теги: #Ярослав #x5

A

Создание дага

Динамическая генерация задач в Airflow

Динамическая генерация задач (dynamic task mapping) позволяет «на лету» создавать несколько экземпляров одной и той же задачи с разными параметрами. Это удобно, когда у вас есть список входных данных, и для каждого элемента списка нужно выполнить однотипную операцию.

Способы реализации:

  1. Генерация через цикл (например, PythonOperator)
    Можно использовать классический подход с созданием операторов (операторов) в цикле. Пример (упрощённо):
    from datetime import datetime
    from airflow.decorators import dag, task

default_args = {
“owner”: “airflow”,
“retries”: 1
}

@dag(
start_date=datetime(2023, 1, 1),
schedule_interval=”@daily”,
catchup=False,
default_args=default_args,
tags=[“example”, “taskflow”]
)
def dynamic_taskflow_dag():

@task()
def my_task(param1: str, param2: str):
    """
    Пример таска, который принимает два параметра и выводит их в лог.
    param1 - фиксированное значение для всех задач.
    param2 - динамическое значение, получаемое из списка dynamic_values.
    """
    print(f"param1={param1}, param2={param2}")

# Фиксированные аргументы для всех тасков
static_args = {"param1": "fixed_value"}  # Значение param1 остаётся неизменным

# Динамический список значений для параметра param2
dynamic_values = ["val1", "val2", "val3"]  # Каждое значение создаст отдельный таск

# Создаём частично зафиксированную задачу с заранее заданным param1
partial_task = my_task.partial(**static_args)

# Используем expand, чтобы создать несколько независимых задач, 
# где param2 будет принимать значения из dynamic_values
# Это эквивалентно вызову my_task("fixed_value", "val1"), my_task("fixed_value", "val2"), и т.д.
partial_task.expand(param2=dynamic_values)

dynamic_taskflow_dag()
Каждый проход цикла создаёт новую задачу (task) со своим task_id и передаёт соответствующий item в функцию.
Зачем это нужно? Например, у нас есть 10 файлов, которые нужно последовательно обработать, и мы не хотим «клонировать» один и тот же код 10 раз.

  1. Генерация задач через TaskGroup
    Если нужно логически группировать несколько динамических задач, то в Airflow 2.* можно использовать TaskGroup. Внутри TaskGroup можно, аналогично примеру выше, сгенерировать задачи в цикле, а затем связать эту группу с другими задачами.
    Это упрощает визуализацию на графе и делает DAG чище.from airflow import DAG
    from airflow.operators.python import PythonOperator
    from airflow.utils.task_group import TaskGroup
    from datetime import datetime

Функция, которая будет вызвана в динамически созданных задачах
def process_item(item):
print(f”Processing item: {item}”)

with DAG(
dag_id=”dynamic_taskgroup_example”,
start_date=datetime(2023, 1, 1),
schedule_interval=”@daily”,
catchup=False
) as dag:

items = ["apple", "banana", "cherry", "date", "elderberry"]  # Данные для генерации тасок

# Создаём TaskGroup, в которой будем динамически создавать задачи
with TaskGroup(group_id="process_fruits") as process_fruits_group:
    for item in items:
        PythonOperator(
            task_id=f"process_{item}",
            python_callable=process_item,
            op_args=[item]
        )

# Дополнительные таски DAG (пример)
start_task = PythonOperator(
    task_id="start",
    python_callable=lambda: print("Starting the DAG")
)

end_task = PythonOperator(
    task_id="end",
    python_callable=lambda: print("Ending the DAG")
)

# Определяем порядок выполнения
start_task >> process_fruits_group >> end_task
  1. Использование @task (TaskFlow API), partial и expand
    В TaskFlow API, начиная с Airflow 2.3, появились механизмы partial и expand, которые упрощают написание динамических задач.
    • partial позволяет зафиксировать часть аргументов, которые будут одинаковыми для всех экземпляров задачи.
    • expand позволяет «раскрывать» список параметров и создать несколько экземпляров одной задачи для каждого значения из этого списка.
    Пример (TaskFlow API):
    from datetime import datetime
    from airflow.decorators import dag, task

default_args = {
“owner”: “airflow”,
“retries”: 1
}

@dag(
start_date=datetime(2023, 1, 1),
schedule_interval=”@daily”,
catchup=False,
default_args=default_args,
tags=[“example”, “taskflow”]
)
def dynamic_taskflow_dag():

@task()
def my_task(param1: str, param2: str):
    print(f"param1={param1}, param2={param2}")

static_args = {"param1": "fixed_value"}
dynamic_values = ["val1", "val2", "val3"]

partial_task = my_task.partial(**static_args)
partial_task.expand(param2=dynamic_values)

dynamic_taskflow_dag()
Здесь:
- my_task — это один Python-функциональный таск.
- partial «зашивает» param1="fixed_value" для всех экземпляров.
- expand(param2=dynamic_values) генерирует три независимых подзадачи (для val1, val2 и val3 соответственно).

Зачем это нужно? Например, если у вас есть список параметров (названий таблиц в БД, идентификаторов пользователей, датовых срезов, имён файлов и т.д.), и нужно применить одну и ту же функцию (или PythonOperator) к каждому элементу этого списка. Благодаря expand вы получите параллельное исполнение одинаковых задач с разными входными данными.

Когда и почему пригодится динамическая генерация?

  1. Обработка нескольких файлов / табличных партиций: Нужно запустить один и тот же код для набора данных (список имён файлов или датовых партиций), не копируя оператор много раз вручную.
  2. Параллелизм и масштабирование: Airflow может автоматически параллелить задачи, если у вас есть рабочие узлы (workers). Генерация множества однотипных задач позволяет загружать все вычислительные ресурсы.
  3. Гибкость и удобство поддержки: Вместо того, чтобы прописывать статически 10–100 одинаковых задач, достаточно иметь один шаблон (оператор или @task), который вызывается с разными аргументами.

Таким образом, подходы с циклом, TaskGroup или связкой partial/expand решают схожую задачу: динамически сгенерировать набор однотипных задач с разными параметрами. Разница лишь в синтаксисе и удобстве чтения кода.

  • Если вам проще мыслить классическими операторами (PythonOperator) и циклами — используйте «ручную» генерацию в for.
  • Если нравится «чистый» Python-код и декларативный стиль TaskFlow, то обратите внимание на partial и expand.

Все варианты верны; выбор инструмента зависит от вкуса, ситуации и требуемой наглядности кода.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

Датасеты (event-based scheduling) что это?

Теги: #Ярослав

A

Датасеты (event-based scheduling) позволяют запускать DAG не по заранее заданному расписанию (которое мы указываем в schedule_interval или через CRON), а по событию. Например, при добавлении файла в S3-бакет, при вызове API или другом схожем событии.

Зачем это нужно?

Если данные появляются раньше стандартного расписания, то и обрабатывать их можно быстрее, не дожидаясь наступления CRON-времени.
Схожего результата можно достичь через сенсоры, но для первого DAG в цепочке сенсор сам по себе не запустится — нужно так или иначе триггерить DAG.
Датасеты упрощают эту логику: один DAG публикует (обновляет) Dataset, а другой DAG подписывается на этот Dataset и запускается автоматически.

Ограничения и актуальная версия

До версии Airflow 2.9 были некоторые шероховатости в работе с датасетами, но к 2.9+ механизм стал более надёжным и гибким.
На некоторых проектах датасеты могут не использоваться просто по причинам легаси-кода (исторически всё завязано на CRON, а менять большие массивы кода дорого и долго).
Датасеты появились в конце 2022 года (в Airflow 2.4) и где-то полтора года дорабатывались, поэтому в новых проектах на свежих версиях Airflow их использование может быть очень удобно.

Как именно это работает?

Датасеты (Datasets) в Apache Airflow — это не оператор и не сенсор. Это механизм, который работает похожим на триггер способом, но встроен в сам движок Airflow.

Производящий DAG (Producer DAG) публикует (обновляет) Dataset при помощи outlets=[my_dataset] который говорит Airflow, что этот таск обновляет my_dataset..
    Это похоже на триггер, но без явного вызова trigger_dag.
    После успешного выполнения DAG-а, Airflow помечает Dataset как обновлённый.

Потребляющий DAG (Consumer DAG) подписан на этот Dataset.
    У него  расписания (schedule=[my_dataset]), 
    Как только Dataset обновится — Airflow автоматически запустит этот DAG.

Важно:

Работает внутри самого Airflow, без внешних API-запросов.
Нет необходимости в сенсорах (которые нагружают систему постоянными проверками).
DAG подписывается на Dataset внутри Airflow, а не через оператор.


Producer DAG (публикует Dataset)

Здесь outlets=[my_dataset] говорит Airflow, что этот таск обновляет my_dataset.

------------------------------------------------------------------------------------------------------------------------------------------------------
	from airflow import DAG from airflow.operators.python import PythonOperator from airflow.datasets import Dataset from datetime import datetime

my_dataset = Dataset(“s3://my-bucket/data.csv”)

def generate_data():
# Код генерации данных и их сохранения
print(“Dataset обновлен!”)

with DAG(
dag_id=”producer_dag”,
start_date=datetime(2024, 1, 1),
schedule=”@daily”, # DAG выполняется раз в день
catchup=False,
) as dag:

task = PythonOperator(
    task_id="publish_dataset",
    python_callable=generate_data,
    outlets=[my_dataset],  # Обновляет Dataset
)
	
	Consumer DAG (запускается при обновлении Dataset)
	
	schedule=[my_dataset] → DAG будет автоматически запускаться при обновлении my_dataset.
	------------------------------------------------------------------------------------------------------------------------------------------------------
	from airflow import DAG from airflow.operators.python import PythonOperator from airflow.datasets import Dataset from datetime import datetime

my_dataset = Dataset(“s3://my-bucket/data.csv”)

def process_data():
print(“Данные обновлены, выполняем обработку!”)

with DAG(
dag_id=”consumer_dag”,
start_date=datetime(2024, 1, 1),
schedule=[my_dataset], # DAG запускается при обновлении Dataset
catchup=False,
) as dag:

task = PythonOperator(
    task_id="consume_dataset",
    python_callable=process_data,
)
17
Q

Что такое pull (connection pull) в Airflow? Как это используется?

Теги: #x5

A
  • В контексте Airflow «pull» обычно означает чтение (запрос) значения из XCom (например, xcom_pull).
  • «Pull» (иногда говорят connection pull) – это когда одна задача извлекает данные, которые другая задача сохранила (через XCom).
18
Q

Чем хук отличается от оператора? Что такое хук?

Теги: #Ярослав

A
  • Оператор - отдельный таск. (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки.
  • Хук можно использовать внутри кастомного пайтон таска для взаимодействия на более низком уровне. Например, если нужно выполнить несколько postgresql запросов с промежуточной обработкой вычислений одним атомарным шагом
19
Q

Что можно указывать в default_args при определении DAG в Airflow?

Теги: #x5 #Ярослав

A

В Airflow при определении DAG и тасков можно использовать параметр default_args, который позволяет задать значения по умолчанию для всех тасков внутри DAG. Разберёмся, какие параметры можно задавать в default_args для DAG и тасков.

default_args передаётся в DAG как словарь, в котором можно указать:
Основные параметры:

owner – владелец DAG (например, 'airflow').
	schedule_interval – интервал запуска DAG (может быть None, если DAG триггерится вручную).
	on_failure_callback Этот параметр принимает функцию-обработчик, которая вызывается при падении таска. В колбэк передаётся объект контекста (context), содержащий полезную информацию о DAG, таске и его окружении.
	catchup – нужно ли догонять пропущенные выполнения DAG (по умолчанию True).
start_date – начальная дата выполнения DAG (например, datetime(2024, 3, 1)).
end_date – дата окончания выполнения DAG (можно не указывать).
depends_on_past – влияет ли выполнение текущего запуска DAG на успешность предыдущего (False по умолчанию).
retries – число попыток перезапуска в случае ошибки (0 по умолчанию).
execution_timeout – максимальное время выполнения таска (timedelta(hours=1)).
dagrun_timeout – максимальное время выполнения DAG (timedelta(days=1)).
catchup – нужно ли догонять пропущенные выполнения DAG (по умолчанию True).
schedule_interval – интервал запуска DAG (может быть None, если DAG триггерится вручную).
priority_weight – приоритет DAG среди других DAG'ов.
wait_for_downstream – нужно ли ждать выполнения всех downstream-тасков перед запуском нового инстанса DAG.
sla_miss_callback – callback-функция, вызываемая при нарушении SLA.
20
Q

Что такое XCOM?

Теги: #Ярослав

A
  • XCom – межтасковый канал обмена данными внутри одного DAG.
  • Сохраняет (push) и извлекает (pull) небольшие объёмы Python-объектов в базе метаданных Airflow.
    Протокол обмена мета-данными между тасок через бэкенд-базу airflow. Типа “икс-ком, кросс-коммьюникейшн”.

Обычно база это постгрес, и в теории в одной ячейке таблицы может и гигабайт храниться, и больше, но это неоптимальный способ + база будет очень сильно раздуваться и замедляться из-за этого. Сам протокол спроектирован именно под мета-данные, какие-то параметры запуска таска, но не данные, над которыми нужно произвести обработку. Поэтому пандас датасеты не стоит передавать “как есть” между тасками.

21
Q

Что такое Hooks в Airflow и для чего они используются?

Теги: #x5

A
  • Hooks – это обёртки для работы с внешними ресурсами (базы, хранилища, API).
  • Упрощают открытие соединений, авторизацию, выполнение запросов, чтобы операторы могли переиспользовать эту логику.
22
Q

Catchup, backfill что такое? Зачем?

Теги: #Ярослав

A

Если у дага указаны start_date и schedule, scheduler запланирует выполнение дага для каждого интервала, который ещё не отработал. Это поведение называется catchup (кэтчап).

То же произойдёт, если даг поставить на паузу на какое-то время, а потом – снова включить.

При этом можно указатьexecution_timeout, чтобы запуск дага не работал дольше, чем интервал запуска. Например, дольше часа при запуске каждый час (иначе очередь будет накапливаться, а процесс никогда не закончится).

Бэкфилл позволяет загрузить данные за определённый период, например с даты 1 по дату 2. Запустит все даг раны в указанном диапазоне, даже если они уже отработали успешно раньше.
Запускается только через CLI:
airflow dags backfill \
–start-date START_DATE \
–end-date END_DATE \
dag_id
Чтобы бэкфилл и catchup работали, нужно в таске указывать связь между execution_date и условием фильтрации данных, например select … where dt = {{ execution_date }} https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

Также полезно упомянуть:
depends_on_past – добавляет ограничение – только если предыдущий запуск дага/таски успешен, можно запускать текущий. Не срабатывает в первый запуск по start_date.
max_active_runs – ограничение на количество одновременно работающих дагов.

23
Q

Как можно настроить зависимости между группами параллельных тасок в Airflow?

Теги: #x5

A
  1. Использование TaskGroup с task_group.set_upstream() и task_group.set_downstream()

Если у вас есть две группы тасков, которые выполняются параллельно, но одна должна начинаться только после завершения другой, можно использовать TaskGroup:

with DAG(‘parallel_task_groups’, default_args=default_args, schedule_interval=None) as dag:

start = DummyOperator(task_id='start')

with TaskGroup('group_1') as group_1:
    task_1_1 = DummyOperator(task_id='task_1_1')
    task_1_2 = DummyOperator(task_id='task_1_2')
    task_1_1 >> task_1_2  # Зависимость внутри группы

with TaskGroup('group_2') as group_2:
    task_2_1 = DummyOperator(task_id='task_2_1')
    task_2_2 = DummyOperator(task_id='task_2_2')
    task_2_1 >> task_2_2  # Зависимость внутри группы

end = DummyOperator(task_id='end')

# Определяем зависимости между группами
start >> group_1
group_1.set_downstream(group_2)
group_2 >> end
  1. Использование chain() для линейной связи между группами

Если вы хотите избежать явного указания зависимостей через .set_downstream(), можно использовать airflow.models.baseoperator.chain():

from airflow.models.baseoperator import chain

chain(start, group_1, group_2, end)

  1. Использование ExternalTaskSensor для меж-DAG-овых зависимостей

Если группы находятся в разных DAG’ах, можно использовать ExternalTaskSensor, чтобы ожидать завершения первой группы в одном DAG перед началом второй группы в другом DAG:

from airflow.sensors.external_task import ExternalTaskSensor

wait_for_group_1 = ExternalTaskSensor(
task_id=’wait_for_group_1’,
external_dag_id=’parallel_task_groups’,
external_task_group_id=’group_1’,
mode=’poke’, # или reschedule
timeout=600,
poke_interval=30,
dag=dag
)

with TaskGroup(‘group_2’) as group_2:
task_2_1 = DummyOperator(task_id=’task_2_1’)
task_2_2 = DummyOperator(task_id=’task_2_2’)
task_2_1&raquo_space; task_2_2

wait_for_group_1&raquo_space; group_2

  1. Использование XCom для динамических зависимостей

Если у вас сложные сценарии, где группа тасков должна стартовать только при определенных условиях, можно передавать статус выполнения первой группы через XCom и проверять его во второй группе:

from airflow.operators.python import PythonOperator

def check_group_1(**kwargs):
ti = kwargs[‘ti’]
group_1_status = ti.xcom_pull(task_ids=’group_1.task_1_2’) # Берем статус последнего таска из группы 1
if group_1_status != ‘success’:
raise ValueError(“Group 1 не завершилась успешно!”)

check_task = PythonOperator(
task_id=’check_group_1’,
python_callable=check_group_1,
provide_context=True,
dag=dag
)

group_1&raquo_space; check_task&raquo_space; group_2

24
Q

Какая база данных используется в Airflow? #тг

A

posgressql

25
Назовите достоинства Airflow. #тг
1. Open source 2. Отличная документация 3. Простой код на питоне 4. Удобный веб-интерфейс 5. Алертинг и мониторинг 6. Интеграция с основными источниками 7. Кастомизация 8. Масштабирование 9. Большое комьюнити
26
Что такое DAG в аирфлоу? #Я
DAG (directed acyclic graph, даг) – граф выполнения, атомарная единица в airflow. Определяет параметры запуска первого шага (с каким расписанием или по какому событию), взаимосвязь между шагами. Даги это python файлы, написанные с использованием пакета airflow, и лежащие в директории с дагами (обычно ./dags).
27
Что такое Task? #Я
(таска, шаг, степ) – узел графа, который выполняет полезную работу или определяет условие запуска какой-то из веток графа. Например, нужно ли пропустить все остальные шаги или продолжаем работу. Или может быть общий даг для 10 типов коннекторов и на основе входных параметров нужно запустить только задачи для указанного типа коннектора. Таска может быть: оператором, сенсором или python кодом с декоратором @task.
28
Что такое оператор? #Я
заранее написанный код, который можно сконфигурировать для выполнения нужной задачи. Например, PostgresOperator может принимать на вход `sql` и `postgres_conn_id`, что выполнять и где выполнять.
29
Как установить библиотеку (пайтон пакет) на airflow, добавлять операторы через установку провайдеров #Алексей
Зависит от того, как установлен airflow. Если на виртуальной машине, то почти наверняка управление идёт через обычный pip/poetry/conda или другой пакетный менеджер. Если в контейнере, то можно создать свой мини-образ на основе официального докер-образа, скопировать requirements и поставить через pip install -r requirements.txt. Многие операторы и коннекты не доступны по умолчанию. Их нужно установить или вместе с airflow core, или отдельно через пакетный менеджер. pip install apache-airflow-providers-google==10.1.1 # отдельно pip install "apache-airflow[google]==2.10.1" # вместе У провайдеров есть свой airflow hub по аналогии с docker hub, если это тебе что-то скажет. Также функционал можно расширить плагинами, но это редкость.
30
назави типичные рабочие задачи в аирфлоу #Алексей
* Удалить дубликаты в таблице * Исправить падение дага из-за изменений структуры источника * Большой датасет не помещается в оперативку, как обрабатывать * Исправить перекос данных в greenplum таблице * Протянуть новое поле из источника в витрины
31
Что такое deferrable оператор? #Алексей
Deferred значит отсроченный. Асинхронный процесс ожидания выполнения таски в другом даге или всего дага. Например, TriggerDagRunOperator можно установить в режим deferrable. Отличается от сенсора тем, что высвобождает слот воркера и эффективно масштабируется. Для работы deferrable операторов нужно настроить отдельный компонент airflow – triggerer. 🚀 Зачем нужны deferrable операторы? Обычные операторы в Airflow запускаются на исполнителях (workers) и, если они ожидают какого-либо события (например, завершения загрузки данных в S3 или выполнения запроса в базе), они продолжают занимать worker, что может привести к нехватке ресурсов при высокой нагрузке. Deferrable-операторы не удерживают исполнителя в состоянии ожидания, а освобождают его, передавая контроль специальному процессу Triggerer. После наступления события задача "пробуждается" и завершается. 📌 Как это работает? В deferrable-операторе применяется новый механизм Trigger. Он действует по следующему алгоритму: Запуск задачи: Оператор начинает выполнение и доходит до точки ожидания (например, пока появятся данные в S3). Deferral (отложенное выполнение): Оператор освобождает worker и регистрирует "триггер" в Triggerer (специальный процесс Airflow). Ожидание события: Triggerer асинхронно следит за наступлением условия (например, файла в S3). Пробуждение: После выполнения условия Triggerer передает управление обратно в Airflow, и оператор завершает выполнение. 🔥 Пример использования Deferrable-оператора Пример с S3KeySensor, который проверяет наличие файла в S3. 🛑 Обычный (блокирующий) S3KeySensor: from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor s3_sensor = S3KeySensor( task_id="s3_sensor", bucket_name="my-bucket", bucket_key="path/to/my_file.csv", poke_interval=60, # Проверка каждую минуту (блокирует worker!) timeout=3600, # Ожидание до 1 часа mode="poke" ) 👎 Минусы: Worker будет заблокирован на 60 минут, если файла нет. При сотнях таких сенсоров может исчерпаться пул воркеров. ✅ Deferrable (неблокирующий) S3KeySensorAsync: from airflow.providers.amazon.aws.sensors.s3 import S3KeySensorAsync s3_sensor = S3KeySensorAsync( task_id="s3_sensor_async", bucket_name="my-bucket", bucket_key="path/to/my_file.csv", deferrable=True # Включаем режим deferrable ) 👍 Плюсы: Не блокирует worker — освобождает ресурсы. Работает через Triggerer, который следит за событием асинхронно. Более масштабируемый подход при большом количестве задач. ⚙️ Как включить поддержку Deferrable-операторов? Deferrable-операторы требуют, чтобы в Airflow был запущен процесс Triggerer. Для этого: Включите triggerer в конфигурации Airflow: airflow triggerer start Убедитесь, что в airflow.cfg включен deferrable режим: [scheduler] enable_deferrable_operators = True Если используете Kubernetes Executor или Celery Executor, убедитесь, что у вас есть хотя бы один активный Triggerer pod/process.
32
Как написать свой Sensor ? Какие писал я ? #собес
Основные шаги для создания кастомного Sensor: 1.1. Унаследовать BaseSensorOperator Все сенсоры должны наследоваться от BaseSensorOperator и реализовывать метод poke. 1.2. Реализовать логику проверки в методе poke Метод poke вызывается с заданной периодичностью (poke_interval) и возвращает True, если условие выполнено, или False, если нужно продолжать ждать. 1.3. Определить параметры сенсора Можно передавать параметры через __init__, чтобы настраивать поведение сенсора.
33
Есть 7 тасок - каждую необходимо запускать в разные дни недели, как это реализовать ?
В Apache Airflow можно настроить запуск задач в разные дни недели с помощью **`schedule_interval`** в `cron`-формате или использовать **BranchPythonOperator** для выбора таски в зависимости от текущего дня. Вариант Слоты CPU RAM Scheduler нагрузка Комментарий 🟢 7 DAG'ов с cron по дням 🔹 1 🔹 1 🔹 1 🔹 очень малая Самый эффективный 🟢 Один DAG, 7 тасок с if day ⚪ 7 ⚪ 1 ⚪ 1 ⚪ средняя Хороший компромисс 🟢 Один DAG, .expand() + if day ⚪ 7 ⚪ 1 ⚪ 1 ⚪ средняя Удобно, немного нагружает UI 🟡 BranchPythonOperator ⚪ 7 ⚪ 2 ⚪ 2 ⚪ чуть выше Визуально раздутый DAG 🔴 PythonSensor (ждать день) 🔴 1 🔴 долго 🔴 долго 🔴 высокая Очень неэффективно 🟡 ShortCircuitOperator ⚪ 1 ⚪ 1 ⚪ 1 ⚪ средняя Норм, если в pipeline --- 🔹 **Вариант 1: Использование `schedule_interval` в `DAG`** Самый простой вариант — создать **7 отдельных DAG'ов** или **1 DAG с 7 задачами**, каждая из которых имеет свой `schedule_interval`: ```python from airflow import DAG from airflow.operators.dummy_operator import DummyOperator from datetime import datetime default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 3, 1), 'retries': 1 } Расписание в cron-формате: "0 0 * * X" — запуск раз в неделю в 00:00 (X - день недели) schedule_mapping = { 'task_mon': '0 0 * * 1', # Понедельник 'task_tue': '0 0 * * 2', # Вторник 'task_wed': '0 0 * * 3', # Среда 'task_thu': '0 0 * * 4', # Четверг 'task_fri': '0 0 * * 5', # Пятница 'task_sat': '0 0 * * 6', # Суббота 'task_sun': '0 0 * * 0', # Воскресенье } for task_id, cron_expr in schedule_mapping.items(): dag = DAG( dag_id=f'dag_{task_id}', default_args=default_args, schedule_interval=cron_expr, catchup=False ) task = DummyOperator( task_id=task_id, dag=dag ) ``` 🟢 **Плюсы:** Простота, задачи запускаются точно в нужный день. 🔴 **Минусы:** 7 DAG'ов, что может захламлять UI. --- 🔹 **Вариант 2: Один DAG, но с `BranchPythonOperator`** Если нужно один DAG, который будет запускать только **нужную** таску в определённый день, можно использовать **BranchPythonOperator**: ```python from airflow import DAG from airflow.operators.python import BranchPythonOperator from airflow.operators.dummy import DummyOperator from datetime import datetime import pendulum default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 3, 1), 'retries': 1 } dag = DAG( dag_id='dag_weekly_tasks', default_args=default_args, schedule_interval='@daily', # DAG запускается каждый день catchup=False ) Определяем, какая задача выполняется сегодня def choose_task(): day_of_week = pendulum.now().day_of_week # 0 - Пн, 6 - Вс task_map = { 0: "task_mon", 1: "task_tue", 2: "task_wed", 3: "task_thu", 4: "task_fri", 5: "task_sat", 6: "task_sun", } return task_map.get(day_of_week, "end_task") # По умолчанию завершаем DAG branch_task = BranchPythonOperator( task_id='branch_task', python_callable=choose_task, dag=dag ) end_task = DummyOperator(task_id="end_task", dag=dag) task_mon = DummyOperator(task_id="task_mon", dag=dag) task_tue = DummyOperator(task_id="task_tue", dag=dag) task_wed = DummyOperator(task_id="task_wed", dag=dag) task_thu = DummyOperator(task_id="task_thu", dag=dag) task_fri = DummyOperator(task_id="task_fri", dag=dag) task_sat = DummyOperator(task_id="task_sat", dag=dag) task_sun = DummyOperator(task_id="task_sun", dag=dag) branch_task >> [task_mon, task_tue, task_wed, task_thu, task_fri, task_sat, task_sun, end_task] ``` 🟢 **Плюсы:** Один DAG, легко поддерживать. 🔴 **Минусы:** Запускается каждый день, даже если не нужно. --- 🔹 **Вариант 3: Один DAG, но таски с `execution_date`** Если задачи должны запускаться только **в определённые дни** без `BranchPythonOperator`, можно проверять `execution_date`: ```python from airflow import DAG from airflow.operators.python import PythonOperator from datetime import datetime def run_task(**kwargs): day_of_week = kwargs['execution_date'].weekday() # 0 - Пн, 6 - Вс if day_of_week == 0: print("Запуск таски для понедельника") elif day_of_week == 1: print("Запуск таски для вторника") # и т.д. default_args = { 'owner': 'airflow', 'start_date': datetime(2025, 3, 1), } dag = DAG( dag_id='dag_conditional_tasks', default_args=default_args, schedule_interval='@daily', catchup=False ) task = PythonOperator( task_id="conditional_task", python_callable=run_task, provide_context=True, dag=dag ) ``` 🟢 **Плюсы:** Один DAG, только одна таска. 🔴 **Минусы:** Меньше прозрачности в UI (все таски в одном блоке). --- ✅ **Какой вариант выбрать?** |---------|----------------------| | 1. **Отдельные DAG'и** | Если задачи независимы, UI должен быть чистым | | 2. **BranchPythonOperator** | Если задачи логически связаны, но их выполнение зависит от дня недели | | 3. **Одна таска с `execution_date`** | Если нужно минимизировать количество DAG'ов и запусков | Что тебе подходит больше? 😊 | Вариант | Когда использовать? |
34
какие опреаторы есть в аирфлоу? и какие самые частые для использования для де? #я
В Apache Airflow есть множество операторов (Operators), которые используются для выполнения различных задач в DAG'ах. Их можно разделить на несколько категорий: --- 🔹 1. **Базовые операторы** Эти операторы входят в Airflow "из коробки" и используются чаще всего. - **`BashOperator`** – выполняет команды в Bash-скрипте. - **`PythonOperator`** – выполняет Python-функцию. - **`BranchPythonOperator`** – разветвляет выполнение DAG'а на основе Python-функции. - **`DummyOperator`** (в новых версиях `EmptyOperator`) – используется для создания пустых задач (заглушек). - **`SubDagOperator`** – позволяет вкладывать один DAG в другой (не рекомендуется, лучше использовать TaskGroup). --- 🔹 2. **Операторы работы с файлами и хранилищами** Используются для работы с файлами, S3, FTP и другими хранилищами. - **`FileSensor`** – ожидает появления файла в файловой системе. - **`S3FileTransformOperator`** – загружает, изменяет и сохраняет файл в S3. - **`S3ToGCSOperator`** – копирует данные из S3 в Google Cloud Storage. - **`FTPOperator`**, **`FTPSensor`** – работа с FTP. --- 🔹 3. **SQL-операторы** Позволяют работать с различными базами данных. - **`PostgresOperator`**, **`PostgresHook`** – выполняет SQL в PostgreSQL. - **`MySqlOperator`**, **`MySqlHook`** – аналогично, но для MySQL. - **`MsSqlOperator`** – для Microsoft SQL Server. - **`OracleOperator`** – выполняет SQL в Oracle. - **`SnowflakeOperator`** – выполняет SQL в Snowflake. - **`BigQueryOperator`** – выполняет SQL в Google BigQuery. - **`SQLiteOperator`** – выполняет SQL в SQLite. Также есть: - **`SQLExecuteQueryOperator`** – универсальный оператор для SQL-запросов (с Airflow 2.4+). - **`SQLCheckOperator`**, **`SQLValueCheckOperator`**, **`SQLIntervalCheckOperator`** – проверка качества данных в БД. --- 🔹 4. **Операторы интеграции с облачными сервисами** Используются для работы с AWS, Google Cloud, Azure и Яндекс.Облаком. ✅ **AWS** - **`S3CreateBucketOperator`**, **`S3ListOperator`** – работа с S3. - **`EMRCreateJobFlowOperator`**, **`EMRStepSensor`** – работа с AWS EMR. ✅ **Google Cloud** - **`BigQueryInsertJobOperator`**, **`BigQueryCheckOperator`** – работа с Google BigQuery. - **`GCSCreateBucketOperator`**, **`GCSToBigQueryOperator`** – работа с GCS. ✅ **Azure** - **`AzureDataLakeStorageDeleteOperator`** – работа с ADLS. - **`WasbCreateBlobOperator`** – работа с Blob Storage. ✅ **Яндекс.Облако** - **`YandexCloudDataProcCreateClusterOperator`** – работа с DataProc. - **`YandexCloudStorageCreateBucketOperator`** – работа с Yandex Object Storage. --- 🔹 5. **Операторы для ETL** Используются для запуска ETL-процессов. - **`SparkSubmitOperator`** – отправляет Spark-задание. - **`DataprocSubmitJobOperator`** – запускает Spark на Google Dataproc. - **`DataflowCreateJobOperator`** – работа с Apache Beam (Google Dataflow). - **`HiveOperator`** – выполнение SQL в Hive. - **`SqoopOperator`** – импорт данных с SQL в Hadoop. - **`PrestoToMySqlOperator`** – копирование данных из Presto в MySQL. --- 🔹 6. **Операторы работы с API** Используются для работы с REST и GraphQL API. - **`SimpleHttpOperator`** – отправка HTTP-запросов. - **`HttpSensor`** – ожидание HTTP-ответа. - **`SlackAPIPostOperator`** – отправка сообщений в Slack. - **`DiscordWebhookOperator`** – отправка сообщений в Discord. - **`TelegramOperator`** – отправка сообщений в Telegram. --- 🔹 7. **Операторы ML и Data Science** Используются в машинном обучении. - **`KubernetesPodOperator`** – запуск контейнеров в Kubernetes. - **`VertexAIStartTrainingOperator`** – работа с Google Vertex AI. - **`SageMakerTrainingOperator`** – обучение моделей в AWS SageMaker. --- ✅ **Самые частые операторы для Data Engineer** Как DE, чаще всего используешь: 1️⃣ **`PythonOperator`** – запуск Python-скриптов. 2️⃣ **`BashOperator`** – запуск команд Bash. 3️⃣ **`PostgresOperator`**, **`OracleOperator`** – работа с SQL. 4️⃣ **`SparkSubmitOperator`** – запуск Spark-заданий. 5️⃣ **`S3ToGCSOperator`**, **`GCSToBigQueryOperator`** – работа с облаками. 6️⃣ **`DummyOperator`** / **`EmptyOperator`** – заглушки. 7️⃣ **`HttpSensor`**, **`SimpleHttpOperator`** – API-запросы. 8️⃣ **`FileSensor`** – ожидание файлов. 9️⃣ **`TaskGroup`** – группировка задач. 🔟 **`BranchPythonOperator`** – логика ветвления. Если работаешь с **Yandex.Cloud**, могут пригодиться **YandexCloudStorageCreateBucketOperator** и **YandexCloudDataProcCreateClusterOperator**. --- 🔹 **Вывод**: В Airflow много операторов, но в реальной работе DE чаще всего использует **Python, Bash, SQL и ETL-операторы**. Остальные – по необходимости. Хочешь примеры кода для конкретных операторов? 🚀
35
какие сеносорыв есть в аирфлоу? и какие самые частые для использования для де? #я
✅ **Самые частые сенсоры для Data Engineer** Как DE, ты чаще всего используешь: 1️⃣ **`FileSensor`** – ждет файла в локальной файловой системе. 2️⃣ **`S3KeySensor`** – ждет файла в S3 (если работаешь с AWS). 3️⃣ **`GCSObjectExistenceSensor`** – аналогично, но для Google Cloud. 4️⃣ **`SqlSensor`**, **`PostgresSensor`** – ждет появления данных в БД. 5️⃣ **`ExternalTaskSensor`** – ждет выполнения задачи в другом DAG'е. 6️⃣ **`HttpSensor`** – ждет ответа от API. 7️⃣ **`TimeSensor`** – ждет конкретного времени, если DAG должен запускаться строго в определенный момент. Если работаешь в **Яндекс.Облаке**, можно использовать `HttpSensor` для проверки состояния ресурсов через API. 📡 Сенсоры в Apache Airflow Сенсоры (Sensors) в Airflow – это особый тип задач, которые **ожидают наступления определенного события** (например, появления файла, выполнения SQL-запроса или доступности API). Они полезны для организации зависимостей между задачами. --- 🔹 **1. Базовые сенсоры** Эти сенсоры входят в стандартный набор Airflow. - **`TimeSensor`** – ожидает наступления определенного времени. - **`DateTimeSensor`** – ждет конкретного времени и даты. - **`ExternalTaskSensor`** – ожидает завершения задачи в другом DAG'е. - **`HttpSensor`** – ждет успешного HTTP-ответа от API. --- 🔹 **2. Сенсоры работы с файлами** Используются для мониторинга файлов в локальной файловой системе, HDFS, S3 и других хранилищах. - **`FileSensor`** – ждет появления файла в локальной файловой системе. - **`HdfsSensor`** – ожидает файл в HDFS. - **`S3KeySensor`** – ждет файл в S3. - **`GCSObjectExistenceSensor`** – ожидает объект в Google Cloud Storage. - **`GCSObjectUpdateSensor`** – ждет обновления файла в GCS. --- 🔹 **3. Сенсоры для баз данных** Ожидают завершения SQL-запросов или появления данных в таблице. - **`SqlSensor`** – ждет, пока SQL-запрос вернет результат. - **`PostgresSensor`** – ждет появления данных в PostgreSQL. - **`MySqlSensor`** – аналогично, но для MySQL. - **`MsSqlSensor`** – ждет выполнения запроса в SQL Server. - **`OracleSensor`** – ожидает появления данных в Oracle. - **`MetastorePartitionSensor`** – ждет появления нового партиционированного набора данных в Hive. --- 🔹 **4. Сенсоры для работы с API** Используются для ожидания ответа от REST API или GraphQL. - **`HttpSensor`** – ждет успешного HTTP-ответа. - **`S3KeySensor`** – можно использовать для проверки наличия объектов в S3. - **`HdfsSensor`** – проверяет файлы в HDFS. --- 🔹 **5. Сенсоры облачных платформ** Используются для работы с AWS, Google Cloud, Azure и Яндекс.Облаком. ✅ **AWS** - **`S3KeySensor`** – ждет появления файла в S3. - **`EmrJobFlowSensor`** – ждет завершения задачи в AWS EMR. - **`EmrStepSensor`** – ждет выполнения конкретного шага в EMR. ✅ **Google Cloud** - **`GCSObjectExistenceSensor`** – ждет объект в Google Cloud Storage. - **`GCSObjectUpdateSensor`** – ждет обновления объекта. ✅ **Azure** - **`WasbBlobSensor`** – ждет объект в Azure Blob Storage. ✅ **Яндекс.Облако** - **(Кастомные сенсоры через API-запросы)** – так как Airflow не поддерживает Яндекс.Облако "из коробки", можно использовать `HttpSensor` для работы с API. --- ✅ **Самые частые сенсоры для Data Engineer** Как DE, ты чаще всего используешь: 1️⃣ **`FileSensor`** – ждет файла в локальной файловой системе. 2️⃣ **`S3KeySensor`** – ждет файла в S3 (если работаешь с AWS). 3️⃣ **`GCSObjectExistenceSensor`** – аналогично, но для Google Cloud. 4️⃣ **`SqlSensor`**, **`PostgresSensor`** – ждет появления данных в БД. 5️⃣ **`ExternalTaskSensor`** – ждет выполнения задачи в другом DAG'е. 6️⃣ **`HttpSensor`** – ждет ответа от API. 7️⃣ **`TimeSensor`** – ждет конкретного времени, если DAG должен запускаться строго в определенный момент. Если работаешь в **Яндекс.Облаке**, можно использовать `HttpSensor` для проверки состояния ресурсов через API. --- 🔹 **Пример использования сенсоров** ✅ **Пример 1: Ожидание файла в локальной файловой системе** ```python from airflow.sensors.filesystem import FileSensor from airflow.operators.dummy import DummyOperator from airflow import DAG from datetime import datetime with DAG( "file_sensor_example", schedule_interval="@daily", start_date=datetime(2024, 1, 1), catchup=False, ) as dag: wait_for_file = FileSensor( task_id="wait_for_file", filepath="/opt/airflow/data/input.csv", poke_interval=30, # Проверять каждые 30 секунд timeout=600, # Ждать максимум 10 минут ) process_data = DummyOperator(task_id="process_data") wait_for_file >> process_data ``` 📌 **Что делает этот DAG?** - Ждет файл `/opt/airflow/data/input.csv` в течение 10 минут. - Как только файл появится – переходит к следующему шагу. --- ✅ **Пример 2: Ожидание данных в PostgreSQL** ```python from airflow.providers.postgres.sensors.postgres import PostgresSensor wait_for_data = PostgresSensor( task_id="wait_for_data", postgres_conn_id="my_postgres_conn", sql="SELECT COUNT(*) FROM my_table WHERE processed = false;", mode="poke", # Периодически проверять poke_interval=60, # Проверять каждую минуту timeout=3600, # Ждать максимум 1 час ) ``` 📌 **Что делает этот сенсор?** - Ждет, пока в таблице `my_table` появятся новые данные с `processed = false`. - Если данных нет – будет проверять каждую минуту в течение 1 часа. --- ✅ **Пример 3: Ожидание HTTP-ответа** ```python from airflow.sensors.http import HttpSensor wait_for_api = HttpSensor( task_id="wait_for_api", http_conn_id="my_api_conn", endpoint="status", method="GET", response_check=lambda response: response.json()["status"] == "ready", poke_interval=30, timeout=600, ) ``` 📌 **Что делает этот сенсор?** - Раз в 30 секунд отправляет GET-запрос на `/status`. - Ждет, пока `response.json()["status"] == "ready"`. - Если через 10 минут не дождался – фейлится. --- 🔥 **Вывод** - Сенсоры помогают **ждать события** (файлы, SQL-данные, API, время). - Чаще всего DE использует **FileSensor, SqlSensor, ExternalTaskSensor, S3KeySensor**. - Можно кастомизировать сенсоры (например, через `HttpSensor` работать с API Яндекс.Облака). 📌 **Хочешь разбор еще какого-то конкретного сенсора или кастомный вариант?** 🚀