Case Study

Multi-Tenant Migration Engine

Zero-dependency Python 2PC orchestrator — runs SQL migrations in parallel across isolated SQLite tenant databases with fleet-wide commit or rollback.

Client: Gnomad CRM
  • Stack: Python · sqlite3 · ThreadPoolExecutor · Two-Phase Commit
  • Tags: python · multi-tenant · crm · sqlite · distributed-systems · migration · zero-dependency

Overview

Building a CRM like Gnomad CRM introduces one of the hardest problems in SaaS architecture: tenant isolation. When every client has their own isolated database, deploying a single structural update (like adding a new column) means running migrations across dozens or hundreds of databases simultaneously.

If tenant #42 fails due to a lock or corruption, you cannot leave the system out of sync. Every tenant that has already applied the change (say, #1 through #41) must be rolled back so the entire fleet keeps an identical schema.

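This project implements a Two-Phase Commit (2PC) orchestrator. It uses Python’s concurrent.futures to execute SQL migrations in parallel across an array of independent SQLite databases. If a single node reports a failure, the engine triggers a fleet-wide rollback. Strictly speaking, the protocol is 2PC-style: SQLite has no durable PREPARE step, so a node's "prepared" state exists only as an open, uncommitted transaction on a live connection. A COMMIT that fails on one node after others have succeeded is reported as CRITICAL rather than undone.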
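
Stripped of SQLite, the coordinator's decision rule is small. The sketch below is a toy, sequential model for illustration only. The two_phase_commit() function and its participant dictionaries of prepare/commit/rollback callables are hypothetical and are not part of migrate.py.

```python
from typing import Callable, Dict

# Hypothetical participant shape for this toy model: a dict of three callables,
# "prepare" (returns the participant's vote), "commit", and "rollback".
Participant = Dict[str, Callable[[], object]]


def two_phase_commit(participants: Dict[str, Participant]) -> str:
    """Toy 2PC coordinator: COMMIT only if every participant votes yes."""
    # Phase 1: collect a vote from every participant before deciding anything.
    votes = {}
    for name, participant in participants.items():
        try:
            votes[name] = bool(participant["prepare"]())
        except Exception:
            votes[name] = False  # An exception during prepare counts as a "no" vote.

    # Phase 2: a single decision, sent to every participant.
    decision = "COMMIT" if all(votes.values()) else "ROLLBACK"
    for participant in participants.values():
        participant["commit" if decision == "COMMIT" else "rollback"]()
    return decision
```

Treating an exception during prepare as a "no" vote mirrors how migrate.py turns a Phase 1 exception into a "failed" status instead of letting it escape.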

Part of the DIY build series: Projects 1 · 2 · 3 · 4 · 5 · 6.

What it implements

  • Fleet discovery: scans a tenants/ directory for .db files
  • Phase 1 (Prepare): concurrent BEGIN + migration script per tenant, with no commit
  • Phase 2 (Commit/Rollback): fleet-wide COMMIT if every node prepared, fleet-wide ROLLBACK if any node failed
  • Manual transaction control: isolation_level=None for an explicit SQLite transaction lifecycle
  • Thread-safe I/O: ThreadPoolExecutor workers, with connections closed on exception paths

Project setup

1. Initialize the environment

mkdir fleet-migrator && cd fleet-migrator
mkdir tenants
touch migrate.py 001_add_subscription_tier.sql

2. Mock tenant databases

Create three identical databases representing three Gnomad CRM clients:

for i in {1..3}; do sqlite3 tenants/tenant_0$i.db "CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT);"; done
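
The loop above needs the sqlite3 command-line shell. To keep the setup itself standard-library only, a sketch like the following creates the same files with Python's sqlite3 module. The create_mock_tenants() helper is hypothetical, and unlike the shell loop it uses CREATE TABLE IF NOT EXISTS so that re-running it is harmless.

```python
import os
import sqlite3


def create_mock_tenants(directory: str = "tenants", count: int = 3) -> list:
    """Create tenant_01.db .. tenant_NN.db, each with an identical users table."""
    os.makedirs(directory, exist_ok=True)
    paths = []
    for i in range(1, count + 1):
        # Zero-padded names match the shell loop: tenant_01.db, tenant_02.db, ...
        path = os.path.join(directory, f"tenant_{i:02d}.db")
        conn = sqlite3.connect(path)  # Creates the file if it does not exist.
        try:
            # IF NOT EXISTS makes re-running the seed a no-op instead of an error.
            conn.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT)")
            conn.commit()
        finally:
            conn.close()
        paths.append(path)
    return paths
```

Calling create_mock_tenants() from the project root produces tenants/tenant_01.db through tenants/tenant_03.db.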

3. Migration SQL

Paste the following into 001_add_subscription_tier.sql (no BEGIN or COMMIT; the orchestrator owns the transaction):

-- Standard deployment: Add a subscription tier to the users table
ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';
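
The whole design depends on SQLite treating DDL transactionally, so that an ALTER TABLE ... ADD COLUMN applied inside an open transaction can still be rolled back. A quick in-memory check (the column_names() helper is hypothetical) shows the column appearing inside the transaction and disappearing after ROLLBACK:

```python
import sqlite3


def column_names(conn: sqlite3.Connection, table: str) -> list:
    """Column names of `table` as this connection currently sees them.
    The table name is interpolated directly, so pass only trusted names."""
    return [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]


# isolation_level=None: the sqlite3 module issues no implicit BEGINs.
conn = sqlite3.connect(":memory:", isolation_level=None)
conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")

# Apply the migration inside an explicit, still-open transaction.
conn.executescript(
    "BEGIN;\n"
    "ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';"
)
inside_txn = column_names(conn, "users")

conn.execute("ROLLBACK")  # Discard the schema change.
after_rollback = column_names(conn, "users")

print(inside_txn)      # ['id', 'name', 'subscription_tier']
print(after_rollback)  # ['id', 'name']
```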

The code (migrate.py)

Standard library only. It maps each tenant connection to a worker thread, manages the transaction lifecycle by hand, and aggregates the per-tenant results into one commit-or-rollback decision:

#!/usr/bin/env python3
import argparse
import os
import sqlite3
import sys
from concurrent.futures import ThreadPoolExecutor, as_completed

# --- ANSI Terminal Colors ---
CLR_RESET = "\033[0m"
CLR_RED = "\033[91m"
CLR_GREEN = "\033[92m"
CLR_YELLOW = "\033[93m"
CLR_CYAN = "\033[96m"
CLR_BOLD = "\033[1m"

def get_tenant_databases(directory: str) -> list:
    """Discovers all SQLite databases in the target directory."""
    if not os.path.isdir(directory):
        print(f"{CLR_RED}Error: Directory '{directory}' not found.{CLR_RESET}")
        sys.exit(1)

    dbs = [os.path.join(directory, f) for f in os.listdir(directory) if f.endswith('.db')]
    if not dbs:
        print(f"{CLR_YELLOW}No .db files found in '{directory}'.{CLR_RESET}")
        sys.exit(1)
    return sorted(dbs)

def read_migration_file(filepath: str) -> str:
    """Reads the SQL migration payload."""
    if not os.path.isfile(filepath):
        print(f"{CLR_RED}Error: Migration file '{filepath}' not found.{CLR_RESET}")
        sys.exit(1)
    with open(filepath, 'r') as f:
        return f.read()

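def execute_phase_1(db_path: str, sql_script: str) -> dict:
    """
    PHASE 1 (Prepare): Opens a connection, begins a manual transaction,
    and executes the migration inside it. It does NOT commit.
    """
    conn = None
    try:
        # check_same_thread=False: Phase 2 resolves this connection from a
        # different worker thread. Each connection is only used by one thread
        # at a time (the executor hands it off between phases), so this is safe.
        conn = sqlite3.connect(db_path, isolation_level=None, timeout=5.0,
                               check_same_thread=False)

        # executescript() COMMITs any pending transaction before running its
        # script, so a separate execute("BEGIN") would be committed (empty)
        # at once and the migration would autocommit. Making BEGIN the first
        # statement of the script keeps the migration in a held transaction.
        conn.executescript("BEGIN;\n" + sql_script)

        return {"db": db_path, "conn": conn, "status": "prepared", "error": None}
    except Exception as e:
        # conn is still None if connect() itself failed.
        return {"db": db_path, "conn": conn, "status": "failed", "error": str(e)}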

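def finalize_phase_2(node: dict, action: str) -> None:
    """
    PHASE 2 (Commit/Rollback): Resolves the held transaction and closes the connection.
    """
    conn = node.get("conn")
    if conn is None:
        return  # The connection never opened, so there is nothing to resolve.

    try:
        if not conn.in_transaction:
            # Nothing is pending on this connection. For ROLLBACK that is a
            # no-op; for COMMIT it means the migration was not held open.
            if action == "COMMIT":
                print(f"{CLR_RED}CRITICAL: No open transaction to COMMIT on {node['db']}{CLR_RESET}")
            return
        if action == "COMMIT":
            conn.execute("COMMIT;")
        elif action == "ROLLBACK":
            conn.execute("ROLLBACK;")
    except Exception as e:
        print(f"{CLR_RED}CRITICAL: Failed to {action} on {node['db']}: {e}{CLR_RESET}")
    finally:
        # Always release the database handle, including on the error paths.
        conn.close()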

def main():
    parser = argparse.ArgumentParser(description="Multi-Tenant 2PC Migration Orchestrator")
    parser.add_argument("sql_file", help="Path to the .sql migration file")
    parser.add_argument("--dir", default="tenants", help="Directory containing tenant .db files")
    args = parser.parse_args()

    sql_payload = read_migration_file(args.sql_file)
    tenant_dbs = get_tenant_databases(args.dir)

    print(f"{CLR_CYAN}{CLR_BOLD}🚀 Initializing Fleet Migration Orchestrator...{CLR_RESET}")
    print(f"Targeting {len(tenant_dbs)} tenant databases.\n")

    nodes = []
    has_failures = False

    print(f"{CLR_YELLOW}▶ PHASE 1: Applying pending transactions across fleet...{CLR_RESET}")
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = {executor.submit(execute_phase_1, db, sql_payload): db for db in tenant_dbs}

        for future in as_completed(futures):
            result = future.result()
            nodes.append(result)

            if result["status"] == "failed":
                has_failures = True
                print(f"  {CLR_RED}[❌ FAILED ]{CLR_RESET} {os.path.basename(result['db'])} - {result['error']}")
            else:
                print(f"  {CLR_GREEN}[✅ PREPARED]{CLR_RESET} {os.path.basename(result['db'])}")

    print("-" * 50)

    if has_failures:
        print(f"{CLR_RED}{CLR_BOLD}🚨 Failure detected in fleet! Triggering global ROLLBACK...{CLR_RESET}")
        action = "ROLLBACK"
    else:
        print(f"{CLR_GREEN}{CLR_BOLD}✅ All nodes prepared successfully. Triggering global COMMIT...{CLR_RESET}")
        action = "COMMIT"

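    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(finalize_phase_2, node, action) for node in nodes]
        for future in as_completed(futures):
            # result() re-raises any exception finalize_phase_2 did not handle,
            # instead of letting it vanish inside the executor.
            future.result()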

    if action == "COMMIT":
        print(f"\n{CLR_CYAN}Migration completed successfully across all tenants.{CLR_RESET}")
    else:
        print(f"\n{CLR_RED}Migration aborted. All tenants safely reverted to previous state.{CLR_RESET}")
        sys.exit(1)

if __name__ == "__main__":
    main()
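
The Phase 1 prepare step depends on how the standard-library sqlite3 module treats pending transactions in executescript(). With the module's default transaction handling, executescript() first COMMITs any open transaction, so a BEGIN issued through a separate execute() call does not wrap the script. The sketch below compares the two orderings on an in-memory database; the demo() function is hypothetical.

```python
import sqlite3

MIGRATION = "ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';"


def demo(begin_inside_script: bool) -> bool:
    """Return whether a transaction is still open after running the migration."""
    conn = sqlite3.connect(":memory:", isolation_level=None)
    conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
    if begin_inside_script:
        # BEGIN is the first statement of the script itself.
        conn.executescript("BEGIN;\n" + MIGRATION)
    else:
        # BEGIN issued separately: executescript() sees the pending transaction,
        # COMMITs it, and then runs the ALTER TABLE in autocommit mode.
        conn.execute("BEGIN")
        conn.executescript(MIGRATION)
    still_open = conn.in_transaction
    conn.close()  # Closing without COMMIT discards any still-open transaction.
    return still_open


print(demo(begin_inside_script=False))  # False: the change is already committed
print(demo(begin_inside_script=True))   # True: the change is still pending
```

Only the second ordering leaves something for a later ROLLBACK to undo, which is what a prepare phase needs.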

Execution & testing

chmod +x migrate.py

Test 1: Happy path (successful migration)

Deploy the new column to all three databases:

./migrate.py 001_add_subscription_tier.sql

The orchestrator prepares all three databases concurrently, then issues a global COMMIT.
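
To confirm the fleet actually converged, a small read-only check can compare each tenant's users columns via PRAGMA table_info. The fleet_schemas() and fleet_is_consistent() helpers below are hypothetical additions, not part of migrate.py:

```python
import os
import sqlite3


def fleet_schemas(directory: str, table: str = "users") -> dict:
    """Map each tenant .db file name to the column names of `table`."""
    schemas = {}
    for name in sorted(os.listdir(directory)):
        if not name.endswith(".db"):
            continue
        conn = sqlite3.connect(os.path.join(directory, name))
        try:
            # PRAGMA table_info rows are (cid, name, type, notnull, dflt_value, pk).
            # The table name is interpolated directly: pass only trusted names.
            schemas[name] = [row[1] for row in conn.execute(f"PRAGMA table_info({table})")]
        finally:
            conn.close()
    return schemas


def fleet_is_consistent(directory: str, table: str = "users") -> bool:
    """True when every tenant reports the same column list for `table`."""
    distinct = {tuple(cols) for cols in fleet_schemas(directory, table).values()}
    return len(distinct) <= 1
```

After Test 1, fleet_schemas("tenants") should list subscription_tier for all three tenants, and fleet_is_consistent("tenants") should return True.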

Test 2: Rollback simulation

Run the same script a second time:

./migrate.py 001_add_subscription_tier.sql

Because subscription_tier already exists on every tenant, SQLite raises a duplicate column name error on all three nodes. The orchestrator waits for every node's Phase 1 result before deciding, so the order in which threads fail does not matter: a single FAILED result is enough to trigger a fleet-wide ROLLBACK.
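
A more telling case is a fleet where one tenant fails while the others prepare successfully. The self-contained harness below is a hypothetical sketch that does not call migrate.py. It builds three temporary tenants, pre-applies the column to tenant_02 only, prepares all three serially (for determinism, rather than in parallel), and rolls every node back. The user_columns() and simulate_partial_failure() names are illustrative.

```python
import os
import sqlite3
import tempfile

MIGRATION = "ALTER TABLE users ADD COLUMN subscription_tier TEXT DEFAULT 'basic';"


def user_columns(path: str) -> list:
    """Column names of the users table in one tenant file."""
    conn = sqlite3.connect(path)
    try:
        return [row[1] for row in conn.execute("PRAGMA table_info(users)")]
    finally:
        conn.close()


def simulate_partial_failure() -> dict:
    """Prepare three tenants where only tenant_02 already has the column,
    roll every node back, and report the failures and final schemas."""
    with tempfile.TemporaryDirectory() as directory:
        paths = [os.path.join(directory, f"tenant_0{i}.db") for i in (1, 2, 3)]
        for path in paths:
            conn = sqlite3.connect(path, isolation_level=None)
            conn.execute("CREATE TABLE users (id INTEGER PRIMARY KEY, name TEXT)")
            if path.endswith("tenant_02.db"):
                conn.execute(MIGRATION)  # Pre-applied, so this tenant's migration fails.
            conn.close()

        # Phase 1 (serial here): BEGIN + migration on every node, no COMMIT.
        conns, failed = [], []
        for path in paths:
            conn = sqlite3.connect(path, isolation_level=None)
            conns.append(conn)
            try:
                conn.executescript("BEGIN;\n" + MIGRATION)
            except sqlite3.Error:
                failed.append(os.path.basename(path))

        # Phase 2: this harness only exercises the failure path, so every
        # node that still holds a transaction is rolled back.
        for conn in conns:
            if conn.in_transaction:
                conn.execute("ROLLBACK")
            conn.close()

        schemas = {os.path.basename(p): user_columns(p) for p in paths}
    return {"failed": failed, "schemas": schemas}


print(simulate_partial_failure())
```

In the result, tenant_02.db is the only failure, while tenant_01.db and tenant_03.db, which did prepare the column, are back to just id and name after the rollback.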


Why this shines on a portfolio

  1. Solves a hard distributed systems problem: most web applications lean on a single database to manage its own transactions. Coordinating one change across many independent database files or servers requires Two-Phase Commit thinking.
  2. Concurrent safety: ThreadPoolExecutor runs the I/O-bound per-tenant work in parallel, and every connection is closed in Phase 2's finally block, including on exception paths, so none are leaked.