Data-Intensive Ch11 - Stream Processing Flashcards

1
Q

Context Map

A
Stream Analytics
CEP
Dataflow
Storm, Spark, Flink
ETL
Event Sourcing
Kafka, Kinesis
Message Queues
Materialized View Maintenance
Change Data Capture
2
Q

Summary

A

Stream processing is done CONTINUOUSLY on UNBOUNDED, never-ending streams rather than fixed-size input.

Message brokers and event logs are the streaming equivalent of a filesystem in batch processing

Message brokers:
-> AMQP/JMS style
Broker assigns individual messages to consumers
Consumers must ACK each message successfully processed
After the ACK is received, the message is deleted from the broker
Appropriate for async RPC like a task queue:
- order of message processing does not matter
- messages are not re-read once processed
-> Log-based message broker (Kinesis, Kafka)
Broker assigns all messages in a partition to the same consumer node
Messages are always delivered in the same order
Parallelism is achieved through partitioning
Consumers track progress by checkpointing the offset of last processed message
Broker retains messages on disk - can re-read

Streams are a natural fit for: user activity events, sensors, data feeds

Writes to a DB can be thought of as a stream - a changelog via Change Data Capture (implicit) or Event Sourcing (explicit)

Log compaction retains a full copy of the DB in a stream (older messages for each key are discarded)

Time in stream processors - processing time vs. event timestamps
Straggler events arrive after their window has been declared complete

Joins in stream processes:
-> Stream-stream
Both input streams consist of activity events, join operator searches for related events that occur within some WINDOW of time
Match 2 actions taken by the same user within 30 minutes of each other
Can also be a self-join of a stream with itself
-> Stream-table
One input stream is activity events and the other is a DB changelog
The changelog keeps a local copy of the DB up to date
For each activity event, the join operator queries the DB and outputs an enriched activity event
-> Table-table
Both input streams are DB changelogs
Every change on one side is joined with the latest state of the other side
The resulting stream is the MATERIALIZED VIEW of the join between the 2 tables

Fault tolerance & exactly-once SEMANTICS in stream processors
Partial output of any failed task must be discarded, but since a stream processor is long-running and produces output continuously, it cannot simply discard all output and retry
Checkpointing, microbatching, transactions, and idempotent writes are used as recovery mechanisms

3
Q

What is “stream”?

A

Data INCREMENTALLY made available OVER TIME

4
Q

Batch vs Stream?

A

Stream data has no fixed size and cannot be divided up front (UNBOUNDED)
Event is processed continuously, almost as it happens

BATCH/STREAM equivalents
record / event
job / consumer, subscriber, recipient
file (a set of related records) / topic or stream (events grouped together)
writing a file / producing an event
5
Q

Stream Event

A

Basic unit of processing, a record
A small, self-contained, immutable object containing the details of something that happened at some point in time
The timestamp is usually a time-of-day clock timestamp
Could be encoded as a text string or JSON

6
Q

Why are consumer notifications necessary for stream processing, instead of the stream processor just polling for new data?

A

Continual processing requires low delays
Polling the database to check for new events becomes slow if the datastore is not designed for it
More frequent polling = more wasteful empty queries (no new records returned)

7
Q

Messaging System

A

Producer sends a message containing an event
System pushes the event to consumers

A primitive example - a Unix pipe between producer and consumer
Limited to exactly one sender coupled with exactly one recipient

8
Q

What if producer sends messages faster than consumer processes them?

A
  1. Drop messages
    Whether this is acceptable depends on the application. IoT sensor readings may tolerate a missing data point or two, since the next reading should arrive shortly anyway. Dropping a large number of messages, though…
  2. Buffer messages in a queue
    When the queue grows, the system can run out of memory.
    Should messages be written to disk? That comes at the cost of disk I/O…
  3. Apply backpressure/flow control
9
Q

Backpressure

A

Blocking the producer from sending more messages
Unix pipes and TCP use it, via a small fixed-size buffer
If the buffer fills up, the sender is blocked until the recipient takes data out of it
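
A minimal sketch in Python: a bounded queue.Queue plays the role of the small fixed-size buffer - put() blocks the producer whenever the slow consumer lets the buffer fill up.

```python
import queue
import threading
import time

# A small fixed-size buffer: once full, put() blocks the producer
# until the consumer drains it -- the essence of backpressure.
buf = queue.Queue(maxsize=5)

def producer():
    for i in range(20):
        buf.put(i)          # blocks when the buffer is full
        print(f"produced {i}")

def consumer():
    while True:
        msg = buf.get()
        time.sleep(0.1)     # slow consumer: producer is throttled to this rate
        print(f"consumed {msg}")
        buf.task_done()

threading.Thread(target=consumer, daemon=True).start()
producer()
buf.join()                  # wait until everything has been processed
```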

10
Q

What if nodes crash or go offline for a while?

A

Durability - write to disk and/or replicate - all at a cost

If losing messages is acceptable, lower latency and higher throughput are achievable

11
Q

Direct messaging producer-consumer

A

No intermediary nodes
Examples:
-> UDP multicast
Financial industry - stock market feeds -> low latency is important
UDP is unreliable -> app-level protocols are used to recover lost packets. This implies the producer must remember the packets it has sent, to be able to retransmit on demand
-> Brokerless messaging libraries
ZeroMQ, nanomsg
Publish/subscribe over TCP or IP multicast
-> StatsD, Brubeck
UDP messaging for collecting metrics from machines on the network
Monitoring
Metrics are not guaranteed to be accurate but APPROXIMATE
-> REST & RPC dataflow through services
Direct request/response
Webhooks with a callback URL

Assumptions:
Producer and consumer must be online constantly
App code needs to be aware of possible message loss
Retries are not fault tolerant - if the producer crashes, its buffer of messages to retry is lost, oh well

12
Q

Message brokers

A
AKA message queue
kind of database 
dedicated for handling message streams
producers and consumers connect to its server
producers write messages to the broker
consumers receive them from the broker

Centralizing in the broker gives:

  • better tolerance of clients disconnecting (e.g. due to a crash)
  • durability responsibility moved to the broker

Some are in-memory only, some write to disk

As a consequence of queueing, messaging is generally async -> a sent message is confirmed by the broker but not necessarily delivered to consumers yet

13
Q

Message broker vs Database

A

Data retention - MB usually AUTOMATICALLY CLEANS MESSAGES older than retention period or those already delivered

Usually brokers assume their data set is small - QUEUES are SHORT

DBs provide secondary indexes for searching the data, message broker can support SUBSCRIPTION to TOPICS MATCHING some PATTERN

Querying DB usually is based on point-in-time snapshot of the data; Message brokers do not support arbitrary queries but they notify consumers when DATA CHANGES

14
Q

Multiple consumers patterns

A

Load balancing
Message is delivered to ONE of the consumers
Processing is shared among consumers of the topic
Broker assigns messages arbitrarily to consumers
Good for messages expensive to process - adding consumers speeds up the processing (more parallelism)

Fan out
Each message is delivered to ALL of the consumers
Independent consumers handle given message independently
Streaming equivalent of multiple batch jobs reading the same input file

Combined
Consumer groups - every group receives all messages, but within each group only a single consumer receives each message

15
Q

How do brokers ensure messages are not lost if a consumer crashes?

A

ACK and redelivery
A consumer explicitly notifies the broker when it has finished processing a message -> the broker can then remove the message from the queue

If the broker does not receive an ACK, it assumes the message was not processed and redelivers it to another consumer
The ACK itself could be lost even though the message was processed - an atomic commit protocol would be needed to handle that case
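
A sketch of the ACK pattern using the pika RabbitMQ client; the queue name "tasks" and the handler body are illustrative. If the handler raises before basic_ack, the unacknowledged message is redelivered once the consumer's connection drops:

```python
import pika

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.queue_declare(queue="tasks", durable=True)

def handle(ch, method, properties, body):
    print(f"processing {body!r}")            # do the actual work here
    # ACK only after successful processing; the broker then deletes the message
    ch.basic_ack(delivery_tag=method.delivery_tag)

channel.basic_consume(queue="tasks", on_message_callback=handle)
channel.start_consuming()                    # blocks, dispatching messages
```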

16
Q

What is an issue with load balancing + redelivery behavior?

A

Messages can end up being processed completely out of order

Solution - do not use load balancing; have a separate queue per consumer

17
Q

Log-based message brokers

A

Log - append only sequence of records on disk

Hybrid approach combining
- durable storage of DB
- low-latency notification facilities of messaging
Receiving a message is not destructive

Producers append to the log by sending message to the broker
Consumers read the log sequentially; when end of the log is reached consumer waits for a notification that a new message has been added

Partitioning can be used to scale for higher throughput
Each partition can be read/written independently
A topic is then the group of all partitions carrying messages of the same type

Each message within a partition is assigned a MONOTONICALLY increasing sequence number/offset (the partition is append-only, so messages within it are TOTALLY ORDERED)
Ordering is guaranteed per partition only (no cross-partition guarantees)
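
A toy model of these ideas (not any real broker's API): same key -> same partition, an offset is just a position in an append-only list, and reads are non-destructive:

```python
class PartitionedLog:
    """Toy log-based broker: append-only partitions, per-partition offsets."""

    def __init__(self, num_partitions: int):
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, key: str, message: dict) -> tuple[int, int]:
        # Same key -> same partition, so per-key ordering is preserved.
        p = hash(key) % len(self.partitions)
        self.partitions[p].append(message)
        return p, len(self.partitions[p]) - 1    # (partition, offset)

    def read(self, partition: int, offset: int) -> list[dict]:
        # Non-destructive: the log is retained and can be re-read at will.
        return self.partitions[partition][offset:]

log = PartitionedLog(num_partitions=4)
part, off = log.append("user-17", {"event": "click"})
print(log.read(part, off))                       # [{'event': 'click'}]
```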

18
Q

Consumers patterns in partitioned log streams

A

Fan out - out of the box -> each consumer can read the log independently

Load balancing - each consumer is assigned its own partition(s) to read; at most as many active consumers as there are partitions

19
Q

How is consumer failure handled by log-based brokers?

A

Consumers must periodically save the offset up to which they have processed
If a consumer fails, the broker notices and assigns its topic/partition to a different node, which resumes from the last-saved offset
Some messages may then be processed a second time - deduplication is something consumers need to deal with themselves
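
A sketch of that checkpoint-and-resume loop, with an in-memory dict standing in for a durable offset store; note that process must tolerate the replays this scheme allows:

```python
class OffsetStore:
    """Stand-in for a durable offset store (file, DB row, ZooKeeper node...)."""
    def __init__(self):
        self._saved = {}
    def load(self, partition: int) -> int:
        return self._saved.get(partition, 0)
    def save(self, partition: int, offset: int) -> None:
        self._saved[partition] = offset

def consume_pass(log: list, partition: int, store: OffsetStore, process) -> None:
    offset = store.load(partition)        # resume from the last checkpoint
    for message in log[offset:]:
        process(message)                  # must be idempotent / tolerate replays
        offset += 1
    store.save(partition, offset)         # messages processed after the last
                                          # save get re-processed on restart

consume_pass(["m1", "m2", "m3"], partition=0, store=OffsetStore(), process=print)
```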

20
Q

What if a consumer cannot keep up with producers?

A

Generally - disk space is limited, but it's hard for producers to quickly fill an entire large hard disk with messages.
Log-based brokers effectively use a bounded-size buffer anyway (a circular/ring buffer) - when it gets full, the oldest messages are discarded.
Hence a very slow consumer may miss some messages.
Consumers should be monitored for how far behind the head of the log they are.
RAISE AN ALERT if one falls behind significantly
A human operator then needs to fix the slow consumer or let it catch up before messages are missed.

The good thing is that only the slow consumer is affected; others have their own offsets.
This is unlike non-log-based message brokers: if the queue of an inactive consumer is not deleted, it keeps growing and affects other consumers.

21
Q

What is dual write? What is it used for? What is wrong with it?

A

Apps sometimes need to keep 2 systems in sync, like an OLTP DB and a search index. Doing periodic ETL-style data dumps may not be fast/frequent enough for the business.
Dual write is an alternative approach where application code explicitly writes data to both data stores (concurrently or not).

Problems with that - race conditions. Two clients may update the value concurrently, and the writes can arrive at the database and at the search index in different orders; the database's own concurrency control doesn't help, because the search index is outside the transaction. The two systems end up permanently inconsistent.
Fig 11-4 p453

Another issue - fault tolerance. One of the two writes may fail while the other succeeds. The system ends up in an inconsistent state again: the OLTP DB sees value A while the search index sees value B.
Basically a distributed-transaction case: a trx spanning the OLTP DB write and the search reindexing -> 2PC needed. That's expensive and not flawless.

Remedy - make the OLTP DB the single-source-of-truth leader and the search index a follower -> Change Data Capture.

22
Q

Change Data Capture

A

For a long time DBs didn't expose any documented way of getting their replication/change log. This made it hard to replicate the data to another storage system like a search index.
More recently there has been growing interest in doing exactly that, under the name Change Data Capture.
It's useful for observing all data changes written to a DB and extracting them so the data can be replicated to external systems.

In essence - one database is the leader, from which changes are captured as a stream. This stream goes to a log-based message broker. Derived data systems are consumers reading the leader DB's changes in order.

Naive implementation - database triggers observing all changes to data tables and pushing entries to a changelog table -> fragile + performance overhead.
Another idea - parsing the replication log, but handling schema changes is tricky.

CDC is usually async - leader DB does not wait for change to be read by consumers.

Examples
LinkedIn Databus, Facebook Wormhole, Yahoo Sherpa
Bottled Water - PGSQL CDC (its API decodes write-ahead log)
Kafka Connect - framework offering CDC connectors for different DBs

23
Q

CDC Initial Snapshot

A

Keeping every change ever made in the DB would require too much disk space, and replaying it all would take too long -> the log gets truncated
Building a new index therefore requires a full copy of the entire DB - a CONSISTENT SNAPSHOT
The snapshot must correspond to a known position/offset in the CHANGE LOG
Some tools support that, some require manual work…

24
Q

Log Compaction

A

Storage engine PERIODICALLY goes through log records for the same key and removes older duplicates
The goal is to keep only the most recent update for each key
The compaction & merging process runs in the background
Deletes are indicated by a null-value tombstone
This allows keeping the whole DB as a stream of changes: eventually only the latest value per key is retained, so the size of the log depends on the current contents of the DB rather than on the total number of writes
Effectively it allows the compacted CDC log-stream to be used in place of an initial snapshot.

Kafka supports it
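
A minimal compaction pass over a list of (key, value) entries. Unlike Kafka, which retains tombstones for a grace period so consumers can observe deletes, this sketch drops them immediately:

```python
def compact(log):
    """Keep only the latest value per key; value None is a delete tombstone."""
    latest = {}
    for key, value in log:          # later entries overwrite earlier ones
        latest[key] = value
    return [(k, v) for k, v in latest.items() if v is not None]

log = [("a", 1), ("b", 2), ("a", 3), ("b", None)]
print(compact(log))                 # [('a', 3)] -- 'b' was tombstoned away
```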

25
Q

Event Sourcing

A

An idea from the DDD community; there are parallels with the CDC concept.
Similarly to CDC, ES stores all changes to app state as a log of change events. The biggest difference is that the two ideas apply at different levels of abstraction:

CDC - the app uses the DB in a mutable way (creating, updating, deleting records). The log of changes is extracted from a low-level representation such as the replication log, which ensures the extracted order matches the order of writes. The app using the DB does not need to be aware of CDC.
ES - app logic is built on immutable events written to an event log. The event store is append-only (updates and deletes are discouraged). Events are designed to reflect things that happened at the APP level, rather than low-level state changes.

ES makes an app easier
- to evolve over time
- to debug (seeing all the events helps in understanding what happened)
- it guards against bugs - no mutable state
Storing an event "subscription cancelled" is clearer than following its side effects, like "the subscription entry was deleted from the table and a notification was sent to SNS".
- it gives the capability to chain new side effects off existing events
like sending an email
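
A sketch of what such an app-level event might look like (the names are made up); the CDC-level view of the same action would just be low-level row deletions and insertions:

```python
from dataclasses import dataclass
from datetime import datetime, timezone

@dataclass(frozen=True)              # immutable: events are facts, never edited
class SubscriptionCancelled:
    user_id: str
    reason: str
    occurred_at: datetime

event_log: list = []                 # append-only event store
event_log.append(SubscriptionCancelled(
    user_id="u42",
    reason="too expensive",
    occurred_at=datetime.now(timezone.utc),
))
# Side effects (delete the subscription row, notify via SNS, send an email)
# are derived downstream from this one self-describing event.
```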

26
Q

Mutable state & Immutable Streams of Change

A

The mutable state of a DB and an append-only log of immutable events are two sides of the same coin
The current state is always the result of the events that changed it

Storing the changelog durably makes the state reproducible.
It's easier to reason about the flow of data if both:
- the current state
- the changelog of events
are considered
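
The two-sides-of-a-coin relation as a fold in Python: the current state is a reduce over the immutable event log, so replaying the same events always reproduces the same state:

```python
from functools import reduce

def apply(state: dict, event: tuple) -> dict:
    """Apply one change event to the state, returning a new state."""
    op, key, value = event
    if op == "put":
        return {**state, key: value}
    if op == "delete":
        return {k: v for k, v in state.items() if k != key}
    return state

events = [("put", "a", 1), ("put", "b", 2), ("delete", "a", None)]
print(reduce(apply, events, {}))    # {'b': 2}
```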

27
Q

Advantages of immutable events log and having it separated from mutable state

A

Idea: accountants and financial bookkeeping
Append-only ledger describing each transaction
If mistake is made past events are not erased/changed
Instead compensating transaction is issued

Immutable events give
-> AUDITABILITY capability
Which is useful not only when it’s strictly required by regulations
It’s useful for diagnosing and recovering from problems in data in running system
-> USER ACTIVITY tracking
We can track not only the items in the basket at the time of ordering but also items added and later removed
-> Deriving different read views from the same log of events at any time
This allows separating how data is written from how it is read (just derive a new view from the events) - no need to design a perfect schema up front
-> Concurrency control
Deriving the current state from an event log simplifies it.
Instead of a user action changing data in several places, the action is a single self-contained event; appending it atomically is trivial.
Then, if data is partitioned the same way in both app state and event log, a single-threaded consumer per partition can read the events to update the app state.
Events within a partition are guaranteed to be processed serially, in order.

28
Q

What’s the fallacy behind traditional approach to DBs and schema design?

A

Data must be written in the same form as it will be queried
CQRS says hello
Normalization/denormalization becomes less of a concern if we can always translate the write-optimized event log into any read-optimized view

29
Q

Downsides of event sourcing, CDC approach

A

Event log consumers are usually async
An asynchronously updated read view makes the system prone to the read-your-own-writes problem

Making read-view update synchronous is tricky but possible with either:

  • having the event log and the read view in the same storage system
  • using a distributed trx
  • using linearizable storage with total order broadcast

Keeping the entire immutable history of all changes forever may be infeasible.
If the data changes rarely, it's fine.
Otherwise good log compaction and GC are mandatory

Laws and regulations like GDPR - data may be required to be deleted from the system.
Rewriting history is required then.
Excision in Datomic, shunning in Fossil
(true deletion of data is hard due to how filesystems and disks work)

30
Q

Ways of processing streams

A
  1. Write a derived view - search index, DB, cache etc
  2. Push events to users in the form of push notifications, emails, a live dashboard
    monitoring, e.g. fraud detection systems, military tracking systems
  3. Produce a new output stream, which may join or process several input streams
31
Q

Uses of stream processing

A

CEP - Complex Event Processing
An approach to analyzing streams
For applications that require searching for certain event patterns
A consumer of the input stream plus an internal state machine matching the given patterns

Stream analytics
Similar to CEP, but instead of finding specific event sequences -> computes aggregations and metrics over a large number of events
Rolling averages etc
Usually computed over a fixed time interval, called a window

Maintaining materialized views

Search on streams - pattern matching individual events
Saved searches with alerts (otomoto cars)

Message passing and RPC
p468 ?

32
Q

Time issues in stream processing

A

For windowing, processors often rely on their local clock (to know when a window is complete).
This usually works fine unless there is a delay between event generation and event processing.
And there could be a lot of reasons for that:
- queueing (processor does not keep up with data consumption)
- network faults
- message broker is overloaded
- events are delayed (offline-enabled, untrusted clients like mobile devices)
- stream consumer is restarted or taken down (bug fixing)
- old data is REPLAYED
Basically event time vs processing time confusion

Star Wars issue - events reach the broker out of order (like the release order of Star Wars episodes: 4-6, 1-3, 7-9 lol). Also related to message delays.
Systems A and B participate in a business process
Both submit events to consumer C. A handles the request first, then calls B.
A sends its event but it gets stuck in the network.
B sends its event and it reaches the broker.
C sees B's event first and A's second.

Spikes of messages processed after a restart - traffic-spike monitors could fire if only processing time is taken into account, when in fact there was no unusual request rate.

Knowing when time window is complete - straggler events

The issue pertains to batch processing too, but there it is less severe and noticeable:
batch processing works on historical data (a known scope) and uses record timestamps,
whereas stream processing sees the data as it comes

33
Q

Straggler event problem

A

A straggler is an event that arrives after its window has already been declared complete
Dealing with stragglers:
- ignore them (usually a small percentage of all events, low impact); the drop rate can be monitored for safety
- publish a correction (an updated value for the window with the straggler included); previous output may need to be retracted
- use special messages to close windows on the consumer side ("from now on there will be no more messages with a timestamp earlier than t"). Downside - each producer must be tracked individually, so adding new producers is harder

34
Q

How to deal with untrusted devices reporting their time?

A

Events can be buffered, for example on an offline mobile client. Users can also deliberately change the time on their devices.

Adjustment by taking 3 timestamps
- time at which the event occurred (device clock) t1
- time at which the event was sent to the server (device clock) t2
- time at which the event was received by the server (server clock) t3
t3 - t2 estimates the offset between the device and server clocks (assuming negligible network delay)
Add the offset to t1 to estimate the true event time
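
Worked through with illustrative numbers (device clock running 120 s behind the server, plus 2 s of network delay that the estimate silently absorbs):

```python
t1 = 1000.0   # event occurred       (device clock)
t2 = 1060.0   # event sent to server (device clock)
t3 = 1182.0   # event received       (server clock)

offset = t3 - t2            # 122.0: clock offset plus network delay,
                            # which is assumed to be negligible
estimated_event_time = t1 + offset
print(estimated_event_time) # 1122.0, in terms of the server's clock
```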

35
Q

Types of windows

A

-> Tumbling
Fixed length
Event belongs to exactly one window
[1:31, 1:32), [1:32, 1:33)

-> Hopping
Fixed length, advancing by a hop shorter than the length
Windows can overlap (for smoothing etc)
Can be computed as tumbling windows over the hop interval, then aggregated over the window length
1:30-1:35, 1:31-1:36, 1:32-1:37

-> Sliding
Contains all events that occur within some interval of each other (the window moves with the events instead of being aligned to fixed boundaries)

-> Session
No fixed duration
Groups events for the same user that occur close together in time
The window ends when the user has been inactive for some period, e.g. a session expires after 30 minutes of inactivity
All events up to that point belong to the same window
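
A sketch of window assignment for the first two types, with timestamps as integer minutes for readability:

```python
def tumbling_window(ts: int, size: int) -> int:
    """Start of the single tumbling window that contains the event."""
    return ts - (ts % size)

def hopping_windows(ts: int, size: int, hop: int) -> list[int]:
    """Starts of all hopping windows (length `size`, advancing by `hop`)
    that contain the event."""
    earliest = tumbling_window(ts, hop) - size + hop
    return [s for s in range(earliest, ts + 1, hop) if s <= ts < s + size]

print(tumbling_window(92, size=1))          # 92 -> bucket [92, 93)
print(hopping_windows(92, size=5, hop=1))   # [88, 89, 90, 91, 92]
```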

36
Q

Stream-stream (window) join

A

Example:
detect recent trends in searched-for URLs
Each query logs an event containing the query + result URLs
Each click on one of the resulting URLs logs another event
To compute the click-through rate, join searches and clicked URLs by session id.
Users may abandon a search for an unspecified duration -> a time-limited window is required, e.g. only join clicks at most one hour apart from the search

Note from the book: embedding the details of the search in the click event is not the same as joining the events - it tells you nothing about the cases where the user did not click any of the results.

The processor needs to maintain state
e.g. all events that occurred in the last hour, indexed by session id
On a new event, the processor checks the index for the corresponding event of the other type
If there are both a search event and a click event - emit a "search clicked" event
If a search event expires without a click - emit a "no search results clicked" event
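
A sketch of that join state machine; the event shapes and names are invented, and emit stands for whatever downstream sink the processor writes to:

```python
ONE_HOUR = 3600  # join window, in seconds

class ClickThroughJoin:
    """Windowed stream-stream join of searches and clicks by session id."""

    def __init__(self, emit):
        self.pending_searches = {}      # session_id -> (search_event, time)
        self.emit = emit

    def on_search(self, session_id, event, now):
        self.pending_searches[session_id] = (event, now)

    def on_click(self, session_id, event, now):
        match = self.pending_searches.pop(session_id, None)
        if match and now - match[1] <= ONE_HOUR:
            self.emit(("search_clicked", match[0], event))

    def expire(self, now):
        # Called periodically: searches older than the window get a
        # "no results clicked" event.
        for sid, (search, t) in list(self.pending_searches.items()):
            if now - t > ONE_HOUR:
                del self.pending_searches[sid]
                self.emit(("no_results_clicked", search))

join = ClickThroughJoin(emit=print)
join.on_search("s1", {"query": "kafka"}, now=0)
join.on_click("s1", {"url": "kafka.apache.org"}, now=120)
join.expire(now=7200)
```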

37
Q

Stream-table join (enriching streams)

A

Example relevant from batch chapter: user details + user events.

The stream processor looks at one activity event at a time
For each event it looks up the user ID in the DB and outputs an enriched event
Querying a remote DB adds latency
Alternatively, load a copy of the DB into the stream processor (avoids the network round-trip) -> an index on local disk or an in-memory hash table (like a map-side join)
Problem - a batch job uses a snapshot at a fixed point in time
Stream processing is long-running
The local copy of the DB must be kept in sync…
CDC can be used for that
Then it becomes a plain stream-stream join
So basically this is:
load a DB snapshot, then do a stream-stream join of the activity stream with the CDC stream

A stream-table join is very similar to a stream-stream join. The biggest difference is that for the table's changelog stream, the join uses a window reaching back to the beginning of time (an infinite window), with newer versions of records overwriting older ones.
For the stream input, the join might not maintain a window at all.

38
Q

Table-table join (materialized view maintenance)

A

Twitter example
Viewing a user's timeline - too expensive to fetch recent tweets from all followed people and merge them on the fly
Instead - a timeline cache per user
When a tweet is sent, it is written to the timeline ("inbox") cache of every follower
On delete - the tweet is removed from all timeline caches
When a user follows a new user u2, u2's recent tweets must be added to the timeline (and all their tweets removed on unfollow)
Inputs are streams of tweets (send/delete) and of following relationships (follow/unfollow)
The stream processor maintains a DB with the set of followers per user (it must know which timelines to update on a new tweet)
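
A sketch of that processor's state and update logic; a real implementation would keep these maps in a durable, partitioned store:

```python
from collections import defaultdict

class TimelineMaintainer:
    """Materialized-view maintenance: join of tweets and follow relationships."""

    def __init__(self):
        self.followers = defaultdict(set)    # author -> set of follower ids
        self.timelines = defaultdict(list)   # user   -> cached timeline

    def on_follow(self, follower, author):
        self.followers[author].add(follower)

    def on_unfollow(self, follower, author):
        self.followers[author].discard(follower)
        self.timelines[follower] = [t for t in self.timelines[follower]
                                    if t["author"] != author]

    def on_tweet(self, author, tweet_id, text):
        # The follower set tells us exactly which timelines to update.
        for follower in self.followers[author]:
            self.timelines[follower].append(
                {"author": author, "id": tweet_id, "text": text})

    def on_delete(self, author, tweet_id):
        for follower in self.followers[author]:
            self.timelines[follower] = [t for t in self.timelines[follower]
                                        if t["id"] != tweet_id]

tm = TimelineMaintainer()
tm.on_follow("alice", "bob")
tm.on_tweet("bob", 1, "hello")
print(tm.timelines["alice"])   # [{'author': 'bob', 'id': 1, 'text': 'hello'}]
```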

39
Q

Time-dependence of joins

A

All types of joins require processor to maintain some state
Based on one join input the state is queried when other stream message arrives

If events on different streams happen close together in time, in which order are they processed?
When a user can update their profile, which events get joined with the old profile data and which with the new?
When state changes over time, which point in time is used for the join?
If the ordering is undetermined, the join is non-deterministic

Data warehouses call this the slowly changing dimension (SCD) problem
Solution: use a unique ID for each particular version of the joined record
Downside - log compaction is no longer possible, as all versions of the records must be retained

40
Q

Stream vs batch processing fault tolerance

A

BP - input is immutable -> transparent retry is possible
if a task of the job fails, just rerun it and discard the failed output
Output is made visible in HDFS only after the job completes, so all is good
The output is the same as if nothing had gone wrong
It appears as if every record was processed EXACTLY ONCE
none skipped, none processed more than once

EXACTLY ONCE (effectively once) semantics

In SP it's less straightforward.
Waiting for a task to finish before making its output visible is not an option, as the stream is INFINITE
A task is never finished

Solutions:

  • > Microbatching + checkpointing
  • > Atomic commit
41
Q

Microbatching and checkpointing fault tolerance approach

A

Split the stream into small blocks, each treated as a mini batch job
Spark Streaming
Batch size is usually around one second of data (smaller batches incur scheduling + coordination penalties, larger ones mean a longer delay before output is visible)
Gives a tumbling window out of the box (the batch size)

Apache Flink - generates periodic rolling checkpoints of state and writes them to durable storage.
On crash - restart from the last checkpoint and discard any output generated between the checkpoint and the crash
Checkpoints are triggered by barriers in the message stream (similar to boundaries between microbatches, but without enforcing any particular window size)

Within the stream processing framework, the microbatching/checkpointing approach provides the same exactly-once semantics as batch processing
But as soon as output leaves the processor (side effects like sending an email or writing to a DB), the framework can no longer discard the output of a failed batch, so on retry the side effect happens twice or more

42
Q

Atomic commit for exactly once semantics

Idempotence

A

To preserve the exactly-once illusion, side effects must take effect only if processing is successful.
Either they all happen atomically, or none do
p477, 478 - re-read
Basically a distributed trx in a restricted environment (keeping the transaction internal to the stream processing system makes an efficient distributed trx possible)

Idempotence:
Goal - an operation performed multiple times has the same effect as if performed once, so replays after a failed task are harmless
Each consumed message has an offset
Include it when writing a value to the external DB, so the DB can check whether the update has already been applied
Assumes messages are replayed in the same order when a task restarts (a log-based message broker guarantees that within a partition), processing is deterministic, and no other node updates the value concurrently
Fencing tokens may be required for failovers (a node that is thought to be dead may actually be alive and still writing)
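
A sketch of the offset-conditional write, with a dict standing in for the external DB; a real DB would enforce the check inside a transaction or conditional update:

```python
def idempotent_write(db: dict, key: str, value, offset: int) -> None:
    """Apply the update only if this offset has not been applied yet."""
    current = db.get(key)                  # (value, last_applied_offset)
    if current is not None and current[1] >= offset:
        return                             # replayed message: no-op
    db[key] = (value, offset)

db = {}
idempotent_write(db, "k", "v1", offset=7)
idempotent_write(db, "k", "v1", offset=7)  # replay after restart is ignored
print(db)                                  # {'k': ('v1', 7)}
```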

43
Q

Rebuilding state after failure as fault tolerance

A

If the stream processor requires state (e.g. windowed aggregations), then any tables and indexes used for joins must be recoverable after a failure
Keeping the state in a remote DB and replicating it is one option
But querying the remote DB for each message is slow

Keeping the state local and replicating it periodically is the usual way
A retried task can then read the replicated state and resume processing without data loss
Example: Flink captures snapshots of operator state and writes them to durable storage, e.g. HDFS (p479)

Sometimes replicating state is not needed, as the state can be rebuilt from the input streams.
If the aggregation window is short, just replay the messages from that window