Use with LlamaIndex

How to integrate neo4j-agent-memory with LlamaIndex for building RAG applications with persistent context graphs.

Overview

LlamaIndex excels at building retrieval-augmented generation (RAG) applications. Integrating with neo4j-agent-memory adds persistent memory that combines document retrieval with conversational context and user preferences stored in a context graph.

LlamaIndex + Context Graph Architecture
┌─────────────────────────────────────────────────────┐
│                   LlamaIndex Query                  │
│                                                     │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐ │
│  │   Vector    │  │   Context   │  │    Chat     │ │
│  │   Index     │  │    Graph    │  │   History   │ │
│  │  (Docs)     │  │  (Memory)   │  │  (Sessions) │ │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘ │
│         │                │                │        │
│         └────────────────┼────────────────┘        │
│                          ▼                          │
│              ┌─────────────────────┐               │
│              │   Combined Context  │               │
│              │   for LLM Response  │               │
│              └─────────────────────┘               │
└─────────────────────────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────┐
│               Neo4j Context Graph                   │
│  Documents ──── Entities ──── Messages              │
│      │            │              │                  │
│      └────────────┴──────────────┘                  │
│         User Preferences + Reasoning                │
└─────────────────────────────────────────────────────┘

Prerequisites

  • Python 3.10+

  • neo4j-agent-memory and llama-index installed

  • Neo4j database running

  • OpenAI API key

pip install neo4j-agent-memory llama-index llama-index-llms-openai

Chat Memory Integration

Create Custom Chat Store

Implement LlamaIndex’s chat store interface:

from typing import List, Optional
from llama_index.core.storage.chat_store import BaseChatStore
from llama_index.core.llms import ChatMessage, MessageRole
from neo4j_agent_memory import MemoryClient


class Neo4jChatStore(BaseChatStore):
    """LlamaIndex chat store backed by neo4j-agent-memory.

    Bridges LlamaIndex's synchronous chat-store interface to the async
    ``MemoryClient`` API by running each coroutine on this thread's event
    loop. Chat-store keys map one-to-one onto memory-client session ids.
    """

    def __init__(self, memory_client: MemoryClient):
        self.memory_client = memory_client

    def _run(self, coro):
        """Run *coro* to completion and return its result.

        Reuses the thread's existing event loop when one is available so
        any loop-bound resources held by the memory client stay valid
        across calls; creates and installs a new loop otherwise, which
        ``asyncio.get_event_loop()`` no longer does on Python 3.12+.
        """
        import asyncio
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)

    @staticmethod
    def _to_store_role(role: MessageRole) -> str:
        """Map a LlamaIndex role onto the store's two-role model."""
        return "user" if role == MessageRole.USER else "assistant"

    def set_messages(self, key: str, messages: List[ChatMessage]) -> None:
        """Replace all messages for session *key* with *messages*."""
        # Clear existing messages, then re-add in order.
        self._run(
            self.memory_client.short_term.delete_session(session_id=key)
        )
        for msg in messages:
            self._run(
                self.memory_client.short_term.add_message(
                    role=self._to_store_role(msg.role),
                    content=msg.content,
                    session_id=key,
                )
            )

    def get_messages(self, key: str) -> List[ChatMessage]:
        """Get messages for session *key* (capped at 100)."""
        raw_messages = self._run(
            self.memory_client.short_term.get_session_messages(
                session_id=key,
                limit=100,
            )
        )

        return [
            ChatMessage(
                role=MessageRole.USER if msg.role == "user" else MessageRole.ASSISTANT,
                content=msg.content,
            )
            for msg in raw_messages
        ]

    def add_message(self, key: str, message: ChatMessage) -> None:
        """Append a single message to session *key*."""
        self._run(
            self.memory_client.short_term.add_message(
                role=self._to_store_role(message.role),
                content=message.content,
                session_id=key,
            )
        )

    def delete_messages(self, key: str) -> Optional[List[ChatMessage]]:
        """Delete all messages in session *key*; return what was deleted."""
        # Snapshot the messages before the session is removed.
        messages = self.get_messages(key)
        self._run(
            self.memory_client.short_term.delete_session(session_id=key)
        )
        return messages

    def delete_message(self, key: str, idx: int) -> Optional[ChatMessage]:
        """Delete a specific message (not supported by the backend; returns None)."""
        return None

    def delete_last_message(self, key: str) -> Optional[ChatMessage]:
        """Delete the last message (not supported by the backend; returns None)."""
        return None

    def get_keys(self) -> List[str]:
        """Get all session keys (capped at 1000 sessions)."""
        sessions = self._run(
            self.memory_client.short_term.list_sessions(limit=1000)
        )
        return [s.id for s in sessions]

Use with Chat Engine

from llama_index.core import VectorStoreIndex, SimpleDirectoryReader
from llama_index.core.memory import ChatMemoryBuffer
from llama_index.llms.openai import OpenAI

# Initialize memory client (points at the local Neo4j instance from Prerequisites)
memory_client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Create the Neo4j-backed chat store defined above
chat_store = Neo4jChatStore(memory_client)

# Create memory buffer with our store; the key identifies the session,
# token_limit bounds how much history is sent to the LLM per turn
memory = ChatMemoryBuffer.from_defaults(
    chat_store=chat_store,
    chat_store_key="user-session-001",
    token_limit=3000,
)

# Load documents from ./data and build an in-memory vector index
documents = SimpleDirectoryReader("./data").load_data()
index = VectorStoreIndex.from_documents(documents)

# Create chat engine with memory ("context" mode retrieves per message)
chat_engine = index.as_chat_engine(
    chat_mode="context",
    memory=memory,
    llm=OpenAI(model="gpt-4o"),
    system_prompt="""You are a helpful assistant with access to company documents.
    Use the conversation history to provide contextual responses.""",
)

# Chat - each exchange is persisted to Neo4j through the chat store
response = chat_engine.chat("What is our return policy?")
print(response)

# Later messages have full context from the persisted history
response = chat_engine.chat("Can you elaborate on the exceptions?")
print(response)

Custom Retriever with Context Graph

Create Context-Aware Retriever

Combine document retrieval with context graph knowledge:

from llama_index.core.retrievers import BaseRetriever
from llama_index.core.schema import NodeWithScore, TextNode, QueryBundle
from typing import List


class ContextGraphRetriever(BaseRetriever):
    """Retriever that combines a document index with the context graph.

    For each query it runs the wrapped vector retriever, then optionally
    appends synthetic text nodes built from the user's stored preferences,
    matching entities, and relevant past messages, and returns everything
    sorted by score (highest first).
    """

    def __init__(
        self,
        vector_retriever: BaseRetriever,
        memory_client: MemoryClient,
        user_id: str,
        include_preferences: bool = True,
        include_entities: bool = True,
        include_history: bool = True,
    ):
        self.vector_retriever = vector_retriever
        self.memory_client = memory_client
        self.user_id = user_id
        self.include_preferences = include_preferences
        self.include_entities = include_entities
        self.include_history = include_history
        super().__init__()

    def _run(self, coro):
        """Run *coro* on this thread's event loop, creating one if missing.

        ``asyncio.get_event_loop()`` alone no longer creates a loop on
        Python 3.12+ and raises in non-main threads, so fall back to
        ``new_event_loop`` explicitly.
        """
        import asyncio
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)
        return loop.run_until_complete(coro)

    def _retrieve(self, query_bundle: QueryBundle) -> List[NodeWithScore]:
        """Return document nodes plus context-graph nodes, best score first."""
        # Get document results from the wrapped retriever.
        doc_nodes = self.vector_retriever.retrieve(query_bundle)

        # Synthetic nodes carrying context-graph knowledge.
        context_nodes: List[NodeWithScore] = []

        # Add user preferences as a single high-scoring node.
        if self.include_preferences:
            preferences = self._run(
                self.memory_client.long_term.get_preferences(
                    user_id=self.user_id,
                    limit=5,
                )
            )
            if preferences:
                pref_text = "User Preferences:\n" + "\n".join(
                    f"- {p.category}: {p.preference}" for p in preferences
                )
                context_nodes.append(NodeWithScore(
                    node=TextNode(text=pref_text),
                    score=0.9,  # High relevance for personalization
                ))

        # Add entities matching the query text.
        if self.include_entities:
            entities = self._run(
                self.memory_client.long_term.search_entities(
                    query=query_bundle.query_str,
                    limit=5,
                )
            )
            if entities:
                entity_text = "Relevant Knowledge:\n" + "\n".join(
                    f"- {e.name} ({e.type}): {e.description or 'No description'}"
                    for e in entities
                )
                context_nodes.append(NodeWithScore(
                    node=TextNode(text=entity_text),
                    score=0.85,
                ))

        # Add relevant conversation history (truncated to 150 chars each).
        if self.include_history:
            messages = self._run(
                self.memory_client.short_term.search_messages(
                    query=query_bundle.query_str,
                    limit=3,
                )
            )
            if messages:
                history_text = "Relevant Past Conversations:\n" + "\n".join(
                    f"- [{m.role}]: {m.content[:150]}..."
                    for m in messages
                )
                context_nodes.append(NodeWithScore(
                    node=TextNode(text=history_text),
                    score=0.8,
                ))

        # Combine and sort by score; unscored nodes sort as 0.
        all_nodes = doc_nodes + context_nodes
        all_nodes.sort(key=lambda x: x.score or 0, reverse=True)

        return all_nodes


# Usage
from llama_index.core import VectorStoreIndex

# Create base index and a plain top-5 vector retriever over it
index = VectorStoreIndex.from_documents(documents)
vector_retriever = index.as_retriever(similarity_top_k=5)

# Wrap with context graph so preferences/entities/history are merged in
context_retriever = ContextGraphRetriever(
    vector_retriever=vector_retriever,
    memory_client=memory_client,
    user_id="CUST-12345",
)

# Use in query engine
from llama_index.core.query_engine import RetrieverQueryEngine

query_engine = RetrieverQueryEngine.from_args(
    retriever=context_retriever,
    llm=OpenAI(model="gpt-4o"),
)

# Preferences retrieved for CUST-12345 personalize the answer
response = query_engine.query("What products would you recommend for me?")

Financial Document Assistant

Complete example for a financial services RAG application:

import json
from datetime import datetime
from llama_index.core import (
    VectorStoreIndex,
    SimpleDirectoryReader,
    Settings,
)
from llama_index.core.tools import QueryEngineTool, ToolMetadata
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI
from llama_index.core.memory import ChatMemoryBuffer
from neo4j_agent_memory import MemoryClient


# --- Initialize ---

# Context-graph client shared by all tools below
memory_client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Default LLM for all LlamaIndex components; temperature=0 for consistency
Settings.llm = OpenAI(model="gpt-4o", temperature=0)


# --- Create Document Indexes ---

# Index for regulatory documents (compliance/rules lookups)
regulatory_docs = SimpleDirectoryReader("./data/regulatory").load_data()
regulatory_index = VectorStoreIndex.from_documents(regulatory_docs)

# Index for market research (analysis/insight lookups)
research_docs = SimpleDirectoryReader("./data/research").load_data()
research_index = VectorStoreIndex.from_documents(research_docs)


# --- Create Context Graph Tools ---

from llama_index.core.tools import FunctionTool


def get_client_profile(client_id: str) -> str:
    """Return a JSON string of the client's preferences and holdings.

    Combines stored preferences with account/position/security data
    pulled from the context graph via a Cypher query.
    """
    import asyncio

    # Reuse the thread's loop when present; create one otherwise.
    # asyncio.get_event_loop() no longer creates loops on Python 3.12+
    # and raises in non-main threads.
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Get preferences
    preferences = loop.run_until_complete(
        memory_client.long_term.get_preferences(
            user_id=client_id,
        )
    )

    # Get related entities (holdings, accounts)
    holdings = loop.run_until_complete(
        memory_client.long_term.execute_query(
            """
            MATCH (c:Client {id: $client_id})-[:HAS_ACCOUNT]->(a:Account)
                  -[:HOLDS]->(p:Position)-[:IN]->(s:Security)
            RETURN a.type as account_type, s.name as security,
                   s.ticker as ticker, p.shares as shares
            """,
            parameters={"client_id": client_id},
        )
    )

    return json.dumps({
        "preferences": [{"category": p.category, "value": p.preference} for p in preferences],
        "holdings": holdings,
    })


def search_securities(query: str, sector: str | None = None) -> str:
    """Search SECURITY entities in the context graph; return matches as JSON.

    Args:
        query: Free-text description to match against securities.
        sector: Optional sector name used as a property filter.
    """
    import asyncio

    # Reuse or create this thread's event loop (get_event_loop() alone
    # no longer creates one on Python 3.12+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Only constrain by sector when one was supplied.
    filters = {"sector": sector} if sector else None

    securities = loop.run_until_complete(
        memory_client.long_term.search_entities(
            query=query,
            entity_type="SECURITY",
            property_filter=filters,
            limit=10,
        )
    )

    return json.dumps([
        {
            "name": s.name,
            "ticker": s.properties.get("ticker"),
            "sector": s.properties.get("sector"),
            "description": s.description,
        }
        for s in securities
    ])


def get_past_recommendations(client_id: str, topic: str) -> str:
    """Return past *successful* recommendations for similar topics as JSON.

    Looks up completed reasoning traces for this client whose task
    resembles *topic*, keeping only successes.
    """
    import asyncio

    # Reuse or create this thread's event loop (get_event_loop() alone
    # no longer creates one on Python 3.12+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    traces = loop.run_until_complete(
        memory_client.reasoning.get_similar_traces(
            task=topic,
            user_id=client_id,
            limit=3,
            success_only=True,
        )
    )

    # (t.result or {}) tolerates traces recorded without a result payload.
    return json.dumps([
        {
            "topic": t.task,
            "recommendation": (t.result or {}).get("recommendation"),
            "client_feedback": (t.result or {}).get("feedback"),
        }
        for t in traces
    ])


# --- Build Tools ---

# Two document-search tools (vector indexes) plus three context-graph
# tools (plain functions). Names/descriptions are what the agent sees
# when choosing a tool.
tools = [
    # Document query tools
    QueryEngineTool(
        query_engine=regulatory_index.as_query_engine(),
        metadata=ToolMetadata(
            name="regulatory_search",
            description="Search regulatory documents for compliance and rules information.",
        ),
    ),
    QueryEngineTool(
        query_engine=research_index.as_query_engine(),
        metadata=ToolMetadata(
            name="research_search",
            description="Search market research reports for analysis and insights.",
        ),
    ),
    # Context graph tools (wrap the functions defined above)
    FunctionTool.from_defaults(
        fn=get_client_profile,
        name="get_client_profile",
        description="Get client's investment profile, preferences, and current holdings.",
    ),
    FunctionTool.from_defaults(
        fn=search_securities,
        name="search_securities",
        description="Search for securities by description or characteristics.",
    ),
    FunctionTool.from_defaults(
        fn=get_past_recommendations,
        name="get_past_recommendations",
        description="Find past successful recommendations for similar client situations.",
    ),
]


# --- Create Agent with Memory ---

def create_financial_agent(client_id: str, session_id: str) -> ReActAgent:
    """Build a ReAct advisory agent whose chat history persists to Neo4j.

    Args:
        client_id: Client the agent's prompt is scoped to.
        session_id: Chat-store key under which messages are persisted.
    """

    # Persist every exchange through the Neo4j-backed chat store.
    store = Neo4jChatStore(memory_client)
    history_buffer = ChatMemoryBuffer.from_defaults(
        chat_store=store,
        chat_store_key=session_id,
        token_limit=4000,
    )

    advisory_prompt = f"""You are a financial advisory assistant helping wealth managers.

Current client: {client_id}

Your capabilities:
1. Search regulatory documents for compliance guidance
2. Search market research for investment insights
3. Access client profiles, preferences, and holdings
4. Search for securities matching specific criteria
5. Review past successful recommendations

Guidelines:
- Always consider the client's risk profile and preferences
- Reference specific securities with ticker symbols
- Note compliance considerations when relevant
- Learn from past successful recommendations

Remember: All recommendations should be reviewed by the advisor before execution."""

    return ReActAgent.from_tools(
        tools=tools,
        memory=history_buffer,
        verbose=True,
        system_prompt=advisory_prompt,
    )


# --- Main Application ---

async def run_financial_assistant():
    """Interactive REPL: read advisor input, run the agent, record a trace.

    Each turn is wrapped in a reasoning trace that is completed with
    either a success (truncated response) or a failure (error string).
    """
    client_id = "CL-78901"
    # Timestamped key so each run gets a fresh session
    session_id = f"advisory-{datetime.now().strftime('%Y%m%d%H%M%S')}"

    agent = create_financial_agent(client_id, session_id)

    print("Financial Advisory Assistant (type 'quit' to exit)\n")

    while True:
        # NOTE(review): input() and agent.chat() are synchronous and block
        # the event loop; acceptable here since nothing else runs on it.
        user_input = input("Advisor: ").strip()
        if user_input.lower() == "quit":
            break

        # Start reasoning trace for this turn
        trace = await memory_client.reasoning.start_trace(
            task=user_input,
            user_id=client_id,
            session_id=session_id,
        )

        try:
            response = agent.chat(user_input)
            print(f"\nAssistant: {response}\n")

            # Record success with the response truncated to 500 chars
            await memory_client.reasoning.complete_trace(
                trace_id=trace.id,
                outcome="success",
                result={"response": str(response)[:500]},
            )
        except Exception as e:
            # Record the failure but keep the REPL alive
            print(f"\nError: {e}\n")
            await memory_client.reasoning.complete_trace(
                trace_id=trace.id,
                outcome="failure",
                error=str(e),
            )


if __name__ == "__main__":
    import asyncio
    # Entry point: drive the interactive advisory loop to completion.
    asyncio.run(run_financial_assistant())

E-commerce Product Assistant

Example combining product catalog with customer context:

from llama_index.core import VectorStoreIndex, Document
from llama_index.core.tools import FunctionTool, QueryEngineTool
from llama_index.core.agent import ReActAgent
from llama_index.llms.openai import OpenAI


def create_product_index(memory_client: MemoryClient) -> VectorStoreIndex:
    """Build a vector index over PRODUCT entities from the context graph.

    Each product becomes one LlamaIndex ``Document`` whose text embeds the
    name, brand, category, price, rating, and description, with key fields
    duplicated into metadata for filtering.
    """
    import asyncio

    # Reuse or create this thread's event loop (get_event_loop() alone
    # no longer creates one on Python 3.12+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    # Fetch product entities; empty query presumably matches all — confirm
    # against the search_entities API.
    products = loop.run_until_complete(
        memory_client.long_term.search_entities(
            query="",
            entity_type="PRODUCT",
            limit=1000,
        )
    )

    # Convert to LlamaIndex documents
    documents = []
    for p in products:
        doc_text = f"""
Product: {p.name}
Brand: {p.properties.get('brand', 'Unknown')}
Category: {p.properties.get('category', 'Unknown')}
Price: ${p.properties.get('price', 'N/A')}
Rating: {p.properties.get('rating', 'N/A')}/5
Description: {p.description or 'No description available'}
        """
        documents.append(Document(
            text=doc_text,
            metadata={
                "product_name": p.name,
                "brand": p.properties.get("brand"),
                "category": p.properties.get("category"),
            },
        ))

    return VectorStoreIndex.from_documents(documents)


def get_customer_preferences(customer_id: str) -> str:
    """Return the customer's stored preferences as JSON, grouped by category."""
    import asyncio

    # Reuse or create this thread's event loop (get_event_loop() alone
    # no longer creates one on Python 3.12+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    preferences = loop.run_until_complete(
        memory_client.long_term.get_preferences(user_id=customer_id)
    )

    # Group preference strings under their category.
    by_category = {}
    for p in preferences:
        by_category.setdefault(p.category, []).append(p.preference)

    return json.dumps(by_category)


def get_purchase_history(customer_id: str) -> str:
    """Return the customer's 10 most recent purchases as a JSON string."""
    import asyncio

    # Reuse or create this thread's event loop (get_event_loop() alone
    # no longer creates one on Python 3.12+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    purchases = loop.run_until_complete(
        memory_client.long_term.execute_query(
            """
            MATCH (c:Customer {id: $customer_id})-[p:PURCHASED]->(product:Product)
            RETURN product.name as product, product.brand as brand,
                   p.purchase_date as date, p.rating as rating
            ORDER BY p.purchase_date DESC
            LIMIT 10
            """,
            parameters={"customer_id": customer_id},
        )
    )

    return json.dumps(purchases)


def save_preference(customer_id: str, preference: str, category: str) -> str:
    """Persist a customer preference; return a short confirmation string."""
    import asyncio

    # Reuse or create this thread's event loop (get_event_loop() alone
    # no longer creates one on Python 3.12+).
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)

    loop.run_until_complete(
        memory_client.long_term.add_preference(
            user_id=customer_id,
            preference=preference,
            category=category,
        )
    )

    return f"Saved: {category} - {preference}"


def create_shopping_agent(
    memory_client: MemoryClient,
    customer_id: str,
    session_id: str,
) -> ReActAgent:
    """Create shopping assistant with product knowledge and customer context.

    Args:
        memory_client: Context-graph client backing products, preferences,
            purchase history, and chat persistence.
        customer_id: Customer the tools are bound to.
        session_id: Chat-store key used to persist this conversation.
    """

    # Build the searchable product index from the context graph.
    product_index = create_product_index(memory_client)

    # Tools: the lambdas bind customer_id so the agent never supplies it.
    tools = [
        # from_defaults takes name/description directly, so this example
        # needs no ToolMetadata import.
        QueryEngineTool.from_defaults(
            query_engine=product_index.as_query_engine(similarity_top_k=10),
            name="search_products",
            description="Search product catalog by description, category, or features.",
        ),
        FunctionTool.from_defaults(
            fn=lambda: get_customer_preferences(customer_id),
            name="get_preferences",
            description="Get current customer's stored preferences.",
        ),
        FunctionTool.from_defaults(
            fn=lambda: get_purchase_history(customer_id),
            name="get_purchase_history",
            description="Get customer's recent purchase history.",
        ),
        FunctionTool.from_defaults(
            # Parameter names match the documented schema below.
            fn=lambda preference, category: save_preference(customer_id, preference, category),
            name="save_preference",
            description="Save a new customer preference. Parameters: preference (str), category (str).",
        ),
    ]

    # Persist chat history to Neo4j under this session key.
    chat_store = Neo4jChatStore(memory_client)
    memory = ChatMemoryBuffer.from_defaults(
        chat_store=chat_store,
        chat_store_key=session_id,
        token_limit=3000,
    )

    return ReActAgent.from_tools(
        tools=tools,
        memory=memory,
        verbose=True,
        llm=OpenAI(model="gpt-4o"),
        system_prompt=f"""You are a personal shopping assistant.

Customer: {customer_id}

Your capabilities:
1. Search the product catalog for items matching requests
2. Access customer preferences for personalization
3. Review purchase history for context
4. Save new preferences when customers express them

Always:
- Check customer preferences before making recommendations
- Personalize suggestions based on past purchases
- Save important preferences for future interactions
- Explain why you're recommending specific products""",
    )

Best Practices

1. Combine Document and Graph Retrieval

Use both for richer context:

# Create composite retriever
class CompositeRetriever(BaseRetriever):
    # Illustrative sketch: assumes self.doc_retriever, self.graph_retriever,
    # and self._merge_results are wired up elsewhere (e.g. in __init__).
    def _retrieve(self, query_bundle):
        # Document retrieval
        doc_nodes = self.doc_retriever.retrieve(query_bundle)

        # Graph retrieval (entities, preferences)
        graph_nodes = self.graph_retriever.retrieve(query_bundle)

        # Merge and rank
        return self._merge_results(doc_nodes, graph_nodes)

2. Use Session-Based Memory

Isolate conversations properly:

# Good: Unique session per conversation
# (requires `import uuid` at the top of the module)
session_id = f"user-{user_id}-{uuid.uuid4().hex[:8]}"
memory = ChatMemoryBuffer.from_defaults(
    chat_store=chat_store,
    chat_store_key=session_id,
)

3. Track Reasoning with Traces

Record agent decisions for improvement:

# Before query
trace = await memory_client.reasoning.start_trace(
    task=query,
    user_id=user_id,
)

try:
    response = agent.chat(query)
    await memory_client.reasoning.complete_trace(
        trace_id=trace.id,
        outcome="success",
    )
except:
    await memory_client.reasoning.complete_trace(
        trace_id=trace.id,
        outcome="failure",
    )

4. Limit Token Usage

Control context size:

memory = ChatMemoryBuffer.from_defaults(
    chat_store=chat_store,
    chat_store_key=session_id,
    token_limit=3000,  # Cap how many history tokens are sent to the LLM each turn
)