Entity Extraction: Batch & Streaming

How to process multiple documents in parallel and handle very long documents with streaming extraction.

Batch Extraction

Process multiple documents efficiently with parallel extraction:

documents = [
    {"id": "doc-1", "text": "Customer John ordered iPhone 15..."},
    {"id": "doc-2", "text": "Jane returned the Nike shoes..."},
    {"id": "doc-3", "text": "Order #12345 shipped via FedEx..."},
    # ... hundreds more
]

texts = [doc["text"] for doc in documents]

result = await extractor.extract_batch(
    texts=texts,
    batch_size=10,         # Documents per batch
    max_concurrency=5,     # Parallel workers
    on_progress=lambda done, total: print(f"Progress: {done}/{total}"),
)

print(f"Processed: {result.successful_items}/{result.total_items}")
print(f"Total entities extracted: {result.total_entities}")
print(f"Total relations extracted: {result.total_relations}")

# Check for failures
for index, error_msg in result.get_errors():
    print(f"  doc {index} failed: {error_msg}")

Tuning batch size and concurrency

  • GLiNER on GPU: batch_size=32, max_concurrency=1 — let the GPU saturate with larger batches rather than many small concurrent runs

  • GLiNER on CPU: batch_size=10, max_concurrency=4 — smaller batches, more parallelism

  • LLM via API: batch_size=1, max_concurrency=10 — one doc per worker, limited by API rate limits

High-Volume Ecommerce (Speed Priority)

extractor = GLiNEREntityExtractor.for_schema("ecommerce")

result = await extractor.extract_batch(
    texts=product_descriptions,
    batch_size=50,         # Larger batches for GPU throughput
    max_concurrency=10,
)

Financial Compliance (Accuracy Priority)

pipeline = ExtractionPipeline(
    stages=[
        GLiNEREntityExtractor.for_schema("financial"),
        LLMEntityExtractor(
            model="gpt-4o",
            extract_relations=True,
            temperature=0,           # Deterministic output
        ),
    ],
    merge_strategy="union",          # Keep all extractions
)

result = await pipeline.extract_batch(compliance_documents)

# Flag low-confidence entities for human review
for item in result.results:
    for entity in item.result.entities:
        if entity.confidence < 0.8:
            await flag_for_review(entity, document_id=item.index)

Streaming Extraction

For documents longer than ~10,000 tokens, use streaming extraction to process in chunks without loading the full document into memory:

from neo4j_agent_memory.extraction import StreamingExtractor

streamer = StreamingExtractor(
    extractor=extractor,
    chunk_size=4000,    # Characters per chunk
    overlap=200,        # Overlap to avoid splitting entities at boundaries
)

long_document = open("annual_report.txt").read()  # 100K+ characters

# Stream results as each chunk completes
async for chunk_result in streamer.extract_streaming(long_document):
    print(f"Chunk {chunk_result.chunk.index}: {chunk_result.entity_count} entities")

    for entity in chunk_result.entities:
        stored, _ = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )

Get Complete Deduplicated Result

# Collect all chunks and deduplicate across chunk boundaries
result = await streamer.extract(long_document, deduplicate=True)

print(f"Chunks processed: {result.stats.total_chunks}")
print(f"Raw entities: {result.stats.total_entities}")
print(f"After deduplication: {result.stats.deduplicated_entities}")

extraction_result = result.to_extraction_result(source_text=long_document)

Token-Based Chunking

Use token-count chunking for more predictable LLM context window usage:

streamer = StreamingExtractor(
    extractor=extractor,
    chunk_size=1000,        # Approximate tokens per chunk
    overlap=50,             # Token overlap
    chunk_by_tokens=True,   # Token-based instead of character-based
)

When to use streaming

Use streaming when your document exceeds ~8,000 characters (~2,000 tokens). Below that threshold, the chunking overhead isn’t worth it. For batch processing many large documents, combine both: use extract_batch() for parallelism and StreamingExtractor per document for size handling.

Track Extraction Quality

metrics = {
    "document_id": doc_id,
    "text_length": len(text),
    "entities_extracted": len(result.entities),
    "relations_extracted": len(result.relations),
    "avg_confidence": sum(e.confidence for e in result.entities) / len(result.entities),
    "extraction_time_ms": result.extraction_time_ms,
    "extractor": extractor.__class__.__name__,
}

await log_metrics(metrics)