Store and Search Messages
How to store conversation messages and retrieve them using semantic search.
Prerequisites
-
Neo4j database running with vector index support (Neo4j 5.13+)
-
neo4j-agent-memoryinstalled -
Embedding service configured (OpenAI, Sentence Transformers, etc.)
Store Messages
Basic Message Storage
Store a single message with automatic embedding generation:
from neo4j_agent_memory import MemoryClient
client = MemoryClient(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password",
)
# Store a customer service message
message = await client.short_term.add_message(
role="user",
content="I'd like to return the Nike Air Max shoes I ordered last week. Order #12345.",
session_id="support-session-001",
)
print(f"Stored message: {message.id}")
Store with Metadata
Add custom metadata to messages for filtering and context:
# Store an advisory conversation message
message = await client.short_term.add_message(
role="user",
content="I want to increase my exposure to technology stocks, maybe 20% of the portfolio.",
session_id="advisory-call-2024-01-15",
metadata={
"client_id": "CL-78901",
"advisor_id": "ADV-456",
"account_type": "investment",
"risk_profile": "moderate-growth",
"compliance_recorded": True,
},
)
# Store a shopping assistant message
message = await client.short_term.add_message(
role="assistant",
content="Based on your previous purchases, I recommend the new Nike ZoomX running shoes. They're similar to the Air Max you loved but with better cushioning for long runs.",
session_id="shopping-assistant-001",
metadata={
"customer_id": "CUST-12345",
"channel": "mobile_app",
"page_context": "product_discovery",
"recommended_products": ["SKU-NKE-ZMX-001", "SKU-NKE-ZMX-002"],
},
)
Batch Message Loading
Load multiple messages at once, such as importing conversation history:
# Import historical support tickets
messages = [
{
"role": "user",
"content": "My order hasn't arrived yet. It's been 10 days.",
"timestamp": "2024-01-10T09:00:00Z",
"metadata": {"ticket_id": "TKT-001", "priority": "high"}
},
{
"role": "assistant",
"content": "I apologize for the delay. Let me check the shipping status for you.",
"timestamp": "2024-01-10T09:01:00Z",
"metadata": {"ticket_id": "TKT-001", "agent_id": "AGT-123"}
},
{
"role": "assistant",
"content": "Your order was delayed due to weather. It's now out for delivery.",
"timestamp": "2024-01-10T09:03:00Z",
"metadata": {"ticket_id": "TKT-001", "resolution": "shipping_update"}
},
]
await client.short_term.add_messages_batch(
messages=messages,
session_id="historical-import-001",
)
Search Messages
Semantic Search
Find messages by meaning, not just keywords:
# Find discussions about portfolio allocation
results = await client.short_term.search_messages(
query="client wants to adjust investment allocation",
limit=10,
)
for msg in results:
print(f"[{msg.role}] {msg.content[:100]}...")
print(f" Session: {msg.session_id}, Score: {msg.score:.3f}")
Filter by Session
Search within a specific conversation:
# Search within a specific advisory call
results = await client.short_term.search_messages(
query="risk tolerance",
session_id="advisory-call-2024-01-15",
limit=5,
)
Filter by Metadata
Combine semantic search with metadata filters:
results = await client.short_term.search_messages(
query="product return refund",
metadata_filter={
"priority": "high",
"channel": "phone",
},
limit=20,
)
results = await client.short_term.search_messages(
query="concentrated position single stock",
metadata_filter={
"risk_profile": ["aggressive", "moderate-growth"],
"compliance_recorded": True,
},
limit=50,
)
Time-Based Filtering
Search messages within a time range:
from datetime import datetime, timedelta
# Find recent messages about shipping issues
one_week_ago = datetime.now() - timedelta(days=7)
results = await client.short_term.search_messages(
query="shipping delay delivery problem",
after=one_week_ago,
limit=50,
)
Manage Sessions
List Sessions
Get all conversation sessions:
sessions = await client.short_term.list_sessions(
limit=20,
metadata_filter={"channel": "mobile_app"},
)
for session in sessions:
print(f"Session: {session.id}")
print(f" Messages: {session.message_count}")
print(f" Started: {session.created_at}")
print(f" Last activity: {session.last_message_at}")
Get Session Messages
Retrieve all messages in a session:
# Get full conversation history
messages = await client.short_term.get_session_messages(
session_id="support-session-001",
limit=100,
)
for msg in messages:
print(f"[{msg.timestamp}] {msg.role}: {msg.content}")
Delete Sessions
Remove a session and all its messages:
# Delete completed support session
await client.short_term.delete_session(
session_id="support-session-001",
)
Generate Summaries
Create summaries of long conversations for efficient context:
Auto-Summarize Session
# Generate summary of advisory call
summary = await client.short_term.summarize_session(
session_id="advisory-call-2024-01-15",
)
print(f"Summary: {summary.content}")
print(f"Key topics: {summary.topics}")
print(f"Action items: {summary.action_items}")
Custom Summary Prompt
Provide domain-specific summarization instructions:
summary = await client.short_term.summarize_session(
session_id="advisory-call-2024-01-15",
prompt="""
Summarize this financial advisory conversation. Include:
1. Client's stated investment goals
2. Risk tolerance discussed
3. Specific securities or sectors mentioned
4. Any compliance-relevant statements
5. Agreed next steps
""",
)
summary = await client.short_term.summarize_session(
session_id="support-session-001",
prompt="""
Summarize this customer support interaction. Include:
1. Customer's issue or request
2. Products mentioned (with SKUs if available)
3. Resolution provided
4. Customer satisfaction indicators
5. Any follow-up actions needed
""",
)
Build Conversation Context
Combine message history with other memory types for rich context:
Get Combined Context
# Get comprehensive context for agent response
context = await client.get_context(
query="What running shoes should I recommend?",
session_id="shopping-assistant-001",
include_short_term=True, # Recent messages
include_long_term=True, # Customer preferences and entities
include_reasoning=True, # Past successful recommendations
short_term_limit=10,
long_term_limit=5,
)
print("Recent conversation:")
for msg in context.messages:
print(f" [{msg.role}]: {msg.content[:80]}...")
print("\nCustomer preferences:")
for pref in context.preferences:
print(f" {pref.category}: {pref.preference}")
print("\nRelevant entities:")
for entity in context.entities:
print(f" {entity.name} ({entity.type})")
Format for LLM
Format context for inclusion in LLM prompts:
def format_context_for_prompt(context):
parts = []
# Add conversation history
if context.messages:
parts.append("## Recent Conversation")
for msg in context.messages[-5:]: # Last 5 messages
parts.append(f"**{msg.role}**: {msg.content}")
# Add preferences
if context.preferences:
parts.append("\n## Customer Preferences")
for pref in context.preferences:
parts.append(f"- {pref.category}: {pref.preference}")
# Add relevant entities
if context.entities:
parts.append("\n## Relevant Information")
for entity in context.entities:
if entity.description:
parts.append(f"- **{entity.name}**: {entity.description}")
return "\n".join(parts)
# Use in agent prompt
context = await client.get_context(query=user_message, session_id=session_id)
system_prompt = f"""You are a helpful shopping assistant.
{format_context_for_prompt(context)}
Use the above context to provide personalized recommendations.
"""
Best Practices
1. Use Meaningful Session IDs
Structure session IDs for easy filtering and debugging:
# Good: Descriptive and filterable
session_id = f"support-{customer_id}-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
session_id = f"advisory-{advisor_id}-{client_id}-{date}"
# Avoid: Generic UUIDs with no context
session_id = str(uuid.uuid4()) # Hard to debug
2. Include Searchable Metadata
Add metadata that supports your search patterns:
# Metadata enables powerful filtering
metadata = {
"customer_id": customer_id,
"channel": "web", # Filter by channel
"intent": "product_return", # Filter by intent
"sentiment": "negative", # Filter by sentiment
"language": "en", # Filter by language
"agent_id": agent_id, # Filter by agent
}
3. Summarize Long Sessions
Generate summaries to avoid context window limits:
# Check if session is getting long
messages = await client.short_term.get_session_messages(session_id)
if len(messages) > 20:
# Generate summary of older messages
summary = await client.short_term.summarize_session(
session_id=session_id,
message_limit=15, # Summarize first 15 messages
)
# Use summary + recent messages for context
recent_messages = messages[-5:]
4. Archive Completed Sessions
Move completed conversations to long-term storage:
# After support ticket resolved
summary = await client.short_term.summarize_session(session_id)
# Store summary as an entity for long-term reference
await client.long_term.add_entity(
name=f"Support Case {ticket_id}",
entity_type="SUPPORT_CASE",
properties={
"summary": summary.content,
"customer_id": customer_id,
"resolution": resolution_type,
"resolved_at": datetime.now().isoformat(),
},
)
# Optionally delete the detailed messages
await client.short_term.delete_session(session_id)