Automating Real-Time Data Pipelines: Weather & Flights with Python + MySQL

By Janmajay Kumar · October 2024 · Data Engineering

In this article, we build an automated pipeline that integrates the OpenWeather API and AeroDataBox API, stores results in MySQL, and schedules recurring runs — with clean Python patterns for secrets and orchestration.

[Figure: Real-time data pipeline diagram]

The Importance of Database Pipelines

Modern systems depend on continuous data. Pipelines standardize collection, transformation, storage, and access, reducing manual toil and making analytics reliable.

What is a Database Pipeline?

A database pipeline is an automated flow that collects data from external sources, transforms it into a consistent shape, and loads it into a database on a schedule, so downstream analysis always works from fresh tables instead of ad-hoc exports.

Architecture Overview

The pipeline in this article has four stages: city reference data scraped from Wikipedia, weather forecasts from the OpenWeather API, flight arrivals from the AeroDataBox API, and persistence to MySQL via SQLAlchemy, all orchestrated by a daily cron job.

1) Data Collection — Cities via Wikipedia (BeautifulSoup)

from bs4 import BeautifulSoup
import requests
from lat_lon_parser import parse
import pandas as pd

def cities_data(cities):
    """Scrape basic city facts (country and coordinates) from English Wikipedia."""
    city_data = []
    for city in cities:
        url = f"https://en.wikipedia.org/wiki/{city}"
        r = requests.get(url)
        r.raise_for_status()
        soup = BeautifulSoup(r.content, 'html.parser')
        # Coordinates are rendered in DMS notation (e.g. 52°31′N);
        # lat_lon_parser converts them to decimal degrees.
        latitude = soup.find(class_="latitude").get_text()
        longitude = soup.find(class_="longitude").get_text()
        # On most city pages the first "infobox-data" cell is the country row.
        country = soup.find(class_="infobox-data").get_text()
        city_data.append({
            "City": city,
            "Country": country,
            "Latitude": parse(latitude),
            "Longitude": parse(longitude),
        })
    return pd.DataFrame(city_data)
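
A quick usage check; the city list here is just an example:

cities_df = cities_data(["Berlin", "Hamburg", "Munich"])
print(cities_df)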

2) Weather from OpenWeather

def get_weather_data(lat, lon, api_key):
    """Fetch the 5-day / 3-hour forecast for a coordinate pair from OpenWeather."""
    url = f"https://api.openweathermap.org/data/2.5/forecast?lat={lat}&lon={lon}&appid={api_key}&units=metric"
    r = requests.get(url)
    if r.status_code == 200:
        return r.json()
    print(f"Failed to retrieve data: {r.status_code}")
    return None
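
The forecast JSON is nested, so before it can go to MySQL it needs to be flattened into rows. A minimal sketch, assuming the standard OpenWeather forecast response (a "list" of 3-hour entries); the helper name and column names are illustrative:

def weather_to_dataframe(city, weather_json):
    # Each entry in "list" is one 3-hour forecast slot.
    rows = []
    for entry in weather_json["list"]:
        rows.append({
            "city": city,
            "forecast_time": entry["dt_txt"],
            "temperature": entry["main"]["temp"],
            "humidity": entry["main"]["humidity"],
            "description": entry["weather"][0]["description"],
            "wind_speed": entry["wind"]["speed"],
        })
    return pd.DataFrame(rows)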

3) Flights from AeroDataBox

def get_flight_data(icao_list, api_key):
    """Fetch scheduled arrivals for each airport (by ICAO code) from AeroDataBox."""
    flight_items = []
    headers = {
        "x-rapidapi-key": api_key,
        "x-rapidapi-host": "aerodatabox.p.rapidapi.com",
    }
    # AeroDataBox accepts at most a 12-hour range per request, so the day is
    # queried in two halves. The date is hard-coded here for illustration;
    # a scheduled pipeline would compute it at runtime (see the sketch below).
    time_windows = [("00:00", "11:59"), ("12:00", "23:59")]
    for icao in icao_list:
        for start, end in time_windows:
            url = (f"https://aerodatabox.p.rapidapi.com/flights/airports/icao/"
                   f"{icao}/2024-10-10T{start}/2024-10-10T{end}")
            r = requests.get(url, headers=headers)
            flights_json = r.json()
            for item in flights_json.get("arrivals", []):
                flight_items.append({
                    "arrival_airport_icao": icao,
                    "flight_number": item.get("number"),
                    "scheduled_arrival_time": item["arrival"]["scheduledTime"].get("local"),
                })
    return pd.DataFrame(flight_items)
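
Because the pipeline runs on a schedule, the query date should be computed at runtime rather than hard-coded. A minimal sketch of building tomorrow's two half-day windows (the helper name is illustrative):

from datetime import date, timedelta

def tomorrow_windows():
    # Build the two half-day windows for tomorrow's date, e.g.
    # ("2024-10-11T00:00", "2024-10-11T11:59") and ("2024-10-11T12:00", "2024-10-11T23:59").
    tomorrow = (date.today() + timedelta(days=1)).isoformat()
    return [
        (f"{tomorrow}T00:00", f"{tomorrow}T11:59"),
        (f"{tomorrow}T12:00", f"{tomorrow}T23:59"),
    ]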

4) Persist to MySQL with SQLAlchemy

from sqlalchemy import create_engine

def connect_database(cfg):
    # Build a SQLAlchemy engine for MySQL using the PyMySQL driver.
    conn = f"mysql+pymysql://{cfg['user']}:{cfg['password']}@{cfg['host']}:{cfg['port']}/{cfg['database']}"
    return create_engine(conn)

def push_to_database(df, table, engine):
    # Append the DataFrame to the target table (created automatically on the first run).
    df.to_sql(table, con=engine, if_exists='append', index=False)
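
The cfg dictionary holds the secrets, so it should come from the environment rather than from source control. A minimal sketch, assuming the connection settings live in environment variables (the variable names are illustrative):

import os

def load_db_config():
    # Read connection settings from the environment so credentials stay out of the code.
    return {
        "user": os.environ["DB_USER"],
        "password": os.environ["DB_PASSWORD"],
        "host": os.environ.get("DB_HOST", "localhost"),
        "port": os.environ.get("DB_PORT", "3306"),
        "database": os.environ["DB_NAME"],
    }

config = load_db_config()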

Retrieve & Analyze

def retrieve_from_db(table, engine):
    # Read a whole table back into a DataFrame for analysis.
    # Fine for trusted internal table names; never interpolate user input into SQL.
    return pd.read_sql(f"SELECT * FROM {table}", con=engine)

engine = connect_database(config)
weather_df = retrieve_from_db('weather_data', engine)
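
From here the data can be explored like any other DataFrame. A small illustrative check, assuming the column names from the flattening sketch above:

weather_df["forecast_time"] = pd.to_datetime(weather_df["forecast_time"])

# Average forecast temperature per city, as a quick sanity check.
print(weather_df.groupby("city")["temperature"].mean())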

Automation with cron

# open editor
crontab -e

# run daily at midnight
0 0 * * * /path/to/your/project/automation.sh

And automation.sh itself (make it executable with chmod +x):

#!/bin/bash
# Relative path: under cron this resolves against the job's working directory (usually $HOME);
# use an absolute path if the log should sit next to the project.
LOG_FILE="automation.log"

echo "Fetching weather data..." >> $LOG_FILE
python3 main.py --weather >> $LOG_FILE 2>&1

echo "Fetching flight data..." >> $LOG_FILE
python3 main.py --flights >> $LOG_FILE 2>&1

# notify-send needs a desktop session, so this line may be a no-op when run from cron.
notify-send "Automation Complete" "Weather and Flight data fetching completed."
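
The script above assumes main.py understands --weather and --flights flags. A minimal sketch of that entry point with argparse, reusing the functions from earlier; the table names, ICAO codes, and the OPENWEATHER_KEY / AERODATABOX_KEY environment variables are illustrative choices, not fixed names:

import argparse
import os

def main():
    parser = argparse.ArgumentParser(description="Run one stage of the pipeline.")
    parser.add_argument("--weather", action="store_true", help="fetch and store weather forecasts")
    parser.add_argument("--flights", action="store_true", help="fetch and store flight arrivals")
    args = parser.parse_args()

    engine = connect_database(load_db_config())

    if args.weather:
        # Fetch the forecast for every stored city and append it to MySQL.
        cities_df = retrieve_from_db("cities", engine)
        for _, row in cities_df.iterrows():
            weather_json = get_weather_data(row["Latitude"], row["Longitude"],
                                            os.environ["OPENWEATHER_KEY"])
            if weather_json:
                push_to_database(weather_to_dataframe(row["City"], weather_json),
                                 "weather_data", engine)

    if args.flights:
        # Fetch arrivals for a few example airports and append them to MySQL.
        flights_df = get_flight_data(["EDDB", "EDDH", "EDDM"], os.environ["AERODATABOX_KEY"])
        push_to_database(flights_df, "flight_data", engine)

if __name__ == "__main__":
    main()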

Full project: available on GitHub.

Database Schema Overview

Each stage writes to its own MySQL table (city reference data, weather forecasts, and flight arrivals), and rows are appended on every scheduled run, so the tables build up a queryable history over time.

Key Takeaways

A handful of small, single-purpose functions cover the whole flow: BeautifulSoup for scraping reference data, plain requests calls against the OpenWeather and AeroDataBox APIs, pandas plus SQLAlchemy for loading MySQL, and cron for scheduling. Keeping credentials in the environment and logging each run makes the pipeline safe to leave unattended.

Conclusion

Automated pipelines ensure fresh, reliable data for decision-making. With a small set of Python functions and a scheduler, you can ingest, store, and query real-time signals at scale.