Spark Streaming
State of the Union and Beyond
Tathagata TD Das
@tathadas
Feb 19, 2015
Who am I?
Project Management Committee (PMC) member of Spark
Lead developer of Spark Streaming
Formerly in AMPLab, UC Berkeley
Software developer at Databricks
What is Databricks?
Founded by the creators of Spark in 2013
Largest organization contributing to Spark
End-to-end hosted service, Databricks Cloud
What is Spark
Streaming?
Spark Streaming
Scalable, fault-tolerant stream processing system
High-level API
Fault-tolerant
Integration
joins, windows, 
often 5x less code
Exactly-once semantics,
even for stateful ops
Integrate with MLlib, SQL,
DataFrames, GraphX
Kafka
File systems
Flume
Kinesis
HDFS/S3
Twitter
Streaming
Databases
Dashboards
What can you use it for?
Real-time fraud detection in transactions
React to anomalies in sensors in real-time
Cat videos in tweets as soon as they go viral
How does it work?
Data streams are chopped up into batches
Each batch is processed in Spark
Results pushed out in batches
data streams
receivers
Streaming
batches
results
7
Streaming Word Count
val lines = context.socketTextStream(localhost, 9999)
create DStream
from data over socket
val words = lines.flatMap(_.split(" "))
split lines into words
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
print some counts on screen
ssc.start()
start processing the stream
count the words
Word Count
object NetworkWordCount {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val context = new StreamingContext(sparkConf, Seconds(1))
val lines = context.socketTextStream(localhost, 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
public class WordCountTopology {
public static class SplitSentence extends ShellBolt implements IRichBolt {
public SplitSentence() {
super("python", "splitsentence.py");
}
Word Count
Spark Streaming
Storm
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word"));
}
public static class WordCount extends BaseBasicBolt {
Map<String, Integer> counts = new HashMap<String, Integer>();
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
String word = tuple.getString(0);
Integer count = counts.get(word);
if (count == null)
count = 0;
count++;
counts.put(word, count);
collector.emit(new Values(word, count));
}
object NetworkWordCount {
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("NetworkWordCount")
val context = new StreamingContext(sparkConf, Seconds(1))
val lines = context.socketTextStream(localhost, 9999)
val words = lines.flatMap(_.split(" "))
val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
wordCounts.print()
ssc.start()
ssc.awaitTermination()
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word", "count"));
}
public static void main(String[] args) throws Exception {
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("spout", new RandomSentenceSpout(), 5);
builder.setBolt("split", new SplitSentence(), 8).shuleGrouping("spout");
builder.setBolt("count", new WordCount(), 12).fieldsGrouping("split", new Fields("word"));
Config conf = new Config();
conf.setDebug(true);
if (args != null && args.length > 0) {
conf.setNumWorkers(3);
StormSubmitter.submitTopologyWithProgressBar(args[0], conf, builder.createTopology());
}
else {
conf.setMaxTaskParallelism(3);
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("word-count", conf, builder.createTopology());
Thread.sleep(10000);
}
cluster.shutdown();
10
Languages
Can natively use
Can use any other language by using pipe()
11
Integrates with Spark Ecosystem
Spark
Spark SQL
Streaming
MLlib
GraphX
Spark Core
12
Combine batch and streaming processing
Join data streams with static data sets
// Create data set from Hadoop file!
val dataset = sparkContext.hadoopFile(file)
// Join each batch in stream with the dataset
kafkaStream.transform { batchRDD =>
batchRDD.join(dataset)
.filter( ... )
}
Spark SQL
Spark
Streaming
MLlib
GraphX
Spark Core
13
Combine machine learning with streaming
Learn models oline, apply them online
// Learn model oline
val model = KMeans.train(dataset, ...)
Spark SQL
// Apply model online on stream
kafkaStream.map { event =>
model.predict(event.feature)
}
Spark
Streaming
MLlib
GraphX
Spark Core
14
Combine SQL with streaming
Interactively query streaming data with SQL
// Register each batch in stream as table
kafkaStream.map { batchRDD =>
batchRDD.registerTempTable("latestEvents") Spark SQL
}
Spark
Streaming
MLlib
GraphX
Spark Core
// Interactively query table
sqlContext.sql("select * from latestEvents")
15
History
Late 2011  research idea
AMPLab, UC Berkeley
We need to
make Spark
faster
Okay...umm,
how??!?!
16
History
Late 2011  idea
AMPLab, UC Berkeley
Q3 2012
Spark core improvements
open sourced in Spark 0.6
Q2 2012  prototype
Rewrote large parts of Spark core
Smallest job - 900 ms  <50 ms
Feb 2013  Alpha release
7.7k lines, merged in 7 days
Released with Spark 0.7
17
History
Late 2011  idea
AMPLab, UC Berkeley
Q3 2012
Spark core improvements
open sourced in Spark 0.6
Q2 2012  prototype
Rewrote large parts of Spark core
Smallest job - 900 ms  <50 ms
Jan 2014  Stable release
Graduation with Spark 0.9
Feb 2013  Alpha release
7.7k lines, merged in 7 days
Released with Spark 0.7
18
Current state of
Spark Streaming
Development
Adoption
Roadmap
20
What have we added
in the last year?
21
Python API
Core functionality in Spark 1.2,
with sockets and files as sources
Kafka support coming in Spark 1.3
lines = ssc.socketTextStream(localhost", 9999))
counts = lines.flatMap(lambda line: line.split(" ")) \
.map(lambda word: (word, 1)) \
.reduceByKey(lambda a, b: a+b)
counts.pprint()
Other sources coming in future
22
Streaming MLlib algorithms
Continuous learning and
prediction on streaming data
StreamingLinearRegression in
Spark 1.1
val model = new StreamingKMeans()
.setK(args(3).toInt)
.setDecayFactor(1.0)
.setRandomCenters(args(4).toInt, 0.0)
// Apply model to DStreams
model.trainOn(trainingDStream)
model.predictOnValues(testDStream.map { lp =>
(lp.label, lp.features) } ).print()
StreamingKMeans in Spark 1.2
https://databricks.com/blog/2015/01/28/introducing-streaming-k-means-in-spark-1-2.html
23
Other library additions
Amazon Kinesis integration [ Spark 1.1]
More fault-tolerant Flume integration [Spark 1.1]
New Kafka API for more native integration [Spark 1.3]
24
System Infrastructure
Automated driver fault-tolerance [Spark 1.0]
Graceful shutdown [Spark 1.0]
Write Ahead Logs for zero data loss [Spark 1.2]
25
Contributors to Streaming
40
30
20
10
0
Spark 0.9
Spark 1.0
Spark 1.1
Spark 1.2
26
Contributors - Full Picture
120
Streaming
90
Core + Streaming
(w/o SQL, MLlib,)
60
30
0
Spark 0.9
Spark 1.0
Spark 1.1
Spark 1.2
All contributions
to core Spark
directly improve
Spark Streaming
27
Spark Packages
More contributions from the
community in spark-packages
Alternate Kafka receiver
Apache Camel receiver
Cassandra examples
http://spark-packages.org/
28
Who is using
Spark Streaming?
Spark Summit 2014 Survey
40% of Spark users were
using Spark Streaming in
production or prototyping
Production
9%
Prototyping
31%
Not using
21%
Another 39% were
evaluating it
Evaluating
39%
30
31
80+
known
deployments
32
Intel China builds big data solutions for large enterprises
Multiple streaming applications for dierent businesses
Real-time risk analysis for a top online payment company
Real-time deal and flow metric reporting for a top online shopping company
Complicated stream processing
SQL queries on streams
Join streams with large historical datasets
> 1TB/day passing through Spark Streaming
Kafka
RocketMQ
Spark
Streaming
YARN
HBase
One of the largest publishing and education company, wants
to accelerate their push into digital learning
Needed to combine student activities and domain events to
continuously update the learning model of each student
Earlier implementation in Storm, but now moved on to
Spark Streaming
Chose Spark Streaming, because Spark together combines
batch, streaming, machine learning, and graph processing
Kafka
Spark
Streaming
YARN
More information: http://dbricks.co/1BnFZZ8
Cassandra
Apache Blur
Leading advertising automation company with an exchange
platform for in-feed ads
Process clickstream data for optimizing real-time bidding for ads
Kinesis
RabbitMQ
Spark
Streaming
Mesos+Marathon
MySQL
Redis
SQS
http://techblog.netflix.com/2015/02/whats-trending-on-netflix.html
http://goo.gl/mJNf8X
Neuroscience @ Freeman Lab, Janelia Farm
Spark Streaming and MLlib to
analyze neural activities
Laser microscope scans Zebrafish
brain Spark Streaming 
interactive visualization 
laser ZAP to kill neurons!
http://www.jeremyfreeman.net/share/talks/spark-summit-2014/
Neuroscience @ Freeman Lab, Janelia Farm
Streaming machine learning
algorithms on time series data of
every neuron
2TB/hour and increasing with
brain size
80 HPC nodes
Why are they adopting Spark Streaming?
Easy, high-level API
Unified API across batch and streaming
Integration with Spark SQL and MLlib
Ease of operations
41
Whats coming next?
Beyond Spark 1.3
Libraries
Streaming machine learning algorithms
A/B testing
Online Latent Dirichlet Allocation (LDA)
More streaming linear algorithms
Streaming + SQL, Streaming + DataFrames
43
Beyond Spark 1.3
Operational Ease
Better flow control
Elastic scaling
Cross-version upgradability
Improved support for non-Hadoop environments
44
Beyond Spark 1.3
Performance
Higher throughput, especially of stateful operations
Lower latencies
Easy deployment of streaming apps in Databricks Cloud!
45
You can help!
Roadmaps are heavily driven by community feedback
We have listened to community demands over the last year
Write Ahead Logs for zero data loss
New Kafka integration for stronger semantics
Let us know what do you want to see in Spark Streaming
Spark user mailing list, tweet it to me @tathadas
46
Takeaways
Spark Streaming is scalable, fault-tolerant stream processing
system with high-level API and rich set of libraries
Over 80+ deployments in the industry
More libraries and operational ease in the roadmap
47
Backup slides
48
Typesafe survey of Spark users
2136 developers, data scientists,
and other tech professionals
http://java.dzone.com/articles/apache-spark-survey-typesafe-0
Typesafe survey of Spark users
65% of Spark users are interested
in Spark Streaming
Typesafe survey of Spark users
2/3 of Spark users want to process
event streams
More usecases
52
 Big data solution provider for enterprises
 Multiple applications for dierent businesses
- Monitoring +optimizing online services of Tier-1 bank
- Fraudulent transaction detection for Tier-2 bank
 Kafka  SS  Cassandra, MongoDB
 Built their own Stratio Streaming platform on
Spark Streaming, Kafka, Cassandra, MongoDB
 Provides data analytics solutions for Communication
Service Providers
- 4 of 5 top mobile ops, 3 of 4 top internet backbone providers
- Processes >50% of all US mobile traic
 Multiple applications for dierent businesses
- Real-time anomaly detection in cell tower traic
- Real-time call quality optimizations
 Kafka  SS
http://spark-summit.org/2014/talk/building-big-data-operational-intelligence-platform-with-apache-spark
 Runs claims processing applications for healthcare providers
 Predictive models can look
for claims that are likely to
be held up for approval
 Spark Streaming allows
model scoring in seconds
instead of hours
http://searchbusinessanalytics.techtarget.com/feature/Spark-Streaming-project-looks-to-shed-new-light-on-medical-claims