// channel.el — Go-style channels for El // // Channels are the communication primitive for concurrent El programs. // Threads send values into a channel; other threads receive them. // Channels are typed by convention — all values are Strings. // // Backed by four seed primitives in el_runtime.c: // __channel_new(capacity) -> Int create channel; cap=0 = unbounded // __channel_send(ch, msg) push msg; blocks if bounded and full // __channel_recv(ch) -> String pop msg; blocks until available; "" on close // __channel_try_recv(ch) -> String non-blocking pop; "" if empty // __channel_close(ch) mark closed; wake all blocked recvers // // Usage: // let ch: Int = channel_new(10) // buffered channel, capacity 10 // spawn("producer", int_to_str(ch)) // let msg: String = channel_recv(ch) // ── Core channel API ───────────────────────────────────────────────────────── // channel_new — create a channel with the given buffer capacity. // // capacity: 0 = unbounded (never blocks sender) // N = bounded buffer of N messages (sender blocks when full) // // Returns a channel handle (Int) to pass to send/recv/close. fn channel_new(capacity: Int) -> Int { return __channel_new(capacity) } // channel_send — send a message into the channel. // // Blocks if the channel is bounded and full. // No-op if the channel is already closed. fn channel_send(ch: Int, msg: String) { __channel_send(ch, msg) } // channel_recv — receive the next message from the channel. // // Blocks until a message is available. // Returns "" when the channel is closed and all buffered messages are drained. // The "" sentinel signals end-of-stream to consumers in a loop. fn channel_recv(ch: Int) -> String { return __channel_recv(ch) } // channel_try_recv — non-blocking receive. // // Returns the next message if one is available, or "" if the channel is empty. // Does not block. Callers must distinguish "" (empty) from a legitimate "" // message by convention — use a non-empty sentinel in the message protocol. fn channel_try_recv(ch: Int) -> String { return __channel_try_recv(ch) } // channel_close — signal that no more messages will be sent. // // After close, channel_recv continues to drain buffered messages then // returns "" on every subsequent call. channel_send on a closed channel // is a no-op (the message is dropped). fn channel_close(ch: Int) { __channel_close(ch) } // ── channel_pipeline ───────────────────────────────────────────────────────── // channel_pipeline — producer/consumer pipeline with parallel workers. // // Reads messages from in_ch, applies fn_name to each, writes results to out_ch. // Spawns `workers` concurrent worker threads — each drains in_ch independently, // so messages are processed in arrival order within each worker but not globally. // // fn_name must be an El fn with signature (String) -> String. // // Call channel_close(in_ch) to signal EOF. Workers exit when they receive "". // The caller must also close out_ch after all workers finish (via join). // // let in_ch: Int = channel_new(0) // let out_ch: Int = channel_new(0) // channel_pipeline(in_ch, out_ch, "process_item", 4) // channel_send(in_ch, "work-1") // channel_close(in_ch) // let result: String = channel_recv(out_ch) fn channel_pipeline(in_ch: Int, out_ch: Int, fn_name: String, workers: Int) { let i: Int = 0 while i < workers { let arg: String = "{\"in_ch\":" + int_to_str(in_ch) + ",\"out_ch\":" + int_to_str(out_ch) + ",\"fn\":\"" + fn_name + "\"}" let _tid: Int = spawn("_channel_worker", arg) let i = i + 1 } } // _channel_worker — internal worker for channel_pipeline. // // Reads messages from in_ch until it receives "" (closed+empty), applies // fn_name to each, and writes results to out_ch. Runs in its own thread // (spawned by channel_pipeline). fn _channel_worker(arg: String) -> String { let in_ch: Int = str_to_int(json_get(arg, "in_ch")) let out_ch: Int = str_to_int(json_get(arg, "out_ch")) let fn_name: String = json_get(arg, "fn") let running: Bool = true while running { let msg: String = channel_recv(in_ch) if str_eq(msg, "") { let running = false } else { // Spawn fn_name in a child thread so it cannot block the worker loop. let tid: Int = spawn(fn_name, msg) let result: String = join(tid) channel_send(out_ch, result) } } return "" } // ── channel_drain ──────────────────────────────────────────────────────────── // channel_drain — collect all messages from ch into a list. // // Reads until the channel is closed and empty (recv returns ""). // Returns a [String] of all messages received. // // Typical usage: close the channel from the producer side, then call // channel_drain from the consumer to collect results. fn channel_drain(ch: Int) -> [String] { let results: [String] = el_list_empty() let running: Bool = true while running { let msg: String = channel_recv(ch) if str_eq(msg, "") { let running = false } else { let results = el_list_append(results, msg) } } return results } // ── channel_fan_out ─────────────────────────────────────────────────────────── // channel_fan_out — send every item in a list into a channel. // // items: [String] — items to send // ch: Int — destination channel // // Sends all items then closes the channel to signal end-of-stream. // Intended for the producer side of a pipeline: // // channel_fan_out(items, in_ch) // let results: [String] = channel_drain(out_ch) fn channel_fan_out(items: [String], ch: Int) { let n: Int = el_list_len(items) let i: Int = 0 while i < n { let item: String = el_list_get(items, i) channel_send(ch, item) let i = i + 1 } channel_close(ch) }