SPARK STREAMING

Hadoop
------------------------------
MapReduce | Pig | Hive

SPARK
------------------------------
SQL & Hive | Streaming | ML | GraphX

NOSQL
------------------------------
MongoDB | HBase

Data Ingestion Tools
------------------------------
Sqoop | Flume
Tools like Hadoop, Pig, Hive, Spark Core and Spark SQL work with data that is residing at one place (i.e. data at rest). But what about data that is continuously arriving in our system (streaming data, i.e. data in motion), which we want to analyse at the very moment it is generated? Spark Streaming can handle this.
Apache Storm, Splunk and ELK (Elasticsearch + Logstash + Kibana) are existing systems that can also handle streaming data, but they are typically used for specific cases.

Spark Streaming is an extension of the core Spark API that enables scalable, high-throughput, fault-tolerant stream processing of live data streams. Spark Streaming can be used to stream live data and processing can happen in real time. Spark Streaming’s ever-growing user base consists of household names like Uber, Netflix and Pinterest.

When it comes to Real Time Data Analytics, Spark Streaming provides a single platform to ingest data for fast and live processing in Apache Spark. Through this blog, I will introduce you to this exciting new domain of Spark Streaming, and we will go through a complete use case: analysing live Twitter data with Spark Streaming.
Spark Streaming is used for processing real-time streaming data. It is a useful addition to the core Spark API, enabling high-throughput, fault-tolerant stream processing of live data streams.



Spark Streaming Workflow
Spark Streaming workflow has four high-level stages. The first is to stream data from various sources. These can be streaming data sources like Akka, Kafka, Flume, AWS or Parquet for real-time streaming, or sources like HBase, MySQL, PostgreSQL, Elasticsearch, MongoDB and Cassandra for static/batch data. Once this happens, Spark can be used to perform Machine Learning on the data through its MLlib API. Further, Spark SQL is used to perform additional operations on this data. Finally, the streaming output can be stored in various data storage systems like HBase, Cassandra, MemSQL, Kafka, Elasticsearch, HDFS and the local file system.

A plain Spark Streaming setup is not fully fault tolerant on its own: if your application crashes, you can lose data that was in flight. To avoid this, you can use a tool like Kafka, which acts as a staging area for the streaming data. So streaming data can come to Spark either directly or via a Kafka system.
In Spark, these streams are received by a Receiver. The Receiver is the entry point where streams are received, and we have to create it in our application.

There are many different types of receivers:
  • Spark Streaming with text
  • Spark Streaming with socket
  • Spark Streaming with file
  • Spark Streaming with Flume
  • Spark Streaming with Kafka
  • Spark Streaming with Twitter
  • and many more...


The Receiver runs on a separate worker node, receiving this streaming data.

Spark Streaming Fundamentals

  1. Streaming Context
  2. DStream
  3. Caching
  4. Accumulators, Broadcast Variables and Checkpoint
Streaming Context
Streaming Context consumes a stream of data in Spark. It registers an Input DStream to produce a Receiver object. It is the main entry point for Spark Streaming functionality. Spark provides a number of default implementations of sources like Twitter, Akka Actor and ZeroMQ that are accessible from the context.
A StreamingContext object can be created from a SparkContext object. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators and broadcast variables on that cluster.
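For example (a minimal sketch, assuming the spark-shell where a SparkContext named sc is already available):

import org.apache.spark.streaming.{StreamingContext, Seconds}

// Create a StreamingContext on top of the existing SparkContext,
// with a batch interval of 5 seconds
val ssc = new StreamingContext(sc, Seconds(5))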
DStream
Discretized Stream (DStream) is the basic abstraction provided by Spark Streaming. It is a continuous stream of data, either the input stream received from a data source or the processed stream generated by transforming the input stream. Internally, a DStream is represented as a continuous series of RDDs, one for each batch interval.

Example: extracting words from an input DStream. Every 1 second, the batch of incoming data is processed using a flatMap operation and analysed.



The following are some of the popular transformations on DStreams: map(func), flatMap(func), filter(func), reduce(func), groupBy(func).


Output Operations on DStreams: 
Output operations allow a DStream's data to be pushed out to external systems like databases or file systems. Output operations trigger the actual execution of all the DStream transformations.
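
As a sketch of how transformations and output operations fit together (assuming the spark-shell and a hypothetical socket source on localhost:9999, e.g. started with nc -lk 9999):

import org.apache.spark.streaming.{StreamingContext, Seconds}

val ssc = new StreamingContext(sc, Seconds(5))
val lines = ssc.socketTextStream("localhost", 9999)

// Transformations build new DStreams from existing ones
val words     = lines.flatMap(_.split(" "))               // flatMap(func)
val longWords = words.filter(_.length > 3)                // filter(func)
val counts    = longWords.map((_, 1)).reduceByKey(_ + _)  // map(func) + reduce per key

// Output operations push data out and trigger execution of the transformations above
counts.print()                          // print a few elements of every batch
counts.saveAsTextFiles("/tmp/counts")   // hypothetical output path

ssc.start()
ssc.awaitTermination()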

Caching
DStreams allow developers to cache/persist the stream's data in memory. This is useful if the data in the DStream will be computed multiple times. This can be done using the persist() method on a DStream.
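
A minimal sketch (same spark-shell setup and hypothetical socket source as above), where persisting avoids recomputing the stream for two different aggregations:

import org.apache.spark.streaming.{StreamingContext, Seconds}

val ssc = new StreamingContext(sc, Seconds(5))
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Keep each batch's data in memory because it is used by two computations below
words.persist()

val wordCounts   = words.map((_, 1)).reduceByKey(_ + _)
val lengthCounts = words.map(w => (w.length, 1)).reduceByKey(_ + _)
wordCounts.print()
lengthCounts.print()

ssc.start()
ssc.awaitTermination()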

Let's understand the Batch Interval / Sliding Interval / Window Interval concept in Spark Streaming:


 Example: as per the example below, the batch interval is set to 2 minutes, the slide interval to 4 minutes and the window interval to 4 minutes. This means the streaming application takes 2 minutes' worth of data as each batch, and every 4 minutes (the sliding interval) it processes only the last 4 minutes of data (the window interval), i.e. two batches of data at a time.
Batch Interval: 2 min
Slide Interval: 4 min
Window Interval: 4 min

10:01       10:03       10:05       10:07       10:09       10:11
|---Batch1---|---Batch2---|---Batch3---|---Batch4---|---Batch5---|
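
A sketch of these intervals in code (hypothetical socket source; batch = 2 min, window = 4 min, slide = 4 min):

import org.apache.spark.streaming.{StreamingContext, Minutes}

val ssc = new StreamingContext(sc, Minutes(2))   // batch interval: 2 minutes
val words = ssc.socketTextStream("localhost", 9999).flatMap(_.split(" "))

// Every 4 minutes (slide interval), count words over the last 4 minutes of data
// (window interval), i.e. the last two 2-minute batches
val windowedCounts = words.map((_, 1)).reduceByKeyAndWindow(_ + _, Minutes(4), Minutes(4))
windowedCounts.print()

ssc.start()
ssc.awaitTermination()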

// Reading streaming data from files arriving in a folder
import org.apache.spark.streaming.{StreamingContext, Seconds}
val ssc = new StreamingContext(sc, Seconds(5))
// DStream of lines from new files appearing in the monitored directory
val empDstream = ssc.textFileStream("/user/Streaming")
case class Emp(id: Int, name: String, sal: Double)
// Parse each comma-separated line into an Emp and print it (output operation)
empDstream.foreachRDD(rdd => rdd.map(_.split(",")).map(c => Emp(c(0).toInt, c(1), c(2).toDouble)).foreach(println))
ssc.start()   // start receiving and processing
ssc.awaitTermination()

// Reading streaming data from a socket

import org.apache.spark.streaming._
val ssc = new StreamingContext(sc, Seconds(5))
// Connect to a socket source (e.g. `nc -lk 44446` on the given host)
val streams = ssc.socketTextStream("172.31.60.179", 44446, org.apache.spark.storage.StorageLevel.MEMORY_ONLY)
// Classic word count over each 5-second batch
val wordcount = streams.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _)
wordcount.print()
ssc.start()
ssc.awaitTermination()

//Reading streaming data from Twitter 
import org.apache.spark.streaming.{Seconds, StreamingContext}
import org.apache.spark.streaming.twitter._
import org.apache.spark.streaming.twitter.TwitterUtils
val consumerKey = "Have Your Own ConsumerKey from Twitter"
val consumerSecret ="Have Your Own ConsumerSecret from Twitter"
val accessToken ="Have Your Own accessToken from Twitter"
val accessTokenSecret ="Have Your Own accessToken Secret from Twitter"
System.setProperty("twitter4j.oauth.consumerKey", consumerKey)
System.setProperty("twitter4j.oauth.consumerSecret", consumerSecret)
System.setProperty("twitter4j.oauth.accessToken", accessToken)
System.setProperty("twitter4j.oauth.accessTokenSecret", accessTokenSecret)
val filters = Array("modi","trump","trudo","arnab","rahul gandhi")
val ssc = new StreamingContext(sc, Seconds(10))
val stream = TwitterUtils.createStream(ssc, None, filters)
val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))
// Count hashtags over a 10-second window and sort by count (descending)
val topCounts10 = hashTags.map((_, 1))
  .reduceByKeyAndWindow(_ + _, Seconds(10))
  .map { case (topic, count) => (count, topic) }
  .transform(_.sortByKey(false))

topCounts10.foreachRDD { rdd =>
  val topList = rdd.take(10)
  println("\nPopular topics in last 10 seconds (%s total):".format(rdd.count()))
  topList.foreach { case (count, tag) => println("%s (%s tweets)".format(tag, count)) }
}

ssc.start()
ssc.awaitTermination()

 // Using KAFKA as source for streaming data in SPARK
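
A minimal sketch, assuming the spark-streaming-kafka-0-10 connector is on the classpath, a Kafka broker at localhost:9092 and a hypothetical topic named demo-topic:

import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.streaming.{StreamingContext, Seconds}
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

val ssc = new StreamingContext(sc, Seconds(5))

// Kafka consumer configuration (broker address and group id are assumptions)
val kafkaParams = Map[String, Object](
  "bootstrap.servers" -> "localhost:9092",
  "key.deserializer" -> classOf[StringDeserializer],
  "value.deserializer" -> classOf[StringDeserializer],
  "group.id" -> "spark-streaming-demo",
  "auto.offset.reset" -> "latest",
  "enable.auto.commit" -> (false: java.lang.Boolean)
)

// Subscribe to the topic and create a direct (receiver-less) stream
val topics = Array("demo-topic")
val stream = KafkaUtils.createDirectStream[String, String](
  ssc, PreferConsistent, Subscribe[String, String](topics, kafkaParams))

// Word count over the message values, as in the socket example above
val wordCounts = stream.map(_.value).flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
wordCounts.print()

ssc.start()
ssc.awaitTermination()

With this direct approach, Kafka itself retains the data until it is processed, which is exactly the staging-area role described earlier.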
