Neo4j Streams FAQ

Source Code License

The source code to Neo4j Streams is available under the terms of the Apache License, version 2.0. See the LICENSE file in the source code repository for full terms and conditions.

How to integrate Neo4j and Kafka

When integrating Neo4j and Kafka using Neo4j Streams plugin or Kafka Connect plugin is important configure just one of them and not both. If you need to load data from Kafka to Neo4j (and not viceversa) you can just use the Kafka Connect plugin. If you need to have both sink and source functionalities then you have to use the Neo4j Streams plugin. This is the main difference between those plugins.

About CUD file format

The CUD file format is JSON file that represents Graph Entities (Nodes/Relationships) and how to manage them in term of Create/Update/Delete operations. So every JSON event represents a single operation. For more details about how to use these, please checkout the CUD File Format section for the Neo4j Streams plugin, and the CUD File Format section for Kafka Connect plugin.

How to ingest events using CDC Schema strategy

Change Data Capture method allows to ingest events between different Neo4j instances. If you decide to use the Neo4j Streams plugin, then the Neo4j source instance will be configured as follow:

streams.sink.enabled=false
streams.source.enabled=true
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>

and the Neo4j sink instance will be configured as follow:

streams.source.enabled=false
streams.sink.topic.cdc.schema=<topic-name>
streams.sink.enabled=true

If you decide to use Kafka Connect plugin for the sink instance, then it has to be configured as follow:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "<list_of_topics_separated_by_comma>",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://<neo4j-sink-hostname>:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cdc.schema": "<list_of_topics_separated_by_semicolon>"
  }
}

Is Neo4j Streams supported by Confluent Cloud?

If the need is to run the connector as a managed service then the answer is no. Users who are interested in running Neo4j-Streams as a Cloud managed connector by Confluent should request this of Confluent. Right now there are only a few connectors such as that for S3 that can be run as managed services. Click here to learn more.

However, it is supported in the sense that Neo4j Streams can be connected to a Confluent Cloud instance, and the basic configuration is explained in the Confluent Cloud section. Other references to how to configure it to connect to the Confluent Cloud can be found at the following links:

Kafka output events description

If you configure the Neo4j Streams plugin as Sink, using a Cypher query in order to ingest data from Kafka into Neo4j, watching the Kafka console consumer output you will see JSON events which describes nodes and relationships creation. They looks like as following:

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}

A JSON event must describe only one Neo4j entity (node/relationship) at a time.

The specified query is considered unique, so all the entities involved belongs to the same transaction:

  • txId identifies the transaction that affected the entity

  • txEventId is a counter that identifies the internal order in which Neo4j handled the specific event

How to configure Kafka over SSL?

You will find a guide here on how to configure Neo4j Streams plugin to work with Kafka over SSL. Under the covers, Neo4j Streams plugin uses the official Java libraries for Kafka, so you would configure this in the same way for the Java client.

If you would like to use also Kerberos the following tutorial should be useful: https://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

Moreover checkout also the Confluent Kafka official documentation for further details on this topic. Here are some helpful links:

Enabling DLQ functionality

In order to enable the DLQ functionality you have to specify the following properties:

streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

These properties has to be specified because their value is false by default. By specifying them you will be able to log errors and bad messages. Moreover you should need to declare also the following property, if left-off no DLQ:

streams.sink.errors.deadletterqueue.topic.name=<topic_name>

For further details try to look at the following section: How deal with bad data

Supported Kafka deserializers

The Neo4j Streams support two tpye of deserializers:

  • org.apache.kafka.common.serialization.ByteArrayDeserializer, if you want manage JSON messages

  • io.confluent.kafka.serializers.KafkaAvroDeserializer, if you want manage AVRO messages

If AVRO then a schema registry configuration is also needed:

kafka.schema.registry.url=*.*.*.*:8081

Where 8081 is the default port for the Confluent Schema Registry.

Kafka cluster and topic with multiple partition setup

If the environment is a Kafka cluster composed by:

  • multiple Zookeepers servers

  • multiple Kafka brokers

  • topics with multiple partitions

  • a Neo4j instance configured as Sink

is important to setup Zookeeper servers correctly. This means that the number of Zookeeper instances has to be 2n+1 where n is any number greater then 0. This because the odd number of servers allows ZooKeeper to perform majority elections for leadership.

So, if the cluster is not setup properly, what could happens is that events produced in some partitions may not be read.

Please see the following link for further details: