3. Автоматизация ETL-процессов Flashcards

1
Q

Принципы построения ETL

A
  • Чистый код — хорошим тоном считается следовать правилам pep8
  • Простота — старайтесь писать читаемый и лаконичный код.
  • Единообразие — необходимо стремиться к единым принципам построения ваших дата-пайплайнов.
  • Время выполнения пайплайна — необходимо следить за SLA, например, ежедневный процесс не должен выполняться более суток.
  • Меньше сетевого трафика — используйте только необходимые данные, не перегружая систему. В данном случае можем заметить преимущество ELT — данные выгружаются в хранилище один раз, а при использовании ETL — минимум два раза.
  • Работа с репликой — используйте только реплики источников для предотвращения неожиданных падений систем/баз данных/др.
  • Оптимизация забора данных — пользуйтесь индексами, пишите оптимальные запросы, старайтесь оптимизировать ваш дата-пайплайн.
    Способы загрузки данных (SCD) — ориентируйтесь на количество данных, чтобы принять решение о применении инкрементальной загрузки.
  • Партицирование — при возможности используйте партиции.
  • Инкрементальный пересчет витрин — стоит пересчитывать только новые данные витрины.
  • Загрузка всего без ограничений — нормально получать все данные в большинстве случаев.
  • Избавляться от неактуального — периодически проводите аудит вашего хранилища, чтобы не перегрузить его.
  • Идемподентность — (свойство при повторном запуске выдавать точно такой же результат, как при первом запуске) — отдавайте предпочтение merge перед insert для избежания возникновения дубликатов.
  • Аудиторский след — стоит хранить данные, такие как в источниках.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Преимущества Airflow

A

> Open source

> Отличная документация

> Простой код на Питоне

> Удобный веб-интерфейс

> Алертинг и мониторинг

> Интеграция с основными источниками

> Кастомизация

> Масштабирование

> Большое комьюнити

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

DAG
(определение, из чего состоит)

A

DAG (Directed Acyclic Graph) — направленный ацикличный граф. Это граф, в котором нет зацикливания. В нем могут быть разветвления, но в конце они сходятся в одной точке.

Вершины в DAG — это отдельные задачи (task).

Ребра в DAG — зависимости между задачами.

DAG-ов в Airflow может быть несчетное количество.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Task (определение, какие бывают)

A

Задача (task) — это одна из двух сущностей:

> сенсор. Они используются, когда нам нужно дождаться наступления какого-то события, например, появление строчки в БД, наступление определенного времени, появление файла в s3.

> оператор. Это действие, которое выполняет конкретная задача, например, забирает данные из базы, отсылает письма, вычитывает данные из API, производит вычисления.
Задачи объединяются в DAG по смыслу.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Процесс выполнения задач

A

Сначала запускаются таски, которые не имеют предшественников. После того, как такие задачи отработали, запускаются зависимые от них таски. Это продолжается, пока мы не дойдем до конца DAG, то есть пока все задачи не будут обработаны.

Если при выполнении задача упала, то она переходит в состояние retry и через определенное время (по умолчанию 10 минут) перезапускается. Если после 3-х (этот параметр тоже настраивается) перезапусков таск так и не смог выполниться успешно, то он переводится в состояние failed. Все последующие за ним задачи переводятся в состояние upstream failed и выполнены не будут. В таком случае DAG переходит в состояние failed.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Компоненты Airflow

A

> Web Server Airflow
Sheduler
Executor
Worker
Metadata Database

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Web Server Airflow

A

отвечает за пользовательский интерфейс и дает возможность контролировать пайплайны.

Основные задачи:
* Внешний вид DAG-a
* Статус выполнения (их получает из метаданных Airflow)
* Перезапуск (как тасок, так и DAG-ов)
* Отладка

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Sheduler

A

планировщик.

  • Анализирует DAG’и (ищет те, которые готовы к запуску)
  • Создает DAG Run с конкретным execution_date
  • Создает Task Instance
  • Ставит таски в очередь

DAG Run — это экземпляр DAG-а. Каждый экземпляр характеризуется параметром execution_date (начало предыдущего периода).
Task Instance — это экземпляр задачи. Инстансы привязываются к DAG. У них так же определен параметр execution_date.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Executor

A

механизм, с помощью которого запускаются экземпляры задач. Он работает в связке с планировщиком.

Executor-ы делятся на две категории:
* локальные — исполняются на той же машине, на которой есть планировщик

  • нелокальные — могут запускать таски удаленно
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

Локальные Executor

A
  • SequentialExecutor - Последовательный запуск задач;
  • LocalExecutor (По дочернему процессу на задачу);
  • DebugExecutor (Для запуска и отладки из IDE)
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Нелокальные Executor

A
  • CeleryExecutor - Требует брокер сообщений. Несколько серверов с воркерами
  • DaskExecutor - Использует Dask
  • KubernetesExecutor - Новый pod для каждого task instance
  • CeleryKubernetesExecutor - CeleryExecutor/KubernetesExecutor
  • Custom - Самописаный
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Worker

A

процесс, в котором исполняются задачи. В зависимости от executor-a он может быть размещен локально (на той же машине, что и планировщик), либо на отдельно машине/машинах

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Metadata Database

A

база метаданных. В ней хранится информация о состоянии всех пайплайнов:
* DAG (абстрактные DAG-и)

  • DAG Run (конкретные инстансы DAG-ов)
  • Task Instance
  • Variable (глобальные переменные Airflow)
  • Connection
  • Xcom

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

Пул

A

У executor-ов есть ограниченное количество воркеров. Если запущен некий не самый важные DAG, который генерирует слишком много задач одновременно, то такому DAG-у нужно ограничить ресурсы с помощью пулов, чтобы другие DAG-и так же могли запускаться.

Пул — это абстрактное ограничение на количество одновременно выполняемых задач.

Каждый пул имеет ограничение по количеству слотов — количество задач, которые одновременно могут быть запущены в пуле.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

Способы создания DAG (код)

A
  1. dag = DAG(аргументы)
    Самый простой вариант. Создание переменной класса DAG. В данном случае каждому таску надо обязательно привязываться к DAG’у. И внутри каждого таска явно указывать на DAG.
  2. with DAG(аргументы) as dag:
    таски без указания DAG
    Он аналогичен первому варианту, но производится через контекстный менеджер. Здесь DAG не нужно указывать внутри каждого таска, он назначается им автоматически.
  3. Этот вариант появился в Airflow2. DAG создается с помощью декоратора @dag. Нам требуется создать функцию со списком тасков внутри и обернуть ее в декоратор. Таким образом мы получаем переменную класса DAG. Эту переменную мы объявляем в глобальной области видимости, с помощью чего Airflow понимает, что внутри скрипта находится DAG.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

Что такое аргументы DAG и для чего они нужны?

A

Аргументы для DAG передаются в виде словаря при создании. Он нужен для того, чтобы не только какой-то конкретный DAG знал о своем поведении, но и все таски унаследовали это поведение, когда создавались под конкретным DAG’ом.

17
Q

Примеры аргументов и их описание

A

DEFAULT_ARGS = {
‘owner’: ‘karpov’,
‘queue’: ‘karpov_queue’,
‘pool’: ‘user_pool’,
‘email’: [‘airflow@example.com’],
‘email_on_failure’: False,
‘email_on_retry’: False,
‘depends_on_past’: False,
‘wait_for_downstream’: False,
‘retries’: 3,
‘retry_delay’: timedelta(minutes=5),
‘priority_weight’: 10,
‘start_date’: datetime(2021, 1, 1),
‘end_date’: datetime(2025, 1, 1),
‘sla’: timedelta(hours=2),
‘execution_timeout’: timedelta(seconds=300),
‘on_failure_callback’: some_function,
‘on_success_callback’: some_other_function,
‘on_retry_callback’: another_function,
‘sla_miss_callback’: yet_another_function,
‘trigger_rule’: ‘all_success’
}

  • owner — отображает владельца DAG’а в интерфейсе.
    queue — отвечает за очередь, в которую становится таск. В случае нескольких воркеров, мы можем направить таски на разные воркеры. Например, если у какого-то воркера есть сетевой доступ к ресурсам, которых нет во втором. Также можно управлять нагрузкой таким образом.
  • pool — пул, в рамках которого исполняется таск.
  • email — нужен для оповещения о падении и запуске таска.
  • email_on_failure — флаг для оповещения в случае падения
  • email_on_retry — флаг для оповещения в случае перезапуска
  • depends_on_past — таск в данном DAG инстансе будет запущен только в тот момент, когда этот же таск в предыдущем DAG инстансе (в инстансе за предыдущий период) уже был отработан. То есть у нас не может быть два одинаковых таска в разных инстансах запущено одновременно.
  • wait_for_downstream — похож на предыдущий аргумент, только здесь мы ждем не окончания работы данного конкретного таска, а окончания всех тасков, которые зависят от этого. Например, мы забираем данные из источника и складываем их во временную таблицу. Этим занимается один таск, а следующий таск перекладывает данные из временной таблицы в постоянное хранилище. Таким образом, если бы у нас был включен только аргумент depends_on_past, то данные загружались бы действительно по очереди, но в тот момент, когда запускался бы второй таск (более свежий) с загрузкой данных из источника, он бы сначала стер все данные за вчерашний день (ту самую таблицу, из которой в этот момент следующий таск забирал бы данные для переноса их в хранилище). Непонятно, кто бы победил в этой гонке, но мы бы могли потенциально получить неполные данные за предыдущий день в хранилище.
  • retries — количество перезапусков таска в случае падения (1 запуск всегда основной + переданное количество в аргументе).
  • retry_delay — время между попытками перезапуска.
  • priority_weight — вес приоритета этого таска перед другими.
  • start_date — время первого запуска. Если это не сегодняшний день, то DAG отработает ровно столько раз, сколько прошло времени между указанным start_date и текущей датой. Если передать airflow.utils.dates.days_ago(3), это будет значить, что будет запущено только 3 последних инстанса (за 3 последних дня).
  • end_date — дата, после которой инстансы перестанут генерироваться.
  • sla — время, до которого мы ожидаем, что таск завершится. Если этого не случится, придет оповещение и в интерфейсе SLA появится запись о том, что этот таск завершился не вовремя.
  • execution_timeout — максимальное время выполнения таска. Если таск не успеет выполниться за это время, он будет помечен как FAILED.
  • on_failure_callback — вызов переданной функции в случае падения
  • on_success_callback — вызов переданной функции в случае успешного завершения
  • on_retry_callback — вызов переданной функции в случае перезапуска
  • sla_miss_callback — вызов переданной функции в случае пропущенного SLA
18
Q

Хуки - определение, для чего, какие бывают

A

это интерфейсы для работы с различными внешними системами. Они позволяют писать код операторов универсально, то есть для обращения к какой-то базе нужно будет заменить только хук.

CONNECTIONS
Соединения нужны для системного управления параметрами подключения к различным системам. У каждого connection есть уникальный ключ (conn_id) или его название.
Их можно использовать из кода напрямую или с помощью хуков.

Пример, в котором мы с помощью BaseHook вытаскиваем пароль из конкретного соединения.
from airflow.hooks import BaseHook
import logging
logging.info(BaseHook.get_connection(‘conn_karpov_mysql’).password)

Хук — это интерфейс для соединения. В нем скрывается low-level код для работы с источником, а вся основная логика обычно передается оператору.
Какие бывают хуки:
> S3Hook
> DockerHook
> HDFSHook
> HttpHook
> MsSqlHook
> MySqlHook
> OracleHook
> PigCliHook
> PostgresHook
> SqliteHook

19
Q

Trigger Rule - что за аргумент, какие значения может принимать?

A

Он отвечает за то, в каком состоянии должны быть предыдущие таски, чтобы таск, который от них зависит, завершился. По умолчанию это значение all_success.

Возможные значения аргумента trigger_rule:

  • all_success — следующий таск запускается только тогда, когда все предыдущие отработали успешно.
  • all_failed — все предыдущие таски должны перейти в состояние FAILED, иначе текущий не встанет в очередь.
  • all_done — все предыдущие таски должны завершиться с любым результатом.
  • one_failed — текущий таск начнет свою работу в тот момент, когда хотя бы один предыдущий таск перейдет состояние FAILED
  • one_success — текущий таск начнет свою работу в тот момент, когда хотя бы один предыдущий таск перейдет состояние SUCCESS
  • none_failed — ни один из тасков, от которых зависит текущий, не должен быть в состоянии FAILED или UPSTREAM FAILED
  • none_failed_or_skipped — ни один из тасков, от которых зависит текущий, не должен быть в состоянии FAILED, UPSTREAM FAILED или SKIPPED
  • none_skipped — ни один из тасков, от которых зависит текущий, не должен быть в состоянии SKIPPED
  • dummy — таски могут быть в любом состоянии
20
Q

Операторы - определение, какие бывают

A

Операторы — параметризуемые шаблоны для тасков. Все, что делается внутри операторов, можно повторить внутри PythonOperator.

Какие бывают операторы:

BashOperator
PythonOperator
EmailOperator
PostgresOperator
MySqlOperator
MsSqlOperator
HiveOperator
SimpleHttpOperator
SlackAPIOperator
PrestoToMySqlOperator
TriggerDagRunOperator

21
Q

Сенсоры - что такое, для чего, параметры сенсора, какие бывают

A

Напомним, что сенсоры ожидают момента наступления какого-либо события.

Параметры сенсора:
* timeout — время в секундах, прежде чем сенсор перейдет в состояние FAILED

  • soft_fail — флаг, при поднятии которого сенсор при неудаче переходит в состояние SKIPPED
  • poke_interval — время в секундах между попытками, в которые конкретный сенсор будет выяснять, отработало ли событие
  • mode — poke - всегда держит занятым конкретный воркер | reschedule - когда сенсор отработал и понял, что событие, которое он ожидает, еще не наступило, он освобождает воркер и в следующий раз занимает его только после того, как прошло время, указанное в poke_interval.

В Airflow2 появился экспериментальный способ запуска сенсоров — smart sensor. Эта настройка включается для всего Airflow и в тот момент, когда она включена, сенсоры начинают запускаться в отдельном одном или нескольких (в зависимости от настроек) процессе по очереди. Мы в едином месте по очереди запускаем сенсоры, чтобы выяснить, какое из событий отработало, а соответствующий сенсор переводим в состояние SUCCESS в тот момент, когда оно произошло. Таким образом заимствуется всего несколько воркеров для того, чтобы покрыть все DAG’и сенсорами.

Какие бывают сенсоры:
* ExternalTaskSensor — логически связывает между собой DAG’и

  • SqlSensor — дожидается, когда в результатах запроса вернется хотя бы одна строка
  • TimeDeltaSensor — дожидается, когда пройдет определенное количество секунд после времени запуска DAG’a
  • HdfsSensor — ждет появления определенного файла в папке внутри HDFS
  • PythonSensor — ждет, когда Python функция вернет значение True
  • DayOfWeekSensor — ждет наступления конкретного дня

Подробнее про ExternalTaskSensor

is_payments_done = ExternalTaskSensor(
task_id=’is_payments_done’,
external_dag_id=’load_payments’,
external_task_id=’end’,
timeout=600,
allowed_status=[‘success’],
failed_states=[‘failed’, ‘skipped’],
mode=’reschedule’
)

Например, если в одном DAG’е забираются данные из источника и складываются в хранилище, а в другом поверх этих данных строятся какие-то витрины, то оба этих DAG’а должны быть логически связаны. Складывать их в единый DAG — не очень правильно, потому что они относятся к различным уровням хранилища. ExternalTaskSensor в данном случае поможет внутри DAG’а с витринами дождаться момента, когда завершится DAG с получением данных из источника.

22
Q

Операторы ветвления - какие бывают, как работают

A
  • BranchPythonOperator
  • ShortCircuitOperator
  • BranchDateTimeOperator

BranchPythonOperator:
Функция, поверх которой работает BranchPythonOperator, должна вернуть название одного или нескольких тасков, которые начнут работать после завершения этого таска. Все таски, которые не будет упомянуты в выводе из этой функции, перейдут в состояние SKIPPED и будут пропущены. Если же функция вернет None, то будут пропущены все таски, которые зависят от этого таска. В данном примере мы случайным образом выбираем один из трех последующих тасков.

ShortCircuitOperator:
Возвращает значение True или False. В первом случае все последующие таски выполняются, в обратном — нет.

BranchDateTimeOperator:
Данный оператор запускает тот или иной таск в зависимости от того, попадает ли execution date в указанный промежуток времени. Если execution date между target_lower и target_upper, то запускается таск date_in_range (может быть другое название), иначе date_outside_range (может быть другое название).

23
Q

Шаблоны Jinja

A

Шаблонизация
В Airflow для параметризации внутри DAG’ов используется Jinja. Параметризуемое выражение окружается двойными фигурными скобками и в момент определения раскрывается с помощью шаблонизатора.

Шаблон Расшифровка
{{ execution_date }} execution_date
{{ ds }} execution_date (YYYY-MM-DD)
{{ ds_nodash }} execution_date (YYYYMMDD)
{{ ts }} execution_date (2021-01-01T00:00:00+00:00)
{{ yesterday_ds }} Вчерашний день относительно execution_date
{{ tomorrow_ds }} Завтрашний день относительно execution_date
{{ var.value.my_var }} Значение ключа в глобальной переменной (словарь)
{{ var.json.my_var.path }} Значение ключа в глобальной переменной (json)
{{ conf }} airflow.cfg

Посмотреть, во что именно раскрылся шаблон, можно двумя способами:
* через CLI

  • во вкладке Rendered конкретного таска
24
Q

Макросы

A

Кроме предопределенных параметров можно еще использовать внутри шаблонов несколько python-пакетов.

Переменная Пакет в Питоне
macros.datetime datetime.datetime
macros.timedelta datetime.timedelta
macros.dateutil dateutil
macros.time datetime.time
macros.uuid uuid
macros.random random

25
Q

Передача аргументов для PythonOperator (способы)

A

Аргументы для PythonOperator
Их можно передавать следующими способами:
* op_args

  • op_kwargs
  • template_dict
  • provide_context

Рассмотрим на примере

def print_args_func(arg1, arg2, **kwargs):
logging.info(‘——————————–’)
logging.info(f’op_args, №1: {arg1}’)
logging.info(f’op_args, №2: {arg2}’)
logging.info(‘op_kwargs, №1: ‘ + kwargs[‘kwarg1’])
logging.info(‘op_kwargs, №2: ‘ + kwargs[‘kwarg2’])
logging.info(‘template_dict, gv_karpov: ‘ + kwargs[‘templates_dict’][‘gv_karpov’])
logging.info(‘templates_dict, task_owner: ‘ kwargs[‘templates_dict’][‘tasl_owner’])
logging.info(‘context’, {{ ds }}: ‘ + kwargs[‘ds’])
logging.info(‘context’, {{ tomorrow_ds }}: ‘ + kwargs[‘tomorrow_ds’])
logging.info(‘——————————–’)

print_args = PythonOperator(
task_id=’print_args’,
python_callable=print_args_func,
op_args=[‘arg1’, ‘arg2’]
op_kwargs={‘kwarg1’: ‘kwarg1’, ‘kwarg2’: ‘kwarg2’},
templates_dict={‘gv_karpov’: ‘{{ var.value.gv_karpov }},
‘task_owner’: ‘{{ task.owner }}’},
provide_context=True
)

В данной функции передается 2 позиционных аргумента arg1 и arg2, а также словарь с ключами kwarg1 и kwarg2. Также передаем еще один словарь templates_dict. Посмотрим на результат.

26
Q

XCom

A

XCom (от «cross-communication») – механизм, позволяющий таскам взаимодействовать друг с другом, поскольку по умолчанию таски полностью изолированы и могут выполняться на совершенно разных машинах.

Каждое сообщение имеет 3 параметра:

ID дага
ID таска
Ключ
Методы:

xcom_push – передаёт параметры, которые хотим получить в другом таске
xcom_pull – забирает эти параметры

27
Q

Taskflow API

A

Основная идея Taskflow API – каждый таск взаимодействует друг с другом напрямую. Т.е. результат работы одного таска — это входные параметры для другого таска и т.д.

@dag(default_args=DEFAULT_ARGS,
schedule_interval=’@daily’,
tags=[‘karpov’])
def dina_taskflow():

@task
def list_of_nums():
    return [1, 2, 3, 4, 6]

@task
def sum_nums(nums: list):
    return sum(nums)

@task
def print_sum(total: int):
    logging.info(str(total))

print_sum(sum_nums(list_of_nums()))

dina_taskflow_dag = dina_taskflow()

Как это выглядит в интерфейсе Airflow:
list_of_nums -> sum_nums - > print_sum

28
Q

SubDags

A

SubDags – дочерние даги. У SubDags две цели:

переиспользование кода
визуальная группировка

Что нужно знать про SubDag-и, чтобы ими пользоваться:
* Расписание у дага и сабдага должны совпадать
* Название сабдагов: parent.child
* Состояние сабдага и таска SubDagOperator независимы
* По возможности избегайте использование сабдагов

29
Q

TaskGroup

A

Цели:

  • Группировка задач
  • Переиспользование кода
30
Q

Динамическое создание дагов

A
  • Скрипты должны находиться в DAG_FOLDER (внутри DAG_FOLDER можно организовать хранение по папкам)
  • dag в globals() (переменная дага должна быть объявлена в глобальной области видимости)

Варианты для динамического создания дага:

  • Статическая генерация нескольких одинаковых дагов: используется конструктор дага – функция, в которую передаём параметры, после выполнения генерируется даг с нужным количеством тасков. Одному скрипту соответствует любое количество дагов.
  • Генерация дага из глобальных переменных/соединений: используется конструктор дага – функция, в которой используется глобальные переменные, которые есть в Airflow.
  • Генерация дага на основе json/yaml-файла: удобно для работы с БД. Например, для каждой таблицы БД создаётся свой словарь json с параметрами, далее автопилот (код, который генерирует даги из json) смотрит на эти параметры и выстраивает по ним даг.
31
Q

Best Practice

A
  • Сохраняйте идемподентность: результат работы одного таска при перезапуске не должен изменяться
  • Не храните пароли в коде: используйте connection-ы
  • Не храните файлы локально: используйте промежуточные хранилища, HDFS, S3 и т.д.
  • Убирайте лишний код верхнего уровня: всю логику переносите в код таска
  • Не используйте переменные Airflow: загружайте переменные из jinja, загружайте переменные внутри таска, используйте переменные окружения