@@ -4,12 +4,12 @@ use std::io::{self, Write};
44use std:: mem;
55use std:: path:: PathBuf ;
66use std:: sync:: atomic:: { AtomicBool , Ordering } ;
7- use std:: sync:: { Arc , Mutex } ;
7+ use std:: sync:: { Arc , Mutex , MutexGuard } ;
88use std:: thread;
99use std:: time:: { Duration , Instant } ;
1010
1111use anyhow:: { anyhow, Result } ;
12- use crossbeam_channel:: { bounded, Receiver , RecvTimeoutError , Sender } ;
12+ use crossbeam_channel:: { bounded, Receiver , RecvTimeoutError , SendError , Sender } ;
1313use etcetera:: BaseStrategy ;
1414use ignore:: overrides:: { Override , OverrideBuilder } ;
1515use ignore:: { self , WalkBuilder , WalkParallel , WalkState } ;
@@ -36,13 +36,97 @@ enum ReceiverMode {
3636
3737/// The Worker threads can result in a valid entry having PathBuf or an error.
3838#[ allow( clippy:: large_enum_variant) ]
39+ #[ derive( Debug ) ]
3940pub enum WorkerResult {
4041 // Errors should be rare, so it's probably better to allow large_enum_variant than
4142 // to box the Entry variant
4243 Entry ( DirEntry ) ,
4344 Error ( ignore:: Error ) ,
4445}
4546
47+ /// A batch of items to send over a channel.
48+ struct Batch < T > {
49+ items : Arc < Mutex < Option < Vec < T > > > > ,
50+ }
51+
52+ impl < T > Batch < T > {
53+ fn new ( ) -> Self {
54+ Self {
55+ items : Arc :: new ( Mutex :: new ( Some ( vec ! [ ] ) ) ) ,
56+ }
57+ }
58+
59+ fn lock ( & self ) -> MutexGuard < ' _ , Option < Vec < T > > > {
60+ self . items . lock ( ) . unwrap ( )
61+ }
62+ }
63+
64+ impl < T > Clone for Batch < T > {
65+ fn clone ( & self ) -> Self {
66+ Self {
67+ items : Arc :: clone ( & self . items ) ,
68+ }
69+ }
70+ }
71+
72+ impl < T > IntoIterator for Batch < T > {
73+ type Item = T ;
74+ type IntoIter = std:: vec:: IntoIter < T > ;
75+
76+ fn into_iter ( self ) -> Self :: IntoIter {
77+ self . lock ( ) . take ( ) . unwrap ( ) . into_iter ( )
78+ }
79+ }
80+
81+ /// A batch of WorkerResults.
82+ type ResultBatch = Batch < WorkerResult > ;
83+
84+ /// Wrapper that sends batches of items at once over a channel.
85+ struct BatchSender < T > {
86+ batch : Batch < T > ,
87+ tx : Sender < Batch < T > > ,
88+ }
89+
90+ impl < T > BatchSender < T > {
91+ fn new ( tx : Sender < Batch < T > > ) -> Self {
92+ Self {
93+ batch : Batch :: new ( ) ,
94+ tx,
95+ }
96+ }
97+
98+ /// Check if we need to flush a batch.
99+ fn needs_flush ( batch : Option < & Vec < T > > ) -> bool {
100+ match batch {
101+ // Limit the batch size to provide some backpressure
102+ Some ( vec) => vec. len ( ) >= 0x400 ,
103+ // Batch was already taken by the receiver, so make a new one
104+ None => true ,
105+ }
106+ }
107+
108+ /// Add an item to a batch.
109+ fn send ( & mut self , item : T ) -> Result < ( ) , SendError < ( ) > > {
110+ let mut batch = self . batch . lock ( ) ;
111+
112+ if Self :: needs_flush ( batch. as_ref ( ) ) {
113+ drop ( batch) ;
114+ self . batch = Batch :: new ( ) ;
115+ batch = self . batch . lock ( ) ;
116+ }
117+
118+ let items = batch. as_mut ( ) . unwrap ( ) ;
119+ items. push ( item) ;
120+
121+ if items. len ( ) == 1 {
122+ // New batch, send it over the channel
123+ self . tx . send ( self . batch . clone ( ) ) . map_err ( |_| SendError
65CE
( ( ) ) ) ?;
124+ }
125+
126+ Ok ( ( ) )
127+ }
128+ }
129+
46130/// Maximum size of the output buffer before flushing results to the console
47131const MAX_BUFFER_LENGTH : usize = 1000 ;
48132/// Default duration until output buffering switches to streaming.
@@ -57,7 +141,7 @@ struct ReceiverBuffer<'a, W> {
57141 /// The ^C notifier.
58142 interrupt_flag : & ' a AtomicBool ,
59143 /// Receiver for worker results.
60- rx : Receiver < WorkerResult > ,
144+ rx : Receiver < ResultBatch > ,
61145 /// Standard output.
62146 stdout : W ,
63147 /// The current buffer mode.
@@ -72,7 +156,7 @@ struct ReceiverBuffer<'a, W> {
72156
73157impl < ' a , W : Write > ReceiverBuffer < ' a , W > {
74158 /// Create a new receiver buffer.
75- fn new ( state : & ' a WorkerState , rx : Receiver < WorkerResult > , stdout : W ) -> Self {
159+ fn new ( state : & ' a WorkerState , rx : Receiver < ResultBatch > , stdout : W ) -> Self {
76160 let config = & state. config ;
77161 let quit_flag = state. quit_flag . as_ref ( ) ;
78162 let interrupt_flag = state. interrupt_flag . as_ref ( ) ;
@@ -103,7 +187,7 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
103187 }
104188
105189 /// Receive the next worker result.
106- fn recv ( & self ) -> Result < WorkerResult , RecvTimeoutError > {
190+ fn recv ( & self ) -> Result < ResultBatch , RecvTimeoutError > {
107191 match self . mode {
108192 ReceiverMode :: Buffering => {
109193 // Wait at most until we should switch to streaming
@@ -119,34 +203,38 @@ impl<'a, W: Write> ReceiverBuffer<'a, W> {
119203 /// Wait for a result or state change.
120204 fn poll ( & mut self ) -> Result < ( ) , ExitCode > {
121205 match self . recv ( ) {
122- Ok ( WorkerResult :: Entry ( dir_entry) ) => {
123- if self . config . quiet {
124- return Err ( ExitCode :: HasResults ( true ) ) ;
125- }
206+ Ok ( batch) => for result in batch {
207+ match result {
208+ WorkerResult :: Entry ( dir_entry) => {
209+ if self . config . quiet {
210+ return Err ( ExitCode :: HasResults ( true ) ) ;
211+ }
126212
127- match self . mode {
128- ReceiverMode :: Buffering => {
129- self . buffer . push ( dir_entry) ;
130- if self . buffer . len ( ) > MAX_BUFFER_LENGTH {
131- self . stream ( ) ?;
213+ match self . mode {
214+ ReceiverMode :: Buffering => {
215+ self . buffer . push ( dir_entry) ;
216+ if self . buffer . len ( ) > MAX_BUFFER_LENGTH {
217+ self . stream ( ) ?;
218+ }
219+ }
220+ ReceiverMode :: Streaming => {
221+ self . print ( & dir_entry) ?;
222+ self . flush ( ) ?;
223+ }
132224 }
133- }
134- ReceiverMode :: Streaming => {
135- self .
D306
span>print ( & dir_entry) ?;
136- self . flush ( ) ?;
137- }
138- }
139225
140- self . num_results += 1 ;
141- if let Some ( max_results) = self . config . max_results {
142- if self . num_results >= max_results {
143- return self . stop ( ) ;
226+ self . num_results += 1 ;
227+ if let Some ( max_results) = self . config . max_results {
228+ if self . num_results >= max_results {
229+ return self . stop ( ) ;
230+ }
231+ }
232+ }
233+ WorkerResult :: Error ( err) => {
234+ if self . config . show_filesystem_errors {
235+ print_error ( err. to_string ( ) ) ;
236+ }
144237 }
145- }
146- }
147- Ok ( WorkerResult :: Error ( err) ) => {
148- if self . config . show_filesystem_errors {
149- print_error ( err. to_string ( ) ) ;
150238 }
151239 }
152240 Err ( RecvTimeoutError :: Timeout ) => {
@@ -319,13 +407,13 @@ impl WorkerState {
319407
320408 /// Run the receiver work, either on this thread or a pool of background
321409 /// threads (for --exec).
322- fn receive ( & self , rx : Receiver < WorkerResult > ) -> ExitCode {
410+ fn receive ( & self , rx : Receiver < ResultBatch > ) -> ExitCode {
323411 let config = & self . config ;
324412
325413 // This will be set to `Some` if the `--exec` argument was supplied.
326414 if let Some ( ref cmd) = config. command {
327415 if cmd. in_batch_mode ( ) {
328- exec:: batch ( rx, cmd, & config)
416+ exec:: batch ( rx. into_iter ( ) . flatten ( ) , cmd, & config)
329417 } else {
330418 let out_perm = Mutex :: new ( ( ) ) ;
331419
@@ -337,7 +425,7 @@ impl WorkerState {
337425 let rx = rx. clone ( ) ;
338426
339427 // Spawn a job thread that will listen for and execute inputs.
340- let handle = scope. spawn ( || exec:: job ( rx, cmd, & out_perm, & config) ) ;
428+ let handle = scope. spawn ( || exec:: job ( rx. into_iter ( ) . flatten ( ) , cmd, & out_perm, & config) ) ;
341429
342430 // Push the handle of the spawned thread into the vector for later joining.
343431 handles. push ( handle) ;
@@ -355,12 +443,12 @@ impl WorkerState {
355443 }
356444
357445 /// Spawn the sender threads.
358- fn spawn_senders ( & self , walker : WalkParallel , tx : Sender < WorkerResult > ) {
446+ fn spawn_senders ( & self , walker : WalkParallel , tx : Sender < ResultBatch > ) {
359447 walker. run ( || {
360448 let patterns = & self . patterns ;
361449 let config = & self . config ;
362450 let quit_flag = self . quit_flag . as_ref ( ) ;
363- let tx = tx. clone ( ) ;
451+ let mut tx = BatchSender :: new ( tx. clone ( ) ) ;
364452
365453 Box :: new ( move |entry| {
366454 if quit_flag. load ( Ordering :: Relaxed ) {
@@ -545,8 +633,7 @@ impl WorkerState {
545633 . unwrap ( ) ;
546634 }
547635
548- // Channel capacity was chosen empircally to perform similarly to an unbounded channel
549- let ( tx, rx) = bounded ( 0x4000 * config. threads ) ;
636+ let ( tx, rx) = bounded ( config. threads ) ;
550637
551638 let exit_code = thread:: scope ( |scope| {
552639 // Spawn the receiver thread(s)
0 commit comments