Airflow Flashcards
Что знаешь про архитектуру Airflow? Как устроена связь компонентов ? Как развернуто?
Теги: #okko, #x5
Архитектура Apache Airflow и взаимодействие компонентов
Apache Airflow — это платформа для оркестрации рабочих процессов, которая состоит из нескольких ключевых компонентов:
Metadata DB — центральное хранилище состояния DAG'ов, задач, их статусов и метаданных. Scheduler — отвечает за чтение расписания DAG'ов и определение, какие задачи должны быть запущены и когда. Например, если задача обработки данных настроена на ежедневный запуск в 11:00, планировщик автоматически создаст ее экземпляр. Webserver — веб-интерфейс для мониторинга и управления DAG'ами, позволяет запускать задачи вручную, отслеживать логи и настраивать параметры. Trigger — управляет выполнением отложенных задач. В базовой конфигурации Airflow триггеры не используются, но их можно подключить при необходимости. Worker(ы) — исполняют задачи, назначенные Scheduler’ом. Их количество и способ работы зависят от типа Executor'а: В LocalExecutor и SequentialExecutor worker'ы являются частью процесса. В CeleryExecutor они могут работать в отдельных процессах или даже на разных серверах. Executor — определяет, как именно будут выполняться задачи. Airflow поддерживает несколько типов исполнителей, позволяя настраивать распределенное или локальное выполнение.
Как устроена связь между компонентами?
Scheduler взаимодействует с Metadata DB, определяя, какие задачи необходимо запустить. Webserver получает данные от Metadata DB и отображает их в UI. Executor забирает задачи из Scheduler и передает их Worker'ам. Worker'ы исполняют задачи и записывают результат в Metadata DB. Если используется CeleryExecutor, взаимодействие между Scheduler и Worker'ами происходит через брокер сообщений (например, Redis или RabbitMQ).
Как развернуть Airflow в продакшене?
Обычно в проде используется CeleryExecutor или KubernetesExecutor, что позволяет масштабировать выполнение задач. Компоненты развертываются следующим образом:
Metadata DB — PostgreSQL/MySQL, хранит состояние DAG'ов и задач. Scheduler — отдельный сервис, который считывает расписание и запускает задачи. Webserver — развертывается как отдельный сервис (обычно Gunicorn+Flask). Worker'ы — выполняют задачи, работают в отдельных контейнерах/серверных процессах. В случае CeleryExecutor используются Redis/RabbitMQ как брокер сообщений. В KubernetesExecutor каждый worker создается как pod в K8s. Executor — управляет распределением задач между worker'ами. Triggerer (если используются отложенные задачи) — запускает их при срабатывании условий. Logs — хранятся либо в локальной ФС worker'ов, либо в S3, GCS, HDFS.
Способы развертывания:
Docker Compose (для небольших кластеров) Kubernetes (Helm Chart) — самый популярный вариант в проде Systemd/Supervisor (если развертывание на bare-metal)
Какие есть виды executors и как они работают?
Теги: #Ярослав #okko, #wildberries, #x5
В конфигурационном файле Airflow (airflow.cfg) или через переменные окружения в docker container/compose можно указать, какой executor использовать для планирования и выполнения задач.
- SequentialExecutor
* Суть: Запускает задачи строго по очереди (по одной за раз).
SequentialExecutor – Как он работает?
1️⃣ Scheduler берет задачу из DAG и исполняет её последовательно.
2️⃣ Одна задача за раз – следующая задача выполняется только после завершения предыдущей.
3️⃣ Все метаданные записываются в PostgreSQL/MySQL.
- Плюсы:
o Проще всего настроить.
o Идеален для отладки и тестирования. - Минусы:
o Нет параллелизма → низкая производительность. - Когда использовать:
o Для локальной разработки и небольших проектов без большого числа задач.
________________________________________
2. LocalExecutor - Суть: Параллельное выполнение на одной машине (через потоки или процессы).
- Как работает LocalExecutor?
1️⃣ Scheduler анализирует DAG и запускает задачи.
2️⃣ Для выполнения задач используется локальный пул процессов (multiprocessing Pool).
3️⃣ Процессы выполняются параллельно, используя ресурсы одного сервера.
4️⃣ Результаты выполнения записываются в базу данных Airflow.
📌 LocalExecutor отлично подходит для запуска DAG’ов на одном сервере без сложных настроек.
* Плюсы:
o Даёт параллелизм без дополнительных сервисов.
o Простая настройка (нужен только один сервер).
* Минусы:
o Упирается в ресурсы одной машины.
* Когда использовать:
o Средние проекты, где один сервер достаточен по ресурсам.
________________________________________
3. CeleryExecutor
* Суть: Celery Executor — это механизм выполнения задач в Apache Airflow, который позволяет распределять выполнение задач DAG по нескольким воркерам. Это делает его хорошим выбором для масштабируемых и отказоустойчивых систем.
- Как это работает?o Scheduler добавляет задачи в очередь сообщений (например, Redis или RabbitMQ).
o Celery Worker’ы подписаны на эту очередь и периодически запрашивают новые задачи.
o Когда воркер получает задачу, он исполняет её и возвращает результат.
o Каждая задача выполняется локально на воркере, но воркеров может быть несколько и на разных серверах.
o Метаданные о статусе задачи хранятся в базе данных Airflow.
o Scheduler отслеживает состояние задач и триггерит зависимости между ними. - Основные компоненты:
o Scheduler – распределяет задачи по очереди сообщений.
o Broker (Redis/RabbitMQ) – передаёт задачи воркерам.
o Workers – исполняют задачи.
o Metadata Database (PostgreSQL/MySQL) – хранит информацию о DAG’ах, статусах задач.
o Web Server – UI для мониторинга. - Когда использовать:
o Если нужно масштабируемое распределение нагрузки.
o Если задачи требуют разных вычислительных мощностей.
o Если хочется автоматически перераспределять упавшие задачи
________________________________________
4. KubernetesExecutor - KubernetesExecutor — это механизм выполнения задач в Apache Airflow, который запускает каждую задачу в отдельном Pod’е в Kubernetes. Это делает его самым масштабируемым и гибким вариантом среди всех Airflow Executors.
- Как это работает?
1️⃣ Scheduler создаёт задачу и отправляет её в API Kubernetes.
2️⃣ Kubernetes создаёт новый Pod, в котором запускается нужный оператор (например, PythonOperator).
3️⃣ Задача выполняется внутри Pod’а.
4️⃣ По завершении задачи Pod автоматически удаляется.
5️⃣ Scheduler получает статус выполнения и триггерит зависимости.
🔥 Главное отличие от CeleryExecutor — нет долгоживущих воркеров! Каждая задача исполняется в новом Pod’е.
* Основные компоненты
o Scheduler – отправляет задачи в Kubernetes API.
o Kubernetes API – создаёт и управляет Pod’ами.
o Worker Pod’ы – выполняют задачи DAG.
o Metadata Database (PostgreSQL/MySQL) – хранит информацию о DAG’ах, статусах задач.
o Web Server – UI для мониторинга.
* Когда использовать:
o Когда нужно горизонтальное масштабирование (лучше, чем CeleryExecutor).
o Когда задачи требуют разного объёма ресурсов (можно задать CPU/RAM на уровне Pod’а).
o Когда нужна высокая отказоустойчивость.
o Когда Airflow уже работает в Kubernetes (иначе проще использовать CeleryExecutor).
________________________________________
5. DaskExecutor
* Суть: DaskExecutor — это механизм выполнения задач в Apache Airflow, который использует кластер Dask для распределённого выполнения задач.
Этот Executor полезен, если у тебя уже есть кластер Dask и ты хочешь использовать его возможности для выполнения DAG’ов.
* Как это работает?
1️⃣ Scheduler отправляет задачу в кластер Dask.
2️⃣ Dask запускает Task на одном из доступных воркеров.
3️⃣ После выполнения результата возвращаются в Airflow.
4️⃣ Метаданные хранятся в БД Airflow.
🔥 В отличие от CeleryExecutor, DaskExecutor не требует очереди сообщений (Redis/RabbitMQ). Он напрямую отправляет задачи в кластер Dask.
- Основные компоненты
o Scheduler – отправляет задачи в кластер Dask.
o Dask Cluster – выполняет задачи DAG’ов.
o Dask Scheduler – управляет распределением задач по воркерам.
o Dask Workers – исполняют задачи DAG’ов.
o Metadata Database (PostgreSQL/MySQL) – хранит информацию о DAG’ах.
o Web Server – UI для мониторинга DAG’ов. - Когда использовать:
o Если у тебя уже есть кластер Dask.
o Если задачи DAG’ов требуют многочисленных распределённых вычислений.
o Если нужно гибкое управление ресурсами (CPU/GPU).
o Если ты не хочешь использовать CeleryExecutor с Redis/RabbitMQ.
________________________________________
6. DebugExecutor - Суть: DebugExecutor — это самый простой и удобный способ отлаживать DAG’и в локальном окружении без запуска Scheduler’а, Worker’ов или БД.
🔥 Главное отличие от других Executors:
Задачи выполняются синхронно и последовательно, прямо в текущем процессе (без фоновых процессов, очередей или Kubernetes). Результаты сразу выводятся в консоль. * Как это работает? 1️⃣ DAG запускается как обычный Python-скрипт. 2️⃣ Scheduler не требуется – задачи исполняются сразу. 3️⃣ БД не нужна – нет записи метаданных. 4️⃣ Результаты сразу видны в консоли (stdout). 🔥 В отличие от LocalExecutor, DebugExecutor не использует многопоточность и выполняет задачи последовательно!
- Основные компоненты:
o DAG – исполняется как обычный Python-скрипт.
o DebugExecutor – запускает задачи в текущем процессе.
o Логирование – сразу видно в консоли.
o Нет воркеров, брокеров сообщений или Scheduler’а. - Минусы:
o Нет параллелизма.
o Не предназначен для продакшна. - Когда использовать:
o Локальная отладка и проверка логики DAG’ов.
________________________________________
7. CeleryKubernetesExecutor - Суть: CeleryKubernetesExecutor — это гибридный Executor, который объединяет возможности CeleryExecutor и KubernetesExecutor. Он позволяет гибко распределять задачи DAG’ов:Стандартные задачи исполняются через CeleryExecutor (на постоянных воркерах).
Ресурсоёмкие задачи запускаются в отдельных Kubernetes Pod’ах (как в KubernetesExecutor).
🔥 Главное преимущество — возможность гибкого выбора способа выполнения для каждой задачи DAG’а.
* Как это работает?
1️⃣ Scheduler анализирует DAG и определяет, как выполнять задачи:
Обычные задачи → Celery (в воркерах Celery). Тяжёлые задачи → Kubernetes (новый Pod в Kubernetes).
2️⃣ Celery Worker’ы берут задачи из Redis/RabbitMQ и выполняют их.
3️⃣ Kubernetes Pod’ы создаются и уничтожаются для выполнения отдельных задач.
4️⃣ Метаданные (статус задач) хранятся в базе Airflow.
🔥 Celery + Kubernetes = гибкость и масштабируемость
* Основные компоненты
o Scheduler – распределяет задачи между Celery и Kubernetes.
o Broker (Redis/RabbitMQ) – очередь сообщений для Celery.
o Celery Worker’ы – обрабатывают задачи, которые не требуют выделенных ресурсов.
o Kubernetes Cluster – создаёт Pod’ы для задач с особыми требованиями.
o Metadata Database – хранит состояние DAG’ов и задач.
o Web Server – UI для мониторинга.
- Когда использовать:
✅ Если у тебя есть Celery, но иногда нужны временные ресурсы Kubernetes.
✅ Если хочешь минимизировать использование Pod’ов (они создаются только для тяжёлых задач).
✅ Если нужно комбинировать стабильность Celery с масштабируемостью Kubernetes.
Какие компоненты нужно указать при переходе Airflow из локальной среды в прод? Какие конфигурации необходимо настроить?
Теги: #Cian
Итог
При переходе Airflow из локальной среды в прод необходимо:
o Переключиться на полноценную БД (Postgres/MySQL).
o Выбрать Executor, чаще всего Celery или Kubernetes (с настройкой брокера и бэкенда).
o Подготовить окружение для webserver, настроить аутентификацию/авторизацию.
o Настроить scheduler (планировщик) на стабильную работу с нужным параллелизмом.
o Решить вопрос с хранением логов (локально, S3, GCS и т.д.).
o Настроить и защитить Connections и Variables (секреты).
o Организовать безопасность — аутентификация (RBAC, LDAP), HTTPS, доступ по VPN.
o Обеспечить масштабирование и мониторинг (проверять ресурсы, метрики).
- Обёртка (Docker/Kubernetes или «голое» развёртывание)В продакшне часто используют Docker или Kubernetes для упаковки Airflow.
Если используете Helm Chart для Airflow, нужно настроить:
Переменные окружения (env) для указания подключения к БД, брокеру и т.д.
Секреты (Secrets) для хранения логинов/паролей.
Объёмы (Volumes) для шаринга логов, плагинов, файлов DAG-ов.
Если устанавливается на «голый» сервер, то используют systemd/Supervisor для автозапуска процессов (scheduler, webserver, worker).
1 Тип экзекьютора (Executor)
LocalExecutor или SequentialExecutor — используются в тестовых/локальных средах. CeleryExecutor — чаще всего для продакшна, так как поддерживает горизонтальное масштабирование, распределение задач между воркерами. Потребуется настроить брокер сообщений (RabbitMQ или Redis) и бэкенд (результаты выполнения задач часто хранятся в той же базе, что и метаданные, либо в Redis). KubernetesExecutor — если Airflow развёртывается в Kubernetes, и требуется динамическое создание подов для задач.
При переходе на CeleryExecutor или KubernetesExecutor нужно настроить соответствующее окружение и сервисы (брокер, очередь, база данных, и т.д.).
- Брокер и бекенд (для CeleryExecutor)
Если выбран CeleryExecutor, то обязательно требуется:
Broker (брокер сообщений): обычно RabbitMQ или Redis. Указать параметры подключения к брокеру в airflow.cfg (broker_url). Result backend (хранилище результатов): может быть Redis или та же PostgreSQL/MySQL. Указать result_backend в airflow.cfg.
- База данных (Metadata DB)
В локальной среде часто используется SQLite, но в продакшне необходимо использовать полноценную реляционную БД, например:
PostgreSQL MySQL
Основные моменты:
Указать корректный sql_alchemy_conn в airflow.cfg (или в переменных окружения). Убедиться, что Airflow имеет права на чтение/запись в эту БД. Настроить регулярную процедуру бэкапа.
- Настройки веб-сервера Airflow (Webserver)Авторизация и аутентификация
Переключение с открытой анонимной аутентификации на роль-based (RBAC) или другие варианты (LDAP/OAuth/SAML).
Настройка учетных записей, ролей (Admin, User, Viewer и т.д.) и т.п.
Настройка порта и хостинга
Как будет доступен Airflow UI – за прокси (Nginx, Traefik), либо напрямую.
SSL/HTTPS (через прокси-сервер или «комбинированно»).
Gunicorn-воркеры, количество процессов, таймауты.- Scheduler (Планировщик)
В продакшне:
Для эффективного управления ресурсами Airflow необходимо правильно настраивать ключевые параметры планировщика (scheduler), которые влияют на обработку DAG-ов и задач: max_threads — максимальное количество потоков, которые использует Scheduler для параллельной обработки DAG-файлов. По умолчанию, планировщик сканирует папку с DAG-ами и загружает их в память, что может быть ресурсоёмкой операцией. Если DAG-ов много, увеличение max_threads позволит ускорить их обработку. Однако слишком высокое значение может привести к перегрузке процессора, поэтому его следует подбирать с учетом доступных ресурсов. min_file_process_interval — минимальный интервал в секундах между повторными обходами DAG-файлов. Если, например, установить min_file_process_interval=30, то планировщик не будет проверять файлы DAG-ов чаще, чем раз в 30 секунд. Это позволяет снизить нагрузку на файловую систему, особенно если DAG-ов много и они часто обновляются.
Запуск планировщика как сервиса
Для стабильной работы Airflow важно следить, чтобы Scheduler запускался как системный сервис. Это гарантирует его автоматический перезапуск в случае сбоя и удобное управление процессом. В зависимости от окружения используются разные методы:
systemd — стандартный способ управления сервисами в Linux. Позволяет настроить автоматический рестарт планировщика в случае падения. supervisord — инструмент мониторинга и управления процессами, который также может автоматически перезапускать Airflow Scheduler. Docker/Kubernetes — при использовании контейнеризированного окружения Airflow можно настроить Kubernetes Pod или Docker контейнер с политикой перезапуска (restart policy). Например, в Kubernetes можно использовать restartPolicy: Always, чтобы перезапуск происходил автоматически при падении пода.
- ЛогированиеПути хранения логов
В продакшне логи обычно хранят отдельно (S3, GCS, NFS, локальная папка на лог-сервере).
В airflow.cfg прописать remote_logging и параметры удалённого хранилища, если нужно.
Уровень логирования (DEBUG/INFO/WARN). - Настройка Connections и VariablesВсе внешние подключение (Connections: БД, API, S3, Hadoop-кластеры и т.п.) переводятся из локальных URI в прод-окружения.
Хранение чувствительных данных:
Если необходимо, подключать secret-backend (Vault, AWS Secret Manager, GCP Secret Manager).
Убедиться, что значения паролей не «зашиты» в простых текститовых airflow.cfg и т.п.9. Масштабирование
При увеличении количества DAG-ов и задач в Apache Airflow важно контролировать ключевые параметры, определяющие уровень параллелизма и управление ресурсами. Параметры parallelism, dag_concurrency и worker_concurrency настраиваются в файле конфигурации airflow.cfg:parallelism — глобальный уровень параллелизма в Airflow. Этот параметр задаёт максимальное количество задач, которые могут выполняться одновременно во всей системе (во всех DAG-ах). Например, если parallelism=32, то в любой момент времени не более 32 задач могут находиться в активном состоянии (running), независимо от того, в каких DAG-ах они находятся.dag_concurrency — ограничение на число задач, которые могут выполняться одновременно в рамках одного DAG-а. Если, например, dag_concurrency=8, это означает, что даже если DAG содержит больше задач, Airflow не запустит более 8 одновременно активных задач для этого DAG-а. Этот параметр важен, если есть DAG-и с большим количеством задач, чтобы они не потребляли слишком много ресурсов.worker_concurrency — количество процессов, которые может запускать один worker в CeleryExecutor. Если используется CeleryExecutor, то каждый worker может параллельно обрабатывать несколько задач, но их общее количество определяется worker_concurrency. Например, если параметр установлен в worker_concurrency=16, то один worker сможет одновременно обрабатывать до 16 задач. Это значение должно быть согласовано с ресурсами хоста (CPU, RAM), чтобы избежать перегрузки.
Как можно передавать данные между тасками в Airflow ? Какие ограничения существуют? Почему не рекомендуется передавать большие объекты (например, DataFrame) через XCom?
Теги: #Cian, #Иннотех, #Rubbles, #x5 #Ярослав
- XCom (Cross-Communication) — механизм, позволяющий таскам обмениваться небольшими данными (например, статусом, путями к файлам, ключами и т.п.).
- Под капотом данные хранятся в базе Airflow, поэтому есть ограничения по объёму.
- Рекомендуется передавать только “легковесные” объекты (строки, числа, ссылки).
- Можно также хранить данные во внешних хранилищах и передавать ссылки (при больших объёмах).
Производится через некоторый внешний сервис. Или нужно сохранить в файл (локально на ВМ/контейнер airflow, S3, hdfs и тд) и в следующем шаге оттуда считать. Или сложить в базу в промежуточную схему и потом читать оттуда.
Ещё раз полезно напомнить, что Airflow это оркестратор, который отправляет задачи внешним системам и следит за их исполнением, а не ETL инструмент.
Как передавать данные через XCom?
Запись данных в XCom Используется task_instance.xcom_push внутри Python-оператора. Пример: from airflow.decorators import task
@task
def push_xcom():
return “Hello from XCom”
Чтение данных из XCom
Используется task_instance.xcom_pull. Пример: @task def pull_xcom(ti): value = ti.xcom_pull(task_ids='push_xcom') print(f"Received: {value}")
Автоматическая передача (XComArg)
В Airflow 2+ можно передавать результат одной задачи в другую без xcom_pull. Пример:
pushed_value = push_xcom()
pull_xcom(pushed_value)
TaskFlow API (новый стиль написания DAG’ов в Airflow 2+) — даёт возможность возвращать объекты в декораторе @task и автоматически прокидывать их в другие таски. Но, по сути, данные всё равно сохраняются в XCom. Удобный способ определения последовательности выполнения тасок и передачи мета-данных между ними. Введён в 2.0.0 airflow. Использует декораторы для определения тасок (и дагов). Пример: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
Поясню, что здесь происходит.
После определения тасок указывается их запуск в таком порядке:
order_data = extract() # вызывается экстракт таск и результат передаётся в order_data переменную
order_summary = transform(order_data) # вызывается трансформ таск с параметром, туда передаётся результат выполнения предыдущей таски
load(order_summary[“total_order_value”]) # аналогично предыдущему шагу, но со значением словаря по ключу
По-старинке можно было бы задать порядок выполнения через
extract»_space; transform»_space; load, обрабатывая передачу параметров через XCOM вручную и описывая как минимум task_id.
Или через
extract.set_downstream(transform)
transform.set_downstream(load)
Также можно задавать условия выполнения через BranchPythonOperator и ShortCircuitOperator.
Почему не стоит передавать большие объекты (например, DataFrame) через XCom
Обычно база это постгрес, и в теории в одной ячейке таблицы может и гигабайт храниться, и больше, но это неоптимальный способ + база будет очень сильно раздуваться и замедляться из-за этого. Сам протокол спроектирован именно под мета-данные, какие-то параметры запуска таска, но не данные, над которыми нужно произвести обработку. Поэтому пандас датасеты не стоит передавать “как есть” между тасками.
- Перегружаете базу: большой объём данных в XCom = большая нагрузка на мета-базу Airflow.
- Проблемы с сериализацией: объекты сериализуются (pickling), что может приводить к ошибкам и замедлять работу.
- Плохая масштабируемость: при больших объёмах данных это сильно бьёт по производительности.
Рекомендация:
- Хранить большие объёмы данных во внешнем хранилище (S3, PostgreSQL, HDFS и т.д.), а через XCom/TaskFlow передавать только путь или ключ.
Как организовать алертинг об ошибках и падениях тасок/дагов Airflow (например, в Telegram)?
Теги: #wildberries, #x5
Через оператор TelegramOperator. Да, так просто.
Если впервые подключаешь новый сервис к взаимодействию с airflow, проверь, может есть уже написанный оператор для этого.
Как настроить алертинг в Airflow через Telegram
- Создать бота в Telegram через @BotFather, получить TOKEN.
- Найти chat_id с помощью API:
- https://api.telegram.org/bot<TOKEN>/getUpdates</TOKEN>
- Добавить соединение в Airflow (Admin → Connections):
Conn Id: telegram_default
Conn Type: HTTP
Host: https://api.telegram.org/bot<TOKEN>/
Extra: {"password": "<TOKEN>"}</TOKEN></TOKEN> - Установить пакет (если нет):pip install apache-airflow-providers-telegram
- Добавить TelegramOperator в DAG для отправки сообщений.
- ## Использовать on_failure_callback в таски, даги для автоматических алертов при сбоях.
- Стандартные e-mail уведомления
- В Airflow можно настроить e-mail алерты, указав SMTP-сервер и параметры в airflow.cfg.
- В DAG или задачах можно прописать email_on_failure=True и email=[‘you@example.com’].
Что такое параметр catchup в DAG Airflow и для чего он может быть полезен, кроме прогонки исторических данных?
Теги: #wildberries
catchup — параметр в Apache Airflow, который определяет, должны ли старые даты выполнения задач быть выполнены при первом запуске DAG. Когда catchup установлен в True, Apache Airflow автоматически запускает пропущенные даты выполнения задач после того, как DAG был включен в работу. Кроме исторического прогона, помогает обеспечить целостность данных, если DAG должен гарантированно отработать за все пропущенные периоды.
Когда catchup=True полезен?
Когда важно обработать всю историю данных. Для DAG, работающих с инкрементальными загрузками из БД. Если DAG загружает данные с метками времени, например, записи логов.
Когда catchup=True не полезен?
- Оптимизация нагрузки на систему
При наличии DAG, который выполняется часто (например, каждый час или чаще), большое количество пропущенных запусков может перегрузить систему, когда Airflow попытается их все обработать. Установка catchup=False предотвращает лавинообразное выполнение DAG.
Пример:
Вы включаете DAG, который выполняется раз в 5 минут, но он был отключен на 2 недели. Без catchup=False Airflow попытается выполнить 4032 запуска (12 запусков в час × 24 часа × 14 дней), что может привести к перегрузке.
- Исключение обработки устаревших данных
Если DAG загружает данные из внешних источников (например, API Wildberries), исторические данные могут быть уже недоступны или устаревшими.
Пример:
Вы загружаете остатки товаров с Wildberries API, но API предоставляет данные только за последние 7 дней. Если DAG был отключен на 2 недели, то запуски за более ранние даты просто не найдут данных.
- Исключение проблем с повторным запуском зависимых процессов
Если DAG управляет данными, которые изменяются в реальном времени (например, обновляет агрегированные отчеты в ClickHouse или Greenplum), прогон старых запусков может привести к:
дублированию данных, пересозданию отчетов с устаревшими значениями, логическим ошибкам при обработке временных меток.
Пример:
DAG обновляет витрину продаж Wildberries в ClickHouse. Если DAG был отключен на неделю и затем включен, пересоздание витрины за старые даты приведет к конфликту данных.
Какие макросы Airflow существуют и какие вы используете на практике?
Теги: #wildberries
В Apache Airflow макросы позволяют использовать динамические значения внутри DAG’ов, шаблонов и операторов (например, BashOperator, PythonOperator, EmailOperator и др.). Они особенно полезны для параметризации задач.
Примеры макросов:
* {{ ds }} / {{ ds_nodash }} Дата, за которую DAG обрабатывает данные (а не дата его запуска). Если DAG выполняется ежедневно, то ds – это вчерашняя дата.),
* {{ execution_date }} (дата/время запуска),
* {{ prev_ds }}, {{ next_ds }} (предыдущая/следующая дата),
* {{ dag_run.conf }} (конфигурация, переданная при триггере).
Чаще всего используют ds/ds_nodash для шаблонизации путей и имён файлов.
Сенсоры и Poke интервал. Что знаешь про это?
Теги: #Ярослав
Sensors — это специальные операторы в Airflow, которые ожидают наступления определённого события (например, появления файла, готовности таблицы или завершения другого DAG’а).
По умолчанию сенсоры работают в poke-режиме, при котором они полностью занимают worker slot (ресурс кластера, см. параллелизм), пока ждут событие.
Режимы работы сенсоров
Настроить режим сенсора можно в параметре mode, передаваемом в конструктор сенсора:
Poke (по умолчанию) — сенсор постоянно проверяет условие через poke_interval, занимая worker slot. Reschedule — сенсор «отпускает» worker slot между проверками, что снижает нагрузку на Airflow. Вместо блокировки он просто перезапускается через poke_interval. Устанавливается через mode='reschedule'. Smart Sensors — оптимизированная система сенсоров в Airflow 2.x, позволяющая сгруппировать несколько сенсоров в один процесс и уменьшить нагрузку на кластер. Smart Sensors тоже используют poke-логику, но делают это оптимизированно. Smart Sensors позволяют объединить несколько сенсоров в один процесс, что снижает нагрузку на scheduler и workers. Работает это так: вместо того чтобы каждый сенсор запускался отдельно, Airflow группирует их в один специальный smart-sensor-процесс, который выполняет проверки централизованно. 🔹 Как включить Smart Sensors Включить их в конфигурационном файле Airflow (airflow.cfg): [smart_sensor] use_smart_sensor = true shard_code_upper_limit = 10000 Настройка poke- и reschedule-режимов
Режим сенсора (mode) и интервал между проверками (poke_interval) указываются при его создании. Например:
from airflow.sensors.filesystem import FileSensor
file_sensor = FileSensor(
task_id=”wait_for_file”,
filepath=”/path/to/file”,
poke_interval=60, # Проверка каждую минуту
mode=”poke”, # Можно заменить на ‘reschedule’ для экономии ресурсов
)
Если в mode=’poke’, Airflow будет удерживать worker slot на время ожидания. Если mode=’reschedule’, сенсор будет временно отключаться и ждать следующего запуска.
Какие есть конекторы в Аирфлоу?
Теги: #neoflex
Коннекторы в Apache Airflow – это специальные модули, которые позволяют DAG’ам взаимодействовать с различными внешними системами, такими как базы данных, облачные сервисы, API и другие источники данных. Коннекторы реализуются через Hook и Operator.
Hook – низкоуровневый интерфейс для подключения к внешней системе.
Operator – высокоуровневый объект, использующий Hook для выполнения задач.
Пример:
PostgresHook – устанавливает соединение с PostgreSQL. PostgresOperator – выполняет SQL-запрос в PostgreSQL через этот Hook.
Коннектор (Hook/Operator) Описание
PostgresHook, PostgresOperator Подключение к PostgreSQL
MySqlHook, MySqlOperator Подключение к MySQL
MsSqlHook, MsSqlOperator Подключение к Microsoft SQL Server
OracleHook, OracleOperator Подключение к Oracle
SnowflakeHook, SnowflakeOperator Работа с Snowflake
MongoHook, MongoOperator Работа с MongoDB
ClickHouseHook, ClickHouseOperator Работа с ClickHouse (через кастомные плагины)
HdfsHook, HdfsSensor Подключение к HDFS
HiveServer2Hook, HiveOperator Работа с Apache Hive
SparkSubmitHook, SparkSubmitOperator Запуск Spark-приложений
PrestoHook, PrestoToGCSOperator Работа с Presto
BigQueryHook, BigQueryOperator Подключение к Google BigQuery
пример
from airflow.providers.postgres.hooks.postgres import PostgresHook
def get_postgres_data():
pg_hook = PostgresHook(postgres_conn_id=’my_postgres_conn’)
connection = pg_hook.get_conn()
cursor = connection.cursor()
cursor.execute(“SELECT * FROM my_table LIMIT 10;”)
result = cursor.fetchall()
for row in result:
print(row)
get_postgres_data()
- Как добавить новое подключение (Connection) в Airflow?
Через Web UIПерейти в Admin → Connections.
Нажать “+ Add Connection”.
Заполнить поля:
Conn Id: my_postgres_conn
Conn Type: Postgres
Host: localhost
Schema: my_database
Login: airflow_user
Password: password
Port: 5432
Нажать Save.
Как хранить секреты? +(connections)?
Теги: #Ярослав
Можно в переменных (Admin - Variables), они хранятся в зашифрованном виде (если в airflow.cfg настроен fernet key – фернет это тип симметричного шифрования, этого достаточно).
Но если секреты это параметры подключения (credentials, кредэншиалс) для баз данных и других источников, для этого идеально подходят коннекшены (Admin - Connections). Там задаются все нужные параметры, имя коннекта, и подключение производится по нему.
Через эти же коннекшены можно разделять контура дев-тест-прод: задаётся единое имя коннекта, а в разных инстансах airflow значения могут быть разными, и соответственно вести к разным сервисам. Конечно, ещё нужно чтобы была сетевая связность, т.е. сервер airflow мог обмениваться данными с сервером базы в принципе (подробнее https://t.me/rzv_de/92).
Также можно использовать отдельные сервисы, например hashicorp Vaul
Зачем нужен самописный оператор и как работает?
Теги: #Ярослав
Нужен тогда, когда хочется переиспользовать какой-то пайтон код, а готового оператора нет. Например, для связи с внутренним сервисом компании или непопулярным внешним API. Для google ads точно есть оператор, для яндекс погоды - вряд ли. Наследуется от BaseOperator, должен определить метод execute.
Создание кастомного оператора
Создадим Python-класс PostgresToGCSOperator, который:
Выполняет SQL-запрос в PostgreSQL. Сохраняет результат в локальный файл. Загружает файл в GCS. from airflow.models import BaseOperator from airflow.hooks.postgres_hook import PostgresHook from airflow.providers.google.cloud.hooks.gcs import GCSHook import csv import os
class PostgresToGCSOperator(BaseOperator):
def __init__(self, postgres_conn_id, sql_query, gcs_bucket, gcs_path, gcs_conn_id=’google_cloud_default’, args, **kwargs):
super().__init__(args, **kwargs)
self.postgres_conn_id = postgres_conn_id
self.sql_query = sql_query
self.gcs_bucket = gcs_bucket
self.gcs_path = gcs_path
self.gcs_conn_id = gcs_conn_id
def execute(self, context): self.log.info("Подключаемся к PostgreSQL") pg_hook = PostgresHook(postgres_conn_id=self.postgres_conn_id) conn = pg_hook.get_conn() cursor = conn.cursor() self.log.info("Выполняем SQL-запрос") cursor.execute(self.sql_query) rows = cursor.fetchall() column_names = [desc[0] for desc in cursor.description] local_file = "/tmp/temp_data.csv" self.log.info(f"Сохраняем данные в файл {local_file}") with open(local_file, "w", newline="") as f: writer = csv.writer(f) writer.writerow(column_names) writer.writerows(rows) self.log.info(f"Загружаем файл в GCS: gs://{self.gcs_bucket}/{self.gcs_path}") gcs_hook = GCSHook(self.gcs_conn_id) gcs_hook.upload(bucket_name=self.gcs_bucket, object_name=self.gcs_path, filename=local_file) self.log.info("Удаляем временный файл") os.remove(local_file) self.log.info("Оператор успешно завершил работу")
Использование кастомного оператора
Теперь добавим его в DAG:
from airflow import DAG
from datetime import datetime
from custom_operators.postgres_to_gcs import PostgresToGCSOperator
default_args = {
“owner”: “airflow”,
“start_date”: datetime(2024, 3, 1),
“retries”: 1,
}
with DAG(“postgres_to_gcs_dag”, default_args=default_args, schedule_interval=”@daily”, catchup=False) as dag:
export_to_gcs = PostgresToGCSOperator( task_id="export_postgres_to_gcs", postgres_conn_id="my_postgres_conn", sql_query="SELECT * FROM my_table;", gcs_bucket="my_bucket", gcs_path="data/my_table.csv", ) export_to_gcs
Что влияет на параллелизм?
Теги: #Ярослав
Основные настройки:
* parallelism (глобальный максимум параллельных задач)
Это параметр на уровне всей установки Airflow (настройка в airflow.cfg). Он задает максимальное количество одновременно выполняемых задач (task instances) в рамках всей системы, независимо от количества DAG-ов, worker-ов и других ограничений.
Пример:
Если parallelism = 32, то Airflow одновременно запустит не более 32 задач, даже если DAG-ов много и в каждом из них есть готовые к выполнению задачи.,
* dag_concurrency (максимум одновременно работающих тасок в рамках одного DAG),
* worker_concurrency (сколько задач может одновременно обрабатывать один worker),
* pool (ограничение числа задач по ресурсам/пуллам)
Pools (пулы) в Airflow позволяют ограничивать выполнение задач по определенным ресурсным ограничениям. Например, если задачи обращаются к внешней базе данных или API с ограниченным числом соединений, можно создать pool и указать, сколько задач одновременно могут его использовать.
Пример:
Если создать pool с size=10, то не более 10 задач смогут одновременно использовать этот ресурс, даже если все остальные ограничения (parallelism, dag_concurrency, worker_concurrency) позволяют больше..
Достаточно сказать, что есть Pool’ы, которые ограничивают максимальное количество тасок, работающих параллельно. Стандартный пул содержит 128 слотов.
Их можно настроить через Admin - Pools
Для разных дагов и даже тасок можно задавать разные пулы. Для таски можно указать “вес” для приоритизации внутри пула. Это тонкая настройка для тех дагов, выполнение которых важнее, чем других.
Ну и, конечно, в самих дагах нужно таски выстраивать параллельно, где возможно. Оставляй только те зависимости, которые логически ограничивают таску от запуска, даже если будет выглядеть “некрасиво”. Полезно сказать и про таскгруппы, которые задают шаблон выполнения тасок и позволяют формировать их, например, в цикле.
Как реализовать свой кастомный оператор в Airflow? (Какой метод нужно определить?)
Теги: #x5
Чтобы реализовать свой кастомный оператор в Apache Airflow, тебе нужно понимать архитектуру Airflow Operators, какую базу они наследуют, и какие методы и параметры нужно определять, чтобы всё корректно работало внутри DAG.
Вот пошаговое и полное объяснение без самого кода, как ты просил:
🔧 1. Наследование: от какого класса
Любой кастомный оператор в Airflow наследуется от базового класса:
airflow.models.BaseOperator
или от специализированного:
airflow.operators.BaseOperator
Или ты можешь унаследовать от существующего оператора (например, PythonOperator
, если ты хочешь просто добавить логику к уже существующему поведению).
📌 2. Какие методы нужно определить
Обязательный метод:
-
def execute(self, context):
- Это основной метод, который вызывается при выполнении задачи в DAG.
- Аргумент
context
— это словарь со всеми переменными окружения Airflow (execution date, task instance, dag, ds, ts и т.п.) - Всё, что оператор должен делать — описывается в
execute
.
(Опционально):
-
\_\_init\_\_(...)
- Здесь ты задаёшь параметры, которые можно будет передавать при объявлении оператора в DAG.
- Не забудь вызвать
super().\_\_init\_\_(...)
, передав в негоtask_id
, и другие параметры, поддерживаемые базовым оператором (например,retries
,depends_on_past
и т.д.).
🧱 3. Где сохранять кастомный оператор
a) Локально в проекте DAG’ов:
- Создай структуру:
dags/ ├── my_operators/ │ └── my_custom_operator.py ├── my_dag_using_operator.py
- Обязательно добавь
\_\_init\_\_.py
в каждую папку, если используешь относительный импорт (или хочешь чтобы это была полноценная Python-модель).
b) Airflow Plugins (устаревший способ, но иногда нужен):
- Регистрируй оператор через систему плагинов, но это избыточно, если не используешь внешние UI-компоненты или хуки.
📦 4. Как импортировать и использовать в DAG
В DAG-файле:
- Импортируй кастомный оператор как обычный Python-класс.
- Используй его как обычный оператор, передавая
task_id
и свои параметры.
```python
from my_operators.my_custom_operator import MyCustomOperator
task = MyCustomOperator(
task_id=’do_something_custom’,
my_param=’value’
)
~~~
🧠 5. Что можно использовать в кастомном операторе
Внутри execute(context)
ты можешь:
- Получать execution date и другие параметры из
context
. - Работать с Airflow hook’ами (например,
PostgresHook
,HttpHook
). - Вызывать внешние API, скрипты, команды, и т.д.
- Возвращать значения (они сохраняются в
XCom
по умолчанию).
🧪 6. Как тестировать
- Можешь использовать
airflow tasks test <dag_id> <task_id> <execution_date>
для запуска конкретной задачи с твоим оператором. - Логика в
execute
будет вызвана в том же процессе.
🏷️ 7. Тег #x5 — как сохранять и документировать
Если у тебя в команде приняты специальные теги (например, #x5
), разумно:
- Добавить комментарий с таким тегом внутри
\_\_init\_\_
иexecute
. - Придерживаться Docstring-стиля Google или NumPy для читаемой автогенерации документации.
- Можно хранить такие кастомные модули в
libs/x5_airflow_plugins/operators/
илиdags/x5/operators/
, в зависимости от структуры проекта.
🧭 8. Рекомендации и нетривиальные кейсы
- Если хочешь, чтобы кастомный оператор отображал поля в UI (например, в XCom, или кастомный лог), — реализуй логику логирования через
self.log.info(...)
. - Чтобы оператор был совместим с TaskFlow API, он должен корректно поддерживать возврат значений из
execute
. - В мультиплатформенных окружениях не забывай указывать зависимости (
requirements.txt
илиconstraints.txt
), если твой оператор требует сторонние библиотеки.
Если хочешь, могу сразу предложить шаблон класса с комментами и готовой структурой — скажи.
Использовали ли вы кастомные шаблоны Jinja и XCom в Airflow? Как их настраивали?
Теги: #x5
Кастомные шаблоны Jinja
В одном из проектов у нас была потребность подстановки динамических путей в S3 и генерации SQL-запросов с учётом среды (dev, prod) и дат. Вместо того чтобы хардкодить логику в каждом таске, мы вынесли её в кастомные макросы и фильтры Jinja. Например, создавали макрос, который на вход принимал дату и окружение, а возвращал полный путь к файлу; этот макрос объявлялся в user_defined_macros и вызывался в bash_command и SQL, чтобы каждый раз не переписывать шаблон.
Кастомный XCom
Была задача хранить в XCom большие JSON-объекты с метаданными, что в стандартном варианте могло перегружать базу Airflow. Мы реализовали кастомный XCom-класс (наследованный от BaseXCom), переопределили методы сериализации и десериализации, чтобы фактически сохранять данные во внешнее S3-хранилище, а в базе Airflow хранить только ссылку. Так одна задача выгружала метаданные в S3 и пушила их ключ в XCom, а другая задача подтягивала ключ из XCom, загружала данные из S3 и уже с ними работала. Это позволяло эффективно работать с объёмными данными и не перегружать метаданные Airflow.
Jinja-шаблоны в Airflow
Jinja-шаблоны позволяют динамически подставлять значения в параметры операторов (tasks) с помощью переменных, макросов и встроенной логики (например, условных выражений или циклов). Это удобно, когда нужно, например, автоматически подставлять даты выполнения (execution_date), идентификаторы DAG’ов или другие параметры среды.
Для использования шаблонов в Airflow есть два ключевых механизма:
template_fields – список полей в операторе, которые можно шаблонизировать. Например, если у вас есть BashOperator, его аргумент bash_command можно сделать шаблонизируемым: bash_command="echo Execution date is {{ ds }}"
В момент выполнения DAG’а {{ ds }} автоматически заменится на дату выполнения.
Каждый оператор (например, BashOperator, PythonOperator) имеет атрибут template_fields. Это список полей, которые могут использовать Jinja-шаблоны. У каждого оператора свой template_fields. Например, у BashOperator можно шаблонизировать bash_command, а у PythonOperator – аргументы функции.Посмотреть список template_fields можно так: print(BashOperator.template_fields) # (‘bash_command’, ‘env’) . Он позволяет передавать несколько шаблонизируемых значений в виде словаря. Например:
PythonOperator(
task_id=”my_task”,
python_callable=my_function,
op_kwargs={“param”: “{{ execution_date }}”}
)
Здесь “{{ execution_date }}” заменится реальной датой перед вызовом my_function.
Кастомные шаблонные переменные
Если нужно добавить свои переменные, можно переопределить user_defined_macros:
- XCom – механизм передачи данных между задачами
XCom (Cross-Communication) – это встроенный механизм Airflow для передачи данных между задачами. Он позволяет одной задаче (task) записывать данные, а другой – их читать.
🔹 Как работает XCom?
Airflow автоматически сохраняет XCom-записи в своей базе данных. Каждая запись содержит:
Ключ (key) – уникальное название записи Значение (value) – сами данные (может быть строка, число, объект и т. д.) Идентификаторы DAG'а и задачи (чтобы записи не путались)
🔹 Как записывать и читать XCom?
Запись выполняется через task_instance.xcom_push(key, value) Чтение – через task_instance.xcom_pull(task_ids, key)
Пример использования в PythonOperator:
def push_value(**kwargs):
kwargs[‘ti’].xcom_push(key=’my_key’, value=42)
def pull_value(**kwargs):
value = kwargs[‘ti’].xcom_pull(task_ids=’push_task’, key=’my_key’)
print(f”Received value: {value}”)
🔹 Как передавать данные удобнее?
Для упрощенной работы с XCom можно использовать TaskFlow API (@task-декораторы). Это избавляет от явного вызова xcom_push/xcom_pull:
@task
def push_value():
return 42 # Значение автоматически записывается в XCom
@task
def pull_value(value):
print(f”Received: {value}”) # Значение автоматически передается из предыдущей задачи
Кастомизация XCom
Настройка сериализации: по умолчанию XCom сохраняет данные в виде pickle-объектов. В airflow.cfg можно изменить формат, например, на JSON (xcom_serializer = airflow.utils.json). Хранение больших объектов: для передачи больших объемов данных можно использовать XCom backend (например, Google Cloud Storage, S3).
Динамическая генерация тасок. Partial, expand. Что знаешь про это?
Теги: #Ярослав #x5
Создание дага
Динамическая генерация задач в Airflow
Динамическая генерация задач (dynamic task mapping) позволяет «на лету» создавать несколько экземпляров одной и той же задачи с разными параметрами. Это удобно, когда у вас есть список входных данных, и для каждого элемента списка нужно выполнить однотипную операцию.
Способы реализации:
-
Генерация через цикл (например, PythonOperator)
Можно использовать классический подход с созданием операторов (операторов) в цикле. Пример (упрощённо):
from datetime import datetime
from airflow.decorators import dag, task
default_args = {
“owner”: “airflow”,
“retries”: 1
}
@dag(
start_date=datetime(2023, 1, 1),
schedule_interval=”@daily”,
catchup=False,
default_args=default_args,
tags=[“example”, “taskflow”]
)
def dynamic_taskflow_dag():
@task() def my_task(param1: str, param2: str): """ Пример таска, который принимает два параметра и выводит их в лог. param1 - фиксированное значение для всех задач. param2 - динамическое значение, получаемое из списка dynamic_values. """ print(f"param1={param1}, param2={param2}") # Фиксированные аргументы для всех тасков static_args = {"param1": "fixed_value"} # Значение param1 остаётся неизменным # Динамический список значений для параметра param2 dynamic_values = ["val1", "val2", "val3"] # Каждое значение создаст отдельный таск # Создаём частично зафиксированную задачу с заранее заданным param1 partial_task = my_task.partial(**static_args) # Используем expand, чтобы создать несколько независимых задач, # где param2 будет принимать значения из dynamic_values # Это эквивалентно вызову my_task("fixed_value", "val1"), my_task("fixed_value", "val2"), и т.д. partial_task.expand(param2=dynamic_values)
dynamic_taskflow_dag()
Каждый проход цикла создаёт новую задачу (task) со своим task_id
и передаёт соответствующий item
в функцию.
Зачем это нужно? Например, у нас есть 10 файлов, которые нужно последовательно обработать, и мы не хотим «клонировать» один и тот же код 10 раз.
-
Генерация задач через TaskGroup
Если нужно логически группировать несколько динамических задач, то в Airflow 2.* можно использоватьTaskGroup
. Внутри TaskGroup можно, аналогично примеру выше, сгенерировать задачи в цикле, а затем связать эту группу с другими задачами.
Это упрощает визуализацию на графе и делает DAG чище.from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.utils.task_group import TaskGroup
from datetime import datetime
Функция, которая будет вызвана в динамически созданных задачах
def process_item(item):
print(f”Processing item: {item}”)
with DAG(
dag_id=”dynamic_taskgroup_example”,
start_date=datetime(2023, 1, 1),
schedule_interval=”@daily”,
catchup=False
) as dag:
items = ["apple", "banana", "cherry", "date", "elderberry"] # Данные для генерации тасок # Создаём TaskGroup, в которой будем динамически создавать задачи with TaskGroup(group_id="process_fruits") as process_fruits_group: for item in items: PythonOperator( task_id=f"process_{item}", python_callable=process_item, op_args=[item] ) # Дополнительные таски DAG (пример) start_task = PythonOperator( task_id="start", python_callable=lambda: print("Starting the DAG") ) end_task = PythonOperator( task_id="end", python_callable=lambda: print("Ending the DAG") ) # Определяем порядок выполнения start_task >> process_fruits_group >> end_task
-
Использование
@task
(TaskFlow API),partial
иexpand
В TaskFlow API, начиная с Airflow 2.3, появились механизмыpartial
иexpand
, которые упрощают написание динамических задач.-
partial
позволяет зафиксировать часть аргументов, которые будут одинаковыми для всех экземпляров задачи. -
expand
позволяет «раскрывать» список параметров и создать несколько экземпляров одной задачи для каждого значения из этого списка.
from datetime import datetime
from airflow.decorators import dag, task -
default_args = {
“owner”: “airflow”,
“retries”: 1
}
@dag(
start_date=datetime(2023, 1, 1),
schedule_interval=”@daily”,
catchup=False,
default_args=default_args,
tags=[“example”, “taskflow”]
)
def dynamic_taskflow_dag():
@task() def my_task(param1: str, param2: str): print(f"param1={param1}, param2={param2}") static_args = {"param1": "fixed_value"} dynamic_values = ["val1", "val2", "val3"] partial_task = my_task.partial(**static_args) partial_task.expand(param2=dynamic_values)
dynamic_taskflow_dag()
Здесь:
- my_task
— это один Python-функциональный таск.
- partial
«зашивает» param1="fixed_value"
для всех экземпляров.
- expand(param2=dynamic_values)
генерирует три независимых подзадачи (для val1
, val2
и val3
соответственно).
Зачем это нужно? Например, если у вас есть список параметров (названий таблиц в БД, идентификаторов пользователей, датовых срезов, имён файлов и т.д.), и нужно применить одну и ту же функцию (или PythonOperator) к каждому элементу этого списка. Благодаря expand
вы получите параллельное исполнение одинаковых задач с разными входными данными.
Когда и почему пригодится динамическая генерация?
- Обработка нескольких файлов / табличных партиций: Нужно запустить один и тот же код для набора данных (список имён файлов или датовых партиций), не копируя оператор много раз вручную.
- Параллелизм и масштабирование: Airflow может автоматически параллелить задачи, если у вас есть рабочие узлы (workers). Генерация множества однотипных задач позволяет загружать все вычислительные ресурсы.
- Гибкость и удобство поддержки: Вместо того, чтобы прописывать статически 10–100 одинаковых задач, достаточно иметь один шаблон (оператор или @task), который вызывается с разными аргументами.
Таким образом, подходы с циклом, TaskGroup или связкой partial/expand
решают схожую задачу: динамически сгенерировать набор однотипных задач с разными параметрами. Разница лишь в синтаксисе и удобстве чтения кода.
- Если вам проще мыслить классическими операторами (PythonOperator) и циклами — используйте «ручную» генерацию в
for
. - Если нравится «чистый» Python-код и декларативный стиль TaskFlow, то обратите внимание на
partial
иexpand
.
Все варианты верны; выбор инструмента зависит от вкуса, ситуации и требуемой наглядности кода.
Датасеты (event-based scheduling) что это?
Теги: #Ярослав
Датасеты (event-based scheduling) позволяют запускать DAG не по заранее заданному расписанию (которое мы указываем в schedule_interval или через CRON), а по событию. Например, при добавлении файла в S3-бакет, при вызове API или другом схожем событии.
Зачем это нужно?
Если данные появляются раньше стандартного расписания, то и обрабатывать их можно быстрее, не дожидаясь наступления CRON-времени. Схожего результата можно достичь через сенсоры, но для первого DAG в цепочке сенсор сам по себе не запустится — нужно так или иначе триггерить DAG. Датасеты упрощают эту логику: один DAG публикует (обновляет) Dataset, а другой DAG подписывается на этот Dataset и запускается автоматически.
Ограничения и актуальная версия
До версии Airflow 2.9 были некоторые шероховатости в работе с датасетами, но к 2.9+ механизм стал более надёжным и гибким. На некоторых проектах датасеты могут не использоваться просто по причинам легаси-кода (исторически всё завязано на CRON, а менять большие массивы кода дорого и долго). Датасеты появились в конце 2022 года (в Airflow 2.4) и где-то полтора года дорабатывались, поэтому в новых проектах на свежих версиях Airflow их использование может быть очень удобно.
Как именно это работает?
Датасеты (Datasets) в Apache Airflow — это не оператор и не сенсор. Это механизм, который работает похожим на триггер способом, но встроен в сам движок Airflow.
Производящий DAG (Producer DAG) публикует (обновляет) Dataset при помощи outlets=[my_dataset] который говорит Airflow, что этот таск обновляет my_dataset.. Это похоже на триггер, но без явного вызова trigger_dag. После успешного выполнения DAG-а, Airflow помечает Dataset как обновлённый. Потребляющий DAG (Consumer DAG) подписан на этот Dataset. У него расписания (schedule=[my_dataset]), Как только Dataset обновится — Airflow автоматически запустит этот DAG.
Важно:
Работает внутри самого Airflow, без внешних API-запросов. Нет необходимости в сенсорах (которые нагружают систему постоянными проверками). DAG подписывается на Dataset внутри Airflow, а не через оператор. Producer DAG (публикует Dataset) Здесь outlets=[my_dataset] говорит Airflow, что этот таск обновляет my_dataset. ------------------------------------------------------------------------------------------------------------------------------------------------------ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.datasets import Dataset from datetime import datetime
my_dataset = Dataset(“s3://my-bucket/data.csv”)
def generate_data():
# Код генерации данных и их сохранения
print(“Dataset обновлен!”)
with DAG(
dag_id=”producer_dag”,
start_date=datetime(2024, 1, 1),
schedule=”@daily”, # DAG выполняется раз в день
catchup=False,
) as dag:
task = PythonOperator( task_id="publish_dataset", python_callable=generate_data, outlets=[my_dataset], # Обновляет Dataset ) Consumer DAG (запускается при обновлении Dataset) schedule=[my_dataset] → DAG будет автоматически запускаться при обновлении my_dataset. ------------------------------------------------------------------------------------------------------------------------------------------------------ from airflow import DAG from airflow.operators.python import PythonOperator from airflow.datasets import Dataset from datetime import datetime
my_dataset = Dataset(“s3://my-bucket/data.csv”)
def process_data():
print(“Данные обновлены, выполняем обработку!”)
with DAG(
dag_id=”consumer_dag”,
start_date=datetime(2024, 1, 1),
schedule=[my_dataset], # DAG запускается при обновлении Dataset
catchup=False,
) as dag:
task = PythonOperator( task_id="consume_dataset", python_callable=process_data, )
Что такое pull (connection pull) в Airflow? Как это используется?
Теги: #x5
- В контексте Airflow «pull» обычно означает чтение (запрос) значения из XCom (например, xcom_pull).
- «Pull» (иногда говорят connection pull) – это когда одна задача извлекает данные, которые другая задача сохранила (через XCom).
Чем хук отличается от оператора? Что такое хук?
Теги: #Ярослав
- Оператор - отдельный таск. (hook) в AirFlow — это высокоуровневый интерфейс к внешней платформе, который позволяет быстро общаться со сторонними системами без необходимости писать низкоуровневый код, который обращается к их API или использует специальные библиотеки.
- Хук можно использовать внутри кастомного пайтон таска для взаимодействия на более низком уровне. Например, если нужно выполнить несколько postgresql запросов с промежуточной обработкой вычислений одним атомарным шагом
Что можно указывать в default_args при определении DAG в Airflow?
Теги: #x5 #Ярослав
В Airflow при определении DAG и тасков можно использовать параметр default_args, который позволяет задать значения по умолчанию для всех тасков внутри DAG. Разберёмся, какие параметры можно задавать в default_args для DAG и тасков.
default_args передаётся в DAG как словарь, в котором можно указать:
Основные параметры:
owner – владелец DAG (например, 'airflow'). schedule_interval – интервал запуска DAG (может быть None, если DAG триггерится вручную). on_failure_callback Этот параметр принимает функцию-обработчик, которая вызывается при падении таска. В колбэк передаётся объект контекста (context), содержащий полезную информацию о DAG, таске и его окружении. catchup – нужно ли догонять пропущенные выполнения DAG (по умолчанию True). start_date – начальная дата выполнения DAG (например, datetime(2024, 3, 1)). end_date – дата окончания выполнения DAG (можно не указывать). depends_on_past – влияет ли выполнение текущего запуска DAG на успешность предыдущего (False по умолчанию). retries – число попыток перезапуска в случае ошибки (0 по умолчанию). execution_timeout – максимальное время выполнения таска (timedelta(hours=1)). dagrun_timeout – максимальное время выполнения DAG (timedelta(days=1)). catchup – нужно ли догонять пропущенные выполнения DAG (по умолчанию True). schedule_interval – интервал запуска DAG (может быть None, если DAG триггерится вручную). priority_weight – приоритет DAG среди других DAG'ов. wait_for_downstream – нужно ли ждать выполнения всех downstream-тасков перед запуском нового инстанса DAG. sla_miss_callback – callback-функция, вызываемая при нарушении SLA.
Что такое XCOM?
Теги: #Ярослав
- XCom – межтасковый канал обмена данными внутри одного DAG.
- Сохраняет (push) и извлекает (pull) небольшие объёмы Python-объектов в базе метаданных Airflow.
Протокол обмена мета-данными между тасок через бэкенд-базу airflow. Типа “икс-ком, кросс-коммьюникейшн”.
Обычно база это постгрес, и в теории в одной ячейке таблицы может и гигабайт храниться, и больше, но это неоптимальный способ + база будет очень сильно раздуваться и замедляться из-за этого. Сам протокол спроектирован именно под мета-данные, какие-то параметры запуска таска, но не данные, над которыми нужно произвести обработку. Поэтому пандас датасеты не стоит передавать “как есть” между тасками.
Что такое Hooks в Airflow и для чего они используются?
Теги: #x5
- Hooks – это обёртки для работы с внешними ресурсами (базы, хранилища, API).
- Упрощают открытие соединений, авторизацию, выполнение запросов, чтобы операторы могли переиспользовать эту логику.
Catchup, backfill что такое? Зачем?
Теги: #Ярослав
Если у дага указаны start_date
и schedule
, scheduler запланирует выполнение дага для каждого интервала, который ещё не отработал. Это поведение называется catchup (кэтчап).
То же произойдёт, если даг поставить на паузу на какое-то время, а потом – снова включить.
При этом можно указатьexecution_timeout
, чтобы запуск дага не работал дольше, чем интервал запуска. Например, дольше часа при запуске каждый час (иначе очередь будет накапливаться, а процесс никогда не закончится).
Бэкфилл позволяет загрузить данные за определённый период, например с даты 1 по дату 2. Запустит все даг раны в указанном диапазоне, даже если они уже отработали успешно раньше.
Запускается только через CLI:
airflow dags backfill \
–start-date START_DATE \
–end-date END_DATE \
dag_id
Чтобы бэкфилл и catchup работали, нужно в таске указывать связь между execution_date и условием фильтрации данных, например select … where dt = {{ execution_date }}
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
Также полезно упомянуть:depends_on_past
– добавляет ограничение – только если предыдущий запуск дага/таски успешен, можно запускать текущий. Не срабатывает в первый запуск по start_date.max_active_runs
– ограничение на количество одновременно работающих дагов.
Как можно настроить зависимости между группами параллельных тасок в Airflow?
Теги: #x5
- Использование TaskGroup с task_group.set_upstream() и task_group.set_downstream()
Если у вас есть две группы тасков, которые выполняются параллельно, но одна должна начинаться только после завершения другой, можно использовать TaskGroup:
with DAG(‘parallel_task_groups’, default_args=default_args, schedule_interval=None) as dag:
start = DummyOperator(task_id='start') with TaskGroup('group_1') as group_1: task_1_1 = DummyOperator(task_id='task_1_1') task_1_2 = DummyOperator(task_id='task_1_2') task_1_1 >> task_1_2 # Зависимость внутри группы with TaskGroup('group_2') as group_2: task_2_1 = DummyOperator(task_id='task_2_1') task_2_2 = DummyOperator(task_id='task_2_2') task_2_1 >> task_2_2 # Зависимость внутри группы end = DummyOperator(task_id='end') # Определяем зависимости между группами start >> group_1 group_1.set_downstream(group_2) group_2 >> end
- Использование chain() для линейной связи между группами
Если вы хотите избежать явного указания зависимостей через .set_downstream(), можно использовать airflow.models.baseoperator.chain():
from airflow.models.baseoperator import chain
chain(start, group_1, group_2, end)
- Использование ExternalTaskSensor для меж-DAG-овых зависимостей
Если группы находятся в разных DAG’ах, можно использовать ExternalTaskSensor, чтобы ожидать завершения первой группы в одном DAG перед началом второй группы в другом DAG:
from airflow.sensors.external_task import ExternalTaskSensor
wait_for_group_1 = ExternalTaskSensor(
task_id=’wait_for_group_1’,
external_dag_id=’parallel_task_groups’,
external_task_group_id=’group_1’,
mode=’poke’, # или reschedule
timeout=600,
poke_interval=30,
dag=dag
)
with TaskGroup(‘group_2’) as group_2:
task_2_1 = DummyOperator(task_id=’task_2_1’)
task_2_2 = DummyOperator(task_id=’task_2_2’)
task_2_1»_space; task_2_2
wait_for_group_1»_space; group_2
- Использование XCom для динамических зависимостей
Если у вас сложные сценарии, где группа тасков должна стартовать только при определенных условиях, можно передавать статус выполнения первой группы через XCom и проверять его во второй группе:
from airflow.operators.python import PythonOperator
def check_group_1(**kwargs):
ti = kwargs[‘ti’]
group_1_status = ti.xcom_pull(task_ids=’group_1.task_1_2’) # Берем статус последнего таска из группы 1
if group_1_status != ‘success’:
raise ValueError(“Group 1 не завершилась успешно!”)
check_task = PythonOperator(
task_id=’check_group_1’,
python_callable=check_group_1,
provide_context=True,
dag=dag
)
group_1»_space; check_task»_space; group_2
Какая база данных используется в Airflow? #тг
posgressql