Creative Codes
← All insights
AutomationJune 22, 202610 min read

ETL Without the Engineering Tax: Syncing Data Between APIs, Databases, and Warehouses

ETL pipelines have a reputation for becoming unmaintainable. They don't have to be. Here's how to build data sync pipelines that handle schema changes, API rate limits, and incremental updates without collapsing under their own weight.

Muhammad Hassan

Founder, Creative Codes. 8 years on backends; last 3 deep on AI agents, RAG pipelines, and production scraping. Python, LangGraph, Playwright, n8n, FastAPI.

ETL pipelines have a reputation for becoming the most expensive thing to maintain in a data stack. They break silently, they're hard to debug when they do, and the "quick fix" approach to building them turns into months of technical debt. The underlying problems are usually the same: no idempotency, no schema change handling, no incremental logic, and no observability.

This post covers the patterns that keep ETL pipelines maintainable: incremental extraction, idempotent loading, schema change detection, and the tooling choices that reduce operational burden.

The two failure modes of naive ETL

Failure mode 1: Full re-sync every run. The simplest ETL pattern — delete everything, re-pull from source, reload — works until the source has 5M records and the re-pull takes 3 hours. Then a network blip halfway through leaves the destination in a broken half-loaded state. Or the source rate-limits you. Or the destination has downstream consumers who are now looking at a partially loaded table.

Failure mode 2: No change tracking. Running a delta sync without proper change tracking means you either miss updates (you only capture new rows, not modified ones) or you re-process rows that haven't changed (wasted compute and idempotency failures).

The correct pattern for production ETL is incremental extraction with idempotent loading.

Incremental extraction

For most source systems, you need to track what you've already processed. Two strategies:

High-water mark: track the updated_at timestamp of the last processed record. On each run, pull only records where updated_at > last_high_water_mark.

python
import psycopg2
from datetime import datetime

def extract_incremental(source_conn, table: str, last_synced_at: datetime) -> list[dict]:
    with source_conn.cursor() as cur:
        cur.execute(
            f"SELECT * FROM {table} WHERE updated_at > %s ORDER BY updated_at ASC",
            (last_synced_at,),
        )
        columns = [desc[0] for desc in cur.description]
        rows = cur.fetchall()
        return [dict(zip(columns, row)) for row in rows]

def get_high_water_mark(state_conn, pipeline_id: str) -> datetime:
    with state_conn.cursor() as cur:
        cur.execute(
            "SELECT last_synced_at FROM pipeline_state WHERE pipeline_id = %s",
            (pipeline_id,),
        )
        result = cur.fetchone()
        return result[0] if result else datetime(2020, 1, 1)

def update_high_water_mark(state_conn, pipeline_id: str, new_mark: datetime):
    with state_conn.cursor() as cur:
        cur.execute(
            """INSERT INTO pipeline_state (pipeline_id, last_synced_at)
               VALUES (%s, %s)
               ON CONFLICT (pipeline_id) DO UPDATE SET last_synced_at = EXCLUDED.last_synced_at""",
            (pipeline_id, new_mark),
        )
        state_conn.commit()

Store the high-water mark in a separate state table, not in memory or a file. A deployment or crash between syncs should not lose the position.

CDC (Change Data Capture): for databases that support it (PostgreSQL with logical replication, MySQL with binlog), CDC captures every row change at the database level — including updates and deletes. The extraction tool subscribes to the database's change stream and processes events in order. Debezium is the standard open-source CDC tool; Fivetran and Airbyte both support CDC connectors for major databases.

CDC is more complex to set up but more complete: high-water mark misses hard-deletes (deleted rows have no updated_at). If you need to propagate deletes to the destination, CDC is the only reliable option.

Idempotent loading

An ETL pipeline that fails halfway through must be re-runnable without corrupting data. This requires idempotent load operations.

For SQL destinations, INSERT ... ON CONFLICT DO UPDATE (PostgreSQL's upsert) is the standard pattern:

python
def upsert_records(dest_conn, table: str, records: list[dict], primary_key: str):
    if not records:
        return

    columns = list(records[0].keys())
    placeholders = ", ".join(["%s"] * len(columns))
    update_clause = ", ".join(
        [f"{col} = EXCLUDED.{col}" for col in columns if col != primary_key]
    )

    query = f"""
        INSERT INTO {table} ({', '.join(columns)})
        VALUES ({placeholders})
        ON CONFLICT ({primary_key}) DO UPDATE SET {update_clause}
    """

    with dest_conn.cursor() as cur:
        cur.executemany(query, [list(r.values()) for r in records])
    dest_conn.commit()

For data warehouses (BigQuery, Redshift, Snowflake), the pattern is typically load-to-staging then merge:

sql
-- Load to staging table
COPY INTO staging_orders FROM @s3_stage/orders_delta_20260622.parquet;

-- Merge into production table
MERGE INTO orders AS target
USING staging_orders AS source
ON target.order_id = source.order_id
WHEN MATCHED THEN UPDATE SET ...
WHEN NOT MATCHED THEN INSERT ...;

The staging-then-merge pattern is atomic at the warehouse level and avoids partial updates in the production table.

Schema change handling

Source schemas change. A column gets renamed, a new field is added, a data type changes. Pipelines that assume a fixed schema break silently or loudly when this happens.

Defensive approach: detect schema changes and alert before loading:

python
def detect_schema_changes(
    current_schema: dict,
    stored_schema: dict,
    pipeline_id: str,
) -> list[str]:
    changes = []

    for col, dtype in current_schema.items():
        if col not in stored_schema:
            changes.append(f"NEW COLUMN: {col} ({dtype})")
        elif stored_schema[col] != dtype:
            changes.append(f"TYPE CHANGE: {col} ({stored_schema[col]} → {dtype})")

    for col in stored_schema:
        if col not in current_schema:
            changes.append(f"REMOVED COLUMN: {col}")

    return changes

When schema changes are detected, the pipeline should pause and alert rather than silently loading incompatible data. A new column is usually safe to add to the destination schema automatically. A type change or column removal requires human review.

API rate limits and pagination

Extracting from REST APIs introduces two constraints that database-to-database syncs don't have: rate limits and pagination.

For rate-limited APIs, track request counts and back off proactively:

python
import time
from typing import Iterator

def paginate_api(
    fetch_fn,
    rate_limit_per_minute: int = 100,
    page_size: int = 100,
) -> Iterator[list[dict]]:
    page = 1
    request_count = 0
    window_start = time.time()

    while True:
        # Rate limit enforcement
        if request_count >= rate_limit_per_minute:
            elapsed = time.time() - window_start
            if elapsed < 60:
                time.sleep(60 - elapsed)
            window_start = time.time()
            request_count = 0

        data = fetch_fn(page=page, per_page=page_size)
        request_count += 1

        if not data:
            break

        yield data
        page += 1

For cursor-based pagination (common in modern APIs), store the cursor alongside the high-water mark so syncs can resume from where they stopped rather than restarting from page 1.

Monitoring and alerting

An ETL pipeline without observability is a pipeline you won't trust. Minimum instrumentation:

  • Row count per run (significant drops indicate extraction failures or source issues)
  • Last successful run timestamp (alert if a pipeline hasn't run in 2x its expected interval)
  • Load duration (alert on significant increases — a 10-minute sync that starts taking 2 hours is a problem)
  • Validation failures (alert if data quality checks fail on the loaded records)

Log all of these to a pipeline_runs table. A Grafana dashboard or a simple Slack alert on anomalies turns an opaque background process into something you can actually monitor.

Handling deletes

Hard deletes — where a record is removed from the source rather than soft-deleted — are the hardest part of incremental ETL. A high-water mark approach only catches rows that changed; a deleted row leaves no trace.

Three strategies:

Soft-delete convention: agree with the source team that rows are never hard-deleted. Instead, a deleted_at timestamp is set. The destination includes deleted_at and filters it out at query time. This is the simplest solution but requires source-side cooperation.

Full refresh for small tables: for tables under ~100K rows that change frequently and have hard-deletes, a periodic full refresh (daily or weekly) is simpler than tracking deletes. Accept the overhead.

Tombstone via CDC: if the source database supports CDC, deletes appear as tombstone events in the change stream. The destination receives a DELETE event and can act on it. This is the correct approach for large tables with hard-deletes, but requires CDC infrastructure.

For most pipelines, a combination works: CDC or soft-deletes for the main transactional tables, periodic full refresh for small reference tables (countries, categories, settings).

Error handling and partial failures

ETL pipelines fail in the middle. An API call returns a 500, a network connection drops during a bulk insert, a transformation hits an unexpected null. The pipeline must handle partial failures without corrupting state.

Two rules:

  1. Don't update the high-water mark until the batch is confirmed loaded. If you advance the mark before the load completes and then the load fails, you've skipped records permanently.
  2. Log failures at the record level, not just the batch level. If 999 of 1000 records load successfully and one fails, log which record failed and why. A batch-level failure log tells you something failed; a record-level log tells you what to fix.
python
def load_with_error_tracking(
    dest_conn,
    records: list[dict],
    pipeline_id: str,
) -> tuple[int, int]:
    success_count = 0
    failure_count = 0

    for record in records:
        try:
            upsert_records(dest_conn, "orders", [record], "order_id")
            success_count += 1
        except Exception as e:
            log_record_failure(pipeline_id, record.get("order_id"), str(e))
            failure_count += 1

    return success_count, failure_count

For pipelines with low failure tolerance, fail the entire batch on the first error (transactional approach). For pipelines where a few failures are acceptable (high-volume, best-effort), log and continue. Know which mode your pipeline should operate in before building the error handling.

When to use an ETL tool vs building custom

Airbyte, Fivetran, and dbt handle a large subset of the patterns described above. For syncing between mainstream data sources (Salesforce, HubSpot, Stripe, Postgres, Snowflake) without custom transformation logic, a managed connector is faster to set up and cheaper to maintain than custom code.

Custom ETL code earns its keep when: the source doesn't have a connector (proprietary APIs, scraped data, non-standard data formats), the transformation logic is complex enough that dbt models become unwieldy, or the destination is a custom system that no off-the-shelf tool supports. The patterns in this post apply equally to both — whether you're building from scratch or extending a tool like dbt with custom logic.


If you're building a data sync pipeline and need it to handle schema changes, rate limits, and incremental updates reliably, tell us about the scope.

Related: Building Production n8n Workflows: Architecture, Error Handling, Deployment | End-to-End Data Pipeline: Scraping, ML, and Automation in One System

Data Sync and ETL services →

Related service

Need complex n8n workflows built to production standards?

AI Workflow Automation

We publish new posts every few weeks. See more on the insights page.