Challenge 3: Advanced Analysis

Challenge

In this challenge you will combine window functions and user-defined functions (UDFs) to perform a more advanced analysis of the weather dataset.

Use the daily-weather-data-sample.csv dataset.

Tasks:

  1. Load the dataset daily-weather-data-sample.csv into a Spark DataFrame.
  2. Create a window partitioned by city and ordered by valid_date_as_yyyymmdd.
  3. Use a window function to calculate the previous day’s temperature.
  4. Create a new column called temperature_change that shows the difference between the current temperature and the previous day’s temperature.
  5. Create a UDF that categorizes the temperature into:
    • "Cold" if temperature < 0
    • "Cool" if temperature is between 0 and 10 (inclusive)
    • "Warm" if temperature > 10
  6. Add the category column to the DataFrame.
  7. Display the final results.

You should use the following concepts:

  • Window
  • lag()
  • withColumn()
  • UDF
  • partitionBy() and orderBy()

Potential Solution

One possible solution is shown below. This example uses a window function to compare temperatures across days and a UDF to categorize temperature values.

from pyspark.sql.functions import col, lag, udf
from pyspark.sql.window import Window
from pyspark.sql.types import StringType

# Load the dataset
weather = spark.read.csv(
    "data/daily-weather-data-sample.csv",
    header=True,
    inferSchema=True
)

# Step 1: Define the window (group by city, ordered by date)
windowSpec = Window.partitionBy(
    "city_location_identifier__up_to_9_alphanumeric_characters_"
).orderBy("valid_date_as_yyyymmdd")

# Step 2: Get the previous day's temperature
weather_lag = weather.withColumn(
    "previous_temp",
    lag("average_temperature_c___float_value_to_nearest_hundredths_place").over(windowSpec)
)

# Step 3: Calculate temperature change
weather_change = weather_lag.withColumn(
    "temperature_change",
    col("average_temperature_c___float_value_to_nearest_hundredths_place") - col("previous_temp")
)

# Step 4: Define a UDF to categorize temperature
def temp_category(temp):
    if temp is None:  # guard against missing temperatures
        return None
    if temp < 0:
        return "Cold"
    elif temp <= 10:  # 0 and 10 both count as "Cool"
        return "Cool"
    else:
        return "Warm"

temp_udf = udf(temp_category, StringType())

# Step 5: Apply the UDF
weather_final = weather_change.withColumn(
    "temperature_category",
    temp_udf(col("average_temperature_c___float_value_to_nearest_hundredths_place"))
)

# Display results (display() is Databricks-specific;
# use weather_final.show() in plain PySpark)
display(weather_final)

Explanation

  • Window.partitionBy() groups rows so calculations occur within each city.
  • orderBy() ensures the rows are processed in chronological order.
  • lag() retrieves the previous row’s value within the window (null for the first row of each partition).
  • withColumn() creates new calculated columns.
  • udf() allows custom Python logic to be applied to Spark DataFrames.
  • display() renders the final dataset with all new columns (in Databricks notebooks; use show() elsewhere).
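Because udf() just wraps an ordinary Python function, the category boundaries can be unit-tested without starting Spark. The function is reproduced below with a None guard added (an assumption about how missing temperatures should be handled):

```python
# Categorization logic from the solution, with an explicit None guard so
# missing temperatures pass through as null instead of raising a TypeError.
def temp_category(temp):
    if temp is None:
        return None
    if temp < 0:
        return "Cold"
    elif temp <= 10:  # 0 and 10 both fall in "Cool"
        return "Cool"
    else:
        return "Warm"

assert temp_category(-0.1) == "Cold"
assert temp_category(0) == "Cool"
assert temp_category(10) == "Cool"
assert temp_category(10.5) == "Warm"
```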