add data ingestion code by vuppalli · Pull Request #1 · leahecole/python-docs-samples

add data ingestion code #1

Open · vuppalli wants to merge 65 commits into base: master

Conversation

vuppalli (Collaborator) commented Jun 5, 2020:

The dirty data script and corresponding test file are in the data-ingestion folder. We are looking forward to your feedback :)

bradmiro (Collaborator) left a comment:

Excellent work! I left some comments here for ya, feel free to add your own comments if anything I suggested didn't make sense.

@@ -0,0 +1,149 @@
from random import choice, choices, randint, seed
bradmiro (Collaborator), Jun 5, 2020:

Since you have several other helper functions that you've written, it might be hard to distinguish between a function imported here and one of your own. I might suggest just doing "import random" and calling "random.func" to make this a bit clearer.
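For reference, a minimal sketch of that style (the values being chosen are illustrative, not the PR's actual data):

import random

random.seed(42)
gender = random.choice(["M", "F", "m", "f"])
maybe_gender = random.choices([None, gender], cum_weights=[0.05, 1])[0]
trip_minutes = random.randint(1, 120)

With the module prefix, it is immediately clear which calls come from the standard library and which are the script's own helpers.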

Comment on lines 22 to 26
try:
sys.argv[2]
upload = False
except IndexError:
print("Results will be uploaded to BigQuery")
Collaborator:

Might be better to replace this by checking the length of the array instead. (if array length > 1 then...)
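A sketch of that check, using the threshold a later revision of this script settles on (len(sys.argv) > 2):

import sys

upload = True
if len(sys.argv) > 2:
    upload = False
else:
    print("Results will be uploaded to BigQuery")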

'''Manipulates the gender string'''
return choice([s, s.upper(), s.lower(),
s[0] if len(s) > 0 else "",
s[0].lower() if len(s) > 0 else ""])
Collaborator:

Check the formatting here, should all be lined up.

Owner:

running black on everything will be a fast format fix

@@ -0,0 +1,6 @@
# Submit a PySpark job via the Cloud Dataproc Jobs API
Collaborator:

I would add a comment at the top with something along the lines of "requires having CLUSTER_NAME and BUCKET_NAME set in your environment"

Owner:

+1

Comment on lines 33 to 38
try:
operation = cluster_client.delete_cluster(project, region,
cluster_name)
operation.result()
except GoogleAPICallError:
pass
Collaborator:

I would not catch this. If it fails it should fail loudly.

Owner:

+1 - catching an exception and just passing is a no-no - even if you want to ignore an error, you should let the user know you're doing something.

Owner:

As an addition - I'm not sure what you're checking for here. Is this the failure that's thrown when it can't delete it because it's already been deleted? If that's the case, add a clarifying comment and instead of the pass, a print statement that says "Cluster already deleted"
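A sketch of what that could look like, assuming the error being swallowed really is the "already deleted" case (NotFound is a subclass of GoogleAPICallError):

from google.api_core.exceptions import NotFound


def delete_cluster(cluster_client, project, region, cluster_name):
    '''Delete the test cluster, reporting (rather than hiding) the already-deleted case.'''
    try:
        operation = cluster_client.delete_cluster(project, region, cluster_name)
        operation.result()
    except NotFound:
        print("Cluster already deleted")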

Comment on lines 136 to 151
result = job_client.submit_job(project_id=project, region=region,
job=job_details)

job_id = result.reference.job_id
print('Submitted job \"{}\".'.format(job_id))

# Wait for job to complete
wait_for_job(job_client, job_id)

# Get job output
cluster_info = cluster_client.get_cluster(project, region, cluster_name)
bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
output_blob = (
'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
.format(cluster_info.cluster_uuid, job_id))
out = bucket.blob(output_blob).download_as_string().decode("utf-8")
Collaborator:

The way to do this has actually JUST been updated about a week ago to be a bit easier to work with. Apologies as I haven't had the chance to update the samples yet. You can use this method

Something like this should work (might need tweaking):

operation = job_client.submit_job_as_operation(project_id=project, region=region,
                                               job=job_details)

# This will wait for the job to finish before continuing.
result = operation.result()

output_location = result.driver_output_resource_uri + ".000000000"
output = bucket.blob(output_location).download_as_string().decode("utf-8")

Comment on lines 190 to 210
def callback(operation_future):
'''Sets a flag to stop waiting'''
global waiting_cluster_callback
waiting_cluster_callback = False


def wait_for_cluster_creation():
'''Waits for cluster to create'''
while True:
if not waiting_cluster_callback:
break


def wait_for_job(job_client, job_id):
'''Waits for job to finish'''
while True:
job = job_client.get_job(project, region, job_id)
assert job.status.State.Name(job.status.state) != "ERROR"

if job.status.State.Name(job.status.state) == "DONE":
return
Collaborator:

Per my earlier comment, you can delete this.

pass


def test_setup(capsys):
Collaborator:

All of your setup code should go into your teardown function above (perhaps renamed to something like "setup_teardown"). Based on wherever you put the yield, all code above it will run BEFORE executing the test, and all code after it will run AFTER. See here: dataproc_quickstart

In your code, I would create the cluster and GCS buckets in your setup/teardown function. I would leave the test function itself to only include elements of the test that involve submitting the actual job.
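A rough sketch of such a fixture, assuming the pre-2.0 google-cloud-dataproc client used elsewhere in this PR (the cluster config is trimmed to a placeholder, not the PR's real config):

import os
import uuid

import pytest
from google.cloud import dataproc_v1 as dataproc

PROJECT = os.environ['GCLOUD_PROJECT']
REGION = 'us-central1'
CLUSTER_NAME = f'setup-test-{uuid.uuid4()}'


@pytest.fixture(autouse=True)
def setup_teardown():
    cluster_client = dataproc.ClusterControllerClient(client_options={
        'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'
    })
    cluster_data = {
        'project_id': PROJECT,
        'cluster_name': CLUSTER_NAME,
        'config': {},  # fill in the gce/master/worker config from the real test
    }

    # Everything above the yield runs before the test: create the cluster and
    # block until it is provisioned.
    operation = cluster_client.create_cluster(PROJECT, REGION, cluster_data)
    operation.result()

    yield

    # Everything after the yield runs once the test finishes: tear the cluster down.
    operation = cluster_client.delete_cluster(PROJECT, REGION, CLUSTER_NAME)
    operation.result()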

Owner:

ah yep - I see Brad called this out already 👍

Comment on lines 105 to 110
cluster = cluster_client.create_cluster(project, region, cluster_data)
cluster.add_done_callback(callback)

# Wait for cluster to provision
global waiting_cluster_callback
waiting_cluster_callback = True
Collaborator:

The docs are slightly misleading, as you can actually just do the following to accomplish the same thing:

operation = cluster_client.create_cluster(project, region, cluster_data)
result = operation.result()  # This is blocking and will not proceed with the rest of the code until completion.

Owner:

I defer to Brad on this for how to actually do it, but I'd like to see this accomplished without a global

Collaborator:

This should be ok to be removed altogether

Comment on lines 83 to 95
def dirty_data(proc_func, allow_none):
'''Master function returns a user defined function
that transforms the column data'''
def udf(col_value):
seed(hash(col_value) + time_ns())
if col_value is None:
return col_value
elif allow_none:
return random_select([None, proc_func(col_value)],
cum_weights=[0.05, 1])
else:
return proc_func(col_value)
return udf
Collaborator:

Does this require having a nested function?

Collaborator (Author):

We have a nested function here because a User Defined Function can only take 1 argument (the column value). If we did not have this nested function, we would have repetitive code calling the UDF for each column, or an unwieldy list comprehension. Which option do you think is best?

leahecole (Owner) left a comment:

Awesome first pass - it's great to have you coding and to see what you've come up with. Also, pytest fixtures are NOT easy so def give yourselves a pat on the back for getting fixtures that run at all! :)

I think that setup.py can probably be reorganized in a way that makes the test much simpler to run. I'd like to see your code in setup.py laid out as follows (a rough sketch follows this list):

  • global variables at the top
  • helper functions, probably should be private functions (preceded by an underscore in their name)
  • some kind of main function, though it doesn't have to be called main, that goes through and executes everything needed to dirty the data - the BQ checks you have at the beginning, the actual data dirtying, and the saving of the results.
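A rough skeleton of that layout (helper names are borrowed from this PR; bodies elided):

# Global variables at the top
DATASET_NAME = 'data_science_onramp'
RAW_TABLE_NAME = 'new_york_citibike_trips'


# Private helper functions
def _dirty_data(proc_func, allow_none):
    ...


def _convert_angle(angle):
    ...


# One entry point that runs the BigQuery checks, dirties the data,
# and saves the results
def main():
    ...


if __name__ == '__main__':
    main()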

Then, when you are testing, you will have

  • fixture to create/teardown the dataproc cluster

  • fixture to create/teardown the gcs bucket (possibly including uploading what you need to upload)

  • test that executes your "main" function (or whatever it's called) and calls assertions on the output

  • Nit - setup-test.py should be renamed setup_test.py to match the conventions of the rest of the upstream repo

project = os.environ['GCLOUD_PROJECT']
region = "us-central1"
zone = "us-central1-a"
cluster_name = 'setup-test-{}'.format(str(uuid.uuid4()))
Owner:

Nit - for the next two lines, f-strings are now preferred Python practice over .format.
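For example, the cluster name above could become:

import uuid

cluster_name = 'setup-test-{}'.format(str(uuid.uuid4()))   # current
cluster_name = f'setup-test-{uuid.uuid4()}'                # preferred f-string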

Owner:

BUT YAY FOR UUIDS


# Set global variables
project = os.environ['GCLOUD_PROJECT']
region = "us-central1"
Owner:

Should we consider making the region + zone environment variables as well? Or include a TODO for users to update these to reflect their project? Not all folks are US based.

@bradmiro I know some products just straight up aren't available in other regions/zones, is Dataproc one?

Collaborator:

I've personally always hard-coded regions into tests as these don't typically make their way into tutorials. For the cluster to be used in the tutorial, I think a set of steps showing how to create a Dataproc cluster in a preferred region is appropriate.

Collaborator:

I believe Dataproc is available in any region that Compute instances are available.


waiting_cluster_callback = False

# Set global variables
Owner:

Python style thing - global variables should be all caps (project -> PROJECT, region -> REGION, zone -> ZONE)



@pytest.fixture(autouse=True)
def teardown():
Owner:

This fixture only does teardown, but I think it should be renamed and actually should be a setup and a teardown fixture, with cluster creation happening above the yield, and the ID or name of the created cluster (or maybe the operation?) being passed to the test


s[0].lower() if len(s) > 0 else ""])


def convertAngle(angle):
Owner:

nit - snake case convertAngle -> convert_angle

new_angle = str(degrees) + u"\u00B0" + \
str(minutes) + "'" + str(seconds) + '"'
return random_select([str(angle), new_angle], cum_weights=[0.55, 1])

Owner:

nit spell out cumulative


return choice([name, name.replace("&", "/")])


def usertype(user):
Owner:

nit - snake case usertype -> user_type

vuppalli (Collaborator, Author) commented Jun 8, 2020:

Thank you for these comments! The setup.py file is never run locally (only on the dataproc clusters) so we cannot call the main function specifically because we submit the entire file as a job. But, we can still restructure the setup.py file into multiple functions for easier readability.

leahecole (Owner) left a comment:

I focused more on the testing and am leaving some of the Spark stuff up to Brad for now

.gitignore Outdated
@@ -27,3 +27,4 @@ credentials.dat
.DS_store
Owner:

No committing the gitignore plz

return udf


def id(x):
Owner:

nit - if this is the bike id, can we call it bike_id? id is very generic otherwise

Collaborator:

I would also add that there should be a brief comment explaining why a function that just returns its input value is necessary.


gcloud dataproc jobs submit pyspark \
--cluster ${CLUSTER_NAME} \
--jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
Owner:

@bradmiro how often would this jar change? I'm worried about this shell script going stale when the jar is updated

bradmiro (Collaborator), Jun 9, 2020:

Not often at all. I haven't found a reliable way to dynamically call this jar, but transitioning to new Scala versions is not frequent.

storage_client = storage.Client()
BUCKET = storage_client.create_bucket(BUCKET_NAME)

yield
Owner:

instead of using a global variable BUCKET, you can yield the bucket itself, then pass the fixture to the test as an argument. So, in line 87 you'd have

bucket = storage_client.create_bucket(BUCKET_NAME)
yield bucket

then in the test itself, you'd call
def test_setup(capsys, setup_and_teardown_bucket)

and replace every other instance of BUCKET with setup_and_teardown_bucket.

That said, it might be worth renaming the fixture to test_bucket or something similar that you like, since that's what you're yielding.
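A fuller sketch of that pattern (the fixture name follows the suggestion above; the bucket name and upload step are illustrative stand-ins for the existing test's values):

import uuid

import pytest
from google.cloud import storage

BUCKET_NAME = f'setup-test-{uuid.uuid4()}'


@pytest.fixture
def test_bucket():
    storage_client = storage.Client()
    bucket = storage_client.create_bucket(BUCKET_NAME)

    yield bucket  # the test receives the bucket object itself

    bucket.delete(force=True)


def test_setup(capsys, test_bucket):
    # Use the yielded bucket wherever the global BUCKET was used before.
    blob = test_bucket.blob('setup.py')
    blob.upload_from_filename('setup.py')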

BUCKET = None


@pytest.fixture(autouse=True)
Owner:

From what I can tell, this fixture works without yielding anything, but you could also get rid of the autouse and instead yield the cluster name, which you'd then call as an argument in the definition of the test function (See below comment for the GCS example)

Afaik, it's the same result, different method. Not experienced enough with pytest to know what best practice in this case is.

Collaborator (Author):

It looks like we need to yield to separate the setup code from the teardown code for the cluster. And, we do not need to yield the cluster name because it is a global variable that uses a uuid.

assert "null" in out


def get_blob_from_path(path):
Owner:

Is this a helper function? If so, it should be at the top

df = spark.read.format('bigquery').option('table', TABLE).load()
except Py4JJavaError:
print(f"{TABLE} does not exist. ")
sys.exit(0)
Collaborator:

You can replace sys.exit(0) with return to achieve the same effect, albeit potentially more gracefully, now that you've wrapped this into a function.
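A sketch of that shape (the function name and the spark/table parameters are assumptions based on the snippet above):

from py4j.protocol import Py4JJavaError


def load_table(spark, table):
    '''Return the BigQuery table as a DataFrame, or None if it does not exist.'''
    try:
        return spark.read.format('bigquery').option('table', table).load()
    except Py4JJavaError:
        # Returning instead of sys.exit(0) lets the caller decide what to do next.
        print(f"{table} does not exist.")
        return None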

Comment on lines 148 to 151
new_df.sample(False, 0.0001, seed=50).show(n=100)

# Duplicate about 0.01% of the rows
dup_df = new_df.sample(True, 0.0001, seed=42)
Collaborator:

What's the intention behind hard-coding the seeds?

Collaborator:

There's no intention, we just thought it might be better for this to be deterministic. We'll remove them in the next revision since there is no reason to hard-code them.

'cluster_name': CLUSTER_NAME,
'config': {
'gce_cluster_config': {
'zone_uri': zone_uri,
Collaborator:

You can use 'zone_uri': '' to have Dataproc automatically select a zone. This is generally preferred unless you have a reason for needing to use a particular zone.
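A sketch of the suggested config (PROJECT and CLUSTER_NAME are placeholders for the values defined earlier in the test):

PROJECT = 'your-project-id'
CLUSTER_NAME = 'setup-test-cluster'

cluster_data = {
    'project_id': PROJECT,
    'cluster_name': CLUSTER_NAME,
    'config': {
        'gce_cluster_config': {
            'zone_uri': '',  # empty string lets Dataproc pick the zone automatically
        },
    },
}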

Comment on lines 73 to 75
cluster_client = dataproc.ClusterControllerClient(client_options={
'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'
})
Collaborator:

This will persist from above, you don't need it twice.


# Create cluster using cluster client
cluster_client = dataproc.ClusterControllerClient(client_options={
'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
Collaborator:

nit: use an f-string

Comment on lines 177 to 181
def get_blob_from_path(path):
bucket_name = re.search("dataproc.+?/", path).group(0)[0:-1]
bucket = storage.Client().get_bucket(bucket_name)
output_location = re.search("google-cloud-dataproc.+", path).group(0)
return bucket.blob(output_location)
Collaborator:

I would either inline this in the section of the code where you reference it, or move it up to the top of the test file.

'''Tests setup.py by submitting it to a dataproc cluster'''

# Upload file
destination_blob_name = "setup.py"
Collaborator:

Move to top-level as a "final" capitalized field

Comment on lines 102 to 103
blob = BUCKET.blob(destination_blob_name)
blob.upload_from_filename("setup.py")
Collaborator:

I would do this in your setup steps

Comment on lines 105 to 122
job_file_name = "gs://" + BUCKET_NAME + "/setup.py"

# Create job configuration
job_details = {
'placement': {
'cluster_name': CLUSTER_NAME
},
'pyspark_job': {
'main_python_file_uri': job_file_name,
'args': [
BUCKET_NAME,
"--test",
],
"jar_file_uris": [
"gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
],
},
}
Collaborator:

I would move this to the top level as a "final" variable, or make a function "get_job" that just returns this
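For instance, a sketch of the "get_job" version (the argument names are illustrative; the config mirrors the dict above):

def get_job(bucket_name, cluster_name):
    '''Return the Dataproc job config for the setup.py PySpark job.'''
    return {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': f'gs://{bucket_name}/setup.py',
            'args': [
                bucket_name,
                '--test',
            ],
            'jar_file_uris': [
                'gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar'
            ],
        },
    }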

}

# Submit job to dataproc cluster
job_client = dataproc.JobControllerClient(client_options={
Collaborator:

When I write tests, I personally prefer that the tests have as little code as possible in them. I think if you start the code here and move everything either into "final" variables or into your setup function, it ends up being a bit more succinct.

Owner:

TBH I don't think this test has much code in it - most of it is asserts, which we could cut down on if you like.

bradmiro (Collaborator), Jun 10, 2020:

I think the test is now fine as is in relation to this specific thread. My original point was more towards moving configs and such out of the file.

degrees = int(angle)
minutes = int((angle - degrees) * 60)
seconds = int((angle - degrees - minutes/60) * 3600)
new_angle = str(degrees) + u"\u00B0" + \
Collaborator:

In Python 3, all non-byte strings are Unicode. You can safely change u"\u00b0" to "\u00b0".

# Declare data transformations for each column in dataframe
udfs = [
(dirty_data(trip_duration, True), StringType()), # tripduration
(dirty_data(identity, True), StringType()), # starttime
Collaborator:

Unless the identity function is used elsewhere within the job, you can replace it by using a lambda function _ = lambda x: x that you declare just before you create udfs. You can then replace every instance of identity with _.

Collaborator (Author):

Unfortunately, when I run nox -s lint, the linter complains about lambda functions and says to use def instead. What do you think I should do?

bradmiro (Collaborator), Jun 10, 2020:

(echoing my comment from standup) You should instead be able to inline the lambdas: (dirty_data(lambda x: x, True), StringType()),

Owner:

I know we talked about this at standup but I forgot to mention for future reference, if Brad or I don't have a good answer, this is a great thing to ask at Python Samples Office Hours! Or to ask me to ask the other samples owners if samples office hours are a while away.

Comment on lines 1 to 12
import random
import sys

from time import time_ns

from google.cloud import bigquery

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType, StringType

Comment on lines 98 to 101
spark.conf.set('temporaryGcsBucket', BUCKET_NAME)

df.write.format('bigquery') \
.option('table', dataset_id + ".RAW_DATA") \
Collaborator:

nit: you can move the temporary bucket into the write operation to avoid editing the Spark conf:

df.write.format("bigquery") \
    .option("table", dataset_id + ".RAW_DATA") \
    .option("temporaryGcsBucket", BUCKET_NAME) \
    .save()


bucket.delete(force=True)


def get_blob_from_path(path):
Owner:

Why is this function needed to get the bucket_name when you can yield it from the fixture?

Collaborator (Author):

This function is actually getting a different bucket (the Dataproc job output). Also, it is here to convert the URL into a blob so that it can be downloaded as a string. The bucket created from the fixture is used to upload our script.

yield

# Delete cluster
operation = cluster_client.delete_cluster(PROJECT, REGION,
Owner:

What happens if the cluster isn't found? You may need to add a try/except here. @bradmiro plz chime in if there's a best practice

Collaborator:

I typically just let this fail loudly. In any instance where the cluster isn't properly deleted, it is usually indicative of other problems.

assert re.search("[0-9] h", out)

# station latitude & longitude
assert re.search(u"\u00B0" + "[0-9]+\'[0-9]+\"", out)
Collaborator:

nit: remove the unicode u

@@ -46,12 +43,6 @@
'num_instances': 6,
Collaborator:

Does the runtime change drastically if you change this to 4 or 8?

Comment on lines 111 to 120
def print_df(df, table_name):
'''Print 20 rows from dataframe and a random sample'''
# first 100 rows for smaller tables
df.show()

# random sample for larger tables
# for small tables this will be empty
df.sample(True, 0.0001).show(n=500, truncate=False)

print(f"Table {table_name} printed")
Collaborator (Author):

Do you think printing the external datasets as well as the dirty one will affect some assert statements in our test script?

Owner:

As long as what you're asserting is in out somewhere, I don't think it should matter

@@ -91,12 +127,25 @@ def main():
upload = True # Whether to upload data to BigQuery

# Check whether or not results should be uploaded
if len(sys.argv) > 2:
if '--test' in sys.argv:
Collaborator (Author):

I think we changed this to --dry-run?

Owner:

+1

RAW_TABLE_NAME = "RAW_DATA"
DATASET_NAME = "data_science_onramp"
RAW_TABLE_NAME = "new_york_citibike_trips"
EXTERNAL_DATASETS = {
Collaborator (Author):

Instead of having an external datasets dictionary, can we just call it datasets and group in the citibike one too? Asking because I see that there is some repetitive code below (e.g. if upload: write_to_bigquery ...). We might need to make some tweaks if we do this because it does not have a URL, so I'm not sure if it's worth it. I'm curious to see what everyone thinks!

@@ -118,6 +125,10 @@ def test_setup():
blob = get_blob_from_path(output_location)
out = blob.download_as_string().decode("utf-8")

# check that tables were printed
for table_name in TABLE_NAMES:
assert table_printed(table_name, out)
Collaborator (Author):

Does this need to be its own function if it's only being used once? Can we not do assert re.search(f"Table {table_name} printed", out)?

@@ -118,6 +125,10 @@ def test_setup():
blob = get_blob_from_path(output_location)
out = blob.download_as_string().decode("utf-8")

# check that tables were printed
for table_name in TABLE_NAMES:
assert table_printed(table_name, out)
Collaborator (Author):

Is there another way to test that the table actually printed instead of checking that the print statement you added is present? There could be a possibility where the table actually did not print but your print statement did, which is not a sufficient check.

Owner:

If the df.show prints column names, you could check for those


# Check if table exists
try:
df = spark.read.format('bigquery').option('table', TABLE).load()
except Py4JJavaError:
Owner:

Add a short comment explaining why we'd return if this happens (if I understand correctly, this error will happen when you do the --dry-run option)

# Create final dirty dataframe
df = df.union(dup_df)

if upload:
Owner:

What's the difference between this upload-and-write-to-BigQuery and the other one?

You may want to consider also making this a function because it's repeated code.
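For example, a small helper along these lines could be shared by both write paths (parameter names are illustrative; the options mirror the connector usage already in this file):

def write_to_bigquery(df, dataset_id, table_name, temp_bucket):
    '''Write a Spark DataFrame to BigQuery via a temporary GCS bucket.'''
    df.write.format('bigquery') \
        .option('table', f'{dataset_id}.{table_name}') \
        .option('temporaryGcsBucket', temp_bucket) \
        .save()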


bradmiro (Collaborator):
This needs a rebase (I can help with this). Can we rebase and then merge?
