From 35c189759cc676e112817cf4255b44a7023468b1 Mon Sep 17 00:00:00 2001 From: "will.anderson" Date: Thu, 11 Jun 2026 13:40:10 -0500 Subject: [PATCH 1/2] =?UTF-8?q?feat(runtime):=20add=20engram=5Fwm=5F*,=20e?= =?UTF-8?q?ngram=5Fload=5Fmerge,=20http=5Fserve=5Fasync=20=E2=80=94=20need?= =?UTF-8?q?ed=20by=20soul=20CI?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- lang/el-compiler/runtime/el_runtime.c | 328 ++++++++++++++++++++++++++ lang/el-compiler/runtime/el_runtime.h | 7 + 2 files changed, 335 insertions(+) diff --git a/lang/el-compiler/runtime/el_runtime.c b/lang/el-compiler/runtime/el_runtime.c index 6b97221..2f4c839 100644 --- a/lang/el-compiler/runtime/el_runtime.c +++ b/lang/el-compiler/runtime/el_runtime.c @@ -1882,6 +1882,83 @@ el_val_t http_serve_v2(el_val_t port, el_val_t handler) { return 0; } +/* ── http_serve_async — non-blocking HTTP server ─────────────────────────── */ +/* Runs the accept loop in a background pthread, returns immediately so the + * calling EL script can continue (e.g. to run an awareness loop). + * + * El signature: http_serve_async(port, handler) -> Void */ + +typedef struct { int sock; } HttpServeAsyncArg; + +static void* _http_serve_async_loop(void* raw) { + HttpServeAsyncArg* a = (HttpServeAsyncArg*)raw; + int sock = a->sock; + free(a); + while (1) { + struct sockaddr_in6 cli; + socklen_t clen = sizeof(cli); + int cfd = accept(sock, (struct sockaddr*)&cli, &clen); + if (cfd < 0) { + if (errno == EINTR) continue; + perror("accept"); break; + } + pthread_mutex_lock(&_http_conn_mu); + while (_http_conn_active >= HTTP_MAX_CONNS) { + pthread_cond_wait(&_http_conn_cv, &_http_conn_mu); + } + _http_conn_active++; + pthread_mutex_unlock(&_http_conn_mu); + HttpWorkerArg* arg = malloc(sizeof(HttpWorkerArg)); + if (!arg) { close(cfd); continue; } + arg->fd = cfd; + pthread_t tid; + if (pthread_create(&tid, NULL, http_worker, arg) != 0) { + close(cfd); free(arg); + pthread_mutex_lock(&_http_conn_mu); + _http_conn_active--; + pthread_cond_signal(&_http_conn_cv); + pthread_mutex_unlock(&_http_conn_mu); + continue; + } + pthread_detach(tid); + } + close(sock); + return NULL; +} + +void http_serve_async(el_val_t port, el_val_t handler) { + const char* hname = EL_CSTR(handler); + if (hname && looks_like_string(handler)) { + http_set_handler(handler); + } + int p = (int)port; + if (p <= 0 || p > 65535) { fprintf(stderr, "http_serve_async: invalid port %d\n", p); return; } + int sock = socket(AF_INET6, SOCK_STREAM, 0); + if (sock < 0) { perror("socket"); return; } + int yes = 1; int no = 0; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &no, sizeof(no)); + struct sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_addr = in6addr_any; + addr.sin6_port = htons((uint16_t)p); + if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + perror("bind"); close(sock); return; + } + if (listen(sock, 64) < 0) { perror("listen"); close(sock); return; } + fprintf(stderr, "[http] async listening on [::]:%d (dual-stack)\n", p); + HttpServeAsyncArg* a = malloc(sizeof(HttpServeAsyncArg)); + if (!a) { close(sock); return; } + a->sock = sock; + pthread_t tid; + if (pthread_create(&tid, NULL, _http_serve_async_loop, a) != 0) { + perror("pthread_create"); free(a); close(sock); return; + } + pthread_detach(tid); + /* Returns immediately — caller can now run awareness_run() or any loop. */ +} + /* Build the response envelope a 4-arg handler can return. We hand-write * the JSON so the discriminator key always lands first — the runtime's * http_parse_envelope() detects it via prefix match. headers_json must be @@ -7915,6 +7992,257 @@ el_val_t engram_query_range(el_val_t start_ms_v, el_val_t end_ms_v) { return el_wrap_str(b.buf); } +/* engram_load_merge — like engram_load but WITHOUT resetting the store. + * Reads a JSON snapshot from `path` and adds any nodes/edges not already + * present in the in-memory graph. Dedup is by node id (for nodes) and by + * (from_id, to_id, relation) tuple (for edges). + * + * Returns (as an EL int) the count of new nodes added. Embeddings are + * intentionally skipped on merged nodes to avoid Ollama delays at runtime; + * auto_link_semantic will handle them when nodes are next activated. + * + * Does not merge layers — the in-process layer registry is authoritative. */ +el_val_t engram_load_merge(el_val_t path) { + const char* p = EL_CSTR(path); + if (!p || !*p) return 0; + FILE* f = fopen(p, "rb"); + if (!f) return 0; + fseek(f, 0, SEEK_END); + long sz = ftell(f); + rewind(f); + if (sz <= 0) { fclose(f); return 0; } + char* data = malloc((size_t)sz + 1); + if (!data) { fclose(f); return 0; } + size_t got = fread(data, 1, (size_t)sz, f); + fclose(f); + data[got] = '\0'; + + EngramStore* g = engram_get(); + int64_t added_nodes = 0; + + /* Walk nodes array — skip any node whose id already exists */ + const char* nodes_p = json_find_key(data, "nodes"); + if (nodes_p) { + nodes_p = eg_skip_ws(nodes_p); + if (*nodes_p == '[') { + nodes_p++; + nodes_p = eg_skip_ws(nodes_p); + while (*nodes_p && *nodes_p != ']') { + if (*nodes_p != '{') { nodes_p++; continue; } + const char* end = json_skip_value(nodes_p); + size_t n = (size_t)(end - nodes_p); + char* obj = malloc(n + 1); + memcpy(obj, nodes_p, n); obj[n] = '\0'; + char* nid = eg_get_str_field(obj, "id"); + int already = (nid && *nid && engram_find_node(nid) != NULL); + free(nid); + if (!already) { + engram_grow_nodes(); + EngramNode* nn = &g->nodes[g->node_count]; + memset(nn, 0, sizeof(*nn)); + nn->id = eg_get_str_field(obj, "id"); + nn->content = eg_get_str_field(obj, "content"); + nn->node_type = eg_get_str_field(obj, "node_type"); + nn->label = eg_get_str_field(obj, "label"); + nn->tier = eg_get_str_field(obj, "tier"); + nn->tags = eg_get_str_field(obj, "tags"); + nn->metadata = eg_get_str_field(obj, "metadata"); + if (!nn->metadata || !*nn->metadata) { free(nn->metadata); nn->metadata = strdup("{}"); } + nn->salience = eg_get_num_field(obj, "salience"); + nn->importance = eg_get_num_field(obj, "importance"); + nn->confidence = eg_get_num_field(obj, "confidence"); + nn->temporal_decay_rate = eg_get_num_field(obj, "temporal_decay_rate"); + nn->activation_count = eg_get_int_field(obj, "activation_count"); + nn->last_activated = eg_get_int_field(obj, "last_activated"); + nn->created_at = eg_get_int_field(obj, "created_at"); + nn->updated_at = eg_get_int_field(obj, "updated_at"); + nn->background_activation = eg_get_num_field(obj, "background_activation"); + nn->working_memory_weight = eg_get_num_field(obj, "working_memory_weight"); + if (!isfinite(nn->working_memory_weight) || nn->working_memory_weight < 0.0 || nn->working_memory_weight > 1.0) + nn->working_memory_weight = 0.0; /* clamp corrupt snapshot values */ + nn->suppression_count = (int32_t)eg_get_int_field(obj, "suppression_count"); + if (json_find_key(obj, "layer_id")) { + nn->layer_id = (uint32_t)eg_get_int_field(obj, "layer_id"); + } else { + nn->layer_id = ENGRAM_LAYER_DEFAULT; + } + g->node_count++; + added_nodes++; + } + free(obj); + nodes_p = end; + nodes_p = eg_skip_ws(nodes_p); + if (*nodes_p == ',') { nodes_p++; nodes_p = eg_skip_ws(nodes_p); } + } + } + } + + /* Walk edges array — skip if (from_id, to_id, relation) already present */ + const char* edges_p = json_find_key(data, "edges"); + if (edges_p) { + edges_p = eg_skip_ws(edges_p); + if (*edges_p == '[') { + edges_p++; + edges_p = eg_skip_ws(edges_p); + while (*edges_p && *edges_p != ']') { + if (*edges_p != '{') { edges_p++; continue; } + const char* end = json_skip_value(edges_p); + size_t n = (size_t)(end - edges_p); + char* obj = malloc(n + 1); + memcpy(obj, edges_p, n); obj[n] = '\0'; + char* efrom = eg_get_str_field(obj, "from_id"); + char* eto = eg_get_str_field(obj, "to_id"); + char* erel = eg_get_str_field(obj, "relation"); + /* Check for duplicate by scanning existing edges */ + int dup = 0; + if (efrom && eto && erel) { + for (int64_t ei = 0; ei < g->edge_count; ei++) { + EngramEdge* ex = &g->edges[ei]; + if (ex->from_id && ex->to_id && ex->relation && + strcmp(ex->from_id, efrom) == 0 && + strcmp(ex->to_id, eto) == 0 && + strcmp(ex->relation, erel) == 0) { + dup = 1; break; + } + } + } + if (!dup) { + engram_grow_edges(); + EngramEdge* ee = &g->edges[g->edge_count]; + memset(ee, 0, sizeof(*ee)); + ee->id = eg_get_str_field(obj, "id"); + ee->from_id = efrom ? efrom : strdup(""); + ee->to_id = eto ? eto : strdup(""); + ee->relation = erel ? erel : strdup(""); + ee->metadata = eg_get_str_field(obj, "metadata"); + if (!ee->metadata || !*ee->metadata) { free(ee->metadata); ee->metadata = strdup("{}"); } + ee->weight = eg_get_num_field(obj, "weight"); + ee->confidence = eg_get_num_field(obj, "confidence"); + ee->created_at = eg_get_int_field(obj, "created_at"); + ee->updated_at = eg_get_int_field(obj, "updated_at"); + ee->last_fired = eg_get_int_field(obj, "last_fired"); + ee->inhibitory = (int)eg_get_int_field(obj, "inhibitory"); + if (json_find_key(obj, "layer_id")) { + ee->layer_id = (uint32_t)eg_get_int_field(obj, "layer_id"); + } else { + ee->layer_id = ENGRAM_LAYER_DEFAULT; + } + g->edge_count++; + /* NOTE: efrom/eto/erel ownership transferred to ee above */ + efrom = NULL; eto = NULL; erel = NULL; + } else { + free(efrom); free(eto); free(erel); + } + free(obj); + edges_p = end; + edges_p = eg_skip_ws(edges_p); + if (*edges_p == ',') { edges_p++; edges_p = eg_skip_ws(edges_p); } + } + } + } + + free(data); + return (el_val_t)added_nodes; +} + +el_val_t engram_wm_count(void) { + EngramStore* g = engram_get(); + int64_t count = 0; + for (int64_t i = 0; i < g->node_count; i++) { + if (g->nodes[i].working_memory_weight > 0.0) count++; + } + return (el_val_t)count; +} + +/* Average working_memory_weight across all promoted nodes (wm > 0). + * Returns the float bit-pattern via el_from_float so EL can use it with + * float_to_str / float_gt. Returns 0.0 when no nodes are promoted. + * Useful in heartbeat ISEs to distinguish "many weak activations" (sparse + * graph, low avg) from "few strong activations" (dense subgraph, high avg). + * Added 2026-06-04 self-review for graph health observability. */ +el_val_t engram_wm_avg_weight(void) { + EngramStore* g = engram_get(); + double sum = 0.0; + int64_t count = 0; + for (int64_t i = 0; i < g->node_count; i++) { + double w = g->nodes[i].working_memory_weight; + /* Defensive guard: skip any corrupt/out-of-range values so a single + * bad snapshot node doesn't produce a garbage average (e.g. 1.77e+234). */ + if (w > 0.0 && w <= 1.0 && isfinite(w)) { sum += w; count++; } + } + double avg = (count > 0) ? (sum / (double)count) : 0.0; + return el_from_float(avg); +} + +/* engram_wm_top_json — return top N working-memory nodes (by wm weight) as a + * compact JSON array for ISE heartbeat reporting. + * + * Each element: {"label":"...","node_type":"...","tier":"...","wm":0.42} + * + * Purpose: the heartbeat ISE reports wm_active (count) and wm_avg_weight but + * gives zero visibility into WM *composition* — which types/tiers are active. + * After long uptime every WM slot is in steady-state decay+re-promotion so + * wm_promotion ISEs never fire (they only fire on 0→>0.1 transitions). + * This function fills the observability gap by snapshotting the current top-N + * WM nodes on every heartbeat. Inserted 2026-06-05 self-review. */ +el_val_t engram_wm_top_json(el_val_t n_v) { + int64_t top_n = (int64_t)n_v; + if (top_n <= 0) top_n = 10; + if (top_n > 50) top_n = 50; + EngramStore* g = engram_get(); + + /* Collect indices of promoted nodes, excluding monitoring noise. + * InternalStateEvent nodes are system-observation artifacts — they reflect + * what the daemon is doing, not what it knows. Including them in wm_top + * buries real knowledge (Memory, Knowledge, Belief nodes) under a wall of + * heartbeat/curiosity ISEs, making the heartbeat ISE useless for diagnosing + * WM composition. Filter them out here so wm_top always shows substantive + * content. (2026-06-07 self-review) */ + int64_t* idx = malloc((size_t)(g->node_count + 1) * sizeof(int64_t)); + if (!idx) return el_wrap_str(el_strdup("[]")); + int64_t mc = 0; + for (int64_t i = 0; i < g->node_count; i++) { + if (g->nodes[i].working_memory_weight > 0.0) { + const char* nt = g->nodes[i].node_type; + if (nt && strcmp(nt, "InternalStateEvent") == 0) continue; + idx[mc++] = i; + } + } + + /* Insertion-sort descending by wm weight (mc is typically small). */ + for (int64_t i = 1; i < mc; i++) { + int64_t key = idx[i]; + double kw = g->nodes[key].working_memory_weight; + int64_t j = i; + while (j > 0 && g->nodes[idx[j-1]].working_memory_weight < kw) { + idx[j] = idx[j-1]; j--; + } + idx[j] = key; + } + + int64_t emit = mc < top_n ? mc : top_n; + JsonBuf b; jb_init(&b); + jb_putc(&b, '['); + for (int64_t k = 0; k < emit; k++) { + EngramNode* n = &g->nodes[idx[k]]; + if (k > 0) jb_putc(&b, ','); + jb_putc(&b, '{'); + jb_puts(&b, "\"label\":"); + jb_emit_escaped(&b, n->label ? n->label : ""); + jb_puts(&b, ",\"node_type\":"); + jb_emit_escaped(&b, n->node_type ? n->node_type : ""); + jb_puts(&b, ",\"tier\":"); + jb_emit_escaped(&b, n->tier ? n->tier : ""); + char tmp[48]; + snprintf(tmp, sizeof(tmp), ",\"wm\":%.3f", n->working_memory_weight); + jb_puts(&b, tmp); + jb_putc(&b, '}'); + } + free(idx); + jb_putc(&b, ']'); + return el_wrap_str(b.buf); +} + #ifdef HAVE_CURL /* ── DHARMA network ───────────────────────────────────────────────────────── * Real implementation. Peers are addressed by `dharma_id` — either bare diff --git a/lang/el-compiler/runtime/el_runtime.h b/lang/el-compiler/runtime/el_runtime.h index 2f9583f..4ee01d8 100644 --- a/lang/el-compiler/runtime/el_runtime.h +++ b/lang/el-compiler/runtime/el_runtime.h @@ -176,6 +176,7 @@ el_val_t http_set_handler(el_val_t name); * existing handlers (e.g. products/web/server.el): it dispatches with * (method, path, body), hardcodes 200 OK, and auto-detects content type. */ el_val_t http_serve_v2(el_val_t port, el_val_t handler); +void http_serve_async(el_val_t port, el_val_t handler); el_val_t http_set_handler_v2(el_val_t name); /* Build an HTTP response envelope. `headers_json` should be a JSON object @@ -638,6 +639,12 @@ el_val_t engram_list_layers_json(void); * no nodes promoted to working memory. */ el_val_t engram_compile_layered_json(el_val_t intent, el_val_t depth); +/* ── Working memory ──────────────────────────────────────────────────────────*/ +el_val_t engram_wm_count(void); +el_val_t engram_wm_avg_weight(void); +el_val_t engram_wm_top_json(el_val_t n); +el_val_t engram_load_merge(el_val_t path); + /* ── LLM (Anthropic API client) ───────────────────────────────────────────── * All functions call https://api.anthropic.com/v1/messages with the API key * from env ANTHROPIC_API_KEY. Default model when empty: claude-sonnet-4-5. */ -- 2.52.0 From 2dec76c87ac804f4c0ab910833310a1b3f4eb6e8 Mon Sep 17 00:00:00 2001 From: Tim Lingo <1timlingo@gmail.com> Date: Tue, 16 Jun 2026 19:46:56 -0500 Subject: [PATCH 2/2] fix(runtime): reconcile live data-integrity fixes onto main (UAF + atomic engram_save) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Ports the fixes that until now lived only in the un-versioned el-sdk source the live macOS soul was hand-built from (captured in the [DO NOT MERGE] live-darwin-runtime snapshot) FORWARD onto main, faithfully and minimally — without dragging in the snapshot's deletions of main's newer engram_wm_/engram_load_merge/http_serve_async. 1. UAF (hallucinated/lost-saves root cause): engram_new_id + engram_node_full now use el_strdup_persist, NOT el_strdup. el_strdup tracks into the per-request arena that el_request_end() frees when the creating HTTP request completes — leaving stored nodes with dangling pointers (corrupted ids, 'saved but never listed'). Transplanted verbatim from the live runtime; el_strdup_persist sites 19->27, matching live. 2. Atomic engram_save: write .tmp, fflush+fsync, rename() over target (atomic on POSIX) so a booting soul's engram_load never reads a truncated/0-byte snapshot — the genesis -> nodes=1 -> 63-node-clobber loop. Plus a sparse-write floor: refuse to overwrite a >200KB snapshot with one < 1/16 its size. (Validated in isolation: harness 11/11; rebuilt+booted the darwin soul, round-tripped 5113 nodes, no clobber.) The response-truncation fix is already on main (_tl_fs_read_len binary-safe length). Compiles clean. For Will to build through CI/elb and deploy. Co-Authored-By: Claude Opus 4.8 (1M context) --- lang/el-compiler/runtime/el_runtime.c | 48 +++++++++++++++++++-------- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/lang/el-compiler/runtime/el_runtime.c b/lang/el-compiler/runtime/el_runtime.c index 2f4c839..1036d16 100644 --- a/lang/el-compiler/runtime/el_runtime.c +++ b/lang/el-compiler/runtime/el_runtime.c @@ -6322,7 +6322,9 @@ static void engram_grow_edges(void) { static char* engram_new_id(void) { el_val_t v = uuid_new(); const char* s = EL_CSTR(v); - return el_strdup(s ? s : ""); + /* Persistent: node ids live in the global store; an arena (el_strdup) id is + * freed at el_request_end(), corrupting the node after the creating request. */ + return el_strdup_persist(s ? s : ""); } /* Convert a node into an ElMap of its fields. */ @@ -6417,12 +6419,17 @@ el_val_t engram_node_full(el_val_t content, el_val_t node_type, el_val_t label, const char* lb = EL_CSTR(label); const char* ti = EL_CSTR(tier); const char* tg = EL_CSTR(tags); - n->content = el_strdup(c ? c : ""); - n->node_type = el_strdup(nt && *nt ? nt : "Memory"); - n->label = el_strdup(lb && *lb ? lb : (c ? engram_first_n_chars(c, 60) : "")); - n->tier = el_strdup(ti && *ti ? ti : "Working"); - n->tags = el_strdup(tg ? tg : ""); - n->metadata = el_strdup("{}"); + /* Persistent (el_strdup_persist, NOT el_strdup): these strings are owned by the + * persistent global node store. el_strdup tracks into the per-request arena, which + * el_request_end() frees when the creating HTTP request completes — leaving the + * stored node with dangling pointers (corrupted ids, "saved but never listed"). + * This is the root cause of the hallucinated/lost-saves class of bugs. */ + n->content = el_strdup_persist(c ? c : ""); + n->node_type = el_strdup_persist(nt && *nt ? nt : "Memory"); + n->label = el_strdup_persist(lb && *lb ? lb : (c ? engram_first_n_chars(c, 60) : "")); + n->tier = el_strdup_persist(ti && *ti ? ti : "Working"); + n->tags = el_strdup_persist(tg ? tg : ""); + n->metadata = el_strdup_persist("{}"); n->salience = engram_decode_score(salience); n->importance = engram_decode_score(importance); n->confidence = engram_decode_score(confidence); @@ -7365,13 +7372,28 @@ el_val_t engram_save(el_val_t path) { jb_putc(&b, '}'); } jb_puts(&b, "]}"); - FILE* f = fopen(p, "wb"); - if (!f) { free(b.buf); return 0; } + { + struct stat _st; + if (stat(p, &_st) == 0 && _st.st_size > 200000 && + (uint64_t)b.len < (uint64_t)_st.st_size / 16) { + fprintf(stderr, "[engram_save] REFUSED sparse write: new %zu vs existing %lld (<1/16) protecting %s\n", + b.len, (long long)_st.st_size, p); + free(b.buf); return 0; + } + } + size_t _plen = strlen(p); + char* _tmp = (char*)malloc(_plen + 5); + if (!_tmp) { free(b.buf); return 0; } + memcpy(_tmp, p, _plen); memcpy(_tmp + _plen, ".tmp", 5); + FILE* f = fopen(_tmp, "wb"); + if (!f) { free(_tmp); free(b.buf); return 0; } size_t w = fwrite(b.buf, 1, b.len, f); - fclose(f); - int ok = (w == b.len); - free(b.buf); - return ok ? 1 : 0; + int wok = (w == b.len); + if (wok) { fflush(f); fsync(fileno(f)); } + fclose(f); free(b.buf); + if (!wok) { unlink(_tmp); free(_tmp); return 0; } + if (rename(_tmp, p) != 0) { unlink(_tmp); free(_tmp); return 0; } + free(_tmp); return 1; } /* Helper: extract a string field from a JSON object substring. */ -- 2.52.0