In this article, we build an automated pipeline that integrates the OpenWeather API and AeroDataBox API, stores results in MySQL, and schedules recurring runs — with clean Python patterns for secrets and orchestration.

The Importance of Database Pipelines
Modern systems depend on continuous data. Pipelines standardize collection, transformation, storage, and access, reducing manual toil and making analytics reliable.
What is a Database Pipeline?
- Data Collection — fetch from APIs or internal systems
- Transformation — clean/normalize
- Storage — persist in DB/warehouse
- Retrieval — query for analytics/apps
Architecture Overview
- Data collection — Wikipedia scraping (BeautifulSoup), the OpenWeather API, and the AeroDataBox API
- Transformation — pandas DataFrames for cleaning and flattening
- Storage — MySQL, written and read via SQLAlchemy
- Scheduling — cron (or a managed cloud scheduler)
1) Data Collection — Cities via Wikipedia (BeautifulSoup)
from bs4 import BeautifulSoup
import requests
from lat_lon_parser import parse
import pandas as pd

def cities_data(cities):
    city_data = []
    for city in cities:
        # English Wikipedia page for each city
        url = f"https://en.wikipedia.org/wiki/{city}"
        r = requests.get(url)
        soup = BeautifulSoup(r.content, 'html.parser')
        # Coordinates and country come from the page's infobox
        latitude = soup.find(class_="latitude").get_text()
        longitude = soup.find(class_="longitude").get_text()
        country = soup.find(class_="infobox-data").get_text()
        city_data.append({
            "City": city,
            "Country": country,
            "Latitude": parse(latitude),    # convert DMS strings to decimal degrees
            "Longitude": parse(longitude),
        })
    return pd.DataFrame(city_data)
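A quick usage example (the city names are only illustrative):

cities_df = cities_data(["Berlin", "Hamburg", "Munich"])
print(cities_df.head())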
2) Weather from OpenWeather
def get_weather_data(lat, lon, api_key):
    # 5-day / 3-hour forecast for one coordinate pair, in metric units
    url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units=metric"
    r = requests.get(url)
    if r.status_code == 200:
        return r.json()
    print(f"Failed to retrieve data: {r.status_code}")
    return None
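The forecast response is nested JSON, while the weather_data table stores flat rows. A small helper can bridge the two; this sketch assumes the standard fields of the OpenWeather 5-day/3-hour forecast (each entry in "list" carries dt_txt, main.temp, main.humidity, and weather[0].description) and a city_id that you track yourself:

def flatten_weather(weather_json, city_id):
    # Turn the nested forecast JSON into flat rows for the weather_data table
    rows = []
    for entry in weather_json.get("list", []):
        rows.append({
            "city_id": city_id,
            "forecast_time": entry.get("dt_txt"),
            "temperature": entry["main"].get("temp"),
            "humidity": entry["main"].get("humidity"),
            "conditions": entry["weather"][0].get("description"),
        })
    return pd.DataFrame(rows)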
3) Flights from AeroDataBox
def get_flight_data(icao_list, api_key):
    flight_items = []
    for icao in icao_list:
        # Arrivals at one airport for a fixed date window (adjust as needed)
        url = f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/{icao}/2024-10-10T00:00/2024-10-10T23:59"
        headers = {
            "x-rapidapi-key": api_key,
            "x-rapidapi-host": "aerodatabox.p.rapidapi.com",
        }
        r = requests.get(url, headers=headers)
        flights_json = r.json()
        for item in flights_json.get("arrivals", []):
            flight_items.append({
                "arrival_airport_icao": icao,
                "flight_number": item.get("number"),
                "scheduled_arrival_time": item["arrival"]["scheduledTime"].get("local"),
            })
    return pd.DataFrame(flight_items)
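Usage is analogous; the ICAO codes below (EDDB for Berlin Brandenburg, EDDH for Hamburg) are only examples, and flights_api_key is assumed to hold your RapidAPI key:

flights_df = get_flight_data(["EDDB", "EDDH"], flights_api_key)
print(flights_df.head())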
4) Persist to MySQL with SQLAlchemy
from sqlalchemy import create_engine

def connect_database(cfg):
    # cfg holds user, password, host, port, and database name
    conn = f"mysql+pymysql://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}/{cfg['database']}"
    return create_engine(conn)

def push_to_database(df, table, engine):
    # Append new rows; the table is created on the first write
    df.to_sql(table, con=engine, if_exists='append', index=False)
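connect_database expects a cfg dict with credentials, and both APIs need keys; none of that belongs in source code. The simplest clean pattern is to read everything from environment variables. A minimal sketch (the variable names DB_USER, OPENWEATHER_KEY, RAPIDAPI_KEY and so on are my own choices, not fixed by the project):

import os

# Credentials and API keys come from the environment, so nothing
# sensitive is committed to version control.
config = {
    "user": os.environ["DB_USER"],
    "password": os.environ["DB_PASSWORD"],
    "host": os.environ.get("DB_HOST", "localhost"),
    "port": os.environ.get("DB_PORT", "3306"),
    "database": os.environ["DB_NAME"],
}
weather_api_key = os.environ["OPENWEATHER_KEY"]
flights_api_key = os.environ["RAPIDAPI_KEY"]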
Retrieve & Analyze
def retrieve_from_db(table, engine):
    # Read an entire table back into a pandas DataFrame
    return pd.read_sql(f"SELECT * FROM {table}", con=engine)

engine = connect_database(config)
weather_df = retrieve_from_db('weather_data', engine)
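From here, analysis is plain pandas. For example, assuming the flattened columns from the weather sketch earlier (city_id and temperature), the mean forecast temperature per city is one line:

avg_temp_per_city = weather_df.groupby("city_id")["temperature"].mean()
print(avg_temp_per_city)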
Automation with cron
# open the crontab editor
crontab -e
# run the pipeline daily at midnight
0 0 * * * /path/to/your/project/automation.sh

The automation.sh script itself logs each step and fires a desktop notification when it finishes:

#!/bin/bash
LOG_FILE="automation.log"
echo "Fetching weather data..." >> "$LOG_FILE"
python3 main.py --weather >> "$LOG_FILE" 2>&1
echo "Fetching flight data..." >> "$LOG_FILE"
python3 main.py --flights >> "$LOG_FILE" 2>&1
notify-send "Automation Complete" "Weather and Flight data fetching completed."
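automation.sh assumes that main.py understands --weather and --flights flags. A minimal entry point could look like the sketch below; the two run_* functions are placeholders standing in for the fetch-and-push steps shown earlier:

# main.py: minimal entry point matching the flags used in automation.sh
import argparse

def run_weather_pipeline():
    # Placeholder: wire together cities_data, get_weather_data,
    # flatten_weather, and push_to_database here.
    print("weather pipeline ran")

def run_flights_pipeline():
    # Placeholder: wire together get_flight_data and push_to_database here.
    print("flights pipeline ran")

def main():
    parser = argparse.ArgumentParser(description="Run one stage of the data pipeline")
    parser.add_argument("--weather", action="store_true", help="fetch and store weather data")
    parser.add_argument("--flights", action="store_true", help="fetch and store flight data")
    args = parser.parse_args()
    if args.weather:
        run_weather_pipeline()
    if args.flights:
        run_flights_pipeline()

if __name__ == "__main__":
    main()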
Full project: GitHub →
Database Schema Overview
- cities — name, country, coordinates
- airports — ICAO, location
- weather_data — temp, humidity, conditions
- flight_data — flight number, scheduled arrival
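If you want to create the tables explicitly instead of letting to_sql infer them, a rough DDL sketch executed through SQLAlchemy might look like this; the column names and types are assumptions based on the fields collected above, not the exact schema of the project repository:

from sqlalchemy import text

# Rough table definitions; adjust types, keys, and constraints to your needs.
schema_statements = [
    """CREATE TABLE IF NOT EXISTS cities (
        city_id INT AUTO_INCREMENT PRIMARY KEY,
        city VARCHAR(100),
        country VARCHAR(100),
        latitude FLOAT,
        longitude FLOAT
    )""",
    """CREATE TABLE IF NOT EXISTS airports (
        icao VARCHAR(4) PRIMARY KEY,
        airport_name VARCHAR(100),
        city_id INT
    )""",
    """CREATE TABLE IF NOT EXISTS weather_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        city_id INT,
        forecast_time DATETIME,
        temperature FLOAT,
        humidity INT,
        conditions VARCHAR(100)
    )""",
    """CREATE TABLE IF NOT EXISTS flight_data (
        id INT AUTO_INCREMENT PRIMARY KEY,
        arrival_airport_icao VARCHAR(4),
        flight_number VARCHAR(20),
        scheduled_arrival_time VARCHAR(40)
    )""",
]

with engine.begin() as conn:
    for stmt in schema_statements:
        conn.execute(text(stmt))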
Key Takeaways
- Pipelines reduce manual work and standardize data flow.
- Secure secrets; validate sources and schemas.
- Automate with cron or cloud schedulers.
Conclusion
Automated pipelines ensure fresh, reliable data for decision-making. With a small set of Python functions and a scheduler, you can ingest, store, and query real-time signals at scale.