Use with CrewAI

How to integrate neo4j-agent-memory with CrewAI for building multi-agent systems with shared context graphs.

Overview

CrewAI enables building multi-agent systems where specialized agents collaborate on complex tasks. Integrating with neo4j-agent-memory provides a shared context graph that all agents can read from and write to, enabling persistent memory across agent interactions.

CrewAI + Shared Context Graph Architecture
┌─────────────────────────────────────────────────────────────┐
│                        CrewAI Crew                          │
│                                                             │
│  ┌─────────────┐  ┌─────────────┐  ┌─────────────┐         │
│  │  Research   │  │   Analyst   │  │   Writer    │         │
│  │   Agent     │  │   Agent     │  │   Agent     │         │
│  └──────┬──────┘  └──────┬──────┘  └──────┬──────┘         │
│         │                │                │                 │
│         └────────────────┼────────────────┘                 │
│                          │                                  │
│                          ▼                                  │
│              ┌─────────────────────┐                        │
│              │   Memory Tools      │                        │
│              │   (Shared Access)   │                        │
│              └──────────┬──────────┘                        │
└─────────────────────────┼───────────────────────────────────┘
                          │
                          ▼
┌─────────────────────────────────────────────────────────────┐
│                  Neo4j Shared Context Graph                 │
│                                                             │
│   Agent A writes ────► Entities ◄──── Agent B reads         │
│   Agent B writes ────► Facts    ◄──── Agent C reads         │
│   All agents     ────► Messages ◄──── All agents            │
│                                                             │
│   Shared: Preferences, Entities, Reasoning Traces           │
└─────────────────────────────────────────────────────────────┘

Prerequisites

  • Python 3.10+

  • neo4j-agent-memory and crewai installed

  • Neo4j database running

  • OpenAI API key

pip install neo4j-agent-memory crewai crewai-tools

Basic Integration

Create Memory Tools

Define tools that wrap neo4j-agent-memory for CrewAI agents:

from crewai_tools import BaseTool
from neo4j_agent_memory import MemoryClient
import json


class SearchEntitiesToolClass(BaseTool):
    """CrewAI tool that searches the shared context graph for entities.

    Calls ``memory_client.long_term.search_entities`` and hands the matches
    back to the agent as a JSON string.
    """

    name: str = "search_entities"
    description: str = """
    Search the shared knowledge graph for entities like products, people,
    organizations, or any other stored knowledge. Use this to find information
    that other agents may have discovered.

    Parameters:
    - query: Search query describing what you're looking for
    - entity_type: Optional filter (PRODUCT, PERSON, ORGANIZATION, etc.)
    """

    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(self, query: str, entity_type: str | None = None) -> str:
        """Search for entities and return them as a JSON array string.

        Args:
            query: Free-text description of what to look for.
            entity_type: Optional entity type to filter by; passed through
                unchanged to ``search_entities`` (``None`` when omitted).

        Returns:
            A JSON array of objects with ``name``, ``type``, ``description``
            and ``properties`` keys. At most 10 results are requested.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        entities = loop.run_until_complete(
            self.memory_client.long_term.search_entities(
                query=query,
                entity_type=entity_type,
                limit=10,
            )
        )

        return json.dumps([
            {
                "name": e.name,
                "type": e.type,
                "description": e.description,
                "properties": e.properties,
            }
            for e in entities
        ])


class SaveEntityToolClass(BaseTool):
    """CrewAI tool that stores an entity in the shared context graph.

    Calls ``memory_client.long_term.add_entity`` and reports the saved
    entity's name and type back to the agent.
    """

    name: str = "save_entity"
    description: str = """
    Save a new entity to the shared knowledge graph. Other agents will be able
    to access this information. Use this to record discoveries, facts, or
    important information.

    Parameters:
    - name: Name of the entity
    - entity_type: Type (PRODUCT, PERSON, ORGANIZATION, FINDING, etc.)
    - description: Description of the entity
    - properties: Optional dict of additional properties
    """

    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(
        self,
        name: str,
        entity_type: str,
        description: str | None = None,
        properties: dict | None = None,
    ) -> str:
        """Save one entity and return a short confirmation message.

        Args:
            name: Name of the entity.
            entity_type: Entity type label, e.g. ``"FINDING"``.
            description: Optional description; passed through unchanged.
            properties: Optional extra properties; an empty dict is stored
                when omitted or empty.

        Returns:
            ``"Saved entity: <name> (<type>)"`` built from the entity
            returned by ``add_entity``.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        entity = loop.run_until_complete(
            self.memory_client.long_term.add_entity(
                name=name,
                entity_type=entity_type,
                description=description,
                properties=properties or {},
            )
        )

        return f"Saved entity: {entity.name} ({entity.type})"


class GetPreferencesToolClass(BaseTool):
    """CrewAI tool that reads stored preferences for one user.

    The user is fixed when the tool is constructed (``user_id``); the agent
    can only narrow the lookup by category.
    """

    name: str = "get_preferences"
    description: str = """
    Get stored preferences for personalization. Returns user preferences
    that should guide recommendations and responses.
    """

    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient
    # User whose preferences this tool instance returns.
    user_id: str

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(self, category: str | None = None) -> str:
        """Return the user's preferences as a JSON array string.

        Args:
            category: Optional preference category; passed through unchanged
                to ``get_preferences`` (``None`` when omitted).

        Returns:
            A JSON array of ``{"category": ..., "preference": ...}`` objects.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        preferences = loop.run_until_complete(
            self.memory_client.long_term.get_preferences(
                user_id=self.user_id,
                category=category,
            )
        )

        return json.dumps([
            {"category": p.category, "preference": p.preference}
            for p in preferences
        ])


class SearchMessagesToolClass(BaseTool):
    """CrewAI tool that searches the conversation history of one session.

    The session is fixed when the tool is constructed (``session_id``).
    """

    name: str = "search_messages"
    description: str = """
    Search past conversation messages for relevant context. Use this to
    understand what has been discussed previously.
    """

    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient
    # Session whose messages this tool instance searches.
    session_id: str

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(self, query: str, limit: int = 5) -> str:
        """Search the session's messages and return them as a JSON array string.

        Args:
            query: Free-text description of what to look for.
            limit: Maximum number of messages to request.

        Returns:
            A JSON array of ``{"role": ..., "content": ...}`` objects, with
            each message's content cut to its first 300 characters.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        messages = loop.run_until_complete(
            self.memory_client.short_term.search_messages(
                query=query,
                session_id=self.session_id,
                limit=limit,
            )
        )

        # Truncate content so a long history does not flood the agent's prompt.
        return json.dumps([
            {"role": m.role, "content": m.content[:300]}
            for m in messages
        ])

Create Agents with Shared Memory

from crewai import Agent, Task, Crew, Process

# Initialize shared memory client
import os

# Connection details are read from the environment when set, so real
# credentials stay out of source control; the fallbacks are the
# local-development values.
memory_client = MemoryClient(
    neo4j_uri=os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
    neo4j_user=os.environ.get("NEO4J_USER", "neo4j"),
    neo4j_password=os.environ.get("NEO4J_PASSWORD", "password"),
)

user_id = "CUST-12345"
session_id = "crew-session-001"

# Create shared tools
# Every tool wraps the same memory_client; the preference tool is bound to
# one user and the message tool to one session.
search_entities = SearchEntitiesToolClass(memory_client=memory_client)
save_entity = SaveEntityToolClass(memory_client=memory_client)
get_preferences = GetPreferencesToolClass(memory_client=memory_client, user_id=user_id)
search_messages = SearchMessagesToolClass(memory_client=memory_client, session_id=session_id)

shared_tools = [search_entities, save_entity, get_preferences, search_messages]

# Create agents
# All three agents are given the same `shared_tools` list, so each one can
# use every memory tool built on the shared memory client.

# Looks up entities and records new findings.
researcher = Agent(
    role="Research Specialist",
    goal="Research products and gather information from the knowledge graph",
    backstory="""You are an expert researcher who finds relevant information
    from the shared knowledge base. You search for entities and save new
    findings for other agents to use.""",
    tools=shared_tools,
    verbose=True,
)

# Works from stored preferences and conversation history.
analyst = Agent(
    role="Preference Analyst",
    goal="Analyze customer preferences and match them with findings",
    backstory="""You specialize in understanding customer preferences and
    ensuring recommendations align with their stated needs. You access
    preference data and conversation history.""",
    tools=shared_tools,
    verbose=True,
)

# Combines the other two agents' output into final recommendations.
recommender = Agent(
    role="Recommendation Specialist",
    goal="Create personalized recommendations based on research and preferences",
    backstory="""You synthesize research findings and preference analysis
    to create compelling, personalized recommendations. You save final
    recommendations to the knowledge graph.""",
    tools=shared_tools,
    verbose=True,
)

Define Tasks

# The {query} placeholder in each description is supplied at run time
# through the `inputs` dict passed to crew.kickoff().

# Task 1: Research products
research_task = Task(
    description="""
    Research products related to: {query}

    1. Search the knowledge graph for relevant products
    2. Identify key features and attributes
    3. Save any new findings as entities
    4. Compile a summary of available options

    Store your findings in the shared knowledge graph for other agents.
    """,
    expected_output="A structured summary of relevant products with key attributes",
    agent=researcher,
)

# Task 2: Analyze preferences
analysis_task = Task(
    description="""
    Analyze customer preferences relevant to: {query}

    1. Retrieve customer preferences from the knowledge graph
    2. Search conversation history for additional context
    3. Identify key criteria for recommendations
    4. Note any constraints or requirements

    Provide a preference profile for the recommendation specialist.
    """,
    expected_output="Customer preference profile with key criteria",
    agent=analyst,
)

# Task 3: Create recommendations
recommendation_task = Task(
    description="""
    Create personalized recommendations for: {query}

    Using the research findings and preference analysis:
    1. Match products to customer preferences
    2. Rank options by relevance
    3. Explain why each recommendation fits
    4. Save the final recommendations to the knowledge graph

    Provide 3-5 personalized recommendations with explanations.
    """,
    expected_output="3-5 personalized product recommendations with explanations",
    agent=recommender,
    context=[research_task, analysis_task],  # Receives the output of tasks 1 and 2
)

Run the Crew

# Assemble the crew; Process.sequential runs the tasks in order
crew = Crew(
    process=Process.sequential,
    tasks=[research_task, analysis_task, recommendation_task],
    agents=[researcher, analyst, recommender],
    verbose=True,
)

# Start the run, supplying the value for the {query} placeholder
crew_inputs = {"query": "running shoes for marathon training"}
result = crew.kickoff(inputs=crew_inputs)

print(result)

Financial Services Multi-Agent System

Example for collaborative financial analysis:

from crewai import Agent, Task, Crew, Process
from crewai_tools import BaseTool
from neo4j_agent_memory import MemoryClient
import json


# --- Memory Client ---
import os

# Connection details are read from the environment when set, so real
# credentials stay out of source control; the fallbacks are the
# local-development values.
memory_client = MemoryClient(
    neo4j_uri=os.environ.get("NEO4J_URI", "bolt://localhost:7687"),
    neo4j_user=os.environ.get("NEO4J_USER", "neo4j"),
    neo4j_password=os.environ.get("NEO4J_PASSWORD", "password"),
)

# Client and session that the tools and the reasoning trace are scoped to.
client_id = "CL-78901"
session_id = "advisory-crew-001"


# --- Specialized Tools ---

class GetClientProfileTool(BaseTool):
    """CrewAI tool that returns one client's preferences and holdings.

    The client is fixed when the tool is constructed (``client_id``).
    """

    name: str = "get_client_profile"
    description: str = "Get comprehensive client investment profile including risk tolerance, holdings, and preferences."
    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient
    # Client whose profile this tool instance returns.
    client_id: str

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(self) -> str:
        """Return the client's profile as a JSON object string.

        Returns:
            A JSON object with a ``preferences`` list of
            ``{"category": ..., "value": ...}`` entries and a ``holdings``
            entry holding the query result as returned by ``execute_query``.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Get preferences
        preferences = loop.run_until_complete(
            self.memory_client.long_term.get_preferences(user_id=self.client_id)
        )

        # Get holdings from context graph; client_id is passed as a query
        # parameter rather than interpolated into the Cypher text.
        holdings = loop.run_until_complete(
            self.memory_client.long_term.execute_query(
                """
                MATCH (c:Client {id: $client_id})-[:HAS_ACCOUNT]->(a:Account)
                      -[:HOLDS]->(p:Position)-[:IN]->(s:Security)
                RETURN a.type as account, s.name as security, s.ticker as ticker,
                       p.shares as shares, s.sector as sector
                """,
                parameters={"client_id": self.client_id},
            )
        )

        # NOTE(review): assumes execute_query returns JSON-serializable rows - confirm.
        return json.dumps({
            "preferences": [{"category": p.category, "value": p.preference} for p in preferences],
            "holdings": holdings,
        })


class SearchSecuritiesTool(BaseTool):
    """CrewAI tool that searches the context graph for SECURITY entities."""

    name: str = "search_securities"
    description: str = "Search for securities by criteria. Can filter by sector, asset class, etc."
    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(
        self,
        query: str,
        sector: str | None = None,
        asset_class: str | None = None,
    ) -> str:
        """Search for securities and return them as a JSON array string.

        Args:
            query: Free-text description of what to look for.
            sector: Optional sector to filter on.
            asset_class: Optional asset class to filter on.

        Returns:
            A JSON array of objects with ``name``, ``ticker``, ``sector``
            and ``description`` keys. At most 15 results are requested.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Only filters that were actually supplied are included.
        filters = {}
        if sector:
            filters["sector"] = sector
        if asset_class:
            filters["asset_class"] = asset_class

        securities = loop.run_until_complete(
            self.memory_client.long_term.search_entities(
                query=query,
                entity_type="SECURITY",
                # None (not an empty dict) when no filter was requested.
                property_filter=filters or None,
                limit=15,
            )
        )

        return json.dumps([
            {
                "name": s.name,
                "ticker": s.properties.get("ticker"),
                "sector": s.properties.get("sector"),
                "description": s.description,
            }
            for s in securities
        ])


class SaveAnalysisTool(BaseTool):
    """CrewAI tool that stores an analysis as an ANALYSIS entity.

    The client is fixed when the tool is constructed (``client_id``) and is
    recorded in the entity's properties.
    """

    name: str = "save_analysis"
    description: str = "Save analysis findings to the knowledge graph for other agents and future reference."
    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient
    # Client the saved analysis is attributed to.
    client_id: str

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(self, title: str, analysis_type: str, findings: str) -> str:
        """Save one analysis and return a short confirmation message.

        Args:
            title: Used as the entity name.
            analysis_type: Stored in the ``analysis_type`` property.
            findings: Used as the entity description.

        Returns:
            ``"Saved analysis: <name>"`` built from the entity returned by
            ``add_entity``.
        """
        import asyncio
        from datetime import datetime, timezone

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        entity = loop.run_until_complete(
            self.memory_client.long_term.add_entity(
                name=title,
                entity_type="ANALYSIS",
                description=findings,
                properties={
                    "analysis_type": analysis_type,
                    "client_id": self.client_id,
                    # Timezone-aware UTC timestamp, so values written from
                    # machines in different time zones stay comparable.
                    "created_at": datetime.now(timezone.utc).isoformat(),
                },
            )
        )

        return f"Saved analysis: {entity.name}"


class GetPastRecommendationsTool(BaseTool):
    """CrewAI tool that looks up earlier successful reasoning traces.

    The client is fixed when the tool is constructed (``client_id``).
    """

    name: str = "get_past_recommendations"
    description: str = "Find past successful recommendations for similar situations."
    # Memory client shared by every tool instance created from it.
    memory_client: MemoryClient
    # Client whose past traces are searched.
    client_id: str

    class Config:
        # MemoryClient is not a pydantic model, so it must be allowed explicitly.
        arbitrary_types_allowed = True

    def _run(self, topic: str) -> str:
        """Return similar past traces as a JSON array string.

        Args:
            topic: Task description used to find similar traces.

        Returns:
            A JSON array of objects with ``topic``, ``recommendation`` and
            ``outcome`` keys; the last two are ``null`` when a trace has no
            result or the result lacks that key. At most 3 traces are
            requested, restricted to successful ones.
        """
        import asyncio

        # get_event_loop() raises RuntimeError when this thread has no event
        # loop (worker threads, after asyncio.run() has finished, and newer
        # Python versions). Create and register one so later calls from the
        # same thread keep reusing it.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        traces = loop.run_until_complete(
            self.memory_client.reasoning.get_similar_traces(
                task=topic,
                user_id=self.client_id,
                limit=3,
                success_only=True,
            )
        )

        return json.dumps([
            {
                "topic": t.task,
                "recommendation": t.result.get("recommendation") if t.result else None,
                "outcome": t.result.get("client_feedback") if t.result else None,
            }
            for t in traces
        ])


# --- Create Tools ---
# The profile, analysis and history tools are bound to `client_id`; the
# securities search tool is not tied to a client.
profile_tool = GetClientProfileTool(memory_client=memory_client, client_id=client_id)
securities_tool = SearchSecuritiesTool(memory_client=memory_client)
analysis_tool = SaveAnalysisTool(memory_client=memory_client, client_id=client_id)
history_tool = GetPastRecommendationsTool(memory_client=memory_client, client_id=client_id)


# --- Create Specialized Agents ---
# The two analysts share the same three tools; only the strategist also
# gets `history_tool` for looking up past recommendations.

portfolio_analyst = Agent(
    role="Portfolio Analyst",
    goal="Analyze current portfolio composition and identify opportunities",
    backstory="""You are an experienced portfolio analyst who evaluates
    investment portfolios. You understand asset allocation, diversification,
    and risk management. You analyze client holdings and identify gaps or
    opportunities for improvement.""",
    tools=[profile_tool, securities_tool, analysis_tool],
    verbose=True,
)

risk_analyst = Agent(
    role="Risk Analyst",
    goal="Assess risk levels and ensure alignment with client risk tolerance",
    backstory="""You specialize in investment risk analysis. You evaluate
    portfolio risk metrics, ensure alignment with client risk profiles,
    and identify potential concerns. You consider market conditions and
    concentration risks.""",
    tools=[profile_tool, securities_tool, analysis_tool],
    verbose=True,
)

investment_strategist = Agent(
    role="Investment Strategist",
    goal="Develop strategic recommendations based on analysis",
    backstory="""You are a senior investment strategist who synthesizes
    portfolio and risk analysis into actionable recommendations. You
    consider client goals, market conditions, and past successful
    strategies to provide personalized advice.""",
    tools=[profile_tool, securities_tool, analysis_tool, history_tool],
    verbose=True,
)


# --- Define Tasks ---
# The {investment_goal} placeholder in each description is supplied at run
# time through the `inputs` dict passed to advisory_crew.kickoff().

# Task 1: portfolio review, assigned to the portfolio analyst.
portfolio_analysis = Task(
    description="""
    Analyze the client's current portfolio for: {investment_goal}

    1. Retrieve the client's profile and current holdings
    2. Evaluate current asset allocation by sector and asset class
    3. Identify concentration risks or gaps
    4. Search for securities that could address any gaps
    5. Save your analysis findings to the knowledge graph

    Provide a detailed portfolio analysis report.
    """,
    expected_output="Portfolio analysis report with current allocation and identified opportunities",
    agent=portfolio_analyst,
)

# Task 2: risk review, assigned to the risk analyst.
risk_assessment = Task(
    description="""
    Assess the risk profile for: {investment_goal}

    1. Review client's stated risk tolerance from preferences
    2. Evaluate current portfolio risk level
    3. Identify any misalignment between risk tolerance and holdings
    4. Consider market conditions and concentration risks
    5. Save your risk assessment to the knowledge graph

    Provide a risk assessment with specific concerns and recommendations.
    """,
    expected_output="Risk assessment report with alignment analysis",
    agent=risk_analyst,
)

# Task 3: final recommendations; `context` feeds in the output of tasks 1 and 2.
strategic_recommendation = Task(
    description="""
    Develop investment recommendations for: {investment_goal}

    Using the portfolio analysis and risk assessment:
    1. Review past successful recommendations for this client
    2. Identify 3-5 specific investment opportunities
    3. Ensure recommendations align with risk tolerance
    4. Provide rationale for each recommendation
    5. Save final recommendations to the knowledge graph

    Provide actionable investment recommendations with clear rationale.
    """,
    expected_output="3-5 investment recommendations with rationale",
    agent=investment_strategist,
    context=[portfolio_analysis, risk_assessment],
)


# --- Create and Run Crew ---

# Process.sequential: the three tasks are worked through one after another.
advisory_crew = Crew(
    process=Process.sequential,
    tasks=[portfolio_analysis, risk_assessment, strategic_recommendation],
    agents=[portfolio_analyst, risk_analyst, investment_strategist],
    verbose=True,
)

# Run the crew
async def run_advisory_analysis(investment_goal: str):
    """Run the advisory crew for one goal and record it as a reasoning trace.

    A trace is started before the crew runs and completed with outcome
    ``"success"`` or ``"failure"`` afterwards.

    Args:
        investment_goal: Text substituted for ``{investment_goal}`` in the
            crew's task descriptions.

    Returns:
        Whatever ``advisory_crew.kickoff`` returns.

    Raises:
        Exception: Any error raised by the crew run is re-raised after the
            trace has been marked as failed.
    """
    import asyncio

    # Start reasoning trace for the overall task
    trace = await memory_client.reasoning.start_trace(
        task=f"Advisory analysis: {investment_goal}",
        user_id=client_id,
        session_id=session_id,
    )

    try:
        # kickoff() is synchronous, and the memory tools it invokes drive
        # their coroutines with loop.run_until_complete(). Calling it directly
        # here would do that on this already-running loop, which raises
        # "This event loop is already running". A worker thread avoids that
        # and keeps this loop responsive. Tools invoked by the crew then run
        # in that thread and need an event loop of their own there.
        # NOTE(review): memory_client is then used from two event loops (this
        # one and the worker thread's) - confirm MemoryClient supports that.
        result = await asyncio.to_thread(
            advisory_crew.kickoff,
            inputs={"investment_goal": investment_goal},
        )
    except Exception as e:
        await memory_client.reasoning.complete_trace(
            trace_id=trace.id,
            outcome="failure",
            error=str(e),
        )
        raise
    else:
        await memory_client.reasoning.complete_trace(
            trace_id=trace.id,
            outcome="success",
            # Only the first 1000 characters of the result are stored.
            result={"recommendation": str(result)[:1000]},
        )
        return result


# Usage: run one analysis and print the result under a banner
if __name__ == "__main__":
    import asyncio

    goal = "Increase technology sector exposure while maintaining moderate risk"
    result = asyncio.run(run_advisory_analysis(goal))

    divider = "=" * 50
    print("\n" + divider)
    print("ADVISORY RECOMMENDATIONS")
    print(divider)
    print(result)

Ecommerce Product Team

Multi-agent system for product recommendations:

from crewai import Agent, Task, Crew, Process


# --- Tools (using shared memory_client) ---
# Reuse the tool instances created in "Create Agents with Shared Memory":
# search_entities, save_entity, get_preferences and search_messages.


# --- Agents ---
# Each agent gets only the tools its role needs.

# Searches and saves entities.
product_researcher = Agent(
    role="Product Researcher",
    goal="Research products and gather comprehensive product information",
    backstory="""You excel at finding and evaluating products. You search
    the product catalog, compare features, and identify the best options
    for customer needs.""",
    tools=[search_entities, save_entity],
    verbose=True,
)

# Reads preferences and conversation history; has no save tool.
customer_specialist = Agent(
    role="Customer Specialist",
    goal="Understand customer preferences and purchase patterns",
    backstory="""You are an expert at understanding customer needs. You
    analyze preferences, review purchase history, and identify what
    matters most to each customer.""",
    tools=[get_preferences, search_messages],
    verbose=True,
)

# Reads entities and preferences, and saves the final recommendations.
personalization_expert = Agent(
    role="Personalization Expert",
    goal="Match products to customer preferences for personalized recommendations",
    backstory="""You create personalized experiences by matching products
    to individual customer preferences. You consider past behavior,
    stated preferences, and current needs.""",
    tools=[search_entities, get_preferences, save_entity],
    verbose=True,
)


# --- Tasks ---
# The {customer_request} placeholder in each description is supplied at run
# time through the `inputs` dict passed to recommendation_crew.kickoff().

# Task 1: product research, assigned to the product researcher.
product_research = Task(
    description="""
    Research products for: {customer_request}

    1. Search the product catalog for relevant items
    2. Identify key features and differentiators
    3. Note price points and availability
    4. Save notable findings to the knowledge graph
    """,
    expected_output="Product research summary with top options",
    agent=product_researcher,
)

# Task 2: customer analysis, assigned to the customer specialist.
customer_analysis = Task(
    description="""
    Analyze customer context for: {customer_request}

    1. Retrieve customer preferences
    2. Search conversation history for additional context
    3. Identify key decision criteria
    4. Note any constraints (budget, brand, etc.)
    """,
    expected_output="Customer analysis with key preferences and criteria",
    agent=customer_specialist,
)

# Task 3: final recommendations; `context` feeds in the output of tasks 1 and 2.
personalized_recommendations = Task(
    description="""
    Create personalized recommendations for: {customer_request}

    Using product research and customer analysis:
    1. Match products to customer preferences
    2. Rank by relevance and fit
    3. Explain personalization rationale
    4. Save recommendations to knowledge graph

    Provide 3-5 recommendations with personalized explanations.
    """,
    expected_output="Personalized product recommendations",
    agent=personalization_expert,
    context=[product_research, customer_analysis],
)


# --- Crew ---

# Process.sequential: the three tasks are worked through one after another.
recommendation_crew = Crew(
    process=Process.sequential,
    tasks=[product_research, customer_analysis, personalized_recommendations],
    agents=[product_researcher, customer_specialist, personalization_expert],
    verbose=True,
)

# Run, supplying the value for the {customer_request} placeholder
request_inputs = {"customer_request": "comfortable running shoes for daily training"}
result = recommendation_crew.kickoff(inputs=request_inputs)

Best Practices

1. Share Context Through the Graph

Let agents communicate via saved entities:

# Agent A saves a finding as a FINDING entity in the shared graph
# (shown here as a direct tool call; in a crew the agent invokes the tool)
save_entity.run(
    name="Key Finding: Price sensitivity",
    entity_type="FINDING",
    description="Customer shows high price sensitivity based on past behavior",
)

# Agent B retrieves and uses it; the search tool hands results back as a
# JSON string
findings = search_entities.run(query="price sensitivity finding")

2. Use Reasoning Traces for Crew Tasks

Track overall crew performance:

trace = await memory_client.reasoning.start_trace(
    task=f"Crew task: {goal}",
    user_id=user_id,
    metadata={"crew": "advisory", "agents": 3},
)

try:
    result = crew.kickoff(inputs=inputs)
    await memory_client.reasoning.complete_trace(trace_id=trace.id, outcome="success")
except:
    await memory_client.reasoning.complete_trace(trace_id=trace.id, outcome="failure")

3. Scope Tools Appropriately

Different agents may need different tool access:

# Research agent: read-only tools
researcher_tools = [search_entities, get_preferences]

# Writer agent: can also save entities and analyses
# (analysis_tool is the SaveAnalysisTool instance from the financial example)
writer_tools = [search_entities, save_entity, analysis_tool]

4. Handle Concurrent Access

Neo4j handles concurrent writes, but agents that save entities under the same name can still collide, so give each saved entity a unique name:

# Use unique identifiers for saved entities
# Fragment: runs inside an async function; `agent_name` and `timestamp` are
# supplied by the caller. Together they keep the entity name distinct per
# agent and per write.
entity = await memory_client.long_term.add_entity(
    name=f"Analysis-{agent_name}-{timestamp}",
    entity_type="ANALYSIS",
    properties={"agent": agent_name},
)