Kafka Connect Plugin

Figure 1. Neo4j Loves Confluent

Kafka Connect, an open source component of Apache Kafka, is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.

The Neo4j Streams project provides a Kafka Connect plugin that can be installed into the Confluent Platform enabling:

  • Ingest data from Kafka topics directly into Neo4j via templated Cypher queries;

  • Stream Neo4j transaction events (coming soon).

Plugin installation

You can install the plugin in any of the following ways.

Download and install the plugin via Confluent Hub client

If you are using the provided compose file, you can easily install the plugin via the Confluent Hub client.

Once the compose file is up and running you can install the plugin by executing the following command:

<confluent_platform_dir>/bin/confluent-hub install neo4j/kafka-connect-neo4j:<version>

When the installation asks:

The component can be installed in any of the following Confluent Platform installations:

please choose the option where this tool is installed and then go ahead with the default options.

Here is an example:

Figure 2. Installation via Confluent Hub Client

At the end of the process the plugin is automatically installed.

Download the zip from the Confluent Hub

Please go to the Confluent Hub page of the plugin:

and click the Download Connector button.

Once you have downloaded the file, please place it into your Kafka Connect plugins directory.

Build it locally

Download the project from Github:

git clone https://github.com/neo4j-contrib/neo4j-streams.git

Go into the neo4j-streams directory:

cd neo4j-streams

Build the project by running the following command:

mvn clean install

Inside the directory <neo4j-streams>/kafka-connect-neo4j/target/component/packages you'll find a file named neo4j-kafka-connect-neo4j-<VERSION>.zip. Unpack it and place it into your Kafka Connect plugins directory.
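
For example, assuming a hypothetical plugins directory /usr/local/share/kafka/plugins (use whatever directory your Connect worker is configured with), the last step could look like this:

unzip kafka-connect-neo4j/target/component/packages/neo4j-kafka-connect-neo4j-<VERSION>.zip -d /usr/local/share/kafka/plugins

and in the Connect worker properties make sure that directory is on the plugin path, then restart the worker so it picks up the new plugin:

# Kafka Connect worker properties
plugin.path=/usr/local/share/kafka/plugins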

Create the Sink Instance

We'll define the Sink configuration in several ways:

  • by providing a Cypher template

  • by ingesting the events emitted from another Neo4j instance via the Change Data Capture module

  • by providing an extraction pattern for JSON or AVRO events

  • by managing a CUD file format

Cypher template

{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.deadletterqueue.topic.name": "test-error-topic",
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}

In particular this line:

"streams.sink.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"

defines that all the data that comes from the topic my-topic will be unpacked by the Sink into Neo4j with the following Cypher query:

MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)

Under the hood, the Sink injects the event object in this way:

UNWIND {batch} AS event
MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)

Where {batch} is a list of event objects.
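
For example (a hypothetical pair of messages), publishing the following two JSON events to my-topic:

{"name": "Anne", "surname": "Kretchmar"}
{"name": "John", "surname": "Kretchmar"}

would merge two Person nodes and a single Family node, with both persons connected to the family via a BELONGS_TO relationship:

(:Person {name: 'Anne', surname: 'Kretchmar'})-[:BELONGS_TO]->(:Family {name: 'Kretchmar'})
(:Person {name: 'John', surname: 'Kretchmar'})-[:BELONGS_TO]->(:Family {name: 'Kretchmar'})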

You can change the query, or remove the property and add your own, but you must follow this convention:

"streams.sink.topic.cypher.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"

Let’s load the configuration into the Confluent Platform with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.sink.avro.neo4j.json

The file contrib.sink.string-json.neo4j.json contains a configuration that manages a simple JSON producer example.

Please check that everything is fine by going to:

http://localhost:9021/management/connect

and clicking on the Sink tab. You should see a table just like this:

Status: Running
Active Tasks: 1
Name: Neo4jSinkConnector
Topics: my-topic

Note that the Sink instance can also be configured to monitor multiple topics. Just set the topics property to a comma-separated list of topics. For example:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "topics": "topicA,topicB",
    "_comment": "Cypher template example configuration",
    "neo4j.topic.cypher.topicA": "<YOUR_CYPHER_QUERY>",
    "neo4j.topic.cypher.topicB": "<YOUR_CYPHER_QUERY>",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false
  }
}
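
Once created, you can also verify the state of the connector and its tasks via the Kafka Connect REST API, for example:

curl -X GET http://localhost:8083/connectors/Neo4jSinkConnector/status

The response reports the connector state (e.g. RUNNING) together with the state of each task.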

Sink ingestion strategies

Change Data Capture Event

This method allows you to ingest CDC events coming from another Neo4j instance. You can use two strategies:

  • The SourceId strategy which merges the nodes/relationships by the CDC event id field (it’s related to the Neo4j physical ID)

  • The Schema strategy which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in your graph model

The SourceId strategy

You can configure the topic in the following way:

neo4j.topic.cdc.sourceId=<list of topics separated by semicolon>
neo4j.topic.cdc.sourceId.labelName=<the label attached to the node, default=SourceEvent>
neo4j.topic.cdc.sourceId.idName=<the id name given to the CDC id field, default=sourceId>
neo4j.topic.cdc.sourceId=my-topic;my-other.topic

Each stream event will be projected into the related graph entity; for instance, the following event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

will be persisted as the following node:

Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}

As you can see, the ingested event has been projected with two peculiarities:

  • the id field has been transformed into sourceId;

  • the node has an additional label SourceEvent.

These two fields will be used to match the node/relationship for future updates/deletes.
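
For example, the strategy-specific fragment of a Sink connector configuration using this strategy could look like the following sketch (to be combined with the connection properties shown earlier; my-topic is a placeholder):

"topics": "my-topic",
"neo4j.topic.cdc.sourceId": "my-topic",
"neo4j.topic.cdc.sourceId.labelName": "SourceEvent",
"neo4j.topic.cdc.sourceId.idName": "sourceId"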

The Schema strategy

You can configure the topic in the following way:

neo4j.topic.cdc.schema=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
neo4j.topic.cdc.schema=my-topic;my-other.topic

Each stream event will be projected into the related graph entity; for instance, the following event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}

will be persisted as the following node:

Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}

The Schema strategy leverages the schema field in order to insert/update the nodes, so no extra fields will be created.
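
Conceptually, for the event above the Sink performs an operation similar to the following Cypher (a simplified sketch, not necessarily the exact query generated by the plugin):

// lookup by the constraint keys (first_name, last_name), then set the remaining properties
MERGE (n:Person {first_name: 'Anne Marie', last_name: 'Kretchmar'})
SET n.email = 'annek@noanswer.org'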

In the case of a relationship event such as the following:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

the Schema strategy leverages the ids fields in order to insert/update the relationships, so no extra fields will be created.
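
Conceptually, the relationship event above translates into something similar to the following Cypher (again a simplified sketch):

// the endpoint nodes are looked up by the ids fields of start and end
MERGE (start:Person {last_name: 'Andrea', first_name: 'Santurbano'})
MERGE (end:Person {last_name: 'Michael', first_name: 'Hunger'})
// the relationship is then merged and its properties set from the after field
MERGE (start)-[r:KNOWS]->(end)
SET r.since = datetime('2018-04-05T12:34:00[Europe/Berlin]')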

The Pattern strategy

The Pattern strategy allows you to extract nodes and relationships from a JSON payload by providing an extraction pattern.

Each property can be prefixed with:

  • !: identifies the id (there can be more than one id property); it's mandatory

  • -: excludes the property from the extraction

If no prefix is specified, the property will be included.

You cannot mix inclusion and exclusion, so your pattern must contain either all exclusion or all inclusion properties.

Labels can be chained via : (e.g. User:Actor).

Tombstone Record Management

The Pattern strategy also supports Tombstone Records: to leverage them, your event should contain the record you want to delete as the key and null as the value.
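
For example, assuming the node pattern User{!userId} described below, a tombstone record could conceptually look like this (record key / record value):

key:   {"userId": 1}
value: null

and would result in the deletion of the node identified by userId = 1.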

Currently you can't concatenate multiple patterns (for example, in case you use just one topic and produce more than one node/relationship type), so you have to use a different topic for each type of node/relationship and define a pattern for each of them.
The Node Pattern configuration

You can configure the node pattern extraction in the following way:

neo4j.topic.pattern.node.<TOPIC_NAME>=<NODE_EXTRACTION_PATTERN>

For instance, given the following JSON published to the user topic:

{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}

You can transform it into a node by providing one of the following configurations:

by specifying a simpler pattern:

neo4j.topic.pattern.node.user=User{!userId}

or by specifying a Cypher-like node pattern:

neo4j.topic.pattern.node.user=(:User{!userId})

Similar to the CDC strategies, you can provide:

Pattern: User:Actor{!userId} or User:Actor{!userId,*}
Meaning: userId will be used as the ID field and all JSON properties will be attached to the node with the provided labels (User and Actor), so the persisted node will be: (User:Actor{userId: 1, name: 'Andrea', surname: 'Santurbano', address.city: 'Venice', address.cap: 30100})

Pattern: User{!userId, surname}
Meaning: userId will be used as the ID field and only the surname property of the JSON will be attached to the node with the provided label (User), so the persisted node will be: (User{userId: 1, surname: 'Santurbano'})

Pattern: User{!userId, surname, address.city}
Meaning: userId will be used as the ID field and only the surname and address.city properties of the JSON will be attached to the node with the provided label (User), so the persisted node will be: (User{userId: 1, surname: 'Santurbano', address.city: 'Venice'})

Pattern: User{!userId,-address}
Meaning: userId will be used as the ID field and the address property will be excluded, so the persisted node will be: (User{userId: 1, name: 'Andrea', surname: 'Santurbano'})
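
Putting it together, the strategy-specific fragment of a Sink connector configuration using the node pattern above could look like this sketch (to be combined with the connection and error-handling properties shown in the previous examples):

"topics": "user",
"neo4j.topic.pattern.node.user": "User{!userId}"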

The Relationship Pattern configuration

You can configure the relationship pattern extraction in the following way:

neo4j.topic.pattern.relationship.<TOPIC_NAME>=<RELATIONSHIP_EXTRACTION_PATTERN>

For instance, given the following JSON published to the user topic:

{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", cap: "30100"}}

You can transform it into a path, like (n)-[r]->(m), by providing one of the following configurations:

By specifying a simpler pattern:

neo4j.topic.pattern.relationship.user=User{!userId} BOUGHT{price, currency} Product{!productId}

or by specifying a Cypher-like pattern:

neo4j.topic.pattern.relationship.user=(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})

In this last case we assume that User is the source node and Product the target node.

Similar to the CDC strategies, you can provide:

Pattern: (User{!userId})-[:BOUGHT]->(Product{!productId}) or (User{!userId})-[:BOUGHT{price, currency}]->(Product{!productId})
Meaning: this will merge (fetch or create) the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set all the other JSON properties on the relationship, so the persisted data will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice', shippingAddress.cap: 30100}]->(Product{productId: 100})

Pattern: (User{!userId})-[:BOUGHT{price}]->(Product{!productId})
Meaning: this will merge the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set only the specified JSON properties, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10}]->(Product{productId: 100})

Pattern: (User{!userId})-[:BOUGHT{-shippingAddress}]->(Product{!productId})
Meaning: this will merge the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set the JSON properties remaining after the exclusion, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€'}]->(Product{productId: 100})

Pattern: (User{!userId})-[:BOUGHT{price, currency, shippingAddress.city}]->(Product{!productId})
Meaning: this will merge the two nodes by the provided identifiers and the BOUGHT relationship between them, and then set the specified JSON properties, so the persisted pattern will be: (User{userId: 1})-[:BOUGHT{price: 10, currency: '€', shippingAddress.city: 'Venice'}]->(Product{productId: 100})

Attach properties to node

By default, no properties will be attached to the relationship's start and end nodes, but you can specify which properties to attach to each node. Given the following JSON published to the user topic:

{
    "userId": 1,
    "userName": "Andrea",
    "userSurname": "Santurbano",
    "productId": 100,
    "productName": "My Awesome Product!",
    "price": 10,
    "currency": "€"
}
Pattern: (User{!userId, userName, userSurname})-[:BOUGHT]->(Product{!productId, productName})
Meaning: this will merge the two nodes with the specified JSON properties attached to each of them, and the BOUGHT relationship between them with the remaining properties, so the persisted pattern will be: (User{userId: 1, userName: 'Andrea', userSurname: 'Santurbano'})-[:BOUGHT{price: 10, currency: '€'}]->(Product{productId: 100, productName: 'My Awesome Product!'})

CUD File Format

The CUD file format is a JSON format that represents graph entities (nodes/relationships) and how to manage them in terms of Create/Update/Delete operations.

You can configure the topic in the following way:

neo4j.topic.cud=<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>
neo4j.topic.cud=my-topic;my-other.topic

We have two formats:

  • One for nodes:

    We provide an example of a MERGE operation

    {
      "op": "merge",
      "properties": {
        "foo": "value",
        "key": 1
      },
      "ids": {"key": 1, "otherKey":  "foo"},
      "labels": ["Foo","Bar"],
      "type": "node",
      "detach": true
    }

which would be transformed into the following Cypher query:

UNWIND [..., {
  "op": "merge",
  "properties": {
    "foo": "value",
    "key": 1
  },
  "ids": {"key": 1, "otherKey":  "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": true
}, ...] AS event
MERGE (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
SET n += event.properties

Let's describe the fields:

Table 1. CUD file Node format fields description
field mandatory Description

op

yes

The operation type: create/merge/update/delete

N.B. delete messages are for individual nodes; this is not intended to be a generic way of building Cypher queries from JSON

properties

no in case the operation is delete, otherwise yes

The properties attached to the node

ids

no in case the operation is create, otherwise yes

In case the operation is merge/update/delete this field is mandatory and contains the primary/unique keys of the node that will be used to look up the entity. If you use _id as the key name, the CUD format will use Neo4j's internal node id for the lookup.

N.B. If you use the _id reference with the merge op, it will work as a simple update: if a node with the given internal id does not exist, it will not be created.

labels

no

The labels attached to the node.

N.B. Neo4j allows you to create nodes without labels, but from a performance perspective it's a bad idea not to provide them.

type

yes

The entity type: node/relationship ⇒ node in this case

detach

no

In case the operation is delete, you can specify whether to perform a "detach" delete, which means deleting any incident relationships when you delete the node

N.B. if no value is provided, the default is true

  • And one for relationships:

We provide an example of a CREATE operation

{
  "op": "create",
  "properties": {
    "foo": "rel-value",
    "key": 1
  },
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}

which would be transformed into the following Cypher query:

UNWIND [..., {
  "op": "create",
  "properties": {
    "foo": "rel-value",
    "key": 1
  },
  "rel-type": "MY-REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
CREATE (from)-[r:MY_REL]->(to)
SET r = event.properties

Let's describe the fields:

Table 2. CUD file Relationship format fields description
field mandatory Description

op

yes

The operation type: create/merge/update/delete

properties

no

The properties attached to the relationship

rel_type

yes

The relationship type

from

yes; if you use the _id field reference in ids you can leave labels blank

Contains the info about the source node of the relationship. For the description of the ids and labels fields, please look at the node fields description above

to

yes; if you use the _id field reference in ids you can leave labels blank

Contains the info about the target node of the relationship. For the description of the ids and labels fields, please look at the node fields description above

type

yes

The entity type: node/relationship ⇒ relationship in this case

Below is another example of a DELETE operation for both nodes and relationships.

  • For Node, the following JSON:

{
  "op": "delete",
  "properties": {},
  "ids": {"key": 1, "otherKey":  "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": false
}

will be transformed into the following Cypher query:

UNWIND [..., {
  "op": "delete",
  "properties": {},
  "ids": {"key": 1, "otherKey":  "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": false
}, ...] AS event
MATCH (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
DELETE n

Note that if you set "detach": true then the transformation will be:

UNWIND [
...
] AS event
...
DETACH DELETE n
  • For Relationship, the following JSON:

{
  "op": "create",
  "properties": {},
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}

will be transformed into the following Cypher query:

UNWIND [..., {
  "op": "create",
  "properties": {},
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"]
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"]
  },
  "type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
MATCH (from)-[r:MY_REL]->(to)
DELETE r

We can create non-existent nodes during relationship creation/merging by putting "op": "merge" in the "from" and/or "to" field. By default, "op" is match, so the node is not created if it doesn't exist. We can write, for example:

{
  "op": "create",
  "properties": {},
  "rel_type": "MY_REL",
  "from": {
    "ids": {"key": 1},
    "labels": ["Foo","Bar"],
    "op": "merge"
  },
  "to": {
    "ids": {"otherKey":1},
    "labels": ["FooBar"],
    "op": "merge"
  },
  "type":"relationship"
}
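
Conceptually, the event above results in something similar to the following Cypher (a simplified sketch), where the endpoint nodes are merged (created if missing) instead of matched:

// from/to are merged instead of matched because of "op": "merge"
MERGE (from:Foo:Bar {key: event.from.ids.key})
MERGE (to:FooBar {otherKey: event.to.ids.otherKey})
CREATE (from)-[r:MY_REL]->(to)
SET r = event.properties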

Multi Database Support

Neo4j 4.0 Enterprise has multi-tenancy support. In order to support this feature with the Kafka Connect plugin, when creating the Sink instance we have to add the neo4j.database property, which tells the Connector which database to use as the default. If you don't specify that property, the default database neo4j will be used.

Remember the naming rules for databases described here

Here is an example:

{
  "name": "Neo4jSinkConnector",
  "config": {
    "neo4j.database": "<database_name>",
    "topics": "topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": false,
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.encryption.enabled": false,
    "_comment": "Sink CDC SourceId Strategy",
    "neo4j.topic.cdc.sourceId": "topic",
    "neo4j.topic.cdc.sourceId.labelName": "<the label attached to the node, default=SourceEvent>",
    "neo4j.topic.cdc.sourceId.idName": "<the id name given to the CDC id field, default=sourceId>",
    "_comment": "Sink CDC Schema Strategy",
    "neo4j.topic.cdc.schema": "<list_of_topics_separated_by_semicolon>",
    "_comment": "Sink Node/Relationship Pattern Strategy",
    "neo4j.topic.pattern.node.<TOPIC_NAME>": "<node_extraction_pattern>",
    "neo4j.topic.pattern.relationship.<TOPIC_NAME>": "<relationship_extraction_pattern>",
    "_comment": "Sink CUD File forma Strategy",
    "neo4j.topic.cud": "<list_of_topics_separated_by_semicolon>"
  }
}

How to deal with bad data

In the Kafka Connect plugin, when creating the Sink instance, in addition to the properties described in the Dead Letter Queue configuration parameters table you also need to define the Kafka broker connection properties:

Name mandatory Description

kafka.bootstrap.servers

true

The Kafka broker URL (please look at the description below).

kafka.<any_other_kafka_property>

false

You can also specify any other Kafka producer setting by adding the kafka. prefix (e.g. the acks configuration becomes kafka.acks). See the Apache Kafka documentation for details on these settings.

As you may have noticed, we ask you to provide the bootstrap.servers property. This is because the Kafka Connect Framework provides out-of-the-box support only for deserialization errors and message transformations (please see the following link for further details: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/). We want to extend this feature to transient errors in order to cover 100% of failures, so at the moment, as suggested by Confluent, we need to ask for the broker location again, until this JIRA issue is addressed: https://issues.apache.org/jira/browse/KAFKA-8597. That said, these properties have to be added only if you also want to redirect Neo4j errors into the DLQ.
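
For example, the error-handling fragment of a Sink configuration with a DLQ and Neo4j error redirection enabled could look like this sketch (the broker address, DLQ topic name and replication factor are placeholders to adapt to your cluster):

"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"errors.deadletterqueue.topic.name": "neo4j-sink-dlq",
"errors.deadletterqueue.context.headers.enable": true,
"errors.deadletterqueue.topic.replication.factor": 1,
"kafka.bootstrap.servers": "localhost:9092"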

Monitor via Confluent Platform UI

The Kafka Monitoring UI can be found at http://<localhost>:9021/management/connect

Figure 3. Confluent Importing Metrics

The events show up properly in the topic and are then added to Neo4j via the Sink.

Below you can see the data that has been ingested into Neo4j. During testing we got up to more than 2M events.

Figure 4. Confluent Platform Management

Kafka Connect Client Config Override Policy

Apache Kafka 2.3.0 introduced the ability for each source and sink connector to inherit its client configuration from the worker properties.
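
For example (this is standard Kafka Connect behaviour, not specific to the Neo4j plugin), with the worker configured to allow overrides:

connector.client.config.override.policy=All

a Sink connector configuration can then override consumer settings via the consumer.override. prefix, e.g.:

"consumer.override.max.poll.records": "500"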

Configuration Summary

Below is a summary of all the configuration parameters you can use for the Kafka Connect plugin:

Table 3. Kafka Connect configuration parameters
Name Value Mandatory Note

topics

<topicA,topicB>

true

A list of comma-separated topics

connector.class

streams.kafka.connect.sink.Neo4jSinkConnector

true

key.converter

org.apache.kafka.connect.storage.StringConverter

false

Converter class for key Connect data

value.converter

org.apache.kafka.connect.json.JsonConverter

false

Converter class for value Connect data

key.converter.schemas.enable

true/false

false

If true the key will be treated as a composite JSON object containing schema and the data. Default value is false

value.converter.schemas.enable

true/false

false

If true the value will be treated as a composite JSON object containing schema and the data. Default value is false

key.converter.schema.registry.url

http://localhost:8081

false

The Schema Registry URL has to be provided only when you decide to use the AvroConverter

value.converter.schema.registry.url

http://localhost:8081

false

The Schema Registry URL has to be provided only when you decide to use the AvroConverter

kafka.bootstrap.servers

<localhost:9092>

false

The broker URI is mandatory only if you have configured a DLQ

kafka.<any_other_kafka_property>

false

errors.tolerance

all/none

false

all == lenient, silently ignore bad messages. none (default) means that any error will result in a connector failure

errors.log.enable

false/true

false

log errors (default: false)

errors.log.include.messages

false/true

false

log bad messages too (default: false)

errors.deadletterqueue.topic.name

topic-name

false

dead letter queue topic name, if left off no DLQ, default: not set

errors.deadletterqueue.context.headers.enable

false/true

false

enrich messages with metadata headers like exception, timestamp, original topic, original partition; default: false

errors.deadletterqueue.context.headers.prefix

prefix-text

false

common prefix for header entries, e.g. "__streams.errors." , default: not set

errors.deadletterqueue.topic.replication.factor

3/1

false

replication factor; needs to be set to 1 for single-broker clusters; default: 3

neo4j.database

"bolt://neo4j:7687"

true

Specify a database name only if you want to use a non-default database. Default value is 'neo4j'

neo4j.server.uri

"bolt://neo4j:7687"

true

Neo4j Server URI

neo4j.authentication.basic.username

your_neo4j_user

true

Neo4j username

neo4j.authentication.basic.password

your_neo4j_password

true

Neo4j password

neo4j.authentication.basic.realm

your_neo4j_auth_realm

false

The authentication realm

neo4j.authentication.kerberos.ticket

your_kerberos_ticket

false

The Kerberos ticket

neo4j.authentication.type

NONE/BASIC/KERBEROS

false

The authentication type (default: 'BASIC')

neo4j.batch.size

Integer

false

The max number of events processed by the Cypher query (default: 1000)

neo4j.batch.timeout.msecs

Integer

false

The execution timeout for the cypher query (default: 0, that is without timeout)

neo4j.batch.parallelize

boolean

false

If enabled messages are processed concurrently in the sink. Non concurrent execution supports in-order processing, e.g. for CDC

neo4j.connection.max.lifetime.msecs

Long

false

The max Neo4j connection lifetime (default: 1 hour)

neo4j.connection.acquisition.timeout.msecs

Long

false

The max Neo4j acquisition timeout (default 1 hour)

neo4j.connection.liveness.check.timeout.msecs

Long

false

The max Neo4j liveness check timeout (default 1 hour)

neo4j.connection.max.pool.size

Int

false

The max pool size (default: 100)

neo4j.encryption.ca.certificate.path

your_certificate_path

false

The path of the certificate

neo4j.encryption.enabled

true/false

false

neo4j.encryption.trust.strategy

TRUST_ALL_CERTIFICATES/TRUST_CUSTOM_CA_SIGNED_CERTIFICATES/TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

false

The Neo4j trust strategy (default: TRUST_ALL_CERTIFICATES)

neo4j.retry.backoff.msecs

Long

false

The time in milliseconds to wait following a transient error before a retry attempt is made (default: 30000).

neo4j.retry.max.attemps

Long

false

The maximum number of times to retry on transient errors (except for TimeoutException) before failing the task (default: 5).

neo4j.topic.cdc.sourceId

<list of topics separated by semicolon>

false

neo4j.topic.cdc.sourceId.labelName

<the label attached to the node>

false

default value is SourceEvent

neo4j.topic.cdc.sourceId.idName

<the id name given to the CDC id field>

false

default value is sourceId

neo4j.topic.cdc.schema

<list of topics separated by semicolon>

false

neo4j.topic.pattern.node.<TOPIC_NAME>

<node extraction pattern>

false

neo4j.topic.pattern.relationship.<TOPIC_NAME>

<relationship extraction pattern>

false

neo4j.topic.cud

<list of topics separated by semicolon>

false

If you need to manage data in JSON format without using the Schema Registry, then you can use the org.apache.kafka.connect.json.JsonConverter and disable both key.converter.schemas.enable and value.converter.schemas.enable.
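
For example, the converter fragment of the Sink configuration would then look like this:

"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false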

Other supported converters are:

  • org.apache.kafka.connect.storage.StringConverter

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • io.confluent.connect.avro.AvroConverter

For further information about Kafka Connect properties, please check out the official Kafka Connect documentation.

For further details about error-handling properties, refer to the How to deal with bad data section.

The Kafka Connect plugin also supports the secured Neo4j URI schemes. Please see the official Neo4j documentation for detailed information: https://neo4j.com/docs/driver-manual/current/client-applications/#driver-configuration-examples