p553 Boehm
ABSTRACT
1.
INTRODUCTION
This work is licensed under the Creative Commons Attribution-NonCommercial-NoDerivs 3.0 Unported License. To view a copy of this license, visit http://creativecommons.org/licenses/by-nc-nd/3.0/. Obtain permission prior to any use beyond those covered by the license. Contact copyright holder by emailing info@vldb.org. Articles from this volume
were invited to present their results at the 40th International Conference on
Very Large Data Bases, September 1st - 5th 2014, Hangzhou, China.
Proceedings of the VLDB Endowment, Vol. 7, No. 7
Copyright 2014 VLDB Endowment 2150-8097/14/03.
2.
Background SystemML
[Figure 1: SystemML architecture. DML scripts are parsed into high-level operators, which the compiler translates into low-level operators and a runtime control program of program blocks with CP and MR instructions; generic MR jobs run on Hadoop DFS, supported by caching, optimizations, and temp space.]
2.1
[Figure 2: HOP DAG of the running example after rewrites: with constant propagation and common subexpression elimination, but without constant folding (1.000001). It reads X (./in/X, 10^6 x 1) and Y (./in/Y, 10^6 x 1), combines b(cov), b(cm), b(-), b(*), b(/), and u(sqrt) operators, and writes r (./out/r).]
2.2
The outer ParFOR loop iterates from 1 to (n - 1) and computes the statistics for the first column. Due to symmetry of r_{X,Y}, the inner loop only iterates from (i + 1) to n in order to compute r_{X,Y} for all pairs of columns. In later discussions, we will refer to R[i,j]=v and v=D[,i] as left and right indexing, respectively. The result is an upper-triangular matrix R.
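As a concrete illustration, the triangular loop structure can be sketched in Python/NumPy (a sequential stand-in for the parallel ParFOR loops; function and variable names are ours, not SystemML's):

```python
import numpy as np

def pairwise_corr(D):
    """Upper-triangular pairwise correlations, mirroring the nested
    (Par)FOR structure: outer loop over i, inner loop over j in (i+1)..n."""
    n = D.shape[1]
    R = np.zeros((n, n))
    for i in range(n - 1):            # outer ParFOR (parallel in SystemML)
        X = D[:, i]                   # right indexing: v = D[,i]
        for j in range(i + 1, n):     # inner ParFOR over remaining columns
            Y = D[:, j]
            sX, sY = np.sqrt(X.var()), np.sqrt(Y.var())
            # left indexing: R[i,j] = v
            R[i, j] = np.cov(X, Y, bias=True)[0, 1] / (sX * sY)
    return R
```

The result is upper-triangular, matching the independence of all (i, j) pairs that makes both loops parallelizable.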
A Case for Optimization: Given a variety of use cases (see Table 1) and workloads, there is a strong need for different parallel execution strategies. Recall our running example: if we have many small pairs, we aim for distributed in-memory computation, but if we have few very large pairs, we are interested in scalable data-parallel computation. Additional challenges of this example are: (1) a triangular nested loop control structure, (2) column-wise data access on unordered distributed data, and (3) a bivariate all-to-all data shuffling pattern. Putting it all together, complex ParFOR programs and ad-hoc data analysis with unknown input characteristics require automatic optimization.
We now introduce a taxonomy of task-parallel ML programs as a basis for reasoning about classes of use cases. In our context, data parallelism refers to operator-/DAG-level parallelization, i.e., executing an operation on blocks of matrices in parallel. In contrast, task parallelism refers to program-level parallelization, i.e., executing a complex ML program on iterations in parallel. Our taxonomy (see Table 1) employs two perspectives: model- and data-oriented. First, the model-oriented perspective describes the ML-algorithm-specific statistical model we compute or use. Multiple (independent) models inherently exhibit large potential for task parallelism. Examples are cross-validation (CV) or ensemble learning (EL). There are also many use cases of single composable models, i.e., decomposable problems and aggregation of submodels, that could benefit from task parallelism. Example classes are (1) algorithms in statistical query model (SQM) summation form [5], (2) partitioned matrix factorization via alternating least squares (ALS), expectation maximization (EM), or stochastic gradient descent (SGD) (see [10] for a comprehensive survey), and (3) tree-based algorithms like decision trees or Cascade support vector machines (SVMs) [12]. Second, the data-oriented view describes the data access characteristics of iterations, which may use disjoint, overlapping, or all data. Those data categories define applicable optimizations such as partitioning (disjoint/overlapping) and memoization/sharing (overlapping/all). The bottom line is that we have a variety of use cases with diverse computation and data access characteristics.
2.3
3.
PARALLELIZATION STRATEGIES
3.1
Task Partitioning
Task partitioning groups iterations into tasks with two major but contradictory goals: (1) low task communication overhead (via few tasks) and (2) good load balance (via many tasks). Both aspects have high impact. For example, on MapReduce, task setup can take seconds. However, we need many tasks to exploit large clusters, and load balance is crucial because the most time-consuming worker determines the overall execution time. We model a task w_i in W as a logical group of one or many (sequentially executed) iterations with task size l_i = |w_i|. Additionally, W is defined as a set of disjoint tasks that exactly cover predicate p.
Fixed-Size Schemes: Fixed-size task partitioning creates tasks with constant size l_i = c_l, which trades off communication overhead against load balance. One extreme is naive task partitioning with minimal task sizes of l_i = 1, which leads to very good load balance but high communication overhead. Another extreme is static task partitioning with maximal task sizes of l_i = ceil(N/k), which leads to very low communication overhead but potentially poor load balance.
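A minimal sketch of these fixed-size schemes (our own illustrative code, not SystemML's implementation):

```python
import math

def fixed_size_tasks(N, c):
    """Partition iterations 1..N into disjoint tasks of constant size l_i = c
    (the last task may be smaller); together they exactly cover 1..N."""
    its = list(range(1, N + 1))
    return [its[i:i + c] for i in range(0, N, c)]

def naive_tasks(N):
    # l_i = 1: best load balance, highest communication overhead
    return fixed_size_tasks(N, 1)

def static_tasks(N, k):
    # l_i = ceil(N/k): lowest overhead (k tasks), potentially poor balance
    return fixed_size_tasks(N, math.ceil(N / k))
```

For N = 11 and k = 2, static partitioning yields two tasks of sizes 6 and 5, while naive partitioning yields eleven single-iteration tasks.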
Self-Scheduling Schemes: Additionally, we apply the Factoring self-scheduling algorithm [18] from the area of HPC as a simple yet very effective scheme. The basic idea is to use waves of exponentially decaying task sizes in order to achieve low communication overhead via large tasks at the beginning but good load balance via small tasks at the end. Factoring computes the task size l_i for the next wave of k tasks, based on remaining iterations R_i, as follows:
R_0 = N, \quad l_i = \left\lceil \frac{R_i}{x_i \cdot k} \right\rceil, \quad R_{i+1} = R_i - k \cdot l_i. \qquad (1)
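Equation (1) can be sketched as follows (illustrative Python with x_i = 2, the common factoring choice; names are ours):

```python
import math

def factoring_tasks(N, k, x=2):
    """Task sizes per Eq. (1): waves of k tasks with l_i = ceil(R_i / (x * k)),
    R_0 = N, R_{i+1} = R_i - k * l_i, until all N iterations are covered."""
    sizes, R = [], N
    while R > 0:
        l = max(1, math.ceil(R / (x * k)))
        for _ in range(k):
            if R <= 0:
                break
            sizes.append(min(l, R))
            R -= min(l, R)
    return sizes
```

For N = 11 and k = 2 this yields sizes 3, 3, 2, 2, 1, matching the task queue w1 through w5 shown in Figure 3.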
[Figure 3: ParFOR execution strategies. (a) Local: task partitioning fills a task queue (w1: i,{1,2,3}, w2: i,{4,5,6}, w3: i,{7,8}, w4: i,{9,10}, w5: i,{11}) from which k local ParWorkers dequeue tasks (while(w <- deq())) and execute prog(pi) for each iteration pi in w. (b) Remote: task partitioning writes a task file to Hadoop; k ParWorker mappers parse their task in map(key, value), execute prog(pi) for each iteration, and report results such as A|MATRIX|./out/A7tmp.]
3.2
Local Parallelism
3.3
Remote Parallelism
In order to complement the generality of local parallelism (ParFOR-L) with distributed in-memory computation, we provide a second runtime strategy: REMOTE ParFOR (ParFOR-R). The basic concept is to execute ParFOR itself as a single MR job and to execute its body as in-memory CP instructions, distributed on all nodes of the cluster. This ensures scalability for large or compute-intensive problems.
Runtime Architecture Overview: The runtime architecture of ParFOR-R is shown in Figure 3(b). First, we do task partitioning and serialize the task sequence into a task file on HDFS. Second, we export all dirty (i.e., updated, in-memory) input matrices to HDFS. Third, we serialize the ParFOR program body, i.e., program blocks, instructions, and referenced DML/external functions, a shallow copy of the symbol table, as well as internal configurations, and store them in the MR job configuration. Fourth, we submit the
[Figure 4: Examples of hybrid parallelism. (a) L-MR: local ParFOR on the master node runs multiple MR jobs (x, y) on the cluster in parallel. (b) L-R-CP: local ParFOR on the master node spawns remote ParFOR on the cluster. (c) R-L-CP: remote ParFOR on the cluster spawns local ParFOR workers on its nodes.]
3.5
Runtime Optimizations
MR job and wait for its completion. Result matrices of individual tasks are directly written to HDFS, but (varname, filename)-tuples are collected in the job output. This ensures output flexibility and yet fault tolerance. Fifth, we aggregate results. We now describe selected details.
MR Job Configuration: ParFOR-R is a map-only MR job whose input is the task file with one ParFOR task per line, and we use the NLineInputFormat in order to initiate one map task per ParFOR task. The number of map tasks is therefore directly controlled via task partitioning, where k is equal to the number of map slots in the cluster.
Remote Parallel Workers behave like local workers, except for realizing the MR mapper interface. We initialize the worker by parsing the serialized program and creating program blocks, instructions, and a symbol table with unique file names. On each map, we parse the given task, execute the program for all task iterations, and write result variables to HDFS. In case of JVM reuse, we also reuse workers in order to exploit cached inputs and pre-aggregate results. Finally, we do not allow MR-job instructions (nested MR jobs) inside a remote worker because this incurs the danger of deadlocks if all map slots are occupied by ParFOR.
Task Scheduling is handed over to the Hadoop scheduler, which is important for several reasons. First, it provides global scheduling for (1) our task- and data-parallel jobs, as well as (2) other MR-based applications on a shared cluster. Second, we get MR functionality such as fault tolerance, data locality, and an existing ecosystem. Finally, since Hadoop executes input splits by decreasing size, we ensure the ParFOR task order by padding with leading zeros.
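The zero-padding trick can be illustrated as follows (file format details are hypothetical; the point is that fixed-width numbers make lexicographic order coincide with numeric task order):

```python
def task_line(task_id, iterations, id_width=5):
    """Render one task per line with a zero-padded id, so that sorting lines
    lexicographically preserves the intended task order."""
    return "w%0*d: i, {%s}" % (id_width, task_id, ",".join(map(str, iterations)))
```

Without padding, "w10" would sort before "w2"; with padding, "w00002" correctly precedes "w00010".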
3.4
After local or remote execution, we automatically consolidate all worker results, which is crucial for usability and
performance. There are two important observations from
dependency analysis. First, result variables are the dependency candidates C, i.e., updated, non-local matrices. Second, independence implies that worker results are disjoint.
Conceptually, we distinguish two scenarios, in both of which
the original result matrix R still exists due to copy-on-write.
First, if R is empty, we simply need to copy all non-zero values from all workers into the final result. An instance of this
scenario is our running example. Second, if R is non-empty,
we need to copy all (zero and non-zero) values that differ
from the original ones. An example is distributed matrix
factorization, where we iteratively modify subblocks in parallel. We call those two cases with and without compare.
We support three strategies: (1) local in-memory, (2) local file-based, and (3) parallel remote result aggregation. Local in-memory pins R into memory, creates the compare matrix if necessary, and merges all worker results one at a time. Local file-based uses a virtual staging file of indexed blocks, which can be viewed as a radix sort of blocks. It maps worker result and compare blocks, and merges one result block at a time. Parallel remote uses a dedicated MR job whose inputs are the worker results and, if necessary, the compare matrix. Mappers then tag blocks as data or compare, while reducers get the compare block and then merge one block at a time.
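A sketch of the merge logic with and without a compare matrix (NumPy; our own simplification of the local in-memory strategy):

```python
import numpy as np

def merge_results(R, worker_results, compare=None):
    """Merge disjoint worker results into result matrix R.
    Without compare (R initially empty): copy all non-zero worker values.
    With compare: copy every cell whose value differs from the original."""
    out = R.copy()
    for W in worker_results:
        mask = (W != 0) if compare is None else (W != compare)
        out[mask] = W[mask]
    return out
```

The compare matrix is what allows a worker's newly written zero to overwrite a non-zero original value, as needed for in-place updates like matrix factorization.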
3.6
Hybrid Parallelism
The generality of ParFOR allows us to combine parallel execution models as needed. We now exemplify hybrid parallelization strategies, where hybrid refers to combining (1) task and data parallelism, (2) in-memory and MR computation, as well as (3) multi-core and cluster parallelism. Finally, hybrid strategies give us great flexibility in creating efficient execution plans for complex ML algorithms.
Parallel MR Jobs (Figure 4(a)): If the ParFOR body contains operations on large data, we cannot run in-memory operations via ParFOR-R. However, ParFOR-L exploits multi-core parallelism for CP and MR-job instructions, and hence can run parallel MR jobs. This is beneficial for latency hiding and full resource exploitation. For instance, consider our running example with a 10^9 x 5 matrix D, i.e., 10 pairs of 2 x 8 GB each. We would run two nested ParFOR-L and thus MR jobs for indexing, covariance, and central moment of all pairs in parallel. The best MR job configuration (e.g., number of reducers) depends on the ParFOR degree of parallelism k, and we might get piggybacking potential across iterations.
Mixed Nested Parallelism (Figures 4(b)/4(c)): In case of nested ParFOR, as used in our running example, where only the outer contains an MR-job instruction, we can use
[Figure 5: Locality of partitioned input data. The task file (w1: i,{1,2,3} through w5: i,{11}) carries reported locations (Node 1, Node 2, or both) per task; Node1 stores partitions D1, D2, D6, D7, D8, D9 and Node2 stores partitions D3, D4, D5, D10, D11.]
[Figure 6: Plan tree of the running example with execution contexts: ec0 (CP, cm_ec = 1024 MB, ck_ec = 16) contains the outer ParFOR with Generic and RIX/b(cm) nodes; the nested ec1 (MR, cm_ec = 600 MB, ck_ec = 1) contains the inner ParFOR with Generic, LIX, b(cm), and b(cov) nodes.]

\min \hat{T}(r(P)) \quad \text{s.t.} \quad \forall ec: \hat{M}(r(ec)) \le cm_{ec} \wedge K(r(ec)) \le ck_{ec}. \qquad (2)

Thus, the goal is to minimize the execution time of the plan tree's root node \hat{T}(r(P)) under the hard constraints of maximum total memory consumption \hat{M}(r(ec)) and maximum total parallelism K(r(ec)) per execution context ec. Valid transformations are node operator selection (et), node configuration changes (k, A), and structural changes of P.
4.
OPTIMIZATION FRAMEWORK
Hybrid parallelization strategies give us great opportunities. However, finding the optimal parallel execution plan is challenging because local decisions affect each other due to shared resources and data-flow properties, which span a huge search space. In the following, we present a tailor-made optimization framework including (1) the problem formulation, (2) a cost model and statistics estimation, as well as (3) plan rewrites and an optimization algorithm. The generality of this framework allows us to reason about hybrid task and data parallelism of arbitrarily complex ML programs.
4.1
Problem Formulation
During initial compilation, important matrix characteristics and, most importantly, the ParFOR problem size N might be unknown. This led to the major design decision to apply ParFOR optimization as a second optimization phase, as done in parallel DBMSs such as XPRS [15], at runtime for
4.2.1
4.2.2
[Figure 7: Worst-case memory estimates M = (output mem, operation mem) for the running example, e.g., input X (d1 = 1M, d2 = 1) with M = (8 MB, 8 MB), feeding RIX and b(cm).]
4.2.3
[Figure 8: Scaling a one-dimensional cost function over sparsity: moving from the profiled default ds = 0.5 to the query sparsity qs = 0.7 scales the time estimate by fS(qs)/fS(ds) = 1.345.]
Time Estimates
Memory and time estimates for arbitrarily complex programs are then aggregates of leaf node estimates.
Memory Estimates: The worst-case estimate of memory consumption for a ParFOR node is computed with

\hat{M}(n) = \begin{cases} k \cdot \max_{c_i \in c(n)} \hat{M}(c_i) & \text{if } et = \text{CP} \\ \hat{M}_{C} & \text{if } et = \text{MR}, \end{cases} \qquad (5)
Scaling one-dimensional cost functions makes a fundamental independence assumption, which is important for efficient profiling but can lead to low accuracy. We therefore use correction terms, based on ratios of numbers of floating point operations, e.g., for matrix shape adjustments. This correction is crucial for high accuracy due to a shape-dependent asymptotic behavior. For example, consider a matrix multiplication AB, where each matrix has 10^6 cells, i.e., 8 MB. Multiplying two 1,000 x 1,000 matrices requires 2 GFlop, while a dot product of 10^6 x 1 vectors requires only 2 MFlop, i.e., a relative difference of 10^3. To summarize, scaled cost functions allow us to accurately estimate time, even for different behavior of dense/sparse operations.
Example Time Estimates: Assume a query Q: qd = 700,000, qd1 = 1,000, qd2 = 700, and qs = 0.7 for a CP, dense, transpose-self matrix multiplication X^T X. Further, assume cost functions for data size fD(d) and sparsity fS(s), created with square matrices and defaults dd = 500,000 and ds = 0.5. We pick fD as the leading dimension and get fD(qd) = 325 ms. Then, we scale it to T(qd, qs) = T(qd) * fS(qs)/fS(ds) = 438 ms as shown in Figure 8. Last, we do the correction T = T(qd, qs) * corr(Q) and get T = 366 ms. Finally, we assign the time estimates from mapped HOPs and instructions to plan tree leaf nodes again.
Estimating time for leaf nodes of a plan tree is more challenging than memory. For accurate estimates, we need to take runtime properties of operators into account. Our basic idea therefore relies on offline performance profiling of runtime instructions, done once per cluster configuration. Performance profiling measures T of relevant instructions Op for variables V, varying one v in V at a time. Different execution types and matrix representations are modeled as different instructions. We then create polynomial regression models as cost functions C_{T,Op}(v) for each (v, Op) combination. The profile is the set of C_{T,Op}(v) for all v in V.
Cost Function Scaling then estimates \hat{T} via this cost profile as follows for a given request Q with \forall v \in V: p(v) \in Q, where f_x(q_x) is a shorthand for C_{T,Op}(x = q_x):

\hat{T}(Q, A) = f_d(q_d, A) \cdot \prod_{x \in V \setminus \{d\}} \frac{f_x(q_x, A)}{f_x(d_x, A)} \cdot corr(Q). \qquad (4)
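A sketch of profiling and Eq. (4)-style scaling (hypothetical cost functions fitted with NumPy polynomial regression; the argument matrix A is omitted for brevity and all names are ours):

```python
import numpy as np

def fit_cost_fn(xs, times, degree=2):
    """Offline profiling: fit one polynomial cost function C_{T,Op}(v)."""
    return np.polyfit(xs, times, degree)

def estimate_time(cost_fns, defaults, query, lead, corr=1.0):
    """Eq. (4): T(Q) = f_lead(q_lead) * prod over the other variables x of
    f_x(q_x) / f_x(d_x), times the correction term corr(Q)."""
    t = np.polyval(cost_fns[lead], query[lead])
    for x, fn in cost_fns.items():
        if x != lead:
            t *= np.polyval(fn, query[x]) / np.polyval(fn, defaults[x])
    return t * corr
```

The leading dimension contributes its absolute estimate; every other variable contributes only a relative scaling factor against its profiled default.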
\hat{M}(out(n)) = 8\,\text{B} \cdot d_1 \cdot d_2 \ \ \text{(dense)}, \qquad (3)

\hat{M}(n) = \sum_{c_i \in in(n)} \hat{M}(out(c_i)) + \hat{M}(n, k) + \hat{M}(out(n)).
4.2
Cost Model
\hat{T}(n) = w_n \cdot \sum_{c_i \in c(n)} \hat{T}(c_i), \quad w_n = \begin{cases} \lceil N/k \rceil & \text{parfor} \\ N & \text{for, while} \\ 1/|c(n)| & \text{if} \\ 1 & \text{otherwise.} \end{cases} \qquad (6)
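Equation (6) can be sketched as a recursive aggregate over a plan tree (the node encoding is ours, purely for illustration):

```python
import math

def agg_time(node, k=1):
    """Eq. (6): T(n) = w_n * sum of children's T(c_i), where w_n is
    ceil(N/k) for parfor, N for for/while, 1/|c(n)| for if, and 1 otherwise."""
    if node["type"] == "leaf":
        return node["time"]
    total = sum(agg_time(c, k) for c in node["children"])
    t = node["type"]
    if t == "parfor":
        return math.ceil(node["N"] / k) * total
    if t in ("for", "while"):
        return node["N"] * total
    if t == "if":
        return total / len(node["children"])
    return total
```

The parfor weight ceil(N/k) reflects that k iterations run concurrently, while an if node averages over its branches.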
4.3
Optimization Algorithm
5.
EXPERIMENTAL EVALUATION
The aim of our evaluation is to study ParFOR parallelization strategies and their optimization for a variety of use cases. To summarize, the major findings and insights are:
Hybrid Parallelization: ML use cases exhibit diverse workload characteristics. Complementary parallelization strategies and optimization are key to achieve high performance. The R comparisons confirmed this since each use
case required a different hand-chosen strategy. In contrast,
our optimizer generated good plans with negligible overhead.
High-Impact Rewrites: Depending on the use case,
different rewrites matter the most. For large inputs with indexed access it is data partitioning, locality and replication;
for large compute it is ParFOR-R, i.e., distributed in-memory
operations; for many iterations it is task partitioning; and
for large outputs it is result partitioning and merging.
R and Spark Comparison: Our optimizer achieved
performance comparable to pure in-memory computations
(local and distributed) on small problems, significant improvements on larger problems, and very good scalability.
Furthermore, both R and Spark required careful prevention
of out-of-memory situations (since they are pure runtimes),
while our optimizer guarantees memory constraints.
5.1
Experimental Setting
[Figure 9, four panels: (a) Scenario S, DML FOR vs. DML ParFOR-L over the number of threads, with a 4.6x speedup around the number of cores and flattening towards the number of hardware threads; (b) plan alternatives DML ParFOR-L8, ParFOR-R, ParFOR-R w/ remote partitioning, and ParFOR Opt over the number of columns (44.3 MB input, break-even Opt/L8 at 370 columns); (c) Scenario M, DML FOR vs. DML ParFOR with annotated times of 41 h, 11 h, 61 min, 48 min, and 16 min, i.e., up to 404x; (d) R and Spark comparison [s] (* reduced degree of local parallelism k = 3, ** estimate based on 1% sample of iterations).]
Figure 9: Results of Pairwise Correlation (Descriptive Statistics).
algorithm-specific DML data generators. Furthermore, we
use a spectrum of three use cases ranging from data- to
compute-intensive. For each use case, we investigate scenarios of small (S), medium (M), and large (L) input data. All
use cases are based on real-world problems, where experiments on the real data showed similar results.
Baseline Comparisons: The baselines are (1) SystemML's serial FOR, (2) R 2.15.1 64-bit [9] (unlimited memory), incl. the parallel R packages doMC (multi-core, 1 node x 8 cores) and doSNOW (cluster, type socket, 5 nodes x 1/8 cores), as well as (3) Spark 0.8.0 [32] (5 x 16 workers, 16 GB memory per node, 10 reducers, standalone on top of our HDFS). For a fair comparison, both SystemML and R use (1) binary inputs/outputs and (2) equivalent script-level algorithms, while for Spark, we ported our runtime operators and hand-tuned alternative runtime plans. To the best of our knowledge, there is no publicly available MR-based ML system on an abstraction level comparable to SystemML.
5.2
Our first use case is the data-intensive pairwise correlation (Corr) from our running example (Ex. 2), representing more complex real-world use cases of bivariate statistics
from finance and log analysis. Its characteristics are (1)
large, dense, partitionable input, (2) small output, (3) few
iterations, and (4) a simple algorithm but nested ParFOR.
Scenario S: The small scenario uses a dense 10^5 x 100 input matrix, i.e., 80 MB in dense binary format, and 4,950 pairs. Figure 9(a) compares FOR and ParFOR-L for increasing numbers of threads k. For k = 1, we see that the ParFOR overhead is negligible. We also observe good speedups with more threads, up to 4.6x at k = 16. The best speedup is below 8x because this includes parsing/compilation (2 s), read/write, and JIT compile. Figure 9(b) compares the plan alternatives ParFOR-L with k = 8, ParFOR-R w/ and w/o remote partitioning (2 and 1 MR jobs), and the choice of our optimizer (Opt) for increasing numbers of columns n. The time differences are mainly due to MR job latency as well as more I/O and JIT compile. Our optimizer picks ParFOR-L up to n = 56 and then switches to ParFOR-R because it tries to exploit k = 16 threads and hence estimates the memory as 730 MB, i.e., greater than the budget of 717 MB.
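The optimizer's memory-based switch can be illustrated with a tiny sketch (numbers hypothetical, following the worst-case k * M estimate of Eq. (5); the function name is ours):

```python
def choose_parfor_plan(per_worker_mem_mb, k, budget_mb):
    """Pick ParFOR-L if the worst-case multi-threaded estimate k * M fits the
    CP memory budget; otherwise fall back to distributed ParFOR-R."""
    estimate = k * per_worker_mem_mb
    return ("ParFOR-L" if estimate <= budget_mb else "ParFOR-R"), estimate
```

With k = 16 and a per-worker estimate of 45.625 MB, the total of 730 MB exceeds a 717 MB budget, so ParFOR-R is chosen, mirroring the scenario above.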
Scenario M: Figure 9(c) shows the results of the medium scenario, where we increased the input size to 10^7 x 100, i.e.,
5.3
[Figure 10, panels (a) Scenario S and (b) Scenario M: execution time [s] over the number of models (1e+00 to 1e+06) for DML FOR, DML ParFOR-L8, DML ParFOR-R, and DML ParFOR Opt.]
(c) R Comparison [s]:
Scenario        | DML | R     | R doMC | R doSNOW
S (10^4 x 10^3) | 180 | 239   | 1,872  | 4,616 / 2,599
M (10^6 x 10^3) | 238 | 1,631 | 3,034  | 5,737 / 3,739
L (10^7 x 10^3) | 574 | -     | -      | -
Figure 10: Results of Linear Regression.
[Figure 11, panels (a) Scenario S and (b) Scenario M: execution time [s] over the number of models (up to 150) for DML FOR, DML ParFOR-L8 (ParFOR-L3 in (b)), DML ParFOR-R, and DML ParFOR Opt.]
(c) R Comparison [s]:
Scenario        | DML | R      | R doMC | R doSNOW
S (10^4 x 10^3) | 31  | 370    | 60     | 82 / 35
M (10^6 x 10^3) | 239 | 19,869 | 4,621  | 4,041 / 1,097
L (10^6 x 10^5) | 619 | 49,860 | 12,355 | 10,492 / 2,550
Figure 11: Results of Logistic Regression.
5.3.1
format/operations, w/o buffer pool: (1) data-parallel (parallel indexing, cm, cov), (2) hybrid (parallel indexing, local cm, cov), and (3) task-parallel (parallel outer loop). For data-parallel and hybrid, we investigate different RDD storage levels (memory-only, memory-and-disk) for D (and X). For task-parallel plans, we investigate different input transfers (task serialization, broadcast variables, and HDFS partitions via our data partitioning, replication 1). First, Spark shows impressive performance for data-parallel and hybrid plans. Compare Scenario M, Hybrid against SystemML FOR from Figure 9(c) (equivalent plans): the 13x improvement is explained by distributed in-memory data and fast job/task scheduling. Second, despite fast jobs, data-parallel/hybrid plans do not perform well for this task-parallel use case due to many jobs (15,048/10,098 for scenarios XS, S, M). Third, task-parallel plans with Spark's mechanisms quickly run out of memory. With our partitioning, Spark performs very well on Scenario M, which is comparable to ParFOR-R w/ remote partitioning (RwRP, Figure 9(c)). On Scenario L, even this plan fails because column partitioning runs out of memory at groupByKey(). However, conceptually all presented techniques could be transferred to Spark as well.
Larger Clusters: We also conducted experiments on two larger clusters of (a) 1+15 nodes (2x4 E5440 @ 2.83 GHz, 4 disks), 120/60 map/reduce slots, and (b) 1+18 nodes (1x6 E5-2430 @ 2.20 GHz, 11 disks), 216/108 map/reduce slots. We see consistent results for Scenario L of 13,009 s and 5,581 s. Compared to the result of 17,321 s with 80 mappers, this means linear scaling with increasing degree of parallelism. This very good scalability allows us to effectively address larger problems with additional hardware resources.
5.3.2
[Figure 12: Optimizer deep-dive. (a) Execution time breakdown [min] (data partitioning, ParFOR execute, rest) over the partition replication factor 1 to 5. (b) Task partitioners (Naive/Fixed-size with l_i = 100, Factoring, Static) across the Corr, LinReg, and Logistic use cases.]
Task Partitioner: Task partitioning plays an important role for low communication overhead and good load balance. Figure 12(b) compares different task partitioners (Naive/Fixed-Size, Factoring, Static) for ParFOR-R on Scenario M of all use cases. For few iterations (Corr and Logistic, 100/150), both Naive and Factoring used minimal task sizes of l_i = 1, which led to very good load balance at low overhead and hence performed best. For many iterations (LinReg, 10^6), even Fixed-size with a task size of l_i = 100 (i.e., 10,000 tasks) led to large communication overhead, while Naive was infeasible. In contrast, Factoring showed low communication overhead due to a logarithmic number of tasks (1,120). Static showed, despite the lowest communication overhead, suboptimal performance in cases with time variability per iteration (Corr, Logistic) due to load imbalance. Hence, our optimizer applied Naive/Factoring.
Optimization Overhead: Dependency analysis is negligible because it took less than 0.1 ms on all use cases, even for Logistic with more than 150 lines of DML. The actual optimization time including plan tree creation was also very low. For use cases Corr and LinReg, with plan tree sizes of |N_P| = 21 and |N_P| = 11, respectively, it was always below 85 ms. The more complex Logistic had a plan size of |N_P| = 238 and h = 10 but also required less than 110 ms.
5.4
Optimizer Deep-Dive
Finally, we take a closer look at selected optimizer decisions and the actual optimization overhead itself. The use cases are Corr, LinReg, and Logistic as introduced before.
Replication Factor: Setting the partition replication factor r had a large impact for the Corr use case. Figure 12(a) shows the time breakdown for ParFOR-R on Scenario M with increasing r. Partitioning time increases sublinearly due to asynchronous replication, but the execution time decreases significantly due to nested, i.e., a quadratic number of, partition reads. Note that together with data locality the trend is similar but damped. Hence, our optimizer set r = 5.
6.
RELATED WORK
focus on optimizing pure data flows of relational or black-box operators, which stands in contrast to complex ML programs. RIOT [33] already observed that mapping ML programs to SQL queries does not always exploit the full optimization potential. Accordingly, Zhang et al. optimized I/O sharing for complex programs [34], but for single-node execution only. Second, due to the focus on black-box operators, there is a lack of analytical cost models for plan comparisons. Cumulon [16] already presented time estimation, but for data-parallel ML operations only. Our optimization framework is a first step towards both challenges of optimizing task-parallel ML programs.
7.
CONCLUSIONS
8.
REFERENCES