
Stream Processing

Flo ships a built-in stream processing engine inspired by Apache Flink. Jobs are defined in YAML, submitted via the CLI or SDKs, and executed entirely inside the Flo node — no external cluster required. The engine reads from Flo streams or time-series measurements, transforms records through a chain of operators, and writes results to stream, time-series, KV, or queue sinks.

Traditional architectures pair a storage layer (Kafka, Redis) with a separate processing framework (Flink, Spark Streaming). This means two clusters to deploy, two failure domains to monitor, and serialization overhead at every boundary.

Flo eliminates that gap:

  • Zero-copy reads — source records are read directly from the Unified Append Log, no network hop
  • Thread-per-shard execution — each pipeline runs on the shard that owns the source partition, no cross-thread coordination
  • Integrated state — operators can read and write Flo KV directly (e.g., kv_lookup), no external state store
  • Single deployment — one binary, one config, one monitoring surface

A job is a long-running processing pipeline defined by a YAML file. Each job specifies:

Field        Description
kind         Always Processing
name         Human-readable job name
namespace    Namespace scope (default: "default")
parallelism  Number of parallel task slots (default: 1)
batch_size   Records to process per tick (default: 100)
sources      Where to read data from
sinks        Where to write results to
operators    Transformation chain applied to every record

The fundamental data unit flowing through a pipeline is a ProcessingRecord:

  • key — optional partitioning key (empty = broadcast)
  • value — payload bytes (typically JSON)
  • event_time_ms — event timestamp used for windowing and watermarks
  • source — origin stream name, partition, and offset for checkpoint tracking
  • headers — optional key-value metadata
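For orientation, the record shape can be modeled as a Python dataclass. This is a sketch only: the class names and field types below (including the separate `SourcePosition` type) are assumptions for illustration, not Flo's actual SDK types.

```python
from dataclasses import dataclass, field
from typing import Dict, Optional


@dataclass(frozen=True)
class SourcePosition:
    # Hypothetical type: origin stream, partition, and offset for checkpoint tracking.
    stream: str
    partition: int
    offset: int


@dataclass
class ProcessingRecord:
    # Hypothetical model of the fields listed above; not Flo's real SDK class.
    value: bytes                             # payload bytes, typically JSON
    event_time_ms: int                       # drives windowing and watermarks
    source: Optional[SourcePosition] = None
    key: bytes = b""                         # empty key = broadcast
    headers: Dict[str, str] = field(default_factory=dict)

    def is_broadcast(self) -> bool:
        return len(self.key) == 0


rec = ProcessingRecord(
    value=b'{"customer_id": "c1", "amount": 42}',
    event_time_ms=1700000000000,
    source=SourcePosition("raw-orders", partition=0, offset=17),
)
print(rec.is_broadcast())  # True: no key has been assigned yet
```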

Pipelines are linear chains: source → operator₁ → operator₂ → … → sink. All operators in a chain execute on the same shard thread with zero-copy record passing (fused execution, mirroring Flink’s operator chaining).

┌──────────┐    ┌──────────┐    ┌──────────┐    ┌───────────┐    ┌──────────┐
│  Source  │───▶│  Filter  │───▶│  KeyBy   │───▶│ Aggregate │───▶│   Sink   │
│ (stream) │    │          │    │          │    │           │    │ (stream) │
└──────────┘    └──────────┘    └──────────┘    └───────────┘    └──────────┘

Server-side job definitions use nested YAML or JSON. The examples below use YAML.

The simplest pipeline: read a stream, write to another stream, no transformation:

kind: Processing
name: user-events-pipe
namespace: analytics
parallelism: 1
batch_size: 100
sources:
  - name: input
    stream:
      name: payment-events
      namespace: analytics
sinks:
  - name: output
    stream:
      name: enriched-events
      namespace: analytics

The job continuously polls payment-events and appends every record to enriched-events.

A fuller job reads raw orders, keeps only completed ones, re-keys them by customer, and sums order amounts per customer in one-hour (3600-second) tumbling windows. The source also overrides the job-level batch_size:

kind: Processing
name: order-analytics
namespace: production
parallelism: 4
batch_size: 200
sources:
  - name: orders
    stream:
      name: raw-orders
      namespace: production
      batch_size: 500
sinks:
  - name: output
    stream:
      name: order-totals
      namespace: production
operators:
  - type: filter
    name: completed-only
    condition: "value_contains:completed"
  - type: keyby
    name: by-customer
    key_expression: "$.customer_id"
  - type: aggregate
    name: hourly-total
    function: sum
    field: "$.amount"
    window: tumbling
    window_size: 3600

Sources define where the pipeline reads data from. Two source kinds are supported: stream and time-series (ts).

A stream source reads records from a Flo stream, tracking offsets for exactly-once semantics:

sources:
  - name: events
    stream:
      name: click-events
      namespace: production
      partitions: 0       # single partition (integer)
      batch_size: 500     # per-source override

Syntax               Behavior
partitions: 0        Read partition 0 only
partitions: "0-63"   Expand into one source per partition (64 sources)
partitions: "0,3,7"  Explicit partition list
partitions: "all"    Resolved at runtime from stream metadata
(omitted)            Defaults to partition 0
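The partition syntax above amounts to a small expansion rule. Below is a Python sketch of that rule; the function name and the explicit `partition_count` argument (standing in for the runtime stream metadata that "all" resolves against) are hypothetical, and Flo's own parsing of whitespace or invalid input may differ.

```python
from typing import List, Optional, Union

PartitionSpec = Union[int, str, None]


def expand_partitions(spec: PartitionSpec, partition_count: Optional[int] = None) -> List[int]:
    """Turn a `partitions:` value into the list of partitions to read."""
    if spec is None:
        return [0]                                   # omitted -> partition 0
    if isinstance(spec, int):
        return [spec]                                # single partition
    spec = spec.strip()
    if spec == "all":
        if partition_count is None:
            raise ValueError('"all" needs the stream partition count')
        return list(range(partition_count))
    if "-" in spec:
        lo, hi = (int(p) for p in spec.split("-", 1))
        if hi < lo:
            raise ValueError(f"empty range: {spec!r}")
        return list(range(lo, hi + 1))               # inclusive range
    return [int(p) for p in spec.split(",")]        # explicit list


print(len(expand_partitions("0-63")))  # 64, one source per partition
print(expand_partitions("0,3,7"))      # [0, 3, 7]
```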

A time-series (ts) source polls a TS measurement at a configurable interval, emitting data points as JSON records:

sources:
  - name: cpu-metrics
    ts:
      measurement: cpu_usage
      namespace: production
      field: value              # field to read (default: "value")
      tags:                     # optional tag filter
        host: web-01
      poll_interval_ms: 1000    # polling interval (default: 1000)

Each emitted record looks like:

{"measurement": "cpu_usage", "value": 72.5, "timestamp_ms": 1700000000000}

Multiple sources are polled round-robin into the same pipeline:

sources:
  - name: clicks
    stream:
      name: click-events
  - name: checkouts
    stream:
      name: checkout-events
sinks:
  - name: merged
    stream:
      name: unified-events
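Round-robin polling gives each source a turn per cycle, so a busy source cannot starve the others. A minimal sketch, assuming each source is an in-memory list and each turn takes up to `batch_size` records (both assumptions; Flo polls live streams, which never "drain" the way these lists do):

```python
from typing import Dict, Iterator, List, Tuple


def round_robin(sources: Dict[str, List[str]], batch_size: int) -> Iterator[Tuple[str, str]]:
    """Yield (source_name, record) pairs, taking up to batch_size per source per turn."""
    cursors = {name: 0 for name in sources}
    while True:
        progressed = False
        for name, records in sources.items():      # dicts keep declaration order
            start = cursors[name]
            batch = records[start:start + batch_size]
            cursors[name] = start + len(batch)
            for rec in batch:
                progressed = True
                yield name, rec
        if not progressed:                          # every source is drained
            return


merged = list(round_robin({"clicks": ["c1", "c2", "c3"], "checkouts": ["k1"]}, batch_size=2))
print(merged)
# [('clicks', 'c1'), ('clicks', 'c2'), ('checkouts', 'k1'), ('clicks', 'c3')]
```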

Sinks define where the pipeline writes results. Four sink kinds are supported: stream, time-series (ts), KV, and queue.

A stream sink appends records to a Flo stream:

sinks:
  - name: output
    stream:
      name: enriched-events
      namespace: production

A time-series (ts) sink extracts tags and numeric fields from JSON record payloads and writes them as time-series data points:

sinks:
  - name: cpu-ts
    ts:
      measurement: proc_cpu_metrics
      namespace: metrics
      tags:
        host: hostname         # TS tag "host" ← JSON field "hostname"
      fields:
        cpu: cpu_percent       # TS field "cpu" ← JSON field "cpu_percent"

Given the input {"hostname":"web-01","cpu_percent":72.5}, the sink writes a point to proc_cpu_metrics with tag host=web-01 and field cpu=72.5.
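The mapping direction (TS name on the left, JSON field on the right) is easy to get backwards, so here is a small Python sketch of it. The function is hypothetical, and two behaviors are assumptions rather than documented semantics: JSON fields missing from the payload are skipped, and `value_field` is written as a TS field named `value`.

```python
import json
from typing import Dict, Optional, Tuple


def to_ts_point(payload: bytes,
                tags: Dict[str, str],
                fields: Optional[Dict[str, str]] = None,
                value_field: Optional[str] = None) -> Tuple[Dict[str, str], Dict[str, float]]:
    """Map a JSON payload to (tags, fields); mappings are `ts_name: json_name`."""
    doc = json.loads(payload)
    point_tags = {ts_name: str(doc[json_name])
                  for ts_name, json_name in tags.items() if json_name in doc}
    if value_field is not None:
        # Assumption: the single scalar is stored under a TS field named "value".
        point_fields = {"value": float(doc[value_field])}
    else:
        point_fields = {ts_name: float(doc[json_name])
                        for ts_name, json_name in (fields or {}).items() if json_name in doc}
    return point_tags, point_fields


print(to_ts_point(b'{"hostname":"web-01","cpu_percent":72.5}',
                  tags={"host": "hostname"}, fields={"cpu": "cpu_percent"}))
# ({'host': 'web-01'}, {'cpu': 72.5})
```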

If the JSON payload contains a single scalar field to use as the value, use value_field instead of a full fields: map:

sinks:
  - name: temp
    ts:
      measurement: proc_temp
      value_field: temperature
      tags:
        sensor: sensor

A KV sink writes records to Flo KV storage:

sinks:
  - name: profiles
    kv:
      namespace: profiles
      key_prefix: user        # prepended to record key
      separator: ":"          # between prefix and key (default: ":")
      write_mode: upsert      # upsert | if_absent | versioned
      ttl_ms: 86400000        # optional TTL
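A sketch of how the sink key is assembled and how two of the write modes might behave, with a plain dict standing in for Flo KV. The helper names are hypothetical, the if_absent behavior is inferred from the mode's name, and versioned writes and TTLs are not modeled because this page does not describe their semantics.

```python
from typing import Dict, Optional


def kv_sink_key(record_key: str, key_prefix: Optional[str] = None, separator: str = ":") -> str:
    """Build the KV key: prefix + separator + record key (assumed optional prefix)."""
    return f"{key_prefix}{separator}{record_key}" if key_prefix else record_key


def kv_sink_write(store: Dict[str, bytes], key: str, value: bytes, write_mode: str = "upsert") -> bool:
    """Apply one write to an in-memory dict; returns True if the store changed."""
    if write_mode == "upsert":
        store[key] = value
        return True
    if write_mode == "if_absent":
        if key in store:
            return False
        store[key] = value
        return True
    raise NotImplementedError(write_mode)   # versioned: semantics not described here


store: Dict[str, bytes] = {}
key = kv_sink_key("alice", key_prefix="user")
print(key)                                            # user:alice
print(kv_sink_write(store, key, b"v1", "if_absent"))  # True
print(kv_sink_write(store, key, b"v2", "if_absent"))  # False, v1 kept
```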

A queue sink enqueues records into a Flo queue:

sinks:
  - name: tasks
    queue:
      name: task-queue
      namespace: default
      priority: 5
      delay_ms: 1000          # visibility delay

Sinks can subscribe to tagged records instead of receiving the full pipeline output. Tags are set by the classify operator and matched with AND logic — a record reaches a sink only when it carries all of the sink’s declared match tags.

A pipeline can use up to 32 distinct tag names.

operators:
  - type: classify
    name: route-events
    rules:
      - condition: "json:level=error"
        tag: errors
      - condition: "json:$.latency_ms>5000"
        tag: slow
sinks:
  - name: main
    stream:
      name: results
  - name: late-events
    stream:
      name: late-data
    match: [late]
  - name: failures
    queue:
      name: dead-letter
    match: [errors]
  - name: slow-errors
    stream:
      name: slow-error-stream
    match: [errors, slow]     # AND: must have BOTH tags

Sinks without match receive all records from the main operator chain. A single match: value is also accepted as shorthand: match: late is equivalent to match: [late].
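The tag bitfield and AND matching can be illustrated with plain integers. In this sketch, tag names get bit positions in order of first use, capped at 32; that assignment scheme and the `TagRegistry` name are assumptions for illustration, not Flo internals.

```python
from typing import Dict, Iterable

MAX_TAGS = 32  # per-pipeline limit stated above


class TagRegistry:
    """Hypothetical tag-name -> bit-position mapping, assigned in order of first use."""

    def __init__(self) -> None:
        self.positions: Dict[str, int] = {}

    def bit(self, tag: str) -> int:
        if tag not in self.positions:
            if len(self.positions) >= MAX_TAGS:
                raise ValueError("more than 32 distinct tags in one pipeline")
            self.positions[tag] = len(self.positions)
        return 1 << self.positions[tag]

    def mask(self, tags: Iterable[str]) -> int:
        m = 0
        for tag in tags:
            m |= self.bit(tag)
        return m


def sink_accepts(record_tags: int, match_mask: int) -> bool:
    """AND semantics: every bit of the sink's match mask must be set on the record.

    A sink without `match` has an empty mask (0) and therefore accepts every record.
    """
    return (record_tags & match_mask) == match_mask


reg = TagRegistry()
record = reg.mask(["errors"])                               # set by classify
print(sink_accepts(record, reg.mask(["errors"])))           # True
print(sink_accepts(record, reg.mask(["errors", "slow"])))   # False: lacks "slow"
print(sink_accepts(record, 0))                              # True: no match list
```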

Operators are the transformation steps between source and sink. They are declared in the operators: array and execute in order. Flo includes eight built-in operator types.

The filter operator drops records that don’t match a condition. Supported expressions:

Record-level:

Expression                  Matches when…
value_contains:<substring>  Record value contains the substring
key_contains:<substring>    Record key contains the substring
key_equals:<exact>          Record key equals the exact string
key_prefix:<prefix>         Record key starts with the prefix
value_prefix:<prefix>       Record value starts with the prefix
not_empty                   Record value is non-empty
key_not_empty               Record key is non-empty
min_length:<n>              Record value length ≥ n

JSON field — json:<path><op><value>:

Expression              Matches when…
json:<path>=<value>     Field equals value
json:<path>!=<value>    Field does not equal value
json:<path>^=<value>    Field starts with value (prefix)
json:<path>*=<value>    Field contains value (substring)
json:<path>!^=<value>   Field does not start with value
json:<path>!*=<value>   Field does not contain value
json:<path>><value>     Field > value (numeric)
json:<path>>=<value>    Field >= value (numeric)
json:<path><<value>     Field < value (numeric)
json:<path><=<value>    Field <= value (numeric)

operators:
  - type: filter
    name: keep-important
    condition: "value_contains:important"
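To make the expression grammar concrete, here is a compact evaluator covering both tables. It is a sketch under stated assumptions, not Flo's parser: paths are simple dotted JSONPath (`$.a.b` or a bare `a`), a missing field never matches (including for the negated operators), string operators compare against the field's JSON text, and `min_length` counts bytes.

```python
import json

_MISSING = object()
# At each position, longer operators are tried first so "!^=" is not read as "!=".
_JSON_OPS = ["!^=", "!*=", "^=", "*=", "!=", ">=", "<=", "=", ">", "<"]


def _get_path(doc, path):
    """Resolve a dotted path like "$.a.b" or "a"; arrays are not supported."""
    cur = doc
    for part in (path[2:] if path.startswith("$.") else path).split("."):
        if not isinstance(cur, dict) or part not in cur:
            return _MISSING
        cur = cur[part]
    return cur


def _split_op(expr):
    for i in range(len(expr)):
        for op in _JSON_OPS:
            if expr.startswith(op, i):
                return expr[:i], op, expr[i + len(op):]
    raise ValueError(f"no operator in json condition: {expr!r}")


def _json_matches(expr, value):
    path, op, want = _split_op(expr)
    try:
        got = _get_path(json.loads(value), path)
    except ValueError:                      # invalid JSON (or bad encoding)
        return False
    if got is _MISSING:
        return False
    if op in (">", ">=", "<", "<="):
        try:
            a, b = float(got), float(want)
        except (TypeError, ValueError):
            return False
        return {">": a > b, ">=": a >= b, "<": a < b, "<=": a <= b}[op]
    text = got if isinstance(got, str) else json.dumps(got)
    return {"=": text == want, "!=": text != want,
            "^=": text.startswith(want), "!^=": not text.startswith(want),
            "*=": want in text, "!*=": want not in text}[op]


def matches(condition, key, value):
    """Evaluate a filter condition against a record's key and value (bytes)."""
    if condition.startswith("json:"):
        return _json_matches(condition[5:], value)
    name, _, arg = condition.partition(":")
    a = arg.encode()
    checks = {
        "value_contains": lambda: a in value,
        "key_contains": lambda: a in key,
        "key_equals": lambda: key == a,
        "key_prefix": lambda: key.startswith(a),
        "value_prefix": lambda: value.startswith(a),
        "not_empty": lambda: len(value) > 0,
        "key_not_empty": lambda: len(key) > 0,
        "min_length": lambda: len(value) >= int(arg),
    }
    if name not in checks:
        raise ValueError(f"unknown condition: {condition!r}")
    return checks[name]()


print(matches("value_contains:completed", b"", b'{"status":"completed"}'))  # True
print(matches("json:$.latency_ms>5000", b"", b'{"latency_ms": 6200}'))      # True
print(matches("json:type!^=payment", b"", b'{"type":"payment.card"}'))     # False
```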

The keyby operator re-keys records by extracting a field from the JSON value with a JSONPath expression. Downstream operators such as aggregate group by this key.

operators:
  - type: keyby
    name: by-user
    key_expression: "$.user_id"

Supports nested paths: "$.metadata.region".

If the value isn’t valid JSON or the field doesn’t exist, the record passes through with its original key.
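The fallback rule can be sketched in a few lines of Python. The function name is hypothetical, paths are limited to dotted object access, and encoding non-string values as their JSON text (so `5` becomes the key `5`) is an assumption.

```python
import json


def keyby(key: bytes, value: bytes, key_expression: str) -> bytes:
    """Return the new record key, or the original key if extraction fails."""
    try:
        cur = json.loads(value)
    except ValueError:
        return key                                   # not valid JSON: keep key
    path = key_expression[2:] if key_expression.startswith("$.") else key_expression
    for part in path.split("."):
        if not isinstance(cur, dict) or part not in cur:
            return key                               # field missing: keep key
        cur = cur[part]
    # Assumption: non-string values are keyed by their JSON text.
    text = cur if isinstance(cur, str) else json.dumps(cur)
    return text.encode()


print(keyby(b"old", b'{"metadata": {"region": "eu"}}', "$.metadata.region"))  # b'eu'
print(keyby(b"old", b"not json", "$.user_id"))                               # b'old'
```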

The map operator is a 1:1 transformation that restructures JSON records by projecting, renaming, and adding fields:

operators:
  - type: map
    name: extract-user
    user_id: "$.data.user_id"          # JSONPath extraction
    amount: "$.transaction.amount"     # nested field
    source: "payment-service"          # constant string

Rules:

  • Values starting with $. are JSONPath extractions from the input
  • All other values are treated as string constants
  • Output is always a JSON object with the declared fields
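The three rules translate directly into a projection function. A Python sketch follows; the function name is hypothetical, and emitting `null` for a JSONPath that does not exist in the input is an assumption (this page does not say what map does in that case). Invalid JSON input simply raises here.

```python
import json
from typing import Any, Dict

_MISSING = object()


def _get_path(doc: Any, path: str) -> Any:
    """Resolve a dotted path such as "$.a.b"; arrays are not supported."""
    cur = doc
    for part in path[2:].split("."):
        if not isinstance(cur, dict) or part not in cur:
            return _MISSING
        cur = cur[part]
    return cur


def map_record(value: bytes, spec: Dict[str, str]) -> bytes:
    """Project, rename, and add fields; the output holds only the declared fields."""
    doc = json.loads(value)
    out: Dict[str, Any] = {}
    for out_field, expr in spec.items():
        if expr.startswith("$."):
            got = _get_path(doc, expr)
            # Assumption: a missing path yields null rather than dropping the field.
            out[out_field] = None if got is _MISSING else got
        else:
            out[out_field] = expr                    # constant string
    return json.dumps(out).encode()


spec = {"user_id": "$.data.user_id", "amount": "$.transaction.amount", "source": "payment-service"}
print(map_record(b'{"data": {"user_id": "u1"}, "transaction": {"amount": 12.5}}', spec))
# b'{"user_id": "u1", "amount": 12.5, "source": "payment-service"}'
```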

The flatmap operator is a 1:N transformation that explodes a JSON array into individual records:

operators:
  - type: flatmap
    name: explode-items
    array_field: "$.order.items"
    element_key: "$.sku"        # optional: extract key from each element

Each element of the array at $.order.items becomes a separate downstream record. If the field is missing or not an array, the record is dropped.
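A Python sketch of the explode step. The function name is hypothetical; so are two details not covered above: elements keep the input record's key when `element_key` is absent or does not resolve to a string, and invalid JSON is dropped like a missing array.

```python
import json
from typing import Any, List, Optional, Tuple

_MISSING = object()


def _get_path(doc: Any, path: str) -> Any:
    """Resolve a dotted path such as "$.a.b"; arrays are not supported."""
    cur = doc
    for part in path[2:].split("."):
        if not isinstance(cur, dict) or part not in cur:
            return _MISSING
        cur = cur[part]
    return cur


def flatmap(key: bytes, value: bytes, array_field: str,
            element_key: Optional[str] = None) -> List[Tuple[bytes, bytes]]:
    """Explode the array at array_field into (key, value) records."""
    try:
        arr = _get_path(json.loads(value), array_field)
    except ValueError:
        return []                                    # assumption: bad JSON is dropped
    if not isinstance(arr, list):
        return []                                    # missing or not an array: drop
    out = []
    for elem in arr:
        k = key
        if element_key is not None:
            got = _get_path(elem, element_key)
            if isinstance(got, str):
                k = got.encode()                     # assumption: only string keys
        out.append((k, json.dumps(elem).encode()))
    return out


order = b'{"order": {"items": [{"sku": "A1", "qty": 2}, {"sku": "B7", "qty": 1}]}}'
for k, v in flatmap(b"o-1", order, "$.order.items", element_key="$.sku"):
    print(k, v)
# b'A1' b'{"sku": "A1", "qty": 2}'
# b'B7' b'{"sku": "B7", "qty": 1}'
```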

The aggregate operator accumulates numeric values extracted from JSON records. Records are grouped by key, so chain a keyby operator before aggregate for custom grouping.

Functions: sum, count, avg, min, max

Window modes:

Mode         Config                                     Behavior
Running      (no window config)                         Emits updated aggregate on every record
Tumbling     window: tumbling, window_size: <seconds>   Emits when the window closes (on watermark)
Count-based  window: count, window_size: <n>            Emits after every N records per key

Output format: {"value": <result>, "count": <n>}

operators:
  # Running total (emits on every record)
  - type: aggregate
    name: running-total
    function: sum
    field: "$.amount"
  # Count per key in 60-second tumbling windows
  - type: aggregate
    name: event-counter
    function: count
    window: tumbling
    window_size: 60
  # Average every 100 records per key
  - type: aggregate
    name: batch-avg
    function: avg
    field: "$.latency_ms"
    window: count
    window_size: 100
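The running and count-based modes can be sketched as per-key accumulators that produce the documented {"value", "count"} output. The class is hypothetical. It assumes a count window resets after it emits and that running mode never resets. Time-based tumbling windows are not modeled, and for the count function the numeric input is ignored.

```python
from typing import Dict, List, Optional


class KeyedAggregate:
    """Per-key sum/count/avg/min/max, in running mode or count-window mode."""

    def __init__(self, function: str, window_size: Optional[int] = None) -> None:
        self.function = function
        self.window_size = window_size              # None = running mode
        self.state: Dict[str, List[float]] = {}     # key -> [count, sum, min, max]

    def add(self, key: str, x: float) -> Optional[dict]:
        st = self.state.setdefault(key, [0, 0.0, float("inf"), float("-inf")])
        st[0] += 1
        st[1] += x
        st[2] = min(st[2], x)
        st[3] = max(st[3], x)
        if self.window_size is not None and st[0] < self.window_size:
            return None                              # count window not full yet
        count, total, lo, hi = st
        value = {"sum": total, "count": count, "avg": total / count,
                 "min": lo, "max": hi}[self.function]
        if self.window_size is not None:
            del self.state[key]                      # start the next count window
        return {"value": value, "count": count}


running = KeyedAggregate("sum")
print(running.add("c1", 10.0))   # {'value': 10.0, 'count': 1}
print(running.add("c1", 5.0))    # {'value': 15.0, 'count': 2}

batch = KeyedAggregate("avg", window_size=2)
print(batch.add("k", 100.0))     # None
print(batch.add("k", 300.0))     # {'value': 200.0, 'count': 2}
```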

kv_lookup — KV-Backed Enrichment & Filtering


Looks up keys in Flo KV storage to filter or enrich streaming records. The lookup key is constructed from a template that interpolates JSON fields from the record.

Template syntax: Mix literal text with ${<jsonpath>} placeholders:

Template                      Example record                   Resolved key
"account:${$.account_id}"     {"account_id":"alice"}           account:alice
"${$.account_id}"             {"account_id":"alice"}           alice
"user.${$.org}.${$.user_id}"  {"org":"acme","user_id":"u1"}    user.acme.u1

Modes:

Mode              Behavior
filter (default)  Drop the record if the constructed key is not found in KV
enrich            Attach the KV value to the record as a JSON field; records with no match pass through unchanged

operators:
  - type: kv_lookup
    name: check-account
    lookup_key: "account:${$.account_id}"
    namespace: default
    mode: filter

Enrich example, which adds a "tier" field to each record that has a matching KV entry:

operators:
  - type: kv_lookup
    name: enrich-user
    lookup_key: "user:${$.user_id}"
    namespace: default
    mode: enrich
    enrich_field: tier
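Template resolution and the two modes can be sketched together, with a dict standing in for Flo KV. All names are hypothetical, and several details are assumptions: the record value is a JSON object, a template whose placeholder field is missing resolves to no key (so filter mode drops the record), non-string fields are rendered as JSON text, and the KV value is attached as-is.

```python
import json
import re
from typing import Any, Dict, Optional

_PLACEHOLDER = re.compile(r"\$\{([^}]*)\}")
_MISSING = object()


def _get_path(doc: Any, path: str) -> Any:
    """Resolve a dotted path such as "$.a.b"; arrays are not supported."""
    cur = doc
    for part in path[2:].split("."):
        if not isinstance(cur, dict) or part not in cur:
            return _MISSING
        cur = cur[part]
    return cur


def resolve_lookup_key(template: str, doc: Any) -> Optional[str]:
    """Replace each ${<jsonpath>} with the record's field; None if any field is missing."""
    missing = False

    def sub(m: "re.Match[str]") -> str:
        nonlocal missing
        got = _get_path(doc, m.group(1))
        if got is _MISSING:
            missing = True
            return ""
        return got if isinstance(got, str) else json.dumps(got)

    key = _PLACEHOLDER.sub(sub, template)
    return None if missing else key


def kv_lookup(value: bytes, lookup_key: str, kv: Dict[str, Any],
              mode: str = "filter", enrich_field: Optional[str] = None) -> Optional[bytes]:
    """Return the (possibly enriched) record, or None when filter mode drops it."""
    doc = json.loads(value)
    key = resolve_lookup_key(lookup_key, doc)
    found = key is not None and key in kv
    if mode == "filter":
        return value if found else None           # drop on miss
    if mode == "enrich":
        if not found:
            return value                          # no match: pass through unchanged
        doc[enrich_field] = kv[key]               # attach the KV value as a JSON field
        return json.dumps(doc).encode()
    raise ValueError(f"unknown mode: {mode!r}")


kv = {"user:u1": "gold", "account:alice": "active"}
print(resolve_lookup_key("user.${$.org}.${$.user_id}", {"org": "acme", "user_id": "u1"}))
# user.acme.u1
print(kv_lookup(b'{"user_id": "u1"}', "user:${$.user_id}", kv, mode="enrich", enrich_field="tier"))
# b'{"user_id": "u1", "tier": "gold"}'
print(kv_lookup(b'{"account_id": "bob"}', "account:${$.account_id}", kv))
# None
```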

The classify operator evaluates rules against each record and sets tag bits for matching conditions. Records keep flowing: classify never drops or forks records, it only labels them. Sinks then select records by AND-matching their match list against the record’s tag bitfield.

Rules use the same condition expressions as filter.

operators:
  - type: classify
    name: route-payments
    default_tag: unmatched      # optional: tag when no rules match
    rules:
      - condition: "json:type^=payment"
        tag: payments
      - condition: "json:type*=transfer"
        tag: transfers
      - condition: "json:amount>10000"
        tag: high-value

A single record can match multiple rules and carry multiple tags. This enables multi-way routing: a sink matching [payments], a sink matching [high-value], and a sink matching [payments, high-value] can all coexist.

Records that match no rules pass through untagged by default. Set default_tag to route unmatched records to a specific sink.
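A sketch of the rule evaluation, including default_tag. To keep it short, each condition is stood in for by a Python predicate (the comments show the filter expression it mimics); the predicates and the function name are hypothetical, not how Flo compiles rules.

```python
from typing import Callable, List, Optional, Set, Tuple

Predicate = Callable[[dict], bool]


def classify(record: dict, rules: List[Tuple[Predicate, str]],
             default_tag: Optional[str] = None) -> Set[str]:
    """Return every tag whose rule matches; the record itself is never dropped."""
    tags = {tag for predicate, tag in rules if predicate(record)}
    if not tags and default_tag is not None:
        tags.add(default_tag)                       # only when no rule matched
    return tags


rules = [
    (lambda r: str(r.get("type", "")).startswith("payment"), "payments"),  # json:type^=payment
    (lambda r: "transfer" in str(r.get("type", "")), "transfers"),         # json:type*=transfer
    (lambda r: float(r.get("amount", 0)) > 10000, "high-value"),           # json:amount>10000
]
print(sorted(classify({"type": "payment.card", "amount": 25000}, rules, "unmatched")))
# ['high-value', 'payments']
print(classify({"type": "refund", "amount": 5}, rules, "unmatched"))
# {'unmatched'}
```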

The passthrough operator emits every record unchanged. It is useful for testing and as a pipeline placeholder:

operators:
  - type: passthrough
    name: debug-tap

Operators execute in declaration order. Records flow through each operator sequentially:

operators:
  - type: filter
    name: completed-only
    condition: "value_contains:completed"
  - type: map
    name: extract-fields
    order_id: "$.order.id"
    total: "$.order.total"
    customer_id: "$.customer_id"   # map outputs only declared fields, so keep the keyby field
  - type: keyby
    name: by-customer
    key_expression: "$.customer_id"
  - type: aggregate
    name: daily-total
    function: sum
    field: "$.total"
    window: tumbling
    window_size: 86400
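Fused, in-order execution can be pictured as composing the operators into one function that runs per record on the shard thread. In this sketch each operator is a Python function returning zero or more records (a hypothetical interface; a filter returns 0 or 1 records, a flatmap may return many), and the two operators are simplified stand-ins for the first two steps above.

```python
from typing import Callable, Iterable, List

Record = dict
Operator = Callable[[Record], List[Record]]  # hypothetical: 0..N outputs per input


def chain(operators: Iterable[Operator]) -> Operator:
    """Fuse operators into one callable that runs them in declaration order."""
    ops = list(operators)

    def run(record: Record) -> List[Record]:
        batch = [record]
        for op in ops:
            batch = [out for rec in batch for out in op(rec)]
            if not batch:
                break  # record was filtered out; later operators never see it
        return batch

    return run


def completed_only(r: Record) -> List[Record]:
    # Simplified stand-in for the filter step
    return [r] if r.get("status") == "completed" else []


def extract_fields(r: Record) -> List[Record]:
    # Simplified stand-in for the map step
    return [{"order_id": r["order"]["id"], "total": r["order"]["total"],
             "customer_id": r["customer_id"]}]


pipeline = chain([completed_only, extract_fields])
print(pipeline({"status": "completed", "customer_id": "c1", "order": {"id": 7, "total": 30}}))
# [{'order_id': 7, 'total': 30, 'customer_id': 'c1'}]
```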

The processing engine supports event-time windowing for time-based aggregations.

Type      Description
Tumbling  Fixed-size, non-overlapping time windows, e.g. every 60 seconds
Sliding   Fixed-size windows that start every slide interval; when the slide is shorter than the window size, windows overlap
Count     Windows defined by element count, not time
Session   Dynamic, gap-based windows per key; a session extends as long as records arrive within the gap
Global    Single window for all elements (useful with custom triggers)

Triggers decide when a window fires (evaluates its function and emits results):

Trigger               Fires when…
Event-time (default)  Watermark passes the window end
Processing-time       Wall clock reaches the window end
Count                 Element count reaches a threshold
Continuous            At regular intervals

Watermarks signal event-time progress through the pipeline. A watermark carries the guarantee that no record with an earlier timestamp will arrive. When a pipeline has multiple inputs, the engine tracks a watermark per input and advances the combined watermark to the minimum across all sources.
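The minimum rule and event-time firing can be sketched together. This model is illustrative only: `WatermarkTracker`, the millisecond integers, the per-source monotonic update, and the choice that a window [start, end) fires once the watermark reaches `end` are assumptions, not Flo's internal API.

```python
from typing import Dict, List, Tuple

NO_WATERMARK = -(2 ** 63)  # a source that has not reported yet holds everything back


class WatermarkTracker:
    """Combined watermark = minimum of the per-source watermarks."""

    def __init__(self, sources: List[str]) -> None:
        self.marks: Dict[str, int] = {s: NO_WATERMARK for s in sources}

    def update(self, source: str, watermark_ms: int) -> int:
        # Per-source watermarks only move forward.
        self.marks[source] = max(self.marks[source], watermark_ms)
        return self.current()

    def current(self) -> int:
        return min(self.marks.values())


def tumbling_window(event_time_ms: int, size_ms: int) -> Tuple[int, int]:
    """Assign a record to its [start, end) tumbling window."""
    start = event_time_ms - event_time_ms % size_ms
    return start, start + size_ms


def ready_windows(open_windows: List[Tuple[int, int]], watermark_ms: int) -> List[Tuple[int, int]]:
    """Event-time trigger: a window fires once the watermark reaches its end."""
    return [w for w in open_windows if watermark_ms >= w[1]]


wm = WatermarkTracker(["clicks", "checkouts"])
w = tumbling_window(125_000, 60_000)
print(w)                                  # (120000, 180000)
wm.update("clicks", 200_000)
print(ready_windows([w], wm.current()))   # []: checkouts has not advanced yet
wm.update("checkouts", 185_000)
print(ready_windows([w], wm.current()))   # [(120000, 180000)]
```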

Relative to the watermark, each record is classified as:

  1. On-time: arrives before the watermark passes the end of its window
  2. Late: arrives after the watermark has passed its window, but within the allowed lateness (the window re-fires)
  3. Dropped: arrives too late, beyond the allowed lateness threshold
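A sketch of the three-way classification for a tumbling window follows. The `allowed_lateness_ms` parameter is hypothetical (this page does not name the setting), and the boundary choices (a window ends at `end`, lateness is measured from `end`) are assumptions.

```python
def classify_lateness(event_time_ms: int, window_size_ms: int,
                      watermark_ms: int, allowed_lateness_ms: int) -> str:
    """Classify a record as on-time, late, or dropped for a tumbling window."""
    window_end = event_time_ms - event_time_ms % window_size_ms + window_size_ms
    if watermark_ms < window_end:
        return "on-time"        # window has not fired yet
    if watermark_ms < window_end + allowed_lateness_ms:
        return "late"           # window already fired; re-fire with this record
    return "dropped"            # beyond the allowed lateness


print(classify_lateness(125_000, 60_000, watermark_ms=150_000, allowed_lateness_ms=30_000))  # on-time
print(classify_lateness(125_000, 60_000, watermark_ms=190_000, allowed_lateness_ms=30_000))  # late
print(classify_lateness(125_000, 60_000, watermark_ms=215_000, allowed_lateness_ms=30_000))  # dropped
```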

Late records can be tagged (via a classify rule) and routed to a dedicated sink for monitoring or reprocessing.

Stateful operators (like aggregate) access per-key state scoped to the job and operator instance. Three state primitives are available:

Primitive   Description
ValueState  Single value per key
ListState   Append-only list per key
MapState    Sub-key → value map per key

State is stored in the per-shard state backend with automatic key namespacing:

proc:{job_id}:{operator_id}:kv:{user_key} — ValueState
proc:{job_id}:{operator_id}:list:{user_key}:{idx} — ListState
proc:{job_id}:{operator_id}:map:{user_key}:{mk} — MapState
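Because every key carries the job and operator IDs, two jobs can share one backend without colliding. A minimal sketch, with a plain dict standing in for the per-shard state backend; the helper names, the `ValueState` class, and the operator ID value are hypothetical.

```python
from typing import Any, Dict


def value_state_key(job_id: str, operator_id: str, user_key: str) -> str:
    return f"proc:{job_id}:{operator_id}:kv:{user_key}"


def list_state_key(job_id: str, operator_id: str, user_key: str, idx: int) -> str:
    return f"proc:{job_id}:{operator_id}:list:{user_key}:{idx}"


def map_state_key(job_id: str, operator_id: str, user_key: str, map_key: str) -> str:
    return f"proc:{job_id}:{operator_id}:map:{user_key}:{map_key}"


class ValueState:
    """Single value per key, scoped to one job and operator instance."""

    def __init__(self, backend: Dict[str, Any], job_id: str, operator_id: str) -> None:
        self.backend = backend              # stand-in for the shard's state backend
        self.job_id = job_id
        self.operator_id = operator_id

    def get(self, user_key: str, default: Any = None) -> Any:
        return self.backend.get(value_state_key(self.job_id, self.operator_id, user_key), default)

    def put(self, user_key: str, value: Any) -> None:
        self.backend[value_state_key(self.job_id, self.operator_id, user_key)] = value


backend: Dict[str, Any] = {}
ValueState(backend, "job-1", "hourly-total").put("c42", 120.0)
ValueState(backend, "job-2", "hourly-total").put("c42", 7.0)
print(sorted(backend))
# ['proc:job-1:hourly-total:kv:c42', 'proc:job-2:hourly-total:kv:c42']
```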

No locks are needed — single-threaded shard execution guarantees isolation.

The engine implements Chandy-Lamport distributed snapshots for fault tolerance:

  1. Trigger: the coordinator assigns a new checkpoint_id
  2. Barrier injection: a CheckpointBarrier is injected into the sources
  3. Source snapshot: sources save their current offsets
  4. Operator snapshot: each operator snapshots its state and forwards the barrier
  5. Sink acknowledgment: sinks confirm receipt of the barrier
  6. Completion: once all acks are received, the checkpoint is marked complete
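The acknowledgement bookkeeping in steps 1, 5, and 6 can be sketched as a small coordinator. Barrier propagation and state snapshots (steps 2 to 4) are left out, the class and method names are hypothetical, and unlike Flo, which persists checkpoint metadata in KV, this sketch keeps everything in memory.

```python
from typing import Dict, Optional, Set


class CheckpointCoordinator:
    """Tracks barrier acknowledgements; a checkpoint completes when every sink acks."""

    def __init__(self, sinks: Set[str]) -> None:
        self.sinks = set(sinks)
        self.next_id = 1
        self.pending: Dict[int, Set[str]] = {}
        self.completed: Optional[int] = None

    def trigger(self) -> int:
        cid = self.next_id
        self.next_id += 1
        self.pending[cid] = set()
        return cid                       # a barrier carrying this id goes to the sources

    def ack(self, checkpoint_id: int, sink: str) -> bool:
        """Record a sink ack; returns True when this ack completes the checkpoint."""
        acks = self.pending.get(checkpoint_id)
        if acks is None or sink not in self.sinks:
            return False                 # unknown/finished checkpoint or unknown sink
        acks.add(sink)
        if acks == self.sinks:
            del self.pending[checkpoint_id]
            self.completed = checkpoint_id
            return True
        return False


coord = CheckpointCoordinator({"main", "failures"})
cid = coord.trigger()
print(coord.ack(cid, "main"))        # False: still waiting on "failures"
print(coord.ack(cid, "failures"))    # True: checkpoint complete
print(coord.completed)               # 1
```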

Checkpoint metadata is persisted internally under _proc: keys in KV storage.

Savepoints are user-triggered snapshots of a job’s state:

flo processing savepoint job-1 -n analytics

A savepoint captures:

  • The job binding and configuration
  • Creation timestamp
  • records_processed at the savepoint time

You can restore a job from a savepoint:

flo processing restore job-1 sp-1 -n analytics

A minimal stream-to-stream pipeline, written with inline YAML mappings, copies data unmodified from source to sink:

kind: Processing
name: mirror
sources:
- stream: { name: events }
sinks:
- stream: { name: events-copy }

Stream to Time-Series (Metrics Extraction)


Convert JSON event streams into queryable time-series data:

kind: Processing
name: cpu-pipeline
namespace: metrics
sources:
  - stream: { name: cpu-json }
sinks:
  - ts:
      measurement: proc_cpu_metrics
      tags:
        host: hostname
      fields:
        cpu: cpu_percent

To go the other way, poll TS measurements and produce JSON records for downstream consumers:

kind: Processing
name: ts-to-alerts
namespace: metrics
sources:
  - ts:
      measurement: cpu_usage
      field: value
      tags:
        host: web-01
sinks:
  - stream: { name: cpu-alerts }

Time-Series to Time-Series (Derived Metrics)


Build derived metrics from raw measurements:

kind: Processing
name: derive-temperature
namespace: metrics
sources:
  - ts:
      measurement: raw_temperature
      field: value
sinks:
  - ts:
      measurement: derived_temp
      value_field: value

To filter or enrich events using data stored in Flo KV, add a kv_lookup operator:

kind: Processing
name: account-filter
namespace: production
sources:
  - stream: { name: transactions }
sinks:
  - stream: { name: valid-transactions }
operators:
  - type: kv_lookup
    name: check-account
    lookup_key: "account:${$.account_id}"
    namespace: production
    mode: filter

Submit a job:

flo processing submit pipeline.yaml -n analytics
# Output: Job submitted: job-1

On submission, the job enters the RUNNING state and begins polling its sources immediately.

Check a job’s status:

flo processing status job-1 -n analytics

Returns JSON:

{
  "job_id": "job-1",
  "name": "order-analytics",
  "status": "RUNNING",
  "parallelism": 4,
  "batch_size": 200,
  "records_processed": 15420,
  "created_at_ms": 1700000000000
}

List the jobs in a namespace:

flo processing list -n analytics

Stop a job gracefully; it can be restored later:

flo processing stop job-1 -n analytics

Cancel a job permanently:

flo processing cancel job-1 -n analytics

Job lifecycle states:

State      Description
RUNNING    Actively processing records
STOPPED    Gracefully halted; can be restored
CANCELLED  Permanently terminated
FAILED     Terminated due to an error
COMPLETED  Finished (for bounded sources)

# Create a savepoint
flo processing savepoint job-1 -n analytics
# Restore from a savepoint
flo processing restore job-1 sp-1 -n analytics

Change the parallelism of a running job:

flo processing rescale job-1 8 -n analytics

Jobs are scoped to namespaces, providing complete isolation:

  • Jobs with the same name in different namespaces are independent of each other
  • list returns only jobs in the specified namespace
  • Pipeline data flows only within the namespace scope
  • Source streams and sink targets are resolved within the job’s namespace

# These are two separate, independent jobs
flo processing submit job.yaml -n production
flo processing submit job.yaml -n staging

Job state survives node restarts:

  • Submitted jobs persist and resume on restart
  • Stopped and cancelled job states are preserved
  • Rescaled parallelism is remembered
  • New job IDs don’t collide with pre-restart IDs

All mutations (submit, stop, cancel, status updates) are persisted through Raft and replayed on startup.

The engine collects per-operator and per-pipeline metrics:

Metric              Description
records_in          Records processed by each operator
records_out         Records emitted by each operator
processing_time_ns  Cumulative processing time
errors              Error count
throughput          Records per second (rolling)
latency             Processing latency histogram (buckets: <1ms, <5ms, <10ms, <50ms, <100ms, <500ms, 500ms+)
backpressure        Backpressure gauge (0.0–1.0)
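The latency buckets read naturally as exclusive upper bounds. A sketch of how a measured latency could be assigned to one of them, assuming nanosecond input (as with processing_time_ns) and that each bound is exclusive; the function name is hypothetical.

```python
import bisect

# Exclusive upper bounds in milliseconds, matching the bucket labels above.
_BOUNDS_MS = [1, 5, 10, 50, 100, 500]
_LABELS = ["<1ms", "<5ms", "<10ms", "<50ms", "<100ms", "<500ms", "500ms+"]


def latency_bucket(latency_ns: int) -> str:
    """Map a latency in nanoseconds to its histogram bucket label."""
    ms = latency_ns / 1_000_000
    return _LABELS[bisect.bisect_right(_BOUNDS_MS, ms)]


print(latency_bucket(400_000))      # <1ms
print(latency_bucket(750_000_000))  # 500ms+
```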

See also:

  • Streams — source and sink behavior for stream-backed pipelines
  • Time-Series — TS query and write semantics
  • KV — KV storage used by kv_lookup operator and KV sinks
  • Queues — queue sinks for task pipelines
  • Actions — WASM action registration and invocation
  • Workflows — multi-step orchestration over actions