diff --git a/.gitignore b/.gitignore
deleted file mode 100644
index c827e035649..00000000000
--- a/.gitignore
+++ /dev/null
@@ -1,29 +0,0 @@
-.coveralls.yml
-*.pyc
-.coverage
-.tox
-.pytest_cache
-.ipynb_checkpoints
-.executed_notebooks
-coverage.xml
-python-docs-samples.json
-service-account.json
-client-secrets.json
-__pycache__
-*db\.sqlite3
-managed_vms/django_tutorial/static/*
-**/migrations/*
-lib
-testing/resources/test-env.sh
-testing/resources/service-account.json
-testing/resources/client-secrets.json
-secrets.tar
-.cache
-junit.xml
-credentials.dat
-.nox
-.vscode/
-*sponge_log.xml
-.DS_store
-env/
-.idea
diff --git a/data-science-onramp/data-ingestion/requirements-test.txt b/data-science-onramp/data-ingestion/requirements-test.txt
new file mode 100644
index 00000000000..2018c08113a
--- /dev/null
+++ b/data-science-onramp/data-ingestion/requirements-test.txt
@@ -0,0 +1 @@
+pytest==6.0.0
diff --git a/data-science-onramp/data-ingestion/requirements.txt b/data-science-onramp/data-ingestion/requirements.txt
new file mode 100644
index 00000000000..b5edbdf1ad7
--- /dev/null
+++ b/data-science-onramp/data-ingestion/requirements.txt
@@ -0,0 +1,6 @@
+#grpcio==1.29.0
+#google-auth==1.16.0
+#google-auth-httplib2==0.0.3
+google-cloud-storage==1.28.1
+google-cloud-dataproc==2.0.0
+google-cloud-bigquery==1.25.0
diff --git a/data-science-onramp/data-ingestion/setup.py b/data-science-onramp/data-ingestion/setup.py
new file mode 100644
index 00000000000..6921947ddca
--- /dev/null
+++ b/data-science-onramp/data-ingestion/setup.py
@@ -0,0 +1,211 @@
+"""Setup Dataproc job for the Data Science Onramp sample application.
+This job ingests an external dataset of New York gas prices, as well as
+the New York Citibike dataset available on BigQuery, and "dirties" the
+Citibike dataset before uploading it back to BigQuery.
+It takes the following arguments:
+* the name of the Google Cloud Storage bucket to be used
+* the name of the BigQuery dataset to be created
+* an optional --test flag to upload a subset of the dataset for testing
+"""
+
+import random
+import sys
+
+from google.cloud import bigquery
+import pandas as pd
+from py4j.protocol import Py4JJavaError
+from pyspark.sql import SparkSession
+from pyspark.sql.functions import date_format, expr, UserDefinedFunction, when
+from pyspark.sql.types import FloatType, StringType, StructField, StructType
+
+TABLE = "bigquery-public-data.new_york_citibike.citibike_trips"
+CITIBIKE_TABLE_NAME = "RAW_DATA"
+EXTERNAL_TABLES = {
+    "gas_prices": {
+        "url": "https://data.ny.gov/api/views/wuxr-ni2i/rows.csv",
+        "schema": StructType(
+            [
+                StructField("Date", StringType(), True),
+                StructField("New_York_State_Average_USD_per_Gal", FloatType(), True),
+                StructField("Albany_Average_USD_per_Gal", FloatType(), True),
+                StructField("Binghamton_Average_USD_per_Gal", FloatType(), True),
+                StructField("Buffalo_Average_USD_per_Gal", FloatType(), True),
+                StructField("Nassau_Average_USD_per_Gal", FloatType(), True),
+                StructField("New_York_City_Average_USD_per_Gal", FloatType(), True),
+                StructField("Rochester_Average_USD_per_Gal", FloatType(), True),
+                StructField("Syracuse_Average_USD_per_Gal", FloatType(), True),
+                StructField("Utica_Average_USD_per_Gal", FloatType(), True),
+            ]
+        ),
+    },
+}
+
+
+# START MAKING DATA DIRTY
+def trip_duration(duration):
+    """Converts trip duration to other units"""
+    if not duration:
+        return None
+    seconds = f"{str(duration)} s"
+    minutes = f"{str(float(duration) / 60)} min"
+    hours = f"{str(float(duration) / 3600)} h"
+    return random.choices(
+        [seconds, minutes, hours, str(random.randint(-1000, -1))],
+        weights=[0.3, 0.3, 0.3, 0.1],
+    )[0]
+
+
+def station_name(name):
+    """Replaces '&' with '/' with a 50% chance"""
+    if not name:
+        return None
+    return random.choice([name, name.replace("&", "/")])
+
+
+def user_type(user):
+    """Manipulates the user type string"""
+    if not user:
+        return None
+    return random.choice(
+        [
+            user,
+            user.upper(),
+            user.lower(),
+            "sub" if user == "Subscriber" else user,
+            "cust" if user == "Customer" else user,
+        ]
+    )
+
+
+def gender(s):
+    """Manipulates the gender string"""
+    if not s:
+        return None
+    return random.choice(
+        [
+            s.upper(),
+            s.lower(),
+            s[0].upper() if len(s) > 0 else "",
+            s[0].lower() if len(s) > 0 else "",
+        ]
+    )
+
+
+def convert_angle(angle):
+    """Converts long and lat to DMS notation"""
+    if not angle:
+        return None
+    degrees = int(angle)
+    minutes = int((angle - degrees) * 60)
+    seconds = int((angle - degrees - minutes / 60) * 3600)
+    new_angle = f"{degrees}\u00B0{minutes}'{seconds}\""
+    return random.choices([str(angle), new_angle], weights=[0.55, 0.45])[0]
+
+
+def create_bigquery_dataset(dataset_name):
+    # Create BigQuery Dataset
+    client = bigquery.Client()
+    dataset_id = f"{client.project}.{dataset_name}"
+    dataset = bigquery.Dataset(dataset_id)
+    dataset.location = "US"
+    dataset = client.create_dataset(dataset)
+
+
+def write_to_bigquery(df, table_name, dataset_name):
+    """Write a dataframe to BigQuery"""
+    client = bigquery.Client()
+    dataset_id = f"{client.project}.{dataset_name}"
+
+    # Saving the data to BigQuery
+    df.write.format("bigquery").option("table", f"{dataset_id}.{table_name}").save()
+
+    print(f"Table {table_name} successfully written to BigQuery")
+
+
+def main():
+    # Get command line arguments
+    BUCKET_NAME = sys.argv[1]
+    DATASET_NAME = sys.argv[2]
+
+    # Create a SparkSession under the name "setup"
+    spark = SparkSession.builder.appName("setup").getOrCreate()
+
+    spark.conf.set("temporaryGcsBucket", BUCKET_NAME)
+
+    create_bigquery_dataset(DATASET_NAME)
+
+    # Whether we are running the job as a test
+    test = False
+
+    # Check whether or not the job is running as a test
+    if "--test" in sys.argv:
+        test = True
+        print("A subset of the whole dataset will be uploaded to BigQuery")
+    else:
+        print("Results will be uploaded to BigQuery")
+
+    # Ingest External Datasets
+    for table_name, data in EXTERNAL_TABLES.items():
+        df = spark.createDataFrame(pd.read_csv(data["url"]), schema=data["schema"])
+
+        write_to_bigquery(df, table_name, DATASET_NAME)
+
+    # Check if table exists
+    try:
+        df = spark.read.format("bigquery").option("table", TABLE).load()
+        # if we are running a test, perform computations on a subset of the data
+        if test:
+            df = df.sample(False, 0.00001)
+    except Py4JJavaError:
+        print(f"{TABLE} does not exist.")
+        return
+
+    # Map each column name to the user-defined function and return type
+    # used to dirty it
+    udf_map = {
+        "tripduration": (trip_duration, StringType()),
+        "start_station_name": (station_name, StringType()),
+        "start_station_latitude": (convert_angle, StringType()),
+        "start_station_longitude": (convert_angle, StringType()),
+        "end_station_name": (station_name, StringType()),
+        "end_station_latitude": (convert_angle, StringType()),
+        "end_station_longitude": (convert_angle, StringType()),
+        "usertype": (user_type, StringType()),
+        "gender": (gender, StringType()),
+    }
+
+    # Declare which columns to set some values to null randomly
+    null_columns = [
+        "tripduration",
+        "starttime",
+        "stoptime",
+        "start_station_latitude",
+        "start_station_longitude",
+        "end_station_latitude",
+        "end_station_longitude",
+    ]
+
+    # Dirty the columns
+    for name, udf in udf_map.items():
+        df = df.withColumn(name, UserDefinedFunction(*udf)(name))
+
+    # Format the datetimes correctly
+    for name in ["starttime", "stoptime"]:
+        df = df.withColumn(name, date_format(name, "yyyy-MM-dd'T'HH:mm:ss"))
+
+    # Randomly set about 5% of the values in some columns to null
+    for name in null_columns:
+        df = df.withColumn(name, when(expr("rand() < 0.05"), None).otherwise(df[name]))
+
+    # Duplicate about 0.01% of the rows
+    dup_df = df.sample(True, 0.0001)
+
+    # Create final dirty dataframe
+    df = df.union(dup_df)
+
+    print("Uploading citibike dataset...")
+    write_to_bigquery(df, CITIBIKE_TABLE_NAME, DATASET_NAME)
+
+
+if __name__ == "__main__":
+    main()
diff --git a/data-science-onramp/data-ingestion/setup.sh b/data-science-onramp/data-ingestion/setup.sh
new file mode 100755
index 00000000000..2c4773f7272
--- /dev/null
+++ b/data-science-onramp/data-ingestion/setup.sh
@@ -0,0 +1,9 @@
+# Submit a PySpark job via the Cloud Dataproc Jobs API
+# Requires having CLUSTER_NAME and BUCKET_NAME set as
+# environment variables
+
+gcloud dataproc jobs submit pyspark \
+    --cluster ${CLUSTER_NAME} \
+    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
+    --driver-log-levels root=FATAL \
+    setup.py -- ${BUCKET_NAME} new_york_citibike_trips
diff --git a/data-science-onramp/data-ingestion/setup_test.py b/data-science-onramp/data-ingestion/setup_test.py
new file mode 100644
index 00000000000..c325b5a0e98
--- /dev/null
+++ b/data-science-onramp/data-ingestion/setup_test.py
@@ -0,0 +1,212 @@
+"""Test file for the setup job in the Data Science Onramp sample application.
+Creates a test Dataproc cluster and runs the job with a --test flag.
+The job uploads a subset of the data to BigQuery.
+The test then pulls the data from BigQuery and checks that it was dirtied.
+"""
+
+import os
+import re
+import uuid
+
+from google.cloud import bigquery
+from google.cloud import dataproc_v1 as dataproc
+from google.cloud import storage
+import pytest
+
+# GCP Project
+PROJECT_ID = os.environ["GOOGLE_CLOUD_PROJECT"]
+TEST_ID = uuid.uuid4()
+
+# Google Cloud Storage constants
+BUCKET_NAME = f"setup-test-{TEST_ID}"
+BUCKET_BLOB = "setup.py"
+
+BQ_DATASET = f"setup-test-{TEST_ID}".replace("-", "_")
+BQ_CITIBIKE_TABLE = "RAW_DATA"
+BQ_TABLES = [
+    BQ_CITIBIKE_TABLE,
+    "gas_prices",
+]
+
+# Dataproc constants
+DATAPROC_CLUSTER = f"setup-test-{TEST_ID}"
+CLUSTER_REGION = "us-central1"
+CLUSTER_IMAGE = "1.5.4-debian10"
+CLUSTER_CONFIG = {  # Dataproc cluster configuration
+    "project_id": PROJECT_ID,
+    "cluster_name": DATAPROC_CLUSTER,
+    "config": {
+        "gce_cluster_config": {"zone_uri": ""},
+        "master_config": {"num_instances": 1, "machine_type_uri": "n1-standard-8"},
+        "worker_config": {"num_instances": 6, "machine_type_uri": "n1-standard-8"},
+        "software_config": {
+            "image_version": CLUSTER_IMAGE,
+            "optional_components": [5],
+        },
+    },
+}
+DATAPROC_JOB = {  # Dataproc job configuration
+    "placement": {"cluster_name": DATAPROC_CLUSTER},
+    "pyspark_job": {
+        "main_python_file_uri": f"gs://{BUCKET_NAME}/{BUCKET_BLOB}",
+        "args": [BUCKET_NAME, BQ_DATASET, "--test"],
+        "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
+    },
+}
+
+
+@pytest.fixture(autouse=True)
+def setup_and_teardown_cluster():
+    # Create cluster using cluster client
+    cluster_client = dataproc.ClusterControllerClient(
+        client_options={"api_endpoint": f"{CLUSTER_REGION}-dataproc.googleapis.com:443"}
+    )
+
+    operation = cluster_client.create_cluster(
+        project_id=PROJECT_ID, region=CLUSTER_REGION, cluster=CLUSTER_CONFIG
+    )
+
+    # Wait for cluster to provision
+    operation.result()
+
+    yield
+
+    # Delete cluster
+    operation = cluster_client.delete_cluster(
+        project_id=PROJECT_ID, region=CLUSTER_REGION, cluster_name=DATAPROC_CLUSTER
+    )
+    operation.result()
+
+
+@pytest.fixture(autouse=True)
+def setup_and_teardown_bucket():
+    # Create GCS Bucket
+    storage_client = storage.Client()
+    bucket = storage_client.create_bucket(BUCKET_NAME)
+
+    # Upload file
+    blob = bucket.blob(BUCKET_BLOB)
+    blob.upload_from_filename("setup.py")
+
+    yield
+
+    # Delete GCS bucket
+    bucket = storage_client.get_bucket(BUCKET_NAME)
+    bucket.delete(force=True)
+
+
+@pytest.fixture(autouse=True)
+def setup_and_teardown_bq_dataset():
+    # The dataset is created by the setup job; this client only handles teardown
+    bq_client = bigquery.Client(project=PROJECT_ID)
+
+    yield
+
+    # Delete Dataset
+    bq_client.delete_dataset(BQ_DATASET, delete_contents=True)
+
+
+def get_blob_from_path(path):
+    bucket_name = re.search("dataproc.+?/", path).group(0)[0:-1]
+    bucket = storage.Client().get_bucket(bucket_name)
+    output_location = re.search("google-cloud-dataproc.+", path).group(0)
+    return bucket.blob(output_location)
+
+
+def get_dataproc_job_output(result):
+    """Get the dataproc job logs in plain text"""
+    output_location = result.driver_output_resource_uri + ".000000000"
+    blob = get_blob_from_path(output_location)
+    return blob.download_as_string().decode("utf-8")
+
+
+def assert_table_success_message(table_name, out):
+    """Check table upload success message was printed in job logs."""
+    assert re.search(
+        f"Table {table_name} successfully written to BigQuery", out
+    ), f"Table {table_name} success message not printed in job logs"
+
+
+def test_setup():
+    """Test setup.py by submitting it to a Dataproc cluster.
+    Check the table upload success messages in the job logs as well as
+    the data in the table itself."""
+
+    # Submit job to dataproc cluster
+    job_client = dataproc.JobControllerClient(
+        client_options={"api_endpoint": f"{CLUSTER_REGION}-dataproc.googleapis.com:443"}
+    )
+    response = job_client.submit_job_as_operation(
+        project_id=PROJECT_ID, region=CLUSTER_REGION, job=DATAPROC_JOB
+    )
+
+    # Wait for job to complete
+    result = response.result()
+
+    # Get job output
+    out = get_dataproc_job_output(result)
+
+    # Check logs to see if tables were uploaded
+    for table_name in BQ_TABLES:
+        assert_table_success_message(table_name, out)
+
+    # Query BigQuery Table
+    client = bigquery.Client()
+
+    dms_regex = "-?[0-9]+\u00B0-?[0-9]+'-?[0-9]+\""
+
+    regex_dict = {
+        "tripduration": [
+            "(\\d+(?:\\.\\d+)?) s",
+            "(\\d+(?:\\.\\d+)?) min",
+            "(\\d+(?:\\.\\d+)?) h",
+        ],
+        "gender": [
+            "f",
+            "F",
+            "m",
+            "M",
+            "u",
+            "U",
+            "male",
+            "MALE",
+            "female",
+            "FEMALE",
+            "unknown",
+            "UNKNOWN",
+        ],
+        "start_station_latitude": [dms_regex],
+        "start_station_longitude": [dms_regex],
+        "end_station_latitude": [dms_regex],
+        "end_station_longitude": [dms_regex],
+        "usertype": [
+            "Subscriber",
+            "subscriber",
+            "SUBSCRIBER",
+            "sub",
+            "Customer",
+            "customer",
+            "CUSTOMER",
+            "cust",
+        ],
+    }
+
+    for column_name, regexes in regex_dict.items():
+        query = (
+            f"SELECT {column_name} FROM `{PROJECT_ID}.{BQ_DATASET}.{BQ_CITIBIKE_TABLE}`"
+        )
+        query_job = client.query(query)
+
+        result = query_job.result()
+
+        rows = []
+        for row in result:
+            rows.append(row[column_name])
+
+        for regex in regexes:
+            found = False
+            for row in rows:
+                if row and re.match(f"\\A{regex}\\Z", row):
+                    found = True
+                    break
+            assert (
+                found
+            ), f'No matches to regular expression "{regex}" found in column {column_name}'
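
A minimal usage sketch for the files added above, assuming gcloud is authenticated against a project with the Dataproc, BigQuery, and Cloud Storage APIs enabled; the cluster, bucket, and project names below are placeholders, and setup.sh additionally assumes the Dataproc cluster already exists.

# Submit the ingestion job to an existing cluster (wraps the gcloud command in setup.sh)
export CLUSTER_NAME=example-cluster          # placeholder cluster name
export BUCKET_NAME=example-staging-bucket    # placeholder staging bucket
bash setup.sh

# Run the integration test; it provisions its own cluster, bucket, and BigQuery dataset
export GOOGLE_CLOUD_PROJECT=example-project  # placeholder project ID
pip install -r requirements.txt -r requirements-test.txt
pytest setup_test.py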