
Workers

Workers are long-running processes that connect to Flo and execute tasks. There are two kinds:

  • Action workers — pull action tasks from a queue (long-polling), execute a handler, and report results. Used for durable task processing with retries, timeouts, and dead-letter routing.
  • Stream workers — consume records from a stream via consumer groups, process each record, and auto-ack or nack. Used for continuous event processing.

Both worker types register themselves in the worker registry, send periodic heartbeats, report load metrics, and support graceful drain. The server tracks each worker’s health and can mark stale workers as unhealthy after 90 seconds of silence.

| Type | Purpose | Task Source | Completion |
| --- | --- | --- | --- |
| Action | Durable task execution | action_await long-poll | Explicit complete / fail |
| Stream | Continuous event processing | group_read on stream | Auto-ack on success, auto-nack on error |

Both types share the same registration, heartbeat, drain, and health-monitoring infrastructure.

```
Register ──▸ Heartbeat loop ──▸ Process tasks ──▸ Drain ──▸ Deregister
   │              │                   │              │          │
   │          every 30s           await/poll      sticky     cleanup
   │         reports load          execute        no new     remove
   │              │             complete/fail     tasks      record
   ▼              ▼                   ▼              ▼          ▼
 active         active           active/idle    draining   (removed)
```

The worker announces itself to the server with:

| Field | Description |
| --- | --- |
| worker_id | Unique identifier (auto-generated from hostname + random hex if not provided) |
| worker_type | action (0) or stream (1) |
| max_concurrency | Maximum parallel tasks (default: 10) |
| processes | List of action names or stream/group pairs this worker handles |
| metadata | Optional JSON string (labels, version, environment) |
| machine_id | Optional host identifier for grouping workers on the same machine |

Re-registering the same worker ID replaces the previous registration.
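The registration fields map naturally onto a struct. A minimal sketch, assuming illustrative names — Registration and defaultWorkerID are not the SDK's actual types:

```go
package main

import (
	"crypto/rand"
	"encoding/hex"
	"fmt"
	"os"
)

// Registration mirrors the fields in the table above.
// Field names are illustrative, not the SDK's wire format.
type Registration struct {
	WorkerID       string   // unique; auto-generated if empty
	WorkerType     int      // 0 = action, 1 = stream
	MaxConcurrency uint32   // default 10
	Processes      []string // action names or "stream/group" pairs
	Metadata       string   // optional JSON string
	MachineID      string   // optional host identifier
}

// defaultWorkerID builds "hostname-<random hex>" when no ID is given,
// matching the auto-generation rule described above.
func defaultWorkerID() string {
	host, _ := os.Hostname()
	b := make([]byte, 4)
	rand.Read(b)
	return host + "-" + hex.EncodeToString(b)
}

func main() {
	reg := Registration{
		WorkerID:       defaultWorkerID(),
		WorkerType:     0, // action worker
		MaxConcurrency: 10,
		Processes:      []string{"process-order", "send-email"},
	}
	fmt.Println(reg.WorkerID, reg.Processes)
}
```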

Workers send a heartbeat every 30 seconds (configurable). The heartbeat carries the worker’s current_load (number of in-flight tasks). The server responds with the worker’s current status — if the server returns draining, the SDK stops accepting new tasks.

If a worker misses heartbeats for 90 seconds, the server marks it as unhealthy.

Action workers long-poll with action_await, receive a TaskAssignment (task ID, action name, payload, attempt number), execute the registered handler, then call complete (with result bytes) or fail (with error message + optional retry flag).

Stream workers call group_read on a consumer group, receive a batch of records, process each one through the handler, and auto-ack on success or auto-nack on error.

Drain is a graceful shutdown mechanism. When a worker is drained:

  • The server sets its status to draining (sticky — it never reverts to active)
  • Subsequent heartbeats return draining so the SDK knows to stop polling
  • In-flight tasks continue to completion
  • No new tasks are dispatched to the worker
  • Once all active tasks finish, the worker deregisters

Drain can be initiated by the worker itself or externally via CLI/API.

Deregistering removes the worker from the registry entirely. SDKs call deregister automatically on shutdown (in defer / finally blocks).

| Status | Value | Description |
| --- | --- | --- |
| active | 0 | Healthy, processing or ready for tasks |
| idle | 1 | Connected, no current tasks |
| draining | 2 | Finishing current work, accepting nothing new |
| unhealthy | 3 | No heartbeat for 90+ seconds |

Status transitions:

```
             ┌── heartbeat ──┐
             ▼               │
register ──▸ active ◀────────┘
               ├── drain ──▸ draining ──▸ deregister
               └── 90s no heartbeat ──▸ unhealthy
                                           └── heartbeat ──▸ active
```

Action workers poll for tasks, execute handlers, and report outcomes. All four SDKs provide a high-level ActionWorker that handles registration, heartbeats, concurrency control, and drain automatically.

```go
package main

import (
	"context"
	"os/signal"
	"syscall"

	flo "github.com/razorbill/flo-go"
)

func main() {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	client := flo.NewClient("localhost:9000", flo.WithNamespace("myapp"))
	client.Connect()
	defer client.Close()

	w, _ := client.NewActionWorker(flo.ActionWorkerOptions{
		Concurrency: 10,
	})
	defer w.Close()

	w.MustRegisterAction("process-order", func(actx *flo.ActionContext) ([]byte, error) {
		var order map[string]interface{}
		actx.Into(&order)
		// Process the order...
		return actx.Bytes(map[string]string{"status": "done"})
	})

	w.MustRegisterAction("send-email", func(actx *flo.ActionContext) ([]byte, error) {
		// Send the email...
		return nil, nil
	})

	// Blocks until context is cancelled or drain completes
	w.Start(ctx)
}
```

Every action handler receives an ActionContext with helpers for parsing input and formatting output:

| Method | Description |
| --- | --- |
| json() / Into() | Deserialise the input payload from JSON |
| toBytes() / Bytes() | Serialise a result value to JSON bytes |
| touch(extendMs) | Extend the task lease (prevents timeout during long work) |
| taskId / task_id | Unique task identifier |
| actionName / action_name | The action that was invoked |
| attempt | Attempt number (starts at 1, increments on retry) |
| createdAt / created_at | Timestamp when the task was created |

For long-running tasks, periodically call touch() to extend the lease and prevent the server from reassigning the task to another worker:

```go
w.MustRegisterAction("generate-report", func(actx *flo.ActionContext) ([]byte, error) {
	// For long tasks, extend the lease periodically
	for i := 0; i < totalSteps; i++ {
		processStep(i)
		if (i+1)%3 == 0 {
			actx.Touch(30000) // extend by 30s
		}
	}
	return actx.Bytes(result)
})
```

When a handler returns an error, the SDK automatically reports a failure with retry: true. The server re-queues the task for another attempt (up to the action’s max retry count). Specific error types change the behaviour:

| Condition | Retry | Notes |
| --- | --- | --- |
| Handler returns error | Yes | Auto-retry with exponential backoff |
| Handler panics / throws | Yes | SDK catches panics, reports failure |
| NonRetryableError / NewNonRetryableErrorf | No | Immediately failed — no retries, no DLQ |
| Context deadline exceeded | No | Task is permanently failed |
| Context cancelled (shutdown) | No | Task is permanently failed |
| Explicit fail(..., retry: false) | No | Moves to dead-letter queue |

Use NonRetryableError when the task has definitively failed and retrying would be pointless (invalid input, business rule violations, permanent service rejections). The task is immediately marked failed — no retry delay, no backoff, no dead-letter queue.

```ts
import { NonRetryableError } from "@floruntime/node";

worker.action("validate-order", async (ctx) => {
  const { orderId, amount } = ctx.json<{ orderId?: string; amount?: number }>();
  if (!orderId) {
    throw new NonRetryableError("missing orderId");
  }
  if ((amount ?? 0) > 2000) {
    throw new NonRetryableError(`order amount $${amount} exceeds $2000 limit`);
  }
  return ctx.toBytes({ valid: true, orderId });
});
```

When a workflow step handler throws NonRetryableError, the workflow run is routed through the failure transition without waiting for retries.

By default, a handler that returns normally produces the success outcome. To route a workflow down a specific named transition branch, use ctx.result(outcomeName, data). This enables outcome-based routing — the same step can branch to different next steps depending on the business decision.

```ts
import { ActionResult } from "@floruntime/node";

worker.action("review-order", async (ctx): Promise<ActionResult> => {
  const { orderId, amount } = ctx.json<{ orderId: string; amount: number }>();
  if (amount < 100) {
    // Workflow follows "approved" transition
    return ctx.result("approved", { orderId, decision: "auto-approved" });
  } else if (amount >= 500) {
    // Workflow follows "rejected" transition
    return ctx.result("rejected", { orderId, reason: "amount too high" });
  } else {
    // Workflow follows "needs_review" transition
    return ctx.result("needs_review", { orderId, note: "manual review required" });
  }
});
```

The workflow YAML declares the named outcome transitions:

```yaml
start:
  run: "@actions/review-order"
  transitions:
    approved: fulfill            # ctx.result("approved") → go to fulfill step
    rejected: notify_rejection   # ctx.result("rejected") → go to notify_rejection
    needs_review: manual_review
    failure: flo.Failed
```

The concurrency setting controls how many tasks a worker processes in parallel:

| SDK | Mechanism | Default |
| --- | --- | --- |
| Go | Goroutines behind a semaphore channel | 10 |
| Python | asyncio tasks with semaphore | 10 |
| JavaScript | Promise concurrency with semaphore | 10 |
| Zig | Sequential (single-threaded polling loop) | 10 |

The worker reports its current_load (number of in-flight tasks) in each heartbeat so the server can make load-aware routing decisions.
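The "goroutines behind a semaphore channel" pattern from the Go row looks roughly like this in isolation. A self-contained sketch, not the SDK's actual implementation:

```go
package main

import (
	"fmt"
	"sync"
)

// runWithSemaphore executes tasks with at most maxConcurrency running
// at once, using a buffered channel as the semaphore. It tracks the
// peak in-flight count — the figure a worker would report as
// current_load in heartbeats.
func runWithSemaphore(tasks []func(), maxConcurrency int) int {
	sem := make(chan struct{}, maxConcurrency)
	var wg sync.WaitGroup
	var mu sync.Mutex
	inFlight, peak := 0, 0

	for _, task := range tasks {
		wg.Add(1)
		sem <- struct{}{} // blocks once maxConcurrency tasks are running
		go func(task func()) {
			defer wg.Done()
			mu.Lock()
			inFlight++
			if inFlight > peak {
				peak = inFlight
			}
			mu.Unlock()

			task()

			mu.Lock()
			inFlight--
			mu.Unlock()
			<-sem // release the slot
		}(task)
	}
	wg.Wait()
	return peak
}

func main() {
	tasks := make([]func(), 20)
	for i := range tasks {
		tasks[i] = func() {} // no-op task
	}
	peak := runWithSemaphore(tasks, 4)
	fmt.Println("peak in-flight within limit:", peak <= 4) // true
}
```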

Stream workers consume records from a stream via consumer groups. They automatically join the group on start, poll for batches, execute the handler for each record, and ack/nack based on the outcome.

```go
client := flo.NewClient("localhost:9000", flo.WithNamespace("myapp"))
client.Connect()
defer client.Close()

sw, _ := client.NewStreamWorker(flo.StreamWorkerOptions{
	Stream:      "events",
	Group:       "processors",
	Concurrency: 5,
	BatchSize:   10,
}, func(sctx *flo.StreamContext) error {
	var event map[string]interface{}
	sctx.Into(&event)
	fmt.Printf("Processing event: %v\n", event)
	return nil // auto-ack
})
defer sw.Close()

sw.Start(ctx) // blocks until context cancelled or drain
```
| Method | Description |
| --- | --- |
| json() / Into() | Deserialise the record payload from JSON |
| payload() / Payload() | Raw record payload bytes |
| streamId / StreamID() | The record's stream ID (timestamp + sequence) |
| headers() / Headers() | Record headers (key-value map) |
| stream / Stream() | Stream name |
| group / Group() | Consumer group name |
| consumer / Consumer() | Consumer name within the group |
| namespace / Namespace() | Namespace |

| Handler Result | Record Action | Effect |
| --- | --- | --- |
| Returns normally | Auto-ack | Record is marked as consumed |
| Returns error / throws | Auto-nack | Record is redelivered to the group |
| Panics (Go) | Auto-nack | Panic is recovered, record is redelivered |

| Option | Default | Description |
| --- | --- | --- |
| stream | (required) | Stream to consume from |
| group | "default" | Consumer group name |
| consumer | (auto: worker_id) | Consumer name within the group |
| worker_id | (auto: hostname-hex) | Unique worker identifier |
| concurrency | 10 | Maximum parallel record handlers |
| batch_size | 10 | Records to read per poll |
| block_ms | 30,000 | Blocking read timeout (ms) |
| message_timeout | 5 minutes | Maximum time for a handler to process one record |
| machine_id | (auto: hostname) | Host identifier for dashboard grouping |
| metadata | — | Optional JSON metadata |

The worker registry is a per-shard in-memory table that tracks all connected workers. It is separate from the action dispatch system — its role is observability and health monitoring, not task routing.

Each WorkerRecord stores:

| Field | Type | Description |
| --- | --- | --- |
| worker_id | string | Unique identifier |
| worker_type | action / stream | What kind of tasks this worker processes |
| status | enum | active, idle, draining, unhealthy |
| namespace | string | Namespace the worker belongs to |
| max_concurrency | u32 | Maximum parallel tasks |
| current_load | u32 | Current in-flight task count (from heartbeats) |
| tasks_completed | u64 | Lifetime completed count |
| tasks_failed | u64 | Lifetime failed count |
| registered_at | timestamp | When the worker first registered |
| last_heartbeat | timestamp | Last heartbeat received |
| machine_id | string? | Host identifier for grouping |
| metadata | JSON? | Arbitrary key-value data |
| processes | array | Per-action or per-stream tracking (see below) |

Each worker registers a list of processes — the specific actions or stream/group pairs it handles. The server tracks per-process metrics:

| Field | Description |
| --- | --- |
| name | Action name (e.g. "process-order") or stream/group (e.g. "events/processors") |
| kind | action (0) or stream_consumer (1) |
| run_count | Successful executions |
| fail_count | Failed executions |
| last_run_at | Timestamp of last execution |

The server runs a periodic health check (on each shard tick). Workers that haven’t sent a heartbeat in 90 seconds are marked unhealthy. Unhealthy workers are not removed — they can recover by sending a heartbeat, which resets their status to active.

  • Maximum 10,000 workers per shard
  • Heartbeat timeout: 90 seconds
  • Default heartbeat interval: 30 seconds (SDK-side)
| Option | Go | Python | JS | Zig | Default |
| --- | --- | --- | --- | --- | --- |
| Worker ID | WorkerID | worker_id | workerId | worker_id | hostname-random |
| Machine ID | MachineID | machine_id | machineId | machine_id | hostname |
| Concurrency | Concurrency | concurrency | concurrency | concurrency | 10 |
| Action timeout | ActionTimeout | action_timeout | actionTimeoutMs | action_timeout_ms | 5 min |
| Block timeout | BlockMS | block_ms | blockMs | block_ms | 30,000 ms |
| Option | Go | Python | JS | Zig | Default |
| --- | --- | --- | --- | --- | --- |
| Stream | Stream | stream | stream | stream | (required) |
| Group | Group | group | group | group | "default" |
| Consumer | Consumer | consumer | consumer | consumer | worker_id |
| Worker ID | WorkerID | worker_id | workerId | worker_id | hostname-random |
| Concurrency | Concurrency | concurrency | concurrency | concurrency | 10 |
| Batch size | BatchSize | batch_size | batchSize | batch_size | 10 |
| Block timeout | BlockMS | block_ms | blockMs | block_ms | 30,000 ms |
| Message timeout | MessageTimeout | — | — | — | 5 min |
  • Actions — Register and invoke durable task types
  • Streams — Append-only ordered log with consumer groups
  • Workflows — Multi-step orchestration using actions as steps
  • Stream Processing — Declarative stream processing jobs