# Airflow DAG: Check Weather API & Pull Data - Initial Release
Let's dive into creating an Airflow DAG that checks the availability of a Weather Open API and pulls weather data. This initial release lays the foundation for automating weather data collection, which is useful for everything from forecasting to data analysis. This step-by-step guide walks through the tasks, configuration, and code snippets needed to set up your first DAG. So, grab your coding gloves and let's get started!
## Setting Up the DAG
First things first, we need to set up our Directed Acyclic Graph (DAG) in Airflow. A DAG is a collection of tasks you want to run, organized in a way that reflects their dependencies. Here's how we'll structure our DAG:
- Import necessary libraries: We'll start by importing what we need, such as `DAG`, `PythonOperator`, and `requests`. These are essential for defining the DAG, creating tasks, and making HTTP requests.
- Define default arguments: Next, we'll define default arguments for our DAG. These include the owner, start date, retries, and retry delay. Setting these defaults ensures that our DAG runs smoothly and is easy to maintain.
- Instantiate the DAG: We'll then instantiate our DAG with a unique `dag_id`, our default arguments, a description, and a schedule interval. The `dag_id` should be descriptive, and the schedule interval determines how often the DAG runs. For example, you might want it to run daily, weekly, or at specific times.
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests


def check_api_availability():
    try:
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY",
            timeout=10,  # don't hang indefinitely on a slow connection
        )
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx or 5xx)
        return True
    except requests.exceptions.RequestException as e:
        print(f"API is not available: {e}")
        raise  # Fail the task so downstream tasks don't run


def get_weather_data():
    try:
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY",
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        print(f"Weather data: {data}")
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error fetching weather data: {e}")
        return None


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='A DAG to check weather API availability and pull weather data',
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    ...  # Tasks are defined in the sections below
```
## Task 1: Check API Availability
The first task in our DAG is to check that the Weather Open API is available, so that subsequent tasks don't fail partway through due to API downtime. We'll use a `PythonOperator` to execute a Python function that makes an HTTP request to the API and checks the response. If the API is available, the function returns `True`; otherwise it re-raises the exception, failing the task so that downstream tasks don't run.
- Define the Python function: We'll define a Python function that uses the `requests` library to make an HTTP GET request to the Weather Open API endpoint. The function checks the HTTP status code of the response; a status code of 200 indicates that the API is available.
- Handle exceptions: We'll catch any `RequestException` raised during the request, log it, and re-raise it. That way, network issues or bad responses fail the task cleanly instead of being silently swallowed.
- Create the `PythonOperator`: We'll create a `PythonOperator` that calls our Python function. The `task_id` should be descriptive, and the `python_callable` argument should be set to our Python function. Because the task is defined inside the `with DAG(...) as dag:` block, it's attached to the DAG automatically.
```python
    check_api_task = PythonOperator(
        task_id='check_api_availability',
        python_callable=check_api_availability,
    )
```
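By the way, if you'd rather lean on built-ins, the Airflow HTTP provider ships an `HttpSensor` that polls an endpoint until it responds successfully. Here's a minimal sketch of how it could replace the task above, assuming the `apache-airflow-providers-http` package is installed and an HTTP connection named `openweather_api` (base URL `https://api.openweathermap.org`) has been created under Admin -> Connections; the connection name is our own choice, and inside the `with DAG(...)` block the sensor would be indented accordingly:

```python
from airflow.providers.http.sensors.http import HttpSensor

# Alternative to the PythonOperator above: repeatedly poke the endpoint
# until it returns a successful response, then let downstream tasks run.
check_api_sensor = HttpSensor(
    task_id='check_api_availability_sensor',
    http_conn_id='openweather_api',  # assumed connection ID
    endpoint='data/2.5/weather',
    request_params={'q': 'London', 'appid': 'YOUR_API_KEY'},
    poke_interval=60,  # seconds between checks
    timeout=300,       # give up (and fail) after 5 minutes
)
```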
## Task 2: Pull Weather Data
Once we've confirmed that the API is available, the next step is to pull the weather data. We'll again use a `PythonOperator` to execute a Python function that makes an HTTP request to the API and retrieves the weather data. The function parses the JSON response and returns it, so the data is available to later tasks.
- Define the Python function: We'll define a Python function that uses the `requests` library to make an HTTP GET request to the Weather Open API endpoint. The function parses the JSON response, which contains fields such as temperature, humidity, and wind speed.
- Handle exceptions: We'll include error handling to catch any exceptions that occur during the request, so that network issues or other unexpected errors are logged rather than surfacing as an unhandled traceback.
- Create the `PythonOperator`: We'll create a `PythonOperator` that calls our Python function. The `task_id` should be descriptive, and the `python_callable` argument should be set to our Python function. Since the function returns the parsed data, Airflow automatically pushes it to XCom, which lets us pass information between tasks.
```python
    get_weather_task = PythonOperator(
        task_id='get_weather_data',
        python_callable=get_weather_data,
    )
```
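To see that XCom hand-off in action, here's a hypothetical third task (not part of this release; `store_weather_data` is our own illustrative name) that would also live inside the `with DAG(...)` block and pull the upstream result:

```python
    def store_weather_data(ti):
        # Pull the return value of the upstream task from XCom.
        data = ti.xcom_pull(task_ids='get_weather_data')
        if data is not None:
            main = data.get('main', {})
            print(f"Temperature: {main.get('temp')}, humidity: {main.get('humidity')}")

    store_weather_task = PythonOperator(
        task_id='store_weather_data',
        python_callable=store_weather_data,
    )
```

If you adopt it, chain it after the pull task: `get_weather_task >> store_weather_task`.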
## Setting Task Dependencies
Now that we've defined our tasks, we need to set their dependencies. This tells Airflow the order in which to run the tasks. In our case, we want to check the API availability before pulling the weather data. We can set this dependency with the `>>` bitshift operator:

```python
    check_api_task >> get_weather_task
```

This ensures that `get_weather_task` only runs if `check_api_task` succeeds, which is important for preventing errors and keeping the DAG predictable.
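The `>>` operator is shorthand for the `set_downstream` method, so the line above is equivalent to:

```python
    check_api_task.set_downstream(get_weather_task)  # same as check_api_task >> get_weather_task
```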
## Complete DAG Code
Here's the complete code for our DAG:
```python
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime, timedelta
import requests


def check_api_availability():
    try:
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY",
            timeout=10,
        )
        response.raise_for_status()  # Raises HTTPError for bad responses (4xx or 5xx)
        return True
    except requests.exceptions.RequestException as e:
        print(f"API is not available: {e}")
        raise  # Fail the task so downstream tasks don't run


def get_weather_data():
    try:
        response = requests.get(
            "https://api.openweathermap.org/data/2.5/weather?q=London&appid=YOUR_API_KEY",
            timeout=10,
        )
        response.raise_for_status()
        data = response.json()
        print(f"Weather data: {data}")
        return data
    except requests.exceptions.RequestException as e:
        print(f"Error fetching weather data: {e}")
        return None


default_args = {
    'owner': 'airflow',
    'start_date': datetime(2024, 1, 1),
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'weather_data_pipeline',
    default_args=default_args,
    description='A DAG to check weather API availability and pull weather data',
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    check_api_task = PythonOperator(
        task_id='check_api_availability',
        python_callable=check_api_availability,
    )

    get_weather_task = PythonOperator(
        task_id='get_weather_data',
        python_callable=get_weather_data,
    )

    check_api_task >> get_weather_task
```
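One thing to tighten before running this for real: don't hardcode the API key in the DAG file. Here's a minimal sketch using Airflow Variables, assuming you've created a Variable named `openweather_api_key` under Admin -> Variables (the name is our own choice):

```python
from airflow.models import Variable

def get_weather_data():
    # Read the key at task run time rather than baking it into the code.
    api_key = Variable.get("openweather_api_key")
    response = requests.get(
        "https://api.openweathermap.org/data/2.5/weather",
        params={"q": "London", "appid": api_key},
        timeout=10,
    )
    response.raise_for_status()
    return response.json()
```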
## Testing the DAG
Before deploying our DAG to a production environment, it's important to test it thoroughly. Here's how we can test our DAG:
- Run the DAG manually: We can manually trigger the DAG in the Airflow UI to see if it runs successfully. This allows us to quickly identify any errors or issues.
- Check the logs: We can check the logs for each task to see if there are any errors or warnings. The logs provide valuable information about what's happening during the execution of the DAG.
- Verify the output: We can verify that the weather data is being pulled correctly by checking the output of the `get_weather_data` task. This ensures that our DAG is working as expected.
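Beyond the UI, the Airflow CLI lets you run a single task in isolation with `airflow tasks test weather_data_pipeline check_api_availability 2024-01-01`, no scheduler required. On Airflow 2.5+, you can also run the whole DAG in-process straight from the file; a minimal sketch, appended to the bottom of the DAG file:

```python
# Requires Airflow 2.5+: executes one in-process DagRun for debugging,
# streaming task logs to stdout without a scheduler or webserver.
if __name__ == "__main__":
    dag.test()
```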
## Conclusion
This initial release provides a solid foundation for automating weather data collection using Airflow. By checking API availability before pulling weather data, we make the DAG fail fast when the API is down and deliver the data we need when it's up. From here, you can build on this foundation by adding more tasks, such as storing the weather data in a database or visualizing it in a dashboard. The possibilities are endless! You can explore more about DAGs and best practices in the Apache Airflow documentation.