From 78553024aca44d375427f5934e2a72847c161540 Mon Sep 17 00:00:00 2001 From: bradmiro Date: Tue, 27 Sep 2022 14:03:52 -0400 Subject: [PATCH] fixed jvm error --- .../data_analytics_process_expansion.py | 65 ++++++++++--------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/composer/2022_airflow_summit/data_analytics_process_expansion.py b/composer/2022_airflow_summit/data_analytics_process_expansion.py index e6b43ce35c5..55107ac1683 100644 --- a/composer/2022_airflow_summit/data_analytics_process_expansion.py +++ b/composer/2022_airflow_summit/data_analytics_process_expansion.py @@ -23,38 +23,6 @@ from pyspark.sql import SparkSession import pyspark.sql.functions as f -# Inverse Distance Weighting algorithm (DWA) -@f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP) -def phx_dw_compute(year, df) -> pd.DataFrame: - # This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting - # based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher - # its measurement is weighed. - # - # This function combines the distance equation and inverse distance factor since the distance equation is: - # - # d = sqrt((x1-x2)^2 + (y1-y2)^2)) - # - # and the inverse distance factor is: - # - # idf = 1 / d^2 - # - # so we negate the square and square root to combine this into: - # - # idf = 1 / ((x1-x2)^2 + (y1-y2)^2)) - - # Latitude and longitude of Phoenix - PHX_LATITUDE = 33.4484 - PHX_LONGITUDE = -112.0740 - - inverse_distance_factors = 1.0 / ( - (PHX_LATITUDE - df.LATITUDE) ** 2 + (PHX_LONGITUDE - df.LONGITUDE) ** 2 - ) - - # Calculate each station's weight - weights = inverse_distance_factors / inverse_distance_factors.sum() - - return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()}) - if __name__ == "__main__": # read in the input argument @@ -133,6 +101,39 @@ def phx_dw_compute(year, df) -> pd.DataFrame: states_near_phx = ["AZ", "CA", "CO", "NM", "NV", "UT"] annual_df = df.where(df.STATE.isin(states_near_phx)) + # Inverse Distance Weighting algorithm (DWA) + @f.pandas_udf("YEAR integer, VALUE double", f.PandasUDFType.GROUPED_MAP) + def phx_dw_compute(year, df) -> pd.DataFrame: + # This adjusts the rainfall / snowfall in Phoenix for a given year using Inverse Distance Weighting + # based on each weather station's distance to Phoenix. The closer a station is to Phoenix, the higher + # its measurement is weighed. + # + # This function combines the distance equation and inverse distance factor since the distance equation is: + # + # d = sqrt((x1-x2)^2 + (y1-y2)^2)) + # + # and the inverse distance factor is: + # + # idf = 1 / d^2 + # + # so we negate the square and square root to combine this into: + # + # idf = 1 / ((x1-x2)^2 + (y1-y2)^2)) + + # Latitude and longitude of Phoenix + PHX_LATITUDE = 33.4484 + PHX_LONGITUDE = -112.0740 + + inverse_distance_factors = 1.0 / ( + (PHX_LATITUDE - df.LATITUDE) ** 2 + + (PHX_LONGITUDE - df.LONGITUDE) ** 2 + ) + + # Calculate each station's weight + weights = inverse_distance_factors / inverse_distance_factors.sum() + + return pd.DataFrame({"YEAR": year, "VALUE": (weights * df.ANNUAL_AMOUNT).sum()}) + # Calculate the distance-weighted precipitation amount phx_annual_prcp_df = ( annual_df.where((annual_df.ELEMENT == "PRCP"))