task
The task package provides functions for spawning, awaiting, and coordinating concurrent tasks in Kiwi.
Package Functions
await(task_id)
Await a specified task and return its result. Blocks until the task completes.
If the task faulted (e.g. an async socket was reset), await throws a catchable SystemError with the fault message. Use try/catch to handle expected failures.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | task_id | The task identifier. |
Returns
| Type | Description |
|---|---|
| any | The value returned by the task. |
Throws
| Error | Condition |
|---|---|
| SystemError | The task faulted (e.g. network error, socket reset). |
Example
import "task"
# Spawn a background computation and await its result
worker = do (n: integer)
  total = 0
  for i in [1 to n] do
    total += i
  end
  total
end
id = task::spawn(worker, [1000])
result = task::await(id)
println "Sum 1..1000 = ${result}"
# => Sum 1..1000 = 500500
Example: handling a faulted task
import "socket"
import "task"
try
  sock = socket::tcpconnect("127.0.0.1", 9)
  data = task::await(socket::recv(sock, 1))
  println "received: ${data.size()} bytes"
  socket::close(sock)
catch (err)
  println "connection failed: ${err}"
end
busy()
Returns true if there are active tasks.
Parameters None
Returns
| Type | Description |
|---|---|
| boolean | true if there are active tasks. |
Example
import "task"
worker = do (ms: integer)
  task::sleep(ms)
  "done"
end
id = task::spawn(worker, [200])
println "Busy right after spawn: ${task::busy()}" # => true
task::await(id)
println "Busy after await: ${task::busy()}" # => false
channel(cap)
Creates and returns a new Channel with the given capacity.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | cap | Channel capacity. 0 means unbounded. |
Returns
| Type | Description |
|---|---|
| Channel | A new channel instance. |
Example
import "task"
# Producer/consumer connected through a bounded channel (capacity 3)
ch = task::channel(3)
producer = with (chan: Channel) do
  for i in [1 to 5] do
    println "[Producer] sending ${i}"
    chan.send(i)
  end
  chan.close()
  println "[Producer] done"
end
consumer = with (chan: Channel) do
  while v = chan.recv() do
    println "[Consumer] received ${v}"
    task::sleep(50)
  end
  println "[Consumer] channel closed"
end
task::spawn(producer, [ch])
task::spawn(consumer, [ch])
task::wait()
interval(ms, callback)
Spawns a background task that calls callback repeatedly every ms milliseconds. Runs indefinitely until the task is cancelled or the program exits.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | ms | Interval duration in milliseconds. |
| lambda | callback | The lambda to invoke on each interval tick. |
Returns
| Type | Description |
|---|---|
| integer | The task identifier of the spawned interval task. |
Example
import "task"
import "time"
# Tick a counter every 200ms and let it run for about 3 ticks
count = 0
ticker_id = task::interval(200, with () do
  count += 1
  println "Tick ${count} at ${time::timestamp()}"
end)
# Wait long enough for ~3 ticks
task::sleep(700)
println "Final count: ${count}"
# The interval task keeps running until the program exits.
# Use task::list() and task::status() to inspect running tasks.
println "Running tasks: ${task::list()}"
spawn(callback, args)
Starts callback as an asynchronous task, passing it args, and returns the task identifier without waiting for the task to finish.
Parameters
| Type | Name | Description |
|---|---|---|
| lambda | callback | A lambda to invoke as a task. |
| list | args | A list of arguments for the callback. |
Returns
| Type | Description |
|---|---|
| integer | The task identifier. |
Example
import "task"
fn slow_square(n: integer): integer
  task::sleep(100) # simulate work
  n * n
end
worker = do (n: integer)
  slow_square(n)
end
# Spawn several tasks in parallel
ids = []
for n in [2, 4, 6, 8] do
  ids.append(task::spawn(worker, [n]))
end
# Collect all results
for id, idx in ids do
  result = task::await(id)
  println "Result ${idx}: ${result}"
end
# => Result 0: 4
# => Result 1: 16
# => Result 2: 36
# => Result 3: 64
list()
Returns a list of the identifiers of all active tasks.
Parameters None
Returns
| Type | Description |
|---|---|
| list | A list of task identifiers. |
Example
import "task"
worker = do (ms: integer)
  task::sleep(ms)
  ms
end
task::spawn(worker, [300])
task::spawn(worker, [400])
task::spawn(worker, [500])
println "Active task ids: ${task::list()}"
# => Active task ids: [1, 2, 3]
task::wait()
println "Active task ids after wait: ${task::list()}"
# => Active task ids after wait: []
result(task_id)
Returns the stored result of a task without blocking.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | task_id | The task identifier. |
Returns
| Type | Description |
|---|---|
| any | The task result. |
Example
import "task"
worker = do (x: integer)
  task::sleep(50)
  x * 10
end
id = task::spawn(worker, [7])
task::await(id)
# task::result returns the stored result without blocking
println task::result(id)
# => 70
sleep(ms)
Sleeps for a number of milliseconds.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | ms | The number of milliseconds to sleep. |
Returns None
Example
import "task"
import "time"
# Pause 100ms between polling attempts
attempts = 0
while attempts < 3 do
  attempts += 1
  println "Attempt ${attempts} at ${time::timestamp("HH:mm:ss")}"
  task::sleep(100)
end
status(task_id)
Returns the status of a given task as a string. Does not block and does not throw for faulted tasks — use this to poll task state before deciding whether to call await.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | task_id | The task identifier. |
Returns
| Type | Description |
|---|---|
| string | One of "Running", "Completed", or "Faulted". |
Example
import "task"
worker = do (ms: integer)
  task::sleep(ms)
  "finished"
end
id = task::spawn(worker, [300])
# Poll status without blocking
task::sleep(50)
println "Status mid-run: ${task::status(id)}" # => Running
task::await(id)
println "Status after await: ${task::status(id)}" # => (empty — task removed)
Example: polling with a timeout
import "task"
import "socket"
sock = socket::tcpconnect("127.0.0.1", 8080)
recv_tid = socket::recv(sock, 1)
elapsed = 0
open = false
while elapsed < 500 do
  s = task::status(recv_tid)
  if s == "Completed"
    open = true
    break
  elsif s == "Faulted"
    break
  end
  task::sleep(10)
  elapsed += 10
end
socket::close(sock)
println open ? "port is open" : "port is closed"
timer(ms, callback)
Waits ms milliseconds and then invokes callback once. Blocks until the timer completes.
Parameters
| Type | Name | Description |
|---|---|---|
| integer | ms | Delay in milliseconds before invoking the callback. |
| lambda | callback | The lambda to invoke after the delay. |
Returns None
Example
import "task"
import "time"
println "Waiting 250ms for delayed callback..."
task::timer(250, with () do
  println "Timer fired at ${time::timestamp("HH:mm:ss")}"
end)
println "Timer complete — execution continues here"
wait()
Waits for all tasks to complete.
Parameters None
Returns None
Example
import "task"
worker = do (label: string, ms: integer)
  task::sleep(ms)
  println "${label} done"
end
task::spawn(worker, ["alpha", 200])
task::spawn(worker, ["beta", 150])
task::spawn(worker, ["gamma", 300])
println "All tasks spawned. Waiting..."
task::wait()
println "All tasks finished."
Channel
A thread-safe FIFO queue for passing data between concurrent tasks, suited to asynchronous producer-consumer patterns. Items are received in the order they were sent.
Constructor
Creates a channel. When capacity is 0, the channel is "unbounded."
Parameters
| Type | Name | Description |
|---|---|---|
| integer | capacity | The number of items the channel may store. |
close()
Closes the channel.
Parameters None
closed()
Returns true if the channel is closed.
Parameters None
Returns
| Type | Description |
|---|---|
| boolean | true when the channel is closed. |
recv()
Reads an item from the channel.
Parameters None
Returns
| Type | Description |
|---|---|
| any | The item. |
send(data)
Writes data to a channel.
Parameters
| Type | Name | Description |
|---|---|---|
| any | data | An item to store on the channel. |
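Example
The following is a minimal sketch of send(), recv(), close(), and closed() used together in a single task. It uses only calls that appear elsewhere on this page. It assumes that items queued before close() can still be received afterwards, and that recv() yields a falsy value once a closed channel has been drained, as the producer/consumer loop under channel(cap) suggests. Treat both as assumptions rather than documented guarantees.

```kiwi
import "task"

# Unbounded channel (capacity 0)
ch = task::channel(0)

# Queue three items, then close the channel to signal "no more data"
ch.send("first")
ch.send("second")
ch.send("third")
ch.close()

# Assumption: recv() returns a falsy value once the closed channel is
# empty, which ends the loop. Items should arrive in the order sent.
while v = ch.recv() do
  println "received: ${v}"
end

println "Channel closed: ${ch.closed()}"
```

If the items themselves can be falsy (for example 0 or false), the try_recv() pattern is likely the safer way to drain the channel, since it reports success separately from the value.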
try_recv()
Attempts to receive an item from the channel.
Parameters None
Returns
| Type | Description |
|---|---|
| list | Returns a list containing two values: a boolean indicating whether a value was received, and the value if received (otherwise 0). |
Example
import "task"
ch = task::channel(4)
# Fill the channel with some values
for i in [1 to 4] do
  ch.send(i * 10)
end
ch.close()
# Non-blocking drain via try_recv
while true do
  pair = ch.try_recv()
  ok = pair[0]
  val = pair[1]
  break when !ok
  println "got: ${val}"
end
println "Channel closed: ${ch.closed()}"
# => got: 10
# => got: 20
# => got: 30
# => got: 40
# => Channel closed: true