Dataflow
Overview
Dataflow runs pipelines written with the open-source Apache Beam API. The same pipelines can also be executed on Flink or Spark.
Jobs are built from steps such as read, filter, group, and transform
Job steps are executed in parallel
Pipelines are written in Java or Python using the Beam SDK
The same code is used for streaming and batch. For streaming, we bound the data with windows based on
time or number of records
Terms – source, sink, runner (executes the pipeline), transform (each step in the pipeline). Transforms are
applied to PCollections
A pipeline is a directed graph of steps
Sources/sinks can be a filesystem, GCS, BigQuery, or Pub/Sub
The runner can be a local laptop (DirectRunner) or Dataflow (in the cloud)
Output data can be written sharded or unsharded. To write a single unsharded file, use the without-sharding option.
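For example, a minimal Java sketch using TextIO (the bucket path is illustrative and output is an assumed PCollection<String>):
  output.apply(TextIO.write().to("gs://my-bucket/results/output")
                     .withoutSharding());   // one output file instead of output-00000-of-NNNNN shards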
Pipelines
Pipeline.create – create the pipeline
Pipeline.apply – set up the individual steps (transforms)
Pipeline.run – the pipeline is started here
Inputs and outputs are PCollections. A PCollection is not held in memory and can be unbounded.
Give each transform a name
Read from source, write to sink
Pipeline in Java:
ParDo – indicates the step is applied to elements in parallel
Run using the Java classpath directly or via “mvn compile”
“mvn compile” with the runner set to DataflowRunner will run the job on Dataflow
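A minimal sketch of such a Java pipeline (class name, filter term, and GCS paths are illustrative):
  import org.apache.beam.sdk.Pipeline;
  import org.apache.beam.sdk.io.TextIO;
  import org.apache.beam.sdk.options.PipelineOptions;
  import org.apache.beam.sdk.options.PipelineOptionsFactory;
  import org.apache.beam.sdk.transforms.DoFn;
  import org.apache.beam.sdk.transforms.ParDo;

  public class GrepPipeline {
    public static void main(String[] args) {
      PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
      Pipeline p = Pipeline.create(options);
      p.apply("ReadLines", TextIO.read().from("gs://my-bucket/input/*.txt"))      // source
       .apply("FilterLines", ParDo.of(new DoFn<String, String>() {                // parallel per-element step
          @ProcessElement
          public void processElement(ProcessContext c) {
            if (c.element().contains("import")) { c.output(c.element()); }        // keep matching lines only
          }
        }))
       .apply("WriteOutput", TextIO.write().to("gs://my-bucket/output/result"));  // sink
      p.run();                                                                     // pipeline starts here
    }
  }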
Pipeline in Python:
The | operator means apply
Run locally with “python <program>”, or pass --runner=DataflowRunner to run on Dataflow
Pipelines are localized to a region
Shutdown options – cancel, drain. Drain is graceful: it finishes processing buffered data before stopping.
Side input
Can be static, like a constant
Can also be a list or map. If the side input is a PCollection, we first convert it to a list or map view and pass
that view as the side input.
Call ParDo.withSideInputs with the map or list view
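A rough Java sketch, assuming rates is a PCollection<KV<String, Double>> and items is a PCollection<String> (imports omitted):
  PCollectionView<Map<String, Double>> ratesView =
      rates.apply(View.<String, Double>asMap());                 // convert the PCollection into a map view

  PCollection<String> priced = items.apply(
      ParDo.of(new DoFn<String, String>() {
        @ProcessElement
        public void processElement(ProcessContext c) {
          Map<String, Double> rateMap = c.sideInput(ratesView);  // read the side input inside the DoFn
          Double rate = rateMap.getOrDefault(c.element(), 1.0);
          c.output(c.element() + "," + rate);
        }
      }).withSideInputs(ratesView));                             // pass the view as a side input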
Mapreduce in Dataflow
Map – operates in parallel, reduce – aggregates based on key
ParDo acts on one item at a time, similar to the Map operation in MapReduce, and should not keep
state/history. Useful for filtering and mapping.
In Python, 1:1 transforms use Map and non-1:1 transforms use FlatMap. In Java, both are done with ParDo.
Examples of Map – filtering, converting types, extracting parts of the input, calculating a value from several inputs
Example of FlatMap – yield the line only for lines that contain the searchTerm (0 or 1 outputs per input), as sketched below
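A rough Java equivalent using MapElements and FlatMapElements (lines and searchTerm are assumed to exist; imports omitted):
  PCollection<Integer> lengths = lines.apply(
      MapElements.into(TypeDescriptors.integers())
                 .via((String line) -> line.length()));              // 1:1, like Map

  PCollection<String> matches = lines.apply(
      FlatMapElements.into(TypeDescriptors.strings())
                     .via((String line) -> line.contains(searchTerm)
                          ? Collections.singletonList(line)          // emit the matching line
                          : Collections.<String>emptyList()));       // or emit nothing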
groupBy:
GroupBy does the aggregation, using Combine or GroupByKey. Combine is faster than GroupByKey because it
can pre-aggregate partial results across multiple workers. Use GroupByKey for custom operations.
In Java, GroupByKey returns an Iterable of values per key.
Combine examples – sum, average. GroupBy example – group zip codes by state, where state is the
grouping key. Combine-per-key example – total sales by person (see the sketch below).
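A rough sketch, assuming salesByPerson is a PCollection<KV<String, Double>> of (person, amount) pairs and zipsByState is a PCollection<KV<String, String>> of (state, zip code) pairs:
  PCollection<KV<String, Double>> totalSalesPerPerson =
      salesByPerson.apply(Sum.doublesPerKey());                  // Combine: partial sums run on each worker

  PCollection<KV<String, Iterable<String>>> zipsGrouped =
      zipsByState.apply(GroupByKey.<String, String>create());    // GroupByKey: an Iterable of values per key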
Streaming
Challenges:
Size
Scalability
Fault tolerance
Programming model
Unbounded data
Windowing
Use a windowing approach to process streaming data in Dataflow
For streaming data, Pub/Sub attaches a timestamp when each message is inserted into Pub/Sub
For batch data, we can assign a timestamp when the data is read so that the Dataflow pipeline can be kept
similar between streaming and batch
In code, we set the streaming option to true, as sketched below.
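A minimal sketch of enabling streaming mode on the pipeline options in Beam Java:
  StreamingOptions options =
      PipelineOptionsFactory.fromArgs(args).as(StreamingOptions.class);
  options.setStreaming(true);                  // run the pipeline as a streaming job
  Pipeline p = Pipeline.create(options);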
Window types – fixed, sliding, session, global. A session window, for example, is keyed to a particular user or
session ID, and its length is dynamic.
Sliding window parameters – window duration and sliding period, e.g. a 2-minute window computed
every 30 seconds
Out-of-order messages from Pub/Sub – handled by Dataflow when it computes windowed aggregates (event-time windows and watermarks)
Duplicates from Pub/Sub – handled using the Pub/Sub message ID. If the sender itself sends duplicates,
Pub/Sub won't be aware of them. In that case the sender can add its own ID attribute, and Dataflow can use
that attribute instead of the Pub/Sub ID to remove duplicates (see the sketch below).
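A rough sketch combining these points (the topic name and ID attribute are illustrative):
  PCollection<Double> speeds = p
      .apply(PubsubIO.readStrings()
                     .fromTopic("projects/my-project/topics/traffic")
                     .withIdAttribute("messageId"))                        // de-duplicate on a sender-supplied ID
      .apply(MapElements.into(TypeDescriptors.doubles())
                        .via((String s) -> Double.parseDouble(s)))
      .apply(Window.<Double>into(SlidingWindows.of(Duration.standardMinutes(2))
                                               .every(Duration.standardSeconds(30)))); // 2-min window every 30 s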
Watermark, triggers and accumulation:
Window – an event-time window (by default based on when the event arrived in Pub/Sub)
Watermark – Dataflow's estimate of how far processing time lags behind event time. The watermark is
dynamically calculated and decides when to close a window. By default the watermark is based on the
message's arrival time in Pub/Sub; we can change this with a timestamp attribute set when publishing the
message to Pub/Sub.
Triggers – by default the aggregate is calculated at the watermark. There are options to re-calculate the
aggregate for each late arrival or to drop late data, and we can control when to trigger relative to the watermark.
The triggering API also helps provide early (speculative) results.
Default – all late-arriving data is discarded, since the default allowed lateness is 0.
Types of triggers:
Triggers handle early results and late-arriving data
Time-based triggers
Data-driven triggers – based on the number of elements
Composite – a combination of time-based and data-driven triggers
(eg)
PCollection<Double> avgSpeed = currentConditions
    .apply("TimeWindow", Window.into(SlidingWindows
        .of(Duration.standardMinutes(5))
        .every(Duration.standardSeconds(60))));
▪ In the above example, the window is a time-based sliding window triggered at the watermark. Since
lateness is not specified, the allowed lateness defaults to 0, which means late data is ignored.
PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(FixedWindows.of(Minutes(2)))
        .triggering(AfterWatermark()
            .withEarlyFirings(AtPeriod(Minutes(1)))
            .withLateFirings(AtCount(1)))
        .withAllowedLateness(Minutes(30)));
▪ Fire early results every 1 minute before the watermark, fire at the watermark, and fire again after each
batch of N (here N=1) late elements, accepting late data up to a maximum of 30 minutes after the watermark.
PCollection<KV<String, Integer>> scores = input
    .apply(Window.into(Sessions.withGapDuration(Minutes(2)))
        .triggering(AfterWatermark()
            .withEarlyFirings(AtPeriod(Minutes(1)))))
    .apply(Sum.integersPerKey());
▪ Session window example, useful for irregularly distributed data. The gap duration specifies that events
separated by less than 2 minutes of idle time are grouped into the same window.
Processing scenarios, with and without early firings:
Classic batch(no windows)
Batch with fixed windows
Triggering at watermark
Complicated watermark and triggering
Session windows
Accumulation mode:
A trigger set to .accumulatingFiredPanes always outputs all data in a given window, including any
elements previously triggered. A trigger set to .discardingFiredPanes outputs only the incremental changes
since the last time the trigger fired.
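The trigger snippets above use course-style shorthand; a rough sketch of the same idea with the actual Beam Java API (input assumed to be a PCollection<KV<String, Integer>>):
  input.apply(Window.<KV<String, Integer>>into(FixedWindows.of(Duration.standardMinutes(2)))
      .triggering(AfterWatermark.pastEndOfWindow()
          .withEarlyFirings(AfterProcessingTime.pastFirstElementInPane()
              .plusDelayOf(Duration.standardMinutes(1)))              // early firings every minute
          .withLateFirings(AfterPane.elementCountAtLeast(1)))         // fire again per late element
      .withAllowedLateness(Duration.standardMinutes(30))
      .accumulatingFiredPanes())                                      // or .discardingFiredPanes()
   .apply(Sum.integersPerKey());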
IAM
Access is only at the project level, with no further subdivision. Roles: Dataflow Admin, Developer, Viewer,
Worker. Worker is specific to service accounts. Admin additionally has access to the staging storage bucket.
Choosing between dataproc and dataflow
Dataflow vs Dataproc – existing Hadoop/Spark workloads -> Dataproc; streaming -> Dataflow; fully
serverless -> Dataflow