
Streams

Flo streams are append-only, ordered logs of records backed by the Unified Append Log. Every append is Raft-replicated before being acknowledged. There is no separate stream projection — reading a stream is reading UAL entries directly, making it zero-copy and the fastest primitive.

Streams are the foundation of event sourcing, activity feeds, change data capture, and real-time data pipelines.

Every record in a stream is identified by a Stream ID — a composite <timestamp_ms>-<sequence> value:

  • timestamp_ms: Unix milliseconds when the record was appended
  • sequence: Counter within the same millisecond (0-indexed)
  • Example: 1703350800000-0, 1703350800000-1, 1703350801000-0

Stream IDs are monotonically increasing. If the system clock goes backward, the previous timestamp is reused and the sequence increments — guaranteeing ordering without coordination.
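The allocation rule above can be sketched in Go. This is an illustrative model only, not Flo's actual implementation; the `IDAllocator` type and its API are hypothetical.

```go
package main

import "fmt"

// StreamID is the composite <timestamp_ms>-<sequence> identifier.
type StreamID struct {
	TimestampMS uint64
	Sequence    uint64
}

// IDAllocator hands out monotonically increasing Stream IDs, reusing
// the previous timestamp and bumping the sequence whenever the wall
// clock stalls or moves backward.
type IDAllocator struct {
	last StreamID
}

func (a *IDAllocator) Next(nowMS uint64) StreamID {
	if nowMS <= a.last.TimestampMS {
		a.last.Sequence++ // clock stalled or went backward
	} else {
		a.last = StreamID{TimestampMS: nowMS, Sequence: 0}
	}
	return a.last
}

func main() {
	var a IDAllocator
	fmt.Println(a.Next(1703350800000)) // {1703350800000 0}
	fmt.Println(a.Next(1703350800000)) // same millisecond: {1703350800000 1}
	fmt.Println(a.Next(1703350799000)) // clock went backward: {1703350800000 2}
	fmt.Println(a.Next(1703350801000)) // {1703350801000 0}
}
```

Because the allocator only compares against its own last value, ordering holds without any cross-node coordination.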

Special values:

  • 0-0 — the very beginning of the stream
  • $ — the current tail (only future records)

Streams live inside namespaces. The default namespace is used when no -n flag is provided. The same stream name in different namespaces is fully independent — separate records, separate consumer groups, separate metadata.

flo ns create staging
flo ns create production
flo stream append events '{"env":"staging"}' -n staging
flo stream append events '{"env":"prod"}' -n production
# Each namespace sees only its own records
flo stream read events --limit 10 -n staging
flo stream read events --limit 10 -n production
# Consumer groups are also namespace-scoped
flo stream group create events --group workers -n staging
flo stream group create events --group workers -n production

Streams can have multiple partitions for parallel throughput. When you create a stream with --partitions N, each partition is an independent ordered log.

# Create a 4-partition stream
flo stream create orders --partitions 4

Routing records to partitions:

  • Explicit partition: --partition 2 sends to partition 2 directly
  • Partition key: --partition-key alice hashes the key to a deterministic partition (Wyhash). All records with the same key land on the same partition, preserving per-key ordering.
  • No flag: defaults to partition 0
# Explicit partition
flo stream append orders '{"id":1}' --partition 2
# Partition key — all alice events on the same partition
flo stream append orders '{"user":"alice"}' --partition-key alice
flo stream append orders '{"user":"alice"}' --partition-key alice # same partition
# Different key may route to a different partition
flo stream append orders '{"user":"bob"}' --partition-key bob
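The key-based routing above can be sketched in Go. Flo hashes with Wyhash, which is not in the Go standard library, so FNV-1a stands in here; the property the example demonstrates is determinism of the key-to-partition mapping, not the specific hash.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// partitionFor maps a partition key to a partition index. FNV-1a is
// a stand-in for the server's Wyhash: any stable hash gives the same
// guarantee that one key always lands on one partition.
func partitionFor(key string, partitions uint32) uint32 {
	h := fnv.New64a()
	h.Write([]byte(key))
	return uint32(h.Sum64() % uint64(partitions))
}

func main() {
	// Same key, same partition, every time.
	fmt.Println(partitionFor("alice", 4) == partitionFor("alice", 4)) // true
	// Different keys may or may not collide; only determinism is promised.
	fmt.Println(partitionFor("alice", 4), partitionFor("bob", 4))
}
```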

Reading from partitions:

# Read only partition 1
flo stream read orders --partition 1
# Read using the same partition key (resolves to the same partition)
flo stream read orders --partition-key alice
# Bare read without --partition or --partition-key reads ALL partitions
flo stream read orders --limit 20

A bare stream read without partition flags returns records from all partitions merged together. This matches industry-standard behavior (Kafka, Redpanda, Pulsar).
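A merged read across independently ordered partitions can be sketched as a k-way merge. This assumes the merged result is ordered by Stream ID, which the source does not state explicitly; the function below is illustrative, not Flo's code.

```go
package main

import "fmt"

// StreamID orders records by timestamp, then sequence.
type StreamID struct {
	TimestampMS uint64
	Sequence    uint64
}

func (a StreamID) Less(b StreamID) bool {
	if a.TimestampMS != b.TimestampMS {
		return a.TimestampMS < b.TimestampMS
	}
	return a.Sequence < b.Sequence
}

// mergePartitions interleaves per-partition ordered logs into one
// ID-ordered result, repeatedly taking the smallest head.
func mergePartitions(parts [][]StreamID) []StreamID {
	var out []StreamID
	idx := make([]int, len(parts))
	for {
		best := -1
		for p := range parts {
			if idx[p] == len(parts[p]) {
				continue // partition exhausted
			}
			if best == -1 || parts[p][idx[p]].Less(parts[best][idx[best]]) {
				best = p
			}
		}
		if best == -1 {
			return out
		}
		out = append(out, parts[best][idx[best]])
		idx[best]++
	}
}

func main() {
	p0 := []StreamID{{100, 0}, {300, 0}}
	p1 := []StreamID{{200, 0}, {200, 1}}
	fmt.Println(mergePartitions([][]StreamID{p0, p1}))
	// [{100 0} {200 0} {200 1} {300 0}]
}
```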

Appending records:
# Single record
flo stream append events '{"type":"signup","user":"alice"}'
# Batch append (multiple payloads in one request)
flo stream append events 'event-a' 'event-b' 'event-c'
# With headers
flo stream append events '{"type":"login"}' -H source=web -H trace-id=abc123
# To a specific partition
flo stream append events '{"type":"login"}' --partition 2
# With partition key routing
flo stream append events '{"type":"login"}' --partition-key alice
# JSON output (returns the Stream ID)
flo stream append events 'hello' --json

The response includes the assigned Stream ID (e.g., 1703350800000-0).

Reading records:
# From the beginning
flo stream read events --start 0-0 --limit 10
# From a specific Stream ID
flo stream read events --start 1703350800000-0 --limit 50
# Bounded range (inclusive on both ends)
flo stream read events --start 1703350800000-0 --end 1703350815000-0
# JSON output (includes id, payload, tier, partition fields)
flo stream read events --limit 10 --json

The default limit is 10. Maximum is 1000 per request.
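Because --start is inclusive (it returns records starting at or after the given ID), paging through a stream means resuming one sequence past the last ID seen. A small Go helper sketches that arithmetic; the functions are hypothetical, not part of the SDK.

```go
package main

import (
	"fmt"
	"strconv"
	"strings"
)

// parseID splits the textual <timestamp_ms>-<sequence> form.
func parseID(s string) (ts, seq uint64, err error) {
	parts := strings.SplitN(s, "-", 2)
	if len(parts) != 2 {
		return 0, 0, fmt.Errorf("malformed stream id %q", s)
	}
	if ts, err = strconv.ParseUint(parts[0], 10, 64); err != nil {
		return 0, 0, err
	}
	if seq, err = strconv.ParseUint(parts[1], 10, 64); err != nil {
		return 0, 0, err
	}
	return ts, seq, nil
}

// nextStart returns the --start value for the next page: one
// sequence past the last ID the previous page returned.
func nextStart(lastID string) (string, error) {
	ts, seq, err := parseID(lastID)
	if err != nil {
		return "", err
	}
	return fmt.Sprintf("%d-%d", ts, seq+1), nil
}

func main() {
	next, _ := nextStart("1703350800000-1")
	fmt.Println(next) // 1703350800000-2
}
```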

Blocking reads wait for new data instead of returning empty results immediately.

# Block up to 5 seconds for new records
flo stream read events --block 5000 --start 0-0 --limit 5
# Block forever (0 = infinite timeout)
flo stream read events --block 0 --start 0-0 --limit 5
# Block for new records from the tail only (skip existing data)
flo stream read events --block 5000 --start '$' --limit 5

If matching data already exists, the blocking read returns immediately. If no data arrives within the timeout, it returns an empty (but successful) result.

Follow mode is a continuous tail — it keeps reading as new records arrive, like tail -f.

# Follow from the beginning (prints existing + new records)
flo stream read events --follow --start 0-0
# Follow from the tail (new records only)
flo stream read events --follow --start '$'

--follow implies a blocking timeout and loops internally. It runs until interrupted.

Streams are auto-created on first append. Explicit creation is only needed when you want multiple partitions or custom retention up front.

flo stream create events # 1 partition (default)
flo stream create events --partitions 4 # 4 partitions
flo stream create events --retention 72 # 72-hour retention
Inspect a stream:
flo stream info events
flo stream info events --json

Returns stream metadata: name, partition count, first/last Stream IDs, record count.

List streams:
flo stream ls
flo stream ls --limit 50 --json

Remove old records to reclaim storage.

# Keep only the last 10,000 records
flo stream trim events --maxlen 10000
# Remove records before a specific Stream ID
flo stream trim events --before 1703350800000-0
# Remove records older than 1 hour
flo stream trim events --maxage 3600
# Trim to a byte budget
flo stream trim events --maxbytes 1073741824
# Preview what would be trimmed (no changes made)
flo stream trim events --maxlen 10000 --dry-run
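The --maxlen and --maxage policies reduce to a cut point over the ordered log. A Go sketch of the selection logic, under the assumption that records are stored oldest first; these helpers are illustrative only.

```go
package main

import "fmt"

type record struct {
	TimestampMS uint64
}

// trimMaxLen keeps only the newest max records, like --maxlen.
func trimMaxLen(recs []record, max int) []record {
	if len(recs) <= max {
		return recs
	}
	return recs[len(recs)-max:]
}

// trimMaxAge drops records older than maxAgeMS relative to nowMS,
// like --maxage (which the CLI takes in seconds).
func trimMaxAge(recs []record, nowMS, maxAgeMS uint64) []record {
	cut := 0
	for cut < len(recs) && recs[cut].TimestampMS+maxAgeMS < nowMS {
		cut++
	}
	return recs[cut:]
}

func main() {
	recs := []record{{1000}, {2000}, {3000}, {4000}}
	fmt.Println(len(trimMaxLen(recs, 2)))          // 2: newest two survive
	fmt.Println(len(trimMaxAge(recs, 5000, 2500))) // 2: records at 3000 and 4000 survive
}
```

Because the log is ordered, both policies only ever remove a prefix, which is what makes trimming cheap.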

Consumer groups allow multiple consumers to process a stream cooperatively. Each record is delivered to exactly one consumer in the group. The server tracks a Pending Entry List (PEL) — records that have been delivered but not yet acknowledged.

# Create a group explicitly (optional — group read auto-creates)
flo stream group create events --group processors
# Join a consumer to the group
flo stream group join events processors worker-1
# Read records (auto-creates group and joins consumer if needed)
flo stream group read events --group processors --consumer worker-1 --limit 10
# Acknowledge processed records
flo stream group ack events --group processors --consumer worker-1 \
--ids 1703350800000-0,1703350800000-1
# Leave the group
flo stream group leave events --group processors --consumer worker-1
# Delete the entire group
flo stream group delete events --group processors

Records delivered via group read are placed in the PEL. They must be acknowledged to advance the cursor. Unacknowledged records will be redelivered.
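The PEL lifecycle described above can be sketched as a map from record ID to the consumer it was delivered to. This is a toy model to show the invariant (delivered stays pending until acked), not Flo's data structure.

```go
package main

import "fmt"

// pel is a minimal Pending Entry List: delivered-but-unacked record
// IDs, each remembering which consumer received it.
type pel struct {
	entries map[string]string // record ID -> consumer
}

func newPEL() *pel { return &pel{entries: map[string]string{}} }

// deliver marks an ID as pending for a consumer.
func (p *pel) deliver(id, consumer string) { p.entries[id] = consumer }

// ack removes the ID; only acked records stop being redelivery
// candidates and let the cursor advance.
func (p *pel) ack(id string) { delete(p.entries, id) }

// pending reports how many delivered records are still unacked.
func (p *pel) pending() int { return len(p.entries) }

func main() {
	p := newPEL()
	p.deliver("1703350800000-0", "worker-1")
	p.deliver("1703350800000-1", "worker-1")
	fmt.Println(p.pending()) // 2
	p.ack("1703350800000-0")
	fmt.Println(p.pending()) // 1
}
```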

# Ack specific IDs (comma-separated)
flo stream group ack events --group processors --consumer worker-1 \
--ids 1703350800000-0,1703350800000-1
# Read with auto-ack — no PEL tracking (at-most-once delivery)
flo stream group read events --group processors --consumer worker-1 --no-ack

If processing fails, you can explicitly nack records to put them back in the delivery pool:

# Nack — make records available for redelivery immediately
flo stream group nack events --group processors --consumer worker-1 \
--ids 1703350800000-0
# Nack with delay — records become visible again after 5 seconds
flo stream group nack events --group processors --consumer worker-1 \
--ids 1703350800000-0 --delay 5000

The group can also auto-redeliver if the consumer doesn’t ack within a deadline:

# Auto-redeliver after 30 seconds of no ack
flo stream group create events --group processors --ack-timeout 30000
# Give up after 5 delivery attempts (records go to DLQ)
flo stream group create events --group processors --max-deliver 5
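How --ack-timeout and --max-deliver interact can be sketched as a sweep over the PEL: each expired record either returns to the delivery pool or, once its delivery count reaches the limit, moves to the DLQ. The types below are assumptions for illustration.

```go
package main

import "fmt"

// pendingRec is one PEL entry with its delivery history.
type pendingRec struct {
	ID         string
	Deliveries int
	DeadlineMS uint64
}

// redeliver partitions expired PEL entries into the redelivery pool
// and the dead-letter queue, leaving unexpired entries in place.
func redeliver(pending []pendingRec, nowMS uint64, maxDeliver int) (pool, dlq []string, still []pendingRec) {
	for _, r := range pending {
		switch {
		case nowMS < r.DeadlineMS:
			still = append(still, r) // ack deadline not reached yet
		case r.Deliveries >= maxDeliver:
			dlq = append(dlq, r.ID) // give up: dead-letter queue
		default:
			pool = append(pool, r.ID) // make available again
		}
	}
	return pool, dlq, still
}

func main() {
	pending := []pendingRec{
		{"a", 1, 1000}, // expired after 1 delivery: redeliver
		{"b", 5, 1000}, // expired after max-deliver 5 attempts: DLQ
		{"c", 1, 9000}, // still within its deadline
	}
	pool, dlq, still := redeliver(pending, 5000, 5)
	fmt.Println(pool, dlq, len(still)) // [a] [b] 1
}
```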

If a consumer needs more time to process a record, it can extend the ack deadline:

# Extend the deadline by 60 seconds
flo stream group touch events --group processors --consumer worker-1 \
--ids 1703350800000-0 --extend 60000
Inspect pending records and group state:
# List all pending (delivered but unacked) records for a group
flo stream group pending events --group processors
# Group metadata (consumer count, PEL size, last delivered ID)
flo stream group info events --group processors

Groups support three delivery modes, set at creation time:

  • shared (default) — records are distributed round-robin across all consumers
  • exclusive — one active consumer holds the lease; others are standbys
  • key_shared — records with the same key always go to the same consumer
# Shared mode (default)
flo stream group create events --group processors --mode shared
# Exclusive — one active consumer, others wait
flo stream group create events --group processors --mode exclusive
# Singleton — exclusive with zero standbys allowed
flo stream group create events --group processors --mode exclusive --max-standbys 0
# Key-shared — partition by key across consumers
flo stream group create events --group processors --mode key_shared --slots 16
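key_shared can be sketched as two mappings: key to slot, then slot to consumer. FNV-1a stands in for the server's hash, and the static slot split below is an assumption; a real implementation would rebalance slot ownership as consumers join and leave.

```go
package main

import (
	"fmt"
	"hash/fnv"
)

// slotFor hashes a key into one of a fixed number of slots,
// matching the --slots setting at group creation.
func slotFor(key string, slots uint32) uint32 {
	h := fnv.New32a()
	h.Write([]byte(key))
	return h.Sum32() % slots
}

// consumerFor assigns each slot to a consumer. Slots are split
// statically here; only the key-to-consumer stability matters.
func consumerFor(key string, slots uint32, consumers []string) string {
	return consumers[int(slotFor(key, slots))%len(consumers)]
}

func main() {
	consumers := []string{"worker-1", "worker-2"}
	// Same key always reaches the same consumer, so per-key
	// ordering survives even with multiple consumers.
	fmt.Println(consumerFor("alice", 16, consumers) == consumerFor("alice", 16, consumers)) // true
}
```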
The same operations via the Go client:

client := flo.NewClient("localhost:9000")
client.Connect()
defer client.Close()

// Append
result, _ := client.Stream.Append("events", []byte(`{"event":"signup"}`), nil)

// Read from beginning
records, _ := client.Stream.Read("events", &flo.StreamReadOptions{
    Start: &flo.StreamID{TimestampMS: 0, Sequence: 0},
    Count: ptr(uint32(10)),
})
for _, rec := range records.Records {
    fmt.Printf("id=%d-%d: %s\n", rec.ID.TimestampMS, rec.ID.Sequence, rec.Payload)
}

// Blocking read
records, _ = client.Stream.Read("events", &flo.StreamReadOptions{
    Tail:    true,
    BlockMS: ptr(uint32(5000)),
    Count:   ptr(uint32(10)),
})

// Consumer group
records, _ = client.Stream.GroupRead("events", "processors", "worker-1",
    &flo.StreamGroupReadOptions{
        Count:   ptr(uint32(10)),
        BlockMS: ptr(uint32(30000)),
    },
)
// ... process records ...
ids := make([]flo.StreamID, len(records.Records))
for i, rec := range records.Records {
    ids[i] = rec.ID
}
client.Stream.GroupAck("events", "processors", ids, &flo.StreamGroupAckOptions{
    Consumer: "worker-1",
})

Reading modes at a glance:

  • From beginning (--start 0-0) — returns all records in order
  • From ID (--start <id>) — returns records starting at (or after) the given Stream ID
  • Range (--start <id> --end <id>) — bounded range, inclusive on both ends
  • Tail (--start '$') — only records appended after the read begins
  • Blocking (--block <ms>) — waits up to <ms> for new data (0 = forever)
  • Follow (--follow) — continuous tail; keeps reading as new records arrive
  • Partition (--partition <n>) — reads from a specific partition only
  • Partition key (--partition-key <key>) — reads from the partition that key hashes to
  • JSON output (--json) — machine-readable output with id, payload, tier, partition fields

Stream data flows through a tiered storage hierarchy automatically:

  • Hot — in-memory ring buffer; recent writes, lowest latency
  • Warm — on-disk segments; overflowed from the hot tier, still fast
  • Cold — external backend (file, S3, Azure); archival storage

When you read with --json, each record includes a "tier" field indicating where it was served from. Reads are transparent — the server merges results across tiers automatically.

The thresholds for spilling between tiers are configurable per server (hot buffer capacity, max hot entries). Cold storage can be enabled with file or cloud backends.
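The hot-to-warm spill can be sketched as a bounded buffer that evicts its oldest entries once it exceeds capacity. The capacity, eviction granularity, and merge order below are illustrative assumptions, not Flo's actual thresholds.

```go
package main

import "fmt"

// tieredLog models the hot/warm split: appends land in a bounded
// in-memory hot buffer, and the oldest entries spill to the warm
// tier once the buffer exceeds its capacity.
type tieredLog struct {
	hot    []string
	warm   []string
	hotCap int
}

func (t *tieredLog) append(payload string) {
	t.hot = append(t.hot, payload)
	if len(t.hot) > t.hotCap {
		// Spill the oldest hot entry to the warm tier.
		t.warm = append(t.warm, t.hot[0])
		t.hot = t.hot[1:]
	}
}

// read merges warm then hot, so callers see one contiguous log
// regardless of which tier served each record.
func (t *tieredLog) read() []string {
	return append(append([]string{}, t.warm...), t.hot...)
}

func main() {
	log := &tieredLog{hotCap: 2}
	for _, p := range []string{"a", "b", "c", "d"} {
		log.append(p)
	}
	fmt.Println(log.hot, log.warm) // [c d] [a b]
	fmt.Println(log.read())        // [a b c d]
}
```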

  • All appends are Raft-replicated before acknowledgment
  • Stream data, consumer group offsets, and pending state all survive server restarts
  • After restart, new appends continue with monotonically increasing Stream IDs (no duplicates, no gaps)
  • Immediate consistency — a read right after restart returns all pre-restart data

Stream records are UAL entries with opcode = stream_append. Reading a stream is reading a range of UAL entries filtered by stream name hash — there is no separate data structure.

  • Record IDs are derived from append wall-clock time plus sequence
  • Zero-copy reads — no derived state, no projection overhead
  • Consumer group state (cursors, PEL, mode) is managed per-shard
  • Partition routing uses Wyhash on the partition key or explicit index
  • Blocking reads register waiters that are notified on append

See Storage Internals for details.