Airflow Flashcards
Что это простыми словами
Apache Airflow это оркестратор, который умеет ставить задачи на расписание, следить за статусом их выполнения и “триггерить” (запускать) команды по условию “все подготовительные задачи выполнены успешно”. Это НЕ ETL инструмент. Это сервис, который раздаёт команды другим сервисам и обрабатывает результат выполнения этих команд.
Уже написано много готовых операторов, которые стандартизуют работу с источниками и таргетами (S3, базы данных, API сервисы). Причём для Postgres будет свой оператор, а для Clickhouse – свой.
Архитектура, термины
DAG (directed acyclic graph
, даг) – граф выполнения, атомарная единица в airflow. Определяет параметры запуска первого шага (с каким расписанием или по какому событию), взаимосвязь между шагами. Даги это python файлы, написанные с использованием пакета airflow, и лежащие в директории с дагами (обычно ./dags
).
Task (таска, шаг, степ) – узел графа, который выполняет полезную работу или определяет условие запуска какой-то из веток графа. Например, нужно ли пропустить все остальные шаги или продолжаем работу. Или может быть общий даг для 10 типов коннекторов и на основе входных параметров нужно запустить только задачи для указанного типа коннектора. Таска может быть: оператором, сенсором или python кодом с декоратором @task
.
Оператор – заранее написанный код, который можно сконфигурировать для выполнения нужной задачи. Например, PostgresOperator
может принимать на вход sql
и postgres_conn_id
, что выполнять и где выполнять.
Сенсор используется для связи дагов между собой. Единственное назначение – раз в poke
интервал проверять, что связанный таск выполнен успешно. Подробнее ниже.
Scheduler (шедУлер, планировщик) – компонент, который ставит на расписание даги и таски, следит за их статусами, переводит из одного статуса в другой. Он же сканирует папку с дагами и импортирует их. Здесь же задаётся тип executor’a
– механизма, который определяет как именно будут запущены инстансы тасок.
Worker (воркер) – отдельный процесс, собственно выполняющий задачи. На проде обычно много воркеров.
Webserver – UI для airflow, запускает “вебсайт” для взаимодействия с дагами и тасками. Полезно для просмотра логов, создания и изменения коннектов и переменных. Даг менять через интерфейс НЕЛЬЗЯ, только через код.
Triggerer – необязательный компонент, который нужен для запуска deferrable
операторов, которыми можно заменять сенсоры. Подробнее ниже
Очень полезная схема: переход между статусами таск инстансов, связывает компоненты между собой
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances
Самописный оператор
Нужен тогда, когда хочется переиспользовать какой-то пайтон код, а готового оператора нет. Например, для связи с внутренним сервисом компании или непопулярным внешним API. Для google ads точно есть оператор, для яндекс погоды – вряд ли.
Наследуется от BaseOperator
, должен определить метод execute.
Пример:
from airflow.models.baseoperator import BaseOperator class HelloOperator(BaseOperator): def \_\_init\_\_(self, name: str, **kwargs) -> None: super().\_\_init\_\_(**kwargs) self.name = name def execute(self, context): message = f"Hello {self.name}" print(message) return message
Для поддержки оператором jinja шаблонов нужно явно прописать поля, в которых парсеру дага нужно будет искать {{ }}.
template_fields: Sequence[str] = ("name",)
Использовать можно так:
with dag: hello_task = HelloOperator( task_id="task_id_1", name="{{ task_instance.task_id }}", world="Earth", )
Catchup, backfill
Если у дага указаны start_date
и schedule
, scheduler запланирует выполнение дага для каждого интервала, который ещё не отработал. Это поведение называется catchup (кэтчап).
То же произойдёт, если даг поставить на паузу на какое-то время, а потом – снова включить.
При этом можно указатьexecution_timeout
, чтобы запуск дага не работал дольше, чем интервал запуска. Например, дольше часа при запуске каждый час (иначе очередь будет накапливаться, а процесс никогда не закончится).
Бэкфилл позволяет загрузить данные за определённый период, например с даты 1 по дату 2. Запустит все даг раны в указанном диапазоне, даже если они уже отработали успешно раньше.
Запускается только через CLI:
airflow dags backfill \ --start-date START_DATE \ --end-date END_DATE \ dag_id
Чтобы бэкфилл и catchup работали, нужно в таске указывать связь между execution_date и условием фильтрации данных, например select … where dt = {{ execution_date }}
https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html
Также полезно упомянуть:depends_on_past
– добавляет ограничение – только если предыдущий запуск дага/таски успешен, можно запускать текущий. Не срабатывает в первый запуск по start_date.max_active_runs
– ограничение на количество одновременно работающих дагов.
Передача данных между тасками
Производится через некоторый внешний сервис. Или нужно сохранить в файл (локально на ВМ/контейнер airflow, S3, hdfs и тд) и в следующем шаге оттуда считать. Или сложить в базу в промежуточную схему и потом читать оттуда.
Ещё раз полезно напомнить, что Airflow это оркестратор, который отправляет задачи внешним системам и следит за их исполнением, а не ETL инструмент.
XCOM
Протокол обмена мета-данными между тасок через бэкенд-базу airflow. Типа “икс-ком, кросс-коммьюникейшн”.
Обычно база это постгрес, и в теории в одной ячейке таблицы может и гигабайт храниться, и больше, но это неоптимальный способ + база будет очень сильно раздуваться и замедляться из-за этого. Сам протокол спроектирован именно под мета-данные, какие-то параметры запуска таска, но не данные, над которыми нужно произвести обработку. Поэтому пандас датасеты не стоит передавать “как есть” между тасками.
TaskFlow, последовательность выполнения тасок
Удобный способ определения последовательности выполнения тасок и передачи мета-данных между ними. Введён в 2.0.0 airflow. Использует декораторы для определения тасок (и дагов). Пример: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html
Поясню, что здесь происходит.
После определения тасок указывается их запуск в таком порядке:
order_data = extract() # вызывается экстракт таск и результат передаётся в order_data переменную order_summary = transform(order_data) # вызывается трансформ таск с параметром, туда передаётся результат выполнения предыдущей таски load(order_summary["total_order_value"]) # аналогично предыдущему шагу, но со значением словаря по ключу
По-старинке можно было бы задать порядок выполнения черезextract >> transform >> load
, обрабатывая передачу параметров через XCOM вручную и описывая как минимум task_id.
Или через extract.set_downstream(transform)
transform.set_downstream(load)
Также можно задавать условия выполнения через BranchPythonOperator
и ShortCircuitOperator
.
Как хранить секреты и подключения
Можно в переменных (Admin - Variables
), они хранятся в зашифрованном виде (если в airflow.cfg настроен fernet key
– фернет это тип симметричного шифрования, этого достаточно).
Но если секреты это параметры подключения (credentials
, кредэншиалс) для баз данных и других источников, для этого идеально подходят коннекшены (Admin - Connections
). Там задаются все нужные параметры, имя коннекта, и подключение производится по нему.
Через эти же коннекшены можно разделять контура дев-тест-прод: задаётся единое имя коннекта, а в разных инстансах airflow значения могут быть разными, и соответственно вести к разным сервисам. Конечно, ещё нужно чтобы была сетевая связность, т.е. сервер airflow мог обмениваться данными с сервером базы в принципе (подробнее https://t.me/rzv_de/92).
Также можно использовать отдельные сервисы, например hashicorp Vault
.
Виды executors
В конфигурационном файле airflow.cfg
или через переменные окружения в docker container/compose
можно указать, какой executor использовать для планирования и выполнения задач.
- SequentialExecutor: Этот executor запускает задачи последовательно в одном рабочем процессе. Ставится по умолчанию при установке, не используется ни локально, ни в продакшене, т.к. не масштабируется.
- LocalExecutor: Запускает задачи параллельно в нескольких локальных процессах. Он хорошо подходит для выполнения задач на одной машине, но также плох для прода. Хорош для локальной отладки.
- CeleryExecutor: Использует Celery для распределенного выполнения задач на нескольких рабочих узлах. Он хорошо подходит для масштабируемых сред и обеспечивает высокую производительность. Выбор для прода, если нет кубера. Celery это асинхронная очередь, которая поднимается отдельным процессом/контейнером.
- KubernetesExecutor: Использует Kubernetes для запуска контейнеров с задачами. Ваш выбор, если есть кубер.
Сенсоры и poke интервал
Сенсоры по умолчанию полностью забирают worker slot (ресурс кластера, см. параллелизм), если используют poke режим.
Тогда каждые, например, 60 секунд будет отсылаться запрос “готово или не готово?”, и в соответствии с этим или запускать зависимую часть дага в работу, или продолжать ожидание.
Triggerer и deferrable оператор
Deferred
значит отсроченный. Асинхронный процесс ожидания выполнения таски в другом даге или всего дага.
Например, TriggerDagRunOperator
можно установить в режим deferrable
.
Отличается от сенсора тем, что высвобождает слот воркера и эффективно масштабируется.
Для работы deferrable операторов нужно настроить отдельный компонент airflow – triggerer
.
Чем хук отличается от оператора
Оператор – отдельный таск. Хук можно использовать внутри кастомного пайтон таска для взаимодействия на более низком уровне. Например, если нужно выполнить несколько postgresql запросов с промежуточной обработкой вычислений одним атомарным шагом.
Как установить библиотеку (пайтон пакет) на airflow, добавлять операторы через установку провайдеров
Зависит от того, как установлен airflow. Если на виртуальной машине, то почти наверняка управление идёт через обычный pip/poetry/conda
или другой пакетный менеджер.
Если в контейнере, то можно создать свой мини-образ на основе официального докер-образа, скопировать requirements
и поставить через pip install -r requirements.txt
.
Многие операторы и коннекты не доступны по умолчанию. Их нужно установить или вместе с airflow core
, или отдельно через пакетный менеджер.
pip install apache-airflow-providers-google==10.1.1 # отдельно pip install "apache-airflow[google]==2.10.1" # вместе
У провайдеров есть свой airflow hub
по аналогии с docker hub
, т.е. онлайн каталог готовых решений.
Также функционал можно расширить плагинами, но это редкость.
Что влияет на параллелизм
Достаточно сказать, что есть Pool’ы, которые ограничивают максимальное количество тасок, работающих параллельно. Стандартный пул содержит 128 слотов.
Их можно настроить через Admin - Pools
Для разных дагов и даже тасок можно задавать разные пулы. Для таски можно указать “вес” для приоритизации внутри пула. Это тонкая настройка для тех дагов, выполнение которых важнее, чем других.
Ну и, конечно, в самих дагах нужно таски выстраивать параллельно, где возможно. Оставляй только те зависимости, которые логически ограничивают таску от запуска, даже если будет выглядеть “некрасиво”. Полезно сказать и про таскгруппы, которые задают шаблон выполнения тасок и позволяют формировать их, например, в цикле.
Алертинг в телеграмм
Через оператор TelegramOperator
. Да, так просто.
Если впервые подключаешь новый сервис к взаимодействию с airflow, проверь, может есть уже написанный оператор для этого.
Датасеты (event-based scheduling)
Дают возможность запускать даг по событию, а не по чётко заданному расписанию. Например, по добавлению файла в s3 бакет, по вызову API или чему-то схожему.
Сенсоры могут делать похожее, но первый даг в цепочке как-то надо запустить. Датасеты задают такую связь, начиная с первого дага.
Могут сильно экономить время, если данные готовы раньше, чем по указанному нами времени в schedule
.
К версии 2.9+ доработали так, что почти нет ограничений. На проектах могут не использоваться просто потому что легаси кода на schedule слишком много, т.к. появились датасеты к концу 2022 года и ещё где-то года полтора шлифовались.