Compare commits
2 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| b7fd8901d4 | |||
| d92b8c279a |
@@ -631,38 +631,12 @@ fn handle_chat_agentic(body: String) -> String {
|
||||
return "{\"error\":\"message required\",\"reply\":\"\"}"
|
||||
}
|
||||
|
||||
// Workspace scope (#23): the desktop UI sends the user-chosen Agent Workspace root
|
||||
// on every agentic request. Persist it to state so agent_workspace_root() — and the
|
||||
// path/command tool guards that read it — confine this turn's file/command tools to
|
||||
// that subtree. The UI is the source of truth per request: empty means unscoped (the
|
||||
// backward-compatible default), and it also lets agent_workspace_root() fall through
|
||||
// to the NEURON_AGENT_ROOT env when no root is sent. FLAGGED FOR REVIEW: setting
|
||||
// state from the body each turn (vs. only-when-nonempty) so clearing the folder in
|
||||
// the UI un-scopes — confirm this is the intended ownership model.
|
||||
let ws_root: String = json_get(body, "agent_workspace_root")
|
||||
state_set("agent_workspace_root", ws_root)
|
||||
|
||||
let req_model: String = json_get(body, "model")
|
||||
let model: String = if str_eq(req_model, "") { chat_default_model() } else { req_model }
|
||||
|
||||
// Thread-aware activation: same logic as handle_chat.
|
||||
// Use the session's or global history to anchor short messages to the thread.
|
||||
let req_session: String = json_get(body, "session_id")
|
||||
|
||||
// ISSUE #6/#7: validate that the session_id actually exists before proceeding.
|
||||
// Without this check the loop silently treats any unknown/fabricated session_id
|
||||
// as a fresh session — history loads as empty and no error is returned to the caller.
|
||||
// Only validate when a session_id is explicitly provided; anonymous calls
|
||||
// (no session_id) continue to work for backward compatibility.
|
||||
let session_valid: Bool = if str_eq(req_session, "") {
|
||||
true
|
||||
} else {
|
||||
session_exists(req_session)
|
||||
}
|
||||
if !session_valid {
|
||||
return "{\"error\":\"session not found\",\"session_id\":\"" + req_session + "\",\"reply\":\"\"}"
|
||||
}
|
||||
|
||||
let hist_key: String = if str_eq(req_session, "") { "conv_history" } else { "session_hist_" + req_session }
|
||||
let agentic_hist: String = state_get(hist_key)
|
||||
let agentic_hist_len: Int = if str_eq(agentic_hist, "") { 0 } else { json_array_len(agentic_hist) }
|
||||
|
||||
@@ -7,6 +7,65 @@ import "neuron-api.el"
|
||||
import "sessions.el"
|
||||
import "soul.elh"
|
||||
|
||||
// ---------------------------------------------------------------------------
|
||||
// Rate limiting — simple in-memory per-IP sliding window counter.
|
||||
//
|
||||
// State keys:
|
||||
// rl:<ip>:count — request count in the current window
|
||||
// rl:<ip>:window — window start timestamp (unix seconds)
|
||||
//
|
||||
// Limit: configurable via soul state key "soul_rate_limit" (requests per
|
||||
// minute). Falls back to 60 req/min if not set. The /health endpoint is
|
||||
// exempt so monitoring does not consume quota.
|
||||
//
|
||||
// State growth: each unique source IP accumulates exactly 2 state keys
|
||||
// (count + window) for the lifetime of the process. Per-IP storage is
|
||||
// bounded and constant; values reset on window expiry. In aggregate, state
|
||||
// grows linearly with distinct IPs — typical for a trusted-client service.
|
||||
// EL has no state_delete builtin, so keys from inactive IPs persist.
|
||||
// TODO: add state_delete sweep when the EL runtime exposes that primitive.
|
||||
//
|
||||
// Returns "" when the request is allowed, or a 429 JSON body when rejected.
|
||||
// ---------------------------------------------------------------------------
|
||||
fn rate_limit_check(ip: String, path: String) -> String {
|
||||
// Health checks are exempt — they must never be blocked.
|
||||
if str_eq(path, "/health") {
|
||||
return ""
|
||||
}
|
||||
|
||||
let limit_str: String = state_get("soul_rate_limit")
|
||||
let limit: Int = if str_eq(limit_str, "") { 60 } else { str_to_int(limit_str) }
|
||||
|
||||
let now: Int = time_now()
|
||||
let window_key: String = "rl:" + ip + ":window"
|
||||
let count_key: String = "rl:" + ip + ":count"
|
||||
|
||||
let win_str: String = state_get(window_key)
|
||||
let win_start: Int = if str_eq(win_str, "") { now } else { str_to_int(win_str) }
|
||||
|
||||
// New window every 60 seconds.
|
||||
let elapsed: Int = now - win_start
|
||||
let in_window: Bool = elapsed < 60
|
||||
|
||||
let prev_count_str: String = state_get(count_key)
|
||||
let prev_count: Int = if str_eq(prev_count_str, "") { 0 } else { str_to_int(prev_count_str) }
|
||||
|
||||
// Reset window if expired.
|
||||
let eff_count: Int = if in_window { prev_count } else { 0 }
|
||||
let eff_win: Int = if in_window { win_start } else { now }
|
||||
|
||||
let new_count: Int = eff_count + 1
|
||||
state_set(count_key, int_to_str(new_count))
|
||||
state_set(window_key, int_to_str(eff_win))
|
||||
|
||||
if new_count > limit {
|
||||
let retry_after: Int = 60 - (now - eff_win)
|
||||
let eff_retry: Int = if retry_after < 0 { 0 } else { retry_after }
|
||||
return "{\"__status__\":429,\"error\":\"rate limit exceeded\",\"code\":\"rate_limited\",\"retry_after_secs\":" + int_to_str(eff_retry) + "}"
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
fn strip_query(path: String) -> String {
|
||||
let q: Int = str_index_of(path, "?")
|
||||
if q < 0 {
|
||||
@@ -16,11 +75,11 @@ fn strip_query(path: String) -> String {
|
||||
}
|
||||
|
||||
fn err_404(path: String) -> String {
|
||||
return "{\"error\":\"not found\",\"path\":\"" + path + "\"}"
|
||||
return "{\"error\":\"not found\",\"code\":\"not_found\",\"path\":\"" + path + "\"}"
|
||||
}
|
||||
|
||||
fn err_405(method: String, path: String) -> String {
|
||||
return "{\"error\":\"method not allowed\",\"method\":\"" + method + "\",\"path\":\"" + path + "\"}"
|
||||
return "{\"error\":\"method not allowed\",\"code\":\"method_not_allowed\",\"method\":\"" + method + "\",\"path\":\"" + path + "\"}"
|
||||
}
|
||||
|
||||
fn route_health() -> String {
|
||||
@@ -31,12 +90,35 @@ fn route_health() -> String {
|
||||
let edge_ct: Int = engram_edge_count()
|
||||
let pulse: String = state_get("soul.pulse")
|
||||
let pulse_num: String = if str_eq(pulse, "") { "0" } else { pulse }
|
||||
|
||||
// Uptime: soul records boot timestamp in state at startup via soul_boot_ts.
|
||||
// Compute elapsed seconds; fall back to -1 if not yet set.
|
||||
let boot_ts_str: String = state_get("soul_boot_ts")
|
||||
let uptime_secs: Int = if str_eq(boot_ts_str, "") {
|
||||
-1
|
||||
} else {
|
||||
time_now() - str_to_int(boot_ts_str)
|
||||
}
|
||||
|
||||
// LLM connectivity: probe with a minimal call. Any non-error reply = ok.
|
||||
// Use a short, fixed prompt so this never counts against conversation history.
|
||||
let model: String = state_get("soul_model")
|
||||
let eff_model: String = if str_eq(model, "") { "claude-sonnet-4-5" } else { model }
|
||||
let llm_probe: String = llm_call_system(eff_model, "You are a health probe. Reply with the single word: ok", "ping")
|
||||
let llm_ok: Bool = !str_eq(llm_probe, "")
|
||||
&& !str_starts_with(llm_probe, "{\"error\"")
|
||||
&& !str_starts_with(llm_probe, "{\"type\":\"error\"")
|
||||
&& !str_contains(llm_probe, "authentication_error")
|
||||
let llm_status: String = if llm_ok { "ok" } else { "unreachable" }
|
||||
|
||||
return "{\"status\":\"alive\""
|
||||
+ ",\"cgi_id\":\"" + cgi_id + "\""
|
||||
+ ",\"boot\":" + boot_num
|
||||
+ ",\"uptime_secs\":" + int_to_str(uptime_secs)
|
||||
+ ",\"node_count\":" + int_to_str(node_ct)
|
||||
+ ",\"edge_count\":" + int_to_str(edge_ct)
|
||||
+ ",\"pulse\":" + pulse_num
|
||||
+ ",\"llm\":\"" + llm_status + "\""
|
||||
+ ",\"layers\":{\"l0\":\"core\",\"l1\":\"safety\",\"l2\":\"stewardship\",\"l3\":\"" + imprint_current() + "\"}}"
|
||||
}
|
||||
|
||||
@@ -103,15 +185,15 @@ fn route_imprint_user(body: String) -> String {
|
||||
|
||||
fn route_synthesize(body: String) -> String {
|
||||
if str_eq(body, "") {
|
||||
return "{\"mechanism\":\"did not engage\"}"
|
||||
return "{\"error\":\"body is required\",\"code\":\"missing_param\"}"
|
||||
}
|
||||
let parent_a: String = json_get(body, "parent_a")
|
||||
let parent_b: String = json_get(body, "parent_b")
|
||||
if str_eq(parent_a, "") {
|
||||
return "{\"mechanism\":\"did not engage\"}"
|
||||
return "{\"error\":\"parent_a is required\",\"code\":\"missing_param\"}"
|
||||
}
|
||||
if str_eq(parent_b, "") {
|
||||
return "{\"mechanism\":\"did not engage\"}"
|
||||
return "{\"error\":\"parent_b is required\",\"code\":\"missing_param\"}"
|
||||
}
|
||||
let req: String = "synthesize " + parent_a + " " + parent_b
|
||||
let tags: String = "[\"soul-inbox-pending\",\"synthesis-request\"]"
|
||||
@@ -259,6 +341,17 @@ fn handle_connectors(method: String, clean: String, body: String) -> String {
|
||||
fn handle_request(method: String, path: String, body: String) -> String {
|
||||
let clean: String = strip_query(path)
|
||||
|
||||
// Rate limit check. Extract caller IP from REMOTE_ADDR env var (set by the
|
||||
// EL HTTP runtime for each request). Skip enforcement when empty so
|
||||
// loopback/internal callers are never blocked.
|
||||
let ip: String = env("REMOTE_ADDR")
|
||||
if !str_eq(ip, "") {
|
||||
let rl_result: String = rate_limit_check(ip, clean)
|
||||
if !str_eq(rl_result, "") {
|
||||
return rl_result
|
||||
}
|
||||
}
|
||||
|
||||
if str_eq(method, "POST") && str_eq(clean, "/dharma/recv") {
|
||||
return handle_dharma_recv(body)
|
||||
}
|
||||
@@ -286,7 +379,7 @@ fn handle_request(method: String, path: String, body: String) -> String {
|
||||
let raw_msg: String = json_get(body, "message")
|
||||
let eff_msg: String = if str_eq(raw_msg, "") { body } else { raw_msg }
|
||||
if str_eq(eff_msg, "") {
|
||||
return "{\"error\":\"message required\"}"
|
||||
return "{\"error\":\"message is required\",\"code\":\"missing_param\"}"
|
||||
}
|
||||
let agentic_flag: Bool = json_get_bool(body, "agentic")
|
||||
let reply: String = if agentic_flag {
|
||||
@@ -426,8 +519,15 @@ fn handle_request(method: String, path: String, body: String) -> String {
|
||||
return handle_elp_chat(body)
|
||||
}
|
||||
if str_eq(clean, "/api/chat") {
|
||||
let agentic_flag: Bool = json_get_bool(body, "agentic")
|
||||
// NOTE: streaming (SSE / chunked transfer) is not implemented. All chat
|
||||
// responses are buffered and returned as a single JSON object. Streaming
|
||||
// would require runtime-level SSE support in el_runtime.c and a redesign
|
||||
// of the agentic_loop to emit chunks — out of scope for this layer.
|
||||
let raw_msg: String = json_get(body, "message")
|
||||
if str_eq(raw_msg, "") {
|
||||
return "{\"error\":\"message is required\",\"code\":\"missing_param\"}"
|
||||
}
|
||||
let agentic_flag: Bool = json_get_bool(body, "agentic")
|
||||
let reply: String = if agentic_flag {
|
||||
handle_chat_agentic(body)
|
||||
} else {
|
||||
|
||||
+1
-97
@@ -36,49 +36,7 @@ fn session_make_content(id: String, title: String, created_at: Int, updated_at:
|
||||
+ ",\"updated_at\":" + int_to_str(updated_at) + "}"
|
||||
}
|
||||
|
||||
// session_exists — return true if the given session_id is known in Engram or state.
|
||||
// Used by chat.el to validate a session_id before processing a chat message.
|
||||
// Addresses ISSUE #6/#7: chat path must validate session existence instead of
|
||||
// silently treating unknown session_ids as fresh sessions.
|
||||
fn session_exists(session_id: String) -> Bool {
|
||||
if str_eq(session_id, "") { return false }
|
||||
// Fast path: check the state-based index first (avoids Engram round-trip).
|
||||
let idx: String = state_get("session_index")
|
||||
if !str_eq(idx, "") && !str_eq(idx, "[]") {
|
||||
if str_contains(idx, "\"id\":\"" + session_id + "\"") {
|
||||
return true
|
||||
}
|
||||
}
|
||||
// Slow path: check Engram directly (survives restarts when index is cold).
|
||||
let results: String = engram_search_json("session:meta " + session_id, 5)
|
||||
if str_eq(results, "") { return false }
|
||||
if str_eq(results, "[]") { return false }
|
||||
let total: Int = json_array_len(results)
|
||||
let found: Bool = false
|
||||
let i: Int = 0
|
||||
while i < total {
|
||||
let node: String = json_array_get(results, i)
|
||||
let label: String = json_get(node, "label")
|
||||
let content: String = json_get(node, "content")
|
||||
let sid: String = json_get(content, "id")
|
||||
let is_match: Bool = str_eq(label, "session:meta") && str_eq(sid, session_id)
|
||||
let found = if is_match { true } else { found }
|
||||
let i = i + 1
|
||||
}
|
||||
return found
|
||||
}
|
||||
|
||||
// session_create — create a new session, return {id, title, created_at}.
|
||||
//
|
||||
// ISSUE #1: Ghost sessions on failed first message.
|
||||
// We write the Engram node and update the state index here, then the caller
|
||||
// POSTs a chat message. If that chat call fails (LLM unavailable, network
|
||||
// error, etc.) the session is stranded with no messages. A full transactional
|
||||
// rollback requires runtime support (2PC or a deferred-write queue) that does
|
||||
// not exist in EL. Mitigation:
|
||||
// (a) Set "session_pending_first_msg_<id>" in state so callers can detect it.
|
||||
// (b) Provide session_create_cleanup() for callers that detect a failure.
|
||||
// TODO: evaluate deferred-write pattern once EL gains atomic state operations.
|
||||
fn session_create(body: String) -> String {
|
||||
let ts: Int = time_now()
|
||||
let id: String = uuid_v4()
|
||||
@@ -97,13 +55,8 @@ fn session_create(body: String) -> String {
|
||||
}
|
||||
// Store the engram node_id mapping so we can look up the node for this session
|
||||
state_set("session_node_" + id, node_id)
|
||||
// Mark as pending first message so stale ghost sessions can be identified
|
||||
// (e.g. if the caller\'s subsequent chat POST fails).
|
||||
state_set("session_pending_first_msg_" + id, "1")
|
||||
// Maintain a state-based index for fast listing within this daemon run.
|
||||
// Newest sessions first (prepend).
|
||||
// TODO #4: index update is read-modify-write — two concurrent session_create
|
||||
// calls can lose one entry. EL has no CAS primitive; fix requires runtime support.
|
||||
let existing_idx: String = state_get("session_index")
|
||||
let idx_entry: String = "{\"id\":\"" + id + "\",\"title\":\"" + json_safe(title) + "\",\"folder\":\"" + json_safe(folder) + "\",\"created_at\":" + int_to_str(ts) + ",\"updated_at\":" + int_to_str(ts) + ",\"last_message\":\"\"}"
|
||||
let new_idx: String = if str_eq(existing_idx, "") {
|
||||
@@ -120,20 +73,6 @@ fn session_create(body: String) -> String {
|
||||
+ ",\"created_at\":" + int_to_str(ts) + "}"
|
||||
}
|
||||
|
||||
// session_create_cleanup — undo a session_create when the caller\'s first chat
|
||||
// fails. Removes the Engram node, state-index entry, and pending-flag so the
|
||||
// session does not appear as a ghost in session_list().
|
||||
// Addresses ISSUE #1: cleanup path for ghost sessions.
|
||||
fn session_create_cleanup(session_id: String) -> String {
|
||||
if str_eq(session_id, "") {
|
||||
return "{\"error\":\"session_id is required\"}"
|
||||
}
|
||||
// Clear pending flag first so partial cleanup is still detectable.
|
||||
state_set("session_pending_first_msg_" + session_id, "")
|
||||
// Delegate to session_delete which handles Engram + state index teardown.
|
||||
return session_delete(session_id)
|
||||
}
|
||||
|
||||
// session_list — list all sessions. Returns [{id, title, last_message, created_at, updated_at}].
|
||||
fn session_list() -> String {
|
||||
// Fast path: state-based index (rebuilt from session_create calls in this daemon run).
|
||||
@@ -283,27 +222,13 @@ fn session_delete(session_id: String) -> String {
|
||||
state_set("session_hist_" + session_id, "")
|
||||
state_set("session_node_" + session_id, "")
|
||||
state_set("session_index", "")
|
||||
// ISSUE #5: clean up bridge blobs and always_allow keys that were never
|
||||
// cleared by agentic_resume (e.g. client abandoned a pending tool call).
|
||||
// Without this, stranded bridge blobs accumulate indefinitely in state.
|
||||
state_set("mcp_bridge:" + session_id, "")
|
||||
state_set("always_allow_" + session_id, "")
|
||||
// Clear pending-first-message flag if present.
|
||||
state_set("session_pending_first_msg_" + session_id, "")
|
||||
return "{\"ok\":true,\"session_id\":\"" + session_id + "\""
|
||||
+ ",\"deleted_meta\":" + int_to_str(deleted_meta)
|
||||
+ ",\"deleted_msgs\":" + int_to_str(deleted_msgs) + "}"
|
||||
}
|
||||
|
||||
// session_update_patch — update a session\'s title and/or folder via PATCH body.
|
||||
// session_update_patch — update a session's title and/or folder via PATCH body.
|
||||
// Body may contain "title", "folder", or both. Preserves unmentioned fields.
|
||||
//
|
||||
// ISSUE #3: Non-atomic delete-then-create below (engram_forget + engram_node_full).
|
||||
// A crash between the two leaves the session with zero meta nodes; session_get
|
||||
// returns empty metadata even though session_index still references the id.
|
||||
// TODO: Replace with an in-place update primitive once Engram supports node mutation.
|
||||
// Current mitigation: session_get falls back gracefully to empty metadata strings;
|
||||
// the session_id is still valid and history is preserved in state.
|
||||
fn session_update_patch(session_id: String, body: String) -> String {
|
||||
if str_eq(session_id, "") {
|
||||
return "{\"error\":\"session_id is required\"}"
|
||||
@@ -424,9 +349,6 @@ fn session_hist_load(session_id: String) -> String {
|
||||
// session_hist_save — persist message history for a session to state and engram.
|
||||
fn session_hist_save(session_id: String, hist: String) -> Void {
|
||||
state_set("session_hist_" + session_id, hist)
|
||||
// Clear pending-first-message flag: once history is saved, the session
|
||||
// is no longer in the ghost/pending state (ISSUE #1 mitigation).
|
||||
state_set("session_pending_first_msg_" + session_id, "")
|
||||
// Delete old history node and write fresh one
|
||||
let old_results: String = engram_search_json("session:messages:" + session_id, 3)
|
||||
let o_total: Int = if str_eq(old_results, "") { 0 } else { json_array_len(old_results) }
|
||||
@@ -449,16 +371,6 @@ fn session_hist_save(session_id: String, hist: String) -> Void {
|
||||
}
|
||||
|
||||
// session_update_meta_timestamp — update the updated_at field in the session:meta node.
|
||||
//
|
||||
// ISSUE #2: No TTL / idle expiry mechanism. Sessions accumulate indefinitely.
|
||||
// A sweep job (e.g. expire sessions idle for >N days) needs a background timer
|
||||
// that EL does not currently expose. Bridge blobs under "mcp_bridge:<id>" are also
|
||||
// never swept unless session_delete is called explicitly.
|
||||
// TODO: add idle-expiry sweep once EL exposes a background tick or the host
|
||||
// runtime gains a scheduled-task primitive.
|
||||
//
|
||||
// ISSUE #3 applies here too: delete-then-create is non-atomic. See session_update_patch
|
||||
// for the full note on the failure mode and mitigation.
|
||||
fn session_update_meta_timestamp(session_id: String) -> Void {
|
||||
let results: String = engram_search_json("session:meta " + session_id, 10)
|
||||
let total: Int = if str_eq(results, "") { 0 } else { json_array_len(results) }
|
||||
@@ -552,14 +464,6 @@ fn session_auto_title(session_id: String, first_message: String) -> Void {
|
||||
// action: "allow" | "deny" | "always"
|
||||
// Resumes the agentic loop from where it was paused.
|
||||
//
|
||||
// ISSUE #8: Reconnect/duplicate resume race. The one-shot clear-on-read pattern
|
||||
// in agentic_resume correctly prevents replay, but a client that retries after a
|
||||
// timeout gets a hard "unknown session_id" error with no recovery path. The
|
||||
// conversation is permanently stuck in that case. Full idempotency (e.g. caching
|
||||
// the last reply keyed by call_id) requires a new state structure.
|
||||
// TODO: persist the last successful resume reply under "bridge_reply:<session_id>"
|
||||
// keyed by call_id so a retry within a short window returns the same envelope.
|
||||
//
|
||||
// Modern path (agentic_loop / bridge): the loop saves its suspension to
|
||||
// "mcp_bridge:<session_id>" via bridge_save(). On approval we dispatch_tool()
|
||||
// if allowed (or build a denial string), then hand the result to agentic_resume()
|
||||
|
||||
@@ -369,6 +369,7 @@ load_identity_context()
|
||||
seed_persona_from_env()
|
||||
let boot_num: Int = mem_boot_count_inc()
|
||||
state_set("soul_boot_count", int_to_str(boot_num))
|
||||
state_set("soul_boot_ts", int_to_str(time_now()))
|
||||
println("[soul] boot #" + int_to_str(boot_num))
|
||||
emit_session_start_event()
|
||||
|
||||
|
||||
Reference in New Issue
Block a user