Migrate from Neo4j Streams

The Neo4j Connector for Kafka and the Neo4j Streams plugin differ significantly in architecture.

The following table summarises the differences between the two solutions.

Deployment

  • Neo4j Streams plugin: plugin deployed on Neo4j as a JAR file; manages Kafka connections itself.

  • Neo4j Connector for Kafka: Kafka Connect connector running on Kafka Connect, separately from Neo4j.

Communication

  • Neo4j Streams plugin: in-process client for the underlying Neo4j database.

  • Neo4j Connector for Kafka: establishes a database connection over Bolt using the Neo4j Driver for Java.

Configuration

  • Neo4j Streams plugin: Kafka configuration options in the neo4j.conf file.

  • Neo4j Connector for Kafka: Kafka configuration and converters configured either globally in the Kafka Connect instance or per connector in the connector's configuration.

Sink/Source role

  • Neo4j Streams plugin: supports both sink and source capabilities within a single plugin instance.

  • Neo4j Connector for Kafka: must be configured explicitly as a source or sink connector at launch.
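Because the connector runs inside Kafka Connect rather than inside Neo4j, it is deployed by submitting a connector configuration to the Kafka Connect REST API instead of editing neo4j.conf. The following is a minimal sketch of such a payload; the connector name, class, and connection values are illustrative and should be checked against the configuration reference for your connector version.

```json
{
  "name": "my-neo4j-sink",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "topics": "my-topic",
    "neo4j.uri": "neo4j://localhost:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "<password>"
  }
}
```

A payload like this is typically POSTed to the Kafka Connect REST endpoint (for example, `http://localhost:8083/connectors`).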

Kafka configuration

See the source and sink configuration for a detailed description of the new settings.

Removed settings

The Kafka settings in the neo4j.conf file used by the Neo4j Streams plugin are no longer supported, and must not be included in the new connector configuration.

In general, configuration settings starting with the kafka. prefix are no longer supported in the Neo4j Connector for Kafka.

General plugin settings
streams.procedures.enabled
streams.sink.enabled
streams.source.enabled
Consumer settings
kafka.auto.offset.reset
kafka.enable.auto.commit
kafka.group.id
kafka.max.poll.records
kafka.session.timeout.ms
kafka.topic.discovery.polling.interval
kafka.key.deserializer
kafka.value.deserializer
Producer settings
kafka.acks
kafka.batch.size
kafka.buffer.memory
kafka.linger.ms
kafka.retries
kafka.transactional.id
Security and connection settings
kafka.bootstrap.servers
kafka.connection.timeout.ms
kafka.security.protocol
kafka.ssl.endpoint.identification.algorithm
kafka.ssl.key.password
kafka.ssl.keystore.location
kafka.ssl.keystore.password
kafka.ssl.truststore.location
kafka.ssl.truststore.password
Other settings
kafka.reindex.batch.size
kafka.replication
kafka.streams.log.compaction.strategy
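The broker connection and security settings listed above do not move into the connector configuration; their equivalents are the standard Kafka client settings of the Kafka Connect worker itself. As a sketch (paths and values illustrative), the former kafka.bootstrap.servers and kafka.ssl.* options correspond to worker settings such as:

```properties
# Kafka Connect worker configuration (e.g. connect-distributed.properties), not neo4j.conf
bootstrap.servers=localhost:9092
security.protocol=SSL
ssl.truststore.location=/path/to/truststore.jks
ssl.truststore.password=<password>
```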

Neo4j Streams as a source

The source capability of the Neo4j Streams plugin has been replaced by the Change Data Capture (CDC) source connector.

CDC requires one of the following Neo4j deployments:

  • Neo4j AuraDB Business Critical or Virtual Dedicated Cloud tier

  • The latest Neo4j Enterprise Edition

  • Neo4j 5.26 LTS Enterprise Edition

If you are on a Neo4j or AuraDB version where CDC is not available, you can use the query-based source connector instead.

Required changes

See Source → CDC for details on the new configuration options.

The pattern syntax used for the Streams plugin has been updated to better reflect the Cypher pattern syntax, as in the following example.

Before (Neo4j Streams plugin):

streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}

After (Neo4j Connector for Kafka):

"neo4j.source-strategy": "CDC",
"neo4j.cdc.topic.my-nodes-topic.patterns": "(:Person)",
"neo4j.cdc.topic.my-rels-topic.patterns": "()-[:BELONGS-TO]->()"
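Putting the pieces together, a complete CDC source connector configuration could look like the following sketch. The connector class and connection properties are illustrative assumptions; only the `neo4j.source-strategy` and `neo4j.cdc.topic.*.patterns` entries come from the example above.

```json
{
  "name": "my-neo4j-cdc-source",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "neo4j.uri": "neo4j://localhost:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "<password>",
    "neo4j.source-strategy": "CDC",
    "neo4j.cdc.topic.my-nodes-topic.patterns": "(:Person)",
    "neo4j.cdc.topic.my-rels-topic.patterns": "()-[:BELONGS-TO]->()"
  }
}
```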

Removed settings

streams.source.enabled
streams.source.enabled.from.<database>
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy
streams.source.schema.polling.interval

Neo4j Streams as a sink

The Neo4j Connector for Kafka supports all of the sink strategies available for the Neo4j Streams plugin.

Cypher strategy

Before (Neo4j Streams plugin):

streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>

After (Neo4j Connector for Kafka):

"neo4j.cypher.topic.<YOUR_TOPIC>": "<CYPHER_QUERY>"

Recent versions of the Neo4j Connector for Kafka add more options for mapping incoming message headers, keys, and values, as described in Sink → Cypher.
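As a concrete sketch, a topic carrying order events could be mapped with a Cypher query like the one below. The topic name and query are illustrative, and this assumes the message value is bound as `event` in the query, as it was in the Streams plugin.

```json
"neo4j.cypher.topic.orders": "MERGE (p:Person {id: event.userId}) MERGE (o:Order {id: event.orderId}) MERGE (p)-[:CREATED]->(o)"
```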

CDC strategy

Both SourceId and Schema strategies for CDC events are still supported. While the CDC source connector emits events with a new CDC-compatible schema, sink connectors are still able to read CDC messages generated by the Neo4j Streams plugin.

Example 1. Changes for the SourceId strategy

Before (Neo4j Streams plugin):

streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the ID name given to the CDC ID field, default=sourceId>

After (Neo4j Connector for Kafka):

"neo4j.cdc.source-id.topics": "<comma-separated list of topics>",
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>",
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"

Example 2. Changes for the Schema strategy

Before (Neo4j Streams plugin):

streams.sink.topic.cdc.schema=<list of topics separated by semicolon>

After (Neo4j Connector for Kafka):

"neo4j.cdc.schema.topics": "<comma-separated list of topics>"

Pattern strategy

The pattern syntax from the Neo4j Streams configuration is fully supported in the Neo4j Connector for Kafka. While both simple and Cypher-like patterns are supported, the latter are the preferred way of defining patterns.

Before (Neo4j Streams plugin):

streams.sink.topic.pattern.node.user=(:User{!userId})
streams.sink.topic.pattern.node.product=(:Product{!productId})
streams.sink.topic.pattern.relationship.bought=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})

After (Neo4j Connector for Kafka):

"neo4j.pattern.topic.user": "(:User{!userId})",
"neo4j.pattern.topic.product": "(:Product{!productId})",
"neo4j.pattern.topic.bought": "(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})"

Note that the Neo4j Connector for Kafka makes no distinction between node and relationship patterns when declaring your pattern configuration.

You can also add more topics with different patterns to the same connector. Furthermore, the Neo4j Connector for Kafka introduces additional capabilities such as mapping incoming message properties to a new name, as shown in the Patterns section.

CUD File Format strategy

The CUD File Format strategy is fully supported in the Neo4j Connector for Kafka.

Before (Neo4j Streams plugin):

streams.sink.topic.cud=<list of topics separated by semicolon>

After (Neo4j Connector for Kafka):

"neo4j.cud.topics": "<comma-separated list of topics>"
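A CUD message is a plain JSON document. As a minimal sketch (the helper function, field values, and topic payload are illustrative, not part of any connector API), a node-merge event in the CUD format could be built like this before being produced to one of the configured topics:

```python
import json


def make_node_merge(ids: dict, labels: list, properties: dict) -> str:
    """Build a CUD-format message that merges a node matched on the given ids."""
    message = {
        "type": "node",    # "node" or "relationship"
        "op": "merge",     # create, merge, update, or delete
        "ids": ids,        # keys used to match the node
        "labels": labels,  # labels applied to the node
        "properties": properties,
    }
    return json.dumps(message)


payload = make_node_merge({"userId": 42}, ["User"], {"name": "Ada"})
```

The resulting string is what would be sent as the Kafka message value on a topic listed in `neo4j.cud.topics`.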

Error handling

Kafka Connect has its own error-handling mechanism to process sink messages. Refer to Sink → Error handling for more information on configuring your sink connectors for advanced error handling.
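As a sketch, Kafka Connect's standard dead-letter-queue settings can be added to a sink connector configuration to route failed messages instead of stopping the connector; the topic name below is illustrative.

```properties
errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true
errors.deadletterqueue.topic.name=my-dlq-topic
errors.deadletterqueue.context.headers.enable=true
```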