Zig SDK
Installation
Section titled “Installation”

Add the dependency to `build.zig.zon`:
.dependencies = .{ .flo = .{ .url = "https://github.com/floruntime/flo-zig/archive/refs/tags/v0.1.0.tar.gz", .hash = "...", },},In build.zig:
const flo = b.dependency("flo", .{ .target = target, .optimize = optimize });exe.root_module.addImport("flo", flo.module("flo"));Quick Start
Section titled “Quick Start”const std = @import("std");const flo = @import("flo");
pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); const allocator = gpa.allocator();
var client = flo.Client.init(allocator, "localhost:9000", .{ .namespace = "myapp" }); defer client.deinit(); try client.connect();
// KV var kv = flo.KV.init(&client); _ = try kv.put("key", "value", .{}); if (try kv.get("key", .{})) |r_const| { var r = r_const; defer r.deinit(allocator); std.debug.print("Got: {s} @ v{d}\n", .{ r.value, r.version }); }
// Queue var queue = flo.Queue.init(&client); const seq = try queue.enqueue("tasks", "payload", .{ .priority = 5 }); var result = try queue.dequeue("tasks", 10, .{}); defer result.deinit();
// Stream var stream = flo.Stream.init(&client); _ = try stream.append("events", "data", .{}); var records = try stream.read("events", .{}); defer records.deinit();}KV Operations
Section titled “KV Operations”var kv = flo.KV.init(&client);
// Get returns ?GetResult — carries both value and versionif (try kv.get("key", .{})) |r_const| { var r = r_const; defer r.deinit(allocator); std.debug.print("{s} @ v{d}\n", .{ r.value, r.version });}
// Blocking get_ = try kv.get("key", .{ .block_ms = 5000 });
// Put returns PutResult { version: u64 }const put_res = try kv.put("key", "value", .{ .ttl_seconds = 3600, .if_not_exists = true, .namespace = "other-ns",});
// CAS — use the version returned by Get/Putif (try kv.get("key", .{})) |r_const| { var r = r_const; defer r.deinit(allocator); _ = try kv.put("key", "new", .{ .cas_version = r.version });}
// Deletetry kv.delete("key", .{});
// Multi-key get (single round trip, up to 256 keys, may span shards)var mget_res = try kv.mget(&.{ "user:1", "user:2", "user:3" }, .{});defer mget_res.deinit();for (mget_res.entries) |e| { if (e.found) { std.debug.print("{s} = {s} (v{d})\n", .{ e.key, e.value, e.version }); } else { std.debug.print("{s} missing\n", .{e.key}); }}
// Scanvar result = try kv.scan("prefix:", .{ .limit = 100, .keys_only = true });defer result.deinit();
// Historyconst versions = try kv.history("key", .{ .limit = 10 });
// Atomic counterconst n = try kv.incr("visits:home", .{});const m = try kv.incr("visits:home", .{ .delta = 10 });
// TTL lifecycle and existencetry kv.touch("lock:resource", 60, .{});try kv.persist("lock:resource", .{});const exists = try kv.exists("lock:resource", .{});
// JSON paths_ = try kv.jsonSet("order:42", "$", "{\"items\":3,\"status\":\"new\"}", .{});const set_res = try kv.jsonSet("order:42", "$.status", "\"shipped\"", .{}); // atomic sub-fieldstd.debug.print("doc now at v{d}\n", .{set_res.version});if (try kv.jsonGet("order:42", "$.status", .{})) |status_const| { var status = status_const; defer status.deinit(allocator); std.debug.print("{s} @ v{d}\n", .{ status.value, status.version });}_ = try kv.jsonDel("order:42", "$.status", .{});
// Per-shard transaction — atomic multi-key writes on one partitionvar txn = try kv.begin("user:42", .{});defer txn.deinit();_ = try txn.put("user:42:name", "Jane", .{});_ = try txn.incr("user:42:visits", 1);const result = try txn.commit();std.debug.print("committed {d} ops at index {d}\n", .{ result.op_count, result.commit_index });// On error, call `txn.rollback()` instead — idempotent.

`scan`, `mget`, `jsonGet`, `jsonSet`, `jsonDel`, and `history` are not supported inside a transaction and return `FloError.TxnUnsupportedOp`. Server caps: 256 ops per transaction, 1 MiB total payload.
Queue Operations
Section titled “Queue Operations”var queue = flo.Queue.init(&client);
// Enqueue with optionsconst seq = try queue.enqueue("tasks", "payload", .{ .priority = 5, .delay_ms = 1000, .dedup_key = "unique-id",});
// Dequeue with long pollingvar result = try queue.dequeue("tasks", 10, .{ .visibility_timeout_ms = 60000, .block_ms = 5000,});defer result.deinit();
// Ack, Nack, DLQtry queue.ack("tasks", &seqs, .{});try queue.nack("tasks", &seqs, .{ .to_dlq = false });var dlq = try queue.dlqList("tasks", .{ .limit = 100 });defer dlq.deinit();try queue.dlqRequeue("tasks", &seqs, .{});Stream Operations
Section titled “Stream Operations”var stream = flo.Stream.init(&client);
// Appendconst result = try stream.append("events", "payload", .{});
// Append with headersconst result = try stream.append("events", "payload", .{ .headers = "content-type=application/json\nsource=web",});
// Read from beginningvar records = try stream.read("events", .{});defer records.deinit();
// Read from a specific positionvar records = try stream.read("events", .{ .start = flo.StreamID.fromSequence(100), .count = 50,});defer records.deinit();
// Read from tail with long pollingvar records = try stream.read("events", .{ .tail = true, .block_ms = 5000,});defer records.deinit();
// Access record fieldsfor (records.records) |rec| { std.debug.print("Stream: {s}\n", .{rec.stream}); // stream name std.debug.print("Payload: {s}\n", .{rec.payload}); // raw bytes // rec.headers = ?[]const u8 (raw wire bytes, null if no headers)}
// Consumer groupstry stream.groupJoin("events", "my-group", "worker-1", .{});var group_records = try stream.groupRead("events", "my-group", "worker-1", .{ .count = 10, .block_ms = 5000,});defer group_records.deinit();try stream.groupAck("events", "my-group", seqs, .{});
// Info and trimconst info = try stream.info("events", .{});try stream.trim("events", .{ .max_len = 1000 });StreamWorker (high-level)
Section titled “StreamWorker (high-level)”

The recommended way to consume streams is the `StreamWorker`. It handles consumer group join, polling, concurrency, ack/nack, and reconnection automatically:
fn processEvent(ctx: *flo.StreamContext) !void { const data = try ctx.json(Event); defer data.deinit();
std.debug.print("Stream: {s}, ID: {d}\n", .{ ctx.stream(), ctx.streamID().sequence }); if (ctx.headers()) |hdrs| { // iterate parsed headers _ = hdrs; }}
pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); const allocator = gpa.allocator();
var worker = try flo.StreamWorker.init(allocator, .{ .endpoint = "localhost:9000", .namespace = "myapp", .stream = "events", .group = "processors", .concurrency = 5, .batch_size = 10, .block_ms = 30_000, }, processEvent); defer worker.deinit();
try worker.start();}StreamWorkerConfig
Section titled “StreamWorkerConfig”

| Option | Type | Default | Description |
|---|---|---|---|
| `stream` | `[]const u8` | `""` | Single stream name |
| `streams` | `?[]const []const u8` | `null` | Multiple streams |
| `group` | `[]const u8` | `"default"` | Consumer group name |
| `consumer` | `?[]const u8` | auto | Consumer ID |
| `concurrency` | `u32` | `10` | Max concurrent handlers |
| `batch_size` | `u32` | `10` | Records per poll |
| `block_ms` | `u32` | `30_000` | Long-poll timeout (ms) |
| `heartbeat_interval_ms` | `u64` | `30_000` | Heartbeat interval (ms) |
StreamContext
Section titled “StreamContext”

The handler receives a `*StreamContext` with convenience accessors:
fn handler(ctx: *flo.StreamContext) !void { ctx.payload(); // []const u8 ctx.streamID(); // StreamID ctx.stream(); // stream name ctx.headers(); // ?std.StringHashMap([]const u8) ctx.json(T); // parsed JSON}

Records are auto-acked on handler success and auto-nacked on error. Connection errors trigger automatic reconnect and consumer group re-join.
ActionWorker
Section titled “ActionWorker”The ActionWorker is a high-level worker that manages connection, registration, polling, heartbeats, and graceful shutdown:
const flo = @import("flo");
fn processOrder(ctx: *flo.ActionContext) ![]const u8 { const input = try ctx.json(OrderRequest); defer input.deinit();
// For long tasks, extend the lease try ctx.touch(30000);
return ctx.toBytes(.{ .status = "completed" });}
pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); const allocator = gpa.allocator();
var worker = try flo.ActionWorker.init(allocator, .{ .endpoint = "localhost:9000", .namespace = "myapp", .concurrency = 10, }); defer worker.deinit();
try worker.registerAction("process-order", processOrder); try worker.start();}

For low-level action operations (register, invoke, status), use `flo.Actions`:
var actions = flo.Actions.init(&client);
// Register an action typetry actions.register("send-email", .user, .{ .timeout_ms = 30000, .max_retries = 3,});
// Invoke an actionconst run_id = try actions.invoke("send-email", "{\"to\":\"user@example.com\"}", .{});defer allocator.free(run_id);
// Check statusconst status = try actions.status(run_id, .{});Error Handling
Section titled “Error Handling”

All operations return an error union; on failure the error is a member of the `flo.FloError` error set, which includes:
| Error | Description |
|---|---|
| `NotConnected` | Client not connected |
| `ConnectionFailed` | TCP connection failed |
| `NotFound` | Key/queue not found |
| `BadRequest` | Invalid request |
| `Conflict` | CAS conflict |
| `ServerError` | Internal server error |
Memory Management
Section titled “Memory Management”- All results that allocate memory have a
.deinit()method — alwaysdefer result.deinit() - The client requires an allocator at init time
- String values returned from
get()must be freed by the caller