adbdfd3e90
Dev — Build & local smoke test / build-smoke (pull_request) Successful in 1m36s
go-yaml (Gitea's parser) mishandles << inside block scalars, treating the bash heredoc delimiter as a YAML merge key. Move the migration logic to a standalone script called via python3 scripts/run_migrations.py.
104 lines
2.7 KiB
Python
104 lines
2.7 KiB
Python
#!/usr/bin/env python3
"""
run_migrations.py — apply pending Supabase migrations via the Management API.

Reads SUPABASE_ACCESS_TOKEN from env (injected by CI from GCP Secret Manager);
when the variable is unset, the token is fetched with `gcloud secrets`.
Migrations are tracked in a schema_migrations table (created if absent).
Files in migrations/*.sql are applied in lexicographic order; already-applied
files are skipped (idempotent).
"""
|
|
|
|
import json
|
|
import glob
|
|
import os
|
|
import subprocess
|
|
import sys
|
|
|
|
# Resolve the Supabase Management API token. A value already present in the
# environment wins; otherwise it is read from GCP Secret Manager via gcloud.
ACCESS_TOKEN = os.environ.get("SUPABASE_ACCESS_TOKEN", "")
if not ACCESS_TOKEN:
    # Fall back to fetching from GCP Secret Manager (for use in CI without
    # env var pre-injection).
    try:
        result = subprocess.run(
            [
                "gcloud",
                "secrets",
                "versions",
                "access",
                "latest",
                "--secret=supabase-access-token",
                "--project=neuron-785695",
            ],
            capture_output=True,
            text=True,
        )
    except OSError as err:
        # Raised when the gcloud binary is missing or cannot be executed;
        # report it the same way as any other fetch failure instead of
        # dying with a traceback.
        print(f"ERROR: could not run gcloud: {err}", file=sys.stderr)
        sys.exit(1)
    if result.returncode != 0:
        print(f"ERROR: could not fetch supabase-access-token: {result.stderr}", file=sys.stderr)
        sys.exit(1)
    ACCESS_TOKEN = result.stdout.strip()
    if not ACCESS_TOKEN:
        # An empty secret would otherwise surface later as an opaque curl
        # failure on the first API call.
        print("ERROR: supabase-access-token secret is empty", file=sys.stderr)
        sys.exit(1)
|
|
|
|
# Supabase project reference used to build the Management API endpoint.
PROJECT_ID = "ocojsghaonltunidkzpw"
# Endpoint that query() POSTs SQL statements to.
API_URL = f"https://api.supabase.com/v1/projects/{PROJECT_ID}/database/query"
|
|
|
|
|
|
def query(sql: str):
    """Run *sql* through the Supabase Management API and return the decoded reply.

    The request is issued by shelling out to ``curl``: a JSON body of the form
    ``{"query": sql}`` is POSTed to ``API_URL`` with ``ACCESS_TOKEN`` as the
    bearer token.

    Args:
        sql: SQL text to execute.

    Returns:
        The JSON-decoded response body.

    Raises:
        RuntimeError: if curl exits non-zero, the response body is not valid
            JSON, or the response is an error object.
    """
    # NOTE(review): the bearer token is passed on curl's command line, so it
    # is visible in the process list while the request is running.
    r = subprocess.run(
        [
            "curl",
            # -s hides the progress meter, -S keeps curl's own error message
            # on stderr, -f turns HTTP >= 400 into a non-zero exit status.
            # Without -S the "curl failed" message below carries no detail.
            "-sSf",
            "-X",
            "POST",
            API_URL,
            "-H",
            f"Authorization: Bearer {ACCESS_TOKEN}",
            "-H",
            "Content-Type: application/json",
            "-d",
            json.dumps({"query": sql}),
        ],
        capture_output=True,
        text=True,
    )
    if r.returncode != 0:
        raise RuntimeError(f"curl failed: {r.stderr}")
    try:
        resp = json.loads(r.stdout)
    except json.JSONDecodeError as err:
        # Surface what was actually received instead of a bare decode error.
        raise RuntimeError(f"unexpected non-JSON response: {r.stdout[:200]!r}") from err
    # The Management API returns a list of rows on success, or a dict with
    # "message" on error.
    if isinstance(resp, dict):
        message = resp.get("message")
        if message and not isinstance(message, list):
            raise RuntimeError(f"DB error: {resp}")
    return resp
|
|
|
|
|
|
def main() -> None:
    """Apply every pending ``migrations/*.sql`` file in lexicographic order.

    A migration is pending when its file name is not yet recorded in the
    ``schema_migrations`` table. After a file's SQL has been executed, its
    name is inserted into that table so later runs skip it.
    """
    # Ensure tracking table exists.
    query(
        """
        CREATE TABLE IF NOT EXISTS schema_migrations (
            id text PRIMARY KEY,
            applied_at timestamptz DEFAULT now()
        )
        """
    )

    applied = {row["id"] for row in query("SELECT id FROM schema_migrations")}
    print(f"Already applied: {sorted(applied)}")

    pending = [
        p
        for p in sorted(glob.glob("migrations/*.sql"))
        if os.path.basename(p) not in applied
    ]

    if not pending:
        print("No pending migrations.")
        return

    for path in pending:
        name = os.path.basename(path)
        print(f"Applying {name}...")
        # Read as UTF-8 explicitly so the result does not depend on the
        # locale of the machine running the script.
        with open(path, encoding="utf-8") as f:
            sql = f.read()
        query(sql)
        # The file name is embedded in a SQL string literal; double any single
        # quote so an unusual file name cannot break or alter the statement.
        quoted_name = name.replace("'", "''")
        query(f"INSERT INTO schema_migrations (id) VALUES ('{quoted_name}')")
        print(f"Applied {name}")

    print(f"Done. Applied {len(pending)} migration(s).")


if __name__ == "__main__":
    main()
|