// thread.el — El native threading model
//
// First-class parallelism for El. Eliminates bash fan-out hacks for parallel
// HTTP dispatch, concurrent processing pipelines, and any other workload that
// benefits from concurrent execution.
//
// Built on five seed primitives exposed by el_seed.c via dlsym+pthread:
//   __thread_create(fn_name, arg) -> Int     spawn thread, return tid
//   __thread_join(tid)            -> String  join thread, return result
//   __mutex_new()                 -> Int     allocate a mutex, return handle
//   __mutex_lock(m)                          lock mutex
//   __mutex_unlock(m)                        unlock mutex
//
// Every El fn compiles to a global C symbol. __thread_create uses dlsym to
// look up the function by name and run it in a pthread. This means any El fn
// with signature (String) -> String is directly threadable.

// ── Core primitives ──────────────────────────────────────────────────────────

// spawn — start an El function running on its own thread.
//
//   fn_name: name of a top-level El fn with signature (String) -> String
//   arg:     the single String argument handed to that fn
//
// Returns the thread id (tid) produced by __thread_create; pass it to join()
// to collect the result. fn_name has to be a top-level fn because its C symbol
// must be globally visible for dlsym to resolve it.
fn spawn(fn_name: String, arg: String) -> Int {
    let new_tid: Int = __thread_create(fn_name, arg)
    return new_tid
}

// join — block until a spawned thread finishes and hand back its result.
//
//   tid: a thread id previously returned by spawn()
//
// Returns the String value that the thread's function returned.
fn join(tid: Int) -> String {
    let thread_result: String = __thread_join(tid)
    return thread_result
}

// ── parallel_map ─────────────────────────────────────────────────────────────

// parallel_map — map an El function over a list of strings concurrently.
//
//   items:   [String] — the input list
//   fn_name: String   — name of an El fn with signature (String) -> String
//
// Spawns one thread per item. All threads run concurrently. Joins each thread
// in input order, so the output list preserves the same order as the input.
//
// This is the core primitive that replaces bash fan-out for parallel HTTP.
// Example — dispatch to N rooms at once:
//   let responses: [String] = parallel_map(room_payloads, "dispatch_to_room")
fn parallel_map(items: [String], fn_name: String) -> [String] {
    let item_count: Int = el_list_len(items)

    // Launch phase: one thread per item, tids remembered in input order.
    // Tids are kept as decimal strings (int_to_str) so they fit in a [String].
    let tid_strs: [String] = el_list_empty()
    let spawn_idx = 0
    while spawn_idx < item_count {
        let launched: Int = spawn(fn_name, el_list_get(items, spawn_idx))
        let tid_strs = el_list_append(tid_strs, int_to_str(launched))
        let spawn_idx = spawn_idx + 1
    }

    // Collect phase: join in launch order so collected[k] pairs with items[k].
    let collected: [String] = el_list_empty()
    let join_idx = 0
    while join_idx < item_count {
        let waiting_on: Int = str_to_int(el_list_get(tid_strs, join_idx))
        let collected = el_list_append(collected, join(waiting_on))
        let join_idx = join_idx + 1
    }
    return collected
}

// ── parallel_map_json ────────────────────────────────────────────────────────

// parallel_map_json — parallel_map with JSON array strings on both ends.
//
//   items_json: String — a JSON array of strings, e.g. '["a","b","c"]'
//   fn_name:    String — name of an El fn with signature (String) -> String
//
// Unpacks the array into a [String], runs parallel_map over it, and packs the
// results back into a JSON array string, in input order.
//
// Each worker result is spliced into the output verbatim, with no quoting or
// escaping. The output is therefore valid JSON only if every worker returns
// text that is itself a JSON value (object, array, or quoted string).
//
// Example:
//   let out_json: String = parallel_map_json(rooms_json, "dispatch_to_room")
fn parallel_map_json(items_json: String, fn_name: String) -> String {
    // Decode: copy each array element into an El list.
    let inputs: [String] = el_list_empty()
    let input_count: Int = json_array_len(items_json)
    let in_idx = 0
    while in_idx < input_count {
        let inputs = el_list_append(inputs, json_array_get(items_json, in_idx))
        let in_idx = in_idx + 1
    }

    // Fan out across threads.
    let outputs: [String] = parallel_map(inputs, fn_name)

    // Encode: join the raw results with commas inside brackets. The separator
    // starts empty and becomes "," after the first element, so no leading
    // comma is emitted.
    let output_count: Int = el_list_len(outputs)
    let encoded: String = "["
    let separator: String = ""
    let out_idx = 0
    while out_idx < output_count {
        let encoded = encoded + separator + el_list_get(outputs, out_idx)
        let separator = ","
        let out_idx = out_idx + 1
    }
    let encoded = encoded + "]"
    return encoded
}

// ── parallel_filter ──────────────────────────────────────────────────────────

// parallel_filter — keep the items for which fn_name returns "true".
//
//   items:   [String] — the input list
//   fn_name: String   — name of an El fn with signature (String) -> String;
//                       returning exactly "true" keeps the item, any other
//                       value discards it
//
// The predicate runs on all items in parallel (one thread per item, via
// parallel_map). Kept items retain their relative input order.
fn parallel_filter(items: [String], fn_name: String) -> [String] {
    // verdicts[k] is the predicate's answer for items[k].
    let verdicts: [String] = parallel_map(items, fn_name)

    let survivors: [String] = el_list_empty()
    let item_count: Int = el_list_len(items)
    let idx = 0
    while idx < item_count {
        if str_eq(el_list_get(verdicts, idx), "true") {
            let survivors = el_list_append(survivors, el_list_get(items, idx))
        }
        let idx = idx + 1
    }
    return survivors
}

// ── HTTP helpers ─────────────────────────────────────────────────────────────

// fire_http_post — worker fn for parallel_posts.
//
// Expects arg to be a JSON object with "url" and "body" keys:
//   {"url":"https://...","body":"{...}"}
//
// Returns the HTTP response body string. Registered as a global El fn so
// parallel_map can locate it via dlsym.
fn fire_http_post(arg: String) -> String {
    // Pull the two fields out of the request envelope, then POST.
    let target_url: String = json_get(arg, "url")
    let post_body: String = json_get(arg, "body")
    return http_post(target_url, post_body)
}

// parallel_posts — issue a batch of HTTP POSTs concurrently.
//
//   requests: [String] — each element is a JSON object {"url":"...","body":"..."}
//
// Returns the response bodies as a [String], in the same order as requests.
// Each request is handled by fire_http_post on its own thread via parallel_map.
//
// Example — fan out to N room endpoints at once. Note that payload is spliced
// into a JSON string literal here, so it must already be escaped for that:
//   let reqs: [String] = el_list_empty()
//   let reqs = el_list_append(reqs, "{\"url\":\"http://room-a/dispatch\",\"body\":\"" + payload + "\"}")
//   let reqs = el_list_append(reqs, "{\"url\":\"http://room-b/dispatch\",\"body\":\"" + payload + "\"}")
//   let responses: [String] = parallel_posts(reqs)
fn parallel_posts(requests: [String]) -> [String] {
    let responses: [String] = parallel_map(requests, "fire_http_post")
    return responses
}

// ── Mutex helpers ────────────────────────────────────────────────────────────

// with_mutex — run fn_name(arg) while holding mutex m.
//
//   m:       Int    — mutex handle returned by __mutex_new()
//   fn_name: String — name of an El fn with signature (String) -> String
//   arg:     String — argument to pass to fn_name
//
// Locks m, runs fn_name(arg) on a child thread, waits for it with join, then
// unlocks m and returns the child's result. The lock is held for the whole of
// fn_name's execution, so callers sharing the same m are serialized.
//
// Note: fn_name must NOT try to lock m itself. The calling thread holds m
// while it is blocked in join, and fn_name runs on a different thread, so its
// lock attempt would wait forever — a deadlock.
//
// Usage:
//   let m: Int = __mutex_new()
//   let result: String = with_mutex(m, "update_shared_state", payload)
fn with_mutex(m: Int, fn_name: String, arg: String) -> String {
    __mutex_lock(m)
    let worker: Int = spawn(fn_name, arg)
    let outcome: String = join(worker)
    __mutex_unlock(m)
    return outcome
}