add data ingestion code #1
Conversation
Excellent work! I left some comments here for ya, feel free to add your own comments if anything I suggested didn't make sense.
@@ -0,0 +1,149 @@
from random import choice, choices, randint, seed
Since you have several other helper functions that you've written, it might be harder to distinguish between a function imported here and one of the ones you've written. I might suggest just doing "import random" and calling "random.func" to make this a bit clearer.
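For example (illustrative calls only, not lines from this PR):

import random

# stdlib calls are now visibly namespaced, distinct from the local helpers
value = random.choice(["a", "b", "c"])
weighted = random.choices(["x", "y"], cum_weights=[0.5, 1])
random.seed(42)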
try:
    sys.argv[2]
    upload = False
except IndexError:
    print("Results will be uploaded to BigQuery")
Might be better to replace this by checking the length of the array instead. (if array length > 1 then...)
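A sketch of that check, matching the behaviour of the excerpt above (treat it as illustrative):

import sys

# results are uploaded unless an extra command-line argument was passed
upload = len(sys.argv) <= 2
if upload:
    print("Results will be uploaded to BigQuery")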
'''Manipulates the gender string'''
return choice([s, s.upper(), s.lower(),
               s[0] if len(s) > 0 else "",
               s[0].lower() if len(s) > 0 else ""])
Check the formatting here, should all be lined up.
running `black` on everything will be a fast format fix
@@ -0,0 +1,6 @@
# Submit a PySpark job via the Cloud Dataproc Jobs API
I would add a comment at the top with something along the lines of "requires having CLUSTER_NAME and BUCKET_NAME set in your environment"
+1
try:
    operation = cluster_client.delete_cluster(project, region,
                                              cluster_name)
    operation.result()
except GoogleAPICallError:
    pass
I would not catch this. If it fails it should fail loudly.
+1 - catching an exception and just passing is a no-no - even if you want to ignore an error, you should let the user know you're doing something.
As an addition - I'm not sure what you're checking for here. Is this the failure that's thrown when it can't delete it because it's already been deleted? If that's the case, add a clarifying comment and instead of the pass, a print statement that says "Cluster already deleted"
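A sketch of that suggestion, assuming the intent is to tolerate an already-deleted cluster (NotFound is the usual exception for that case; adjust if the actual failure mode is different; client and config names come from this test file):

from google.api_core.exceptions import NotFound

try:
    operation = cluster_client.delete_cluster(project, region, cluster_name)
    operation.result()
except NotFound:
    # the cluster was already removed; nothing left to clean up
    print("Cluster already deleted")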
result = job_client.submit_job(project_id=project, region=region,
                               job=job_details)

job_id = result.reference.job_id
print('Submitted job \"{}\".'.format(job_id))

# Wait for job to complete
wait_for_job(job_client, job_id)

# Get job output
cluster_info = cluster_client.get_cluster(project, region, cluster_name)
bucket = storage_client.get_bucket(cluster_info.config.config_bucket)
output_blob = (
    'google-cloud-dataproc-metainfo/{}/jobs/{}/driveroutput.000000000'
    .format(cluster_info.cluster_uuid, job_id))
out = bucket.blob(output_blob).download_as_string().decode("utf-8")
The way to do this has actually JUST been updated about a week ago to be a bit easier to work with. Apologies as I haven't had the chance to update the samples yet. You can use this method
Something like this should work (might need tweaking):
operation = job_client.submit_job_as_operation(project_id=project, region=region,
                                               job=job_details)

# This will wait for the job to finish before continuing.
result = operation.result()

output_location = result.driver_output_resource_uri + ".000000000"
output = bucket.blob(output_location).download_as_string().decode("utf-8")
def callback(operation_future):
    '''Sets a flag to stop waiting'''
    global waiting_cluster_callback
    waiting_cluster_callback = False


def wait_for_cluster_creation():
    '''Waits for cluster to create'''
    while True:
        if not waiting_cluster_callback:
            break


def wait_for_job(job_client, job_id):
    '''Waits for job to finish'''
    while True:
        job = job_client.get_job(project, region, job_id)
        assert job.status.State.Name(job.status.state) != "ERROR"

        if job.status.State.Name(job.status.state) == "DONE":
            return
Per my earlier comment, you can delete this.
pass


def test_setup(capsys):
All of your setup code should go into your `teardown` function above (perhaps renamed to something like "setup_teardown"). Based on wherever you put the `yield`, all code above it will run BEFORE executing the test, and all code after it will run AFTER. See here: dataproc_quickstart

In your code, I would create the cluster and GCS buckets in your setup/teardown function. I would leave the test function itself to only include elements of the test that involve submitting the actual job.
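A rough sketch of that shape, reusing names that already exist in this test file (cluster_client, cluster_data, cluster_name, BUCKET_NAME); illustrative only, not a drop-in:

import pytest

from google.cloud import storage


@pytest.fixture(autouse=True)
def setup_teardown():
    # Setup: everything before the yield runs BEFORE the test.
    operation = cluster_client.create_cluster(project, region, cluster_data)
    operation.result()  # block until the cluster is provisioned

    bucket = storage.Client().create_bucket(BUCKET_NAME)
    bucket.blob("setup.py").upload_from_filename("setup.py")

    yield  # the test itself runs here

    # Teardown: everything after the yield runs AFTER the test.
    bucket.delete(force=True)
    operation = cluster_client.delete_cluster(project, region, cluster_name)
    operation.result()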
ah yep - I see Brad called this out already 👍
cluster = cluster_client.create_cluster(project, region, cluster_data)
cluster.add_done_callback(callback)

# Wait for cluster to provision
global waiting_cluster_callback
waiting_cluster_callback = True
The docs are slightly misleading, as you can actually just do the following to accomplish the same thing:
operation = cluster_client.create_cluster(project, region, cluster_data)
result = operation.result() #This is blocking and will not proceed with the rest of the code until completion.
I defer to Brad on this for how to actually do it, but I'd like to see this accomplished without a global
This should be ok to be removed altogether
def dirty_data(proc_func, allow_none):
    '''Master function returns a user defined function
    that transforms the column data'''
    def udf(col_value):
        seed(hash(col_value) + time_ns())
        if col_value is None:
            return col_value
        elif allow_none:
            return random_select([None, proc_func(col_value)],
                                 cum_weights=[0.05, 1])
        else:
            return proc_func(col_value)
    return udf
Does this require having a nested function?
We have a nested function here because a User Defined Function can only take 1 argument (the column value). If we do not have this nested function, we will have repetitive code of calling the UDF for each column or have an unwieldy list comprehension. Which option do you think is best?
Awesome first pass - it's great to have you coding and to see what you've come up with. Also, pytest fixtures are NOT easy so def give yourselves a pat on the back for getting fixtures that run at all! :)

I think that setup.py can probably be reorganized in a way that makes the test much simpler to run. I'd like to see your code in setup.py laid out as follows (see the sketch after this list):
- global variables at the top
- helper functions, which should probably be private functions (preceded by an underscore in their name)
- some kind of main function, though it doesn't have to be called main, that goes through and executes everything needed to dirty the data - the BQ checks you have at the beginning, the actual data dirtying, and the saving of the results

Then, when you are testing, you will have:
- a fixture to create/teardown the dataproc cluster
- a fixture to create/teardown the gcs bucket (possibly including uploading what you need to upload)
- a test that executes your "main" function (or whatever it's called) and calls assertions on the output

Nit - setup-test.py should be renamed setup_test.py to match the conventions of the rest of the upstream repo.
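A skeleton of that layout might look like this (every name and body here is a placeholder):

# setup.py

# 1. Global variables at the top
BUCKET_NAME = None  # resolved from command-line arguments in main()


# 2. Private helper functions
def _dirty_data(proc_func, allow_none):
    '''Placeholder for one of the existing helpers.'''
    ...


# 3. One entry point that runs the BQ checks, dirties the data, and saves the results
def main():
    ...


if __name__ == "__main__":
    main()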
project = os.environ['GCLOUD_PROJECT']
region = "us-central1"
zone = "us-central1-a"
cluster_name = 'setup-test-{}'.format(str(uuid.uuid4()))
Nit - for the next two lines, f-strings are now preferred Python practice over .format.
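For example, the cluster name line above becomes:

cluster_name = f'setup-test-{uuid.uuid4()}'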
BUT YAY FOR UUIDS
# Set global variables
project = os.environ['GCLOUD_PROJECT']
region = "us-central1"
Should we consider making the region + zone environment variables as well? Or include a TODO for users to update these to reflect their project? Not all folks are US based.
@bradmiro I know some products just straight up aren't available in other regions/zones, is Dataproc one?
I've personally always hard-coded regions into tests as these don't typically make their way into tutorials. For the cluster to be used in the tutorial, I think a set of steps showing how to create a Dataproc cluster in a preferred region is appropriate.
I believe Dataproc is available in any region that Compute instances are available.
waiting_cluster_callback = False

# Set global variables
Python style thing - global variables should be all caps (project -> PROJECT, region -> REGION, zone -> ZONE)
@pytest.fixture(autouse=True)
def teardown():
This fixture only does teardown, but I think it should be renamed and actually should be a setup and a teardown fixture, with cluster creation happening above the yield, and the ID or name of the created cluster (or maybe the operation?) being passed to the test
               s[0].lower() if len(s) > 0 else ""])


def convertAngle(angle):
nit - snake case convertAngle -> convert_angle
new_angle = str(degrees) + u"\u00B0" + \
    str(minutes) + "'" + str(seconds) + '"'
return random_select([str(angle), new_angle], cum_weights=[0.55, 1])
nit - spell out cumulative
return choice([name, name.replace("&", "/")])


def usertype(user):
nit - snake case usertype -> user_type
Thank you for these comments! The setup.py file is never run locally (only on the dataproc clusters) so we cannot call the main function specifically because we submit the entire file as a job. But, we can still restructure the setup.py file into multiple functions for easier readability.
I focused more on the testing and am leaving some of the Spark stuff up to Brad for now
.gitignore (Outdated)
@@ -27,3 +27,4 @@ credentials.dat
.DS_store
No committing the gitignore plz
return udf


def id(x):
nit - if this is bike_id, can we call it bike_id? id is very generic otherwise
I would also add that there should be a brief comment of why a function that just returns its input value is necessary.
gcloud dataproc jobs submit pyspark \
    --cluster ${CLUSTER_NAME} \
    --jars gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar \
@bradmiro how often would this jar change? I'm worried about this shell script going stale when the jar is updated
Not often at all. I haven't found a reliable way to dynamically call this jar, but transitioning to new Scala versions is not frequent.
storage_client = storage.Client()
BUCKET = storage_client.create_bucket(BUCKET_NAME)

yield
Instead of using a global variable `BUCKET`, you can yield the bucket itself, then pass the fixture to the test as an argument. So, in line 87 you'd have

bucket = storage_client.create_bucket(BUCKET_NAME)
yield bucket

then in the test itself, you'd call

def test_setup(capsys, setup_and_teardown_bucket)

and replace every other instance of `BUCKET` with `setup_and_teardown_bucket`.

That said, it might be worth renaming the fixture to `test_bucket` or something similar that you like, since that's what you're yielding.
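A sketch of that pattern, assuming the fixture is renamed test_bucket as suggested (the names are illustrative):

import pytest
from google.cloud import storage


@pytest.fixture
def test_bucket():
    '''Creates a scratch bucket for the test and removes it afterwards.'''
    storage_client = storage.Client()
    bucket = storage_client.create_bucket(BUCKET_NAME)

    yield bucket  # the test receives the bucket object directly

    bucket.delete(force=True)


def test_setup(capsys, test_bucket):
    # use test_bucket wherever the module previously referenced BUCKET
    blob = test_bucket.blob("setup.py")
    blob.upload_from_filename("setup.py")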
BUCKET = None


@pytest.fixture(autouse=True)
From what I can tell, this fixture works without yielding anything, but you could also get rid of the autouse and instead yield the cluster name, which you'd then call as an argument in the definition of the test function (See below comment for the GCS example)
Afaik, it's the same result, different method. Not experienced enough with pytest to know what best practice in this case is.
It looks like we need to yield to separate the setup code from the teardown code for the cluster. And, we do not need to yield the cluster name because it is a global variable that uses a uuid.
assert "null" in out | ||
|
||
|
||
def get_blob_from_path(path): |
Is this a helper function? If so, it should be at the top
    df = spark.read.format('bigquery').option('table', TABLE).load()
except Py4JJavaError:
    print(f"{TABLE} does not exist. ")
    sys.exit(0)
You can replace `sys.exit(0)` with `return` to achieve the same effect, albeit potentially more gracefully, now that you've wrapped this into a function.
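In context that looks roughly like this (names taken from the excerpt above; the fragment sits inside a function):

# Check if table exists; leave the function early if it does not
try:
    df = spark.read.format('bigquery').option('table', TABLE).load()
except Py4JJavaError:
    print(f"{TABLE} does not exist. ")
    return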
new_df.sample(False, 0.0001, seed=50).show(n=100)

# Duplicate about 0.01% of the rows
dup_df = new_df.sample(True, 0.0001, seed=42)
What's the intention behind hard-coding the seeds?
There's no intention, we just thought it might be better for this to be deterministic. We'll remove them in the next revision since there is no reason to hard-code them.
'cluster_name': CLUSTER_NAME,
'config': {
    'gce_cluster_config': {
        'zone_uri': zone_uri,
You can use `'zone_uri': ''` to have Dataproc automatically select a zone. This is generally preferred unless you have a reason for needing to use a particular zone.
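Applied to the config above, that would look something like this (other config fields omitted):

cluster_data = {
    'cluster_name': CLUSTER_NAME,
    'config': {
        'gce_cluster_config': {
            # an empty string lets Dataproc pick a zone within the region
            'zone_uri': '',
        },
        # ... rest of the existing config unchanged ...
    },
}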
cluster_client = dataproc.ClusterControllerClient(client_options={
    'api_endpoint': f'{REGION}-dataproc.googleapis.com:443'
})
This will persist from above, you don't need it twice.
# Create cluster using cluster client
cluster_client = dataproc.ClusterControllerClient(client_options={
    'api_endpoint': '{}-dataproc.googleapis.com:443'.format(REGION)
nit: use f string
def get_blob_from_path(path):
    bucket_name = re.search("dataproc.+?/", path).group(0)[0:-1]
    bucket = storage.Client().get_bucket(bucket_name)
    output_location = re.search("google-cloud-dataproc.+", path).group(0)
    return bucket.blob(output_location)
I would either inline this in the section of the code where you reference it, or move it up to the top of the test file.
'''Tests setup.py by submitting it to a dataproc cluster'''

# Upload file
destination_blob_name = "setup.py"
Move to top-level as a "final" capitalized field
blob = BUCKET.blob(destination_blob_name)
blob.upload_from_filename("setup.py")
I would do this in your setup steps
job_file_name = "gs://" + BUCKET_NAME + "/setup.py"

# Create job configuration
job_details = {
    'placement': {
        'cluster_name': CLUSTER_NAME
    },
    'pyspark_job': {
        'main_python_file_uri': job_file_name,
        'args': [
            BUCKET_NAME,
            "--test",
        ],
        "jar_file_uris": [
            "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
        ],
    },
}
I would move this to the top level as a "final" variable, or make a function "get_job" that just returns this
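For instance, a hypothetical get_job helper (name taken from the comment above) could return the config so the test body stays short:

def get_job(bucket_name, cluster_name):
    '''Builds the Dataproc job config for submitting setup.py.'''
    return {
        'placement': {'cluster_name': cluster_name},
        'pyspark_job': {
            'main_python_file_uri': f"gs://{bucket_name}/setup.py",
            'args': [bucket_name, "--test"],
            'jar_file_uris': [
                "gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"
            ],
        },
    }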
}

# Submit job to dataproc cluster
job_client = dataproc.JobControllerClient(client_options={
When I write tests, I personally prefer that the tests have as little code as possible in them. I think if you start the code here and move everything either into "final" variables or in your setup function, it ends up being a bit more succinct.
TBH I don't think this test has much code in it - most of it is asserts, which we could cut down on if you like.
I think the test is now fine as is in relation to this specific thread. My original point was more towards moving configs and such out of the file.
degrees = int(angle)
minutes = int((angle - degrees) * 60)
seconds = int((angle - degrees - minutes/60) * 3600)
new_angle = str(degrees) + u"\u00B0" + \
In Python 3, all non-byte strings are unicode. You can safely change `u"\u00B0"` to `"\u00B0"`.
# Declare data transformations for each column in dataframe
udfs = [
    (dirty_data(trip_duration, True), StringType()),  # tripduration
    (dirty_data(identity, True), StringType()),  # starttime
Unless the `identity` function is used elsewhere within the job, you can replace it by using a lambda function `_ = lambda x: x` that you declare just before you create `udfs`. You can then replace every instance of `identity` with `_`.
Unfortunately, when I run `nox -s lint`, the linter complains about lambda functions and says to use def instead. What do you think I should do?
(echoing my comment from standup) You should be able to inline the lambdas instead: `(dirty_data(lambda x: x, True), StringType()),`
I know we talked about this at standup but I forgot to mention for future reference, if Brad or I don't have a good answer, this is a great thing to ask at Python Samples Office Hours! Or to ask me to ask the other samples owners if samples office hours are awhile away.
import random
import sys

from time import time_ns

from google.cloud import bigquery

from py4j.protocol import Py4JJavaError
from pyspark.sql import SparkSession

from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType, StringType
nit: linting imports - https://www.python.org/dev/peps/pep-0008/#imports
spark.conf.set('temporaryGcsBucket', BUCKET_NAME)

df.write.format('bigquery') \
    .option('table', dataset_id + ".RAW_DATA") \
nit: you can move the temporary bucket into the `write` operation to avoid editing the Spark conf:

df.write.format("bigquery") \
    .option("table", dataset_id + ".RAW_DATA") \
    .option("temporaryGcsBucket", BUCKET_NAME) \
    .save()
bucket.delete(force=True)


def get_blob_from_path(path):
Why is this function needed to get the bucket_name when you can yield it from the fixture?
This function is actually getting a different bucket (the Dataproc job output). Also, it is here to convert the URL into a blob so that it can be downloaded as a string. The bucket created from the fixture is used to upload our script.
yield

# Delete cluster
operation = cluster_client.delete_cluster(PROJECT, REGION,
What happens if the cluster isn't found? You may need to add a try/except here. @bradmiro plz chime in if there's a best practice
I typically just let this fail loudly. In any instance where the cluster isn't properly deleted, it is usually indicative of other problems.
assert re.search("[0-9] h", out)

# station latitude & longitude
assert re.search(u"\u00B0" + "[0-9]+\'[0-9]+\"", out)
nit: remove the unicode u
@@ -46,12 +43,6 @@
'num_instances': 6,
Does the runtime change drastically if you change this to 4 or 8?
def print_df(df, table_name):
    '''Print 20 rows from dataframe and a random sample'''
    # first 100 rows for smaller tables
    df.show()

    # random sample for larger tables
    # for small tables this will be empty
    df.sample(True, 0.0001).show(n=500, truncate=False)

    print(f"Table {table_name} printed")
Do you think printing the external datasets as well as the dirty one will affect some assert statements in our test script?
As long as what you're asserting is in `out` somewhere, I don't think it should matter.
@@ -91,12 +127,25 @@ def main():
upload = True  # Whether to upload data to BigQuery

# Check whether or not results should be uploaded
if len(sys.argv) > 2:
    if '--test' in sys.argv:
I think we changed this to --dry-run?
+1
RAW_TABLE_NAME = "RAW_DATA"
DATASET_NAME = "data_science_onramp"
RAW_TABLE_NAME = "new_york_citibike_trips"
EXTERNAL_DATASETS = {
Instead of having an external datasets dictionary, can we just call it datasets and group in the citibike one too? Asking because I see that there is some repetitive code below (e.g. `if upload: write_to_bigquery ...`). We might need to make some tweaks if we do this because it does not have a URL, so not sure if it's worth it. I'm curious to see what everyone thinks!
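A rough sketch of that grouping, where a None entry marks a table that already lives in BigQuery; the loader helper and the sample URL are hypothetical:

def _load_dataframe(table_name, url):
    '''Hypothetical helper: fetch from the URL, or read the existing BigQuery table when url is None.'''
    ...


DATASETS = {
    # table name -> external source URL (None means the table is already in BigQuery)
    "new_york_citibike_trips": None,
    "citibike_stations": "https://example.com/stations.csv",  # hypothetical entry
}

for table_name, url in DATASETS.items():
    df = _load_dataframe(table_name, url)
    if upload:
        write_to_bigquery(df, table_name)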
@@ -118,6 +125,10 @@ def test_setup():
blob = get_blob_from_path(output_location)
out = blob.download_as_string().decode("utf-8")

# check that tables were printed
for table_name in TABLE_NAMES:
    assert table_printed(table_name, out)
Does this need to be its own function if it's only being used once? Can we not just do `assert re.search(f"Table {table_name} printed", out)`?
@@ -118,6 +125,10 @@ def test_setup():
blob = get_blob_from_path(output_location)
out = blob.download_as_string().decode("utf-8")

# check that tables were printed
for table_name in TABLE_NAMES:
    assert table_printed(table_name, out)
Is there another way to test that the table actually printed instead of checking that the print statement you added is present? There could be a possibility where the table actually did not print but your print statement did which is not a sufficient check.
If the df.show prints column names, you could check for those
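For example, something along these lines (the column names are illustrative guesses at what df.show() would print):

# df.show() prints a header row with the column names, so finding them in the
# captured driver output is stronger evidence than checking our own print statement
for column_name in ["tripduration", "starttime", "gender"]:
    assert column_name in out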
# Check if table exists
try:
    df = spark.read.format('bigquery').option('table', TABLE).load()
except Py4JJavaError:
Add a short comment explaining why we'd return if this happens (if I understand correctly, this error will happen when you do the `--dry-run` option).
# Create final dirty dataframe
df = df.union(dup_df)

if upload:
What's the difference between this upload-and-write-to-BigQuery block and the other one? You may want to consider making this a function because it's repeated code.
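A sketch of that refactor, assuming a shared helper; the helper name and the module-level names are taken from elsewhere in this thread, so treat it as illustrative:

def write_to_bigquery(df, table_name):
    '''Writes a dataframe to BigQuery; shared by both upload paths.'''
    df.write.format('bigquery') \
        .option('table', dataset_id + "." + table_name) \
        .option('temporaryGcsBucket', BUCKET_NAME) \
        .save()


# ...then at each call site:
if upload:
    write_to_bigquery(df, RAW_TABLE_NAME)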
This needs a rebase (can help with this). Can we rebase and then merge?
The dirty data script and corresponding test file are in the data-ingestion folder. We are looking forward to your feedback :)