Spark Flashcards
Что такое Spark? Зачем нужен? Где использовали?
Теги: #wildberries
Spark – это распределённая вычислительная платформа для быстрой обработки больших данных (batch/stream). Используется для ETL, аналитики, machine learning и т.д. Применяется во множестве компаний (Netflix, Uber, e-commerce), где важна высокая скорость анализа данных.
Spark Streaming: near real-time или real-time? Сравнение Spark vs Flink
Теги: #wildberries
Spark Streaming обеспечивает near real-time обработку (с микробатчами), тогда как Flink может работать ближе к real-time (стриминг на уровне событий). Flink эффективнее для постоянного потока, Spark удобен при смешанных задачах batch + stream.
Что знаешь про PyArrow? Как взаимодействует со Spark?
Теги: #wildberries
PyArrow – это библиотека для работы со столбцовыми данными в памяти (формат Apache Arrow). В Spark её используют для ускоренной передачи данных между Python и JVM (Vectorized UDF и оптимизация сериализации/десериализации).
Функции трансформации и действия, в чём разница, примеры?
Теги: #Ярослав #Билайн, #rubbles
В Apache Spark существуют два основных типа операций: трансформации и действия. Они имеют разные характеристики и поведение при работе с RDD, DataFrame и Dataset.
Трансформации
Определение: Трансформации — это операции, которые преобразуют один RDD (или DataFrame, Dataset) в другой. Они ленивые (lazy), что означает, что они не выполняются немедленно, а откладываются до тех пор, пока не будет вызвано действие.
Примеры трансформаций:
map(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD.
filter(func): Возвращает новый RDD, содержащий только элементы, для которых функция возвращает True.
flatMap(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD, где каждый элемент может быть разделен на несколько элементов.
groupByKey(): Группирует значения по ключу и возвращает новый RDD с ключами и списками значений.
Действия
Определение: Действия — это операции, которые возвращают результат или производят побочный эффект. Они вызывают выполнение всех трансформаций, которые были применены к RDD до момента вызова действия.
Примеры действий:
collect(): Собирает все элементы RDD и возвращает их в виде списка.
count(): Возвращает количество элементов в RDD.
first(): Возвращает первый элемент RDD.
reduce(func): Объединяет элементы RDD с использованием указанной функции.
saveAsTextFile(path): Сохраняет RDD в виде текстового файла по указанному пути
Что такое Spark Catalyst и как работает?
Теги: #Ярослав
Catalyst – это оптимизатор запросов в Spark SQL. Он строит деревья логического и физического плана, применяет правила оптимизации (pushdown, reordering джоинов и т.д.), а затем формирует оптимальный план выполнения. Основные фишки - Анализатор (Analyzer), Логический план (Logical Plan) Оптимизатор (Optimizer) Физический план (Physical Plan) Выбор плана и генерация кода
Какие есть коннекторы у Spark? Особенности коннектора к Greenplum?
Теги: #wildberries
Общие коннекторы
* JDBC: универсальный способ для подключения к реляционным базам (PostgreSQL, MySQL, Greenplum и т.д.).
* Kafka: чтение/запись стриминговых данных.
* HDFS/S3: работа с файлами в распределённом хранилище.
* Cassandra и прочие NoSQL-хранилища.
Greenplum
* Через JDBC: требуется соответствующий драйвер и указание url, table, user, password. Работает «из коробки», но не всегда максимально эффективно для больших данных.
* PXF (Parallel eXtensible Framework): специальный коннектор, который умеет параллельно выгружать и загружать данные в Greenplum. Он устанавливается на стороне Greenplum и позволяет Spark обращаться к Greenplum, используя параллельные потоки (каждый сегмент Greenplum может читать/записывать данные самостоятельно).
Основная особенность PXF — параллелизм: данные распределяются по сегментам Greenplum, а Spark читает и пишет в эти сегменты напрямую, что даёт высокую скорость обмена.
Spark: чему соответствует каждая job в коде?
Теги: #Cian
В Spark есть две ключевые концепции:
Трансформации (transformations) — «ленивые» операции (map, filter, join и т.д.), которые только формируют план вычислений (DAG), но сами не запускают обработку данных.
Действия (actions) — операции, которые действительно запускают вычисления (например, count, collect, show).
Каждый раз, когда в коде встречается action, Spark запускает новую job.
Одна job может включать в себя несколько stages — этапов выполнения, разбитых по границам shuffle (см. вопрос про stages).
Из-за чего job разделяется на одну и несколько stages?
Теги: #Cian
Spark разбивает job на stages, когда возникает операция, требующая shuffle – широкая трансформация.
- Нarrow-трансформации (map, filter, etc.) не требуют shuffle: данные передаются по цепочке (pipe-line) в рамках одного executor.
- Wide-трансформации (reduceByKey, join, groupBy и т.д.) вызывают shuffle, то есть нужно перегруппировать данные между executor’ами.
Как только в плане встречается shuffle, Spark завершает текущий этап (stage) и начинает новый после shuffle. Поэтому job может иметь один или несколько stages.
Какие есть типы джойнов в Spark (логические и физические)? Как работает SortMergeJoin?
Теги: #Cian, #Билайн, #мир #Ярослав
Логические типы джойнов (указываются в коде, например, df.join(df2, “key”, “inner”)):
* Inner (внутренний)
* Left (левый)
* Right (правый)
* Full (полный)
* Cross (перекрёстный / декартово произведение)
После того, как в плане (DAG) сформирована операция join, оптимизатор (Catalyst) выбирает конкретный физический метод соединения (join strategy).
Основные виды физический метод соединения:
1. Broadcast Hash Join
o Маленькая таблица (по умолчанию до ~10-11 МБ) «рассылается» (broadcast) на все исполнители.
o Хранится в памяти как хэш-таблица. Крупная таблица стримится и быстро ищет совпадения в хэше.
o Очень эффективен, если одна из таблиц невелика.
2. Shuffle Hash Join
o Оба набора данных разбиваются (shuffle) по ключу и распределяются между исполнителями.
o На каждом исполнителе строится хэш-таблица (или две) по сегменту данных.
o Подходит для больших таблиц, но требует shuffle, что может быть затратным по сети.
3. Sort-Merge Join
o Оба набора данных сначала сортируются по ключу (также с shuffle), затем «сшиваются» (merge).
o Эффективен при больших объёмах данных, если их удобно отсортировать, особенно когда данные уже отсортированы или легко партиционируются.
4. Adaptive Query Execution (AQE)
o Механизм динамического изменения плана во время выполнения. Например, Spark может переключаться с Sort-Merge на Broadcast Hash Join, если видит, что реальный объём данных меньше ожидаемого.
client / cluster mode в спарке
Теги: #Cian
В client mode драйвер запускается локально, а executors – в кластере. В cluster mode драйвер также запускается внутри кластера (например, на Yarn или Kubernetes).
Параметры spark submit
Теги: #Ярослав
Основные:
* Spark Submit — это команда, используемая для запуска приложений Apache Spark на кластере. Она принимает множество параметров, которые позволяют настраивать поведение приложения и управлять ресурсами. Вот основные параметры spark-submit:
–class: указывает главный класс приложения.
–master: определяет мастер-узел Spark
–deploy-mode: режим развертывания приложения: client или cluster.
–name: задает имя приложения в интерфейсе управления Spark.
–executor-memory: определяет объем памяти для каждого исполнителя
–driver-memory: объем памяти для драйвера
–conf: позволяет задать дополнительные настройки Spark.
–executor-cores: количество ядер CPU для каждого исполнителя.
–num-executors: общее количество исполнителей
–files: передает дополнительные файлы в приложение, которые будут доступны в рабочей директории
Чем кэширование отличается от broadcast join?
Теги: #мир
Кэширование (cache/persist)
1. Сохраняет результат вычислений (RDD/DataFrame) в оперативной памяти (или на диск, если не хватает памяти)
2. Позволяет переиспользовать уже рассчитанные данные при повторных обращениях, сокращая время вычислений
3. Типичное применение: несколько разных действий (actions) на одном и том же наборе данных
Broadcast Join
1. «Рассылает» (broadcast) маленькую таблицу/датасет на все узлы кластера, где она хранится в памяти
2. Ускоряет join с большой таблицей, поскольку не требует shuffle
3. Типичное применение: одна таблица очень мала, а другая – большая
Это два разных механизма оптимизации: кэширование помогает при многократном использовании одного набора данных, а broadcast join ускоряет соединение (join), когда один из датасетов невелик.
Что такое spill?
Теги: #Ярослав
Скорость спарка в основном завязана на вычислениях в оперативной памяти. Если данные не помещаются в ОЗУ, происходит spill - сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти. Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение - увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ.
Чем отличаются RDD, DataFrame и Dataset? В чём особенности каждого?
Теги: #мир, #Билайн #Ярослав
RDD (Resilient Distributed Dataset) — низкоуровневая структура данных в Spark, которая предоставляет API для работы с распределенными данными. RDD immutable (неизменяемый) и поддерживает ленивые вычисления. Подходит для гибких, нестандартных операций с данными, но не имеет оптимизаций и схемы, что снижает производительность.
DataFrame — это схематизированный набор данных, представленный в виде таблицы с колонками и строками. DataFrame поддерживает API с SQL-подобными операциями, использует Catalyst-оптимизатор и Tungsten для выполнения запросов, что повышает производительность. DataFrame легче использовать для структурированных данных и анализа, но он менее гибок, чем RDD, и не поддерживает строгую типизацию.
Dataset — объединяет свойства RDD и DataFrame, добавляя строгую типизацию данных (type-safe). Dataset компилируется с проверкой типов, использует Catalyst и Tungsten, что улучшает производительность. Поддерживает SQL-подобный API и предоставляет методы RDD для трансформаций. Полноценно доступен только в Scala и Java, что ограничивает использование в Python.
Краткое сравнение
RDD: Гибкость, отсутствие схемы, нет оптимизаций. Подходит для нестандартных и сложных вычислений.
DataFrame: SQL-подобный API, автоматические оптимизации, высокая производительность. Предпочтителен для структурированных данных.
Dataset: Строгая типизация, производительность DataFrame, поддержка RDD API. Лучший выбор для больших типизированных данных в Scala и Java.
Какие бывают типы кэширования?
Теги: #мир
Memory Only
o Сохраняет данные в оперативной памяти, при нехватке – пересчитывает недостающие партиции.
Memory and Disk
o Пытается сохранить данные в память, а излишек складывает на диск.
Disk Only
o Сохраняет данные только на диск, не используя память.
Memory Only Serialized
o Хранит данные в памяти в сериализованном виде (например, с помощью Kryo), что экономит память, но требует затрат на сериализацию/десериализацию.
Memory and Disk Serialized
o Аналогично предыдущему пункту, но при нехватке памяти выгружает сериализованные данные на диск.
При выборе метода учитывают: объём данных, скорость доступа и избыточность вычислений (что «дешевле» – пересчитать или сохранить?).
Как выбрать количество ядер на executor? Как увеличение числа ядер влияет на память (OutOfMemory)?
Теги: #мир
Выбор числа ядер
* Обычно ориентируются на общее количество доступных CPU в кластере и на необходимость параллелизма в задачах.
* Часто выделяют 2–5 ядер на один executor (есть много тонкостей, например, оставлять немного CPU для системных процессов).
- Влияние на память
- Чем больше ядер, тем больше параллельных tasks (потоков) будет выполняться на одном executor.
- Каждый task потребляет часть памяти, поэтому при росте числа ядер возрастает риск OutOfMemory (OOM), если не увеличить executor memory пропорционально.
Архитектура Spark - что под капотом?
Теги: #Ярослав
Компоненты архитектуры Spark:
Драйвер (Driver): запускает пользовательский код и управляет DAG, этапами, задачами.
Менеджер кластеров (Cluster Manager): управляет ресурсами в кластере (может быть YARN, Mesos или Standalone).
Воркеры (Worker Nodes): физические или виртуальные машины, на которых выполняются исполнители.
Исполнители (Executors): процессы на воркерах, выполняющие задачи и управляют памятью для RDD.
Что такое udf?
Теги: #Яроcлав
Пользовательские функции, написанные в Spark. Обычно плохо оптимизируются движком при анализе кода, являются плохой практикой. Старайся избегать в production коде, если на Python, только если никак нельзя обойтись стандартными методами и типами данных.
Проблема в том, что для исполнения Python UDF нужно перенести данные к коду и вернуть обратно. Тот же Spark SQL обычно выполняется напрямую на данных (переносит код к данным), поэтому может быть в разы быстрее.
С UDF на Scala я опыта не имел, но и спрашивают редко. Тоже будет медленнее встроенных трансформаций, но лучше, чем PySpark UDF, т.к. работает напрямую.
Как ограничить количество ресурсов в Спарк? Можно ли через YARN это сделать?
Теги: #мир
Параметры spark-submit
o –executor-memory (память на executor)
o –executor-cores (число ядер на executor)
o –num-executors (число executors)
Настройки YARN
o Ограничения на приложение (Application Master и executors) по памяти и ядрам можно выставлять в конфигурации YARN (например, в Capacity Scheduler или Fair Scheduler).
o YARN будет распределять ресурсы и не даст приложению выйти за заданные пределы.
Таким образом, вы можете управлять ресурсами как из Spark, так и со стороны YARN.
Как происходит выполнение (вычисления) в Spark? Что под капотом у Spark?
Теги: #Билайн, #rubbles
Логический план
o При написании трансформаций (map, filter, join и т.д.) Spark формирует логический DAG (граф вычислений).
Catalyst Optimizer
o Оптимизатор Spark, который преобразует логический план в физический (решает, какой тип join использовать, как распределить данные и т.п.).
Запуск job
o Когда встречается action (например, collect, count), Spark запускает job (задачу), которая может состоять из нескольких stages.
Stages и tasks
o Stage – это этап, ограниченный shuffle (широкими трансформациями).
o Каждый stage разбивается на tasks по числу партиций данных.
Driver и Executors
o Driver координирует выполнение, отслеживает прогресс, распределяет задачи.
o Executors на рабочих узлах (worker nodes) выполняют tasks.
В итоге Spark масштабирует вычисления благодаря распределению задач (tasks) по executors и управлению этими задачами через Driver.
Как изменить степень параллелизма в Spark?
Теги: #Билайн #Ярослав
Параллелизм в Spark — это распределение задач между узлами кластера для ускорения обработки данных. Spark реализует параллелизм на нескольких уровнях:
- Разделы (Partitions)
Данные разбиваются на части, называемые разделами (partitions), и каждый раздел обрабатывается отдельно. Больше разделов — больше параллельных задач. Количество разделов можно изменить с помощью repartition и coalesce. - Задачи (Tasks)
Каждый раздел обрабатывается одной задачей (task) на исполнителе (executor), что позволяет Spark обрабатывать несколько разделов одновременно. Число задач в каждом этапе (stage) равно количеству разделов. - Исполнители (Executors)
Исполнители — процессы на узлах, которые выполняют задачи. Каждый исполнитель использует пул потоков для обработки задач параллельно. Число исполнителей и потоков можно настроить для повышения параллелизма. - Этапы (Stages) и Шаффлинг (Shuffle)
Spark разбивает вычисления на этапы, которые выполняются параллельно до момента шаффлинга — обмена данными между узлами. Шаффлинг замедляет выполнение, так как требует сетевых операций, и Spark минимизирует его, чтобы повысить производительность. - Настройки параллелизма
Ключевые параметры:
spark.default.parallelism — задает количество разделов для RDD.
spark.sql.shuffle.partitions — определяет количество разделов для операций с шаффлингом в SQL/DataFrame.
spark.executor.cores и spark.executor.instances — управляют количеством потоков и исполнителей на узел. - Управление ресурсами кластера
Spark-кластеры (например, на YARN или Kubernetes) распределяют задачи по узлам на основе доступных ресурсов, что позволяет максимизировать параллелизм с учетом доступных CPU и памяти.
Как бороться с перекосом данных (data skew)?
Теги: #Ярослав
Перекос данных (data skew) возникает, когда данные распределены по разделам (partitions) неравномерно, и некоторые из них получают существенно больше данных. Это приводит к увеличению времени выполнения задач и перегрузке памяти на отдельных узлах. Spark предлагает несколько подходов для борьбы с этим:
- Переразбиение данных (Repartitioning)
Использование repartition: перераспределение данных с увеличением количества разделов может уменьшить нагрузку на один раздел.
Ручное управление разбиением: можно задать ключи для более равномерного распределения данных. - Семплирование данных (Sampling)
Семплирование: сделайте выборку данных, чтобы обнаружить узкие места, и затем создайте дополнительные разделы для ключей, вызывающих перекос. - Добавление случайного ключа (Salting)
Добавьте случайное значение к ключам для перераспределения данных, чтобы нагрузка не концентрировалась на одних и тех же ключах.
Пример: если ключ “key1” вызывает перекос, можно создать несколько версий, например, key1_1, key1_2. Затем на этапе объединения данных убрать этот “соль” (salt). - Использование broadcast join
При перекосе данных в джоинах с маленькой таблицей используйте broadcast join: небольшая таблица передается на все узлы, и перекос при соединении минимизируется.
Включите автоматический broadcast join с помощью spark.sql.autoBroadcastJoinThreshold.
5. Выборка ключей с перекосом и их обработка отдельно
Найдите ключи, вызывающие перекос, обработайте их отдельно (например, в отдельном DataFrame) и соедините результат с основными данными после выполнения операций.
- Агрегация данных перед джойном
Если перекос возникает в джоинах, попробуйте сначала агрегировать данные по ключам до соединения, что может уменьшить количество данных и нагрузку.
Какая нагрузка по вычислениям? Есть ли spill’ы? #Иннотех
- Нагрузка по вычислениям зависит от:
1. Объёма данных: чем больше данных, тем выше объём обработки.
2. Типа операций:
o Широкие трансформации (join, groupBy, reduceByKey) вызывают shuffle и требуют перераспределения данных.
o Узкие трансформации (map, filter) работают в рамках текущих партиций. - Spill (пролив на диск) происходит, когда Spark не хватает памяти для хранения временных структур (например, при shuffle или сортировке).
- Spark «выгружает» часть данных на диск, чтобы освободить оперативную память.
- Spill может замедлять вычисления, поскольку доступ к диску медленнее, чем к памяти.
- Проверить наличие spill’ов можно в логах Spark (или через Spark UI, в разделе Tasks/Stages видна информация о spill на диск).
Какая разница между Coalesce и Repartition?
Теги: #Ярослав
- coalesce и repartition — это методы переразбиения (reshuffling) данных в Spark, но с разными подходами и назначением.
- coalesce
Используется для уменьшения количества разделов (partitions).
Работает без шаффлинга (reshuffling) данных между узлами, если данные остаются на тех же узлах.
Обычно быстрее и эффективнее, так как не требует передачи данных по сети.
Идеален для оптимизации разделов, если данные стали меньше на более поздних стадиях обработки, например, после фильтрации.
Пример использования:
scala
val reducedData = largeData.coalesce(4)
Когда использовать:
Если нужно уменьшить количество разделов.
Для увеличения производительности на финальных этапах, когда данные уже обработаны и требуется только сохранить их в меньшем количестве разделов. - repartition
Может использоваться как для уменьшения, так и для увеличения числа разделов.
Требует шаффлинга данных, что приводит к перераспределению по всем узлам кластера.
Подходит для случаев, когда необходимо равномерно распределить данные или переразбить их для параллельной обработки, что актуально на ранних этапах анализа данных.
Пример использования:
scala
val repartitionedData = data.repartition(10)
Когда использовать:
Для увеличения числа разделов (например, чтобы увеличить параллелизм на этапе вычислений).
При необходимости перераспределения данных равномерно по кластеру, особенно если исходные разделы дисбалансированы.