Files
2026-05-05 01:38:51 -05:00

226 lines
8.9 KiB
EmacsLisp

// thread.el El native threading model
//
// First-class parallelism for El. Eliminates bash fan-out hacks for parallel
// HTTP dispatch, concurrent processing pipelines, and any other workload that
// benefits from concurrent execution.
//
// Built on four seed primitives exposed by el_seed.c via dlsym+pthread:
// __thread_create(fn_name, arg) -> Int spawn thread, return tid
// __thread_join(tid) -> String join thread, return result
// __mutex_new() -> Int allocate a mutex, return handle
// __mutex_lock(m) lock mutex
// __mutex_unlock(m) unlock mutex
//
// Every El fn compiles to a global C symbol. __thread_create uses dlsym to
// look up the function by name and run it in a pthread. This means any El fn
// with signature (String) -> String is directly threadable.
// Core primitives
// spawn launch an El function in a new thread.
//
// fn_name: the name of an El fn with signature (String) -> String
// arg: the argument to pass to that fn
//
// Returns a thread id (tid) that can be passed to join().
// The El function must be a top-level fn its C symbol must be globally
// visible so dlsym can resolve it.
fn spawn(fn_name: String, arg: String) -> Int {
return __thread_create(fn_name, arg)
}
// join wait for a thread to finish and return its result.
//
// tid: the thread id returned by spawn()
//
// Blocks until the thread completes. Returns the String value the thread
// function returned.
fn join(tid: Int) -> String {
return __thread_join(tid)
}
// parallel_map
// parallel_map map an El function over a list of strings concurrently.
//
// items: [String] the input list
// fn_name: String name of an El fn with signature (String) -> String
//
// Spawns one thread per item. All threads run concurrently. Joins each thread
// in input order, so the output list preserves the same order as the input.
//
// This is the core primitive that replaces bash fan-out for parallel HTTP.
// Example dispatch to N rooms at once:
// let responses: [String] = parallel_map(room_payloads, "dispatch_to_room")
fn parallel_map(items: [String], fn_name: String) -> [String] {
let n: Int = el_list_len(items)
// Phase 1: spawn all threads and collect tids in order.
let tids: [String] = el_list_empty()
let i = 0
while i < n {
let item: String = el_list_get(items, i)
let tid: Int = spawn(fn_name, item)
// Store tid as string so we can hold it in [String].
// int_to_str is available as a builtin.
let tids = el_list_append(tids, int_to_str(tid))
let i = i + 1
}
// Phase 2: join all threads in order, collecting results.
let results: [String] = el_list_empty()
let j = 0
while j < n {
let tid_str: String = el_list_get(tids, j)
let tid: Int = str_to_int(tid_str)
let result: String = join(tid)
let results = el_list_append(results, result)
let j = j + 1
}
return results
}
// parallel_map_json
// parallel_map_json parallel_map over a JSON array string.
//
// items_json: String a JSON array of strings, e.g. '["a","b","c"]'
// fn_name: String name of an El fn with signature (String) -> String
//
// Parses the JSON array into an [String], runs parallel_map, then serialises
// the result list back to a JSON array string. Both input and output are JSON
// strings the common El inter-service format.
//
// Example:
// let out_json: String = parallel_map_json(rooms_json, "dispatch_to_room")
fn parallel_map_json(items_json: String, fn_name: String) -> String {
let n: Int = json_array_len(items_json)
// Unpack JSON array into [String].
let items: [String] = el_list_empty()
let i = 0
while i < n {
let item: String = json_array_get(items_json, i)
let items = el_list_append(items, item)
let i = i + 1
}
// Run the concurrent map.
let results: [String] = parallel_map(items, fn_name)
// Repack results into a JSON array string.
let m: Int = el_list_len(results)
let out: String = "["
let j = 0
while j < m {
let val: String = el_list_get(results, j)
if j > 0 {
let out = out + ","
}
// Each result is treated as a raw JSON value (object, array, or
// quoted string as returned by the worker fn).
let out = out + val
let j = j + 1
}
let out = out + "]"
return out
}
// parallel_filter
// parallel_filter keep items where fn_name returns "true", concurrently.
//
// items: [String] the input list
// fn_name: String name of an El fn with signature (String) -> String
// that returns "true" to keep the item or anything else
// to discard it
//
// Runs the predicate fn on all items in parallel. Collects results in order,
// preserving the relative order of kept items.
fn parallel_filter(items: [String], fn_name: String) -> [String] {
let n: Int = el_list_len(items)
// Spawn a predicate thread for every item.
let tids: [String] = el_list_empty()
let i = 0
while i < n {
let item: String = el_list_get(items, i)
let tid: Int = spawn(fn_name, item)
let tids = el_list_append(tids, int_to_str(tid))
let i = i + 1
}
// Join in order, keep item if the predicate returned "true".
let kept: [String] = el_list_empty()
let j = 0
while j < n {
let item: String = el_list_get(items, j)
let tid_str: String = el_list_get(tids, j)
let tid: Int = str_to_int(tid_str)
let verdict: String = join(tid)
if str_eq(verdict, "true") {
let kept = el_list_append(kept, item)
}
let j = j + 1
}
return kept
}
// HTTP helpers
// fire_http_post worker fn for parallel_posts.
//
// Expects arg to be a JSON object with "url" and "body" keys:
// {"url":"https://...","body":"{...}"}
//
// Returns the HTTP response body string. Registered as a global El fn so
// parallel_map can locate it via dlsym.
fn fire_http_post(arg: String) -> String {
let url: String = json_get(arg, "url")
let body: String = json_get(arg, "body")
return http_post(url, body)
}
// parallel_posts fire a list of HTTP POSTs concurrently.
//
// requests: [String] each element is a JSON object {"url":"...","body":"..."}
//
// Returns [String] of response bodies in the same order as the input.
//
// Example fan out to N room endpoints at once:
// let reqs: [String] = el_list_empty()
// let reqs = el_list_append(reqs, "{\"url\":\"http://room-a/dispatch\",\"body\":\"" + payload + "\"}")
// let reqs = el_list_append(reqs, "{\"url\":\"http://room-b/dispatch\",\"body\":\"" + payload + "\"}")
// let responses: [String] = parallel_posts(reqs)
fn parallel_posts(requests: [String]) -> [String] {
return parallel_map(requests, "fire_http_post")
}
// Mutex helpers
// with_mutex call fn_name(arg) while holding mutex m.
//
// m: Int mutex handle returned by __mutex_new()
// fn_name: String name of an El fn with signature (String) -> String
// arg: String argument to pass to fn_name
//
// Locks the mutex, spawns fn_name(arg) in a child thread, joins to collect
// the result, then unlocks. The mutex is held across the entire duration of
// fn_name's execution, serializing concurrent callers.
//
// Note: fn_name must NOT itself acquire the same mutex that would deadlock.
// This is the standard reentrant-mutex caveat.
//
// Usage:
// let m: Int = __mutex_new()
// let result: String = with_mutex(m, "update_shared_state", payload)
fn with_mutex(m: Int, fn_name: String, arg: String) -> String {
__mutex_lock(m)
let tid: Int = spawn(fn_name, arg)
let result: String = join(tid)
__mutex_unlock(m)
return result
}