Chapter 1: Working with Big Data

This chapter introduces computer storage and big data. It defines big data as "data that exceeds the storage or processing capacity of typical database systems," surveys storage media (RAM, solid-state disks, rotational disks) and their relative access speeds, and explains how data is measured in bytes (kilobytes, megabytes, and so on). It demonstrates measuring the memory usage of Python processes, NumPy arrays, and pandas DataFrames; shows how to read data files in chunks to avoid loading entire datasets into memory; and introduces generators as a way to process chunks iteratively without holding the full dataset in memory.

Understanding Computer Storage & Big Data
PARALLEL PROGRAMMING WITH DASK IN PYTHON

Dhavide Aruliah
Director of Training, Anaconda
What is "big data"?

"Big data" is data that exceeds the storage or processing capacity of a single machine: "data > one machine".



Power (SI, factors of 1000)        Storage (binary, factors of 1024)
Watt       W                       Byte       B     2^3 bits
Kilowatt   KW    10^3 W            Kilobyte   KB    2^10 bytes
Megawatt   MW    10^6 W            Megabyte   MB    2^20 bytes
Gigawatt   GW    10^9 W            Gigabyte   GB    2^30 bytes
Terawatt   TW    10^12 W           Terabyte   TB    2^40 bytes

Conventional units use factors of 1000: Kilo → Mega → Giga → Tera → ⋯, with 10^3 = 1000.
Binary computers work in base 2: the basic unit is the binary digit (bit), and a byte is 2^3 = 8 bits.
The conventional factor 10^3 = 1000 corresponds to the binary factor 2^10 = 1024.
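
To make the binary prefixes concrete, here is a minimal sketch (my own illustration, not part of the original slides) that renders a raw byte count in these units:

def format_bytes(n_bytes):
    '''Render a byte count using binary prefixes (KB = 2**10 bytes, MB = 2**20 bytes, ...).'''
    size = float(n_bytes)
    for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
        if size < 1024 or unit == 'TB':
            return '{:.1f} {}'.format(size, unit)
        size /= 1024  # step up to the next binary prefix

print(format_bytes(52428800))  # '50.0 MB' -- the size of the array allocated later in this chapter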



Hard disks

Hard storage: hard disks (permanent, big, slow)



Random Access Memory (RAM)

Soft storage: RAM (temporary, small, fast)



Time scales of storage technologies
Storage medium         Access time    Rescaled (RAM access = 1 s)
RAM                    120 ns         1 s
Solid-state disk       50-150 µs      7-21 min
Rotational disk        1-10 ms        2.5 hr - 1 day
Internet (SF to NY)    40 ms          3.9 days
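
The rescaled column simply multiplies each access time by the factor that stretches one RAM access (120 ns) into one second. A quick sketch of that arithmetic (my own addition, not from the slides):

scale = 1.0 / 120e-9                    # maps 120 ns to 1 s (roughly 8.3 million)
print(50e-6 * scale / 60)               # solid-state disk, 50 µs    -> about 7 minutes
print(150e-6 * scale / 60)              # solid-state disk, 150 µs   -> about 21 minutes
print(1e-3 * scale / 3600)              # rotational disk, 1 ms      -> about 2.3 hours
print(10e-3 * scale / 86400)            # rotational disk, 10 ms     -> about 1 day
print(40e-3 * scale / 86400)            # Internet (SF to NY), 40 ms -> about 3.9 days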



Big data in practical terms
RAM: fast (ns-µs)

Hard disk: slow (µs-ms)

I/O (input/output) is punitive!



Querying Python interpreter's memory usage
import os
import psutil

def memory_footprint():
    '''Returns memory (in MB) being used by Python process'''
    mem = psutil.Process(os.getpid()).memory_info().rss
    return (mem / 1024 ** 2)



Allocating memory for an array
import numpy as np
before = memory_footprint()
N = (1024 ** 2) // 8 # Number of floats that fill 1 MB
x = np.random.randn(50*N) # Random array filling 50 MB
after = memory_footprint()
print('Memory before: {} MB'.format(before))

Memory before: 45.68359375 MB

print('Memory after: {} MB'.format(after))

Memory after: 95.765625 MB



Allocating memory for a computation
before = memory_footprint()
x ** 2 # Computes, but doesn't bind result to a variable

array([ 0.16344891,  0.05993282,  0.53595334, ...,
        0.50537523,  0.48967157,  0.06905984])

after = memory_footprint()
print('Extra memory obtained: {} MB'.format(after - before))

Extra memory obtained: 50.34375 MB



Querying array memory usage
x.nbytes # Memory footprint in bytes (B)

52428800

x.nbytes // (1024**2) # Memory footprint in megabytes (MB)

50



Querying DataFrame memory usage
import pandas as pd
df = pd.DataFrame(x)

df.memory_usage(index=False)

0 52428800
dtype: int64

df.memory_usage(index=False) // (1024**2)

0 50
dtype: int64
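
For this numeric column the byte count is exact; for object (string) columns, pandas reports only pointer sizes unless asked for a deep measurement. A small illustrative sketch (the sample values here are hypothetical):

df_str = pd.DataFrame({'medallion': ['ABC123'] * 1000})      # object-dtype column
print(df_str.memory_usage(index=False))                      # shallow count: pointer sizes only
print(df_str.memory_usage(index=False, deep=True))           # deep count: includes the string payloads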



Let's practice!
Thinking about Data in Chunks

Using pd.read_csv() with chunksize
filename = 'NYC_taxi_2013_01.csv'
for chunk in pd.read_csv(filename, chunksize=50000):
    print('type: %s shape %s' % (type(chunk), chunk.shape))

type: <class 'pandas.core.frame.DataFrame'> shape (50000, 14)


type: <class 'pandas.core.frame.DataFrame'> shape (50000, 14)
type: <class 'pandas.core.frame.DataFrame'> shape (50000, 14)
type: <class 'pandas.core.frame.DataFrame'> shape (49999, 14)



Examining a chunk
chunk.shape

(49999, 14)

chunk.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 49999 entries, 150000 to 199998
Data columns (total 14 columns):
medallion 49999 non-null object
...
dropoff_latitude 49999 non-null float64
dtypes: float64(5), int64(3), object(6)
memory usage: 5.3+ MB



Filtering a chunk
is_long_trip = (chunk.trip_time_in_secs > 1200)

chunk.loc[is_long_trip].shape

(5565, 14)



Chunking & filtering together
def filter_is_long_trip(data):
    "Returns DataFrame filtering trips longer than 20 minutes"
    is_long_trip = (data.trip_time_in_secs > 1200)
    return data.loc[is_long_trip]

chunks = []
for chunk in pd.read_csv(filename, chunksize=1000):
    chunks.append(filter_is_long_trip(chunk))

chunks = [filter_is_long_trip(chunk)
          for chunk in pd.read_csv(filename, chunksize=1000)]



Using pd.concat()
len(chunks)

200

lengths = [len(chunk) for chunk in chunks]


lengths[-5:] # Each has ~100 rows

[115, 147, 137, 109, 119]

long_trips_df = pd.concat(chunks)
long_trips_df.shape

(21661, 14)



Plotting the filtered results
import matplotlib.pyplot as plt
long_trips_df.plot.scatter(x='trip_time_in_secs', y='trip_distance');
plt.xlabel('Trip duration [seconds]');
plt.ylabel('Trip distance [miles]');
plt.title('NYC Taxi rides over 20 minutes (2013-01-01 to 2013-01-14)');
plt.show();



Let's practice!
Managing Data with Generators

Filtering in a list comprehension
import pandas as pd

filename = 'NYC_taxi_2013_01.csv'

def filter_is_long_trip(data):
    "Returns DataFrame filtering trips longer than 20 mins"
    is_long_trip = (data.trip_time_in_secs > 1200)
    return data.loc[is_long_trip]

chunks = [filter_is_long_trip(chunk)
          for chunk in pd.read_csv(filename, chunksize=1000)]



Filtering & summing with generators
chunks = (filter_is_long_trip(chunk)
          for chunk in pd.read_csv(filename, chunksize=1000))
distances = (chunk['trip_distance'].sum() for chunk in chunks)
sum(distances)

230909.56000000003



Examining consumed generators
distances

<generator object <genexpr> at 0x10766f9e8>

next(distances)

StopIteration Traceback (most recent call last)


<ipython-input-10-9995a5373b05> in <module>()
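
Generators are single-pass: once sum(distances) has run, the generator is exhausted and any further next() call raises StopIteration. A small sketch (my addition, not from the slides) showing how the default argument of next() avoids the exception:

distances = (chunk['trip_distance'].sum()
             for chunk in pd.read_csv(filename, chunksize=1000))
total = sum(distances)               # consumes the generator completely
print(next(distances, 'exhausted'))  # returns the default instead of raising StopIteration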



Reading many files
template = 'yellow_tripdata_2015-{:02d}.csv'
filenames = (template.format(k) for k in range(1, 13))  # Generator
for fname in filenames:
    print(fname)  # Examine contents

yellow_tripdata_2015-01.csv
yellow_tripdata_2015-02.csv
yellow_tripdata_2015-03.csv
yellow_tripdata_2015-04.csv
...
yellow_tripdata_2015-09.csv
yellow_tripdata_2015-10.csv
yellow_tripdata_2015-11.csv
yellow_tripdata_2015-12.csv



Examining a sample DataFrame
df = pd.read_csv('yellow_tripdata_2015-12.csv', parse_dates=[1, 2])
df.info()  # Some columns omitted from the output below

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 71634 entries, 0 to 71633
Data columns (total 19 columns):
VendorID 71634 non-null int64
tpep_pickup_datetime 71634 non-null datetime64[ns]
tpep_dropoff_datetime 71634 non-null datetime64[ns]
passenger_count 71634 non-null int64
...
...
dtypes: datetime64[ns](2), float64(12), int64(4), object(1)
memory usage: 10.4+ MB



Examining a sample DataFrame
def count_long_trips(df):
    df['duration'] = (df.tpep_dropoff_datetime -
                      df.tpep_pickup_datetime).dt.seconds
    is_long_trip = df.duration > 1200
    result_dict = {'n_long': [sum(is_long_trip)],
                   'n_total': [len(df)]}
    return pd.DataFrame(result_dict)



Aggregating with Generators
def count_long_trips(df):
    df['duration'] = (df.tpep_dropoff_datetime -
                      df.tpep_pickup_datetime).dt.seconds
    is_long_trip = df.duration > 1200
    result_dict = {'n_long': [sum(is_long_trip)],
                   'n_total': [len(df)]}
    return pd.DataFrame(result_dict)

filenames = [template.format(k) for k in range(1, 13)]  # List comprehension
dataframes = (pd.read_csv(fname, parse_dates=[1, 2])
              for fname in filenames)                   # Generator
totals = (count_long_trips(df) for df in dataframes)    # Generator
annual_totals = sum(totals)                             # Consumes generators



Computing the fraction of long trips
print(annual_totals)

n_long n_total
0 172617 851390

fraction = annual_totals['n_long'] / annual_totals['n_total']


print(fraction)

0 0.202747
dtype: float64



Let's practice!
Delaying Computation with Dask

Composing functions
from math import sqrt

def f(z):
    return sqrt(z + 4)

def g(y):
    return y - 3

def h(x):
    return x ** 2

x = 4
y = h(x)
z = g(y)
w = f(z)
print(w)  # Final result

4.123105625617661

print(f(g(h(x))))  # Equal

4.123105625617661



Deferring computation with `delayed`
from dask import delayed
y = delayed(h)(x)
z = delayed(g)(y)
w = delayed(f)(z)
print(w)

Delayed('f-5f9307e5-eb43-4304-877f-1df5c583c11c')

type(w) # a dask Delayed object

dask.delayed.Delayed

w.compute() # Computation occurs now

4.123105625617661



Visualizing a task graph
w.visualize()
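
The rendered task-graph image is not reproduced here. As a usage note (my addition): visualize() relies on the graphviz package and can also write the rendered graph to disk:

w.visualize(filename='task_graph.png')  # requires graphviz; writes the graph image to a file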



Renaming decorated functions
f = delayed(f)
g = delayed(g)
h = delayed(h)
w = f(g(h(4)))

type(w) # a dask Delayed object

dask.delayed.Delayed

w.compute() # Computation occurs now

4.123105625617661



Using decorator @-notation
def f(x):
    return sqrt(x + 4)
f = delayed(f)

@delayed  # Equivalent to the definition above
def f(x):
    return sqrt(x + 4)



Deferring Computation with Loops
@delayed
def increment(x):
    return x + 1

@delayed
def double(x):
    return 2 * x

@delayed
def add(x, y):
    return x + y

data = [1, 2, 3, 4, 5]
output = []
for x in data:
    a = increment(x)
    b = double(x)
    c = add(a, b)
    output.append(c)

total = sum(output)



Deferring computation with loops 2
total

Delayed('add-c6803f9e890c95cec8e2e3dd3c62b384')

output

[Delayed('add-6a624d8b-8ddb-44fc-b0f0-0957064f54b7'),
Delayed('add-9e779958-f3a0-48c7-a558-ce47fc9899f6'),
Delayed('add-f3552c6f-b09d-4679-a770-a7372e2c278b'),
Delayed('add-ce05d7e9-42ec-4249-9fd3-61989d9a9f7d'),
Delayed('add-dd950ec2-c17d-4e62-a267-1dabe2101bc4')]

total.visualize()



Visualizing the task graph



Aggregating with delayed functions
template = 'yellow_tripdata_2015-{:02d}.csv'
filenames = [template.format(k) for k in range(1, 13)]

@delayed
def count_long_trips(df):
    df['duration'] = (df.tpep_dropoff_datetime -
                      df.tpep_pickup_datetime).dt.seconds
    is_long_trip = df.duration > 1200
    result_dict = {'n_long': [sum(is_long_trip)],
                   'n_total': [len(df)]}
    return pd.DataFrame(result_dict)

@delayed
def read_file(fname):
    return pd.read_csv(fname, parse_dates=[1, 2])



Computing fraction of long trips with `delayed` functions
totals = [count_long_trips(read_file(fname)) for fname in filenames]
annual_totals = sum(totals)
annual_totals = annual_totals.compute()
print(annual_totals)

n_long n_total
0 172617 851390

fraction = annual_totals['n_long']/annual_totals['n_total']
print(fraction)

0 0.202747
dtype: float64
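
Because totals is an ordinary list of Delayed objects, the per-file results can also be materialized in a single call. A brief alternative sketch (my addition, not from the slides):

import dask
per_file = dask.compute(*totals)   # one small DataFrame per month, computed in one pass
annual_totals = sum(per_file)      # same n_long / n_total totals as above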



Let's practice!
