226 lines
8.9 KiB
EmacsLisp
226 lines
8.9 KiB
EmacsLisp
// thread.el — El native threading model
|
|
//
|
|
// First-class parallelism for El. Eliminates bash fan-out hacks for parallel
|
|
// HTTP dispatch, concurrent processing pipelines, and any other workload that
|
|
// benefits from concurrent execution.
|
|
//
|
|
// Built on five seed primitives exposed by el_seed.c via dlsym+pthread:
|
|
//   __thread_create(fn_name, arg) -> Int     spawn thread, return tid
//   __thread_join(tid)            -> String  join thread, return result
//   __mutex_new()                 -> Int     allocate a mutex, return handle
//   __mutex_lock(m)                          lock mutex
//   __mutex_unlock(m)                        unlock mutex
|
|
//
|
|
// Every El fn compiles to a global C symbol. __thread_create uses dlsym to
|
|
// look up the function by name and run it in a pthread. This means any El fn
|
|
// with signature (String) -> String is directly threadable.
|
|
|
|
// ── Core primitives ──────────────────────────────────────────────────────────
|
|
|
|
// spawn — launch an El function in a new thread.
|
|
//
|
|
// fn_name: the name of an El fn with signature (String) -> String
|
|
// arg: the argument to pass to that fn
|
|
//
|
|
// Returns a thread id (tid) that can be passed to join().
|
|
// The El function must be a top-level fn — its C symbol must be globally
|
|
// visible so dlsym can resolve it.
|
|
fn spawn(fn_name: String, arg: String) -> Int {
    // Hand the function name and its single argument to the seed primitive.
    // The Int it gives back is the thread id that callers later pass to join().
    let tid: Int = __thread_create(fn_name, arg)
    return tid
}
|
|
|
|
// join — wait for a thread to finish and return its result.
|
|
//
|
|
// tid: the thread id returned by spawn()
|
|
//
|
|
// Blocks until the thread completes. Returns the String value the thread
|
|
// function returned.
|
|
fn join(tid: Int) -> String {
    // Delegate to the seed primitive and pass its String result straight
    // back to the caller.
    let result: String = __thread_join(tid)
    return result
}
|
|
|
|
// ── parallel_map ─────────────────────────────────────────────────────────────
|
|
|
|
// parallel_map — map an El function over a list of strings concurrently.
|
|
//
|
|
// items: [String] — the input list
|
|
// fn_name: String — name of an El fn with signature (String) -> String
|
|
//
|
|
// Spawns one thread per item. All threads run concurrently. Joins each thread
|
|
// in input order, so the output list preserves the same order as the input.
|
|
//
|
|
// This is the core primitive that replaces bash fan-out for parallel HTTP.
|
|
// Example — dispatch to N rooms at once:
|
|
// let responses: [String] = parallel_map(room_payloads, "dispatch_to_room")
|
|
fn parallel_map(items: [String], fn_name: String) -> [String] {
    let count: Int = el_list_len(items)

    // Fan-out: start one worker per input element before joining any of
    // them. Thread ids are kept as decimal strings because the handle list
    // is typed [String].
    let handles: [String] = el_list_empty()
    let idx = 0
    while idx < count {
        let input: String = el_list_get(items, idx)
        let tid: Int = spawn(fn_name, input)
        let handles = el_list_append(handles, int_to_str(tid))
        let idx = idx + 1
    }

    // Fan-in: join the workers in the order they were spawned, so that
    // position k of the output corresponds to position k of the input.
    let outputs: [String] = el_list_empty()
    let k = 0
    while k < count {
        let handle: String = el_list_get(handles, k)
        let tid: Int = str_to_int(handle)
        let outputs = el_list_append(outputs, join(tid))
        let k = k + 1
    }

    return outputs
}
|
|
|
|
// ── parallel_map_json ────────────────────────────────────────────────────────
|
|
|
|
// parallel_map_json — parallel_map over a JSON array string.
|
|
//
|
|
// items_json: String — a JSON array of strings, e.g. '["a","b","c"]'
|
|
// fn_name: String — name of an El fn with signature (String) -> String
|
|
//
|
|
// Parses the JSON array into an [String], runs parallel_map, then serialises
|
|
// the result list back to a JSON array string. Both input and output are JSON
|
|
// strings — the common El inter-service format.
|
|
//
|
|
// Example:
|
|
// let out_json: String = parallel_map_json(rooms_json, "dispatch_to_room")
|
|
fn parallel_map_json(items_json: String, fn_name: String) -> String {
    let count: Int = json_array_len(items_json)

    // Decode: copy each element of the JSON array into an El list.
    let inputs: [String] = el_list_empty()
    let idx = 0
    while idx < count {
        let element: String = json_array_get(items_json, idx)
        let inputs = el_list_append(inputs, element)
        let idx = idx + 1
    }

    // Map: run fn_name over every element concurrently.
    let outputs: [String] = parallel_map(inputs, fn_name)

    // Encode: splice the results between "[" and "]", comma-separated.
    // Each result is inserted verbatim as a raw JSON value (object, array,
    // or already-quoted string); nothing is quoted or escaped here.
    let total: Int = el_list_len(outputs)
    let json: String = "["
    // The separator is empty before the first element and "," afterwards.
    let sep: String = ""
    let k = 0
    while k < total {
        let piece: String = el_list_get(outputs, k)
        let json = json + sep
        let json = json + piece
        let sep = ","
        let k = k + 1
    }
    let json = json + "]"
    return json
}
|
|
|
|
// ── parallel_filter ──────────────────────────────────────────────────────────
|
|
|
|
// parallel_filter — keep items where fn_name returns "true", concurrently.
|
|
//
|
|
// items: [String] — the input list
|
|
// fn_name: String — name of an El fn with signature (String) -> String
|
|
// that returns "true" to keep the item or anything else
|
|
// to discard it
|
|
//
|
|
// Runs the predicate fn on all items in parallel. Collects results in order,
|
|
// preserving the relative order of kept items.
|
|
fn parallel_filter(items: [String], fn_name: String) -> [String] {
    // Evaluate the predicate on every item concurrently. parallel_map spawns
    // one thread per item and joins them in input order, so the verdict at
    // position j belongs to the item at position j. Reusing it avoids
    // keeping a second copy of the spawn/join loops in this function.
    let verdicts: [String] = parallel_map(items, fn_name)
    let n: Int = el_list_len(items)

    // Walk items and verdicts in lockstep. An item is kept only when its
    // verdict is exactly "true"; any other string discards it. Appending in
    // index order preserves the relative order of the kept items.
    let kept: [String] = el_list_empty()
    let j = 0
    while j < n {
        let item: String = el_list_get(items, j)
        let verdict: String = el_list_get(verdicts, j)
        if str_eq(verdict, "true") {
            let kept = el_list_append(kept, item)
        }
        let j = j + 1
    }

    return kept
}
|
|
|
|
// ── HTTP helpers ─────────────────────────────────────────────────────────────
|
|
|
|
// fire_http_post — worker fn for parallel_posts.
|
|
//
|
|
// Expects arg to be a JSON object with "url" and "body" keys:
|
|
// {"url":"https://...","body":"{...}"}
|
|
//
|
|
// Returns the HTTP response body string. Registered as a global El fn so
|
|
// parallel_map can locate it via dlsym.
|
|
fn fire_http_post(arg: String) -> String {
    // arg is a JSON object; read the destination from its "url" key and the
    // request payload from its "body" key.
    let target: String = json_get(arg, "url")
    let payload: String = json_get(arg, "body")

    // Whatever http_post returns is this worker's result.
    let response: String = http_post(target, payload)
    return response
}
|
|
|
|
// parallel_posts — fire a list of HTTP POSTs concurrently.
|
|
//
|
|
// requests: [String] — each element is a JSON object {"url":"...","body":"..."}
|
|
//
|
|
// Returns [String] of response bodies in the same order as the input.
|
|
//
|
|
// Example — fan out to N room endpoints at once:
|
|
// let reqs: [String] = el_list_empty()
|
|
// let reqs = el_list_append(reqs, "{\"url\":\"http://room-a/dispatch\",\"body\":\"" + payload + "\"}")
|
|
// let reqs = el_list_append(reqs, "{\"url\":\"http://room-b/dispatch\",\"body\":\"" + payload + "\"}")
|
|
// let responses: [String] = parallel_posts(reqs)
|
|
fn parallel_posts(requests: [String]) -> [String] {
    // The worker is referenced by name so it can be looked up at spawn time;
    // the string must match the fire_http_post fn exactly.
    let responses: [String] = parallel_map(requests, "fire_http_post")
    return responses
}
|
|
|
|
// ── Mutex helpers ────────────────────────────────────────────────────────────
|
|
|
|
// with_mutex — call fn_name(arg) while holding mutex m.
|
|
//
|
|
// m: Int — mutex handle returned by __mutex_new()
|
|
// fn_name: String — name of an El fn with signature (String) -> String
|
|
// arg: String — argument to pass to fn_name
|
|
//
|
|
// Locks the mutex, spawns fn_name(arg) in a child thread, joins to collect
|
|
// the result, then unlocks. The mutex is held across the entire duration of
|
|
// fn_name's execution, serializing concurrent callers.
|
|
//
|
|
// Note: fn_name must NOT itself acquire the same mutex — that would deadlock,
// because the calling thread already holds the lock for the whole time that
// fn_name is running in the child thread.
|
|
//
|
|
// Usage:
|
|
// let m: Int = __mutex_new()
|
|
// let result: String = with_mutex(m, "update_shared_state", payload)
|
|
fn with_mutex(m: Int, fn_name: String, arg: String) -> String {
    // Take the lock in the calling thread before any work starts.
    __mutex_lock(m)

    // Run fn_name(arg) in a child thread and block until it finishes; the
    // lock stays held by this thread for the whole call.
    let outcome: String = join(spawn(fn_name, arg))

    // Release only after the child's result has been collected.
    __mutex_unlock(m)
    return outcome
}
|