Go SDK
Installation
Section titled “Installation”go get github.com/floruntime/flo-goQuick Start
Section titled “Quick Start”package main
import ( "fmt" "log" flo "github.com/floruntime/flo-go")
func main() { client := flo.NewClient("localhost:9000", flo.WithNamespace("myapp"), ) if err := client.Connect(); err != nil { log.Fatal(err) } defer client.Close()
// KV client.KV.Put("user:123", []byte("John Doe"), nil) value, _ := client.KV.Get("user:123", nil) fmt.Printf("Got: %s\n", value)
// Queue client.Queue.Enqueue("tasks", []byte(`{"task":"process"}`), nil) result, _ := client.Queue.Dequeue("tasks", 10, nil) for _, msg := range result.Messages { client.Queue.Ack("tasks", []uint64{msg.Seq}, nil) }
// Stream client.Stream.Append("events", []byte(`{"event":"login"}`), nil) records, _ := client.Stream.Read("events", nil) for _, rec := range records.Records { fmt.Printf("Event: %s\n", rec.Payload) }}Client Options
Section titled “Client Options”client := flo.NewClient("localhost:9000", flo.WithNamespace("default"), // Default namespace flo.WithTimeout(5 * time.Second), // Connection/operation timeout flo.WithDebug(true), // Enable debug logging)KV Operations
Section titled “KV Operations”// Simple getvalue, err := client.KV.Get("key", nil)
// Blocking get (wait for key to appear)blockMS := uint32(5000)value, err := client.KV.Get("key", &flo.GetOptions{BlockMS: &blockMS})// Simple puterr := client.KV.Put("key", []byte("value"), nil)
// With TTLttl := uint64(3600)err := client.KV.Put("key", []byte("value"), &flo.PutOptions{TTLSeconds: &ttl})
// With CASversion := uint64(1)err := client.KV.Put("key", []byte("new"), &flo.PutOptions{CASVersion: &version})if flo.IsConflict(err) { // Version mismatch}
// Conditionalerr := client.KV.Put("key", []byte("value"), &flo.PutOptions{IfNotExists: true})Delete
Section titled “Delete”err := client.KV.Delete("key", nil)result, _ := client.KV.Scan("user:", nil)for _, entry := range result.Entries { fmt.Printf("%s = %s\n", entry.Key, entry.Value)}
// Paginatedlimit := uint32(100)result, _ := client.KV.Scan("user:", &flo.ScanOptions{Limit: &limit})for result.HasMore { result, _ = client.KV.Scan("user:", &flo.ScanOptions{Cursor: result.Cursor})}History
Section titled “History”entries, _ := client.KV.History("key", nil)for _, e := range entries { fmt.Printf("v%d at %d: %s\n", e.Version, e.Timestamp, e.Value)}Queue Operations
Section titled “Queue Operations”Enqueue
Section titled “Enqueue”seq, _ := client.Queue.Enqueue("tasks", payload, &flo.EnqueueOptions{ Priority: 10, DedupKey: "task-123",})Dequeue
Section titled “Dequeue”blockMS := uint32(30000)result, _ := client.Queue.Dequeue("tasks", 10, &flo.DequeueOptions{ BlockMS: &blockMS,})for _, msg := range result.Messages { fmt.Printf("seq=%d: %s\n", msg.Seq, msg.Payload)}Ack / Nack
Section titled “Ack / Nack”client.Queue.Ack("tasks", []uint64{msg.Seq}, nil)client.Queue.Nack("tasks", []uint64{msg.Seq}, &flo.NackOptions{ToDLQ: true})result, _ := client.Queue.DLQList("tasks", nil)client.Queue.DLQRequeue("tasks", seqs, nil)Peek / Touch
Section titled “Peek / Touch”result, _ := client.Queue.Peek("tasks", 10, nil) // No leaseclient.Queue.Touch("tasks", []uint64{msg.Seq}, nil) // Extend leaseStream Operations
Section titled “Stream Operations”// Appendclient.Stream.Append("events", []byte(`{"event":"click"}`), nil)
// Readrecords, _ := client.Stream.Read("events", nil)
// Consumer groupscount := uint32(10)records, _ = client.Stream.GroupRead("events", "processors", "worker-1", &flo.StreamGroupReadOptions{Count: &count},)client.Stream.GroupAck("events", "processors", []flo.StreamID{ {Sequence: 1, TimestampMS: 1700000000000},}, nil)Worker Pattern
Section titled “Worker Pattern”w, _ := client.NewActionWorker(flo.ActionWorkerOptions{Concurrency: 10})defer w.Close()
w.MustRegisterAction("process-order", func(actx *flo.ActionContext) ([]byte, error) { var input map[string]interface{} actx.Into(&input) return actx.Bytes(map[string]string{"status": "done"})})
w.Start(ctx)Workflow Operations
Section titled “Workflow Operations”client.Workflow manages the full lifecycle of workflow definitions and runs.
Declarative Sync
Section titled “Declarative Sync”Deploy or update a workflow definition safely — calling this on every app boot is idiomatic:
r, err := client.Workflow.SyncBytes([]byte(yamlString), nil)// r.Name, r.Version, r.Action → "created" | "updated" | "unchanged"Start and Monitor Runs
Section titled “Start and Monitor Runs”import "encoding/json"
// Start a runinput, _ := json.Marshal(map[string]interface{}{ "orderId": "ORD-123", "amount": 99.99,})runID, err := client.Workflow.Start("process-order", input, nil)
// Get statuss, err := client.Workflow.Status(runID, nil)fmt.Println(s.RunID) // stringfmt.Println(s.Workflow) // workflow namefmt.Println(s.Version) // version stringfmt.Println(s.Status) // "pending"|"running"|"waiting"|"completed"|"failed"|...fmt.Println(s.CurrentStep) // current or last step name// s.Input []byte — raw input bytes// s.CreatedAt int64 — epoch ms// s.StartedAt *int64 — nil if not yet started// s.CompletedAt *int64 — nil if not yet completed// s.WaitSignal *string — signal type being waited for, or nil
// Cancel a runerr = client.Workflow.Cancel(runID, nil)Signals
Section titled “Signals”Deliver external events to a waiting workflow:
sigData, _ := json.Marshal(map[string]interface{}{ "approved": true, "approver": "manager@corp.com",})err = client.Workflow.Signal(runID, "approval_decision", sigData, nil)History and Listings
Section titled “History and Listings”History, ListRuns, and ListDefinitions return raw binary and are parsed with helper functions (see the full example for reference parsers):
// Run event history (binary response)data, err := client.Workflow.History(runID, nil)events := parseHistory(data) // []HistoryEvent{Type, Detail, Timestamp}
// List runs for a workflowlimit := 50data, err = client.Workflow.ListRuns("process-order", &flo.WorkflowListRunsOptions{Limit: limit})runs := parseListRuns(data) // []RunEntry{RunID, Workflow, Status, CreatedAt}
// List all registered definitionsdata, err = client.Workflow.ListDefinitions(nil)defs := parseListDefinitions(data) // []DefinitionEntry{Name, Version, CreatedAt}
// Download a definition's YAMLyamlBytes, err := client.Workflow.GetDefinition("process-order", nil)Disable / Enable
Section titled “Disable / Enable”// Pause new runs (existing runs continue)err = client.Workflow.Disable("process-order", nil)
// Resumeerr = client.Workflow.Enable("process-order", nil)Full Example
Section titled “Full Example”See examples/workflows/main.go and examples/action_worker/main.go for a complete walkthrough covering sync, signals, signal timeouts, outcome-based routing, history, and cancellation.
Error Handling
Section titled “Error Handling”if flo.IsNotFound(err) { /* Key not found */ }if flo.IsConflict(err) { /* CAS conflict */ }if flo.IsBadRequest(err) { /* Invalid params */ }if flo.IsOverloaded(err) { /* Retry later */ }Thread Safety
Section titled “Thread Safety”The Go client is thread-safe. Multiple goroutines can use the same client instance.