package part2foundations

import org.apache.spark.sql.SparkSession

/**
  * Walk-through of reading Spark physical query plans with `explain()`.
  *
  * Every statement lives directly in the object body, so all of it runs once, when the
  * object is first initialized. The block comments hold the sample plan output recorded
  * next to each example; expression ids (`#0L`, `id=#16`, ...) and `splits` may differ
  * in your own session.
  */
object ReadingQueryPlans {
  ///////////////////////////////////////////////////////////////////// Boilerplate
  // you don't need this code in the Spark shell
  // this code is needed if you want to run it locally in IntelliJ

  val spark: SparkSession = SparkSession.builder()
    .config("spark.master", "local")
    .appName("Reading Query Plans")
    .getOrCreate()

  val sc = spark.sparkContext

  ///////////////////////////////////////////////////////////////////// Boilerplate

  // plan 1 - a simple transformation
  val simpleNumbers = spark.range(1, 1000000)
  val times5 = simpleNumbers.selectExpr("id * 5 as id")
  times5.explain() // this is how you show a query plan
  /*
    == Physical Plan ==
    *(1) Project [(id#0L * 5) AS id#2L]
    +- *(1) Range (1, 1000000, step=1, splits=6)
   */

  // plan 2 - a shuffle
  // repartition(7) shows up in the plan as the Exchange RoundRobinPartitioning(7) node
  val moreNumbers = spark.range(1, 1000000, 2)
  val split7 = moreNumbers.repartition(7)

  split7.explain()
  /*
    == Physical Plan ==
    Exchange RoundRobinPartitioning(7), false, [id=#16]
    +- *(1) Range (1, 1000000, step=2, splits=6)
   */

  // plan 3 - shuffle + transformation
  // the Project node sits above the Exchange produced by the repartition
  split7.selectExpr("id * 5 as id").explain()
  /*
    == Physical Plan ==
    *(2) Project [(id#4L * 5) AS id#8L]
    +- Exchange RoundRobinPartitioning(7), false, [id=#29]
       +- *(1) Range (1, 1000000, step=2, splits=6)
   */

  // plan 4 - a more complex job with a join
  // In the recorded plan each join input goes through its RoundRobinPartitioning exchange,
  // then an Exchange hashpartitioning(..., 200) and a Sort before the SortMergeJoin; the
  // sum appears as partial_sum and sum, separated by an Exchange SinglePartition.
  val ds1 = spark.range(1, 10000000)
  val ds2 = spark.range(1, 20000000, 2)
  val ds3 = ds1.repartition(7)
  val ds4 = ds2.repartition(9)
  val ds5 = ds3.selectExpr("id * 3 as id")
  val joined = ds5.join(ds4, "id")
  val sum = joined.selectExpr("sum(id)")
  sum.explain()
  /*
    == Physical Plan ==
    *(7) HashAggregate(keys=[], functions=[sum(id#18L)])
    +- Exchange SinglePartition, true, [id=#99]
       +- *(6) HashAggregate(keys=[], functions=[partial_sum(id#18L)])
          +- *(6) Project [id#18L]
             +- *(6) SortMergeJoin [id#18L], [id#12L], Inner
                :- *(3) Sort [id#18L ASC NULLS FIRST], false, 0
                :  +- Exchange hashpartitioning(id#18L, 200), true, [id=#83]
                :     +- *(2) Project [(id#10L * 3) AS id#18L]
                :        +- Exchange RoundRobinPartitioning(7), false, [id=#79]
                :           +- *(1) Range (1, 10000000, step=1, splits=6)
                +- *(5) Sort [id#12L ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(id#12L, 200), true, [id=#90]
                      +- Exchange RoundRobinPartitioning(9), false, [id=#89]
                         +- *(4) Range (1, 20000000, step=2, splits=6)
   */

  /**
    * Entry point for running this file locally (e.g. from IntelliJ).
    *
    * The body is intentionally empty: invoking `main` forces this object to be
    * initialized, which executes all the statements above and prints every plan.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = ()
}
0 commit comments