Skip to content

Zig SDK

Add to build.zig.zon:

// Dependency entry for the Flo Zig SDK (add it to the manifest's existing fields).
.dependencies = .{
.flo = .{
// Source tarball for the v0.1.0 tag of flo-zig.
.url = "https://github.com/floruntime/flo-zig/archive/refs/tags/v0.1.0.tar.gz",
// "..." is a placeholder, not a real hash.
// NOTE(review): presumably replaced by the package hash Zig reports for this
// tarball (e.g. via `zig fetch --save <url>`) — confirm for your Zig version.
.hash = "...",
},
},

In build.zig:

// Look up the `flo` package, forwarding the build's target and optimize mode.
const flo_dep = b.dependency("flo", .{
    .target = target,
    .optimize = optimize,
});
// Register the package's `flo` module on the executable under the import name "flo".
exe.root_module.addImport("flo", flo_dep.module("flo"));
const std = @import("std");
const flo = @import("flo");
/// Quick-start example: connects to a Flo server and exercises the KV, Queue,
/// and Stream APIs once each.
///
/// Returns any error raised by `connect` or by the individual operations.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var client = flo.Client.init(allocator, "localhost:9000", .{ .namespace = "myapp" });
    defer client.deinit();
    try client.connect();

    // KV
    var kv = flo.KV.init(&client);
    _ = try kv.put("key", "value", .{});
    if (try kv.get("key", .{})) |r_const| {
        // Captures are immutable; the result is copied into a `var` before
        // `deinit` is called (presumably because `deinit` needs a mutable receiver).
        var r = r_const;
        defer r.deinit(allocator);
        std.debug.print("Got: {s} @ v{d}\n", .{ r.value, r.version });
    }

    // Queue
    var queue = flo.Queue.init(&client);
    // The enqueue result is explicitly discarded: it is not needed here, and an
    // unused local constant is a compile error in Zig.
    _ = try queue.enqueue("tasks", "payload", .{ .priority = 5 });
    var result = try queue.dequeue("tasks", 10, .{});
    defer result.deinit();

    // Stream
    var stream = flo.Stream.init(&client);
    _ = try stream.append("events", "data", .{});
    var records = try stream.read("events", .{});
    defer records.deinit();
}
// KV API tour. These fragments assume a connected `client` and an `allocator`
// are already in scope. Bindings such as `put_res` or `versions` are shown to
// name each call's return value; use or discard them in real code.
var kv = flo.KV.init(&client);

// Get returns ?GetResult — carries both value and version
if (try kv.get("key", .{})) |r_const| {
    var r = r_const;
    defer r.deinit(allocator);
    std.debug.print("{s} @ v{d}\n", .{ r.value, r.version });
}

// Blocking get — the result is released the same way as for a plain get
if (try kv.get("key", .{ .block_ms = 5000 })) |r_const| {
    var r = r_const;
    defer r.deinit(allocator);
}

// Put returns PutResult { version: u64 }
const put_res = try kv.put("key", "value", .{
    .ttl_seconds = 3600,
    .if_not_exists = true,
    .namespace = "other-ns",
});

// CAS — use the version returned by Get/Put
if (try kv.get("key", .{})) |r_const| {
    var r = r_const;
    defer r.deinit(allocator);
    _ = try kv.put("key", "new", .{ .cas_version = r.version });
}

// Delete
try kv.delete("key", .{});

// Multi-key get (single round trip, up to 256 keys, may span shards)
var mget_res = try kv.mget(&.{ "user:1", "user:2", "user:3" }, .{});
defer mget_res.deinit();
for (mget_res.entries) |e| {
    if (e.found) {
        std.debug.print("{s} = {s} (v{d})\n", .{ e.key, e.value, e.version });
    } else {
        std.debug.print("{s} missing\n", .{e.key});
    }
}

// Scan
var scan_res = try kv.scan("prefix:", .{ .limit = 100, .keys_only = true });
defer scan_res.deinit();

// History
// NOTE(review): if the history result owns memory it needs a matching deinit — confirm.
const versions = try kv.history("key", .{ .limit = 10 });

// Atomic counter
const n = try kv.incr("visits:home", .{});
const m = try kv.incr("visits:home", .{ .delta = 10 });

// TTL lifecycle and existence
try kv.touch("lock:resource", 60, .{});
try kv.persist("lock:resource", .{});
const exists = try kv.exists("lock:resource", .{});

// JSON paths
_ = try kv.jsonSet("order:42", "$", "{\"items\":3,\"status\":\"new\"}", .{});
const set_res = try kv.jsonSet("order:42", "$.status", "\"shipped\"", .{}); // atomic sub-field
std.debug.print("doc now at v{d}\n", .{set_res.version});
if (try kv.jsonGet("order:42", "$.status", .{})) |status_const| {
    var status = status_const;
    defer status.deinit(allocator);
    std.debug.print("{s} @ v{d}\n", .{ status.value, status.version });
}
_ = try kv.jsonDel("order:42", "$.status", .{});

// Per-shard transaction — atomic multi-key writes on one partition
var txn = try kv.begin("user:42", .{});
defer txn.deinit();
_ = try txn.put("user:42:name", "Jane", .{});
_ = try txn.incr("user:42:visits", 1);
// Named distinctly from the scan result above so both can share one scope.
const commit_res = try txn.commit();
std.debug.print("committed {d} ops at index {d}\n", .{ commit_res.op_count, commit_res.commit_index });
// On error, call `txn.rollback()` instead — idempotent.

scan, mget, jsonGet, jsonSet, jsonDel, and history are not supported inside a transaction and return FloError.TxnUnsupportedOp. Server caps: 256 ops per transaction, 1 MiB total payload.

// Queue API tour; assumes a connected `client` is already in scope.
var queue = flo.Queue.init(&client);

// Enqueue with options
const seq = try queue.enqueue("tasks", "payload", .{
    .priority = 5,
    .delay_ms = 1_000,
    .dedup_key = "unique-id",
});

// Dequeue with long polling
var result = try queue.dequeue("tasks", 10, .{
    .visibility_timeout_ms = 60_000,
    .block_ms = 5_000,
});
defer result.deinit();

// Ack, Nack, DLQ
// NOTE(review): `seqs` is not defined in this snippet — presumably the sequence
// values of the dequeued messages; confirm the expected element type.
try queue.ack("tasks", &seqs, .{});
try queue.nack("tasks", &seqs, .{ .to_dlq = false });

var dlq = try queue.dlqList("tasks", .{ .limit = 100 });
defer dlq.deinit();
try queue.dlqRequeue("tasks", &seqs, .{});
// Stream API tour; assumes a connected `client` is already in scope. Each
// binding has its own name so the fragments can share one scope.
var stream = flo.Stream.init(&client);

// Append
const append_res = try stream.append("events", "payload", .{});

// Append with headers
const append_hdr_res = try stream.append("events", "payload", .{
    .headers = "content-type=application/json\nsource=web",
});

// Read from beginning
var records = try stream.read("events", .{});
defer records.deinit();

// Read from a specific position
var records_from_100 = try stream.read("events", .{
    .start = flo.StreamID.fromSequence(100),
    .count = 50,
});
defer records_from_100.deinit();

// Read from tail with long polling
var tail_records = try stream.read("events", .{
    .tail = true,
    .block_ms = 5000,
});
defer tail_records.deinit();

// Access record fields
for (records.records) |rec| {
    std.debug.print("Stream: {s}\n", .{rec.stream}); // stream name
    std.debug.print("Payload: {s}\n", .{rec.payload}); // raw bytes
    // rec.headers = ?[]const u8 (raw wire bytes, null if no headers)
}

// Consumer groups
try stream.groupJoin("events", "my-group", "worker-1", .{});
var group_records = try stream.groupRead("events", "my-group", "worker-1", .{
    .count = 10,
    .block_ms = 5000,
});
defer group_records.deinit();
// NOTE(review): `seqs` is not defined in this snippet — presumably the sequence
// values of the records being acknowledged; confirm the expected type.
try stream.groupAck("events", "my-group", seqs, .{});

// Info and trim
const info = try stream.info("events", .{});
try stream.trim("events", .{ .max_len = 1000 });

The recommended way to consume streams is the StreamWorker. It handles consumer group join, polling, concurrency, ack/nack, and reconnection automatically:

/// Stream record handler: decodes the payload as an `Event`, then prints the
/// stream name and the record's sequence number.
///
/// Propagates any error returned by `ctx.json`.
fn processEvent(ctx: *flo.StreamContext) !void {
    const event = try ctx.json(Event);
    defer event.deinit();

    const stream_name = ctx.stream();
    const stream_id = ctx.streamID();
    std.debug.print("Stream: {s}, ID: {d}\n", .{ stream_name, stream_id.sequence });

    // `headers()` is optional; the body only runs when a value is present.
    if (ctx.headers()) |parsed_headers| {
        // iterate parsed headers
        _ = parsed_headers;
    }
}
/// Entry point: configures a `flo.StreamWorker` for the "events" stream in the
/// "processors" group and starts it with `processEvent` as the handler.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();
    const alloc = gpa_state.allocator();

    var stream_worker = try flo.StreamWorker.init(
        alloc,
        .{
            .endpoint = "localhost:9000",
            .namespace = "myapp",
            .stream = "events",
            .group = "processors",
            .concurrency = 5,
            .batch_size = 10,
            .block_ms = 30_000,
        },
        processEvent,
    );
    defer stream_worker.deinit();

    try stream_worker.start();
}
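| Option | Type | Default | Description |
| --- | --- | --- | --- |
| `stream` | `[]const u8` | `""` | Single stream name |
| `streams` | `?[]const []const u8` | `null` | Multiple streams |
| `group` | `[]const u8` | `"default"` | Consumer group name |
| `consumer` | `?[]const u8` | auto | Consumer ID |
| `concurrency` | `u32` | `10` | Max concurrent handlers |
| `batch_size` | `u32` | `10` | Records per poll |
| `block_ms` | `u32` | `30_000` | Long-poll timeout (ms) |
| `heartbeat_interval_ms` | `u64` | `30_000` | Heartbeat interval (ms) |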

The handler receives a *StreamContext with convenience accessors:

/// Illustrates the accessors available on `*flo.StreamContext`.
/// `T` stands for the caller's own payload type.
fn handler(ctx: *flo.StreamContext) !void {
    // Each result is explicitly discarded: Zig rejects ignored non-void values.
    _ = ctx.payload(); // []const u8
    _ = ctx.streamID(); // StreamID
    _ = ctx.stream(); // stream name
    _ = ctx.headers(); // ?std.StringHashMap([]const u8)

    // `json` can fail, so it is called with `try`; the parsed value is
    // released with `deinit` when the handler returns.
    const parsed = try ctx.json(T); // parsed JSON
    defer parsed.deinit();
}

Records are auto-acked on handler success and auto-nacked on error. Connection errors trigger automatic reconnect and consumer group re-join.

The ActionWorker is a high-level worker that manages connection, registration, polling, heartbeats, and graceful shutdown:

const flo = @import("flo");
/// Action handler: decodes the input as an `OrderRequest`, extends the lease,
/// and returns the bytes produced by `ctx.toBytes` for a "completed" status.
///
/// Propagates errors from `ctx.json`, `ctx.touch`, and `ctx.toBytes`.
fn processOrder(ctx: *flo.ActionContext) ![]const u8 {
    const order = try ctx.json(OrderRequest);
    defer order.deinit();

    // For long tasks, extend the lease
    try ctx.touch(30_000);

    return ctx.toBytes(.{ .status = "completed" });
}
/// Entry point: creates a `flo.ActionWorker`, registers `processOrder` under
/// the name "process-order", and starts the worker.
pub fn main() !void {
    var gpa_state = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa_state.deinit();
    const alloc = gpa_state.allocator();

    var action_worker = try flo.ActionWorker.init(alloc, .{
        .endpoint = "localhost:9000",
        .namespace = "myapp",
        .concurrency = 10,
    });
    defer action_worker.deinit();

    // Handlers are registered before the worker is started.
    try action_worker.registerAction("process-order", processOrder);
    try action_worker.start();
}

For low-level action operations (register, invoke, status), use flo.Actions:

// Low-level Actions API; assumes a connected `client` and an `allocator` are in scope.
var actions = flo.Actions.init(&client);

// Register an action type
try actions.register("send-email", .user, .{
    .timeout_ms = 30_000,
    .max_retries = 3,
});

// Invoke an action
const run_id = try actions.invoke("send-email", "{\"to\":\"user@example.com\"}", .{});
// The returned run id is freed by the caller with the same allocator.
defer allocator.free(run_id);

// Check status
const status = try actions.status(run_id, .{});

All operations can fail with an error from the flo.FloError set:

| Error | Description |
| --- | --- |
| `NotConnected` | Client not connected |
| `ConnectionFailed` | TCP connection failed |
| `NotFound` | Key/queue not found |
| `BadRequest` | Invalid request |
| `Conflict` | CAS conflict |
| `ServerError` | Internal server error |
  • All results that allocate memory have a .deinit() method — always defer result.deinit()
  • The client requires an allocator at init time
  • Results returned from get() own their value; the caller must release them with r.deinit(allocator)