From a89080e3d4cf14a41be8644794eb083fea0d6d2c Mon Sep 17 00:00:00 2001 From: "will.anderson" Date: Thu, 11 Jun 2026 13:28:00 -0500 Subject: [PATCH] feat(runtime): add WM APIs and http_serve_async to el_runtime --- lang/el-compiler/runtime/el_runtime.c | 310 ++++++++++++++++++++++++++ lang/el-compiler/runtime/el_runtime.h | 7 + 2 files changed, 317 insertions(+) diff --git a/lang/el-compiler/runtime/el_runtime.c b/lang/el-compiler/runtime/el_runtime.c index 6b97221..ce7519a 100644 --- a/lang/el-compiler/runtime/el_runtime.c +++ b/lang/el-compiler/runtime/el_runtime.c @@ -1632,6 +1632,83 @@ el_val_t http_serve(el_val_t port, el_val_t handler) { return 0; } +/* ── http_serve_async — non-blocking HTTP server ─────────────────────────── */ +/* Runs the accept loop in a background pthread, returns immediately so the + * calling EL script can continue (e.g. to run an awareness loop). + * + * El signature: http_serve_async(port, handler) -> Void */ + +typedef struct { int sock; } HttpServeAsyncArg; + +static void* _http_serve_async_loop(void* raw) { + HttpServeAsyncArg* a = (HttpServeAsyncArg*)raw; + int sock = a->sock; + free(a); + while (1) { + struct sockaddr_in6 cli; + socklen_t clen = sizeof(cli); + int cfd = accept(sock, (struct sockaddr*)&cli, &clen); + if (cfd < 0) { + if (errno == EINTR) continue; + perror("accept"); break; + } + pthread_mutex_lock(&_http_conn_mu); + while (_http_conn_active >= HTTP_MAX_CONNS) { + pthread_cond_wait(&_http_conn_cv, &_http_conn_mu); + } + _http_conn_active++; + pthread_mutex_unlock(&_http_conn_mu); + HttpWorkerArg* arg = malloc(sizeof(HttpWorkerArg)); + if (!arg) { close(cfd); continue; } + arg->fd = cfd; + pthread_t tid; + if (pthread_create(&tid, NULL, http_worker, arg) != 0) { + close(cfd); free(arg); + pthread_mutex_lock(&_http_conn_mu); + _http_conn_active--; + pthread_cond_signal(&_http_conn_cv); + pthread_mutex_unlock(&_http_conn_mu); + continue; + } + pthread_detach(tid); + } + close(sock); + return NULL; +} + +void http_serve_async(el_val_t port, el_val_t handler) { + const char* hname = EL_CSTR(handler); + if (hname && looks_like_string(handler)) { + http_set_handler(handler); + } + int p = (int)port; + if (p <= 0 || p > 65535) { fprintf(stderr, "http_serve_async: invalid port %d\n", p); return; } + int sock = socket(AF_INET6, SOCK_STREAM, 0); + if (sock < 0) { perror("socket"); return; } + int yes = 1; int no = 0; + setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &yes, sizeof(yes)); + setsockopt(sock, IPPROTO_IPV6, IPV6_V6ONLY, &no, sizeof(no)); + struct sockaddr_in6 addr; + memset(&addr, 0, sizeof(addr)); + addr.sin6_family = AF_INET6; + addr.sin6_addr = in6addr_any; + addr.sin6_port = htons((uint16_t)p); + if (bind(sock, (struct sockaddr*)&addr, sizeof(addr)) < 0) { + perror("bind"); close(sock); return; + } + if (listen(sock, 64) < 0) { perror("listen"); close(sock); return; } + fprintf(stderr, "[http] async listening on [::]:%d (dual-stack)\n", p); + HttpServeAsyncArg* a = malloc(sizeof(HttpServeAsyncArg)); + if (!a) { close(sock); return; } + a->sock = sock; + pthread_t tid; + if (pthread_create(&tid, NULL, _http_serve_async_loop, a) != 0) { + perror("pthread_create"); free(a); close(sock); return; + } + pthread_detach(tid); + /* Returns immediately — caller can now run awareness_run() or any loop. */ +} + /* ── HTTP server v2 — request headers + structured response ──────────────── */ /* * v2 widens the handler signature from @@ -7508,6 +7585,159 @@ el_val_t engram_load(el_val_t path) { return 1; } +/* engram_load_merge — like engram_load but WITHOUT resetting the store. + * Reads a JSON snapshot from `path` and adds any nodes/edges not already + * present in the in-memory graph. Dedup is by node id (for nodes) and by + * (from_id, to_id, relation) tuple (for edges). + * + * Returns (as an EL int) the count of new nodes added. Embeddings are + * intentionally skipped on merged nodes to avoid Ollama delays at runtime; + * auto_link_semantic will handle them when nodes are next activated. + * + * Does not merge layers — the in-process layer registry is authoritative. */ +el_val_t engram_load_merge(el_val_t path) { + const char* p = EL_CSTR(path); + if (!p || !*p) return 0; + FILE* f = fopen(p, "rb"); + if (!f) return 0; + fseek(f, 0, SEEK_END); + long sz = ftell(f); + rewind(f); + if (sz <= 0) { fclose(f); return 0; } + char* data = malloc((size_t)sz + 1); + if (!data) { fclose(f); return 0; } + size_t got = fread(data, 1, (size_t)sz, f); + fclose(f); + data[got] = '\0'; + + EngramStore* g = engram_get(); + int64_t added_nodes = 0; + + /* Walk nodes array — skip any node whose id already exists */ + const char* nodes_p = json_find_key(data, "nodes"); + if (nodes_p) { + nodes_p = eg_skip_ws(nodes_p); + if (*nodes_p == '[') { + nodes_p++; + nodes_p = eg_skip_ws(nodes_p); + while (*nodes_p && *nodes_p != ']') { + if (*nodes_p != '{') { nodes_p++; continue; } + const char* end = json_skip_value(nodes_p); + size_t n = (size_t)(end - nodes_p); + char* obj = malloc(n + 1); + memcpy(obj, nodes_p, n); obj[n] = '\0'; + char* nid = eg_get_str_field(obj, "id"); + int already = (nid && *nid && engram_find_node(nid) != NULL); + free(nid); + if (!already) { + engram_grow_nodes(); + EngramNode* nn = &g->nodes[g->node_count]; + memset(nn, 0, sizeof(*nn)); + nn->id = eg_get_str_field(obj, "id"); + nn->content = eg_get_str_field(obj, "content"); + nn->node_type = eg_get_str_field(obj, "node_type"); + nn->label = eg_get_str_field(obj, "label"); + nn->tier = eg_get_str_field(obj, "tier"); + nn->tags = eg_get_str_field(obj, "tags"); + nn->metadata = eg_get_str_field(obj, "metadata"); + if (!nn->metadata || !*nn->metadata) { free(nn->metadata); nn->metadata = strdup("{}"); } + nn->salience = eg_get_num_field(obj, "salience"); + nn->importance = eg_get_num_field(obj, "importance"); + nn->confidence = eg_get_num_field(obj, "confidence"); + nn->temporal_decay_rate = eg_get_num_field(obj, "temporal_decay_rate"); + nn->activation_count = eg_get_int_field(obj, "activation_count"); + nn->last_activated = eg_get_int_field(obj, "last_activated"); + nn->created_at = eg_get_int_field(obj, "created_at"); + nn->updated_at = eg_get_int_field(obj, "updated_at"); + nn->background_activation = eg_get_num_field(obj, "background_activation"); + nn->working_memory_weight = eg_get_num_field(obj, "working_memory_weight"); + if (!isfinite(nn->working_memory_weight) || nn->working_memory_weight < 0.0 || nn->working_memory_weight > 1.0) + nn->working_memory_weight = 0.0; /* clamp corrupt snapshot values */ + nn->suppression_count = (int32_t)eg_get_int_field(obj, "suppression_count"); + if (json_find_key(obj, "layer_id")) { + nn->layer_id = (uint32_t)eg_get_int_field(obj, "layer_id"); + } else { + nn->layer_id = ENGRAM_LAYER_DEFAULT; + } + g->node_count++; + added_nodes++; + } + free(obj); + nodes_p = end; + nodes_p = eg_skip_ws(nodes_p); + if (*nodes_p == ',') { nodes_p++; nodes_p = eg_skip_ws(nodes_p); } + } + } + } + + /* Walk edges array — skip if (from_id, to_id, relation) already present */ + const char* edges_p = json_find_key(data, "edges"); + if (edges_p) { + edges_p = eg_skip_ws(edges_p); + if (*edges_p == '[') { + edges_p++; + edges_p = eg_skip_ws(edges_p); + while (*edges_p && *edges_p != ']') { + if (*edges_p != '{') { edges_p++; continue; } + const char* end = json_skip_value(edges_p); + size_t n = (size_t)(end - edges_p); + char* obj = malloc(n + 1); + memcpy(obj, edges_p, n); obj[n] = '\0'; + char* efrom = eg_get_str_field(obj, "from_id"); + char* eto = eg_get_str_field(obj, "to_id"); + char* erel = eg_get_str_field(obj, "relation"); + /* Check for duplicate by scanning existing edges */ + int dup = 0; + if (efrom && eto && erel) { + for (int64_t ei = 0; ei < g->edge_count; ei++) { + EngramEdge* ex = &g->edges[ei]; + if (ex->from_id && ex->to_id && ex->relation && + strcmp(ex->from_id, efrom) == 0 && + strcmp(ex->to_id, eto) == 0 && + strcmp(ex->relation, erel) == 0) { + dup = 1; break; + } + } + } + if (!dup) { + engram_grow_edges(); + EngramEdge* ee = &g->edges[g->edge_count]; + memset(ee, 0, sizeof(*ee)); + ee->id = eg_get_str_field(obj, "id"); + ee->from_id = efrom ? efrom : strdup(""); + ee->to_id = eto ? eto : strdup(""); + ee->relation = erel ? erel : strdup(""); + ee->metadata = eg_get_str_field(obj, "metadata"); + if (!ee->metadata || !*ee->metadata) { free(ee->metadata); ee->metadata = strdup("{}"); } + ee->weight = eg_get_num_field(obj, "weight"); + ee->confidence = eg_get_num_field(obj, "confidence"); + ee->created_at = eg_get_int_field(obj, "created_at"); + ee->updated_at = eg_get_int_field(obj, "updated_at"); + ee->last_fired = eg_get_int_field(obj, "last_fired"); + ee->inhibitory = (int)eg_get_int_field(obj, "inhibitory"); + if (json_find_key(obj, "layer_id")) { + ee->layer_id = (uint32_t)eg_get_int_field(obj, "layer_id"); + } else { + ee->layer_id = ENGRAM_LAYER_DEFAULT; + } + g->edge_count++; + /* NOTE: efrom/eto/erel ownership transferred to ee above */ + efrom = NULL; eto = NULL; erel = NULL; + } else { + free(efrom); free(eto); free(erel); + } + free(obj); + edges_p = end; + edges_p = eg_skip_ws(edges_p); + if (*edges_p == ',') { edges_p++; edges_p = eg_skip_ws(edges_p); } + } + } + } + + free(data); + return (el_val_t)added_nodes; +} + /* ── Engram JSON-string accessors ───────────────────────────────────────── * These return pre-serialized JSON strings so callers (especially HTTP * handlers) don't have to round-trip ElList/ElMap through json_stringify @@ -7736,6 +7966,86 @@ el_val_t engram_stats_json(void) { return el_wrap_str(el_strdup(buf)); } +/* ── Working memory accessors ─────────────────────────────────────────────── */ + +el_val_t engram_wm_count(void) { + EngramStore* g = engram_get(); + int64_t count = 0; + for (int64_t i = 0; i < g->node_count; i++) { + if (g->nodes[i].working_memory_weight > 0.0) count++; + } + return (el_val_t)count; +} + +/* Average working_memory_weight across all promoted nodes (wm > 0). + * Returns the float bit-pattern via el_from_float so EL can use it with + * float_to_str / float_gt. Returns 0.0 when no nodes are promoted. */ +el_val_t engram_wm_avg_weight(void) { + EngramStore* g = engram_get(); + double sum = 0.0; + int64_t count = 0; + for (int64_t i = 0; i < g->node_count; i++) { + double w = g->nodes[i].working_memory_weight; + if (w > 0.0 && w <= 1.0 && isfinite(w)) { sum += w; count++; } + } + double avg = (count > 0) ? (sum / (double)count) : 0.0; + return el_from_float(avg); +} + +/* engram_wm_top_json — return top N working-memory nodes (by wm weight) as a + * compact JSON array for ISE heartbeat reporting. + * Each element: {"label":"...","node_type":"...","tier":"...","wm":0.42} */ +el_val_t engram_wm_top_json(el_val_t n_v) { + int64_t top_n = (int64_t)n_v; + if (top_n <= 0) top_n = 10; + if (top_n > 50) top_n = 50; + EngramStore* g = engram_get(); + + int64_t* idx = malloc((size_t)(g->node_count + 1) * sizeof(int64_t)); + if (!idx) return el_wrap_str(el_strdup("[]")); + int64_t mc = 0; + for (int64_t i = 0; i < g->node_count; i++) { + if (g->nodes[i].working_memory_weight > 0.0) { + const char* nt = g->nodes[i].node_type; + if (nt && strcmp(nt, "InternalStateEvent") == 0) continue; + idx[mc++] = i; + } + } + + /* Insertion-sort descending by wm weight (mc is typically small). */ + for (int64_t i = 1; i < mc; i++) { + int64_t key = idx[i]; + double kw = g->nodes[key].working_memory_weight; + int64_t j = i; + while (j > 0 && g->nodes[idx[j-1]].working_memory_weight < kw) { + idx[j] = idx[j-1]; j--; + } + idx[j] = key; + } + + int64_t emit = mc < top_n ? mc : top_n; + JsonBuf b; jb_init(&b); + jb_putc(&b, '['); + for (int64_t k = 0; k < emit; k++) { + EngramNode* n = &g->nodes[idx[k]]; + if (k > 0) jb_putc(&b, ','); + jb_putc(&b, '{'); + jb_puts(&b, "\"label\":"); + jb_emit_escaped(&b, n->label ? n->label : ""); + jb_puts(&b, ",\"node_type\":"); + jb_emit_escaped(&b, n->node_type ? n->node_type : ""); + jb_puts(&b, ",\"tier\":"); + jb_emit_escaped(&b, n->tier ? n->tier : ""); + char tmp[48]; + snprintf(tmp, sizeof(tmp), ",\"wm\":%.3f", n->working_memory_weight); + jb_puts(&b, tmp); + jb_putc(&b, '}'); + } + free(idx); + jb_putc(&b, ']'); + return el_wrap_str(b.buf); +} + /* engram_list_layers_json — serialized counterpart of engram_list_layers. * Returns a JSON array, sorted by activation_priority ascending. */ el_val_t engram_list_layers_json(void) { diff --git a/lang/el-compiler/runtime/el_runtime.h b/lang/el-compiler/runtime/el_runtime.h index 2f9583f..9c8054d 100644 --- a/lang/el-compiler/runtime/el_runtime.h +++ b/lang/el-compiler/runtime/el_runtime.h @@ -154,6 +154,7 @@ el_val_t http_post_json_with_headers(el_val_t url, el_val_t headers_map, el_val el_val_t http_post_form_auth(el_val_t url, el_val_t form_body, el_val_t auth_header); el_val_t http_delete(el_val_t url); el_val_t http_serve(el_val_t port, el_val_t handler); +void http_serve_async(el_val_t port, el_val_t handler); el_val_t http_set_handler(el_val_t name); /* HTTP server v2 ───────────────────────────────────────────────────────────── @@ -632,6 +633,12 @@ el_val_t engram_neighbors_json(el_val_t node_id, el_val_t max_depth, el_val_t d el_val_t engram_activate_json(el_val_t query, el_val_t depth); el_val_t engram_stats_json(void); el_val_t engram_list_layers_json(void); + +/* ── Working memory ──────────────────────────────────────────────────────────*/ +el_val_t engram_wm_count(void); +el_val_t engram_wm_avg_weight(void); +el_val_t engram_wm_top_json(el_val_t n); +el_val_t engram_load_merge(el_val_t path); /* engram_compile_layered_json — produce a prompt-ready text block split * into "[LAYER 0 — STRUCTURAL]" (non-suppressible layers, sacred fire) * and "[ENGRAM CONTEXT]" (standard suppressible layers). Returns "" if -- 2.52.0