Kaiyang expansion project 2022 #8224
Conversation
Ok I haven't even started reviewing but this PR description is 🔥 and because of that it has me excited to review it, well done @kaiyang-code
Looking great! Let me know if you'd like me to clarify any comments.
f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}", | ||
f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}", | ||
], | ||
|
Extra line? @leahecole to confirm DAG syntax
idk I'd probably run black on it though for formatting
```python
# BQ_DESTINATION_DATASET_NAME = "expansion_project"
# BQ_DESTINATION_TABLE_NAME = "ghcnd_stations_joined"
# BQ_NORMALIZED_TABLE_NAME = "ghcnd_stations_normalized"
# BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
# BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_snow_mean"
# BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
# BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"
```
Is this necessary to leave in for the purposes of the sample?
I left it on purpose so that the PySpark program can run independently without having to run together with the DAG. You can check out the "If you just want to run the PySpark code:" section in my description :)
```python
# BUCKET_NAME = "workshop_example_bucket"
# READ_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}"
# DF_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}"
# PRCP_MEAN_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PRCP_MEAN_TABLE_NAME}"
# SNOW_MEAN_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_SNOW_MEAN_TABLE_NAME}"
# PHX_PRCP_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}"
# PHX_SNOW_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}"
```
Is this necessary to leave in?
Same reason as above
```python
phx_annual_prcp_df = (
    phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year}", lit(phx_dw_compute(prcp_year)))
)
phx_annual_snow_df = (
    phx_annual_snow_df.withColumn(f"PHX_SNOW_{year}", lit(phx_dw_compute(snow_year)))
)
```
You can also just create the DataFrames here and populate them in one line instead of declaring them on lines 119/120. See Section 2.2: https://sparkbyexamples.com/pyspark/different-ways-to-create-dataframe-in-pyspark/
The reason I'm doing this is that they are in a for loop. If I didn't misunderstand your point, I think creating and populating them in one line would result in more than one DF, which is not what I want.
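For illustration, a self-contained sketch of the pattern in question, with hypothetical seed values standing in for `phx_dw_compute` and the loop's yearly data (nothing below comes from the actual diff): rebinding the same DataFrame variable inside the loop keeps a single DF and adds one column per year, whereas calling `createDataFrame` inside the loop would produce a new DF on each iteration.

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit

spark = SparkSession.builder.appName("withcolumn-loop-sketch").getOrCreate()

# Hypothetical per-year results, standing in for phx_dw_compute(prcp_year).
yearly_values = {1997: 203.4, 1998: 180.1, 1999: 220.7}

# Start from a one-row seed DataFrame, then add one column per year.
phx_annual_prcp_df = spark.createDataFrame([(1,)], ["seed"])
for year, value in yearly_values.items():
    # Rebinding the same variable keeps everything in a single DataFrame;
    # calling createDataFrame inside the loop would instead yield one DF per year.
    phx_annual_prcp_df = phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year}", lit(value))

phx_annual_prcp_df.show()  # one row, plus one PHX_PRCP_<year> column per year
```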
+1 to what Brad says, and then in the data processing file, extra emphasis on adding comments. We aren't going to have time to turn this into a tutorial, so having it as a well-commented, runnable code sample is the next best thing.
```python
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
```
Are these supposed to be identical?
it's a typo! Just fixed!
Still showing up as identical
f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}", | ||
f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}", | ||
], | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
idk I'd probably run black
on it though for formatting
couple of other small nits
```python
)

phx_annual_prcp_df = (
    phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year_val}", lit(phx_dw_compute(prcp_year)))
```
Is there a reason you're introducing new columns each time? As opposed to making this statically have two columns that you're appending (Year, Value) to?
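For reference, a hedged sketch of the static (Year, Value) alternative being suggested here, with made-up values standing in for `phx_dw_compute` (none of this is from the actual diff): collect one tuple per iteration and create a single two-column DataFrame after the loop, so the schema stays fixed no matter how many years are processed.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("long-format-sketch").getOrCreate()

# Hypothetical per-year results, standing in for phx_dw_compute(prcp_year).
yearly_values = {1997: 203.4, 1998: 180.1, 1999: 220.7}

# Accumulate plain (year, value) tuples inside the loop...
rows = [(year, value) for year, value in yearly_values.items()]

# ...then create a single DataFrame with a fixed two-column schema.
phx_annual_prcp_df = spark.createDataFrame(rows, ["YEAR", "PHX_PRCP"])
phx_annual_prcp_df.show()  # one row per year instead of one column per year
```

A long format like this also tends to be easier to query and plot than one column per year.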
```python
    phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year_val}", lit(phx_dw_compute(prcp_year)))
)
phx_annual_snow_df = (
    phx_annual_snow_df.withColumn(f"PHX_SNOW_{year_val}", lit(phx_dw_compute(snow_year)))
```
Same as above.
@bradmiro I'm fine to merge this to a branch for now, wdyt?
LGTM for merge into staging upstream branch - rebase to happen before merging into upstream main.
* Kaiyang expansion project 2022 (#8224)
* chenged the dag to load ghcn dataset
* data preprocessing done
* modified preprocessing
* dataproc file added
* code runs great
* modifyed code based on Brad, still buggy
* finished modifying, haven't sync wit hDAG
* finished modifying DAG codes
* ready for draft PR
* pass lint
* addressed Brad and Leah's comments
* pass nox lint
* pass nox lint
* Fix: Retry CLI launch if needed (#8221)
* Fix: add region tags
* Fix: region tag typos
* Fix: urlpatterns moved to end
* Fix: typo
* Fix: cli retries to fix flakiness
* Fix: remove duplicate tags
* Fix: use backoff for retries
* Fix: lint import order error
* address Leah's comments about typo and comments
  Co-authored-by: Charles Engelke <engelke@google.com>
* run blacken on dag and dataproc code
* WIP: not working test for process job
* working test for expansion dataproc script
* move dataproc expansion files to separate directory
* add readme
* update readme
* run black
* ignore data file
* fix import order
* ignore one line of lint because it's being silly
* add check for Notfound for test
* add requirements files
* add noxfile config
* update try/except
* experiment - fully qualify path
* update filepath
* update path
* try different path
* remove the directory that was causing test problems
* fix typo in header checker
* tell folks to skip cleanup of prereq
* clean up hyperlinks for distance weighting and arithmetic mean
* fix math links again
* remove debug statements
* remove commented out variables
* Update composer/2022_airflow_summit/data_analytics_dag_expansion_test.py
  Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com>
* Apply suggestions from code review
  Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com>
* Apply suggestions from code review
* update apache-beam version (#8302)
  Bumping the `apache-beam[gcp]` version to (indirectly) bump the `google-cloud-pubsub` version to accept the keyword argument `request` on `create_topic()`
* dataflow: replace job name underscores with hyphens (#8303)
  It looks like Dataflow no longer accepts underscores in the job names. Replacing them with hyphens should work.
  * fix test checks
  * improve error reporting
  * fix test name for exception handling
* chore(deps): update dependency datalab to v1.2.1 (#8309)
* fix: unsanitized output (#8316)
  * fix: unsanitized output
  * fix: add license to template
* chore(deps): update dependency cryptography to v38 (#8317)
  * lint
  Co-authored-by: Anthonios Partheniou <partheniou@google.com>
* Remove region tags to be consistent with other languages (#8322)
* fix lint in conftest (#8324)
* Pin perl version to 5.34.0 as latest doesn't work with the example. (#8319)
  Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>
* refactor fixtures
* revert last change
* revert last change
* chore(deps): update dependency tensorflow to v2.7.2 [security] (#8329)
* remove backoff, add manual retry (#8328)
  * fix lint
  * remove unused import
  Co-authored-by: Anthonios Partheniou <partheniou@google.com>
* refactor test to match #8328
* update most write methods, fix test issue with comparing to exception
* Bmiro kaiyang edit (#8350)
  * modified code to more closely adhere to Spark best practices
  * remove unnecessary import
  * improved explanation of Inverse Distance Weighting
  * Apply suggestions from code review
  Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>
* run black on process files
* fix relative import issue
* fixed jvm error (#8360)
* Add UDF type hinting (#8361)
  * fixed jvm error
  * add type hinting to UDF
* Update composer/2022_airflow_summit/data_analytics_process_expansion.py
* fix comment alignment
* change dataproc region to northamerica-northeast1
* refactor import
* switch other test to also use northamerica-northeast1

Co-authored-by: kaiyang-code <57576013+kaiyang-code@users.noreply.github.com>
Co-authored-by: Charles Engelke <engelke@google.com>
Co-authored-by: Maciej Strzelczyk <strzelczyk@google.com>
Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com>
Co-authored-by: David Cavazos <dcavazos@google.com>
Co-authored-by: WhiteSource Renovate <bot@renovateapp.com>
Co-authored-by: Anthonios Partheniou <partheniou@google.com>
Co-authored-by: Averi Kitsch <akitsch@google.com>
Co-authored-by: mhenc <mhenc@google.com>
Co-authored-by: Brad Miro <bmiro@google.com>
Reviewers: leahecole, bradmiro, rachael-ds, rafalbiegacz
Description
@leahecole @bradmiro This is the draft PR for Kaiyang Yu's expansion project. The DAG script is an expansion of data_analytics_dag.py and the PySpark code is an expansion of data_analytics_process.py. The entire workflow tries to answer two questions: "How have rainfall and snowfall patterns in the western US changed over the past 25 years?" and "How have rainfall and snowfall patterns in Phoenix changed over the past 25 years?". The instructions are specifically designed for @leahecole and @bradmiro, as others may not have access to the resources.
How to run
The expansion project can be run in the following way:
1. Download the file from the `workshop_example_bucket` GCS bucket and upload it to your desired GCS bucket. This is the dataset after pre-processing.
2. Upload `data_analytics_process_expansion.py` to the same GCS bucket as the last step.
3. Create a Composer environment and set the following Airflow variables:
   - `dataproc_service_account`: #######-compute@developer.gserviceaccount.com
   - `gce_region`: us-central1
   - `gcp_project`: <your_gcp_project>
   - `gcs_bucket`: the bucket you created in step one
4. Upload `data_analytics_dag_expansion.py` to the Composer environment you just created and trigger the DAG (see the sketch after this list for how the DAG can read these variables).

Alternatively, you can also directly run it with the same environment that I'm using:
- `expansion_project`
- `data_analytics_dag`
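For reference, a minimal sketch of how a DAG can consume the Airflow variables listed above. The variable names match the list; the `models.Variable.get` pattern is an assumption about this DAG, not confirmed from the diff.

```python
from airflow import models

# Hedged sketch: read the variables set in the steps above from Airflow's
# Variable store (assumed mechanism, not taken from the actual DAG file).
PROJECT_NAME = models.Variable.get("gcp_project")
BUCKET_NAME = models.Variable.get("gcs_bucket")
GCE_REGION = models.Variable.get("gce_region")
DATAPROC_SERVICE_ACCOUNT = models.Variable.get("dataproc_service_account")
```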
If you just want to run the PySpark code:
1. In `data_analytics_process_expansion.py`, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
2. Upload `data_analytics_process_expansion.py` to a GCS bucket.
3. Run `gcloud dataproc jobs submit pyspark gs://path_to_your_file_from_last_step --cluster=your_cluster --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar` using Cloud Shell.

Alternatively, you can run the PySpark code using my cluster:
1. In `data_analytics_process_expansion.py`, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
2. Upload `data_analytics_process_expansion.py` to the `workshop_example_bucket` GCS bucket, overwriting the original file.
3. Run `gcloud dataproc jobs submit pyspark gs://workshop_example_bucket/data_analytics_process_expansion.py --cluster=cluster-d630 --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar`.
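Since both submit commands pass the spark-bigquery connector via `--jars`, here is a hedged sketch of how the PySpark script can read its input table through that connector. The table name follows the constants in this description; the exact calls in the sample are an assumption.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("bq-read-sketch").getOrCreate()

# Read the pre-processed input table through the spark-bigquery connector,
# which the --jars flag in the submit commands above makes available.
df = (
    spark.read.format("bigquery")
    .option("table", "expansion_project.ghcnd_stations_joined")
    .load()
)
df.printSchema()
```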
View results
In the `expansion_project` dataset:
- `ghcnd_stations_joined`: merged dataset after the BigQuery query job.
- `ghcnd_stations_normalization`: dataset after row filtering and unit normalization.
- `ghcnd_stations_prcp_mean`: arithmetic mean of annual precipitation in the western US over the past 25 years.
- `ghcnd_stations_snow_mean`: arithmetic mean of annual snowfall in the western US over the past 25 years.
- `phx_annual_prcp`: annual precipitation in Phoenix over the past 25 years (result of the distance weighting algorithm; see the sketch after this list).
- `phx_annual_snow`: annual snowfall in Phoenix over the past 25 years (result of the distance weighting algorithm).
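The distance-weighted results above come from the sample's `phx_dw_compute` helper. As a point of reference only, here is a hedged sketch of classic inverse distance weighting; the station coordinates and values are made up, and the sample's actual implementation may differ.

```python
import math

def inverse_distance_weighted(stations, target):
    """Estimate a value at `target` from (lat, lon, value) station tuples.

    Classic inverse distance weighting: each station contributes its value
    weighted by 1 / distance**2, so nearer stations dominate the estimate.
    """
    num, den = 0.0, 0.0
    for lat, lon, value in stations:
        d = math.hypot(lat - target[0], lon - target[1])
        if d == 0:
            return value  # target coincides with a station
        w = 1.0 / d**2
        num += w * value
        den += w
    return num / den

# Hypothetical stations around Phoenix: (lat, lon, annual precipitation).
print(inverse_distance_weighted(
    [(33.6, -112.0, 210.0), (33.2, -111.8, 190.0), (33.5, -112.3, 205.0)],
    (33.45, -112.07),
))
```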
Note: remember to clean up the `ghcnd-stations-joined` dataset between runs, since the DAG code features a `WRITE_APPEND` write disposition and the dataset will double in size every time the DAG runs. You don't have to worry about it if you are only running the PySpark program. However, be sure that the `ghcnd-stations-joined` dataset exists in BQ if you're only running the PySpark code.
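For context on that note, a minimal sketch of what a `WRITE_APPEND` write disposition looks like with the plain BigQuery client. This only illustrates the behavior described above; it is not the DAG's actual operator call, and the project id and query are placeholders.

```python
from google.cloud import bigquery

client = bigquery.Client()

# WRITE_APPEND adds the query results to any existing rows on every run,
# which is why the joined table doubles in size each time the DAG executes.
job_config = bigquery.QueryJobConfig(
    destination="your-project.expansion_project.ghcnd_stations_joined",  # hypothetical
    write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
)
query = "SELECT * FROM `bigquery-public-data.ghcn_d.ghcnd_stations`"  # placeholder
client.query(query, job_config=job_config).result()
```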
Next step
- Add more comments to `data_analytics_process_expansion.py`.
- Remove the `print()` functions, as they are here only for debugging purposes.

Checklist
- Tests pass: `nox -s py-3.9` (see Test Environment Setup)
- Lint pass: `nox -s lint` (see Test Environment Setup)