Actions
Actions are named, durable task types. You register an action once, then invoke it on demand — Flo handles dispatch, retries, lease management, and dead-letter routing. The handler can run in two places:
- WASM — Compile your logic to WebAssembly and deploy it to Flo. The action executes inline on the shard, with sub-millisecond overhead and zero network hops.
- User-hosted workers — Run a long-lived process (in Go, Python, JS, Zig, or any language that speaks the wire protocol) that pulls tasks from Flo and reports results.
Both models share the same invoke / status / list / delete API. The difference is where the code runs.

    ┌──────────────┐                    ┌───────────────────────┐
    │    Client    │                    │  Flo Server (Shard)   │
    │              │   action invoke    │                       │
    │  flo action  │  ─────────────────▸│  ActionHandler        │
    │  invoke X    │                    │   ├─ WASM? Execute    │◄── inline, ~µs
    │              │ ◀─ run_id ─────────┤   │     inline        │
    │              │                    │   └─ User? Queue      │
    └──────────────┘                    │         pending       │
                                        └──────────┬────────────┘
                                                   │
    ┌──────────────┐      action_await             │
    │    Worker    │ ◄─────────────────────────────┘
    │  (Go/Py/JS)  │  task_id + payload
    │              │
    │  handler()   │
    │              │  action_complete(result)
    │              │ ────────────────────────────▸ Flo
    └──────────────┘

Quick Start

1. Register an action

    flo action register send-email --timeout 60000 --max-retries 3

2. Invoke it

    flo action invoke send-email '{"to":"alice@example.com","subject":"Welcome!"}'
    # → Result: send-email-1

3. Check status

    flo action status send-email-1
    # → RUN ID          STATUS    CREATED
    #   send-email-1    pending   2026-03-14 10:00:00

For user-hosted actions, nothing happens until a worker picks up the task. For WASM actions, the result is available immediately.

Core Concepts
Action
A named task type stored in Flo’s action registry. Each action has:
| Field | Default | Description |
|---|---|---|
| name | — | Unique name within a namespace (max 256 chars) |
| type | user | user (worker-hosted) or wasm (Flo-hosted) |
| timeout_ms | 30000 | Max execution time before timeout |
| max_retries | 3 | Retries before dead-lettering |
| retry_delay_ms | 1000 | Base delay for exponential backoff |
| description | — | Human-readable description |
| version | 1 | Auto-incremented on re-registration |
| enabled | true | Can be disabled to block new invocations |
WASM actions additionally have:
| Field | Default | Description |
|---|---|---|
| wasm_module | — | WASM binary bytes |
| wasm_entrypoint | handle | Export function name |
| wasm_memory_limit | 16 MB | Max linear memory, allocated in 64 KB pages (16 MB = 256 pages) |
Run
A single invocation of an action. Every invoke creates a run with a unique ID.
| Status | Description |
|---|---|
| pending | Queued, waiting for a worker (or WASM execution) |
| running | Claimed by a worker, currently executing |
| completed | Finished successfully with output |
| failed | Failed permanently (retries exhausted or explicit fail) |
| cancelled | Cancelled by user |
| timed_out | Execution exceeded timeout_ms |
Worker
A long-running process that pulls pending tasks from Flo and executes them. Workers:
- Register with Flo, declaring which action types they handle
- Await tasks using long polling (blocking dequeue)
- Execute the handler, optionally extending the lease with touch
- Report completion or failure back to Flo
Workers can run anywhere — on the same machine, in a container, across the network. Multiple workers can handle the same action type for horizontal scaling.
Execution Models
WASM Actions (Flo-Hosted)
WASM actions execute inside the Flo shard with no network overhead. They’re ideal for:
- Pure data transformations (validation, enrichment, rules engines)
- Low-latency operations (sub-millisecond execution)
- Self-contained logic with no external dependencies
The WASM module must export three functions:

    handle(input_ptr: u32, input_len: u32) → i64
    alloc(size: u32) → u32
    dealloc(ptr: u32, size: u32) → void

handle receives the input bytes and returns a packed i64: the upper 32 bits are the output pointer, the lower 32 bits are the output length. Negative return values indicate errors:
| Return Code | Meaning |
|---|---|
| -1 | Invalid input |
| -2 | Allocation failed |
| -3 | Execution error |
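
The packed return value can be decoded with a couple of bit operations. The following is a minimal sketch in Python of the layout described above (pointer in the upper 32 bits, length in the lower 32 bits, negative values as error codes). The names pack_result and unpack_result are illustrative only and are not part of any Flo SDK.

```python
def pack_result(ptr: int, length: int) -> int:
    """Pack a 32-bit pointer and a 32-bit length into one integer."""
    if not (0 <= ptr < 2**32 and 0 <= length < 2**32):
        raise ValueError("ptr and length must each fit in 32 bits")
    return (ptr << 32) | length


def unpack_result(ret: int) -> tuple[int, int]:
    """Split a non-negative return value into (ptr, length).

    Negative values are treated as error codes (-1, -2, -3) and raised.
    """
    if ret < 0:
        raise RuntimeError(f"handle returned error code {ret}")
    return (ret >> 32) & 0xFFFFFFFF, ret & 0xFFFFFFFF


packed = pack_result(0x1000, 42)
print(hex(packed))            # 0x10000000002a
print(unpack_result(packed))  # (4096, 42)
```

Because any negative value is read as an error, this sketch assumes output pointers stay below 2^31, so the packed value never has its sign bit set.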
Optional exports:
| Export | Purpose |
|---|---|
| init() → i32 | Called once after module instantiation |
| describe() → i64 | Returns a JSON description (packed ptr and len) |
Host Functions
WASM guest code can call these host functions:
| Function | Description |
|---|---|
| flo.log(level, msg_ptr, msg_len) | Log a message (0=debug, 1=info, 2=warn, 3=error) |
| flo.kv_get(key_ptr, key_len, buf_ptr, buf_len) → i32 | Read from KV store |
| flo.kv_set(key_ptr, key_len, val_ptr, val_len) → i32 | Write to KV store |
| flo.kv_delete(key_ptr, key_len) → i32 | Delete from KV store |
WASI shims (wasi_snapshot_preview1.*) are available for modules compiled with a WASI target.
Concurrency
Each shard allows up to 4 concurrent WASM executions (configurable via max_concurrent_executions). Additional invocations queue until a slot opens.
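
The slot-limiting behavior can be pictured as a counting semaphore. The sketch below is a Python illustration of that idea only, not Flo's implementation: the constant mirrors the documented default of 4, and run_all and execute are hypothetical names with a short sleep standing in for the WASM call.

```python
import asyncio

MAX_CONCURRENT_EXECUTIONS = 4  # mirrors the documented default


async def run_all(n_invocations: int) -> int:
    """Run n simulated executions and return the peak number in flight."""
    slots = asyncio.Semaphore(MAX_CONCURRENT_EXECUTIONS)
    in_flight = 0
    peak = 0

    async def execute() -> None:
        nonlocal in_flight, peak
        async with slots:  # waits here while all slots are taken
            in_flight += 1
            peak = max(peak, in_flight)
            await asyncio.sleep(0.01)  # stand-in for the WASM call
            in_flight -= 1

    await asyncio.gather(*(execute() for _ in range(n_invocations)))
    return peak


print(asyncio.run(run_all(10)))  # 4
```

Ten invocations are submitted at once, but at most four ever run together; the remaining six wait for a slot.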
Building a WASM Action
Here’s a complete rules engine in Zig targeting wasm32-freestanding:

    // rules_engine.zig — Build with:
    //   zig build-lib -target wasm32-freestanding -O ReleaseSmall rules_engine.zig

    var heap: [65536]u8 = undefined;
    var heap_offset: usize = 0;

    export fn alloc(size: u32) u32 {
        const s: usize = @intCast(size);
        const aligned = (heap_offset + 7) & ~@as(usize, 7);
        if (aligned + s > heap.len) return 0;
        const ptr: [*]u8 = @ptrCast(&heap[aligned]);
        heap_offset = aligned + s;
        return @intFromPtr(ptr);
    }

    export fn dealloc(_: u32, _: u32) void {}

    export fn handle(input_ptr: [*]const u8, input_len: u32) i64 {
        heap_offset = 0;
        const input = input_ptr[0..input_len];

        // Parse and evaluate rules against input...
        const output = processRules(input);

        const out_ptr = alloc(@intCast(output.len));
        if (out_ptr == 0) return -2;
        const dest: [*]u8 = @ptrFromInt(out_ptr);
        @memcpy(dest[0..output.len], output);

        return (@as(i64, out_ptr) << 32) | @as(i64, @intCast(output.len));
    }

Register it:

    flo action register rules-engine --wasm ./rules_engine.wasm

Now invocations execute inline — no worker needed:

    flo action invoke rules-engine '{"age": 25, "country": "US"}'
    # → Result: rules-engine-1 (completed immediately)

    flo action status rules-engine-1
    # → status: completed, output: {"eligible":true,"rules_evaluated":2,"rules_passed":2}

User-Hosted Actions (Workers)

User-hosted actions are executed by external worker processes. This is the model for:
- Actions that call external APIs (payment gateways, email services)
- Long-running tasks (report generation, media processing)
- Logic that needs access to your infrastructure (databases, file systems)
The flow:
- Invoke creates a run in pending status
- A worker calls await (long poll) and receives the task
- The worker runs the handler and calls complete or fail
- Flo updates the run status and stores the result
Worker Lifecycle

    ┌──────────┐   register   ┌──────────┐    await     ┌──────────┐
    │   Init   │ ──────────▸  │   Idle   │ ──────────▸  │ Execute  │
    │          │              │ (polling)│              │          │
    └──────────┘              └──────┬───┘              └────┬─────┘
                                     │                       │
                              heartbeat (30s)         complete / fail
                                     │                       │
                                     ▼                       ▼
                              ┌──────────┐              ┌──────────┐
                              │ Draining │              │   Idle   │
                              └──────────┘              └──────────┘

Register

The worker announces itself and the action types it handles:

    flo worker register worker-1 process-order send-email

Registration includes:
- Worker ID — unique identifier (auto-generated if omitted)
- Task types — list of action names this worker can execute
- Max concurrency — how many tasks in parallel
- Machine ID — for grouping workers on the same host
- Metadata — arbitrary JSON for discovery
Await (Long Poll)
Workers block-wait for tasks:

    flo worker await process-order --worker-id worker-1 --block 30000

When a matching pending task exists, Flo returns a task assignment:
| Field | Description |
|---|---|
| task_id | The run ID to complete/fail against |
| task_type | Action name |
| payload | Input bytes from the invoke call |
| created_at | When the invoke happened |
| attempt | Attempt number (starts at 1, increments on retry) |
Complete
Report successful completion with the result:

    flo worker complete <task-id> --worker-id worker-1 --action process-order --result '{"status":"done"}'

Fail

Report failure, optionally requesting a retry:

    # Retry — task goes back to pending
    flo worker fail <task-id> --worker-id worker-1 --action process-order --error "Temporary failure" --retry

    # Permanent failure — task is marked failed
    flo worker fail <task-id> --worker-id worker-1 --action process-order --error "Invalid input"

Touch (Lease Extension)

For long-running tasks, extend the execution lease to prevent timeout:

    flo worker touch <task-id> --worker-id worker-1 --action process-order --extend 30000

Heartbeat

Workers send periodic heartbeats (typically every 30 seconds) to report their current load and stay registered. If the server responds with a draining status, the worker should stop accepting new tasks and finish current ones.

Drain

Signal that the worker should finish current tasks but accept no new ones:

    flo worker drain --worker-id worker-1

Labels
Actions can require specific worker capabilities using labels. When invoking an action with labels, only workers whose labels are a superset of the required labels will receive the task.

    # Invoke with required labels
    flo action invoke render-video '{"url":"..."}' --labels '{"gpu":true,"vram_gb":24}'

A worker with labels {"gpu":true, "vram_gb":24, "region":"us-east"} matches — it has all required keys with equal values. A worker with {"gpu":true, "vram_gb":16} does not match — vram_gb differs.
Label matching rules:
- All keys in required must exist in worker labels
- Values must be exactly equal (string, number, or boolean)
- Extra keys on the worker side are ignored
- Nested objects and arrays are not compared (flat values only)
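
The matching rules above can be sketched as a small predicate. This is an illustrative Python version, not Flo's matcher; labels_match is a hypothetical name, and treating a nested value as a non-match is an assumption, since the rules only say such values are not compared.

```python
from typing import Any

FLAT_TYPES = (str, int, float, bool)


def labels_match(required: dict[str, Any], worker: dict[str, Any]) -> bool:
    """Return True if the worker's labels satisfy every required label."""
    for key, want in required.items():
        if key not in worker:
            return False  # every required key must exist on the worker
        have = worker[key]
        if not isinstance(want, FLAT_TYPES) or not isinstance(have, FLAT_TYPES):
            return False  # assumption: nested values never match
        # bool is a subclass of int in Python, so compare bool-ness too
        if isinstance(want, bool) != isinstance(have, bool) or want != have:
            return False
    return True  # extra worker keys are simply never inspected


required = {"gpu": True, "vram_gb": 24}
print(labels_match(required, {"gpu": True, "vram_gb": 24, "region": "us-east"}))  # True
print(labels_match(required, {"gpu": True, "vram_gb": 16}))  # False
```

An empty set of required labels matches every worker, which is consistent with the superset rule.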
Invocation Options
| Option | Default | Description |
|---|---|---|
| priority | 10 | Higher = dequeued first (0–255) |
| delay_ms | 0 | Delay before the task becomes available |
| idempotency_key | — | Deduplication key (same key → same run ID) |
| labels | — | Required worker labels (JSON object) |
| namespace | default | Namespace isolation |
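
To illustrate how priority and delay_ms interact, here is a hedged Python sketch of a queue that hands out the highest-priority task whose delay has elapsed. It is not Flo's scheduler: push and pop_ready are hypothetical names, time is passed in explicitly as now_ms, and first-in-first-out ordering within one priority level is an assumption.

```python
import heapq
import itertools

_seq = itertools.count()  # tie-breaker: assumed FIFO within a priority


def push(queue, name, priority=10, delay_ms=0, now_ms=0):
    """Enqueue a task that becomes available at now_ms + delay_ms."""
    if not 0 <= priority <= 255:
        raise ValueError("priority must be in 0..255")
    # heapq is a min-heap, so negate priority to pop the highest first
    heapq.heappush(queue, (-priority, next(_seq), now_ms + delay_ms, name))


def pop_ready(queue, now_ms):
    """Pop the highest-priority task whose delay has elapsed, if any."""
    skipped, found = [], None
    while queue:
        item = heapq.heappop(queue)
        if item[2] <= now_ms:
            found = item[3]
            break
        skipped.append(item)  # not yet available, put it back later
    for item in skipped:
        heapq.heappush(queue, item)
    return found


q = []
push(q, "a")                               # default priority 10
push(q, "b", priority=100)
push(q, "c", priority=255, delay_ms=5000)  # highest, but delayed
print(pop_ready(q, now_ms=0))     # b
print(pop_ready(q, now_ms=0))     # a
print(pop_ready(q, now_ms=0))     # None
print(pop_ready(q, now_ms=5000))  # c
```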
Idempotency
Use idempotency_key to prevent duplicate invocations:

    flo action invoke charge-payment '{"order":"ORD-123"}' --idempotency-key order-123-charge

    # Same key returns the existing run (no new execution)
    flo action invoke charge-payment '{"order":"ORD-123"}' --idempotency-key order-123-charge
    # → same run ID

Retry Behavior
When a worker reports failure with --retry:
- The run status resets to pending
- The attempt counter increments
- The task goes back to the queue for the next available worker
- Backoff delay is applied: retry_delay_ms × 2^(attempt-1)
When retries are exhausted (max_retries reached) or the worker fails without --retry:
- The run status is set to failed
- The error message and timestamps are recorded
- The run can still be queried via action status
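
As a worked example of the backoff formula, the Python sketch below evaluates retry_delay_ms × 2^(attempt-1) for the default retry_delay_ms of 1000. The function name backoff_delay_ms is illustrative, and it assumes attempt is the 1-based number of the attempt that just failed.

```python
def backoff_delay_ms(retry_delay_ms: int, attempt: int) -> int:
    """Delay before the next try, given the 1-based attempt that failed."""
    if attempt < 1:
        raise ValueError("attempt is 1-based")
    return retry_delay_ms * 2 ** (attempt - 1)


for attempt in range(1, 4):
    print(attempt, backoff_delay_ms(1000, attempt))
# 1 1000
# 2 2000
# 3 4000
```

With the defaults (retry_delay_ms of 1000 and max_retries of 3), the delays under this reading are 1 s, 2 s, and 4 s.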
Namespace Isolation
Actions and runs are scoped to namespaces. The same action name can exist independently in different namespaces:

    flo action register send-email --namespace prod
    flo action register send-email --namespace staging

    # These create separate runs in separate registries
    flo action invoke send-email '{}' --namespace prod
    flo action invoke send-email '{}' --namespace staging

Persistence

Action registrations and run state are persisted to the Unified Append Log (UAL). On node restart:
- Registrations are replayed — all actions reappear in the registry
- Runs are replayed — pending and running tasks are restored
- Workers must re-register and resume polling
SDK: Building Workers
The SDKs provide a high-level ActionWorker that handles registration, polling, concurrency, heartbeats, and error recovery. You just write handler functions.

Handler Signature

Every action handler receives an ActionContext and returns result bytes:

Go

    type ActionHandler func(actx *ActionContext) (result []byte, err error)

Python

    async def handler(ctx: ActionContext) -> bytes: ...

TypeScript

    type ActionHandler = (ctx: ActionContext) => Promise<Uint8Array>;

Zig

    pub const ActionHandler = *const fn (*ActionContext) anyerror![]const u8;

ActionContext

The context object passed to every handler:
| Property | Type | Description |
|---|---|---|
| taskId / task_id | string | Unique task/run identifier |
| actionName / action_name | string | Which action this is |
| input / payload | bytes | Raw input from the invoke call |
| attempt | int | Attempt number (1-based) |
| createdAt / created_at | timestamp | When the invoke happened |
| namespace | string | Namespace scope |
Methods:
| Method | Description |
|---|---|
| json() / Into() | Parse input as JSON (typed or untyped) |
| toBytes() / Bytes() | Serialize a value to JSON bytes for the response |
| touch(extendMs) | Extend the execution lease (for long-running tasks) |
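
Because a handler only depends on the context's input and JSON helpers, it can be exercised without a running server. The Python sketch below uses a hypothetical stand-in class, FakeContext, that models only json() and to_bytes() with the standard json module; it is not the SDK's ActionContext, and the exact byte output of the real to_bytes may differ.

```python
import json
from dataclasses import dataclass


@dataclass
class FakeContext:
    """Stand-in for ActionContext; only the JSON helpers are modelled."""
    payload: bytes

    def json(self):
        # Parse the raw input bytes as JSON
        return json.loads(self.payload)

    def to_bytes(self, value) -> bytes:
        # Serialize a value to JSON bytes for the response
        return json.dumps(value).encode("utf-8")


def send_email(ctx: FakeContext) -> bytes:
    data = ctx.json()
    return ctx.to_bytes({"status": "sent", "to": data["to"]})


print(send_email(FakeContext(b'{"to":"alice@example.com"}')))
# b'{"status": "sent", "to": "alice@example.com"}'
```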
Complete Examples
Go

    package main

    import (
        "context"
        "fmt"
        "log"
        "os"
        "os/signal"
        "syscall"
        "time"

        flo "github.com/floruntime/flo-go"
    )

    type OrderRequest struct {
        OrderID    string  `json:"order_id"`
        CustomerID string  `json:"customer_id"`
        Amount     float64 `json:"amount"`
        Items      []Item  `json:"items"`
    }

    type Item struct {
        SKU      string `json:"sku"`
        Quantity int    `json:"quantity"`
    }

    func processOrder(actx *flo.ActionContext) ([]byte, error) {
        var req OrderRequest
        if err := actx.Into(&req); err != nil {
            return nil, fmt.Errorf("invalid input: %w", err)
        }

        log.Printf("Processing order %s ($%.2f)", req.OrderID, req.Amount)

        // For long-running tasks, extend the lease periodically
        for i, item := range req.Items {
            log.Printf("  Item %d/%d: %s", i+1, len(req.Items), item.SKU)

            // Extend lease every 3 items
            if (i+1)%3 == 0 {
                actx.Touch(30000)
            }
        }

        return actx.Bytes(map[string]string{
            "order_id": req.OrderID,
            "status":   "processed",
        })
    }

    func sendEmail(actx *flo.ActionContext) ([]byte, error) {
        var input map[string]string
        if err := actx.Into(&input); err != nil {
            return nil, fmt.Errorf("invalid input: %w", err)
        }

        log.Printf("Sending email to %s", input["to"])

        return actx.Bytes(map[string]string{"status": "sent"})
    }

    func main() {
        client := flo.NewClient("localhost:9000",
            flo.WithNamespace("myapp"),
        )
        if err := client.Connect(); err != nil {
            log.Fatal(err)
        }
        defer client.Close()

        w, err := client.NewActionWorker(flo.ActionWorkerOptions{
            Concurrency:   10,
            ActionTimeout: 5 * time.Minute,
        })
        if err != nil {
            log.Fatal(err)
        }
        defer w.Close()

        w.MustRegisterAction("process-order", processOrder)
        w.MustRegisterAction("send-email", sendEmail)

        // Graceful shutdown
        ctx, cancel := context.WithCancel(context.Background())
        go func() {
            sigCh := make(chan os.Signal, 1)
            signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
            <-sigCh
            w.Stop()
            cancel()
        }()

        log.Println("Worker starting...")
        w.Start(ctx)
    }

Python

    import asyncio
    import logging
    import signal
    from flo import FloClient, ActionContext

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)

    async def process_order(ctx: ActionContext) -> bytes:
        data = ctx.json()
        logger.info(f"Processing order {data['order_id']} (${data['amount']:.2f})")

        items = data.get("items", [])
        for i, item in enumerate(items):
            logger.info(f"  Item {i+1}/{len(items)}: {item['sku']}")
            await asyncio.sleep(1)

            # Extend lease every 3 items
            if (i + 1) % 3 == 0:
                await ctx.touch(30000)

        return ctx.to_bytes({
            "order_id": data["order_id"],
            "status": "processed",
        })

    async def send_email(ctx: ActionContext) -> bytes:
        data = ctx.json()
        logger.info(f"Sending email to {data['to']}")
        return ctx.to_bytes({"status": "sent"})

    async def main():
        client = FloClient("localhost:9000", namespace="myapp")
        await client.connect()

        worker = client.new_action_worker(concurrency=10, action_timeout=300)

        worker.register_action("process-order", process_order)
        worker.register_action("send-email", send_email)

        # Decorator syntax also works
        @worker.action("health-check")
        async def health_check(ctx: ActionContext) -> bytes:
            return ctx.to_bytes({"status": "healthy"})

        # Shutdown handling
        stop = asyncio.Event()
        loop = asyncio.get_running_loop()
        for sig in (signal.SIGINT, signal.SIGTERM):
            loop.add_signal_handler(sig, lambda: (worker.stop(), stop.set()))

        try:
            await worker.start()
        finally:
            await worker.close()
            await client.close()

    asyncio.run(main())

TypeScript

    import { ActionWorker, ActionContext } from "@floruntime/node";

    async function processOrder(ctx: ActionContext): Promise<Uint8Array> {
      const req = ctx.json<{
        orderId: string;
        amount: number;
        items: { sku: string; quantity: number }[];
      }>();

      console.log(`Processing order ${req.orderId} ($${req.amount.toFixed(2)})`);

      for (let i = 0; i < req.items.length; i++) {
        console.log(`  Item ${i + 1}/${req.items.length}: ${req.items[i].sku}`);
        await new Promise((r) => setTimeout(r, 1000));

        // Extend lease every 3 items
        if ((i + 1) % 3 === 0) {
          await ctx.touch(30000);
        }
      }

      return ctx.toBytes({
        orderId: req.orderId,
        status: "processed",
      });
    }

    async function sendEmail(ctx: ActionContext): Promise<Uint8Array> {
      const data = ctx.json<{ to: string; subject: string }>();
      console.log(`Sending email to ${data.to}`);
      return ctx.toBytes({ status: "sent" });
    }

    const worker = new ActionWorker({
      endpoint: "localhost:9000",
      namespace: "myapp",
      concurrency: 10,
      actionTimeoutMs: 300_000,
    });

    worker.action("process-order", processOrder);
    worker.action("send-email", sendEmail);

    process.on("SIGINT", () => worker.stop());

    await worker.start();
    await worker.close();

Zig

    const std = @import("std");
    const flo = @import("flo");

    fn processOrder(ctx: *flo.ActionContext) anyerror![]const u8 {
        const input = try ctx.json(struct {
            order_id: []const u8,
            amount: f64,
            items: []const struct { sku: []const u8, quantity: u32 },
        });

        std.log.info("Processing order {s} (${d:.2})", .{ input.order_id, input.amount });

        for (input.items, 0..) |item, i| {
            std.log.info("  Item {d}/{d}: {s}", .{ i + 1, input.items.len, item.sku });

            // Extend lease every 3 items
            if ((i + 1) % 3 == 0) {
                try ctx.touch(30000);
            }
        }

        return ctx.toBytes(.{
            .order_id = input.order_id,
            .status = "processed",
        });
    }

    fn sendEmail(ctx: *flo.ActionContext) anyerror![]const u8 {
        const input = try ctx.json(struct { to: []const u8, subject: []const u8 });
        std.log.info("Sending email to {s}", .{input.to});
        return ctx.toBytes(.{ .status = "sent" });
    }

    pub fn main() !void {
        var gpa = std.heap.GeneralPurposeAllocator(.{}){};
        defer _ = gpa.deinit();
        const allocator = gpa.allocator();

        var worker = try flo.ActionWorker.init(allocator, .{
            .endpoint = "localhost:9000",
            .namespace = "myapp",
            .concurrency = 10,
            .action_timeout_ms = 300_000,
        });
        defer worker.deinit();

        try worker.registerAction("process-order", processOrder);
        try worker.registerAction("send-email", sendEmail);

        try worker.start();
    }

Worker Configuration
Go

    flo.ActionWorkerOptions{
        WorkerID:      "my-worker",     // auto-generated if empty
        MachineID:     "host-01",       // defaults to hostname
        Concurrency:   10,              // max parallel tasks
        ActionTimeout: 5 * time.Minute, // per-task timeout
        BlockMS:       30000,           // long-poll timeout
    }

Python

    client.new_action_worker(
        worker_id="my-worker",  # auto-generated if empty
        concurrency=10,         # max parallel tasks
        action_timeout=300,     # seconds
        block_ms=30000,         # long-poll timeout
    )

TypeScript

    new ActionWorker({
      endpoint: "localhost:9000",
      namespace: "myapp",
      workerId: "my-worker",        // auto-generated if omitted
      machineId: "host-01",
      concurrency: 10,              // max parallel tasks
      actionTimeoutMs: 300_000,     // per-task timeout
      blockMs: 30_000,              // long-poll timeout
      heartbeatIntervalMs: 30_000,  // heartbeat interval
    });

Zig

    flo.WorkerConfig{
        .endpoint = "localhost:9000",
        .namespace = "myapp",
        .worker_id = "my-worker",         // null = auto-generated
        .concurrency = 10,                // max parallel tasks
        .action_timeout_ms = 300_000,     // per-task timeout
        .block_ms = 30_000,               // long-poll timeout
        .heartbeat_interval_ms = 30_000,  // heartbeat interval
        .machine_id = "host-01",
    }

SDK: Invoking Actions

You don’t need a worker to invoke actions — any client can invoke and check status:

Go

    // ptr is assumed to be a small local helper that returns a pointer to its argument.
    client := flo.NewClient("localhost:9000")
    client.Connect()
    defer client.Close()

    // Register
    client.Action.Register("process-image", flo.ActionTypeUser, &flo.ActionRegisterOptions{
        TimeoutMS:   ptr(60000),
        MaxRetries:  ptr(3),
        Description: "Resize and optimize images",
    })

    // Invoke (errors are ignored here for brevity)
    result, _ := client.Action.Invoke("process-image",
        []byte(`{"url":"https://example.com/img.jpg","width":800}`),
        &flo.ActionInvokeOptions{
            Priority:       ptr(uint8(100)),
            IdempotencyKey: "img-resize-abc",
        },
    )
    fmt.Println("Run ID:", result.RunID)

    // Poll for status
    status, _ := client.Action.Status(result.RunID, nil)
    fmt.Printf("Status: %s, Output: %s\n", status.Status, status.Output)

    // Delete
    client.Action.Delete("process-image", nil)

Python

    from flo import FloClient, ActionType, ActionRegisterOptions, ActionInvokeOptions

    async with FloClient("localhost:9000") as client:
        # Register
        await client.action.register("process-image", ActionType.USER,
            ActionRegisterOptions(timeout_ms=60000, max_retries=3))

        # Invoke
        result = await client.action.invoke("process-image",
            b'{"url":"https://example.com/img.jpg","width":800}',
            ActionInvokeOptions(priority=100, idempotency_key="img-resize-abc"))
        print(f"Run ID: {result.run_id}")

        # Poll for status
        status = await client.action.status(result.run_id)
        print(f"Status: {status.status}")

        # Delete
        await client.action.delete("process-image")

TypeScript

    import { FloClient, ActionType } from "@floruntime/node";

    const client = new FloClient("localhost:9000");
    await client.connect();

    // Register
    await client.action.register("process-image", ActionType.User, {
      timeoutMs: 60000,
      maxRetries: 3,
    });

    // Invoke (TextEncoder turns the JSON string into bytes)
    const result = await client.action.invoke(
      "process-image",
      new TextEncoder().encode('{"url":"https://example.com/img.jpg","width":800}'),
      { priority: 100, idempotencyKey: "img-resize-abc" }
    );
    console.log("Run ID:", result.runId);

    // Poll for status
    const status = await client.action.status(result.runId);
    console.log("Status:", status.status);

    // Delete
    await client.action.delete("process-image");

Zig

    var client = try flo.Client.init(allocator, "localhost:9000", .{});
    defer client.deinit();
    try client.connect();

    // Register
    try client.actions.registerAction("process-image", .user, .{
        .timeout_ms = 60000,
        .max_retries = 3,
    });

    // Invoke
    const result = try client.actions.invoke("process-image", "{\"url\":\"https://example.com/img.jpg\"}", .{
        .priority = 100,
        .idempotency_key = "img-resize-abc",
    });
    defer result.deinit();

    // Status
    const status = try client.actions.getStatus(result.run_id, .{});
    defer status.deinit();

    // Delete
    try client.actions.deleteAction("process-image", .{});

Workflows Integration
Actions are the building blocks that Workflows compose. A workflow step references an action with the @actions/ prefix:

    steps:
      charge:
        run: "@actions/charge-payment"
        retry:
          max_attempts: 3
          backoff: exponential
        transitions:
          success: ship
          failure: flo.Failed

When a workflow invokes a WASM action, the result is available synchronously. When it invokes a user-hosted action, the workflow parks in waiting until the worker reports completion.
Related Docs
- Workers — Low-level worker protocol details
- Workflows — Compose actions into multi-step orchestrations
- Stream Processing — Continuous data pipelines (different from actions)