query plans · coder-RT/spark-optimization@72c7120 · GitHub
[go: up one dir, main page]

Skip to content
65F6

Commit 72c7120

Browse files
query plans
1 parent bf3cd48 commit 72c7120

File tree

1 file changed

+78
-0
lines changed

1 file changed

+78
-0
lines changed
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
package part2foundations
2+
3+
import org.apache.spark.sql.SparkSession
4+
5+
/**
  * Walk-through of four Spark physical plans, from a single projection up to a
  * join followed by an aggregation.
  *
  * Every statement lives in the object body, so everything below runs once, when
  * the object is initialised: the session is created and each `explain()` call
  * prints its plan to standard output. The block comment after each `explain()`
  * records the plan that was printed for that query.
  */
object ReadingQueryPlans {

  ///////////////////////////////////////////////////////////////////// Boilerplate
  // you don't need this code in the Spark shell
  // this code is needed if you want to run it locally in IntelliJ

  val spark = SparkSession.builder()
    .config("spark.master", "local")
    .appName("Reading Query Plans")
    .getOrCreate()

  val sc = spark.sparkContext

  ///////////////////////////////////////////////////////////////////// Boilerplate

  // plan 1 - a simple transformation
  // A projection over a range: the recorded plan has no Exchange node.
  val simpleNumbers = spark.range(1, 1000000)
  val times5 = simpleNumbers.selectExpr("id * 5 as id")
  times5.explain() // this is how you show a query plan
  /*
    == Physical Plan ==
    *(1) Project [(id#0L * 5) AS id#2L]
    +- *(1) Range (1, 1000000, step=1, splits=6)
   */

  // plan 2 - a shuffle
  // repartition(7) appears in the plan as an Exchange RoundRobinPartitioning(7) node.
  val moreNumbers = spark.range(1, 1000000, 2)
  val split7 = moreNumbers.repartition(7)

  split7.explain()
  /*
    == Physical Plan ==
    Exchange RoundRobinPartitioning(7), false, [id=#16]
    +- *(1) Range (1, 1000000, step=2, splits=6)
   */

  // plan 3 - shuffle + transformation
  // Same shuffle as plan 2, with the projection applied on top of the Exchange.
  split7.selectExpr("id * 5 as id").explain()
  /*
    == Physical Plan ==
    *(2) Project [(id#4L * 5) AS id#8L]
    +- Exchange RoundRobinPartitioning(7), false, [id=#29]
       +- *(1) Range (1, 1000000, step=2, splits=6)
   */


  // plan 4 - a more complex job with a join
  // Two repartitioned ranges are joined on "id" and the join result is summed.
  // In the recorded plan each join input gets its own hashpartitioning Exchange
  // and Sort before the SortMergeJoin, and the sum is split into a partial_sum
  // and a final sum separated by an Exchange SinglePartition.
  val ds1 = spark.range(1, 10000000)
  val ds2 = spark.range(1, 20000000, 2)
  val ds3 = ds1.repartition(7)
  val ds4 = ds2.repartition(9)
  val ds5 = ds3.selectExpr("id * 3 as id")
  val joined = ds5.join(ds4, "id")
  val sum = joined.selectExpr("sum(id)")
  sum.explain()
  /*

    == Physical Plan ==
    *(7) HashAggregate(keys=[], functions=[sum(id#18L)])
    +- Exchange SinglePartition, true, [id=#99]
       +- *(6) HashAggregate(keys=[], functions=[partial_sum(id#18L)])
          +- *(6) Project [id#18L]
             +- *(6) SortMergeJoin [id#18L], [id#12L], Inner
                :- *(3) Sort [id#18L ASC NULLS FIRST], false, 0
                :  +- Exchange hashpartitioning(id#18L, 200), true, [id=#83]
                :     +- *(2) Project [(id#10L * 3) AS id#18L]
                :        +- Exchange RoundRobinPartitioning(7), false, [id=#79]
                :           +- *(1) Range (1, 10000000, step=1, splits=6)
                +- *(5) Sort [id#12L ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(id#12L, 200), true, [id=#90]
                      +- Exchange RoundRobinPartitioning(9), false, [id=#89]
                         +- *(4) Range (1, 20000000, step=2, splits=6)
   */

  /**
    * Entry point for running the file outside the Spark shell (e.g. from an IDE).
    *
    * The body is intentionally empty: invoking `main` forces this object to be
    * initialised, which evaluates every statement above and prints the four plans.
    *
    * @param args command-line arguments; unused
    */
  def main(args: Array[String]): Unit = {}

}

0 commit comments

Comments
 (0)
0