@@ -1,20 +1,23 @@
 package LOCO


+import breeze.linalg.DenseVector
 import org.apache.spark.rdd.RDD
+import org.apache.spark.storage.StorageLevel
 import org.apache.spark.{SparkConf, SparkContext}

 import org.apache.log4j.Logger
 import org.apache.log4j.Level

-import preprocessingUtils.DataPoint
+import preprocessingUtils.FeatureVectorLP
 import preprocessingUtils.loadData.load
-import preprocessingUtils.loadData.load._

 import LOCO.solvers.runLOCO
 import LOCO.utils.LOCOUtils._
 import LOCO.utils.CVUtils

+import scala.io.Source
+

 object driver {

@@ -44,107 +47,90 @@ object driver {
     // how many partitions of the data matrix to use
     val nPartitions = options.getOrElse("nPartitions","4").toInt
     // how many executors are used
-    val nExecutors = options.getOrElse("nExecutors","4").toInt
-
-    // "text" or "object"
-    val dataFormat = options.getOrElse("dataFormat", "text")
-    // "libsvm", "spaces" or "comma"
-    val textDataFormat = options.getOrElse("textDataFormat", "spaces")
-    // input path
-    val dataFile = options.getOrElse("dataFile", "../data/climate_train.txt")
-    // provide training and test set as separate files?
-    val separateTrainTestFiles = options.getOrElse("separateTrainTestFiles", "true").toBoolean
+    val nExecutors = options.getOrElse("nExecutors","1").toInt
     // training input path
     val trainingDatafile =
-      options.getOrElse("trainingDatafile", "../data/climate_train.txt")
+      options.getOrElse("trainingDatafile", "../data/climate-serialized/climate-train-colwise/")
     // test input path
     val testDatafile =
-      options.getOrElse("testDatafile", "../data/climate_test.txt")
-    // if only one file is provided, proportion used to test set
-    val proportionTest = options.getOrElse("proportionTest", "0.2").toDouble
+      options.getOrElse("testDatafile", "../data/climate-serialized/climate-test-colwise/")
+    // response vector - training
+    val responsePathTrain =
+      options.getOrElse("responsePathTrain", "../data/climate-serialized/climate-responseTrain.txt")
+    // response vector - test
+    val responsePathTest =
+      options.getOrElse("responsePathTest", "../data/climate-serialized/climate-responseTest.txt")
+    // number of features
+    val nFeatsPath = options.getOrElse("nFeats", "../data/climate-serialized/climate-nFeats.txt")
     // random seed
-    val myseed = options.getOrElse("seed", "3").toInt
+    val randomSeed = options.getOrElse("seed", "3").toInt
+    // shall sparse data structures be used?
+    val useSparseStructure = options.getOrElse("useSparseStructure", "false").toBoolean

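Note for readers: the driver pulls every setting from an `options: Map[String, String]` built in a part of the file this diff does not touch. For orientation, a minimal, hypothetical sketch of how such a map is typically assembled from `key=value` command-line arguments (this is not the repository's actual parsing code):

```scala
// Hypothetical sketch (not LOCO's parser): build the options map
// from command-line arguments given as "key=value" pairs.
def parseArgs(args: Array[String]): Map[String, String] =
  args.flatMap { arg =>
    arg.split("=", 2) match {
      case Array(key, value) => Some(key -> value)
      case _                 => None // skip malformed arguments
    }
  }.toMap

// usage:
//   val options = parseArgs(args)
//   val nPartitions = options.getOrElse("nPartitions", "4").toInt
```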
     // 2) specify algorithm, loss function, and optimizer (if applicable)

     // specify whether classification or ridge regression shall be used
     val classification = options.getOrElse("classification", "false").toBoolean
-    // use factorie or SDCA
-    val optimizer = options.getOrElse("optimizer", "SDCA")
     // number of iterations used in SDCA
-    val numIterations = options.getOrElse("numIterations", "5000").toInt
+    val numIterations = options.getOrElse("numIterations", "20000").toInt
     // set duality gap as convergence criterion
     val stoppingDualityGap = options.getOrElse("stoppingDualityGap", "0.01").toDouble
     // specify whether duality gap as convergence criterion shall be used
     val checkDualityGap = options.getOrElse("checkDualityGap", "false").toBoolean

     // 3) algorithm-specific inputs

-    // center features and response
-    val center = options.getOrElse("center", "true").toBoolean
-    // center features only
-    val centerFeaturesOnly = options.getOrElse("centerFeaturesOnly", "false").toBoolean
     // specify projection (sparse or SDCT)
-    val projection = options.getOrElse("projection", "sparse")
-    // specify flag for SDCT/FFTW: 64 corresponds to FFTW_ESTIMATE, 0 corresponds to FFTW_MEASURE
-    val flagFFTW = options.getOrElse("flagFFTW", "64").toInt
+    val projection = options.getOrElse("projection", "SDCT")
     // specify projection dimension
-    val nFeatsProj = options.getOrElse("nFeatsProj", "260").toInt
+    val nFeatsProj = options.getOrElse("nFeatsProj", "389").toInt
     // concatenate or add
-    val concatenate = options.getOrElse("concatenate", "true").toBoolean
-    // cross validation: "global", "local", or "none"
-    val CVKind = options.getOrElse("CVKind", "none")
+    val concatenate = options.getOrElse("concatenate", "false").toBoolean
+    // cross validation
+    val CV = options.getOrElse("CV", "false").toBoolean
     // k for k-fold CV
-    val kfold = options.getOrElse("kfold", "5").toInt
+    val kfold = options.getOrElse("kfold", "2").toInt
     // regularization parameter sequence start used in CV
-    val lambdaSeqFrom = options.getOrElse("lambdaSeqFrom", "65").toDouble
+    val lambdaSeqFrom = options.getOrElse("lambdaSeqFrom", "1").toDouble
     // regularization parameter sequence end used in CV
-    val lambdaSeqTo = options.getOrElse("lambdaSeqTo", "80").toDouble
+    val lambdaSeqTo = options.getOrElse("lambdaSeqTo", "10").toDouble
     // regularization parameter sequence step size used in CV
     val lambdaSeqBy = options.getOrElse("lambdaSeqBy", "1").toDouble
     // create lambda sequence
     val lambdaSeq = lambdaSeqFrom to lambdaSeqTo by lambdaSeqBy
-    // regularization parameter to be used if CVKind == "none"
-    val lambda = options.getOrElse("lambda", "70").toDouble
+    // regularization parameter to be used if CV == false
+    val lambda = options.getOrElse("lambda", "95").toDouble

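`lambdaSeqFrom to lambdaSeqTo by lambdaSeqBy` builds an inclusive `NumericRange[Double]`, so the new defaults give a ten-point CV grid. A quick REPL check of the values (valid on the Scala 2.10/2.11 toolchain this project targets; `Double` ranges were removed from later Scala versions):

```scala
// Default CV grid: lambdaSeqFrom = 1, lambdaSeqTo = 10, lambdaSeqBy = 1.
// Both endpoints are included.
val lambdaSeq = 1.0 to 10.0 by 1.0
println(lambdaSeq.toList)
// List(1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0)
```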
     // print out inputs
     println("\nSpecify input and output options: ")
-    println("dataFormat: " + dataFormat)
-    if(dataFormat == "text"){
-      println("textDataFormat: " + textDataFormat)
-    }
-    println("separateTrainTestFiles: " + separateTrainTestFiles)
-    if(separateTrainTestFiles){
-      println("trainingDatafile: " + trainingDatafile)
-      println("testDatafile: " + testDatafile)
-    }else {
-      println("dataFile: " + dataFile)
-      println("proportionTest: " + proportionTest)
-    }
+
+    println("trainingDatafile: " + trainingDatafile)
+    println("responsePathTrain: " + responsePathTrain)
+    println("testDatafile: " + testDatafile)
+    println("responsePathTest: " + responsePathTest)
+    println("nFeatsPath: " + nFeatsPath)
+    println("useSparseStructure: " + useSparseStructure)
+
     println("outdir: " + outdir)
     println("saveToHDFS: " + saveToHDFS)
-    println("seed: " + myseed)
+    println("seed: " + randomSeed)

     println("\nSpecify number of partitions, " +
       "algorithm, loss function, and optimizer (if applicable): ")
     println("nPartitions: " + nPartitions)
     println("nExecutors: " + nExecutors)
     println("classification: " + classification)
-    println("optimizer: " + optimizer)
     println("numIterations: " + numIterations)
     println("checkDualityGap: " + checkDualityGap)
     println("stoppingDualityGap: " + stoppingDualityGap)

     println("\nAlgorithm-specific inputs: ")
-    println("center: " + center)
-    println("centerFeaturesOnly: " + centerFeaturesOnly)
     println("projection: " + projection)
-    println("flagFFTW: " + flagFFTW)
     println("nFeatsProj: " + nFeatsProj)
     println("concatenate: " + concatenate)
-    println("CVKind: " + CVKind)
+    println("CV: " + CV)
     println("kfold: " + kfold)
-    if(CVKind != "none"){
+    if(CV){
       println("lambdaSeq: " + lambdaSeq)
     }else{
       println("lambda: " + lambda)
@@ -175,62 +161,68 @@ object driver {
     Logger.getLogger("org").setLevel(Level.WARN)
     Logger.getLogger("akka").setLevel(Level.WARN)

-    // read in training and test data, distribute over rows
-    val (training : RDD[DataPoint], test : RDD[DataPoint]) =
-      dataFormat match {
-
-        // input files are text files
-        case "text" => {
-          val (training_temp, test_temp) =
-            load.readTextFiles(
-              sc, dataFile, nPartitions, textDataFormat, separateTrainTestFiles,
-              trainingDatafile, testDatafile, proportionTest, myseed)
-
-          // convert RDD[Array(Double)] to RDD[DataPoint]
-          (training_temp.map(x => doubleArrayToDataPoint(x)),
-            test_temp.map(x => doubleArrayToDataPoint(x)))
-        }
-
-        // input files are object files
-        case "object" =>
-          load.readObjectFiles[DataPoint](
-            sc, dataFile, nPartitions, separateTrainTestFiles, trainingDatafile,
-            testDatafile, proportionTest, myseed)
-
-        // throw exception if another option is given
-        case _ => throw new Error("No such data format option (use text or object)!")
-      }
+    // read in training and test data, distributed over columns
+    val (training : RDD[FeatureVectorLP], test : RDD[FeatureVectorLP]) =
+      load.readObjectFiles[FeatureVectorLP](
+        sc, null, nPartitions, true, trainingDatafile,
+        testDatafile, 0.2, randomSeed)

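With separate train and test paths now always supplied, the `null` data file and the `0.2` test proportion passed to `load.readObjectFiles` are unused placeholders kept for the old signature. In plain Spark terms, loading the two serialized column-wise RDDs boils down to something like this sketch (assuming the object files were written with `RDD.saveAsObjectFile`; the helper itself is illustrative, not the repository's API):

```scala
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import preprocessingUtils.FeatureVectorLP

// Illustrative helper: read feature-vector RDDs previously written
// with saveAsObjectFile; minPartitions is only a hint to Spark.
def readSerialized(sc: SparkContext, trainPath: String, testPath: String,
                   minPartitions: Int): (RDD[FeatureVectorLP], RDD[FeatureVectorLP]) =
  (sc.objectFile[FeatureVectorLP](trainPath, minPartitions),
   sc.objectFile[FeatureVectorLP](testPath, minPartitions))
```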
-    // if cross validation is chosen to be "global", cross-validate
-    // targeting the global prediction error
-    val lambdaGlobal =
-      if(CVKind == "global"){
+    // repartition
+    val trainingPartitioned: RDD[FeatureVectorLP] =
+      training
+        .repartition(nPartitions)
+        .persist(StorageLevel.MEMORY_AND_DISK)
+
+    // force evaluation to allow for proper timing
+    trainingPartitioned.foreach(x => {})
+
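`persist` only marks the RDD for caching; Spark computes nothing until an action runs, and the no-op `foreach` above is that action. A standalone sketch of the same pattern (assumes a `SparkContext` named `sc`):

```scala
import org.apache.spark.storage.StorageLevel

// Transformations are lazy: this line alone does no cluster work.
val cached = sc.parallelize(1 to 1000000)
  .repartition(8)
  .persist(StorageLevel.MEMORY_AND_DISK)

// Any action forces evaluation; a no-op foreach (or count())
// materializes the cache so later timings exclude load time.
cached.foreach(_ => ())
```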
+    // read response vectors
+    val responseTrain = DenseVector(load.readResponse(responsePathTrain).toArray)
+    val responseTest = DenseVector(load.readResponse(responsePathTest).toArray)
+
+    // read number of features
+    val nFeats = Source.fromFile(nFeatsPath).getLines().mkString.toInt
+
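One small caveat in the `nFeats` line above: `Source.fromFile` opens a file handle that is never closed. Harmless in a short-lived driver, but a tidier variant would be (sketch, using the driver's `nFeatsPath`):

```scala
import scala.io.Source

// Read the single integer in the nFeats file, then release the handle.
val src = Source.fromFile(nFeatsPath)
val nFeats =
  try src.getLines().mkString.trim.toInt
  finally src.close()
```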
+    // start timing for cross validation
+    val CVStart = System.currentTimeMillis()
+
+    // cross validation
+    val lambdaCV =
+      if(CV){
         CVUtils.globalCV(
-          sc, classification, myseed, training, center, centerFeaturesOnly, nPartitions,
-          nExecutors, projection, flagFFTW, concatenate, nFeatsProj, lambdaSeq, kfold, optimizer,
+          sc, classification, randomSeed, trainingPartitioned, responseTrain, nFeats,
+          nPartitions, nExecutors, projection, useSparseStructure,
+          concatenate, nFeatsProj, lambdaSeq, kfold,
           numIterations, checkDualityGap, stoppingDualityGap)
       }else{
         lambda
       }

+    // stop timing for cross validation
+    val CVTime = System.currentTimeMillis() - CVStart
+
     // compute LOCO coefficients
-    val (betaLoco, startTime, colMeans, meanResponse) =
+    val (betaLoco, startTime, afterRPTime, afterCommTime) =
       runLOCO.run(
-        sc, classification, myseed, training, center, centerFeaturesOnly, nPartitions, nExecutors,
-        projection, flagFFTW, concatenate, nFeatsProj, lambdaGlobal, CVKind, lambdaSeq, kfold,
-        optimizer, numIterations, checkDualityGap, stoppingDualityGap)
+        sc, classification, randomSeed, trainingPartitioned, responseTrain, nFeats,
+        nPartitions, nExecutors, projection, useSparseStructure,
+        concatenate, nFeatsProj, lambdaCV,
+        numIterations, checkDualityGap, stoppingDualityGap)

     // get second timestamp needed to time LOCO and compute time difference
     val endTime = System.currentTimeMillis
     val runTime = endTime - startTime
+    val RPTime = afterRPTime - startTime
+    val communicationTime = afterCommTime - afterRPTime
+    val restTime = runTime - RPTime - communicationTime

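Judging by the variable names, the two extra timestamps returned by `runLOCO.run` split the total runtime into a random-projection phase, a communication phase, and everything else. A toy check of the arithmetic with made-up timestamps:

```scala
// Toy check of the timing decomposition: the three phase durations
// tile [startTime, endTime] exactly, with no gaps or overlap.
val startTime     = 0L   // runLOCO.run starts
val afterRPTime   = 40L  // random projections computed
val afterCommTime = 70L  // projections exchanged between workers
val endTime       = 100L // coefficients assembled

val runTime           = endTime - startTime                  // 100
val RPTime            = afterRPTime - startTime              // 40
val communicationTime = afterCommTime - afterRPTime          // 30
val restTime          = runTime - RPTime - communicationTime // 30
assert(RPTime + communicationTime + restTime == runTime)
```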
     // print summary stats
     printSummaryStatistics(
-      sc, classification, optimizer, numIterations, startTime, runTime,
-      betaLoco, training, test, center, centerFeaturesOnly, meanResponse, colMeans, dataFormat,
-      separateTrainTestFiles, trainingDatafile, testDatafile, dataFile, proportionTest, nPartitions,
-      nExecutors, nFeatsProj, projection, flagFFTW, concatenate, lambda, CVKind, lambdaSeq, kfold,
-      myseed, lambdaGlobal, checkDualityGap, stoppingDualityGap, saveToHDFS, directoryNameResultsFolder)
+      sc, classification, numIterations, startTime, runTime, RPTime, communicationTime, restTime, CVTime,
+      betaLoco, trainingPartitioned, test, responseTrain, responseTest, trainingDatafile, testDatafile,
+      responsePathTrain, responsePathTest, nPartitions, nExecutors, nFeatsProj, projection,
+      useSparseStructure, concatenate, lambda, CV, lambdaSeq, kfold, randomSeed, lambdaCV,
+      checkDualityGap, stoppingDualityGap, saveToHDFS, directoryNameResultsFolder)

     // compute end time of application and compute time needed overall
     val globalEndTime = System.currentTimeMillis