Airflow DAG: Check Weather API & Pull Data - Initial Release

Alex Johnson

Let's dive into creating an Airflow DAG that checks the availability of a Weather Open API and pulls weather data. This initial release sets the foundation for automating weather data collection, which can be incredibly useful for various applications, from weather forecasting to data analysis and more. This is a step-by-step guide to help you set up your first DAG. We will explore the necessary tasks, configurations, and code snippets to make this happen. So, grab your coding gloves and let's get started!

Setting Up the DAG

First things first, we need to set up our Directed Acyclic Graph (DAG) in Airflow. A DAG is a collection of tasks you want to run, organized in a way that reflects their dependencies. Here's how we'll structure our DAG:

  1. Import necessary libraries: We'll start by importing what we need: DAG from airflow, PythonOperator, the datetime utilities, and requests. These are essential for defining the DAG, creating tasks, and making HTTP requests.
  2. Define default arguments: Next, we'll define default arguments for our DAG. These arguments include the owner, start date, retries, and retry delay. Setting these defaults ensures that our DAG runs smoothly and is easy to maintain.
  3. Instantiate the DAG: We'll then instantiate our DAG with a unique dag_id, our default arguments, a description, and a schedule interval. The dag_id should be descriptive, and the schedule interval determines how often the DAG runs. For example, you might want it to run daily, weekly, or at specific times.
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests

def check_api_availability():
    # Verify the API responds before downstream tasks try to pull data.
    try:
        response = requests.get("https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY")
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx or 5xx)
        return True
    except requests.exceptions.RequestException as e:
        # Re-raise so Airflow marks this task as failed and skips downstream tasks
        raise RuntimeError(f"API is not available: {e}") from e

def get_weather_data():
    try:
        response = requests.get("https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY")
        response.raise_for_status()
        data = response.json()
        print(f"Weather data: {data}")
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error fetching weather data: {e}")
        return None

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='A DAG to check weather API availability and pull weather data',
    schedule_interval=timedelta(days=1),
    catchup=False
) as dag:

Task 1: Check API Availability

The first task in our DAG checks whether the Weather Open API is available. This ensures that our subsequent tasks don't fail mid-run due to API downtime. We'll use a PythonOperator to execute a Python function that makes an HTTP request to the API and checks the response. If the API is available, the function returns True; if the request fails, the function raises an exception so that Airflow marks the task as failed and skips the downstream tasks.

  1. Define the Python function: We'll define a Python function that uses the requests library to make an HTTP GET request to the Weather Open API endpoint. The function checks the HTTP status code of the response. A status code of 200 indicates that the API is available.
  2. Handle exceptions: We'll also include error handling to catch any exceptions that might occur during the API request. This is important for ensuring that our DAG doesn't fail due to network issues or other unexpected errors.
  3. Create the PythonOperator: We'll create a PythonOperator that calls our Python function. The task_id should be descriptive, and the python_callable argument should be set to our Python function.
    check_api_task = PythonOperator(
        task_id='check_api_availability',
        python_callable=check_api_availability,
    )

Task 2: Pull Weather Data

Once we've confirmed that the API is available, the next step is to pull the weather data. We'll again use a PythonOperator to execute a Python function that makes an HTTP request to the API and retrieves the weather data. The function parses the JSON response and extracts the relevant information.

  1. Define the Python function: We'll define a Python function that uses the requests library to make an HTTP GET request to the Weather Open API endpoint. The function parses the JSON response and extracts the relevant information, such as temperature, humidity, and wind speed.
  2. Handle exceptions: We'll also include error handling to catch any exceptions that might occur during the API request. This is important for ensuring that our DAG doesn't fail due to network issues or other unexpected errors.
  3. Create the PythonOperator: We'll create a PythonOperator that calls our Python function. The task_id should be descriptive, and the python_callable argument should be set to our Python function. The function's return value is automatically pushed to XCom, so downstream tasks can retrieve the weather data if needed.
    get_weather_task = PythonOperator(
        task_id='get_weather_data',
        python_callable=get_weather_data,
    )
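The get_weather_data function above returns the full JSON payload. As a sketch of what "extracting the relevant information" can look like, here is a small helper run against a sample payload shaped like OpenWeatherMap's documented response. The names sample_payload and extract_weather_fields are illustrative, not part of the DAG above, and the field values are made up:

```python
# A minimal sketch of parsing an OpenWeatherMap-style payload. The sample
# below mirrors the documented response shape, but the values are made up;
# adjust the keys if your endpoint returns a different structure.
sample_payload = {
    "name": "London",
    "main": {"temp": 289.5, "humidity": 82},
    "wind": {"speed": 4.1},
}


def extract_weather_fields(data):
    """Pull temperature, humidity, and wind speed out of the API response."""
    return {
        "city": data.get("name"),
        "temp_kelvin": data["main"]["temp"],      # OpenWeatherMap returns Kelvin by default
        "humidity_pct": data["main"]["humidity"],
        "wind_speed_ms": data["wind"]["speed"],
    }


print(extract_weather_fields(sample_payload))
```

In a real pipeline, get_weather_data could call a helper like this before returning, so only the fields you care about travel through XCom.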

Setting Task Dependencies

Now that we've defined our tasks, we need to set their dependencies. This tells Airflow the order in which to run the tasks. In our case, we want to check the API availability before pulling the weather data. We can set this dependency with the >> operator, which is shorthand for the set_downstream method.

check_api_task >> get_weather_task

This ensures that the get_weather_task only runs if the check_api_task succeeds. This is important for preventing errors and ensuring that our DAG runs smoothly.

Complete DAG Code

Here's the complete code for our DAG:

from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests


def check_api_availability():
    # Verify the API responds before downstream tasks try to pull data.
    try:
        response = requests.get("https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY")
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx or 5xx)
        return True
    except requests.exceptions.RequestException as e:
        # Re-raise so Airflow marks this task as failed and skips downstream tasks
        raise RuntimeError(f"API is not available: {e}") from e


def get_weather_data():
    try:
        response = requests.get("https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY")
        response.raise_for_status()
        data = response.json()
        print(f"Weather data: {data}")
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error fetching weather data: {e}")
        return None

default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='A DAG to check weather API availability and pull weather data',
    schedule_interval=timedelta(days=1),
    catchup=False
) as dag:
    check_api_task = PythonOperator(
        task_id='check_api_availability',
        python_callable=check_api_availability,
    )

    get_weather_task = PythonOperator(
        task_id='get_weather_data',
        python_callable=get_weather_data,
    )

    check_api_task >> get_weather_task

Testing the DAG

Before deploying our DAG to a production environment, it's important to test it thoroughly. Here's how we can test our DAG:

  1. Run the DAG manually: We can manually trigger the DAG in the Airflow UI to see if it runs successfully. This allows us to quickly identify any errors or issues.
  2. Check the logs: We can check the logs for each task to see if there are any errors or warnings. The logs provide valuable information about what's happening during the execution of the DAG.
  3. Verify the output: We can verify that the weather data is being pulled correctly by checking the output of the get_weather_data task. This ensures that our DAG is working as expected.
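The manual steps above also have command-line equivalents. Assuming the DAG file is already in your DAGs folder and the Airflow CLI is available, something like the following lets you exercise tasks without waiting for the scheduler (the date is just an example logical date):

```shell
# Run a single task for a given logical date, without recording state in the DB
airflow tasks test weather_data_pipeline check_api_availability 2024-01-01

# Run the whole DAG once for a given logical date
airflow dags test weather_data_pipeline 2024-01-01

# Confirm the DAG file parses cleanly (surfaces import errors)
airflow dags list-import-errors
```

The `tasks test` command prints the task's logs directly to your terminal, which makes it the quickest way to inspect the output of get_weather_data.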

Conclusion

This initial release provides a solid foundation for automating weather data collection using Airflow. By checking API availability and pulling weather data, we can ensure that our DAG runs smoothly and provides us with the information we need. From here, you can expand upon this foundation by adding more tasks, such as storing the weather data in a database or visualizing it in a dashboard. The possibilities are endless! You can explore more about Airflow DAGs and best practices on the Apache Airflow documentation.
