@@ -71,8 +71,6 @@ impl PyTableProvider {
71
71
PyTable :: new ( table_provider)
72
72
}
73
73
}
74
- const MAX_TABLE_BYTES_TO_DISPLAY : usize = 2 * 1024 * 1024 ; // 2 MB
75
- const MIN_TABLE_ROWS_TO_DISPLAY : usize = 20 ;
76
74
77
75
/// A PyDataFrame is a representation of a logical plan and an API to compose statements.
78
76
/// Use it to build a plan and `.collect()` to execute the plan and collect the result.
@@ -81,12 +79,16 @@ const MIN_TABLE_ROWS_TO_DISPLAY: usize = 20;
81
79
#[ derive( Clone ) ]
82
80
pub struct PyDataFrame {
83
81
df : Arc < DataFrame > ,
82
+ display_config : Arc < PyDataframeDisplayConfig > ,
84
83
}
85
84
86
85
impl PyDataFrame {
87
86
/// creates a new PyDataFrame
88
- pub fn new ( df : DataFrame ) -> Self {
89
- Self { df : Arc :: new ( df) }
87
+ pub fn new ( df : DataFrame , display_config : PyDataframeDisplayConfig ) -> Self {
88
+ Self {
89
+ df : Arc :: new ( df) ,
90
+ display_config : Arc :: new ( display_config) ,
91
+ }
90
92
}
91
93
}
92
94
@@ -116,7 +118,12 @@ impl PyDataFrame {
116
118
fn __repr__ ( & self , py : Python ) -> PyDataFusionResult < String > {
117
119
let ( batches, has_more) = wait_for_future (
118
120
py,
119
- collect_record_batches_to_display ( self . df . as_ref ( ) . clone ( ) , 10 , 10 ) ,
121
+ collect_record_batches_to_display (
122
+ self . df . as_ref ( ) . clone ( ) ,
123
+ 10 ,
124
+ 10 ,
125
+ self . display_config . max_table_bytes ,
126
+ ) ,
120
127
) ?;
121
128
if batches. is_empty ( ) {
122
129
// This should not be reached, but do it for safety since we index into the vector below
@@ -139,8 +146,9 @@ impl PyDataFrame {
139
146
py,
140
147
collect_record_batches_to_display (
141
148
self . df . as_ref ( ) . clone ( ) ,
142
- MIN_TABLE_ROWS_TO_DISPLAY ,
149
+ self . display_config . min_table_rows ,
143
150
usize:: MAX ,
151
+ self . display_config . max_table_bytes ,
144
152
) ,
145
153
) ?;
146
154
if batches. is_empty ( ) {
@@ -181,7 +189,7 @@ impl PyDataFrame {
181
189
fn describe ( & self , py : Python ) -> PyDataFusionResult < Self > {
182
190
let df = self . df . as_ref ( ) . clone ( ) ;
183
191
let stat_df = wait_for_future ( py, df. describe ( ) ) ?;
184
- Ok ( Self :: new ( stat_df) )
192
+ Ok ( Self :: new ( stat_df, ( * self . display_config ) . clone ( ) ) )
185
193
}
186
194
187
195
/// Returns the schema from the logical plan
@@ -211,31 +219,31 @@ impl PyDataFrame {
211
219
fn select_columns ( & self , args : Vec < PyBackedStr > ) -> PyDataFusionResult < Self > {
212
220
let args = args. iter ( ) . map ( |s| s. as_ref ( ) ) . collect :: < Vec < & str > > ( ) ;
213
221
let df = self . df . as_ref ( ) . clone ( ) . select_columns ( & args) ?;
214
- Ok ( Self :: new ( df) )
222
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
215
223
}
216
224
217
225
#[ pyo3( signature = ( * args) ) ]
218
226
fn select ( & self , args : Vec < PyExpr > ) -> PyDataFusionResult < Self > {
219
227
let expr = args. into_iter ( ) . map ( |e| e. into ( ) ) . collect ( ) ;
220
228
let df = self . df . as_ref ( ) . clone ( ) . select ( expr) ?;
221
- Ok ( Self :: new ( df) )
229
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
222
230
}
223
231
224
232
#[ pyo3( signature = ( * args) ) ]
225
233
fn drop ( & self , args : Vec < PyBackedStr > ) -> PyDataFusionResult < Self > {
226
234
let cols = args. iter ( ) . map ( |s| s. as_ref ( ) ) . collect :: < Vec < & str > > ( ) ;
227
235
let df = self . df . as_ref ( ) . clone ( ) . drop_columns ( & cols) ?;
228
- Ok ( Self :: new ( df) )
236
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
229
237
}
230
238
231
239
fn filter ( & self , predicate : PyExpr ) -> PyDataFusionResult < Self > {
232
240
let df = self . df . as_ref ( ) . clone ( ) . filter ( predicate. into ( ) ) ?;
233
- Ok ( Self :: new ( df) )
241
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
234
242
}
235
243
236
244
fn with_column ( & self , name : & str , expr : PyExpr ) -> PyDataFusionResult < Self > {
237
245
let df = self . df . as_ref ( ) . clone ( ) . with_column ( name, expr. into ( ) ) ?;
238
- Ok ( Self :: new ( df) )
246
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
239
247
}
240
248
241
249
fn with_columns ( & self , exprs : Vec < PyExpr > ) -> PyDataFusionResult < Self > {
@@ -245,7 +253,7 @@ impl PyDataFrame {
245
253
let name = format ! ( "{}" , expr. schema_name( ) ) ;
246
254
df = df. with_column ( name. as_str ( ) , expr) ?
247
255
}
248
- Ok ( Self :: new ( df) )
256
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
249
257
}
250
258
251
259
/// Rename one column by applying a new projection. This is a no-op if the column to be
@@ -256,27 +264,27 @@ impl PyDataFrame {
256
264
. as_ref ( )
257
265
. clone ( )
258
266
. with_column_renamed ( old_name, new_name) ?;
259
- Ok ( Self :: new ( df) )
267
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
260
268
}
261
269
262
270
fn aggregate ( & self , group_by : Vec < PyExpr > , aggs : Vec < PyExpr > ) -> PyDataFusionResult < Self > {
263
271
let group_by = group_by. into_iter ( ) . map ( |e| e. into ( ) ) . collect ( ) ;
264
272
let aggs = aggs. into_iter ( ) . map ( |e| e. into ( ) ) . collect ( ) ;
265
273
let df = self . df . as_ref ( ) . clone ( ) . aggregate ( group_by, aggs) ?;
266
- Ok ( Self :: new ( df) )
274
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
267
275
}
268
276
269
277
#[ pyo3( signature = ( * exprs) ) ]
270
278
fn sort ( & self , exprs : Vec < PySortExpr > ) -> PyDataFusionResult < Self > {
271
279
let exprs = to_sort_expressions ( exprs) ;
272
280
let df = self . df . as_ref ( ) . clone ( ) . sort ( exprs) ?;
273
- Ok ( Self :: new ( df) )
281
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
274
282
}
275
283
276
284
#[ pyo3( signature = ( count, offset=0 ) ) ]
277
285
fn limit ( & self , count : usize , offset : usize ) -> PyDataFusionResult < Self > {
278
286
let df = self . df . as_ref ( ) . clone ( ) . limit ( offset, Some ( count) ) ?;
279
- Ok ( Self :: new ( df) )
287
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
280
288
}
281
289
282
290
/// Executes the plan, returning a list of `RecordBatch`es.
@@ -293,7 +301,7 @@ impl PyDataFrame {
293
301
/// Cache DataFrame.
294
302
fn cache ( & self , py : Python ) -> PyDataFusionResult < Self > {
295
303
let df = wait_for_future ( py, self . df . as_ref ( ) . clone ( ) . cache ( ) ) ?;
296
- Ok ( Self :: new ( df) )
304
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
297
305
}
298
306
299
307
/// Executes this DataFrame and collects all results into a vector of vector of RecordBatch
@@ -318,7 +326,7 @@ impl PyDataFrame {
318
326
/// Filter out duplicate rows
319
327
fn distinct ( & self ) -> PyDataFusionResult < Self > {
320
328
let df = self . df . as_ref ( ) . clone ( ) . distinct ( ) ?;
321
- Ok ( Self :: new ( df) )
329
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
322
330
}
323
331
324
332
fn join (
@@ -352,7 +360,7 @@ impl PyDataFrame {
352
360
& right_keys,
353
361
None ,
354
362
) ?;
355
- Ok ( Self :: new ( df) )
363
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
356
364
}
357
365
358
366
fn join_on (
@@ -381,7 +389,7 @@ impl PyDataFrame {
381
389
. as_ref ( )
382
390
. clone ( )
383
391
. join_on ( right. df . as_ref ( ) . clone ( ) , join_type, exprs) ?;
384
- Ok ( Self :: new ( df) )
392
+ Ok ( Self :: new ( df, ( * self . display_config ) . clone ( ) ) )
385
393
}
386
394
387
395
/// Print the query plan
@@ -414,7 +422,7 @@ impl PyDataFrame {
414
422
. as_ref ( )
415
423
. clone ( )
416
424
. repartition ( Partitioning :: RoundRobinBatch ( num) ) ?;
417
- Ok ( Self :: new ( new_df) )
425
+ Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
418
426
}
419
427
420
428
/// Repartition a `DataFrame` based on a logical partitioning scheme.
@@ -426,7 +434,7 @@ impl PyDataFrame {
426
434
. as_ref ( )
427
435
. clone ( )
428
436
. repartition ( Partitioning :: Hash ( expr, num) ) ?;
429
- Ok ( Self :: new ( new_df) )
437
+ Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
430
438
}
431
439
432
440
/// Calculate the union of two `DataFrame`s, preserving duplicate rows.The
@@ -442,7 +450,7 @@ impl PyDataFrame {
442
450
self . df . as_ref ( ) . clone ( ) . union ( py_df. df . as_ref ( ) . clone ( ) ) ?
443
451
} ;
444
452
445
- Ok ( Self :: new ( new_df) )
453
+ Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
446
454
}
447
455
448
456
/// Calculate the distinct union of two `DataFrame`s. The
@@ -453,7 +461,7 @@ impl PyDataFrame {
453
461
. as_ref ( )
454
462
. clone ( )
455
463
. union_distinct ( py_df. df . as_ref ( ) . clone ( ) ) ?;
456
- Ok ( Self :: new ( new_df) )
464
+ Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
457
465
}
458
466
459
467
#[ pyo3( signature = ( column, preserve_nulls=true ) ) ]
@@ -494,13 +502,13 @@ impl PyDataFrame {
494
502
. as_ref ( )
495
503
. clone ( )
496
504
. intersect ( py_df. df . as_ref ( ) . clone ( ) ) ?;
497
- Ok ( Self :: new ( new_df) )
505
+ Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
498
506
}
499
507
500
508
/// Calculate the exception of two `DataFrame`s. The two `DataFrame`s must have exactly the same schema
501
509
fn except_all ( & self , py_df : PyDataFrame ) -> PyDataFusionResult < Self > {
502
510
let new_df = self . df . as_ref ( ) . clone ( ) . except ( py_df. df . as_ref ( ) . clone ( ) ) ?;
503
- Ok ( Self :: new ( new_df) )
511
+ Ok ( Self :: new ( new_df, ( * self . display_config ) . clone ( ) ) )
504
512
}
505
513
506
514
/// Write a `DataFrame` to a CSV file.
@@ -798,6 +806,7 @@ async fn collect_record_batches_to_display(
798
806
df : DataFrame ,
799
807
min_rows : usize ,
800
808
max_rows : usize ,
809
+ max_table_bytes : usize ,
801
810
) -> Result < ( Vec < RecordBatch > , bool ) , DataFusionError > {
802
811
let partitioned_stream = df. execute_stream_partitioned ( ) . await ?;
803
812
let mut stream = futures:: stream:: iter ( partitioned_stream) . flatten ( ) ;
@@ -806,7 +815,7 @@ async fn collect_record_batches_to_display(
806
815
let mut record_batches = Vec :: default ( ) ;
807
816
let mut has_more = false ;
808
817
809
- while ( size_estimate_so_far < MAX_TABLE_BYTES_TO_DISPLAY && rows_so_far < max_rows)
818
+ while ( size_estimate_so_far < max_table_bytes && rows_so_far < max_rows)
810
819
|| rows_so_far < min_rows
811
820
{
812
821
let mut rb = match stream. next ( ) . await {
@@ -821,8 +830,8 @@ async fn collect_record_batches_to_display(
821
830
if rows_in_rb > 0 {
822
831
size_estimate_so_far += rb. get_array_memory_size ( ) ;
823
832
824
- if size_estimate_so_far > MAX_TABLE_BYTES_TO_DISPLAY {
825
- let ratio = MAX_TABLE_BYTES_TO_DISPLAY as f32 / size_estimate_so_far as f32 ;
833
+ if size_estimate_so_far > max_table_bytes {
834
+ let ratio = max_table_bytes as f32 / size_estimate_so_far as f32 ;
826
835
let total_rows = rows_in_rb + rows_so_far;
827
836
828
837
let mut reduced_row_num = ( total_rows as f32 * ratio) . round ( ) as usize ;
0 commit comments