Entity Extraction: Batch & Streaming
How to process multiple documents in parallel and handle very long documents with streaming extraction.
Batch Extraction
Process multiple documents efficiently with parallel extraction:
# `extractor` is any configured extractor instance, e.g.
# GLiNEREntityExtractor.for_schema("ecommerce") as shown below.
documents = [
    {"id": "doc-1", "text": "Customer John ordered iPhone 15..."},
    {"id": "doc-2", "text": "Jane returned the Nike shoes..."},
    {"id": "doc-3", "text": "Order #12345 shipped via FedEx..."},
    # ... hundreds more
]
texts = [doc["text"] for doc in documents]

result = await extractor.extract_batch(
    texts=texts,
    batch_size=10,       # Documents per batch
    max_concurrency=5,   # Parallel workers
    on_progress=lambda done, total: print(f"Progress: {done}/{total}"),
)
print(f"Processed: {result.successful_items}/{result.total_items}")
print(f"Total entities extracted: {result.total_entities}")
print(f"Total relations extracted: {result.total_relations}")
# Check for failures
for index, error_msg in result.get_errors():
    print(f"  doc {index} failed: {error_msg}")
Tuning batch size and concurrency
batch_size controls how many documents are grouped into each extraction call, while max_concurrency controls how many of those batches run in parallel, so up to batch_size × max_concurrency documents are processed at the same time. The examples below show settings tuned for throughput versus accuracy.
High-Volume Ecommerce (Speed Priority)
extractor = GLiNEREntityExtractor.for_schema("ecommerce")
result = await extractor.extract_batch(
    texts=product_descriptions,
    batch_size=50,       # Larger batches for GPU throughput
    max_concurrency=10,
)
Financial Compliance (Accuracy Priority)
pipeline = ExtractionPipeline(
    stages=[
        GLiNEREntityExtractor.for_schema("financial"),
        LLMEntityExtractor(
            model="gpt-4o",
            extract_relations=True,
            temperature=0,  # Deterministic output
        ),
    ],
    merge_strategy="union",  # Keep all extractions
)
result = await pipeline.extract_batch(compliance_documents)
# Flag low-confidence entities for human review
for item in result.results:
    for entity in item.result.entities:
        if entity.confidence < 0.8:
            await flag_for_review(entity, document_id=item.index)
Streaming Extraction
For very long documents, use streaming extraction to process the text in overlapping chunks and receive results as each chunk completes, instead of sending the entire document to the extractor in one call (see the callout below for a concrete size threshold):
from neo4j_agent_memory.extraction import StreamingExtractor
streamer = StreamingExtractor(
    extractor=extractor,
    chunk_size=4000,  # Characters per chunk
    overlap=200,      # Overlap to avoid splitting entities at chunk boundaries
)
# Read the long document (100K+ characters)
with open("annual_report.txt") as f:
    long_document = f.read()

# Stream results as each chunk completes; `client` is assumed to be an
# initialized memory client from earlier setup.
async for chunk_result in streamer.extract_streaming(long_document):
    print(f"Chunk {chunk_result.chunk.index}: {chunk_result.entity_count} entities")
    for entity in chunk_result.entities:
        stored, _ = await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )
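Because consecutive chunks overlap, the same entity can appear in more than one chunk_result, so storing entities as they stream in can create duplicate writes. A minimal guard, sketched below using only the fields already shown above, is to track a set of normalized (name, type) pairs; the library's own cross-chunk deduplication is shown in the next example.
# Sketch: skip duplicate writes caused by chunk overlap while streaming.
seen = set()
async for chunk_result in streamer.extract_streaming(long_document):
    for entity in chunk_result.entities:
        key = (entity.name.strip().lower(), entity.type)
        if key in seen:
            continue  # already stored from an overlapping chunk
        seen.add(key)
        await client.long_term.add_entity(
            name=entity.name,
            entity_type=entity.type,
        )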
Get Complete Deduplicated Result
# Collect all chunks and deduplicate across chunk boundaries
result = await streamer.extract(long_document, deduplicate=True)
print(f"Chunks processed: {result.stats.total_chunks}")
print(f"Raw entities: {result.stats.total_entities}")
print(f"After deduplication: {result.stats.deduplicated_entities}")
extraction_result = result.to_extraction_result(source_text=long_document)
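The deduplicated entities can then be persisted the same way as in the streaming loop above; this sketch assumes extraction_result exposes the same entities list (with name and type) used elsewhere in this guide.
# Sketch: store the deduplicated entities with the memory client.
for entity in extraction_result.entities:
    await client.long_term.add_entity(
        name=entity.name,
        entity_type=entity.type,
    )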
Token-Based Chunking
Use token-count chunking for more predictable LLM context window usage:
streamer = StreamingExtractor(
    extractor=extractor,
    chunk_size=1000,        # Approximate tokens per chunk
    overlap=50,             # Token overlap
    chunk_by_tokens=True,   # Token-based instead of character-based chunking
)
When to use streaming
Use streaming when your document exceeds ~8,000 characters (~2,000 tokens). Below that threshold, the chunking overhead isn’t worth it. For batch processing many large documents, combine both approaches, as sketched below.
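One way to combine them, assuming the extractor and streamer configured earlier in this guide and the ~8,000-character threshold from the callout, is to route documents by length: batch the short ones and stream the long ones.
# Sketch: batch short documents, stream long ones.
STREAMING_THRESHOLD = 8_000  # characters, per the guidance above

short_docs = [d for d in documents if len(d["text"]) < STREAMING_THRESHOLD]
long_docs = [d for d in documents if len(d["text"]) >= STREAMING_THRESHOLD]

batch_result = await extractor.extract_batch(
    texts=[d["text"] for d in short_docs],
    batch_size=10,
    max_concurrency=5,
)

streaming_results = {
    d["id"]: await streamer.extract(d["text"], deduplicate=True)
    for d in long_docs
}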
Track Extraction Quality
Log per-document metrics so you can compare extractors and spot low-confidence or low-yield documents over time:
# Per-document extraction metrics (guard against documents with no entities)
metrics = {
    "document_id": doc_id,
    "text_length": len(text),
    "entities_extracted": len(result.entities),
    "relations_extracted": len(result.relations),
    "avg_confidence": (
        sum(e.confidence for e in result.entities) / len(result.entities)
        if result.entities
        else None
    ),
    "extraction_time_ms": result.extraction_time_ms,
    "extractor": extractor.__class__.__name__,
}
await log_metrics(metrics)
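To track quality across a whole batch run, the same metrics can be emitted per item of the batch result. The sketch below reuses only fields already shown in this guide (result.results, item.index, item.result, the documents list) and is not a built-in reporting feature.
# Sketch: log quality metrics for every document in a batch result.
for item in result.results:
    entities = item.result.entities
    await log_metrics({
        "document_id": documents[item.index]["id"],
        "entities_extracted": len(entities),
        "relations_extracted": len(item.result.relations),
        "avg_confidence": (
            sum(e.confidence for e in entities) / len(entities) if entities else None
        ),
        "extractor": extractor.__class__.__name__,
    })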