Week 10: Apache Spark Streaming Flashcards
Spark Streaming
It’s an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams.
Properties of Spark Streaming:
1. Scales to hundreds of nodes
2. Achieves second-scale latencies
3. Efficiently recovers from failures
4. Integrates with batch and interactive processing.
Input sources include, but aren’t limited to:
1. Kafka
2. Flume
3. HDFS/S3
4. Kinesis
5. Twitter
Output destinations include, but aren’t limited to:
1. HDFS
2. Databases
3. Dashboards
Unified Stack
A system for handling various big data processing tasks using a single framework.
Pros:
1. Explore data interactively (in the shell) to identify problems.
2. Use the same code that you used in the shell in a Spark batch job to process large logs.
3. Use similar code in Spark Streaming for realtime processing (a sketch follows this list).
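To make this concrete, here is a minimal PySpark sketch (the file paths, app name, and error-filtering logic are all hypothetical): the same RDD-level function serves a batch job and, via transform, a streaming job.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# The same RDD-level logic, written once (the error filter is only an illustration).
def extract_errors(rdd):
    return rdd.filter(lambda line: "ERROR" in line)
sc = SparkContext(appName="UnifiedStackSketch")
# Batch / interactive use: apply it to a static log file (hypothetical path).
old_errors = extract_errors(sc.textFile("hdfs:///logs/old_logs.txt"))
print(old_errors.count())
# Streaming use: apply the very same function to every batch of a DStream via transform.
ssc = StreamingContext(sc, 1)
live_errors = ssc.textFileStream("hdfs:///logs/incoming/").transform(extract_errors)
live_errors.pprint()
ssc.start()
ssc.awaitTermination()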
Spark Streaming: Applications
- Social media sites like Twitter need to discover malicious campaigns in realtime.
- Datacentres receive streams of logs from many clusters, e.g. via Flume. They want to detect problems.
- Advertisers want to understand who clicks on their pages in realtime.
Data Streaming: Challenges
The data size calls for a very large cluster environment. With more cluster nodes, faults become more likely, and so do slow (straggler) nodes.
Quick recovery is important because we don’t want to lose useful data, for example click logs for advertising, or social media accounts that may be affected by a malicious campaign.
Discretised Streams (D-Streams)
Run a streaming computation as a series of very small, deterministic batch jobs.
Workflow:
1. Cut the stream into batches of X seconds.
2. Treat each batch as an RDD and process it using RDD operations.
3. Finally, return the processed results of the RDD operations in batches.
In order to generate an RDD in each batch interval, the DStream keeps:
1. Dependencies, a list of dependent (parent) DStreams
2. Slide interval, the interval at which it will compute RDDs.
3. Function, to compute an RDD at a time t.
Note that flatMap can be applied both to RDDs, to make new RDDs, and to DStreams, specifically to the batches of data they receive over time.
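As a small illustration of this point, a hedged PySpark sketch (made-up input data and a hypothetical localhost socket source): the same flatMap call works on an ordinary RDD and on a DStream, where it is applied to the RDD of every batch.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "FlatMapSketch")
# flatMap on a plain RDD: one static collection in, one new RDD out.
rdd = sc.parallelize(["a b", "c d e"])
print(rdd.flatMap(lambda s: s.split(" ")).collect())  # ['a', 'b', 'c', 'd', 'e']
# flatMap on a DStream: the same operation is applied to the RDD of every batch.
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)  # hypothetical text source
words = lines.flatMap(lambda s: s.split(" "))
words.pprint()
ssc.start()
ssc.awaitTermination()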
Pros:
1. Low latency, batch size X as low as 0.5 seconds.
2. Since RDDs can store batch data, we can combine batch processing and streaming processing.
Cons:
1. Batches of a discretised stream smaller than about 0.1 seconds can’t be created.
Example: Get hashtags from Twitter
val tweets = ssc.twitterStream()
val hashTags1 = tweets.flatMap(status => getTags(status))
hashTags1.saveAsHadoopFiles("hdfs://...")
val hashTags2 = tweets.flatMap(status => getTags(status))
hashTags2.foreach(hashTagRDD => {...})
tweets is a DStream, a sequence of RDDs representing a stream of data. Remember that RDDs are immutable and distributed.
hashTags1 is a new DStream produced by the flatMap (transformations turn the data in one DStream into another DStream, here one containing the tags), and saveAsHadoopFiles is an output operation that pushes data to external storage (here HDFS).
hashTags2 is the same flatMap-transformed DStream, and foreach runs the given function on each RDD it produces.
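twitterStream and getTags come from the Scala example above and are not part of PySpark; as a rough Python analogue (a sketch only, assuming tweet texts arrive as lines over a local socket, with a simple '#'-prefix filter standing in for getTags):
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "HashtagSketch")
ssc = StreamingContext(sc, 1)
# Assumption: tweet texts arrive as lines over a socket (PySpark has no twitterStream).
tweets = ssc.socketTextStream("localhost", 9999)
# Stand-in for getTags: keep the words that start with '#'.
hashTags = tweets.flatMap(lambda status: [w for w in status.split(" ") if w.startswith("#")])
# Output operation 1: save every batch under a hypothetical HDFS prefix.
hashTags.saveAsTextFiles("hdfs:///user/me/hashtags")
# Output operation 2: run arbitrary driver-side code on each batch's RDD.
hashTags.foreachRDD(lambda rdd: print(rdd.take(10)))
ssc.start()
ssc.awaitTermination()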
Python Example 1:
from __future__ import print_function
import sys
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
if __name__ == "__main__":
    if len(sys.argv) != 2:
        print("Usage: hdfs_wordcount.py <directory>", file=sys.stderr)
        exit(-1)
    sc = SparkContext(appName="PythonStreamingHDFSWordCount")
    ssc = StreamingContext(sc, 1)
    # Monitor the given HDFS directory and count words in newly created files.
    lines = ssc.textFileStream(sys.argv[1])
    counts = lines.flatMap(lambda line: line.split(" ")) \
                  .map(lambda x: (x, 1)) \
                  .reduceByKey(lambda a, b: a + b)
    counts.pprint()
    ssc.start()
    ssc.awaitTermination()
The example code counts the words in new text files created in the given HDFS directory.
StreamingContext is the main entry point for Spark Streaming functionality.
The “1” in StreamingContext(sc, 1) is the batch interval in seconds, i.e. the time interval at which the streaming data will be divided into batches.
ssc.start() Starts the execution of the streams.
ssc.awaitTermination() Waits for the execution to stop.
To test out the code,
Run in one terminal (leave it running): spark-submit hdfs_wordcount.py <directory> > stream_out.txt
Open another terminal and execute:
hadoop fs -mkdir test_stream
hadoop fs -put localfile.txt test_stream/
Go to the first terminal:
Press Ctrl+C to terminate the Spark script.
Then, execute in the first terminal:
nano stream_out.txt
Python Example 2:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Local StreamingContext with two worker threads and a batch interval of 1 second.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
# At least one output operation is needed before the stream can start.
words.pprint()
ssc.start()
ssc.awaitTermination()
sc and ssc make a local StreamingContext with two worker threads and a batch interval of 1 second.
lines makes a DStream that will connect to hostname:port, like localhost:9999.
words splits each line into words; words.pprint() prints the first elements of every batch, giving the stream the output operation it needs.
ssc.start() starts the computation.
ssc.awaitTermination() waits for the computation to terminate.
To test it out, run nc -lk 9999 in one terminal. Then start the Python script in another terminal; it connects to nc and processes whatever you type as a data stream. Use Ctrl+C to terminate the connection.
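The snippet above stops after splitting the lines into words; to complete it into the classic network word count (the same counting step as Example 1), a sketch along these lines should work:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
# Local StreamingContext with two worker threads and a 1-second batch interval.
sc = SparkContext("local[2]", "NetworkWordCount")
ssc = StreamingContext(sc, 1)
lines = ssc.socketTextStream("localhost", 9999)
words = lines.flatMap(lambda line: line.split(" "))
# Count each word within the current batch and print the first results.
counts = words.map(lambda word: (word, 1)).reduceByKey(lambda a, b: a + b)
counts.pprint()
ssc.start()
ssc.awaitTermination()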
Transformations: window(windowLength, slideInterval)
This transformation returns a new DStream which is computed based on windowed batches of the source DStream.
Transformations: countByWindow(windowLength, slideInterval)
Return a sliding window count of elements in the stream.
Transformations: reduceByWindow(func, windowLength, slideInterval)
Return a new single-element stream, created by aggregating elements in the stream over a sliding interval using func. The function should be associative and commutative so that it can be computed correctly in parallel.
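A hedged PySpark sketch of these three transformations together (hypothetical socket source and checkpoint directory; note that in PySpark, reduceByWindow also takes an inverse-reduce argument, and checkpointing is enabled here because countByWindow relies on such incremental window reductions):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "WindowSketch")
ssc = StreamingContext(sc, 1)  # 1-second batches
ssc.checkpoint("checkpoint")  # hypothetical dir; needed by incremental window operations
words = ssc.socketTextStream("localhost", 9999).flatMap(lambda l: l.split(" "))
# window: all words received in the last 30 seconds, recomputed every 10 seconds.
words.window(30, 10).pprint()
# countByWindow: a single element per interval, the count of elements in the window.
words.countByWindow(30, 10).pprint()
# reduceByWindow: aggregate window elements with an associative function
# (None for the inverse function means each window is re-reduced from scratch).
words.map(lambda w: len(w)).reduceByWindow(lambda a, b: a + b, None, 30, 10).pprint()
ssc.start()
ssc.awaitTermination()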
Window-based Queries Example:
val tweets = ssc.twitterStream()
val hashTags = tweets.flatMap(status => getTags(status))
val tagCounts = hashTags.window(Minutes(1), Seconds(5)).countByValue()
This code counts the hashtags over the last 1 minute, recomputed every 5 seconds.
window returns a new DStream which is computed based on windowed batches of hashTags, with a window length of 1 minute and a sliding interval of 5 seconds.
tagCounts is a stream of key-value pairs, where the key is the hashtag and the value is its frequency within the window.
Example: [(#cat, 5), (#dog, 6), (#mouse, 11), …]
Output Operations:
saveAsTextFiles(prefix, [suffix])
It saves this DStream’s contents as text files. The file name at each batch interval is generated based on prefix and suffix: “prefix-TIME_IN_MS[.suffix]”.
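For instance, wiring it into the word-count pipeline from earlier (a minimal sketch; the local socket source and the HDFS prefix are hypothetical):
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
sc = SparkContext("local[2]", "SaveAsTextFilesSketch")
ssc = StreamingContext(sc, 1)
words = ssc.socketTextStream("localhost", 9999).flatMap(lambda l: l.split(" "))
counts = words.map(lambda w: (w, 1)).reduceByKey(lambda a, b: a + b)
# Each batch is written under a directory named like wordcounts-<TIME_IN_MS>.txt
counts.saveAsTextFiles("hdfs:///user/me/wordcounts", "txt")
ssc.start()
ssc.awaitTermination()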
Output Operations:
foreachRDD(func)
It’s the most generic output operator that applies a function, func, to each RDD generated from the stream. This function should push the data in each RDD to an external system, such as saving the RDD to files, or writing it over the network to a database.
Output Operations Example:
def sendPartition(iter):
    # createNewConnection() is a placeholder for code that opens a connection
    # to the external system (e.g. a TCP or database connection).
    connection = createNewConnection()
    for record in iter:
        connection.send(record)
    connection.close()
dstream.foreachRDD(lambda rdd: rdd.foreachPartition(sendPartition))
This code sends each partition of the resulting RDD to a remote connection (e.g. a TCP endpoint), creating one connection per partition rather than one per record.