add data ingestion code by vuppalli · Pull Request #1 · leahecole/python-docs-samples · GitHub
add data ingestion code #1


Open · wants to merge 65 commits into base: master
Commits (65) — changes from all commits
92cf763
add data ingestion code
vuppalli Jun 5, 2020
739114a
begin addressing comments
Symmetries Jun 8, 2020
681eaf3
change submit job
vuppalli Jun 8, 2020
4afbf1c
address code structure and global variable issues
Symmetries Jun 8, 2020
744f80c
get dataproc job output and fix linting
vuppalli Jun 8, 2020
8cd7dc6
fix PR comments
vuppalli Jun 9, 2020
81265d2
linting and global vars
vuppalli Jun 9, 2020
3e86bda
address Brad PR comments
vuppalli Jun 10, 2020
580c8e1
broken clean.py
tk744 Jun 11, 2020
4ed5a15
Revert "broken clean.py"
tk744 Jun 11, 2020
e6fe99d
optimize data ingestion
Symmetries Jun 16, 2020
540acaa
fix linting errors
vuppalli Jun 16, 2020
a7e2972
fix minor style issues
Symmetries Jun 16, 2020
3e5ba3b
remove pip from cluster config
Symmetries Jun 19, 2020
2106153
load external datasets from url
Symmetries Jun 26, 2020
9febbad
added dry-run flag
tk744 Jul 7, 2020
5d56b97
dry-run flag
Symmetries Jul 8, 2020
22be5d3
address some review comments
Symmetries Jul 9, 2020
f040542
optimize setup test
Symmetries Jul 14, 2020
55354df
query data in test
Symmetries Jul 15, 2020
5f80974
address live session comments
Symmetries Jul 17, 2020
e883765
add break statement
Symmetries Jul 20, 2020
2ec8b30
revert breaking table and dataset name change
Symmetries Jul 23, 2020
b5ea09e
fix datetime formatting in setup job
Symmetries Aug 4, 2020
213dfca
uncomment commented dataset creation and writing
Symmetries Aug 6, 2020
589568a
add data ingestion code
vuppalli Jun 5, 2020
9148f5b
begin addressing comments
Symmetries Jun 8, 2020
1abf664
change submit job
vuppalli Jun 8, 2020
c600724
address code structure and global variable issues
Symmetries Jun 8, 2020
ce04a6f
get dataproc job output and fix linting
vuppalli Jun 8, 2020
ef2d2b3
fix PR comments
vuppalli Jun 9, 2020
93394a3
linting and global vars
vuppalli Jun 9, 2020
a6fc6e6
address Brad PR comments
vuppalli Jun 10, 2020
1c9f526
broken clean.py
tk744 Jun 11, 2020
327cf5b
Revert "broken clean.py"
tk744 Jun 11, 2020
4bf07ee
optimize data ingestion
Symmetries Jun 16, 2020
8dbd3bc
fix linting errors
vuppalli Jun 16, 2020
4cdd733
fix minor style issues
Symmetries Jun 16, 2020
0769754
remove pip from cluster config
Symmetries Jun 19, 2020
52da79a
load external datasets from url
Symmetries Jun 26, 2020
2ac38ab
added dry-run flag
tk744 Jul 7, 2020
5ead6b2
dry-run flag
Symmetries Jul 8, 2020
3bb0f79
address some review comments
Symmetries Jul 9, 2020
c753ed7
optimize setup test
Symmetries Jul 14, 2020
e0ffb41
query data in test
Symmetries Jul 15, 2020
b0d334b
address live session comments
Symmetries Jul 17, 2020
33afd6c
add break statement
Symmetries Jul 20, 2020
9acb94e
revert breaking table and dataset name change
Symmetries Jul 23, 2020
c97d454
fix datetime formatting in setup job
Symmetries Aug 4, 2020
41406f9
uncomment commented dataset creation and writing
Symmetries Aug 6, 2020
0fcb63e
Merge branch 'master' into data-ingestion
Symmetries Aug 6, 2020
ca3c592
fix import order
Symmetries Aug 7, 2020
cf3aae3
use GOOGLE_CLOUD_PROJECT environment variable
Symmetries Aug 7, 2020
c0dc053
resolve merge issue
Symmetries Aug 7, 2020
5c3df6e
Merge branch 'master' of https://github.com/Symmetries/python-docs-sa…
Symmetries Aug 7, 2020
4a3c941
Merge branch 'master' into data-ingestion
leahecole Aug 10, 2020
dc11440
blacken and add f-strings to dms notation
Symmetries Aug 12, 2020
39b5289
Merge branch 'data-ingestion' of https://github.com/Symmetries/python…
Symmetries Aug 12, 2020
d35b855
change test variables names to match data cleaning
Symmetries Aug 12, 2020
6105f79
blacken setup_test file
Symmetries Aug 12, 2020
35ec8cb
fix unchanged variable name
Symmetries Aug 12, 2020
9561f35
WIP: address PR comments
Symmetries Aug 13, 2020
3242654
apply temporary fix for ANACONDA optional component
Symmetries Aug 13, 2020
b82059b
remove data cleaning files
Symmetries Aug 13, 2020
2f655e3
Merge branch 'master' into data-ingestion
leahecole Aug 13, 2020
29 changes: 0 additions & 29 deletions .gitignore

This file was deleted.

1 change: 1 addition & 0 deletions data-science-onramp/data-ingestion/requirements-test.txt
@@ -0,0 +1 @@
pytest==6.0.0
6 changes: 6 additions & 0 deletions data-science-onramp/data-ingestion/requirements.txt
@@ -0,0 +1,6 @@
#grpcio==1.29.0
#google-auth==1.16.0
#google-auth-httplib2==0.0.3
google-cloud-storage==1.28.1
google-cloud-dataproc==2.0.0
google-cloud-bigquery==1.25.0
211 changes: 211 additions & 0 deletions data-science-onramp/data-ingestion/setup.py
@@ -0,0 +1,211 @@
"""Setup Dataproc job for Data Science Onramp Sample Application
This job ingests an external gas prices in NY dataset as well as
takes a New York Citibike dataset available on BigQuery and
"dirties" the dataset before uploading it back to BigQuery
It needs the following arguments
* the name of the Google Cloud Storage bucket to be used
* the name of the BigQuery dataset to be created
* an optional --test flag to upload a subset of the dataset for testing
"""

import random
import sys

from google.cloud import bigquery
import pandas as pd
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, expr, UserDefinedFunction, when
from pyspark.sql.types import FloatType, StringType, StructField, StructType

TABLE = "bigquery-public-data.new_york_citibike.citibike_trips"
CITIBIKE_TABLE_NAME = "RAW_DATA"
EXTERNAL_TABLES = {
"gas_prices": {
"url": "https://data.ny.gov/api/views/wuxr-ni2i/rows.csv",
"schema": StructType(
[
StructField("Date", StringType(), True),
StructField("New_York_State_Average_USD_per_Gal", FloatType(), True),
StructField("Albany_Average_USD_per_Gal", FloatType(), True),
StructField("Blinghamton_Average_USD_per_Gal", FloatType(), True),
StructField("Buffalo_Average_USD_per_Gal", FloatType(), True),
StructField("Nassau_Average_USD_per_Gal", FloatType(), True),
StructField("New_York_City_Average_USD_per_Gal", FloatType(), True),
StructField("Rochester_Average_USD_per_Gal", FloatType(), True),
StructField("Syracuse_Average_USD_per_Gal", FloatType(), True),
StructField("Utica_Average_USD_per_Gal", FloatType(), True),
]
),
},
}


# START MAKING DATA DIRTY
def trip_duration(duration):
"""Converts trip duration to other units"""
if not duration:
return None
seconds = f"{str(duration)} s"
minutes = f"{str(float(duration) / 60)} min"
hours = f"{str(float(duration) / 3600)} h"
return random.choices(
[seconds, minutes, hours, str(random.randint(-1000, -1))],
weights=[0.3, 0.3, 0.3, 0.1],
)[0]


def station_name(name):
"""Replaces '&' with '/' with a 50% chance"""
if not name:
return None
return random.choice([name, name.replace("&", "/")])


def user_type(user):
"""Manipulates the user type string"""
if not user:
return None
return random.choice(
[
user,
user.upper(),
user.lower(),
"sub" if user == "Subscriber" else user,
"cust" if user == "Customer" else user,
]
)


def gender(s):
"""Manipulates the gender string"""
if not s:
return None
return random.choice(
[
s.upper(),
s.lower(),
s[0].upper() if len(s) > 0 else "",
s[0].lower() if len(s) > 0 else "",
]
)


def convert_angle(angle):
"""Converts long and lat to DMS notation"""
if not angle:
return None
degrees = int(angle)
minutes = int((angle - degrees) * 60)
seconds = int((angle - degrees - minutes / 60) * 3600)
new_angle = f"{degrees}\u00B0{minutes}'{seconds}\""
return random.choices([str(angle), new_angle], weights=[0.55, 0.45])[0]
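# Illustrative example (not part of the PR): for a hypothetical input of
# 40.7128 the conversion yields 40 degrees, 42 minutes, 46 seconds, so the
# function returns either "40.7128" or "40°42'46"" at random.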


def create_bigquery_dataset(dataset_name):
# Create BigQuery Dataset
client = bigquery.Client()
dataset_id = f"{client.project}.{dataset_name}"
dataset = bigquery.Dataset(dataset_id)
dataset.location = "US"
dataset = client.create_dataset(dataset)


def write_to_bigquery(df, table_name, dataset_name):
"""Write a dataframe to BigQuery"""
client = bigquery.Client()
dataset_id = f"{client.project}.{dataset_name}"

# Saving the data to BigQuery
df.write.format("bigquery").option("table", f"{dataset_id}.{table_name}").save()

print(f"Table {table_name} successfully written to BigQuery")


def main():
# Get command line arguments
BUCKET_NAME = sys.argv[1]
DATASET_NAME = sys.argv[2]

# Create a SparkSession under the name "setup"
spark = SparkSession.builder.appName("setup").getOrCreate()

spark.conf.set("temporaryGcsBucket", BUCKET_NAME)

create_bigquery_dataset(DATASET_NAME)

# Whether we are running the job as a test
test = False

# Check whether or not the job is running as a test
if "--test" in sys.argv:
test = True
print("A subset of the whole dataset will be uploaded to BigQuery")
else:
print("Results will be uploaded to BigQuery")

# Ingest External Datasets
for table_name, data in EXTERNAL_TABLES.items():
df = spark.createDataFrame(pd.read_csv(data["url"]), schema=data["schema"])

write_to_bigquery(df, table_name, DATASET_NAME)

# Check if table exists
try:
df = spark.read.format("bigquery").option("table", TABLE).load()
# if we are running a test, perform computations on a subset of the data
if test:
df = df.sample(False, 0.00001)
except Py4JJavaError:
Review comment (Owner): Add a short comment explaining why we'd return if this happens (if I understand correctly, this error will happen when you do the --dry-run option)

print(f"{TABLE} does not exist. ")
return

# Declare dictionary with keys column names and values user defined
# functions and return types
udf_map = {
"tripduration": (trip_duration, StringType()),
"start_station_name": (station_name, StringType()),
"start_station_latitude": (convert_angle, StringType()),
"start_station_longitude": (convert_angle, StringType()),
"end_station_name": (station_name, StringType()),
"end_station_latitude": (convert_angle, StringType()),
"end_station_longitude": (convert_angle, StringType()),
"usertype": (user_type, StringType()),
"gender": (gender, StringType()),
}

# Declare which columns to set some values to null randomly
null_columns = [
"tripduration",
"starttime",
"stoptime",
"start_station_latitude",
"start_station_longitude",
"end_station_latitude",
"end_station_longitude",
]

# Dirty the columns
for name, udf in udf_map.items():
df = df.withColumn(name, UserDefinedFunction(*udf)(name))

# Format the datetimes correctly
for name in ["starttime", "stoptime"]:
df = df.withColumn(name, date_format(name, "yyyy-MM-dd'T'HH:mm:ss"))

# Randomly set about 5% of the values in some columns to null
for name in null_columns:
df = df.withColumn(name, when(expr("rand() < 0.05"), None).otherwise(df[name]))

# Duplicate about 0.01% of the rows
dup_df = df.sample(True, 0.0001)

# Create final dirty dataframe
df = df.union(dup_df)

print("Uploading citibike dataset...")
write_to_bigquery(df, CITIBIKE_TABLE_NAME, DATASET_NAME)


if __name__ == "__main__":
main()
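A minimal sketch of the change requested in the review comment on the Py4JJavaError handler above; the wording of the explanatory comment is an assumption added for illustration and is not part of the PR:

    # Check if table exists
    try:
        df = spark.read.format("bigquery").option("table", TABLE).load()
        # if we are running a test, perform computations on a subset of the data
        if test:
            df = df.sample(False, 0.00001)
    except Py4JJavaError:
        # The Citibike source table is the required input; if it cannot be read
        # (for example, it has not been created yet, as can happen on a dry run),
        # there is nothing to dirty or upload, so exit the job early.
        print(f"{TABLE} does not exist. ")
        return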
9 changes: 9 additions & 0 deletions data-science-onramp/data-ingestion/setup.sh
@@ -0,0 +1,9 @@
# Submit a PySpark job via the Cloud Dataproc Jobs API
Review comment (Collaborator): I would add a comment at the top with something along the lines of "requires having CLUSTER_NAME and BUCKET_NAME set in your environment"

Review comment (Owner): +1

# Requires having CLUSTER_NAME and BUCKET_NAME set as
# environment variables

gcloud dataproc jobs submit pyspark \
--cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
Review comment (Owner): @bradmiro how often would this jar change? I'm worried about this shell script going stale when the jar is updated

Review comment (bradmiro, Collaborator, Jun 9, 2020): Not often at all. I haven't found a reliable way to dynamically call this jar, but transitioning to new Scala versions is not frequent.

--driver-log-levels root=FATAL \
setup.py -- ${BUCKET_NAME} new_york_citibike_trips
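For reference, and related to the jar-version concern in the thread above, the same job can be submitted with the google-cloud-dataproc client library that requirements.txt already pins. This is a rough sketch under assumptions not in this PR: setup.py has been copied to the bucket beforehand, and GOOGLE_CLOUD_PROJECT, REGION, CLUSTER_NAME, and BUCKET_NAME are set in the environment.

# Hypothetical alternative to setup.sh using the Dataproc Python client
# (google-cloud-dataproc 2.x). All names below are placeholders.
import os

from google.cloud import dataproc_v1 as dataproc

project_id = os.environ["GOOGLE_CLOUD_PROJECT"]
region = os.environ["REGION"]
cluster_name = os.environ["CLUSTER_NAME"]
bucket_name = os.environ["BUCKET_NAME"]

# Use the regional endpoint for the Dataproc job controller
job_client = dataproc.JobControllerClient(
    client_options={"api_endpoint": f"{region}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": cluster_name},
    "pyspark_job": {
        # Assumes setup.py was uploaded to the staging bucket beforehand.
        "main_python_file_uri": f"gs://{bucket_name}/setup.py",
        "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
        "args": [bucket_name, "new_york_citibike_trips"],
    },
}

operation = job_client.submit_job_as_operation(
    request={"project_id": project_id, "region": region, "job": job}
)
result = operation.result()  # blocks until the Dataproc job completes
print(f"Job {result.reference.job_id} finished")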