Real-Time Air Quality Monitoring Made Simple: Spark and Hugging Face (gpt-oss)

I want to build an alerting system whose input is a single data source, formed by combining multiple streams that are updated regularly.

My goals for this project are to:

  • Collect real-time updates from multiple data sources
  • Read them with Apache Spark Structured Streaming
  • Aggregate them in short time windows and flag unhealthy air
  • Personalize the risk for a registered profile whose health data is available to the application
  • Generate a short, human-readable alert with an open-weight chat model
  • Write alerts to CSV (to keep things simple) and show a live chart and a table of the latest data
  • Stop after approximately 90 seconds

Step 1: Install the necessary packages

!pip -q install pyspark transformers accelerate datasets sentencepiece
!pip install -U transformers kernels torch

Step 2: Create the directories for the streaming data. We will mock the data for now.

import json, pathlib, random, time
from datetime import datetime

BASE = "/content/aqi_stream"
SRC_A, SRC_B, OUT_DIR = f"{BASE}/src_a", f"{BASE}/src_b", f"{BASE}/alerts_out"
for d in [SRC_A, SRC_B, OUT_DIR]: pathlib.Path(d).mkdir(parents=True, exist_ok=True)

Next, define the profile used for personalization. We keep it as JSON and emit some mock events for the Spark streams to pick up. In real life you'd use Kafka, sockets, or a live API.

profile = {
    "name": "Danny",
    "age": 9,
    "is_child": True,  # a 9-year-old is a child, so this flag must be set
    "has_respiratory_condition": True,
    "city": "Schaumburg"
}
pathlib.Path(BASE).mkdir(parents=True, exist_ok=True)
with open(f"{BASE}/profile.json", "w") as f:
    json.dump(profile, f, indent=2)

cities = ["Northbrook","Chicago","Evanston","Glenview","Schaumburg"]

def emit_event(dirp, city=None, pm25=None, aqi=None):
    """Write one mock reading as a JSON file; pass city/pm25/aqi to force values."""
    city = city or random.choice(cities)
    if pm25 is None:
        pm25 = max(5, int(random.gauss(35, 25)))  # PM2.5 variation
    if aqi is None:
        aqi = max(5, int(pm25*1.3 + random.randint(-10,10)))  # simple AQI proxy
    evt = {
        "location": city,
        "pm25": pm25,
        "aqi": aqi,
        "timestamp": datetime.utcnow().isoformat() + "Z"
    }
    # Timestamp plus a random suffix keeps file names unique
    fname = f"{datetime.utcnow().timestamp()}_{random.randint(0,9999)}.json"
    with open(f"{dirp}/{fname}", "w") as f:
        f.write(json.dumps(evt))

# Seed some initial files
for _ in range(12):
    emit_event(SRC_A); emit_event(SRC_B)
time.sleep(1)  # give the files a moment to land

The generator randomizes PM2.5 and AQI so the stream looks “alive”. Later, the dashboard loop occasionally injects “bad” values so that thresholds are crossed and alerts appear.
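
One caveat with file-based sources: Spark's documentation asks for files to be placed in the watched directory atomically, which is usually achieved with a move. The sketch below shows one way to do that, assuming a staging directory on the same filesystem but outside the watched folder. The names emit_event_atomic and staging_dir are hypothetical and not part of the notebook.

```python
import json
import os
import tempfile
import uuid


def emit_event_atomic(src_dir: str, event: dict, staging_dir: str) -> str:
    """Write the event to a staging file, then move it into src_dir in one step."""
    os.makedirs(src_dir, exist_ok=True)
    os.makedirs(staging_dir, exist_ok=True)
    name = f"{uuid.uuid4().hex}.json"
    tmp_path = os.path.join(staging_dir, name)
    final_path = os.path.join(src_dir, name)
    with open(tmp_path, "w") as f:
        f.write(json.dumps(event))
    # os.replace is atomic when both paths are on the same filesystem,
    # so a reader never sees a half-written file in src_dir.
    os.replace(tmp_path, final_path)
    return final_path


# Demo in a throwaway directory (not the notebook's /content paths).
base = tempfile.mkdtemp()
src = os.path.join(base, "src_a")
staging = os.path.join(base, "staging")
demo_event = {"location": "Chicago", "pm25": 40, "aqi": 55,
              "timestamp": "2024-01-01T12:00:00Z"}
path = emit_event_atomic(src, demo_event, staging)
```

For a short demo the direct write is usually fine; the move only matters once events get large or writers get slow.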

Step 3: Start Apache Spark through the Python library and define the streams


from pyspark.sql import SparkSession, functions as F, types as T

spark = SparkSession.builder.appName("AQIStreamingDemo").getOrCreate()
spark.sparkContext.setLogLevel("WARN")

schema = T.StructType([
    T.StructField("location", T.StringType()),
    T.StructField("pm25", T.IntegerType()),
    T.StructField("aqi", T.IntegerType()),
    T.StructField("timestamp", T.StringType())
])

stream_a = spark.readStream.schema(schema).json(SRC_A)
stream_b = spark.readStream.schema(schema).json(SRC_B)

events = stream_a.unionByName(stream_b) \
    .withColumn("event_time", F.to_timestamp("timestamp"))
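  • Streaming file sources need an explicit schema by default. It lets Spark parse JSON efficiently, and in the default permissive mode values that do not fit the schema are read as null instead of failing the query.
  • unionByName merges both streams into one logical pipeline, matching columns by name.
  • We convert the ISO timestamp string to event_time (a real Spark timestamp type) so that time windows can use it.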

Step 4: Aggregate the events in 5-minute windows with a 2-minute watermark, then apply thresholds to keep only the alertable windows. All of this can be done in Spark.

  • Windowing groups events that occur in the same 5-minute window (or whatever size you set).
  • Watermark tells Spark how long to wait for late data and how to clean state.
  • We compute averages and counts to smooth noisy readings and make threshold decisions more robust.

# 5-minute tumbling window (short for demo)
windowed = (events
    .withWatermark("event_time", "2 minutes")
    .groupBy(F.window("event_time", "5 minutes"), F.col("location"))
    .agg(
        F.avg("aqi").alias("avg_aqi"),
        F.max("aqi").alias("max_aqi"),
        F.avg("pm25").alias("avg_pm25"),
        F.count("*").alias("cnt")
    ))

# Trigger alerts if averages cross thresholds
thresholded = (windowed
    .withColumn("flag", (F.col("avg_aqi") >= 100) | (F.col("avg_pm25") >= 55))
    .filter("flag = true"))
  • Only unhealthy windows become alerts.
  • The thresholds are tunable — lower them to see more alerts, raise them for stricter alerts.
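
To make the windowing and watermark ideas from this step concrete, here is a plain-Python sketch that needs no Spark. It is a simplified model under stated assumptions: the watermark is updated after every event, whereas Spark advances it between micro-batches, and an event is dropped once its window ends at or before the watermark. The helper names and the sample readings are made up for illustration.

```python
from datetime import datetime, timedelta, timezone

WINDOW = timedelta(minutes=5)           # tumbling window size from Step 4
WATERMARK_DELAY = timedelta(minutes=2)  # allowed lateness from Step 4
EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def window_start(ts):
    """Floor a timestamp to the start of its 5-minute tumbling window."""
    return ts - (ts - EPOCH) % WINDOW


def aggregate(events):
    """Average AQI per (window, location); events are (time, location, aqi) in arrival order."""
    totals = {}    # (window_start, location) -> [aqi_sum, count]
    dropped = []   # events that arrived after their window was closed
    max_seen = None
    for ts, loc, aqi in events:
        start = window_start(ts)
        if max_seen is not None and start + WINDOW <= max_seen - WATERMARK_DELAY:
            dropped.append((ts, loc, aqi))
            continue
        bucket = totals.setdefault((start, loc), [0, 0])
        bucket[0] += aqi
        bucket[1] += 1
        max_seen = ts if max_seen is None else max(max_seen, ts)
    averages = {key: total / n for key, (total, n) in totals.items()}
    return averages, dropped


t0 = datetime(2024, 1, 1, 12, 0, tzinfo=timezone.utc)
events = [
    (t0 + timedelta(minutes=1), "Chicago", 120),
    (t0 + timedelta(minutes=4), "Chicago", 100),
    (t0 + timedelta(minutes=13), "Chicago", 90),
    (t0 + timedelta(minutes=2), "Chicago", 200),   # its window is already closed
    (t0 + timedelta(minutes=12), "Chicago", 110),  # out of order, window still open
]
averages, dropped = aggregate(events)
flagged = {key: avg for key, avg in averages.items() if avg >= 100}
```

The reading of 200 would have pushed the 12:00 window's average up, but it arrives after the watermark has passed that window, so it is ignored. That is the trade-off the watermark setting controls.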

Step 5: Now it is time to use our LLM. Since I'm running on a GPU, I use gpt-oss-20b, the open-weight model from OpenAI on Hugging Face.

from transformers import pipeline
import json, pandas as pd, os
from datetime import datetime

# gpt-oss-20b needs a GPU runtime (about 16 GB of memory per its model card)
MODEL_NAME = "openai/gpt-oss-20b" # swap in a smaller instruct model if you like

nlp = pipeline(
    "text-generation",
    model=MODEL_NAME,
    tokenizer=MODEL_NAME,
    device_map="auto",       # will use the GPU
    torch_dtype="auto",      # let it pick FP16/bfloat16
    trust_remote_code=True,  # some new models need this
    max_new_tokens=64,
    do_sample=True,          # temperature only takes effect when sampling
    temperature=0.4
)

with open(f"{BASE}/profile.json") as f:
    PROFILE = json.load(f)

def personalize_and_generate(loc:str, avg_aqi:float, avg_pm25:float, cnt:int):
    # Baseline risk tiers from thresholds
    risk = "Moderate"
    if avg_aqi >= 150 or avg_pm25 >= 100: risk = "High"
    elif avg_aqi >= 100 or avg_pm25 >= 55: risk = "Elevated"

    # Personalization boost for kids / sensitive groups
    if PROFILE.get("is_child") or PROFILE.get("has_respiratory_condition"):
        if risk == "Moderate": risk = "Elevated"
        elif risk == "Elevated": risk = "High"

    prompt = (
        f"You are a family health assistant. AQI={avg_aqi:.0f}, PM2.5={avg_pm25:.0f} in {loc}. "
        f"Child: {PROFILE.get('name','the child')} (age {PROFILE.get('age',5)}). "
        f"Risk level: {risk}. "
        f"Give a short, friendly alert with 1–2 concrete actions (e.g., limit outdoor time, use purifier). "
        f"Keep it under 2 sentences."
    )
    # return_full_text=False keeps the prompt out of the stored alert message
    text = nlp(prompt, return_full_text=False)[0]["generated_text"].strip()
    return risk, text
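
Because the tiering logic does not depend on the model, it can be pulled into a pure function and checked without loading any weights. The sketch below mirrors the thresholds and the one-step boost used above. The names risk_tier and fallback_message are hypothetical, and the fallback wording is only a placeholder for cases where the model call fails.

```python
TIERS = ["Moderate", "Elevated", "High"]


def risk_tier(avg_aqi: float, avg_pm25: float, sensitive: bool) -> str:
    """Map window averages to a tier, bumping one level for sensitive profiles."""
    if avg_aqi >= 150 or avg_pm25 >= 100:
        level = 2
    elif avg_aqi >= 100 or avg_pm25 >= 55:
        level = 1
    else:
        level = 0
    if sensitive:
        level = min(level + 1, len(TIERS) - 1)  # "High" stays "High"
    return TIERS[level]


def fallback_message(name: str, loc: str, risk: str) -> str:
    """Template alert to use if the LLM is unavailable (placeholder wording)."""
    return f"{risk} air quality risk in {loc}: please limit {name}'s outdoor time."


examples = {
    "clean_air": risk_tier(90, 30, sensitive=False),
    "clean_air_sensitive": risk_tier(90, 30, sensitive=True),
    "unhealthy": risk_tier(120, 40, sensitive=False),
    "unhealthy_sensitive": risk_tier(120, 40, sensitive=True),
    "very_unhealthy_sensitive": risk_tier(180, 40, sensitive=True),
}
message = fallback_message("Danny", "Schaumburg", examples["unhealthy_sensitive"])
```

Only windows that already crossed the Step 4 thresholds reach this function in the pipeline, so with the sample profile every alert ends up as "High".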

We now have a personalized prompt built from the profile and the alertable event. Next, a helper function uses pandas to turn each micro-batch into rows with the necessary fields and appends them to the alerts.csv file.

def process_batch(df, epoch_id:int):
    pdf = df.toPandas()
    if pdf.empty: return
    rows = []
    for _, r in pdf.iterrows():
        risk, msg = personalize_and_generate(
            r["location"], float(r["avg_aqi"]), float(r["avg_pm25"]), int(r["cnt"])
        )
        rows.append({
            "ts": datetime.utcnow().isoformat() + "Z",
            "location": r["location"],
            "avg_aqi": float(r["avg_aqi"]),
            "avg_pm25": float(r["avg_pm25"]),
            "events": int(r["cnt"]),
            "risk": risk,
            "message": msg
        })
    out_df = pd.DataFrame(rows)
    out_csv = f"{OUT_DIR}/alerts.csv"
    header = not os.path.exists(out_csv)  # write the header only once
    out_df.to_csv(out_csv, mode="a", index=False, header=header)
    print("\n=== New Alerts ===")
    print(out_df.to_string(index=False))
  • foreachBatch is the common pattern to bridge Spark → external systems (DBs, APIs, services).
  • We keep it simple: append to CSV and print the new alerts.
  • In production, you’d publish to Kafka, call a microservice, or write to a database.
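
One thing to keep in mind is that foreachBatch gives at-least-once delivery by default, so a batch can be replayed after a failure, and the Spark documentation suggests using the batch id to deduplicate. Here is a minimal sketch of that idea, assuming the query runs with a checkpoint location so that epoch ids stay stable across restarts. The marker file and the write_once helper are hypothetical, and a list stands in for alerts.csv.

```python
import json
import os
import tempfile


def last_epoch(marker_path: str) -> int:
    """Return the highest epoch id recorded so far, or -1 if none."""
    if not os.path.exists(marker_path):
        return -1
    with open(marker_path) as f:
        return json.load(f)["last_epoch_id"]


def write_once(marker_path: str, epoch_id: int, rows: list, sink: list) -> bool:
    """Append rows to sink unless this epoch was already handled."""
    if epoch_id <= last_epoch(marker_path):
        return False  # replayed batch: skip to avoid duplicate alerts
    sink.extend(rows)
    tmp_path = marker_path + ".tmp"
    with open(tmp_path, "w") as f:
        json.dump({"last_epoch_id": epoch_id}, f)
    os.replace(tmp_path, marker_path)  # swap the marker in atomically
    return True


# Demo: epoch 0 is delivered twice, as could happen after a restart.
marker = os.path.join(tempfile.mkdtemp(), "last_epoch.json")
sink = []
results = [
    write_once(marker, 0, ["alert-a"], sink),
    write_once(marker, 0, ["alert-a"], sink),  # replay of epoch 0
    write_once(marker, 1, ["alert-b"], sink),
]
```

This narrows the window for duplicates but does not remove it: a crash between the append and the marker update would still repeat one batch, which is why a transactional sink is the usual choice in production.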

Step 6: With the batch handler in place, we can start the streaming query. This is the engine that keeps processing new files as they arrive and writes the resulting alerts to alerts.csv.

query = (thresholded
    .writeStream
    .foreachBatch(process_batch)
    .outputMode("update")
    .start())

Step 7: Now let us create the dashboard to view the live data using matplotlib

# ---- simple live dashboard in THIS cell (matplotlib) ----
import os, time
import pandas as pd
import matplotlib.pyplot as plt
from IPython.display import clear_output, display

DURATION_SEC = 90                   # total run time of the demo
REFRESH_SEC = 3                     # assumed refresh interval; tune to taste
AQI_TH = 100                        # same AQI threshold as in Step 4
CSV_PATH = f"{OUT_DIR}/alerts.csv"  # file written by process_batch

plt.figure(figsize=(8,4))

start = time.time()
tick = 0
print("Streaming & visualizing... (this cell will run ~{}s)".format(DURATION_SEC))
while time.time() - start < DURATION_SEC:
    # keep feeding events so windows fill
    emit_event(SRC_A); emit_event(SRC_B)
    if tick % 3 == 0:
        # occasionally push some "bad" events to guarantee alerts
        emit_event(SRC_A, city="Northbrook", pm25=120, aqi=180)
    tick += 1

    # draw simple chart from CSV (if exists)
    clear_output(wait=True)
    print("Streaming & visualizing... ({}s left)".format(int(DURATION_SEC - (time.time()-start))))
    if os.path.exists(CSV_PATH):
        try:
            df = pd.read_csv(CSV_PATH)
            if not df.empty:
                df["ts"] = pd.to_datetime(df["ts"])
                df = df.sort_values("ts").tail(200)
                # plot avg_aqi time series (all locations combined, last 200)
                plt.cla()
                plt.plot(df["ts"], df["avg_aqi"])
                plt.axhline(AQI_TH) # threshold line
                plt.title("Avg AQI of Alerted Windows (last 200)")
                plt.xlabel("Time"); plt.ylabel("Avg AQI")
                plt.xticks(rotation=30)
                display(plt.gcf())
                # also show the last few alerts
                display(df.tail(6)[["ts","location","avg_aqi","avg_pm25","risk","message"]])
            else:
                print("alerts.csv exists but is empty...")
        except Exception as e:
            print("Reading alerts.csv...", e)
    else:
        print("Waiting for alerts.csv…")
    time.sleep(REFRESH_SEC)
  • Emits events every tick (so the stream has data)
  • Every few ticks, injects a “bad” event to ensure alerts
  • Reads alerts.csv and draws a simple line chart (matplotlib)
  • Also prints a tail of latest alerts (with the model’s message)
  • Loops until DURATION_SEC then stops the stream

The parameters above keep the stream fed for about 90 seconds. After that, the query is stopped.

try:
    query.stop()
except Exception:
    pass

print("\nDone. Alerts saved to:", CSV_PATH)
if os.path.exists(CSV_PATH):
    tail = pd.read_csv(CSV_PATH).tail(10)
    print("\nLast few alerts:")
    print(tail[["ts","location","avg_aqi","avg_pm25","risk"]].to_string(index=False))

This stops the stream cleanly and prints the last few alerts saved in the CSV.

Here is the output

If you would like to run it on a CPU with a different lightweight LLM, you can use “Qwen/Qwen2.5-0.5B-Instruct”. I have provided this in my Python notebook.

Next Steps:

  1. Switch file sources → Kafka: use Spark’s Kafka connector (spark-sql-kafka-0-10).
  2. Separate inference service: host the HF model behind FastAPI, call it from foreachBatch.
  3. Dashboard: push alerts into a DB (e.g., Postgres) and visualize with Grafana or a small React app.
  4. Predictive alerts: train a tiny regressor on recent windows to predict AQI 1–3 hours ahead.
  5. A/B test messages: rotate phrasing templates, track click/open/ack metrics.
  6. Schema & quality: add a schema registry or Pydantic validation to reject bad events.
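
As a starting point for item 6, here is a standard-library sketch of event validation. It uses a dataclass as a stand-in for Pydantic, and the AirEvent class, parse_event, and is_valid are hypothetical names. The field rules (non-empty location, non-negative integer readings, ISO timestamp) are assumptions based on the mock events in Step 2.

```python
import json
from dataclasses import dataclass
from datetime import datetime


@dataclass(frozen=True)
class AirEvent:
    location: str
    pm25: int
    aqi: int
    timestamp: datetime


def parse_event(raw: str) -> AirEvent:
    """Parse one JSON event, raising ValueError if any field is missing or malformed."""
    data = json.loads(raw)  # JSONDecodeError is a subclass of ValueError
    if not isinstance(data, dict):
        raise ValueError("event must be a JSON object")
    location = data.get("location")
    if not isinstance(location, str) or not location:
        raise ValueError("location must be a non-empty string")
    for field in ("pm25", "aqi"):
        value = data.get(field)
        # bool is a subclass of int in Python, so exclude it explicitly
        if isinstance(value, bool) or not isinstance(value, int) or value < 0:
            raise ValueError(f"{field} must be a non-negative integer")
    ts_raw = data.get("timestamp")
    if not isinstance(ts_raw, str):
        raise ValueError("timestamp must be a string")
    # Older Python versions do not accept a trailing "Z" in fromisoformat
    ts = datetime.fromisoformat(ts_raw.replace("Z", "+00:00"))
    return AirEvent(location, data["pm25"], data["aqi"], ts)


def is_valid(raw: str) -> bool:
    """True if the raw JSON string parses into a well-formed AirEvent."""
    try:
        parse_event(raw)
        return True
    except ValueError:
        return False


good = '{"location": "Chicago", "pm25": 40, "aqi": 55, "timestamp": "2024-01-01T12:00:00Z"}'
bad = '{"location": "Chicago", "pm25": "high", "aqi": 55, "timestamp": "2024-01-01T12:00:00Z"}'
checks = (is_valid(good), is_valid(bad), is_valid("not json"))
```

Running a check like this in the producer, before files reach the watched directory, keeps bad records out of the stream altogether.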

Happy Learning!!