8000 spark recap · coder-RT/spark-optimization@b4d4aa0 · GitHub
[go: up one dir, main page]

Skip to content

Commit b4d4aa0

Browse files
spark recap
1 parent 9e5f127 commit b4d4aa0

File tree

1 file changed

+98
-0
lines changed

1 file changed

+98
-0
lines changed
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
package part1recap
2+
3+
import org.apache.spark.rdd.RDD
4+
import org.apache.spark.sql.SparkSession
5+
import org.apache.spark.sql.functions._
6+
7+
/**
 * Recap of the core Spark APIs: the structured API (DataFrame/Dataset),
 * Spark SQL, and the low-level RDD API.
 *
 * NOTE: every `val` in this object is evaluated eagerly the first time the
 * object is referenced — including the SparkSession creation and the file
 * reads below. That is fine for a demo/recap file, but not a pattern for
 * production code.
 */
object SparkRecap {

  // the entry point to the Spark structured API
  val spark = SparkSession.builder()
    .appName("Spark Recap")
    .master("local[2]") // local mode with 2 threads
    .getOrCreate()

  // read a DF; schema is inferred by scanning the JSON data
  // (paths are relative to the project working directory)
  val cars = spark.read
    .format("json")
    .option("inferSchema", "true")
    .load("src/main/resources/data/cars")

  // brings in the $"col" interpolator and the encoders needed for .as[T] below
  import spark.implicits._

  // select: project columns; the same column can be expressed several ways
  val usefulCarsData = cars.select(
    col("Name"), // column object
    $"Year", // another column object (needs spark implicits)
    (col("Weight_in_lbs") / 2.2).as("Weight_in_kg"), // Column arithmetic + rename
    expr("Weight_in_lbs / 2.2").as("Weight_in_kg_2") // same thing as a SQL expression string
  )

  // selectExpr: select using SQL expression strings directly
  val carsWeights = cars.selectExpr("Weight_in_lbs / 2.2")

  // filter: =!= is the Column inequality operator (plain != would compare the Column objects)
  val europeanCars = cars.where(col("Origin") =!= "USA")

  // aggregations
  val averageHP = cars.select(avg(col("Horsepower")).as("average_hp")) // sum, mean, stddev, min, max

  // grouping
  val countByOrigin = cars
    .groupBy(col("Origin")) // a RelationalGroupedDataset
    .count()

  // joining
  val guitarPlayers = spark.read
    .option("inferSchema", "true")
    .json("src/main/resources/data/guitarPlayers")

  val bands = spark.read
    .option("inferSchema", "true")
    .json("src/main/resources/data/bands")

  // inner join (the default join type) on the band id
  val guitaristsBands = guitarPlayers.join(bands, guitarPlayers.col("band") === bands.col("id"))
  /*
    join types
    - inner: only the matching rows are kept
    - left/right/full outer join
    - semi/anti
   */

  // datasets = typed distributed collection of objects
  case class GuitarPlayer(id: Long, name: String, guitars: Seq[Long], band: Long)
  val guitarPlayersDS = guitarPlayers.as[GuitarPlayer] // needs spark.implicits
  // typed (lambda) transformation; the result is discarded here — demo only
  guitarPlayersDS.map(_.name)

  // Spark SQL: register the DF as a temp view, then query it with SQL text
  cars.createOrReplaceTempView("cars")
  val americanCars = spark.sql(
    """
      |select Name from cars where Origin = 'USA'
    """.stripMargin
  )

  // low-level API: RDDs
  val sc = spark.sparkContext
  val numbersRDD: RDD[Int] = sc.parallelize(1 to 1000000)

  // functional operators
  val doubles = numbersRDD.map(_ * 2)

  // RDD -> DF
  val numbersDF = numbersRDD.toDF("number") // you lose type info, you get SQL capability

  // RDD -> DS (keeps type info AND gains the structured API)
  val numbersDS = spark.createDataset(numbersRDD)

  // DS -> RDD
  val guitarPlayersRDD = guitarPlayersDS.rdd

  // DF -> RDD
  val carsRDD = cars.rdd // RDD[Row]

  /** Entry point: triggers the eager object initialization above, then shows the cars DF. */
  def main(args: Array[String]): Unit = {
    // showing a DF to the console
    cars.show()
    cars.printSchema()
  }
}

0 commit comments

Comments
 (0)
0