|
| 1 | +package part3dfjoins |
| 2 | + |
| 3 | +import org.apache.spark.rdd.RDD |
| 4 | +import org.apache.spark.sql.SparkSession |
| 5 | + |
object JoinsRecap {

  // Local SparkSession with 2 worker threads — everything in this recap runs on it.
  val spark: SparkSession = SparkSession.builder()
    .master("local[2]")
    .appName("Joins Recap")
    .getOrCreate()

  // Underlying SparkContext, needed for the low-level RDD API below.
  val sc = spark.sparkContext

  // Reads a JSON dataset from the given path, letting Spark infer the schema.
  private def readJson(path: String) = spark.read
    .option("inferSchema", "true")
    .json(path)

  val guitarsDF = readJson("src/main/resources/data/guitars")

  val guitaristsDF = readJson("src/main/resources/data/guitarPlayers")

  val bandsDF = readJson("src/main/resources/data/bands")

  // Inner join: keep only the guitarist/band row pairs where the condition holds.
  val joinCondition = guitaristsDF.col("band") === bandsDF.col("id")
  val guitaristsBandsDF = guitaristsDF.join(bandsDF, joinCondition, "inner")

  // Outer joins.
  // Left outer: inner-join rows + every LEFT row, with nulls where no RIGHT row matched.
  guitaristsDF.join(bandsDF, joinCondition, "left_outer")
  // Right outer: inner-join rows + every RIGHT row, with nulls where no LEFT row matched.
  guitaristsDF.join(bandsDF, joinCondition, "right_outer")
  // Full outer: union of left_outer and right_outer.
  guitaristsDF.join(bandsDF, joinCondition, "outer")

  // Left semi join: LEFT rows that have AT LEAST ONE matching RIGHT row.
  // No columns from the right side — it acts as a filter on the left DF.
  guitaristsDF.join(bandsDF, joinCondition, "left_semi")

  // Left anti join: LEFT rows with NO matching RIGHT row.
  // Also a filter — the complement of left_semi.
  guitaristsDF.join(bandsDF, joinCondition, "left_anti")

  // Cross join: every left row paired with every right row.
  // Dangerous: NRows(crossjoin) = NRows(left) x NRows(right).
  // Also beware outer joins on non-unique keys — row counts can explode the same way.

  // RDD joins work on pair RDDs (key, value) and join by key.
  val colorsScores = Seq(
    "blue" -> 1,
    "red" -> 4,
    "green" -> 5,
    "yellow" -> 2,
    "orange" -> 3,
    "cyan" -> 0
  )
  val colorsRDD: RDD[(String, Int)] = sc.parallelize(colorsScores)

  val text = "The sky is blue, but the orange pale sun turns from yellow to red"
  // Standard word-count shape: lowercase each word and pair it with 1 ...
  val words = text.split(" ").map(word => (word.toLowerCase, 1))
  // ... then sum the 1s per key to count occurrences.
  val wordsRDD = sc.parallelize(words).reduceByKey((count1, count2) => count1 + count2)
  // RDD join defaults to INNER: only keys present in both RDDs survive.
  val scores: RDD[(String, (Int, Int))] = wordsRDD.join(colorsRDD)


  def main(args: Array[String]): Unit = {

  }
}
0 commit comments