DataFrame skewed joins · coder-RT/spark-optimization@ab3ab4e · GitHub
[go: up one dir, main page]

Skip to content
8000

Commit ab3ab4e

Browse files
DataFrame skewed joins
1 parent 7e3fb58 commit ab3ab4e

File tree

3 files changed

+78
-10
lines changed

3 files changed

+78
-10
lines changed

src/main/scala/generator/DataGenerator.scala

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -32,17 +32,17 @@ object DataGenerator {
3232
// Laptop models generation - skewed data lectures
3333
/////////////////////////////////////////////////////////////////////////////////
3434

35-
val laptopModelsSet: Seq[LatopModel] = Seq(
36-
LatopModel("Razer", "Blade"),
37-
LatopModel("Alienware", "Area-51"),
38-
LatopModel("HP", "Omen"),
39-
LatopModel("Acer", "Predator"),
40-
LatopModel("Asus", "ROG"),
41-
LatopModel("Lenovo", "Legion"),
42-
LatopModel("MSI", "Raider")
35+
val laptopModelsSet: Seq[LaptopModel] = Seq(
36+
LaptopModel("Razer", "Blade"),
37+
LaptopModel("Alienware", "Area-51"),
38+
LaptopModel("HP", "Omen"),
39+
LaptopModel( 8000 "Acer", "Predator"),
40+
LaptopModel("Asus", "ROG"),
41+
LaptopModel("Lenovo", "Legion"),
42+
LaptopModel("MSI", "Raider")
4343
)
4444

/**
 * Picks a make/model pair from `laptopModelsSet`.
 *
 * @param uniform when false (the default), roughly half of all draws return the
 *                first entry, producing deliberately skewed data; when true,
 *                every entry is equally likely.
 */
def randomLaptopModel(uniform: Boolean = false): LaptopModel = {
  // Guard branches mirror the original short-circuit: with uniform sampling the
  // coin flip is never performed, otherwise a 50% coin flip pins index 0.
  val chosenIndex =
    if (uniform) random.nextInt(laptopModelsSet.size)
    else if (random.nextBoolean()) 0
    else random.nextInt(laptopModelsSet.size)
  laptopModelsSet(chosenIndex)
}
Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
package generator
22

/** A make/model pair — the (skewed) join key of the experiments. */
case class LaptopModel(make: String, model: String)

/** A laptop in the store's inventory, identified by its registration code. */
case class Laptop(registration: String, make: String, model: String, procSpeed: Double)

/** A sale offer for a given make/model/processor-speed configuration. */
case class LaptopOffer(make: String, model: String, procSpeed: Double, salePrice: Double)
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,73 @@
11
package part3dfjoins
22

3+
import generator.DataGenerator
4+
import org.apache.spark.sql.SparkSession
5+
import org.apache.spark.sql.functions._
6+
37
object SkewedJoins {

  val spark = SparkSession.builder()
    .appName("Skewed Joins")
    .master("local[*]")
    .config("spark.sql.autoBroadcastJoinThreshold", -1) // deactivate broadcast joins so the skew is visible in a SortMergeJoin
    .getOrCreate()

  import spark.implicits._

  /*
    An online store selling gaming laptops.
    2 laptops are "similar" if they have the same make & model, but proc speed within 0.1

    For each laptop configuration, we are interested in the average sale price of "similar" models.

    Acer Predator 2.9Ghz aylfaskjhrw -> average sale price of all Acer Predators with CPU speed between 2.8 and 3.0 GHz
   */

  // The generator skews ~50% of rows onto a single make/model, so the join key distribution is highly non-uniform.
  val laptops = Seq.fill(40000)(DataGenerator.randomLaptop()).toDS
  val laptopOffers = Seq.fill(100000)(DataGenerator.randomLaptopOffer()).toDS

  // Naive approach: equi-join on (make, model) only, then post-filter on the speed
  // difference. Because half the data shares one key, one shuffle partition does
  // most of the work — the classic straggler-task symptom of a skewed join.
  val joined = laptops.join(laptopOffers, Seq("make", "model"))
    .filter(abs(laptopOffers.col("procSpeed") - laptops.col("procSpeed")) <= 0.1)
    .groupBy("registration")
    .agg(avg("salePrice").as("averagePrice"))
  /*
    == Physical Plan ==
    *(4) HashAggregate(keys=[registration#4], functions=[avg(salePrice#20)])
    +- Exchange hashpartitioning(registration#4, 200), true, [id=#99]
       +- *(3) HashAggregate(keys=[registration#4], functions=[partial_avg(salePrice#20)])
          +- *(3) Project [registration#4, salePrice#20]
             +- *(3) SortMergeJoin [make#5, model#6], [make#17, model#18], Inner, (abs((procSpeed#19 - procSpeed#7)) <= 0.1)
                :- *(1) Sort [make#5 ASC NULLS FIRST, model#6 ASC NULLS FIRST], false, 0
                :  +- Exchange hashpartitioning(make#5, model#6, 200), true, [id=#77]
                :     +- LocalTableScan [registration#4, make#5, model#6, procSpeed#7]
                +- *(2) Sort [make#17 ASC NULLS FIRST, model#18 ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(make#17, model#18, 200), true, [id=#78]
                      +- LocalTableScan [make#17, model#18, procSpeed#19, salePrice#20]
   */

  // De-skewing technique: explode every laptop into 3 rows at procSpeed - 0.1,
  // procSpeed and procSpeed + 0.1, then include procSpeed in the equi-join key.
  // The extra key component spreads the hot (make, model) keys over many more
  // partitions, at the cost of tripling the left side.
  val laptops2 = laptops.withColumn("procSpeed", explode(array($"procSpeed" - 0.1, $"procSpeed", $"procSpeed" + 0.1)))
  val joined2 = laptops2.join(laptopOffers, Seq("make", "model", "procSpeed"))
    .groupBy("registration")
    .agg(avg("salePrice").as("averagePrice"))
  /*
    == Physical Plan ==
    *(4) HashAggregate(keys=[registration#4], functions=[avg(salePrice#20)])
    +- Exchange hashpartitioning(registration#4, 200), true, [id=#107]
       +- *(3) HashAggregate(keys=[registration#4], functions=[partial_avg(salePrice#20)])
          +- *(3) Project [registration#4, salePrice#20]
             +- *(3) SortMergeJoin [make#5, model#6, knownfloatingpointnormalized(normalizenanandzero(procSpeed#43))], [make#17, model#18, knownfloatingpointnormalized(normalizenanandzero(procSpeed#19))], Inner
                :- *(1) Sort [make#5 ASC NULLS FIRST, model#6 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(procSpeed#43)) ASC NULLS FIRST], false, 0
                :  +- Exchange hashpartitioning(make#5, model#6, knownfloatingpointnormalized(normalizenanandzero(procSpeed#43)), 200), true, [id=#85]
                :     +- Generate explode(array((procSpeed#7 - 0.1), procSpeed#7, (procSpeed#7 + 0.1))), [registration#4, make#5, model#6], false, [procSpeed#43]
                :        +- LocalTableScan [registration#4, make#5, model#6, procSpeed#7]
                +- *(2) Sort [make#17 ASC NULLS FIRST, model#18 ASC NULLS FIRST, knownfloatingpointnormalized(normalizenanandzero(procSpeed#19)) ASC NULLS FIRST], false, 0
                   +- Exchange hashpartitioning(make#17, model#18, knownfloatingpointnormalized(normalizenanandzero(procSpeed#19)), 200), true, [id=#86]
                      +- LocalTableScan [make#17, model#18, procSpeed#19, salePrice#20]
   */

  def main(args: Array[String]): Unit = {
    joined2.show()
    joined2.explain()
    // Deliberate: keep the application alive so the Spark UI (localhost:4040)
    // can be inspected after the job finishes.
    Thread.sleep(1000000)
  }
}

0 commit comments

Comments
 (0)
0