Quick Start

The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j.

The most recent version of the Kafka Connect Neo4j Connector can be found here.

Neo4j Streams Plugin

Any configuration option that starts with streams. controls how the plugin itself behaves. For a full list of options available, see the documentation subsections on the source and sink.

Install the Plugin

Kafka Settings

Any configuration option that starts with kafka. will be passed to the underlying Kafka driver. Neo4j Streams uses the official Confluent Kafka producer and consumer Java clients, so configuration settings that are valid for those clients will also work for Neo4j Streams.

For example, the configuration setting named batch.size in the Kafka documentation linked below should be specified as kafka.batch.size in Neo4j Streams.
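As a quick illustration of this prefix mapping (the values shown here are simply the Kafka defaults):

neo4j.conf
# Kafka producer setting batch.size, passed through via the kafka. prefix
kafka.batch.size=16384
# Kafka consumer setting max.poll.records, passed through the same way
kafka.max.poll.records=500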

The following are common configuration settings you may wish to use. This is not a complete list. The full list of configuration options and reference material is available from Confluent’s site for sink configurations and source configurations.

Table 1. Most Commonly Needed Configuration Settings

kafka.max.poll.records (default: 500)
The maximum number of records to pull per batch from Kafka. Increasing this number means larger transactions in Neo4j memory and may improve throughput.

kafka.buffer.memory (default: 33554432)
The total bytes of memory the producer can use to buffer records waiting to be sent. Use this to adjust how much memory the plugin may require to hold messages not yet delivered to Neo4j.

kafka.batch.size (default: 16384)
(Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

kafka.max.partition.fetch.bytes (default: 1048576)
(Consumer only) The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.

kafka.group.id (default: N/A)
A unique string that identifies the consumer group this consumer belongs to.
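As a rough sketch of how these settings might be combined for a sink-heavy deployment (the values below are illustrative, not recommendations):

neo4j.conf
# hypothetical consumer group name
kafka.group.id=neo4j-sink-group
# pull larger batches per poll, at the cost of more Neo4j transaction memory
kafka.max.poll.records=1000
# allow larger per-partition fetches (2 MB)
kafka.max.partition.fetch.bytes=2097152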

Configure Kafka Connection

If you are running locally or against a standalone machine, configure neo4j.conf to point to that server:

neo4j.conf
kafka.bootstrap.servers=localhost:9092

If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in the Confluent Cloud section.
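As a minimal sketch of what such a connection can look like, assuming a cluster secured with SASL/PLAIN over TLS (the endpoint and credentials below are placeholders; see the Confluent Cloud section for the authoritative settings):

neo4j.conf
kafka.bootstrap.servers=<CLUSTER-ID>.<REGION>.confluent.cloud:9092
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";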

Decide: Sink, Source, or Both

Configuring neo4j-streams comes in three different parts, depending on your need:

  1. Required: Configuring a connection to Kafka

neo4j.conf
kafka.bootstrap.servers=localhost:9092
  2. Optional: Configuring Neo4j to produce records to Kafka (Source)

  3. Optional: Configuring Neo4j to ingest from Kafka (Sink)

Follow one or both subsections according to your use case:

Sink

Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as:

neo4j.conf
streams.sink.enabled=true
streams.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties

This will process every message that arrives on my-ingest-topic with the given Cypher statement. When that statement executes, the event variable it references is bound to the received message, so this sample Cypher creates a (:Label) node in the graph with the given id, copying all of the properties in the source message.
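For example, given this illustrative message on my-ingest-topic (the field values are hypothetical):

{"id": 42, "properties": {"name": "Alice", "since": 2020}}

the statement above would merge a node matching (:Label {id: 42}) and, on creation, copy name and since onto it as node properties.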

For full details on what you can do here, see the Sink section of the documentation.

Source

Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as:

neo4j.conf
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000

This will produce all graph nodes labeled (:Person) onto the topic my-nodes-topic and all relationships of type -[:BELONGS-TO]-> onto the topic named my-rels-topic. Further, schema changes will be polled every 10,000 ms, which affects how quickly the database picks up new indexes and schema changes. Note that if no value is specified for the streams.source.schema.polling.interval property, the plugin uses 300,000 ms as the default.

The expressions Person{*} and BELONGS-TO{*} are patterns. You can find documentation on how to change these in the Patterns section.
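As a sketch of the include/exclude syntax described there (the property names below are hypothetical):

neo4j.conf
# publish only the name and surname properties of Person nodes
streams.source.topic.nodes.my-nodes-topic=Person{name, surname}
# publish every BELONGS-TO property except internalId
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{-internalId}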

For full details on what you can do here, see the Source section of the documentation.

Restart Neo4j

Once the plugin is installed and configured, restarting the database will make it active. If you have configured Neo4j to consume from Kafka, it will begin processing messages immediately.
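How you restart depends on your installation; for example (assuming a tarball install with NEO4J_HOME set, or a Debian/RPM package managed by systemd):

# tarball / zip installation
$NEO4J_HOME/bin/neo4j restart

# Debian/RPM package installation
sudo systemctl restart neo4j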

When installing the latest version of the Neo4j Streams plugin into Neo4j 4.x, you may see log entries similar to the following:

2020-03-25 20:13:50.606+0000 WARN  Unrecognized setting. No declared setting with name: kafka.max.partition.fetch.bytes
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.include.messages
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.auto.offset.reset
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.max.poll.records
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.enable
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.source.enabled
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.topic.cypher.boa.to.kafkaTest
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.tolerance
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.context.headers.enable
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.context.header.prefix
2020-03-25 20:13:50.610+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.topic.name
2020-03-25 20:13:50.610+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled.to.kafkaTest

These are not errors. They come from the new Neo4j 4 configuration system, which warns that it does not recognize those properties. Despite these warnings, the plugin will work properly.