Data Intensive Ch11 - Stream Processing Flashcards
Context Map
Stream Analytics; CEP; Dataflow (Storm, Spark, Flink); ETL; Event Sourcing; Kafka, Kinesis; Message Queues; Materialized View Maintenance; Change Data Capture
Summary
Stream processing is done CONTINUOUSLY on UNBOUNDED, never-ending streams rather than on fixed-size input.
Message brokers and event logs are the streaming equivalent of a filesystem in batch processing
Message brokers:
-> AMQP/JMS style
Broker assigns individual messages to consumers
Consumers must ACK each message they successfully process
After the ACK is received, the message is deleted from the broker
Appropriate as an asynchronous form of RPC, e.g. a task queue, where:
- the order of message processing does not matter
- there is no need to re-read messages once they are processed
-> Log-based message broker (Kinesis, Kafka)
Broker assigns all messages in a partition to the same consumer node
Messages are always delivered in the same order
Parallelism is achieved through partitioning
Consumers track progress by checkpointing the offset of the last processed message
Broker retains messages on disk, so they can be re-read (replayed)
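A minimal sketch of the log-based style, assuming an in-memory Python list stands in for the on-disk log and using hypothetical class names `Log` and `Consumer`: reading never deletes messages, and a consumer's progress is just an offset it can save and later resume from, or reset to replay old messages.

```python
class Log:
    """In-memory stand-in for an append-only log (hypothetical; real brokers persist to disk)."""

    def __init__(self):
        self.messages = []

    def append(self, msg):
        self.messages.append(msg)
        return len(self.messages) - 1  # offset assigned to the new message


class Consumer:
    """Reads the log sequentially and tracks progress as an offset."""

    def __init__(self, log, offset=0):
        self.log = log
        self.offset = offset  # e.g. restored from a saved checkpoint after a restart

    def poll(self, max_messages=10):
        batch = self.log.messages[self.offset:self.offset + max_messages]
        # Non-destructive read: only this consumer's offset moves. This sketch
        # assumes the batch is processed successfully before the offset is saved.
        self.offset += len(batch)
        return batch


log = Log()
for m in ["a", "b", "c", "d"]:
    log.append(m)

first = Consumer(log)
print(first.poll(2))                     # ['a', 'b']
checkpoint = first.offset                # 2, saved before a crash
print(Consumer(log, checkpoint).poll())  # ['c', 'd']: resumes where it left off
print(Consumer(log, 0).poll())           # ['a', 'b', 'c', 'd']: replays from the start
```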
Streams are a natural fit for user activity events, sensor readings, and data feeds
Writes to a DB can be thought of as a stream: the changelog is captured implicitly via Change Data Capture or explicitly via Event Sourcing
Log compaction lets a stream retain a full copy of the DB contents by discarding older messages for a key once a newer message with the same key exists
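A minimal sketch of compaction over an in-memory list of `(key, value)` messages (the function name `compact` is hypothetical): only the latest message per key survives, in log order, and a `None` value is treated as a tombstone that removes the key, similar to how Kafka uses a null value to mark a deletion.

```python
def compact(log):
    """Return a compacted copy of `log`, a list of (key, value) messages in offset order."""
    latest = {}
    for offset, (key, value) in enumerate(log):
        latest[key] = (offset, value)  # later messages overwrite earlier ones
    survivors = sorted(
        (offset, key, value)
        for key, (offset, value) in latest.items()
        if value is not None  # drop tombstoned keys entirely
    )
    return [(key, value) for _, key, value in survivors]


log = [("u1", "alice"), ("u2", "bob"), ("u1", "alice2"), ("u3", "carol"), ("u2", None)]
print(compact(log))  # [('u1', 'alice2'), ('u3', 'carol')]
```

Replaying the compacted log into an empty key-value store rebuilds the same final state as replaying the full log, which is why a compacted stream can serve as a full copy of the DB.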
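Time in stream processors: distinguish processing time from event timestamps
Straggler events can arrive after their window was already considered complete; they must either be ignored or trigger a corrected output for that window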
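To make the processing-time vs event-time distinction concrete, here is a sketch of counting events in fixed (tumbling) event-time windows. It assumes a hypothetical 60-second window and an `allowed_lateness` grace period measured in processing time; events arriving after their window's deadline are collected as stragglers so the caller can ignore them or publish a correction.

```python
from collections import defaultdict

WINDOW = 60  # hypothetical window length in seconds


def count_per_window(events, allowed_lateness=0):
    """events: (processing_time, event_time) pairs in arrival order."""
    counts = defaultdict(int)
    stragglers = []
    for proc_ts, event_ts in events:
        start = event_ts - event_ts % WINDOW  # window chosen by EVENT time
        deadline = start + WINDOW + allowed_lateness
        if proc_ts >= deadline:
            # By PROCESSING time this window was already declared complete.
            stragglers.append((proc_ts, event_ts))
        else:
            counts[start] += 1
    return dict(counts), stragglers


events = [(10, 5), (50, 45), (65, 62), (68, 30), (130, 40)]
print(count_per_window(events, allowed_lateness=10))
# ({0: 3, 60: 1}, [(130, 40)])
```

The event at processing time 68 belongs to the 0-60 window by event time and still counts thanks to the grace period; the one at 130 is a straggler.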
Joins in stream processing:
-> Stream-stream
Both input streams consist of activity events; the join operator searches for related events that occur within some WINDOW of time
e.g. match 2 actions taken by the same user within 30 minutes of each other
Can also be a self-join (both inputs are the same stream)
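A sketch of a windowed stream-stream join, following the 30-minute example above. It assumes both streams are merged into one iterable ordered by timestamp and uses hypothetical event kinds `"search"` and `"click"`; each side buffers recent events per user and evicts those that have fallen out of the window.

```python
from collections import defaultdict, deque

OTHER_SIDE = {"search": "click", "click": "search"}


def window_join(events, window=30 * 60):
    """events: (timestamp, side, user_id, payload) tuples in timestamp order.
    Yields (user_id, search_payload, click_payload) for opposite-side events
    of the same user that are at most `window` seconds apart."""
    buffers = {"search": defaultdict(deque), "click": defaultdict(deque)}
    for ts, side, user, payload in events:
        candidates = buffers[OTHER_SIDE[side]][user]
        # Events older than the window can never match again: evict them.
        while candidates and ts - candidates[0][0] > window:
            candidates.popleft()
        for _, other_payload in candidates:
            if side == "search":
                yield user, payload, other_payload
            else:
                yield user, other_payload, payload
        buffers[side][user].append((ts, payload))


events = [
    (0, "search", 1, "q=db"),
    (600, "click", 1, "url-a"),   # 10 min later: matches
    (2000, "search", 2, "q=x"),
    (4000, "click", 1, "url-b"),  # ~67 min after the search: no match
]
print(list(window_join(events)))  # [(1, 'q=db', 'url-a')]
```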
-> Stream-table
One input stream consists of activity events, the other is a DB changelog
The changelog keeps a local copy of the DB up to date
For each activity event, the join operator queries the local copy and outputs an enriched activity event
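A sketch of stream-table join enrichment, assuming the DB changelog and the activity events arrive interleaved in one iterable of hypothetical `("change", key, row)` and `("activity", key, event)` tuples, where a `None` row means the row was deleted. The local dict plays the role of the local copy of the DB, so each lookup avoids a remote query.

```python
def enrich(messages):
    local_table = {}  # local copy of the DB table, kept current by the changelog
    for kind, key, value in messages:
        if kind == "change":
            if value is None:
                local_table.pop(key, None)  # row deleted
            else:
                local_table[key] = value  # row inserted or updated
        else:
            # Activity event: join against the latest known row for this key.
            yield {**value, "profile": local_table.get(key)}


messages = [
    ("change", 1, {"name": "Ann"}),
    ("activity", 1, {"url": "/a"}),
    ("change", 1, {"name": "Anna"}),
    ("activity", 1, {"url": "/b"}),
    ("activity", 2, {"url": "/c"}),
]
for enriched in enrich(messages):
    print(enriched)
```

The second activity event sees the updated name, because the join uses whatever state the table is in when the event is processed; the output therefore depends on the relative timing of the two inputs.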
-> Table-table
Both input streams are DB changelogs
Every change on one side is joined with the latest state of the other side
The result is a stream of changes to the MATERIALIZED VIEW of the join between the 2 tables
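A sketch of a table-table join maintaining a materialized view, assuming both changelogs are merged into one iterable of hypothetical `(side, key, value)` tuples, with `side` being `"left"` or `"right"` and `value=None` meaning deletion. It performs an inner join on the key and yields every change to the view.

```python
def table_table_join(changes):
    """Yields (key, (left_value, right_value)) when a joined row is created or
    updated, and (key, None) when it disappears from the view."""
    tables = {"left": {}, "right": {}}

    def joined(key):
        return key in tables["left"] and key in tables["right"]

    for side, key, value in changes:
        was_joined = joined(key)
        if value is None:
            tables[side].pop(key, None)
        else:
            tables[side][key] = value
        if joined(key):
            # A change on either side is combined with the latest state of the other side.
            yield key, (tables["left"][key], tables["right"][key])
        elif was_joined:
            yield key, None


changes = [
    ("left", 1, "alice"),
    ("right", 1, "premium"),
    ("right", 1, "free"),
    ("left", 1, None),
    ("right", 2, "free"),
]
print(list(table_table_join(changes)))
# [(1, ('alice', 'premium')), (1, ('alice', 'free')), (1, None)]
```

Applying these changes to a dict (set on a tuple, delete on `None`) yields the current contents of the materialized view.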
Fault tolerance & exactly-once SEMANTICS in stream processors
In batch processing, the partial output of a failed task can simply be discarded; a stream processor is long-running and produces output continuously, so it cannot simply discard all of its output
Finer-grained recovery mechanisms are needed instead: checkpointing, microbatching, transactions, idempotent writes
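A sketch of one of the recovery mechanisms listed above, idempotent writes: the hypothetical sink records the highest input offset it has applied per partition, so when a restarted task replays messages from an older checkpoint, the duplicates are skipped instead of being counted twice. It assumes offsets increase monotonically within a partition; in a real system the totals and the applied offsets would have to be persisted atomically together.

```python
class IdempotentCounterSink:
    def __init__(self):
        self.totals = {}
        self.applied_offset = {}  # partition -> highest offset already applied

    def apply(self, partition, offset, key, amount):
        if offset <= self.applied_offset.get(partition, -1):
            return False  # already applied before the failure: skip the replay
        self.totals[key] = self.totals.get(key, 0) + amount
        # Must be stored atomically with `totals` in a real system.
        self.applied_offset[partition] = offset
        return True


sink = IdempotentCounterSink()
sink.apply(0, 0, "a", 5)
sink.apply(0, 1, "a", 3)
# The task crashes and restarts from a checkpoint at offset 0, replaying everything.
for offset, amount in [(0, 5), (1, 3), (2, 1)]:
    sink.apply(0, offset, "a", amount)
print(sink.totals)  # {'a': 9}
```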
What is a “stream”?
Data INCREMENTALLY made available OVER TIME
Batch vs Stream?
Stream data has no fixed size: it is never complete (UNBOUNDED), so it is not divided into fixed-size chunks as in batch processing
Events are processed continuously, almost as they happen
BATCH -> STREAM equivalents:
- record -> event
- job -> consumer (subscriber, recipient)
- filename (set of related records) -> topic or stream (group of related events)
- writing a file -> producing an event
Stream Event
Basic unit of processing; the stream equivalent of a record
Small, self-contained, immutable object containing the details of something that happened at some point in time
Usually includes a timestamp from a time-of-day clock
Could be encoded as a text string, JSON, or some binary form
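A sketch of an event as a small, immutable, self-contained record, using a frozen dataclass and JSON encoding. The `PageViewEvent` name and its fields are hypothetical.

```python
import json
from dataclasses import asdict, dataclass


@dataclass(frozen=True)  # frozen: fields cannot be reassigned after creation
class PageViewEvent:
    user_id: int
    url: str
    timestamp: float  # time-of-day clock, seconds since the Unix epoch

    def to_json(self) -> str:
        return json.dumps(asdict(self), sort_keys=True)

    @classmethod
    def from_json(cls, text: str) -> "PageViewEvent":
        return cls(**json.loads(text))


event = PageViewEvent(user_id=42, url="/home", timestamp=1700000000.0)
encoded = event.to_json()
print(encoded)  # {"timestamp": 1700000000.0, "url": "/home", "user_id": 42}
assert PageViewEvent.from_json(encoded) == event  # round-trips losslessly
```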
Why are consumer notifications needed for stream processing, rather than the stream processor just polling for new data?
Continual processing requires low delays
Polling a database for new events is expensive if the datastore is not designed for this kind of usage
The more frequently you poll, the more queries come back empty (no new records), which is wasteful
Messaging System
Producer sends a message containing an event
The system pushes the event to consumers
Primitive example: a Unix pipe or TCP connection between producer and consumer
Limited to exactly one sender connected to exactly one recipient
What if the producer sends messages faster than the consumer can process them?
- Drop messages
Whether this is acceptable depends on the application. IoT sensor readers may tolerate a missing data point or two, as the next data point should arrive shortly anyway. Dropping a large number of messages though…
- Buffer messages in a queue
As the queue grows, the system can run out of memory.
Should messages be written to disk? This comes at the cost of I/O access…
- Apply backpressure/flow control
Backpressure
Blocking the producer from sending more messages
Unix pipes and TCP do this with a small fixed-size buffer
If it fills up, the sender is blocked until the recipient takes data out of the buffer
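A sketch of backpressure using Python's standard `queue.Queue` with a small `maxsize` as the fixed-size buffer: `put()` blocks the producer thread whenever the buffer is full, until the consumer thread takes an item out. The capacity of 2 and the `None` end-of-stream sentinel are arbitrary choices for illustration.

```python
import queue
import threading


def run_pipeline(n_messages=10, capacity=2):
    buffer = queue.Queue(maxsize=capacity)  # small fixed-size buffer
    received = []

    def consume():
        while True:
            item = buffer.get()  # removes an item, unblocking a waiting producer
            if item is None:  # sentinel: producer is done
                return
            received.append(item)

    consumer = threading.Thread(target=consume)
    consumer.start()
    for i in range(n_messages):
        buffer.put(i)  # blocks while the buffer is full: backpressure
    buffer.put(None)
    consumer.join()
    return received


print(run_pipeline())  # [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# A non-blocking put shows the condition under which the producer would be blocked.
full = queue.Queue(maxsize=1)
full.put("x")
try:
    full.put("y", block=False)
except queue.Full:
    print("buffer full: producer must wait")
```

Unlike dropping or unbounded buffering, no message is lost and memory stays bounded; the cost is that a slow consumer slows the producer down.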
What if nodes crash or go offline for a while?
Durability requires writing to disk and/or replication, both of which come at a cost
If occasionally losing messages is acceptable, higher throughput and lower latency are achievable on the same hardware
Direct messaging from producers to consumers
No intermediary nodes
Examples:
-> UDP multicast
Used in the financial industry for stock market feeds, where low latency is important
UDP itself is unreliable, so application-level protocols are used to recover lost packets. This implies the producer must remember the packets it has sent, to be able to retransmit them on demand
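A sketch of that application-level recovery idea, without real sockets: the hypothetical `Producer` keeps recently sent packets by sequence number, and the `Receiver` detects gaps in the sequence and asks for the missing numbers (a negative acknowledgement). The history bound of 1000 packets and both class names are assumptions for illustration, not a specific protocol.

```python
class Producer:
    def __init__(self, history=1000):
        self.next_seq = 0
        self.history = history
        self.sent = {}  # seq -> payload, remembered so it can be retransmitted

    def send(self, payload):
        seq = self.next_seq
        self.next_seq += 1
        self.sent[seq] = payload
        self.sent.pop(seq - self.history, None)  # forget packets beyond the history bound
        return seq, payload  # what would go into the UDP datagram

    def retransmit(self, seq):
        return seq, self.sent[seq]  # KeyError if the packet is too old to recover


class Receiver:
    def __init__(self):
        self.expected = 0  # next sequence number to deliver in order
        self.pending = {}  # out-of-order packets waiting for a gap to be filled
        self.delivered = []

    def on_packet(self, seq, payload):
        """Returns the sequence numbers still missing (to request from the producer)."""
        if seq >= self.expected:  # ignore duplicates of already delivered packets
            self.pending[seq] = payload
        while self.expected in self.pending:
            self.delivered.append(self.pending.pop(self.expected))
            self.expected += 1
        highest = max(self.pending, default=self.expected - 1)
        return [s for s in range(self.expected, highest) if s not in self.pending]


producer, receiver = Producer(), Receiver()
packets = [producer.send(p) for p in ["a", "b", "c", "d"]]
packets.pop(1)  # simulate packet 1 being dropped by the network
for seq, payload in packets:
    missing = receiver.on_packet(seq, payload)
print(missing)  # [1]
for seq in missing:
    receiver.on_packet(*producer.retransmit(seq))
print(receiver.delivered)  # ['a', 'b', 'c', 'd']
```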
-> Brokerless messaging libraries
ZeroMQ, nanomsg
Implement publish/subscribe messaging over TCP or IP multicast
-> StatsD, Brubeck
Use UDP messaging to collect metrics from all machines on the network
Used for monitoring
Metrics are not guaranteed to be accurate, only APPROXIMATE (counters are correct only if no message is lost)
-> REST & RPC dataflow through services
Direct request/response
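Webhooks: a callback URL of one service is registered with another service, which makes a request to that URL whenever an event occurs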
Assumptions / limitations:
Producer and consumer are assumed to be online constantly
App code needs to be aware of possible message loss
Retries offer limited fault tolerance: if the producer crashes, it loses its buffer of messages it was supposed to retry
Message brokers
AKA message queue: a kind of database optimized for handling message streams
Producers and consumers connect to the broker as clients; producers write messages to the broker, consumers receive them from the broker
Centralizing data in the broker gives:
-> better tolerance of clients that disconnect (e.g. due to a crash)
-> durability becomes the broker's responsibility
Some brokers keep messages in memory only, others write them to disk
As a consequence of queueing, consumers are usually asynchronous: the producer only waits for the broker to confirm it has buffered the message, not for consumers to process it
Message broker vs Database
Data retention: message brokers usually AUTOMATICALLY DELETE messages once they have been delivered (or are older than the retention period), while DBs keep data until it is explicitly deleted
Brokers usually assume their working set is small, i.e. QUEUES are SHORT; if many messages must be buffered, throughput can degrade
DBs provide secondary indexes for searching the data; message brokers instead support SUBSCRIBING to TOPICS MATCHING some PATTERN
Querying a DB usually returns a result based on a point-in-time snapshot of the data; message brokers do not support arbitrary queries, but they notify clients when DATA CHANGES
Multiple consumer patterns
Load balancing
Message is delivered to ONE of the consumers
Processing is shared among consumers of the topic
Broker assigns messages arbitrarily to consumers
Good when messages are expensive to process: adding consumers parallelizes the processing
Fan out
Each message is delivered to ALL of the consumers
Each consumer processes the message independently of the others
Streaming equivalent of multiple batch jobs reading the same input file
Combined
Consumer groups: every group receives all messages, but within a group each message is received by only one consumer
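A sketch of the three patterns using hypothetical consumer and group names: every group receives every message (fan-out), and inside a group each message goes to exactly one member (load balancing). Round-robin assignment is an assumption here; the notes only say the broker assigns messages arbitrarily. A single group gives pure load balancing, and one-member groups give pure fan-out.

```python
from itertools import cycle


def deliver(messages, groups):
    """groups: group name -> list of consumer names (assumed unique across groups).
    Returns consumer name -> list of messages it received."""
    received = {c: [] for members in groups.values() for c in members}
    next_member = {g: cycle(members) for g, members in groups.items()}
    for msg in messages:
        for g in groups:  # fan-out: each group gets its own copy
            received[next(next_member[g])].append(msg)  # load balancing within the group
    return received


print(deliver([1, 2, 3, 4], {"billing": ["b1", "b2"], "audit": ["a1"]}))
# {'b1': [1, 3], 'b2': [2, 4], 'a1': [1, 2, 3, 4]}
```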
How do brokers ensure messages are not lost if a consumer crashes?
ACK and redelivery
The consumer explicitly notifies the broker when it has finished processing a message -> the broker can remove the message from the queue
If the broker does not receive an ACK (e.g. the connection closes or times out), it assumes the message was not processed and redelivers it to another consumer
The message may actually have been processed while the ACK was lost in the network; handling this requires an atomic commit protocol (e.g. consumers record on their side which messages they have already processed)
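A sketch of ACK-based redelivery with a hypothetical in-memory `AckingBroker`: delivered messages stay "in flight" until acknowledged, and when a consumer is detected as failed, its unacknowledged messages are put back on the queue for another consumer. Re-queuing at the back is an assumption for this sketch; the run below also shows that redelivery can reorder messages.

```python
from collections import deque


class AckingBroker:
    def __init__(self):
        self.queue = deque()
        self.in_flight = {}  # msg_id -> (consumer, message), awaiting ACK
        self.next_id = 0

    def publish(self, message):
        self.queue.append((self.next_id, message))
        self.next_id += 1

    def deliver(self, consumer):
        msg_id, message = self.queue.popleft()
        self.in_flight[msg_id] = (consumer, message)  # kept until acknowledged
        return msg_id, message

    def ack(self, msg_id):
        del self.in_flight[msg_id]  # processed: the broker can forget it

    def consumer_failed(self, consumer):
        # No ACK will ever come: redeliver this consumer's in-flight messages.
        for msg_id, (owner, message) in list(self.in_flight.items()):
            if owner == consumer:
                del self.in_flight[msg_id]
                self.queue.append((msg_id, message))


broker = AckingBroker()
for m in ["a", "b", "c", "d"]:
    broker.publish(m)

processed = []
broker.deliver("c1")  # c1 receives "a" and crashes before sending an ACK
for _ in range(2):
    msg_id, message = broker.deliver("c2")
    processed.append(message)
    broker.ack(msg_id)
broker.consumer_failed("c1")  # e.g. its connection timed out
while broker.queue:
    msg_id, message = broker.deliver("c2")
    processed.append(message)
    broker.ack(msg_id)
print(processed)  # ['b', 'c', 'd', 'a']: nothing lost, but "a" is now last
```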
What is the issue with combining load balancing and redelivery?
Messages can be processed completely out of order
Solution: don't use load balancing; use a separate queue for each consumer
Log-based message brokers
Log: an append-only sequence of records on disk
Hybrid approach combining
- durable storage of DB
- low-latency notification facilities of messaging
Receiving a message is not destructive (it stays in the log)
Producers append to the log by sending a message to the broker
Consumers read the log sequentially; when the end of the log is reached, the consumer waits for a notification that a new message has been appended
The log can be partitioned to scale to higher throughput
Each partition can be read/written independently
A topic is then the group of all partitions carrying messages of the same type
Within a partition, each message is assigned a MONOTONICALLY increasing sequence number (offset); a partition is append-only, so messages within it are TOTALLY ORDERED
Ordering is guaranteed per partition only (no ordering guarantees across partitions)
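A sketch of a partitioned topic, assuming messages are routed by hashing a key. `zlib.crc32` is used because Python's built-in `hash()` of strings is randomized per process; the class name and the choice of hash function are assumptions, not what any particular broker does. Each partition assigns its own offsets, so messages with the same key stay totally ordered while there is no order across partitions.

```python
import zlib


class PartitionedTopic:
    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def partition_for(self, key):
        # Stable hash: the same key always maps to the same partition.
        return zlib.crc32(key.encode("utf-8")) % len(self.partitions)

    def append(self, key, value):
        p = self.partition_for(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1  # (partition, offset within it)

    def read(self, partition, offset):
        # Each partition can be read independently, starting at any offset.
        return self.partitions[partition][offset:]


topic = PartitionedTopic(num_partitions=3)
positions = [topic.append("user-1", f"event-{i}") for i in range(3)]
offsets = [offset for _, offset in positions]
print(offsets)  # [0, 1, 2]
print(len({p for p, _ in positions}))  # 1: all "user-1" messages share a partition
print(topic.read(positions[0][0], 1))  # [('user-1', 'event-1'), ('user-1', 'event-2')]
```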