Week 10: Apache Spark Streaming Flashcards

1
Q

Spark Streaming

A

It’s an extension of the core Spark API that allows for scalable, high-throughput, fault-tolerant stream processing of live data streams.

Properties of Spark Streaming:
1. Scales to hundreds of nodes
2. Achieves second-scale latencies
3. Efficiently recovers from failures
4. Integrates with batch and interactive processing.

Input sources include, but aren’t limited to:
1. Kafka
2. Flume
3. HDFS/S3
4. Kinesis
5. Twitter

Output destinations include, but aren’t limited to:
1. HDFS
2. Databases
3. Dashboards

2
Q

Unified Stack

A

A system for handling various big data processing tasks within a single framework.

Pros:
1. Explore data interactively (in the shell) to identify problems.
2. Use the same code that you used in the shell in Spark for processing large logs.
3. Use similar code in Spark Streaming for real-time processing.

3
Q

Spark Streaming: Applications

A
1. Social media sites like Twitter need to discover malicious campaigns in real time.
2. Datacentres receive streams of logs from many clusters (e.g. via Flume) and want to detect problems in them.
3. Advertisers want to understand who clicks on their pages in real time.
4
Q

Data Streaming: Challenges

A

Data size calls for a very large cluster environment. With more cluster nodes, faults become more likely, and so do slow (straggler) nodes.

Quick recovery is important because we don’t want to lose useful data, for example click logs for advertising, or social media accounts that may be affected by a malicious campaign.

5
Q

Discretised Streams (D-Streams)

A

Run a streaming computation as a series of very small, deterministic batch jobs.

Workflow:
1. Cut the stream into batches of X seconds.
2. Treat each batch as an RDD and process it using RDD operations.
3. Finally, return the processed results of the RDD operations in batches.

In order to generate an RDD in each batch interval, the DStream keeps:
1. Dependencies, a list of dependent (parent) DStreams.
2. Slide interval, the interval at which it will compute RDDs.
3. Function, to compute an RDD at a time t.

Note that flatMap can be applied both to RDDs, to make new RDDs, and to DStreams, specifically to the batches of data they receive over time.

Pros:
1. Low latency, with a batch size X as low as 0.5 seconds.
2. Since RDDs can store batch data, we can combine batch processing and stream processing.

Cons:
1. Batches of a discretised stream smaller than about 0.1 seconds can’t be created.
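As a minimal illustrative sketch (not from the lecture notes), the PySpark snippet below treats a socket text stream as a DStream with a 1-second batch interval and applies an ordinary RDD-style flatMap to every batch; the localhost:9999 source is an assumption made for the example:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "DStreamSketch")
ssc = StreamingContext(sc, 1)                          # cut the stream into 1-second batches

lines = ssc.socketTextStream("localhost", 9999)        # assumed source; each batch is an RDD of lines
words = lines.flatMap(lambda line: line.split(" "))    # same flatMap as on a plain RDD
words.pprint()                                         # print the processed result of each batch

ssc.start()
ssc.awaitTermination()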

6
Q

Example: Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags1 = tweets.flatMap(status => getTags(status)).saveAsHadoopFiles("hdfs://…")
val hashTags2 = tweets.flatMap(status => getTags(status)).foreach(hashTagRDD => {…})

A

tweets is a DStream, a sequence of RDDs representing a stream of data. Remember that RDDs are immutable and distributed.

hashTags1 is a new DStream produced by a flatMap transformation (transformations modify the data in one DStream to create another DStream, here one containing the tags), and saveAsHadoopFiles is an output operation that pushes data to external storage (here HDFS).

hashTags2 is a new DStream, also produced by a flatMap, on which foreach runs the given function.

7
Q

Python Example 1:

from __future__ import print_function

import sys

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

if __name__ == "__main__":

    if len(sys.argv) != 2:
        print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
        exit(-1)

    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
    ssc = StreamingContext(sc, 1)

    lines = ssc.textFileStream(sys.argv[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()

    ssc.start()
    ssc.awaitTermination()

A

The example code counts the words in new text files created in the given HDFS directory.

StreamingContext is the main entry point for Spark Streaming functionality.

The “1” in StreamingContext(sc, 1) is the time interval in seconds at which the streaming data will be divided into batches.

ssc.start() Starts the execution of the streams.
ssc.awaitTermination() Waits for the execution to stop.

To test out the code,
Run in one terminal (leave it running): spark-submit hdfs_wordcount.py <directory> > stream_out.txt

Open another terminal and execute:
hadoop fs -mkdir test_stream
hadoop fs -put localfile.txt test_stream/

Go to the first terminal:
Press Ctrl+C to terminate the Spark script.

Then, execute in the first terminal:
nano stream_out.txt

8
Q

Python Example 2:

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)

lines = ssc.socketTextStream("localhost", 9999)

words = lines.flatMap(lambda line: line.split(" "))

ssc.start()
ssc.awaitTermination()

A

sc and ssc create a local StreamingContext with two worker threads and a batch interval of 1 second.

lines is a DStream that will connect to a hostname:port, like localhost:9999.

words splits each line into words.

ssc.start() starts the computation.

ssc.awaitTermination() waits for the computation to terminate.

To test it out, start the Python script. Then, run nc -lk 9999 in another terminal. The Spark Python script connects to nc and starts processing the data stream from nc. Use Ctrl+C to terminate the connection.

9
Q

Transformations: window(windowLength, slideInterval)

A

This transformation returns a new DStream which is computed based on windowed batches of the source DStream.
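A minimal PySpark sketch, assuming an existing DStream named lines; in PySpark the window length and slide interval are given in seconds:

# every 5 seconds, produce a DStream spanning the last 30 seconds of batches
windowedLines = lines.window(30, 5)
windowedLines.pprint()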

10
Q

Transformations: countByWindow(windowLength, slideInterval)

A

Return a sliding window count of elements in the stream.
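A hedged PySpark sketch, assuming a DStream named lines and a checkpoint directory (countByWindow builds on an incremental reduce, so checkpointing needs to be enabled; the directory name is a placeholder):

ssc.checkpoint("checkpoint_dir")        # placeholder directory, required for the incremental count
# every 10 seconds, count the elements received in the last 60 seconds
counts = lines.countByWindow(60, 10)
counts.pprint()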

11
Q

Transformations: reduceByWindow(func, windowLength, slideInterval)

A

Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative so that it can be computed correctly in parallel.
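A minimal PySpark sketch, assuming a DStream of numbers named nums; the second argument is an optional inverse-reduce function and is left as None here:

# sum the values seen in the last 60 seconds, recomputed every 10 seconds
totals = nums.reduceByWindow(lambda a, b: a + b, None, 60, 10)
totals.pprint()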

12
Q

Window-based Queries Example:

val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()

A

This code counts the hashtags seen over the last 1 minute, recomputed every 5 seconds.

window returns a new DStream which is computed based on windowed batches of hashTags, with a window length of 1 minute and a sliding interval of 5 seconds.

tagCounts is a stream of (hashtag, count) pairs, where the key is the hashtag and the value is its frequency within the window.

Example: [(#cat, 5), (#dog, 6), (#mouse, 11), …]
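A rough PySpark equivalent of the Scala query above, assuming a hashTags DStream of hashtag strings already exists (PySpark has no built-in Twitter source, so the input is left abstract); window lengths are in seconds:

# 1-minute window, sliding every 5 seconds, then count each hashtag
tagCounts = hashTags.window(60, 5).countByValue()
tagCounts.pprint()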

13
Q

Output Operations:
saveAsTextFiles(prefix, [suffix])

A

It saves this DStream’s contents as text files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.
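A minimal PySpark sketch, assuming a DStream named counts; the prefix and suffix below are placeholders:

# each batch is written under a name of the form counts-<TIME_IN_MS>.txt
counts.saveAsTextFiles("counts", "txt")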

14
Q

Output Operations:
foreachRDD(func)

A

It’s the most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database.

15
Q

Output Operations Example:

def sendPartition(iter):
    connection = createNewConnection()

    for record in iter:
        connection.send(record)

    connection.close()

dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))

A

This code sends each partition of the resultant RDD to a remote connection (e.g., TCP location).

16
Q

updateStateByKey(func)

A

Return a new “state” DStream where the state for each key is updated by applying the given function to the previous state of the key and the new values for the key. This can be used to maintain arbitrary state data for each key. If the update function returns None for a key, that key’s state is removed.

Example:
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

runningCounts = pairs.updateStateByKey(updateFunction)

17
Q

Combining Batch and Stream Processing Example:
cleanedDStream = wordCounts.transform(lambda rdd: rdd.join(spamInfoRDD).filter(…))

A

There needs to be an RDD containing spam words (spamInfoRDD), and a stream with all words (wordCounts).

The join function returns a new DStream of (K, (V,W)) pairs with all pairs of elements for each key.
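A hedged PySpark sketch of the same idea, assuming wordCounts is a DStream of (word, count) pairs and spamInfoRDD is a precomputed RDD of (word, isSpam) pairs; the filter predicate is illustrative only:

# join each batch against the static spam RDD, then drop the entries flagged as spam
cleanedDStream = wordCounts.transform(
    lambda rdd: rdd.join(spamInfoRDD).filter(lambda kv: not kv[1][1])
)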

18
Q

Combining Batch and Stream Processing: Benefits

A
1. Online machine learning. You can use a prediction function in conjunction with updateStateByKey and transform to continuously learn and update data models.
2. Combining live data streams with historical data. You can generate models from historical data with Spark and then use those models to process the live data stream (via transform).
3. Complex Event Processing (CEP). CEP matches continuously incoming events against a pattern. It can use window-based operations, such as reduceByWindow.
19
Q

Spark Streaming: Fault Tolerance

A

Spark Streaming buffers and replicates data among multiple Spark executors (two by default) on worker nodes in the cluster. The replication is done in batches.

When the data has been replicated and a worker fails, the data is obtained from the other worker nodes.

When the data has been buffered but not yet replicated, the data is re-obtained from the data source, if possible.

20
Q

Example of Mapped DStream:

override def compute(time: Time): Option[RDD[U]] = {
  parent.getOrCompute(time).map(_.map(mapFunc))
}

A

Dependencies: single parent DStream

Slide Interval: same as the parent DStream

Compute function for time t: create a new RDD by applying the map function to the parent DStream’s RDD for time t.

getOrCompute(time) gets the RDD for time t if it has already been computed, or generates it otherwise.
mapFunc is applied to generate the new RDD.

21
Q

Spark Streaming System Model: Network Input Tracker

A

It keeps track of the data received by each network receiver and maps them to the corresponding input DStreams.

22
Q

Spark Streaming System Model: Job Scheduler

A

It periodically queries the DStream graph to generate Spark jobs from received data, and hands them to the Job Manager for execution.

23
Q

Spark Streaming System Model: Job Manager

A

It maintains a job queue and executes the jobs in Spark.

24
Q

DStream: Persistence

A

It makes sense to persist a DStream if there are multiple transformations/actions on it, or if the RDDs in the DStream are going to be used multiple times.

If a DStream is set to persist at a storage level, then all RDDs generated by it are set to the same storage level. Window-based DStreams are automatically persisted in memory.
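A minimal PySpark sketch, assuming a DStream named tagCounts that several later operations reuse; StorageLevel.MEMORY_ONLY is used because the serialised _SER variants may not be exposed in PySpark (Python data is already serialised):

from pyspark import StorageLevel

tagCounts.persist(StorageLevel.MEMORY_ONLY)   # cache the generated RDDs across uses
# tagCounts.cache() is the shorthand with the default storage level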

25
Q

Data Checkpointing

A

Checkpoints are made of the metadata (configuration, DStream operations, incomplete jobs) to recover from driver failures, as well as of the data, by saving the generated RDDs to reliable storage.

Saving the RDDs to HDFS prevents the RDD lineage graph from growing too large. This is done internally in Spark and is transparent to the user programme, and it’s done lazily, meaning an RDD is saved to HDFS the first time it’s computed.

This is necessary because stateful DStream operators can have infinite lineages, which lead to a large closure of the RDD object, hence large task sizes and high task launch times, as well as high recovery times under failure. Periodic RDD checkpointing solves this. It’s useful for iterative Spark programmes as well.
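A minimal PySpark sketch, assuming a StreamingContext named ssc and a writable HDFS directory (the path is a placeholder):

# sets the checkpoint directory used for both metadata and RDD checkpoints
ssc.checkpoint("hdfs://namenode/user/spark/checkpoints")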

26
Q

Data Checkpointing: Periodicity Tradeoff

A

If the checkpoint is too frequent, HDFS writing will slow things down.

If the checkpoint is too infrequent, the task launch times may increase.

The default setting checkpoints at most once every 10 seconds. Try to checkpoint roughly once every 10 batches.
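A hedged PySpark sketch, assuming a stateful DStream named runningCounts and a 1-second batch interval, so ten batches is roughly ten seconds:

# checkpoint this DStream's RDDs about once every 10 seconds (roughly 10 batches here)
runningCounts.checkpoint(10)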

27
Q

Spouts

A

These are stream sources. They read tuples from external sources or from disk and emit them into the topology.

28
Q

Bolts

A

These process input streams and produce output streams. They encapsulate the application logic.

29
Q

DStream Storage Level

A

It defaults to MEMORY_ONLY_SER, unless the DStream comes from an external source, in which case it defaults to MEMORY_ONLY_SER_2 (replicated to two nodes).