Record and Query Reasoning Traces
How to record agent reasoning traces and use them to improve future decisions through the context graph.
Overview
Reasoning traces capture the step-by-step thought process of agents, including tool calls, decisions, and outcomes. Stored in the context graph, traces enable agents to learn from past experiences and improve over time.
[Diagram: Reasoning Traces in the Context Graph]
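Conceptually, each trace is stored as a small subgraph connecting the trace to its steps and their tool calls, which is what makes the queries later in this guide possible. The node labels and relationship types below are illustrative assumptions rather than the library's published schema; a minimal sketch of inspecting that structure directly with the official Neo4j Python driver:
from neo4j import GraphDatabase

# Hypothetical schema, for illustration only:
# (:Trace)-[:HAS_STEP]->(:Step)-[:USED_TOOL]->(:ToolCall)
driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

with driver.session() as session:
    result = session.run(
        """
        MATCH (t:Trace)-[:HAS_STEP]->(s:Step)
        OPTIONAL MATCH (s)-[:USED_TOOL]->(c:ToolCall)
        RETURN t.task AS task, s.description AS step,
               collect(c.tool_name) AS tools
        """
    )
    for record in result:
        print(record["task"], "|", record["step"], "|", record["tools"])

driver.close()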
Prerequisites
- Neo4j database running
- neo4j-agent-memory installed
- MemoryClient configured
Record Reasoning Traces
Start a Trace
Begin recording when an agent task starts:
from neo4j_agent_memory import MemoryClient
client = MemoryClient(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password",
)
# Start a new reasoning trace
trace = await client.reasoning.start_trace(
task="Find running shoes matching customer preferences",
user_id="CUST-12345",
session_id="shopping-session-001",
metadata={
"agent_version": "2.1.0",
"model": "gpt-4o",
"trigger": "user_request",
},
)
print(f"Started trace: {trace.id}")
Record Reasoning Steps
Capture each step of the agent’s reasoning:
# Step 1: Search for products
step1 = await client.reasoning.add_step(
trace_id=trace.id,
description="Searching product catalog for running shoes",
reasoning="Customer asked for running shoes, starting with catalog search",
)
# Step 2: Get preferences
step2 = await client.reasoning.add_step(
trace_id=trace.id,
description="Retrieved customer preferences for brand and style",
reasoning="Need to personalize results based on stored preferences",
depends_on=step1.id, # Link steps in sequence
)
# Step 3: Filter and rank
step3 = await client.reasoning.add_step(
trace_id=trace.id,
description="Filtered products by Nike brand preference, ranked by rating",
reasoning="Customer prefers Nike, so boosting those results",
depends_on=step2.id,
)
Record Tool Calls
Track tools used during reasoning:
# Record tool call for product search
tool_call = await client.reasoning.add_tool_call(
step_id=step1.id,
tool_name="search_products",
arguments={
"query": "running shoes",
"category": "footwear",
"limit": 20,
},
result={
"products_found": 20,
"top_brands": ["Nike", "Adidas", "Asics"],
},
duration_ms=145,
success=True,
)
# Record preference retrieval
await client.reasoning.add_tool_call(
step_id=step2.id,
tool_name="get_user_preferences",
arguments={"user_id": "CUST-12345", "categories": ["brand", "style"]},
result={
"preferences": [
{"category": "brand", "value": "Nike"},
{"category": "style", "value": "minimalist"},
]
},
duration_ms=32,
success=True,
)
Complete the Trace
Mark the trace as complete with an outcome:
# Successful completion
await client.reasoning.complete_trace(
trace_id=trace.id,
outcome="success",
result={
"recommendations": [
{"name": "Nike Pegasus 40", "sku": "NKE-PEG40-001"},
{"name": "Nike Air Zoom", "sku": "NKE-AZ-002"},
{"name": "Nike Free Run", "sku": "NKE-FR-003"},
],
"customer_response": "positive",
},
metadata={
"total_steps": 3,
"total_tool_calls": 3,
"execution_time_ms": 850,
},
)
# Or record failure
await client.reasoning.complete_trace(
trace_id=trace.id,
outcome="failure",
error="No products found matching criteria",
metadata={
"failure_reason": "inventory_empty",
"category_searched": "running_shoes",
},
)
Query Past Traces
Find Similar Traces
Find traces similar to the current task to learn from past approaches:
# Find traces for similar product recommendations
similar_traces = await client.reasoning.get_similar_traces(
task="Recommend running shoes for marathon training",
limit=5,
success_only=True, # Only successful traces
)
for trace in similar_traces:
print(f"Task: {trace.task}")
print(f" Outcome: {trace.outcome}")
print(f" Similarity: {trace.similarity:.2f}")
print(f" Steps: {trace.step_count}")
Learn from Past Successes
Use past traces to inform current reasoning:
async def get_recommendation_strategy(query: str, user_id: str) -> dict:
"""Learn from past successful recommendations."""
# Find similar successful traces
traces = await client.reasoning.get_similar_traces(
task=f"Recommend products: {query}",
user_id=user_id,
limit=3,
success_only=True,
)
strategies = []
for trace in traces:
# Get the steps that led to success
steps = await client.reasoning.get_trace_steps(trace.id)
strategies.append({
"task": trace.task,
"steps": [s.description for s in steps],
"tools_used": [t.tool_name for s in steps for t in s.tool_calls],
"result": trace.result,
})
return {
"similar_tasks": len(traces),
"successful_strategies": strategies,
}
import json

# Use in agent prompt
strategy = await get_recommendation_strategy(
query="comfortable running shoes",
user_id="CUST-12345",
)
system_prompt = f"""
You are a product recommendation agent.
Based on past successful recommendations for similar queries:
{json.dumps(strategy['successful_strategies'], indent=2)}
Use similar approaches for the current request.
"""
The same pattern applies in other domains. For example, an advisory agent can surface precedents from past consultations:
async def get_advisory_precedents(topic: str, client_id: str) -> list:
"""Find past advisory decisions on similar topics."""
traces = await client.reasoning.get_similar_traces(
task=f"Advisory consultation: {topic}",
user_id=client_id,
limit=5,
success_only=True,
)
precedents = []
for trace in traces:
steps = await client.reasoning.get_trace_steps(trace.id)
precedents.append({
"topic": trace.task,
"recommendation": trace.result.get("recommendation"),
"rationale": [s.reasoning for s in steps],
"client_response": trace.result.get("client_response"),
})
return precedents
# Use for similar advisory scenarios
precedents = await get_advisory_precedents(
topic="portfolio rebalancing for moderate-growth profile",
client_id="CL-78901",
)
Analyze Tool Usage
Understand which tools are most effective:
# Get tool statistics
stats = await client.reasoning.get_tool_statistics(
user_id="CUST-12345",
time_range_days=30,
)
print("Tool Usage Statistics:")
for tool_name, tool_stats in stats.items():
print(f"\n{tool_name}:")
print(f" Total calls: {tool_stats['total_calls']}")
print(f" Success rate: {tool_stats['success_rate']:.1%}")
print(f" Avg duration: {tool_stats['avg_duration_ms']:.0f}ms")
print(f" Used in successful traces: {tool_stats['success_correlation']:.1%}")
Filter Traces by Criteria
from datetime import datetime, timedelta

# Get recent traces for a user
recent_traces = await client.reasoning.get_traces(
user_id="CUST-12345",
limit=20,
after=datetime.now() - timedelta(days=7),
)
# Get traces by outcome
failed_traces = await client.reasoning.get_traces(
user_id="CUST-12345",
outcome="failure",
limit=10,
)
# Get traces with specific tool usage
search_traces = await client.reasoning.get_traces(
tools_used=["search_products", "get_preferences"],
success_only=True,
limit=10,
)
Integrate with Agent Frameworks
PydanticAI Integration
Automatically record traces with PydanticAI:
import json
from dataclasses import dataclass

from pydantic_ai import Agent, RunContext
@dataclass
class AgentDeps:
    memory_client: MemoryClient
    user_id: str
    current_trace_id: str | None = None
    current_step_id: str | None = None  # updated as steps are added
agent = Agent(
"openai:gpt-4o",
deps_type=AgentDeps,
)
@agent.tool
async def search_products(
ctx: RunContext[AgentDeps],
query: str,
category: str,
) -> str:
"""Search for products."""
    # Record the tool call in the trace; current_step_id is tracked in deps
    if ctx.deps.current_trace_id and ctx.deps.current_step_id:
        await ctx.deps.memory_client.reasoning.add_tool_call(
step_id=ctx.deps.current_step_id,
tool_name="search_products",
arguments={"query": query, "category": category},
result={"status": "searching"},
success=True,
)
# Actual tool implementation
products = await do_product_search(query, category)
return json.dumps(products)
async def run_with_tracing(user_query: str, user_id: str):
"""Run agent with automatic tracing."""
# Start trace
trace = await memory_client.reasoning.start_trace(
task=user_query,
user_id=user_id,
)
deps = AgentDeps(
memory_client=memory_client,
user_id=user_id,
current_trace_id=trace.id,
)
try:
result = await agent.run(user_query, deps=deps)
# Complete trace on success
await memory_client.reasoning.complete_trace(
trace_id=trace.id,
outcome="success",
result={"response": result.data},
)
return result
except Exception as e:
# Record failure
await memory_client.reasoning.complete_trace(
trace_id=trace.id,
outcome="failure",
error=str(e),
)
raise
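Calling the wrapper then produces one complete trace per agent run:
result = await run_with_tracing(
    user_query="Find me comfortable running shoes",
    user_id="CUST-12345",
)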
LangChain Integration
from langchain.callbacks.base import AsyncCallbackHandler

class ReasoningTraceCallback(AsyncCallbackHandler):
    """LangChain async callback handler for reasoning traces."""
    def __init__(self, memory_client: MemoryClient, user_id: str):
        self.memory_client = memory_client
        self.user_id = user_id
        self.trace_id = None
        self.current_step_id = None
        self.current_tool_name = None
        self.current_tool_input = None
    async def on_chain_start(self, serialized, inputs, **kwargs):
        """Start a trace when the chain begins."""
        trace = await self.memory_client.reasoning.start_trace(
            task=str(inputs),
            user_id=self.user_id,
        )
        self.trace_id = trace.id
    async def on_tool_start(self, serialized, input_str, **kwargs):
        """Record when a tool is called."""
        self.current_tool_name = serialized.get("name", "unknown")
        self.current_tool_input = input_str
        step = await self.memory_client.reasoning.add_step(
            trace_id=self.trace_id,
            description=f"Calling {self.current_tool_name}",
        )
        self.current_step_id = step.id
    async def on_tool_end(self, output, **kwargs):
        """Record the tool result."""
        await self.memory_client.reasoning.add_tool_call(
            step_id=self.current_step_id,
            tool_name=self.current_tool_name,
            arguments={"input": self.current_tool_input},
            result={"output": str(output)[:500]},
            success=True,
        )
async def on_chain_end(self, outputs, **kwargs):
"""Complete trace when chain finishes."""
await self.memory_client.reasoning.complete_trace(
trace_id=self.trace_id,
outcome="success",
result=outputs,
)
# Usage: async callbacks require the async entry point
callback = ReasoningTraceCallback(memory_client, user_id)
result = await chain.ainvoke(input, config={"callbacks": [callback]})
Use Traces for Agent Improvement
Self-Reflection Pattern
Agent reviews its past performance:
async def generate_self_reflection(user_id: str, days: int = 7) -> str:
"""Generate insights from recent reasoning traces."""
# Get recent traces
traces = await client.reasoning.get_traces(
user_id=user_id,
after=datetime.now() - timedelta(days=days),
limit=50,
)
    if not traces:
        return "No reasoning traces recorded in this period."

    # Analyze patterns
success_count = sum(1 for t in traces if t.outcome == "success")
failure_count = sum(1 for t in traces if t.outcome == "failure")
# Get common failure reasons
failures = [t for t in traces if t.outcome == "failure"]
failure_reasons = [t.error for t in failures if t.error]
# Get most effective tool combinations
successful_traces = [t for t in traces if t.outcome == "success"]
tool_patterns = []
for trace in successful_traces[:10]:
steps = await client.reasoning.get_trace_steps(trace.id)
tools = [t.tool_name for s in steps for t in s.tool_calls]
tool_patterns.append(tools)
    reflection = f"""
## Performance Summary (Last {days} Days)
- Total tasks: {len(traces)}
- Successes: {success_count}, failures: {failure_count}
- Success rate: {success_count / len(traces):.1%}

### Common Failure Patterns
{chr(10).join(f'- {r}' for r in list(set(failure_reasons))[:5])}
### Successful Tool Combinations
{chr(10).join(f'- {" → ".join(p)}' for p in tool_patterns[:5])}
### Recommendations
- Focus on improving: {failure_reasons[0] if failure_reasons else 'N/A'}
- Most reliable approach: {' → '.join(tool_patterns[0]) if tool_patterns else 'N/A'}
"""
return reflection
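The generated reflection can then be fed back into the agent's instructions, mirroring the prompt pattern shown earlier:
reflection = await generate_self_reflection(user_id="CUST-12345", days=7)
system_prompt = f"""
You are a product recommendation agent.
Review your recent performance and apply its lessons:
{reflection}
"""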
A/B Testing Strategies
Compare different reasoning approaches:
async def compare_strategies(
task_pattern: str,
strategy_a: str,
strategy_b: str,
) -> dict:
"""Compare success rates of different strategies."""
# Get traces for each strategy
traces_a = await client.reasoning.get_traces(
metadata_filter={"strategy": strategy_a},
limit=100,
)
traces_b = await client.reasoning.get_traces(
metadata_filter={"strategy": strategy_b},
limit=100,
)
    # Calculate metrics (guard against empty result sets)
    success_rate_a = sum(1 for t in traces_a if t.outcome == "success") / max(len(traces_a), 1)
    success_rate_b = sum(1 for t in traces_b if t.outcome == "success") / max(len(traces_b), 1)
    avg_steps_a = sum(t.step_count for t in traces_a) / max(len(traces_a), 1)
    avg_steps_b = sum(t.step_count for t in traces_b) / max(len(traces_b), 1)
return {
"strategy_a": {
"name": strategy_a,
"success_rate": success_rate_a,
"avg_steps": avg_steps_a,
"sample_size": len(traces_a),
},
"strategy_b": {
"name": strategy_b,
"success_rate": success_rate_b,
"avg_steps": avg_steps_b,
"sample_size": len(traces_b),
},
"winner": strategy_a if success_rate_a > success_rate_b else strategy_b,
}
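For this comparison to work, each trace must carry a strategy tag when it is recorded, using the metadata parameter shown in Start a Trace (the strategy name here is an example):
trace = await client.reasoning.start_trace(
    task="Find running shoes matching customer preferences",
    user_id="CUST-12345",
    metadata={"strategy": "preference_first"},  # tag for later comparison
)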
Best Practices
1. Record Sufficient Detail
Include enough context for future learning:
# Good: Detailed step with reasoning
await client.reasoning.add_step(
trace_id=trace.id,
description="Filtered 20 products down to 5 matching Nike brand preference",
reasoning="Customer has strong Nike preference (confidence: 0.95). Filtering to boost relevance.",
metadata={
"input_count": 20,
"output_count": 5,
"filter_criteria": ["brand=Nike"],
},
)
# Avoid: Vague step
await client.reasoning.add_step(
trace_id=trace.id,
description="Filtered products",
)
2. Always Complete Traces
Ensure traces are marked complete even on errors:
trace = await client.reasoning.start_trace(task=task, user_id=user_id)
try:
result = await execute_task()
await client.reasoning.complete_trace(
trace_id=trace.id,
outcome="success",
result=result,
)
except Exception as e:
await client.reasoning.complete_trace(
trace_id=trace.id,
outcome="failure",
error=str(e),
)
raise
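To avoid repeating this boilerplate, the pattern can be wrapped in an async context manager. The sketch below uses only the start_trace and complete_trace calls shown above; traced is a hypothetical helper, and it assumes complete_trace accepts an empty result on success:
from contextlib import asynccontextmanager

@asynccontextmanager
async def traced(client: MemoryClient, task: str, user_id: str):
    """Hypothetical helper: guarantees a started trace is always completed."""
    trace = await client.reasoning.start_trace(task=task, user_id=user_id)
    try:
        yield trace
        await client.reasoning.complete_trace(
            trace_id=trace.id, outcome="success", result={}
        )
    except Exception as e:
        await client.reasoning.complete_trace(
            trace_id=trace.id, outcome="failure", error=str(e)
        )
        raise

# Usage
async with traced(client, task="Find running shoes", user_id="CUST-12345") as trace:
    await client.reasoning.add_step(trace_id=trace.id, description="Searching catalog")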
3. Use Structured Outcomes
Standardize outcome categories:
OUTCOME_TYPES = {
"success": "Task completed successfully",
"partial_success": "Task completed with limitations",
"failure": "Task failed",
"timeout": "Task exceeded time limit",
"cancelled": "Task cancelled by user",
"error": "Unexpected error occurred",
}
await client.reasoning.complete_trace(
trace_id=trace.id,
outcome="partial_success",
result={
"completed": ["product_search", "preference_matching"],
"skipped": ["inventory_check"],
"reason": "inventory_service_unavailable",
},
)
4. Prune Old Traces
Remove old traces to manage storage:
# Delete traces older than 90 days
await client.reasoning.delete_traces(
before=datetime.now() - timedelta(days=90),
)
# Keep successful traces longer by pruning failures sooner
await client.reasoning.delete_traces(
before=datetime.now() - timedelta(days=30),
outcome="failure", # Only delete failures
)