Chapter 3: Working With Dask DataFrames

This document introduces Dask DataFrames for parallel processing of large datasets in Python. It shows how to read CSV files lazily using Dask and how to build delayed pipelines that operate on the data in parallel. The key advantage of Dask over Pandas is its ability to handle datasets too large to fit into memory by distributing data and computation across multiple cores or machines. The document also compares the performance of Dask and Pandas on tasks such as reading taxi-trip data files and computing aggregations, demonstrating that Dask can improve performance for large datasets that require out-of-core computation.

Using Dask DataFrames

Dhavide Aruliah
Director of Training, Anaconda
Reading CSV
import dask.dataframe as dd

The dd.read_csv() function:

Accepts a single filename or a glob pattern (with wildcard *)

Does not read the file immediately (lazy evaluation)

File(s) need not fit in memory (see the sketch below)
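
Because evaluation is lazy, dd.read_csv() returns almost immediately; work happens only when a concrete result is requested. A minimal sketch (the filename pattern and column name here are illustrative):

import dask.dataframe as dd

# Returns instantly: only metadata and a small sample are inspected.
df = dd.read_csv('transactions-*.csv')

# Rows are actually read only when a result is materialized:
df['amount'].sum().compute()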

Reading multiple CSV files
%ls

quarter1.csv quarter2.csv quarter3.csv quarter4.csv

transactions = dd.read_csv('*.csv')
transactions.head()
transactions.tail()

transactions.head():

    id    names  amount        date
0  131  Norbert   -1159  2016-01-01
1  342    Jerry    1149  2016-01-01
2  485      Dan    1380  2016-01-01
3  513   Xavier    1555  2016-01-02
4  849  Michael     363  2016-01-02

transactions.tail():

      id     names  amount        date
195  838     Wendy      87  2016-12-28
196  915       Bob     852  2016-12-30
197  749  Patricia    1741  2016-12-31
198  743   Michael    1191  2016-12-31
199  889     Wendy     336  2016-12-31
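
Note that .head() and .tail() are exceptions to lazy evaluation: they compute immediately, reading only the first (or last) partition, so they are cheap previews even when the full dataset is large.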

Building delayed pipelines
is_wendy = (transactions['names'] == 'Wendy')
wendy_amounts = transactions.loc[is_wendy, 'amount']
wendy_amounts

Dask Series Structure:


npartitions=4
None int64
None ...
None ...
None ...
None ...
Name: amount, dtype: int64
Dask Name: loc-series, 24 tasks

Building delayed pipelines
wendy_diff = wendy_amounts.sum()
wendy_diff

dd.Scalar<series-..., dtype=int64>
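
wendy_diff is still lazy at this point; no CSV has been read. A sketch of materializing the result (assuming the quarterly files above):

# Triggers the whole pipeline: read all four CSVs, filter Wendy's
# rows, and sum the amounts, with partitions processed in parallel.
result = wendy_diff.compute()
print(result)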

wendy_diff.visualize(rankdir='LR')

Visualizing pipelines

[Figure: task graph for wendy_diff, rendered left-to-right by .visualize(rankdir='LR')]

Compatibility with Pandas API
Unavailable in dask.dataframe:

some file formats (e.g., .xls, .zip, .gz)

sorting

Available in dask.dataframe:

indexing, selection, & reindexing

aggregations: .sum(), .mean(), .std(), .min(), .max(), etc.

grouping with .groupby()

datetime conversion with dd.to_datetime() (see the sketch below)
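
The last two combine naturally on the transactions data from earlier; a minimal sketch (column names follow that example):

# Parse the date column lazily, then total amounts per calendar month.
transactions['date'] = dd.to_datetime(transactions['date'])
monthly = transactions.groupby(transactions['date'].dt.month)['amount'].sum()
monthly.compute()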

Let's practice!
Timing DataFrame Operations

How big is big data?
Data size M           Required hardware
M < 8 GB              RAM (single machine)
8 GB < M < 10 TB      hard disk (single machine)
M > 10 TB             specialized hardware

Two key questions:

Does the data fit in RAM (random access memory)?

Does the data fit on hard disk?

Taxi CSV files
%ll -h yellow_tripdata_2015-*.csv

-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-01.csv
-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-02.csv
-rw-r--r-- 1 user staff 1.9G 31 Jul 16:43 yellow_tripdata_2015-03.csv
-rw-r--r-- 1 user staff 1.9G 31 Jul 16:43 yellow_tripdata_2015-04.csv
-rw-r--r-- 1 user staff 1.9G 31 Jul 16:43 yellow_tripdata_2015-05.csv
-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-06.csv
-rw-r--r-- 1 user staff 1.7G 31 Jul 16:43 yellow_tripdata_2015-07.csv
-rw-r--r-- 1 user staff 1.6G 31 Jul 16:43 yellow_tripdata_2015-08.csv
-rw-r--r-- 1 user staff 1.6G 31 Jul 16:43 yellow_tripdata_2015-09.csv
-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-10.csv
-rw-r--r-- 1 user staff 1.7G 31 Jul 16:43 yellow_tripdata_2015-11.csv
-rw-r--r-- 1 user staff 1.7G 31 Jul 16:43 yellow_tripdata_2015-12.csv

Timing I/O & computation: Pandas
import time
import pandas as pd

t_start = time.time()
df = pd.read_csv('yellow_tripdata_2015-01.csv')
t_end = time.time()
print('pd.read_csv(): {} s'.format(t_end - t_start))  # time [s]

pd.read_csv(): 43.820565938949585 s

t_start = time.time()
m = df['trip_distance'].mean()
t_end = time.time()
print('.mean(): {} ms'.format((t_end - t_start)*1000))  # time [ms]

.mean(): 17.752885818481445 ms

Timing I/O & computation: Dask
import time
import dask.dataframe as dd

t_start = time.time()
df = dd.read_csv('yellow_tripdata_2015-*.csv')
t_end = time.time()
print('dd.read_csv: {} ms'.format((t_end - t_start)*1000))  # time [ms]

dd.read_csv: 404.7999382019043 ms

t_start = time.time()
m = df['trip_distance'].mean()
t_end = time.time()
print('.mean(): {} ms'.format((t_end - t_start)*1000))  # time [ms]

.mean(): 2.289295196533203 ms

Timing I/O & computation: Dask
t_start = time.time()
result = m.compute()
t_end = time.time()
print('.compute(): {} min'.format((t_end - t_start)/60))  # time [min]

.compute(): 3.4004417498906454 min

Timing in the IPython shell
m = df['trip_distance'].mean()
%time result = m.compute()

CPU times: user 9min 50s, sys: 1min 16s, total: 11min 7s
Wall time: 3min 1s
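
%time reports a single run; IPython's %timeit magic averages over many runs. A sketch (the flags limit it to one run, since this computation takes minutes):

# -n1 -r1: execute the statement once rather than many times.
%timeit -n1 -r1 m.compute()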

Is Dask or Pandas appropriate?
How big is the dataset?

How much RAM is available?

How many threads/cores/CPUs are available?

Are the Pandas computations/formats supported in the Dask API?

Is the computation I/O-bound (disk-intensive) or CPU-bound (processor-intensive)?

Best use case for Dask:

Computations from the Pandas API are available in Dask

Problem size is close to the limits of RAM but fits on disk (see the sketch below)
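
This decision can be scripted as a rough rule of thumb. A minimal sketch, assuming psutil is installed (the helper name and the 2x headroom factor are illustrative assumptions, not part of the course):

import os
import psutil  # third-party library, used only to query available RAM

def fits_in_ram(paths, headroom=2.0):
    # Parsed DataFrames need more RAM than the on-disk CSV size;
    # the 2x headroom here is a conservative guess, not a rule.
    total_bytes = sum(os.path.getsize(p) for p in paths)
    return total_bytes * headroom < psutil.virtual_memory().available

files = ['yellow_tripdata_2015-01.csv']
if fits_in_ram(files):
    import pandas as pd
    df = pd.read_csv(files[0])                      # eager, in-memory
else:
    import dask.dataframe as dd
    df = dd.read_csv('yellow_tripdata_2015-*.csv')  # lazy, out-of-core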

Let's practice!
Analyzing NYC Taxi Rides

The New York taxi dataset

Taxi CSV files
%ll -h yellow_tripdata_2015-*.csv

-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-01.csv
-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-02.csv
-rw-r--r-- 1 user staff 1.9G 31 Jul 16:43 yellow_tripdata_2015-03.csv
-rw-r--r-- 1 user staff 1.9G 31 Jul 16:43 yellow_tripdata_2015-04.csv
-rw-r--r-- 1 user staff 1.9G 31 Jul 16:43 yellow_tripdata_2015-05.csv
-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-06.csv
-rw-r--r-- 1 user staff 1.7G 31 Jul 16:43 yellow_tripdata_2015-07.csv
-rw-r--r-- 1 user staff 1.6G 31 Jul 16:43 yellow_tripdata_2015-08.csv
-rw-r--r-- 1 user staff 1.6G 31 Jul 16:43 yellow_tripdata_2015-09.csv
-rw-r--r-- 1 user staff 1.8G 31 Jul 16:43 yellow_tripdata_2015-10.csv
-rw-r--r-- 1 user staff 1.7G 31 Jul 16:43 yellow_tripdata_2015-11.csv
-rw-r--r-- 1 user staff 1.7G 31 Jul 16:43 yellow_tripdata_2015-12.csv

Exercises use smaller files...

Taxi data features
import pandas as pd
df = pd.read_csv('yellow_tripdata_2015-01.csv')
df.shape
df.columns

(12748986, 19)
Index(['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime',
'passenger_count', 'trip_distance', 'pickup_longitude',
'pickup_latitude', 'RateCodeID', 'store_and_fwd_flag',
'dropoff_longitude', 'dropoff_latitude', 'payment_type',
'fare_amount','extra', 'mta_tax', 'tip_amount',
'tolls_amount','improvement_surcharge', 'total_amount'],
dtype='object')
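
Reading the whole year with Dask is the same call with a glob pattern; a sketch (the dtype hint is an illustrative precaution against Dask inferring different types in different partitions, not something the course specifies):

import dask.dataframe as dd

# Lazy read of all twelve monthly files; an explicit dtype keeps
# type inference consistent across partitions.
df = dd.read_csv('yellow_tripdata_2015-*.csv',
                 dtype={'store_and_fwd_flag': 'object'})
df.columns  # available immediately, without reading every row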

Amount paid
How much was each ride?
fare_amount : cost of ride

tolls_amount : charges for toll roads

extra : additional charges

tip_amount : amount tipped (credit cards only)

total_amount : total amount paid by passenger (see the sketch below)
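
These components (together with mta_tax and improvement_surcharge from the column list above) should roughly add up to total_amount. A sketch of checking that, assuming df is the Pandas DataFrame loaded earlier:

# Recompute the total from its parts; small float discrepancies
# are expected, so compare with a tolerance.
parts = ['fare_amount', 'extra', 'mta_tax', 'tip_amount',
         'tolls_amount', 'improvement_surcharge']
mismatch = (df[parts].sum(axis=1) - df['total_amount']).abs() > 0.01
mismatch.mean()  # fraction of rides where the sum disagrees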

Payment type
df['payment_type'].value_counts()

1 7881388
2 4816992
3 38632
4 11972
5 2
Name: payment_type, dtype: int64
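
In the TLC data dictionary, payment_type 1 is credit card and 2 is cash. Since tips are recorded for credit-card rides only (see above), a tip analysis should filter on it first; a minimal sketch using df from earlier:

# Restrict to credit-card rides before computing a tip fraction,
# because cash tips are not recorded in this dataset.
credit = df.loc[df['payment_type'] == 1]
tip_fraction = credit['tip_amount'] / credit['fare_amount']
tip_fraction.mean()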

Let's practice!