Zig SDK
Installation
Section titled “Installation”
Add to `build.zig.zon`:
.dependencies = .{ .flo = .{ .url = "https://github.com/floruntime/flo-zig/archive/refs/tags/v0.1.0.tar.gz", .hash = "...", },},In build.zig:
const flo = b.dependency("flo", .{ .target = target, .optimize = optimize });exe.root_module.addImport("flo", flo.module("flo"));Quick Start
Section titled “Quick Start”const std = @import("std");const flo = @import("flo");
pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); const allocator = gpa.allocator();
var client = flo.Client.init(allocator, "localhost:9000", .{ .namespace = "myapp" }); defer client.deinit(); try client.connect();
// KV var kv = flo.KV.init(&client); try kv.put("key", "value", .{}); if (try kv.get("key", .{})) |value| { defer allocator.free(value); std.debug.print("Got: {s}\n", .{value}); }
// Queue var queue = flo.Queue.init(&client); const seq = try queue.enqueue("tasks", "payload", .{ .priority = 5 }); var result = try queue.dequeue("tasks", 10, .{}); defer result.deinit();
// Stream var stream = flo.Stream.init(&client); _ = try stream.append("events", "data", .{}); var records = try stream.read("events", .{}); defer records.deinit();}KV Operations
Section titled “KV Operations”var kv = flo.KV.init(&client);
// Get (returns null if not found)const value = try kv.get("key", .{});
// Blocking getconst value = try kv.get("key", .{ .block_ms = 5000 });
// Put with optionstry kv.put("key", "value", .{ .ttl_seconds = 3600, .cas_version = 5, .if_not_exists = true, .namespace = "other-ns", // override default});
// Deletetry kv.delete("key", .{});
// Scanvar result = try kv.scan("prefix:", .{ .limit = 100, .keys_only = true });defer result.deinit();
// Historyconst versions = try kv.history("key", .{ .limit = 10 });Queue Operations
Section titled “Queue Operations”var queue = flo.Queue.init(&client);
// Enqueue with optionsconst seq = try queue.enqueue("tasks", "payload", .{ .priority = 5, .delay_ms = 1000, .dedup_key = "unique-id",});
// Dequeue with long pollingvar result = try queue.dequeue("tasks", 10, .{ .visibility_timeout_ms = 60000, .block_ms = 5000,});defer result.deinit();
// Ack, Nack, DLQtry queue.ack("tasks", &seqs, .{});try queue.nack("tasks", &seqs, .{ .to_dlq = false });var dlq = try queue.dlqList("tasks", .{ .limit = 100 });defer dlq.deinit();try queue.dlqRequeue("tasks", &seqs, .{});Stream Operations
Section titled “Stream Operations”var stream = flo.Stream.init(&client);
// Appendconst result = try stream.append("events", "payload", .{});
// Read from beginningvar records = try stream.read("events", .{});defer records.deinit();
// Read from a specific positionvar records = try stream.read("events", .{ .start = flo.StreamID.fromSequence(100), .count = 50,});defer records.deinit();
// Read from tail with long pollingvar records = try stream.read("events", .{ .tail = true, .block_ms = 5000,});defer records.deinit();
// Consumer groupstry stream.groupJoin("events", "my-group", "worker-1", .{});var group_records = try stream.groupRead("events", "my-group", "worker-1", .{ .count = 10, .block_ms = 5000,});defer group_records.deinit();try stream.groupAck("events", "my-group", seqs, .{});
// Info and trimconst info = try stream.info("events", .{});try stream.trim("events", .{ .max_len = 1000 });Worker / Action Operations
Section titled “Worker / Action Operations”
The `ActionWorker` is a high-level worker that manages connection, registration, polling, heartbeats, and graceful shutdown:
const flo = @import("flo");
fn processOrder(ctx: *flo.ActionContext) ![]const u8 { const input = try ctx.json(OrderRequest); defer input.deinit();
// For long tasks, extend the lease try ctx.touch(30000);
return ctx.toBytes(.{ .status = "completed" });}
pub fn main() !void { var gpa = std.heap.GeneralPurposeAllocator(.{}){}; defer _ = gpa.deinit(); const allocator = gpa.allocator();
var worker = try flo.ActionWorker.init(allocator, .{ .endpoint = "localhost:9000", .namespace = "myapp", .concurrency = 10, }); defer worker.deinit();
try worker.registerAction("process-order", processOrder); try worker.start();}For low-level action operations (register, invoke, status), use flo.Actions:
var actions = flo.Actions.init(&client);
// Register an action typetry actions.register("send-email", .user, .{ .timeout_ms = 30000, .max_retries = 3,});
// Invoke an actionconst run_id = try actions.invoke("send-email", "{\"to\":\"user@example.com\"}", .{});defer allocator.free(run_id);
// Check statusconst status = try actions.status(run_id, .{});Error Handling
Section titled “Error Handling”
All operations can fail with a `flo.FloError`:
| Error | Description |
|---|---|
| NotConnected | Client not connected |
| ConnectionFailed | TCP connection failed |
| NotFound | Key/queue not found |
| BadRequest | Invalid request |
| Conflict | CAS conflict |
| ServerError | Internal server error |
Memory Management
Section titled “Memory Management”
- All results that allocate memory have a `.deinit()` method — always `defer result.deinit()`
- The client requires an allocator at init time
- String values returned from `get()` must be freed by the caller