Use with CrewAI
How to integrate neo4j-agent-memory with CrewAI for building multi-agent systems with shared context graphs.
Overview
CrewAI enables building multi-agent systems where specialized agents collaborate on complex tasks. Integrating with neo4j-agent-memory provides a shared context graph that all agents can read from and write to, enabling persistent memory across agent interactions.
*Figure: CrewAI + Shared Context Graph Architecture*
Prerequisites
- Python 3.10+
- `neo4j-agent-memory` and `crewai` installed
- Neo4j database running
- OpenAI API key
pip install neo4j-agent-memory crewai crewai-tools
Basic Integration
Create Memory Tools
Define tools that wrap neo4j-agent-memory for CrewAI agents:
from crewai_tools import BaseTool
from neo4j_agent_memory import MemoryClient
import json
class SearchEntitiesToolClass(BaseTool):
    """Tool for searching entities in the context graph.

    Attributes:
        memory_client: Client whose ``long_term.search_entities`` awaitable
            performs the lookup.
    """

    name: str = "search_entities"
    description: str = """
Search the shared knowledge graph for entities like products, people,
organizations, or any other stored knowledge. Use this to find information
that other agents may have discovered.
Parameters:
- query: Search query describing what you're looking for
- entity_type: Optional filter (PRODUCT, PERSON, ORGANIZATION, etc.)
"""
    memory_client: MemoryClient

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(self, query: str, entity_type: str | None = None) -> str:
        """Search the graph and return the matches as a JSON array string.

        Args:
            query: Search text forwarded to ``search_entities``.
            entity_type: Optional entity type filter; ``None`` applies no filter
                on this side.

        Returns:
            JSON array of objects with ``name``, ``type``, ``description`` and
            ``properties`` keys, for at most 10 entities.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        entities = loop.run_until_complete(
            self.memory_client.long_term.search_entities(
                query=query,
                entity_type=entity_type,
                limit=10,
            )
        )
        return json.dumps([
            {
                "name": e.name,
                "type": e.type,
                "description": e.description,
                "properties": e.properties,
            }
            for e in entities
        ])
class SaveEntityToolClass(BaseTool):
    """Tool for saving entities to the context graph.

    Attributes:
        memory_client: Client whose ``long_term.add_entity`` awaitable stores
            the entity.
    """

    name: str = "save_entity"
    description: str = """
Save a new entity to the shared knowledge graph. Other agents will be able
to access this information. Use this to record discoveries, facts, or
important information.
Parameters:
- name: Name of the entity
- entity_type: Type (PRODUCT, PERSON, ORGANIZATION, FINDING, etc.)
- description: Description of the entity
- properties: Optional dict of additional properties
"""
    memory_client: MemoryClient

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(
        self,
        name: str,
        entity_type: str,
        description: str | None = None,
        properties: dict | None = None,
    ) -> str:
        """Store one entity and return a short confirmation message.

        Args:
            name: Name of the entity.
            entity_type: Entity type label.
            description: Optional description; passed through unchanged.
            properties: Optional extra properties; an empty dict is sent when
                omitted or empty.

        Returns:
            ``"Saved entity: <name> (<type>)"`` built from the stored entity.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        entity = loop.run_until_complete(
            self.memory_client.long_term.add_entity(
                name=name,
                entity_type=entity_type,
                description=description,
                properties=properties or {},
            )
        )
        return f"Saved entity: {entity.name} ({entity.type})"
class GetPreferencesToolClass(BaseTool):
    """Tool for getting user preferences.

    Attributes:
        memory_client: Client whose ``long_term.get_preferences`` awaitable is
            queried.
        user_id: User whose preferences are fetched on every call.
    """

    name: str = "get_preferences"
    description: str = """
Get stored preferences for personalization. Returns user preferences
that should guide recommendations and responses.
"""
    memory_client: MemoryClient
    user_id: str

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(self, category: str | None = None) -> str:
        """Fetch preferences for ``self.user_id`` as a JSON array string.

        Args:
            category: Optional category forwarded to ``get_preferences``.

        Returns:
            JSON array of objects with ``category`` and ``preference`` keys.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        preferences = loop.run_until_complete(
            self.memory_client.long_term.get_preferences(
                user_id=self.user_id,
                category=category,
            )
        )
        return json.dumps([
            {"category": p.category, "preference": p.preference}
            for p in preferences
        ])
class SearchMessagesToolClass(BaseTool):
    """Tool for searching conversation history.

    Attributes:
        memory_client: Client whose ``short_term.search_messages`` awaitable
            is queried.
        session_id: Session the search is restricted to.
    """

    name: str = "search_messages"
    description: str = """
Search past conversation messages for relevant context. Use this to
understand what has been discussed previously.
"""
    memory_client: MemoryClient
    session_id: str

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(self, query: str, limit: int = 5) -> str:
        """Search messages of ``self.session_id`` and return them as JSON.

        Args:
            query: Search text forwarded to ``search_messages``.
            limit: Maximum number of messages requested (default 5).

        Returns:
            JSON array of objects with ``role`` and ``content`` keys; each
            content value is cut to its first 300 characters.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        messages = loop.run_until_complete(
            self.memory_client.short_term.search_messages(
                query=query,
                session_id=self.session_id,
                limit=limit,
            )
        )
        return json.dumps([
            # Truncate long messages to keep the tool output compact.
            {"role": m.role, "content": m.content[:300]}
            for m in messages
        ])
Create Agents with Shared Memory
from crewai import Agent, Task, Crew, Process
# Identifiers that scope the preference and message tools.
user_id, session_id = "CUST-12345", "crew-session-001"

# One client instance is handed to every tool below.
memory_client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Tool instances, all backed by the same memory client.
search_entities = SearchEntitiesToolClass(memory_client=memory_client)
save_entity = SaveEntityToolClass(memory_client=memory_client)
get_preferences = GetPreferencesToolClass(
    memory_client=memory_client,
    user_id=user_id,
)
search_messages = SearchMessagesToolClass(
    memory_client=memory_client,
    session_id=session_id,
)

# Full tool set given to each agent in this example.
shared_tools = [
    search_entities,
    save_entity,
    get_preferences,
    search_messages,
]
# Three agents with distinct roles; each receives the complete shared tool set.
researcher = Agent(
    tools=shared_tools,
    verbose=True,
    role="Research Specialist",
    goal="Research products and gather information from the knowledge graph",
    backstory="""You are an expert researcher who finds relevant information
from the shared knowledge base. You search for entities and save new
findings for other agents to use.""",
)

analyst = Agent(
    tools=shared_tools,
    verbose=True,
    role="Preference Analyst",
    goal="Analyze customer preferences and match them with findings",
    backstory="""You specialize in understanding customer preferences and
ensuring recommendations align with their stated needs. You access
preference data and conversation history.""",
)

recommender = Agent(
    tools=shared_tools,
    verbose=True,
    role="Recommendation Specialist",
    goal="Create personalized recommendations based on research and preferences",
    backstory="""You synthesize research findings and preference analysis
to create compelling, personalized recommendations. You save final
recommendations to the knowledge graph.""",
)
Define Tasks
# Task 1: product research, handled by the researcher agent.
research_task = Task(
    agent=researcher,
    expected_output="A structured summary of relevant products with key attributes",
    description="""
Research products related to: {query}
1. Search the knowledge graph for relevant products
2. Identify key features and attributes
3. Save any new findings as entities
4. Compile a summary of available options
Store your findings in the shared knowledge graph for other agents.
""",
)

# Task 2: preference analysis, handled by the analyst agent.
analysis_task = Task(
    agent=analyst,
    expected_output="Customer preference profile with key criteria",
    description="""
Analyze customer preferences relevant to: {query}
1. Retrieve customer preferences from the knowledge graph
2. Search conversation history for additional context
3. Identify key criteria for recommendations
4. Note any constraints or requirements
Provide a preference profile for the recommendation specialist.
""",
)

# Task 3: final recommendations; receives the two earlier tasks as context.
recommendation_task = Task(
    agent=recommender,
    context=[research_task, analysis_task],
    expected_output="3-5 personalized product recommendations with explanations",
    description="""
Create personalized recommendations for: {query}
Using the research findings and preference analysis:
1. Match products to customer preferences
2. Rank options by relevance
3. Explain why each recommendation fits
4. Save the final recommendations to the knowledge graph
Provide 3-5 personalized recommendations with explanations.
""",
)
Run the Crew
# Assemble the crew; Process.sequential runs the tasks in the listed order.
crew = Crew(
    process=Process.sequential,
    verbose=True,
    agents=[researcher, analyst, recommender],
    tasks=[research_task, analysis_task, recommendation_task],
)

# The "query" input fills the {query} placeholder in each task description.
result = crew.kickoff(inputs={"query": "running shoes for marathon training"})
print(result)
Financial Services Multi-Agent System
Example for collaborative financial analysis:
from crewai import Agent, Task, Crew, Process
from crewai_tools import BaseTool
from neo4j_agent_memory import MemoryClient
import json
# --- Memory Client ---
# Identifiers used to scope the advisory tools and the reasoning trace.
client_id, session_id = "CL-78901", "advisory-crew-001"

# Single client instance shared by all advisory tools.
memory_client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)
# --- Specialized Tools ---
class GetClientProfileTool(BaseTool):
    """Tool returning a client's stored preferences and current holdings.

    Attributes:
        memory_client: Client used for the preference lookup and the Cypher
            query.
        client_id: Client whose profile is fetched on every call.
    """

    name: str = "get_client_profile"
    description: str = "Get comprehensive client investment profile including risk tolerance, holdings, and preferences."
    memory_client: MemoryClient
    client_id: str

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(self) -> str:
        """Collect preferences and holdings for ``self.client_id``.

        Returns:
            JSON object with a ``preferences`` list (``category``/``value``
            pairs) and a ``holdings`` entry holding the query result.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Get preferences
        preferences = loop.run_until_complete(
            self.memory_client.long_term.get_preferences(user_id=self.client_id)
        )
        # Get holdings from context graph; the client id is passed as a query
        # parameter rather than interpolated into the Cypher text.
        holdings = loop.run_until_complete(
            self.memory_client.long_term.execute_query(
                """
MATCH (c:Client {id: $client_id})-[:HAS_ACCOUNT]->(a:Account)
-[:HOLDS]->(p:Position)-[:IN]->(s:Security)
RETURN a.type as account, s.name as security, s.ticker as ticker,
p.shares as shares, s.sector as sector
""",
                parameters={"client_id": self.client_id},
            )
        )
        # NOTE(review): assumes execute_query returns JSON-serializable rows
        # (e.g. a list of dicts) — confirm against the MemoryClient API.
        return json.dumps({
            "preferences": [{"category": p.category, "value": p.preference} for p in preferences],
            "holdings": holdings,
        })
class SearchSecuritiesTool(BaseTool):
    """Tool that searches ``SECURITY`` entities, optionally filtered by property.

    Attributes:
        memory_client: Client whose ``long_term.search_entities`` awaitable
            performs the search.
    """

    name: str = "search_securities"
    description: str = "Search for securities by criteria. Can filter by sector, asset class, etc."
    memory_client: MemoryClient

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(
        self,
        query: str,
        sector: str | None = None,
        asset_class: str | None = None,
    ) -> str:
        """Search securities and return the matches as a JSON array string.

        Args:
            query: Search text forwarded to ``search_entities``.
            sector: Optional sector filter; ignored when empty or ``None``.
            asset_class: Optional asset-class filter; ignored when empty or
                ``None``.

        Returns:
            JSON array of objects with ``name``, ``ticker``, ``sector`` and
            ``description`` keys, for at most 15 securities.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        # Only forward the filters the caller actually supplied.
        filters = {}
        if sector:
            filters["sector"] = sector
        if asset_class:
            filters["asset_class"] = asset_class

        securities = loop.run_until_complete(
            self.memory_client.long_term.search_entities(
                query=query,
                entity_type="SECURITY",
                # An empty filter dict is sent as None, i.e. no property filter.
                property_filter=filters or None,
                limit=15,
            )
        )
        return json.dumps([
            {
                "name": s.name,
                "ticker": s.properties.get("ticker"),
                "sector": s.properties.get("sector"),
                "description": s.description,
            }
            for s in securities
        ])
class SaveAnalysisTool(BaseTool):
    """Tool that stores an analysis as an ``ANALYSIS`` entity in the graph.

    Attributes:
        memory_client: Client whose ``long_term.add_entity`` awaitable stores
            the analysis.
        client_id: Client id recorded in the entity's properties.
    """

    name: str = "save_analysis"
    description: str = "Save analysis findings to the knowledge graph for other agents and future reference."
    memory_client: MemoryClient
    client_id: str

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(self, title: str, analysis_type: str, findings: str) -> str:
        """Save one analysis and return a short confirmation message.

        Args:
            title: Used as the entity name.
            analysis_type: Stored under the ``analysis_type`` property.
            findings: Stored as the entity description.

        Returns:
            ``"Saved analysis: <name>"`` built from the stored entity.
        """
        import asyncio
        from datetime import datetime

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        entity = loop.run_until_complete(
            self.memory_client.long_term.add_entity(
                name=title,
                entity_type="ANALYSIS",
                description=findings,
                properties={
                    "analysis_type": analysis_type,
                    "client_id": self.client_id,
                    # NOTE(review): naive local time without a UTC offset;
                    # consider a timezone-aware timestamp if readers span zones.
                    "created_at": datetime.now().isoformat(),
                },
            )
        )
        return f"Saved analysis: {entity.name}"
class GetPastRecommendationsTool(BaseTool):
    """Tool that looks up earlier successful reasoning traces for a client.

    Attributes:
        memory_client: Client whose ``reasoning.get_similar_traces`` awaitable
            is queried.
        client_id: Passed as ``user_id`` to restrict the lookup.
    """

    name: str = "get_past_recommendations"
    description: str = "Find past successful recommendations for similar situations."
    memory_client: MemoryClient
    client_id: str

    class Config:
        # Presumably required so the MemoryClient instance is accepted as a field.
        arbitrary_types_allowed = True

    def _run(self, topic: str) -> str:
        """Return up to three similar successful traces as a JSON array string.

        Args:
            topic: Task text the stored traces are compared against.

        Returns:
            JSON array of objects with ``topic``, ``recommendation`` and
            ``outcome`` keys; the latter two are ``null`` when a trace has no
            result.
        """
        import asyncio

        # asyncio.get_event_loop() raises RuntimeError when the calling thread
        # has no current event loop (worker threads, and the main thread on
        # newer Python versions). Create and register one so later calls in
        # this thread reuse the same loop.
        try:
            loop = asyncio.get_event_loop()
        except RuntimeError:
            loop = asyncio.new_event_loop()
            asyncio.set_event_loop(loop)

        traces = loop.run_until_complete(
            self.memory_client.reasoning.get_similar_traces(
                task=topic,
                user_id=self.client_id,
                limit=3,
                success_only=True,
            )
        )
        return json.dumps([
            {
                "topic": t.task,
                # Guard against traces that were stored without a result.
                "recommendation": t.result.get("recommendation") if t.result else None,
                "outcome": t.result.get("client_feedback") if t.result else None,
            }
            for t in traces
        ])
# --- Create Tools ---
# Every tool shares the one memory client; client-scoped tools also get the id.
profile_tool = GetClientProfileTool(
    memory_client=memory_client,
    client_id=client_id,
)
securities_tool = SearchSecuritiesTool(memory_client=memory_client)
analysis_tool = SaveAnalysisTool(
    memory_client=memory_client,
    client_id=client_id,
)
history_tool = GetPastRecommendationsTool(
    memory_client=memory_client,
    client_id=client_id,
)
# --- Create Specialized Agents ---
# The two analysts share the same three tools; only the strategist also
# receives the past-recommendations tool.
portfolio_analyst = Agent(
    tools=[profile_tool, securities_tool, analysis_tool],
    verbose=True,
    role="Portfolio Analyst",
    goal="Analyze current portfolio composition and identify opportunities",
    backstory="""You are an experienced portfolio analyst who evaluates
investment portfolios. You understand asset allocation, diversification,
and risk management. You analyze client holdings and identify gaps or
opportunities for improvement.""",
)

risk_analyst = Agent(
    tools=[profile_tool, securities_tool, analysis_tool],
    verbose=True,
    role="Risk Analyst",
    goal="Assess risk levels and ensure alignment with client risk tolerance",
    backstory="""You specialize in investment risk analysis. You evaluate
portfolio risk metrics, ensure alignment with client risk profiles,
and identify potential concerns. You consider market conditions and
concentration risks.""",
)

investment_strategist = Agent(
    tools=[profile_tool, securities_tool, analysis_tool, history_tool],
    verbose=True,
    role="Investment Strategist",
    goal="Develop strategic recommendations based on analysis",
    backstory="""You are a senior investment strategist who synthesizes
portfolio and risk analysis into actionable recommendations. You
consider client goals, market conditions, and past successful
strategies to provide personalized advice.""",
)
# --- Define Tasks ---
# Each description contains an {investment_goal} placeholder filled at kickoff.
portfolio_analysis = Task(
    agent=portfolio_analyst,
    expected_output="Portfolio analysis report with current allocation and identified opportunities",
    description="""
Analyze the client's current portfolio for: {investment_goal}
1. Retrieve the client's profile and current holdings
2. Evaluate current asset allocation by sector and asset class
3. Identify concentration risks or gaps
4. Search for securities that could address any gaps
5. Save your analysis findings to the knowledge graph
Provide a detailed portfolio analysis report.
""",
)

risk_assessment = Task(
    agent=risk_analyst,
    expected_output="Risk assessment report with alignment analysis",
    description="""
Assess the risk profile for: {investment_goal}
1. Review client's stated risk tolerance from preferences
2. Evaluate current portfolio risk level
3. Identify any misalignment between risk tolerance and holdings
4. Consider market conditions and concentration risks
5. Save your risk assessment to the knowledge graph
Provide a risk assessment with specific concerns and recommendations.
""",
)

# The final task receives the two analyses above as context.
strategic_recommendation = Task(
    agent=investment_strategist,
    context=[portfolio_analysis, risk_assessment],
    expected_output="3-5 investment recommendations with rationale",
    description="""
Develop investment recommendations for: {investment_goal}
Using the portfolio analysis and risk assessment:
1. Review past successful recommendations for this client
2. Identify 3-5 specific investment opportunities
3. Ensure recommendations align with risk tolerance
4. Provide rationale for each recommendation
5. Save final recommendations to the knowledge graph
Provide actionable investment recommendations with clear rationale.
""",
)
# --- Create and Run Crew ---
# Sequential process: tasks execute in the order given in `tasks`.
advisory_crew = Crew(
    process=Process.sequential,
    verbose=True,
    agents=[portfolio_analyst, risk_analyst, investment_strategist],
    tasks=[portfolio_analysis, risk_assessment, strategic_recommendation],
)
# Run the crew
async def run_advisory_analysis(investment_goal: str):
    """Run the advisory crew for one goal and record the run as a reasoning trace.

    Args:
        investment_goal: Goal text substituted into each task's
            ``{investment_goal}`` placeholder.

    Returns:
        Whatever ``advisory_crew.kickoff`` returns.

    Raises:
        Exception: Any error raised by the crew is re-raised after the trace
            has been completed with ``outcome="failure"``.
    """
    # Start reasoning trace for the overall task
    trace = await memory_client.reasoning.start_trace(
        task=f"Advisory analysis: {investment_goal}",
        user_id=client_id,
        session_id=session_id,
    )
    try:
        # NOTE(review): kickoff() is called synchronously inside a coroutine,
        # so it blocks the running event loop. The tools in this file call
        # loop.run_until_complete(), which cannot be used on a loop that is
        # already running — confirm how CrewAI invokes the tools here.
        result = advisory_crew.kickoff(
            inputs={"investment_goal": investment_goal}
        )
    except Exception as e:
        await memory_client.reasoning.complete_trace(
            trace_id=trace.id,
            outcome="failure",
            error=str(e),
        )
        raise
    # Recorded outside the try block: an error while saving the success trace
    # must not be re-recorded as a failure of a crew run that succeeded.
    await memory_client.reasoning.complete_trace(
        trace_id=trace.id,
        outcome="success",
        # Only the first 1000 characters of the result are stored.
        result={"recommendation": str(result)[:1000]},
    )
    return result
# Usage: run one advisory analysis and print the result under a banner.
if __name__ == "__main__":
    import asyncio

    result = asyncio.run(
        run_advisory_analysis(
            "Increase technology sector exposure while maintaining moderate risk"
        )
    )
    _rule = "=" * 50
    print("\n" + _rule)
    print("ADVISORY RECOMMENDATIONS")
    print(_rule)
    print(result)
Ecommerce Product Team
Multi-agent system for product recommendations:
from crewai import Agent, Task, Crew, Process
# --- Tools (using shared memory_client) ---
# The tool instances created in the earlier examples are reused here.
# --- Agents ---
# Each agent gets only the subset of tools listed for it.
product_researcher = Agent(
    tools=[search_entities, save_entity],
    verbose=True,
    role="Product Researcher",
    goal="Research products and gather comprehensive product information",
    backstory="""You excel at finding and evaluating products. You search
the product catalog, compare features, and identify the best options
for customer needs.""",
)

customer_specialist = Agent(
    tools=[get_preferences, search_messages],
    verbose=True,
    role="Customer Specialist",
    goal="Understand customer preferences and purchase patterns",
    backstory="""You are an expert at understanding customer needs. You
analyze preferences, review purchase history, and identify what
matters most to each customer.""",
)

personalization_expert = Agent(
    tools=[search_entities, get_preferences, save_entity],
    verbose=True,
    role="Personalization Expert",
    goal="Match products to customer preferences for personalized recommendations",
    backstory="""You create personalized experiences by matching products
to individual customer preferences. You consider past behavior,
stated preferences, and current needs.""",
)
# --- Tasks ---
# Each description contains a {customer_request} placeholder filled at kickoff.
product_research = Task(
    agent=product_researcher,
    expected_output="Product research summary with top options",
    description="""
Research products for: {customer_request}
1. Search the product catalog for relevant items
2. Identify key features and differentiators
3. Note price points and availability
4. Save notable findings to the knowledge graph
""",
)

customer_analysis = Task(
    agent=customer_specialist,
    expected_output="Customer analysis with key preferences and criteria",
    description="""
Analyze customer context for: {customer_request}
1. Retrieve customer preferences
2. Search conversation history for additional context
3. Identify key decision criteria
4. Note any constraints (budget, brand, etc.)
""",
)

# The final task receives the research and analysis tasks as context.
personalized_recommendations = Task(
    agent=personalization_expert,
    context=[product_research, customer_analysis],
    expected_output="Personalized product recommendations",
    description="""
Create personalized recommendations for: {customer_request}
Using product research and customer analysis:
1. Match products to customer preferences
2. Rank by relevance and fit
3. Explain personalization rationale
4. Save recommendations to knowledge graph
Provide 3-5 recommendations with personalized explanations.
""",
)
# --- Crew ---
# Sequential process: tasks execute in the order given in `tasks`.
recommendation_crew = Crew(
    process=Process.sequential,
    verbose=True,
    agents=[product_researcher, customer_specialist, personalization_expert],
    tasks=[product_research, customer_analysis, personalized_recommendations],
)

# Run: the input fills the {customer_request} placeholder in each task.
result = recommendation_crew.kickoff(
    inputs={"customer_request": "comfortable running shoes for daily training"},
)
Best Practices
1. Share Context Through the Graph
Let agents communicate via saved entities:
# Agent A records a finding in the shared graph.
save_entity.run(
    entity_type="FINDING",
    name="Key Finding: Price sensitivity",
    description="Customer shows high price sensitivity based on past behavior",
)

# Agent B later looks the finding up through the search tool.
findings = search_entities.run(query="price sensitivity finding")
2. Use Reasoning Traces for Crew Tasks
Track overall crew performance:
trace = await memory_client.reasoning.start_trace(
task=f"Crew task: {goal}",
user_id=user_id,
metadata={"crew": "advisory", "agents": 3},
)
try:
result = crew.kickoff(inputs=inputs)
await memory_client.reasoning.complete_trace(trace_id=trace.id, outcome="success")
except:
await memory_client.reasoning.complete_trace(trace_id=trace.id, outcome="failure")
3. Scope Tools Appropriately
Different agents may need different tool access:
# Research agent: read-only tools (search and preference lookup)
researcher_tools = [search_entities, get_preferences]
# Writer agent: can also save. `analysis_tool` is the SaveAnalysisTool
# instance; "save_analysis" is only that tool's name string, not a variable.
writer_tools = [search_entities, save_entity, analysis_tool]
4. Handle Concurrent Access
Neo4j handles concurrent writes, but you should still design for concurrent access, for example by giving each saved entity a unique name:
# Use unique identifiers for saved entities
entity = await memory_client.long_term.add_entity(
name=f"Analysis-{agent_name}-{timestamp}",
entity_type="ANALYSIS",
properties={"agent": agent_name},
)