@@ -218,7 +218,7 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
218
218
CC :: Chunk : Send + ' static ,
219
219
{
220
220
match number. get ( ) {
221
- 1 => ParallelSorter :: Single ( self . build ( ) ) ,
221
+ 1 | 2 => ParallelSorter ( ParallelSorterInner :: Single ( self . build ( ) ) ) ,
222
222
number => {
223
223
let ( senders, receivers) : ( Vec < Sender < ( usize , Vec < u8 > ) > > , Vec < _ > ) =
224
224
repeat_with ( unbounded) . take ( number) . unzip ( ) ;
@@ -227,6 +227,7 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
227
227
for receiver in receivers {
228
228
let sorter_builder = self . clone ( ) ;
229
229
handles. push ( thread:: spawn ( move || {
230
+ // TODO make sure the max memory is divided by the number of threads
230
231
let mut sorter = sorter_builder. build ( ) ;
231
232
for ( key_length, data) in receiver {
232
233
let ( key, val) = data. split_at ( key_length) ;
@@ -236,7 +237,11 @@ impl<MF, CC: ChunkCreator> SorterBuilder<MF, CC> {
236
237
} ) ) ;
237
238
}
238
239
239
- ParallelSorter :: Multi { senders, handles, merge_function : self . merge }
240
+ ParallelSorter ( ParallelSorterInner :: Multi {
241
+ senders,
242
+ handles,
243
+ merge_function : self . merge ,
244
+ } )
240
245
}
241
246
}
242
247
}
@@ -712,14 +717,19 @@ where
712
717
}
713
718
}
714
719
715
- // TODO Make this private by wrapping it
716
- pub enum ParallelSorter < MF , U , CC : ChunkCreator = DefaultChunkCreator >
720
+ pub struct ParallelSorter < MF , U , CC : ChunkCreator = DefaultChunkCreator > (
721
+ ParallelSorterInner < MF , U , CC > ,
722
+ )
723
+ where
724
+ MF : for < ' a > Fn ( & [ u8 ] , & [ Cow < ' a , [ u8 ] > ] ) -> Result < Cow < ' a , [ u8 ] > , U > ;
725
+
726
+ enum ParallelSorterInner < MF , U , CC : ChunkCreator = DefaultChunkCreator >
717
727
where
718
728
MF : for < ' a > Fn ( & [ u8 ] , & [ Cow < ' a , [ u8 ] > ] ) -> Result < Cow < ' a , [ u8 ] > , U > ,
719
729
{
720
730
Single ( Sorter < MF , CC > ) ,
721
731
Multi {
722
- // Indicates the length of the key and the bytes assoicated to the key + the data.
732
+ // Indicates the length of the key and the bytes associated to the key + the data.
723
733
senders : Vec < Sender < ( usize , Vec < u8 > ) > > ,
724
734
handles : Vec < JoinHandle < Result < Vec < ReaderCursor < CC :: Chunk > > , Error < U > > > > ,
725
735
merge_function : MF ,
@@ -740,9 +750,9 @@ where
740
750
{
741
751
let key = key. as_ref ( ) ;
742
752
let val = val. as_ref ( ) ;
743
- match self {
744
- ParallelSorter :: Single ( sorter) => sorter. insert ( key, val) ,
745
- ParallelSorter :: Multi { senders, .. } => {
753
+ match & mut self . 0 {
754
+ ParallelSorterInner :: Single ( sorter) => sorter. insert ( key, val) ,
755
+ ParallelSorterInner :: Multi { senders, .. } => {
746
756
let key_length = key. len ( ) ;
747
757
let key_hash = compute_hash ( key) ;
748
758
@@ -766,9 +776,9 @@ where
766
776
767
777
/// Consumes this [`Sorter`] and outputs a stream of the merged entries in key-order.
768
778
pub fn into_stream_merger_iter ( self ) -> Result < MergerIter < CC :: Chunk , MF > , Error < U > > {
769
- match self {
770
- ParallelSorter :: Single ( sorter) => sorter. into_stream_merger_iter ( ) ,
771
- ParallelSorter :: Multi { senders, handles, merge_function } => {
779
+ match self . 0 {
780
+ ParallelSorterInner ::Single ( sorter) => sorter. into_stream_merger_iter ( ) ,
781
+ ParallelSorterInner :: Multi { senders, handles, merge_function } => {
772
782
drop ( senders) ;
773
783
774
784
let mut sources = Vec :: new ( ) ;
0 commit comments