added multithreading to knn chunk processing · SkuaD01/scikit-learn@d74ff9f · GitHub
[go: up one dir, main page]

Skip to content

Commit d74ff9f

Browse files
committed
added multithreading to knn chunk processing
1 parent e63417c commit d74ff9f

File tree

1 file changed

+53
-45
lines changed

1 file changed

+53
-45
lines changed

sklearn/impute/_knn.py

Lines changed: 53 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@
33
# License: BSD 3 clause
44

55
import numpy as np
6+
from multiprocessing import cpu_count
7+
from joblib import Parallel, delayed
68

79
from ._base import _BaseImputer
810
from ..utils.validation import FLOAT_DTYPES
@@ -236,55 +238,61 @@ def transform(self, X):
236238
dist_idx_map = np.zeros(X.shape[0], dtype=int)
237239
dist_idx_map[row_missing_idx] = np.arange(row_missing_idx.shape[0])
238240

241+
242+
def process_chunk_col(dist_chunk, start, row_missing_chunk, col):
    """Impute the missing entries of column ``col`` for one distance chunk.

    Imputed values are written in place into the enclosing ``X``; nothing
    is returned.

    Parameters
    ----------
    dist_chunk : array
        Block of distances. Rows are addressed through
        ``dist_idx_map[...] - start`` and columns by donor index.
    start : int
        Offset subtracted from ``dist_idx_map`` entries to obtain row
        positions inside ``dist_chunk``.
    row_missing_chunk : array of int
        Indices into ``X`` of the rows handled by this chunk.
    col : int
        Index of the column to impute.
    """
    if not valid_mask[col]:
        # the whole column was missing during training: nothing to impute
        return

    needs_value = mask[row_missing_chunk, col]
    if not np.any(needs_value):
        # no missing entry in this column for the rows of the chunk
        return

    donor_idx = np.nonzero(non_missing_fix_X[:, col])[0]

    def distances_to_donors(rows):
        # ``rows`` are indices in X; map them to rows of ``dist_chunk`` and
        # keep only the columns of the potential donors
        return dist_chunk[dist_idx_map[rows] - start][:, donor_idx]

    # target_rows are indices in X of the samples that need a value
    target_rows = row_missing_chunk[np.flatnonzero(needs_value)]
    dist_to_donors = distances_to_donors(target_rows)

    # a receiver whose distance to every donor is nan falls back to the mean
    no_distance = np.isnan(dist_to_donors).all(axis=1)
    mean_rows = target_rows[no_distance]

    if mean_rows.size > 0:
        fitted_col = np.ma.array(self._fit_X[:, col],
                                 mask=mask_fit_X[:, col])
        X[mean_rows, col] = fitted_col.mean()

        if len(target_rows) == len(mean_rows):
            # every receiver was imputed with the mean
            return

        # keep only the receivers with at least one defined distance
        target_rows = target_rows[~no_distance]
        dist_to_donors = distances_to_donors(target_rows)

    X[target_rows, col] = self._calc_impute(
        dist_to_donors,
        min(self.n_neighbors, len(donor_idx)),
        self._fit_X[donor_idx, col],
        mask_fit_X[donor_idx, col],
    )
287+
239288
def process_chunk(dist_chunk, start):
    """Impute all columns for the rows covered by one distance chunk.

    Parameters
    ----------
    dist_chunk : array
        Block of distances; its length determines how many entries of
        ``row_missing_idx`` (starting at ``start``) belong to this chunk.
    start : int
        Position in ``row_missing_idx`` of the first row of the chunk.

    Notes
    -----
    One ``process_chunk_col`` task is dispatched per column. Each task
    assigns only to its own column of ``X``, so the tasks do not write to
    the same entries. The threading backend is required because the
    results are written in place into ``X`` and must be visible to the
    caller; a process-based backend would lose them.
    """
    row_missing_chunk = row_missing_idx[start:start + len(dist_chunk)]

    # Find and impute missing by column.
    # n_jobs=-1 lets joblib choose the worker count instead of hard-coding
    # multiprocessing.cpu_count(), which reports every CPU of the machine
    # even when the process is restricted to fewer.
    Parallel(n_jobs=-1, backend="threading")(
        delayed(process_chunk_col)(dist_chunk, start, row_missing_chunk, col)
        for col in range(X.shape[1])
    )
295+
288296

289297
# process in fixed-memory chunks
290298
gen = pairwise_distances_chunked(

0 commit comments

Comments
 (0)
0