Files
neuron/council/compressor_service.py
T

235 lines
7.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
Neuron CCR Phase 1 — System Prompt Compressor Service.
Receives a verbose soul system prompt and returns a semantically equivalent
but token-dense compressed version. Reduces system prompt tokens by 60-80%
with no behavioral information loss.
Architecture reference: foundation/forge/docs/token-compression-architecture.md
Model: qwen3:1.7b (primary), neuron:latest (fallback)
Usage:
python3 compressor_service.py [--port 7772]
API:
POST /api/neuron/compress
{"system_prompt": "...", "context_type": "identity|rules|memory"}
Response:
{"compressed": "...", "original_tokens": N, "compressed_tokens": N,
"reduction_pct": X, "model": "...", "latency_ms": N}
"""
import argparse
import time
import uuid
from typing import Optional
import httpx
import uvicorn
from fastapi import FastAPI
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel
# ---------------------------------------------------------------------------
# Config
# ---------------------------------------------------------------------------
OLLAMA_BASE = "http://localhost:11434/api/generate"
# qwen3:1.7b is the architecture-specified compressor (Phase 1).
# neuron:latest is the fallback: already running, domain-appropriate.
PRIMARY_MODEL = "qwen3:1.7b"
FALLBACK_MODEL = "neuron:latest"
MODEL_TIMEOUT = 60.0 # seconds; compression of a long prompt can take time
# Compression prompt — preserves all facts/rules/constraints, strips verbosity.
# /no_think suppresses qwen3's chain-of-thought tokens, keeping output clean.
COMPRESSOR_PROMPT_TEMPLATE = """\
/no_think
You are a semantic compression engine. Compress the following system prompt while preserving ALL specific facts, rules, constraints, and named entities. Do not lose any information that would change behavior. Output ONLY the compressed text, nothing else.
Original prompt:
{system_prompt}
Compressed (preserve all facts and rules):"""
# ---------------------------------------------------------------------------
# App
# ---------------------------------------------------------------------------
app = FastAPI(
title="Neuron Compressor Service",
description="CCR Phase 1 — system prompt compression for the Neuron soul",
version="1.0.0",
)
app.add_middleware(
CORSMiddleware,
allow_origins=["*"],
allow_methods=["*"],
allow_headers=["*"],
)
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class CompressRequest(BaseModel):
system_prompt: str
context_type: Optional[str] = "mixed" # identity | rules | memory | mixed
class CompressResponse(BaseModel):
id: str
compressed: str
original_tokens: int
compressed_tokens: int
reduction_pct: float
model: str
context_type: str
latency_ms: int
# ---------------------------------------------------------------------------
# Token estimation (rough: word_count × 1.3, matching architecture doc)
# ---------------------------------------------------------------------------
def estimate_tokens(text: str) -> int:
"""Rough token count estimate: words × 1.3. No tokenizer dependency."""
words = len(text.split())
return max(1, int(words * 1.3))
# ---------------------------------------------------------------------------
# Core compression
# ---------------------------------------------------------------------------
async def ollama_available(client: httpx.AsyncClient) -> bool:
"""Quick connectivity check to Ollama."""
try:
await client.get("http://localhost:11434/", timeout=2.0)
return True
except (httpx.ConnectError, httpx.TimeoutException):
return False
async def compress_with_model(
client: httpx.AsyncClient, model: str, prompt_text: str
) -> str:
"""
Call a single Ollama model to compress the given text.
Returns the compressed string, or "" on failure.
"""
payload = {
"model": model,
"prompt": prompt_text,
"stream": False,
# Keep temperature low for deterministic compression
"options": {
"temperature": 0.1,
"top_p": 0.9,
},
}
try:
resp = await client.post(OLLAMA_BASE, json=payload, timeout=MODEL_TIMEOUT)
resp.raise_for_status()
data = resp.json()
return data.get("response", "").strip()
except (httpx.TimeoutException, httpx.HTTPStatusError, Exception):
return ""
async def run_compression(system_prompt: str, context_type: str) -> CompressResponse:
start = time.monotonic()
request_id = str(uuid.uuid4())
original_tokens = estimate_tokens(system_prompt)
prompt_text = COMPRESSOR_PROMPT_TEMPLATE.format(system_prompt=system_prompt)
async with httpx.AsyncClient() as client:
# Connectivity gate
if not await ollama_available(client):
latency_ms = int((time.monotonic() - start) * 1000)
return CompressResponse(
id=request_id,
compressed=system_prompt, # passthrough on failure
original_tokens=original_tokens,
compressed_tokens=original_tokens,
reduction_pct=0.0,
model="unavailable",
context_type=context_type,
latency_ms=latency_ms,
)
# Try primary model (qwen3:1.7b), fall back to neuron:latest
compressed = await compress_with_model(client, PRIMARY_MODEL, prompt_text)
model_used = PRIMARY_MODEL
if not compressed:
compressed = await compress_with_model(client, FALLBACK_MODEL, prompt_text)
model_used = FALLBACK_MODEL
if not compressed:
# Both models failed — passthrough
latency_ms = int((time.monotonic() - start) * 1000)
return CompressResponse(
id=request_id,
compressed=system_prompt,
original_tokens=original_tokens,
compressed_tokens=original_tokens,
reduction_pct=0.0,
model="both-failed",
context_type=context_type,
latency_ms=latency_ms,
)
compressed_tokens = estimate_tokens(compressed)
reduction_pct = round(
(1.0 - compressed_tokens / max(1, original_tokens)) * 100.0, 1
)
latency_ms = int((time.monotonic() - start) * 1000)
return CompressResponse(
id=request_id,
compressed=compressed,
original_tokens=original_tokens,
compressed_tokens=compressed_tokens,
reduction_pct=reduction_pct,
model=model_used,
context_type=context_type,
latency_ms=latency_ms,
)
# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------
@app.post("/api/neuron/compress", response_model=CompressResponse)
async def compress(req: CompressRequest):
return await run_compression(req.system_prompt, req.context_type or "mixed")
@app.get("/healthz")
async def health():
return {"status": "ok", "service": "compressor", "version": "1.0.0"}
# ---------------------------------------------------------------------------
# Entrypoint
# ---------------------------------------------------------------------------
if __name__ == "__main__":
parser = argparse.ArgumentParser(description="Neuron Compressor Service (CCR Phase 1)")
parser.add_argument("--port", type=int, default=7772, help="Port to listen on")
parser.add_argument("--host", default="127.0.0.1", help="Host to bind to")
args = parser.parse_args()
print(f"[compressor] Starting on {args.host}:{args.port}")
print(f"[compressor] Primary model: {PRIMARY_MODEL}")
print(f"[compressor] Fallback model: {FALLBACK_MODEL}")
uvicorn.run(app, host=args.host, port=args.port, log_level="info")