Workers
Workers are long-running processes that connect to Flo and execute tasks. There are two kinds:
- Action workers — pull action tasks from a queue (long-polling), execute a handler, and report results. Used for durable task processing with retries, timeouts, and dead-letter routing.
- Stream workers — consume records from a stream via consumer groups, process each record, and auto-ack or nack. Used for continuous event processing.
Both worker types register themselves in the worker registry, send periodic heartbeats, report load metrics, and support graceful drain. The server tracks each worker’s health and can mark stale workers as unhealthy after 90 seconds of silence.
Worker Types
| Type | Purpose | Task Source | Completion |
|---|---|---|---|
| Action | Durable task execution | action_await long-poll | Explicit complete / fail |
| Stream | Continuous event processing | group_read on stream | Auto-ack on success, auto-nack on error |
Both types share the same registration, heartbeat, drain, and health-monitoring infrastructure.
Lifecycle
```
Register ──▸ Heartbeat loop ──▸ Process tasks ──▸ Drain ──▸ Deregister
   │              │                  │              │           │
   │          every 30s          await/poll       sticky     cleanup
   │         reports load        execute          no new     remove
   │              │              complete/fail    tasks      record
   ▼              ▼                  ▼              ▼           ▼
 active        active           active/idle     draining   (removed)
```

1. Register
The worker announces itself to the server with:
| Field | Description |
|---|---|
| worker_id | Unique identifier (auto-generated from hostname + random hex if not provided) |
| worker_type | action (0) or stream (1) |
| max_concurrency | Maximum parallel tasks (default: 10) |
| processes | List of action names or stream/group pairs this worker handles |
| metadata | Optional JSON string (labels, version, environment) |
| machine_id | Optional host identifier for grouping workers on the same machine |
Re-registering the same worker ID replaces the previous registration.
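The auto-generated worker ID described above (hostname plus random hex) can be sketched as follows. This is a hypothetical reconstruction for illustration, not the SDKs' exact scheme:

```python
import secrets
import socket

def default_worker_id() -> str:
    # Hypothetical reconstruction: hostname plus a short random hex
    # suffix, mirroring the auto-generated ID in the table above.
    return f"{socket.gethostname()}-{secrets.token_hex(4)}"

print(default_worker_id())  # e.g. "myhost-a3f91c02"
```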
2. Heartbeat
Workers send a heartbeat every 30 seconds (configurable). The heartbeat carries the worker’s current_load (number of in-flight tasks). The server responds with the worker’s current status — if the server returns draining, the SDK stops accepting new tasks.
If a worker misses heartbeats for 90 seconds, the server marks it as unhealthy.
3. Process
Action workers long-poll with action_await, receive a TaskAssignment (task ID, action name, payload, attempt number), execute the registered handler, then call complete (with result bytes) or fail (with error message + optional retry flag).
Stream workers call group_read on a consumer group, receive a batch of records, process each one through the handler, and auto-ack on success or auto-nack on error.
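The action-worker cycle of await, execute, and report can be modelled in miniature. This is an illustrative sketch, not SDK source; the tuple shapes and the retry-by-default behaviour are assumptions drawn from the description above:

```python
from dataclasses import dataclass
from typing import Callable, Dict

@dataclass
class TaskAssignment:
    task_id: str
    action_name: str
    payload: bytes
    attempt: int

def process_one(task: TaskAssignment, handlers: Dict[str, Callable[[bytes], bytes]]):
    """Run one assignment through its handler and decide what to report."""
    handler = handlers.get(task.action_name)
    if handler is None:
        return ("fail", "no handler registered", False)  # retry=False
    try:
        return ("complete", handler(task.payload))
    except Exception as exc:
        return ("fail", str(exc), True)  # errors retry by default

handlers = {"process-order": lambda payload: b'{"status":"done"}'}
task = TaskAssignment("t-1", "process-order", b"{}", attempt=1)
print(process_one(task, handlers))  # → ('complete', b'{"status":"done"}')
```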
4. Drain
Drain is a graceful shutdown mechanism. When a worker is drained:
- The server sets its status to draining (sticky — it never reverts to active)
- Subsequent heartbeats return draining, so the SDK knows to stop polling
- In-flight tasks continue to completion
- No new tasks are dispatched to the worker
- Once all active tasks finish, the worker deregisters
Drain can be initiated by the worker itself or externally via CLI/API.
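The sticky nature of draining can be captured in a small state model. This is an illustrative sketch, not server code; the status values mirror the Worker Status table:

```python
ACTIVE, IDLE, DRAINING, UNHEALTHY = range(4)

class WorkerRecord:
    """Minimal model of the sticky drain rule: once draining, a worker
    never reverts to active, even though heartbeats keep arriving."""
    def __init__(self):
        self.status = ACTIVE
        self.in_flight = 0

    def drain(self):
        self.status = DRAINING  # sticky

    def on_heartbeat(self, current_load: int) -> int:
        self.in_flight = current_load
        if self.status == DRAINING:
            return DRAINING  # tells the SDK to stop polling
        self.status = ACTIVE  # heartbeats recover unhealthy workers
        return self.status

w = WorkerRecord()
w.drain()
print(w.on_heartbeat(3))  # → 2 (still DRAINING, never back to ACTIVE)
```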
5. Deregister
Removes the worker from the registry entirely. SDKs call deregister automatically on shutdown (in defer / finally blocks).
Worker Status
| Status | Value | Description |
|---|---|---|
| active | 0 | Healthy, processing or ready for tasks |
| idle | 1 | Connected, no current tasks |
| draining | 2 | Finishing current work, accepting nothing new |
| unhealthy | 3 | No heartbeat for 90+ seconds |
Status transitions:
```
             ┌── heartbeat ──┐
             ▼               │
register ──▸ active ◀────────┘
             │
             ├── drain ──▸ draining ──▸ deregister
             │
             └── 90s no heartbeat ──▸ unhealthy
                                         │
                                         └── heartbeat ──▸ active
```

Action Workers
Action workers poll for tasks, execute handlers, and report outcomes. All four SDKs provide a high-level ActionWorker that handles registration, heartbeats, concurrency control, and drain automatically.
Quick Start
```go
package main

import (
	"context"
	"os/signal"
	"syscall"

	flo "github.com/razorbill/flo-go"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	client := flo.NewClient("localhost:9000", flo.WithNamespace("myapp"))
	client.Connect()
	defer client.Close()

	w, _ := client.NewActionWorker(flo.ActionWorkerOptions{
		Concurrency: 10,
	})
	defer w.Close()

	w.MustRegisterAction("process-order", func(actx *flo.ActionContext) ([]byte, error) {
		var order map[string]interface{}
		actx.Into(&order)
		// Process the order...
		return actx.Bytes(map[string]string{"status": "done"})
	})

	w.MustRegisterAction("send-email", func(actx *flo.ActionContext) ([]byte, error) {
		// Send the email...
		return nil, nil
	})

	// Blocks until context is cancelled or drain completes
	w.Start(ctx)
}
```

```python
import asyncio
import signal

from flo import FloClient, ActionContext


async def main():
    client = FloClient("localhost:9000", namespace="myapp")
    await client.connect()

    worker = client.new_action_worker(concurrency=5, action_timeout=300)

    # Register with function
    worker.register_action("process-order", process_order)

    # Register with decorator
    @worker.action("send-email")
    async def send_email(ctx: ActionContext) -> bytes:
        data = ctx.json()
        # Send the email...
        return ctx.to_bytes({"status": "sent"})

    # Graceful shutdown on signal
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, worker.stop)

    try:
        await worker.start()
    finally:
        await worker.close()
        await client.close()


async def process_order(ctx: ActionContext) -> bytes:
    order = ctx.json()
    # Process...
    return ctx.to_bytes({"status": "processed", "id": order["order_id"]})


asyncio.run(main())
```

```ts
import { ActionWorker, ActionContext } from "@floruntime/node";

const worker = new ActionWorker({
  endpoint: "localhost:9000",
  namespace: "myapp",
  concurrency: 5,
  actionTimeoutMs: 300_000,
});

worker.action("process-order", async (ctx: ActionContext) => {
  const order = ctx.json<{ orderId: string; amount: number }>();
  // Process the order...
  return ctx.toBytes({ status: "done", orderId: order.orderId });
});

worker.action("send-email", async (ctx: ActionContext) => {
  const data = ctx.json<{ to: string; subject: string }>();
  // Send the email...
  return ctx.toBytes({ status: "sent" });
});

process.on("SIGINT", () => worker.stop());
process.on("SIGTERM", () => worker.stop());

await worker.start();
await worker.close();
```

```zig
const std = @import("std");
const flo = @import("flo");

fn processOrder(ctx: *flo.ActionContext) ![]const u8 {
    const parsed = try ctx.json(struct { order_id: []const u8, amount: f64 });
    defer parsed.deinit();
    // Process the order...
    return ctx.toBytes(.{ .status = "done", .order_id = parsed.value.order_id });
}

fn sendEmail(ctx: *flo.ActionContext) ![]const u8 {
    // Send the email...
    return ctx.toBytes(.{ .status = "sent" });
}

pub fn main() !void {
    var gpa: std.heap.GeneralPurposeAllocator(.{}) = .init;
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var worker = try flo.ActionWorker.init(allocator, .{
        .endpoint = "localhost:9000",
        .namespace = "myapp",
        .concurrency = 10,
    });
    defer worker.deinit();

    try worker.registerAction("process-order", processOrder);
    try worker.registerAction("send-email", sendEmail);

    try worker.start();
}
```

ActionContext
Every action handler receives an ActionContext with helpers for parsing input and formatting output:
| Method | Description |
|---|---|
| json() / Into() | Deserialise the input payload from JSON |
| toBytes() / Bytes() | Serialise a result value to JSON bytes |
| touch(extendMs) | Extend the task lease (prevents timeout during long work) |
| taskId / task_id | Unique task identifier |
| actionName / action_name | The action that was invoked |
| attempt | Attempt number (starts at 1, increments on retry) |
| createdAt / created_at | Timestamp when the task was created |
Lease Extension
For long-running tasks, periodically call touch() to extend the lease and prevent the server from reassigning the task to another worker:

```go
w.MustRegisterAction("generate-report", func(actx *flo.ActionContext) ([]byte, error) {
	// For long tasks, extend the lease periodically
	for i := 0; i < totalSteps; i++ {
		processStep(i)
		if (i+1)%3 == 0 {
			actx.Touch(30000) // extend by 30s
		}
	}
	return actx.Bytes(result)
})
```

```python
async def generate_report(ctx: ActionContext) -> bytes:
    for step in range(total_steps):
        await process_step(step)
        if (step + 1) % 3 == 0:
            await ctx.touch(30000)  # extend by 30s
    return ctx.to_bytes(result)
```

```ts
worker.action("generate-report", async (ctx) => {
  for (let i = 0; i < totalSteps; i++) {
    await processStep(i);
    if ((i + 1) % 3 === 0) {
      await ctx.touch(30000); // extend by 30s
    }
  }
  return ctx.toBytes(result);
});
```

```zig
fn generateReport(ctx: *flo.ActionContext) ![]const u8 {
    for (0..total_steps) |i| {
        processStep(i);
        if ((i + 1) % 3 == 0) {
            try ctx.touch(30000); // extend by 30s
        }
    }
    return ctx.toBytes(result);
}
```

Error Handling and Retries
When a handler returns an error, the SDK automatically reports a failure with retry: true. The server re-queues the task for another attempt (up to the action’s max retry count). Specific error types change the behaviour:
| Condition | Retry | Notes |
|---|---|---|
| Handler returns error | Yes | Auto-retry with exponential backoff |
| Handler panics / throws | Yes | SDK catches panics, reports failure |
| NonRetryableError / NewNonRetryableErrorf | No | Immediately failed — no retries, no DLQ |
| Context deadline exceeded | No | Task is permanently failed |
| Context cancelled (shutdown) | No | Task is permanently failed |
| Explicit fail(..., retry: false) | No | Moves to dead-letter queue |
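The table notes that retryable failures back off exponentially. A sketch of such a schedule follows; the base delay and cap are chosen for illustration, not Flo's documented values:

```python
def backoff_ms(attempt: int, base_ms: int = 1000, cap_ms: int = 60_000) -> int:
    # Exponential backoff: base * 2^(attempt - 1), capped.
    # base_ms and cap_ms are illustrative, not Flo's actual defaults.
    return min(base_ms * 2 ** (attempt - 1), cap_ms)

print([backoff_ms(a) for a in range(1, 8)])
# → [1000, 2000, 4000, 8000, 16000, 32000, 60000]
```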
NonRetryableError
Use NonRetryableError when the task has definitively failed and retrying would be pointless (invalid input, business rule violations, permanent service rejections). The task is immediately marked failed — no retry delay, no backoff, no dead-letter queue.
```ts
import { NonRetryableError } from "@floruntime/node";

worker.action("validate-order", async (ctx) => {
  const { orderId, amount } = ctx.json<{ orderId?: string; amount?: number }>();

  if (!orderId) {
    throw new NonRetryableError("missing orderId");
  }
  if ((amount ?? 0) > 2000) {
    throw new NonRetryableError(`order amount $${amount} exceeds $2000 limit`);
  }
  return ctx.toBytes({ valid: true, orderId });
});
```

```go
w.MustRegisterAction("validate-order", func(actx *flo.ActionContext) (interface{}, error) {
	var data struct {
		OrderID string  `json:"orderId"`
		Amount  float64 `json:"amount"`
	}
	actx.Into(&data)

	if data.OrderID == "" {
		return nil, flo.NewNonRetryableErrorf("missing orderId")
	}
	if data.Amount > 2000 {
		return nil, flo.NewNonRetryableErrorf("order amount $%.0f exceeds $2000 limit", data.Amount)
	}
	return map[string]interface{}{"valid": true, "orderId": data.OrderID}, nil
})
```

```python
from flo import NonRetryableError

@worker.action("validate-order")
async def validate_order(ctx: ActionContext) -> bytes:
    data = ctx.json()
    if not data.get("orderId"):
        raise NonRetryableError("missing orderId")
    if data.get("amount", 0) > 2000:
        raise NonRetryableError(f"order amount ${data['amount']} exceeds $2000 limit")
    return ctx.to_bytes({"valid": True, "orderId": data["orderId"]})
```

When a workflow step handler throws NonRetryableError, the workflow run is routed through the failure transition without waiting for retries.
Named Outcomes
By default, a handler that returns normally produces the success outcome. To route a workflow down a specific named transition branch, use ctx.result(outcomeName, data). This enables outcome-based routing — the same step can branch to different next steps depending on the business decision.
```ts
import { ActionResult } from "@floruntime/node";

worker.action("review-order", async (ctx): Promise<ActionResult> => {
  const { orderId, amount } = ctx.json<{ orderId: string; amount: number }>();

  if (amount < 100) {
    // Workflow follows "approved" transition
    return ctx.result("approved", { orderId, decision: "auto-approved" });
  } else if (amount >= 500) {
    // Workflow follows "rejected" transition
    return ctx.result("rejected", { orderId, reason: "amount too high" });
  } else {
    // Workflow follows "needs_review" transition
    return ctx.result("needs_review", { orderId, note: "manual review required" });
  }
});
```

```go
w.MustRegisterAction("review-order", func(actx *flo.ActionContext) (interface{}, error) {
	var data struct {
		OrderID string  `json:"orderId"`
		Amount  float64 `json:"amount"`
	}
	actx.Into(&data)

	if data.Amount < 100 {
		return actx.Result("approved", map[string]interface{}{
			"orderId":  data.OrderID,
			"decision": "auto-approved",
		})
	} else if data.Amount >= 500 {
		return actx.Result("rejected", map[string]interface{}{
			"orderId": data.OrderID,
			"reason":  "amount too high",
		})
	}
	return actx.Result("needs_review", map[string]interface{}{
		"orderId": data.OrderID,
		"note":    "manual review required",
	})
})
```

```python
@worker.action("review-order")
async def review_order(ctx: ActionContext) -> bytes:
    data = ctx.json()
    amount = data.get("amount", 0)

    if amount < 100:
        return ctx.result("approved", {"orderId": data["orderId"], "decision": "auto-approved"})
    elif amount >= 500:
        return ctx.result("rejected", {"orderId": data["orderId"], "reason": "amount too high"})
    else:
        return ctx.result("needs_review", {"orderId": data["orderId"], "note": "manual review required"})
```

The workflow YAML declares the named outcome transitions:

```yaml
start:
  run: "@actions/review-order"
  transitions:
    approved: fulfill            # ctx.result("approved") → go to fulfill step
    rejected: notify_rejection   # ctx.result("rejected") → go to notify_rejection
    needs_review: manual_review
    failure: flo.Failed
```

Concurrency
The concurrency setting controls how many tasks a worker processes in parallel:
| SDK | Mechanism | Default |
|---|---|---|
| Go | Goroutines behind a semaphore channel | 10 |
| Python | asyncio tasks with semaphore | 10 |
| JavaScript | Promise concurrency with semaphore | 10 |
| Zig | Sequential (single-threaded polling loop) | 10 |
The worker reports its current_load (number of in-flight tasks) in each heartbeat so the server can make load-aware routing decisions.
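The semaphore mechanism in the table can be sketched with asyncio. The helper below is illustrative; names like run_with_concurrency are invented for the example and are not SDK API:

```python
import asyncio

async def run_with_concurrency(coros, limit: int):
    """Run coroutines with at most `limit` in flight, tracking peak load."""
    sem = asyncio.Semaphore(limit)
    current_load = 0  # what a heartbeat would report
    peak = 0

    async def bounded(coro):
        nonlocal current_load, peak
        async with sem:  # block here once `limit` handlers are in flight
            current_load += 1
            peak = max(peak, current_load)
            try:
                return await coro
            finally:
                current_load -= 1

    results = await asyncio.gather(*(bounded(c) for c in coros))
    return results, peak

async def job(i):
    await asyncio.sleep(0.01)
    return i

results, peak = asyncio.run(run_with_concurrency([job(i) for i in range(20)], limit=5))
assert results == list(range(20)) and peak <= 5
```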
Stream Workers
Stream workers consume records from a stream via consumer groups. They automatically join the group on start, poll for batches, execute the handler for each record, and ack/nack based on the outcome.
Quick Start
```go
client := flo.NewClient("localhost:9000", flo.WithNamespace("myapp"))
client.Connect()
defer client.Close()

sw, _ := client.NewStreamWorker(flo.StreamWorkerOptions{
	Stream:      "events",
	Group:       "processors",
	Concurrency: 5,
	BatchSize:   10,
}, func(sctx *flo.StreamContext) error {
	var event map[string]interface{}
	sctx.Into(&event)
	fmt.Printf("Processing event: %v\n", event)
	return nil // auto-ack
})
defer sw.Close()

sw.Start(ctx) // blocks until context cancelled or drain
```

```python
client = FloClient("localhost:9000", namespace="myapp")
await client.connect()

worker = client.new_stream_worker(
    stream="events",
    group="processors",
    handler=process_event,
    concurrency=5,
    batch_size=10,
)

async def process_event(ctx: StreamContext) -> None:
    event = ctx.json()
    print(f"Processing: {event}")
    # Return normally to auto-ack
    # Raise an exception to auto-nack (redelivery)

await worker.start()
```

```ts
const client = new FloClient("localhost:9000", { namespace: "myapp" });
await client.connect();

const worker = new StreamWorker(
  client,
  { stream: "events", group: "processors", concurrency: 5, batchSize: 10 },
  async (ctx: StreamContext) => {
    const event = ctx.json<UserEvent>();
    console.log(`Processing: ${event.type}`);
    // Return normally to auto-ack
    // Throw to auto-nack
  },
);

await worker.start();
```

```zig
var sw = try flo.StreamWorker.init(allocator, .{
    .endpoint = "localhost:9000",
    .namespace = "myapp",
    .stream = "events",
    .group = "my-group",
    .concurrency = 5,
    .batch_size = 10,
}, processRecord);
defer sw.deinit();

try sw.start();

fn processRecord(ctx: *flo.StreamContext) anyerror!void {
    const payload = ctx.payload();
    std.debug.print("Got: {s}\n", .{payload});
    // Return normally to auto-ack
    // Return error to auto-nack
}
```

StreamContext
| Method | Description |
|---|---|
| json() / Into() | Deserialise the record payload from JSON |
| payload() / Payload() | Raw record payload bytes |
| streamId / StreamID() | The record’s stream ID (timestamp + sequence) |
| headers() / Headers() | Record headers (key-value map) |
| stream / Stream() | Stream name |
| group / Group() | Consumer group name |
| consumer / Consumer() | Consumer name within the group |
| namespace / Namespace() | Namespace |
Ack / Nack Behaviour
| Handler Result | Record Action | Effect |
|---|---|---|
| Returns normally | Auto-ack | Record is marked as consumed |
| Returns error / throws | Auto-nack | Record is redelivered to the group |
| Panics (Go) | Auto-nack | Panic is recovered, record is redelivered |
Stream Worker Configuration
| Option | Default | Description |
|---|---|---|
| stream | (required) | Stream to consume from |
| group | "default" | Consumer group name |
| consumer | (auto: worker_id) | Consumer name within the group |
| worker_id | (auto: hostname-hex) | Unique worker identifier |
| concurrency | 10 | Maximum parallel record handlers |
| batch_size | 10 | Records to read per poll |
| block_ms | 30,000 | Blocking read timeout (ms) |
| message_timeout | 5 minutes | Maximum time for a handler to process one record |
| machine_id | (auto: hostname) | Host identifier for dashboard grouping |
| metadata | — | Optional JSON metadata |
Worker Registry
The worker registry is a per-shard in-memory table that tracks all connected workers. It is separate from the action dispatch system — its role is observability and health monitoring, not task routing.
What the Registry Tracks
Each WorkerRecord stores:
| Field | Type | Description |
|---|---|---|
| worker_id | string | Unique identifier |
| worker_type | action / stream | What kind of tasks this worker processes |
| status | enum | active, idle, draining, unhealthy |
| namespace | string | Namespace the worker belongs to |
| max_concurrency | u32 | Maximum parallel tasks |
| current_load | u32 | Current in-flight task count (from heartbeats) |
| tasks_completed | u64 | Lifetime completed count |
| tasks_failed | u64 | Lifetime failed count |
| registered_at | timestamp | When the worker first registered |
| last_heartbeat | timestamp | Last heartbeat received |
| machine_id | string? | Host identifier for grouping |
| metadata | JSON? | Arbitrary key-value data |
| processes | array | Per-action or per-stream tracking (see below) |
Process Tracking
Each worker registers a list of processes — the specific actions or stream/group pairs it handles. The server tracks per-process metrics:
| Field | Description |
|---|---|
| name | Action name (e.g. "process-order") or stream/group (e.g. "events/processors") |
| kind | action (0) or stream_consumer (1) |
| run_count | Successful executions |
| fail_count | Failed executions |
| last_run_at | Timestamp of last execution |
Health Monitoring
The server runs a periodic health check (on each shard tick). Workers that haven’t sent a heartbeat in 90 seconds are marked unhealthy. Unhealthy workers are not removed — they can recover by sending a heartbeat, which resets their status to active.
Limits
- Maximum 10,000 workers per shard
- Heartbeat timeout: 90 seconds
- Default heartbeat interval: 30 seconds (SDK-side)
Configuration Reference
Action Worker Options
| Option | Go | Python | JS | Zig | Default |
|---|---|---|---|---|---|
| Worker ID | WorkerID | worker_id | workerId | worker_id | hostname-random |
| Machine ID | MachineID | machine_id | machineId | machine_id | hostname |
| Concurrency | Concurrency | concurrency | concurrency | concurrency | 10 |
| Action timeout | ActionTimeout | action_timeout | actionTimeoutMs | action_timeout_ms | 5 min |
| Block timeout | BlockMS | block_ms | blockMs | block_ms | 30,000 ms |
Stream Worker Options
| Option | Go | Python | JS | Zig | Default |
|---|---|---|---|---|---|
| Stream | Stream | stream | stream | stream | (required) |
| Group | Group | group | group | group | "default" |
| Consumer | Consumer | consumer | consumer | consumer | worker_id |
| Worker ID | WorkerID | worker_id | workerId | worker_id | hostname-random |
| Concurrency | Concurrency | concurrency | concurrency | concurrency | 10 |
| Batch size | BatchSize | batch_size | batchSize | batch_size | 10 |
| Block timeout | BlockMS | block_ms | blockMs | block_ms | 30,000 ms |
| Message timeout | MessageTimeout | — | — | — | 5 min |
Related
- Actions — Register and invoke durable task types
- Streams — Append-only ordered log with consumer groups
- Workflows — Multi-step orchestration using actions as steps
- Stream Processing — Declarative stream processing jobs