Use with LlamaIndex
How to integrate neo4j-agent-memory with LlamaIndex for building RAG applications with persistent context graphs.
Overview
LlamaIndex excels at building retrieval-augmented generation (RAG) applications. Integrating with neo4j-agent-memory adds persistent memory that combines document retrieval with conversational context and user preferences stored in a context graph.
LlamaIndex + Context Graph Architecture (diagram)
Prerequisites
- Python 3.10+
- neo4j-agent-memory and llama-index installed
- Neo4j database running
- OpenAI API key
pip install neo4j-agent-memory llama-index llama-index-llms-openai
Chat Memory Integration
Create Custom Chat Store
Implement LlamaIndex’s chat store interface:
from typing import List, Optional
from llama_index.core.storage.chat_store import BaseChatStore
from llama_index.core.llms import ChatMessage, MessageRole
from neo4j_agent_memory import MemoryClient
class Neo4jChatStore(BaseChatStore):
    """LlamaIndex chat store backed by neo4j-agent-memory.

    Persists chat messages through the async neo4j-agent-memory client.
    Each LlamaIndex store "key" maps to a neo4j-agent-memory session_id.

    NOTE(review): every method drives the async client with
    ``asyncio.get_event_loop().run_until_complete(...)``, which raises if an
    event loop is already running (e.g. when invoked from async code) —
    confirm the store is only used from synchronous call sites.
    """

    def __init__(self, memory_client: MemoryClient):
        # Async client that performs all Neo4j persistence.
        self.memory_client = memory_client

    def set_messages(self, key: str, messages: List[ChatMessage]) -> None:
        """Set messages for a session.

        Implemented as delete-then-re-add so the stored session exactly
        mirrors ``messages`` afterwards.
        """
        import asyncio
        loop = asyncio.get_event_loop()
        # Clear existing messages
        loop.run_until_complete(
            self.memory_client.short_term.delete_session(session_id=key)
        )
        # Add new messages
        for msg in messages:
            # NOTE(review): any non-USER role (SYSTEM, TOOL, ...) is stored
            # as "assistant" — confirm losing the original role is acceptable.
            role = "user" if msg.role == MessageRole.USER else "assistant"
            loop.run_until_complete(
                self.memory_client.short_term.add_message(
                    role=role,
                    content=msg.content,
                    session_id=key,
                )
            )

    def get_messages(self, key: str) -> List[ChatMessage]:
        """Get messages for a session (capped at 100 by the query limit)."""
        import asyncio
        loop = asyncio.get_event_loop()
        raw_messages = loop.run_until_complete(
            self.memory_client.short_term.get_session_messages(
                session_id=key,
                limit=100,
            )
        )
        # Map stored role strings back onto LlamaIndex message roles;
        # anything that is not "user" comes back as ASSISTANT.
        result = []
        for msg in raw_messages:
            role = MessageRole.USER if msg.role == "user" else MessageRole.ASSISTANT
            result.append(ChatMessage(role=role, content=msg.content))
        return result

    def add_message(self, key: str, message: ChatMessage) -> None:
        """Add a single message to a session."""
        import asyncio
        loop = asyncio.get_event_loop()
        role = "user" if message.role == MessageRole.USER else "assistant"
        loop.run_until_complete(
            self.memory_client.short_term.add_message(
                role=role,
                content=message.content,
                session_id=key,
            )
        )

    def delete_messages(self, key: str) -> Optional[List[ChatMessage]]:
        """Delete all messages in a session, returning what was deleted."""
        import asyncio
        loop = asyncio.get_event_loop()
        # Get messages before deleting
        messages = self.get_messages(key)
        loop.run_until_complete(
            self.memory_client.short_term.delete_session(session_id=key)
        )
        return messages

    def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
        """Delete a specific message (not directly supported, returns None)."""
        return None

    def delete_last_message(self, key: str) -> Optional[ChatMessage]:
        """Delete the last message (not directly supported, returns None)."""
        return None

    def get_keys(self) -> List[str]:
        """Get all session keys (first 1000 sessions only)."""
        import asyncio
        loop = asyncio.get_event_loop()
        sessions = loop.run_until_complete(
            self.memory_client.short_term.list_sessions(limit=1000)
        )
        return [s.id for s in sessions]
Use with Chat Engine
from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.llms.openai import OpenAI

# Initialize memory client (use real credentials/secrets in production)
memory_client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Create chat store
chat_store = Neo4jChatStore(memory_client)

# Create memory buffer with our store; history is trimmed to ~3000 tokens
memory = ChatMemoryBuffer.from_defaults(
    chat_store=chat_store,
    chat_store_key="user-session-001",
    token_limit=3000,
)

# Load documents and create index
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)

# Create chat engine with memory
chat_engine = index.as_chat_engine(
    chat_mode="context",
    memory=memory,
    llm=OpenAI(model="gpt-4o"),
    system_prompt="""You are a helpful assistant with access to company documents.
Use the conversation history to provide contextual responses.""",
)

# Chat - memory persists to Neo4j
response = chat_engine.chat("What is our return policy?")
print(response)

# Later messages have full context
response = chat_engine.chat("Can you elaborate on the exceptions?")
print(response)
Custom Retriever with Context Graph
Create Context-Aware Retriever
Combine document retrieval with context graph knowledge:
from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore, TextNode, QueryBundle
from typing import List
class ContextGraphRetriever(BaseRetriever):
    """Retriever that combines document index with context graph.

    Vector-store hits are merged with synthetic text nodes built from the
    user's stored preferences, entities matching the query, and relevant
    past conversation messages; the combined list is returned sorted by
    score (descending, missing scores treated as 0).
    """

    def __init__(
        self,
        vector_retriever: BaseRetriever,
        memory_client: MemoryClient,
        user_id: str,
        include_preferences: bool = True,
        include_entities: bool = True,
        include_history: bool = True,
    ):
        self.vector_retriever = vector_retriever
        self.memory_client = memory_client
        self.user_id = user_id
        self.include_preferences = include_preferences
        self.include_entities = include_entities
        self.include_history = include_history
        super().__init__()

    def _run(self, coro):
        """Drive one async memory-client call from this sync retriever.

        NOTE(review): run_until_complete raises if an event loop is already
        running — confirm retrieval only happens from synchronous code.
        """
        import asyncio
        return asyncio.get_event_loop().run_until_complete(coro)

    def _preference_nodes(self) -> List[NodeWithScore]:
        """User preferences as a single context node (empty list if none)."""
        preferences = self._run(
            self.memory_client.long_term.get_preferences(
                user_id=self.user_id,
                limit=5,
            )
        )
        if not preferences:
            return []
        pref_text = "User Preferences:\n" + "\n".join(
            f"- {p.category}: {p.preference}" for p in preferences
        )
        # High relevance for personalization
        return [NodeWithScore(node=TextNode(text=pref_text), score=0.9)]

    def _entity_nodes(self, query_str: str) -> List[NodeWithScore]:
        """Entities matching the query as a single context node."""
        entities = self._run(
            self.memory_client.long_term.search_entities(
                query=query_str,
                limit=5,
            )
        )
        if not entities:
            return []
        entity_text = "Relevant Knowledge:\n" + "\n".join(
            f"- {e.name} ({e.type}): {e.description or 'No description'}"
            for e in entities
        )
        return [NodeWithScore(node=TextNode(text=entity_text), score=0.85)]

    def _history_nodes(self, query_str: str) -> List[NodeWithScore]:
        """Relevant past messages as one context node (150-char previews)."""
        messages = self._run(
            self.memory_client.short_term.search_messages(
                query=query_str,
                limit=3,
            )
        )
        if not messages:
            return []
        history_text = "Relevant Past Conversations:\n" + "\n".join(
            f"- [{m.role}]: {m.content[:150]}..."
            for m in messages
        )
        return [NodeWithScore(node=TextNode(text=history_text), score=0.8)]

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Combine document results with context-graph knowledge."""
        # Get document results first, then append graph-derived nodes.
        all_nodes = list(self.vector_retriever.retrieve(query_bundle))

        if self.include_preferences:
            all_nodes.extend(self._preference_nodes())
        if self.include_entities:
            all_nodes.extend(self._entity_nodes(query_bundle.query_str))
        if self.include_history:
            all_nodes.extend(self._history_nodes(query_bundle.query_str))

        # Sort by score; stable sort keeps doc/context order within ties.
        all_nodes.sort(key=lambda x: x.score or 0, reverse=True)
        return all_nodes
# Usage
from llama_index.core import VectorStoreIndex

# Create base index over the already-loaded documents
index = VectorStoreIndex.from_documents(documents)
vector_retriever = index.as_retriever(similarity_top_k=5)

# Wrap with context graph
context_retriever = ContextGraphRetriever(
    vector_retriever=vector_retriever,
    memory_client=memory_client,
    user_id="CUST-12345",
)

# Use in query engine
from llama_index.core.query_engine import RetrieverQueryEngine

query_engine = RetrieverQueryEngine.from_args(
    retriever=context_retriever,
    llm=OpenAI(model="gpt-4o"),
)

response = query_engine.query("What products would you recommend for me?")
Financial Document Assistant
Complete example for a financial services RAG application:
import json
from datetime import datetime
from llama_index.core import (
VectorStoreIndex,
SimpleDirectoryReader,
Settings,
)
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
from llama_index.core.memory import ChatMemoryBuffer
from neo4j_agent_memory import MemoryClient
# --- Initialize ---
memory_client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# temperature=0 for deterministic advisory output
Settings.llm = OpenAI(model="gpt-4o", temperature=0)

# --- Create Document Indexes ---
# Index for regulatory documents
regulatory_docs = SimpleDirectoryReader("./data/regulatory").load_data()
regulatory_index = VectorStoreIndex.from_documents(regulatory_docs)

# Index for market research
research_docs = SimpleDirectoryReader("./data/research").load_data()
research_index = VectorStoreIndex.from_documents(research_docs)

# --- Create Context Graph Tools ---
from llama_index.core.tools import FunctionTool
def get_client_profile(client_id: str) -> str:
    """Get comprehensive client profile from context graph.

    Returns a JSON object with the client's stored preferences and
    current holdings (accounts -> positions -> securities).
    """
    import asyncio

    event_loop = asyncio.get_event_loop()

    # Stored preferences for this client.
    prefs = event_loop.run_until_complete(
        memory_client.long_term.get_preferences(
            user_id=client_id,
        )
    )

    # Holdings reached via the client's accounts.
    holdings = event_loop.run_until_complete(
        memory_client.long_term.execute_query(
            """
            MATCH (c:Client {id: $client_id})-[:HAS_ACCOUNT]->(a:Account)
                -[:HOLDS]->(p:Position)-[:IN]->(s:Security)
            RETURN a.type as account_type, s.name as security,
                   s.ticker as ticker, p.shares as shares
            """,
            parameters={"client_id": client_id},
        )
    )

    pref_payload = []
    for p in prefs:
        pref_payload.append({"category": p.category, "value": p.preference})

    return json.dumps({
        "preferences": pref_payload,
        "holdings": holdings,
    })
def search_securities(query: str, sector: Optional[str] = None) -> str:
    """Search for securities in the context graph.

    Args:
        query: Free-text description of the securities to find.
        sector: Optional sector name applied as a property filter.
            (Annotated ``Optional[str]`` — implicit Optional via
            ``sector: str = None`` is disallowed by PEP 484.)

    Returns:
        JSON list of matches with name, ticker, sector, and description.
    """
    import asyncio
    loop = asyncio.get_event_loop()
    # Only apply a property filter when a sector was actually given.
    filters = {"sector": sector} if sector else None
    securities = loop.run_until_complete(
        memory_client.long_term.search_entities(
            query=query,
            entity_type="SECURITY",
            property_filter=filters,
            limit=10,
        )
    )
    return json.dumps([
        {
            "name": s.name,
            "ticker": s.properties.get("ticker"),
            "sector": s.properties.get("sector"),
            "description": s.description,
        }
        for s in securities
    ])
def get_past_recommendations(client_id: str, topic: str) -> str:
    """Get past successful recommendations for similar topics.

    Returns a JSON list of up to three prior successful traces with the
    recommendation made and any recorded client feedback.
    """
    import asyncio

    event_loop = asyncio.get_event_loop()
    traces = event_loop.run_until_complete(
        memory_client.reasoning.get_similar_traces(
            task=topic,
            user_id=client_id,
            limit=3,
            success_only=True,
        )
    )

    summaries = []
    for t in traces:
        # Traces may have no recorded result; fall back to None fields.
        rec = feedback = None
        if t.result:
            rec = t.result.get("recommendation")
            feedback = t.result.get("feedback")
        summaries.append({
            "topic": t.task,
            "recommendation": rec,
            "client_feedback": feedback,
        })
    return json.dumps(summaries)
# --- Build Tools ---
# Two RAG tools over the document indexes plus three context-graph tools;
# the ReAct agent picks among them by the `description` text.
tools = [
    # Document query tools
    QueryEngineTool(
        query_engine=regulatory_index.as_query_engine(),
        metadata=ToolMetadata(
            name="regulatory_search",
            description="Search regulatory documents for compliance and rules information.",
        ),
    ),
    QueryEngineTool(
        query_engine=research_index.as_query_engine(),
        metadata=ToolMetadata(
            name="research_search",
            description="Search market research reports for analysis and insights.",
        ),
    ),
    # Context graph tools
    FunctionTool.from_defaults(
        fn=get_client_profile,
        name="get_client_profile",
        description="Get client's investment profile, preferences, and current holdings.",
    ),
    FunctionTool.from_defaults(
        fn=search_securities,
        name="search_securities",
        description="Search for securities by description or characteristics.",
    ),
    FunctionTool.from_defaults(
        fn=get_past_recommendations,
        name="get_past_recommendations",
        description="Find past successful recommendations for similar client situations.",
    ),
]
# --- Create Agent with Memory ---
def create_financial_agent(client_id: str, session_id: str) -> ReActAgent:
    """Create financial advisory agent with context graph memory.

    Args:
        client_id: Client identifier baked into the system prompt so the
            agent's tool calls target the right profile.
        session_id: Chat-store key; conversations under different session
            ids are isolated from each other.
    """
    # Chat store for message persistence
    chat_store = Neo4jChatStore(memory_client)
    memory = ChatMemoryBuffer.from_defaults(
        chat_store=chat_store,
        chat_store_key=session_id,
        token_limit=4000,
    )

    agent = ReActAgent.from_tools(
        tools=tools,
        memory=memory,
        verbose=True,
        system_prompt=f"""You are a financial advisory assistant helping wealth managers.
Current client: {client_id}
Your capabilities:
1. Search regulatory documents for compliance guidance
2. Search market research for investment insights
3. Access client profiles, preferences, and holdings
4. Search for securities matching specific criteria
5. Review past successful recommendations
Guidelines:
- Always consider the client's risk profile and preferences
- Reference specific securities with ticker symbols
- Note compliance considerations when relevant
- Learn from past successful recommendations
Remember: All recommendations should be reviewed by the advisor before execution.""",
    )
    return agent
# --- Main Application ---
async def run_financial_assistant():
    """Interactive advisory REPL; each request is wrapped in a reasoning trace.

    NOTE(review): Neo4jChatStore drives the loop with run_until_complete,
    which fails inside this already-running coroutine — confirm agent.chat
    does not hit the chat store when invoked from here.
    """
    client_id = "CL-78901"
    # Timestamped session id keeps each run's conversation isolated.
    session_id = f"advisory-{datetime.now().strftime('%Y%m%d%H%M%S')}"
    agent = create_financial_agent(client_id, session_id)

    print("Financial Advisory Assistant (type 'quit' to exit)\n")
    while True:
        user_input = input("Advisor: ").strip()
        if user_input.lower() in ("quit", "exit"):
            break
        if not user_input:
            # Skip blank lines instead of sending them to the agent.
            continue

        # Start reasoning trace so every request is auditable.
        trace = await memory_client.reasoning.start_trace(
            task=user_input,
            user_id=client_id,
            session_id=session_id,
        )
        try:
            response = agent.chat(user_input)
            print(f"\nAssistant: {response}\n")
            await memory_client.reasoning.complete_trace(
                trace_id=trace.id,
                outcome="success",
                result={"response": str(response)[:500]},  # cap stored size
            )
        except Exception as e:
            # Record the failure but keep the REPL alive.
            print(f"\nError: {e}\n")
            await memory_client.reasoning.complete_trace(
                trace_id=trace.id,
                outcome="failure",
                error=str(e),
            )


if __name__ == "__main__":
    import asyncio

    asyncio.run(run_financial_assistant())
Ecommerce Product Assistant
Example combining product catalog with customer context:
from llama_index.core import VectorStoreIndex, Document
from llama_index.core.tools import FunctionTool, QueryEngineTool
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
def create_product_index(memory_client: MemoryClient) -> VectorStoreIndex:
    """Create product index from context graph.

    Pulls every PRODUCT entity (up to 1000) and turns each one into an
    indexable LlamaIndex document.
    """
    import asyncio

    event_loop = asyncio.get_event_loop()
    products = event_loop.run_until_complete(
        memory_client.long_term.search_entities(
            query="",
            entity_type="PRODUCT",
            limit=1000,
        )
    )

    def _as_document(p) -> Document:
        """Render one product entity as a searchable document."""
        doc_text = f"""
Product: {p.name}
Brand: {p.properties.get('brand', 'Unknown')}
Category: {p.properties.get('category', 'Unknown')}
Price: ${p.properties.get('price', 'N/A')}
Rating: {p.properties.get('rating', 'N/A')}/5
Description: {p.description or 'No description available'}
"""
        return Document(
            text=doc_text,
            metadata={
                "product_name": p.name,
                "brand": p.properties.get("brand"),
                "category": p.properties.get("category"),
            },
        )

    return VectorStoreIndex.from_documents([_as_document(p) for p in products])
def get_customer_preferences(customer_id: str) -> str:
    """Get customer preferences from context graph, grouped by category.

    Returns:
        JSON object mapping each category to a list of preference strings.
    """
    import asyncio
    loop = asyncio.get_event_loop()
    preferences = loop.run_until_complete(
        memory_client.long_term.get_preferences(user_id=customer_id)
    )
    by_category = {}
    for p in preferences:
        # setdefault replaces the manual "if key not in dict" dance.
        by_category.setdefault(p.category, []).append(p.preference)
    return json.dumps(by_category)
def get_purchase_history(customer_id: str) -> str:
    """Get customer purchase history (10 most recent purchases, as JSON)."""
    import asyncio

    # Most recent first; LIMIT keeps the tool's output small for the LLM.
    cypher = """
        MATCH (c:Customer {id: $customer_id})-[p:PURCHASED]->(product:Product)
        RETURN product.name as product, product.brand as brand,
               p.purchase_date as date, p.rating as rating
        ORDER BY p.purchase_date DESC
        LIMIT 10
        """
    event_loop = asyncio.get_event_loop()
    purchases = event_loop.run_until_complete(
        memory_client.long_term.execute_query(
            cypher,
            parameters={"customer_id": customer_id},
        )
    )
    return json.dumps(purchases)
def save_preference(customer_id: str, preference: str, category: str) -> str:
    """Save a customer preference and return a short confirmation string."""
    import asyncio

    asyncio.get_event_loop().run_until_complete(
        memory_client.long_term.add_preference(
            user_id=customer_id,
            preference=preference,
            category=category,
        )
    )
    return f"Saved: {category} - {preference}"
def create_shopping_agent(
    memory_client: MemoryClient,
    customer_id: str,
    session_id: str,
) -> ReActAgent:
    """Create shopping assistant with product knowledge and customer context.

    Args:
        memory_client: Async context-graph client used for the product
            index, preferences, history, and chat persistence.
        customer_id: Customer whose context personalizes the agent.
        session_id: Chat-store key isolating this conversation.
    """
    # Create product index
    product_index = create_product_index(memory_client)

    # Named wrappers instead of lambdas: FunctionTool infers the tool's
    # parameter schema from the function signature, and lambdas expose
    # neither annotations nor docstrings for that inference.
    def _get_preferences() -> str:
        """Get current customer's stored preferences."""
        return get_customer_preferences(customer_id)

    def _get_purchase_history() -> str:
        """Get customer's recent purchase history."""
        return get_purchase_history(customer_id)

    def _save_preference(preference: str, category: str) -> str:
        """Save a new customer preference."""
        return save_preference(customer_id, preference, category)

    # Create tools
    tools = [
        QueryEngineTool(
            query_engine=product_index.as_query_engine(similarity_top_k=10),
            metadata=ToolMetadata(
                name="search_products",
                description="Search product catalog by description, category, or features.",
            ),
        ),
        FunctionTool.from_defaults(
            fn=_get_preferences,
            name="get_preferences",
            description="Get current customer's stored preferences.",
        ),
        FunctionTool.from_defaults(
            fn=_get_purchase_history,
            name="get_purchase_history",
            description="Get customer's recent purchase history.",
        ),
        FunctionTool.from_defaults(
            fn=_save_preference,
            name="save_preference",
            description="Save a new customer preference. Parameters: preference (str), category (str).",
        ),
    ]

    # Create chat store
    chat_store = Neo4jChatStore(memory_client)
    memory = ChatMemoryBuffer.from_defaults(
        chat_store=chat_store,
        chat_store_key=session_id,
        token_limit=3000,
    )

    return ReActAgent.from_tools(
        tools=tools,
        memory=memory,
        verbose=True,
        llm=OpenAI(model="gpt-4o"),
        system_prompt=f"""You are a personal shopping assistant.
Customer: {customer_id}
Your capabilities:
1. Search the product catalog for items matching requests
2. Access customer preferences for personalization
3. Review purchase history for context
4. Save new preferences when customers express them
Always:
- Check customer preferences before making recommendations
- Personalize suggestions based on past purchases
- Save important preferences for future interactions
- Explain why you're recommending specific products""",
    )
Best Practices
1. Combine Document and Graph Retrieval
Use both for richer context:
# Create composite retriever
class CompositeRetriever(BaseRetriever):
    """Illustrative sketch: merge document and graph retrieval results.

    NOTE(review): intentionally incomplete example — self.doc_retriever,
    self.graph_retriever, and _merge_results are assumed to be supplied
    by the full implementation.
    """

    def _retrieve(self, query_bundle):
        # Document retrieval
        doc_nodes = self.doc_retriever.retrieve(query_bundle)
        # Graph retrieval (entities, preferences)
        graph_nodes = self.graph_retriever.retrieve(query_bundle)
        # Merge and rank
        return self._merge_results(doc_nodes, graph_nodes)
2. Use Session-Based Memory
Isolate conversations properly:
# Good: Unique session per conversation (user id plus random suffix so
# two concurrent conversations for the same user never share history)
session_id = f"user-{user_id}-{uuid.uuid4().hex[:8]}"
memory = ChatMemoryBuffer.from_defaults(
    chat_store=chat_store,
    chat_store_key=session_id,
)
3. Track Reasoning with Traces
Record agent decisions for improvement:
# Before query
trace = await memory_client.reasoning.start_trace(
task=query,
user_id=user_id,
)
try:
response = agent.chat(query)
await memory_client.reasoning.complete_trace(
trace_id=trace.id,
outcome="success",
)
except:
await memory_client.reasoning.complete_trace(
trace_id=trace.id,
outcome="failure",
)
4. Limit Token Usage
Control context size:
memory = ChatMemoryBuffer.from_defaults(
    chat_store=chat_store,
    chat_store_key=session_id,
    token_limit=3000,  # Limit history tokens handed to the LLM per turn
)