Kaiyang expansion project 2022 by kaiyang-code · Pull Request #8224 · GoogleCloudPlatform/python-docs-samples

Kaiyang expansion project 2022 #8224

Conversation

@kaiyang-code (Contributor) commented Aug 1, 2022

Description

@leahecole @bradmiro This is the draft PR for Kaiyang Yu's expansion project. The DAG script is an expansion of data_analytics_dag.py, and the PySpark code is an expansion of data_analytics_process.py. The workflow as a whole tries to answer two questions: "How have rainfall and snowfall patterns in the western US changed over the past 25 years?" and "How have rainfall and snowfall patterns in Phoenix changed over the past 25 years?" The instructions are written specifically for @leahecole and @bradmiro, as others may not have access to the resources.

How to run

The expansion project can be run in the following way:

  1. Download the ghcnd-stations-new.txt file (if the link doesn't work, you can find it in the workshop_example_bucket GCS bucket) and upload it to your desired GCS bucket. This is the dataset after pre-processing.
  2. Upload data_analytics_process_expansion.py to the same GCS bucket as in the previous step (a GCS upload sketch follows this list).
  3. Create a Cloud Composer environment with the latest versions of Composer and Airflow. Add the following variables using the Airflow UI:
  4. Upload data_analytics_dag_expansion.py to the Composer environment you just created and trigger the DAG.
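For steps 1 and 2, here is a minimal upload sketch using the google-cloud-storage Python client; the bucket name and local file paths are placeholders, not values from this PR:

```python
# Sketch only: upload the pre-processed data and the PySpark job to GCS.
# BUCKET_NAME and the local paths are placeholders -- substitute your own.
from google.cloud import storage

BUCKET_NAME = "your-gcs-bucket"  # placeholder

client = storage.Client()
bucket = client.bucket(BUCKET_NAME)

# Step 1: the pre-processed station dataset.
bucket.blob("ghcnd-stations-new.txt").upload_from_filename("ghcnd-stations-new.txt")

# Step 2: the PySpark job, uploaded to the same bucket.
bucket.blob("data_analytics_process_expansion.py").upload_from_filename(
    "data_analytics_process_expansion.py"
)
```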

Alternatively, you can run it directly in the environment I'm using:

  1. Navigate to the Cloud Composer console and select the environment called expansion_project.
  2. Select data_analytics_dag.
  3. Trigger the DAG.

If you just want to run the PySpark code:

  1. In data_analytics_process_expansion.py, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
  2. Upload the new data_analytics_process_expansion.py to a GCS bucket.
  3. In Cloud Shell, run `gcloud dataproc jobs submit pyspark gs://path_to_your_file_from_last_step --cluster=your_cluster --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar` (a Python-client equivalent is sketched after this list).
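If you'd rather submit the job from Python than from Cloud Shell, here is a hedged sketch using the google-cloud-dataproc client; the project ID, cluster name, and file path are placeholders mirroring the gcloud command above:

```python
# Sketch only: submit the PySpark job with the Dataproc Python client
# instead of gcloud. Placeholders: your-project-id, your_cluster, and
# the gs:// path of the file uploaded in the previous step.
from google.cloud import dataproc_v1

REGION = "us-central1"

client = dataproc_v1.JobControllerClient(
    client_options={"api_endpoint": f"{REGION}-dataproc.googleapis.com:443"}
)

job = {
    "placement": {"cluster_name": "your_cluster"},
    "pyspark_job": {
        "main_python_file_uri": "gs://path_to_your_file_from_last_step",
        "jar_file_uris": ["gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar"],
    },
}

operation = client.submit_job_as_operation(
    request={"project_id": "your-project-id", "region": REGION, "job": job}
)
print(operation.result())  # blocks until the job finishes
```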

Alternatively, you can run the PySpark code using my cluster:

  1. In data_analytics_process_expansion.py, comment out lines 32 to 38 (inclusive) and uncomment lines 23 to 29 and 40 to 46 (inclusive).
  2. Re-upload the data_analytics_process_expansion.py to the workshop_example_bucket GCS bucket, overwriting the original file.
  3. In Cloud Shell, run `gcloud dataproc jobs submit pyspark gs://workshop_example_bucket/data_analytics_process_expansion.py --cluster=cluster-d630 --region=us-central1 --jars=gs://spark-lib/bigquery/spark-bigquery-latest_2.12.jar`.

View results

  • You can view the results in BigQuery, under the dataset expansion_project:
    • ghcnd_stations_joined: merged table after the BigQuery query job.
    • ghcnd_stations_normalization: table after row filtering and unit normalization.
    • ghcnd_stations_prcp_mean: arithmetic mean of annual precipitation in the western US over the past 25 years.
    • ghcnd_stations_snow_mean: arithmetic mean of annual snowfall in the western US over the past 25 years.
    • phx_annual_prcp: annual precipitation in Phoenix over the past 25 years (result of the distance-weighting algorithm).
    • phx_annual_snow: annual snowfall in Phoenix over the past 25 years (result of the distance-weighting algorithm; a sketch of the standard formula follows this list).
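For context on those last two tables: the PR's phx_dw_compute isn't reproduced here, but textbook inverse distance weighting looks roughly like this. This is a sketch under that assumption; the sample's implementation may differ in details such as the power parameter.

```python
# Sketch of standard inverse distance weighting (IDW): estimate a value at a
# target point (e.g., Phoenix) from nearby station values, weighting each
# station by 1 / distance**power. Assumes all distances are nonzero.
def idw(values, distances, power=2):
    weights = [1.0 / (d ** power) for d in distances]
    return sum(w * v for w, v in zip(weights, values)) / sum(weights)

# Example: three stations 10, 25, and 60 km away.
print(idw(values=[300.0, 280.0, 410.0], distances=[10.0, 25.0, 60.0]))
```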
⚠️ CAUTION: Before running the DAG, be sure to remove the ghcnd_stations_joined table, since the DAG code uses a WRITE_APPEND write disposition and the table will double in size every time the DAG runs. If you're only running the PySpark code, you don't need to remove it, but make sure the ghcnd_stations_joined table exists in BigQuery.
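One hedged way to do that cleanup, using the google-cloud-bigquery client (the project ID is a placeholder; the dataset and table names come from the description above):

```python
# Sketch only: drop the joined table before a DAG run so WRITE_APPEND
# doesn't double the data. "your-project-id" is a placeholder.
from google.cloud import bigquery

client = bigquery.Client(project="your-project-id")
client.delete_table(
    "your-project-id.expansion_project.ghcnd_stations_joined",
    not_found_ok=True,  # succeed even if the table is already gone
)
```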

Next step

  • Add a test for data_analytics_process_expansion.py (a generic skeleton is sketched after this list).
  • Delete the print() calls; they are only there for debugging.
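As a starting point for that future test, here is a hypothetical skeleton of a minimal DAG sanity check, a common pattern for Composer samples; the dag_folder path is a placeholder, and this is not necessarily the test that later landed in the branch:

```python
# Hypothetical sketch: assert the DAG file parses without import errors.
from airflow.models import DagBag


def test_dag_has_no_import_errors():
    # Parse the DAG folder (path is a placeholder) with example DAGs disabled.
    dag_bag = DagBag(dag_folder="composer/2022_airflow_summit", include_examples=False)
    assert dag_bag.import_errors == {}
```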

Checklist

@product-auto-label product-auto-label bot added the samples Issues that are directly related to samples. label Aug 1, 2022
@leahecole (Collaborator) commented

Ok I haven't even started reviewing but this PR description is 🔥 and because of that it has me excited to review it, well done @kaiyang-code

@bradmiro (Collaborator) left a comment

Looking great! Let me know if you'd like me to clarify any comments.

f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}",
f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}",
],

Collaborator

Extra line? @leahecole to confirm DAG syntax

Collaborator

idk I'd probably run black on it though for formatting

Comment on lines +23 to +29
# BQ_DESTINATION_DATASET_NAME = "expansion_project"
# BQ_DESTINATION_TABLE_NAME = "ghcnd_stations_joined"
# BQ_NORMALIZED_TABLE_NAME = "ghcnd_stations_normalized"
# BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
# BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_snow_mean"
# BQ_PHX_PRCP_TABLE_NAME = "phx_annual_prcp"
# BQ_PHX_SNOW_TABLE_NAME = "phx_annual_snow"
Collaborator

Is this necessary to leave in for the purposes of the sample?

@kaiyang-code (Contributor, Author) commented Aug 2, 2022

I left it in on purpose so that the PySpark program can run independently of the DAG. You can check out the "If you just want to run the PySpark code:" section in my description :)

Comment on lines +40 to +46
# BUCKET_NAME = "workshop_example_bucket"
# READ_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_DESTINATION_TABLE_NAME}"
# DF_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_NORMALIZED_TABLE_NAME}"
# PRCP_MEAN_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PRCP_MEAN_TABLE_NAME}"
# SNOW_MEAN_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_SNOW_MEAN_TABLE_NAME}"
# PHX_PRCP_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}"
# PHX_SNOW_WRITE_TABLE = f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}"
Collaborator

Is this necessary to leave in?

Contributor (Author)

Same reason as above

Comment on lines 180 to 185
phx_annual_prcp_df = (
phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year}", lit(phx_dw_compute(prcp_year)))
)
phx_annual_snow_df = (
phx_annual_snow_df.withColumn(f"PHX_SNOW_{year}", lit(phx_dw_compute(snow_year)))
)
Collaborator

You can also just create the DataFrames here and populate them in one line instead of declaring them on 119 / 120. See Section 2.2: https://sparkbyexamples.com/pyspark/different-ways-to-create-dataframe-in-pyspark/
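For reference, a minimal sketch of the one-line create-and-populate pattern the link describes (illustrative names only, not the sample's code; assumes an active SparkSession named spark and the loop variables from the snippet above):

```python
# Illustrative only: build a one-row, two-column DataFrame in a single call
# instead of declaring an empty frame earlier and attaching columns later.
df = spark.createDataFrame(
    [(year, phx_dw_compute(prcp_year))],  # one (year, value) row
    ["YEAR", "PHX_PRCP"],
)
```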

Contributor (Author)

The reason I'm doing this is that they are in a for loop. If I haven't misunderstood your point, creating and populating them in one line would result in more than one DataFrame, which is not what I want.

@leahecole (Collaborator) left a comment

+1 to what Brad says, and then in the data processing file, extra emphasis on adding comments. We aren't going to have time to turn this into a tutorial, so having it as a well-commented, runnable code sample is the next best thing.

Comment on lines +31 to +32
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
Collaborator

Are these supposed to be identical?

Contributor (Author)

It's a typo! Just fixed!

Collaborator

Still showing up as identical

f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_PRCP_TABLE_NAME}",
f"{BQ_DESTINATION_DATASET_NAME}.{BQ_PHX_SNOW_TABLE_NAME}",
],

Collaborator

idk I'd probably run black on it though for formatting

@leahecole (Collaborator) left a comment

couple of other small nits

Comment on lines +31 to +32
BQ_PRCP_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
BQ_SNOW_MEAN_TABLE_NAME = "ghcnd_stations_prcp_mean"
Collaborator

Still showing up as identical

@leahecole leahecole changed the base branch from main to kaiyang_expansion_project August 4, 2022 17:57
)

phx_annual_prcp_df = (
phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year_val}", lit(phx_dw_compute(prcp_year)))
Collaborator

Is there a reason you're introducing new columns each time, as opposed to making this statically have two columns (Year, Value) that you append rows to?

Contributor (Author)

This is due to the design of the expansion project. We can make the change in the future if adding rows is something we want. Leaving it unresolved for now 😃
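For the record, a sketch of the two-column (Year, Value) alternative the reviewer describes, with one row accumulated per year and a single DataFrame built at the end. The names here are hypothetical: it assumes a SparkSession named spark and a per-year mapping standing in for the existing loop's data.

```python
# Hypothetical sketch of the reviewer's suggestion: two fixed columns,
# one row appended per year, built as a single DataFrame at the end.
# prcp_by_year is an assumed {year: station_data} mapping.
prcp_rows = [
    (year, phx_dw_compute(prcp_year))
    for year, prcp_year in prcp_by_year.items()
]
phx_annual_prcp_df = spark.createDataFrame(prcp_rows, ["YEAR", "PHX_PRCP"])
```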

phx_annual_prcp_df.withColumn(f"PHX_PRCP_{year_val}", lit(phx_dw_compute(prcp_year)))
)
phx_annual_snow_df = (
phx_annual_snow_df.withColumn(f"PHX_SNOW_{year_val}", lit(phx_dw_compute(snow_year)))
Collaborator

Same as above.

Contributor (Author)

Same as above.

engelke and others added 3 commits August 4, 2022 16:11
* Fix: add region tags

* Fix: region tag typos

* Fix: urlpatterns moved to end

* Fix: typo

* Fix: cli retries to fix flakiness

* Fix: remove duplicate tags

* Fix: use backoff for retries

* Fix: lint import order error
@kaiyang-code kaiyang-code requested a review from leahecole August 6, 2022 03:03
@kaiyang-code kaiyang-code requested a review from bradmiro August 6, 2022 03:03
@leahecole leahecole self-assigned this Aug 11, 2022
@leahecole (Collaborator) left a comment

@bradmiro I'm fine to merge this to a branch for now, wdyt?

@bradmiro (Collaborator) left a comment

LGTM for merge into staging upstream branch - rebase to happen before merging into upstream main.

@leahecole leahecole marked this pull request as ready for review August 18, 2022 19:15
@leahecole leahecole requested review from rachael-ds, rafalbiegacz and a team as code owners August 18, 2022 19:15
@leahecole leahecole merged this pull request into GoogleCloudPlatform:kaiyang_expansion_project Aug 18, 2022
leahecole pushed a commit that referenced this pull request Aug 19, 2022
* changed the DAG to load the GHCN dataset

* data preprocessing done

* modified preprocessing

* dataproc file added

* code runs great

* modified code based on Brad's comments, still buggy

* finished modifying, haven't synced with DAG

* finished modifying DAG code

* ready for draft PR

* pass lint

* addressed Brad and Leah's comments

* pass nox lint

* pass nox lint

* Fix: Retry CLI launch if needed (#8221)

* Fix: add region tags

* Fix: region tag typos

* Fix: urlpatterns moved to end

* Fix: typo

* Fix: cli retries to fix flakiness

* Fix: remove duplicate tags

* Fix: use backoff for retries

* Fix: lint import order error

* address Leah's comments about typo and comments

Co-authored-by: Charles Engelke <engelke@google.com>
leahecole pushed a commit that referenced this pull request Aug 25, 2022 (same commit list as the Aug 19 commit above)
leahecole added a commit that referenced this pull request Sep 28, 2022
* Kaiyang expansion project 2022 (#8224)

* run blacken on dag and dataproc code

* WIP: not working test for process job

* working test for expansion dataproc script

* move dataproc expansion files to separate directory

* add readme

* update readme

* run black

* ignore data file

* fix import order

* ignore one line of lint because it's being silly

* add check for Notfound for test

* add requirements files

* add noxfile config

* update try/except

* experiment - fully qualify path

* update filepath

* update path

* try different path

* remove the directory that was causing test problems

* fix typo in header checker

* tell folks to skip cleanup of prereq

* clean up hyperlinks for distance weighting and arithmetic mean

* fix math links again

* remove debug statements

* remove commented out variables

* Update composer/2022_airflow_summit/data_analytics_dag_expansion_test.py

Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com>

* Apply suggestions from code review

Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com>

* Apply suggestions from code review

* update apache-beam version (#8302)

Bumping the `apache-beam[gcp]` version to (indirectly) bump the `google-cloud-pubsub` version to accept the keyword argument `request` on `create_topic()`

* dataflow: replace job name underscores with hyphens (#8303)

* dataflow: replace job name underscores with hyphens

It looks like Dataflow no longer accepts underscores in the job names. Replacing them with hyphens should work.

* fix test checks

* improve error reporting

* fix test name for exception handling

* chore(deps): update dependency datalab to v1.2.1 (#8309)

* fix: unsanitized output (#8316)

* fix: unsanitized output

* fix: add license to template

* chore(deps): update dependency cryptography to v38 (#8317)

* chore(deps): update dependency cryptography to v38

* lint

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* Remove region tags to be consistent with other languages (#8322)

* fix lint in conftest (#8324)

* Pin perl version to 5.34.0 as latest doesn't work with the example. (#8319)

Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>

* refactor fixtures

* revert last change

* revert last change

* chore(deps): update dependency tensorflow to v2.7.2 [security] (#8329)

* remove backoff, add manual retry (#8328)

* remove backoff, add manual retry

* fix lint

* remove unused import

Co-authored-by: Anthonios Partheniou <partheniou@google.com>

* refactor test to match #8328

* update most write methods, fix test issue with comparing to exception

* Bmiro kaiyang edit (#8350)

* modified code to more closely adhere to Spark best practices

* remove unnecessary import

* improved explanation of Inverse Distance Weighting

* Apply suggestions from code review

Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>

Co-authored-by: Leah E. Cole <6719667+leahecole@users.noreply.github.com>

* run black on process files

* fix relative import issue

* fixed jvm error (#8360)

* Add UDF type hinting (#8361)

* fixed jvm error

* add type hinting to UDF

* Update composer/2022_airflow_summit/data_analytics_process_expansion.py

* fix comment alignment

* change dataproc region to northamerica-northeast1

* refactor import

* switch other test to also use northamerica-northeast1

Co-authored-by: kaiyang-code <57576013+kaiyang-code@users.noreply.github.com>
Co-authored-by: Charles Engelke <engelke@google.com>
Co-authored-by: Maciej Strzelczyk <strzelczyk@google.com>
Co-authored-by: Dan Lee <71398022+dandhlee@users.noreply.github.com>
Co-authored-by: David Cavazos <dcavazos@google.com>
Co-authored-by: WhiteSource Renovate <bot@renovateapp.com>
Co-authored-by: Anthonios Partheniou <partheniou@google.com>
Co-authored-by: Averi Kitsch <akitsch@google.com>
Co-authored-by: mhenc <mhenc@google.com>
Co-authored-by: Brad Miro <bmiro@google.com>