#!/usr/bin/env python3 """ run_migrations.py — apply pending Supabase migrations via the Management API. Reads SUPABASE_ACCESS_TOKEN from env (injected by CI from GCP Secret Manager). Migrations are tracked in a schema_migrations table (created if absent). Files in migrations/*.sql are applied in lexicographic order; already-applied files are skipped (idempotent). """ import json import glob import os import subprocess import sys ACCESS_TOKEN = os.environ.get("SUPABASE_ACCESS_TOKEN", "") if not ACCESS_TOKEN: # Fall back to fetching from GCP Secret Manager (for use in CI without # env var pre-injection). result = subprocess.run( [ "gcloud", "secrets", "versions", "access", "latest", "--secret=supabase-access-token", "--project=neuron-785695", ], capture_output=True, text=True, ) if result.returncode != 0: print(f"ERROR: could not fetch supabase-access-token: {result.stderr}", file=sys.stderr) sys.exit(1) ACCESS_TOKEN = result.stdout.strip() PROJECT_ID = "ocojsghaonltunidkzpw" API_URL = f"https://api.supabase.com/v1/projects/{PROJECT_ID}/database/query" def query(sql: str): r = subprocess.run( [ "curl", "-sf", "-X", "POST", API_URL, "-H", f"Authorization: Bearer {ACCESS_TOKEN}", "-H", "Content-Type: application/json", "-d", json.dumps({"query": sql}), ], capture_output=True, text=True, ) if r.returncode != 0: raise RuntimeError(f"curl failed: {r.stderr}") resp = json.loads(r.stdout) # The Management API returns a list of rows on success, or a dict with # "message" on error. if isinstance(resp, dict) and resp.get("message") and not isinstance(resp.get("message"), list): raise RuntimeError(f"DB error: {resp}") return resp # Ensure tracking table exists. query( """ CREATE TABLE IF NOT EXISTS schema_migrations ( id text PRIMARY KEY, applied_at timestamptz DEFAULT now() ) """ ) applied = {row["id"] for row in query("SELECT id FROM schema_migrations")} print(f"Already applied: {sorted(applied)}") pending = [ p for p in sorted(glob.glob("migrations/*.sql")) if os.path.basename(p) not in applied ] if not pending: print("No pending migrations.") sys.exit(0) for path in pending: name = os.path.basename(path) print(f"Applying {name}...") with open(path) as f: sql = f.read() query(sql) query(f"INSERT INTO schema_migrations (id) VALUES ('{name}')") print(f"Applied {name}") print(f"Done. Applied {len(pending)} migration(s).")