
Big Data Analytics

MCA 5026
Introduction
Introduction to SPARK
Hadoop Ecosystem
Apache Spark™
• is a unified analytics engine for large-scale data processing.
• Is an open-source, distributed processing system used for big data workloads.
• It utilizes in-memory caching and optimized query execution for fast queries
against data of any size.
• Spark is a fast and general engine for large-scale data processing.
• It is faster than previous approaches to working with Big Data, such as classical MapReduce.
• Faster because Spark runs in memory (RAM), which makes processing much faster than on disk drives.
• General means that it can be used for many different things: running distributed SQL, creating data pipelines, ingesting data into a database, running machine learning algorithms, working with graphs or data streams, and much more.
Why Spark when Hadoop is already there?
• Hadoop is based on batch processing
• where processing happens on blocks of data that have already been stored over a period of time.
• Spark
• can process data in real time
• is up to 100 times faster than Hadoop MapReduce in batch processing of large data sets.
• Spark's RDDs
• function as a working set for distributed programs, offering a restricted form of distributed shared memory
• facilitate the implementation of both iterative algorithms and interactive/exploratory data analysis
• The latency of such applications may be reduced by several orders of magnitude compared to Apache
Hadoop MapReduce 
• Examples of Real time analytics
• Fraud detection
• Checking medical status of patients
• National Security
MapReduce vs. Spark
Apache Spark™ features
• Speed
• Run workloads 100x faster.
• achieves high performance for both batch and streaming data, using a state-
of-the-art DAG scheduler, a query optimizer, and a physical execution engine.
• Spark contains Resilient Distributed Dataset (RDD)
•  which saves time in reading and writing operations, allowing it to run almost ten to one
hundred times faster than Hadoop.
• Ease of Use
• Write applications quickly in Java, Scala, Python, R, and SQL.
• Spark offers over 80 high-level operators that make it easy to build parallel
apps.
• Use it interactively from the Scala, Python, R, and SQL shells.
Apache Spark™ features
• In-memory computing 
• stores the data in the RAM of servers which allows quick access and in turn
accelerates the speed of analytics.
• Real-time processing 
• is able to process real-time streaming data.
• Unlike MapReduce which processes only stored data, Spark is able to process
real-time data and is, therefore, able to produce instant outcomes.
• Better analytics 
• Apache Spark consists of a rich set of SQL queries, machine learning
algorithms, complex analytics, etc. With all these functionalities, analytics can
be performed in a better fashion with the help of Spark.
Apache Spark™
• Generality
• Combine SQL, streaming, and complex analytics.
• Powers a stack of libraries including SQL and DataFrames, MLlib for machine
learning, GraphX, and Spark Streaming.
• combine these libraries seamlessly in the same application.
• Runs Everywhere
• Runs on Hadoop, Apache Mesos, Kubernetes, standalone, or in the cloud.
• It can access diverse data sources.
• run Spark using its standalone cluster mode, on EC2, on Hadoop YARN,
• on Mesos, or on Kubernetes.
• Access data in HDFS, Alluxio, Apache Cassandra, Apache HBase, Apache Hive, and
hundreds of other data sources.
Spark Components
Spark Components
• Apache Spark Core 
• is the underlying general execution engine for the Spark platform that all other functionality is built
upon.
• It provides in-memory computing and the ability to reference datasets in external storage systems.
• Spark SQL 
• is Apache Spark’s module for working with structured data.
• provides Spark with more information about the structure of both the data and the computation
being performed.
• Spark Streaming 
• This component allows Spark to process real-time streaming data.
• Data can be ingested from many sources like Kafka, Flume, and HDFS (Hadoop Distributed File
System).
• Then the data can be processed using complex algorithms and pushed out to file systems, databases,
and live dashboards.
Spark Components
• MLlib (Machine Learning Library) 
• Apache Spark is equipped with a rich library known as MLlib.
• contains a wide array of machine learning algorithms- classification,
regression, clustering, and collaborative filtering.
• includes other tools for constructing, evaluating, and tuning ML Pipelines.
• All these functionalities help Spark scale out across a cluster.
• GraphX 
• a library to manipulate graph databases and perform computations
• unifies ETL (Extract, Transform, and Load) process, exploratory analysis, and
iterative graph computation within a single system.
Main concepts of Spark
• Spark Shell: 
• Spark’s shell provides a simple way to learn the API, as well as a powerful tool to analyze data interactively.
• Spark Session: 
• In earlier versions of Spark, the entry point depended on the workload:
• SparkContext for core RDD operations
• StreamingContext for streaming
• SQLContext for SQL
• HiveContext for Hive
• Now a single SparkSession combines all of these (a minimal sketch follows this slide).
• Data Sources:
• The API provides a pluggable mechanism for accessing structured data through Spark SQL.
• API is used to read and store structured and semi-structured data into Spark SQL. 
• can be more than just simple pipes that convert data and pull it into Spark.
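• A minimal sketch of creating a SparkSession in a standalone application (the application name and master URL below are illustrative); the older SparkContext is still reachable through it:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
  .appName("IntroToSpark")   // illustrative application name
  .master("local[*]")        // illustrative master; use your cluster URL instead
  .getOrCreate()
val sc = spark.sparkContext  // older entry point, still available when needed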
Main Concepts of Spark
• RDD:
• Resilient Distributed Dataset (RDD)
• is a fundamental data structure of Spark.
• an immutable distributed collection of objects.
• Each dataset in RDD is divided into logical partitions, which may be computed on different nodes of the
cluster.
• can contain any type of Python, Java, or Scala objects, including user-defined classes.
• Dataset: 
• is a distributed collection of data.
• can be constructed from JVM objects and then manipulated using functional transformations (map,
flatMap, filter, etc.).
• API is available in Scala and Java.
• DataFrames: 
• is a Dataset organized into named columns.
• It is conceptually equivalent to a table in a relational database or a data frame in R/Python, but with
richer optimizations.
• can be constructed from a wide array of sources such as: structured data files, tables in Hive,
external databases or existing RDDs.
Resilient Distributed Datasets
• is a fundamental data structure of Spark.
• It is an immutable distributed collection of objects.
• Each dataset in RDD is divided into logical partitions, which may be computed on
different nodes of the cluster.
• can contain any type of Python, Java, or Scala objects, including user-defined classes.
• is a fault-tolerant collection of elements that can be operated on in parallel.
• Two ways to create RDDs
• parallelizing an existing collection in your driver program
• referencing a dataset in an external storage system, such as a shared file system, HDFS, HBase,
or any data source offering a Hadoop Input Format.
• Spark makes use of RDDs to achieve faster and more efficient MapReduce-style operations.
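• A minimal sketch of the two creation paths above, assuming sc is the shell's SparkContext (the file path is illustrative):
val parallelized = sc.parallelize(Seq(1, 2, 3, 4, 5))        // from an existing driver-side collection
val fromStorage = sc.textFile("hdfs:///path/to/data.txt")    // from external storage (illustrative path)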
Spark Cluster Mode
• Spark applications run as independent sets of processes on a cluster, coordinated by the SparkContext object in the main program
• called the driver program
• To run on a cluster, the SparkContext can
connect to several types of cluster managers
• either Spark’s own standalone cluster manager,
Mesos or YARN
• which allocate resources across applications
• Once connected, Spark acquires executors on
nodes in the cluster
• Executors are processes that run computations
and store data for the application.
• Next, it sends the application code to the executors.
• Finally, SparkContext sends tasks to the
executors to run.
Spark Cluster Mode
• Each application gets its own executor processes, which stay up for the duration of the whole application and run tasks in multiple threads.
• This has the benefit of isolating applications from each
other, on both the
• scheduling side (each driver schedules its own tasks)
• executor side (tasks from different applications run in
different JVMs).
• However, it also means that data cannot be shared
across different Spark applications (instances of
SparkContext) without writing it to an external storage
system.
• The driver program must listen for and accept incoming
connections from its executors throughout its lifetime
(e.g., see spark.driver.port in the network config section).
• As such, the driver program must be network
addressable from the worker nodes.
• Because the driver schedules tasks on the cluster, it
should be run close to the worker nodes, preferably on
the same local area network.
• If you’d like to send requests to the cluster remotely, it’s
better to open an RPC to the driver and have it submit
operations from nearby than to run a driver far away
from the worker nodes.
Data Sharing – MapReduce vs. RDD
• MapReduce
• is widely used for processing and generating large datasets with a parallel, distributed algorithm on a cluster.
• allows users to write parallel computations, using a set of high-level operators, without having to worry about work distribution and fault tolerance.
• To reuse data between computations, it must be written to an external stable storage system.
• Data sharing is slow due to replication, serialization, and disk IO.
• Most Hadoop applications spend more than 90% of their time doing HDFS read-write operations.
• Resilient Distributed Datasets (RDD)
• support in-memory processing computation.
• store the state of memory as an object across the jobs, and the object is sharable between those jobs.
• Data sharing in memory is 10 to 100 times faster than network and disk.
Iterative Operations on MapReduce
Iterative Operations on Spark RDD
Interactive Operations on MapReduce vs. Spark RDD
Spark Shell Commands
a) Read a file from the local filesystem and create an RDD.
• scala> val data = sc.textFile("data.txt")
b) Create an RDD through a parallelized collection
• scala> val no = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
• scala> val noData = sc.parallelize(no)
c) From existing RDDs
• scala> val newRDD = noData.map(data => data * 2)
d) Number of items in the RDD
• scala> data.count()
e) Filter operation
• Filter the RDD and create a new RDD of items which contain the word "DataFlair".
• To filter, we call the filter transformation, which returns a new RDD with the subset of items.
• scala> val DFData = data.filter(line => line.contains("DataFlair"))
Spark Shell Commands
• Transformation and Action together
• For complex requirements, we can chain multiple operations together, such as a filter transformation and a count action:
• scala> data.filter(line => line.contains("DataFlair")).count()
• Read the first item from the RDD
• To read the first item from the file, you can use the following command:
• scala> data.first()
• Read the first 5 items from the RDD
• To read the first 5 items from the file, you can use the following command:
• scala> data.take(5)
• RDD Partitions
• An RDD is made up of multiple partitions; to count the number of partitions:
• scala> data.partitions.length
Spark Shell Commands
• Cache the file
• Caching is an optimization technique.
• Once we cache the RDD in memory, all future computation will work on the in-memory data, which saves disk seeks and improves performance.
• scala> data.cache()
• The RDD will be cached once we run an Action, which actually needs to read the data from disk.
• scala> data.count()
• scala> data.collect()
• Read data from an HDFS file
• To read data from an HDFS file, we can specify the complete HDFS URL, like hdfs://IP:PORT/PATH
• scala> var hFile = sc.textFile("hdfs://localhost:9000/inp")
Apache Spark Core Programming
• Open Spark Shell
• $ spark-shell
• Create Simple RDD
scala> val inputfile = sc.textFile("input.txt")
• The Spark RDD API introduces a few Transformations and a few Actions to manipulate RDDs.
• Execute Word Count Transformation
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
• flat map for splitting each line into words
• read each word as a key with a value ‘1’ 
• reduce those keys by adding values of similar keys
• Applying the action
scala> counts.saveAsTextFile("output")
Apache Spark Core Programming
• Caching the transformation
counts.cache()
• can mark an RDD to be persisted using the persist() or cache() methods on it.
• to store the intermediate transformations in memory.
• Checking the Output
[hadoop@localhost ~]$ cd output/
[hadoop@localhost output]$ ls -1
part-00000
part-00001
_SUCCESS
cat part-00000
• Unpersist the storage
scala> counts.unpersist()
Apache Spark Core Programming
• RDD transformations
• returns pointer to new RDD and allows users to create dependencies
between RDDs.
• Each RDD in dependency chain (String of Dependencies) has a function for
calculating its data and has a pointer (dependency) to its parent RDD.
• Spark is lazy, so nothing will be executed unless you call an action that triggers job creation and execution.
• RDD transformation is not a set of data but is a step in a program telling Spark
how to get data and what to do with it.
RDD Transformations
• map(func)
• Returns a new distributed dataset, formed by passing each element of the source through a function func.
• filter(func)
• Returns a new dataset formed by selecting those elements of the source on which func returns true.
• flatMap(func)
• Similar to map, but each input item can be mapped to 0 or more output items
• so func should return a Seq rather than a single item
• mapPartitions(func)
• Similar to map, but runs separately on each partition (block) of the RDD
• so func must be of type Iterator<T> ⇒ Iterator<U> when running on an RDD of type T.
• mapPartitionsWithIndex(func)
• Similar to mapPartitions, but also provides func with an integer value representing the index of the partition,
• so func must be of type (Int, Iterator<T>) ⇒ Iterator<U> when running on an RDD of type T.
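• A short sketch of the element-wise transformations above, assuming sc is an existing SparkContext (the sample values are illustrative):
val nums = sc.parallelize(Seq(1, 2, 3, 4, 5))
val doubled = nums.map(_ * 2)                                               // map: one output element per input element
val evens = nums.filter(_ % 2 == 0)                                         // filter: keep elements where the predicate is true
val words = sc.parallelize(Seq("hello world", "hi")).flatMap(_.split(" "))  // flatMap: zero or more outputs per input
val partitionSums = nums.mapPartitions(iter => Iterator(iter.sum))          // mapPartitions: one pass per partition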
RDD Transformations
• sample(withReplacement, fraction, seed)
• Sample a fraction of the data, with or without replacement, using a given random number
generator seed.
• union(otherDataset)
• Returns a new dataset that contains the union of the elements in the source dataset and the
argument.
• intersection(otherDataset)
• Returns a new RDD that contains the intersection of elements in the source dataset and the
argument.
• distinct([numTasks])
• Returns a new dataset that contains the distinct elements of the source dataset.
• groupByKey([numTasks])
• When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
RDD Transformations
• reduceByKey(func, [numTasks])
• When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated
using the given reduce function func, which must be of type (V, V) ⇒ V.
• Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
• aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])
• When called on a dataset of (K, V) pairs, returns a dataset of (K, U) pairs where the values for each key are aggregated
using the given combine functions and a neutral "zero" value.
• Allows an aggregated value type that is different from the input value type, while avoiding unnecessary allocations.
• Like in groupByKey, the number of reduce tasks is configurable through an optional second argument.
• sortByKey([ascending], [numTasks])
• When called on a dataset of (K, V) pairs where K implements Ordered, returns a dataset of (K, V) pairs sorted by keys in
ascending or descending order, as specified in the Boolean ascending argument.
• join(otherDataset, [numTasks])
• When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each
key. Outer joins are supported through leftOuterJoin, rightOuterJoin, and fullOuterJoin.
• cogroup(otherDataset, [numTasks])
• When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (Iterable<V>, Iterable<W>)) tuples. This operation is also called groupWith.
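• A short sketch of the key-value transformations above (the pair data is illustrative):
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
val summed = pairs.reduceByKey(_ + _)                        // ("a", 3), ("b", 1)
val grouped = pairs.groupByKey()                             // ("a", Iterable(1, 2)), ("b", Iterable(1))
val other = sc.parallelize(Seq(("a", "x"), ("b", "y")))
val joined = pairs.join(other)                               // ("a", (1, "x")), ("a", (2, "x")), ("b", (1, "y"))
val sorted = summed.sortByKey()                              // keys in ascending order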
RDD Transformations
• cartesian(otherDataset)
• When called on datasets of types T and U, returns a dataset of (T, U) pairs (all pairs of elements).
• pipe(command, [envVars])
• Pipe each partition of the RDD through a shell command, e.g. a Perl or bash script. RDD elements are written to the
process's stdin and lines output to its stdout are returned as an RDD of strings.
• coalesce(numPartitions)
• Decrease the number of partitions in the RDD to numPartitions.
• Useful for running operations more efficiently after filtering down a large dataset.
• repartition(numPartitions)
• Reshuffle the data in the RDD randomly to create either more or fewer partitions and balance it across them.
• This always shuffles all data over the network.
• repartitionAndSortWithinPartitions(partitioner)
• Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.
• This is more efficient than calling repartition and then sorting within each partition because it can push the sorting
down into the shuffle machinery.
RDD Actions
• reduce(func)
• Aggregate the elements of the dataset using a function func (which takes two arguments and
returns one). The function should be commutative and associative so that it can be
computed correctly in parallel.
• collect()
• Returns all the elements of the dataset as an array at the driver program. This is usually
useful after a filter or other operation that returns a sufficiently small subset of the data.
• count()
• Returns the number of elements in the dataset.
• first()
• Returns the first element of the dataset (similar to take (1)).
• take(n)
• Returns an array with the first n elements of the dataset.
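• A short sketch of these actions on a small illustrative RDD:
val nums = sc.parallelize(Seq(5, 3, 1, 4, 2))
nums.reduce(_ + _)   // 15
nums.collect()       // Array(5, 3, 1, 4, 2), returned to the driver
nums.count()         // 5
nums.first()         // 5
nums.take(3)         // Array(5, 3, 1)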
RDD Actions
• takeSample (withReplacement,num, [seed])
• Returns an array with a random sample of num elements of the dataset, with or without
replacement, optionally pre-specifying a random number generator seed.
• takeOrdered(n, [ordering])
• Returns the first n elements of the RDD using either their natural order or a custom
comparator.
• saveAsTextFile(path)
• Writes the elements of the dataset as a text file (or set of text files) in a given directory in
the local filesystem, HDFS or any other Hadoop-supported file system.
• Spark calls toString on each element to convert it to a line of text in the file.
• saveAsSequenceFile(path) (Java and Scala)
• Writes the elements of the dataset as a Hadoop SequenceFile in a given path in the local
filesystem, HDFS or any other Hadoop-supported file system.
RDD Actions
• saveAsObjectFile(path) (Java and Scala)
• Writes the elements of the dataset in a simple format using Java serialization, which can then be loaded using SparkContext.objectFile().
• countByKey()
• Only available on RDDs of type (K, V). Returns a hashmap of (K, Int) pairs with
the count of each key.
• foreach(func)
• Runs a function func on each element of the dataset.
• This is usually, done for side effects such as updating an Accumulator or
interacting with external storage systems.
• Note − modifying variables other than Accumulators outside of the foreach()
may result in undefined behavior.
Spark Submit Syntax
spark-submit [options] <app jar | python file> [app arguments]
• --master spark://host:port, mesos://host:port, yarn, or local.
• --deploy-mode Whether to launch the driver program locally ("client") or on one of the worker
machines inside the cluster ("cluster") (Default: client).
• --class Your application's main class (for Java / Scala apps).
• --name A name for your application.
• --jars Comma-separated list of local jars to include on the driver and executor classpaths.
• --packages Comma-separated list of maven coordinates of jars to include on the driver and
executor classpaths.
• --repositories Comma-separated list of additional remote repositories to search for the maven
coordinates given with --packages.
• --py-files Comma-separated list of .zip, .egg, or .py files to place on the PYTHONPATH for Python apps.
• --files Comma-separated list of files to be placed in the working directory of each executor.
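• An illustrative submission using the options above (the jar name, main class, and application arguments are hypothetical):
spark-submit \
  --class com.example.WordCount \
  --master yarn \
  --deploy-mode cluster \
  --name wordcount \
  wordcount.jar hdfs:///input hdfs:///output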
Spark Shared Variables
• Broadcast variables
• used to efficiently distribute large values.
• Accumulators
• used to aggregate information (such as counters or sums) across tasks.
Spark Shared Variables
• Broadcast variables
• allow the programmer to keep a read-only variable cached on each machine rather than shipping a copy of it with tasks.
• They can be used, for example, to give every node a copy of a large input dataset in an efficient manner.
• Spark also attempts to distribute broadcast variables using efficient broadcast algorithms to reduce communication cost.
• Spark actions are executed through a set of stages, separated by distributed “shuffle” operations.
• Spark automatically broadcasts the common data needed by tasks within each stage.
• The data broadcasted this way is cached in serialized form and is deserialized before running each task.
• This means that explicitly creating broadcast variables, is only useful when tasks across multiple stages need
the same data or when caching the data in deserialized form is important.
• Broadcast variables are created by calling SparkContext.broadcast(v).
• The broadcast variable is a wrapper around v, and its value can be accessed by calling the value method.
• scala> val broadcastVar = sc.broadcast(Array(1, 2, 3))
• Output − broadcastVar: org.apache.spark.broadcast.Broadcast[Array[Int]] = Broadcast(0)
• After the broadcast variable is created, it should be used instead of the value v in any functions run on the
cluster, so that v is not shipped to the nodes more than once.
• In addition, the object v should not be modified after its broadcast, in order to ensure that all nodes get the
same value of the broadcast variable.
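• A short sketch of using a broadcast value inside a task (the lookup map is illustrative):
val lookup = sc.broadcast(Map(1 -> "one", 2 -> "two"))
val named = sc.parallelize(Seq(1, 2, 1)).map(id => lookup.value.getOrElse(id, "unknown"))
named.collect()   // Array("one", "two", "one")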
Spark Shared Variables
• Accumulators
• are variables that are only “added” to through an associative operation and can therefore be efficiently supported in parallel.
• Can be used to implement counters (as in MapReduce) or sums.
• Spark natively supports accumulators of numeric types, and programmers can add support for new types.
• If accumulators are created with a name, they will be displayed in Spark’s UI.
• An accumulator is created from an initial value v by calling SparkContext.accumulator(v).
• Tasks running on the cluster can then add to it using the add method or the += operator (in Scala and Python).
However, they cannot read its value.
• Only the driver program can read the accumulator’s value, using its value method.
• The code given below shows an accumulator being used to add up the elements of an array −
• scala> val accum = sc.accumulator(0)
• scala> sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)
• If you want to see the output of above code then use the following command −
• scala> accum.value
• Output
• res2: Int = 10
Spark numeric RDD Operations
• Spark allows different operations on numeric data, using one of the
predefined API methods.
• Numeric operations are implemented with a streaming algorithm that
allows building the model, one element at a time.
• These operations are computed and returned as a StatCounter object by calling the stats() method.
• numeric methods available in StatCounter:
Spark numeric RDD Operations
• count()
• Number of elements in the RDD.
• mean()
• Average of the elements in the RDD.
• sum()
• Total value of the elements in the RDD.
• max()
• Maximum value among all elements in the RDD.
• min()
• Minimum value among all elements in the RDD.
• variance()
• Variance of the elements.
• stdev()
• Standard deviation.
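• A short sketch on an illustrative numeric RDD; stats() returns one StatCounter holding all of these values:
val nums = sc.parallelize(Seq(1.0, 2.0, 3.0, 4.0))
val stats = nums.stats()   // org.apache.spark.util.StatCounter
stats.mean                 // 2.5
stats.stdev                // standard deviation of the elements
nums.sum()                 // 10.0
nums.max()                 // 4.0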
Spark SQL Architecture
Spark SQL
• Apache Spark SQL
• is a Spark module to work with structured data using DataFrame and DataSet
abstractions in Python, Java, and Scala.
• Abstractions are the distributed collection of data organized into named
columns.
• can query data, both from inside a Spark program and from external tools
that connect through standard database connectors (JDBC/ODBC) to Spark
SQL.
• The data can be read and written in a variety of structured formats. For
example, JSON, Hive Tables, and Parquet.
Spark SQL DataFrames
• Limitations of RDDs
• When working with structured data, there is no built-in optimization engine.
• The developer has to optimize each RDD by hand on the basis of its attributes.
• There is no provision for handling structured data.
• The DataFrame overcomes these limitations of RDDs
• It is a distributed collection of data organized into named columns.
• Conceptually, it is equivalent to a table in a relational database or a data frame in R/Python
• can create DataFrame using:
• Structured data files
• Tables in Hive
• External databases
• Using existing RDD
Spark SQL DataFrames
• Ability to process data ranging in size from kilobytes to petabytes, on anything from a single-node cluster to a large cluster.
• Supports different data formats (Avro, CSV, Elasticsearch, and Cassandra) and storage systems (HDFS, Hive tables, MySQL, etc.).
• State-of-the-art optimization and code generation through the Spark SQL Catalyst optimizer (a tree transformation framework).
• Can be easily integrated with all Big Data tools and frameworks via
Spark-Core.
• Provides API for Python, Java, Scala, and R Programming
DataFrame Operations
• SQLContext is a class and is used for initializing the functionalities of Spark SQL.
SparkContext class object (sc) is required for initializing SQLContext class object.
• $ spark-shell
• scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc)
• scala> val dfs = sqlContext.read.json("employee.json")
• dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
• scala> dfs.show()
• scala> dfs.printSchema()
• |-- age: string (nullable = true)
• |-- id: string (nullable = true)
• |-- name: string (nullable = true)
• scala> dfs.select("name").show()
• scala> dfs.filter(dfs("age") > 23).show()
• scala> dfs.groupBy("age").count().show()
Spark SQL Dataset
• is an interface added in Spark 1.6.
• is a distributed collection of data.
• An encoder performs the conversion between JVM objects and the tabular representation.
• A Dataset can be constructed from JVM objects and then manipulated using functional transformations (map, filter, etc.).
• The Dataset API is available in Scala and Java; it is not supported in Python.
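• A minimal sketch of creating and using a Dataset in the Scala shell (the Person case class and its values are illustrative):
case class Person(name: String, age: Long)
import spark.implicits._
val ds = Seq(Person("Andy", 32), Person("Ben", 25)).toDS()
ds.filter(_.age > 30).map(_.name).show()   // typed transformations on the Dataset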
Spark Catalyst Optimizer
• The optimizer used by Spark SQL
• It optimizes all the queries written in Spark SQL and DataFrame DSL.
• The optimizer helps us run queries much faster than their RDD counterparts. This increases the performance of the system.
• Spark Catalyst is a library built as a rule-based system.
• Each rule focuses on a specific optimization.
• For example, ConstantFolding focuses on eliminating constant expressions from the query.
Advantages of Spark SQL
• Integrated
• Apache Spark SQL mixes SQL queries with Spark programs.
• With the help of Spark SQL, we can query structured data as a distributed dataset (RDD).
• can run SQL queries alongside complex analytic algorithms using tight integration property of Spark SQL.
• Unified Data Access
• can load and query data from different sources.
• Schema-RDDs provide a single interface for productively working with structured data.
• High compatibility
• can run unmodified Hive queries on existing warehouses.
• Standard Connectivity
• can connect through JDBC or ODBC.
• It includes server mode with industry standard JDBC and ODBC connectivity.
• Scalability
• To support mid-query fault tolerance and large jobs, it takes advantage of RDD model.
• It uses the same engine for interactive and long queries.
• Performance Optimization
• The query optimization engine in Spark SQL converts each SQL query to a logical plan.
• It then converts the logical plan to many physical execution plans.
• Among all these plans, it selects the most optimal physical plan for execution.
• For batch processing of Hive tables
• We can make use of Spark SQL for fast batch processing of Hive tables.
Disadvantages of Spark SQL
• Unsupportive Union type
• cannot create or read a table containing union fields.
• No error for oversize of varchar type
• Even if the inserted value exceeds the size limit, no error will occur.
• The same data will truncate if read from Hive but not if read from Spark.
• SparkSQL will consider varchar as a string, meaning there is no size limit.
• No support for transactional table
• Hive transactions are not supported by Spark SQL.
• Unsupportive Char type
• The Char type (fixed-length strings) is not supported.
• As with the union type, we cannot read or create a table with such fields.
• No support for timestamps in Avro tables.
Spark MLlib
• is Apache Spark’s Machine Learning component.
• scalable machine learning library consisting of common learning algorithms and
utilities
• including classification, regression, clustering, collaborative filtering, dimensionality reduction
• as well as underlying optimization primitives
• Spark
• Is becoming the de-facto platform for building machine learning algorithms and applications.
• has the ability to scale computation massively, and that is exactly what you need for machine
learning algorithms.
• not all machine learning algorithms can be effectively parallelized.
• Each algorithm has its own challenges for parallelization, whether it is task parallelism or data
parallelism.
Spark ML supports
• Supervised Learning: 
• input variables (x) and an output variable (Y) and you use an algorithm to learn the
mapping function from the input to the output.
• Unsupervised Learning: 
• used to draw inferences from datasets consisting of input data without labeled
responses.
• Reinforcement Learning: 
• A computer program interacts with a dynamic environment in which it must perform
a certain goal (such as driving a vehicle or playing a game against an opponent).
• The program is provided feedback in terms of rewards and punishments as it
navigates its problem space. 
Machine Learning Tools
MLlib Overview:

• spark.mllib
• contains the original API, built on top of RDDs.
• spark.ml
• provides a higher-level API built on top of DataFrames for constructing ML pipelines.
• spark.ml is the primary Machine Learning API for Spark.
• Spark MLlib Tools
• ML Algorithms:
• include common learning algorithms such as classification, regression, clustering and collaborative filtering.
• Featurization:
• includes feature extraction, transformation, dimensionality reduction and selection.
• Pipelines:
• provide tools for constructing, evaluating and tuning ML Pipelines.
• Persistence:
• helps in saving and loading algorithms, models and Pipelines.
• Utilities:
• for linear algebra, statistics and data handling.
MLlib Algorithms
1. Basic Statistics
2. Regression
3. Classification
4. Recommendation System
5. Clustering
6. Dimensionality Reduction
7. Feature Extraction
8. Optimization
MLlib - Data Types
1. Local vector
• has integer-typed and 0-based indices and double-typed values
• stored on a single machine.
• Supports two types of local vectors:
• dense vector
• backed by a double array representing its entry values
• sparse vector
• backed by two parallel arrays: indices and values.
• For example
• a vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0] or in sparse format
as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
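• The same example vector in both representations, using the MLlib linalg API:
import org.apache.spark.mllib.linalg.Vectors
val dense = Vectors.dense(1.0, 0.0, 3.0)
val sparse = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))   // size, indices, values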
MLlib - Data Types
2. Labeled point
• is a local vector, either dense or sparse, associated with a label/response.
• are used in supervised learning algorithms.
• For binary classification, a label should be either 0 (negative) or 1 (positive).
• For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, ....
3. Sparse data
• supports reading training examples stored in LIBSVM format, which is the default
format used by LIBSVM and LIBLINEAR.
• It is a text format in which each line represents a labeled sparse feature vector using
the following format:
label index1:value1 index2:value2 ...
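• A short sketch of creating labeled points and loading LIBSVM data (the file path is illustrative):
import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.util.MLUtils
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))                        // positive example, dense features
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))     // negative example, sparse features
val examples = MLUtils.loadLibSVMFile(sc, "data/mllib/sample_libsvm_data.txt")   // RDD[LabeledPoint]; path is illustrative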
MLlib - Data Types
4. Local matrix
• has integer-typed row and column indices and double-typed values, stored on a single machine.
• supports dense matrices, whose entry values are stored in a single double array in column major.
5. Distributed matrix
• has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs
• Three types of distributed matrices have been implemented so far.
• RowMatrix
• is a row-oriented distributed matrix without meaningful row indices, e.g., a collection of feature vectors.
• It is backed by an RDD of its rows, where each row is a local vector.
• IndexedRowMatrix
• is similar to a RowMatrix but with row indices, which can be used for identifying rows and executing joins
• CoordinateMatrix
• is a distributed matrix stored in coordinate list (COO) format, backed by an RDD of its entries.
• Each entry is a tuple of (i: Long, j: Long, value: Double), where i is the row index, j is the column index, and value is the entry value.
• should be used only when both dimensions of the matrix are huge and the matrix is very sparse.
MLlib - Basic Statistics
• Summary statistics
• colStats() returns an instance of MultivariateStatisticalSummary, which
contains the column-wise max, min, mean, variance, and number of
nonzeros, as well as the total count.
• For example
• val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
• Correlations
• provides methods to calculate correlations between series.
• Depending on the type of input, two RDD[Double]s or an RDD[Vector], the
output will be a Double or the correlation Matrix respectively.
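• A short sketch of computing the correlation between two series (the values are illustrative):
import org.apache.spark.mllib.stat.Statistics
val seriesX = sc.parallelize(Array(1.0, 2.0, 3.0))
val seriesY = sc.parallelize(Array(11.0, 22.0, 33.0))
val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson")   // two RDD[Double]s give a single Double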
MLlib - Basic Statistics
• Stratified sampling
• sampleByKey and sampleByKeyExact can be performed on RDDs of key-value pairs.
• For stratified sampling, the keys can be thought of as a label and the value as a
specific attribute.
• For example the key can be man or woman, or document ids, and the respective values can
be the list of ages of the people in the population or the list of words in the documents.
• sampleByKey
• flips a coin to decide whether an observation will be sampled or not, therefore requires one pass over the data, and provides an expected sample size.
• sampleByKeyExact
• requires significantly more resources than the per-stratum simple random sampling used in sampleByKey, but will provide the exact sample size with 99.99% confidence.
MLlib - Basic Statistics
• Hypothesis testing
• is a powerful tool in statistics to determine whether a result is statistically significant, i.e., whether it occurred by chance or not.
• currently supports Pearson’s chi-squared ( χ2) tests for goodness of fit and
independence.
• The input data types determine whether the goodness of fit or the
independence test is conducted.
• The goodness of fit test requires an input type of Vector, whereas the
independence test requires a Matrix as input.
• MLlib also supports the input type RDD[LabeledPoint] to enable feature
selection via chi-squared independence tests.
MLLib – Basic Statistics
• Random data generation
• is useful for randomized algorithms, prototyping, and performance testing.
• MLlib supports generating random RDDs with values drawn from a given
distribution: uniform, standard normal, or Poisson.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs._
val sc: SparkContext = ...
// Generate a random double RDD that contains 1 million values drawn from the
// standard normal distribution `N(0, 1)`, evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
MLLib - Linear Methods
• Mathematical formulation
• can be formulated as a convex optimization problem, i.e. the task of finding a minimizer of
a convex function f that depends on a variable vector w (called weights in the code), which
has d entries
• Formally, we can write this as the optimization problem min_{w ∈ R^d} f(w)
• Loss functions
• the supported loss functions are hinge, logistic, and squared loss
• Regularizers
• L1 and L2; the purpose is to encourage simple models and avoid overfitting.
• Optimization
• linear methods use convex optimization methods to optimize the objective functions.
• MLlib uses two methods, SGD and L-BFGS
ML Lib
• Binary classification
• linear Support Vector Machines (SVMs)
• logistic regression.
• For both methods, MLlib supports L1 and L2 regularized variants.
• The training data set is represented by an RDD of LabeledPoint in MLlib.
• Linear Support Vector Machines (SVMs)
• The linear SVM is a standard method for large-scale classification tasks.
• with the loss function in the formulation given by the hinge loss:
• L(w; x, y) := max{0, 1 − y w^T x}
• Logistic regression
• It is a linear method
• with the loss function in the formulation given by the logistic loss:
• L(w; x, y) := log(1 + exp(−y w^T x))
• Evaluation metrics
• supports common evaluation metrics for binary classification.
• This includes precision, recall, F-measure, receiver operating characteristic (ROC), precision-recall curve, and
area under the curves (AUC).
ML Lib
• Linear least squares, Lasso, and ridge regression
• It is a linear method as described above, with the loss function in the formulation given by the squared loss:
• L(w; x, y) := (1/2)(w^T x − y)^2
• ridge regression uses L2 regularization
• Lasso uses L1 regularization
• Naive Bayes
• is a simple multiclass classification algorithm with the assumption of independence between every pair of features
• Within a single pass to the training data, it computes the conditional probability distribution of each feature given
label, and then it applies Bayes’ theorem to compute the conditional probability distribution of label given an
observation and use it for prediction.
• MLlib supports multinomial naive Bayes, which is typically used for document classification. Within that context,
each observation is a document and each feature represents a term whose value is the frequency of the term.
• Feature values must be nonnegative to represent term frequencies. Additive smoothing can be used by setting the
parameter λ (default to 1.0).
• For document classification, the input feature vectors are usually sparse, and sparse vectors should be supplied as
input to take advantage of sparsity. Since the training data is only used once, it is not necessary to cache it.
ML Lib – Decision Trees
• MLlib supports decision trees for binary and multiclass classification
and for regression, using both continuous and categorical features.
• The implementation partitions data by rows, allowing distributed
training with millions of instances.
• Ensembles of trees (Random Forests and Gradient-Boosted Trees) are
also supported
• Node impurity and information gain
• The node impurity is a measure of the homogeneity of the labels at the node.
• The current implementation provides two impurity measures for classification
(Gini impurity and entropy) and one impurity measure for regression (variance).
ML Lib - Collaborative filtering
•  is commonly used for recommender systems.
• These techniques aim to fill in the missing entries of a user-item association matrix.
• supports model-based collaborative filtering, in which users and products are
described by a small set of latent factors that can be used to predict missing entries
• The implementation in MLlib has the following parameters:
• numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
• rank is the number of latent factors in the model.
• iterations is the number of iterations to run.
• lambda specifies the regularization parameter in ALS.
• implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted
for implicit feedback data.
• alpha is a parameter applicable to the implicit feedback variant of ALS that governs
the baseline confidence in preference observations.
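• A minimal ALS training sketch (the ratings file, its comma-separated format, and the parameter values are illustrative):
import org.apache.spark.mllib.recommendation.{ALS, Rating}
val ratings = sc.textFile("data/ratings.csv").map { line =>   // illustrative path; lines like "user,product,rating"
  val fields = line.split(",")
  Rating(fields(0).toInt, fields(1).toInt, fields(2).toDouble)
}
val model = ALS.train(ratings, 10, 10, 0.01)   // rank = 10, iterations = 10, lambda = 0.01
val predicted = model.predict(1, 10)           // predicted rating of product 10 by user 1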
ML Lib - Clustering
• supports k-means clustering, one of the most commonly used clustering
algorithms that clusters the data points into predefined number of clusters.
• The MLlib implementation includes a parallelized variant of the k-means++
 method called kmeans||.
• The implementation in MLlib has the following parameters:
• k is the number of desired clusters.
• maxIterations is the maximum number of iterations to run.
• initializationMode specifies either random initialization or initialization via k-means||.
• runs is the number of times to run the k-means algorithm (k-means is not guaranteed to
find a globally optimal solution, and when run multiple times on a given dataset, the
algorithm returns the best clustering result).
• initializationSteps determines the number of steps in the k-means|| algorithm.
• epsilon determines the distance threshold within which we consider k-means to have
converged.
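• A minimal k-means sketch (the data file and parameter values are illustrative):
import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
val points = sc.textFile("data/kmeans_data.txt")                 // illustrative path; space-separated doubles per line
  .map(line => Vectors.dense(line.split(' ').map(_.toDouble)))
val model = KMeans.train(points, 2, 20)                          // k = 2, maxIterations = 20
val cost = model.computeCost(points)                             // within-set sum of squared errors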
ML Lib – Code Block
import findspark
findspark.init('/opt/spark')
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
data = spark.read.csv('./boston_housing.csv', header=True, inferSchema=True)
feature_columns = data.columns[:-1]  # here we omit the final column
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(inputCols=feature_columns, outputCol="features")
data_2 = assembler.transform(data)
train, test = data_2.randomSplit([0.7, 0.3])
from pyspark.ml.regression import LinearRegression
algo = LinearRegression(featuresCol="features", labelCol="medv")
model = algo.fit(train)
evaluation_summary = model.evaluate(test)
evaluation_summary.meanAbsoluteError
evaluation_summary.rootMeanSquaredError
evaluation_summary.r2
predictions = model.transform(test)
predictions.select(predictions.columns[13:]).show()
ML Pipelines
• provide a uniform set of high-level APIs built on top of DataFrames that help users
create and tune practical machine learning pipelines.
• Main Concepts
• DataFrame:
• can hold a variety of data types.
• E.g., a DataFrame could have different columns storing text, feature vectors, true labels, and predictions.
• Transformer:
• is an algorithm which can transform one DataFrame into another DataFrame.
• implements a method transform(), which converts one DataFrame into another, generally by appending
one or more columns.
• E.g., an ML model is a Transformer which transforms a DataFrame with features into a DataFrame with
predictions.
• Estimator:
• is an algorithm which can be fit on a DataFrame to produce a Transformer.
• implements a method fit(), which accepts a DataFrame and produces a Model, which is a Transformer.
• E.g., a learning algorithm is an Estimator which trains on a DataFrame and produces a model.
ML Pipelines
• Properties of pipeline components
• Transformer.transform()s and Estimator.fit()s are both stateless.
• Each instance of a Transformer or Estimator has a unique ID, which is useful in
specifying parameters.
• Pipeline:
• chains multiple Transformers and Estimators together to specify an ML workflow.
•  E.g., a simple text document processing workflow might include several stages:
• Split each document’s text into words.
• Convert each document’s words into a numerical feature vector.
• Learn a prediction model using the feature vectors and labels.
• Parameter:
• All Transformers and Estimators now share a common API for specifying parameters.
Working of Pipelines

• Top row represents a Pipeline with three stages.


• Bottom row represents data flowing through the pipeline, where cylinders indicate DataFrames.
• Pipeline.fit() method is called on the original DataFrame, which has raw text documents and labels.
• Tokenizer.transform() method splits the raw text documents into words, adding a new column with words
to the DataFrame.
• HashingTF.transform() method converts the words column into feature vectors, adding a new column
with those vectors to the DataFrame.
• Since LogisticRegression is an Estimator, the Pipeline first calls LogisticRegression.fit() to produce a LogisticRegressionModel.
• If the Pipeline had more Estimators, it would call the LogisticRegressionModel’s transform() method on
the DataFrame before passing the DataFrame to the next stage.
Working of Pipelines

• A Pipeline is an Estimator. Thus, after a Pipeline’s fit() method runs, it produces a PipelineModel, which is a Transformer.
• PipelineModel is used at test time
• PipelineModel has the same number of stages as the original Pipeline, but all Estimators in the original Pipeline have
become Transformers.
• When the PipelineModel’s transform() method is called on a test dataset, the data are passed through the fitted pipeline
in order.
• Each stage’s transform() method updates the dataset and passes it to the next stage.
• Pipelines and PipelineModels help to ensure that training and test data go through identical feature processing steps.
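• A minimal sketch of the three-stage pipeline described above (the column names, parameter values, and tiny datasets are illustrative):
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
val training = spark.createDataFrame(Seq(
  (0L, "a b c d e spark", 1.0),
  (1L, "b d", 0.0)
)).toDF("id", "text", "label")
val tokenizer = new Tokenizer().setInputCol("text").setOutputCol("words")
val hashingTF = new HashingTF().setInputCol("words").setOutputCol("features")
val lr = new LogisticRegression().setMaxIter(10)
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))
val model: PipelineModel = pipeline.fit(training)       // fitting the Pipeline produces a PipelineModel
val test = spark.createDataFrame(Seq(
  (2L, "spark hadoop"),
  (3L, "mapreduce")
)).toDF("id", "text")
model.transform(test).select("id", "text", "prediction").show()   // test data flows through the same fitted stages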
ML Lib - Feature Extraction and
Transformation
• TF-IDF
• is a feature vectorization method widely used in text mining to reflect the importance of
a term to a document in the corpus.
• Denote a term by t, a document by d, and the corpus by D.
• Term frequency TF(t, d) is the number of times that term t appears in document d
• document frequency DF(t, D) is the number of documents that contain term t.
• Word2Vec
• is an Estimator which takes sequences of words representing documents
and trains a Word2VecModel.
• The model maps each word to a unique fixed-size vector.
• Word2VecModel transforms each document into a vector using the average of all words
in the document; this vector can then be used as features for prediction, document
similarity calculations, etc.
ML Lib - Feature Extraction and
Transformation
• CountVectorizer and CountVectorizerModel aim to help convert a collection of
text documents to vectors of token counts.
• When an a-priori dictionary is not available, CountVectorizer can be used as an
Estimator to extract the vocabulary, and generates a CountVectorizerModel.
• The model produces sparse representations for the documents over the
vocabulary, which can then be passed to other algorithms like LDA.
• During the fitting process, CountVectorizer will select the top vocabSize words
ordered by term frequency across the corpus.
• An optional parameter minDF also affects the fitting process by specifying the
minimum number (or fraction if < 1.0) of documents a term must appear in to
be included in the vocabulary.
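• A minimal CountVectorizer sketch (the column names and the vocabSize/minDF values are illustrative):
import org.apache.spark.ml.feature.{CountVectorizer, CountVectorizerModel}
val df = spark.createDataFrame(Seq(
  (0, Array("a", "b", "c")),
  (1, Array("a", "b", "b", "c", "a"))
)).toDF("id", "words")
val cvModel: CountVectorizerModel = new CountVectorizer()
  .setInputCol("words").setOutputCol("features")
  .setVocabSize(3)     // keep at most 3 vocabulary terms
  .setMinDF(2)         // a term must appear in at least 2 documents
  .fit(df)
cvModel.transform(df).show(false)   // sparse token-count vectors per document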
Code Example
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.linalg.{Vector, Vectors}
import org.apache.spark.ml.param.ParamMap
import org.apache.spark.sql.Row

val training = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(0.0, 1.1, 0.1)),
  (0.0, Vectors.dense(2.0, 1.0, -1.0)),
  (0.0, Vectors.dense(2.0, 1.3, 1.0)),
  (1.0, Vectors.dense(0.0, 1.2, -0.5)))).toDF("label", "features")

val lr = new LogisticRegression()
println("LogisticRegression parameters:\n" + lr.explainParams() + "\n")
lr.setMaxIter(10)
  .setRegParam(0.01)
val model1 = lr.fit(training)
println("Model 1 was fit using parameters: " + model1.parent.extractParamMap)

val paramMap = ParamMap(lr.maxIter -> 20)
  .put(lr.maxIter, 30)
  .put(lr.regParam -> 0.1, lr.threshold -> 0.55)
val paramMap2 = ParamMap(lr.probabilityCol -> "myProbability")
val paramMapCombined = paramMap ++ paramMap2

val model2 = lr.fit(training, paramMapCombined)
println("Model 2 was fit using parameters: " + model2.parent.extractParamMap)

val test = spark.createDataFrame(Seq(
  (1.0, Vectors.dense(-1.0, 1.5, 1.3)),
  (0.0, Vectors.dense(3.0, 2.0, -0.1)),
  (1.0, Vectors.dense(0.0, 2.2, -1.5))
)).toDF("label", "features")

model2.transform(test)
  .select("features", "label", "myProbability", "prediction")
  .collect()
  .foreach { case Row(features: Vector, label: Double, prob: Vector, prediction: Double) =>
    println(s"($features, $label) -> prob=$prob, prediction=$prediction")
  }
Spark Components
GraphX
• is the new (alpha) Spark API for graphs and graph-parallel
computation.
• extends the Spark RDD by introducing the Resilient Distributed
Property Graph:
• a directed multigraph with properties attached to each vertex and edge.
• exposes a set of fundamental operators (e.g., subgraph, joinVertices, and
mapReduceTriplets) as well as an optimized variant of the Pregel API.
• includes a growing collection of graph algorithms and builders to simplify
graph analytics tasks.
Typical graph Pipeline
• The goal of the GraphX project is to unify graph-parallel and data-parallel computation in one system with a single composable API.
• It enables users to view data both as a graph and as collections (i.e., RDDs) without data movement or duplication.
• By incorporating recent advances in graph-parallel systems, it is able to optimize the execution of graph operations.
Property Graph
// Assume the SparkContext has already been constructed
val sc: SparkContext
// Create an RDD for the vertices
val users: RDD[(VertexId, (String, String))] =
  sc.parallelize(Array((3L, ("rxin", "student")), (7L, ("jgonzal", "postdoc")),
                       (5L, ("franklin", "prof")), (2L, ("istoica", "prof"))))
// Create an RDD for edges
val relationships: RDD[Edge[String]] =
  sc.parallelize(Array(Edge(3L, 7L, "collab"), Edge(5L, 3L, "advisor"),
                       Edge(2L, 5L, "colleague"), Edge(5L, 7L, "pi")))
// Define a default user in case there are relationships with a missing user
val defaultUser = ("John Doe", "Missing")
// Build the initial Graph
val graph = Graph(users, relationships, defaultUser)
PageRank
• measures the importance of each vertex in a graph, assuming an edge from u to v represents an endorsement of v's importance by u.
• For example, if a Twitter user is followed by many others, the user will be ranked highly.
• GraphX comes with static and dynamic implementations of PageRank as methods on the PageRank object.
• Static PageRank runs for a fixed number of iterations
• Dynamic PageRank runs until the ranks converge (i.e., stop changing by more than a specified tolerance).
• GraphOps allows calling these algorithms directly as methods on Graph.
// Load the edges as a graph
val graph = GraphLoader.edgeListFile(sc, "graphx/data/followers.txt")
// Run PageRank
val ranks = graph.pageRank(0.0001).vertices
// Join the ranks with the usernames
val users = sc.textFile("graphx/data/users.txt").map { line =>
  val fields = line.split(",")
  (fields(0).toLong, fields(1))
}
val ranksByUsername = users.join(ranks).map {
  case (id, (username, rank)) => (username, rank)
}
// Print the result
println(ranksByUsername.collect().mkString("\n"))
• To process the data, most traditional stream processing systems are designed with a continuous operator model, which works as follows:
• Streaming data is received from data sources (e.g. live logs, system telemetry data, IoT device data, etc.) into some data ingestion system like Apache Kafka, Amazon Kinesis, etc.
• The data is then processed in parallel on a cluster.
• Results are given to downstream systems like HBase, Cassandra, Kafka, etc.
• Working
• There is a set of worker nodes, each of which runs one
or more continuous operators.
• Each continuous operator processes the streaming
data one record at a time and forwards the records to
other operators in the pipeline.
• Data is received from ingestion systems via Source
operators and given as output to downstream systems
via sink operators.
Advantages of Spark Stream
• Dynamic load balancing
• Dividing the data into small micro-batches allows for fine-grained allocation of computations to resources
• In the traditional record-at-a-time approach, if one of the partitions is more computationally intensive than others, the node to
which that partition is assigned will become a bottleneck and slow down the pipeline.
• In Spark Streaming, the job's tasks will be naturally load balanced across the workers: some workers will process a few longer tasks while others will process more of the shorter tasks.
• Fast failure and straggler recovery
• Traditional systems have to restart the failed operator on another node to recompute the lost information in case of node failure.
• Only one node handles the recomputation, so the pipeline cannot proceed until the new node has caught up after the replay.
• In Spark, the computation is discretized into small tasks that can run anywhere without affecting correctness.
• So the work of failed tasks can be distributed evenly across all the other nodes in the cluster to perform the recomputations and recover from the failure faster than the traditional approach.
• Unification of batch, streaming and interactive analytics
• A DStream in Spark is just a series of RDDs in Spark that allows batch and streaming workloads to interoperate seamlessly.
• Apache Spark functions can be applied to each batch of streaming data.
• Since the batches of streaming data are stored in the Spark’s worker memory, it can be interactively queried on demand.
Advantages of Spark Stream
• Advanced analytics like machine learning and interactive SQL
• Spark interoperability extends to rich libraries like MLlib (machine learning),
SQL, DataFrames, and GraphX.
• RDDs generated by DStreams can be converted to DataFrames and queried with SQL.
• Machine learning models generated offline with MLlib can be applied to streaming data.
• Performance
• Spark Streaming's ability to batch data and leverage the Spark engine leads to higher throughput compared to other streaming systems.
• Spark Streaming can achieve latencies as low as a few hundred milliseconds.
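• A minimal DStream word-count sketch, assuming sc is an existing SparkContext (the socket host/port and the 1-second batch interval are illustrative):
import org.apache.spark.streaming.{Seconds, StreamingContext}
val ssc = new StreamingContext(sc, Seconds(1))        // 1-second micro-batches
val lines = ssc.socketTextStream("localhost", 9999)   // illustrative text source
val counts = lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
counts.print()                                        // print a few counts from each batch
ssc.start()
ssc.awaitTermination()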
