From 0532aab213ee3fc8a5e88af1f5bde5454035c6dd Mon Sep 17 00:00:00 2001 From: Julien Jerphanion Date: Wed, 5 Jan 2022 15:19:56 +0100 Subject: [PATCH 1/6] MAINT Introduce Pairwise Distances Reductions private submodule (#22064) Co-authored-by: Thomas J. Fan Co-authored-by: Christian Lorentzen Co-authored-by: Olivier Grisel --- sklearn/_config.py | 30 +- sklearn/metrics/_dist_metrics.pxd | 21 + sklearn/metrics/_dist_metrics.pyx | 194 +++- .../metrics/_pairwise_distances_reduction.pyx | 839 ++++++++++++++++++ sklearn/metrics/setup.py | 7 + .../test_pairwise_distances_reduction.py | 357 ++++++++ sklearn/tests/test_config.py | 3 + sklearn/utils/_openmp_helpers.pxd | 6 + sklearn/utils/_openmp_helpers.pyx | 15 +- 9 files changed, 1458 insertions(+), 14 deletions(-) create mode 100644 sklearn/metrics/_pairwise_distances_reduction.pyx create mode 100644 sklearn/metrics/tests/test_pairwise_distances_reduction.py create mode 100644 sklearn/utils/_openmp_helpers.pxd diff --git a/sklearn/_config.py b/sklearn/_config.py index c41c180012056..d6a02737f640d 100644 --- a/sklearn/_config.py +++ b/sklearn/_config.py @@ -9,6 +9,9 @@ "working_memory": int(os.environ.get("SKLEARN_WORKING_MEMORY", 1024)), "print_changed_only": True, "display": "text", + "pairwise_dist_chunk_size": int( + os.environ.get("SKLEARN_PAIRWISE_DIST_CHUNK_SIZE", 256) + ), } _threadlocal = threading.local() @@ -40,7 +43,11 @@ def get_config(): def set_config( - assume_finite=None, working_memory=None, print_changed_only=None, display=None + assume_finite=None, + working_memory=None, + print_changed_only=None, + display=None, + pairwise_dist_chunk_size=None, ): """Set global scikit-learn configuration @@ -80,6 +87,12 @@ def set_config( .. versionadded:: 0.23 + pairwise_dist_chunk_size : int, default=None + The number of vectors per chunk for PairwiseDistancesReduction. + Default is 256 (suitable for most of modern laptops' caches and architectures). + + .. versionadded:: 1.1 + See Also -------- config_context : Context manager for global scikit-learn configuration. @@ -95,11 +108,18 @@ def set_config( local_config["print_changed_only"] = print_changed_only if display is not None: local_config["display"] = display + if pairwise_dist_chunk_size is not None: + local_config["pairwise_dist_chunk_size"] = pairwise_dist_chunk_size @contextmanager def config_context( - *, assume_finite=None, working_memory=None, print_changed_only=None, display=None + *, + assume_finite=None, + working_memory=None, + print_changed_only=None, + display=None, + pairwise_dist_chunk_size=None, ): """Context manager for global scikit-learn configuration. @@ -138,6 +158,12 @@ def config_context( .. versionadded:: 0.23 + pairwise_dist_chunk_size : int, default=None + The number of vectors per chunk for PairwiseDistancesReduction. + Default is 256 (suitable for most of modern laptops' caches and architectures). + + .. versionadded:: 1.1 + Yields ------ None. 
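The new `pairwise_dist_chunk_size` option can be set globally or per-block once this patch is applied. A minimal sketch of the intended usage (illustrative only), relying on the public `set_config`/`config_context` entry points touched above:

import sklearn

# Global setting: subsequent PairwiseDistancesReduction computations use 512-row chunks.
sklearn.set_config(pairwise_dist_chunk_size=512)

# Scoped setting: only reductions evaluated inside the block use 128-row chunks;
# the previous value (or the 256 default) is restored on exit.
with sklearn.config_context(pairwise_dist_chunk_size=128):
    pass  # pairwise-distance reductions evaluated here see chunk_size=128

assert sklearn.get_config()["pairwise_dist_chunk_size"] == 512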
diff --git a/sklearn/metrics/_dist_metrics.pxd b/sklearn/metrics/_dist_metrics.pxd index 611f6759e2c8b..e7c2f2ea2f926 100644 --- a/sklearn/metrics/_dist_metrics.pxd +++ b/sklearn/metrics/_dist_metrics.pxd @@ -64,3 +64,24 @@ cdef class DistanceMetric: cdef DTYPE_t _rdist_to_dist(self, DTYPE_t rdist) nogil except -1 cdef DTYPE_t _dist_to_rdist(self, DTYPE_t dist) nogil except -1 + + +###################################################################### +# DatasetsPair base class +cdef class DatasetsPair: + cdef DistanceMetric distance_metric + + cdef ITYPE_t n_samples_X(self) nogil + + cdef ITYPE_t n_samples_Y(self) nogil + + cdef DTYPE_t dist(self, ITYPE_t i, ITYPE_t j) nogil + + cdef DTYPE_t surrogate_dist(self, ITYPE_t i, ITYPE_t j) nogil + + +cdef class DenseDenseDatasetsPair(DatasetsPair): + cdef: + const DTYPE_t[:, ::1] X + const DTYPE_t[:, ::1] Y + ITYPE_t d diff --git a/sklearn/metrics/_dist_metrics.pyx b/sklearn/metrics/_dist_metrics.pyx index f7d22c1badfa2..6cf93baeca925 100644 --- a/sklearn/metrics/_dist_metrics.pyx +++ b/sklearn/metrics/_dist_metrics.pyx @@ -4,6 +4,8 @@ import numpy as np cimport numpy as np +from cython cimport final + np.import_array() # required in order to use C-API @@ -23,10 +25,10 @@ cdef inline np.ndarray _buffer_to_ndarray(const DTYPE_t* x, np.npy_intp n): return PyArray_SimpleNewFromData(1, &n, DTYPECODE, x) -# some handy constants from libc.math cimport fabs, sqrt, exp, pow, cos, sin, asin cdef DTYPE_t INF = np.inf +from scipy.sparse import csr_matrix, issparse from ..utils._typedefs cimport DTYPE_t, ITYPE_t, DITYPE_t, DTYPECODE from ..utils._typedefs import DTYPE, ITYPE from ..utils._readonly_array_wrapper import ReadonlyArrayWrapper @@ -67,6 +69,16 @@ METRIC_MAPPING = {'euclidean': EuclideanDistance, 'haversine': HaversineDistance, 'pyfunc': PyFuncDistance} +BOOL_METRICS = [ + "matching", + "jaccard", + "dice", + "kulsinski", + "rogerstanimoto", + "russellrao", + "sokalmichener", + "sokalsneath", +] def get_valid_metric_ids(L): """Given an iterable of metric class names or class identifiers, @@ -195,8 +207,8 @@ cdef class DistanceMetric: """ def __cinit__(self): self.p = 2 - self.vec = np.zeros(1, dtype=DTYPE, order='c') - self.mat = np.zeros((1, 1), dtype=DTYPE, order='c') + self.vec = np.zeros(1, dtype=DTYPE, order='C') + self.mat = np.zeros((1, 1), dtype=DTYPE, order='C') self.size = 1 def __reduce__(self): @@ -306,8 +318,9 @@ cdef class DistanceMetric: This can optionally be overridden in a base class. The rank-preserving surrogate distance is any measure that yields the same - rank as the distance, but is more efficient to compute. For example, for the - Euclidean metric, the surrogate distance is the squared-euclidean distance. + rank as the distance, but is more efficient to compute. For example, the + rank-preserving surrogate distance of the Euclidean metric is the + squared-euclidean distance. """ return self.dist(x1, x2, size) @@ -343,8 +356,9 @@ cdef class DistanceMetric: """Convert the rank-preserving surrogate distance to the distance. The surrogate distance is any measure that yields the same rank as the - distance, but is more efficient to compute. For example, for the - Euclidean metric, the surrogate distance is the squared-euclidean distance. + distance, but is more efficient to compute. For example, the + rank-preserving surrogate distance of the Euclidean metric is the + squared-euclidean distance. 
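To make the rank-preserving contract above concrete, here is a small NumPy sketch (illustrative, not part of the patch) showing that the squared Euclidean distance orders neighbors exactly like the Euclidean distance, which is why reductions can work on `rdist` and only convert back with `rdist_to_dist` for the values they return:

import numpy as np

rng = np.random.RandomState(0)
x = rng.rand(5)
Y = rng.rand(20, 5)

dist = np.sqrt(((Y - x) ** 2).sum(axis=1))   # true Euclidean distances
rdist = ((Y - x) ** 2).sum(axis=1)           # surrogate: squared distances, no sqrt

# Identical neighbor ordering, so the sqrt can be postponed to the very end.
assert (np.argsort(dist) == np.argsort(rdist)).all()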
Parameters ---------- @@ -362,8 +376,9 @@ cdef class DistanceMetric: """Convert the true distance to the rank-preserving surrogate distance. The surrogate distance is any measure that yields the same rank as the - distance, but is more efficient to compute. For example, for the - Euclidean metric, the surrogate distance is the squared-euclidean distance. + distance, but is more efficient to compute. For example, the + rank-preserving surrogate distance of the Euclidean metric is the + squared-euclidean distance. Parameters ---------- @@ -1150,3 +1165,164 @@ cdef class PyFuncDistance(DistanceMetric): cdef inline double fmax(double a, double b) nogil: return max(a, b) + + +###################################################################### +# Datasets Pair Classes +cdef class DatasetsPair: + """Abstract class which wraps a pair of datasets (X, Y). + + This class allows computing distances between a single pair of rows of + of X and Y at a time given the pair of their indices (i, j). This class is + specialized for each metric thanks to the :func:`get_for` factory classmethod. + + The handling of parallelization over chunks to compute the distances + and aggregation for several rows at a time is done in dedicated + subclasses of PairwiseDistancesReduction that in-turn rely on + subclasses of DatasetsPair for each pair of rows in the data. The goal + is to make it possible to decouple the generic parallelization and + aggregation logic from metric-specific computation as much as + possible. + + X and Y can be stored as C-contiguous np.ndarrays or CSR matrices + in subclasses. + + This class avoids the overhead of dispatching distance computations + to :class:`sklearn.metrics.DistanceMetric` based on the physical + representation of the vectors (sparse vs. dense). It makes use of + cython.final to remove the overhead of dispatching method calls. + + Parameters + ---------- + distance_metric: DistanceMetric + The distance metric responsible for computing distances + between two vectors of (X, Y). + """ + + @classmethod + def get_for( + cls, + X, + Y, + str metric="euclidean", + dict metric_kwargs=None, + ) -> DatasetsPair: + """Return the DatasetsPair implementation for the given arguments. + + Parameters + ---------- + X : {ndarray, sparse matrix} of shape (n_samples_X, n_features) + Input data. + If provided as a ndarray, it must be C-contiguous. + If provided as a sparse matrix, it must be in CSR format. + + Y : {ndarray, sparse matrix} of shape (n_samples_Y, n_features) + Input data. + If provided as a ndarray, it must be C-contiguous. + If provided as a sparse matrix, it must be in CSR format. + + metric : str, default='euclidean' + The distance metric to compute between rows of X and Y. + The default metric is a fast implementation of the Euclidean + metric. For a list of available metrics, see the documentation + of :class:`~sklearn.metrics.DistanceMetric`. + + metric_kwargs : dict, default=None + Keyword arguments to pass to specified metric function. + + Returns + ------- + datasets_pair: DatasetsPair + The suited DatasetsPair implementation. + """ + cdef: + DistanceMetric distance_metric = DistanceMetric.get_metric( + metric, + **(metric_kwargs or {}) + ) + + if not(X.dtype == Y.dtype == np.float64): + raise ValueError( + f"Only 64bit float datasets are supported at this time, " + f"got: X.dtype={X.dtype} and Y.dtype={Y.dtype}" + ) + + # Metric-specific checks that do not replace nor duplicate `check_array`. 
+ distance_metric._validate_data(X) + distance_metric._validate_data(Y) + + # TODO: dispatch to other dataset pairs for sparse support once available: + if issparse(X) or issparse(Y): + raise ValueError("Only dense datasets are supported for X and Y.") + + return DenseDenseDatasetsPair(X, Y, distance_metric) + + def __init__(self, DistanceMetric distance_metric): + self.distance_metric = distance_metric + + cdef ITYPE_t n_samples_X(self) nogil: + """Number of samples in X.""" + # This is a abstract method. + # This _must_ always be overwritten in subclasses. + # TODO: add "with gil: raise" here when supporting Cython 3.0 + return -999 + + cdef ITYPE_t n_samples_Y(self) nogil: + """Number of samples in Y.""" + # This is a abstract method. + # This _must_ always be overwritten in subclasses. + # TODO: add "with gil: raise" here when supporting Cython 3.0 + return -999 + + cdef DTYPE_t surrogate_dist(self, ITYPE_t i, ITYPE_t j) nogil: + return self.dist(i, j) + + cdef DTYPE_t dist(self, ITYPE_t i, ITYPE_t j) nogil: + # This is a abstract method. + # This _must_ always be overwritten in subclasses. + # TODO: add "with gil: raise" here when supporting Cython 3.0 + return -1 + +@final +cdef class DenseDenseDatasetsPair(DatasetsPair): + """Compute distances between row vectors of two arrays. + + Parameters + ---------- + X: ndarray of shape (n_samples_X, n_features) + Rows represent vectors. Must be C-contiguous. + + Y: ndarray of shape (n_samples_Y, n_features) + Rows represent vectors. Must be C-contiguous. + + distance_metric: DistanceMetric + The distance metric responsible for computing distances + between two row vectors of (X, Y). + """ + + def __init__(self, X, Y, DistanceMetric distance_metric): + super().__init__(distance_metric) + # Arrays have already been checked + self.X = X + self.Y = Y + self.d = X.shape[1] + + @final + cdef ITYPE_t n_samples_X(self) nogil: + return self.X.shape[0] + + @final + cdef ITYPE_t n_samples_Y(self) nogil: + return self.Y.shape[0] + + @final + cdef DTYPE_t surrogate_dist(self, ITYPE_t i, ITYPE_t j) nogil: + return self.distance_metric.rdist(&self.X[i, 0], + &self.Y[j, 0], + self.d) + + @final + cdef DTYPE_t dist(self, ITYPE_t i, ITYPE_t j) nogil: + return self.distance_metric.dist(&self.X[i, 0], + &self.Y[j, 0], + self.d) diff --git a/sklearn/metrics/_pairwise_distances_reduction.pyx b/sklearn/metrics/_pairwise_distances_reduction.pyx new file mode 100644 index 0000000000000..830df08e1a952 --- /dev/null +++ b/sklearn/metrics/_pairwise_distances_reduction.pyx @@ -0,0 +1,839 @@ +# Pairwise Distances Reductions +# ============================= +# +# Author: Julien Jerphanion +# +# +# The abstractions defined here are used in various algorithms performing +# the same structure of operations on distances between row vectors +# of a datasets pair (X, Y). +# +# Importantly, the core of the computation is chunked to make sure that the pairwise +# distance chunk matrices stay in CPU cache before applying the final reduction step. +# Furthermore, the chunking strategy is also used to leverage OpenMP-based parallelism +# (using Cython prange loops) which gives another multiplicative speed-up in +# favorable cases on many-core machines. +cimport numpy as np +import numpy as np +import warnings +import scipy.sparse + +from .. 
import get_config +from libc.stdlib cimport free, malloc +from libc.float cimport DBL_MAX +from cython cimport final +from cython.parallel cimport parallel, prange + +from ._dist_metrics cimport DatasetsPair +from ..utils._heap cimport simultaneous_sort, heap_push +from ..utils._openmp_helpers cimport _openmp_thread_num +from ..utils._typedefs cimport ITYPE_t, DTYPE_t + +from numbers import Integral +from typing import List +from scipy.sparse import issparse +from ._dist_metrics import BOOL_METRICS, METRIC_MAPPING +from ..utils import check_scalar +from ..utils.fixes import threadpool_limits +from ..utils._openmp_helpers import _openmp_effective_n_threads +from ..utils._typedefs import ITYPE, DTYPE + + +np.import_array() + +cdef class PairwiseDistancesReduction: + """Abstract base class for pairwise distance computation & reduction. + + Subclasses of this class compute pairwise distances between a set of + row vectors of X and another set of row vectors pf Y and apply a reduction on top. + The reduction takes a matrix of pairwise distances between rows of X and Y + as input and outputs an aggregate data-structure for each row of X. + The aggregate values are typically smaller than the number of rows in Y, + hence the term reduction. + + For computational reasons, it is interesting to perform the reduction on + the fly on chunks of rows of X and Y so as to keep intermediate + data-structures in CPU cache and avoid unnecessary round trips of large + distance arrays with the RAM that would otherwise severely degrade the + speed by making the overall processing memory-bound. + + The base class provides the generic chunked parallelization template using + OpenMP loops (Cython prange), either on rows of X or rows of Y depending on + their respective sizes. + + The subclasses are specialized for reduction. + + The actual distance computation for a given pair of rows of X and Y are + delegated to format-specific subclasses of the DatasetsPair companion base + class. + + Parameters + ---------- + datasets_pair: DatasetsPair + The pair of dataset to use. + + chunk_size: int, default=None + The number of vectors per chunk. If None (default) looks-up in + scikit-learn configuration for `pairwise_dist_chunk_size`, + and use 256 if it is not set. + + n_threads: int, default=None + The number of OpenMP threads to use for the reduction. + Parallelism is done on chunks and the sharding of chunks + depends on the `strategy` set on :method:`~PairwiseDistancesReduction.compute`. + + See _openmp_effective_n_threads, for details about + the specification of n_threads. + + strategy : str, {'auto', 'parallel_on_X', 'parallel_on_Y'}, default=None + The chunking strategy defining which dataset parallelization are made on. + + For both strategies the computations happens with two nested loops, + respectively on chunks of X and chunks of Y. + Strategies differs on which loop (outer or inner) is made to run + in parallel with the Cython `prange` construct: + + - 'parallel_on_X' dispatches chunks of X uniformly on threads. + Each thread then iterates on all the chunks of Y. This strategy is + embarrassingly parallel and comes with no datastructures synchronisation. + + - 'parallel_on_Y' dispatches chunks of Y uniformly on threads. + Each thread processes all the chunks of X in turn. This strategy is + a sequence of embarrassingly parallel subtasks (the inner loop on Y + chunks) with intermediate datastructures synchronisation at each + iteration of the sequential outer loop on X chunks. 
+ + - 'auto' relies on a simple heuristic to choose between + 'parallel_on_X' and 'parallel_on_Y': when `X.shape[0]` is large enough, + 'parallel_on_X' is usually the most efficient strategy. When `X.shape[0]` + is small but `Y.shape[0]` is large, 'parallel_on_Y' brings more opportunity + for parallelism and is therefore more efficient despite the synchronization + step at each iteration of the outer loop on chunks of `X`. + + - None (default) looks-up in scikit-learn configuration for + `pairwise_dist_parallel_strategy`, and use 'auto' if it is not set. + """ + + cdef: + readonly DatasetsPair datasets_pair + + # The number of threads that can be used is stored in effective_n_threads. + # + # The number of threads to use in the parallelisation strategy + # (i.e. parallel_on_X or parallel_on_Y) can be smaller than effective_n_threads: + # for small datasets, less threads might be needed to loop over pair of chunks. + # + # Hence the number of threads that _will_ be used for looping over chunks + # is stored in chunks_n_threads, allowing solely using what we need. + # + # Thus, an invariant is: + # + # chunks_n_threads <= effective_n_threads + # + ITYPE_t effective_n_threads + ITYPE_t chunks_n_threads + + ITYPE_t n_samples_chunk, chunk_size + + ITYPE_t n_samples_X, X_n_samples_chunk, X_n_chunks, X_n_samples_last_chunk + ITYPE_t n_samples_Y, Y_n_samples_chunk, Y_n_chunks, Y_n_samples_last_chunk + + bint execute_in_parallel_on_Y + + @classmethod + def valid_metrics(cls) -> List[str]: + excluded = { + "pyfunc", # is relatively slow because we need to coerce data as np arrays + "mahalanobis", # is numerically unstable + # TODO: In order to support discrete distance metrics, we need to have a + # stable simultaneous sort which preserves the order of the input. + # The best might be using std::stable_sort and a Comparator taking an + # Arrays of Structures instead of Structure of Arrays (currently used). + "hamming", + *BOOL_METRICS, + } + return sorted(set(METRIC_MAPPING.keys()) - excluded) + + @classmethod + def is_usable_for(cls, X, Y, metric) -> bool: + """Return True if the PairwiseDistancesReduction can be used for the given parameters. + + Parameters + ---------- + X : {ndarray, sparse matrix} of shape (n_samples_X, n_features) + Input data. + + Y : {ndarray, sparse matrix} of shape (n_samples_Y, n_features) + Input data. + + metric : str, default='euclidean' + The distance metric to use. + For a list of available metrics, see the documentation of + :class:`~sklearn.metrics.DistanceMetric`. + + Returns + ------- + True if the PairwiseDistancesReduction can be used, else False. 
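As a quick illustration of these criteria (a sketch mirroring the checks exercised by the tests added in this patch: dense, 64-bit float inputs and a supported metric):

import numpy as np
from scipy.sparse import csr_matrix
from sklearn.metrics._pairwise_distances_reduction import PairwiseDistancesReduction

X = np.random.rand(100, 10)
Y = np.random.rand(100, 10)

PairwiseDistancesReduction.is_usable_for(X, Y, "euclidean")                      # True
PairwiseDistancesReduction.is_usable_for(X.astype(np.float32), Y, "euclidean")   # False: 32-bit not supported yet
PairwiseDistancesReduction.is_usable_for(csr_matrix(X), Y, "euclidean")          # False: sparse not supported yet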
+ """ + # TODO: support sparse arrays and 32 bits + return (not issparse(X) and X.dtype == np.float64 and + not issparse(Y) and Y.dtype == np.float64 and + metric in cls.valid_metrics()) + + def __cinit__( + self, + DatasetsPair datasets_pair, + chunk_size=None, + n_threads=None, + strategy=None, + *args, + **kwargs, + ): + cdef: + ITYPE_t n_samples_chunk, X_n_full_chunks, Y_n_full_chunks + + if chunk_size is None: + chunk_size = get_config().get("pairwise_dist_chunk_size", 256) + + self.chunk_size = check_scalar(chunk_size, "chunk_size", Integral, min_val=20) + + self.effective_n_threads = _openmp_effective_n_threads(n_threads) + + self.datasets_pair = datasets_pair + + self.n_samples_X = datasets_pair.n_samples_X() + self.X_n_samples_chunk = min(self.n_samples_X, self.chunk_size) + X_n_full_chunks = self.n_samples_X // self.X_n_samples_chunk + X_n_samples_remainder = self.n_samples_X % self.X_n_samples_chunk + self.X_n_chunks = X_n_full_chunks + (X_n_samples_remainder != 0) + + if X_n_samples_remainder != 0: + self.X_n_samples_last_chunk = X_n_samples_remainder + else: + self.X_n_samples_last_chunk = self.X_n_samples_chunk + + self.n_samples_Y = datasets_pair.n_samples_Y() + self.Y_n_samples_chunk = min(self.n_samples_Y, self.chunk_size) + Y_n_full_chunks = self.n_samples_Y // self.Y_n_samples_chunk + Y_n_samples_remainder = self.n_samples_Y % self.Y_n_samples_chunk + self.Y_n_chunks = Y_n_full_chunks + (Y_n_samples_remainder != 0) + + if Y_n_samples_remainder != 0: + self.Y_n_samples_last_chunk = Y_n_samples_remainder + else: + self.Y_n_samples_last_chunk = self.Y_n_samples_chunk + + if strategy is None: + strategy = get_config().get("pairwise_dist_parallel_strategy", 'auto') + + if strategy not in ('parallel_on_X', 'parallel_on_Y', 'auto'): + raise RuntimeError(f"strategy must be 'parallel_on_X, 'parallel_on_Y', " + f"or 'auto', but currently strategy='{self.strategy}'.") + + if strategy == 'auto': + # This is a simple heuristic whose constant for the + # comparison has been chosen based on experiments. + if 4 * self.chunk_size * self.effective_n_threads < self.n_samples_X: + strategy = 'parallel_on_X' + else: + strategy = 'parallel_on_Y' + + self.execute_in_parallel_on_Y = strategy == "parallel_on_Y" + + # Not using less, not using more. + self.chunks_n_threads = min( + self.Y_n_chunks if self.execute_in_parallel_on_Y else self.X_n_chunks, + self.effective_n_threads, + ) + + @final + cdef void _parallel_on_X(self) nogil: + """Compute the pairwise distances of each row vector of X on Y + by parallelizing computation on the outer loop on chunks of X + and reduce them. + + This strategy dispatches chunks of Y uniformly on threads. + Each thread processes all the chunks of X in turn. This strategy is + a sequence of embarrassingly parallel subtasks (the inner loop on Y + chunks) with intermediate datastructures synchronisation at each + iteration of the sequential outer loop on X chunks. + + Private datastructures are modified internally by threads. + + Private template methods can be implemented on subclasses to + interact with those datastructures at various stages. 
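For intuition, the chunk boundaries iterated over by both strategies follow the bookkeeping set up in `__cinit__` above (full chunks plus a possibly smaller trailing chunk). A pure-Python sketch, using a hypothetical `chunk_bounds` helper introduced only for illustration:

def chunk_bounds(n_samples, chunk_size):
    # Full chunks of `chunk_size` rows plus a possibly smaller trailing chunk,
    # mirroring X_n_chunks / X_n_samples_last_chunk above.
    return [
        (start, min(start + chunk_size, n_samples))
        for start in range(0, n_samples, chunk_size)
    ]

chunk_bounds(1000, 256)   # [(0, 256), (256, 512), (512, 768), (768, 1000)]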
+ """ + cdef: + ITYPE_t Y_start, Y_end, X_start, X_end, X_chunk_idx, Y_chunk_idx + ITYPE_t thread_num + + with nogil, parallel(num_threads=self.chunks_n_threads): + thread_num = _openmp_thread_num() + + # Allocating thread datastructures + self._parallel_on_X_parallel_init(thread_num) + + for X_chunk_idx in prange(self.X_n_chunks, schedule='static'): + X_start = X_chunk_idx * self.X_n_samples_chunk + if X_chunk_idx == self.X_n_chunks - 1: + X_end = X_start + self.X_n_samples_last_chunk + else: + X_end = X_start + self.X_n_samples_chunk + + # Reinitializing thread datastructures for the new X chunk + self._parallel_on_X_init_chunk(thread_num, X_start) + + for Y_chunk_idx in range(self.Y_n_chunks): + Y_start = Y_chunk_idx * self.Y_n_samples_chunk + if Y_chunk_idx == self.Y_n_chunks - 1: + Y_end = Y_start + self.Y_n_samples_last_chunk + else: + Y_end = Y_start + self.Y_n_samples_chunk + + self._compute_and_reduce_distances_on_chunks( + X_start, X_end, + Y_start, Y_end, + thread_num, + ) + + # Adjusting thread datastructures on the full pass on Y + self._parallel_on_X_prange_iter_finalize(thread_num, X_start, X_end) + + # end: for X_chunk_idx + + # Deallocating thread datastructures + self._parallel_on_X_parallel_finalize(thread_num) + + # end: with nogil, parallel + return + + @final + cdef void _parallel_on_Y(self) nogil: + """Compute the pairwise distances of each row vector of X on Y + by parallelizing computation on the inner loop on chunks of Y + and reduce them. + + This strategy dispatches chunks of Y uniformly on threads. + Each thread processes all the chunks of X in turn. This strategy is + a sequence of embarrassingly parallel subtasks (the inner loop on Y + chunks) with intermediate datastructures synchronisation at each + iteration of the sequential outer loop on X chunks. + + Private datastructures are modified internally by threads. + + Private template methods can be implemented on subclasses to + interact with those datastructures at various stages. + """ + cdef: + ITYPE_t Y_start, Y_end, X_start, X_end, X_chunk_idx, Y_chunk_idx + ITYPE_t thread_num + + # Allocating datastructures shared by all threads + self._parallel_on_Y_init() + + for X_chunk_idx in range(self.X_n_chunks): + X_start = X_chunk_idx * self.X_n_samples_chunk + if X_chunk_idx == self.X_n_chunks - 1: + X_end = X_start + self.X_n_samples_last_chunk + else: + X_end = X_start + self.X_n_samples_chunk + + with nogil, parallel(num_threads=self.chunks_n_threads): + thread_num = _openmp_thread_num() + + # Initializing datastructures used in this thread + self._parallel_on_Y_parallel_init(thread_num) + + for Y_chunk_idx in prange(self.Y_n_chunks, schedule='static'): + Y_start = Y_chunk_idx * self.Y_n_samples_chunk + if Y_chunk_idx == self.Y_n_chunks - 1: + Y_end = Y_start + self.Y_n_samples_last_chunk + else: + Y_end = Y_start + self.Y_n_samples_chunk + + self._compute_and_reduce_distances_on_chunks( + X_start, X_end, + Y_start, Y_end, + thread_num, + ) + # end: prange + + # Note: we don't need a _parallel_on_Y_finalize similarly. + # This can be introduced if needed. 
+ + # end: with nogil, parallel + + # Synchronizing the thread datastructures with the main ones + self._parallel_on_Y_synchronize(X_start, X_end) + + # end: for X_chunk_idx + # Deallocating temporary datastructures and adjusting main datastructures + self._parallel_on_Y_finalize() + return + + # Placeholder methods which have to be implemented + + cdef void _compute_and_reduce_distances_on_chunks( + self, + ITYPE_t X_start, + ITYPE_t X_end, + ITYPE_t Y_start, + ITYPE_t Y_end, + ITYPE_t thread_num, + ) nogil: + """Compute the pairwise distances on two chunks of X and Y and reduce them. + + This is THE core computational method of PairwiseDistanceReductions. + This must be implemented in subclasses. + """ + return + + def _finalize_results(self, bint return_distance): + """Callback adapting datastructures before returning results. + + This must be implemented in subclasses. + """ + return None + + # Placeholder methods which can be implemented + + cdef void compute_exact_distances(self) nogil: + """Convert rank-preserving distances to exact distances or recompute them.""" + return + + cdef void _parallel_on_X_parallel_init( + self, + ITYPE_t thread_num, + ) nogil: + """Allocate datastructures used in a thread given its number.""" + return + + cdef void _parallel_on_X_init_chunk( + self, + ITYPE_t thread_num, + ITYPE_t X_start, + ) nogil: + """Initialise datastructures used in a thread given its number.""" + return + + cdef void _parallel_on_X_prange_iter_finalize( + self, + ITYPE_t thread_num, + ITYPE_t X_start, + ITYPE_t X_end, + ) nogil: + """Interact with datastructures after a reduction on chunks.""" + return + + cdef void _parallel_on_X_parallel_finalize( + self, + ITYPE_t thread_num + ) nogil: + """Interact with datastructures after executing all the reductions.""" + return + + cdef void _parallel_on_Y_init( + self, + ) nogil: + """Allocate datastructures used in all threads.""" + return + + cdef void _parallel_on_Y_parallel_init( + self, + ITYPE_t thread_num, + ) nogil: + """Initialise datastructures used in a thread given its number.""" + return + + cdef void _parallel_on_Y_synchronize( + self, + ITYPE_t X_start, + ITYPE_t X_end, + ) nogil: + """Update thread datastructures before leaving a parallel region.""" + return + + cdef void _parallel_on_Y_finalize( + self, + ) nogil: + """Update datastructures after executing all the reductions.""" + return + +cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): + """Compute the argkmin of row vectors of X on the ones of Y. + + For each row vector of X, computes the indices of k first the rows + vectors of Y with the smallest distances. + + PairwiseDistancesArgKmin is typically used to perform + bruteforce k-nearest neighbors queries. + + Parameters + ---------- + datasets_pair: DatasetsPair + The dataset pairs (X, Y) for the reduction. + + chunk_size: int, default=None, + The number of vectors per chunk. If None (default) looks-up in + scikit-learn configuration for `pairwise_dist_chunk_size`, + and use 256 if it is not set. + + n_threads: int, default=None + The number of OpenMP threads to use for the reduction. + Parallelism is done on chunks and the sharding of chunks + depends on the `strategy` set on + :meth:`~PairwiseDistancesArgKmin.compute`. + + See _openmp_effective_n_threads, for details about + the specification of n_threads. + + k: int, default=1 + The k for the argkmin reduction. 
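In plain NumPy terms, the reduction performed by this class is equivalent to the brute-force reference below (a deliberately naive sketch; the class computes the same result chunk by chunk without materialising the full distance matrix):

import numpy as np
from scipy.spatial.distance import cdist

rng = np.random.RandomState(0)
X = rng.rand(30, 4)
Y = rng.rand(50, 4)
k = 3

D = cdist(X, Y)                                  # full (n_samples_X, n_samples_Y) matrix
argkmin_indices = np.argsort(D, axis=1)[:, :k]   # k closest rows of Y for each row of X
argkmin_distances = np.take_along_axis(D, argkmin_indices, axis=1)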
+ """ + + cdef: + ITYPE_t k + + ITYPE_t[:, ::1] argkmin_indices + DTYPE_t[:, ::1] argkmin_distances + + # Used as array of pointers to private datastructures used in threads. + DTYPE_t ** heaps_r_distances_chunks + ITYPE_t ** heaps_indices_chunks + + @classmethod + def compute( + cls, + X, + Y, + ITYPE_t k, + str metric="euclidean", + chunk_size=None, + dict metric_kwargs=None, + n_threads=None, + str strategy=None, + bint return_distance=False, + ): + """Return the results of the reduction for the given arguments. + + Parameters + ---------- + X : ndarray or CSR matrix of shape (n_samples_X, n_features) + Input data. + + Y : ndarray or CSR matrix of shape (n_samples_Y, n_features) + Input data. + + k : int + The k for the argkmin reduction. + + metric : str, default='euclidean' + The distance metric to use for argkmin. + For a list of available metrics, see the documentation of + :class:`~sklearn.metrics.DistanceMetric`. + + chunk_size : int, default=None, + The number of vectors per chunk. If None (default) looks-up in + scikit-learn configuration for `pairwise_dist_chunk_size`, + and use 256 if it is not set. + + metric_kwargs : dict, default=None + Keyword arguments to pass to specified metric function. + + n_threads : int, default=None + The number of OpenMP threads to use for the reduction. + Parallelism is done on chunks and the sharding of chunks + depends on the `strategy` set on + :meth:`~PairwiseDistancesArgKmin.compute`. + + See _openmp_effective_n_threads, for details about + the specification of n_threads. + + strategy : str, {'auto', 'parallel_on_X', 'parallel_on_Y'}, default=None + The chunking strategy defining which dataset parallelization are made on. + + For both strategies the computations happens with two nested loops, + respectively on chunks of X and chunks of Y. + Strategies differs on which loop (outer or inner) is made to run + in parallel with the Cython `prange` construct: + + - 'parallel_on_X' dispatches chunks of X uniformly on threads. + Each thread then iterates on all the chunks of Y. This strategy is + embarrassingly parallel and comes with no datastructures synchronisation. + + - 'parallel_on_Y' dispatches chunks of Y uniformly on threads. + Each thread processes all the chunks of X in turn. This strategy is + a sequence of embarrassingly parallel subtasks (the inner loop on Y + chunks) with intermediate datastructures synchronisation at each + iteration of the sequential outer loop on X chunks. + + - 'auto' relies on a simple heuristic to choose between + 'parallel_on_X' and 'parallel_on_Y': when `X.shape[0]` is large enough, + 'parallel_on_X' is usually the most efficient strategy. When `X.shape[0]` + is small but `Y.shape[0]` is large, 'parallel_on_Y' brings more opportunity + for parallelism and is therefore more efficient despite the synchronization + step at each iteration of the outer loop on chunks of `X`. + + - None (default) looks-up in scikit-learn configuration for + `pairwise_dist_parallel_strategy`, and use 'auto' if it is not set. + + return_distance : boolean, default=False + Return distances between each X vector and its + argkmin if set to True. + + Returns + ------- + If return_distance=False: + - argkmin_indices : ndarray of shape (n_samples_X, k) + Indices of the argkmin for each vector in X. + + If return_distance=True: + - argkmin_indices : ndarray of shape (n_samples_X, k) + Indices of the argkmin for each vector in X. + - argkmin_distances : ndarray of shape (n_samples_X, k) + Distances to the argkmin for each vector in X. 
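A minimal end-to-end call of the classmethod documented above (assuming the extension builds as added by this patch; shapes follow the Returns section):

import numpy as np
from sklearn.metrics._pairwise_distances_reduction import PairwiseDistancesArgKmin

rng = np.random.RandomState(0)
X = rng.rand(100, 10)
Y = rng.rand(500, 10)

indices, distances = PairwiseDistancesArgKmin.compute(
    X, Y, k=5, metric="euclidean", return_distance=True
)
indices.shape     # (100, 5): for each row of X, the 5 nearest rows of Y
distances.shape   # (100, 5): matching distances, sorted in ascending order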
+ + Notes + ----- + This public classmethod is responsible for introspecting the arguments + values to dispatch to the private :meth:`PairwiseDistancesArgKmin._compute` + instance method of the most appropriate :class:`PairwiseDistancesArgKmin` + concrete implementation. + + All temporarily allocated datastructures necessary for the concrete + implementation are therefore freed when this classmethod returns. + + This allows entirely decoupling the interface entirely from the + implementation details whilst maintaining RAII. + """ + # Note (jjerphan): Some design thoughts for future extensions. + # This factory comes to handle specialisations for the given arguments. + # For future work, this might can be an entrypoint to specialise operations + # for various back-end and/or hardware and/or datatypes, and/or fused + # {sparse, dense}-datasetspair etc. + + pda = PairwiseDistancesArgKmin( + datasets_pair=DatasetsPair.get_for(X, Y, metric, metric_kwargs), + k=k, + chunk_size=chunk_size, + strategy=strategy, + ) + + # Limit the number of threads in second level of nested parallelism for BLAS + # to avoid threads over-subscription (in GEMM for instance). + with threadpool_limits(limits=1, user_api="blas"): + if pda.execute_in_parallel_on_Y: + pda._parallel_on_Y() + else: + pda._parallel_on_X() + + return pda._finalize_results(return_distance) + + def __cinit__( + self, + DatasetsPair datasets_pair, + chunk_size=None, + n_threads=None, + strategy=None, + *args, + **kwargs, + ): + # Allocating pointers to datastructures but not the datastructures themselves. + # There are as many pointers as effective threads. + # + # For the sake of explicitness: + # - when parallelizing on X, the pointers of those heaps are referencing + # (with proper offsets) addresses of the two main heaps (see bellow) + # - when parallelizing on Y, the pointers of those heaps are referencing + # small heaps which are thread-wise-allocated and whose content will be + # merged with the main heaps'. + self.heaps_r_distances_chunks = malloc( + sizeof(DTYPE_t *) * self.chunks_n_threads + ) + self.heaps_indices_chunks = malloc( + sizeof(ITYPE_t *) * self.chunks_n_threads + ) + + def __init__( + self, + DatasetsPair datasets_pair, + chunk_size=None, + n_threads=None, + strategy=None, + ITYPE_t k=1, + ): + self.k = check_scalar(k, "k", Integral, min_val=1) + + # Main heaps which will be returned as results by `PairwiseDistancesArgKmin.compute`. + self.argkmin_indices = np.full((self.n_samples_X, self.k), 0, dtype=ITYPE) + self.argkmin_distances = np.full((self.n_samples_X, self.k), DBL_MAX, dtype=DTYPE) + + def __dealloc__(self): + if self.heaps_indices_chunks is not NULL: + free(self.heaps_indices_chunks) + + if self.heaps_r_distances_chunks is not NULL: + free(self.heaps_r_distances_chunks) + + cdef void _compute_and_reduce_distances_on_chunks( + self, + ITYPE_t X_start, + ITYPE_t X_end, + ITYPE_t Y_start, + ITYPE_t Y_end, + ITYPE_t thread_num, + ) nogil: + cdef: + ITYPE_t i, j + ITYPE_t n_samples_X = X_end - X_start + ITYPE_t n_samples_Y = Y_end - Y_start + DTYPE_t *heaps_r_distances = self.heaps_r_distances_chunks[thread_num] + ITYPE_t *heaps_indices = self.heaps_indices_chunks[thread_num] + + # Pushing the distances and their associated indices on a heap + # which by construction will keep track of the argkmin. 
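+        # `heap_push` maintains a fixed-size max-heap of length k per row of X:
+        # a (distance, index) candidate only replaces the heap root when its
+        # distance is smaller than the current worst of the k kept so far, so
+        # after the double loop each row holds its k smallest surrogate distances.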
+ for i in range(n_samples_X): + for j in range(n_samples_Y): + heap_push( + heaps_r_distances + i * self.k, + heaps_indices + i * self.k, + self.k, + self.datasets_pair.surrogate_dist(X_start + i, Y_start + j), + Y_start + j, + ) + + @final + cdef void _parallel_on_X_init_chunk( + self, + ITYPE_t thread_num, + ITYPE_t X_start, + ) nogil: + # As this strategy is embarrassingly parallel, we can set each + # thread's heaps pointer to the proper position on the main heaps. + self.heaps_r_distances_chunks[thread_num] = &self.argkmin_distances[X_start, 0] + self.heaps_indices_chunks[thread_num] = &self.argkmin_indices[X_start, 0] + + @final + cdef void _parallel_on_X_prange_iter_finalize( + self, + ITYPE_t thread_num, + ITYPE_t X_start, + ITYPE_t X_end, + ) nogil: + cdef: + ITYPE_t idx, jdx + + # Sorting the main heaps portion associated to `X[X_start:X_end]` + # in ascending order w.r.t the distances. + for idx in range(X_end - X_start): + simultaneous_sort( + self.heaps_r_distances_chunks[thread_num] + idx * self.k, + self.heaps_indices_chunks[thread_num] + idx * self.k, + self.k + ) + + cdef void _parallel_on_Y_init( + self, + ) nogil: + cdef: + # Maximum number of scalar elements (the last chunks can be smaller) + ITYPE_t heaps_size = self.X_n_samples_chunk * self.k + ITYPE_t thread_num + + # The allocation is done in parallel for data locality purposes: this way + # the heaps used in each threads are allocated in pages which are closer + # to the CPU core used by the thread. + # See comments about First Touch Placement Policy: + # https://www.openmp.org/wp-content/uploads/openmp-webinar-vanderPas-20210318.pdf #noqa + for thread_num in prange(self.chunks_n_threads, schedule='static', nogil=True, + num_threads=self.chunks_n_threads): + # As chunks of X are shared across threads, so must their + # heaps. To solve this, each thread has its own heaps + # which are then synchronised back in the main ones. + self.heaps_r_distances_chunks[thread_num] = malloc( + heaps_size * sizeof(DTYPE_t) + ) + self.heaps_indices_chunks[thread_num] = malloc( + heaps_size * sizeof(ITYPE_t) + ) + + @final + cdef void _parallel_on_Y_parallel_init( + self, + ITYPE_t thread_num, + ) nogil: + # Initialising heaps (memset can't be used here) + for idx in range(self.X_n_samples_chunk * self.k): + self.heaps_r_distances_chunks[thread_num][idx] = DBL_MAX + self.heaps_indices_chunks[thread_num][idx] = -1 + + @final + cdef void _parallel_on_Y_synchronize( + self, + ITYPE_t X_start, + ITYPE_t X_end, + ) nogil: + cdef: + ITYPE_t idx, jdx, thread_num + with nogil, parallel(num_threads=self.effective_n_threads): + # Synchronising the thread heaps with the main heaps. + # This is done in parallel sample-wise (no need for locks). + # + # This might break each thread's data locality as each heap which + # was allocated in a thread is being now being used in several threads. + # + # Still, this parallel pattern has shown to be efficient in practice. 
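+        # Pushing every thread-local candidate (k per thread) onto the main heap
+        # of the same row keeps only the k globally smallest entries, so the
+        # result matches a single-threaded pass over all of Y.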
+ for idx in prange(X_end - X_start, schedule="static"): + for thread_num in range(self.chunks_n_threads): + for jdx in range(self.k): + heap_push( + &self.argkmin_distances[X_start + idx, 0], + &self.argkmin_indices[X_start + idx, 0], + self.k, + self.heaps_r_distances_chunks[thread_num][idx * self.k + jdx], + self.heaps_indices_chunks[thread_num][idx * self.k + jdx], + ) + + cdef void _parallel_on_Y_finalize( + self, + ) nogil: + cdef: + ITYPE_t idx, thread_num + + with nogil, parallel(num_threads=self.chunks_n_threads): + # Deallocating temporary datastructures + for thread_num in prange(self.chunks_n_threads, schedule='static'): + free(self.heaps_r_distances_chunks[thread_num]) + free(self.heaps_indices_chunks[thread_num]) + + # Sorting the main in ascending order w.r.t the distances. + # This is done in parallel sample-wise (no need for locks). + for idx in prange(self.n_samples_X, schedule='static'): + simultaneous_sort( + &self.argkmin_distances[idx, 0], + &self.argkmin_indices[idx, 0], + self.k, + ) + return + + cdef void compute_exact_distances(self) nogil: + cdef: + ITYPE_t i, j + ITYPE_t[:, ::1] Y_indices = self.argkmin_indices + DTYPE_t[:, ::1] distances = self.argkmin_distances + for i in prange(self.n_samples_X, schedule='static', nogil=True, + num_threads=self.effective_n_threads): + for j in range(self.k): + distances[i, j] = self.datasets_pair.distance_metric._rdist_to_dist( + # Guard against eventual -0., causing nan production. + max(distances[i, j], 0.) + ) + + def _finalize_results(self, bint return_distance=False): + if return_distance: + # We need to recompute distances because we relied on + # surrogate distances for the reduction. + self.compute_exact_distances() + return np.asarray(self.argkmin_indices), np.asarray(self.argkmin_distances) + + return np.asarray(self.argkmin_indices) diff --git a/sklearn/metrics/setup.py b/sklearn/metrics/setup.py index 69925a3590be6..1c26d9969397c 100644 --- a/sklearn/metrics/setup.py +++ b/sklearn/metrics/setup.py @@ -26,6 +26,13 @@ def configuration(parent_package="", top_path=None): libraries=libraries, ) + config.add_extension( + "_pairwise_distances_reduction", + sources=["_pairwise_distances_reduction.pyx"], + include_dirs=[np.get_include(), os.path.join(np.get_include(), "numpy")], + libraries=libraries, + ) + config.add_subpackage("tests") return config diff --git a/sklearn/metrics/tests/test_pairwise_distances_reduction.py b/sklearn/metrics/tests/test_pairwise_distances_reduction.py new file mode 100644 index 0000000000000..a4d51e4662740 --- /dev/null +++ b/sklearn/metrics/tests/test_pairwise_distances_reduction.py @@ -0,0 +1,357 @@ +import numpy as np +import pytest +from numpy.testing import assert_array_equal, assert_allclose +from scipy.sparse import csr_matrix +from scipy.spatial.distance import cdist + +from sklearn.metrics._pairwise_distances_reduction import ( + PairwiseDistancesReduction, + PairwiseDistancesArgKmin, +) + +from sklearn.utils.fixes import sp_version, parse_version + +# Common supported metric between scipy.spatial.distance.cdist +# and PairwiseDistancesReduction. +# This allows constructing tests to check consistency of results +# of concrete PairwiseDistancesReduction on some metrics using APIs +# from scipy and numpy. 
+CDIST_PAIRWISE_DISTANCES_REDUCTION_COMMON_METRICS = [ + "braycurtis", + "canberra", + "chebyshev", + "cityblock", + "euclidean", + "minkowski", + "seuclidean", +] + + +def _get_dummy_metric_params_list(metric: str, n_features: int): + """Return list of dummy DistanceMetric kwargs for tests.""" + + # Distinguishing on cases not to compute unneeded datastructures. + rng = np.random.RandomState(1) + + if metric == "minkowski": + minkowski_kwargs = [dict(p=1.5), dict(p=2), dict(p=3), dict(p=np.inf)] + if sp_version >= parse_version("1.8.0.dev0"): + # TODO: remove the test once we no longer support scipy < 1.8.0. + # Recent scipy versions accept weights in the Minkowski metric directly: + # type: ignore + minkowski_kwargs.append(dict(p=3, w=rng.rand(n_features))) + + return minkowski_kwargs + + # TODO: remove this case for "wminkowski" once we no longer support scipy < 1.8.0. + if metric == "wminkowski": + weights = rng.random_sample(n_features) + weights /= weights.sum() + wminkowski_kwargs = [dict(p=1.5, w=weights)] + if sp_version < parse_version("1.8.0.dev0"): + # wminkowski was removed in scipy 1.8.0 but should work for previous + # versions. + wminkowski_kwargs.append(dict(p=3, w=rng.rand(n_features))) + return wminkowski_kwargs + + if metric == "seuclidean": + return [dict(V=rng.rand(n_features))] + + # Case of: "euclidean", "manhattan", "chebyshev", "haversine" or any other metric. + # In those cases, no kwargs is needed. + return [{}] + + +def assert_argkmin_results_equality(ref_dist, dist, ref_indices, indices): + assert_array_equal( + ref_indices, + indices, + err_msg="Query vectors have different neighbors' indices", + ) + assert_allclose( + ref_dist, + dist, + err_msg="Query vectors have different neighbors' distances", + rtol=1e-7, + ) + + +ASSERT_RESULT = { + PairwiseDistancesArgKmin: assert_argkmin_results_equality, +} + + +def test_pairwise_distances_reduction_is_usable_for(): + rng = np.random.RandomState(0) + X = rng.rand(100, 10) + Y = rng.rand(100, 10) + metric = "euclidean" + assert PairwiseDistancesReduction.is_usable_for(X, Y, metric) + assert not PairwiseDistancesReduction.is_usable_for( + X.astype(np.int64), Y.astype(np.int64), metric + ) + + assert not PairwiseDistancesReduction.is_usable_for(X, Y, metric="pyfunc") + # TODO: remove once 32 bits datasets are supported + assert not PairwiseDistancesReduction.is_usable_for(X.astype(np.float32), Y, metric) + assert not PairwiseDistancesReduction.is_usable_for(X, Y.astype(np.int32), metric) + + # TODO: remove once sparse matrices are supported + assert not PairwiseDistancesReduction.is_usable_for(csr_matrix(X), Y, metric) + assert not PairwiseDistancesReduction.is_usable_for(X, csr_matrix(Y), metric) + + +def test_argkmin_factory_method_wrong_usages(): + rng = np.random.RandomState(1) + X = rng.rand(100, 10) + Y = rng.rand(100, 10) + k = 5 + metric = "euclidean" + + msg = ( + "Only 64bit float datasets are supported at this time, " + "got: X.dtype=float32 and Y.dtype=float64" + ) + with pytest.raises(ValueError, match=msg): + PairwiseDistancesArgKmin.compute( + X=X.astype(np.float32), Y=Y, k=k, metric=metric + ) + + msg = ( + "Only 64bit float datasets are supported at this time, " + "got: X.dtype=float64 and Y.dtype=int32" + ) + with pytest.raises(ValueError, match=msg): + PairwiseDistancesArgKmin.compute(X=X, Y=Y.astype(np.int32), k=k, metric=metric) + + with pytest.raises(ValueError, match="k == -1, must be >= 1."): + PairwiseDistancesArgKmin.compute(X=X, Y=Y, k=-1, metric=metric) + + with pytest.raises(ValueError, 
match="k == 0, must be >= 1."): + PairwiseDistancesArgKmin.compute(X=X, Y=Y, k=0, metric=metric) + + with pytest.raises(ValueError, match="Unrecognized metric"): + PairwiseDistancesArgKmin.compute(X=X, Y=Y, k=k, metric="wrong metric") + + with pytest.raises( + ValueError, match=r"Buffer has wrong number of dimensions \(expected 2, got 1\)" + ): + PairwiseDistancesArgKmin.compute( + X=np.array([1.0, 2.0]), Y=Y, k=k, metric=metric + ) + + with pytest.raises(ValueError, match="ndarray is not C-contiguous"): + PairwiseDistancesArgKmin.compute( + X=np.asfortranarray(X), Y=Y, k=k, metric=metric + ) + + +@pytest.mark.parametrize("seed", range(5)) +@pytest.mark.parametrize("n_samples", [100, 1000]) +@pytest.mark.parametrize("chunk_size", [50, 512, 1024]) +@pytest.mark.parametrize( + "PairwiseDistancesReduction", + [PairwiseDistancesArgKmin], +) +def test_chunk_size_agnosticism( + PairwiseDistancesReduction, + seed, + n_samples, + chunk_size, + n_features=100, + dtype=np.float64, +): + # Results should not depend on the chunk size + rng = np.random.RandomState(seed) + spread = 100 + X = rng.rand(n_samples, n_features).astype(dtype) * spread + Y = rng.rand(n_samples, n_features).astype(dtype) * spread + + parameter = ( + 10 + if PairwiseDistancesReduction is PairwiseDistancesArgKmin + # Scaling the radius slightly with the numbers of dimensions + else 10 ** np.log(n_features) + ) + + ref_indices, ref_dist = PairwiseDistancesReduction.compute( + X, + Y, + parameter, + return_distance=True, + ) + + indices, dist = PairwiseDistancesReduction.compute( + X, + Y, + parameter, + chunk_size=chunk_size, + return_distance=True, + ) + + ASSERT_RESULT[PairwiseDistancesReduction](ref_dist, dist, ref_indices, indices) + + +@pytest.mark.parametrize("seed", range(5)) +@pytest.mark.parametrize("n_samples", [100, 1000]) +@pytest.mark.parametrize("chunk_size", [50, 512, 1024]) +@pytest.mark.parametrize( + "PairwiseDistancesReduction", + [PairwiseDistancesArgKmin], +) +def test_n_threads_agnosticism( + PairwiseDistancesReduction, + seed, + n_samples, + chunk_size, + n_features=100, + dtype=np.float64, +): + # Results should not depend on the number of threads + rng = np.random.RandomState(seed) + spread = 100 + X = rng.rand(n_samples, n_features).astype(dtype) * spread + Y = rng.rand(n_samples, n_features).astype(dtype) * spread + + parameter = ( + 10 + if PairwiseDistancesReduction is PairwiseDistancesArgKmin + # Scaling the radius slightly with the numbers of dimensions + else 10 ** np.log(n_features) + ) + + ref_indices, ref_dist = PairwiseDistancesReduction.compute( + X, + Y, + parameter, + return_distance=True, + ) + + indices, dist = PairwiseDistancesReduction.compute( + X, Y, parameter, n_threads=1, return_distance=True + ) + + ASSERT_RESULT[PairwiseDistancesReduction](ref_dist, dist, ref_indices, indices) + + +@pytest.mark.parametrize("seed", range(5)) +@pytest.mark.parametrize("n_samples", [100, 1000]) +@pytest.mark.parametrize("metric", PairwiseDistancesReduction.valid_metrics()) +@pytest.mark.parametrize( + "PairwiseDistancesReduction", + [PairwiseDistancesArgKmin], +) +def test_strategies_consistency( + PairwiseDistancesReduction, + metric, + n_samples, + seed, + n_features=10, + dtype=np.float64, +): + + rng = np.random.RandomState(seed) + spread = 100 + X = rng.rand(n_samples, n_features).astype(dtype) * spread + Y = rng.rand(n_samples, n_features).astype(dtype) * spread + + # Haversine distance only accepts 2D data + if metric == "haversine": + X = np.ascontiguousarray(X[:, :2]) + Y = 
np.ascontiguousarray(Y[:, :2]) + + parameter = ( + 10 + if PairwiseDistancesReduction is PairwiseDistancesArgKmin + # Scaling the radius slightly with the numbers of dimensions + else 10 ** np.log(n_features) + ) + + indices_par_X, dist_par_X = PairwiseDistancesReduction.compute( + X, + Y, + parameter, + metric=metric, + # Taking the first + metric_kwargs=_get_dummy_metric_params_list(metric, n_features)[0], + # To be sure to use parallelization + chunk_size=n_samples // 4, + strategy="parallel_on_X", + return_distance=True, + ) + + indices_par_Y, dist_par_Y = PairwiseDistancesReduction.compute( + X, + Y, + parameter, + metric=metric, + # Taking the first + metric_kwargs=_get_dummy_metric_params_list(metric, n_features)[0], + # To be sure to use parallelization + chunk_size=n_samples // 4, + strategy="parallel_on_Y", + return_distance=True, + ) + + ASSERT_RESULT[PairwiseDistancesReduction]( + dist_par_X, + dist_par_Y, + indices_par_X, + indices_par_Y, + ) + + +# Concrete PairwiseDistancesReductions tests + + +@pytest.mark.parametrize("n_features", [50, 500]) +@pytest.mark.parametrize("translation", [0, 1e8]) +@pytest.mark.parametrize("metric", CDIST_PAIRWISE_DISTANCES_REDUCTION_COMMON_METRICS) +@pytest.mark.parametrize("strategy", ("parallel_on_X", "parallel_on_Y")) +def test_pairwise_distances_argkmin( + n_features, + translation, + metric, + strategy, + n_samples=100, + k=10, + dtype=np.float64, +): + rng = np.random.RandomState(0) + spread = 1000 + X = translation + rng.rand(n_samples, n_features).astype(dtype) * spread + Y = translation + rng.rand(n_samples, n_features).astype(dtype) * spread + + # Haversine distance only accepts 2D data + if metric == "haversine": + X = np.ascontiguousarray(X[:, :2]) + Y = np.ascontiguousarray(Y[:, :2]) + + metric_kwargs = _get_dummy_metric_params_list(metric, n_features)[0] + + # Reference for argkmin results + dist_matrix = cdist(X, Y, metric=metric, **metric_kwargs) + # Taking argkmin (indices of the k smallest values) + argkmin_indices_ref = np.argsort(dist_matrix, axis=1)[:, :k] + # Getting the associated distances + argkmin_distances_ref = np.zeros(argkmin_indices_ref.shape, dtype=np.float64) + for row_idx in range(argkmin_indices_ref.shape[0]): + argkmin_distances_ref[row_idx] = dist_matrix[ + row_idx, argkmin_indices_ref[row_idx] + ] + + argkmin_indices, argkmin_distances = PairwiseDistancesArgKmin.compute( + X, + Y, + k, + metric=metric, + metric_kwargs=metric_kwargs, + return_distance=True, + # So as to have more than a chunk, forcing parallelism. 
+ chunk_size=n_samples // 4, + strategy=strategy, + ) + + ASSERT_RESULT[PairwiseDistancesArgKmin]( + argkmin_distances, argkmin_distances_ref, argkmin_indices, argkmin_indices_ref + ) diff --git a/sklearn/tests/test_config.py b/sklearn/tests/test_config.py index f78a9ff30b10a..e99eb5fc9db82 100644 --- a/sklearn/tests/test_config.py +++ b/sklearn/tests/test_config.py @@ -16,6 +16,7 @@ def test_config_context(): "working_memory": 1024, "print_changed_only": True, "display": "text", + "pairwise_dist_chunk_size": 256, } # Not using as a context manager affects nothing @@ -28,6 +29,7 @@ def test_config_context(): "working_memory": 1024, "print_changed_only": True, "display": "text", + "pairwise_dist_chunk_size": 256, } assert get_config()["assume_finite"] is False @@ -57,6 +59,7 @@ def test_config_context(): "working_memory": 1024, "print_changed_only": True, "display": "text", + "pairwise_dist_chunk_size": 256, } # No positional arguments diff --git a/sklearn/utils/_openmp_helpers.pxd b/sklearn/utils/_openmp_helpers.pxd new file mode 100644 index 0000000000000..e57fc9bfa6bf5 --- /dev/null +++ b/sklearn/utils/_openmp_helpers.pxd @@ -0,0 +1,6 @@ +# Helpers to access OpenMP threads information +# +# Those interfaces act as indirections which allows the non-support of OpenMP +# for implementations which have been written for it. + +cdef int _openmp_thread_num() nogil diff --git a/sklearn/utils/_openmp_helpers.pyx b/sklearn/utils/_openmp_helpers.pyx index fb8920074a84e..cddd77ac42746 100644 --- a/sklearn/utils/_openmp_helpers.pyx +++ b/sklearn/utils/_openmp_helpers.pyx @@ -6,7 +6,7 @@ IF SKLEARN_OPENMP_PARALLELISM_ENABLED: def _openmp_parallelism_enabled(): """Determines whether scikit-learn has been built with OpenMP - + It allows to retrieve at runtime the information gathered at compile time. """ # SKLEARN_OPENMP_PARALLELISM_ENABLED is resolved at compile time during @@ -22,7 +22,7 @@ cpdef _openmp_effective_n_threads(n_threads=None): - if the ``OMP_NUM_THREADS`` environment variable is set, return ``openmp.omp_get_max_threads()`` - otherwise, return the minimum between ``openmp.omp_get_max_threads()`` - and the number of cpus, taking cgroups quotas into account. Cgroups + and the number of cpus, taking cgroups quotas into account. Cgroups quotas can typically be set by tools such as Docker. The result of ``omp_get_max_threads`` can be influenced by environment variable ``OMP_NUM_THREADS`` or at runtime by ``omp_set_num_threads``. @@ -59,4 +59,13 @@ cpdef _openmp_effective_n_threads(n_threads=None): # OpenMP disabled at build-time => sequential mode return 1 - + +cdef inline int _openmp_thread_num() nogil: + """Return the number of the thread calling this function. + + If scikit-learn is built without OpenMP support, always return 0. + """ + IF SKLEARN_OPENMP_PARALLELISM_ENABLED: + return openmp.omp_get_thread_num() + ELSE: + return 0 From 60dcd130c99aa60dab2daf66ae31c3de7b20626f Mon Sep 17 00:00:00 2001 From: Julien Jerphanion Date: Thu, 13 Jan 2022 16:41:47 +0100 Subject: [PATCH 2/6] MAINT Introduce FastEuclideanPairwiseArgKmin (#22065) Co-authored-by: Olivier Grisel Co-authored-by: Thomas J. 
Fan --- .../metrics/_pairwise_distances_reduction.pyx | 302 ++++++++++++++++-- .../test_pairwise_distances_reduction.py | 31 +- sklearn/utils/__init__.py | 35 +- sklearn/utils/_testing.py | 11 +- 4 files changed, 347 insertions(+), 32 deletions(-) diff --git a/sklearn/metrics/_pairwise_distances_reduction.pyx b/sklearn/metrics/_pairwise_distances_reduction.pyx index 830df08e1a952..76420b50a1b5e 100644 --- a/sklearn/metrics/_pairwise_distances_reduction.pyx +++ b/sklearn/metrics/_pairwise_distances_reduction.pyx @@ -16,7 +16,6 @@ cimport numpy as np import numpy as np import warnings -import scipy.sparse from .. import get_config from libc.stdlib cimport free, malloc @@ -24,7 +23,17 @@ from libc.float cimport DBL_MAX from cython cimport final from cython.parallel cimport parallel, prange -from ._dist_metrics cimport DatasetsPair +from ._dist_metrics cimport DatasetsPair, DenseDenseDatasetsPair +from ..utils._cython_blas cimport ( + BLAS_Order, + BLAS_Trans, + ColMajor, + NoTrans, + RowMajor, + Trans, + _dot, + _gemm, +) from ..utils._heap cimport simultaneous_sort, heap_push from ..utils._openmp_helpers cimport _openmp_thread_num from ..utils._typedefs cimport ITYPE_t, DTYPE_t @@ -33,7 +42,7 @@ from numbers import Integral from typing import List from scipy.sparse import issparse from ._dist_metrics import BOOL_METRICS, METRIC_MAPPING -from ..utils import check_scalar +from ..utils import check_scalar, _in_unstable_openblas_configuration from ..utils.fixes import threadpool_limits from ..utils._openmp_helpers import _openmp_effective_n_threads from ..utils._typedefs import ITYPE, DTYPE @@ -41,11 +50,37 @@ from ..utils._typedefs import ITYPE, DTYPE np.import_array() +cpdef DTYPE_t[::1] _sqeuclidean_row_norms( + const DTYPE_t[:, ::1] X, + ITYPE_t num_threads, +): + """Compute the squared euclidean norm of the rows of X in parallel. + + This is faster than using np.einsum("ij, ij->i") even when using a single thread. + """ + cdef: + # Casting for X to remove the const qualifier is needed because APIs + # exposed via scipy.linalg.cython_blas aren't reflecting the arguments' + # const qualifier. + # See: https://github.com/scipy/scipy/issues/14262 + DTYPE_t * X_ptr = &X[0, 0] + ITYPE_t idx = 0 + ITYPE_t n = X.shape[0] + ITYPE_t d = X.shape[1] + DTYPE_t[::1] squared_row_norms = np.empty(n, dtype=DTYPE) + + for idx in prange(n, schedule='static', nogil=True, num_threads=num_threads): + squared_row_norms[idx] = _dot(d, X_ptr + idx * d, 1, X_ptr + idx * d, 1) + + return squared_row_norms + + + cdef class PairwiseDistancesReduction: """Abstract base class for pairwise distance computation & reduction. Subclasses of this class compute pairwise distances between a set of - row vectors of X and another set of row vectors pf Y and apply a reduction on top. + row vectors of X and another set of row vectors of Y and apply a reduction on top. The reduction takes a matrix of pairwise distances between rows of X and Y as input and outputs an aggregate data-structure for each row of X. 
The aggregate values are typically smaller than the number of rows in Y, @@ -180,14 +215,12 @@ cdef class PairwiseDistancesReduction: not issparse(Y) and Y.dtype == np.float64 and metric in cls.valid_metrics()) - def __cinit__( + def __init__( self, DatasetsPair datasets_pair, chunk_size=None, n_threads=None, strategy=None, - *args, - **kwargs, ): cdef: ITYPE_t n_samples_chunk, X_n_full_chunks, Y_n_full_chunks @@ -611,13 +644,32 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): # For future work, this might can be an entrypoint to specialise operations # for various back-end and/or hardware and/or datatypes, and/or fused # {sparse, dense}-datasetspair etc. - - pda = PairwiseDistancesArgKmin( - datasets_pair=DatasetsPair.get_for(X, Y, metric, metric_kwargs), - k=k, - chunk_size=chunk_size, - strategy=strategy, - ) + if ( + metric in ("euclidean", "sqeuclidean") + and not issparse(X) + and not issparse(Y) + ): + # Specialized implementation with improved arithmetic intensity + # and vector instructions (SIMD) by processing several vectors + # at time to leverage a call to the BLAS GEMM routine as explained + # in more details in the docstring. + use_squared_distances = metric == "sqeuclidean" + pda = FastEuclideanPairwiseDistancesArgKmin( + X=X, Y=Y, k=k, + use_squared_distances=use_squared_distances, + chunk_size=chunk_size, + strategy=strategy, + metric_kwargs=metric_kwargs, + ) + else: + # Fall back on a generic implementation that handles most scipy + # metrics by computing the distances between 2 vectors at a time. + pda = PairwiseDistancesArgKmin( + datasets_pair=DatasetsPair.get_for(X, Y, metric, metric_kwargs), + k=k, + chunk_size=chunk_size, + strategy=strategy, + ) # Limit the number of threads in second level of nested parallelism for BLAS # to avoid threads over-subscription (in GEMM for instance). @@ -629,15 +681,22 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): return pda._finalize_results(return_distance) - def __cinit__( + def __init__( self, DatasetsPair datasets_pair, chunk_size=None, n_threads=None, strategy=None, - *args, - **kwargs, - ): + ITYPE_t k=1, + ): + super().__init__( + datasets_pair=datasets_pair, + chunk_size=chunk_size, + n_threads=n_threads, + strategy=strategy, + ) + self.k = check_scalar(k, "k", Integral, min_val=1) + # Allocating pointers to datastructures but not the datastructures themselves. # There are as many pointers as effective threads. # @@ -654,16 +713,6 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): sizeof(ITYPE_t *) * self.chunks_n_threads ) - def __init__( - self, - DatasetsPair datasets_pair, - chunk_size=None, - n_threads=None, - strategy=None, - ITYPE_t k=1, - ): - self.k = check_scalar(k, "k", Integral, min_val=1) - # Main heaps which will be returned as results by `PairwiseDistancesArgKmin.compute`. self.argkmin_indices = np.full((self.n_samples_X, self.k), 0, dtype=ITYPE) self.argkmin_distances = np.full((self.n_samples_X, self.k), DBL_MAX, dtype=DTYPE) @@ -837,3 +886,200 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): return np.asarray(self.argkmin_indices), np.asarray(self.argkmin_distances) return np.asarray(self.argkmin_indices) + + +cdef class FastEuclideanPairwiseDistancesArgKmin(PairwiseDistancesArgKmin): + """Fast specialized alternative for PairwiseDistancesArgKmin on EuclideanDistance. 
+ + The full pairwise squared distances matrix is computed as follows: + + ||X - Y||² = ||X||² - 2 X.Y^T + ||Y||² + + The middle term gets computed efficiently below using BLAS Level 3 GEMM. + + Notes + ----- + This implementation has a superior arithmetic intensity and hence + better running time when the alternative is IO bound, but it can suffer + from numerical instability caused by catastrophic cancellation potentially + introduced by the subtraction in the arithmetic expression above. + """ + + cdef: + const DTYPE_t[:, ::1] X + const DTYPE_t[:, ::1] Y + const DTYPE_t[::1] X_norm_squared + const DTYPE_t[::1] Y_norm_squared + + # Buffers for GEMM + DTYPE_t ** dist_middle_terms_chunks + bint use_squared_distances + + @classmethod + def is_usable_for(cls, X, Y, metric) -> bool: + return (PairwiseDistancesArgKmin.is_usable_for(X, Y, metric) and + not _in_unstable_openblas_configuration()) + + def __init__( + self, + X, + Y, + ITYPE_t k, + bint use_squared_distances=False, + chunk_size=None, + n_threads=None, + strategy=None, + metric_kwargs=None, + ): + if metric_kwargs is not None and len(metric_kwargs) > 0: + warnings.warn( + f"Some metric_kwargs have been passed ({metric_kwargs}) but aren't " + f"usable for this case ({self.__class__.__name__}) and will be ignored.", + UserWarning, + stacklevel=3, + ) + + super().__init__( + # The datasets pair here is used for exact distance computations + datasets_pair=DatasetsPair.get_for(X, Y, metric="euclidean"), + chunk_size=chunk_size, + n_threads=n_threads, + strategy=strategy, + k=k, + ) + # X and Y are checked by the DatasetsPair implemented as a DenseDenseDatasetsPair + cdef: + DenseDenseDatasetsPair datasets_pair = self.datasets_pair + self.X, self.Y = datasets_pair.X, datasets_pair.Y + + if metric_kwargs is not None and "Y_norm_squared" in metric_kwargs: + self.Y_norm_squared = metric_kwargs.pop("Y_norm_squared") + else: + self.Y_norm_squared = _sqeuclidean_row_norms(self.Y, self.effective_n_threads) + + # Do not recompute norms if datasets are identical.
+ self.X_norm_squared = ( + self.Y_norm_squared if X is Y else + _sqeuclidean_row_norms(self.X, self.effective_n_threads) + ) + self.use_squared_distances = use_squared_distances + + # Temporary datastructures used in threads + self.dist_middle_terms_chunks = malloc( + sizeof(DTYPE_t *) * self.chunks_n_threads + ) + + def __dealloc__(self): + if self.dist_middle_terms_chunks is not NULL: + free(self.dist_middle_terms_chunks) + + @final + cdef void compute_exact_distances(self) nogil: + if not self.use_squared_distances: + PairwiseDistancesArgKmin.compute_exact_distances(self) + + @final + cdef void _parallel_on_X_parallel_init( + self, + ITYPE_t thread_num, + ) nogil: + PairwiseDistancesArgKmin._parallel_on_X_parallel_init(self, thread_num) + + # Temporary buffer for the `-2 * X_c @ Y_c.T` term + self.dist_middle_terms_chunks[thread_num] = malloc( + self.Y_n_samples_chunk * self.X_n_samples_chunk * sizeof(DTYPE_t) + ) + + @final + cdef void _parallel_on_X_parallel_finalize( + self, + ITYPE_t thread_num + ) nogil: + PairwiseDistancesArgKmin._parallel_on_X_parallel_finalize(self, thread_num) + free(self.dist_middle_terms_chunks[thread_num]) + + @final + cdef void _parallel_on_Y_init( + self, + ) nogil: + cdef ITYPE_t thread_num + PairwiseDistancesArgKmin._parallel_on_Y_init(self) + + for thread_num in range(self.chunks_n_threads): + # Temporary buffer for the `-2 * X_c @ Y_c.T` term + self.dist_middle_terms_chunks[thread_num] = malloc( + self.Y_n_samples_chunk * self.X_n_samples_chunk * sizeof(DTYPE_t) + ) + + @final + cdef void _parallel_on_Y_finalize( + self, + ) nogil: + cdef ITYPE_t thread_num + PairwiseDistancesArgKmin._parallel_on_Y_finalize(self) + + for thread_num in range(self.chunks_n_threads): + free(self.dist_middle_terms_chunks[thread_num]) + + @final + cdef void _compute_and_reduce_distances_on_chunks( + self, + ITYPE_t X_start, + ITYPE_t X_end, + ITYPE_t Y_start, + ITYPE_t Y_end, + ITYPE_t thread_num, + ) nogil: + cdef: + ITYPE_t i, j + + const DTYPE_t[:, ::1] X_c = self.X[X_start:X_end, :] + const DTYPE_t[:, ::1] Y_c = self.Y[Y_start:Y_end, :] + DTYPE_t *dist_middle_terms = self.dist_middle_terms_chunks[thread_num] + DTYPE_t *heaps_r_distances = self.heaps_r_distances_chunks[thread_num] + ITYPE_t *heaps_indices = self.heaps_indices_chunks[thread_num] + + # Careful: LDA, LDB and LDC are given for F-ordered arrays + # in BLAS documentations, for instance: + # https://www.netlib.org/lapack/explore-html/db/dc9/group__single__blas__level3_gafe51bacb54592ff5de056acabd83c260.html #noqa + # + # Here, we use their counterpart values to work with C-ordered arrays. + BLAS_Order order = RowMajor + BLAS_Trans ta = NoTrans + BLAS_Trans tb = Trans + ITYPE_t m = X_c.shape[0] + ITYPE_t n = Y_c.shape[0] + ITYPE_t K = X_c.shape[1] + DTYPE_t alpha = - 2. + # Casting for A and B to remove the const is needed because APIs exposed via + # scipy.linalg.cython_blas aren't reflecting the arguments' const qualifier. + # See: https://github.com/scipy/scipy/issues/14262 + DTYPE_t * A = & X_c[0, 0] + ITYPE_t lda = X_c.shape[1] + DTYPE_t * B = & Y_c[0, 0] + ITYPE_t ldb = X_c.shape[1] + DTYPE_t beta = 0. + ITYPE_t ldc = Y_c.shape[0] + + # dist_middle_terms = `-2 * X_c @ Y_c.T` + _gemm(order, ta, tb, m, n, K, alpha, A, lda, B, ldb, beta, dist_middle_terms, ldc) + + # Pushing the distance and their associated indices on heaps + # which keep tracks of the argkmin. 
+ for i in range(X_c.shape[0]): + for j in range(Y_c.shape[0]): + heap_push( + heaps_r_distances + i * self.k, + heaps_indices + i * self.k, + self.k, + # Using the squared euclidean distance as the rank-preserving distance: + # + # ||X_c_i||² - 2 X_c_i.Y_c_j^T + ||Y_c_j||² + # + ( + self.X_norm_squared[i + X_start] + + dist_middle_terms[i * Y_c.shape[0] + j] + + self.Y_norm_squared[j + Y_start] + ), + j + Y_start, + ) diff --git a/sklearn/metrics/tests/test_pairwise_distances_reduction.py b/sklearn/metrics/tests/test_pairwise_distances_reduction.py index a4d51e4662740..e975aad55bf9c 100644 --- a/sklearn/metrics/tests/test_pairwise_distances_reduction.py +++ b/sklearn/metrics/tests/test_pairwise_distances_reduction.py @@ -7,8 +7,10 @@ from sklearn.metrics._pairwise_distances_reduction import ( PairwiseDistancesReduction, PairwiseDistancesArgKmin, + _sqeuclidean_row_norms, ) +from sklearn.metrics import euclidean_distances from sklearn.utils.fixes import sp_version, parse_version # Common supported metric between scipy.spatial.distance.cdist @@ -305,7 +307,7 @@ def test_strategies_consistency( @pytest.mark.parametrize("n_features", [50, 500]) -@pytest.mark.parametrize("translation", [0, 1e8]) +@pytest.mark.parametrize("translation", [0, 1e6]) @pytest.mark.parametrize("metric", CDIST_PAIRWISE_DISTANCES_REDUCTION_COMMON_METRICS) @pytest.mark.parametrize("strategy", ("parallel_on_X", "parallel_on_Y")) def test_pairwise_distances_argkmin( @@ -330,7 +332,11 @@ def test_pairwise_distances_argkmin( metric_kwargs = _get_dummy_metric_params_list(metric, n_features)[0] # Reference for argkmin results - dist_matrix = cdist(X, Y, metric=metric, **metric_kwargs) + if metric == "euclidean": + # Compare to scikit-learn GEMM optimized implementation + dist_matrix = euclidean_distances(X, Y) + else: + dist_matrix = cdist(X, Y, metric=metric, **metric_kwargs) # Taking argkmin (indices of the k smallest values) argkmin_indices_ref = np.argsort(dist_matrix, axis=1)[:, :k] # Getting the associated distances @@ -355,3 +361,24 @@ def test_pairwise_distances_argkmin( ASSERT_RESULT[PairwiseDistancesArgKmin]( argkmin_distances, argkmin_distances_ref, argkmin_indices, argkmin_indices_ref ) + + +@pytest.mark.parametrize("seed", range(10)) +@pytest.mark.parametrize("n_samples", [100, 1000]) +@pytest.mark.parametrize("n_features", [5, 10, 100]) +@pytest.mark.parametrize("num_threads", [1, 2, 8]) +def test_sqeuclidean_row_norms( + seed, + n_samples, + n_features, + num_threads, + dtype=np.float64, +): + rng = np.random.RandomState(seed) + spread = 100 + X = rng.rand(n_samples, n_features).astype(dtype) * spread + + sq_row_norm_reference = np.linalg.norm(X, axis=1) ** 2 + sq_row_norm = np.asarray(_sqeuclidean_row_norms(X, num_threads=num_threads)) + + assert_allclose(sq_row_norm_reference, sq_row_norm) diff --git a/sklearn/utils/__init__.py b/sklearn/utils/__init__.py index 3d8a1ca87d210..4b2261ad7c2f4 100644 --- a/sklearn/utils/__init__.py +++ b/sklearn/utils/__init__.py @@ -26,7 +26,7 @@ from . import _joblib from ..exceptions import DataConversionWarning from .deprecation import deprecated -from .fixes import np_version, parse_version +from .fixes import np_version, parse_version, threadpool_info from ._estimator_html_repr import estimator_html_repr from .validation import ( as_float_array, @@ -81,6 +81,39 @@ _IS_32BIT = 8 * struct.calcsize("P") == 32 +def _in_unstable_openblas_configuration(): + """Return True if in an unstable configuration for OpenBLAS""" + + # Import libraries which might load OpenBLAS. 
+ import numpy # noqa + import scipy # noqa + + modules_info = threadpool_info() + + open_blas_used = any(info["internal_api"] == "openblas" for info in modules_info) + if not open_blas_used: + return False + + # OpenBLAS 0.3.16 fixed unstability for arm64, see: + # https://github.com/xianyi/OpenBLAS/blob/1b6db3dbba672b4f8af935bd43a1ff6cff4d20b7/Changelog.txt#L56-L58 # noqa + openblas_arm64_stable_version = parse_version("0.3.16") + for info in modules_info: + if info["internal_api"] != "openblas": + continue + openblas_version = info.get("version") + openblas_architecture = info.get("architecture") + if openblas_version is None or openblas_architecture is None: + # Cannot be sure that OpenBLAS is good enough. Assume unstable: + return True + if ( + openblas_architecture == "neoversen1" + and parse_version(openblas_version) < openblas_arm64_stable_version + ): + # See discussions in https://github.com/numpy/numpy/issues/19411 + return True + return False + + class Bunch(dict): """Container object exposing keys as attributes. diff --git a/sklearn/utils/_testing.py b/sklearn/utils/_testing.py index 1724063be2f43..6f58ce3f3b7b4 100644 --- a/sklearn/utils/_testing.py +++ b/sklearn/utils/_testing.py @@ -48,7 +48,12 @@ import joblib import sklearn -from sklearn.utils import IS_PYPY, _IS_32BIT, deprecated +from sklearn.utils import ( + IS_PYPY, + _IS_32BIT, + deprecated, + _in_unstable_openblas_configuration, +) from sklearn.utils.multiclass import check_classification_targets from sklearn.utils.validation import ( check_array, @@ -448,6 +453,10 @@ def set_random_state(estimator, random_state=0): os.environ.get("TRAVIS") == "true", reason="skip on travis" ) fails_if_pypy = pytest.mark.xfail(IS_PYPY, reason="not compatible with PyPy") + fails_if_unstable_openblas = pytest.mark.xfail( + _in_unstable_openblas_configuration(), + reason="OpenBLAS is unstable for this configuration", + ) skip_if_no_parallel = pytest.mark.skipif( not joblib.parallel.mp, reason="joblib is in serial mode" ) From 1abb44186fc534d4d2ed1c7f96103f9621dff261 Mon Sep 17 00:00:00 2001 From: Julien Jerphanion Date: Thu, 13 Jan 2022 18:35:53 +0100 Subject: [PATCH 3/6] fixup! Merge branch 'main' into pairwise-distances-argkmin Remove duplicated Bunch --- sklearn/utils/__init__.py | 50 --------------------------------------- 1 file changed, 50 deletions(-) diff --git a/sklearn/utils/__init__.py b/sklearn/utils/__init__.py index 279e6cca8d335..1fc622f6a4538 100644 --- a/sklearn/utils/__init__.py +++ b/sklearn/utils/__init__.py @@ -116,56 +116,6 @@ def _in_unstable_openblas_configuration(): return False -class Bunch(dict): - """Container object exposing keys as attributes. - - Bunch objects are sometimes used as an output for functions and methods. - They extend dictionaries by enabling values to be accessed by key, - `bunch["value_key"]`, or by an attribute, `bunch.value_key`. - - Examples - -------- - >>> from sklearn.utils import Bunch - >>> b = Bunch(a=1, b=2) - >>> b['b'] - 2 - >>> b.b - 2 - >>> b.a = 3 - >>> b['a'] - 3 - >>> b.c = 6 - >>> b['c'] - 6 - """ - - def __init__(self, **kwargs): - super().__init__(kwargs) - - def __setattr__(self, key, value): - self[key] = value - - def __dir__(self): - return self.keys() - - def __getattr__(self, key): - try: - return self[key] - except KeyError: - raise AttributeError(key) - - def __setstate__(self, state): - # Bunch pickles generated with scikit-learn 0.16.* have an non - # empty __dict__. 
This causes a surprising behaviour when - # loading these pickles scikit-learn 0.17: reading bunch.key - # uses __dict__ but assigning to bunch.key use __setattr__ and - # only changes bunch['key']. More details can be found at: - # https://github.com/scikit-learn/scikit-learn/issues/6196. - # Overriding __setstate__ to be a noop has the effect of - # ignoring the pickled __dict__ - pass - - def safe_mask(X, mask): """Return a mask which is safe to use on X. From 5a4d710d7d4bdf3fd8adad471ae7504d86554196 Mon Sep 17 00:00:00 2001 From: Julien Jerphanion Date: Thu, 10 Feb 2022 16:23:47 +0100 Subject: [PATCH 4/6] MAINT Plug `PairwiseDistancesArgKmin` as a back-end (#22288) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Forward pairwise_dist_chunk_size in the configuration * Flip finalized results for PairwiseDistancesArgKmin The previous would have made the code more complex by introducing some boilerplate for the interface plugs. Having it this way actually simplifies the code. This also removes the haversine branch for test_pairwise_distances_argkmin Co-authored-by: Olivier Grisel * Plug PairwiseDistancesArgKmin as a back-end * Adapt test accordingly * Add whats_new entry * Change input validation order for kneighbors * Remove duplicated test_neighbors_distance_metric_deprecation Co-authored-by: Thomas J. Fan * Adapt the documentation Co-authored-by: Thomas J. Fan * Add mahalanobis case to test fixtures * Correct whats_new entry * CLN Remove unneeded private metric attribute This was needed when 'fast_sqeuclidean' and 'fast_euclidean' were present to choose the best implementation based on the user specification. Those metric have been removed since then, making this attribute useless. Co-authored-by: Thomas J. Fan * TST Assert FutureWarning instead of DeprecationWarning in test_neighbors_metrics * MAINT Add use_pairwise_dist_activate to scikit-learn config * TST Add a test for the 'brute' backends' results' consistency Co-authored-by: Thomas J. Fan * fixup! MAINT Add use_pairwise_dist_activate to scikit-learn config * fixup! fixup! MAINT Add use_pairwise_dist_activate to scikit-learn config * TST Filter FutureWarning for WMinkowskiDistance * MAINT pin numpydoc in arm for now (#22292) * fixup! TST Filter FutureWarning for WMinkowskiDistance * Revert keywords arguments removal for the GEMM trick for 'euclidean' * MAINT pin max numpydoc for now (#22286) * Add 'haversine' to CDIST_PAIRWISE_DISTANCES_REDUCTION_COMMON_METRICS Co-authored-by: Olivier Grisel * fixup! Add 'haversine' to CDIST_PAIRWISE_DISTANCES_REDUCTION_COMMON_METRICS * Apply suggestions from code review Co-authored-by: Olivier Grisel * MAINT Document some config parameters for maintenance Also rename one of them. Co-authored-by: Thomas J. Fan Co-authored-by: Olivier Grisel * FIX Support and test one of 'sqeuclidean' specification Co-authored-by: Olivier Grisel * FIX Various typos fix and correct haversine 'haversine' is not supported by cdist. * Directly use get_config * CLN Apply comments from review Co-authored-by: Christian Lorentzen Co-authored-by: Jérémie du Boisberranger * Motivate swapped returned values Co-authored-by: Jérémie du Boisberranger Co-authored-by: Thomas J. 
Fan * TST Remove mahalanobis from test fixtures Co-authored-by: Jérémie du Boisberranger * MNT Add comment regaduction functions' signatures Co-authored-by: Christian Lorentzen Co-authored-by: Olivier Grisel * TST Complete test for `pairwise_distance_{argmin,argmin_min}` (#22371) * DOC Add sub-pull requests to the whats_new entry --- doc/whats_new/v1.1.rst | 32 ++++++ sklearn/_config.py | 37 ++++++- sklearn/metrics/_dist_metrics.pyx | 1 + .../metrics/_pairwise_distances_reduction.pyx | 15 ++- sklearn/metrics/pairwise.py | 102 +++++++++++++++--- sklearn/metrics/tests/test_dist_metrics.py | 12 +-- sklearn/metrics/tests/test_pairwise.py | 24 +++-- .../test_pairwise_distances_reduction.py | 19 ++-- sklearn/neighbors/_base.py | 54 ++++++++-- sklearn/neighbors/tests/test_neighbors.py | 64 ++++++++++- sklearn/tests/test_config.py | 3 + 11 files changed, 305 insertions(+), 58 deletions(-) diff --git a/doc/whats_new/v1.1.rst b/doc/whats_new/v1.1.rst index 39f8e405ebf7c..b961e249d376a 100644 --- a/doc/whats_new/v1.1.rst +++ b/doc/whats_new/v1.1.rst @@ -617,6 +617,38 @@ Changelog left corner of the HTML representation to show how the elements are clickable. :pr:`21298` by `Thomas Fan`_. +Miscellaneous +............. + +- |Efficiency| Low-level routines for reductions on pairwise distances + for dense float64 datasets have been refactored. The following functions + and estimators now benefit from improved performances, in particular on + multi-cores machines: + - :func:`sklearn.metrics.pairwise_distances_argmin` + - :func:`sklearn.metrics.pairwise_distances_argmin_min` + - :class:`sklearn.cluster.AffinityPropagation` + - :class:`sklearn.cluster.Birch` + - :class:`sklearn.cluster.MeanShift` + - :class:`sklearn.cluster.OPTICS` + - :class:`sklearn.cluster.SpectralClustering` + - :func:`sklearn.feature_selection.mutual_info_regression` + - :class:`sklearn.neighbors.KNeighborsClassifier` + - :class:`sklearn.neighbors.KNeighborsRegressor` + - :class:`sklearn.neighbors.LocalOutlierFactor` + - :class:`sklearn.neighbors.NearestNeighbors` + - :class:`sklearn.manifold.Isomap` + - :class:`sklearn.manifold.LocallyLinearEmbedding` + - :class:`sklearn.manifold.TSNE` + - :func:`sklearn.manifold.trustworthiness` + - :class:`sklearn.semi_supervised.LabelPropagation` + - :class:`sklearn.semi_supervised.LabelSpreading` + + For instance :class:`sklearn.neighbors.NearestNeighbors.kneighbors` + can be up to ×20 faster than in the previous versions'. + + :pr:`21987`, :pr:`22064`, :pr:`22065` and :pr:`22288` + by :user:`Julien Jerphanion ` + - |Fix| :func:`check_scalar` raises an error when `include_boundaries={"left", "right"}` and the boundaries are not set. :pr:`22027` by `Marie Lanternier `. diff --git a/sklearn/_config.py b/sklearn/_config.py index d6a02737f640d..6248025b05aa0 100644 --- a/sklearn/_config.py +++ b/sklearn/_config.py @@ -12,6 +12,7 @@ "pairwise_dist_chunk_size": int( os.environ.get("SKLEARN_PAIRWISE_DIST_CHUNK_SIZE", 256) ), + "enable_cython_pairwise_dist": True, } _threadlocal = threading.local() @@ -48,6 +49,7 @@ def set_config( print_changed_only=None, display=None, pairwise_dist_chunk_size=None, + enable_cython_pairwise_dist=None, ): """Set global scikit-learn configuration @@ -88,9 +90,23 @@ def set_config( .. versionadded:: 0.23 pairwise_dist_chunk_size : int, default=None - The number of vectors per chunk for PairwiseDistancesReduction. + The number of row vectors per chunk for PairwiseDistancesReduction. Default is 256 (suitable for most of modern laptops' caches and architectures). 
+ Intended for easier benchmarking and testing of scikit-learn internals. + End users are not expected to benefit from customizing this configuration + setting. + + .. versionadded:: 1.1 + + enable_cython_pairwise_dist : bool, default=None + Use PairwiseDistancesReduction when possible. + Default is True. + + Intended for easier benchmarking and testing of scikit-learn internals. + End users are not expected to benefit from customizing this configuration + setting. + .. versionadded:: 1.1 See Also @@ -110,6 +126,8 @@ def set_config( local_config["display"] = display if pairwise_dist_chunk_size is not None: local_config["pairwise_dist_chunk_size"] = pairwise_dist_chunk_size + if enable_cython_pairwise_dist is not None: + local_config["enable_cython_pairwise_dist"] = enable_cython_pairwise_dist @contextmanager @@ -120,6 +138,7 @@ def config_context( print_changed_only=None, display=None, pairwise_dist_chunk_size=None, + enable_cython_pairwise_dist=None, ): """Context manager for global scikit-learn configuration. @@ -162,6 +181,20 @@ def config_context( The number of vectors per chunk for PairwiseDistancesReduction. Default is 256 (suitable for most of modern laptops' caches and architectures). + Intended for easier benchmarking and testing of scikit-learn internals. + End users are not expected to benefit from customizing this configuration + setting. + + .. versionadded:: 1.1 + + enable_cython_pairwise_dist : bool, default=None + Use PairwiseDistancesReduction when possible. + Default is True. + + Intended for easier benchmarking and testing of scikit-learn internals. + End users are not expected to benefit from customizing this configuration + setting. + .. versionadded:: 1.1 Yields @@ -197,6 +230,8 @@ def config_context( working_memory=working_memory, print_changed_only=print_changed_only, display=display, + pairwise_dist_chunk_size=pairwise_dist_chunk_size, + enable_cython_pairwise_dist=enable_cython_pairwise_dist, ) try: diff --git a/sklearn/metrics/_dist_metrics.pyx b/sklearn/metrics/_dist_metrics.pyx index 370116d510910..c442be0398980 100644 --- a/sklearn/metrics/_dist_metrics.pyx +++ b/sklearn/metrics/_dist_metrics.pyx @@ -71,6 +71,7 @@ METRIC_MAPPING = {'euclidean': EuclideanDistance, 'pyfunc': PyFuncDistance} BOOL_METRICS = [ + "hamming", "matching", "jaccard", "dice", diff --git a/sklearn/metrics/_pairwise_distances_reduction.pyx b/sklearn/metrics/_pairwise_distances_reduction.pyx index 76420b50a1b5e..df0918bb61334 100644 --- a/sklearn/metrics/_pairwise_distances_reduction.pyx +++ b/sklearn/metrics/_pairwise_distances_reduction.pyx @@ -211,7 +211,8 @@ cdef class PairwiseDistancesReduction: True if the PairwiseDistancesReduction can be used, else False. """ # TODO: support sparse arrays and 32 bits - return (not issparse(X) and X.dtype == np.float64 and + return (get_config().get("enable_cython_pairwise_dist", True) and + not issparse(X) and X.dtype == np.float64 and not issparse(Y) and Y.dtype == np.float64 and metric in cls.valid_metrics()) @@ -621,10 +622,10 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): Indices of the argkmin for each vector in X. If return_distance=True: - - argkmin_indices : ndarray of shape (n_samples_X, k) - Indices of the argkmin for each vector in X. - argkmin_distances : ndarray of shape (n_samples_X, k) Distances to the argkmin for each vector in X. + - argkmin_indices : ndarray of shape (n_samples_X, k) + Indices of the argkmin for each vector in X. 
Notes ----- @@ -642,7 +643,7 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): # Note (jjerphan): Some design thoughts for future extensions. # This factory comes to handle specialisations for the given arguments. # For future work, this might can be an entrypoint to specialise operations - # for various back-end and/or hardware and/or datatypes, and/or fused + # for various backend and/or hardware and/or datatypes, and/or fused # {sparse, dense}-datasetspair etc. if ( metric in ("euclidean", "sqeuclidean") @@ -883,7 +884,11 @@ cdef class PairwiseDistancesArgKmin(PairwiseDistancesReduction): # We need to recompute distances because we relied on # surrogate distances for the reduction. self.compute_exact_distances() - return np.asarray(self.argkmin_indices), np.asarray(self.argkmin_distances) + + # Values are returned identically to the way `KNeighborsMixin.kneighbors` + # returns values. This is counter-intuitive but this allows not using + # complex adaptations where `PairwiseDistancesArgKmin.compute` is called. + return np.asarray(self.argkmin_distances), np.asarray(self.argkmin_indices) return np.asarray(self.argkmin_indices) diff --git a/sklearn/metrics/pairwise.py b/sklearn/metrics/pairwise.py index 9643b7c39ee7d..2eec32d1682af 100644 --- a/sklearn/metrics/pairwise.py +++ b/sklearn/metrics/pairwise.py @@ -19,6 +19,7 @@ from scipy.sparse import issparse from joblib import Parallel, effective_n_jobs +from .. import config_context from ..utils.validation import _num_samples from ..utils.validation import check_non_negative from ..utils import check_array @@ -31,6 +32,7 @@ from ..utils.fixes import delayed from ..utils.fixes import sp_version, parse_version +from ._pairwise_distances_reduction import PairwiseDistancesArgKmin from ._pairwise_fast import _chi2_kernel_fast, _sparse_manhattan from ..exceptions import DataConversionWarning @@ -576,12 +578,23 @@ def _euclidean_distances_upcast(X, XX=None, Y=None, YY=None, batch_size=None): return distances +# start is specified in the signature of `_argmin_min_reduce` +# and of `_argmin_reduce` but is not used. +# This is because the higher order `pairwise_distances_chunked` +# function needs reduction functions that are passed as argument +# to have a two arguments signature. + + def _argmin_min_reduce(dist, start): indices = dist.argmin(axis=1) values = dist[np.arange(dist.shape[0]), indices] return indices, values +def _argmin_reduce(dist, start): + return dist.argmin(axis=1) + + def pairwise_distances_argmin_min( X, Y, *, axis=1, metric="euclidean", metric_kwargs=None ): @@ -654,19 +667,44 @@ def pairwise_distances_argmin_min( """ X, Y = check_pairwise_arrays(X, Y) - if metric_kwargs is None: - metric_kwargs = {} - if axis == 0: X, Y = Y, X - indices, values = zip( - *pairwise_distances_chunked( - X, Y, reduce_func=_argmin_min_reduce, metric=metric, **metric_kwargs + if metric_kwargs is None: + metric_kwargs = {} + + if PairwiseDistancesArgKmin.is_usable_for(X, Y, metric): + # This is an adaptor for one "sqeuclidean" specification. + # For this backend, we can directly use "sqeuclidean". 
+ if metric_kwargs.get("squared", False) and metric == "euclidean": + metric = "sqeuclidean" + metric_kwargs = {} + + values, indices = PairwiseDistancesArgKmin.compute( + X=X, + Y=Y, + k=1, + metric=metric, + metric_kwargs=metric_kwargs, + strategy="auto", + return_distance=True, ) - ) - indices = np.concatenate(indices) - values = np.concatenate(values) + values = values.flatten() + indices = indices.flatten() + else: + # TODO: once PairwiseDistancesArgKmin supports sparse input matrices and 32 bit, + # we won't need to fallback to pairwise_distances_chunked anymore. + + # Turn off check for finiteness because this is costly and because arrays + # have already been validated. + with config_context(assume_finite=True): + indices, values = zip( + *pairwise_distances_chunked( + X, Y, reduce_func=_argmin_min_reduce, metric=metric, **metric_kwargs + ) + ) + indices = np.concatenate(indices) + values = np.concatenate(values) return indices, values @@ -738,9 +776,49 @@ def pairwise_distances_argmin(X, Y, *, axis=1, metric="euclidean", metric_kwargs if metric_kwargs is None: metric_kwargs = {} - return pairwise_distances_argmin_min( - X, Y, axis=axis, metric=metric, metric_kwargs=metric_kwargs - )[0] + X, Y = check_pairwise_arrays(X, Y) + + if axis == 0: + X, Y = Y, X + + if metric_kwargs is None: + metric_kwargs = {} + + if PairwiseDistancesArgKmin.is_usable_for(X, Y, metric): + # This is an adaptor for one "sqeuclidean" specification. + # For this backend, we can directly use "sqeuclidean". + if metric_kwargs.get("squared", False) and metric == "euclidean": + metric = "sqeuclidean" + metric_kwargs = {} + + indices = PairwiseDistancesArgKmin.compute( + X=X, + Y=Y, + k=1, + metric=metric, + metric_kwargs=metric_kwargs, + strategy="auto", + return_distance=False, + ) + indices = indices.flatten() + else: + # TODO: once PairwiseDistancesArgKmin supports sparse input matrices and 32 bit, + # we won't need to fallback to pairwise_distances_chunked anymore. + + # Turn off check for finiteness because this is costly and because arrays + # have already been validated. + with config_context(assume_finite=True): + indices = np.concatenate( + list( + # This returns a np.ndarray generator whose arrays we need + # to flatten into one. 
+ pairwise_distances_chunked( + X, Y, reduce_func=_argmin_reduce, metric=metric, **metric_kwargs + ) + ) + ) + + return indices def haversine_distances(X, Y=None): diff --git a/sklearn/metrics/tests/test_dist_metrics.py b/sklearn/metrics/tests/test_dist_metrics.py index bf258ea564c8c..6c841d1d44f8c 100644 --- a/sklearn/metrics/tests/test_dist_metrics.py +++ b/sklearn/metrics/tests/test_dist_metrics.py @@ -10,6 +10,7 @@ import scipy.sparse as sp from scipy.spatial.distance import cdist from sklearn.metrics import DistanceMetric +from sklearn.metrics._dist_metrics import BOOL_METRICS from sklearn.utils import check_random_state from sklearn.utils._testing import create_memmap_backed_data from sklearn.utils.fixes import sp_version, parse_version @@ -38,17 +39,6 @@ def dist_func(x1, x2, p): V = rng.random_sample((d, d)) VI = np.dot(V, V.T) -BOOL_METRICS = [ - "hamming", - "matching", - "jaccard", - "dice", - "kulsinski", - "rogerstanimoto", - "russellrao", - "sokalmichener", - "sokalsneath", -] METRICS_DEFAULT_PARAMS = [ ("euclidean", {}), diff --git a/sklearn/metrics/tests/test_pairwise.py b/sklearn/metrics/tests/test_pairwise.py index e2549cbde0807..345ae96bfcee1 100644 --- a/sklearn/metrics/tests/test_pairwise.py +++ b/sklearn/metrics/tests/test_pairwise.py @@ -432,10 +432,11 @@ def test_paired_distances_callable(): paired_distances(X, Y) -def test_pairwise_distances_argmin_min(): +@pytest.mark.parametrize("dtype", (np.float32, np.float64)) +def test_pairwise_distances_argmin_min(dtype): # Check pairwise minimum distances computation for any metric - X = [[0], [1]] - Y = [[-2], [3]] + X = np.asarray([[0], [1]], dtype=dtype) + Y = np.asarray([[-2], [3]], dtype=dtype) Xsp = dok_matrix(X) Ysp = csr_matrix(Y, dtype=np.float32) @@ -458,12 +459,23 @@ def test_pairwise_distances_argmin_min(): assert type(idxsp) == np.ndarray assert type(valssp) == np.ndarray - # euclidean metric squared - idx, vals = pairwise_distances_argmin_min( + # Squared Euclidean metric + idx, vals = pairwise_distances_argmin_min(X, Y, metric="sqeuclidean") + idx2, vals2 = pairwise_distances_argmin_min( X, Y, metric="euclidean", metric_kwargs={"squared": True} ) - assert_array_almost_equal(idx, expected_idx) + idx3 = pairwise_distances_argmin(X, Y, metric="sqeuclidean") + idx4 = pairwise_distances_argmin( + X, Y, metric="euclidean", metric_kwargs={"squared": True} + ) + assert_array_almost_equal(vals, expected_vals_sq) + assert_array_almost_equal(vals2, expected_vals_sq) + + assert_array_almost_equal(idx, expected_idx) + assert_array_almost_equal(idx2, expected_idx) + assert_array_almost_equal(idx3, expected_idx) + assert_array_almost_equal(idx4, expected_idx) # Non-euclidean scikit-learn metric idx, vals = pairwise_distances_argmin_min(X, Y, metric="manhattan") diff --git a/sklearn/metrics/tests/test_pairwise_distances_reduction.py b/sklearn/metrics/tests/test_pairwise_distances_reduction.py index e975aad55bf9c..b9f3d7dbf3dd5 100644 --- a/sklearn/metrics/tests/test_pairwise_distances_reduction.py +++ b/sklearn/metrics/tests/test_pairwise_distances_reduction.py @@ -176,14 +176,14 @@ def test_chunk_size_agnosticism( else 10 ** np.log(n_features) ) - ref_indices, ref_dist = PairwiseDistancesReduction.compute( + ref_dist, ref_indices = PairwiseDistancesReduction.compute( X, Y, parameter, return_distance=True, ) - indices, dist = PairwiseDistancesReduction.compute( + dist, indices = PairwiseDistancesReduction.compute( X, Y, parameter, @@ -222,20 +222,22 @@ def test_n_threads_agnosticism( else 10 ** np.log(n_features) ) - 
ref_indices, ref_dist = PairwiseDistancesReduction.compute( + ref_dist, ref_indices = PairwiseDistancesReduction.compute( X, Y, parameter, return_distance=True, ) - indices, dist = PairwiseDistancesReduction.compute( + dist, indices = PairwiseDistancesReduction.compute( X, Y, parameter, n_threads=1, return_distance=True ) ASSERT_RESULT[PairwiseDistancesReduction](ref_dist, dist, ref_indices, indices) +# TODO: Remove filterwarnings in 1.3 when wminkowski is removed +@pytest.mark.filterwarnings("ignore:WMinkowskiDistance:FutureWarning:sklearn") @pytest.mark.parametrize("seed", range(5)) @pytest.mark.parametrize("n_samples", [100, 1000]) @pytest.mark.parametrize("metric", PairwiseDistancesReduction.valid_metrics()) @@ -269,7 +271,7 @@ def test_strategies_consistency( else 10 ** np.log(n_features) ) - indices_par_X, dist_par_X = PairwiseDistancesReduction.compute( + dist_par_X, indices_par_X = PairwiseDistancesReduction.compute( X, Y, parameter, @@ -282,7 +284,7 @@ def test_strategies_consistency( return_distance=True, ) - indices_par_Y, dist_par_Y = PairwiseDistancesReduction.compute( + dist_par_Y, indices_par_Y = PairwiseDistancesReduction.compute( X, Y, parameter, @@ -305,7 +307,8 @@ def test_strategies_consistency( # Concrete PairwiseDistancesReductions tests - +# TODO: Remove filterwarnings in 1.3 when wminkowski is removed +@pytest.mark.filterwarnings("ignore:WMinkowskiDistance:FutureWarning:sklearn") @pytest.mark.parametrize("n_features", [50, 500]) @pytest.mark.parametrize("translation", [0, 1e6]) @pytest.mark.parametrize("metric", CDIST_PAIRWISE_DISTANCES_REDUCTION_COMMON_METRICS) @@ -346,7 +349,7 @@ def test_pairwise_distances_argkmin( row_idx, argkmin_indices_ref[row_idx] ] - argkmin_indices, argkmin_distances = PairwiseDistancesArgKmin.compute( + argkmin_distances, argkmin_indices = PairwiseDistancesArgKmin.compute( X, Y, k, diff --git a/sklearn/neighbors/_base.py b/sklearn/neighbors/_base.py index 8adb58b4f8c6c..5f8f9966ae349 100644 --- a/sklearn/neighbors/_base.py +++ b/sklearn/neighbors/_base.py @@ -22,6 +22,9 @@ from ..base import is_classifier from ..metrics import pairwise_distances_chunked from ..metrics.pairwise import PAIRWISE_DISTANCE_FUNCTIONS +from ..metrics._pairwise_distances_reduction import ( + PairwiseDistancesArgKmin, +) from ..utils import ( check_array, gen_even_slices, @@ -399,7 +402,9 @@ def _check_algorithm_metric(self): def _fit(self, X, y=None): if self._get_tags()["requires_y"]: if not isinstance(X, (KDTree, BallTree, NeighborsBase)): - X, y = self._validate_data(X, y, accept_sparse="csr", multi_output=True) + X, y = self._validate_data( + X, y, accept_sparse="csr", multi_output=True, order="C" + ) if is_classifier(self): # Classification targets require a specific format @@ -434,7 +439,7 @@ def _fit(self, X, y=None): else: if not isinstance(X, (KDTree, BallTree, NeighborsBase)): - X = self._validate_data(X, accept_sparse="csr") + X = self._validate_data(X, accept_sparse="csr", order="C") self._check_algorithm_metric() if self.metric_params is None: @@ -504,6 +509,7 @@ def _fit(self, X, y=None): if issparse(X): if self.algorithm not in ("auto", "brute"): warnings.warn("cannot use tree with sparse input: using brute force") + if self.effective_metric_ not in VALID_METRICS_SPARSE[ "brute" ] and not callable(self.effective_metric_): @@ -724,18 +730,17 @@ class from an array representing our data set and ask who's % type(n_neighbors) ) - if X is not None: - query_is_train = False - if self.metric == "precomputed": - X = _check_precomputed(X) - else: - X = 
self._validate_data(X, accept_sparse="csr", reset=False) - else: - query_is_train = True + query_is_train = X is None + if query_is_train: X = self._fit_X # Include an extra neighbor to account for the sample itself being # returned, which is removed later n_neighbors += 1 + else: + if self.metric == "precomputed": + X = _check_precomputed(X) + else: + X = self._validate_data(X, accept_sparse="csr", reset=False, order="C") n_samples_fit = self.n_samples_fit_ if n_neighbors > n_samples_fit: @@ -746,12 +751,35 @@ class from an array representing our data set and ask who's n_jobs = effective_n_jobs(self.n_jobs) chunked_results = None - if self._fit_method == "brute" and self.metric == "precomputed" and issparse(X): + use_pairwise_distances_reductions = ( + self._fit_method == "brute" + and PairwiseDistancesArgKmin.is_usable_for( + X if X is not None else self._fit_X, self._fit_X, self.effective_metric_ + ) + ) + if use_pairwise_distances_reductions: + results = PairwiseDistancesArgKmin.compute( + X=X, + Y=self._fit_X, + k=n_neighbors, + metric=self.effective_metric_, + metric_kwargs=self.effective_metric_params_, + n_threads=self.n_jobs, + strategy="auto", + return_distance=return_distance, + ) + + elif ( + self._fit_method == "brute" and self.metric == "precomputed" and issparse(X) + ): results = _kneighbors_from_graph( X, n_neighbors=n_neighbors, return_distance=return_distance ) elif self._fit_method == "brute": + # TODO: should no longer be needed once PairwiseDistancesArgKmin + # is extended to accept sparse and/or float32 inputs. + reduce_func = partial( self._kneighbors_reduce_func, n_neighbors=n_neighbors, @@ -1061,6 +1089,10 @@ class from an array representing our data set and ask who's ) elif self._fit_method == "brute": + # TODO: should no longer be needed once we have Cython-optimized + # implementation for radius queries, with support for sparse and/or + # float32 inputs. 
+ # for efficiency, use squared euclidean distances if self.effective_metric_ == "euclidean": radius *= radius diff --git a/sklearn/neighbors/tests/test_neighbors.py b/sklearn/neighbors/tests/test_neighbors.py index 4526b0a3c2e06..1d76feaeca782 100644 --- a/sklearn/neighbors/tests/test_neighbors.py +++ b/sklearn/neighbors/tests/test_neighbors.py @@ -13,8 +13,12 @@ issparse, ) -from sklearn import metrics -from sklearn import neighbors, datasets +from sklearn import ( + config_context, + datasets, + metrics, + neighbors, +) from sklearn.base import clone from sklearn.exceptions import DataConversionWarning from sklearn.exceptions import EfficiencyWarning @@ -1450,7 +1454,6 @@ def test_neighbors_metrics( metric, n_samples=20, n_features=3, n_query_pts=2, n_neighbors=5 ): # Test computing the neighbors for various metrics - # create a symmetric matrix algorithms = ["brute", "ball_tree", "kd_tree"] X_train = rng.rand(n_samples, n_features) X_test = rng.rand(n_query_pts, n_features) @@ -1493,7 +1496,7 @@ def test_neighbors_metrics( and algorithm == "brute" and sp_version >= parse_version("1.6.0") ): - ExceptionToAssert = DeprecationWarning + ExceptionToAssert = FutureWarning with pytest.warns(ExceptionToAssert): results[algorithm] = neigh.kneighbors(X_test, return_distance=True) @@ -1513,6 +1516,59 @@ def test_neighbors_metrics( assert_array_equal(ball_tree_idx, kd_tree_idx) +# TODO: Remove filterwarnings in 1.3 when wminkowski is removed +@pytest.mark.filterwarnings("ignore:WMinkowskiDistance:FutureWarning:sklearn") +@pytest.mark.parametrize( + "metric", sorted(set(neighbors.VALID_METRICS["brute"]) - set(["precomputed"])) +) +def test_kneighbors_brute_backend( + metric, n_samples=2000, n_features=30, n_query_pts=100, n_neighbors=5 +): + # Both backend for the 'brute' algorithm of kneighbors must give identical results. 
+ X_train = rng.rand(n_samples, n_features) + X_test = rng.rand(n_query_pts, n_features) + + # Haversine distance only accepts 2D data + if metric == "haversine": + feature_sl = slice(None, 2) + X_train = np.ascontiguousarray(X_train[:, feature_sl]) + X_test = np.ascontiguousarray(X_test[:, feature_sl]) + + metric_params_list = _generate_test_params_for(metric, n_features) + + # wminkoski is deprecated in SciPy 1.6.0 and removed in 1.8.0 + ExceptionToAssert = None + if metric == "wminkowski" and sp_version >= parse_version("1.6.0"): + ExceptionToAssert = FutureWarning + + for metric_params in metric_params_list: + p = metric_params.pop("p", 2) + + neigh = neighbors.NearestNeighbors( + n_neighbors=n_neighbors, + algorithm="brute", + metric=metric, + p=p, + metric_params=metric_params, + ) + + neigh.fit(X_train) + with pytest.warns(ExceptionToAssert): + with config_context(enable_cython_pairwise_dist=False): + # Use the legacy backend for brute + legacy_brute_dst, legacy_brute_idx = neigh.kneighbors( + X_test, return_distance=True + ) + with config_context(enable_cython_pairwise_dist=True): + # Use the PairwiseDistancesReduction as a backend for brute + pdr_brute_dst, pdr_brute_idx = neigh.kneighbors( + X_test, return_distance=True + ) + + assert_allclose(legacy_brute_dst, pdr_brute_dst) + assert_array_equal(legacy_brute_idx, pdr_brute_idx) + + def test_callable_metric(): def custom_metric(x1, x2): return np.sqrt(np.sum(x1 ** 2 + x2 ** 2)) diff --git a/sklearn/tests/test_config.py b/sklearn/tests/test_config.py index cccaf7c44d2a7..c0bddf8cbab0e 100644 --- a/sklearn/tests/test_config.py +++ b/sklearn/tests/test_config.py @@ -15,6 +15,7 @@ def test_config_context(): "print_changed_only": True, "display": "text", "pairwise_dist_chunk_size": 256, + "enable_cython_pairwise_dist": True, } # Not using as a context manager affects nothing @@ -28,6 +29,7 @@ def test_config_context(): "print_changed_only": True, "display": "text", "pairwise_dist_chunk_size": 256, + "enable_cython_pairwise_dist": True, } assert get_config()["assume_finite"] is False @@ -58,6 +60,7 @@ def test_config_context(): "print_changed_only": True, "display": "text", "pairwise_dist_chunk_size": 256, + "enable_cython_pairwise_dist": True, } # No positional arguments From ca6f379182e96f024193390b213cc4c90fc56598 Mon Sep 17 00:00:00 2001 From: Christian Lorentzen Date: Thu, 17 Feb 2022 09:12:58 +0100 Subject: [PATCH 5/6] DOC place comment inside functions --- sklearn/metrics/pairwise.py | 13 ++++++------- 1 file changed, 6 insertions(+), 7 deletions(-) diff --git a/sklearn/metrics/pairwise.py b/sklearn/metrics/pairwise.py index 7332a5b9b84d5..1414d6afc1e39 100644 --- a/sklearn/metrics/pairwise.py +++ b/sklearn/metrics/pairwise.py @@ -578,20 +578,19 @@ def _euclidean_distances_upcast(X, XX=None, Y=None, YY=None, batch_size=None): return distances -# start is specified in the signature of `_argmin_min_reduce` -# and of `_argmin_reduce` but is not used. -# This is because the higher order `pairwise_distances_chunked` -# function needs reduction functions that are passed as argument -# to have a two arguments signature. - - def _argmin_min_reduce(dist, start): + # `start` is specified in the signature but not used. This is because the higher + # order `pairwise_distances_chunked` function needs reduction functions that are + # passed as argument to have a two arguments signature. 
indices = dist.argmin(axis=1) values = dist[np.arange(dist.shape[0]), indices] return indices, values def _argmin_reduce(dist, start): + # `start` is specified in the signature but not used. This is because the higher + # order `pairwise_distances_chunked` function needs reduction functions that are + # passed as argument to have a two arguments signature. return dist.argmin(axis=1) From 351061cbdd8be46deee2abd72bf93c432675bf40 Mon Sep 17 00:00:00 2001 From: Christian Lorentzen Date: Thu, 17 Feb 2022 09:37:19 +0100 Subject: [PATCH 6/6] DOC move up whatsnew entry --- doc/whats_new/v1.1.rst | 63 ++++++++++++++++++++---------------------- 1 file changed, 30 insertions(+), 33 deletions(-) diff --git a/doc/whats_new/v1.1.rst b/doc/whats_new/v1.1.rst index 0d3a782e9a122..8987e3a21f5a3 100644 --- a/doc/whats_new/v1.1.rst +++ b/doc/whats_new/v1.1.rst @@ -65,6 +65,35 @@ Changelog :pr:`123456` by :user:`Joe Bloggs `. where 123456 is the *pull request* number, not the issue number. +- |Efficiency| Low-level routines for reductions on pairwise distances + for dense float64 datasets have been refactored. The following functions + and estimators now benefit from improved performances, in particular on + multi-cores machines: + - :func:`sklearn.metrics.pairwise_distances_argmin` + - :func:`sklearn.metrics.pairwise_distances_argmin_min` + - :class:`sklearn.cluster.AffinityPropagation` + - :class:`sklearn.cluster.Birch` + - :class:`sklearn.cluster.MeanShift` + - :class:`sklearn.cluster.OPTICS` + - :class:`sklearn.cluster.SpectralClustering` + - :func:`sklearn.feature_selection.mutual_info_regression` + - :class:`sklearn.neighbors.KNeighborsClassifier` + - :class:`sklearn.neighbors.KNeighborsRegressor` + - :class:`sklearn.neighbors.LocalOutlierFactor` + - :class:`sklearn.neighbors.NearestNeighbors` + - :class:`sklearn.manifold.Isomap` + - :class:`sklearn.manifold.LocallyLinearEmbedding` + - :class:`sklearn.manifold.TSNE` + - :func:`sklearn.manifold.trustworthiness` + - :class:`sklearn.semi_supervised.LabelPropagation` + - :class:`sklearn.semi_supervised.LabelSpreading` + + For instance :class:`sklearn.neighbors.NearestNeighbors.kneighbors` + can be up to ×20 faster than in the previous versions'. + + :pr:`21987`, :pr:`22064`, :pr:`22065` and :pr:`22288` + by :user:`Julien Jerphanion ` + - |Enhancement| All scikit-learn models now generate a more informative error message when some input contains unexpected `NaN` or infinite values. In particular the message contains the input name ("X", "y" or @@ -459,7 +488,7 @@ Changelog now raise consistent error messages when passed invalid values for `l1_ratio`, `alpha`, `max_iter` and `tol`. :pr:`22240` by :user:`Arturo Amor `. - + - |Fix| :class:`linear_model.LogisticRegression` and :class:`linear_model.LogisticRegressionCV` now set the `n_iter_` attribute with a shape that respects the docstring and that is consistent with the shape @@ -674,38 +703,6 @@ Changelog - |Enhancement| :func:`utils.validation.check_scalar` now has better messages when displaying the type. :pr:`22218` by `Thomas Fan`_. -Miscellaneous -............. - -- |Efficiency| Low-level routines for reductions on pairwise distances - for dense float64 datasets have been refactored. 
The following functions - and estimators now benefit from improved performances, in particular on - multi-cores machines: - - :func:`sklearn.metrics.pairwise_distances_argmin` - - :func:`sklearn.metrics.pairwise_distances_argmin_min` - - :class:`sklearn.cluster.AffinityPropagation` - - :class:`sklearn.cluster.Birch` - - :class:`sklearn.cluster.MeanShift` - - :class:`sklearn.cluster.OPTICS` - - :class:`sklearn.cluster.SpectralClustering` - - :func:`sklearn.feature_selection.mutual_info_regression` - - :class:`sklearn.neighbors.KNeighborsClassifier` - - :class:`sklearn.neighbors.KNeighborsRegressor` - - :class:`sklearn.neighbors.LocalOutlierFactor` - - :class:`sklearn.neighbors.NearestNeighbors` - - :class:`sklearn.manifold.Isomap` - - :class:`sklearn.manifold.LocallyLinearEmbedding` - - :class:`sklearn.manifold.TSNE` - - :func:`sklearn.manifold.trustworthiness` - - :class:`sklearn.semi_supervised.LabelPropagation` - - :class:`sklearn.semi_supervised.LabelSpreading` - - For instance :class:`sklearn.neighbors.NearestNeighbors.kneighbors` - can be up to ×20 faster than in the previous versions'. - - :pr:`21987`, :pr:`22064`, :pr:`22065` and :pr:`22288` - by :user:`Julien Jerphanion ` - - |Fix| :func:`check_scalar` raises an error when `include_boundaries={"left", "right"}` and the boundaries are not set. :pr:`22027` by `Marie Lanternier `.
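
To make the mechanics introduced by this patch series easier to follow, here is a minimal NumPy sketch of the GEMM expansion ||X - Y||² = ||X||² - 2 X.Y^T + ||Y||² that the FastEuclideanPairwiseDistancesArgKmin specialization relies on. It is illustrative only: the toy data, array shapes and variable names (X_sq, candidates, rows, order) are assumptions of this sketch, and it does not reproduce the chunked, heap-based, multi-threaded Cython implementation added by these patches.

    import numpy as np

    rng = np.random.RandomState(0)
    X = rng.rand(1000, 50)   # dense float64, as required by is_usable_for
    Y = rng.rand(2000, 50)
    k = 5

    # Squared row norms, the quantity computed by _sqeuclidean_row_norms.
    X_sq = np.einsum("ij,ij->i", X, X)
    Y_sq = np.einsum("ij,ij->i", Y, Y)

    # The -2 X.Y^T middle term is a single BLAS GEMM call; the two norm
    # vectors are added back with broadcasting.
    sq_dist = X_sq[:, np.newaxis] - 2.0 * (X @ Y.T) + Y_sq[np.newaxis, :]

    # Squared distances are rank-preserving surrogates, so the k smallest
    # entries per row can be selected before taking any square root.
    candidates = np.argpartition(sq_dist, kth=k - 1, axis=1)[:, :k]
    rows = np.arange(X.shape[0])[:, np.newaxis]
    order = np.argsort(sq_dist[rows, candidates], axis=1)
    argkmin_indices = candidates[rows, order]
    # Exact distances are only materialized for the k retained candidates;
    # clipping at zero guards against the catastrophic cancellation
    # mentioned in the FastEuclideanPairwiseDistancesArgKmin docstring.
    argkmin_distances = np.sqrt(np.maximum(sq_dist[rows, argkmin_indices], 0.0))

A short usage sketch of the new `enable_cython_pairwise_dist` configuration flag, mirroring what test_kneighbors_brute_backend does to compare the two 'brute' back-ends (again an illustration under the same assumptions, not part of the patches):

    from sklearn import config_context
    from sklearn.metrics import pairwise_distances_argmin_min

    # Default: the PairwiseDistancesArgKmin back-end is used whenever
    # is_usable_for(X, Y, metric) holds.
    indices, values = pairwise_distances_argmin_min(X, Y, metric="euclidean")

    # Force the legacy pairwise_distances_chunked back-end for comparison.
    with config_context(enable_cython_pairwise_dist=False):
        indices_ref, values_ref = pairwise_distances_argmin_min(
            X, Y, metric="euclidean"
        )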