Skip to content

Go SDK

Terminal window
go get github.com/floruntime/flo-go
package main
import (
"fmt"
"log"
flo "github.com/floruntime/flo-go"
)
// main runs the quick-start tour and exits non-zero on the first failure.
func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run connects to a local Flo node and exercises the KV, Queue, and Stream
// APIs once each, returning the first error encountered.
//
// It is kept separate from main so the deferred Close still runs when an
// operation fails: log.Fatal exits the process without running defers.
func run() error {
	client := flo.NewClient("localhost:9000",
		flo.WithNamespace("myapp"),
	)
	if err := client.Connect(); err != nil {
		return fmt.Errorf("connect: %w", err)
	}
	defer client.Close()

	// KV
	if _, err := client.KV.Put("user:123", []byte("John Doe"), nil); err != nil {
		return fmt.Errorf("kv put: %w", err)
	}
	value, err := client.KV.Get("user:123", nil)
	if err != nil {
		return fmt.Errorf("kv get: %w", err)
	}
	if value != nil { // Get yields a nil result when the key is not found
		// Print the stored bytes, not the result struct that wraps them.
		fmt.Printf("Got: %s\n", value.Value)
	}

	// Queue
	if _, err := client.Queue.Enqueue("tasks", []byte(`{"task":"process"}`), nil); err != nil {
		return fmt.Errorf("queue enqueue: %w", err)
	}
	result, err := client.Queue.Dequeue("tasks", 10, nil)
	if err != nil {
		return fmt.Errorf("queue dequeue: %w", err)
	}
	for _, msg := range result.Messages {
		// NOTE(review): Ack's return values are not shown on this page; check
		// its error in real code once the signature is confirmed.
		client.Queue.Ack("tasks", []uint64{msg.Seq}, nil)
	}

	// Stream
	// NOTE(review): Append's return values are not shown on this page; check
	// its error in real code once the signature is confirmed.
	client.Stream.Append("events", []byte(`{"event":"login"}`), nil)
	records, err := client.Stream.Read("events", nil)
	if err != nil {
		return fmt.Errorf("stream read: %w", err)
	}
	for _, rec := range records.Records {
		fmt.Printf("Event: %s\n", rec.Payload)
	}
	return nil
}
client := flo.NewClient("localhost:9000",
flo.WithNamespace("default"), // Default namespace
flo.WithTimeout(5 * time.Second), // Connection/operation timeout
flo.WithDebug(true), // Enable debug logging
)
// Returns *flo.GetResult (nil if not found) — carries both value and version
result, err := client.KV.Get("key", nil)
if result != nil {
fmt.Printf("%s @ v%d\n", result.Value, result.Version)
}
// Blocking get (wait for key to appear)
blockMS := uint32(5000)
result, _ = client.KV.Get("key", &flo.GetOptions{BlockMS: &blockMS})
// Simple put — returns flo.PutResult with the committed version
res, err := client.KV.Put("key", []byte("value"), nil)
if err != nil {
	return err // res is not meaningful when the put failed
}
fmt.Println("committed at version", res.Version)

// With TTL
ttl := uint64(3600)
res, err = client.KV.Put("key", []byte("value"), &flo.PutOptions{TTLSeconds: &ttl})

// With CAS — use the version returned by Get or a previous Put
cur, err := client.KV.Get("key", nil)
if err != nil {
	return err
}
if cur != nil { // Get returns nil when the key is not found; there is no version to compare
	res, err = client.KV.Put("key", []byte("new"), &flo.PutOptions{CASVersion: &cur.Version})
	if flo.IsConflict(err) {
		// Version mismatch — re-read and retry
	}
}

// CAS create-if-absent (version 0 means "must not exist")
zero := uint64(0)
res, err = client.KV.Put("key", []byte("first"), &flo.PutOptions{CASVersion: &zero})

// Conditional
res, err = client.KV.Put("key", []byte("value"), &flo.PutOptions{IfNotExists: true})
err := client.KV.Delete("key", nil)

Fetch up to 256 keys in a single round trip — keys may live on different shards. Each entry has a Found flag.

entries, err := client.KV.MGet([]string{"user:1", "user:2", "user:3"}, nil)
if err != nil { return err }
for _, e := range entries {
if e.Found {
fmt.Printf("%s = %s (v%d)\n", e.Key, e.Value, e.Version)
} else {
fmt.Printf("%s missing\n", e.Key)
}
}
n, err := client.KV.Incr("visits:home", nil) // +1
n, err = client.KV.Incr("visits:home", &flo.KVIncrOptions{Delta: flo.Int64Ptr(10)})
client.KV.Touch("lock:resource", 60, nil) // extend TTL
client.KV.Persist("lock:resource", nil) // clear TTL
ok, _ := client.KV.Exists("lock:resource", nil)
// Whole-document set (creates the key)
client.KV.JsonSet("order:42", "$", []byte(`{"items":3,"status":"new"}`), nil)
// Atomic sub-field update — single Raft entry, returns new version
res, _ := client.KV.JsonSet("order:42", "$.status", []byte(`"shipped"`), nil)
fmt.Println("doc now at v", res.Version)
// Read a sub-field — returns *GetResult{Value, Version}
status, _ := client.KV.JsonGet("order:42", "$.status", nil)
fmt.Printf("%s @ v%d\n", status.Value, status.Version)
// Remove a sub-field
client.KV.JsonDel("order:42", "$.status", nil)

Buffer multiple writes on a single pinned partition and commit them atomically as one Raft entry. Every key touched inside the transaction must hash to the same partition as the routing key.

txn, err := client.KV.Begin("user:42", nil)
if err != nil {
return err
}
if _, err := txn.Put("user:42:name", []byte("Jane"), nil); err != nil {
_ = txn.Rollback()
return err
}
if _, err := txn.Incr("user:42:visits", 1); err != nil {
_ = txn.Rollback()
return err
}
result, err := txn.Commit()
if err != nil {
_ = txn.Rollback() // idempotent
return err
}
fmt.Printf("committed %d ops at index %d\n", result.OpCount, result.CommitIndex)

Scan, MGet, JsonGet, JsonSet, JsonDel, and History are not supported inside a transaction and return ErrTxnUnsupportedOp. Server caps: 256 ops per transaction, 1 MiB total payload.

// Scan every key under a prefix with default options.
result, err := client.KV.Scan("user:", nil)
if err != nil {
	return err
}
for _, entry := range result.Entries {
	fmt.Printf("%s = %s\n", entry.Key, entry.Value)
}

// Paginated — request up to 100 entries per call and follow the cursor
// until the server reports no more pages.
limit := uint32(100)
opts := &flo.ScanOptions{Limit: &limit}
for {
	page, err := client.KV.Scan("user:", opts)
	if err != nil {
		return err
	}
	for _, entry := range page.Entries {
		fmt.Printf("%s = %s\n", entry.Key, entry.Value)
	}
	if !page.HasMore {
		break
	}
	// Reuse the same options so the page size carries over to every request.
	opts.Cursor = page.Cursor
}
entries, _ := client.KV.History("key", nil)
for _, e := range entries {
fmt.Printf("v%d at %d: %s\n", e.Version, e.Timestamp, e.Value)
}
seq, _ := client.Queue.Enqueue("tasks", payload, &flo.EnqueueOptions{
Priority: 10,
DedupKey: "task-123",
})
blockMS := uint32(30000)
result, _ := client.Queue.Dequeue("tasks", 10, &flo.DequeueOptions{
BlockMS: &blockMS,
})
for _, msg := range result.Messages {
fmt.Printf("seq=%d: %s\n", msg.Seq, msg.Payload)
}
client.Queue.Ack("tasks", []uint64{msg.Seq}, nil)
client.Queue.Nack("tasks", []uint64{msg.Seq}, &flo.NackOptions{ToDLQ: true})
result, _ := client.Queue.DLQList("tasks", nil)
client.Queue.DLQRequeue("tasks", seqs, nil)
result, _ := client.Queue.Peek("tasks", 10, nil) // No lease
client.Queue.Touch("tasks", []uint64{msg.Seq}, nil) // Extend lease
// Append
client.Stream.Append("events", []byte(`{"event":"click"}`), nil)
// Append with headers
client.Stream.Append("events", []byte(`{"event":"click"}`), &flo.StreamAppendOptions{
Headers: map[string]string{"content-type": "application/json", "source": "web"},
})
// Read
records, _ := client.Stream.Read("events", nil)
// Access record fields
for _, rec := range records.Records {
fmt.Println(rec.Stream) // stream name (e.g. "events")
fmt.Println(rec.Payload) // raw bytes
fmt.Println(rec.Headers) // map[string]string or nil
fmt.Println(rec.ID) // StreamID{TimestampMS, Sequence}
}
// Consumer groups
count := uint32(10)
records, _ = client.Stream.GroupRead("events", "processors", "worker-1",
&flo.StreamGroupReadOptions{Count: &count},
)
client.Stream.GroupAck("events", "processors", []flo.StreamID{
{Sequence: 1, TimestampMS: 1700000000000},
}, nil)

The recommended way to consume streams is the StreamWorker, created from a connected client. It handles consumer group join, polling, concurrency, ack/nack, and reconnection automatically:

// Build the worker from a connected client. The construction error is checked
// before Close is deferred, so a failed setup never reaches worker.Close().
worker, err := client.NewStreamWorker(flo.StreamWorkerOptions{
	Stream:      "events",
	Group:       "processors",
	Concurrency: 5,
	BatchSize:   10,
	BlockMS:     30000,
}, func(sctx *flo.StreamContext) error {
	var data map[string]interface{}
	// JSON-unmarshal the record payload into data.
	// NOTE(review): Into's return value is not shown on this page; if it
	// reports an error, return it here so the record is nacked.
	sctx.Into(&data)
	fmt.Printf("Stream: %s, ID: %v\n", sctx.Stream(), sctx.StreamID())
	fmt.Printf("Headers: %v\n", sctx.Headers())
	// A nil return acks the record; a non-nil error nacks it.
	return process(data)
})
if err != nil {
	return err
}
defer worker.Close()
worker.Start(ctx)
| Option | Type | Default | Description |
| --- | --- | --- | --- |
| Stream | string | | Single stream name (shorthand) |
| Streams | []string | | Multiple streams (merged with Stream) |
| Group | string | "default" | Consumer group name |
| Consumer | string | auto | Consumer ID within the group |
| Concurrency | int | 10 | Max concurrent handlers |
| BatchSize | uint32 | 10 | Records per poll |
| BlockMS | uint32 | 30000 | Long-poll timeout (ms) |
| MessageTimeout | time.Duration | 5m | Max handler duration |

The handler receives a *StreamContext with convenience accessors:

// handler is a reference listing of the accessors available on the
// *flo.StreamContext passed to a StreamWorker callback. Returning nil marks
// the record as handled successfully (auto-ack); returning an error causes
// an auto-nack.
func handler(sctx *flo.StreamContext) error {
sctx.Payload() // []byte
sctx.StreamID() // StreamID
sctx.Stream() // stream name
sctx.Headers() // map[string]string
sctx.Namespace() // namespace
sctx.Group() // consumer group
sctx.Consumer() // consumer ID
sctx.Record() // full StreamRecord
sctx.Into(&v) // JSON unmarshal
return nil
}

Records are auto-acked on handler success and auto-nacked on error. Connection errors trigger automatic reconnect and consumer group re-join.

// Build the action worker from a connected client. The construction error is
// checked before Close is deferred, so a failed setup never reaches w.Close().
w, err := client.NewActionWorker(flo.ActionWorkerOptions{Concurrency: 10})
if err != nil {
	return err
}
defer w.Close()

// MustRegisterAction binds the handler to the "process-order" action name.
w.MustRegisterAction("process-order", func(actx *flo.ActionContext) ([]byte, error) {
	var input map[string]interface{}
	// NOTE(review): Into's return value is not shown on this page; if it
	// reports an error, return it instead of proceeding with an empty input.
	actx.Into(&input)
	return actx.Bytes(map[string]string{"status": "done"})
})
w.Start(ctx)

client.Workflow manages the full lifecycle of workflow definitions and runs.

Deploy or update a workflow definition safely — calling this on every app boot is idiomatic:

r, err := client.Workflow.SyncBytes([]byte(yamlString), nil)
// r.Name, r.Version, r.Action → "created" | "updated" | "unchanged"
import "encoding/json"
// Start a run
input, _ := json.Marshal(map[string]interface{}{
"orderId": "ORD-123",
"amount": 99.99,
})
runID, err := client.Workflow.Start("process-order", input, nil)
// Get status
s, err := client.Workflow.Status(runID, nil)
fmt.Println(s.RunID) // string
fmt.Println(s.Workflow) // workflow name
fmt.Println(s.Version) // version string
fmt.Println(s.Status) // "pending"|"running"|"waiting"|"completed"|"failed"|...
fmt.Println(s.CurrentStep) // current or last step name
// s.Input []byte — raw input bytes
// s.CreatedAt int64 — epoch ms
// s.StartedAt *int64 — nil if not yet started
// s.CompletedAt *int64 — nil if not yet completed
// s.WaitSignal *string — signal type being waited for, or nil
// Cancel a run
err = client.Workflow.Cancel(runID, nil)

Deliver external events to a waiting workflow:

sigData, _ := json.Marshal(map[string]interface{}{
"approved": true,
"approver": "manager@corp.com",
})
err = client.Workflow.Signal(runID, "approval_decision", sigData, nil)

History, ListRuns, and ListDefinitions return raw binary and are parsed with helper functions (see the full example for reference parsers):

// Run event history (binary response)
data, err := client.Workflow.History(runID, nil)
events := parseHistory(data) // []HistoryEvent{Type, Detail, Timestamp}
// List runs for a workflow
limit := 50
data, err = client.Workflow.ListRuns("process-order", &flo.WorkflowListRunsOptions{Limit: limit})
runs := parseListRuns(data) // []RunEntry{RunID, Workflow, Status, CreatedAt}
// List all registered definitions
data, err = client.Workflow.ListDefinitions(nil)
defs := parseListDefinitions(data) // []DefinitionEntry{Name, Version, CreatedAt}
// Download a definition's YAML
yamlBytes, err := client.Workflow.GetDefinition("process-order", nil)
// Pause new runs (existing runs continue)
err = client.Workflow.Disable("process-order", nil)
// Resume
err = client.Workflow.Enable("process-order", nil)

See examples/workflows/main.go and examples/action_worker/main.go for a complete walkthrough covering sync, signals, signal timeouts, outcome-based routing, history, and cancellation.

if flo.IsNotFound(err) { /* Key not found */ }
if flo.IsConflict(err) { /* CAS conflict */ }
if flo.IsBadRequest(err) { /* Invalid params */ }
if flo.IsOverloaded(err) { /* Retry later */ }

The Go client is thread-safe. Multiple goroutines can use the same client instance.