Airflow Flashcards

1
Q

Что это простыми словами

A

Apache Airflow это оркестратор, который умеет ставить задачи на расписание, следить за статусом их выполнения и “триггерить” (запускать) команды по условию “все подготовительные задачи выполнены успешно”. Это НЕ ETL инструмент. Это сервис, который раздаёт команды другим сервисам и обрабатывает результат выполнения этих команд.

Уже написано много готовых операторов, которые стандартизуют работу с источниками и таргетами (S3, базы данных, API сервисы). Причём для Postgres будет свой оператор, а для Clickhouse – свой.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Архитектура, термины

A

DAG (directed acyclic graph, даг) – граф выполнения, атомарная единица в airflow. Определяет параметры запуска первого шага (с каким расписанием или по какому событию), взаимосвязь между шагами. Даги это python файлы, написанные с использованием пакета airflow, и лежащие в директории с дагами (обычно ./dags).

Task (таска, шаг, степ) – узел графа, который выполняет полезную работу или определяет условие запуска какой-то из веток графа. Например, нужно ли пропустить все остальные шаги или продолжаем работу. Или может быть общий даг для 10 типов коннекторов и на основе входных параметров нужно запустить только задачи для указанного типа коннектора. Таска может быть: оператором, сенсором или python кодом с декоратором @task.

Оператор – заранее написанный код, который можно сконфигурировать для выполнения нужной задачи. Например, PostgresOperator может принимать на вход sql и postgres_conn_id, что выполнять и где выполнять.

Сенсор используется для связи дагов между собой. Единственное назначение – раз в poke интервал проверять, что связанный таск выполнен успешно. Подробнее ниже.

Scheduler (шедУлер, планировщик) – компонент, который ставит на расписание даги и таски, следит за их статусами, переводит из одного статуса в другой. Он же сканирует папку с дагами и импортирует их. Здесь же задаётся тип executor’a – механизма, который определяет как именно будут запущены инстансы тасок.

Worker (воркер) – отдельный процесс, собственно выполняющий задачи. На проде обычно много воркеров.

Webserver – UI для airflow, запускает “вебсайт” для взаимодействия с дагами и тасками. Полезно для просмотра логов, создания и изменения коннектов и переменных. Даг менять через интерфейс НЕЛЬЗЯ, только через код.

Triggerer – необязательный компонент, который нужен для запуска deferrable операторов, которыми можно заменять сенсоры. Подробнее ниже

Очень полезная схема: переход между статусами таск инстансов, связывает компоненты между собой
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#task-instances

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Самописный оператор

A

Нужен тогда, когда хочется переиспользовать какой-то пайтон код, а готового оператора нет. Например, для связи с внутренним сервисом компании или непопулярным внешним API. Для google ads точно есть оператор, для яндекс погоды – вряд ли.

Наследуется от BaseOperator, должен определить метод execute.
Пример:

from airflow.models.baseoperator import BaseOperator

class HelloOperator(BaseOperator):
    def \_\_init\_\_(self, name: str, **kwargs) -> None:
        super().\_\_init\_\_(**kwargs)
        self.name = name

    def execute(self, context):
        message = f"Hello {self.name}"
        print(message)
        return message

Для поддержки оператором jinja шаблонов нужно явно прописать поля, в которых парсеру дага нужно будет искать {{ }}.

   template_fields: Sequence[str] = ("name",)

Использовать можно так:

with dag:
    hello_task = HelloOperator(
        task_id="task_id_1",
        name="{{ task_instance.task_id }}",
        world="Earth",
    )
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Catchup, backfill

A

Если у дага указаны start_date и schedule, scheduler запланирует выполнение дага для каждого интервала, который ещё не отработал. Это поведение называется catchup (кэтчап).
То же произойдёт, если даг поставить на паузу на какое-то время, а потом – снова включить.
При этом можно указатьexecution_timeout, чтобы запуск дага не работал дольше, чем интервал запуска. Например, дольше часа при запуске каждый час (иначе очередь будет накапливаться, а процесс никогда не закончится).

Бэкфилл позволяет загрузить данные за определённый период, например с даты 1 по дату 2. Запустит все даг раны в указанном диапазоне, даже если они уже отработали успешно раньше.
Запускается только через CLI:

airflow dags backfill \
    --start-date START_DATE \
    --end-date END_DATE \
    dag_id

Чтобы бэкфилл и catchup работали, нужно в таске указывать связь между execution_date и условием фильтрации данных, например select … where dt = {{ execution_date }} https://airflow.apache.org/docs/apache-airflow/stable/templates-ref.html

Также полезно упомянуть:
depends_on_past – добавляет ограничение – только если предыдущий запуск дага/таски успешен, можно запускать текущий. Не срабатывает в первый запуск по start_date.
max_active_runs – ограничение на количество одновременно работающих дагов.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Передача данных между тасками

A

Производится через некоторый внешний сервис. Или нужно сохранить в файл (локально на ВМ/контейнер airflow, S3, hdfs и тд) и в следующем шаге оттуда считать. Или сложить в базу в промежуточную схему и потом читать оттуда.

Ещё раз полезно напомнить, что Airflow это оркестратор, который отправляет задачи внешним системам и следит за их исполнением, а не ETL инструмент.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

XCOM

A

Протокол обмена мета-данными между тасок через бэкенд-базу airflow. Типа “икс-ком, кросс-коммьюникейшн”.

Обычно база это постгрес, и в теории в одной ячейке таблицы может и гигабайт храниться, и больше, но это неоптимальный способ + база будет очень сильно раздуваться и замедляться из-за этого. Сам протокол спроектирован именно под мета-данные, какие-то параметры запуска таска, но не данные, над которыми нужно произвести обработку. Поэтому пандас датасеты не стоит передавать “как есть” между тасками.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

TaskFlow, последовательность выполнения тасок

A

Удобный способ определения последовательности выполнения тасок и передачи мета-данных между ними. Введён в 2.0.0 airflow. Использует декораторы для определения тасок (и дагов). Пример: https://airflow.apache.org/docs/apache-airflow/stable/tutorial/taskflow.html

Поясню, что здесь происходит.
После определения тасок указывается их запуск в таком порядке:

    order_data = extract() # вызывается экстракт таск и результат передаётся в order_data переменную
    order_summary = transform(order_data) # вызывается трансформ таск с параметром, туда передаётся результат выполнения предыдущей таски
    load(order_summary["total_order_value"]) # аналогично предыдущему шагу, но со значением словаря по ключу

По-старинке можно было бы задать порядок выполнения через
extract >> transform >> load, обрабатывая передачу параметров через XCOM вручную и описывая как минимум task_id.
Или через
extract.set_downstream(transform)
transform.set_downstream(load)

Также можно задавать условия выполнения через BranchPythonOperator и ShortCircuitOperator.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Как хранить секреты и подключения

A

Можно в переменных (Admin - Variables), они хранятся в зашифрованном виде (если в airflow.cfg настроен fernet key – фернет это тип симметричного шифрования, этого достаточно).

Но если секреты это параметры подключения (credentials, кредэншиалс) для баз данных и других источников, для этого идеально подходят коннекшены (Admin - Connections). Там задаются все нужные параметры, имя коннекта, и подключение производится по нему.

Через эти же коннекшены можно разделять контура дев-тест-прод: задаётся единое имя коннекта, а в разных инстансах airflow значения могут быть разными, и соответственно вести к разным сервисам. Конечно, ещё нужно чтобы была сетевая связность, т.е. сервер airflow мог обмениваться данными с сервером базы в принципе (подробнее https://t.me/rzv_de/92).

Также можно использовать отдельные сервисы, например hashicorp Vault.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Виды executors

A

В конфигурационном файле airflow.cfg или через переменные окружения в docker container/compose можно указать, какой executor использовать для планирования и выполнения задач.

  1. SequentialExecutor: Этот executor запускает задачи последовательно в одном рабочем процессе. Ставится по умолчанию при установке, не используется ни локально, ни в продакшене, т.к. не масштабируется.
  2. LocalExecutor: Запускает задачи параллельно в нескольких локальных процессах. Он хорошо подходит для выполнения задач на одной машине, но также плох для прода. Хорош для локальной отладки.
  3. CeleryExecutor: Использует Celery для распределенного выполнения задач на нескольких рабочих узлах. Он хорошо подходит для масштабируемых сред и обеспечивает высокую производительность. Выбор для прода, если нет кубера. Celery это асинхронная очередь, которая поднимается отдельным процессом/контейнером.
  4. KubernetesExecutor: Использует Kubernetes для запуска контейнеров с задачами. Ваш выбор, если есть кубер.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

Сенсоры и poke интервал

A

Сенсоры по умолчанию полностью забирают worker slot (ресурс кластера, см. параллелизм), если используют poke режим.
Тогда каждые, например, 60 секунд будет отсылаться запрос “готово или не готово?”, и в соответствии с этим или запускать зависимую часть дага в работу, или продолжать ожидание.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Triggerer и deferrable оператор

A

Deferred значит отсроченный. Асинхронный процесс ожидания выполнения таски в другом даге или всего дага.
Например, TriggerDagRunOperator можно установить в режим deferrable.
Отличается от сенсора тем, что высвобождает слот воркера и эффективно масштабируется.

Для работы deferrable операторов нужно настроить отдельный компонент airflow – triggerer.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Чем хук отличается от оператора

A

Оператор – отдельный таск. Хук можно использовать внутри кастомного пайтон таска для взаимодействия на более низком уровне. Например, если нужно выполнить несколько postgresql запросов с промежуточной обработкой вычислений одним атомарным шагом.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Как установить библиотеку (пайтон пакет) на airflow, добавлять операторы через установку провайдеров

A

Зависит от того, как установлен airflow. Если на виртуальной машине, то почти наверняка управление идёт через обычный pip/poetry/conda или другой пакетный менеджер.
Если в контейнере, то можно создать свой мини-образ на основе официального докер-образа, скопировать requirements и поставить через pip install -r requirements.txt.

Многие операторы и коннекты не доступны по умолчанию. Их нужно установить или вместе с airflow core, или отдельно через пакетный менеджер.

pip install apache-airflow-providers-google==10.1.1 # отдельно
pip install "apache-airflow[google]==2.10.1" # вместе

У провайдеров есть свой airflow hub по аналогии с docker hub, т.е. онлайн каталог готовых решений.

Также функционал можно расширить плагинами, но это редкость.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

Что влияет на параллелизм

A

Достаточно сказать, что есть Pool’ы, которые ограничивают максимальное количество тасок, работающих параллельно. Стандартный пул содержит 128 слотов.

Их можно настроить через Admin - Pools

Для разных дагов и даже тасок можно задавать разные пулы. Для таски можно указать “вес” для приоритизации внутри пула. Это тонкая настройка для тех дагов, выполнение которых важнее, чем других.

Ну и, конечно, в самих дагах нужно таски выстраивать параллельно, где возможно. Оставляй только те зависимости, которые логически ограничивают таску от запуска, даже если будет выглядеть “некрасиво”. Полезно сказать и про таскгруппы, которые задают шаблон выполнения тасок и позволяют формировать их, например, в цикле.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

Алертинг в телеграмм

A

Через оператор TelegramOperator. Да, так просто.
Если впервые подключаешь новый сервис к взаимодействию с airflow, проверь, может есть уже написанный оператор для этого.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

Датасеты (event-based scheduling)

A

Дают возможность запускать даг по событию, а не по чётко заданному расписанию. Например, по добавлению файла в s3 бакет, по вызову API или чему-то схожему.

Сенсоры могут делать похожее, но первый даг в цепочке как-то надо запустить. Датасеты задают такую связь, начиная с первого дага.

Могут сильно экономить время, если данные готовы раньше, чем по указанному нами времени в schedule.

К версии 2.9+ доработали так, что почти нет ограничений. На проектах могут не использоваться просто потому что легаси кода на schedule слишком много, т.к. появились датасеты к концу 2022 года и ещё где-то года полтора шлифовались.