Process Documents in Batch
Table of Contents
How to efficiently process large document collections to build your context graph.
Overview
Batch processing enables efficient extraction of entities and relationships from large document collections, populating your context graph at scale.
Prerequisites
- neo4j-agent-memory installed
- Extraction models configured (GLiNER, spaCy, or LLM)
- Sufficient memory for batch sizes
Basic Batch Extraction
Process Multiple Texts
from neo4j_agent_memory.extraction import GLiNEREntityExtractor

# Extractor preconfigured with the "ecommerce" entity schema.
extractor = GLiNEREntityExtractor.for_schema("ecommerce")

# List of documents to process
documents = [
    "Customer John ordered Nike Air Max shoes from our Manhattan store.",
    "Jane returned the Adidas Ultraboost due to sizing issues.",
    "Order #12345 shipped via FedEx to Brooklyn, NY.",
    # ... hundreds more
]

# Batch extraction: batch_size groups texts per call; max_concurrency
# presumably caps how many batches run at once -- confirm in the API docs.
result = await extractor.extract_batch(
    texts=documents,
    batch_size=10,
    max_concurrency=5,
)

# Aggregate counters exposed on the batch result object.
print(f"Processed: {result.successful_items}/{result.total_items}")
print(f"Total entities: {result.total_entities}")
print(f"Total relations: {result.total_relations}")
With Progress Tracking
def on_progress(completed: int, total: int):
    """Print a one-line progress report for a running batch extraction."""
    done_fraction = completed / total
    print(f"Progress: {completed}/{total} ({done_fraction * 100:.1f}%)")
# Same batch call as before, now invoking the on_progress callback defined
# above as items complete.
result = await extractor.extract_batch(
    texts=documents,
    batch_size=10,
    max_concurrency=5,
    on_progress=on_progress,
)
Access Individual Results
result = await extractor.extract_batch(texts=documents)

# Per-document results live in result.items; failures carry an error
# string instead of raising.
for i, item_result in enumerate(result.items):
    if item_result.success:
        print(f"Doc {i}: {item_result.entity_count} entities")
        for entity in item_result.entities:
            print(f" - {entity.name} ({entity.type})")
    else:
        print(f"Doc {i}: Failed - {item_result.error}")
Streaming Extraction for Long Documents
Process Large Documents in Chunks
from neo4j_agent_memory.extraction import StreamingExtractor
# Wrap extractor for streaming
streamer = StreamingExtractor(
extractor=extractor,
chunk_size=4000, # Characters per chunk
overlap=200, # Overlap to avoid splitting entities
)
# Process a long document
long_document = open("annual_report.txt").read() # 100K+ characters
# Stream results as they're extracted
async for chunk_result in streamer.extract_streaming(long_document):
print(f"Chunk {chunk_result.chunk.index}: {chunk_result.entity_count} entities")
# Store entities as they come
for entity in chunk_result.entities:
await client.long_term.add_entity(
name=entity.name,
entity_type=entity.type,
)
Get Complete Deduplicated Result
# Extract with automatic cross-chunk deduplication
result = await streamer.extract(long_document, deduplicate=True)

# Stats distinguish raw mentions from unique entities after merging.
print(f"Total chunks: {result.stats.total_chunks}")
print(f"Raw entities: {result.stats.raw_entity_count}")
print(f"Unique entities: {result.stats.deduplicated_entities}")

for entity in result.entities:
    print(f" {entity.name} ({entity.type})")
Store Batch Results in Context Graph
Basic Storage
from neo4j_agent_memory import MemoryClient

# Client connected to a local Neo4j instance (credentials are placeholders).
client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Extract batch
result = await extractor.extract_batch(texts=documents)

# Store all entities
for item in result.items:
    if not item.success:
        continue  # skip failed documents; the error is kept on the item
    for entity in item.entities:
        await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
            properties={
                "confidence": entity.confidence,
                "source_index": item.index,  # which input text it came from
            },
        )
With Deduplication
from neo4j_agent_memory.memory import DeduplicationConfig

# Presumably: matches at/above 0.95 similarity merge automatically, and
# matches between 0.85 and 0.95 are flagged for review -- confirm in docs.
dedup_config = DeduplicationConfig(
    auto_merge_threshold=0.95,
    flag_threshold=0.85,
)

stored_count = 0
merged_count = 0
for item in result.items:
    if not item.success:
        continue
    for entity in item.entities:
        # NOTE(review): with a deduplication config, add_entity appears to
        # return an (entity, dedup_result) pair -- verify against the API,
        # since other examples unpack a single return value.
        stored, dedup_result = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
            deduplication=dedup_config,
        )
        if dedup_result.action == "created":
            stored_count += 1
        elif dedup_result.action == "merged":
            merged_count += 1

print(f"New entities: {stored_count}")
print(f"Merged with existing: {merged_count}")
With Provenance Tracking
# Register the extractor for provenance
await client.long_term.register_extractor(
    name="GLiNEREntityExtractor",
    version="1.0",
    config={"schema": "ecommerce"},
)

# Store with provenance: each entity is linked both to the source document
# (stored as a message) and to the registered extractor that produced it.
for doc_index, item in enumerate(result.items):
    if not item.success:
        continue
    # Store document as a message/source
    message = await client.short_term.add_message(
        role="system",
        content=documents[doc_index],
        session_id="batch-import",
        metadata={"doc_index": doc_index},
    )
    for entity in item.entities:
        stored = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )
        # Link to source document
        await client.long_term.link_entity_to_message(
            entity=stored,
            message_id=message.id,
            confidence=entity.confidence,
            start_pos=entity.start,  # mention offsets within the document
            end_pos=entity.end,
        )
        # Link to extractor
        await client.long_term.link_entity_to_extractor(
            entity=stored,
            extractor_name="GLiNEREntityExtractor",
            confidence=entity.confidence,
        )
Financial Document Processing
Example for processing financial documents:
from neo4j_agent_memory.extraction import GLiNEREntityExtractor, ExtractionPipeline

# Configure for financial domain
extractor = GLiNEREntityExtractor.for_schema("financial")
async def process_financial_documents(documents: list[dict]):
    """Process financial documents and build context graph.

    Each document dict is expected to carry "id" and "content"; "title",
    "type", and "date" are optional. Uses the module-level ``extractor``
    and ``client``. Returns the raw batch extraction result.
    """
    # Extract texts
    texts = [doc["content"] for doc in documents]
    # Batch extraction
    result = await extractor.extract_batch(
        texts=texts,
        batch_size=10,
        on_progress=lambda d, t: print(f"Extracting: {d}/{t}"),
    )
    # Store in context graph. result.items appears to be aligned with the
    # input order, so index i maps back to documents[i].
    for i, item in enumerate(result.items):
        if not item.success:
            print(f"Failed: {documents[i]['id']} - {item.error}")
            continue
        doc = documents[i]
        # Store document metadata as entity
        doc_entity = await client.long_term.add_entity(
            name=doc.get("title", f"Document {doc['id']}"),
            entity_type="DOCUMENT",
            properties={
                "doc_id": doc["id"],
                "doc_type": doc.get("type"),
                "date": doc.get("date"),
            },
        )
        # Store extracted entities with relationships
        for entity in item.entities:
            stored = await client.long_term.add_entity(
                name=entity.name,
                entity_type=entity.type,
            )
            # Link entity to document
            await client.long_term.add_relationship(
                from_entity=stored.id,
                to_entity=doc_entity.id,
                relationship_type="MENTIONED_IN",
                properties={
                    "confidence": entity.confidence,
                    "position": entity.start,  # character offset of the mention
                },
            )
        # Store relationships if extracted
        for relation in item.relations:
            # Resolve source/target by name search (top match only); if
            # either side is not found the relation is dropped silently.
            source = await client.long_term.search_entities(
                query=relation.source,
                limit=1,
            )
            target = await client.long_term.search_entities(
                query=relation.target,
                limit=1,
            )
            if source and target:
                await client.long_term.add_relationship(
                    from_entity=source[0].id,
                    to_entity=target[0].id,
                    relationship_type=relation.type,
                    properties={"source_doc": doc["id"]},
                )
    return result
# Usage: each dict supplies the "id" and "content" keys the function
# requires, plus optional metadata stored on the DOCUMENT entity.
documents = [
    {
        "id": "10K-2024",
        "title": "Annual Report 2024",
        "type": "SEC Filing",
        "date": "2024-03-15",
        "content": "Apple Inc. reported record revenue...",
    },
    {
        "id": "earnings-Q4",
        "title": "Q4 Earnings Call",
        "type": "Transcript",
        "date": "2024-01-25",
        "content": "CEO Tim Cook discussed...",
    },
    # ... more documents
]
result = await process_financial_documents(documents)
Ecommerce Catalog Import
Example for importing product catalogs:
async def import_product_catalog(products: list[dict]):
    """Import product catalog into context graph.

    Each product dict needs "sku" and "name"; "brand", "category", "price",
    "rating", and "description" are optional. Relies on the module-level
    ``client`` for storage.
    """
    # Prepare product descriptions for extraction
    texts = []
    for product in products:
        text = f"""
Product: {product['name']}
Brand: {product.get('brand', 'Unknown')}
Category: {product.get('category', 'Unknown')}
Description: {product.get('description', '')}
"""
        texts.append(text)

    # Extract additional entities from descriptions
    extractor = GLiNEREntityExtractor.for_schema("ecommerce")
    result = await extractor.extract_batch(
        texts=texts,
        batch_size=20,
    )

    for i, product in enumerate(products):
        # Store product as primary entity
        product_entity = await client.long_term.add_entity(
            name=product["name"],
            entity_type="PRODUCT",
            description=product.get("description"),
            properties={
                "sku": product["sku"],
                "price": product.get("price"),
                "rating": product.get("rating"),
            },
            deduplicate=False,  # SKUs are unique
        )

        # Store brand as entity
        if product.get("brand"):
            brand_entity = await client.long_term.add_entity(
                name=product["brand"],
                entity_type="BRAND",
            )
            await client.long_term.add_relationship(
                from_entity=product_entity.id,
                to_entity=brand_entity.id,
                relationship_type="MADE_BY",
            )

        # Store category as entity
        if product.get("category"):
            category_entity = await client.long_term.add_entity(
                name=product["category"],
                entity_type="CATEGORY",
            )
            await client.long_term.add_relationship(
                from_entity=product_entity.id,
                to_entity=category_entity.id,
                relationship_type="IN_CATEGORY",
            )

        # Link extracted entities to product. PRODUCT, BRAND, and CATEGORY
        # are excluded because they are already stored and linked explicitly
        # above (the original list omitted CATEGORY, which re-linked
        # categories a second time as RELATED_TO).
        if result.items[i].success:
            for entity in result.items[i].entities:
                if entity.type not in ["PRODUCT", "BRAND", "CATEGORY"]:
                    linked = await client.long_term.add_entity(
                        name=entity.name,
                        entity_type=entity.type,
                    )
                    await client.long_term.add_relationship(
                        from_entity=product_entity.id,
                        to_entity=linked.id,
                        relationship_type="RELATED_TO",
                    )

    print(f"Imported {len(products)} products")
# Usage: minimal product dict with the required "sku" and "name" keys plus
# optional metadata.
products = [
    {
        "sku": "NKE-AM90-001",
        "name": "Nike Air Max 90",
        "brand": "Nike",
        "category": "Running Shoes",
        "price": 129.99,
        "description": "Classic running shoe with Air cushioning...",
    },
    # ... more products
]
await import_product_catalog(products)
Performance Optimization
Tune Batch Size
# Smaller batches for memory-constrained environments
result = await extractor.extract_batch(
    texts=documents,
    batch_size=5,  # Smaller batches
    max_concurrency=2,  # fewer batches in flight -> lower peak memory
)

# Larger batches for high-memory environments with GPU
result = await extractor.extract_batch(
    texts=documents,
    batch_size=50,  # Larger batches
    max_concurrency=10,
)
Use Appropriate Extractor
# High-volume, lower precision
# NOTE(review): SpacyEntityExtractor and LLMEntityExtractor are not
# imported in this snippet -- import them from the extraction module.
fast_extractor = SpacyEntityExtractor()  # Fastest
# Balanced
balanced_extractor = GLiNEREntityExtractor.for_schema("ecommerce")
# High precision, slower: pipeline chains a GLiNER stage with an LLM stage.
accurate_pipeline = ExtractionPipeline(
    stages=[
        GLiNEREntityExtractor.for_schema("ecommerce"),
        LLMEntityExtractor(model="gpt-4o-mini"),
    ],
)
Skip Failures and Continue
# With fail_fast disabled, one bad document does not abort the whole batch;
# its failure is recorded on the corresponding item instead.
result = await extractor.extract_batch(
    texts=documents,
    fail_fast=False,  # Continue on errors (default)
)

# Check for failures
failed = [item for item in result.items if not item.success]
print(f"Failed: {len(failed)}/{result.total_items}")
for item in failed:
    print(f" Index {item.index}: {item.error}")
Best Practices
1. Validate Before Processing
# Filter out invalid documents (missing or near-empty content).
valid_docs = [
    doc for doc in documents
    if doc.get("content") and len(doc["content"]) > 10
]
print(f"Processing {len(valid_docs)} of {len(documents)} documents")
# NOTE: result indices now align with valid_docs, not the original list.
result = await extractor.extract_batch(texts=[d["content"] for d in valid_docs])
2. Log Progress and Errors
import logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
def on_progress(completed, total):
logger.info(f"Progress: {completed}/{total}")
result = await extractor.extract_batch(
texts=documents,
on_progress=on_progress,
)
for item in result.items:
if not item.success:
logger.error(f"Failed doc {item.index}: {item.error}")
3. Process in Manageable Chunks
# For very large document sets, process in chunks so results can be
# persisted incrementally and memory stays bounded.
CHUNK_SIZE = 1000
for i in range(0, len(all_documents), CHUNK_SIZE):
    chunk = all_documents[i:i + CHUNK_SIZE]
    print(f"Processing chunk {i//CHUNK_SIZE + 1}")
    result = await extractor.extract_batch(texts=chunk)
    # Store results before next chunk (store_results is user-defined).
    await store_results(result)
4. Use Appropriate Deduplication
# For known-clean data, skip deduplication
await client.long_term.add_entity(
    name=entity.name,
    entity_type=entity.type,
    deduplicate=False,  # Faster for trusted sources
)

# For user-generated content, enable deduplication
await client.long_term.add_entity(
    name=entity.name,
    entity_type=entity.type,
    deduplication=dedup_config,  # thresholds configured via DeduplicationConfig
)