i2i transformations · coder-RT/spark-optimization@50fa66f · GitHub
[go: up one dir, main page]

Skip to content

Commit 50fa66f

Browse files
i2i transformations
1 parent 61f6944 commit 50fa66f

File tree

1 file changed

+79
-0
lines changed

1 file changed

+79
-0
lines changed
Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,79 @@
1+
package part5rddtransformations
2+
3+
import generator.DataGenerator
4+
import org.apache.spark.sql.SparkSession
5+
import scala.collection.mutable
6+
7+
/**
  * Demonstrates two ways of extracting the `LIMIT` smallest ("best") metrics from a
  * text file of `name value` pairs:
  *
  *  - [[printTopMetrics]] sorts the entire RDD by value and takes the first `LIMIT` rows;
  *  - [[printTopMetricsI2I]] keeps only `LIMIT` candidates per partition with an
  *    iterator-to-iterator (i2i) transformation, then merges the candidates.
  */
object I2ITransformations {

  val spark: SparkSession = SparkSession.builder()
    .appName("I2I Transformations")
    .master("local[*]")
    .getOrCreate()

  val sc: org.apache.spark.SparkContext = spark.sparkContext

  /*
    Science project
    each metric has identifier, value

    Return the smallest ("best") 10 metrics (identifiers + values)
   */

  /** Number of smallest metrics to report. */
  val LIMIT = 10

  /**
    * Reads the metrics file as an RDD of `(name, value)` pairs.
    *
    * Each line is split on a single space; the first token is the metric name and the
    * second is parsed as a `Double`. A line with fewer than two tokens or a non-numeric
    * second token makes the task fail when the RDD is evaluated.
    *
    * @return a lazily evaluated RDD of `(metric name, metric value)`
    */
  def readMetrics(): org.apache.spark.rdd.RDD[(String, Double)] =
    sc.textFile("src/main/resources/generated/metrics/metrics10m.txt")
      .map { line =>
        val tokens = line.split(" ")
        val name = tokens(0)
        val value = tokens(1)

        (name, value.toDouble)
      }

  /**
    * Prints the `LIMIT` metrics with the smallest values by sorting the whole RDD
    * on the value and taking the first `LIMIT` elements.
    */
  def printTopMetrics(): Unit = {
    val sortedMetrics = readMetrics().sortBy(_._2).take(LIMIT)
    sortedMetrics.foreach(println)
  }

  /**
    * Prints the `LIMIT` metrics with the smallest values using an iterator-to-iterator
    * transformation: every partition is reduced to at most `LIMIT` candidates, the
    * candidates are gathered into a single partition, and the same reduction is applied
    * once more to obtain the final answer.
    */
  def printTopMetricsI2I(): Unit = {

    val iteratorToIteratorTransformation = (records: Iterator[(String, Double)]) => {
      /*
        i2i transformation
        - they are NARROW TRANSFORMATIONS
        - Spark will "selectively" spill data to disk when partitions are too big for memory

        Warning: don't traverse more than once or convert to collections
       */

      val byValue: Ordering[(String, Double)] = Ordering.by[(String, Double), Double](_._2)

      // Bounded max-heap: the record with the LARGEST value sits at the head, so evicting
      // the head whenever the heap outgrows LIMIT leaves the LIMIT smallest records.
      // A heap (unlike a sorted set keyed on the value alone) keeps distinct metrics that
      // happen to share the same value instead of silently collapsing them into one.
      val smallest = mutable.PriorityQueue.empty[(String, Double)](byValue)

      records.foreach { record =>
        smallest += record
        if (smallest.size > LIMIT) {
          smallest.dequeue()
        }
      }

      // The input iterator has been traversed exactly once; only <= LIMIT records remain.
      // Heap iteration order is unspecified, so emit the survivors in ascending value order.
      smallest.toList.sorted(byValue).iterator
    }

    val topMetrics = readMetrics()
      .mapPartitions(iteratorToIteratorTransformation) // at most LIMIT candidates per partition
      .repartition(1)                                  // bring all candidates together
      .mapPartitions(iteratorToIteratorTransformation) // final LIMIT smallest overall

    val result = topMetrics.take(LIMIT)
    result.foreach(println)
  }

  /**
    * Runs both variants back to back.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    printTopMetrics()
    printTopMetricsI2I()
    // NOTE(review): presumably keeps the application (and its Spark UI) alive so the two
    // jobs can be compared — confirm before removing.
    Thread.sleep(1000000)
  }
}

0 commit comments

Comments
 (0)
0