# Streams
Flo streams are append-only, ordered logs of records backed by the Unified Append Log. Every append is Raft-replicated before being acknowledged. There is no separate stream projection — reading a stream is reading UAL entries directly, making it zero-copy and the fastest primitive.
Streams are the foundation of event sourcing, activity feeds, change data capture, and real-time data pipelines.
## Core Concepts

### Stream IDs
Every record in a stream is identified by a Stream ID — a composite `<timestamp_ms>-<sequence>` value:

- `timestamp_ms`: Unix milliseconds when the record was appended
- `sequence`: counter within the same millisecond (0-indexed)
- Example: `1703350800000-0`, `1703350800000-1`, `1703350801000-0`
Stream IDs are monotonically increasing. If the system clock goes backward, the previous timestamp is reused and the sequence increments — guaranteeing ordering without coordination.
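The allocation rule can be sketched in a few lines. This is an illustrative model of the rule described above, not Flo's actual implementation:

```python
class StreamIDAllocator:
    """Toy model of monotonic <timestamp_ms>-<sequence> allocation."""

    def __init__(self):
        self.last_ts = 0
        self.last_seq = -1

    def next_id(self, now_ms: int) -> tuple[int, int]:
        if now_ms > self.last_ts:
            # Clock moved forward: new millisecond, sequence resets to 0
            self.last_ts, self.last_seq = now_ms, 0
        else:
            # Clock stalled or went backward: reuse the previous timestamp
            # and increment the sequence, so IDs stay strictly increasing
            self.last_seq += 1
        return (self.last_ts, self.last_seq)

alloc = StreamIDAllocator()
print(alloc.next_id(1703350800000))  # (1703350800000, 0)
print(alloc.next_id(1703350800000))  # (1703350800000, 1)
print(alloc.next_id(1703350799000))  # clock went backward -> (1703350800000, 2)
print(alloc.next_id(1703350801000))  # (1703350801000, 0)
```

Note that ordering is preserved without any cross-node coordination: the allocator only ever compares against its own last issued ID.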
Special values:

- `0-0` — the very beginning of the stream
- `$` — the current tail (only future records)
### Namespace Isolation

Streams live inside namespaces. The default namespace is used when no `-n` flag is provided. The same stream name in different namespaces is fully independent — separate records, separate consumer groups, separate metadata.
```bash
flo ns create staging
flo ns create production

flo stream append events '{"env":"staging"}' -n staging
flo stream append events '{"env":"prod"}' -n production

# Each namespace sees only its own records
flo stream read events --limit 10 -n staging
flo stream read events --limit 10 -n production

# Consumer groups are also namespace-scoped
flo stream group create events --group workers -n staging
flo stream group create events --group workers -n production
```

### Partitioning
Streams can have multiple partitions for parallel throughput. When you create a stream with `--partitions N`, each partition is an independent ordered log.
```bash
# Create a 4-partition stream
flo stream create orders --partitions 4
```

Routing records to partitions:
- Explicit partition: `--partition 2` sends to partition 2 directly
- Partition key: `--partition-key alice` hashes the key to a deterministic partition (Wyhash). All records with the same key land on the same partition, preserving per-key ordering.
- No flag: defaults to partition 0
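The routing rules above amount to a short decision chain. In this sketch, `zlib.crc32` stands in for Wyhash (which is not in the Python standard library) so the example is self-contained — the real hash differs, but the same-key-same-partition property is identical:

```python
import zlib
from typing import Optional

def route(partition_key: Optional[str],
          explicit_partition: Optional[int],
          num_partitions: int) -> int:
    # --partition N wins: route directly to that partition
    if explicit_partition is not None:
        return explicit_partition
    # --partition-key K: hash the key to a deterministic partition.
    # Flo uses Wyhash; crc32 is a stand-in for illustration.
    if partition_key is not None:
        return zlib.crc32(partition_key.encode()) % num_partitions
    # No flag: partition 0
    return 0

# The same key always resolves to the same partition
assert route("alice", None, 4) == route("alice", None, 4)
print(route(None, 2, 4))     # 2 (explicit)
print(route(None, None, 4))  # 0 (default)
```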
```bash
# Explicit partition
flo stream append orders '{"id":1}' --partition 2

# Partition key — all alice events on the same partition
flo stream append orders '{"user":"alice"}' --partition-key alice
flo stream append orders '{"user":"alice"}' --partition-key alice  # same partition

# Different key may route to a different partition
flo stream append orders '{"user":"bob"}' --partition-key bob
```

Reading from partitions:
```bash
# Read only partition 1
flo stream read orders --partition 1

# Read using the same partition key (resolves to the same partition)
flo stream read orders --partition-key alice

# Bare read without --partition or --partition-key reads ALL partitions
flo stream read orders --limit 20
```

A bare `stream read` without partition flags returns records from all partitions merged together. This matches industry-standard behavior (Kafka, Redpanda, Pulsar).
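Because each partition is itself an ordered log of Stream IDs, a merged read can be modeled as a k-way merge. A simplified sketch (the server's actual merge strategy may differ):

```python
import heapq

# Each partition is an independent ordered log of (stream_id, payload)
p0 = [((1703350800000, 0), b"a"), ((1703350802000, 0), b"c")]
p1 = [((1703350801000, 0), b"b"), ((1703350803000, 0), b"d")]

# A bare read merges the partitions into one globally ID-ordered result
merged = list(heapq.merge(p0, p1, key=lambda rec: rec[0]))
print([payload for _, payload in merged])  # [b'a', b'b', b'c', b'd']
```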
## Operations

### Append
```bash
# Single record
flo stream append events '{"type":"signup","user":"alice"}'

# Batch append (multiple payloads in one request)
flo stream append events 'event-a' 'event-b' 'event-c'

# With headers
flo stream append events '{"type":"login"}' -H source=web -H trace-id=abc123

# To a specific partition
flo stream append events '{"type":"login"}' --partition 2

# With partition key routing
flo stream append events '{"type":"login"}' --partition-key alice

# JSON output (returns the Stream ID)
flo stream append events 'hello' --json
```

The response includes the assigned Stream ID (e.g., `1703350800000-0`).
### Read

```bash
# From the beginning
flo stream read events --start 0-0 --limit 10

# From a specific Stream ID
flo stream read events --start 1703350800000-0 --limit 50

# Bounded range (inclusive on both ends)
flo stream read events --start 1703350800000-0 --end 1703350815000-0

# JSON output (includes id, payload, tier, partition fields)
flo stream read events --limit 10 --json
```

The default limit is 10; the maximum is 1000 per request.
### Blocking Reads

Blocking reads wait for new data instead of returning empty results immediately.
```bash
# Block up to 5 seconds for new records
flo stream read events --block 5000 --start 0-0 --limit 5

# Block forever (0 = infinite timeout)
flo stream read events --block 0 --start 0-0 --limit 5

# Block for new records from the tail only (skip existing data)
flo stream read events --block 5000 --start '$' --limit 5
```

If matching data already exists, the blocking read returns immediately. If no data arrives within the timeout, it returns an empty (but successful) result.
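These semantics are the classic condition-variable pattern: return at once if data is present, otherwise sleep until an append wakes you or the timeout expires. A minimal single-process sketch (illustrative, not Flo's server code; the `--block 0` infinite-wait case is omitted):

```python
import threading

log = []                          # the stream, as a plain list (sketch)
cond = threading.Condition()

def append(payload):
    with cond:
        log.append(payload)
        cond.notify_all()         # wake any blocked readers

def blocking_read(start, block_ms):
    """Return records at index >= start, waiting up to block_ms for data.

    Returns immediately if matching data already exists; otherwise
    returns an empty (but successful) result after the timeout.
    """
    with cond:
        cond.wait_for(lambda: len(log) > start, timeout=block_ms / 1000)
        return log[start:]

# An appender arrives 50 ms after the read starts blocking
threading.Timer(0.05, append, args=("hello",)).start()
print(blocking_read(0, 5000))     # ['hello'] — unblocked by the append
```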
### Follow Mode

Follow mode is a continuous tail — it keeps reading as new records arrive, like `tail -f`.
```bash
# Follow from the beginning (prints existing + new records)
flo stream read events --follow --start 0-0

# Follow from the tail (new records only)
flo stream read events --follow --start '$'
```

`--follow` implies a blocking timeout and loops internally. It runs until interrupted.
### Create

Streams are auto-created on first append. Explicit creation is only needed when you want multiple partitions or custom retention up front.
```bash
flo stream create events                  # 1 partition (default)
flo stream create events --partitions 4   # 4 partitions
flo stream create events --retention 72   # 72-hour retention
```

### Info

```bash
flo stream info events
flo stream info events --json
```

Returns stream metadata: name, partition count, first/last Stream IDs, record count.
### List

```bash
flo stream ls
flo stream ls --limit 50 --json
```

### Trim

Remove old records to reclaim storage.
```bash
# Keep only the last 10,000 records
flo stream trim events --maxlen 10000

# Remove records before a specific Stream ID
flo stream trim events --before 1703350800000-0

# Remove records older than 1 hour
flo stream trim events --maxage 3600

# Trim to a byte budget
flo stream trim events --maxbytes 1073741824

# Preview what would be trimmed (no changes made)
flo stream trim events --maxlen 10000 --dry-run
```

## Consumer Groups
Consumer groups allow multiple consumers to process a stream cooperatively. Each record is delivered to exactly one consumer in the group. The server tracks a Pending Entry List (PEL) — records that have been delivered but not yet acknowledged.
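A toy model of this bookkeeping — a cursor for the next undelivered record plus a PEL of delivered-but-unacked IDs (names are made up for illustration, not Flo's internals):

```python
class ConsumerGroup:
    """Sketch of per-group delivery tracking."""

    def __init__(self):
        self.cursor = 0   # index of the next undelivered record
        self.pel = {}     # record_id -> consumer (delivered, unacked)

    def read(self, stream, consumer, count):
        batch = stream[self.cursor:self.cursor + count]
        for record_id, _ in batch:
            self.pel[record_id] = consumer   # track until acked
        self.cursor += len(batch)
        return batch

    def ack(self, ids):
        for record_id in ids:
            self.pel.pop(record_id, None)    # acked: drop from the PEL

stream = [("1703350800000-0", b"a"), ("1703350800000-1", b"b")]
g = ConsumerGroup()
g.read(stream, "worker-1", 10)
print(sorted(g.pel))            # both records pending
g.ack(["1703350800000-0"])
print(sorted(g.pel))            # ['1703350800000-1']
```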
### Lifecycle

```bash
# Create a group explicitly (optional — group read auto-creates)
flo stream group create events --group processors

# Join a consumer to the group
flo stream group join events processors worker-1

# Read records (auto-creates group and joins consumer if needed)
flo stream group read events --group processors --consumer worker-1 --limit 10

# Acknowledge processed records
flo stream group ack events --group processors --consumer worker-1 \
  --ids 1703350800000-0,1703350800000-1

# Leave the group
flo stream group leave events --group processors --consumer worker-1

# Delete the entire group
flo stream group delete events --group processors
```

### Acknowledgment
Records delivered via `group read` are placed in the PEL. They must be acknowledged to advance the cursor. Unacknowledged records will be redelivered.
```bash
# Ack specific IDs (comma-separated)
flo stream group ack events --group processors --consumer worker-1 \
  --ids 1703350800000-0,1703350800000-1

# Read with auto-ack — no PEL tracking (at-most-once delivery)
flo stream group read events --group processors --consumer worker-1 --no-ack
```

### Redelivery and Nack
If processing fails, you can explicitly nack records to put them back in the delivery pool:
```bash
# Nack — make records available for redelivery immediately
flo stream group nack events --group processors --consumer worker-1 \
  --ids 1703350800000-0

# Nack with delay — records become visible again after 5 seconds
flo stream group nack events --group processors --consumer worker-1 \
  --ids 1703350800000-0 --delay 5000
```

The group can also auto-redeliver if the consumer doesn't ack within a deadline:
```bash
# Auto-redeliver after 30 seconds of no ack
flo stream group create events --group processors --ack-timeout 30000

# Give up after 5 delivery attempts (records go to DLQ)
flo stream group create events --group processors --max-deliver 5
```
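The interaction of `--ack-timeout` and `--max-deliver` can be sketched as deadline bookkeeping: an unacked record becomes redeliverable once its deadline passes, and moves to the DLQ once its delivery count reaches the limit. Illustrative only — class and field names here are invented, not Flo's internals:

```python
class RedeliveryTracker:
    """Toy model of --ack-timeout / --max-deliver semantics."""

    def __init__(self, ack_timeout_ms, max_deliver):
        self.ack_timeout_ms = ack_timeout_ms
        self.max_deliver = max_deliver
        self.pending = {}   # record_id -> (deadline_ms, attempts)
        self.dlq = []

    def delivered(self, record_id, now_ms, attempts):
        self.pending[record_id] = (now_ms + self.ack_timeout_ms, attempts)

    def expire(self, now_ms):
        """Return IDs to redeliver; route exhausted ones to the DLQ."""
        redeliver = []
        for record_id, (deadline, attempts) in list(self.pending.items()):
            if now_ms < deadline:
                continue                      # still within the ack window
            del self.pending[record_id]
            if attempts >= self.max_deliver:
                self.dlq.append(record_id)    # gave up: dead-letter it
            else:
                redeliver.append(record_id)   # make available again
        return redeliver

t = RedeliveryTracker(ack_timeout_ms=30000, max_deliver=5)
t.delivered("1703350800000-0", now_ms=0, attempts=1)
print(t.expire(now_ms=30000))          # ['1703350800000-0'] — redeliver
t.delivered("1703350800000-0", now_ms=30000, attempts=5)
print(t.expire(now_ms=60000), t.dlq)   # [] ['1703350800000-0'] — gave up
```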
### Extending Delivery Deadlines

If a consumer needs more time to process a record, it can extend the ack deadline:
```bash
# Extend the deadline by 60 seconds
flo stream group touch events --group processors --consumer worker-1 \
  --ids 1703350800000-0 --extend 60000
```

### Inspecting Pending State
Section titled “Inspecting Pending State”# List all pending (delivered but unacked) records for a groupflo stream group pending events --group processors
# Group metadata (consumer count, PEL size, last delivered ID)flo stream group info events --group processorsConsumer Modes
Groups support three delivery modes, set at creation time:
| Mode | Behavior |
|---|---|
| `shared` (default) | Records distributed round-robin across all consumers |
| `exclusive` | One active consumer holds the lease; others are standbys |
| `key_shared` | Records with the same key always go to the same consumer |
```bash
# Shared mode (default)
flo stream group create events --group processors --mode shared

# Exclusive — one active consumer, others wait
flo stream group create events --group processors --mode exclusive

# Singleton — exclusive with zero standbys allowed
flo stream group create events --group processors --mode exclusive --max-standbys 0

# Key-shared — partition by key across consumers
flo stream group create events --group processors --mode key_shared --slots 16
```

## SDK Examples
**Go**

```go
client := flo.NewClient("localhost:9000")
client.Connect()
defer client.Close()

// Append
result, _ := client.Stream.Append("events", []byte(`{"event":"signup"}`), nil)

// Read from beginning
records, _ := client.Stream.Read("events", &flo.StreamReadOptions{
	Start: &flo.StreamID{TimestampMS: 0, Sequence: 0},
	Count: ptr(uint32(10)),
})
for _, rec := range records.Records {
	fmt.Printf("id=%d-%d: %s\n", rec.ID.TimestampMS, rec.ID.Sequence, rec.Payload)
}

// Blocking read
records, _ = client.Stream.Read("events", &flo.StreamReadOptions{
	Tail:    true,
	BlockMS: ptr(uint32(5000)),
	Count:   ptr(uint32(10)),
})

// Consumer group
records, _ = client.Stream.GroupRead("events", "processors", "worker-1",
	&flo.StreamGroupReadOptions{
		Count:   ptr(uint32(10)),
		BlockMS: ptr(uint32(30000)),
	},
)
// ... process records ...
ids := make([]flo.StreamID, len(records.Records))
for i, rec := range records.Records {
	ids[i] = rec.ID
}
client.Stream.GroupAck("events", "processors", ids, &flo.StreamGroupAckOptions{
	Consumer: "worker-1",
})
```

**Python**

```python
async with FloClient("localhost:9000") as client:
    # Append
    result = await client.stream.append("events", b'{"event":"signup"}')
    print(f"Appended at {result.id.timestamp_ms}-{result.id.sequence}")

    # Read from beginning
    result = await client.stream.read("events", StreamReadOptions(
        start=StreamID(0, 0),
        count=10,
    ))
    for record in result.records:
        print(f"id={record.id}: {record.payload}")

    # Blocking read from tail
    result = await client.stream.read("events", StreamReadOptions(
        tail=True,
        block_ms=5000,
        count=10,
    ))

    # Consumer group
    result = await client.stream.group_read(
        "events", "processors", "worker-1",
        StreamGroupReadOptions(count=10, block_ms=30000),
    )
    await client.stream.group_ack(
        "events", "processors",
        [r.id for r in result.records],
        StreamGroupAckOptions(consumer="worker-1"),
    )
```

**TypeScript**

```typescript
const client = new FloClient("localhost:9000");
await client.connect();

// Append
const result = await client.stream.append(
  "events",
  encode('{"event":"signup"}'),
);

// Read from beginning
const records = await client.stream.read("events", {
  start: StreamID.fromTimestamp(0n),
  limit: 10,
});
for (const rec of records.records) {
  console.log(`id=${rec.id}: ${decode(rec.payload)}`);
}

// Blocking read from tail
const newRecords = await client.stream.read("events", {
  tail: true,
  blockMs: 5000,
  limit: 10,
});

// Consumer group
const groupRecords = await client.stream.groupRead("events", {
  group: "processors",
  consumer: "worker-1",
  limit: 10,
  blockMs: 30000,
});
await client.stream.groupAck(
  "events",
  groupRecords.records.map(r => r.id),
  { group: "processors", consumer: "worker-1" },
);
```

**Zig**

```zig
var client = flo.Client.init(allocator, "localhost:9000", .{});
defer client.deinit();
try client.connect();

var stream = flo.Stream.init(&client);

// Append
const result = try stream.append("events", "{\"event\":\"signup\"}", .{});

// Read from beginning
var records = try stream.read("events", .{
    .start = .{ .timestamp_ms = 0, .sequence = 0 },
    .count = 10,
});
defer records.deinit();
for (records.records) |rec| {
    std.debug.print("id={d}-{d}: {s}\n", .{ rec.id.timestamp_ms, rec.id.sequence, rec.payload });
}

// Consumer group
var group_records = try stream.groupRead("events", "processors", "worker-1", .{
    .count = 10,
    .block_ms = 5000,
});
defer group_records.deinit();
try stream.groupAck("events", "processors", group_records.ids(), .{
    .consumer = "worker-1",
});
```

## Reading Modes
| Mode | CLI Flag | Behavior |
|---|---|---|
| From beginning | `--start 0-0` | Returns all records in order |
| From ID | `--start <id>` | Returns records starting at (or after) the given Stream ID |
| Range | `--start <id> --end <id>` | Bounded range, inclusive on both ends |
| Tail | `--start '$'` | Only records appended after the read begins |
| Blocking | `--block <ms>` | Waits up to `<ms>` for new data (0 = forever) |
| Follow | `--follow` | Continuous tail — keeps reading as new records arrive |
| Partition | `--partition <n>` | Reads from a specific partition only |
| Partition key | `--partition-key <key>` | Reads from the partition that key hashes to |
| JSON output | `--json` | Machine-readable output with id, payload, tier, partition fields |
## Tiered Storage

Stream data flows through a tiered storage hierarchy automatically:
| Tier | Location | Purpose |
|---|---|---|
| Hot | In-memory ring buffer | Recent writes, lowest latency |
| Warm | On-disk segments | Overflowed from hot tier, still fast |
| Cold | External backend (file, S3, Azure) | Archival storage |
When you read with `--json`, each record includes a `tier` field indicating where it was served from. Reads are transparent — the server merges results across tiers automatically.
The thresholds for spilling between tiers are configurable per server (hot buffer capacity, max hot entries). Cold storage can be enabled with file or cloud backends.
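The hot-to-warm spill above can be modeled as a bounded buffer that evicts its oldest entries when capacity is exceeded. A minimal sketch, assuming a simple max-entries threshold (the real server has more knobs, and "warm" here is just a list standing in for on-disk segments):

```python
from collections import deque

class HotTier:
    """Toy model of hot-tier spill to warm storage."""

    def __init__(self, max_entries: int):
        self.hot = deque()    # in-memory ring buffer (recent writes)
        self.warm = []        # stands in for on-disk segments
        self.max_entries = max_entries

    def append(self, record):
        self.hot.append(record)
        while len(self.hot) > self.max_entries:
            # Spill the oldest record out of the hot tier
            self.warm.append(self.hot.popleft())

    def read_all(self):
        # Reads merge tiers transparently, oldest first
        return self.warm + list(self.hot)

tier = HotTier(max_entries=2)
for r in ["r1", "r2", "r3"]:
    tier.append(r)
print(list(tier.hot), tier.warm)  # ['r2', 'r3'] ['r1']
print(tier.read_all())            # ['r1', 'r2', 'r3']
```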
## Durability
Section titled “Durability”- All appends are Raft-replicated before acknowledgment
- Stream data, consumer group offsets, and pending state all survive server restarts
- After restart, new appends continue with monotonically increasing Stream IDs (no duplicates, no gaps)
- Immediate consistency — a read right after restart returns all pre-restart data
## How It Works

Stream records are UAL entries with `opcode = stream_append`. Reading a stream is reading a range of UAL entries filtered by stream name hash — there is no separate data structure.
- Record IDs are derived from append wall-clock time plus sequence
- Zero-copy reads — no derived state, no projection overhead
- Consumer group state (cursors, PEL, mode) is managed per-shard
- Partition routing uses Wyhash on the partition key or explicit index
- Blocking reads register waiters that are notified on append
See Storage Internals for details.