Buffered (Fire-and-Forget) Writes

How to keep the agent’s user-facing response from blocking on Neo4j round-trips. v0.4 introduces an opt-in fire-and-forget write API.

Default behavior is sync: every write awaits a Neo4j round-trip. In production, that means each record_tool_call, add_step, custom write, etc. delays the agent’s reply by the round-trip latency. With write_mode="buffered", writes go through client.buffered.submit(...), return immediately, and a background drainer pushes them to Neo4j.

Goal

Figure: Buffered write architecture. The agent submits to a queue and returns immediately; a background drainer pushes to Neo4j.

The agent responds before persistence completes:

import uuid

async def agent_turn(client, content):
    message_id = str(uuid.uuid4())  # any unique message ID works here
    # Submit the write to the buffer; return immediately.
    await client.buffered.submit(
        "MERGE (m:Message {id: $id}) SET m.content = $content",
        {"id": message_id, "content": content},
    )
    # The agent's response is not blocked on Neo4j.
    # generate_response() stands in for the application's own reply logic.
    return generate_response(content)


async with MemoryClient(settings) as client:
    response = await agent_turn(client, "Hello")
    # Drain the queue at end of session / before shutdown.
    await client.flush()

Steps

1. Enable buffered mode

settings = MemorySettings(...)
settings.memory.write_mode = "buffered"
settings.memory.max_pending = 200   # back-pressure threshold

When max_pending writes are queued, submit() blocks until the drainer removes an item. This back-pressure is preferable to dropping writes silently or letting memory grow unbounded.
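The back-pressure behavior can be illustrated with a bounded asyncio.Queue from the standard library. This is a minimal sketch under the assumption that the buffer behaves like a bounded queue; it is not the library's implementation, and demo_backpressure is a hypothetical name.

```python
import asyncio


async def demo_backpressure(max_pending: int = 2) -> list[str]:
    """Show that put() on a full bounded queue waits for a consumer."""
    events: list[str] = []
    queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)

    async def producer() -> None:
        for i in range(max_pending + 1):
            await queue.put(i)  # suspends once max_pending items are queued
            events.append(f"queued {i}")

    task = asyncio.create_task(producer())
    await asyncio.sleep(0)  # let the producer run until the queue is full
    events.append(f"producer blocked: {not task.done()}")

    item = await queue.get()  # the "drainer" frees one slot
    queue.task_done()
    events.append(f"drained {item}")

    await task  # the suspended put() can now complete
    return events


if __name__ == "__main__":
    for line in asyncio.run(demo_backpressure()):
        print(line)
```

The last put() only completes after one item has been drained, which is the same trade a max_pending threshold makes: the producer slows down instead of the queue growing without limit.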

2. Submit fire-and-forget writes

await client.buffered.submit(
    """
    MATCH (m:Message {id: $message_id})
    MATCH (e:Entity {name: $name})
    MERGE (m)-[:MENTIONS]->(e)
    """,
    {"message_id": message_id, "name": "Anthem"},
)

submit returns as soon as the work is queued. The drainer task pulls from the queue and runs the actual execute_write against Neo4j.

In write_mode="sync" (default) submit is a thin passthrough — the write awaits inline. This means tests that opt into buffered patterns work in both modes without changes.
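The relationship between submit, the drainer task, and the sync passthrough might look roughly like the following. This is a sketch only: SketchBufferedWriter and its constructor arguments are hypothetical, the execute_write callable is injected so the example runs without Neo4j, and error handling is omitted for brevity.

```python
import asyncio
from typing import Awaitable, Callable, Optional

ExecuteWrite = Callable[[str, dict], Awaitable[None]]


class SketchBufferedWriter:
    """Hypothetical stand-in for a buffered writer; not the library's class."""

    def __init__(self, execute_write: ExecuteWrite, write_mode: str = "sync",
                 max_pending: int = 200) -> None:
        self._execute_write = execute_write
        self._write_mode = write_mode
        self._queue: asyncio.Queue = asyncio.Queue(maxsize=max_pending)
        self._drainer: Optional[asyncio.Task] = None

    async def submit(self, query: str, params: dict) -> None:
        if self._write_mode == "sync":
            # Passthrough: the write is awaited inline.
            await self._execute_write(query, params)
            return
        if self._drainer is None:
            # Start the single background drainer lazily.
            self._drainer = asyncio.create_task(self._drain())
        await self._queue.put((query, params))  # returns once queued

    async def _drain(self) -> None:
        while True:
            query, params = await self._queue.get()
            try:
                await self._execute_write(query, params)
            finally:
                self._queue.task_done()

    async def flush(self) -> None:
        await self._queue.join()  # wait until every queued item was processed

    async def close(self) -> None:
        await self.flush()
        if self._drainer is not None:
            self._drainer.cancel()
```

Because both modes share one submit signature, calling code does not need to branch on the write mode.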

3. Drain the queue

client.flush() blocks until every queued write has been persisted:

# At end of session, end of request handler, or before metrics flush.
await client.flush()

client.wait_for_pending() is an alias.

client.close() (and async with on its way out) automatically drains the queue before disconnecting from Neo4j, so writes are never silently lost on a clean shutdown.
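To see why async with protects queued writes even when the body raises, consider this small stand-in. SketchClient is hypothetical and only mimics the drain-on-close behavior described above; the real client's internals may differ.

```python
import asyncio


class SketchClient:
    """Hypothetical client: queues writes and drains them on close()."""

    def __init__(self) -> None:
        self.pending: list[str] = []
        self.persisted: list[str] = []

    async def submit(self, query: str) -> None:
        self.pending.append(query)  # queued only; nothing persisted yet

    async def flush(self) -> None:
        while self.pending:
            await asyncio.sleep(0)  # stands in for a Neo4j round-trip
            self.persisted.append(self.pending.pop(0))

    async def close(self) -> None:
        await self.flush()  # drain before disconnecting

    async def __aenter__(self) -> "SketchClient":
        return self

    async def __aexit__(self, exc_type, exc, tb) -> None:
        await self.close()  # runs on normal exit and on exceptions


async def demo() -> list[str]:
    client = SketchClient()
    try:
        async with client:
            await client.submit("MERGE (m:Message {id: 1})")
            raise RuntimeError("handler failed after submitting")
    except RuntimeError:
        pass  # the exception still propagates out of the async with block
    return client.persisted
```

The write submitted before the exception is still drained, because __aexit__ runs before the exception leaves the block.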

4. Inspect background errors

Errors during background writes are captured rather than raised into the agent’s hot path:

import logging

log = logging.getLogger(__name__)

errors = client.write_errors
if errors:
    for err in errors:
        log.warning("Buffered write failed: %s - %s", err.query[:32], err.error)
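One way a drainer can capture failures instead of raising them is sketched below. SketchWriteError is a hypothetical record that mirrors the query and error fields read in the snippet above; the drain function and flaky_write are illustrative and are not the library's code.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class SketchWriteError:
    """Hypothetical error record with the two fields used above."""
    query: str
    error: Exception


async def drain(queue: asyncio.Queue, execute_write, write_errors: list) -> None:
    """Run queued writes in order; record failures instead of raising."""
    while True:
        query, params = await queue.get()
        try:
            await execute_write(query, params)
        except Exception as exc:  # keep draining after a failed write
            write_errors.append(SketchWriteError(query=query, error=exc))
        finally:
            queue.task_done()


async def demo() -> list:
    async def flaky_write(query: str, params: dict) -> None:
        if "BAD" in query:
            raise ValueError("simulated Neo4j failure")

    queue: asyncio.Queue = asyncio.Queue()
    errors: list = []
    drainer = asyncio.create_task(drain(queue, flaky_write, errors))
    for q in ("RETURN 1", "BAD QUERY", "RETURN 2"):
        await queue.put((q, {}))
    await queue.join()  # all three writes were attempted
    drainer.cancel()
    return errors
```

A failed write does not stop the drainer here, so later writes still run; the caller finds out about the failure only by inspecting the error list.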

Tradeoffs

| Concern | Behavior |
| --- | --- |
| API surface | Fire-and-forget only. Memory APIs that return values (e.g. add_step returning a ReasoningStep) still construct their Pydantic model synchronously and persist inline. Only callers who want to push raw Cypher into the background reach for client.buffered.submit(...). |
| Order | Writes drain in submission order (single drainer task). If you submit two writes that depend on each other (one creates a node, the other links to it), they will execute in order, but they are not transactional with each other. |
| Crash safety | Queued-but-not-yet-drained writes are lost if the process is killed before flush() runs. close() drains before disconnecting; async with ensures close() runs even on exception. |
| Errors | Captured into client.write_errors. An optional on_error callback in BufferedWriter (not currently exposed via settings) can also push errors out for alerting. |
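Since dependent writes are ordered but not transactional with each other, one option is to fold them into a single Cypher statement so they travel as one queue item. The helper below is a hypothetical sketch: link_message_to_entity and LINK_QUERY are illustrative names, and it assumes submit accepts a query string and a parameter dict as in the examples above. A single Cypher statement executes within one transaction in Neo4j, but how the library executes queued statements is not confirmed here.

```python
import asyncio
from typing import Awaitable, Callable

Submit = Callable[[str, dict], Awaitable[None]]

# One statement creates both nodes and the relationship, so the dependent
# steps cannot be split across two separately executed writes.
LINK_QUERY = """
MERGE (m:Message {id: $message_id})
MERGE (e:Entity {name: $name})
MERGE (m)-[:MENTIONS]->(e)
"""


async def link_message_to_entity(submit: Submit, message_id: str, name: str) -> None:
    """Queue node creation and linking as a single write."""
    await submit(LINK_QUERY, {"message_id": message_id, "name": name})
```

With a real client this would be called as await link_message_to_entity(client.buffered.submit, message_id, "Anthem").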

See Also

  • Multi-Tenant Memory

  • examples/buffered-writes/ — runnable example with a 50-turn timing comparison.