
Apache Spark

Lorenzo Di Gaetano

THE CONTRACTOR IS ACTING UNDER A FRAMEWORK CONTRACT CONCLUDED WITH THE COMMISSION

Eurostat
What is Apache Spark?
• A general purpose framework for big data processing

• It interfaces with many distributed file systems, such as HDFS (Hadoop Distributed File System), Amazon S3, Apache Cassandra and many others

• Up to 100 times faster than Hadoop MapReduce for in-memory computation
Multilanguage API
• You can write applications in various languages
• Java
• Python
• Scala
• R

• In the context of this course we will consider Python

Built-in Libraries

(Figure: Spark's built-in libraries, such as Spark SQL, Spark Streaming, MLlib and GraphX, on top of the Spark Core engine)
Third party libraries

• Many third party libraries are available

• http://spark-packages.org/

• We used spark-csv in our examples

• We will see later how to use an external jar in our application

Getting started with Spark!

• Spark can run on Hadoop, on the cloud, and can also be installed as a standalone application on your PC
• Spark comes pre-installed in all the major Hadoop distributions
• We will consider the standalone installation in this course

Prerequisites

• Python (for using pySpark) and Java must be installed.

• All environment variables must be correctly set.

Local installation on Windows (1/2)
• Download Spark at https://spark.apache.org/downloads.html
• Download “Pre-built for Hadoop 2.6 and later”
• Unzip it
• Set the SPARK_HOME environment variable to point where you unzipped Spark
• Add %SPARK_HOME%/bin to your PATH

Local installation on Windows (2/2)
• Download http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe
• Move it into %SPARK_HOME%/bin
• Create an environment variable: HADOOP_HOME=%SPARK_HOME%
• Now test it: open a terminal and launch pySpark

PySpark up and running!

Running Spark
• Once you have correctly installed Spark you can use it in two ways:

• spark-submit: the CLI command you can use to launch Python Spark applications

• pyspark: used to launch an interactive Python shell.

Running Spark

• Using spark-submit you have to manually create a SparkContext object in your code (see the sketch below).

• Using the pyspark interactive shell, a SparkContext is automatically available in a variable called sc.
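
• For example, a minimal script to run with spark-submit might look like this (the application name and the local master are illustrative assumptions, not taken from the slides):

from pyspark import SparkConf, SparkContext

# with spark-submit we create the SparkContext ourselves
conf = SparkConf().setAppName("ExampleApp").setMaster("local[*]")
sc = SparkContext(conf=conf)

rdd = sc.parallelize(range(10))
print(rdd.count())

sc.stop()

• In the pyspark shell none of this setup is needed: sc is already defined.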

What is a Spark Application?

• A Spark application is made of a Driver Program

• The Driver Program runs the main function, which executes parallel operations on the cluster

RDD

• Spark works on RDDs – Resilient Distributed Datasets

• An RDD is a collection of elements partitioned across the nodes of the cluster. Spark operates on them in parallel

RDD

• An RDD can be created, for example, from a file on the Hadoop filesystem

• RDDs can be made persistent in memory

How to create a RDD
• There are two ways:

• Parallelizing a pre-existing collection in the driver program

• Reading an external dataset from the filesystem

RDD from collections

• You can create an RDD from a collection using the parallelize() method on the SparkContext

data = [1,2,3,4,5]
rdd = sc.parallelize(data)

• Then you can operate in parallel on rdd


RDD from external datasets

• You can create RDDs from various kinds of external datasets like the local filesystem, HDFS, Cassandra etc.
• For example we can read a text file, obtaining a collection of lines:

rdd = sc.textFile("textfile.txt")

Operations on RDD
• There are two types of operations:

• Transformations: take an existing dataset and return another dataset

• Actions: take an existing dataset and return a value to the driver program

Transformations

• For example, map is a transformation that takes all elements of the dataset, passes them to a function and returns another RDD with the results

resultRDD = originalRDD.map(myFunction)
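
• A minimal concrete sketch, reusing the text file from the earlier slide: map each line to its length

lines = sc.textFile("textfile.txt")
lineLengths = lines.map(lambda line: len(line))   # lazy: nothing is computed yet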

Actions

• For example, reduce is an action. It aggregates all elements of the RDD using a function and returns the result to the driver program

result = rdd.reduce(function)
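
• A minimal sketch, assuming the sc variable provided by the pyspark shell:

numbers = sc.parallelize([1, 2, 3, 4, 5])
total = numbers.reduce(lambda a, b: a + b)   # 15 is returned to the driver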

Passing functions to Spark
• There are three ways:

• lambda expressions: used to pass simple expressions which return a value

• Local function definitions: for more complex code

• Functions inside a module (see the sketch below)
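
• A minimal sketch of the three styles, assuming rdd is an RDD of strings (the module myFunctions and its tokenize function are hypothetical):

# 1) lambda expression: a simple inline expression
shortLines = rdd.filter(lambda line: len(line) < 80)

# 2) local function definition: for more complex code
def normalize(line):
    return line.strip().lower()

cleanLines = rdd.map(normalize)

# 3) function defined inside a module (hypothetical module)
import myFunctions
tokens = rdd.map(myFunctions.tokenize)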

Example: word counting

from pyspark import SparkContext

# returns the number of words in a single line
def countWords(s):
    words = s.split(" ")
    return len(words)

sc = SparkContext(...)
sc.textFile("textFile.txt").map(countWords)

Shared Variables
• We must always keep in mind that when we pass a function to a Spark operation, the function is executed on separate cluster nodes. Every node receives a COPY of the variables used inside the function

• Changes made to the local copy of a variable are not propagated back to the driver program (see the sketch below)
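
• A minimal sketch of the pitfall, assuming sc from the pyspark shell:

counter = 0

def increment(x):
    # each executor works on its own copy of counter,
    # so this update never reaches the driver
    global counter
    counter += x

sc.parallelize([1, 2, 3, 4, 5]).foreach(increment)
print(counter)   # still 0 on the driver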

Shared Variables
• To solve this problem Spark offers two types of shared variables:

• Broadcast variables

• Accumulators

Broadcast variables
• Instead of shipping a copy of the variable with every task, broadcast variables allow the programmer to keep a cached read-only variable on every machine

• We can create a broadcast variable with the SparkContext.broadcast(var) method, which returns a reference to the broadcast variable (see the sketch below)
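
• A minimal sketch, assuming sc from the pyspark shell (the lookup table is just an illustration):

countryNames = sc.broadcast({"IT": "Italy", "FR": "France"})

codes = sc.parallelize(["IT", "FR", "IT"])
print(codes.map(lambda c: countryNames.value[c]).collect())   # ['Italy', 'France', 'Italy']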

Accumulators
• Accumulators are used to keep some kind of shared "counter" across the machines. An accumulator is a special variable which can only be "added" to

• We can create an accumulator with the SparkContext.accumulator(var) method

• Once the accumulator is created we can add values to it with the add() method (see the sketch below)
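
• A minimal sketch, assuming sc from the pyspark shell and the text file used earlier: count empty lines while splitting the file into words

emptyLines = sc.accumulator(0)

def parse(line):
    if line.strip() == "":
        emptyLines.add(1)
    return line.split(" ")

sc.textFile("textfile.txt").map(parse).count()   # an action triggers the updates
print(emptyLines.value)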
Making RDD persistent
• Spark can persist (or cache) a dataset in memory across operations
• If you persist an RDD, every node stores the partitions it computes in memory and reuses them in other actions, which will then run faster
• You can persist an RDD using the persist() or cache() methods (see the sketch below)
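
• For example, a minimal sketch:

words = sc.textFile("textfile.txt").flatMap(lambda line: line.split(" "))
words.cache()                      # same as persist() with the default storage level
print(words.count())               # first action: computes and caches the partitions
print(words.distinct().count())    # reuses the cached partitions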

Printing elements
• When working on a single machine we can simply use rdd.foreach(println)
• But when we work in cluster mode, the println will be executed on the executors' stdout, so we will not see anything on the driver node!
• Instead, we can use rdd.take(n).foreach(println) to print only the first n elements of the collection, so we can be sure we will not run out of memory (a Python version is sketched below)
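
• In Python, a minimal equivalent (assuming rdd already exists):

for element in rdd.take(10):   # only the first 10 elements reach the driver
    print(element)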

Removing data

• Spark performs a sort of garbage collection and automatically deletes old data partitions

• We can also manually remove an RDD from the cache using the unpersist() method

SparkSQL and DataFrames

• SparkSQL is the Spark module for structured data processing

• The DataFrame API is one of the ways to interact with SparkSQL

DataFrames
• A DataFrame is a collection of data organized into named columns

• Similar to tables in relational databases

• It can be created from various sources: structured data files, Hive tables, external databases, csv files etc.

Creating a DataFrame
• Given a SparkContext (sc), the first thing to do is to create a SQLContext

from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)

• Then read the data, for example in JSON format:

df = sqlContext.read.json('file.json')

Creating a DataFrame from csv file

• We will now see how to load a csv file into a DataFrame using an external library called spark-csv: https://github.com/databricks/spark-csv

• We need to download two files: spark-csv_2.11-1.4.0.jar and commons-csv-1.2.jar

Creating a DataFrame from csv file

• When launching our code with spark-submit we have to add the external jars to the classpath. Assuming we put the jars into the lib directory of our project:

spark-submit --jars lib/spark-csv_2.11-1.4.0.jar,lib/commons-csv-1.2.jar myCode.py

Creating a DataFrame from csv file
• Then we read our file into a DataFrame

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
    nullValue='NULL').load(myCsvFile, inferschema='true')

• Note the inferschema='true' option. With this option activated, Spark tries to guess the type of every field
• We can specify the schema manually by creating a StructType
Creating a DataFrame from csv file
• Create the schema structure

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType

customSchema = StructType([
    StructField("field1", IntegerType(), True),
    StructField("field2", FloatType(), True),
    StructField("field3", TimestampType(), True),
    StructField("field4", StringType(), True)
])

• And then pass it to our load function as an option

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
    nullValue='NULL').load(myFile, schema = customSchema)

Example operations on DataFrames
• To show the content of the DataFrame
• df.show()
• To print the schema of the DataFrame
• df.printSchema()
• To select a column
• df.select('columnName').show()
• To filter by some parameter
• df.filter(df['columnName'] > N).show()

A complete example: group and avg
• We have a table like this

+----------+------+-------+--------------+------------------+-----------+-------+
|codice_pdv|prdkey| week|vendite_valore|vendite_confezioni|flag_sconto| sconto|
+----------+------+-------+--------------+------------------+-----------+-------+
| 4567|730716|2015009| 190.8500061| 196.0| 0.0|0.98991|
| 4567|730716|2013048| 174.6000061| 153.0| null| null|
| 4567|730716|2015008| 160.6000061| 165.0| 0.0|0.98951|
| 4567|730716|2015006| 171.92999268| 176.0| 0.0|0.99329|
| 4567|730716|2015005| 209.47999573| 213.0| 0.0|1.00447|
+----------+------+-------+--------------+------------------+-----------+-------+

• We want to group by prdkey and calculate the average vendite_valore for every group

Preparing the schema
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, TimestampType, StringType
from time import time   # used below to measure the query time

sc = SparkContext("local", "PrezzoMedio")

sqlContext = SQLContext(sc)

customSchema = StructType([
    StructField("codice_pdv", IntegerType(), True),
    StructField("prdkey", IntegerType(), True),
    StructField("week", IntegerType(), True),
    StructField("vendite_valore", FloatType(), True),
    StructField("vendite_confezioni", FloatType(), True),
    StructField("data_ins", TimestampType(), True),
    StructField("num_riga", FloatType(), True),
    StructField("flag_sconto", FloatType(), True),
    StructField("sconto", FloatType(), True),
    StructField("idricefile", FloatType(), True),
    StructField("iddettzipfile", FloatType(), True),
    StructField("uteins", StringType(), True),
    StructField("datamod", TimestampType(), True),
    StructField("utemod", StringType(), True)
])

Read data and do the job
myCsvFile = r"C:\TransFatt1000.csv"   # raw string so the Windows path is not mangled

df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
nullValue='NULL').load(myCsvFile, schema = customSchema)

t0 = time()

averageForProductKey = df.groupBy("prdkey").avg("vendite_valore").collect()

tt = time() - t0

print(averageForProductKey)

print ("Query performed in {:03.2f} seconds.".format(tt))

With parquet table on HDFS
myParquetTable = "hdfs://bigdata-mnn.hcl.istat.it:8020///user/hive/warehouse/scanner_data.db/trans_fatt_p"

df = sqlContext.read.load(myParquetTable, schema = customSchema)

t0 = time()

averageForProductKey = df.groupBy("prdkey").avg("vendite_valore").collect()

tt = time() - t0

print(averageForProductKey)

print ("Query performed in {:03.2f} seconds.".format(tt))

References

• http://spark.apache.org/

• http://spark.apache.org/sql/

• http://spark.apache.org/docs/latest/sql-programming-guide.html

