Quick Start

Kafka Connect Neo4j Connector

Install the Connector

Download and install the plugin via the Confluent Hub client. See the chapter Kafka Connect Neo4j Connector for more details.
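
For reference, a typical Confluent Hub installation command looks like the following; take the exact component coordinates and version from the Confluent Hub listing:

confluent-hub install neo4j/kafka-connect-neo4j:<version>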

Run with Docker

Inside the directory /neo4j-kafka-connect-neo4j-<version>/doc/docker you’ll find a compose file that allows you to start the whole testing environment.

docker-compose.yml
---
version: '2'
services:
  neo4j:
    image: neo4j:4.0.3-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
    - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround: if this is changed to a custom name, the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
    - zookeeper
    - broker
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    ports:
    - "8083:8083"
    volumes:
    - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    - connect
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

From a terminal, go into that folder and run the following command:

docker-compose up -d

When the process completes, you will have all of the following modules up and running:

  • Neo4j

  • Zookeeper

  • Kafka Broker

  • Schema Registry

  • Kafka Connect

  • Kafka Control Center
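
Before moving on, you can check that all containers are up; for example:

docker-compose ps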

Now you can access your Neo4j instance at http://localhost:7474. Log in with neo4j as username and connect as password (see the docker-compose file to change them).

Configure SINK instance

On the Kafka Connect side only one thing is missing: creating the SINK instance. To do so, issue the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.sink.avro.neo4j.json

In this case, we are configuring the SINK instance to consume and deliver data in AVRO format. Now you can access your Confluent Control Center instance at http://localhost:9021/clusters and check the my-topic topic created as specified in contrib.sink.avro.neo4j.json.

contrib.sink.avro.neo4j.json
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'AVRO'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}

The property neo4j.topic.cypher.my-topic defines the Cypher query that the SINK executes on the Neo4j side for each message consumed from the my-topic topic.
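
To verify the instance end to end, you can check the connector state via the standard Kafka Connect REST API, publish a test record, and inspect the graph. The producer invocation below is a sketch: it assumes the kafka-avro-console-producer tool is available inside the schema_registry container, and it uses a minimal AVRO schema matching the fields referenced by the Cypher template:

# check that the connector is RUNNING
curl http://localhost:8083/connectors/Neo4jSinkConnector/status

# publish one test record to my-topic (run inside the schema_registry
# container so that broker:9093 and schema_registry:8081 resolve)
echo '{"name": "John", "surname": "Doe"}' | docker exec -i schema_registry \
  kafka-avro-console-producer --broker-list broker:9093 --topic my-topic \
  --property schema.registry.url=http://schema_registry:8081 \
  --property value.schema='{"type":"record","name":"person","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"}]}'

# verify that the SINK executed the Cypher template
docker exec neo4j cypher-shell -u neo4j -p connect \
  "MATCH (p:Person)-[:BELONGS_TO]->(f:Family) RETURN p.name, p.surname, f.name"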

Configure SOURCE instance

On the Kafka Connect side only one thing is missing: creating the SOURCE instance. To do so, issue the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.source.avro.neo4j.json

This will create a Kafka Connect SOURCE instance that sends AVRO messages to the topic named my-topic.

contrib.source.avro.neo4j.json
{
  "name": "Neo4jSourceConnector",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
  }
}

The topic property defines where each message will be pushed; the message structure is given by the RETURN clause of the Cypher statement defined in the neo4j.source.query property. So, given the JSON above, the structure of each message will be the following:

{"name": <name>, "timestamp": <timestamp>}

Neo4j Streams Plugin

The Neo4j Streams Plugin running inside the Neo4j database is deprecated and will not be supported after version 4.3 of Neo4j. We recommend that users do not adopt this plugin for new implementations, and that they consider migrating to the Kafka Connect Neo4j Connector as a replacement.

Any configuration option that starts with streams. controls how the plugin itself behaves. For a full list of options available, see the documentation subsections on the source and sink.

Install the Plugin

Kafka settings

Any configuration option that starts with kafka. will be passed to the underlying Kafka driver. Neo4j Streams uses the official Confluent Kafka producer and consumer Java clients. Configuration settings which are valid for those clients will also work for Neo4j Streams.

For example, in the Kafka documentation linked below, the configuration setting named batch.size should be stated as kafka.batch.size in Neo4j Streams.
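
For instance, a neo4j.conf fragment tuning a few of the settings from the table below might look like this (the values shown are illustrative, not recommendations):

neo4j.conf
kafka.bootstrap.servers=localhost:9092
kafka.max.poll.records=1000
kafka.batch.size=16384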

The following are common configuration settings you may wish to use. This is not a complete list. The full list of configuration options and reference material is available from Confluent’s site for sink configurations and source configurations.

Table 1. Most Commonly Needed Configuration Settings

kafka.max.poll.records (default: 500)
The maximum number of records to pull per batch from Kafka. Increasing this number will mean larger transactions in Neo4j memory and may improve throughput.

kafka.buffer.memory (default: 33554432)
The total bytes of memory the producer can use to buffer records waiting to be sent. Use this to adjust how much memory the plugin may require to hold messages not yet delivered to Neo4j.

kafka.batch.size (default: 16384)
(Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

kafka.max.partition.fetch.bytes (default: 1048576)
(Consumer only) The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.

kafka.group.id (default: N/A)
A unique string that identifies the consumer group this consumer belongs to.

Configure Kafka Connection

If you are running locally or against a standalone machine, configure neo4j.conf to point to that server:

neo4j.conf
kafka.bootstrap.servers=localhost:9092

If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in the Confluent Cloud section.
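
As a rough sketch, a Confluent Cloud connection passes the usual Kafka client security settings through the same kafka. prefix; replace the placeholders with your own values and see the Confluent Cloud section for the authoritative list:

neo4j.conf
kafka.bootstrap.servers=<BOOTSTRAP_SERVER_URL>
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";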

Decide: Sink, Source, or Both

Configuring neo4j-streams comes in three different parts, depending on your need:

  1. Required: Configuring a connection to Kafka

neo4j.conf
kafka.bootstrap.servers=localhost:9092

  2. Optional: Configuring Neo4j to produce records to Kafka (Source)

  3. Optional: Configuring Neo4j to ingest from Kafka (Sink)

Follow one or both subsections according to your use case and need:

Sink

Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as:

neo4j.conf
streams.sink.enabled=true
streams.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties

This will process every message that comes in on my-ingest-topic with the given Cypher statement. When that Cypher statement executes, the event variable it references will be bound to the message received, so this sample Cypher will create a (:Label) node in the graph with the given ID, copying all of the properties in the source message.
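
For example, a message like the following on my-ingest-topic (a hypothetical payload, not a required schema) would merge a node on id and, on creation, copy the nested properties map onto it:

{"id": 42, "properties": {"name": "Alice", "since": 2020}}

Here event.id resolves to 42 and event.properties to the nested map, so the statement yields a (:Label {id: 42, name: 'Alice', since: 2020}) node.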

For full details on what you can do here, see the Sink section of the documentation.

Source

Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as:

neo4j.conf
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000

This will produce all graph nodes labeled (:Person) on to the topic my-nodes-topic and all relationships of type -[:BELONGS-TO]-> to the topic named my-rels-topic. Further, schema changes will be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes. Please note that if no value is specified for the streams.source.schema.polling.interval property, the Streams plugin will use 300,000 ms as the default.
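
With this configuration in place, creating a matching node is enough to see an event published; for example via cypher-shell (adjust user and password to your setup):

cypher-shell -u neo4j -p <password> "CREATE (:Person {name: 'Alice'})"

The change event for this node is then published to my-nodes-topic; the exact event envelope is described in the Source section of the documentation.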

The expressions Person{*} and BELONGS-TO{*} are patterns. You can find documentation on how to change these in the Patterns section.

For full details on what you can do here, see the Source section of the documentation.

Restart Neo4j

Once the plugin is installed and configured, restarting the database will make it active. If you have configured Neo4j to consume from Kafka, it will begin processing messages immediately.
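
How you restart depends on how Neo4j is installed; for example:

sudo systemctl restart neo4j     # package (deb/rpm) installs
<NEO4J_HOME>/bin/neo4j restart   # tarball installs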

When installing the latest version of the Neo4j Streams plugin into Neo4j 4.x and watching the logs, you may find entries similar to the following:

2020-03-25 20:13:50.606+0000 WARN  Unrecognized setting. No declared setting with name: kafka.max.partition.fetch.bytes
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.include.messages
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.auto.offset.reset
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.bootstrap.servers
2020-03-25 20:13:50.608+0000 WARN  Unrecognized setting. No declared setting with name: kafka.max.poll.records
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.log.enable
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.source.enabled
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.topic.cypher.boa.to.kafkaTest
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.tolerance
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: kafka.group.id
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.context.headers.enable
2020-03-25 20:13:50.609+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.context.header.prefix
2020-03-25 20:13:50.610+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.errors.deadletterqueue.topic.name
2020-03-25 20:13:50.610+0000 WARN  Unrecognized setting. No declared setting with name: streams.sink.enabled.to.kafkaTest

These are not errors. They come from the new Neo4j 4 configuration system, which warns that it does not recognize those properties. Despite these warnings, the plugin will work properly.