Python SDK
Installation
Section titled “Installation”

```bash
pip install flo
```

Quick Start
Section titled “Quick Start”

```python
import asyncio
from flo import FloClient

async def main():
    async with FloClient("localhost:9000") as client:
        # KV
        await client.kv.put("user:123", b"John Doe")
        value = await client.kv.get("user:123")
        print(f"Got: {value.decode()}")

        # Queue
        seq = await client.queue.enqueue("tasks", b'{"task": "process"}')
        result = await client.queue.dequeue("tasks", 10)
        for msg in result.messages:
            await client.queue.ack("tasks", [msg.seq])

        # Stream
        await client.stream.append("events", b'{"event": "login"}')
        result = await client.stream.read("events")
        for rec in result.records:
            print(rec.payload)

asyncio.run(main())
```

KV Operations
Section titled “KV Operations”

```python
from flo import GetOptions, PutOptions, ScanOptions, HistoryOptions

# Get
value = await client.kv.get("key")

# Blocking get
value = await client.kv.get("key", GetOptions(block_ms=5000))

# Put with TTL
await client.kv.put("session:abc", b"data", PutOptions(ttl_seconds=3600))

# Put with CAS
await client.kv.put("counter", b"2", PutOptions(cas_version=1))

# Conditional writes
await client.kv.put("key", b"value", PutOptions(if_not_exists=True))

# Delete
await client.kv.delete("key")

# Scan with pagination
result = await client.kv.scan("user:", ScanOptions(limit=100))
while result.has_more:
    result = await client.kv.scan("user:", ScanOptions(cursor=result.cursor))

# Version history
history = await client.kv.history("user:123", HistoryOptions(limit=10))
```

Queue Operations
Section titled “Queue Operations”

```python
from flo import EnqueueOptions, DequeueOptions, NackOptions

# Enqueue with priority and delay
seq = await client.queue.enqueue("tasks", payload, EnqueueOptions(priority=10, delay_ms=60000))

# Dequeue with long polling
result = await client.queue.dequeue("tasks", 10, DequeueOptions(block_ms=30000))

# Process and ack/nack
for msg in result.messages:
    try:
        process(msg.payload)
        await client.queue.ack("tasks", [msg.seq])
    except Exception:
        await client.queue.nack("tasks", [msg.seq], NackOptions(to_dlq=True))

# DLQ operations
result = await client.queue.dlq_list("tasks")
await client.queue.dlq_requeue("tasks", [msg.seq for msg in result.messages])

# Peek (no lease) and Touch (extend lease)
result = await client.queue.peek("tasks", 5)
await client.queue.touch("tasks", [msg.seq])
```

Stream Operations
Section titled “Stream Operations”

```python
from flo import StreamReadOptions, StreamID, StreamGroupReadOptions

# Append
result = await client.stream.append("events", b'{"event": "click"}')

# Read from a specific position
result = await client.stream.read(
    "events",
    StreamReadOptions(start=StreamID(timestamp_ms=1700000000000, sequence=0), count=10),
)

# Read from tail (latest records)
result = await client.stream.read("events", StreamReadOptions(tail=True, count=10))

# Blocking read (long polling)
result = await client.stream.read("events", StreamReadOptions(tail=True, count=10, block_ms=30000))

# Consumer groups
await client.stream.group_join("events", "processors", "worker-1")
result = await client.stream.group_read(
    "events", "processors", "worker-1",
    StreamGroupReadOptions(count=10, block_ms=30000),
)
await client.stream.group_ack("events", "processors", [r.id for r in result.records])
```

Action & Worker Operations
Section titled “Action & Worker Operations”

```python
from flo import ActionType, ActionRegisterOptions, ActionInvokeOptions

# Register and invoke actions
await client.action.register(
    "process-image",
    ActionType.USER,
    ActionRegisterOptions(timeout_ms=60000, max_retries=3),
)

result = await client.action.invoke(
    "process-image",
    payload,
    ActionInvokeOptions(priority=10, idempotency_key="order-123"),
)

status = await client.action.status(result.run_id)
```

ActionWorker (high-level)
Section titled “ActionWorker (high-level)”

The recommended way to execute actions is the `ActionWorker`, created from a connected client. It handles registration, polling, concurrency, and heartbeats automatically:

```python
from flo import FloClient, ActionContext

async with FloClient("localhost:9000", namespace="myapp") as client:
    worker = client.new_action_worker(concurrency=5)

    @worker.action("process-image")
    async def process_image(ctx: ActionContext) -> bytes:
        data = ctx.json()
        result = await do_processing(data)
        return ctx.to_bytes({"status": "done", "result": result})

    await worker.start()
```

Features
Section titled “Features”

- Native asyncio support
- Full type annotations for IDE support
- Context manager (`async with`) for automatic cleanup
- All operations support namespace overrides