Skip to content

Actions

Actions are named, durable task types. You register an action once, then invoke it on demand — Flo handles dispatch, retries, lease management, and dead-letter routing. The handler can run in two places:

  • WASM — Compile your logic to WebAssembly and deploy it to Flo. The action executes inline on the shard, with sub-millisecond overhead and zero network hops.
  • User-hosted workers — Run a long-lived process (in Go, Python, JS, Zig, or any language that speaks the wire protocol) that pulls tasks from Flo and reports results.

Both models share the same invoke / status / list / delete API. The difference is where the code runs.

┌──────────────┐ ┌───────────────────────┐
│ Client │ │ Flo Server (Shard) │
│ │ action invoke │ │
│ flo action │ ─────────────────▸│ ActionHandler │
│ invoke X │ │ ├─ WASM? Execute │◄── inline, ~µs
│ │ ◀─ run_id ───────┤ │ inline │
│ │ │ └─ User? Queue │
└──────────────┘ │ pending │
└──────────┬────────────┘
┌──────────────┐ action_await │
│ Worker │ ◄─────────────────────────────┘
│ (Go/Py/JS) │ task_id + payload
│ │
│ handler() │
│ │ action_complete(result)
│ │ ────────────────────────────▸ Flo
└──────────────┘

Terminal window
flo action register send-email --timeout 60000 --max-retries 3
Terminal window
flo action invoke send-email '{"to":"alice@example.com","subject":"Welcome!"}'
# → Result: send-email-1
Terminal window
flo action status send-email-1
# → RUN ID STATUS CREATED
# send-email-1 pending 2026-03-14 10:00:00

For user-hosted actions, nothing happens until a worker picks up the task. For WASM actions, the result is available immediately.


A named task type stored in Flo’s action registry. Each action has:

FieldDefaultDescription
nameUnique name within a namespace (max 256 chars)
typeuseruser (worker-hosted) or wasm (Flo-hosted)
timeout_ms30000Max execution time before timeout
max_retries3Retries before dead-lettering
retry_delay_ms1000Base delay for exponential backoff
descriptionHuman-readable description
version1Auto-incremented on re-registration
enabledtrueCan be disabled to block new invocations

WASM actions additionally have:

FieldDefaultDescription
wasm_moduleWASM binary bytes
wasm_entrypointhandleExport function name
wasm_memory_limit16 MBMax linear memory (in pages of 64 KB)

A single invocation of an action. Every invoke creates a run with a unique ID.

StatusDescription
pendingQueued, waiting for a worker (or WASM execution)
runningClaimed by a worker, currently executing
completedFinished successfully with output
failedFailed permanently (retries exhausted or explicit fail)
cancelledCancelled by user
timed_outExecution exceeded timeout_ms

A long-running process that pulls pending tasks from Flo and executes them. Workers:

  • Register with Flo, declaring which action types they handle
  • Await tasks using long polling (blocking dequeue)
  • Execute the handler, optionally extending the lease with touch
  • Report completion or failure back to Flo

Workers can run anywhere — on the same machine, in a container, across the network. Multiple workers can handle the same action type for horizontal scaling.


WASM actions execute inside the Flo shard with no network overhead. They’re ideal for:

  • Pure data transformations (validation, enrichment, rules engines)
  • Low-latency operations (sub-millisecond execution)
  • Self-contained logic with no external dependencies

The WASM module must export three functions:

handle(input_ptr: u32, input_len: u32) → i64
alloc(size: u32) → u32
dealloc(ptr: u32, size: u32) → void

handle receives the input bytes and returns a packed i64: the upper 32 bits are the output pointer, the lower 32 bits are the output length. Negative return values indicate errors:

Return CodeMeaning
-1Invalid input
-2Allocation failed
-3Execution error

Optional exports:

ExportPurpose
init() → i32Called once after module instantiation
describe() → i64Returns a JSON description (packed ptr|len)

WASM guest code can call these host functions:

FunctionDescription
flo.log(level, msg_ptr, msg_len)Log a message (0=debug, 1=info, 2=warn, 3=error)
flo.kv_get(key_ptr, key_len, buf_ptr, buf_len) → i32Read from KV store
flo.kv_set(key_ptr, key_len, val_ptr, val_len) → i32Write to KV store
flo.kv_delete(key_ptr, key_len) → i32Delete from KV store

WASI shims (wasi_snapshot_preview1.*) are available for modules compiled with a WASI target.

Each shard allows up to 4 concurrent WASM executions (configurable via max_concurrent_executions). Additional invocations queue until a slot opens.

Here’s a complete rules engine in Zig targeting wasm32-freestanding:

// rules_engine.zig — Build with:
// zig build-lib -target wasm32-freestanding -O ReleaseSmall rules_engine.zig
var heap: [65536]u8 = undefined;
var heap_offset: usize = 0;
export fn alloc(size: u32) u32 {
const s: usize = @intCast(size);
const aligned = (heap_offset + 7) & ~@as(usize, 7);
if (aligned + s > heap.len) return 0;
const ptr: [*]u8 = @ptrCast(&heap[aligned]);
heap_offset = aligned + s;
return @intFromPtr(ptr);
}
export fn dealloc(_: u32, _: u32) void {}
export fn handle(input_ptr: [*]const u8, input_len: u32) i64 {
heap_offset = 0;
const input = input_ptr[0..input_len];
// Parse and evaluate rules against input...
const output = processRules(input);
const out_ptr = alloc(@intCast(output.len));
if (out_ptr == 0) return -2;
const dest: [*]u8 = @ptrFromInt(out_ptr);
@memcpy(dest[0..output.len], output);
return (@as(i64, out_ptr) << 32) | @as(i64, @intCast(output.len));
}

Register it:

Terminal window
flo action register rules-engine --wasm ./rules_engine.wasm

Now invocations execute inline — no worker needed:

Terminal window
flo action invoke rules-engine '{"age": 25, "country": "US"}'
# → Result: rules-engine-1 (completed immediately)
flo action status rules-engine-1
# → status: completed, output: {"eligible":true,"rules_evaluated":2,"rules_passed":2}

User-hosted actions are executed by external worker processes. This is the model for:

  • Actions that call external APIs (payment gateways, email services)
  • Long-running tasks (report generation, media processing)
  • Logic that needs access to your infrastructure (databases, file systems)

The flow:

  1. Invoke creates a run in pending status
  2. A worker calls await (long poll) and receives the task
  3. The worker runs the handler and calls complete or fail
  4. Flo updates the run status and stores the result

┌──────────┐ register ┌──────────┐ await ┌──────────┐
│ Init │ ──────────▸ │ Idle │ ──────────▸ │ Execute │
│ │ │ (polling)│ │ │
└──────────┘ └──────┬───┘ └────┬─────┘
│ │
heartbeat (30s) complete / fail
│ │
▼ ▼
┌──────────┐ ┌──────────┐
│ Draining │ │ Idle │
└──────────┘ └──────────┘

The worker announces itself and the action types it handles:

Terminal window
flo worker register worker-1 process-order send-email

Registration includes:

  • Worker ID — unique identifier (auto-generated if omitted)
  • Task types — list of action names this worker can execute
  • Max concurrency — how many tasks in parallel
  • Machine ID — for grouping workers on the same host
  • Metadata — arbitrary JSON for discovery

Workers block-wait for tasks:

Terminal window
flo worker await process-order --worker-id worker-1 --block 30000

When a matching pending task exists, Flo returns a task assignment:

FieldDescription
task_idThe run ID to complete/fail against
task_typeAction name
payloadInput bytes from the invoke call
created_atWhen the invoke happened
attemptAttempt number (starts at 1, increments on retry)

Report successful completion with the result:

Terminal window
flo worker complete <task-id> --worker-id worker-1 --action process-order --result '{"status":"done"}'

Report failure, optionally requesting a retry:

Terminal window
# Retry — task goes back to pending
flo worker fail <task-id> --worker-id worker-1 --action process-order --error "Temporary failure" --retry
# Permanent failure — task is marked failed
flo worker fail <task-id> --worker-id worker-1 --action process-order --error "Invalid input"

For long-running tasks, extend the execution lease to prevent timeout:

Terminal window
flo worker touch <task-id> --worker-id worker-1 --action process-order --extend 30000

Workers send periodic heartbeats (typically every 30 seconds) to report their current load and stay registered. If the server responds with a draining status, the worker should stop accepting new tasks and finish current ones.

Signal that the worker should finish current tasks but accept no new ones:

Terminal window
flo worker drain --worker-id worker-1

Actions can require specific worker capabilities using labels. When invoking an action with labels, only workers whose labels are a superset of the required labels will receive the task.

Terminal window
# Invoke with required labels
flo action invoke render-video '{"url":"..."}' --labels '{"gpu":true,"vram_gb":24}'

A worker with labels {"gpu":true, "vram_gb":24, "region":"us-east"} matches — it has all required keys with equal values. A worker with {"gpu":true, "vram_gb":16} does not matchvram_gb differs.

Label matching rules:

  • All keys in required must exist in worker labels
  • Values must be exactly equal (string, number, or boolean)
  • Extra keys on the worker side are ignored
  • Nested objects and arrays are not compared (flat values only)

OptionDefaultDescription
priority10Higher = dequeued first (0–255)
delay_ms0Delay before the task becomes available
idempotency_keyDeduplication key (same key → same run ID)
labelsRequired worker labels (JSON object)
namespacedefaultNamespace isolation

Use idempotency_key to prevent duplicate invocations:

Terminal window
flo action invoke charge-payment '{"order":"ORD-123"}' --idempotency-key order-123-charge
# Same key returns the existing run (no new execution)
flo action invoke charge-payment '{"order":"ORD-123"}' --idempotency-key order-123-charge
# → same run ID

When a worker reports failure with --retry:

  1. The run status resets to pending
  2. The attempt counter increments
  3. The task goes back to the queue for the next available worker
  4. Backoff delay is applied: retry_delay_ms × 2^(attempt-1)

When retries are exhausted (max_retries reached) or the worker fails without --retry:

  • The run status is set to failed
  • The error message and timestamps are recorded
  • The run can still be queried via action status

Actions and runs are scoped to namespaces. The same action name can exist independently in different namespaces:

Terminal window
flo action register send-email --namespace prod
flo action register send-email --namespace staging
# These create separate runs in separate registries
flo action invoke send-email '{}' --namespace prod
flo action invoke send-email '{}' --namespace staging

Action registrations and run state are persisted to the Unified Append Log (UAL). On node restart:

  • Registrations are replayed — all actions reappear in the registry
  • Runs are replayed — pending and running tasks are restored
  • Workers must re-register and resume polling

The SDKs provide a high-level ActionWorker that handles registration, polling, concurrency, heartbeats, and error recovery. You just write handler functions.

Every action handler receives an ActionContext and returns result bytes:

type ActionHandler func(actx *ActionContext) (result []byte, err error)

The context object passed to every handler:

PropertyTypeDescription
taskId / task_idstringUnique task/run identifier
actionName / action_namestringWhich action this is
input / payloadbytesRaw input from the invoke call
attemptintAttempt number (1-based)
createdAt / created_attimestampWhen the invoke happened
namespacestringNamespace scope

Methods:

MethodDescription
json() / Into()Parse input as JSON (typed or untyped)
toBytes() / Bytes()Serialize a value to JSON bytes for the response
touch(extendMs)Extend the execution lease (for long-running tasks)
package main
import (
"context"
"fmt"
"log"
"os"
"os/signal"
"syscall"
flo "github.com/floruntime/flo-go"
)
type OrderRequest struct {
OrderID string `json:"order_id"`
CustomerID string `json:"customer_id"`
Amount float64 `json:"amount"`
Items []Item `json:"items"`
}
type Item struct {
SKU string `json:"sku"`
Quantity int `json:"quantity"`
}
func processOrder(actx *flo.ActionContext) ([]byte, error) {
var req OrderRequest
if err := actx.Into(&req); err != nil {
return nil, fmt.Errorf("invalid input: %w", err)
}
log.Printf("Processing order %s ($%.2f)", req.OrderID, req.Amount)
// For long-running tasks, extend the lease periodically
for i, item := range req.Items {
log.Printf(" Item %d/%d: %s", i+1, len(req.Items), item.SKU)
// Extend lease every 3 items
if (i+1) % 3 == 0 {
actx.Touch(30000)
}
}
return actx.Bytes(map[string]string{
"order_id": req.OrderID,
"status": "processed",
})
}
func sendEmail(actx *flo.ActionContext) ([]byte, error) {
var input map[string]string
actx.Into(&input)
log.Printf("Sending email to %s", input["to"])
return actx.Bytes(map[string]string{"status": "sent"})
}
func main() {
client := flo.NewClient("localhost:9000",
flo.WithNamespace("myapp"),
)
if err := client.Connect(); err != nil {
log.Fatal(err)
}
defer client.Close()
w, err := client.NewActionWorker(flo.ActionWorkerOptions{
Concurrency: 10,
ActionTimeout: 5 * time.Minute,
})
if err != nil {
log.Fatal(err)
}
defer w.Close()
w.MustRegisterAction("process-order", processOrder)
w.MustRegisterAction("send-email", sendEmail)
// Graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
go func() {
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM)
<-sigCh
w.Stop()
cancel()
}()
log.Println("Worker starting...")
w.Start(ctx)
}
flo.ActionWorkerOptions{
WorkerID: "my-worker", // auto-generated if empty
MachineID: "host-01", // defaults to hostname
Concurrency: 10, // max parallel tasks
ActionTimeout: 5 * time.Minute, // per-task timeout
BlockMS: 30000, // long-poll timeout
}

You don’t need a worker to invoke actions — any client can invoke and check status:

client := flo.NewClient("localhost:9000")
client.Connect()
defer client.Close()
// Register
client.Action.Register("process-image", flo.ActionTypeUser, &flo.ActionRegisterOptions{
TimeoutMS: ptr(60000),
MaxRetries: ptr(3),
Description: "Resize and optimize images",
})
// Invoke
result, _ := client.Action.Invoke("process-image",
[]byte(`{"url":"https://example.com/img.jpg","width":800}`),
&flo.ActionInvokeOptions{
Priority: ptr(uint8(100)),
IdempotencyKey: "img-resize-abc",
},
)
fmt.Println("Run ID:", result.RunID)
// Poll for status
status, _ := client.Action.Status(result.RunID, nil)
fmt.Printf("Status: %s, Output: %s\n", status.Status, status.Output)
// Delete
client.Action.Delete("process-image", nil)

Actions are the building blocks that Workflows compose. A workflow step references an action with the @actions/ prefix:

steps:
charge:
run: "@actions/charge-payment"
retry:
max_attempts: 3
backoff: exponential
transitions:
success: ship
failure: flo.Failed

When a workflow invokes a WASM action, the result is available synchronously. When it invokes a user-hosted action, the workflow parks in waiting until the worker reports completion.


  • Workers — Low-level worker protocol details
  • Workflows — Compose actions into multi-step orchestrations
  • Stream Processing — Continuous data pipelines (different from actions)