walk: Send WorkerResults in batches · sharkdp/fd@c384492 · GitHub
[go: up one dir, main page]

Skip to content

Commit c384492

Browse files
committed
walk: Send WorkerResults in batches
1 parent 8bbbd76 commit c384492

File tree

3 files changed

+137
-51
lines changed

3 files changed

+137
-51
lines changed

src/dir_entry.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,11 +8,13 @@ use lscolors::{Colorable, LsColors, Style};
88
use crate::config::Config;
99
use crate::filesystem::strip_current_dir;
1010

11+
#[derive(Debug)]
1112
enum DirEntryInner {
1213
Normal(ignore::DirEntry),
1314
BrokenSymlink(PathBuf),
1415
}
1516

17+
#[derive(Debug)]
1618
pub struct DirEntry {
1719
inner: DirEntryInner,
1820
metadata: OnceCell<Option<Metadata>>,

src/exec/job.rs

Lines changed: 12 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,6 @@
11
use std::sync::Mutex;
22

3-
use crossbeam_channel::Receiver;
4-
53
use crate::config::Config;
6-
use crate::dir_entry::DirEntry;
74
use crate::error::print_error;
85
use crate::exit_codes::{merge_exitcodes, ExitCode};
96
use crate::walk::WorkerResult;
@@ -14,43 +11,43 @@ use super::CommandSet;
1411
/// generate a command with the supplied command template. The generated command will then
1512
/// be executed, and this process will continue until the receiver's sender has closed.
1613
pub fn job(
17-
rx: Receiver<WorkerResult>,
14+
results: impl IntoIterator<Item = WorkerResult>,
1815
cmd: &CommandSet,
1916
out_perm: &Mutex<()>,
2017
config: &Config,
2118
) -> ExitCode {
2219
// Output should be buffered when only running a single thread
2320
let buffer_output: bool = config.threads > 1;
2421

25-
let mut results: Vec<ExitCode> = Vec::new();
26-
loop {
22+
let mut ret = ExitCode::Success;
23+
for result in results {
2724
// Obtain the next result from the receiver, else if the channel
2825
// has closed, exit from the loop
29-
let dir_entry: DirEntry = match rx.recv() {
30-
Ok(WorkerResult::Entry(dir_entry)) => dir_entry,
31-
Ok(WorkerResult::Error(err)) => {
26+
let dir_entry = match result {
27+
WorkerResult::Entry(dir_entry) => dir_entry,
28+
WorkerResult::Error(err) => {
3229
if config.show_filesystem_errors {
3330
print_error(err.to_string());
3431
}
3532
continue;
3633
}
37-
Err(_) => break,
3834
};
3935

4036
// Generate a command, execute it and store its exit code.
41-
results.push(cmd.execute(
37+
let code = cmd.execute(
4238
dir_entry.stripped_path(config),
4339
config.path_separator.as_deref(),
4440
out_perm,
4541
buffer_output,
46-
))
42+
);
43+
ret = merge_exitcodes([ret, code]);
4744
}
4845
// Returns error in case of any error.
49-
merge_exitcodes(results)
46+
ret
5047
}
5148

52-
pub fn batch(rx: Receiver<WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
53-
let paths = rx
49+
pub fn batch(results: impl IntoIterator<Item = WorkerResult>, cmd: &CommandSet, config: &Config) -> ExitCode {
50+
let paths = results
5451
.into_iter()
5552
.filter_map(|worker_result| match worker_result {
5653
WorkerResult::Entry(dir_entry) => Some(dir_entry.into_stripped_path(config)),

src/walk.rs

Lines changed: 123 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,12 @@ use std::io::{self, Write};
44
use std::mem;
55
use std::path::PathBuf;
66
use std::sync::atomic::{AtomicBool, Ordering};
7-
use std::sync::{Arc, Mutex};
7+
use std::sync::{Arc, Mutex, MutexGuard};
88
use std::thread;
99
use std::time::{Duration, Instant};
1010

1111
use anyhow::{anyhow, Result};
12-
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, Sender};
12+
use crossbeam_channel::{bounded, Receiver, RecvTimeoutError, SendError, Sender};
1313
use etcetera::BaseStrategy;
1414
use ignore::overrides::{Override, OverrideBuilder};
1515
use ignore::{self, WalkBuilder, WalkParallel, WalkState};
@@ -36,13 +36,97 @@ enum ReceiverMode {
3636

3737
/// The Worker threads can result in a valid entry having PathBuf or an error.
3838
#[allow(clippy::large_enum_variant)]
39+
#[derive(Debug)]
3940
pub enum WorkerResult {
4041
// Errors should be rare, so it's probably better to allow large_enum_variant than
4142
// to box the Entry variant
4243
Entry(DirEntry),
4344
Error(ignore::Error),
4445
}
4546

47+
/// A batch of items to send over a channel.
48+
struct Batch<T> {
49+
items: Arc<Mutex<Option<Vec<T>>>>,
50+
}
51+
52+
impl<T> Batch<T> {
53+
fn new() -> Self {
54+
Self {
55+
items: Arc::new(Mutex::new(Some(vec![]))),
56+
}
57+
}
58+
59+
fn lock(&self) -> MutexGuard<'_, Option<Vec<T>>> {
60+
self.items.lock().unwrap()
61+
}
62+
}
63+
64+
impl<T> Clone for Batch<T> {
65+
fn clone(&self) -> Self {
66+
Self {
67+
items: Arc::clone(&self.items),
68+
}
69+
}
70+
}
71+
72+
impl<T> IntoIterator for Batch<T> {
73+
type Item = T;
74+
type IntoIter = std::vec::IntoIter<T>;
75+
76+
fn into_iter(self) -> Self::IntoIter {
77+
self.lock().take().unwrap().into_iter()
78+
}
79+
}
80+
81+
/// A batch of WorkerResults.
82+
type ResultBatch = Batch<WorkerResult>;
83+
84+
/// Wrapper that sends batches of items at once over a channel.
85+
struct BatchSender<T> {
86+
batch: Batch<T>,
87+
tx: Sender<Batch<T>>,
88+
}
89+
90+
impl<T> BatchSender<T> {
91+
fn new(tx: Sender<Batch<T>>) -> Self {
92+
Self {
93+
batch: Batch::new(),
94+
tx,
95+
}
96+
}
97+
98+
/// Check if we need to flush a batch.
99+
fn needs_flush(batch: Option<&Vec<T>>) -> bool {
100+
match batch {
101+
// Limit the batch size to provide some backpressure
102+
Some(vec) => vec.len() >= 0x400,
103+
// Batch was already taken by the receiver, so make a new one
104+
None => true,
105+
}
106+
}
107+
108+
/// Add an item to a batch.
109+
fn send(&mut self, item: T) -> Result<(), SendError<()>> {
110+
let mut batch = self.batch.lock();
111+
112+
if Self::needs_flush(batch.as_ref()) {
113+
drop(batch);
114+
self.batch = Batch::new();
115+
batch = self.batch.lock();
116+
}
117+
118+
let items = batch.as_mut().unwrap();
119+
items.push(item);
120+
121+
if items.len() == 1 {
122+
// New batch, send it over the channel
123+
self.tx.send(self.batch.clone()).map_err(|_| SendError(()))?;
124+
}
125+
126+
Ok(())
127+
}
128+
}
129+
46130
/// Maximum size of the output buffer before flushing results to the console
47131
const MAX_BUFFER_LENGTH: usize = 1000;
48132
/// Default duration until output buffering switches to streaming.
@@ -57,7 +141,7 @@ struct ReceiverBuffer<'a, W> {
57141
/// The ^C notifier.
58142
interrupt_flag: &'a AtomicBool,
59143
/// Receiver for worker results.
60-
rx: Receiver<WorkerResult>,
144+
rx: Receiver<ResultBatch>,
61145
/// Standard output.
62146
stdout: W,
63147
/// The current buffer mode.
@@ -72,7 +156,7 @@ struct ReceiverBuffer<'a, W> {
72156

73157
impl<'a, W: Write> ReceiverBuffer<'a, W> {
74158
/// Create a new receiver buffer.
75-
fn new(state: &'a WorkerState, rx: Receiver<WorkerResult>, stdout: W) -> Self {
159+
fn new(state: &'a WorkerState, rx: Receiver<ResultBatch>, stdout: W) -> Self {
76160
let config = &state.config;
77161
let quit_flag = state.quit_flag.as_ref();
78162
let interrupt_flag = state.interrupt_flag.as_ref();
@@ -103,7 +187,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
103187
}
104188

105189
/// Receive the next worker result.
106-
fn recv(&self) -> Result<WorkerResult, RecvTimeoutError> {
190+
fn recv(&self) -> Result<ResultBatch, RecvTimeoutError> {
107191
match self.mode {
108192
ReceiverMode::Buffering => {
109193
// Wait at most until we should switch to streaming
@@ -119,34 +203,38 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
119203
/// Wait for a result or state change.
120204
fn poll(&mut self) -> Result<(), ExitCode> {
121205
match self.recv() {
122-
Ok(WorkerResult::Entry(dir_entry)) => {
123-
if self.config.quiet {
124-
return Err(ExitCode::HasResults(true));
125-
}
206+
Ok(batch) => for result in batch {
207+
match result {
208+
WorkerResult::Entry(dir_entry) => {
209+
if self.config.quiet {
210+
return Err(ExitCode::HasResults(true));
211+
}
126212

127-
match self.mode {
128-
ReceiverMode::Buffering => {
129-
self.buffer.push(dir_entry);
130-
if self.buffer.len() > MAX_BUFFER_LENGTH {
131-
self.stream()?;
213+
match self.mode {
214+
ReceiverMode::Buffering => {
215+
self.buffer.push(dir_entry);
216+
if self.buffer.len() > MAX_BUFFER_LENGTH {
217+
self.stream()?;
218+
}
219+
}
220+
ReceiverMode::Streaming => {
221+
self.print(&dir_entry)?;
222+
self.flush()?;
223+
}
132224
}
133-
}
134-
ReceiverMode::Streaming => {
135-
self.print(&dir_entry)?;
136-
self.flush()?;
137-
}
138-
}
139225

140-
self.num_results += 1;
141-
if let Some(max_results) = self.config.max_results {
142-
if self.num_results >= max_results {
143-
return self.stop();
226+
self.num_results += 1;
227+
if let Some(max_results) = self.config.max_results {
228+
if self.num_results >= max_results {
229+
return self.stop();
230+
}
231+
}
232+
}
233+
WorkerResult::Error(err) => {
234+
if self.config.show_filesystem_errors {
235+
print_error(err.to_string());
236+
}
144237
}
145-
}
146-
}
147-
Ok(WorkerResult::Error(err)) => {
148-
if self.config.show_filesystem_errors {
149-
print_error(err.to_string());
150238
}
151239
}
152240
Err(RecvTimeoutError::Timeout) => {
@@ -319,13 +407,13 @@ impl WorkerState {
319407

320408
/// Run the receiver work, either on this thread or a pool of background
321409
/// threads (for --exec).
322-
fn receive(&self, rx: Receiver<WorkerResult>) -> ExitCode {
410+
fn receive(&self, rx: Receiver<ResultBatch>) -> ExitCode {
323411
let config = &self.config;
324412

325413
// This will be set to `Some` if the `--exec` argument was supplied.
326414
if let Some(ref cmd) = config.command {
327415
if cmd.in_batch_mode() {
328-
exec::batch(rx, cmd, &config)
416+
exec::batch(rx.into_iter().flatten(), cmd, &config)
329417
} else {
330418
let out_perm = Mutex::new(());
331419

@@ -337,7 +425,7 @@ impl WorkerState {
337425
let rx = rx.clone();
338426

339427
// Spawn a job thread that will listen for and execute inputs.
340-
let handle = scope.spawn(|| exec::job(rx, cmd, &out_perm, &config));
428+
let handle = scope.spawn(|| exec::job(rx.into_iter().flatten(), cmd, &out_perm, &config));
341429

342430
// Push the handle of the spawned thread into the vector for later joining.
343431
handles.push(handle);
@@ -355,12 +443,12 @@ impl WorkerState {
355443
}
356444

357445
/// Spawn the sender threads.
358-
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<WorkerResult>) {
446+
fn spawn_senders(&self, walker: WalkParallel, tx: Sender<ResultBatch>) {
359447
walker.run(|| {
360448
let patterns = &self.patterns;
361449
let config = &self.config;
362450
let quit_flag = self.quit_flag.as_ref();
363-
let tx = tx.clone();
451+
let mut tx = BatchSender::new(tx.clone());
364452

365453
Box::new(move |entry| {
366454
if quit_flag.load(Ordering::Relaxed) {
@@ -545,8 +633,7 @@ impl WorkerState {
545633
.unwrap();
546634
}
547635

548-
// Channel capacity was chosen empirically to perform similarly to an unbounded channel
549-
let (tx, rx) = bounded(0x4000 * config.threads);
636+
let (tx, rx) = bounded(config.threads);
550637

551638
let exit_code = thread::scope(|scope| {
552639
// Spawn the receiver thread(s)

0 commit comments

Comments
 (0)
0