Process Documents in Batch

How to efficiently process large document collections to build your context graph.

Overview

Batch processing enables efficient extraction of entities and relationships from large document collections, populating your context graph at scale.

Prerequisites

  • neo4j-agent-memory installed

  • Extraction models configured (GLiNER, spaCy, or LLM)

  • Sufficient memory for your configured batch size and concurrency

Basic Batch Extraction

Process Multiple Texts

from neo4j_agent_memory.extraction import GLiNEREntityExtractor

# Extractor preconfigured with the "ecommerce" entity schema
extractor = GLiNEREntityExtractor.for_schema("ecommerce")

# List of documents to process
documents = [
    "Customer John ordered Nike Air Max shoes from our Manhattan store.",
    "Jane returned the Adidas Ultraboost due to sizing issues.",
    "Order #12345 shipped via FedEx to Brooklyn, NY.",
    # ... hundreds more
]

# Batch extraction (top-level await: run inside an async function or async REPL)
result = await extractor.extract_batch(
    texts=documents,
    batch_size=10,       # texts processed per batch
    max_concurrency=5,   # limit on concurrent extraction work
)

# Summary counters aggregated over the whole batch
print(f"Processed: {result.successful_items}/{result.total_items}")
print(f"Total entities: {result.total_entities}")
print(f"Total relations: {result.total_relations}")

With Progress Tracking

def on_progress(completed: int, total: int):
    """Print batch progress, e.g. ``Progress: 3/10 (30.0%)``."""
    percent = (completed / total) * 100
    print(f"Progress: {completed}/{total} ({percent:.1f}%)")

# Same batch call, now reporting progress through the callback above
result = await extractor.extract_batch(
    texts=documents,
    batch_size=10,
    max_concurrency=5,
    on_progress=on_progress,  # called with (completed, total) counts
)

Access Individual Results

result = await extractor.extract_batch(texts=documents)

# result.items holds one result per input text, in the same order
for i, item_result in enumerate(result.items):
    if item_result.success:
        print(f"Doc {i}: {item_result.entity_count} entities")
        for entity in item_result.entities:
            print(f"  - {entity.name} ({entity.type})")
    else:
        # Failures are recorded per item instead of raising (see fail_fast below)
        print(f"Doc {i}: Failed - {item_result.error}")

Streaming Extraction for Long Documents

Process Large Documents in Chunks

from neo4j_agent_memory.extraction import StreamingExtractor

# Wrap extractor for streaming
streamer = StreamingExtractor(
    extractor=extractor,
    chunk_size=4000,    # Characters per chunk
    overlap=200,        # Overlap to avoid splitting entities
)

# Process a long document
long_document = open("annual_report.txt").read()  # 100K+ characters

# Stream results as they're extracted
async for chunk_result in streamer.extract_streaming(long_document):
    print(f"Chunk {chunk_result.chunk.index}: {chunk_result.entity_count} entities")

    # Store entities as they come
    for entity in chunk_result.entities:
        await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )

Get Complete Deduplicated Result

# Extract with automatic cross-chunk deduplication
result = await streamer.extract(long_document, deduplicate=True)

# stats compares raw per-chunk entity counts with the deduplicated total
print(f"Total chunks: {result.stats.total_chunks}")
print(f"Raw entities: {result.stats.raw_entity_count}")
print(f"Unique entities: {result.stats.deduplicated_entities}")

for entity in result.entities:
    print(f"  {entity.name} ({entity.type})")

Store Batch Results in Context Graph

Basic Storage

from neo4j_agent_memory import MemoryClient

# Client for the Neo4j instance that backs the context graph
client = MemoryClient(
    neo4j_uri="bolt://localhost:7687",
    neo4j_user="neo4j",
    neo4j_password="password",
)

# Extract batch
result = await extractor.extract_batch(texts=documents)

# Store all entities
for item in result.items:
    if not item.success:
        continue  # skip documents whose extraction failed

    for entity in item.entities:
        await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
            properties={
                "confidence": entity.confidence,  # extraction confidence score
                "source_index": item.index,       # position of the source text in the batch
            },
        )

With Deduplication

from neo4j_agent_memory.memory import DeduplicationConfig

dedup_config = DeduplicationConfig(
    auto_merge_threshold=0.95,  # presumably auto-merges at >= 0.95 similarity — confirm in library docs
    flag_threshold=0.85,        # presumably flags for manual review at >= 0.85 — confirm
)

stored_count = 0
merged_count = 0

for item in result.items:
    if not item.success:
        continue

    for entity in item.entities:
        # With a deduplication config, add_entity returns (entity, dedup_result)
        stored, dedup_result = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
            deduplication=dedup_config,
        )

        # NOTE(review): actions other than "created"/"merged" (e.g. flagged) are not tallied
        if dedup_result.action == "created":
            stored_count += 1
        elif dedup_result.action == "merged":
            merged_count += 1

print(f"New entities: {stored_count}")
print(f"Merged with existing: {merged_count}")

With Provenance Tracking

# Register the extractor for provenance
await client.long_term.register_extractor(
    name="GLiNEREntityExtractor",
    version="1.0",
    config={"schema": "ecommerce"},
)

# Store with provenance
# NOTE(review): assumes result.items aligns one-to-one with `documents` — confirm
for doc_index, item in enumerate(result.items):
    if not item.success:
        continue

    # Store document as a message/source
    message = await client.short_term.add_message(
        role="system",
        content=documents[doc_index],
        session_id="batch-import",
        metadata={"doc_index": doc_index},
    )

    for entity in item.entities:
        stored = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )

        # Link to source document (records the mention's character offsets)
        await client.long_term.link_entity_to_message(
            entity=stored,
            message_id=message.id,
            confidence=entity.confidence,
            start_pos=entity.start,
            end_pos=entity.end,
        )

        # Link to extractor registered above
        await client.long_term.link_entity_to_extractor(
            entity=stored,
            extractor_name="GLiNEREntityExtractor",
            confidence=entity.confidence,
        )

Financial Document Processing

Example for processing financial documents:

from neo4j_agent_memory.extraction import GLiNEREntityExtractor, ExtractionPipeline

# Configure for financial domain
extractor = GLiNEREntityExtractor.for_schema("financial")

async def _store_document_node(doc: dict):
    """Create the DOCUMENT node describing one source document."""
    return await client.long_term.add_entity(
        name=doc.get("title", f"Document {doc['id']}"),
        entity_type="DOCUMENT",
        properties={
            "doc_id": doc["id"],
            "doc_type": doc.get("type"),
            "date": doc.get("date"),
        },
    )


async def _link_mentions(item, doc_entity):
    """Store each extracted entity and relate it to its source document."""
    for entity in item.entities:
        stored = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )

        # Link entity to document
        await client.long_term.add_relationship(
            from_entity=stored.id,
            to_entity=doc_entity.id,
            relationship_type="MENTIONED_IN",
            properties={
                "confidence": entity.confidence,
                "position": entity.start,
            },
        )


async def _store_relations(item, doc):
    """Resolve relation endpoints by search and persist each relationship."""
    for relation in item.relations:
        # Find or create source/target entities
        source = await client.long_term.search_entities(
            query=relation.source,
            limit=1,
        )
        target = await client.long_term.search_entities(
            query=relation.target,
            limit=1,
        )

        # Only store the edge when both endpoints were found
        if source and target:
            await client.long_term.add_relationship(
                from_entity=source[0].id,
                to_entity=target[0].id,
                relationship_type=relation.type,
                properties={"source_doc": doc["id"]},
            )


async def process_financial_documents(documents: list[dict]):
    """Process financial documents and build context graph.

    Each document dict provides "content" and "id"; "title", "type"
    and "date" are optional. Returns the raw batch-extraction result.
    """
    # Run batch extraction over the raw document texts
    result = await extractor.extract_batch(
        texts=[doc["content"] for doc in documents],
        batch_size=10,
        on_progress=lambda d, t: print(f"Extracting: {d}/{t}"),
    )

    # Persist every successful per-document result into the context graph
    for i, item in enumerate(result.items):
        if not item.success:
            print(f"Failed: {documents[i]['id']} - {item.error}")
            continue

        doc = documents[i]
        doc_entity = await _store_document_node(doc)
        await _link_mentions(item, doc_entity)
        await _store_relations(item, doc)

    return result


# Usage
# Each document needs "id" and "content"; "title", "type" and "date" are optional.
documents = [
    {
        "id": "10K-2024",
        "title": "Annual Report 2024",
        "type": "SEC Filing",
        "date": "2024-03-15",
        "content": "Apple Inc. reported record revenue...",
    },
    {
        "id": "earnings-Q4",
        "title": "Q4 Earnings Call",
        "type": "Transcript",
        "date": "2024-01-25",
        "content": "CEO Tim Cook discussed...",
    },
    # ... more documents
]

# Top-level await: run inside an async function or async REPL
result = await process_financial_documents(documents)

Ecommerce Catalog Import

Example for importing product catalogs:

async def _store_product(product: dict):
    """Create the PRODUCT node for one catalog entry."""
    return await client.long_term.add_entity(
        name=product["name"],
        entity_type="PRODUCT",
        description=product.get("description"),
        properties={
            "sku": product["sku"],
            "price": product.get("price"),
            "rating": product.get("rating"),
        },
        deduplicate=False,  # SKUs are unique
    )


async def _attach_entity(product_entity, name, entity_type, relationship_type):
    """Upsert a related entity and connect the product to it."""
    other = await client.long_term.add_entity(
        name=name,
        entity_type=entity_type,
    )

    await client.long_term.add_relationship(
        from_entity=product_entity.id,
        to_entity=other.id,
        relationship_type=relationship_type,
    )


async def import_product_catalog(products: list[dict]):
    """Import product catalog into context graph."""

    # Build one description blob per product for the extractor
    texts = []
    for product in products:
        text = f"""
        Product: {product['name']}
        Brand: {product.get('brand', 'Unknown')}
        Category: {product.get('category', 'Unknown')}
        Description: {product.get('description', '')}
        """
        texts.append(text)

    # Extract additional entities from descriptions
    extractor = GLiNEREntityExtractor.for_schema("ecommerce")
    result = await extractor.extract_batch(
        texts=texts,
        batch_size=20,
    )

    for i, product in enumerate(products):
        # Store product as primary entity
        product_entity = await _store_product(product)

        # Store brand and category as entities linked to the product
        if product.get("brand"):
            await _attach_entity(product_entity, product["brand"], "BRAND", "MADE_BY")

        if product.get("category"):
            await _attach_entity(product_entity, product["category"], "CATEGORY", "IN_CATEGORY")

        # Link extracted entities to product
        if result.items[i].success:
            for entity in result.items[i].entities:
                if entity.type not in ["PRODUCT", "BRAND"]:  # Avoid duplicates
                    await _attach_entity(product_entity, entity.name, entity.type, "RELATED_TO")

    print(f"Imported {len(products)} products")


# Usage
# Each product needs "sku" and "name"; the remaining fields are optional.
products = [
    {
        "sku": "NKE-AM90-001",
        "name": "Nike Air Max 90",
        "brand": "Nike",
        "category": "Running Shoes",
        "price": 129.99,
        "description": "Classic running shoe with Air cushioning...",
    },
    # ... more products
]

# Top-level await: run inside an async function or async REPL
await import_product_catalog(products)

Performance Optimization

Tune Batch Size

# Smaller batches for memory-constrained environments
# (higher batch_size/max_concurrency trade memory for throughput)
result = await extractor.extract_batch(
    texts=documents,
    batch_size=5,  # Smaller batches
    max_concurrency=2,
)

# Larger batches for high-memory environments with GPU
result = await extractor.extract_batch(
    texts=documents,
    batch_size=50,  # Larger batches
    max_concurrency=10,
)

Use Appropriate Extractor

# These extractors must be imported before use — without this line the
# example raises NameError when copied on its own.
from neo4j_agent_memory.extraction import (
    ExtractionPipeline,
    GLiNEREntityExtractor,
    LLMEntityExtractor,
    SpacyEntityExtractor,
)

# High-volume, lower precision
fast_extractor = SpacyEntityExtractor()  # Fastest

# Balanced
balanced_extractor = GLiNEREntityExtractor.for_schema("ecommerce")

# High precision, slower (stages run in sequence)
accurate_pipeline = ExtractionPipeline(
    stages=[
        GLiNEREntityExtractor.for_schema("ecommerce"),
        LLMEntityExtractor(model="gpt-4o-mini"),
    ],
)

Skip Failures and Continue

result = await extractor.extract_batch(
    texts=documents,
    fail_fast=False,  # Continue on errors (default)
)

# Check for failures; failed items keep their original position in item.index
failed = [item for item in result.items if not item.success]
print(f"Failed: {len(failed)}/{result.total_items}")

for item in failed:
    print(f"  Index {item.index}: {item.error}")

Best Practices

1. Validate Before Processing

# Filter out invalid documents
valid_docs = [
    doc for doc in documents
    if doc.get("content") and len(doc["content"]) > 10  # non-empty, non-trivial content
]

print(f"Processing {len(valid_docs)} of {len(documents)} documents")
result = await extractor.extract_batch(texts=[d["content"] for d in valid_docs])

2. Log Progress and Errors

import logging

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

# Progress callback that logs instead of printing
def on_progress(completed, total):
    logger.info(f"Progress: {completed}/{total}")

result = await extractor.extract_batch(
    texts=documents,
    on_progress=on_progress,
)

# Record every failed document with its index and error
for item in result.items:
    if not item.success:
        logger.error(f"Failed doc {item.index}: {item.error}")

3. Process in Manageable Chunks

# For very large document sets, process in chunks
CHUNK_SIZE = 1000

for i in range(0, len(all_documents), CHUNK_SIZE):
    chunk = all_documents[i:i + CHUNK_SIZE]
    print(f"Processing chunk {i//CHUNK_SIZE + 1}")

    result = await extractor.extract_batch(texts=chunk)

    # Store results before the next chunk so their memory can be reclaimed
    await store_results(result)

4. Use Appropriate Deduplication

# For known-clean data, skip deduplication
await client.long_term.add_entity(
    name=entity.name,
    entity_type=entity.type,
    deduplicate=False,  # Faster for trusted sources
)

# For user-generated content, enable deduplication
# (dedup_config is a DeduplicationConfig with merge/flag thresholds)
await client.long_term.add_entity(
    name=entity.name,
    entity_type=entity.type,
    deduplication=dedup_config,
)