add data ingestion code #1
This file was deleted.

@@ -0,0 +1 @@
pytest==6.0.0
@@ -0,0 +1,6 @@
#grpcio==1.29.0
#google-auth==1.16.0
#google-auth-httplib2==0.0.3
google-cloud-storage==1.28.1
google-cloud-dataproc==2.0.0
google-cloud-bigquery==1.25.0
@@ -0,0 +1,211 @@
"""Setup Dataproc job for Data Science Onramp Sample Application

This job ingests an external New York gas prices dataset, as well as a
New York Citibike dataset available on BigQuery, and "dirties" the
Citibike dataset before uploading it back to BigQuery.

It takes the following arguments:
* the name of the Google Cloud Storage bucket to be used
* the name of the BigQuery dataset to be created
* an optional --test flag to upload a subset of the dataset for testing
"""

import random
import sys

from google.cloud import bigquery
import pandas as pd
from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, expr, UserDefinedFunction, when
from pyspark.sql.types import FloatType, StringType, StructField, StructType

TABLE = "bigquery-public-data.new_york_citibike.citibike_trips"
CITIBIKE_TABLE_NAME = "RAW_DATA"
EXTERNAL_TABLES = {
    "gas_prices": {
        "url": "https://data.ny.gov/api/views/wuxr-ni2i/rows.csv",
        "schema": StructType(
            [
                StructField("Date", StringType(), True),
                StructField("New_York_State_Average_USD_per_Gal", FloatType(), True),
                StructField("Albany_Average_USD_per_Gal", FloatType(), True),
                StructField("Blinghamton_Average_USD_per_Gal", FloatType(), True),
                StructField("Buffalo_Average_USD_per_Gal", FloatType(), True),
                StructField("Nassau_Average_USD_per_Gal", FloatType(), True),
                StructField("New_York_City_Average_USD_per_Gal", FloatType(), True),
                StructField("Rochester_Average_USD_per_Gal", FloatType(), True),
                StructField("Syracuse_Average_USD_per_Gal", FloatType(), True),
                StructField("Utica_Average_USD_per_Gal", FloatType(), True),
            ]
        ),
    },
}

# START MAKING DATA DIRTY
def trip_duration(duration):
    """Converts trip duration to other units"""
    if not duration:
        return None
    seconds = f"{duration} s"
    minutes = f"{float(duration) / 60} min"
    hours = f"{float(duration) / 3600} h"
    return random.choices(
        [seconds, minutes, hours, str(random.randint(-1000, -1))],
        weights=[0.3, 0.3, 0.3, 0.1],
    )[0]
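# For example, trip_duration(360) returns one of "360 s", "6.0 min",
# "0.1 h", or a random negative integer string such as "-42"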


def station_name(name):
    """Replaces '&' with '/' with a 50% chance"""
    if not name:
        return None
    return random.choice([name, name.replace("&", "/")])


def user_type(user):
    """Manipulates the user type string"""
    if not user:
        return None
    return random.choice(
        [
            user,
            user.upper(),
            user.lower(),
            "sub" if user == "Subscriber" else user,
            "cust" if user == "Customer" else user,
        ]
    )


def gender(s):
    """Manipulates the gender string"""
    if not s:
        return None
    return random.choice(
        [
            s.upper(),
            s.lower(),
            s[0].upper(),  # s is guaranteed non-empty by the guard above
            s[0].lower(),
        ]
    )


def convert_angle(angle):
    """Converts long and lat to DMS notation"""
    if not angle:
        return None
    degrees = int(angle)
    minutes = int((angle - degrees) * 60)
    seconds = int((angle - degrees - minutes / 60) * 3600)
    new_angle = f"{degrees}\u00B0{minutes}'{seconds}\""
    return random.choices([str(angle), new_angle], weights=[0.55, 0.45])[0]
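# For example, convert_angle(40.7128) yields either "40.7128" (55% of
# the time) or the DMS string 40°42'46" (45% of the time)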


def create_bigquery_dataset(dataset_name):
    """Create a BigQuery dataset"""
    client = bigquery.Client()
    dataset_id = f"{client.project}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = "US"
    dataset = client.create_dataset(dataset)
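    # Note: create_dataset raises google.api_core.exceptions.Conflict if
    # the dataset already exists; exists_ok=True would tolerate reruns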


def write_to_bigquery(df, table_name, dataset_name):
    """Write a dataframe to BigQuery"""
    client = bigquery.Client()
    dataset_id = f"{client.project}.{dataset_name}"

    # Saving the data to BigQuery
    df.write.format("bigquery").option("table", f"{dataset_id}.{table_name}").save()

    print(f"Table {table_name} successfully written to BigQuery")


def main():
    # Get command line arguments
    BUCKET_NAME = sys.argv[1]
    DATASET_NAME = sys.argv[2]

    # Create a SparkSession under the name "setup"
    spark = SparkSession.builder.appName("setup").getOrCreate()

    spark.conf.set("temporaryGcsBucket", BUCKET_NAME)

    create_bigquery_dataset(DATASET_NAME)

    # Check whether the job is running as a test
    test = "--test" in sys.argv
    if test:
        print("A subset of the whole dataset will be uploaded to BigQuery")
    else:
        print("Results will be uploaded to BigQuery")

    # Ingest External Datasets
    for table_name, data in EXTERNAL_TABLES.items():
        df = spark.createDataFrame(pd.read_csv(data["url"]), schema=data["schema"])

        write_to_bigquery(df, table_name, DATASET_NAME)

    # Check if the Citibike table exists; the BigQuery connector surfaces
    # read failures (e.g. a missing table) as a Py4JJavaError, in which
    # case there is nothing to dirty, so bail out early
    try:
        df = spark.read.format("bigquery").option("table", TABLE).load()
        # If we are running a test, perform computations on a subset of the data
        if test:
            df = df.sample(False, 0.00001)
    except Py4JJavaError:
        print(f"{TABLE} does not exist.")
        return

    # Map each column name to the UDF that dirties it, together with the
    # UDF's return type
    udf_map = {
        "tripduration": (trip_duration, StringType()),
        "start_station_name": (station_name, StringType()),
        "start_station_latitude": (convert_angle, StringType()),
        "start_station_longitude": (convert_angle, StringType()),
        "end_station_name": (station_name, StringType()),
        "end_station_latitude": (convert_angle, StringType()),
        "end_station_longitude": (convert_angle, StringType()),
        "usertype": (user_type, StringType()),
        "gender": (gender, StringType()),
    }

    # Declare which columns to set some values to null randomly
    null_columns = [
        "tripduration",
        "starttime",
        "stoptime",
        "start_station_latitude",
        "start_station_longitude",
        "end_station_latitude",
        "end_station_longitude",
    ]

    # Dirty the columns
    for name, udf in udf_map.items():
        df = df.withColumn(name, UserDefinedFunction(*udf)(name))

    # Format the datetimes correctly
    for name in ["starttime", "stoptime"]:
        df = df.withColumn(name, date_format(name, "yyyy-MM-dd'T'HH:mm:ss"))

    # Randomly set about 5% of the values in some columns to null
    for name in null_columns:
        df = df.withColumn(name, when(expr("rand() < 0.05"), None).otherwise(df[name]))

    # Duplicate about 0.01% of the rows
    dup_df = df.sample(True, 0.0001)

    # Create final dirty dataframe
    df = df.union(dup_df)

    print("Uploading citibike dataset...")
    write_to_bigquery(df, CITIBIKE_TABLE_NAME, DATASET_NAME)


if __name__ == "__main__":
    main()
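A quick way to sanity-check the upload once the job finishes (a sketch, assuming google-cloud-bigquery is installed, application-default credentials are configured, and the dataset name new_york_citibike_trips from the submit script below):

from google.cloud import bigquery

client = bigquery.Client()
# RAW_DATA is the CITIBIKE_TABLE_NAME constant used in setup.py above
table = client.get_table(f"{client.project}.new_york_citibike_trips.RAW_DATA")
print(f"{table.full_table_id} contains {table.num_rows} rows")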

@@ -0,0 +1,9 @@
# Submit a PySpark job via the Cloud Dataproc Jobs API

# Requires having CLUSTER_NAME and BUCKET_NAME set as
# environment variables

gcloud dataproc jobs submit pyspark \
    --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
    --driver-log-levels root=FATAL \
    setup.py -- ${BUCKET_NAME} new_york_citibike_trips

Review comment: I would add a comment at the top with something along the lines of "requires having CLUSTER_NAME and BUCKET_NAME set in your environment"
Reply: +1

Review comment (on the --jars line): @bradmiro how often would this jar change? I'm worried about this shell script going stale when the jar is updated
Reply: Not often at all. I haven't found a reliable way to dynamically call this jar, but transitioning to new Scala versions is not frequent.
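A minimal usage sketch (the script filename and resource names below are hypothetical):

# assuming the snippet above is saved as submit_job.sh:
export CLUSTER_NAME=my-cluster
export BUCKET_NAME=my-bucket
bash submit_job.sh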
Review comment: Add a short comment explaining why we'd return if this happens (if I understand correctly, this error will happen when you do the --dry-run option)