End-to-End Data Pipeline: From Web Scraping to ML Insights to Automated Alerts
Most teams have a scraper, a model, and some automation running independently. Here's how to wire them together as one system — and why the connections matter as much as the components.
Founder, Creative Codes. 8 years on backends; last 3 deep on AI agents, RAG pipelines, and production scraping. Python, LangGraph, Playwright, n8n, FastAPI.
Most teams have a scraper, a model, and some automation running as three separate tools. The value — and the failures — live in the connections between them. Here's how to wire them into one system end-to-end.
What "intelligent data pipeline" actually means
The term gets used loosely. For us, it means three connected layers:
- Data collection — web scraping, API polling, or file ingestion. Raw data, structured on the way out.
- ML processing — classification, scoring, extraction, or anomaly detection. Structured data transformed into decisions.
- Automation — routing the decision to where it matters: Slack alert, CRM update, email trigger, database record.
The insight is that each layer is useless without the others. A scraper that dumps data into a CSV nobody reads is a cost center. An ML model with no data source is a demo. Automation that fires on incorrect signals creates noise, not value. The value lives in the connections.
The example: competitor price monitoring
We'll use a pipeline we've built in several variations for e-commerce clients: monitor competitor product prices, score pricing anomalies with ML, and route high-confidence alerts to the right team.
The full system:
Daily Schedule (8am)
↓
Layer 1: Playwright scraper → 500 product URLs
↓
Postgres staging table: raw_price_records
↓
Layer 2: ML anomaly detector → scores each record
↓
Postgres output table: scored_price_changes
↓
Layer 3: n8n workflow → filters by score + routes alerts
↓
Slack (#pricing-ops) + CRM note + email digest
Layer 1: Data collection
The scraper's job is to produce clean, schema-validated records. Not raw HTML. Not "whatever the CSS selector returned." Structured records that the ML layer can trust.
The schema for this pipeline:
from pydantic import BaseModel, validator
from decimal import Decimal
from datetime import datetime

class PriceRecord(BaseModel):
    product_id: str
    competitor: str
    url: str
    price_usd: Decimal
    in_stock: bool
    scraped_at: datetime
    raw_price_text: str  # keep for debugging

    # Pydantic v1-style validator; Pydantic v2 renames this to field_validator.
    @validator("price_usd")
    def price_must_be_positive(cls, v):
        if v <= 0:
            raise ValueError("price must be positive")
        return v

The scraper validates every record against this schema before writing to Postgres. Records that fail validation go to a failed_records table for investigation. Nothing invalid reaches the ML layer.
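As a rough sketch of the validate-then-route step, the snippet below uses only the standard library as a stand-in for the Pydantic model. The names `parse_price_text` and `partition_records` are hypothetical, and the parsing rule (US-style text such as "$1,299.99") is an assumption, not the production scraper's logic.

```python
from decimal import Decimal, InvalidOperation
import re

# Assumption: prices look like "$1,299.99" (US formatting, optional symbol).
_PRICE_RE = re.compile(r"\d[\d,]*(?:\.\d+)?")


def parse_price_text(raw_price_text: str) -> Decimal:
    """Extract a positive Decimal price from scraped text or raise ValueError."""
    match = _PRICE_RE.search(raw_price_text)
    if match is None:
        raise ValueError(f"no price found in {raw_price_text!r}")
    try:
        price = Decimal(match.group().replace(",", ""))
    except InvalidOperation as exc:
        raise ValueError(f"unparseable price {raw_price_text!r}") from exc
    if price <= 0:
        raise ValueError("price must be positive")
    return price


def partition_records(rows: list[dict]) -> tuple[list[dict], list[dict]]:
    """Split scraped rows into (valid, failed); failed rows keep the error text."""
    valid, failed = [], []
    for row in rows:
        try:
            price = parse_price_text(row["raw_price_text"])
        except (KeyError, ValueError) as exc:
            failed.append({**row, "error": str(exc)})
        else:
            valid.append({**row, "price_usd": price})
    return valid, failed
```

In this sketch the `valid` list would be written to raw_price_records and the `failed` list to failed_records, so a selector that starts returning "Out of stock" text shows up as failed rows and never as a bogus price.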
For anti-detection on e-commerce sites, see How We Scrape at Scale Without Getting Blocked. The principles are the same regardless of what the downstream layers look like.
The staging table pattern
We don't write directly from scraper to processed output. The staging table (raw_price_records) serves as a buffer and audit log. If the ML layer changes, you can reprocess historical records. If something goes wrong, you have the raw data to investigate. The staging table is append-only.
CREATE TABLE raw_price_records (
    id SERIAL PRIMARY KEY,
    product_id VARCHAR(64),
    competitor VARCHAR(64),
    url TEXT,
    price_usd NUMERIC(10,2),
    in_stock BOOLEAN,
    scraped_at TIMESTAMPTZ DEFAULT NOW(),
    raw_price_text TEXT,
    processed BOOLEAN DEFAULT FALSE
);

Layer 2: ML processing
The ML layer reads unprocessed records from the staging table, scores each one, and writes results to the output table. It runs as a Python service — not a code node in n8n, not a notebook.
For this pipeline, the ML task is anomaly detection: given today's price and a 30-day price history, how unusual is this price? We use a simple but effective approach: compute the z-score against historical prices, then apply a calibrated threshold.
import numpy as np
from decimal import Decimal

def score_price_change(
    current_price: Decimal,
    price_history: list[Decimal],
    product_id: str
) -> dict:
    if len(price_history) < 7:
        return {"score": 0.0, "reason": "insufficient_history", "action": "skip"}

    history_arr = np.array([float(p) for p in price_history])
    mean = np.mean(history_arr)
    std = np.std(history_arr)

    if std == 0:
        # Price has never changed, so any change is notable
        if float(current_price) != mean:
            return {"score": 0.9, "reason": "first_price_change", "action": "alert"}
        return {"score": 0.0, "reason": "stable_price", "action": "skip"}

    z_score = abs(float(current_price) - mean) / std
    direction = "decrease" if float(current_price) < mean else "increase"
    pct_change = (float(current_price) - mean) / mean * 100

    # Calibrate: z-score > 2 = notable, > 3 = alert
    if z_score > 3:
        score = min(0.99, 0.7 + (z_score - 3) * 0.1)
        action = "alert"
    elif z_score > 2:
        score = 0.4 + (z_score - 2) * 0.3
        action = "monitor"
    else:
        score = z_score * 0.2
        action = "skip"

    return {
        "score": round(score, 3),
        "z_score": round(z_score, 2),
        "direction": direction,
        "pct_change": round(pct_change, 1),
        "action": action,
        "mean_30d": round(mean, 2),
    }

The scored output goes to a separate table with a foreign key to the staging record:
CREATE TABLE scored_price_changes (
    id SERIAL PRIMARY KEY,
    raw_record_id INTEGER REFERENCES raw_price_records(id),
    product_id VARCHAR(64),
    anomaly_score NUMERIC(5,3),
    direction VARCHAR(16),
    pct_change NUMERIC(7,2),
    action VARCHAR(16), -- 'alert', 'monitor', 'skip'
    alerted BOOLEAN DEFAULT FALSE, -- set by the Layer 3 workflow after routing
    scored_at TIMESTAMPTZ DEFAULT NOW()
);

The ML service processes records in batches and marks raw_price_records.processed = TRUE after scoring. This prevents double-processing on re-runs.
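The batch-and-mark loop could look roughly like the sketch below. It uses an in-memory SQLite database as a stand-in for Postgres so it runs anywhere; the simplified table columns, `make_demo_db`, `process_batch`, and the `score_fn` callback are all illustrative assumptions, not the service's real code.

```python
import sqlite3


def make_demo_db() -> sqlite3.Connection:
    """In-memory stand-in with simplified versions of the two tables."""
    conn = sqlite3.connect(":memory:")
    conn.executescript(
        """
        CREATE TABLE raw_price_records (
            id INTEGER PRIMARY KEY,
            product_id TEXT,
            price_usd REAL,
            processed INTEGER DEFAULT 0
        );
        CREATE TABLE scored_price_changes (
            id INTEGER PRIMARY KEY,
            raw_record_id INTEGER REFERENCES raw_price_records(id),
            product_id TEXT,
            anomaly_score REAL
        );
        """
    )
    return conn


def process_batch(conn: sqlite3.Connection, score_fn, batch_size: int = 100) -> int:
    """Score up to batch_size unprocessed rows; return how many were scored."""
    rows = conn.execute(
        "SELECT id, product_id, price_usd FROM raw_price_records "
        "WHERE processed = 0 ORDER BY id LIMIT ?",
        (batch_size,),
    ).fetchall()
    # One transaction per batch: score rows and processed flags commit together,
    # so a failure mid-batch rolls back and leaves the whole batch unprocessed.
    with conn:
        for record_id, product_id, price in rows:
            conn.execute(
                "INSERT INTO scored_price_changes "
                "(raw_record_id, product_id, anomaly_score) VALUES (?, ?, ?)",
                (record_id, product_id, score_fn(product_id, price)),
            )
            conn.execute(
                "UPDATE raw_price_records SET processed = 1 WHERE id = ?",
                (record_id,),
            )
    return len(rows)
```

Calling `process_batch` in a loop until it returns 0 drains whatever backlog has accumulated. With several workers on Postgres, `SELECT ... FOR UPDATE SKIP LOCKED` is one common way to keep them from claiming the same rows; it is not shown here because SQLite has no equivalent.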
Why not use an LLM here?
A common question: why not just ask GPT-4 "is this price change unusual?" For this task: latency, cost, and reliability. An LLM call per record at 500 records/day costs $3-5/day vs $0.001/day for a Python function. The statistical approach is also more consistent: the LLM might reason differently about "unusual" on different days.
LLMs are the right tool when the task requires language understanding or reasoning about unstructured input. For numerical anomaly detection on structured data, a well-calibrated statistical model is faster, cheaper, and more auditable.
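Taking the daily figures quoted above at face value, the per-record arithmetic works out as follows. This is only a back-of-the-envelope check; `per_record_cost` is a hypothetical helper, and actual LLM pricing depends on the model and prompt size.

```python
from decimal import Decimal

RECORDS_PER_DAY = 500  # volume used in this example pipeline


def per_record_cost(daily_cost_usd: str) -> Decimal:
    """Daily cost spread evenly across the day's records."""
    return Decimal(daily_cost_usd) / RECORDS_PER_DAY


llm_low, llm_high = per_record_cost("3"), per_record_cost("5")
stat = per_record_cost("0.001")

print(f"LLM: ${llm_low} to ${llm_high} per record")
print(f"Statistical function: ${stat} per record")
print(f"Ratio: {llm_low / stat:.0f}x to {llm_high / stat:.0f}x")
```

The gap is three to four orders of magnitude per record, which is why the comparison holds even if the LLM estimate is off by a wide margin.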
Layer 3: Automation
The n8n workflow reads high-confidence alerts (action = 'alert' AND anomaly_score > 0.7) from the output table and routes them:
Schedule Trigger (every 30 min)
↓
Postgres: SELECT * FROM scored_price_changes
WHERE action = 'alert'
AND anomaly_score > 0.7
AND alerted = FALSE
↓
IF: records found
↓
Split into items → for each alert:
├── Slack: post to #pricing-ops with product, competitor, pct_change
├── CRM HTTP Request: add note to product record
└── Postgres: UPDATE scored_price_changes SET alerted = TRUE
↓
IF: high-value products (product tier = 'A')
→ Email digest: send summary to pricing team lead
The alerted = FALSE check is the idempotency guard. Even if the workflow fires every 30 minutes, each alert is sent exactly once.
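Outside n8n, the same guard can be sketched in a few lines of Python. The version below uses SQLite in place of Postgres, and `dispatch_alerts` and the `send` callback are hypothetical names standing in for the Slack and CRM nodes.

```python
import sqlite3
from typing import Callable


def dispatch_alerts(
    conn: sqlite3.Connection,
    send: Callable[[dict], None],
    threshold: float = 0.7,
) -> int:
    """Send every unalerted high-score row once; return how many were sent."""
    rows = conn.execute(
        "SELECT id, product_id, anomaly_score FROM scored_price_changes "
        "WHERE action = 'alert' AND anomaly_score > ? AND alerted = 0 "
        "ORDER BY id",
        (threshold,),
    ).fetchall()
    for alert_id, product_id, score in rows:
        send({"id": alert_id, "product_id": product_id, "score": score})
        # Mark only after send() returns: if it raises, the row stays
        # unalerted and is picked up again on the next scheduled run.
        with conn:
            conn.execute(
                "UPDATE scored_price_changes SET alerted = 1 WHERE id = ?",
                (alert_id,),
            )
    return len(rows)
```

Running this twice in a row sends nothing the second time. Strictly speaking, send-then-mark is at-least-once delivery: if the process dies between the send and the UPDATE, that one alert goes out again on the next run, which is usually the safer failure mode for alerting.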
The Slack message format matters. "Price anomaly detected" is useless. We send:
🔴 [competitor] dropped price on [product] by 12.1%
Current: $89.99 | 30-day avg: $102.40 | Score: 0.91
URL: [link]
Action: Review and decide on response pricing
The automation layer doesn't make the pricing decision. It surfaces the right information to the right person at the right time. That's the job.
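A message in that shape can be rendered from a scored record with a small formatter. This is a minimal sketch: `format_alert` and the dictionary keys are assumptions for illustration, and the "raised" wording for increases is not from the original template.

```python
def format_alert(alert: dict) -> str:
    """Render one scored record as the four-line Slack message."""
    # Assumption: negative pct_change means the competitor cut the price.
    verb = "dropped" if alert["pct_change"] < 0 else "raised"
    return (
        f"🔴 {alert['competitor']} {verb} price on {alert['product']} "
        f"by {abs(alert['pct_change']):.1f}%\n"
        f"Current: ${alert['current_price']:.2f} | "
        f"30-day avg: ${alert['mean_30d']:.2f} | "
        f"Score: {alert['score']:.2f}\n"
        f"URL: {alert['url']}\n"
        "Action: Review and decide on response pricing"
    )
```

Keeping the formatter as a pure function makes the message easy to test without touching Slack, and the same text can be reused for the CRM note.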
Infrastructure: tying it together
All three layers run in Docker containers on the same host for this scale:
# docker-compose.yml
services:
  scraper:
    build: ./scraper
    environment:
      - POSTGRES_URL=${POSTGRES_URL}
      - PROXY_URL=${PROXY_URL}
    restart: unless-stopped
  ml-scorer:
    build: ./ml
    environment:
      - POSTGRES_URL=${POSTGRES_URL}
    restart: unless-stopped
  n8n:
    image: n8nio/n8n
    environment:
      - DB_TYPE=postgresdb
      - DB_POSTGRESDB_DATABASE=n8n
    ports:
      - "5678:5678"
    volumes:
      - n8n_data:/home/node/.n8n
    restart: unless-stopped
  postgres:
    image: postgres:16-alpine
    environment:
      # The official image needs a superuser password on first start;
      # the variable name here is an assumption.
      - POSTGRES_PASSWORD=${POSTGRES_PASSWORD}
    volumes:
      - pg_data:/var/lib/postgresql/data
    restart: unless-stopped
# Named volumes must be declared at the top level.
volumes:
  n8n_data:
  pg_data:

A single $48/month VPS handles this comfortably at 500 products/day. The scraper and ML service share the Postgres instance. n8n reads from the same database it uses for workflow state.
Redis is optional here — the Postgres staging table acts as a simple queue. For higher-throughput pipelines (50K+ records/day), add Redis + BullMQ between layers and run multiple ML worker instances in parallel.
What changes at scale
| Tier | Daily volume | Architecture change |
|---|---|---|
| Small | 1K-10K records | Single VPS, Postgres queue |
| Medium | 10K-100K records | Redis queue between layers, 2-4 ML workers |
| Large | 100K-1M records | Dedicated scraping cluster, ML inference on GPU, Kafka or SQS between layers |
| Very large | 1M+ records | Full distributed architecture, horizontal scraper scaling |
The ML model doesn't change much across scales. What changes is the infrastructure: how data flows between layers, how failures are handled, and how you scale each layer independently.
Why decoupled layers beat a monolith
The architectural principle here is that each layer should be able to fail without taking down the others. If the ML service crashes at 2am, the scraper keeps collecting data into the staging table. When ML recovers, it processes the backlog. Nothing is lost.
If you wire these layers together with direct function calls (scraper calls ML inline, ML posts to Slack directly), a failure in any layer breaks the whole system. Decoupled layers with a shared database in between give you failure isolation, independent scaling, and the ability to swap out any layer (different scraper, better model, different alerting) without touching the others.
The BizBuySell Acquisition Intelligence Platform is a real example of this architecture: scraping + ML scoring + n8n alerting, running daily for an acquisition research client.
If you need a data pipeline that collects, processes, and acts on data automatically, let's scope it.