package part1recap

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

object SparkRecap {

  // the entry point to the Spark structured API
  val spark = SparkSession.builder()
    .appName("Spark Recap")
    .master("local[2]")
    .getOrCreate()

  // read a DF
  val cars = spark.read
    .format("json")
    .option("inferSchema", "true")
    .load("src/main/resources/data/cars")

  import spark.implicits._

  // select
  val usefulCarsData = cars.select(
    col("Name"), // column object
    $"Year", // another column object (needs spark implicits)
    (col("Weight_in_lbs") / 2.2).as("Weight_in_kg"),
    expr("Weight_in_lbs / 2.2").as("Weight_in_kg_2")
  )

  val carsWeights = cars.selectExpr("Weight_in_lbs / 2.2")

  // filter
  val europeanCars = cars.where(col("Origin") =!= "USA")

  // aggregations
  val averageHP = cars.select(avg(col("Horsepower")).as("average_hp")) // also sum, mean, stddev, min, max
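
  // a small sketch of the other aggregation functions mentioned above, on the same numeric Horsepower column
  val hpStatistics = cars.select(
    sum(col("Horsepower")).as("total_hp"),
    mean(col("Horsepower")).as("mean_hp"),
    stddev(col("Horsepower")).as("stddev_hp"),
    min(col("Horsepower")).as("min_hp"),
    max(col("Horsepower")).as("max_hp")
  )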

  // grouping
  val countByOrigin = cars
    .groupBy(col("Origin")) // a RelationalGroupedDataset
    .count()
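
  // the RelationalGroupedDataset also supports arbitrary aggregations via agg; a small sketch
  val avgHPByOrigin = cars
    .groupBy(col("Origin"))
    .agg(avg(col("Horsepower")).as("average_hp"))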

  // joining
  val guitarPlayers = spark.read
    .option("inferSchema", "true")
    .json("src/main/resources/data/guitarPlayers")

  val bands = spark.read
    .option("inferSchema", "true")
    .json("src/main/resources/data/bands")

  val guitaristsBands = guitarPlayers.join(bands, guitarPlayers.col("band") === bands.col("id"))
  /*
    join types
    - inner: only the matching rows are kept
    - left/right/full outer join
    - semi/anti
   */
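
  // sketches of the other join types listed above; the join type goes in as the third argument
  val joinCondition = guitarPlayers.col("band") === bands.col("id")
  val guitaristsBandsLeft = guitarPlayers.join(bands, joinCondition, "left_outer") // unmatched guitar players kept, with nulls
  val guitaristsBandsSemi = guitarPlayers.join(bands, joinCondition, "left_semi") // left rows with a match; bands columns dropped
  val guitaristsBandsAnti = guitarPlayers.join(bands, joinCondition, "left_anti") // left rows with NO match in bands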

  // datasets = typed distributed collections of objects
  case class GuitarPlayer(id: Long, name: String, guitars: Seq[Long], band: Long)
  val guitarPlayersDS = guitarPlayers.as[GuitarPlayer] // needs spark.implicits
  val guitarPlayerNames = guitarPlayersDS.map(_.name) // typed transformation
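
  // Dataset transformations are checked at compile time; a small sketch filtering on a typed field
  val playersWithGuitars = guitarPlayersDS.filter(_.guitars.nonEmpty)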

  // Spark SQL
  cars.createOrReplaceTempView("cars")
  val americanCars = spark.sql(
    """
      |select Name from cars where Origin = 'USA'
    """.stripMargin
  )

  // low-level API: RDDs
  val sc = spark.sparkContext
  val numbersRDD: RDD[Int] = sc.parallelize(1 to 1000000)

  // functional operators
  val doubles = numbersRDD.map(_ * 2)
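
  // other functional operators mirror the Scala collections API; all transformations stay lazy until an action runs
  val evenNumbers = numbersRDD.filter(_ % 2 == 0)
  val expandedNumbers = numbersRDD.flatMap(n => Seq(n, n * 10))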

  // RDD -> DF
  val numbersDF = numbersRDD.toDF("number") // you lose type info, you get SQL capability

  // RDD -> DS
  val numbersDS = spark.createDataset(numbersRDD)

  // DS -> RDD
  val guitarPlayersRDD = guitarPlayersDS.rdd

  // DF -> RDD
  val carsRDD = cars.rdd // RDD[Row]
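
  // Rows are untyped; a sketch of extracting a field back out, using the Name column seen earlier
  val carNames = carsRDD.map(row => row.getAs[String]("Name"))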

  def main(args: Array[String]): Unit = {
    // showing a DF to the console
    cars.show()
    cars.printSchema()
  }
}