Record and Query Reasoning Traces

How to record agent reasoning traces and use them to improve future decisions through the context graph.

Overview

Reasoning traces capture the step-by-step thought process of agents, including tool calls, decisions, and outcomes. Stored in the context graph, traces enable agents to learn from past experiences and improve over time.

Reasoning Traces in the Context Graph
                    Context Graph
                    ─────────────
            ┌─────────────────────────┐
            │    (ReasoningTrace)     │
            │    "Product search"     │
            └───────────┬─────────────┘
                        │
        ┌───────────────┼───────────────┐
        │               │               │
        ▼               ▼               ▼
  [:HAS_STEP]     [:HAS_STEP]     [:HAS_STEP]
        │               │               │
        ▼               ▼               ▼
  ┌───────────┐   ┌───────────┐   ┌───────────┐
  │ Step 1:   │   │ Step 2:   │   │ Step 3:   │
  │ Search    │──▶│ Filter    │──▶│ Recommend │
  │ products  │   │ by prefs  │   │ top 3     │
  └───────────┘   └───────────┘   └───────────┘
        │               │               │
        ▼               ▼               ▼
  [:USED_TOOL]    [:USED_TOOL]    [:USED_TOOL]
        │               │               │
        ▼               ▼               ▼
  ┌───────────┐   ┌───────────┐   ┌───────────┐
  │search_    │   │get_       │   │rank_      │
  │products   │   │preferences│   │products   │
  └───────────┘   └───────────┘   └───────────┘

Agent can query: "How did I successfully recommend
                  running shoes before?"

Prerequisites

  • Neo4j database running

  • neo4j-agent-memory installed

  • MemoryClient configured

Record Reasoning Traces

Start a Trace

Begin recording when an agent task starts:

from neo4j_agent_memory import MemoryClient

client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Start a new reasoning trace
trace = await client.reasoning.start_trace(
    task="Find running shoes matching customer preferences",
    user_id="CUST-12345",
    session_id="shopping-session-001",
    metadata={
        "agent_version": "2.1.0",
        "model": "gpt-4o",
        "trigger": "user_request",
    },
)

print(f"Started trace: {trace.id}")

Record Reasoning Steps

Capture each step of the agent’s reasoning:

# Step 1: Search for products
step1 = await client.reasoning.add_step(
    trace_id=trace.id,
    description="Searching product catalog for running shoes",
    reasoning="Customer asked for running shoes, starting with catalog search",
)

# Step 2: Get preferences
step2 = await client.reasoning.add_step(
    trace_id=trace.id,
    description="Retrieved customer preferences for brand and style",
    reasoning="Need to personalize results based on stored preferences",
    depends_on=step1.id,  # Link steps in sequence
)

# Step 3: Filter and rank
step3 = await client.reasoning.add_step(
    trace_id=trace.id,
    description="Filtered products by Nike brand preference, ranked by rating",
    reasoning="Customer prefers Nike, so boosting those results",
    depends_on=step2.id,
)

Record Tool Calls

Track tools used during reasoning:

# Record tool call for product search
tool_call = await client.reasoning.add_tool_call(
    step_id=step1.id,
    tool_name="search_products",
    arguments={
        "query": "running shoes",
        "category": "footwear",
        "limit": 20,
    },
    result={
        "products_found": 20,
        "top_brands": ["Nike", "Adidas", "Asics"],
    },
    duration_ms=145,
    success=True,
)

# Record preference retrieval
await client.reasoning.add_tool_call(
    step_id=step2.id,
    tool_name="get_user_preferences",
    arguments={"user_id": "CUST-12345", "categories": ["brand", "style"]},
    result={
        "preferences": [
            {"category": "brand", "value": "Nike"},
            {"category": "style", "value": "minimalist"},
        ]
    },
    duration_ms=32,
    success=True,
)

Complete the Trace

Mark trace as complete with outcome:

# Successful completion
await client.reasoning.complete_trace(
    trace_id=trace.id,
    outcome="success",
    result={
        "recommendations": [
            {"name": "Nike Pegasus 40", "sku": "NKE-PEG40-001"},
            {"name": "Nike Air Zoom", "sku": "NKE-AZ-002"},
            {"name": "Nike Free Run", "sku": "NKE-FR-003"},
        ],
        "customer_response": "positive",
    },
    metadata={
        "total_steps": 3,
        "total_tool_calls": 3,
        "execution_time_ms": 850,
    },
)

# Or record failure
await client.reasoning.complete_trace(
    trace_id=trace.id,
    outcome="failure",
    error="No products found matching criteria",
    metadata={
        "failure_reason": "inventory_empty",
        "category_searched": "running_shoes",
    },
)

Query Past Traces

Find Similar Traces

Find traces similar to current task for learning:

# Find traces for similar product recommendations
similar_traces = await client.reasoning.get_similar_traces(
    task="Recommend running shoes for marathon training",
    limit=5,
    success_only=True,  # Only successful traces
)

for trace in similar_traces:
    print(f"Task: {trace.task}")
    print(f"  Outcome: {trace.outcome}")
    print(f"  Similarity: {trace.similarity:.2f}")
    print(f"  Steps: {trace.step_count}")

Learn from Past Successes

Use past traces to inform current reasoning:

Ecommerce Product Recommendation
async def get_recommendation_strategy(query: str, user_id: str) -> dict:
    """Learn from past successful recommendations."""

    # Find similar successful traces
    traces = await client.reasoning.get_similar_traces(
        task=f"Recommend products: {query}",
        user_id=user_id,
        limit=3,
        success_only=True,
    )

    strategies = []
    for trace in traces:
        # Get the steps that led to success
        steps = await client.reasoning.get_trace_steps(trace.id)

        strategies.append({
            "task": trace.task,
            "steps": [s.description for s in steps],
            "tools_used": [t.tool_name for s in steps for t in s.tool_calls],
            "result": trace.result,
        })

    return {
        "similar_tasks": len(traces),
        "successful_strategies": strategies,
    }

# Use in agent prompt
strategy = await get_recommendation_strategy(
    query="comfortable running shoes",
    user_id="CUST-12345",
)

system_prompt = f"""
You are a product recommendation agent.

Based on past successful recommendations for similar queries:
{json.dumps(strategy['successful_strategies'], indent=2)}

Use similar approaches for the current request.
"""
Financial Advisory Decision
async def get_advisory_precedents(topic: str, client_id: str) -> list:
    """Find past advisory decisions on similar topics."""

    traces = await client.reasoning.get_similar_traces(
        task=f"Advisory consultation: {topic}",
        user_id=client_id,
        limit=5,
        success_only=True,
    )

    precedents = []
    for trace in traces:
        steps = await client.reasoning.get_trace_steps(trace.id)

        precedents.append({
            "topic": trace.task,
            "recommendation": trace.result.get("recommendation"),
            "rationale": [s.reasoning for s in steps],
            "client_response": trace.result.get("client_response"),
        })

    return precedents

# Use for similar advisory scenarios
precedents = await get_advisory_precedents(
    topic="portfolio rebalancing for moderate-growth profile",
    client_id="CL-78901",
)

Analyze Tool Usage

Understand which tools are most effective:

# Get tool statistics
stats = await client.reasoning.get_tool_statistics(
    user_id="CUST-12345",
    time_range_days=30,
)

print("Tool Usage Statistics:")
for tool_name, tool_stats in stats.items():
    print(f"\n{tool_name}:")
    print(f"  Total calls: {tool_stats['total_calls']}")
    print(f"  Success rate: {tool_stats['success_rate']:.1%}")
    print(f"  Avg duration: {tool_stats['avg_duration_ms']:.0f}ms")
    print(f"  Used in successful traces: {tool_stats['success_correlation']:.1%}")

Filter Traces by Criteria

# Get recent traces for a user
recent_traces = await client.reasoning.get_traces(
    user_id="CUST-12345",
    limit=20,
    after=datetime.now() - timedelta(days=7),
)

# Get traces by outcome
failed_traces = await client.reasoning.get_traces(
    user_id="CUST-12345",
    outcome="failure",
    limit=10,
)

# Get traces with specific tool usage
search_traces = await client.reasoning.get_traces(
    tools_used=["search_products", "get_preferences"],
    success_only=True,
    limit=10,
)

Integrate with Agent Frameworks

PydanticAI Integration

Automatically record traces with PydanticAI:

from pydantic_ai import Agent, RunContext
from dataclasses import dataclass

@dataclass
class AgentDeps:
    memory_client: MemoryClient
    user_id: str
    current_trace_id: str | None = None

agent = Agent(
    "openai:gpt-4o",
    deps_type=AgentDeps,
)

@agent.tool
async def search_products(
    ctx: RunContext[AgentDeps],
    query: str,
    category: str,
) -> str:
    """Search for products."""

    # Record the tool call in the trace
    if ctx.deps.current_trace_id:
        # Get current step (you'd track this in deps)
        await ctx.deps.memory_client.reasoning.add_tool_call(
            step_id=ctx.deps.current_step_id,
            tool_name="search_products",
            arguments={"query": query, "category": category},
            result={"status": "searching"},
            success=True,
        )

    # Actual tool implementation
    products = await do_product_search(query, category)
    return json.dumps(products)

async def run_with_tracing(user_query: str, user_id: str):
    """Run agent with automatic tracing."""

    # Start trace
    trace = await memory_client.reasoning.start_trace(
        task=user_query,
        user_id=user_id,
    )

    deps = AgentDeps(
        memory_client=memory_client,
        user_id=user_id,
        current_trace_id=trace.id,
    )

    try:
        result = await agent.run(user_query, deps=deps)

        # Complete trace on success
        await memory_client.reasoning.complete_trace(
            trace_id=trace.id,
            outcome="success",
            result={"response": result.data},
        )

        return result

    except Exception as e:
        # Record failure
        await memory_client.reasoning.complete_trace(
            trace_id=trace.id,
            outcome="failure",
            error=str(e),
        )
        raise

LangChain Integration

from langchain.callbacks.base import BaseCallbackHandler

class ReasoningTraceCallback(BaseCallbackHandler):
    """LangChain callback for reasoning traces."""

    def __init__(self, memory_client: MemoryClient, user_id: str):
        self.memory_client = memory_client
        self.user_id = user_id
        self.trace_id = None
        self.current_step_id = None

    async def on_chain_start(self, serialized, inputs, **kwargs):
        """Start trace when chain begins."""
        self.trace = await self.memory_client.reasoning.start_trace(
            task=str(inputs),
            user_id=self.user_id,
        )
        self.trace_id = self.trace.id

    async def on_tool_start(self, serialized, input_str, **kwargs):
        """Record when a tool is called."""
        tool_name = serialized.get("name", "unknown")

        step = await self.memory_client.reasoning.add_step(
            trace_id=self.trace_id,
            description=f"Calling {tool_name}",
        )
        self.current_step_id = step.id

    async def on_tool_end(self, output, **kwargs):
        """Record tool result."""
        await self.memory_client.reasoning.add_tool_call(
            step_id=self.current_step_id,
            tool_name=kwargs.get("name", "unknown"),
            arguments=kwargs.get("input", {}),
            result={"output": str(output)[:500]},
            success=True,
        )

    async def on_chain_end(self, outputs, **kwargs):
        """Complete trace when chain finishes."""
        await self.memory_client.reasoning.complete_trace(
            trace_id=self.trace_id,
            outcome="success",
            result=outputs,
        )

# Usage
callback = ReasoningTraceCallback(memory_client, user_id)
chain.invoke(input, callbacks=[callback])

Use Traces for Agent Improvement

Self-Reflection Pattern

Agent reviews its past performance:

async def generate_self_reflection(user_id: str, days: int = 7) -> str:
    """Generate insights from recent reasoning traces."""

    # Get recent traces
    traces = await client.reasoning.get_traces(
        user_id=user_id,
        after=datetime.now() - timedelta(days=days),
        limit=50,
    )

    # Analyze patterns
    success_count = sum(1 for t in traces if t.outcome == "success")
    failure_count = sum(1 for t in traces if t.outcome == "failure")

    # Get common failure reasons
    failures = [t for t in traces if t.outcome == "failure"]
    failure_reasons = [t.error for t in failures if t.error]

    # Get most effective tool combinations
    successful_traces = [t for t in traces if t.outcome == "success"]
    tool_patterns = []
    for trace in successful_traces[:10]:
        steps = await client.reasoning.get_trace_steps(trace.id)
        tools = [t.tool_name for s in steps for t in s.tool_calls]
        tool_patterns.append(tools)

    reflection = f"""
    ## Performance Summary (Last {days} Days)

    - Total tasks: {len(traces)}
    - Success rate: {success_count / len(traces):.1%}

    ### Common Failure Patterns
    {chr(10).join(f'- {r}' for r in set(failure_reasons)[:5])}

    ### Successful Tool Combinations
    {chr(10).join(f'- {" → ".join(p)}' for p in tool_patterns[:5])}

    ### Recommendations
    - Focus on improving: {failure_reasons[0] if failure_reasons else 'N/A'}
    - Most reliable approach: {' → '.join(tool_patterns[0]) if tool_patterns else 'N/A'}
    """

    return reflection

A/B Testing Strategies

Compare different reasoning approaches:

async def compare_strategies(
    task_pattern: str,
    strategy_a: str,
    strategy_b: str,
) -> dict:
    """Compare success rates of different strategies."""

    # Get traces for each strategy
    traces_a = await client.reasoning.get_traces(
        metadata_filter={"strategy": strategy_a},
        limit=100,
    )

    traces_b = await client.reasoning.get_traces(
        metadata_filter={"strategy": strategy_b},
        limit=100,
    )

    # Calculate metrics
    success_rate_a = sum(1 for t in traces_a if t.outcome == "success") / len(traces_a)
    success_rate_b = sum(1 for t in traces_b if t.outcome == "success") / len(traces_b)

    avg_steps_a = sum(t.step_count for t in traces_a) / len(traces_a)
    avg_steps_b = sum(t.step_count for t in traces_b) / len(traces_b)

    return {
        "strategy_a": {
            "name": strategy_a,
            "success_rate": success_rate_a,
            "avg_steps": avg_steps_a,
            "sample_size": len(traces_a),
        },
        "strategy_b": {
            "name": strategy_b,
            "success_rate": success_rate_b,
            "avg_steps": avg_steps_b,
            "sample_size": len(traces_b),
        },
        "winner": strategy_a if success_rate_a > success_rate_b else strategy_b,
    }

Best Practices

1. Record Sufficient Detail

Include enough context for future learning:

# Good: Detailed step with reasoning
await client.reasoning.add_step(
    trace_id=trace.id,
    description="Filtered 20 products down to 5 matching Nike brand preference",
    reasoning="Customer has strong Nike preference (confidence: 0.95). Filtering to boost relevance.",
    metadata={
        "input_count": 20,
        "output_count": 5,
        "filter_criteria": ["brand=Nike"],
    },
)

# Avoid: Vague step
await client.reasoning.add_step(
    trace_id=trace.id,
    description="Filtered products",
)

2. Always Complete Traces

Ensure traces are marked complete even on errors:

trace = await client.reasoning.start_trace(task=task, user_id=user_id)

try:
    result = await execute_task()
    await client.reasoning.complete_trace(
        trace_id=trace.id,
        outcome="success",
        result=result,
    )
except Exception as e:
    await client.reasoning.complete_trace(
        trace_id=trace.id,
        outcome="failure",
        error=str(e),
    )
    raise
finally:
    # Ensure trace is never left incomplete
    pass

3. Use Structured Outcomes

Standardize outcome categories:

OUTCOME_TYPES = {
    "success": "Task completed successfully",
    "partial_success": "Task completed with limitations",
    "failure": "Task failed",
    "timeout": "Task exceeded time limit",
    "cancelled": "Task cancelled by user",
    "error": "Unexpected error occurred",
}

await client.reasoning.complete_trace(
    trace_id=trace.id,
    outcome="partial_success",
    result={
        "completed": ["product_search", "preference_matching"],
        "skipped": ["inventory_check"],
        "reason": "inventory_service_unavailable",
    },
)

4. Prune Old Traces

Remove old traces to manage storage:

# Delete traces older than 90 days
await client.reasoning.delete_traces(
    before=datetime.now() - timedelta(days=90),
)

# Keep successful traces longer
await client.reasoning.delete_traces(
    before=datetime.now() - timedelta(days=30),
    outcome="failure",  # Only delete failures
)