Queues
Flo queues provide reliable task distribution with competing consumers, priority ordering, visibility timeouts, dead-letter handling, and deduplication. Every enqueue is Raft-replicated before acknowledgment. Dequeued messages are automatically acked on consume — they won’t reappear after restart.
Core Concepts
Sequence Numbers
Every message is assigned a monotonically increasing sequence number on enqueue. The sequence is returned by the enqueue operation and used to reference the message in ack, nack, and touch operations.
Priority Ordering
Messages are dequeued in priority order. Lower numeric values are higher priority (0 is highest). Messages with equal priority are dequeued in FIFO order.
```sh
flo queue enqueue tasks '{"urgent":true}' --priority 1
flo queue enqueue tasks '{"routine":true}' --priority 10

# Dequeue returns the priority-1 message first
flo queue dequeue tasks
```

Priority is a u8 value (0–255).
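This ordering rule, lower priority value first and FIFO within a priority, matches a min-heap keyed by (priority, sequence). A minimal Python sketch (illustrative, not Flo's implementation):

```python
import heapq

# Min-heap ordered by (priority, sequence): the lower priority value wins,
# and equal priorities fall back to FIFO via the sequence number.
ready = []
heapq.heappush(ready, (10, 1, b'{"routine":true}'))  # seq 1, priority 10
heapq.heappush(ready, (1, 2, b'{"urgent":true}'))    # seq 2, priority 1
heapq.heappush(ready, (1, 3, b'{"urgent2":true}'))   # seq 3, priority 1

order = [heapq.heappop(ready)[1] for _ in range(3)]
print(order)  # [2, 3, 1]: priority 1 first (FIFO among equals), then priority 10
```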
Namespace Isolation
Queues live inside namespaces. The same queue name in different namespaces is fully independent — separate messages, separate DLQ, separate statistics.
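One way to picture the isolation: model each queue as keyed by the (namespace, name) pair, so identical names in different namespaces never share state. A hypothetical sketch, not Flo's storage layout:

```python
from collections import defaultdict, deque

# Hypothetical model: queues keyed by (namespace, name), so "tasks" in
# "staging" and "tasks" in "production" never share state.
queues = defaultdict(deque)

queues[("staging", "tasks")].append(b'{"env":"staging"}')
queues[("production", "tasks")].append(b'{"env":"prod"}')

print(len(queues[("staging", "tasks")]))     # 1
print(len(queues[("production", "tasks")]))  # 1
```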
Namespaces are set via the FLO_NAMESPACE environment variable or through client configuration:
```sh
export FLO_NAMESPACE=staging
flo queue enqueue tasks '{"env":"staging"}'

export FLO_NAMESPACE=production
flo queue enqueue tasks '{"env":"prod"}'
# Each namespace has its own "tasks" queue
```

Operations
Enqueue
```sh
# Basic enqueue
flo queue enqueue tasks '{"task":"send-email","to":"alice@example.com"}'

# With priority (lower = higher priority, default: 0)
flo queue enqueue tasks '{"urgent":true}' --priority 1

# Delayed delivery — invisible for 60 seconds after enqueue
flo queue enqueue tasks '{"scheduled":true}' --delay 60000
```

The response returns the assigned sequence number.
Dequeue
```sh
# Pop one message (default count: 1)
flo queue dequeue tasks

# Batch dequeue — up to 10 messages at once (max: 100)
flo queue dequeue tasks --count 10

# Custom visibility timeout (default: 30 seconds)
flo queue dequeue tasks --timeout 60000
```

Dequeued messages are consumed immediately — they are removed from the queue and won’t be delivered to other consumers. The sequence number and payload are returned for each message.
Blocking Dequeue
If the queue is empty, a blocking dequeue waits for new messages instead of returning immediately.
```sh
# Block up to 30 seconds for new messages
flo queue dequeue tasks --block 30000

# Block indefinitely (0 = wait forever)
flo queue dequeue tasks --block 0
```

When a new message is enqueued, all blocking dequeue waiters on that queue are notified. If the timeout expires with no messages, an empty (but successful) result is returned.
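The wait-and-notify semantics can be sketched with a condition variable; this is an illustrative model, not Flo's server-side waiter pool:

```python
import threading
from collections import deque

queue, cond = deque(), threading.Condition()

def blocking_dequeue(timeout):
    """Wait up to `timeout` seconds; return a message, or None (empty result)."""
    with cond:
        cond.wait_for(lambda: len(queue) > 0, timeout=timeout)
        return queue.popleft() if queue else None

def enqueue(payload):
    with cond:
        queue.append(payload)
        cond.notify_all()  # wake every blocked dequeue waiter on this queue

# Enqueue from another thread after 100 ms; the dequeue below wakes up early.
threading.Timer(0.1, enqueue, args=(b'{"task":"x"}',)).start()
print(blocking_dequeue(timeout=5))  # b'{"task":"x"}'
```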
Acknowledge
Explicitly mark a message as processed:
```sh
flo queue ack tasks <seq>

# Ack multiple sequences
flo queue ack tasks 42 43 44
```

Negative Acknowledge (Nack)
Return a message to the queue for redelivery, or send it directly to the DLQ:
```sh
# Nack — message goes back to the ready queue
flo queue nack tasks <seq>

# Nack directly to DLQ
flo queue nack tasks <seq> --dlq
```

Peek

Inspect messages at the front of the queue without consuming them:
```sh
flo queue peek tasks --count 5
```

Touch

Extend the visibility timeout on an in-flight message. Use this when processing takes longer than expected:
```sh
# Extend with default timeout
flo queue touch tasks <seq>

# Extend by a specific duration (60 seconds)
flo queue touch tasks <seq> --extend 60000
```

List Queues
```sh
flo queue ls
flo queue list --limit 50 --json
```

Returns queue metadata: name, namespace, pending count, available count, total enqueued, total dequeued, and DLQ count.
Watch

Continuously poll and display messages as they arrive:
```sh
flo queue watch tasks
```

Dead Letter Queue
Messages that exceed the retry limit (default: 5 attempts) are moved to the DLQ. The DLQ is a bounded FIFO buffer (10,000 entries by default) — when full, the oldest DLQ entries are evicted.
```sh
# List DLQ messages
flo queue dlq list tasks
flo queue dlq list tasks --limit 50

# Requeue DLQ messages back to the main queue
flo queue dlq requeue tasks <seq> <seq> ...
```

Each DLQ entry records the sequence number, delivery attempt count, and the timestamp when it was moved to the DLQ.
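The retry-to-DLQ flow can be sketched with the documented defaults (5 attempts, 10,000-entry bounded ring). An illustrative model, not Flo's implementation:

```python
from collections import deque

MAX_ATTEMPTS = 5
dlq = deque(maxlen=10_000)  # bounded FIFO: when full, the oldest entry is evicted
attempts = {}               # seq -> delivery count

def nack(seq, payload):
    """Return a message for redelivery, or move it to the DLQ past the limit."""
    attempts[seq] = attempts.get(seq, 0) + 1
    if attempts[seq] >= MAX_ATTEMPTS:
        dlq.append({"seq": seq, "delivery_count": attempts[seq], "payload": payload})
        return "dlq"
    return "requeued"

for _ in range(4):
    assert nack(42, b"x") == "requeued"
print(nack(42, b"x"))  # "dlq" on the 5th failed attempt
```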
SDK Examples
```go
client := flo.NewClient("localhost:9000")
client.Connect()
defer client.Close()

// Enqueue with priority
seq, _ := client.Queue.Enqueue("tasks", []byte(`{"task":"process"}`), &flo.EnqueueOptions{Priority: 1})

// Dequeue with long polling
result, _ := client.Queue.Dequeue("tasks", 10, &flo.DequeueOptions{
	BlockMS: ptr(uint32(30000)),
})

for _, msg := range result.Messages {
	// Process message...
	client.Queue.Ack("tasks", []uint64{msg.Seq}, nil)
}

// Nack to DLQ on failure
client.Queue.Nack("tasks", []uint64{msg.Seq}, &flo.NackOptions{ToDLQ: true})

// Peek without consuming
peeked, _ := client.Queue.Peek("tasks", 5, nil)

// Touch to extend visibility
client.Queue.Touch("tasks", []uint64{msg.Seq}, nil)

// DLQ operations
dlq, _ := client.Queue.DLQList("tasks", nil)
client.Queue.DLQRequeue("tasks", []uint64{dlq.Messages[0].Seq}, nil)
```

```python
async with FloClient("localhost:9000") as client:
    # Enqueue with priority
    seq = await client.queue.enqueue("tasks", b'{"task":"process"}', EnqueueOptions(priority=1))

    # Dequeue with long polling
    result = await client.queue.dequeue("tasks", 10, DequeueOptions(block_ms=30000))

    for msg in result.messages:
        try:
            process(msg.payload)
            await client.queue.ack("tasks", [msg.seq])
        except Exception:
            await client.queue.nack("tasks", [msg.seq], NackOptions(to_dlq=True))

    # Peek without consuming
    peeked = await client.queue.peek("tasks", 5)

    # Touch to extend visibility
    await client.queue.touch("tasks", [msg.seq])

    # DLQ operations
    dlq = await client.queue.dlq_list("tasks")
    await client.queue.dlq_requeue("tasks", [m.seq for m in dlq.messages])
```

```typescript
const client = new FloClient("localhost:9000");
await client.connect();

// Enqueue with priority
const seq = await client.queue.enqueue("tasks", encode('{"task":"process"}'), { priority: 1 });

// Dequeue with long polling
const result = await client.queue.dequeue("tasks", 10, {
  blockMs: 30000,
});

for (const msg of result.messages) {
  try {
    process(decode(msg.payload));
    await client.queue.ack("tasks", [msg.seq]);
  } catch {
    await client.queue.nack("tasks", [msg.seq], { toDlq: true });
  }
}

// Peek without consuming
const peeked = await client.queue.peek("tasks", 5);

// Touch to extend visibility
await client.queue.touch("tasks", [msg.seq]);

// DLQ operations
const dlq = await client.queue.dlqList("tasks");
await client.queue.dlqRequeue("tasks", dlq.messages.map(m => m.seq));
```

```zig
var client = flo.Client.init(allocator, "localhost:9000", .{});
defer client.deinit();
try client.connect();

var queue = flo.Queue.init(&client);

// Enqueue with priority
const seq = try queue.enqueue("tasks", "{\"task\":\"process\"}", .{
    .priority = 1,
});

// Dequeue with long polling
var result = try queue.dequeue("tasks", 10, .{ .block_ms = 30000 });
defer result.deinit();

for (result.messages) |msg| {
    // Process...
    try queue.ack("tasks", &[_]u64{msg.seq}, .{});
}

// Peek without consuming
var peeked = try queue.peek("tasks", 5, .{});
defer peeked.deinit();

// Touch to extend visibility
try queue.touch("tasks", &[_]u64{seq}, .{});

// DLQ operations
var dlq = try queue.dlqList("tasks", .{});
defer dlq.deinit();
```

Dequeue Message Fields
Each dequeued message includes:
| Field | Description |
|---|---|
| seq | Sequence number (monotonically increasing per queue) |
| payload | The message body (bytes) |
| priority | Priority value (0–255, lower = higher priority) |
| enqueued_at | Timestamp when the message was enqueued |
| delivery_count | Number of delivery attempts |
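These fields map naturally onto a small record type; a hypothetical sketch using the names from the table above:

```python
from dataclasses import dataclass

@dataclass
class DequeuedMessage:
    """Shape of a dequeued message, per the field table above (illustrative)."""
    seq: int             # monotonically increasing per queue
    payload: bytes       # message body
    priority: int        # 0-255, lower = higher priority
    enqueued_at: int     # enqueue timestamp
    delivery_count: int  # delivery attempts so far

msg = DequeuedMessage(seq=42, payload=b'{"task":"x"}', priority=1,
                      enqueued_at=1700000000, delivery_count=1)
print(msg.seq, msg.priority)  # 42 1
```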
Durability
- All enqueues are Raft-replicated before the response is sent
- Dequeued messages are persisted as acked — they don’t reappear after restart
- Un-consumed messages survive server restarts with sync durability
- Queue metadata (names, statistics) is rebuilt from the UAL on recovery
How It Works
The Queue Projection uses purpose-built data structures:
- Ready heap — Min-heap ordered by (priority, sequence). Lower priority values and lower sequences are dequeued first.
- Lease tracker — Min-heap of (expiry_time, sequence). Expired leases return messages to the ready heap automatically.
- DLQ ring — Bounded FIFO buffer (default: 10,000 entries). Messages exceeding the max attempt count (default: 5) are moved here.
- Message map — HashMap(seq → Message) holding the full message state (~128 bytes fixed overhead per message plus payload).
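A compressed sketch of how a ready heap and lease tracker could interact (illustrative, not Flo's actual code): dequeue pops from the ready heap and records a lease, and an expired lease pushes the message back:

```python
import heapq

ready = []   # min-heap of (priority, seq)
leases = []  # min-heap of (expiry_time, priority, seq)

def dequeue(now, visibility=30.0):
    """Pop the highest-priority message and lease it until now + visibility."""
    priority, seq = heapq.heappop(ready)
    heapq.heappush(leases, (now + visibility, priority, seq))
    return seq

def expire_leases(now):
    """Return expired in-flight messages to the ready heap."""
    while leases and leases[0][0] <= now:
        _, priority, seq = heapq.heappop(leases)
        heapq.heappush(ready, (priority, seq))

heapq.heappush(ready, (0, 1))
seq = dequeue(now=0.0, visibility=30.0)
expire_leases(now=31.0)  # lease expired: the message is ready again
print(ready)  # [(0, 1)]
```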
Blocking dequeue uses the same waiter pool as blocking stream reads — enqueue operations notify all registered waiters on that queue.
See Storage Internals for details.