Files
neuron-web/scripts/run_migrations.py
T
will.anderson adbdfd3e90
Dev — Build & local smoke test / build-smoke (pull_request) Successful in 1m36s
Fix CI migration step: extract Python to scripts/run_migrations.py
go-yaml (Gitea's parser) mishandles << inside block scalars, treating the
bash heredoc delimiter as a YAML merge key. Move the migration logic to a
standalone script called via python3 scripts/run_migrations.py.
2026-05-11 13:33:44 -05:00

104 lines
2.7 KiB
Python

#!/usr/bin/env python3
"""
run_migrations.py — apply pending Supabase migrations via the Management API.
Reads SUPABASE_ACCESS_TOKEN from env (injected by CI from GCP Secret Manager).
Migrations are tracked in a schema_migrations table (created if absent).
Files in migrations/*.sql are applied in lexicographic order; already-applied
files are skipped (idempotent).
"""
import json
import glob
import os
import subprocess
import sys
ACCESS_TOKEN = os.environ.get("SUPABASE_ACCESS_TOKEN", "")
if not ACCESS_TOKEN:
# Fall back to fetching from GCP Secret Manager (for use in CI without
# env var pre-injection).
result = subprocess.run(
[
"gcloud",
"secrets",
"versions",
"access",
"latest",
"--secret=supabase-access-token",
"--project=neuron-785695",
],
capture_output=True,
text=True,
)
if result.returncode != 0:
print(f"ERROR: could not fetch supabase-access-token: {result.stderr}", file=sys.stderr)
sys.exit(1)
ACCESS_TOKEN = result.stdout.strip()
PROJECT_ID = "ocojsghaonltunidkzpw"
API_URL = f"https://api.supabase.com/v1/projects/{PROJECT_ID}/database/query"
def query(sql: str):
r = subprocess.run(
[
"curl",
"-sf",
"-X",
"POST",
API_URL,
"-H",
f"Authorization: Bearer {ACCESS_TOKEN}",
"-H",
"Content-Type: application/json",
"-d",
json.dumps({"query": sql}),
],
capture_output=True,
text=True,
)
if r.returncode != 0:
raise RuntimeError(f"curl failed: {r.stderr}")
resp = json.loads(r.stdout)
# The Management API returns a list of rows on success, or a dict with
# "message" on error.
if isinstance(resp, dict) and resp.get("message") and not isinstance(resp.get("message"), list):
raise RuntimeError(f"DB error: {resp}")
return resp
# Ensure tracking table exists.
query(
"""
CREATE TABLE IF NOT EXISTS schema_migrations (
id text PRIMARY KEY,
applied_at timestamptz DEFAULT now()
)
"""
)
applied = {row["id"] for row in query("SELECT id FROM schema_migrations")}
print(f"Already applied: {sorted(applied)}")
pending = [
p
for p in sorted(glob.glob("migrations/*.sql"))
if os.path.basename(p) not in applied
]
if not pending:
print("No pending migrations.")
sys.exit(0)
for path in pending:
name = os.path.basename(path)
print(f"Applying {name}...")
with open(path) as f:
sql = f.read()
query(sql)
query(f"INSERT INTO schema_migrations (id) VALUES ('{name}')")
print(f"Applied {name}")
print(f"Done. Applied {len(pending)} migration(s).")