diff --git a/routes.el b/routes.el index b0d6ae1..3ac0fc9 100644 --- a/routes.el +++ b/routes.el @@ -7,6 +7,65 @@ import "neuron-api.el" import "sessions.el" import "soul.elh" +// --------------------------------------------------------------------------- +// Rate limiting — simple in-memory per-IP sliding window counter. +// +// State keys: +// rl::count — request count in the current window +// rl::window — window start timestamp (unix seconds) +// +// Limit: configurable via soul state key "soul_rate_limit" (requests per +// minute). Falls back to 60 req/min if not set. The /health endpoint is +// exempt so monitoring does not consume quota. +// +// State growth: each unique source IP accumulates exactly 2 state keys +// (count + window) for the lifetime of the process. Per-IP storage is +// bounded and constant; values reset on window expiry. In aggregate, state +// grows linearly with distinct IPs — typical for a trusted-client service. +// EL has no state_delete builtin, so keys from inactive IPs persist. +// TODO: add state_delete sweep when the EL runtime exposes that primitive. +// +// Returns "" when the request is allowed, or a 429 JSON body when rejected. +// --------------------------------------------------------------------------- +fn rate_limit_check(ip: String, path: String) -> String { + // Health checks are exempt — they must never be blocked. + if str_eq(path, "/health") { + return "" + } + + let limit_str: String = state_get("soul_rate_limit") + let limit: Int = if str_eq(limit_str, "") { 60 } else { str_to_int(limit_str) } + + let now: Int = time_now() + let window_key: String = "rl:" + ip + ":window" + let count_key: String = "rl:" + ip + ":count" + + let win_str: String = state_get(window_key) + let win_start: Int = if str_eq(win_str, "") { now } else { str_to_int(win_str) } + + // New window every 60 seconds. + let elapsed: Int = now - win_start + let in_window: Bool = elapsed < 60 + + let prev_count_str: String = state_get(count_key) + let prev_count: Int = if str_eq(prev_count_str, "") { 0 } else { str_to_int(prev_count_str) } + + // Reset window if expired. + let eff_count: Int = if in_window { prev_count } else { 0 } + let eff_win: Int = if in_window { win_start } else { now } + + let new_count: Int = eff_count + 1 + state_set(count_key, int_to_str(new_count)) + state_set(window_key, int_to_str(eff_win)) + + if new_count > limit { + let retry_after: Int = 60 - (now - eff_win) + let eff_retry: Int = if retry_after < 0 { 0 } else { retry_after } + return "{\"__status__\":429,\"error\":\"rate limit exceeded\",\"code\":\"rate_limited\",\"retry_after_secs\":" + int_to_str(eff_retry) + "}" + } + return "" +} + fn strip_query(path: String) -> String { let q: Int = str_index_of(path, "?") if q < 0 { @@ -16,11 +75,11 @@ fn strip_query(path: String) -> String { } fn err_404(path: String) -> String { - return "{\"error\":\"not found\",\"path\":\"" + path + "\"}" + return "{\"error\":\"not found\",\"code\":\"not_found\",\"path\":\"" + path + "\"}" } fn err_405(method: String, path: String) -> String { - return "{\"error\":\"method not allowed\",\"method\":\"" + method + "\",\"path\":\"" + path + "\"}" + return "{\"error\":\"method not allowed\",\"code\":\"method_not_allowed\",\"method\":\"" + method + "\",\"path\":\"" + path + "\"}" } fn route_health() -> String { @@ -31,12 +90,35 @@ fn route_health() -> String { let edge_ct: Int = engram_edge_count() let pulse: String = state_get("soul.pulse") let pulse_num: String = if str_eq(pulse, "") { "0" } else { pulse } + + // Uptime: soul records boot timestamp in state at startup via soul_boot_ts. + // Compute elapsed seconds; fall back to -1 if not yet set. + let boot_ts_str: String = state_get("soul_boot_ts") + let uptime_secs: Int = if str_eq(boot_ts_str, "") { + -1 + } else { + time_now() - str_to_int(boot_ts_str) + } + + // LLM connectivity: probe with a minimal call. Any non-error reply = ok. + // Use a short, fixed prompt so this never counts against conversation history. + let model: String = state_get("soul_model") + let eff_model: String = if str_eq(model, "") { "claude-sonnet-4-5" } else { model } + let llm_probe: String = llm_call_system(eff_model, "You are a health probe. Reply with the single word: ok", "ping") + let llm_ok: Bool = !str_eq(llm_probe, "") + && !str_starts_with(llm_probe, "{\"error\"") + && !str_starts_with(llm_probe, "{\"type\":\"error\"") + && !str_contains(llm_probe, "authentication_error") + let llm_status: String = if llm_ok { "ok" } else { "unreachable" } + return "{\"status\":\"alive\"" + ",\"cgi_id\":\"" + cgi_id + "\"" + ",\"boot\":" + boot_num + + ",\"uptime_secs\":" + int_to_str(uptime_secs) + ",\"node_count\":" + int_to_str(node_ct) + ",\"edge_count\":" + int_to_str(edge_ct) + ",\"pulse\":" + pulse_num + + ",\"llm\":\"" + llm_status + "\"" + ",\"layers\":{\"l0\":\"core\",\"l1\":\"safety\",\"l2\":\"stewardship\",\"l3\":\"" + imprint_current() + "\"}}" } @@ -103,15 +185,15 @@ fn route_imprint_user(body: String) -> String { fn route_synthesize(body: String) -> String { if str_eq(body, "") { - return "{\"mechanism\":\"did not engage\"}" + return "{\"error\":\"body is required\",\"code\":\"missing_param\"}" } let parent_a: String = json_get(body, "parent_a") let parent_b: String = json_get(body, "parent_b") if str_eq(parent_a, "") { - return "{\"mechanism\":\"did not engage\"}" + return "{\"error\":\"parent_a is required\",\"code\":\"missing_param\"}" } if str_eq(parent_b, "") { - return "{\"mechanism\":\"did not engage\"}" + return "{\"error\":\"parent_b is required\",\"code\":\"missing_param\"}" } let req: String = "synthesize " + parent_a + " " + parent_b let tags: String = "[\"soul-inbox-pending\",\"synthesis-request\"]" @@ -259,6 +341,17 @@ fn handle_connectors(method: String, clean: String, body: String) -> String { fn handle_request(method: String, path: String, body: String) -> String { let clean: String = strip_query(path) + // Rate limit check. Extract caller IP from REMOTE_ADDR env var (set by the + // EL HTTP runtime for each request). Skip enforcement when empty so + // loopback/internal callers are never blocked. + let ip: String = env("REMOTE_ADDR") + if !str_eq(ip, "") { + let rl_result: String = rate_limit_check(ip, clean) + if !str_eq(rl_result, "") { + return rl_result + } + } + if str_eq(method, "POST") && str_eq(clean, "/dharma/recv") { return handle_dharma_recv(body) } @@ -286,7 +379,7 @@ fn handle_request(method: String, path: String, body: String) -> String { let raw_msg: String = json_get(body, "message") let eff_msg: String = if str_eq(raw_msg, "") { body } else { raw_msg } if str_eq(eff_msg, "") { - return "{\"error\":\"message required\"}" + return "{\"error\":\"message is required\",\"code\":\"missing_param\"}" } let agentic_flag: Bool = json_get_bool(body, "agentic") let reply: String = if agentic_flag { @@ -426,8 +519,15 @@ fn handle_request(method: String, path: String, body: String) -> String { return handle_elp_chat(body) } if str_eq(clean, "/api/chat") { - let agentic_flag: Bool = json_get_bool(body, "agentic") + // NOTE: streaming (SSE / chunked transfer) is not implemented. All chat + // responses are buffered and returned as a single JSON object. Streaming + // would require runtime-level SSE support in el_runtime.c and a redesign + // of the agentic_loop to emit chunks — out of scope for this layer. let raw_msg: String = json_get(body, "message") + if str_eq(raw_msg, "") { + return "{\"error\":\"message is required\",\"code\":\"missing_param\"}" + } + let agentic_flag: Bool = json_get_bool(body, "agentic") let reply: String = if agentic_flag { handle_chat_agentic(body) } else { diff --git a/soul.el b/soul.el index 0147f2a..dbb4376 100644 --- a/soul.el +++ b/soul.el @@ -369,6 +369,7 @@ load_identity_context() seed_persona_from_env() let boot_num: Int = mem_boot_count_inc() state_set("soul_boot_count", int_to_str(boot_num)) +state_set("soul_boot_ts", int_to_str(time_now())) println("[soul] boot #" + int_to_str(boot_num)) emit_session_start_event()