Merge pull request #5 from SkuaD01/d01-18186-multiprocessing · SkuaD01/scikit-learn@bea07c0 · GitHub
Commit bea07c0
Merge pull request #5 from SkuaD01/d01-18186-multiprocessing
D01 18186 multiprocessing
2 parents: 7f05efa + cc73a98

File tree: 5 files changed, +235 −49 lines

sklearn/impute/_knn.py

Lines changed: 59 additions & 47 deletions
@@ -3,6 +3,8 @@
 # License: BSD 3 clause
 
 import numpy as np
+from multiprocessing import cpu_count
+from joblib import Parallel, delayed, effective_n_jobs
 
 from ._base import _BaseImputer
 from ..utils.validation import FLOAT_DTYPES
@@ -99,7 +101,7 @@ class KNNImputer(_BaseImputer):
     @_deprecate_positional_args
     def __init__(self, *, missing_values=np.nan, n_neighbors=5,
                  weights="uniform", metric="nan_euclidean", copy=True,
-                 add_indicator=False):
+                 add_indicator=False, n_jobs=cpu_count()):
         super().__init__(
             missing_values=missing_values,
             add_indicator=add_indicator
@@ -108,6 +110,7 @@ def __init__(self, *, missing_values=np.nan, n_neighbors=5,
         self.weights = weights
         self.metric = metric
         self.copy = copy
+        self.n_jobs = n_jobs
 
     def _calc_impute(self, dist_pot_donors, n_neighbors,
                      fit_X_col, mask_fit_X_col):
@@ -236,55 +239,63 @@ def transform(self, X):
         dist_idx_map = np.zeros(X.shape[0], dtype=int)
         dist_idx_map[row_missing_idx] = np.arange(row_missing_idx.shape[0])
 
+        def process_chunk_col(dist_chunk, start, row_missing_chunk, col):
+            if not valid_mask[col]:
+                # column was all missing during training
+                return
+
+            col_mask = mask[row_missing_chunk, col]
+            if not np.any(col_mask):
+                # column has no missing values
+                return
+
+            potential_donors_idx, = np.nonzero(non_missing_fix_X[:, col])
+
+            # receivers_idx are indices in X
+            receivers_idx = row_missing_chunk[np.flatnonzero(col_mask)]
+
+            # distances for samples that needed imputation for column
+            dist_subset = (dist_chunk[dist_idx_map[receivers_idx] - start]
+                           [:, potential_donors_idx])
+
+            # receivers with all nan distances impute with mean
+            all_nan_dist_mask = np.isnan(dist_subset).all(axis=1)
+            all_nan_receivers_idx = receivers_idx[all_nan_dist_mask]
+
+            if all_nan_receivers_idx.size:
+                col_mean = np.ma.array(self._fit_X[:, col],
+                                       mask=mask_fit_X[:, col]).mean()
+                X[all_nan_receivers_idx, col] = col_mean
+
+                if len(all_nan_receivers_idx) == len(receivers_idx):
+                    # all receivers imputed with mean
+                    return
+
+            # receivers with at least one defined distance
+            receivers_idx = receivers_idx[~all_nan_dist_mask]
+            dist_subset = (dist_chunk[dist_idx_map[receivers_idx]
+                                      - start]
+                           [:, potential_donors_idx])
+
+            n_neighbors = min(self.n_neighbors, len(potential_donors_idx))
+            value = self._calc_impute(
+                dist_subset,
+                n_neighbors,
+                self._fit_X[potential_donors_idx, col],
+                mask_fit_X[potential_donors_idx, col])
+            X[receivers_idx, col] = value
+
         def process_chunk(dist_chunk, start):
             row_missing_chunk = row_missing_idx[start:start + len(dist_chunk)]
 
             # Find and impute missing by column
-            for col in range(X.shape[1]):
-                if not valid_mask[col]:
-                    # column was all missing during training
-                    continue
-
-                col_mask = mask[row_missing_chunk, col]
-                if not np.any(col_mask):
-                    # column has no missing values
-                    continue
-
-                potential_donors_idx, = np.nonzero(non_missing_fix_X[:, col])
-
-                # receivers_idx are indices in X
-                receivers_idx = row_missing_chunk[np.flatnonzero(col_mask)]
-
-                # distances for samples that needed imputation for column
-                dist_subset = (dist_chunk[dist_idx_map[receivers_idx] - start]
-                               [:, potential_donors_idx])
-
-                # receivers with all nan distances impute with mean
-                all_nan_dist_mask = np.isnan(dist_subset).all(axis=1)
-                all_nan_receivers_idx = receivers_idx[all_nan_dist_mask]
-
-                if all_nan_receivers_idx.size:
-                    col_mean = np.ma.array(self._fit_X[:, col],
-                                           mask=mask_fit_X[:, col]).mean()
-                    X[all_nan_receivers_idx, col] = col_mean
-
-                    if len(all_nan_receivers_idx) == len(receivers_idx):
-                        # all receivers imputed with mean
-                        continue
-
-                # receivers with at least one defined distance
-                receivers_idx = receivers_idx[~all_nan_dist_mask]
-                dist_subset = (dist_chunk[dist_idx_map[receivers_idx]
-                                          - start]
-                               [:, potential_donors_idx])
-
-                n_neighbors = min(self.n_neighbors, len(potential_donors_idx))
-                value = self._calc_impute(
-                    dist_subset,
-                    n_neighbors,
-                    self._fit_X[potential_donors_idx, col],
-                    mask_fit_X[potential_donors_idx, col])
-                X[receivers_idx, col] = value
+            if effective_n_jobs(self.n_jobs) > 1:
+                generator = (delayed(process_chunk_col)(dist_chunk, start,
+                                                        row_missing_chunk,
+                                                        col)
+                             for col in range(X.shape[1]))
+                Parallel(n_jobs=self.n_jobs, backend='threading')(generator)
+            else:
+                for col in range(X.shape[1]):
+                    process_chunk_col(dist_chunk, start, row_missing_chunk,
+                                      col)
 
         # process in fixed-memory chunks
         gen = pairwise_distances_chunked(
@@ -293,7 +304,8 @@ def process_chunk(dist_chunk, start):
             metric=self.metric,
             missing_values=self.missing_values,
             force_all_finite=force_all_finite,
-            reduce_func=process_chunk)
+            reduce_func=process_chunk,
+            n_jobs=self.n_jobs)
         for chunk in gen:
             # process_chunk modifies X in place. No return value.
             pass
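
For orientation, here is a minimal usage sketch of the n_jobs parameter this branch adds to KNNImputer. The constructor argument exists only on this branch; everything else is the standard estimator API.

import numpy as np
from sklearn.impute import KNNImputer

X = np.array([[1.0, 2.0, np.nan],
              [3.0, 4.0, 3.0],
              [np.nan, 6.0, 5.0],
              [8.0, 8.0, 7.0]])

# n_jobs=1 keeps the original serial path; n_jobs > 1 dispatches the
# per-column imputation to a joblib thread pool (threading backend, so
# workers can write into the shared X buffer in place).
imputer = KNNImputer(n_neighbors=2, n_jobs=2)
print(imputer.fit_transform(X))

One design note: n_jobs=cpu_count() as a def-time default is evaluated once at import, and scikit-learn estimators conventionally default to n_jobs=None, so this choice makes parallelism opt-out rather than opt-in.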

sklearn/impute/tests/test_knn.py

Lines changed: 25 additions & 0 deletions
@@ -1,5 +1,6 @@
 import numpy as np
 import pytest
+from multiprocessing import cpu_count
 
 from sklearn import config_context
 from sklearn.impute import KNNImputer
@@ -639,3 +640,27 @@ def test_knn_imputer_distance_weighted_not_enough_neighbors(na,
 def test_knn_tags(na, allow_nan):
     knn = KNNImputer(missing_values=na)
     assert knn._get_tags()["allow_nan"] == allow_nan
+
+
+@pytest.mark.parametrize("n", [1, 2, cpu_count(), cpu_count() + 1,
+                               2 * cpu_count()])
+def test_knn_imputer_varying_n_jobs(n):
+    knn = KNNImputer(n_jobs=n)
+    na = np.nan
+    X = np.array([
+        [1, na, 1, 1, 1.],
+        [2, 2, 2, 2, 2],
+        [3, 3, 3, 3, na],
+        [6, 6, na, 6, 6],
+    ])
+
+    # expected values: each incomplete column has three donors, so with
+    # uniform weights the imputation is the mean of those donors
+    r1c2 = 11 / 3
+    r4c3 = 2
+    r3c5 = 3
+
+    X_imputed = np.array([
+        [1, r1c2, 1, 1, 1.],
+        [2, 2, 2, 2, 2],
+        [3, 3, 3, 3, r3c5],
+        [6, 6, r4c3, 6, 6],
+    ])
+    assert_allclose(knn.fit_transform(X), X_imputed)
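
The expected values follow from the imputer's uniform weighting: each incomplete column has exactly three donors, so n_neighbors = min(5, 3) = 3 and every imputed entry is the plain mean of its column's donors. A quick standalone arithmetic check, restating the test's constants:

import numpy as np

assert np.isclose((2 + 3 + 6) / 3, 11 / 3)  # r1c2: donors in column 2
assert np.isclose((1 + 2 + 3) / 3, 2)       # r4c3: donors in column 3
assert np.isclose((1 + 2 + 6) / 3, 3)       # r3c5: donors in column 5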

sklearn/impute/tests/test_time.py

Lines changed: 139 additions & 0 deletions
@@ -0,0 +1,139 @@
+import numpy as np
+import pandas as pd
+from sklearn.impute import KNNImputer
+import time
+import pytest
+
+# NOTE: this test suite takes approximately 15 minutes, varying by machine
+
+# Initialise constants
+epsilon = 0.75  # seconds of breathing room for the smaller-sized n tests
+perc = [0.1, 0.5, 0.9]  # each entry is the fraction of values in X missing
+small_n = [5, 300, 500]
+large_n = [1000, 3000, 5000]
+
+
+# Generate a random x-by-y array with (p*100)% missing values
+def gen_matrix(x, y, p):
+    np.random.seed(1)
+    X = np.random.random([x, y])
+    X = pd.DataFrame(X).mask(X <= p)
+    return X
+
+
+# Run the (simulated) old serial path or the new parallel path and
+# return the start and end times
+def run_test(X, old=False):
+    start = time.time()
+    if old:
+        imputer = KNNImputer(n_neighbors=5, n_jobs=1)
+    else:
+        imputer = KNNImputer(n_neighbors=5)
+    imputer.fit_transform(X)
+    end = time.time()
+
+    return start, end
+
+
+def relative_assert(new, old):
+    # Smaller timings yield more sporadic results, so epsilon seconds of
+    # slack are allowed
+    assert (new == pytest.approx(old, abs=epsilon)) or (new < old)
+
+
+def output_res(old_end, old_start, end, start):
+    print("\nOld time:", round(old_end - old_start, 4),
+          ", New time:", round(end - start, 4), ",",
+          str(round(((old_end - old_start) / (end - start) - 1) * 100, 2))
+          + "% improved")
+
+
+# Test arrays of size 1 by a small n with varying missing values
+@pytest.mark.parametrize("n,p", [(n, p) for p in perc for n in small_n])
+def test_time_1_by_n(n, p):
+    X = gen_matrix(1, n, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of size 1 by a large N with varying missing values
+@pytest.mark.parametrize("N,p", [(N, p) for p in perc for N in large_n])
+def test_time_1_by_N(N, p):
+    X = gen_matrix(1, N, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of a small n by 1 with varying missing values
+@pytest.mark.parametrize("n,p", [(n, p) for p in perc for n in small_n])
+def test_time_n_by_1(n, p):
+    X = gen_matrix(n, 1, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of a large N by 1 with varying missing values
+@pytest.mark.parametrize("N,p", [(N, p) for p in perc for N in large_n])
+def test_time_N_by_1(N, p):
+    X = gen_matrix(N, 1, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of a small n by a small n with varying missing values
+@pytest.mark.parametrize("n1,n2,p",
+                         [(n1, n2, p) for p in perc for n1 in small_n
+                          for n2 in small_n])
+def test_time_n_by_n(n1, n2, p):
+    X = gen_matrix(n1, n2, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of a small n by a large N with varying missing values
+@pytest.mark.parametrize("n,N,p",
+                         [(n, N, p) for p in perc for n in small_n
+                          for N in large_n])
+def test_time_n_by_N(n, N, p):
+    X = gen_matrix(n, N, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of a large N by a small n with varying missing values
+### This is the most important case since it is the most likely usage
+### scenario (a large number of samples over a moderate number of features)
+@pytest.mark.parametrize("N,n,p",
+                         [(N, n, p) for p in perc for n in small_n
+                          for N in large_n])
+def test_time_N_by_n(N, n, p):
+    X = gen_matrix(N, n, p)
+
+    start, end = run_test(X)
+    old_start, old_end = run_test(X, old=True)
+
+    output_res(old_end, old_start, end, start)
+
+    relative_assert(end - start, old_end - old_start)
+
+
+# Test arrays of a large N by a large N with varying missing values
+# (This takes a VERY long time to run, so these are more often run
+# individually)
+## @pytest.mark.parametrize("N1,N2,p",
+##                          [(N1, N2, p) for p in perc for N1 in large_n
+##                           for N2 in large_n])
+## def test_time_N_by_N(N1, N2, p):
+##     X = gen_matrix(N1, N2, p)
+##
+##     start, end = run_test(X)
+##     old_start, old_end = run_test(X, old=True)
+##
+##     output_res(old_end, old_start, end, start)
+##
+##     relative_assert(end - start, old_end - old_start)
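
These timing tests print their measurements, so they are most informative with output capture disabled. A sketch of driving the helpers above directly, assuming the definitions in this file are in scope:

X = gen_matrix(1000, 300, 0.5)               # 1000 x 300, ~50% missing
start, end = run_test(X)                     # new path, default n_jobs
old_start, old_end = run_test(X, old=True)   # simulated old serial path
output_res(old_end, old_start, end, start)
relative_assert(end - start, old_end - old_start)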

sklearn/metrics/pairwise.py

Lines changed: 11 additions & 2 deletions
@@ -18,6 +18,7 @@
 from scipy.sparse import csr_matrix
 from scipy.sparse import issparse
 from joblib import Parallel, effective_n_jobs
+from multiprocessing import cpu_count
 
 from ..utils.validation import _num_samples
 from ..utils.validation import check_non_negative
@@ -1644,7 +1645,7 @@ def pairwise_distances_chunked(X, Y=None, *, reduce_func=None,
     params = _precompute_metric_params(X, Y, metric=metric, **kwds)
     kwds.update(**params)
 
-    for sl in slices:
+    def _process_slice(sl, reduce_func):
         if sl.start == 0 and sl.stop == n_samples_X:
             X_chunk = X  # enable optimised paths for X is Y
         else:
@@ -1661,8 +1662,16 @@ def pairwise_distances_chunked(X, Y=None, *, reduce_func=None,
         chunk_size = D_chunk.shape[0]
         D_chunk = reduce_func(D_chunk, sl.start)
         _check_chunk_size(D_chunk, chunk_size)
-        yield D_chunk
+        return D_chunk
 
+    if effective_n_jobs(n_jobs) > 1:
+        generator = (delayed(_process_slice)(sl, reduce_func)
+                     for sl in slices)
+        par_res = Parallel(n_jobs=n_jobs, backend='threading')(generator)
+        for res in par_res:
+            yield res
+    else:
+        for sl in slices:
+            yield _process_slice(sl, reduce_func)
 
 @_deprecate_positional_args
 def pairwise_distances(X, Y=None, metric="euclidean", *, n_jobs=None,
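
The change above turns the per-slice loop body into _process_slice and hands it to joblib's threading backend when more than one job is effective. Two observations: the parallel branch relies on delayed being importable in this module (it is not in the joblib import shown in the hunk), and collecting every result through Parallel materialises all chunk reductions before yielding, trading some of the generator's fixed-memory behaviour for parallelism. A self-contained sketch of the same dispatch shape (names here are illustrative, not the module's):

from joblib import Parallel, delayed, effective_n_jobs

def run_slices(slices, process_slice, n_jobs):
    # Parallel path when more than one job is effective, otherwise the
    # original sequential loop; threading keeps results in shared memory.
    if effective_n_jobs(n_jobs) > 1:
        tasks = (delayed(process_slice)(sl) for sl in slices)
        return Parallel(n_jobs=n_jobs, backend='threading')(tasks)
    return [process_slice(sl) for sl in slices]

# toy reduction: the length of each slice
print(run_slices([slice(0, 2), slice(2, 4)], lambda sl: sl.stop - sl.start, 2))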

sklearn/preprocessing/_data.py

Lines changed: 1 addition & 0 deletions
@@ -1644,6 +1644,7 @@ def __init__(self, max_degree=None, *, interaction_only=False, include_bias=True
         self.interaction_only = interaction_only
         self.include_bias = include_bias
         self.order = order
+        self.degree = max_degree or degree  # fall back to the deprecated argument
 
     @staticmethod
     @_deprecate_positional_args
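
One caveat with the max_degree or degree fallback: or tests truthiness, not presence, so an explicit max_degree=0 would be silently replaced as well. A hypothetical resolve_degree helper (illustrative only, not part of the module) shows the difference an explicit None check makes:

def resolve_degree(max_degree, degree):
    # Fall back to the legacy argument only when the new one was omitted,
    # so an explicit max_degree=0 is preserved.
    return degree if max_degree is None else max_degree

assert resolve_degree(None, 2) == 2  # legacy argument used
assert resolve_degree(3, 2) == 3     # new argument wins
assert resolve_degree(0, 2) == 0     # `max_degree or degree` would give 2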

0 commit comments