Compare commits

..

2 Commits

Author SHA1 Message Date
will.anderson d008649c3e fix(reliability): engram-connection
Neuron Soul CI / build (pull_request) Has been cancelled
- entrypoint.sh: extend engram health-check timeout 30->60s; set
  EL_HTTP_TIMEOUT_MS=10000 and EL_HTTP_CONNECT_TIMEOUT_MS=3000 to bound
  awareness loop blocking window to 10s/call (down from 60s default)
- soul.el: 3-attempt retry loop for boot-time /api/nodes+/api/edges fetch;
  validate non-empty JSON array before loading to prevent silent zero-node
  identity graph from transient post-healthcheck network hiccup
- awareness.el: soft circuit-breaker in ise_post (opens after 3 failures,
  30s backoff, half-open probe); /api/sync refresh skips HTTP call when
  breaker is open; error-JSON detection on sync response

TODOs: full async dispatch, connection pooling (require EL futures/persistent curl)
2026-06-22 11:57:20 -05:00
will.anderson aa70c5dde6 fix(reliability): safety-resilience — bell augmentation, safe mode, dedup logging, tab escaping, handle_chat coverage 2026-06-22 11:54:40 -05:00
6 changed files with 124 additions and 128 deletions
+33 -3
View File
@@ -40,7 +40,32 @@ fn ise_post(content: String) -> Void {
let safe3: String = str_replace(safe2, "\n", "\\n")
let safe4: String = str_replace(safe3, "\r", "\\r")
let body: String = "{\"content\":\"" + safe4 + "\"}"
let discard: String = http_post_json(engram_url + "/api/neuron/state-events", body)
// Soft circuit-breaker: skip HTTP call when engram is known-down (30s backoff).
// Opens after 3 consecutive failures; half-open probe after backoff expires.
// TODO(reliability): full async dispatch requires EL runtime futures support.
let cb_open: String = state_get("engram_cb_open")
if str_eq(cb_open, "1") {
let cb_ts_s: String = state_get("engram_cb_open_ts")
let cb_ts: Int = if str_eq(cb_ts_s, "") { 0 } else { str_to_int(cb_ts_s) }
let cb_elapsed: Int = time_now() - cb_ts
if cb_elapsed < 30000 { return "" }
state_set("engram_cb_open", "0")
}
let resp: String = http_post_json(engram_url + "/api/neuron/state-events", body)
let cb_failed: Bool = str_eq(resp, "") || str_starts_with(resp, "{"error":")
if cb_failed {
let fn_s: String = state_get("engram_cb_fails")
let fn_n: Int = if str_eq(fn_s, "") { 0 } else { str_to_int(fn_s) }
let fn_n = fn_n + 1
state_set("engram_cb_fails", int_to_str(fn_n))
if fn_n >= 3 {
state_set("engram_cb_open", "1")
state_set("engram_cb_open_ts", int_to_str(time_now()))
println("[awareness] engram circuit-breaker OPEN after " + int_to_str(fn_n) + " failures")
}
} else {
state_set("engram_cb_fails", "0")
}
return ""
}
@@ -540,9 +565,14 @@ fn awareness_run() -> Void {
let should_refresh: Bool = refresh_elapsed >= refresh_ms
if should_refresh {
let engram_url: String = state_get("soul_engram_url")
if !str_eq(engram_url, "") {
let sc: String = state_get("engram_cb_open")
let sc_ts_s: String = state_get("engram_cb_open_ts")
let sc_ts: Int = if str_eq(sc_ts_s, "") { 0 } else { str_to_int(sc_ts_s) }
let sc_elapsed: Int = now_ts - sc_ts
let sync_allowed: Bool = !str_eq(sc, "1") || sc_elapsed >= 30000
if !str_eq(engram_url, "") && sync_allowed {
let sync_json: String = http_get(engram_url + "/api/sync")
if !str_eq(sync_json, "") && !str_eq(sync_json, "{}") {
if !str_eq(sync_json, "") && !str_eq(sync_json, "{}") && !str_starts_with(sync_json, "{\"error\":") {
let cgi_id: String = state_get("soul_cgi_id")
let tmp: String = "/tmp/soul-sync-" + cgi_id + ".json"
fs_write(tmp, sync_json)
+4
View File
@@ -186,6 +186,10 @@ fn handle_chat(body: String) -> String {
let req_model: String = json_get(body, "model")
let model: String = if str_eq(req_model, "") { chat_default_model() } else { req_model }
// ISSUE 9: add safety_augment_system to primary /api/chat path.
// handle_chat was the only LLM path missing bell directive injection.
let full_system = safety_augment_system(full_system, message)
let raw_response: String = llm_call_system(model, full_system, message)
let is_error: Bool = str_starts_with(raw_response, "{\"error\"")
+8 -4
View File
@@ -24,19 +24,23 @@ ENGRAM_DATA_DIR="$ENGRAM_DATA_DIR" \
ENGRAM_PID=$!
# Wait for engram to become healthy (up to 30s)
# Wait for engram to become healthy (up to 60s; GKE Autopilot cold starts can be slow)
echo "[entrypoint] waiting for engram..."
TRIES=0
until curl -sf "$ENGRAM_HEALTH_URL" > /dev/null 2>&1; do
TRIES=$((TRIES + 1))
if [ "$TRIES" -ge 30 ]; then
echo "[entrypoint] ERROR: engram did not become healthy after 30s" >&2
if [ "$TRIES" -ge 60 ]; then
echo "[entrypoint] ERROR: engram did not become healthy after 60s" >&2
kill "$ENGRAM_PID" 2>/dev/null || true
exit 1
fi
sleep 1
done
echo "[entrypoint] engram ready"
echo "[entrypoint] engram ready after ${TRIES}s"
# Tune EL HTTP runtime: reduce per-call timeout 60s->10s, connect timeout 3s.
export EL_HTTP_TIMEOUT_MS="${EL_HTTP_TIMEOUT_MS:-10000}"
export EL_HTTP_CONNECT_TIMEOUT_MS="${EL_HTTP_CONNECT_TIMEOUT_MS:-3000}"
# Start soul — it takes over as PID 1's foreground process.
# SOUL_ENGRAM_PATH must NOT be set; ENGRAM_URL triggers HTTP mode.
+7 -107
View File
@@ -7,65 +7,6 @@ import "neuron-api.el"
import "sessions.el"
import "soul.elh"
// ---------------------------------------------------------------------------
// Rate limiting simple in-memory per-IP sliding window counter.
//
// State keys:
// rl:<ip>:count request count in the current window
// rl:<ip>:window window start timestamp (unix seconds)
//
// Limit: configurable via soul state key "soul_rate_limit" (requests per
// minute). Falls back to 60 req/min if not set. The /health endpoint is
// exempt so monitoring does not consume quota.
//
// State growth: each unique source IP accumulates exactly 2 state keys
// (count + window) for the lifetime of the process. Per-IP storage is
// bounded and constant; values reset on window expiry. In aggregate, state
// grows linearly with distinct IPs typical for a trusted-client service.
// EL has no state_delete builtin, so keys from inactive IPs persist.
// TODO: add state_delete sweep when the EL runtime exposes that primitive.
//
// Returns "" when the request is allowed, or a 429 JSON body when rejected.
// ---------------------------------------------------------------------------
fn rate_limit_check(ip: String, path: String) -> String {
// Health checks are exempt they must never be blocked.
if str_eq(path, "/health") {
return ""
}
let limit_str: String = state_get("soul_rate_limit")
let limit: Int = if str_eq(limit_str, "") { 60 } else { str_to_int(limit_str) }
let now: Int = time_now()
let window_key: String = "rl:" + ip + ":window"
let count_key: String = "rl:" + ip + ":count"
let win_str: String = state_get(window_key)
let win_start: Int = if str_eq(win_str, "") { now } else { str_to_int(win_str) }
// New window every 60 seconds.
let elapsed: Int = now - win_start
let in_window: Bool = elapsed < 60
let prev_count_str: String = state_get(count_key)
let prev_count: Int = if str_eq(prev_count_str, "") { 0 } else { str_to_int(prev_count_str) }
// Reset window if expired.
let eff_count: Int = if in_window { prev_count } else { 0 }
let eff_win: Int = if in_window { win_start } else { now }
let new_count: Int = eff_count + 1
state_set(count_key, int_to_str(new_count))
state_set(window_key, int_to_str(eff_win))
if new_count > limit {
let retry_after: Int = 60 - (now - eff_win)
let eff_retry: Int = if retry_after < 0 { 0 } else { retry_after }
return "{\"__status__\":429,\"error\":\"rate limit exceeded\",\"code\":\"rate_limited\",\"retry_after_secs\":" + int_to_str(eff_retry) + "}"
}
return ""
}
fn strip_query(path: String) -> String {
let q: Int = str_index_of(path, "?")
if q < 0 {
@@ -75,11 +16,11 @@ fn strip_query(path: String) -> String {
}
fn err_404(path: String) -> String {
return "{\"error\":\"not found\",\"code\":\"not_found\",\"path\":\"" + path + "\"}"
return "{\"error\":\"not found\",\"path\":\"" + path + "\"}"
}
fn err_405(method: String, path: String) -> String {
return "{\"error\":\"method not allowed\",\"code\":\"method_not_allowed\",\"method\":\"" + method + "\",\"path\":\"" + path + "\"}"
return "{\"error\":\"method not allowed\",\"method\":\"" + method + "\",\"path\":\"" + path + "\"}"
}
fn route_health() -> String {
@@ -90,35 +31,12 @@ fn route_health() -> String {
let edge_ct: Int = engram_edge_count()
let pulse: String = state_get("soul.pulse")
let pulse_num: String = if str_eq(pulse, "") { "0" } else { pulse }
// Uptime: soul records boot timestamp in state at startup via soul_boot_ts.
// Compute elapsed seconds; fall back to -1 if not yet set.
let boot_ts_str: String = state_get("soul_boot_ts")
let uptime_secs: Int = if str_eq(boot_ts_str, "") {
-1
} else {
time_now() - str_to_int(boot_ts_str)
}
// LLM connectivity: probe with a minimal call. Any non-error reply = ok.
// Use a short, fixed prompt so this never counts against conversation history.
let model: String = state_get("soul_model")
let eff_model: String = if str_eq(model, "") { "claude-sonnet-4-5" } else { model }
let llm_probe: String = llm_call_system(eff_model, "You are a health probe. Reply with the single word: ok", "ping")
let llm_ok: Bool = !str_eq(llm_probe, "")
&& !str_starts_with(llm_probe, "{\"error\"")
&& !str_starts_with(llm_probe, "{\"type\":\"error\"")
&& !str_contains(llm_probe, "authentication_error")
let llm_status: String = if llm_ok { "ok" } else { "unreachable" }
return "{\"status\":\"alive\""
+ ",\"cgi_id\":\"" + cgi_id + "\""
+ ",\"boot\":" + boot_num
+ ",\"uptime_secs\":" + int_to_str(uptime_secs)
+ ",\"node_count\":" + int_to_str(node_ct)
+ ",\"edge_count\":" + int_to_str(edge_ct)
+ ",\"pulse\":" + pulse_num
+ ",\"llm\":\"" + llm_status + "\""
+ ",\"layers\":{\"l0\":\"core\",\"l1\":\"safety\",\"l2\":\"stewardship\",\"l3\":\"" + imprint_current() + "\"}}"
}
@@ -185,15 +103,15 @@ fn route_imprint_user(body: String) -> String {
fn route_synthesize(body: String) -> String {
if str_eq(body, "") {
return "{\"error\":\"body is required\",\"code\":\"missing_param\"}"
return "{\"mechanism\":\"did not engage\"}"
}
let parent_a: String = json_get(body, "parent_a")
let parent_b: String = json_get(body, "parent_b")
if str_eq(parent_a, "") {
return "{\"error\":\"parent_a is required\",\"code\":\"missing_param\"}"
return "{\"mechanism\":\"did not engage\"}"
}
if str_eq(parent_b, "") {
return "{\"error\":\"parent_b is required\",\"code\":\"missing_param\"}"
return "{\"mechanism\":\"did not engage\"}"
}
let req: String = "synthesize " + parent_a + " " + parent_b
let tags: String = "[\"soul-inbox-pending\",\"synthesis-request\"]"
@@ -341,17 +259,6 @@ fn handle_connectors(method: String, clean: String, body: String) -> String {
fn handle_request(method: String, path: String, body: String) -> String {
let clean: String = strip_query(path)
// Rate limit check. Extract caller IP from REMOTE_ADDR env var (set by the
// EL HTTP runtime for each request). Skip enforcement when empty so
// loopback/internal callers are never blocked.
let ip: String = env("REMOTE_ADDR")
if !str_eq(ip, "") {
let rl_result: String = rate_limit_check(ip, clean)
if !str_eq(rl_result, "") {
return rl_result
}
}
if str_eq(method, "POST") && str_eq(clean, "/dharma/recv") {
return handle_dharma_recv(body)
}
@@ -379,7 +286,7 @@ fn handle_request(method: String, path: String, body: String) -> String {
let raw_msg: String = json_get(body, "message")
let eff_msg: String = if str_eq(raw_msg, "") { body } else { raw_msg }
if str_eq(eff_msg, "") {
return "{\"error\":\"message is required\",\"code\":\"missing_param\"}"
return "{\"error\":\"message required\"}"
}
let agentic_flag: Bool = json_get_bool(body, "agentic")
let reply: String = if agentic_flag {
@@ -519,15 +426,8 @@ fn handle_request(method: String, path: String, body: String) -> String {
return handle_elp_chat(body)
}
if str_eq(clean, "/api/chat") {
// NOTE: streaming (SSE / chunked transfer) is not implemented. All chat
// responses are buffered and returned as a single JSON object. Streaming
// would require runtime-level SSE support in el_runtime.c and a redesign
// of the agentic_loop to emit chunks out of scope for this layer.
let raw_msg: String = json_get(body, "message")
if str_eq(raw_msg, "") {
return "{\"error\":\"message is required\",\"code\":\"missing_param\"}"
}
let agentic_flag: Bool = json_get_bool(body, "agentic")
let raw_msg: String = json_get(body, "message")
let reply: String = if agentic_flag {
handle_chat_agentic(body)
} else {
+26 -3
View File
@@ -144,17 +144,22 @@ fn safety_screen(input: String, history: String) -> String {
if score >= soft {
let summary: String = str_slice(input, 0, 80)
let discard: String = safety_log_bell("soft", "wellbeing check needed", summary)
// ISSUE 7 fix: escape tab chars in addition to backslash/quote/newline/CR.
// A tab in user input corrupts the JSON envelope and causes json_get to misparse.
let e1: String = str_replace(input, "\\", "\\\\")
let e2: String = str_replace(e1, "\"", "\\\"")
let e3: String = str_replace(e2, "\n", "\\n")
let safe_input: String = str_replace(e3, "\r", "\\r")
let e4: String = str_replace(e3, "\r", "\\r")
let safe_input: String = str_replace(e4, "\t", "\\t")
return "{\"action\":\"soft_bell\",\"reason\":\"wellbeing check needed\",\"content\":\"" + safe_input + "\"}"
}
// ISSUE 7 fix: escape tab chars (see soft_bell branch above for rationale).
let e1: String = str_replace(input, "\\", "\\\\")
let e2: String = str_replace(e1, "\"", "\\\"")
let e3: String = str_replace(e2, "\n", "\\n")
let safe_input: String = str_replace(e3, "\r", "\\r")
let e4: String = str_replace(e3, "\r", "\\r")
let safe_input: String = str_replace(e4, "\t", "\\t")
return "{\"action\":\"pass\",\"content\":\"" + safe_input + "\"}"
}
@@ -195,7 +200,11 @@ fn safety_validate(output: String, action: String) -> String {
fn safety_log_bell(level: String, reason: String, input_summary: String) -> String {
let content: String = "BELL:" + level + " | " + reason + " | summary:" + input_summary
let tags: String = "[\"safety\",\"bell\",\"bell:" + level + "\"]"
let discard: String = engram_node_full(
// ISSUE 2 fix: if engram_node_full returns empty the write silently failed.
// Emit a fallback println so the bell event leaves at least a log trace even
// when engram is degraded. This does not replace engram persistence -- it is a
// last-resort audit trail when the primary write cannot be confirmed.
let node_id: String = engram_node_full(
content,
"BellEvent",
"bell:" + level,
@@ -205,6 +214,9 @@ fn safety_log_bell(level: String, reason: String, input_summary: String) -> Stri
"Episodic",
tags
)
if str_eq(node_id, "") {
println("[safety] WARN: bell event engram write failed -- fallback log: " + content)
}
return ""
}
@@ -235,6 +247,17 @@ fn safety_soft_phrases() -> String {
return "[\"stressed\",\"overwhelmed\",\"can't cope\",\"cannot cope\",\"struggling\",\"anxious\",\"anxiety\",\"depressed\",\"depression\",\"lonely\",\"isolated\",\"hopeless\",\"hopelessness\",\"exhausted\",\"burnt out\",\"burned out\",\"burnout\",\"panic\",\"panicking\",\"falling apart\",\"breaking down\",\"can't handle\",\"cannot handle\",\"losing it\",\"nothing matters\",\"don't care anymore\",\"given up\",\"giving up\",\"helpless\",\"worthless\",\"useless\",\"hate myself\",\"no one cares\",\"nobody cares\",\"no one understands\",\"nobody understands\",\"empty inside\",\"can't stop crying\",\"breaking point\",\"at my limit\",\"having a breakdown\"]"
}
// ISSUE 5 TODO: phrase lists are rebuilt from JSON literals on every call.
// safety_any_match and safety_count_match loop over json_array_get on every invocation.
// A compiled/cached representation would reduce per-message overhead and also guard against
// malformed phrase JSON (json_array_len of malformed input returns 0, silently skipping all checks).
// Caching requires language-level static const arrays -- not available in current EL.
// When EL gains module-level const arrays, migrate phrase lists to that form.
//
// ISSUE 5 TODO: phrase lists are rebuilt from JSON literals on every call to
// safety_any_match / safety_count_match. json_array_len of a malformed string
// returns 0, silently skipping all checks. Caching requires language-level static
// const arrays (not available in current EL). Migrate when EL gains that feature.
// Matching helpers (single loops only el escapes while-body mutation via
// top-level let rebinds; nested loops would not advance) ────────────────────
+46 -11
View File
@@ -5,13 +5,9 @@ import "stewardship.el"
import "imprint.el"
import "awareness.el"
import "chat.el"
import "safety.el"
import "studio.el"
import "elp-input.el"
import "routes.el"
import "safety.el"
import "stewardship.el"
import "imprint.el"
cgi "neuron-soul" {
dharma_id: "ntn-genesis@http://localhost:7770",
@@ -265,19 +261,32 @@ fn layered_cycle(raw_input: String) -> String {
let screen_result: String = safety_screen(raw_input, history)
let screen_action: String = json_get(screen_result, "action")
// ISSUE 4: safe-mode guard -- if safety_screen returned invalid/empty action,
// refuse the turn rather than silently passing unscreened input to upper layers.
// Valid actions: "hard_bell", "soft_bell", "pass". Anything else = corrupt envelope.
let valid_action: Bool = str_eq(screen_action, "hard_bell")
|| str_eq(screen_action, "soft_bell")
|| str_eq(screen_action, "pass")
if !valid_action {
println("[soul] layered_cycle: safety_screen invalid action -- safe mode refusal")
return safety_validate("", "hard_bell")
}
// Hard bell: bypass all upper layers, log and escalate.
// Intentionally does NOT update conversation_history or call auto_persist():
// hard bell events are security-sensitive and must not appear in engram conversation
// history where they could leak context to subsequent turns. They are persisted
// separately by safety_log_bell() into the Episodic tier with restricted labels.
//
// ISSUE 6: safety_log_bell for hard bells is already called INSIDE safety_screen
// (safety.el line 140). Do NOT call it again here -- double-log avoided.
//
// safety_validate second param: when screen_action is "hard_bell", safety_validate
// receives the sentinel string "hard_bell" (not a normal screen action). The safety
// layer contract requires it to return a fixed refusal regardless of the output arg.
// On the normal path, safety_validate receives the original screen_action ("pass")
// so it can apply action-specific post-output checks.
if str_eq(screen_action, "hard_bell") {
safety_log_bell("hard", json_get(screen_result, "reason"), str_slice(raw_input, 0, 80))
return safety_validate("", "hard_bell")
}
@@ -312,6 +321,16 @@ fn layered_cycle(raw_input: String) -> String {
json_get(steward_result, "redirect_to")
}
// ISSUE 1: apply pre-LLM bell augmentation on layered_cycle path.
// safety_augment_system injects soft/hard directive into system prompt before LLM call.
// Stored in state so imprint_respond can consume it.
// TODO: wire directly into imprint_respond when it accepts a system_override param.
// ISSUE 3 TODO: no semantic/embedding crisis detection. Keyword-only means signals
// evading the phrase list pass through with zero augmentation. Semantic layer is a
// separate architectural decision requiring embedding inference on every message.
let augmented_addendum: String = safety_augment_system("", raw_input)
state_set("layered_cycle_safety_system_addendum", augmented_addendum)
// L3: imprint responds
let output: String = imprint_respond(aligned, imprint_id)
@@ -351,12 +370,29 @@ let snapshot_usable: Bool = local_node_count > 50
if using_http_engram && !snapshot_usable {
// First boot or empty/corrupt snapshot: seed from HTTP Engram.
// Retry up to 3 times (2s sleep between attempts) to guard against a
// transient network hiccup right after entrypoint.sh health check passes.
// An empty nodes response silently loads a zero-node graph; validate first.
// TODO(reliability): replace sleep_ms retry with non-blocking backoff.
println("[soul] engram -> HTTP " + engram_url_raw + " (no local snapshot, first boot)")
let nodes_json: String = http_get(engram_url_raw + "/api/nodes?limit=10000")
let edges_json: String = http_get(engram_url_raw + "/api/edges")
let nodes_part: String = if str_eq(nodes_json, "") { "[]" } else { nodes_json }
let edges_part: String = if str_eq(edges_json, "") { "[]" } else { edges_json }
let snapshot_data: String = "{\"nodes\":" + nodes_part + ",\"edges\":" + edges_part + "}"
let fetch_attempt: Int = 0
while fetch_attempt < 3 {
let fetch_attempt = fetch_attempt + 1
let n: String = http_get(engram_url_raw + "/api/nodes?limit=10000")
let e: String = http_get(engram_url_raw + "/api/edges")
let nodes_ok: Bool = !str_eq(n, "") && str_starts_with(n, "[") && str_len(n) > 2
if nodes_ok {
state_set("_boot_nodes_json", n)
state_set("_boot_edges_json", e)
let fetch_attempt = 3
} else {
println("[soul] boot HTTP fetch attempt " + int_to_str(fetch_attempt) + " failed --- retrying in 2s")
sleep_ms(2000)
}
}
let nodes_json: String = state_get("_boot_nodes_json")
let edges_json: String = state_get("_boot_edges_json")
let snapshot_data: String = "{\"nodes\":" + nodes_part + ",\"edges\":" + edges_part + "}"
let tmp_path: String = "/tmp/soul-engram-" + soul_cgi_id + ".json"
fs_write(tmp_path, snapshot_data)
engram_load(tmp_path)
@@ -369,7 +405,6 @@ load_identity_context()
seed_persona_from_env()
let boot_num: Int = mem_boot_count_inc()
state_set("soul_boot_count", int_to_str(boot_num))
state_set("soul_boot_ts", int_to_str(time_now()))
println("[soul] boot #" + int_to_str(boot_num))
emit_session_start_event()