Skip to content

Zig SDK

Add to build.zig.zon:

// Declares the flo SDK as a package dependency, fetched from the v0.1.0 release tarball.
.dependencies = .{
.flo = .{
.url = "https://github.com/floruntime/flo-zig/archive/refs/tags/v0.1.0.tar.gz",
// Placeholder: replace "..." with the real package hash for this tarball
// (for example, the value reported by `zig fetch <url>`).
.hash = "...",
},
},

In build.zig:

// Assumes `b`, `target`, `optimize`, and `exe` are already defined in your build function.
// Look up the "flo" dependency, forwarding the target and optimize settings.
const flo = b.dependency("flo", .{ .target = target, .optimize = optimize });
// Register the dependency's "flo" module on the executable's root module under the import name "flo".
exe.root_module.addImport("flo", flo.module("flo"));
const std = @import("std");
const flo = @import("flo");
/// Quick-start walkthrough: connects a client to "localhost:9000" in the
/// "myapp" namespace, then exercises the KV, Queue, and Stream APIs once each.
/// Any error from the SDK calls is propagated to the caller via `try`.
pub fn main() !void {
    // The client requires an allocator at init time.
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var client = flo.Client.init(allocator, "localhost:9000", .{ .namespace = "myapp" });
    defer client.deinit();
    try client.connect();

    // KV
    var kv = flo.KV.init(&client);
    try kv.put("key", "value", .{});
    if (try kv.get("key", .{})) |value| {
        // The value returned from get() is owned by the caller and must be freed.
        defer allocator.free(value);
        std.debug.print("Got: {s}\n", .{value});
    }

    // Queue
    var queue = flo.Queue.init(&client);
    // The value returned by enqueue() is not needed here. Discard it explicitly:
    // Zig rejects unused local constants at compile time.
    _ = try queue.enqueue("tasks", "payload", .{ .priority = 5 });
    var result = try queue.dequeue("tasks", 10, .{});
    defer result.deinit();

    // Stream
    var stream = flo.Stream.init(&client);
    _ = try stream.append("events", "data", .{});
    var records = try stream.read("events", .{});
    defer records.deinit();
}
// KV API reference. Assumes a connected `client` is already in scope.
var kv = flo.KV.init(&client);

// Get (returns null if not found)
// A non-null value returned from get() must be freed by the caller.
const value = try kv.get("key", .{});

// Blocking get
// Named differently from `value` above so both examples can live in one scope.
// NOTE(review): presumably waits up to 5000 ms for the key — confirm the exact semantics of block_ms.
const blocking_value = try kv.get("key", .{ .block_ms = 5000 });

// Put with options
// NOTE(review): all options are shown together for illustration; presumably
// cas_version and if_not_exists are not meant to be combined in one call — confirm.
try kv.put("key", "value", .{
    .ttl_seconds = 3600,
    .cas_version = 5,
    .if_not_exists = true,
    .namespace = "other-ns", // override default
});

// Delete
try kv.delete("key", .{});

// Scan
var result = try kv.scan("prefix:", .{ .limit = 100, .keys_only = true });
defer result.deinit();

// History
// NOTE(review): if history() returns an allocating result, it needs a matching deinit/free — confirm.
const versions = try kv.history("key", .{ .limit = 10 });
// Queue API reference. Assumes a connected `client` is already in scope.
var queue = flo.Queue.init(&client);

// Enqueue a payload, setting priority, delay, and dedup key.
const seq = try queue.enqueue(
    "tasks",
    "payload",
    .{ .priority = 5, .delay_ms = 1000, .dedup_key = "unique-id" },
);

// Dequeue with long polling; the result owns memory, so release it with deinit().
var batch = try queue.dequeue(
    "tasks",
    10,
    .{ .visibility_timeout_ms = 60000, .block_ms = 5000 },
);
defer batch.deinit();

// Acknowledge, negatively acknowledge, and work with the dead-letter queue.
// NOTE(review): `seqs` is not defined in this snippet; presumably the sequence
// numbers of previously dequeued messages — confirm the expected type.
try queue.ack("tasks", &seqs, .{});
try queue.nack("tasks", &seqs, .{ .to_dlq = false });

var dead_letters = try queue.dlqList("tasks", .{ .limit = 100 });
defer dead_letters.deinit();

try queue.dlqRequeue("tasks", &seqs, .{});
// Stream API reference. Assumes a connected `client` is already in scope.
// Each read example uses its own variable name so the examples can share one scope.
var stream = flo.Stream.init(&client);

// Append
const result = try stream.append("events", "payload", .{});

// Read from beginning
var records = try stream.read("events", .{});
defer records.deinit();

// Read from a specific position
var records_from_position = try stream.read("events", .{
    .start = flo.StreamID.fromSequence(100),
    .count = 50,
});
defer records_from_position.deinit();

// Read from tail with long polling
var tail_records = try stream.read("events", .{
    .tail = true,
    .block_ms = 5000,
});
defer tail_records.deinit();

// Consumer groups
try stream.groupJoin("events", "my-group", "worker-1", .{});
var group_records = try stream.groupRead("events", "my-group", "worker-1", .{
    .count = 10,
    .block_ms = 5000,
});
defer group_records.deinit();
// NOTE(review): `seqs` is not defined in this snippet; presumably the sequence
// numbers of the records read above — confirm the expected type.
try stream.groupAck("events", "my-group", seqs, .{});

// Info and trim
const info = try stream.info("events", .{});
try stream.trim("events", .{ .max_len = 1000 });

The ActionWorker is a high-level worker that manages connection, registration, polling, heartbeats, and graceful shutdown:

// `std` is required by main() below for the allocator setup.
const std = @import("std");
const flo = @import("flo");

/// Handler registered for the "process-order" action.
/// Decodes the action input as an `OrderRequest`, extends the lease, and
/// returns a result payload with status "completed".
/// NOTE(review): `OrderRequest` is not defined in this snippet; presumably an
/// application-defined struct describing the input — confirm.
fn processOrder(ctx: *flo.ActionContext) ![]const u8 {
    const input = try ctx.json(OrderRequest);
    defer input.deinit();

    // For long tasks, extend the lease
    // NOTE(review): 30000 is presumably milliseconds — confirm.
    try ctx.touch(30000);

    return ctx.toBytes(.{ .status = "completed" });
}

/// Creates an ActionWorker for "localhost:9000" in the "myapp" namespace,
/// registers `processOrder` under "process-order", and starts the worker.
pub fn main() !void {
    var gpa = std.heap.GeneralPurposeAllocator(.{}){};
    defer _ = gpa.deinit();
    const allocator = gpa.allocator();

    var worker = try flo.ActionWorker.init(allocator, .{
        .endpoint = "localhost:9000",
        .namespace = "myapp",
        .concurrency = 10,
    });
    defer worker.deinit();

    try worker.registerAction("process-order", processOrder);
    try worker.start();
}

For low-level action operations (register, invoke, status), use flo.Actions:

// Low-level Actions API. Assumes `client` and `allocator` are already in scope.
var actions = flo.Actions.init(&client);

// Register the "send-email" action type with a timeout and retry limit.
try actions.register(
    "send-email",
    .user,
    .{ .timeout_ms = 30000, .max_retries = 3 },
);

// Invoke the action with a JSON payload; the returned run id is freed by the caller.
const run_id = try actions.invoke(
    "send-email",
    "{\"to\":\"user@example.com\"}",
    .{},
);
defer allocator.free(run_id);

// Look up the status of that run.
const status = try actions.status(run_id, .{});

All operations return an error union; on failure they return one of the following flo.FloError values:

| Error | Description |
| --- | --- |
| NotConnected | Client not connected |
| ConnectionFailed | TCP connection failed |
| NotFound | Key/queue not found |
| BadRequest | Invalid request |
| Conflict | CAS conflict |
| ServerError | Internal server error |
  • All results that allocate memory have a .deinit() method — always defer result.deinit()
  • The client requires an allocator at init time
  • String values returned from get() are owned by the caller and must be freed, e.g. allocator.free(value) with the allocator that was passed to the client