# Migrate from Neo4j Streams

The Neo4j Connector for Kafka and the Neo4j Streams plugin differ significantly in architecture. The following table summarises the differences between the two solutions.
| | Neo4j Streams plugin | Neo4j Connector for Kafka |
|---|---|---|
| Deployment | Plugin deployed on Neo4j as a JAR file; manages Kafka connections itself | Kafka Connect connector running on Kafka Connect, separately from Neo4j |
| Communication | In-process client for the underlying Neo4j database | Establishes a database connection over Bolt using the Neo4j Driver for Java |
| Configuration | Kafka configuration options in the `neo4j.conf` file | Kafka configuration and converters configured either globally in the Kafka Connect instance or per connector in the connector's configuration |
| Sink/Source role | Supports both sink and source capabilities within a single plugin instance | Must be configured explicitly as a source or sink connector at launch |
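As a consequence of the deployment difference, configuration moves out of `neo4j.conf` and into a connector definition submitted to Kafka Connect. The following is a minimal sketch of a sink connector definition; the connector name, topic, credentials, and Cypher query are placeholders, and the connector class and property names should be verified against your connector release:

```json
{
  "name": "my-neo4j-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "my-topic",
    "neo4j.uri": "neo4j://neo4j-host:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.my-topic": "MERGE (p:Person {id: event.id})"
  }
}
```

Unlike the plugin's `neo4j.conf` settings, this definition is posted to the Kafka Connect REST API (or supplied via Confluent tooling) and can be updated without restarting Neo4j.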
## Kafka configuration

### Removed settings
The Kafka settings in the `neo4j.conf` file used by the Neo4j Streams plugin are no longer supported and must not be included in the new connector configuration. In general, this applies to all configuration settings starting with the `streams.` or `kafka.` prefixes, including:
- `streams.procedures.enabled`
- `streams.sink.enabled`
- `streams.source.enabled`
- `kafka.auto.offset.reset`
- `kafka.enable.auto.commit`
- `kafka.group.id`
- `kafka.max.poll.records`
- `kafka.session.timeout.ms`
- `kafka.topic.discovery.polling.interval`
- `kafka.key.deserializer`
- `kafka.value.deserializer`
- `kafka.acks`
- `kafka.batch.size`
- `kafka.buffer.memory`
- `kafka.linger.ms`
- `kafka.retries`
- `kafka.transactional.id`
- `kafka.bootstrap.servers`
- `kafka.connection.timeout.ms`
- `kafka.security.protocol`
- `kafka.ssl.endpoint.identification.algorithm`
- `kafka.ssl.key.password`
- `kafka.ssl.keystore.location`
- `kafka.ssl.keystore.password`
- `kafka.ssl.truststore.location`
- `kafka.ssl.truststore.password`
- `kafka.reindex.batch.size`
- `kafka.replication`
- `kafka.streams.log.compaction.strategy`
## Neo4j Streams as a source
The source capability of the Neo4j Streams plugin has been replaced by the Change Data Capture (CDC) source connector.
CDC is supported only on Neo4j deployments where Change Data Capture is available. If you are on a Neo4j or AuraDB version where CDC is not available, you can use the query-based source connector instead.
### Required changes
See Source → CDC for details on the new configuration options.
The pattern syntax used for the Streams plugin has been updated to better reflect the Cypher pattern syntax, as in the following example:
Before (Neo4j Streams plugin):

```properties
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
```

After (Neo4j Connector for Kafka):

```json
"neo4j.source-strategy": "CDC",
"neo4j.cdc.topic.my-nodes-topic.patterns": "(:Person)",
"neo4j.cdc.topic.my-rels-topic.patterns": "()-[:BELONGS-TO]->()"
```
## Neo4j Streams as a sink
The Neo4j Connector for Kafka supports all of the sink strategies available for the Neo4j Streams plugin.
### Cypher strategy

Before (Neo4j Streams plugin):

```properties
streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
```

After (Neo4j Connector for Kafka):

```json
"neo4j.cypher.topic.<YOUR_TOPIC>": "<CYPHER_QUERY>"
```
Recent versions of the Neo4j Connector for Kafka add more options for mapping incoming message headers, keys, and values, as described in Sink → Cypher.
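As an illustration, a Cypher topic mapping might look like the following sketch. The topic name and query are hypothetical, and it assumes the incoming message value is bound as `event`, as it was in the Streams plugin; check Sink → Cypher for the exact binding options in your connector version:

```json
"neo4j.cypher.topic.orders": "MERGE (o:Order {id: event.orderId}) SET o.total = event.total"
```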
### CDC strategy

Both SourceId and Schema strategies for CDC events are still supported. While the source CDC connector emits CDC events with a new CDC-compatible schema, sink connectors can still read CDC messages generated by the Neo4j Streams plugin.

Before (Neo4j Streams plugin, SourceId strategy):

```properties
streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the ID name given to the CDC ID field, default=sourceId>
```

After (Neo4j Connector for Kafka, SourceId strategy):

```json
"neo4j.cdc.source-id.topics": "<comma-separated list of topics>",
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>",
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"
```

Before (Neo4j Streams plugin, Schema strategy):

```properties
streams.sink.topic.cdc.schema=<list of topics separated by semicolon>
```

After (Neo4j Connector for Kafka, Schema strategy):

```json
"neo4j.cdc.schema.topics": "<comma-separated list of topics>"
```
### Pattern strategy

The pattern syntax from the Neo4j Streams configuration is fully supported in the Neo4j Connector for Kafka. While both simple and Cypher-like patterns are supported, the latter are the preferred way of defining patterns.

Before (Neo4j Streams plugin):

```properties
streams.sink.topic.pattern.node.user=(:User{!userId})
streams.sink.topic.pattern.node.product=(:Product{!productId})
streams.sink.topic.pattern.relationship.bought=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})
```

After (Neo4j Connector for Kafka):

```json
"neo4j.pattern.topic.user": "(:User{!userId})",
"neo4j.pattern.topic.product": "(:Product{!productId})",
"neo4j.pattern.topic.bought": "(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})"
```

Note that the Neo4j Connector for Kafka makes no distinction between node and relationship patterns when declaring your pattern configuration.
You can also add more topics with different patterns to the same connector. Furthermore, the Neo4j Connector for Kafka introduces additional capabilities such as mapping incoming message properties to a new name, as shown in the Patterns section.
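A single connector serving several pattern topics could be sketched as follows, reusing the patterns shown above; connection settings are placeholders and the connector class name should be verified against your connector release:

```json
{
  "name": "my-neo4j-pattern-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "user,product,bought",
    "neo4j.uri": "neo4j://neo4j-host:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!userId})",
    "neo4j.pattern.topic.product": "(:Product{!productId})",
    "neo4j.pattern.topic.bought": "(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})"
  }
}
```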
### CUD File Format strategy

The CUD File Format strategy is fully supported in the Neo4j Connector for Kafka.

Before (Neo4j Streams plugin):

```properties
streams.sink.topic.cud=<list of topics separated by semicolon>
```

After (Neo4j Connector for Kafka):

```json
"neo4j.cud.topics": "<comma-separated list of topics>"
```
## Error handling
Kafka Connect has its own error-handling mechanism to process sink messages. Refer to Sink → Error handling for more information on configuring your sink connectors for advanced error handling.
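For example, Kafka Connect's standard error-handling properties can be added to a sink connector's configuration to tolerate bad messages and route them to a dead-letter queue; the topic name here is a placeholder:

```json
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.deadletterqueue.topic.name": "my-dlq",
"errors.deadletterqueue.context.headers.enable": true
```

These settings are handled by the Kafka Connect framework itself rather than the connector, which is one of the architectural benefits of moving off the in-process plugin.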