Data Intensive Ch10 - Batch processing Flashcards
Context map
Derived data
Distributed filesystems
MapReduce workflows
Search Indexes
Recommendation engines
Machine Learning
Summary
The chapter looked briefly at Unix tools such as awk, grep, and sort as an example of a design philosophy that carried over into MapReduce and, later, dataflow engines.
Key ideas are:
- inputs are immutable
- outputs are intended to become the input of another, as-yet-unspecified program (as in Unix pipelines)
- complex problems are broken down into smaller pieces that can be solved using tools doing one thing well
Uniform interface
Unix: files and pipes
MapReduce: HDFS
Two main problems of distributed batch processing frameworks:
-> Partitioning
MapReduce mappers are partitioned according to input file blocks
Output of mappers is repartitioned, sorted and merged into a configurable number of REDUCER partitions
Purpose: bring all the related data (records grouped by the same key) together
Later dataflow engines avoid sorting unless it is required, but otherwise work in a similar manner
-> Fault tolerance
MapReduce frequently writes to disk, which makes it easy to recover from an individual failed task without restarting the entire job.
Dataflow engines materialize less intermediate state and keep more in memory, so if a node fails they need to recompute more. Making operators deterministic reduces the amount of data that has to be recomputed.
MapReduce join algorithms (also used in MPP DBs or dataflow engines)
-> Sort-Merge join
Mapper extracts the join key. All records with the same key end up going to the same reducer call which outputs joined records.
-> Broadcast hash join
One of the two join inputs is small enough not to need partitioning: it is loaded entirely into a hash table. Each mapper loads the hash table and joins each input record against it.
-> Partitioned hash join
Prerequisite: both inputs are partitioned in the same way. The hash-table approach is then applied independently per partition (see the sketch after this list).
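A minimal single-process sketch of the reduce-side sort-merge join idea: each mapper tags a record with its source and emits the join key, the shuffle groups records by key, and the reducer pairs them up. The file names and record layouts (events.csv, users.csv) are hypothetical, and the shuffle is simulated with an in-memory dict.

```python
# Reduce-side sort-merge join, simulated in one process.
from collections import defaultdict
from itertools import product

def map_events(line):
    # hypothetical layout: "user_id,url" -> (join key, tagged value)
    user_id, url = line.rstrip("\n").split(",")
    return user_id, ("event", url)

def map_users(line):
    # hypothetical layout: "user_id,date_of_birth" -> (join key, tagged value)
    user_id, dob = line.rstrip("\n").split(",")
    return user_id, ("user", dob)

def reduce_join(key, values):
    # All values for one key arrive in the same reducer call.
    events = [v for tag, v in values if tag == "event"]
    users = [v for tag, v in values if tag == "user"]
    for url, dob in product(events, users):
        yield key, url, dob

# "Shuffle": group mapper output by key (the real thing partitions, sorts and merges files).
grouped = defaultdict(list)
for mapper, path in [(map_events, "events.csv"), (map_users, "users.csv")]:
    with open(path) as f:
        for line in f:
            key, value = mapper(line)
            grouped[key].append(value)

for key in sorted(grouped):            # reducers see keys in sorted order
    for joined in reduce_join(key, grouped[key]):
        print(joined)
```

The broadcast hash join skips the shuffle entirely: it would load users.csv into a plain dict and look up each event record against it inside the mapper.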
Distributed batch processing engines have an intentional restriction on programming model. All callback functions (mappers, reducers) are assumed to:
- be stateless
- have no visible side effects besides the output
This allows safe retries in the face of crashes/network issues hidden behind framework abstraction. Output of any failed task can be safely discarded.
Processing code can be oblivious to the fault-tolerance mechanisms.
Distinguishing feature of batch processing - reads IMMUTABLE input of FIXED SIZE (data is from some POINT IN TIME/DB SNAPSHOT) and produces some output.
Output is DERIVED from input.
Batch jobs are said to have BOUNDED input, the opposite of stream processing, where the input is UNBOUNDED: stream jobs consume never-ending streams of data.
Types of systems
-> Services (online systems)
Wait for a request, return a response
Latency optimized
-> Batch processing (offline)
Takes a large, known amount of data and runs a job to process it and output some result
Periodically scheduled
Throughput matters
Runs for minutes to days
-> Stream processing (near real time)
Somewhere in between a service and a batch job
Consumes unbounded input, produces some output
Job picks up events promptly after they happened
Unix philosophy
- Make each program do one thing well. New job -> build afresh, do not complicate old programs with new features
- Expect the output of every program to become the input to another, unknown program. Don't clutter output with extraneous info. Avoid columnar/binary input formats. Don't insist on interactive input
- Design and build software to be tried early. Remove clumsy parts, rebuild if needed.
- Use tools to lighten a programming task
Sounds like Agile & DevOps movements
Sort, uniq -> Unix shell like bash -> COMPOSE small programs into data processing jobs
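The chapter's running example composes these tools into a pipeline that prints the five most requested URLs in a web server log. Below is a sketch of that composition driven from Python via subprocess instead of typed into a shell; the log file name and the URL being in field 7 are assumptions carried over from that example.

```python
# Compose small Unix programs into one pipeline, roughly:
#   awk '{print $7}' access.log | sort | uniq -c | sort -r -n | head -n 5
import subprocess

with open("access.log") as logfile:                       # hypothetical web server log
    awk = subprocess.Popen(["awk", "{print $7}"], stdin=logfile, stdout=subprocess.PIPE)
    sort1 = subprocess.Popen(["sort"], stdin=awk.stdout, stdout=subprocess.PIPE)
    uniq = subprocess.Popen(["uniq", "-c"], stdin=sort1.stdout, stdout=subprocess.PIPE)
    sort2 = subprocess.Popen(["sort", "-r", "-n"], stdin=uniq.stdout, stdout=subprocess.PIPE)
    head = subprocess.Popen(["head", "-n", "5"], stdin=sort2.stdout, stdout=subprocess.PIPE)
    for p in (awk, sort1, uniq, sort2):
        p.stdout.close()                                  # let SIGPIPE propagate when head exits
    print(head.communicate()[0].decode(), end="")
```

Each program only sees bytes on stdin and stdout; none of them knows or cares what sits on the other side of the pipe.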
Unix Uniform interface
File (descriptor): an ordered sequence of bytes
Can be:
- an actual file in the FS
- a communication channel to another process (Unix socket, stdin, stdout)
- a device driver
- a socket for a TCP connection
Many programs assume the sequence of bytes is ASCII text, with each record separated by a newline
Stdin/stdout (by default - console input and terminal screen)
Pipes attach stdout of left to stdin of the right process
Stdin/Stdout can be redirected, program doesn’t worry about concrete files
Loose coupling/Inversion of Control
Immutable input files
Pipeline can be inspected at any point (insert less cmd)
Output of some pipeline stage can be written to a file which can become input for the next stage
Map reduce and Unix similarities
MR - distributed across potentially thousands of machines
MR job is like Unix process - take input, produce output
MR - job usually does not modify input; no side effects
Stdin/stdout for MR are always files in a distributed FS (HDFS)
MR parallelizes a computation across many machines; the programmer does not need to write any parallelization code explicitly
Mapper/reducer operates on one record at a time. They don’t explicitly request any file reads from HDFS. Framework provides proper streams.
Input is typically a directory in HDFS
Each file/file block is considered a separate partition that can be processed by separate map task
Unix pipes become MR workflows
HDFS is based on the … principle
Shared nothing
No special hardware requirement - commodity hardware is used
Computers connected by a conventional datacenter network
HDFS basics
Set of daemon processes running on each machine
Each exposes network service to access files stored on that machine
Central server - NameNode - keeps track which file blocks are stored on which node
File blocks are replicated to multiple machines
Usage of commodity hardware and open source software - fairly low cost
Map Reduce Job Execution
- Read input files and break them into records (like by \n character)
- Call mapper function for each record. This extracts key and value.
- Sort all key-value pairs by key
- Call the reducer function. It iterates over the sorted key-value pairs; if the same key occurs multiple times, the reducer is called once with that key and a list of all the associated values.
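A minimal in-memory sketch of these four steps for a word-count job; the input file name is hypothetical and the sort is an ordinary Python sort rather than a distributed shuffle.

```python
# Word count expressed as the four steps above, in one process.
from itertools import groupby

def mapper(record):
    # Step 2: extract key-value pairs from one record (one line of text).
    for word in record.split():
        yield word, 1

def reducer(key, values):
    # Step 4: called once per key with all of its associated values.
    yield key, sum(values)

with open("input.txt") as f:                  # Step 1: break the input into records (lines)
    records = f.read().splitlines()

pairs = [kv for record in records for kv in mapper(record)]
pairs.sort(key=lambda kv: kv[0])              # Step 3: sort all key-value pairs by key

for key, group in groupby(pairs, key=lambda kv: kv[0]):
    for word, count in reducer(key, (value for _, value in group)):
        print(word, count)
```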
MapReduce scheduler principle
Put the computation near the data.
The scheduler tries to run each mapper on a machine that also stores a replica of the input file (as long as it has enough spare RAM and CPU).
Saves copying files over the network, reduces network load, increases locality.
How is MapReduce number of map/reduce tasks determined?
Map - number of input file blocks
Reduce - configured by a job author
How does MapReduce sort work?
Dataset is assumed to be too large to sort in memory on a single machine
Staged sorting:
1. Each map task PARTITIONS its output by reducer
- by hash of key
2. Each partition is written to the mapper's local disk as a sorted file (cf. SSTables and LSM-trees)
- so each mapper makes a local sorted file with data to be sent per each reducer
3. When a mapper is done with its input file and has written all of its output, the MR scheduler notifies the reducers that they can start fetching the output from that mapper
4. Reducer merges individually sorted files from all mappers. Sorting order is preserved. Same keys will be adjacent in resulting file
5. Reducer can be called with a key and ITERATOR which scans over records with the same key (might not fit in memory hence iterator)
Steps 2 and 3 are known as the shuffle
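A tiny sketch of step 1: the mapper picks the destination reducer by hashing the key, so every mapper routes the same key to the same reducer. num_reducers is the job-author-configured setting mentioned above; the md5-based hash is just one stable choice (Python's built-in hash() is randomized per process, so it would not work across machines).

```python
# Step 1: partition mapper output by reducer using a hash of the key.
import hashlib

def partition_for(key: str, num_reducers: int) -> int:
    # Stable across processes and machines, unlike Python's built-in hash().
    digest = hashlib.md5(key.encode("utf-8")).digest()
    return int.from_bytes(digest[:4], "big") % num_reducers

print(partition_for("url-1234", num_reducers=8))   # some partition in 0..7, always the same
```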
MapReduce workflows
Chained MR jobs (as single job has limited usefulness)
Output of one job is input to another - no particular support -> usually by convention -> output directory name of job 1 is configured as input directory of job 2
From the MR framework's perspective the two jobs are independent, so job 2 cannot start until job 1's output has fully materialized (i.e. the whole of job 1 completed successfully)
Third-party tools such as Airflow or Luigi manage the dependencies between jobs
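A sketch of the directory-name convention; the paths and the run_job helper are hypothetical. The only contract between the two jobs is the shared directory name, plus a check that job 1 actually finished (Hadoop marks a successfully completed output directory with an empty _SUCCESS file).

```python
# Chaining two batch jobs purely by directory convention.
import os

def run_job(name: str, input_dir: str, output_dir: str) -> None:
    # Placeholder for submitting a real MapReduce job; prints the wiring only.
    print(f"submitting {name}: {input_dir} -> {output_dir}")

job1_out = "/warehouse/sessions/2024-01-01"
run_job("sessionize", input_dir="/logs/raw/2024-01-01", output_dir=job1_out)

# Job 2 may start only once job 1's output has fully materialized.
if os.path.exists(os.path.join(job1_out, "_SUCCESS")):
    run_job("top-urls", input_dir=job1_out, output_dir="/warehouse/top-urls/2024-01-01")
```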
Equi-joins
Basically join using “=” lol
Type of join where record is associated with another based on having identical value in some particular field like ID.
Other kinds of joins also exist (like using less-than operator)
We need joins to associate records but… why bother splitting records? Why not keep them all together? Why not denormalize data?
having separate tables - data normalization
Often we need same data in different context
If the same data is kept in copies in multiple records -> updating is difficult
Keeping all data together? Inefficient to query, and what if many rows reference the same data?
user + organisation - they change for different reasons
many users would return the same organisation
You could store every user together with its organisation, but then it is hard to query efficiently
What if we group users in groups? Do we keep groups with organisations?
What if user can be in a group from an external organisation?