Neo4j Streaming Data Integrations User Guide v3.5


License: Creative Commons 4.0

This is the user guide for Neo4j Streams version 3.5, authored by the Neo4j Labs Team.

The guide covers the following areas:

Chapter 1. Project Overview

This chapter provides an introduction to the Neo4j Streams Library and Kafka Connect plugin.

Many users and customers want to integrate Kafka and other streaming solutions with Neo4j, either to ingest data into the graph from other sources, or to send update events (change data capture - CDC) to the event log for later consumption.

This extension was developed to satisfy all these use-cases and more to come.

Neo4j Streams can run in two modes:

  • as a Neo4j plugin:

    • Neo4j Streams Source: a transaction event handler that sends change data to a Kafka topic
    • Neo4j Streams Sink: a Neo4j application that ingests data from Kafka topics into Neo4j via templated Cypher statements
    • Neo4j Streams Procedures: two procedures, streams.publish, which allows custom message streaming from Neo4j to the configured environment, and streams.consume, which consumes messages from a given topic.
  • as a Kafka Connect Plugin: a plugin for the Confluent Platform that ingests data from Kafka topics into Neo4j via Cypher queries. At the moment it offers only the Sink functionality.

Experienced Neo4j users will likely prefer running the software as a Neo4j Plugin. Kafka administrators may prefer using the Kafka Connect method.

Do not configure both the Neo4j plugin and the Kafka Connect worker at the same time: using them simultaneously will generate errors, so choose one or the other.

1.1. Neo4j Streams Plugin

As a Neo4j plugin, neo4j-streams runs inside of the database, and can both consume and produce messages to Kafka.

1.2. Kafka Connect Plugin

As a Kafka Connect plugin, neo4j-streams is deployed separately from the Neo4j database. At this time, the connect worker can be used to push data to Neo4j (Neo4j as the consumer) but does not yet support change data capture (CDC) coming from Neo4j.

Chapter 2. Quick Start

Get started quickly for common scenarios, using either the Neo4j Streams plugin or the Kafka Connect plugin.

2.1. Neo4j Streams Plugin

Any configuration option that starts with streams. controls how the plugin itself behaves. For a full list of options available, see the documentation subsections on the source and sink.

2.1.1. Install the Plugin

2.1.2. Kafka settings

Any configuration option that starts with kafka. will be passed to the underlying Kafka driver. Neo4j Streams uses the official Confluent Kafka producer and consumer Java clients. Configuration settings which are valid for those clients will also work for Neo4j Streams.

For example, in the Kafka documentation linked below, the configuration setting named batch.size should be stated as kafka.batch.size in Neo4j Streams.

The following are common configuration settings you may wish to use. This is not a complete list. The full list of configuration options and reference material is available from Confluent’s site for sink configurations and source configurations.

Table 2.1. Most Common Needed Configuration Settings
Setting Name Description Default Value

kafka.max.poll.records

The maximum number of records to pull per batch from Kafka. Increasing this number will mean larger transactions in Neo4j memory and may improve throughput.

500

kafka.buffer.memory

The total bytes of memory the producer can use to buffer records waiting to be sent. Use this to adjust how much memory the plugin may require to hold messages not yet delivered to Kafka.

33554432

kafka.batch.size

(Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes.

16384

kafka.max.partition.fetch.bytes

(Consumer only) The maximum amount of data per-partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress.

1048576

kafka.group.id

A unique string that identifies the consumer group this consumer belongs to.

N/A
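As an illustration only, these settings are written in neo4j.conf with the kafka. prefix; the values below simply echo the defaults from the table above and are not tuning recommendations.

neo4j.conf. 

# example of passing Kafka client settings through the kafka. prefix
kafka.max.poll.records=500
kafka.buffer.memory=33554432
kafka.batch.size=16384
kafka.max.partition.fetch.bytes=1048576
kafka.group.id=neo4j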

2.1.3. Configure Kafka Connection

If you are running locally or against a standalone machine, configure neo4j.conf to point to that server:

neo4j.conf. 

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092

If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in the Confluent Cloud section.

2.1.4. Decide: Sink, Source, or Both

Configuring neo4j-streams comes in three different parts, depending on your need:

  1. Required: Configuring a connection to Kafka

neo4j.conf. 

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092

  2. Optional: Configuring Neo4j to produce records to Kafka (Source)
  3. Optional: Configuring Neo4j to ingest from Kafka (Sink)

Follow one or both subsections according to your use case and need:

2.1.4.1. Sink

Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as:

neo4j.conf. 

streams.sink.enabled=true
streams.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties

This will process every message that comes in on my-ingest-topic with the given Cypher statement. When that Cypher statement executes, the event variable that is referenced will be set to the message received, so this sample Cypher will create a (:Label) node in the graph with the given ID, copying all of the properties in the source message.
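As a purely illustrative sketch (this message is not taken from the guide), a JSON message like the following arriving on my-ingest-topic:

{
  "id": 42,
  "properties": {
    "name": "Alice",
    "city": "Venice"
  }
}

would result in a node like (:Label {id: 42, name: "Alice", city: "Venice"}).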

For full details on what you can do here, see the Sink section of the documentation.

2.1.4.2. Source

Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as:

neo4j.conf. 

streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000

This will produce all graph nodes labeled (:Person) onto the topic my-nodes-topic and all relationships of type -[:BELONGS-TO]→ to the topic named my-rels-topic. Further, schema changes will be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes. Please note that if no value is specified for the streams.source.schema.polling.interval property, the Streams plugin will use 300,000 ms as the default.

The expressions Person{*} and BELONGS-TO{*} are patterns. You can find documentation on how to change these in the Patterns section.
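As a sketch of what such patterns can express (the topic names and properties below are made up for illustration), you can also publish only selected properties:

neo4j.conf. 

# publish only name and surname of Person nodes, and exclude internalNote from KNOWS relationships
streams.source.topic.nodes.people-topic=Person{name,surname}
streams.source.topic.relationships.knows-topic=KNOWS{-internalNote}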

For full details on what you can do here, see the Source section of the documentation.

2.1.4.3. Restart Neo4j

Once the plugin is installed and configured, restarting the database will make it active. If you have configured Neo4j to consume from Kafka, it will immediately begin processing messages.
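How you restart depends on how Neo4j is installed; for example (assuming a standalone installation or the Docker setup described later):

# standalone installation
$NEO4J_HOME/bin/neo4j restart

# Docker container named neo4j
docker restart neo4j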

2.2. Kafka Connect Plugin

2.2.1. Install the Plugin

Download and install the plugin via Confluent Hub client. See the chapter Kafka Connect Plugin for more details.

2.2.2. Run with Docker

Inside the directory /neo4j-kafka-connect-neo4j-<version>/doc/docker you’ll find a compose file that allows you to start the whole testing environment.

docker-compose.yml. 

---
version: '2'
services:
  neo4j:
    image: neo4j:3.5-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: yes

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
    - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
    - zookeeper
    - broker
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    ports:
    - "8083:8083"
    volumes:
    - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    - connect
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

Just go inside that folder from the terminal and run the following command:

docker-compose up -d

When the process completes, you will have all of the following modules up and running:

  • Neo4j
  • Zookeeper
  • Kafka Broker
  • Schema Registry
  • Kafka Connect
  • Kafka Control Center
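To verify that the containers are up, you can use the standard Docker Compose command:

docker-compose ps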

Now you can access your Neo4j instance under: http://localhost:7474, log in with neo4j as username and connect as password (see the docker-compose file to change it).

2.2.3. Configure SINK instance

On the Kafka Connect side only one thing is missing: creating the SINK instance. Let’s do that with the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.sink.avro.neo4j.json

In this case, we are configuring the SINK instance to consume and deliver data in AVRO format. Now you can access your Confluent Control Center instance under: http://localhost:9021/clusters, and check the my-topic topic created as specified in contrib.sink.avro.neo4j.json.

contrib.sink.avro.neo4j.json. 

{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'AVRO'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}

The property neo4j.topic.cypher.my-topic defines that each message consumed by the SINK on the Kafka Connect side will cause the execution of the specified Cypher query on the Neo4j side.
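If you want to double-check that the connector was registered and is running, you can query the Kafka Connect REST API (the connector name is the name field from the JSON above):

curl http://localhost:8083/connectors/Neo4jSinkConnector/status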

Chapter 3. Neo4j Streams - Source: Neo4j → Kafka

This chapter describes the Neo4j Streams Source in the Neo4j Streams Library. Use this section to configure Neo4j to publish CDC style data to Kafka.

It is the transaction event handler that sends data to a Kafka topic.

3.1. Configuration

You can set the following configuration values in your neo4j.conf, here are the defaults.

neo4j.conf. 

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=
kafka.topic.discovery.polling.interval=300000

streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.enabled=<true/false, default=true>
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>

To use Kafka transactions, please set kafka.transactional.id and kafka.acks properly. Check out this blog post for further details about transactions in Apache Kafka.
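A minimal sketch of such a configuration (the transactional id value here is arbitrary and chosen only for illustration; the transactional producer also requires kafka.acks=all):

neo4j.conf. 

kafka.transactional.id=neo4j-streams-producer
kafka.acks=all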

See the Apache Kafka documentation for details on these settings.

In case your Kafka broker is configured with auto.create.topics.enable set to false, all messages sent to topics that don’t exist are discarded; this is because the KafkaProducer.send() method would otherwise block the execution, as explained in KAFKA-3539. You can tune the custom property kafka.topic.discovery.polling.interval in order to periodically check for new topics in the Kafka cluster, so the plugin will be able to send events to the defined topics.

3.2. Patterns

3.2.1. Nodes

To control which nodes are sent to Kafka, and which of their properties, you can define node patterns in the config.

You can choose labels and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons ;.

The basic syntax is:

Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}
pattern meaning

Label{*}

all nodes with this label with all their properties go to the related topic

Label1:Label2

nodes with these two labels are sent to the related topic

Label{prop1,prop2}

the prop1 and prop2 of all nodes with this label are sent to the related topic

Label{-prop1,-prop2}

for nodes with label Label, properties prop1 and prop2 are excluded
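Putting the node patterns together, a configuration could look like this (topic names, labels and properties are illustrative assumptions, not taken from the guide):

streams.source.topic.nodes.customers-topic=Customer{*};Person:Customer{name,email}
streams.source.topic.nodes.products-topic=Product{-internalCode}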

3.2.2. Relationships

To control which relationships are sent to Kafka, and which of their properties, you can define relationship patterns in the config.

You can choose types and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons ;.

The basic syntax is:

KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}
pattern meaning

KNOWS{*}

all relationships with this type and all their properties go to the related topic

MEET{prop1,prop2}

the prop1 and prop2 of all relationships with this type are sent to the related topic

ANSWER{-prop1,-prop2}

for relationships with type ANSWER, properties prop1 and prop2 are excluded
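Analogously for relationships, a sketch with made-up topic and property names:

streams.source.topic.relationships.interactions-topic=KNOWS{*};ANSWER{-draft}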

3.3. Transaction Event Handler

The transaction event handler is the core of the Stream Producer and allows streaming of database changes.

3.3.1. Events

The Producer streams three kinds of events:

  • created: when a node/relationship/property is created
  • updated: when a node/relationship/property is updated
  • deleted: when a node/relationship/property is deleted

3.3.1.1. Created

The following are examples of node and relationship creation events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

3.3.1.2. Updated

The following are examples of node and relationship update events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person", "Tmp"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne"
      }
    },
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

3.3.1.3. Deleted

The following are examples of node and relationship deletion events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String",
      "geo": "point"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

3.3.2. Meta

The meta field contains the metadata related to the transaction event:

Field Type Description

timestamp

Number

The timestamp related to the transaction event

username

String

The username that generated the transaction

tx_id

Number

The transaction id provided by the Neo4j transaction manager

tx_events_count

Number

The number of events included in the transaction (e.g. 2 node updates, 1 relationship creation)

tx_event_id

Number

The id of the event inside the transaction

operation

enum["created", "updated", "deleted"]

The operation type

source

Object

Contains the information about the source

3.3.2.1. Source

Field Type Description

hostname

String

The information about the source

3.3.3. Payload

The payload field contains the information about the data related to the event

Field Type Description

id

Number

The id of the graph entity

type

enum["node", "relationship"]

The type of the graph entity

before

Object

The data before the transaction event

after

Object

The data after the transaction event

3.3.4. Payload: before and after

We must distinguish two cases:

3.3.4.1. Nodes

Field Type Description

labels

String[]

List of labels attached to the node

properties

Map<K, V>

Map of the properties attached to the node, where K is the property name and V the value

3.3.4.2. Relationships

Field Type Description

label

string

The relationship type

properties

Map<K,V>

Map of the properties attached to the relationship, where K is the property name and V the value

start

Object

The starting node of the relationship

end

Object

The ending node of the relationship

3.3.4.3. Relationships: startNode and endNode

Field Type Description

id

Number

The id of the node

labels

String[]

List of labels attached to the node

ids

Map<K,V>

The ids related to the defined constraints for the node (UNIQUENESS and/or NODE_KEY). The K is the property name, and the V the related value

3.3.5. Schema

Field Type Description

constraints

Object[]

List of constraints attached to the entity

properties

Map<K, V>

List of properties attached to the entity, where the K is the property name and the V is the class type

3.3.6. Constraints

Nodes and Relationships can have a list of constraints attached to them:

Table 3.1. Constraints
Field Type Description

label

String

The label attached to the constraint

type

enum["UNIQUE", "NODE_PROPERTY_EXISTS", "RELATIONSHIP_PROPERTY_EXISTS"]

The constraint type

properties

String[]

List of properties involved in the constraint

Chapter 4. Neo4j Streams - Sink: Kafka → Neo4j

This chapter describes the Neo4j Streams Sink in the Neo4j Streams Library. Use this section to configure Neo4j to ingest the data from Kafka into Neo4j.

It is the Kafka Sink that ingests the data directly into Neo4j.

4.1. How it works

It works in several ways:

  • by providing a Cypher template
  • by ingesting the events emitted from another Neo4j instance via the Change Data Capture module
  • by providing a pattern extraction from JSON or AVRO events
  • by managing a CUD file format

4.1.1. Cypher Template

It works with template Cypher queries stored in configuration properties with the following format:

neo4j.conf. 

streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>

Each Cypher template must refer to an event object that will be injected by the Sink.

The following is an example:

For this event

{
 "id": 42,
 "properties": {
   "title": "Answer to anyting",
   "description": "It depends."
 }
}

neo4j.conf. 

streams.sink.topic.cypher.my-topic=MERGE (n:Label {id: event.id}) \
ON CREATE SET n += event.properties

Under the hood the Sink injects the event object as a parameter in this way:

UNWIND {events} AS event
MERGE (n:Label {id: event.id})
    ON CREATE SET n += event.properties

Where {events} is a JSON list, so continuing with the example above a possible full representation could be:

:params events => [{id:"alice@example.com",properties:{name:"Alice",age:32}},
    {id:"bob@example.com",properties:{name:"Bob",age:42}}]

UNWIND {events} AS event
MERGE (n:Label {id: event.id})
    ON CREATE SET n += event.properties

4.2. Sink ingestion strategies

4.2.1. Change Data Capture Event

This method allows you to ingest CDC events coming from another Neo4j instance. You can use two strategies:

  • The SourceId strategy which merges the nodes/relationships by the CDC event id field (it’s related to the Neo4j physical ID)
  • The Schema strategy which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in your graph model

4.2.1.1. The SourceId strategy

You can configure the topic in the following way:

streams.sink.topic.cdc.sourceId=<list of topics separated by semicolon>
streams.sink.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
streams.sink.topic.cdc.sourceId.idName=<the id name given to the CDC id field, default=sourceId>
streams.sink.topic.cdc.sourceId=my-topic;my-other.topic

Each streams event will be projected into the related graph entity, for instance the following event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

will be persisted as the following node:

Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}

As you can notice, the ingested event has been projected with two peculiarities:

  • the id field has been transformed into sourceId;
  • the node has an additional label SourceEvent;

These two fields will be used in order to match the node/relationship for future updates/deletes.
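For example, using the defaults shown above (the SourceEvent label and the sourceId property), the ingested node could be looked up with a query such as:

MATCH (n:SourceEvent {sourceId: "1004"})
RETURN n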

4.2.1.2. The Schema strategy

You can configure the topic in the following way:

streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=my-topic;my-other.topic

Each streams event will be projected into the related graph entity, for instance the following event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

will be persisted as the following node:

Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}

The Schema strategy leverages the schema field in order to insert/update the nodes so no extra fields will be created.

In case of relationship

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

The Schema strategy leverages the ids fields in order to insert/update the relationships, so no extra fields will be created.

4.2.2. The Pattern strategy

The Pattern strategy allows you to extract nodes and relationships from JSON by providing an extraction pattern.

Each property can be prefixed with:

  • !: identifies the id (there could be more than one id property); it’s mandatory
  • -: excludes the property from the extraction. If no prefix is specified, the property will be included

You cannot mix inclusion and exclusion, so your pattern must contain either all exclusion or all inclusion properties.

Labels can be chained via :

Tombstone Record Management. The pattern strategy supports the Tombstone Record: in order to leverage it, your event should contain the record that you want to delete as the key, and null as the value.

Currently you can’t concatenate multiple patterns (for example in case you use just one topic and produce more than one node/relationship type), so you have to use a different topic for each type of node/relationship and define a pattern for each of them.

4.2.2.1. The Node Pattern configuration

You can configure the node pattern extraction in the following way:

streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>

So for instance, given the following json published via the user topic:

{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}

You can transform it into a node by providing the following configuration:

by specifying a simpler pattern:

streams.sink.topic.pattern.node.user=User{!userId}

or by specifying a Cypher like node pattern:

streams.sink.topic.pattern.node.user=(:User{!userId})

Similar to the CDC pattern you can provide:

pattern meaning

User:Actor{!userId} or User:Actor{!userId,*}

the userId will be used as ID field and all properties of the json will be attached to the node with the provided labels (User and Actor) so the persisted node will be: (User:Actor{userId: 1, name: 'Andrea', surname: 'Santurbano', address.city: 'Venice', address.cap: 30100})

User{!userId, surname}

the userId will be used as ID field and only the surname property of the json will be attached to the node with the provided labels (User) so the persisted node will be: (User{userId: 1, surname: 'Santurbano'})

User{!userId, surname, address.city}

the userId will be used as ID field and only the surname and the address.city property of the json will be attached to the node with the provided labels (User) so the persisted node will be: (User{userId: 1, surname: 'Santurbano', address.city: 'Venice'})

User{!userId,-address}

the userId will be used as ID field and the address property will be excluded so the persisted node will be: (User{userId: 1, name: 'Andrea', surname: 'Santurbano'})

4.2.2.2. The Relationship Pattern configuration

You can configure the relationship pattern extraction in the following way:

streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>

So for instance, given the following json published via the user topic:

{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", cap: "30100"}}

You can transform it into a path, like (n)-[r]→(m), by providing the following configuration:

By specifying a simpler pattern:

streams.sink.topic.pattern.relationship.user=User{!userId} BOUGHT{price, currency} Product{!productId}

or by specifying a Cypher like node pattern:

streams.sink.topic.pattern.relationship.user=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})

in this last case we assume that User is the source node and Product the target node.

Similar to the CDC pattern you can provide:

pattern meaning

(User{!userId})-[:BOUGHT]→(Product{!productId}) or (User{!userId})-[:BOUGHT{price, currency}]→(Product{!productId})

this will merge (fetch/create) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set all the other JSON properties on them, so the persisted data will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice', shippingAddress.cap: 30100}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{price}]→(Product{!productId})

this will merge (fetch/create) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set all the specified JSON properties, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{-shippingAddress}]→(Product{!productId})

this will merge (fetch/create) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set all the specified JSON properties (by exclusion), so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€'}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{price,currency, shippingAddress.city}]→(Product{!productId})

this will merge (fetch/create) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set all the specified JSON properties, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice'}]→(Product{productId: 100})

4.2.2.3. Attach properties to node

By default no properties will be attached to the edge nodes, but you can specify which properties to attach to each node. Given the following JSON published via the user topic:

{
    "userId": 1,
    "userName": "Andrea",
    "userSurname": "Santurbano",
    "productId": 100,
    "productName": "My Awesome Product!",
    "price": 10,
    "currency": "€"
}
pattern meaning

(User{!userId, userName, userSurname})-[:BOUGHT]→(Product{!productId, productName})

this will merge the two nodes and the BOUGHT relationship between them with all the JSON properties, so the persisted pattern will be: (User{userId: 1, userName: 'Andrea', userSurname: 'Santurbano'})-[:BOUGHT{price: 10, currency: '€'}]→(Product{productId: 100, name: 'My Awesome Product!'})

4.2.3. CUD File Format

The CUD file format is a JSON format that represents Graph Entities (Nodes/Relationships) and how to manage them in terms of Create/Update/Delete operations.

You can configure the topic in the following way:

streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=my-topic;my-other.topic

We have two formats:

  • One for nodes:

    We provide an example of a MERGE operation

    {
      "op": "merge",
      "properties": {
        "foo": "value",
        "key": 1
      },
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": true
    }

    which would be transformed into the following Cypher query:

    UNWIND [..., {
      "op": "merge",
      "properties": {
        "foo": "value",
        "key": 1
      },
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": true
    }, ...] AS event
    MERGE (n:Foo:Bar {key: event.ids.key, otherkey: event.ids.otherkey})
    SET n += event.properties

    Let’s describe the fields:

    Table 4.1. CUD file Node format fields description
    field mandatory Description

    op

    yes

    The operation type: create/merge/update/delete

    N.B. delete messages are for individual nodes; it’s not intended to be a generic way of doing Cypher query building from JSON

    properties

    no in case the operation is delete, otherwise yes

    The properties attached to the node

    ids

    no in case the operation is create, otherwise yes

    In case the operation is merge/update/delete this field is mandatory and contains the primary/unique keys of the node that will be used to look up the entity. In case you use _id as the key name, the CUD format will refer to Neo4j’s internal node id for the node lookup.

    N.B. If you use the _id reference with the op merge it will work as a simple update; this means that if the node with the passed internal id does not exist, it will not be created.

    labels

    no

    The labels attached to the node.

    N.B. Neo4j allows creating nodes without labels, but from a performance perspective, it’s a bad idea not to provide them.

    type

    yes

    The entity type: node/relationship ⇒ node in this case

    detach

    no

    In case the operation is delete you can specify whether to perform a "detach" delete, which means deleting any incident relationships when you delete a node

    N.B. if no value is provided, the default is true

  • And one for relationships:

    We provide an example of a CREATE operation

    {
      "op": "create",
      "properties": {
        "foo": "rel-value",
        "key": 1
      },
      "rel_type": "MY_REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }

    which would be transformed into the following Cypher query:

    UNWIND [..., {
      "op": "create",
      "properties": {
        "foo": "rel-value",
        "key": 1
      },
      "rel-type": "MY-REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }, ...] AS event
    MATCH (from:Foo:Bar {key: event.from.ids.key})
    MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
    CREATE (from)-[r:MY_REL]->(to)
    SET r = event.properties

    Let’s describe the fields:

    Table 4.2. CUD file Relationship format fields description
    field mandatory Description

    op

    yes

    The operation type: create/merge/update/delete

    properties

    no

    The properties attached to the relationship

    rel_type

    yes

    The relationship type

    from

    yes; if you use the _id field reference in ids you can leave labels blank

    Contains the info about the source node of the relationship. For the description of the ids and labels fields please look at the node fields description above

    to

    yes; if you use the _id field reference in ids you can leave labels blank

    Contains the info about the target node of the relationship. For the description of the ids and labels fields please look at the node fields description above

    type

    yes

    The entity type: node/relationship ⇒ relationship in this case

The following is another example of a DELETE operation, for both a node and a relationship.

  • For Node, the following JSON:

    {
      "op": "delete",
      "properties": {},
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": false
    }

    will be transformed in the following Cypher query:

    UNWIND [..., {
      "op": "delete",
      "properties": {},
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": false
    }, ...] AS event
    MATCH (n:Foo:Bar {key: event.ids.key, otherkey: event.ids.otherkey})
    DELETE n

    Note that if you set "detach": true then the transformation will be:

    UNWIND [
    ...
    ] AS event
    ...
    DETACH DELETE n
  • For Relationship, the following JSON:

    {
      "op": "create",
      "properties": {},
      "rel_type": "MY_REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }

    will be transformed in the following Cypher query:

    UNWIND [..., {
      "op": "create",
      "properties": {},
      "rel_type": "MY_REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }, ...] AS event
    MATCH (from:Foo:Bar {key: event.from.ids.key})
    MATCH (to:FooBar {otherkey: event.to.ids.otherkey})
    MATCH (from)-[r:MY_REL]->(to)
    DELETE r

4.3. How to deal with bad data

The Neo4j Streams Plugin provides several means to handle processing errors.

It can fail fast or log errors with different detail levels. Another way is to re-route all the data and errors that for some reason it wasn’t able to ingest to a Dead Letter Queue.

It behaves by default like Kafka Connect, see this blog post

  • fail fast (abort) by default
  • need to configure dead-letter-queue topic to enable
  • need to enable logging explicitly
  • headers and message logging must be enabled explicitly

Config Options

Table 4.3. Dead Letter Queue configuration parameters
Name Value Note

errors.tolerance

none

fail fast (default!) abort

errors.tolerance

all

all == lenient, silently ignore bad messages

errors.log.enable

false/true

log errors (default: false)

errors.log.include.messages

false/true

log bad messages too (default: false)

errors.deadletterqueue.topic.name

topic-name

dead letter queue topic name, if left off no DLQ, default: not set

errors.deadletterqueue.context.headers.enable

false/true

enrich messages with metadata headers like exception, timestamp, original topic, original partition, default: false

errors.deadletterqueue.context.headers.prefix

prefix-text

common prefix for header entries, e.g. "__streams.errors." , default: not set

errors.deadletterqueue.topic.replication.factor

3/1

replication factor of the DLQ topic, needs to be set to 1 for a single-broker cluster, default: 3

For the Neo4j extension you prefix them with streams.sink in the Neo4j configuration.

Example settings:

Fail Fast, Abort. 

errors.tolerance=none

Don’t fail on errors, Log with Messages. 

errors.tolerance=all
errors.log.enable=true
errors.log.include.messages=true

Don’t fail on errors, Don’t log but send to DLQ with headers. 

errors.tolerance=all
errors.deadletterqueue.topic.name=my-dlq-topic
errors.deadletterqueue.context.headers.enable=true

Same Settings for Neo4j Server Plugin. 

streams.sink.errors.tolerance=all
streams.sink.errors.deadletterqueue.topic.name=my-dlq-topic
streams.sink.errors.deadletterqueue.context.headers.enable=true

Every published record in the Dead Letter Queue contains the original record Key and Value pairs and optionally the following headers:

Header key Description

<prefix>topic

The topic where the data is published

<prefix>partition

The topic partition where the data is published

<prefix>offset

The offset of the data within the topic partition

<prefix>class.name

The class that generated the error

<prefix>exception.class.name

The exception that generated the error

<prefix>exception.message

The exception message

<prefix>exception.stacktrace"

The exception stack trace

4.4. Supported Kafka deserializers

The Neo4j Streams plugin supports 2 deserializers:

  • org.apache.kafka.common.serialization.ByteArrayDeserializer: if you want to manage JSON messages
  • io.confluent.kafka.serializers.KafkaAvroDeserializer: if you want to manage AVRO messages

You can define them independently for Key and Value as specified in the Configuration paragraph.
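For example, a sketch of an AVRO-oriented configuration might look like the following (the schema registry property and URL are assumptions for a local Confluent setup):

neo4j.conf. 

kafka.key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.schema.registry.url=http://localhost:8081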

4.5. Configuration summary

You can set the following Kafka configuration values in your neo4j.conf, here are the defaults.

neo4j.conf. 

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

streams.sink.topic.cypher.<TOPIC_NAME>=<CYPHER_QUERY>
streams.sink.topic.cdc.sourceId=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
streams.sink.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>
streams.sink.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>
streams.sink.enabled=<true/false, default=true>

See the Apache Kafka documentation for details on these settings.

Chapter 5. Neo4j Streams - Procedures

This chapter describes the Neo4j Streams Procedures in the Neo4j Streams Library. Use this section to learn how the procedures allow the functionality of the plugin to be used ad hoc in any Cypher query.

The Streams project ships with a set of procedures.

5.1. Configuration

You can enable/disable the procedures by changing this variable inside neo4j.conf:

neo4j.conf. 

streams.procedures.enabled=<true/false, default=true>

Please note that by default the dbms.security.procedures.whitelist property is not set, so Neo4j will load all procedures found. If you set it, then you also have to declare a comma-separated list of procedures to be loaded. For example:

dbms.security.procedures.whitelist=streams.*,apoc.*

If you try to CALL one of the Streams procedures without declaring them in the whitelist, you will receive an error like the following:

Figure 5.1. Neo4j Streams procedure not found

5.2. streams.publish

This procedure allows custom message streaming from Neo4j to the configured environment by using the underlying configured Producer.

Uses:

CALL streams.publish('my-topic', 'Hello World from Neo4j!')

The message retrieved from the Consumer is the following:

{"payload":"Hello world from Neo4j!"}

If you use a local docker (compose) setup, you can check for these messages with:

docker exec -it kafka kafka-console-consumer --topic my-topic --bootstrap-server kafka:9092

Input Parameters:

Variable Name Type Description

topic

String

The topic where you want to publish the data

payload

Object

The data that you want to stream

You can send any kind of data in the payload: nodes, relationships, paths, lists, maps, scalar values and nested versions thereof.

In case of nodes or relationships, if the topic is defined in the patterns provided by the configuration, their properties will be filtered according to the configuration.
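For instance, a node can be published directly from a query; the label, property and topic below are hypothetical:

MATCH (p:Person {name: 'Alice'})
CALL streams.publish('people-topic', p)
RETURN p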

5.3. streams.consume

This procedure allows consuming messages from a given topic.

Uses:

CALL streams.consume('my-topic', {<config>}) YIELD event RETURN event

Example: Imagine you have a producer that publishes events like {"name": "Andrea", "surname": "Santurbano"}; we can create user nodes in this way:

CALL streams.consume('my-topic') YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

In case you want to read a specific offset of a topic partition you can do it by executing the following query:

CALL streams.consume('my-topic', {timeout: 5000, partitions: [{partition: 0, offset: 30}]}) YIELD event
CREATE (p:Person{firstName: event.data.name, lastName: event.data.surname})

Input Parameters:

Variable Name Type Description

topic

String

The topic from which you want to consume the data

config

Map<K,V>

The configuration parameters

5.3.1. Available configuration parameters

Variable Name Type Description

timeout

Number (default 1000)

Defines how long (in ms) the procedure should listen to the topic

from

String

It’s the Kafka configuration parameter auto.offset.reset. If not specified it inherits the underlying kafka.auto.offset.reset value

groupId

String

It’s the Kafka configuration parameter group.id. If not specified it inherits the underlying kafka.group.id value

autoCommit

Boolean (default true)

It’s the Kafka configuration parameter enable.auto.commit. If not specified it inherits the underlying kafka.enable.auto.commit value

commit

Boolean (default true)

In case autoCommit is set to false, you can decide if you want to commit the data.

zookeeper

String

The comma-separated string of Zookeeper node URLs. If not specified it inherits the underlying kafka.zookeeper.connect value

broker

String

The comma-separated string of Kafka node URLs. If not specified it inherits the underlying kafka.bootstrap.servers value

partitions

List<Map<K,V>>

Each map contains the information about partition and offset in order to start reading from a given topic partition

keyDeserializer

String

The deserializer for the Kafka Record Key. If not specified it inherits the underlying kafka.key.deserializer value. Supported deserializers are: org.apache.kafka.common.serialization.ByteArrayDeserializer and io.confluent.kafka.serializers.KafkaAvroDeserializer

valueDeserializer

String

The deserializer for the Kafka Record Value. If not specified it inherits the underlying kafka.value.deserializer value. Supported deserializers are: org.apache.kafka.common.serialization.ByteArrayDeserializer and io.confluent.kafka.serializers.KafkaAvroDeserializer

schemaRegistryUrl

String

The schema registry url, required in case you are dealing with AVRO messages.
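As an example, consuming AVRO messages could look like the following sketch (the schema registry URL is an assumption for a local setup):

CALL streams.consume('my-topic', {
  timeout: 5000,
  keyDeserializer: 'io.confluent.kafka.serializers.KafkaAvroDeserializer',
  valueDeserializer: 'io.confluent.kafka.serializers.KafkaAvroDeserializer',
  schemaRegistryUrl: 'http://localhost:8081'
}) YIELD event
RETURN event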

5.3.2. Partitions

Variable Name Type Description

partition

Number

It’s the Kafka partition number to read

offset

Number

It’s the offset from which to start reading the topic partition

Chapter 6. Kafka Connect Plugin

This chapter describes Kafka Connect plugins in the Neo4j Streams Library.

Figure 6.1. Neo4j Loves Confluent

Kafka Connect, an open source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

The Neo4j Streams project provides a Kafka Connect plugin that can be installed into the Confluent Platform enabling:

  • Ingest data from Kafka topics directly into Neo4j via templated Cypher queries;
  • Stream Neo4j transaction events (coming soon).

6.1. Plugin installation

You can choose your preferred way to install the plugin.

6.1.1. Download and install the plugin via Confluent Hub client

If you are using the provided compose file you can easily install the plugin by using the Confluent Hub.

Once the compose file is up and running you can install the plugin by executing the following command:

<confluent_platform_dir>/bin/confluent-hub install neo4j/kafka-connect-neo4j:<version>

When the installer asks:

The component can be installed in any of the following Confluent Platform installations:

please choose the installation where this tool is installed and then go ahead with the default options.

The following is an example:

Figure 6.2. Installation via Confluent Hub Client

At the end of the process the plugin is automatically installed.

6.1.2. Download the zip from the Confluent Hub

Please go to the Confluent Hub page of the plugin:

https://www.confluent.io/connector/kafka-connect-neo4j-sink/

and click the Download Connector button.

Once you have downloaded the file, please place it into your Kafka Connect plugins directory.

6.1.3. Build it locally

Download the project from Github:

git clone https://github.com/neo4j-contrib/neo4j-streams.git

Go into the neo4j-streams directory:

cd neo4j-streams

Build the project by running the following command:

mvn clean install

Inside the directory <neo4j-streams>/kafka-connect-neo4j/target/component/packages you’ll find a file named neo4j-kafka-connect-neo4j-<VERSION>.zip; please unpack it and place it into your Kafka Connect plugins directory.

6.2. Create the Sink Instance

Create the Sink instance:

We’ll define the Sink configuration in several ways:

  • by providing a Cypher template
  • by ingesting the events emitted from another Neo4j instance via the Change Data Capture module
  • by providing a pattern extraction from JSON or AVRO events
  • by managing a CUD file format

6.2.1. Cypher template

{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "test-error-topic",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}

In particular this line:

"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"

defines that all the data that comes from the topic my-topic will be unpacked by the Sink into Neo4j with the following Cypher query:

MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)

Under the hood the Sink injects the event object in this way:

UNWIND {batch} AS event
MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)

Where {batch} is a list of event objects.

You can change the query or remove the property and add your own, but you must follow the following convention:

"neo4j.topic.cypher.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"

Let’s load the configuration into the Confluent Platform with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.sink.avro.neo4j.json

The file contrib.sink.string-json.neo4j.json contains a configuration that manages a simple JSON producer example.

Please check that everything is fine by going to:

http://localhost:9021/management/connect

and clicking the Sink tab. You should see a table like this:

Status    Active Tasks    Name                  Topics
Running   1               Neo4jSinkConnector    my-topic
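If you prefer the command line over the Control Center, you can also check the connector state through the Kafka Connect REST API (assuming, as above, that the worker listens on localhost:8083 and the connector is named Neo4jSinkConnector):

curl -s http://localhost:8083/connectors/Neo4jSinkConnector/status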

Note that the Sink instance can also be configured to monitor multiple topics. Just set the topics property to a comma-separated list of topics. For example:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "topicA,topicB",
    "_comment": "Cypher template example configuration",
    "neo4j.topic.cypher.topicA": "<YOUR_CYPHER_QUERY>",
    "neo4j.topic.cypher.topicB": "<YOUR_CYPHER_QUERY>",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false
  }
}
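If you later need to change the configuration of a running connector, one simple approach is to delete it through the Kafka Connect REST API and then re-issue the POST call shown above with the updated JSON (a sketch, assuming the same worker and connector name as in the examples):

curl -X DELETE http://localhost:8083/connectors/Neo4jSinkConnector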

6.3. Sink ingestion strategies

6.3.1. Change Data Capture Event

This method allows you to ingest CDC events coming from another Neo4j instance. You can use two strategies:

  • The SourceId strategy which merges the nodes/relationships by the CDC event id field (it’s related to the Neo4j physical ID)
  • The Schema strategy which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in your graph model

6.3.1.1. The SourceId strategy

You can configure the topic in the following way:

neo4j.topic.cdc.sourceId=<list of topics separated by semicolon>
neo4j.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
neo4j.topic.cdc.sourceId.idName=<the id name given to the CDC id field, default=sourceId>
neo4j.topic.cdc.sourceId=my-topic;my-other.topic

Each streams event will be projected into the related graph entity, for instance the following event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

will be persisted as the following node:

Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}

As you can see, the ingested event has been projected with two peculiarities:

  • the id field has been transformed into sourceId;
  • the node has an additional label SourceEvent.

These two fields will be used to match the node/relationship for future updates/deletes.
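Conceptually, with the default labelName (SourceEvent) and idName (sourceId), the ingestion of the event above behaves like the following Cypher (a simplified sketch, not the exact statement generated by the plugin):

UNWIND {batch} AS event
MERGE (n:SourceEvent {sourceId: event.payload.id})
SET n += event.payload.after.properties
SET n:Person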

6.3.1.2. The Schema strategy

You can configure the topic in the following way:

neo4j.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
neo4j.topic.cdc.schema=my-topic;my-other.topic

Each streams event will be projected into the related graph entity, for instance the following event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

will be persisted as the following node:

Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}

The Schema strategy leverages the schema field in order to insert/update the nodes so no extra fields will be created.
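Conceptually, given the UNIQUE constraint on Person(first_name, last_name) declared in the schema field, the upsert behaves like the following Cypher (again a simplified sketch, not the plugin's exact statement):

UNWIND {batch} AS event
MERGE (n:Person {first_name: event.payload.after.properties.first_name, last_name: event.payload.after.properties.last_name})
SET n += event.payload.after.properties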

In the case of a relationship event such as:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

the Schema strategy leverages the ids fields to insert/update the relationship, so no extra fields will be created.
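Conceptually, the relationship ingestion above behaves like the following Cypher, where the start and end nodes are merged by their ids fields (a simplified sketch, not the plugin's exact statement):

UNWIND {batch} AS event
MERGE (s:Person {first_name: event.payload.start.ids.first_name, last_name: event.payload.start.ids.last_name})
MERGE (e:Person {first_name: event.payload.end.ids.first_name, last_name: event.payload.end.ids.last_name})
MERGE (s)-[r:KNOWS]->(e)
SET r += event.payload.after.properties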

6.3.2. The Pattern strategy

The Pattern strategy allows you to extract nodes and relationships from a JSON event by providing an extraction pattern.

Each property can be prefixed with:

  • !: identifies the id (there could be more than one id property); it’s mandatory
  • -: excludes the property from the extraction. If no prefix is specified, the property will be included

You cannot mix inclusion and exclusion, so your pattern must contain either all exclusion properties or all inclusion properties.

Labels can be chained via :

Tombstone Record Management. The pattern strategy comes with support for Tombstone Records: to leverage it, your event should contain the record that you want to delete as the key and null as the value.

Currently you can’t concatenate multiple patterns (for example, in case you use just one topic and produce more than one node/relationship type), so you have to use a different topic for each type of node/relationship and define a pattern for each of them.

6.3.2.1. The Node Pattern configuration

You can configure the node pattern extraction in the following way:

neo4j.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>

So for instance, given the following JSON published via the user topic:

{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}

You can transform it into a node by providing the following configuration:

by specifying a simpler pattern:

neo4j.topic.pattern.node.user=User{!userId}

or by specifying a Cypher-like node pattern:

neo4j.topic.pattern.node.user=(:User{!userId})

Similarly to the CDC pattern, you can provide:

pattern meaning

User:Actor{!userId} or User:Actor{!userId,*}

the userId will be used as ID field and all properties of the json will be attached to the node with the provided labels (User and Actor) so the persisted node will be: (User:Actor{userId: 1, name: 'Andrea', surname: 'Santurbano', address.city: 'Venice', address.cap: 30100})

User{!userId, surname}

the userId will be used as ID field and only the surname property of the json will be attached to the node with the provided labels (User) so the persisted node will be: (User{userId: 1, surname: 'Santurbano'})

User{!userId, surname, address.city}

the userId will be used as ID field and only the surname and the address.city property of the json will be attached to the node with the provided labels (User) so the persisted node will be: (User{userId: 1, surname: 'Santurbano', address.city: 'Venice'})

User{!userId,-address}

the userId will be used as ID field and the address property will be excluded so the persisted node will be: (User{userId: 1, name: 'Andrea', surname: 'Santurbano'})
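For reference, the node pattern property slots into a connector configuration in the same place as the Cypher template property. The following is a minimal sketch that reuses the connection settings of the earlier example; the topic name user and the pattern come from this section, while the URI and credentials are assumptions to adapt:

{
  "name": "Neo4jSinkConnectorPattern",
  "config": {
    "topics": "user",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.pattern.node.user": "(:User{!userId})"
  }
}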

6.3.2.2. The Relationship Pattern configuration

You can configure the relationship pattern extraction in the following way:

neo4j.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>

So for instance, given the following JSON published via the user topic:

{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", cap: "30100"}}

You can transform it into a path, like (n)-[r]→(m), by providing the following configuration:

By specifying a simpler pattern:

neo4j.topic.pattern.relationship.user=User{!userId} BOUGHT{price, currency} Product{!productId}

or by specifying a Cypher-like node pattern:

neo4j.topic.pattern.relationship.user=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})

In this last case we assume that User is the source node and Product the target node.

Similarly to the CDC pattern, you can provide:

pattern meaning

(User{!userId})-[:BOUGHT]→(Product{!productId}) or (User{!userId})-[:BOUGHT{price, currency}]→(Product{!productId})

this will fetch/create (merge) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set all the other JSON properties on the relationship, so the persisted data will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice', shippingAddress.cap: 30100}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{price}]→(Product{!productId})

this will fetch/create (merge) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set only the specified JSON properties, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{-shippingAddress}]→(Product{!productId})

this will fetch/create (merge) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set the remaining JSON properties (after the exclusion), so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€'}]→(Product{productId: 100})

(User{!userId})-[:BOUGHT{price,currency, shippingAddress.city}]→(Product{!productId})

this will fetch/create (merge) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set only the specified JSON properties, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice'}]→(Product{productId: 100})
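Conceptually, for the pattern (:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId}) and the JSON above, the ingestion behaves like the following Cypher (a simplified sketch, not the plugin's exact statement):

UNWIND {batch} AS event
MERGE (u:User {userId: event.userId})
MERGE (p:Product {productId: event.productId})
MERGE (u)-[r:BOUGHT]->(p)
SET r.price = event.price, r.currency = event.currency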

6.3.2.3. Attach properties to node

By default no properties will be attached to the edge nodes, but you can specify which properties to attach to each node. Given the following JSON published via the user topic:

{
    "userId": 1,
    "userName": "Andrea",
    "userSurname": "Santurbano",
    "productId": 100,
    "productName": "My Awesome Product!",
    "price": 10,
    "currency": "€"
}
pattern meaning

(User{!userId, userName, userSurname})-[:BOUGHT]→(Product{!productId, productName})

this will merge the two nodes and the BOUGHT relationship between them, attaching the specified JSON properties to each node and the remaining ones to the relationship, so the persisted pattern will be: (User{userId: 1, userName: 'Andrea', userSurname: 'Santurbano'})-[:BOUGHT{price: 10, currency: '€'}]→(Product{productId: 100, productName: 'My Awesome Product!'})

6.3.3. CUD File Format

The CUD file format is a JSON file that represents Graph Entities (Nodes/Relationships) and how to manage them in terms of Create/Update/Delete operations.

You can configure the topic in the following way:

neo4j.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
neo4j.topic.cud=my-topic;my-other.topic

We have two formats:

  • One for nodes:

    We provide an example of a MERGE operation

    {
      "op": "merge",
      "properties": {
        "foo": "value",
        "key": 1
      },
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": true
    }

    which would be transformed into the following Cypher query:

    UNWIND [..., {
      "op": "merge",
      "properties": {
        "foo": "value",
        "key": 1
      },
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": true
    }, ...] AS event
    MERGE (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
    SET n += event.properties

    Let's describe the fields:

    Table 6.1. CUD file Node format fields description
    field mandatory Description

    op

    yes

    The operation type: create/merge/update/delete

    N.B. delete messages are for individual nodes; this is not intended to be a generic way of building Cypher queries from JSON.

    properties

    no in case the operation is delete, otherwise yes

    The properties attached to the node

    ids

    no in case the operation is create, otherwise yes

    In case the operation is merge/update/delete this field is mandatory and contains the primary/unique keys of the node that will be used to look up the entity. If you use _id as the key name, the CUD format will refer to Neo4j’s internal node id for the lookup.

    N.B. If you use the _id reference with the merge op, it will work as a simple update; this means that if the node with the given internal id does not exist, it will not be created.

    labels

    no

    The labels attached to the node.

    N.B. Neo4j allows you to create nodes without labels, but from a performance perspective it’s a bad idea not to provide them.

    type

    yes

    The entity type: node/relationship ⇒ node in this case

    detach

    no

    In case the operation is delete, you can specify whether to perform a "detach" delete, which means deleting any incident relationships when you delete the node.

    N.B. if no value is provided, the default is true

  • And one for relationships:

    We provide an example of a CREATE operation

    {
      "op": "create",
      "properties": {
        "foo": "rel-value",
        "key": 1
      },
      "rel_type": "MY_REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }

    which would be transformed into the following Cypher query:

    UNWIND [..., {
      "op": "create",
      "properties": {
        "foo": "rel-value",
        "key": 1
      },
      "rel-type": "MY-REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }, ...] AS event
    MATCH (from:Foo:Bar {key: event.from.ids.key})
    MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
    CREATE (from)-[r:MY_REL]->(to)
    SET r = event.properties

    Let's describe the fields:

    Table 6.2. CUD file Relationship format fields description
    field mandatory Description

    op

    yes

    The operation type: create/merge/update/delete

    properties

    no

    The properties attached to the relationship

    rel_type

    yes

    The relationship type

    from

    yes; if you use the _id field reference inside ids you can leave labels blank

    Contains the info about the source node of the relationship. For the description of the ids and labels fields, please look at the node fields description above.

    to

    yes; if you use the _id field reference inside ids you can leave labels blank

    Contains the info about the target node of the relationship. For the description of the ids and labels fields, please look at the node fields description above.

    type

    yes

    The entity type: node/relationship ⇒ relationship in this case

The following is another example of a DELETE operation, for both a node and a relationship.

  • For Node, the following JSON:

    {
      "op": "delete",
      "properties": {},
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": false
    }

    will be transformed in the following Cypher query:

    UNWIND [..., {
      "op": "delete",
      "properties": {},
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": false
    }, ...] AS event
    MATCH (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
    DELETE n

    Note that if you set "detach": true then the transformation will be:

    UNWIND [
    ...
    ] AS event
    ...
    DETACH DELETE n
  • For Relationship, the following JSON:

    {
      "op": "create",
      "properties": {},
      "rel_type": "MY_REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }

    will be transformed in the following Cypher query:

    UNWIND [..., {
      "op": "create",
      "properties": {},
      "rel_type": "MY_REL",
      "from": {
        "ids": {"key": 1},
        "labels": ["Foo","Bar"]
      },
      "to": {
        "ids": {"otherKey":1},
        "labels": ["FooBar"]
      },
      "type":"relationship"
    }, ...] AS event
    MATCH (from:Foo:Bar {key: event.from.ids.key})
    MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
    MATCH (from)-[r:MY_REL]->(to)
    DELETE r
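A quick way to try the CUD format end-to-end is to publish one of the JSON documents above, as a single line, with the console producer. The following is a sketch that assumes the Docker environment described later in this guide (a broker container reachable at broker:9093) and a topic my-topic configured via neo4j.topic.cud:

docker exec -it broker kafka-console-producer --broker-list broker:9093 --topic my-topic
# paste, for example:
# {"op":"merge","properties":{"foo":"value","key":1},"ids":{"key":1,"otherKey":"foo"},"labels":["Foo","Bar"],"type":"node","detach":true}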

6.4. How to deal with bad data

In the Kafka Connect plugin, in the creation phase of the Sink instance, in addition to the properties described in the Dead Letter Queue configuration parameters table, you need to define the Kafka broker connection properties:

Name mandatory Description

kafka.bootstrap.servers

true

The Kafka broker URL (please see the description below).

kafka.<any_other_kafka_property>

false

You can also specify any other Kafka producer setting by adding the kafka. prefix (e.g. the configuration acks becomes kafka.acks). See the Apache Kafka documentation for details on these settings.

As you may have noticed, we ask you to provide the bootstrap.servers property. This is because the Kafka Connect Framework provides out-of-the-box support only for deserialization errors and message transformations (please see the following link for further details: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues). We want to extend this feature to transient errors in order to cover 100% of failures. To do that, at this moment and as suggested by Confluent, we need to ask for the broker location again, until this JIRA issue is addressed: https://issues.apache.org/jira/browse/KAFKA-8597. That said, these properties have to be added only if you also want to redirect Neo4j errors into the DLQ.
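Putting this together, the error handling part of a Sink instance configuration that also redirects Neo4j errors into the DLQ could contain a fragment like the following (a sketch; the broker address is an assumption and must match your environment):

"errors.tolerance": "all",
"errors.deadletterqueue.topic.name": "test-error-topic",
"errors.log.enable": true,
"errors.log.include.messages": true,
"kafka.bootstrap.servers": "broker:9093"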

6.5. Monitor via Confluent Platform UI

The Kafka Monitoring UI can be found at http://<localhost>:9021/management/connect

Figure 6.3. Confluent Importing Metrics
confluent metrics

The events show up properly in the topic, and are then added to Neo4j via the sink.

Below you can see the data that has been ingested into Neo4j. During testing, more than 2M events were ingested.

Figure 6.4. Confluent Platform Management
confluent imported data

6.6. Kafka Connect Client Config Override Policy

Apache Kafka 2.3.0 introduced the ability for each source and sink connector to inherit its client configuration from the worker properties.

For further details see this link: https://cwiki.apache.org/confluence/display/KAFKA/KIP-458%3A+Connector+Client+Config+Override+Policy

Chapter 7. Run with Docker

This chapter describes Docker Compose templates that can be used to test Neo4j Streams applications.

7.1. Neo4j Streams plugin

7.1.1. Introduction

When Neo4j is run in a Docker container, some special considerations apply; please see Neo4j Docker Configuration for more information. In particular, the configuration format used in neo4j.conf looks different.

Please note that the Neo4j Docker image uses a naming convention; you can override every neo4j.conf property by prefixing it with NEO4J_ and applying the following transformations:

  • single underscore is converted into double underscore: _ → __
  • dot is converted into single underscore: . → _

Example:

  • dbms.memory.heap.max_size=8G → NEO4J_dbms_memory_heap_max__size: 8G
  • dbms.logs.debug.level=DEBUG → NEO4J_dbms_logs_debug_level: DEBUG

For more information and examples see this section and the Confluent With Docker section of the documentation.

Another important thing to watch out for is possible permission issues. If you run Kafka in Docker using a host volume that the container user does not own, you will get a permission error. There are two possible solutions:

  • use a root-user
  • change permissions of the volume in order to make it accessible by the non-root user

The Neo4j docker container is built on an approach that uses environment variables passed to the container as a way to configure Neo4j. There are certain characters which environment variables cannot contain, notably the dash - character. Configuring the plugin to use stream names that contain these characters will not work properly, because a configuration environment variable such as NEO4J_streams_sink_topic_cypher_my-topic cannot be correctly evaluated as an environment variable (my-topic). This is a limitation of the Neo4j docker container rather than neo4j-streams.

Below you’ll find a lightweight Docker Compose file that allows you to test the application in your local environment.

Prerequisites:

  • Docker
  • Docker Compose

Here are the instructions on how to configure Docker and Docker Compose.

From the same directory where the compose file is, you can launch this command:

docker-compose up -d

7.1.2. Source module

The following is a compose file that allows you to spin up Neo4j, Kafka and Zookeeper in order to test the application.

docker-compose.yml. 

version: '3'
services:
  neo4j:
    image: neo4j:3.5
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    depends_on:
      - kafka
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_AUTH: neo4j/streams
      NEO4J_dbms_logs_debug_level: DEBUG
      # KAFKA related configuration
      NEO4J_kafka_zookeeper_connect: zookeeper:12181
      NEO4J_kafka_bootstrap_servers: kafka:19092
      NEO4J_streams_source_topic_nodes_neo4j: Person{*}
      NEO4J_streams_source_topic_relationships_neo4j: KNOWS{*}

  zookeeper:
    image: confluentinc/cp-zookeeper:latest
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "12181:12181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 12181

  kafka:
    image: confluentinc/cp-kafka:latest
    hostname: kafka
    container_name: kafka
    ports:
      - "19092:19092"
    depends_on:
      - zookeeper
    environment:
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:12181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:19092

7.1.2.1. Launch it locally

Prerequisites
  • Install the latest version of Neo4j Streams plugin into ./neo4j/plugins

Before starting, please adjust the volume directory to match your environment; you must put the Streams jar inside the <plugins> directory:

volumes:
    - ./neo4j/plugins:/plugins

You can run a Kafka consumer that subscribes to the topic neo4j by executing this command:

docker exec kafka kafka-console-consumer --bootstrap-server kafka:19092 --topic neo4j --from-beginning

Then directly from the Neo4j browser you can generate some random data with this query:

UNWIND range(1,100) as id
CREATE (p:Person {id:id, name: "Name " + id, age: id % 3}) WITH collect(p) as people
UNWIND people as p1
UNWIND range(1,10) as friend
WITH p1, people[(p1.id + friend) % size(people)] as p2
CREATE (p1)-[:KNOWS {years: abs(p2.id - p1.id)}]->(p2)

And if you go back to your consumer you’ll see something like this:

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":98,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"84","before":null,"after":{"properties":{"name":"Name 85","id":85,"age":1},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}

Please note that in this example no topic name was specified before the execution of the Kafka consumer, which is listening on the neo4j topic. This is because the Neo4j Streams plugin, if not configured otherwise, produces messages into a topic named neo4j by default.

7.1.3. Sink module

Below you’ll find a simple docker compose file that allows you to spin up two Neo4j instances, one configured as Source and one as Sink, allowing you to replicate data from the Source to the Sink:

    environment:
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

7.1.3.1. Launch it locally

In the following example we will use the Neo4j Streams plugin in combination with the APOC procedures (download from here) in order to download some data from Stack Overflow, store it into the Neo4j Source instance and replicate this dataset into the Sink via the Neo4j Streams plugin.

version: '3'

services:
  neo4j-source:
    image: neo4j:3.5
    hostname: neo4j-source
    container_name: neo4j-source
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8474:7474"
      - "8687:7687"
    volumes:
      - ./neo4j/plugins:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/source
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_kafka_batch_size: 16384
      NEO4J_streams_sink_enabled: "false"
      NEO4J_streams_source_schema_polling_interval: 10000

  neo4j-sink:
    image: neo4j:3.5
    hostname: neo4j-sink
    container_name: neo4j-sink
    depends_on:
      - neo4j-source
    ports:
      - "7474:7474"
      - "7687:7687"
    volumes:
      - ./neo4j/plugins-sink:/plugins
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/sink
      NEO4J_dbms_memory_heap_max__size: 2G
      NEO4J_kafka_max_poll_records: 16384
      NEO4J_streams_source_enabled: "false"
      NEO4J_streams_sink_topic_cdc_schema: "neo4j"
      NEO4J_dbms_logs_debug_level: DEBUG
      NEO4J_streams_sink_enabled: "true"
      NEO4J_streams_sink_topic_neo4j:
        "WITH event.value.payload AS payload, event.value.meta AS meta
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Question' THEN [1] ELSE [] END |
          MERGE (n:Question{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Answer' THEN [1] ELSE [] END |
          MERGE (n:Answer{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'User' THEN [1] ELSE [] END |
          MERGE (n:User{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'node' AND meta.operation <> 'deleted' and payload.after.labels[0] = 'Tag' THEN [1] ELSE [] END |
          MERGE (n:Tag{neo_id: toInteger(payload.id)}) ON CREATE
            SET n += payload.after.properties
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ANSWERS' THEN [1] ELSE [] END |
          MERGE (s:Answer{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ANSWERS{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'TAGGED' THEN [1] ELSE [] END |
          MERGE (s:Question{neo_id: toInteger(payload.start.id)})
          MERGE (e:Tag{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:TAGGED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'PROVIDED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Answer{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:PROVIDED{neo_id: toInteger(payload.id)}]->(e)
        )
        FOREACH (ignoreMe IN CASE WHEN payload.type = 'relationship' AND meta.operation <> 'deleted' and payload.label = 'ASKED' THEN [1] ELSE [] END |
          MERGE (s:User{neo_id: toInteger(payload.start.id)})
          MERGE (e:Question{neo_id: toInteger(payload.end.id)})
          CREATE (s)-[:ASKED{neo_id: toInteger(payload.id)}]->(e)
        )"

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
Prerequisites
  • Install APOC into ./neo4j/plugins.
  • Install the Neo4j Streams plugin into ./neo4j/plugins and ./neo4j/plugins-sink
Import the data

Let’s go to the two instances and create the constraints on both sides:

// enable the multi-statement execution: https://stackoverflow.com/questions/21778435/multiple-unrelated-queries-in-neo4j-cypher?answertab=votes#tab-top
CREATE CONSTRAINT ON (u:User) ASSERT u.id IS UNIQUE;
CREATE CONSTRAINT ON (a:Answer) ASSERT a.id IS UNIQUE;
CREATE CONSTRAINT ON (t:Tag) ASSERT t.name IS UNIQUE;
CREATE CONSTRAINT ON (q:Question) ASSERT q.id IS UNIQUE;

Please take a look at the following property inside the compose file:

NEO4J_streams_source_schema_polling_interval: 10000

This means that every 10 seconds the Streams plugin polls the DB in order to retrieve schema changes and store them. So after you have created the indexes you need to wait about 10 seconds before the next step; otherwise the plugin will not yet be aware of the new constraints.

Now let's go to the Source and, in order to import the Stack Overflow dataset, execute the following query:

UNWIND range(1, 1) as page
CALL apoc.load.json("https://api.stackexchange.com/2.2/questions?pagesize=100&order=desc&sort=creation&tagged=neo4j&site=stackoverflow&page=" + page) YIELD value
UNWIND value.items AS event
MERGE (question:Question {id:event.question_id}) ON CREATE
  SET question.title = event.title, question.share_link = event.share_link, question.favorite_count = event.favorite_count

FOREACH (ignoreMe in CASE WHEN exists(event.accepted_answer_id) THEN [1] ELSE [] END | MERGE (question)<-[:ANSWERS]-(answer:Answer{id: event.accepted_answer_id}))

WITH * WHERE NOT event.owner.user_id IS NULL
MERGE (owner:User {id:event.owner.user_id}) ON CREATE SET owner.display_name = event.owner.display_name
MERGE (owner)-[:ASKED]->(question)

Once the import process has finished, to verify that the data has been correctly replicated into the Sink, execute this query on both Source and Sink and compare the results:

MATCH (n)
RETURN
DISTINCT labels(n),
count(*) AS NumofNodes,
avg(size(keys(n))) AS AvgNumOfPropPerNode,
min(size(keys(n))) AS MinNumPropPerNode,
max(size(keys(n))) AS MaxNumPropPerNode,
avg(size((n)-[]-())) AS AvgNumOfRelationships,
min(size((n)-[]-())) AS MinNumOfRelationships,
max(size((n)-[]-())) AS MaxNumOfRelationships
order by NumofNodes desc

You can also launch a Kafka consumer that subscribes to the topic neo4j by executing this command:

docker exec broker kafka-console-consumer --bootstrap-server broker:9093 --topic neo4j --from-beginning

You’ll see something like this:

{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":330,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"94","start":{"id":"186","labels":["User"],"ids":{"id":286795}},"end":{"id":"59","labels":["Question"],"ids":{"id":58303891}},"before":null,"after":{"properties":{}},"label":"ASKED","type":"relationship"},"schema":{"properties":{},"constraints":[]}}

{"meta":{"timestamp":1571403896987,"username":"neo4j","txId":34,"txEventId":331,"txEventsCount":352,"operation":"created","source":{"hostname":"neo4j-source"}},"payload":{"id":"34","start":{"id":"134","labels":["Answer"],"ids":{"id":58180296}},"end":{"id":"99","labels":["Question"],"ids":{"id":58169215}},"before":null,"after":{"properties":{}},"label":"ANSWERS","type":"relationship"},"schema":{"properties":{},"constraints":[]}}

7.2. Kafka Connect plugin

Inside the directory /kafka-connect-neo4j/docker you’ll find a compose file that allows you to start the whole testing environment:

docker-compose.yml. 

---
version: '2'
services:
  neo4j:
    image: neo4j:3.5
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_zookeeper_connect: zookeeper:2181
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
#      - bash
#      - -c
#      - |
#        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:1.0.3
        /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

7.2.1. Configuration parameters

You can set the following configuration values via the Confluent Connect UI, or via the REST endpoint.

Field Type Description

neo4j.server.uri

String

The Bolt URI (default bolt://localhost:7687)

neo4j.authentication.type

enum[NONE, BASIC, KERBEROS]

The authentication type (default BASIC)

neo4j.batch.size

Int

The max number of events processed by the Cypher query (default 1000)

neo4j.batch.timeout.msecs

Long

The execution timeout for the cypher query (default 30000)

neo4j.authentication.basic.username

String

The authentication username

neo4j.authentication.basic.password

String

The authentication password

neo4j.authentication.basic.realm

String

The authentication realm

neo4j.authentication.kerberos.ticket

String

The Kerberos ticket

neo4j.encryption.enabled

Boolean

If the encryption is enabled (default false)

neo4j.encryption.trust.strategy

enum[TRUST_ALL_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES]

The Neo4j trust strategy (default TRUST_ALL_CERTIFICATES)

neo4j.encryption.ca.certificate.path

String

The path of the certificate

neo4j.connection.max.lifetime.msecs

Long

The max Neo4j connection lifetime (default 1 hour)

neo4j.connection.acquisition.timeout.msecs

Long

The max Neo4j acquisition timeout (default 1 hour)

neo4j.connection.liveness.check.timeout.msecs

Long

The max Neo4j liveness check timeout (default 1 hour)

neo4j.connection.max.pool.size

Int

The max pool size (default 100)

neo4j.load.balance.strategy

enum[ROUND_ROBIN, LEAST_CONNECTED]

The Neo4j load balance strategy (default LEAST_CONNECTED)

neo4j.batch.parallelize

Boolean

(default true) While concurrent batch processing improves throughput, it might cause out-of-order handling of events. Set to false if you need application of messages with strict ordering, e.g. for change-data-capture (CDC) events.

7.2.2. Configuring the stack

Start the compose file

docker-compose up -d

You can access your Neo4j instance under: http://localhost:7474, log in with neo4j as username and connect as password (see the docker-compose file to change it).

7.2.2.1. Plugin installation

You can choose your preferred way to install the plugin:

  • Build it locally

    Build the project by running the following command:

    mvn clean install

    Create a directory plugins at the same level as the compose file and unzip the file neo4j-kafka-connect-neo4j-<VERSION>.zip inside it.

  • Download the zip from the Confluent Hub

    Please go to the Confluent Hub page of the plugin:

    https://www.confluent.io/connector/kafka-connect-neo4j-sink/

    Then click the Download Connector button.

    Create a directory plugins at the same level as the compose file and unzip the file neo4j-kafka-connect-neo4j-<VERSION>.zip inside it.

  • Download and install the plugin via Confluent Hub client

    If you are using the provided compose file you can easily install the plugin by using the Confluent Hub.

    Once the compose file is up and running you can install the plugin by executing the following command:

    docker exec -it connect confluent-hub install neo4j/kafka-connect-neo4j:<version>

    When the installation asks:

    The component can be installed in any of the following Confluent Platform installations:

    choose the option marked (where this tool is installed) and then go ahead with the default options.

    At the end of the process the plugin is automatically installed.

7.2.3. Create the Sink Instance

To create the Sink instance and configure your preferred ingestion strategy, you can follow the instructions described in the Create the Sink Instance and Sink Ingestion Strategies sections.

7.2.3.1. Use the Kafka Connect Datagen

In order to generate a sample dataset you can use the Kafka Connect Datagen as explained in the Example with Kafka Connect Datagen section.

Before you start using the data generator, please create indexes in Neo4j (in order to speed up the import process).

Chapter 8. Using with Neo4j Causal Cluster

This chapter describes considerations around using Neo4j Streams with Neo4j Enterprise Causal Cluster.

8.1. Overview

Neo4j Clustering is a feature available in Enterprise Edition which allows high availability of the database through having multiple database members.

Neo4j Enterprise uses a LEADER/FOLLOWER operational view, where writes are always processed by the leader, while reads can be serviced either by followers or, optionally, by read replicas, which maintain a copy of the database and serve to scale out read operations horizontally.

8.2. Kafka Connect

When using Neo4j Streams in this method with a Neo4j cluster, the most important consideration is to use a routing driver. Generally, these are identified by a URI with bolt+routing:// as the scheme instead of just bolt:// as the scheme.

If your Neo4j cluster is located at graph.mycompany.com:7687, simply configure the Kafka Connect worker with

neo4j.server.uri=bolt+routing://graph.mycompany.com:7687

The use of the bolt+routing driver will mean that the Neo4j Driver itself will handle connecting to the correct cluster member, and managing changes to the cluster membership over time.

For further information on routing drivers, see the Neo4j Driver Manual.

8.3. Neo4j Plugin

When using Neo4j Streams as a plugin together with a cluster, there are several things to keep in mind:

  • The plugin must be present in the plugins directory of all cluster members, and not just one.
  • The configuration settings must be present in all of the neo4j.conf files, not just one.

Through the course of the cluster lifecycle, the leader may change; for this reason the plugin must be everywhere, and not just on the leader.

The plugin detects the leader, and will not attempt to perform a write (i.e. in the case of the consumer) on a follower, where it would fail. The plugin checks cluster topology as needed.

Additionally for CDC, a consideration to keep in mind is that as of Neo4j 3.5, committed transactions are only published on the leader as well. In practical terms, this means that as new data is committed to Neo4j, it is the leader that will be publishing that data back out to Kafka, if you have the producer configured.

The neo4j streams utility procedures, in particular CALL streams.publish, can work on any cluster member, or read replica. CALL streams.consume may also be used on any cluster member, however it is important to keep in mind that due to the way clustering in Neo4j works, using streams.consume together with write operations will not work on a cluster follower or read replica, as only the leader can process writes.

8.4. Remote Clients

Sometimes there will be remote applications that talk to Neo4j via the official drivers and want to use the streams functionality. Best practices in these cases are:

  • Always use a bolt+routing:// driver URI when communicating with the cluster in the client application.
  • Use Explicit Write Transactions in your client application when using procedure calls such as CALL streams.consume to ensure that the routing driver routes the query to the leader.

Chapter 9. Configure with Kafka over SSL

This section provides guidance to configure SSL security between Kafka and Neo4j. This will provide data encryption between Kafka and Neo4j.

This does not address ACL configuration inside of Kafka.

Please also see the Confluent documentation for further details on how to configure encryption and authentication with SSL.

9.1. Self Signed Certificates

This section came from https://medium.com/talking-tech-all-around/how-to-enable-and-verify-client-authentication-in-kafka-21e936e670e8.

Make sure that you have truststore and keystore JKSs for each server. In case you want a self signed certificate, you can use the following commands:

mkdir security
cd security

export PASSWORD=password
keytool -keystore kafka.server.keystore.jks -alias localhost -validity 365 -genkey
openssl req -new -x509 -keyout ca-key -out ca-cert -days 365
keytool -keystore kafka.server.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.truststore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.server.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.server.keystore.jks -alias localhost -import -file cert-signed
keytool -keystore kafka.client1.keystore.jks -alias localhost -validity 365 -genkey
keytool -keystore kafka.client1.keystore.jks -alias localhost -certreq -file cert-file
openssl x509 -req -CA ca-cert -CAkey ca-key -in cert-file -out cert-signed -days 365 -CAcreateserial -passin pass:$PASSWORD
keytool -keystore kafka.client1.keystore.jks -alias CARoot -import -file ca-cert
keytool -keystore kafka.client1.keystore.jks -alias localhost -import -file cert-signed

Once the keystores are created, you have to move the kafka.client1.keystore.jks and kafka.client1.truststore.jks to your neo4j server.

This article discusses addressing this error (Caused by: java.security.cert.CertificateException: No subject alternative names present) that may appear when querying the topic. https://geekflare.com/san-ssl-certificate/

9.2. Kafka Configuration

Connect to your Kafka server and modify the config/server.properties file. This configuration worked in general, but other configurations without the EXTERNAL and INTERNAL settings should work as well. This configuration is for Kafka on AWS but should work for other deployments as well.

listeners=EXTERNAL://0.0.0.0:9092,INTERNAL://0.0.0.0:19092,CLIENT://0.0.0.0:9093,SSL://0.0.0.0:9094
listener.security.protocol.map=EXTERNAL:PLAINTEXT,INTERNAL:PLAINTEXT,CLIENT:PLAINTEXT,SSL:SSL
advertised.listeners=EXTERNAL://aws_public_ip:9092,INTERNAL://aws_internal_ip:19092,CLIENT://aws_public_ip:9093,SSL://aws_public_ip:9094
inter.broker.listener.name=INTERNAL

ssl.keystore.location=/home/kafka/security/kafka.server.keystore.jks
ssl.keystore.password=neo4jpassword
ssl.truststore.location=/home/kafka/security/kafka.server.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.key.password=neo4jpassword
ssl.enabled.protocols=TLSv1.2,TLSv1.1

ssl.endpoint.identification.algorithm=HTTPS
ssl.client.auth=required

9.3. Neo4j Configuration

The following is required for a Neo4j configuration. In this case, we are connecting to the public AWS IP address. The keystore and truststore locations point to the files that you created earlier in the steps.

Note that the passwords are stored in plaintext so limit access to this neo4j.conf file.

kafka.zookeeper.connect=xxx.xxx.xxx.xxx:2181
kafka.bootstrap.servers=xxx.xxx.xxx.xxx:9094
streams.sink.enabled=false
streams.sink.polling.interval=1000

streams.source.topic.nodes.neoTest=Person{*}

kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
streams.sink.dlq=neo4j-dlq

kafka.acks=all
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432

kafka.security.protocol=SSL
kafka.ssl.truststore.location=/home/ubuntu/security/kafka.client1.truststore.jks
kafka.ssl.truststore.password=neo4jpassword
kafka.ssl.keystore.location=/home/ubuntu/security/kafka.client1.keystore.jks
kafka.ssl.keystore.password=neo4jpassword
kafka.ssl.key.password=neo4jpassword
kafka.ssl.endpoint.identification.algorithm=HTTPS

dbms.security.procedures.whitelist=apoc.*
dbms.security.procedures.unrestricted=apoc.*
dbms.jvm.additional=-Djavax.net.debug=ssl:handshake

This line dbms.jvm.additional=-Djavax.net.debug=ssl:handshake is optional but does help for debugging SSL issues.

9.4. Testing

After starting Kafka and Neo4j, you can test by creating a Person node in Neo4j and then query the topic as follows:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic neoTest --from-beginning

If you want to test using SSL, you would do the following:

  • Create a client-ssl.properties file consisting of:
security.protocol=SSL
ssl.truststore.location=/home/kafka/security/kafka.client.truststore.jks
ssl.truststore.password=neo4jpassword
ssl.endpoint.identification.algorithm=
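Then run the console consumer against the SSL listener using that properties file (a sketch; port 9094 matches the SSL listener configured in the Kafka Configuration section above):

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic neoTest --consumer.config client-ssl.properties --from-beginning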

Chapter 10. Confluent Cloud

Configuring a connection to a Confluent Cloud instance should follow Confluent’s Java Client configuration advice, and the advice in Kafka Settings section. At a minimum, to configure this, you will need:

  • BOOTSTRAP_SERVER_URL
  • API_KEY
  • API_SECRET

More specifically, the plugin has to be configured as follows:

neo4j.conf. 

kafka.bootstrap.servers=${BOOTSTRAP_SERVER_URL}
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="${API_KEY}" password="${API_SECRET}";
kafka.ssl.endpoint.identification.algorithm=https
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.request.timeout.ms=20000
kafka.retry.backoff.ms=500

Make sure to replace BOOTSTRAP_SERVER_URL, API_SECRET, and API_KEY with the values that Confluent Cloud gives you when you generate an API access key.
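These settings cover only the connection to Confluent Cloud; you still need to enable a source and/or sink as described in the rest of this guide. For example, a minimal sink addition could look like this (a sketch; the topic name and the Cypher statement are placeholders to adapt):

streams.sink.enabled=true
streams.sink.topic.cypher.my-topic=MERGE (n:Person {name: event.name})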

Chapter 11. Examples with Confluent Platform and Kafka Connect Datagen

11.1. Confluent and Neo4j in binary format

In this example Neo4j and Confluent will be downloaded in binary format and the Neo4j Streams plugin will be set up in SINK mode. The data consumed by Neo4j will be generated by the Kafka Connect Datagen. Please note that this connector should be used just for test purposes and is not suitable for production scenarios.

11.1.1. Download and Install Confluent Platform

  • Download Confluent Platform and then choose the desired format .tar.gz or .zip.
  • Decompress the file in your desired folder
  • Add the install location of the Confluent bin directory to your PATH environment variable.
export PATH=<CONFLUENT_HOME_DIR>/bin:$PATH
  • Run Confluent Platform using the following command:
confluent local start

The output should be something like this:

Starting zookeeper
zookeeper is [UP]
Starting kafka
kafka is [UP]
Starting schema-registry
schema-registry is [UP]
Starting kafka-rest
kafka-rest is [UP]
Starting connect
connect is [UP]
Starting ksql-server
ksql-server is [UP]
Starting control-center
control-center is [UP]

11.1.2. Download Neo4j

  • Download the latest version of Neo4j at the following link https://neo4j.com/download-center/
  • Decompress it in your desired folder
  • Install the Neo4j Streams plugin by copying the jar into the plugins folder
  • Add the following properties to neo4j.conf in order to enable Sink functionality

neo4j.conf. 

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.auto.offset.reset=earliest
kafka.group.id=neo4j
kafka.enable.auto.commit=true
kafka.key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
kafka.value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

#********************************************************************
# Kafka Consumer
#********************************************************************
streams.sink.enabled=true
streams.sink.topic.cypher.pageviews=MERGE (n:User {id: event.payload.userid}) MERGE (p:PageView { id: event.payload.pageid }) MERGE (n)-[:VIEWED]->(p)

Configure the deserializer according to the chosen data format:

  • org.apache.kafka.common.serialization.ByteArrayDeserializer in case of JSON format
  • io.confluent.kafka.serializers.KafkaAvroDeserializer in case of AVRO format

If you choose AVRO, then a schema registry configuration is also needed:

kafka.schema.registry.url=localhost:8081

where 8081 is the default port for the Confluent Schema Registry.

If you started Neo4j before adding the above properties, you also need to restart the Neo4j server.

11.1.3. Install Kafka Connect Datagen

Install the Kafka Connect Datagen using the Confluent Hub client.

<CONFLUENT_HOME_DIR>/bin/confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest

The output should be something like this:

Running in a "--no-prompt" mode
Implicit acceptance of the license below:
Apache License 2.0
https://www.apache.org/licenses/LICENSE-2.0
Downloading component Kafka Connect Datagen 0.1.5, provided by Confluent, Inc. from Confluent Hub and installing into /Applications/Development/confluent-5.3.1/share/confluent-hub-components
...
Completed

11.1.4. View results

Now you can access the Confluent Control Center at http://localhost:9021, where you can create Kafka topics and generate some sample data. Follow step 2 and step 3 of the official Confluent documentation.

When configuring the data generator connectors, also set the Value converter class property to the following value:

org.apache.kafka.connect.json.JsonConverter
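For reference, with the JsonConverter a pageviews record is delivered as a JSON envelope whose payload carries the Datagen fields. A rough sketch (the field names come from the pageviews quickstart schema, the values are illustrative, and the schema portion is abbreviated):

{"schema": {"type": "struct"}, "payload": {"viewtime": 1, "userid": "User_1", "pageid": "Page_10"}}

This is why the Cypher statement configured earlier references event.payload.userid and event.payload.pageid.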

Accessing the Neo4j Browser at http://localhost:7474, you can see that the Kafka messages generated by the Kafka Connect Datagen were consumed and converted to nodes and relationships according to the Cypher statement specified in the property streams.sink.topic.cypher.pageviews. Just execute the following Cypher query:

MATCH p=()-->() RETURN p LIMIT 25

The output should be something like:

Figure 11.1. SINK Output

11.2. Confluent with Docker, Neo4j in binary format

In this example, Neo4j will be installed locally and the Confluent Platform will run in a Docker environment.

11.2.1. Neo4j

Neo4j is installed and configured in the same way as in the above example.

11.2.2. Confluent with Docker

In order to have a ready-to-use Confluent Platform with Docker, please use the following docker-compose file (note that in the configuration of the connect service you have to substitute the <version> of the kafka-connect-neo4j plugin you are going to install):

docker-compose.yml. 

version: '2'
services:

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround if we change to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/kafka-connect-datagen:latest
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:<version> && \
        confluent-hub install --no-prompt confluentinc/kafka-connect-datagen:latest
        /etc/confluent/docker/run

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

You must allocate a minimum of 8 GB of memory to Docker in order to avoid Exit Code 137 (Out of Memory Error) on the connect container.
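Once the file is saved (as docker-compose.yml) and the plugin <version> has been substituted, the stack can be started with the standard Docker Compose commands, for example:

docker-compose up -d

You can then follow the connect container logs with docker-compose logs -f connect while the Neo4j and Datagen connectors are being installed.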


To see the results, follow the instructions explained in the View results section above.

Chapter 12. Developing Neo4j Streams

This chapter describes setting up Neo4j Streams for local development.

12.1. Build locally

mvn clean install
  1. Copy <project_dir>/target/neo4j-streams-<VERSION>.jar into $NEO4J_HOME/plugins
  2. Restart Neo4j
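The steps above, expressed as shell commands, would look roughly like the following sketch, assuming you run it from the project root of a tarball-based Neo4j installation with $NEO4J_HOME set:

mvn clean install
cp target/neo4j-streams-<VERSION>.jar $NEO4J_HOME/plugins/
$NEO4J_HOME/bin/neo4j restart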

12.2. Generating this Documentation

  1. cd doc && ./gradlew clean packageHTML
  2. cd build/html && python3 -m http.server
  3. Browse to http://localhost:8000/

Chapter 13. Neo4j Streams FAQ

13.1. How to integrate Neo4j and Kafka

When integrating Neo4j and Kafka using the Neo4j Streams plugin or the Kafka Connect plugin, it is important to configure just one of them and not both. If you only need to load data from Kafka into Neo4j (and not vice versa), you can just use the Kafka Connect plugin. If you need both sink and source functionality, then you have to use the Neo4j Streams plugin. This is the main difference between the two plugins.

13.2. About CUD file format

The CUD file format is a JSON format that represents graph entities (nodes/relationships) and how to manage them in terms of Create/Update/Delete operations, so every JSON event represents a single operation. For more details about how to use it, please check out the CUD File Format section for the Neo4j Streams plugin and the CUD File Format section for the Kafka Connect plugin.
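As an illustration only (see the linked sections for the authoritative field list), a node event in CUD format looks roughly like the following, where op is the Create/Update/Delete-style operation to perform and the label, ids, and property values are hypothetical:

{
  "op": "merge",
  "type": "node",
  "labels": ["Person"],
  "ids": {"id": 1},
  "properties": {"name": "Alice", "age": 30}
}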

13.3. How to ingest events using CDC Schema strategy

The Change Data Capture (CDC) method allows you to ingest events between different Neo4j instances. If you decide to use the Neo4j Streams plugin, then the Neo4j source instance has to be configured as follows:

streams.sink.enabled=false
streams.source.schema.polling.interval=10000

and the Neo4j sink instance has to be configured as follows:

streams.source.enabled=false
streams.sink.topic.cdc.schema=<topic-name>
streams.sink.enabled=true

If you decide to use the Kafka Connect plugin for the sink instance, then it has to be configured as follows:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "<list_of_topics_separated_by_comma>",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://<neo4j-sink-hostname>:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cdc.schema": "<list_of_topics_separated_by_semicolon>"
  }
}
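Assuming the JSON above is saved to a file (for example sink.neo4j.json, a name used here only for illustration) and that the Connect worker listens on the default REST port 8083, the connector can then be created through the Kafka Connect REST API:

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d @sink.neo4j.json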

13.4. Is Neo4j Streams supported by Confluent Cloud?

If the need is to run the connector as a managed service, then the answer is no. Users who are interested in running Neo4j Streams as a Confluent Cloud managed connector should request this of Confluent. Right now only a few connectors, such as the one for S3, can be run as managed services.

However, it is supported in the sense that Neo4j Streams can be connected to a Confluent Cloud instance, and the basic configuration is explained in the Confluent Cloud section. Other references on how to configure the connection to Confluent Cloud can be found at the following links:

13.5. Kafka output events description

If you configure the Neo4j Streams plugin as Sink, using a Cypher query in order to ingest data from Kafka into Neo4j, then watching the Kafka console consumer output you will see JSON events which describe node and relationship creation. They look like the following:

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":99,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"85","before":null,"after":{"properties":{"name":"Name 86","id":86,"age":2},"labels":["Person"]},"type":"node"},"schema":{"properties":{"name":"String","id":"Long","age":"Long"},"constraints":[]}}

{"meta":{"timestamp":1571329239766,"username":"neo4j","txId":20,"txEventId":100,"txEventsCount":1100,"operation":"created","source":{"hostname":"neo4j"}},"payload":{"id":"0","start":{"id":"0","labels":["Person"],"ids":{}},"end":{"id":"2","labels":["Person"],"ids":{}},"before":null,"after":{"properties":{"years":2}},"label":"KNOWS","type":"relationship"},"schema":{"properties":{"years":"Long"},"constraints":[]}}

A JSON event must describe only one Neo4j entity (node/relationship) at a time.

The specified query is considered unique, so all the entities involved belong to the same transaction:

  • txId identifies the transaction that affected the entity
  • txEventId is a counter that identifies the internal order in which Neo4j handled the specific event

For example, the two events above share txId 20, while txEventId 99 and 100 give their order within that transaction.

13.6. How to configure Kafka over SSL?

You will find a guide here on how to configure the Neo4j Streams plugin to work with Kafka over SSL. Under the covers, the Neo4j Streams plugin uses the official Java libraries for Kafka, so you would configure this in the same way as for the Java client.

If you would also like to use Kerberos, the following tutorial should be useful: https://henning.kropponline.de/2016/02/21/secure-kafka-java-producer-with-kerberos/

Moreover, check out the official Confluent Kafka documentation for further details on this topic. Here are some helpful links:

13.7. Enabling DLQ functionality

In order to enable the DLQ functionality you have to specify the following properties:

streams.sink.errors.log.enable=true
streams.sink.errors.log.include.messages=true

These properties have to be specified because their value is false by default; by specifying them you will be able to log errors and bad messages. Moreover, you also need to declare the following property; if it is left off, no DLQ topic is used:

streams.sink.errors.deadletterqueue.topic.name=<topic_name>
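To inspect what ends up in the dead letter queue, you can read that topic with the console consumer. A sketch, assuming the DLQ topic is named neo4j-dlq as in the earlier Neo4j configuration example and that a broker is reachable on localhost:9092:

./bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic neo4j-dlq --from-beginning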

For further details, see the following section: How deal with bad data

13.8. Supported Kafka deserializers

Neo4j Streams supports two types of deserializers:

  • org.apache.kafka.common.serialization.ByteArrayDeserializer, if you want to manage JSON messages
  • io.confluent.kafka.serializers.KafkaAvroDeserializer, if you want to manage AVRO messages

If you use AVRO, then a schema registry configuration is also needed:

kafka.schema.registry.url=*.*.*.*:8081

Where 8081 is the default port for the Confluent Schema Registry.
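Putting the two together, a neo4j.conf sketch for consuming AVRO messages (the host and port of the Schema Registry are assumptions for a local setup) would be:

kafka.key.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.value.deserializer=io.confluent.kafka.serializers.KafkaAvroDeserializer
kafka.schema.registry.url=localhost:8081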

13.9. Kafka cluster and topic with multiple partition setup

If the environment is a Kafka cluster composed of:

  • multiple Zookeepers servers
  • multiple Kafka brokers
  • topics with multiple partitions
  • a Neo4j instance configured as Sink

it is important to set up the Zookeeper servers correctly. This means that the number of Zookeeper instances has to be 2n+1, where n is any number greater than 0, because an odd number of servers allows ZooKeeper to perform majority elections for leadership.

So, if the cluster is not set up properly, events produced in some partitions may not be read.
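As a concrete illustration (the Zookeeper address, topic name, replication factor, and partition count here are arbitrary), a topic with multiple partitions on such a cluster could be created with:

./bin/kafka-topics.sh --create --zookeeper zookeeper1:2181 --replication-factor 3 --partitions 3 --topic neo4j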

Please see the following link for further details: