Spark Flashcards

1
Q

Что такое Spark? Зачем нужен? Где использовали?

Теги: #wildberries

A

Spark – это распределённая вычислительная платформа для быстрой обработки больших данных (batch/stream). Используется для ETL, аналитики, machine learning и т.д. Применяется во множестве компаний (Netflix, Uber, e-commerce), где важна высокая скорость анализа данных.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
2
Q

Spark Streaming: near real-time или real-time? Сравнение Spark vs Flink

Теги: #wildberries

A

Spark Streaming обеспечивает near real-time обработку (с микробатчами), тогда как Flink может работать ближе к real-time (стриминг на уровне событий). Flink эффективнее для постоянного потока, Spark удобен при смешанных задачах batch + stream.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
3
Q

Что знаешь про PyArrow? Как взаимодействует со Spark?

Теги: #wildberries

A

PyArrow – это библиотека для работы со столбцовыми данными в памяти (формат Apache Arrow). В Spark её используют для ускоренной передачи данных между Python и JVM (Vectorized UDF и оптимизация сериализации/десериализации).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
4
Q

Функции трансформации и действия, в чём разница, примеры?

Теги: #Ярослав #Билайн, #rubbles

A

В Apache Spark существуют два основных типа операций: трансформации и действия. Они имеют разные характеристики и поведение при работе с RDD, DataFrame и Dataset.

Трансформации
Определение: Трансформации — это операции, которые преобразуют один RDD (или DataFrame, Dataset) в другой. Они ленивые (lazy), что означает, что они не выполняются немедленно, а откладываются до тех пор, пока не будет вызвано действие.
Примеры трансформаций:
map(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD.
filter(func): Возвращает новый RDD, содержащий только элементы, для которых функция возвращает True.
flatMap(func): Применяет функцию к каждому элементу RDD и возвращает новый RDD, где каждый элемент может быть разделен на несколько элементов.
groupByKey(): Группирует значения по ключу и возвращает новый RDD с ключами и списками значений.

Действия
Определение: Действия — это операции, которые возвращают результат или производят побочный эффект. Они вызывают выполнение всех трансформаций, которые были применены к RDD до момента вызова действия.
Примеры действий:
collect(): Собирает все элементы RDD и возвращает их в виде списка.
count(): Возвращает количество элементов в RDD.
first(): Возвращает первый элемент RDD.
reduce(func): Объединяет элементы RDD с использованием указанной функции.
saveAsTextFile(path): Сохраняет RDD в виде текстового файла по указанному пути

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
5
Q

Что такое Spark Catalyst и как работает?

Теги: #Ярослав

A

Catalyst – это оптимизатор запросов в Spark SQL. Он строит деревья логического и физического плана, применяет правила оптимизации (pushdown, reordering джоинов и т.д.), а затем формирует оптимальный план выполнения. Основные фишки - Анализатор (Analyzer), Логический план (Logical Plan) Оптимизатор (Optimizer) Физический план (Physical Plan) Выбор плана и генерация кода

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
6
Q

Какие есть коннекторы у Spark? Особенности коннектора к Greenplum?

Теги: #wildberries

A

Общие коннекторы
* JDBC: универсальный способ для подключения к реляционным базам (PostgreSQL, MySQL, Greenplum и т.д.).
* Kafka: чтение/запись стриминговых данных.
* HDFS/S3: работа с файлами в распределённом хранилище.
* Cassandra и прочие NoSQL-хранилища.

Greenplum
* Через JDBC: требуется соответствующий драйвер и указание url, table, user, password. Работает «из коробки», но не всегда максимально эффективно для больших данных.
* PXF (Parallel eXtensible Framework): специальный коннектор, который умеет параллельно выгружать и загружать данные в Greenplum. Он устанавливается на стороне Greenplum и позволяет Spark обращаться к Greenplum, используя параллельные потоки (каждый сегмент Greenplum может читать/записывать данные самостоятельно).

Основная особенность PXF — параллелизм: данные распределяются по сегментам Greenplum, а Spark читает и пишет в эти сегменты напрямую, что даёт высокую скорость обмена.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
7
Q

Spark: чему соответствует каждая job в коде?

Теги: #Cian

A

В Spark есть две ключевые концепции:
Трансформации (transformations) — «ленивые» операции (map, filter, join и т.д.), которые только формируют план вычислений (DAG), но сами не запускают обработку данных.

Действия (actions) — операции, которые действительно запускают вычисления (например, count, collect, show).

Каждый раз, когда в коде встречается action, Spark запускает новую job.
Одна job может включать в себя несколько stages — этапов выполнения, разбитых по границам shuffle (см. вопрос про stages).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
8
Q

Из-за чего job разделяется на одну и несколько stages?

Теги: #Cian

A

Spark разбивает job на stages, когда возникает операция, требующая shuffle – широкая трансформация.

  • Нarrow-трансформации (map, filter, etc.) не требуют shuffle: данные передаются по цепочке (pipe-line) в рамках одного executor.
  • Wide-трансформации (reduceByKey, join, groupBy и т.д.) вызывают shuffle, то есть нужно перегруппировать данные между executor’ами.

Как только в плане встречается shuffle, Spark завершает текущий этап (stage) и начинает новый после shuffle. Поэтому job может иметь один или несколько stages.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
9
Q

Какие есть типы джойнов в Spark (логические и физические)? Как работает SortMergeJoin?

Теги: #Cian, #Билайн, #мир #Ярослав

A

Логические типы джойнов (указываются в коде, например, df.join(df2, “key”, “inner”)):
* Inner (внутренний)
* Left (левый)
* Right (правый)
* Full (полный)
* Cross (перекрёстный / декартово произведение)
После того, как в плане (DAG) сформирована операция join, оптимизатор (Catalyst) выбирает конкретный физический метод соединения (join strategy).

Основные виды физический метод соединения:
1. Broadcast Hash Join
o Маленькая таблица (по умолчанию до ~10-11 МБ) «рассылается» (broadcast) на все исполнители.
o Хранится в памяти как хэш-таблица. Крупная таблица стримится и быстро ищет совпадения в хэше.
o Очень эффективен, если одна из таблиц невелика.
2. Shuffle Hash Join
o Оба набора данных разбиваются (shuffle) по ключу и распределяются между исполнителями.
o На каждом исполнителе строится хэш-таблица (или две) по сегменту данных.
o Подходит для больших таблиц, но требует shuffle, что может быть затратным по сети.
3. Sort-Merge Join
o Оба набора данных сначала сортируются по ключу (также с shuffle), затем «сшиваются» (merge).
o Эффективен при больших объёмах данных, если их удобно отсортировать, особенно когда данные уже отсортированы или легко партиционируются.
4. Adaptive Query Execution (AQE)
o Механизм динамического изменения плана во время выполнения. Например, Spark может переключаться с Sort-Merge на Broadcast Hash Join, если видит, что реальный объём данных меньше ожидаемого.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
10
Q

client / cluster mode в спарке

Теги: #Cian

A

В client mode драйвер запускается локально, а executors – в кластере. В cluster mode драйвер также запускается внутри кластера (например, на Yarn или Kubernetes).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
11
Q

Параметры spark submit

Теги: #Ярослав

A

Основные:
* Spark Submit — это команда, используемая для запуска приложений Apache Spark на кластере. Она принимает множество параметров, которые позволяют настраивать поведение приложения и управлять ресурсами. Вот основные параметры spark-submit:

–class: указывает главный класс приложения.
–master: определяет мастер-узел Spark
–deploy-mode: режим развертывания приложения: client или cluster.
–name: задает имя приложения в интерфейсе управления Spark.
–executor-memory: определяет объем памяти для каждого исполнителя
–driver-memory: объем памяти для драйвера
–conf: позволяет задать дополнительные настройки Spark.
–executor-cores: количество ядер CPU для каждого исполнителя.
–num-executors: общее количество исполнителей
–files: передает дополнительные файлы в приложение, которые будут доступны в рабочей директории

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
12
Q

Чем кэширование отличается от broadcast join?

Теги: #мир

A

Кэширование (cache/persist)
1. Сохраняет результат вычислений (RDD/DataFrame) в оперативной памяти (или на диск, если не хватает памяти)
2. Позволяет переиспользовать уже рассчитанные данные при повторных обращениях, сокращая время вычислений
3. Типичное применение: несколько разных действий (actions) на одном и том же наборе данных

Broadcast Join
1. «Рассылает» (broadcast) маленькую таблицу/датасет на все узлы кластера, где она хранится в памяти
2. Ускоряет join с большой таблицей, поскольку не требует shuffle
3. Типичное применение: одна таблица очень мала, а другая – большая
Это два разных механизма оптимизации: кэширование помогает при многократном использовании одного набора данных, а broadcast join ускоряет соединение (join), когда один из датасетов невелик.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
13
Q

Что такое spill?

Теги: #Ярослав

A

Скорость спарка в основном завязана на вычислениях в оперативной памяти. Если данные не помещаются в ОЗУ, происходит spill - сохранение промежуточных результатов на диск, который в разы медленнее оперативной памяти. Может происходить как при вычислениях на executor’е, так и при сборе всех результатов на driver. Решение - увеличивать количество кусочков, на которые делятся данные (партиций) или увеличивать лимиты ОЗУ.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
14
Q

Чем отличаются RDD, DataFrame и Dataset? В чём особенности каждого?

Теги: #мир, #Билайн #Ярослав

A

RDD (Resilient Distributed Dataset) — низкоуровневая структура данных в Spark, которая предоставляет API для работы с распределенными данными. RDD immutable (неизменяемый) и поддерживает ленивые вычисления. Подходит для гибких, нестандартных операций с данными, но не имеет оптимизаций и схемы, что снижает производительность.

DataFrame — это схематизированный набор данных, представленный в виде таблицы с колонками и строками. DataFrame поддерживает API с SQL-подобными операциями, использует Catalyst-оптимизатор и Tungsten для выполнения запросов, что повышает производительность. DataFrame легче использовать для структурированных данных и анализа, но он менее гибок, чем RDD, и не поддерживает строгую типизацию.

Dataset — объединяет свойства RDD и DataFrame, добавляя строгую типизацию данных (type-safe). Dataset компилируется с проверкой типов, использует Catalyst и Tungsten, что улучшает производительность. Поддерживает SQL-подобный API и предоставляет методы RDD для трансформаций. Полноценно доступен только в Scala и Java, что ограничивает использование в Python.

Краткое сравнение
RDD: Гибкость, отсутствие схемы, нет оптимизаций. Подходит для нестандартных и сложных вычислений.
DataFrame: SQL-подобный API, автоматические оптимизации, высокая производительность. Предпочтителен для структурированных данных.
Dataset: Строгая типизация, производительность DataFrame, поддержка RDD API. Лучший выбор для больших типизированных данных в Scala и Java.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
15
Q

Какие бывают типы кэширования?

Теги: #мир

A

Memory Only
o Сохраняет данные в оперативной памяти, при нехватке – пересчитывает недостающие партиции.

Memory and Disk
o Пытается сохранить данные в память, а излишек складывает на диск.

Disk Only
o Сохраняет данные только на диск, не используя память.

Memory Only Serialized
o Хранит данные в памяти в сериализованном виде (например, с помощью Kryo), что экономит память, но требует затрат на сериализацию/десериализацию.

Memory and Disk Serialized
o Аналогично предыдущему пункту, но при нехватке памяти выгружает сериализованные данные на диск.
При выборе метода учитывают: объём данных, скорость доступа и избыточность вычислений (что «дешевле» – пересчитать или сохранить?).

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
16
Q

Как выбрать количество ядер на executor? Как увеличение числа ядер влияет на память (OutOfMemory)?

Теги: #мир

A

Выбор числа ядер
* Обычно ориентируются на общее количество доступных CPU в кластере и на необходимость параллелизма в задачах.
* Часто выделяют 2–5 ядер на один executor (есть много тонкостей, например, оставлять немного CPU для системных процессов).

  • Влияние на память
  • Чем больше ядер, тем больше параллельных tasks (потоков) будет выполняться на одном executor.
  • Каждый task потребляет часть памяти, поэтому при росте числа ядер возрастает риск OutOfMemory (OOM), если не увеличить executor memory пропорционально.
How well did you know this?
1
Not at all
2
3
4
5
Perfectly
17
Q

Архитектура Spark - что под капотом?

Теги: #Ярослав

A

Компоненты архитектуры Spark:
Драйвер (Driver): запускает пользовательский код и управляет DAG, этапами, задачами.
Менеджер кластеров (Cluster Manager): управляет ресурсами в кластере (может быть YARN, Mesos или Standalone).
Воркеры (Worker Nodes): физические или виртуальные машины, на которых выполняются исполнители.
Исполнители (Executors): процессы на воркерах, выполняющие задачи и управляют памятью для RDD.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
18
Q

Что такое udf?

Теги: #Яроcлав

A

Пользовательские функции, написанные в Spark. Обычно плохо оптимизируются движком при анализе кода, являются плохой практикой. Старайся избегать в production коде, если на Python, только если никак нельзя обойтись стандартными методами и типами данных.

Проблема в том, что для исполнения Python UDF нужно перенести данные к коду и вернуть обратно. Тот же Spark SQL обычно выполняется напрямую на данных (переносит код к данным), поэтому может быть в разы быстрее.
С UDF на Scala я опыта не имел, но и спрашивают редко. Тоже будет медленнее встроенных трансформаций, но лучше, чем PySpark UDF, т.к. работает напрямую.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
19
Q

Как ограничить количество ресурсов в Спарк? Можно ли через YARN это сделать?

Теги: #мир

A

Параметры spark-submit
o –executor-memory (память на executor)
o –executor-cores (число ядер на executor)
o –num-executors (число executors)

Настройки YARN
o Ограничения на приложение (Application Master и executors) по памяти и ядрам можно выставлять в конфигурации YARN (например, в Capacity Scheduler или Fair Scheduler).
o YARN будет распределять ресурсы и не даст приложению выйти за заданные пределы.
Таким образом, вы можете управлять ресурсами как из Spark, так и со стороны YARN.

How well did you know this?
1
Not at all
2
3
4
5
Perfectly
20
Q

Как происходит выполнение (вычисления) в Spark? Что под капотом у Spark?

Теги: #Билайн, #rubbles

A

Логический план
o При написании трансформаций (map, filter, join и т.д.) Spark формирует логический DAG (граф вычислений).

Catalyst Optimizer
o Оптимизатор Spark, который преобразует логический план в физический (решает, какой тип join использовать, как распределить данные и т.п.).

Запуск job
o Когда встречается action (например, collect, count), Spark запускает job (задачу), которая может состоять из нескольких stages.

Stages и tasks
o Stage – это этап, ограниченный shuffle (широкими трансформациями).
o Каждый stage разбивается на tasks по числу партиций данных.

Driver и Executors
o Driver координирует выполнение, отслеживает прогресс, распределяет задачи.
o Executors на рабочих узлах (worker nodes) выполняют tasks.
В итоге Spark масштабирует вычисления благодаря распределению задач (tasks) по executors и управлению этими задачами через Driver.

21
Q

Как изменить степень параллелизма в Spark?

Теги: #Билайн #Ярослав

A

Параллелизм в Spark — это распределение задач между узлами кластера для ускорения обработки данных. Spark реализует параллелизм на нескольких уровнях:

  1. Разделы (Partitions)
    Данные разбиваются на части, называемые разделами (partitions), и каждый раздел обрабатывается отдельно. Больше разделов — больше параллельных задач. Количество разделов можно изменить с помощью repartition и coalesce.
  2. Задачи (Tasks)
    Каждый раздел обрабатывается одной задачей (task) на исполнителе (executor), что позволяет Spark обрабатывать несколько разделов одновременно. Число задач в каждом этапе (stage) равно количеству разделов.
  3. Исполнители (Executors)
    Исполнители — процессы на узлах, которые выполняют задачи. Каждый исполнитель использует пул потоков для обработки задач параллельно. Число исполнителей и потоков можно настроить для повышения параллелизма.
  4. Этапы (Stages) и Шаффлинг (Shuffle)
    Spark разбивает вычисления на этапы, которые выполняются параллельно до момента шаффлинга — обмена данными между узлами. Шаффлинг замедляет выполнение, так как требует сетевых операций, и Spark минимизирует его, чтобы повысить производительность.
  5. Настройки параллелизма
    Ключевые параметры:
    spark.default.parallelism — задает количество разделов для RDD.
    spark.sql.shuffle.partitions — определяет количество разделов для операций с шаффлингом в SQL/DataFrame.
    spark.executor.cores и spark.executor.instances — управляют количеством потоков и исполнителей на узел.
  6. Управление ресурсами кластера
    Spark-кластеры (например, на YARN или Kubernetes) распределяют задачи по узлам на основе доступных ресурсов, что позволяет максимизировать параллелизм с учетом доступных CPU и памяти.
22
Q

Как бороться с перекосом данных (data skew)?

Теги: #Ярослав

A

Перекос данных (data skew) возникает, когда данные распределены по разделам (partitions) неравномерно, и некоторые из них получают существенно больше данных. Это приводит к увеличению времени выполнения задач и перегрузке памяти на отдельных узлах. Spark предлагает несколько подходов для борьбы с этим:

  1. Переразбиение данных (Repartitioning)
    Использование repartition: перераспределение данных с увеличением количества разделов может уменьшить нагрузку на один раздел.
    Ручное управление разбиением: можно задать ключи для более равномерного распределения данных.
  2. Семплирование данных (Sampling)
    Семплирование: сделайте выборку данных, чтобы обнаружить узкие места, и затем создайте дополнительные разделы для ключей, вызывающих перекос.
  3. Добавление случайного ключа (Salting)
    Добавьте случайное значение к ключам для перераспределения данных, чтобы нагрузка не концентрировалась на одних и тех же ключах.
    Пример: если ключ “key1” вызывает перекос, можно создать несколько версий, например, key1_1, key1_2. Затем на этапе объединения данных убрать этот “соль” (salt).
  4. Использование broadcast join
    При перекосе данных в джоинах с маленькой таблицей используйте broadcast join: небольшая таблица передается на все узлы, и перекос при соединении минимизируется.

Включите автоматический broadcast join с помощью spark.sql.autoBroadcastJoinThreshold.
5. Выборка ключей с перекосом и их обработка отдельно
Найдите ключи, вызывающие перекос, обработайте их отдельно (например, в отдельном DataFrame) и соедините результат с основными данными после выполнения операций.

  1. Агрегация данных перед джойном
    Если перекос возникает в джоинах, попробуйте сначала агрегировать данные по ключам до соединения, что может уменьшить количество данных и нагрузку.
23
Q

Какая нагрузка по вычислениям? Есть ли spill’ы? #Иннотех

A
  • Нагрузка по вычислениям зависит от:
    1. Объёма данных: чем больше данных, тем выше объём обработки.
    2. Типа операций:
    o Широкие трансформации (join, groupBy, reduceByKey) вызывают shuffle и требуют перераспределения данных.
    o Узкие трансформации (map, filter) работают в рамках текущих партиций.
  • Spill (пролив на диск) происходит, когда Spark не хватает памяти для хранения временных структур (например, при shuffle или сортировке).
  • Spark «выгружает» часть данных на диск, чтобы освободить оперативную память.
  • Spill может замедлять вычисления, поскольку доступ к диску медленнее, чем к памяти.
  • Проверить наличие spill’ов можно в логах Spark (или через Spark UI, в разделе Tasks/Stages видна информация о spill на диск).
24
Q

Какая разница между Coalesce и Repartition?

Теги: #Ярослав

A
  • coalesce и repartition — это методы переразбиения (reshuffling) данных в Spark, но с разными подходами и назначением.
  1. coalesce
    Используется для уменьшения количества разделов (partitions).
    Работает без шаффлинга (reshuffling) данных между узлами, если данные остаются на тех же узлах.
    Обычно быстрее и эффективнее, так как не требует передачи данных по сети.
    Идеален для оптимизации разделов, если данные стали меньше на более поздних стадиях обработки, например, после фильтрации.
    Пример использования:
    scala
    val reducedData = largeData.coalesce(4)
    Когда использовать:
    Если нужно уменьшить количество разделов.
    Для увеличения производительности на финальных этапах, когда данные уже обработаны и требуется только сохранить их в меньшем количестве разделов.
  2. repartition
    Может использоваться как для уменьшения, так и для увеличения числа разделов.
    Требует шаффлинга данных, что приводит к перераспределению по всем узлам кластера.
    Подходит для случаев, когда необходимо равномерно распределить данные или переразбить их для параллельной обработки, что актуально на ранних этапах анализа данных.

Пример использования:
scala
val repartitionedData = data.repartition(10)
Когда использовать:
Для увеличения числа разделов (например, чтобы увеличить параллелизм на этапе вычислений).
При необходимости перераспределения данных равномерно по кластеру, особенно если исходные разделы дисбалансированы.

25
Как удалить дубликаты из таблицы? Теги: #Иннотех
* В Spark SQL/DF: df.dropDuplicates(["cols"]), где cols – список колонок, по которым ищем дубликаты. * Можно использовать Window-функции и фильтр по row_number(), если нужна более гибкая логика.
26
Есть большой датафрейм, который нужно обработать, но он не помещается целиком в оперативку. Что делать? Теги: #wildberries
* Распределённая обработка * Использовать кластер Spark (несколько узлов) с достаточным количеством памяти. * Spark автоматически разбивает большие данные на партиции и распределяет их между executors. * Spill на диск * Spark конфигурируется так, чтобы при нехватке памяти данные (shuffle-файлы, временные структуры) выгружались на диск. * Настройки: spark.local.dir, spark.shuffle.spill, т. п. * Управление партициями * Использовать repartition (для увеличения числа партиций) или coalesce (для уменьшения). * Слишком мало партиций → перегрузка отдельных executors. * Слишком много → большие накладные расходы на управление задачами. * Оптимизация кода * Читать только нужные колонки (select) и фильтровать данные как можно раньше, чтобы объём обрабатываемых данных был меньше.
27
Методы оптимизации Spark Теги: #Астон
* Настройка партиций * spark.sql.shuffle.partitions – ключевой параметр для join и группировок. * Balancing партиций, чтобы избежать перекоса (data skew). * Broadcast Join (для маленьких таблиц) * Избегает shuffle, так как небольшая таблица «рассылается» на все узлы. * Кэширование / persist * Сохраняет промежуточные результаты в память/на диск, если данные используются многократно. * Фильтры pushdown, выбор только нужных колонок * Экономит память и CPU, уменьшая объём данных, передаваемых из источника и обрабатываемых в Spark. * Настройка памяти * Увеличение executor-memory, driver-memory при необходимости. * Правильное число ядер (executor-cores) в сочетании с достаточной памятью, чтобы избежать OOM. * Catalyst Optimizer и AQE (Adaptive Query Execution) * Включение AQE (spark.sql.adaptive.enabled) может динамически менять план выполнения (например, переключиться на broadcast join, если таблица оказалась меньше ожидаемого).
28
Что такое Shuffle, partitionBy, широкие и узкие трансформации? Теги: #Ярослав #Cian, #wildberries #Билайн, #rubbles
Спарк выполняет вычисления на кластере – группе серверов, соединённых сетью. Передача данных по сети (shuffle) это дополнительный шаг, который нужен не для всех трансформаций. Но для того, чтобы вернуть правильный результат, иногда это необходимо. Перетасовка (shuffle) — это ещё и дорогостоящая операция, ведь требуется копировать данные по исполнителям, а также перетасовка включает: операции ввода-вывода, сериализацию и десериализацию данных, перемещение по сети. Например, прежде чем вычислять средний чек по магазинам за месяц, нужно все данные за этот один месяц переместить на один узел, а уже потом агрегировать. И так по каждому месяцу данные физически перераскладываются по разным серверам. Контролировать распределение данных можно через partitionBy. Широкие трансформации могут требовать передать данные по сети, узкие выполняются на одном узле над одной партицией. Зачастую оптимизация заключается в уменьшении числа широких операций до теоретического минимума. Широкие: . Intersection . Distinct . GroupByKey . ReduceByKey . Join . Repartition Узкие: . Map . Filter . Union . Sample . Coalesce (в случае уменьшения числа партиций, например до 1 партиции на ноду)
29
Как строится план запроса в Spark? Из чего он состоит? Как работает Catalyst Optimizer? Как выбирается оптимальный план? Теги: #ПетровичТех, #rubbles
* Логический план: анализ синтаксиса, формирование дерева операций * Optimized logical plan: Catalyst применяет правила оптимизации (pushdown, переупорядочение join и др.) * Физический план: выбор конкретного алгоритма (SortMergeJoin, BroadcastHashJoin и т.п.) * Catalyst выбирает план с наименьшей стоимостью согласно эвристикам и статистике.
30
Что такое pushdown-фильтр, как проверить, применился ли он? Теги: #wildberries
* Pushdown-фильтр – это механизм, когда Spark передаёт условия фильтрации на уровень источника данных (например, СУБД или файл-формат Parquet), чтобы: * Ограничить объём данных, которые нужно прочитать * Уменьшить сетевой и I/O трафик * Проверить: 1. Посмотреть физический план через df.explain(true) или spark.sql.explain(). o Если pushdown применяется, там будет указано, что фильтр был «протолкнут» (pushed down) к источнику. 2. Логи Spark/источника данных: можно увидеть, что выбираются уже отфильтрованные данные (не полный скан). 3. В некоторых коннекторах (например, JDBC) Spark строит SQL-запрос с условиями WHERE, если pushdown активирован.
31
Как исправить ошибку OOM (Out of memory)? Теги: #Ярослав
Для исправления ошибки OOM (Out of Memory) в Spark нужно оптимизировать использование памяти и вычислительных ресурсов. Основные подходы: 1. Настройка памяти Увеличьте память исполнителя с помощью spark.executor.memory (например, spark.executor.memory=4g). Это даст больше памяти для обработки данных. Настройте память драйвера: spark.driver.memory (например, spark.driver.memory=2g), если OOM возникает на уровне драйвера. Управляйте памятью для шаффлинга: spark.memory.fraction определяет долю памяти для хранения данных и вычислений. Увеличьте spark.memory.storageFraction, чтобы больше памяти выделить для промежуточных данных. 2. Оптимизация разделов (partitions) Увеличьте количество разделов при загрузке больших данных или выполнении тяжелых операций, например, data.repartition(200), чтобы уменьшить объем данных в каждом разделе и снизить нагрузку на память. Используйте coalesce для уменьшения числа разделов после фильтрации данных, что помогает избежать лишнего шаффлинга. 3. Уменьшение объема данных Фильтрация на ранних этапах: фильтруйте ненужные данные, чтобы уменьшить объем входных данных. Проекционные операции: используйте только нужные колонки, избегая работы с большими и ненужными полями. Настройка форматов хранения: выбирайте колоночные форматы данных (например, Parquet), которые потребляют меньше памяти. 4. Оптимизация шаффлинга Увеличьте число разделов для шаффлинга с помощью spark.sql.shuffle.partitions. Если возможно, используйте broadcast join для небольших таблиц в джоинах, чтобы избежать тяжелого шаффлинга. 5. Управление ресурсами кластера Увеличьте количество исполнителей и потоков (spark.executor.instances, spark.executor.cores), чтобы распределить нагрузку между узлами. Сжимайте данные в памяти с помощью spark.memory.compress=true и spark.rdd.compress=true.
32
Что такое persist в Spark и какие storage levels существуют? #тг
persist() позволяет дополнительно сообщить параметр storage level (MEMORY_ONLY, MEMORY_AND_DISK, MEMORY_ONLY_SER, MEMORY_AND_DISK_SER, DISK_ONLY)
33
Назовите наименьшую исполнительную единицу в Spark #тг
Задача
34
За что отвечает следующий код: my_rdd = sc.textFile(‘my_rdd_file.csv’) my_rdd = my_rdd.coalesce(5) #тг
За создание набора RDD и объединение его разделов
35
В чем разница между cache() и persist() в спарке? #тг
Разница между cache() и persist() в том, что последний может принимать необязательный аргумент storageLevel, с помощью можно указать, где именно данные будут сохраняться. По умолчанию значение storageLevel для обеих функций — MEMORY_AND_DISK, т.е. данные будут храниться в памяти, если там есть для них есть место. Иначе данные будут сохранены на диске
36
Какой класс отвечает за создание Spark-сессии на рабочих узлах в кластере? #тг
SparkContext
37
Разница flatMap и map Spark #тг
Функция map в Spark - это функция преобразования, которая применяет заданную функцию к каждому элементу RDD (Resilient Distributed Dataset) и возвращает новый RDD. Функция принимает входной элемент и возвращает один выходной элемент. Функция flatMap в Spark также является функцией преобразования, которая применяет заданную функцию к каждому элементу RDD и возвращает новый RDD. Однако разница между map и flatMap заключается в том, что функция, применяемая flatMap, возвращает последовательность выходных элементов, а не один выходной элемент.
38
Когда вам понадобится кэшировать DataFrame в Apache Spark?#тг
Наиболее распространенными вариантами использования кэширования являются сценарии, в которых вам потребуется повторно обращаться к большому набору данных для выполнения запросов или преобразований. Некоторые примеры включают в себя: * DataFrame, обычно используемые при итеративном машинном обучении. * DataFrame, к которым обычно обращаются для выполнения частых преобразований во время ETL или построения конвейеров передачи данных. Однако не все варианты использования требуют кэширования. В некоторых сценариях кэширование фреймов данных может не потребоваться.: * DataFrame слишком велики, чтобы поместиться в памяти. * Недорогое преобразование фрейма данных, не требующее частого использования, независимо от его размера. Как правило, вы должны разумно использовать кэширование памяти, поскольку это может привести к затратам ресурсов на сериализацию и десериализацию, в зависимости от используемого уровня хранения.
39
Какие факторы влияют на join в apache spark? #тг
1) размер датасета 2) условие соединения: =, >, < и тд 3) тип соединения: inner/cross/left
40
Сколько гигабайт памяти выделяется на каждую задачу в Spark? #HalltapeRoadmapDE
В Spark нет жёстко зашитого фиксированного размера памяти на каждую задачу (task). Вместо этого память конфигурируется на уровне экзекьюторов (executors), а уже внутри одного экзекьютора одновременно могут выполняться несколько задач — каждая задача использует часть общего пула памяти. Как это устроено? Executor Memory Основной параметр — spark.executor.memory: сколько всего памяти (heap) будет доступно одному экзекьютору. Например, 8 GB. Плюс есть Memory Overhead (например, spark.executor.memoryOverhead), которую Spark резервирует под системные нужды (JVM overhead, native buffers для shuffle, PySpark worker и т.д.). Количество CPU (cores) у экзекьютора Параметр spark.executor.cores определяет, сколько задач может выполняться параллельно на одном экзекьюторе. Если указано cores=4, значит на одном экзекьюторе одновременно могут бежать до 4 задач (каждой нужен хотя бы 1 CPU core). Соответственно, если есть 8 GB памяти и 4 cores, условно (но не строго!) можно прикинуть, что в среднем на каждую задачу придётся ~2 GB (не учитывая overhead). Пулы памяти внутри JVM В Spark 1.x / 2.x была модель Execution / Storage Memory (т.е. часть памяти для shuffle, часть для кэша и т.д.). В Spark 2.x+ есть параметры вроде spark.memory.fraction, spark.memory.storageFraction и т.д., которые управляют тем, какая доля от всей памяти экзекьютора может пойти под вычисления (execution) и под кэш (storage). На практике это значит, что все задачи экзекьютора делят общий Memory pool. Если одна задача на shuffle, sort или операцию join начнёт съедать слишком много памяти, другие могут получить меньше.
41
Что такое Adaptive query execution? #HalltapeRoadmapDE
Adaptive Query Execution (AQE) — это механизм, появившийся (по умолчанию) начиная с Spark 3.0 (частично был доступен экспериментально и в Spark 2.4), который позволяет динамически оптимизировать физический план выполнения во время работы job. Зачем это нужно? Не всегда точны планы, построенные на основе статистики до старта job: Данные могут быть распределены неравномерно (data skew). Статистика таблиц может быть устаревшей или неполной. При shuffle могут возникать очень «тяжёлые» партиции. AQE даёт Spark возможность «подглядывать» в реальные данные и корректировать план на лету, например: Combine shuffle partitions: если Spark видит, что после shuffle многие партиции мелкие, он может объединить их в меньшее количество более крупных партиций (уменьшая overhead). Split skewed partitions: если обнаружились одна-две «перегруженные» партиции (skew), Spark может автоматически разбить их на несколько более мелких, чтобы параллелить обработку. Динамически менять тип join’а (например, заменять sort-merge join на broadcast join, если одна таблица оказалась достаточно маленькой). Пример включения AQE spark.conf.set("spark.sql.adaptive.enabled", "true") spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "true") spark.conf.set("spark.sql.adaptive.skewJoin.enabled", "true") ... Основная выгода Стабильность выполнения больших job, которые сталкиваются с непредсказуемым распределением данных. Улучшение производительности за счёт сокращения shuffle и более «умных» join’ов. Итого, AQE — это механизм «Adaptive Execution», который после начального планирования Spark’ом может «переигрывать» стратегии (shuffle, join, разбиение партиций) на основании фактических метаданных, полученных во время выполнения, что помогает лучше бороться с «skew» и неравномерными нагрузками.
42
Механизмы broadcast join и когда их использовать ? теги #x5
43
Различия driver vs executor теги #x5
44
Различия между take() и collect() теги #x5
45
Как работает dynamic allocation теги #x5
46
Сколько партиций при чтении ? #x5
По дефолту 200, но зависит от shuffle.partitions
47
конфигурация приложений (спрашивали конкретные опции, например spark.driver.memory,
48
запуск на YARN vs Kubernetes