|
3 | 3 | # License: BSD 3 clause
|
4 | 4 |
|
5 | 5 | import numpy as np
|
| 6 | +from multiprocessing import cpu_count |
| 7 | +from joblib import Parallel, delayed |
6 | 8 |
|
7 | 9 | from ._base import _BaseImputer
|
8 | 10 | from ..utils.validation import FLOAT_DTYPES
|
@@ -236,55 +238,61 @@ def transform(self, X):
|
236 | 238 | dist_idx_map = np.zeros(X.shape[0], dtype=int)
|
237 | 239 | dist_idx_map[row_missing_idx] = np.arange(row_missing_idx.shape[0])
|
238 | 240 |
|
| 241 | + |
| 242 | + def process_chunk_col(dist_chunk, start, row_missing_chunk, col): |
| 243 | + if not valid_mask[col]: |
| 244 | + # column was all missing during training |
| 245 | + return |
| 246 | + |
| 247 | + col_mask = mask[row_missing_chunk, col] |
| 248 | + if not np.any(col_mask): |
| 249 | + # column has no missing values |
| 250 | + return |
| 251 | + |
| 252 | + potential_donors_idx, = np.nonzero(non_missing_fix_X[:, col]) |
| 253 | + |
| 254 | + # receivers_idx are indices in X |
| 255 | + receivers_idx = row_missing_chunk[np.flatnonzero(col_mask)] |
| 256 | + |
| 257 | + # distances for samples that needed imputation for column |
| 258 | + dist_subset = (dist_chunk[dist_idx_map[receivers_idx] - start] |
| 259 | + [:, potential_donors_idx]) |
| 260 | + |
| 261 | + # receivers with all nan distances impute with mean |
| 262 | + all_nan_dist_mask = np.isnan(dist_subset).all(axis=1) |
| 263 | + all_nan_receivers_idx = receivers_idx[all_nan_dist_mask] |
| 264 | + |
| 265 | + if all_nan_receivers_idx.size: |
| 266 | + col_mean = np.ma.array(self._fit_X[:, col], |
| 267 | + mask=mask_fit_X[:, col]).mean() |
| 268 | + X[all_nan_receivers_idx, col] = col_mean |
| 269 | + |
| 270 | + if len(all_nan_receivers_idx) == len(receivers_idx): |
| 271 | + # all receivers imputed with mean |
| 272 | + return |
| 273 | + |
| 274 | + # receivers with at least one defined distance |
| 275 | + receivers_idx = receivers_idx[~all_nan_dist_mask] |
| 276 | + dist_subset = (dist_chunk[dist_idx_map[receivers_idx] |
| 277 | + - start] |
| 278 | + [:, potential_donors_idx]) |
| 279 | + |
| 280 | + n_neighbors = min(self.n_neighbors, len(potential_donors_idx)) |
| 281 | + value = self._calc_impute( |
| 282 | + dist_subset, |
| 283 | + n_neighbors, |
| 284 | + self._fit_X[potential_donors_idx, col], |
| 285 | + mask_fit_X[potential_donors_idx, col]) |
| 286 | + X[receivers_idx, col] = value |
| 287 | + |
239 | 288 | def process_chunk(dist_chunk, start):
|
240 | 289 | row_missing_chunk = row_missing_idx[start:start + len(dist_chunk)]
|
241 | 290 |
|
242 | 291 | # Find and impute missing by column
|
243 |
| - for col in range(X.shape[1]): |
244 |
| - if not valid_mask[col]: |
245 |
| - # column was all missing during training |
246 |
| - continue |
247 |
| - |
248 |
| - col_mask = mask[row_missing_chunk, col] |
249 |
| - if not np.any(col_mask): |
250 |
| - # column has no missing values |
251 |
| - continue |
252 |
| - |
253 |
| - potential_donors_idx, = np.nonzero(non_missing_fix_X[:, col]) |
254 |
| - |
255 |
| - # receivers_idx are indices in X |
256 |
| - receivers_idx = row_missing_chunk[np.flatnonzero(col_mask)] |
257 |
| - |
258 |
| - # distances for samples that needed imputation for column |
259 |
| - dist_subset = (dist_chunk[dist_idx_map[receivers_idx] - start] |
260 |
| - [:, potential_donors_idx]) |
261 |
| - |
262 |
| - # receivers with all nan distances impute with mean |
263 |
| - all_nan_dist_mask = np.isnan(dist_subset).all(axis=1) |
264 |
| - all_nan_receivers_idx = receivers_idx[all_nan_dist_mask] |
265 |
| - |
266 |
| - if all_nan_receivers_idx.size: |
267 |
| - col_mean = np.ma.array(self._fit_X[:, col], |
268 |
| - mask=mask_fit_X[:, col]).mean() |
269 |
| - X[all_nan_receivers_idx, col] = col_mean |
270 |
| - |
271 |
| - if len(all_nan_receivers_idx) == len(receivers_idx): |
272 |
| - # all receivers imputed with mean |
273 |
| - continue |
274 |
| - |
275 |
| - # receivers with at least one defined distance |
276 |
| - receivers_idx = receivers_idx[~all_nan_dist_mask] |
277 |
| - dist_subset = (dist_chunk[dist_idx_map[receivers_idx] |
278 |
| - - start] |
279 |
| - [:, potential_donors_idx]) |
280 |
| - |
281 |
| - n_neighbors = min(self.n_neighbors, len(potential_donors_idx)) |
282 |
| - value = self._calc_impute( |
283 |
| - dist_subset, |
284 |
| - n_neighbors, |
285 |
| - self._fit_X[potential_donors_idx, col], |
286 |
| - mask_fit_X[potential_donors_idx, col]) |
287 |
| - X[receivers_idx, col] = value |
| 292 | + Parallel(n_jobs=cpu_count(), backend='threading')(delayed(process_chunk_col)(dist_chunk, start, row_missing_chunk, col) for col in range(X.shape[1])) |
| 293 | + # for col in range(X.shape[1]): |
| 294 | + # process_chunk_col(dist_chunk, start, row_missing_chunk, col) |
| 295 | + |
288 | 296 |
|
289 | 297 | # process in fixed-memory chunks
|
290 | 298 | gen = pairwise_distances_chunked(
|
|
0 commit comments