8000 perf: Set parallelization threshold in `take_unchecked_impl` (#25672) · pola-rs/polars@d28f504 · GitHub
[go: up one dir, main page]

Skip to content

Commit d28f504

Browse files
authored
perf: Set parallelization threshold in take_unchecked_impl (#25672)
1 parent d3671df commit d28f504

File tree

1 file changed

+52
-44
lines changed
  • crates/polars-core/src/frame

1 file changed

+52
-44
lines changed

crates/polars-core/src/frame/mod.rs

Lines changed: 52 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -2054,26 +2054,30 @@ impl DataFrame {
20542054
POOL.install(|| {
20552055
if POOL.current_num_threads() > self.width() {
20562056
let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
2057-
self._apply_columns_par(&|c| {
2058-
// Nested types initiate a rechunk in their take_unchecked implementation.
2059-
// If we do not rechunk, it will result in rechunk storms downstream.
2060-
let c = if c.dtype().is_nested() {
2061-
&c.rechunk()
2062-
} else {
2063-
c
2064-
};
2065-
2066-
(0..idx.len().div_ceil(stride))
2067-
.into_par_iter()
2068-
.map(|i| c.take_unchecked(&idx.slice((i * stride) as i64, stride)))
2069-
.reduce(
2070-
|| Column::new_empty(c.name().clone(), c.dtype()),
2071-
|mut a, b| {
2072-
a.append_owned(b).unwrap();
2073-
a
2074-
},
2075-
)
2076-
})
2057+
if self.len() / stride >= 2 {
2058+
self._apply_columns_par(&|c| {
2059+
// Nested types initiate a rechunk in their take_unchecked implementation.
2060+
// If we do not rechunk, it will result in rechunk storms downstream.
2061+
let c = if c.dtype().is_nested() {
2062+
&c.rechunk()
2063+
} else {
2064+
c
2065+
};
2066+
2067+
(0..idx.len().div_ceil(stride))
2068+
.into_par_iter()
2069+
.map(|i| c.take_unchecked(&idx.slice((i * stride) as i64, stride)))
2070+
.reduce(
2071+
|| Column::new_empty(c.name().clone(), c.dtype()),
2072+
|mut a, b| {
2073+
a.append_owned(b).unwrap();
2074+
a
2075+
},
2076+
)
2077+
})
2078+
} else {
2079+
self._apply_columns_par(&|c| c.take_unchecked(idx))
2080+
}
20772081
} else {
20782082
self._apply_columns_par(&|c| c.take_unchecked(idx))
20792083
}
@@ -2097,30 +2101,34 @@ impl DataFrame {
20972101
POOL.install(|| {
20982102
if POOL.current_num_threads() > self.width() {
20992103
let stride = usize::max(idx.len().div_ceil(POOL.current_num_threads()), 256);
2100-
self._apply_columns_par(&|c| {
2101-
// Nested types initiate a rechunk in their take_unchecked implementation.
2102-
// If we do not rechunk, it will result in rechunk storms downstream.
2103-
let c = if c.dtype().is_nested() {
2104-
&c.rechunk()
2105-
} else {
2106-
c
2107-
};
2108-
2109-
(0..idx.len().div_ceil(stride))
2110-
.into_par_iter()
2111-
.map(|i| {
2112-
let idx = &idx[i * stride..];
2113-
let idx = &idx[..idx.len().min(stride)];
2114-
c.take_slice_unchecked(idx)
2115-
})
2116-
.reduce(
2117-
|| Column::new_empty(c.name().clone(), c.dtype()),
2118-
|mut a, b| {
2119-
a.append_owned(b).unwrap();
2120-
a
2121-
},
2122-
)
2123-
})
2104+
if self.len() / stride >= 2 {
2105+
self._apply_columns_par(&|c| {
2106+
// Nested types initiate a rechunk in their take_unchecked implementation.
2107+
// If we do not rechunk, it will result in rechunk storms downstream.
2108+
let c = if c.dtype().is_nested() {
2109+
&c.rechunk()
2110+
} else {
2111+
c
2112+
};
2113+
2114+
(0..idx.len().div_ceil(stride))
2115+
.into_par_iter()
2116+
.map(|i| {
2117+
let idx = &idx[i * stride..];
2118+
let idx = &idx[..idx.len().min(stride)];
2119+
c.take_slice_unchecked(idx)
2120+
})
2121+
.reduce(
2122+
|| Column::new_empty(c.name().clone(), c.dtype()),
2123+
|mut a, b| {
2124+
a.append_owned(b).unwrap();
2125+
a
2126+
},
2127+
)
2128+
})
2129+
} else {
2130+
self._apply_columns_par(&|s| s.take_slice_unchecked(idx))
2131+
}
21242132
} else {
21252133
self._apply_columns_par(&|s| s.take_slice_unchecked(idx))
21262134
}

0 commit comments

Comments
 (0)
0