Day 4-01-Spark
Lorenzo Di Gaetano
THE CONTRACTOR IS ACTING UNDER A FRAMEWORK CONTRACT CONCLUDED WITH THE COMMISSION
What is Apache Spark?
• A general purpose framework for big data
processing
Built-in Libraries
Third party libraries
Getting started with Spark!
Prerequisites
Local installation on Windows (1/2)
• Download Spark at https://spark.apache.org/downloads.html
• Download “Pre-built for Hadoop 2.6 and later”
• Unzip it
• Set the SPARK_HOME environment variable to
point where you unzipped Spark
• Add %SPARK_HOME%/bin to your PATH
Local installation on Windows (2/2)
• Download http://public-repo-1.hortonworks.com/hdp-win-alpha/winutils.exe
• Move it to %SPARK_HOME%/bin
• Create an environment variable:
HADOOP_HOME=%SPARK_HOME%
• Now test it: open a terminal and launch pyspark
PySpark up and running!
Running Spark
• Once you have correctly installed Spark, you can use it in two ways: interactively, from a shell such as the pyspark shell launched above, or by submitting a self-contained application with spark-submit, as in the sketch below.
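• A minimal sketch of the second way (the file name and the data are only illustrative); you would launch it with spark-submit simple_app.py:
# simple_app.py - a small self-contained PySpark application
from pyspark import SparkContext

sc = SparkContext("local", "SimpleApp")   # local master, application name "SimpleApp"
rdd = sc.parallelize(range(1000))         # distribute a small collection
print(rdd.count())                        # run an action and print the result on the driver
sc.stop()                                 # release the context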
What is a Spark Application?
RDD
How to create an RDD
• There are two ways: by parallelizing an existing collection in the driver program, or by referencing a dataset in external storage such as a text file.
RDD from collections
# from a Python collection in the driver program
data = [1,2,3,4,5]
rdd = sc.parallelize(data)
# ... or from an external text file (one element per line)
rdd = sc.textFile("textfile.txt")
Operations on RDD
• There are two types of operations: transformations, which build a new RDD from an existing one, and actions, which run a computation and return a value to the driver program.
Transformations
resultRDD = originalRDD.map(myFunction)
Actions
result = rdd.reduce(function)
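• A minimal sketch putting the two together (the data and functions are only illustrative): the map transformation is lazy and just describes a new RDD, while the reduce action triggers the actual computation.
numbers = sc.parallelize([1, 2, 3, 4, 5])      # source RDD
squares = numbers.map(lambda x: x * x)         # transformation: returns a new RDD, nothing runs yet
total = squares.reduce(lambda a, b: a + b)     # action: computes and returns a value to the driver
print(total)                                   # 55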
Passing functions to Spark
• There are three ways: lambda expressions, local functions defined inside the function that calls into Spark, and top-level functions in a module.
Example: word counting
from pyspark import SparkContext

def countWords(s):
    words = s.split(" ")
    return len(words)

sc = SparkContext(...)
sc.textFile("textFile.txt").map(countWords)
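• Note that map alone is a transformation and returns a new RDD. A possible continuation of the example (a sketch using the sum action) to actually obtain the total number of words in the file:
totalWords = sc.textFile("textFile.txt").map(countWords).sum()   # action: triggers the job
print(totalWords)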
Shared Variables
• We must always keep in mind that when we pass a function to a Spark operation, that function is executed on separate cluster nodes. Every node receives a COPY of the variables used inside the function.
Shared Variables
• To solve this problem, Spark offers two types of shared variables:
• Broadcast variables
• Accumulators
Broadcast variables
• Instead of shipping a copy of the variable with every task, broadcast variables allow the programmer to keep a cached, read-only variable on every machine
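• A minimal sketch, with an invented lookup table: the variable is created with sc.broadcast() and read on the workers through its .value attribute.
countryNames = sc.broadcast({"IT": "Italy", "FR": "France"})    # cached read-only copy on every node
codes = sc.parallelize(["IT", "FR", "IT"])
print(codes.map(lambda c: countryNames.value[c]).collect())     # ['Italy', 'France', 'Italy']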
Accumulators
• Accumulators are used to keep a shared «counter» across the machines: a special variable which the workers can only «add» to, and whose value is read back on the driver
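• A minimal sketch: each task adds its contribution, and the driver reads the final value.
counter = sc.accumulator(0)                                         # shared counter, starts at 0
sc.parallelize([1, 2, 3, 4, 5]).foreach(lambda x: counter.add(x))   # workers can only add to it
print(counter.value)                                                # 15, read on the driver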
Printing elements
• When working on a single machine we can simply use rdd.foreach(println)
• But when we work in cluster mode, println is executed on the executors' stdout, so we will not see anything on the driver node!
• Instead, we can use rdd.take(n).foreach(println) to print only the first n elements of the collection, so that we are sure the driver will not run out of memory.
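• In PySpark, where there is no println, the same idea can be written for example as:
for element in rdd.take(10):   # bring only the first 10 elements back to the driver
    print(element)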
Removing data
SparkSQL and DataFrames
DataFrames
• A DataFrame is a distributed collection of data organized into named columns
Creating a DataFrame
• Given a SparkContext (sc), the first thing to do is
to create a SQLContext
from pyspark.sql import SQLContext

sqlContext = SQLContext(sc)
df = sqlContext.read.json('file.json')
Creating a DataFrame from csv file
• Then we read our file into a DataFrame, either letting Spark infer the schema or supplying a custom one:
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', nullValue='NULL').load(myCsvFile, inferschema='true')
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true', nullValue='NULL').load(myFile, schema=customSchema)
Example operations on DataFrames
• To show the content of the DataFrame
• df.show()
• To print the Schema of the DataFrame
• df.printSchema()
• To select a column
• df.select('columnName').show()
• To filter by some parameter
• df.filter(df['columnName'] > N).show()
A complete example: group and avg
• We have a table like this
+----------+------+-------+--------------+------------------+-----------+-------+
|codice_pdv|prdkey| week|vendite_valore|vendite_confezioni|flag_sconto| sconto|
+----------+------+-------+--------------+------------------+-----------+-------+
| 4567|730716|2015009| 190.8500061| 196.0| 0.0|0.98991|
| 4567|730716|2013048| 174.6000061| 153.0| null| null|
| 4567|730716|2015008| 160.6000061| 165.0| 0.0|0.98951|
| 4567|730716|2015006| 171.92999268| 176.0| 0.0|0.99329|
| 4567|730716|2015005| 209.47999573| 213.0| 0.0|1.00447|
+----------+------+-------+--------------+------------------+-----------+-------+
Preparing the schema
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, StringType, TimestampType

sc = SparkContext("local", "PrezzoMedio")
sqlContext = SQLContext(sc)
customSchema = StructType([ \
StructField("codice_pdv", IntegerType(), True), \
StructField("prdkey", IntegerType(), True), \
StructField("week", IntegerType(), True), \
StructField("vendite_valore", FloatType(), True), \
StructField("vendite_confezioni", FloatType(), True), \
StructField("data_ins", TimestampType(), True), \
StructField("num_riga", FloatType(), True), \
StructField("flag_sconto", FloatType(), True), \
StructField("sconto", FloatType(), True), \
StructField("idricefile", FloatType(), True), \
StructField("iddettzipfile", FloatType(), True), \
StructField("uteins", StringType(), True), \
StructField("datamod", TimestampType(), True), \
StructField("utemod", StringType(), True) \
])
Read data and do the job
from time import time

myCsvFile = "C:\TransFatt1000.csv"
df = sqlContext.read.format('com.databricks.spark.csv').options(header='true',
nullValue='NULL').load(myCsvFile, schema = customSchema)
t0 = time()
averageForProductKey = df.groupBy("prdkey").avg("vendite_valore").collect()
tt = time() - t0
print(averageForProductKey)
With parquet table on HDFS
myParquetTable = "hdfs://bigdata-mnn.hcl.istat.it:8020///user/hive/warehouse/scanner_data.db/trans_fatt_p"
# read the Parquet table into a DataFrame
df = sqlContext.read.parquet(myParquetTable)
t0 = time()
averageForProductKey = df.groupBy("prdkey").avg("vendite_valore").collect()
tt = time() - t0
print(averageForProductKey)
References
• http://spark.apache.org/
• http://spark.apache.org/sql/
• http://spark.apache.org/docs/latest/sql-programming-guide.html