Queues

Flo queues provide reliable task distribution with competing consumers, priority ordering, visibility timeouts, dead-letter handling, and deduplication. Every enqueue is Raft-replicated before acknowledgment. Dequeued messages are automatically acked on consume — they won’t reappear after restart.

Every message is assigned a monotonically increasing sequence number on enqueue. The sequence is returned by the enqueue operation and used to reference the message in ack, nack, and touch operations.

Messages are dequeued in priority order. Lower numeric values are higher priority (0 is highest). Messages with equal priority are dequeued in FIFO order.

```sh
flo queue enqueue tasks '{"urgent":true}' --priority 1
flo queue enqueue tasks '{"routine":true}' --priority 10

# Dequeue returns the priority-1 message first
flo queue dequeue tasks
```

Priority is a u8 value (0–255).

Queues live inside namespaces. The same queue name in different namespaces is fully independent — separate messages, separate DLQ, separate statistics.

Namespaces are set via the FLO_NAMESPACE environment variable or client configuration:

```sh
export FLO_NAMESPACE=staging
flo queue enqueue tasks '{"env":"staging"}'

export FLO_NAMESPACE=production
flo queue enqueue tasks '{"env":"prod"}'

# Each namespace has its own "tasks" queue
```
Enqueue messages:

```sh
# Basic enqueue
flo queue enqueue tasks '{"task":"send-email","to":"alice@example.com"}'

# With priority (lower = higher priority, default: 0)
flo queue enqueue tasks '{"urgent":true}' --priority 1

# Delayed delivery — invisible for 60 seconds after enqueue
flo queue enqueue tasks '{"scheduled":true}' --delay 60000
```

The response returns the assigned sequence number.

```sh
# Pop one message (default count: 1)
flo queue dequeue tasks

# Batch dequeue — up to 10 messages at once (max: 100)
flo queue dequeue tasks --count 10

# Custom visibility timeout (default: 30 seconds)
flo queue dequeue tasks --timeout 60000
```

Dequeued messages are consumed immediately — they are removed from the queue and won’t be delivered to other consumers. The sequence number and payload are returned for each message.

If the queue is empty, a blocking dequeue waits for new messages instead of returning immediately.

```sh
# Block up to 30 seconds for new messages
flo queue dequeue tasks --block 30000

# Block indefinitely (0 = wait forever)
flo queue dequeue tasks --block 0
```

When a new message is enqueued, all blocking dequeue waiters on that queue are notified. If the timeout expires with no messages, an empty (but successful) result is returned.

Explicitly mark a message as processed:

```sh
flo queue ack tasks <seq>

# Ack multiple sequences
flo queue ack tasks 42 43 44
```

Return a message to the queue for redelivery, or send it directly to the DLQ:

```sh
# Nack — message goes back to the ready queue
flo queue nack tasks <seq>

# Nack directly to DLQ
flo queue nack tasks <seq> --dlq
```

Inspect messages at the front of the queue without consuming them:

```sh
flo queue peek tasks --count 5
```

Extend the visibility timeout on an in-flight message. Use this when processing takes longer than expected:

```sh
# Extend with default timeout
flo queue touch tasks <seq>

# Extend by a specific duration (60 seconds)
flo queue touch tasks <seq> --extend 60000
```
List queues:

```sh
flo queue ls
flo queue list --limit 50 --json
```

Returns queue metadata: name, namespace, pending count, available count, total enqueued, total dequeued, and DLQ count.

Continuously poll and display messages as they arrive:

```sh
flo queue watch tasks
```

Messages that exceed the retry limit (default: 5 attempts) are moved to the DLQ. The DLQ is a bounded FIFO buffer (10,000 entries by default) — when full, the oldest DLQ entries are evicted.

```sh
# List DLQ messages
flo queue dlq list tasks
flo queue dlq list tasks --limit 50

# Requeue DLQ messages back to the main queue
flo queue dlq requeue tasks <seq> <seq> ...
```

Each DLQ entry records the sequence number, delivery attempt count, and the timestamp when it was moved to the DLQ.

The same operations via the Go client:

```go
client := flo.NewClient("localhost:9000")
client.Connect()
defer client.Close()

// Enqueue with priority
seq, _ := client.Queue.Enqueue("tasks", []byte(`{"task":"process"}`),
	&flo.EnqueueOptions{Priority: 1})

// Dequeue with long polling
result, _ := client.Queue.Dequeue("tasks", 10, &flo.DequeueOptions{
	BlockMS: ptr(uint32(30000)),
})
for _, msg := range result.Messages {
	// Process message...
	client.Queue.Ack("tasks", []uint64{msg.Seq}, nil)

	// Or nack to the DLQ on failure:
	// client.Queue.Nack("tasks", []uint64{msg.Seq}, &flo.NackOptions{ToDLQ: true})
}

// Peek without consuming
peeked, _ := client.Queue.Peek("tasks", 5, nil)

// Touch to extend visibility
client.Queue.Touch("tasks", []uint64{seq}, nil)

// DLQ operations
dlq, _ := client.Queue.DLQList("tasks", nil)
client.Queue.DLQRequeue("tasks", []uint64{dlq.Messages[0].Seq}, nil)
```

Each dequeued message includes:

| Field | Description |
| --- | --- |
| `seq` | Sequence number (monotonically increasing per queue) |
| `payload` | The message body (bytes) |
| `priority` | Priority value (0–255, lower = higher priority) |
| `enqueued_at` | Timestamp when the message was enqueued |
| `delivery_count` | Number of delivery attempts |
Durability guarantees:

- All enqueues are Raft-replicated before the response is sent
- Dequeued messages are persisted as acked — they don’t reappear after restart
- Un-consumed messages survive server restarts with sync durability
- Queue metadata (names, statistics) is rebuilt from the UAL on recovery

The Queue Projection uses purpose-built data structures:

- Ready heap — Min-heap ordered by (priority, sequence). Lower priority values and lower sequences are dequeued first.
- Lease tracker — Min-heap of (expiry_time, sequence). Expired leases return messages to the ready heap automatically.
- DLQ ring — Bounded FIFO buffer (default: 10,000 entries). Messages exceeding the max attempt count (default: 5) are moved here.
- Message map — HashMap(seq → Message) holding the full message state (~128 bytes fixed overhead per message plus payload).

Blocking dequeue uses the same waiter pool as blocking stream reads — enqueue operations notify all registered waiters on that queue.

See Storage Internals for details.