Configuration Summary

The documentation on the deprecated Neo4j Streams plugin can be found here.

The following is a summary of all the configuration parameters available for the Kafka Connect Neo4j Connector:

Table 1. Kafka Connect Common configuration parameters (shared between Source and Sink)

| Name | Value | Mandatory | Note |
| --- | --- | --- | --- |
| connector.class | streams.kafka.connect.sink.Neo4jSinkConnector | true | |
| key.converter | org.apache.kafka.connect.storage.StringConverter | false | Converter class for key Connect data |
| value.converter | org.apache.kafka.connect.json.JsonConverter | false | Converter class for value Connect data |
| key.converter.schemas.enable | true/false | false | If true, the key is treated as a composite JSON object containing schema and data (default: false) |
| value.converter.schemas.enable | true/false | false | If true, the value is treated as a composite JSON object containing schema and data (default: false) |
| key.converter.schema.registry.url | http://localhost:8081 | false | The Schema Registry URL; required only when using the AvroConverter |
| value.converter.schema.registry.url | http://localhost:8081 | false | The Schema Registry URL; required only when using the AvroConverter |
| neo4j.database | your_database_name | true | Specify a database name only if you want to use a non-default database (default: 'neo4j') |
| neo4j.server.uri | "bolt://neo4j:7687" | true | Neo4j server URI |
| neo4j.authentication.basic.username | your_neo4j_user | true | Neo4j username |
| neo4j.authentication.basic.password | your_neo4j_password | true | Neo4j password |
| neo4j.authentication.basic.realm | your_neo4j_auth_realm | false | The authentication realm |
| neo4j.authentication.kerberos.ticket | your_kerberos_ticket | false | The Kerberos ticket |
| neo4j.authentication.type | NONE/BASIC/KERBEROS | false | The authentication type (default: BASIC) |
| neo4j.batch.size | Integer | false | The maximum number of events processed by the Cypher query (default: 1000) |
| neo4j.batch.timeout.msecs | Integer | false | The execution timeout for the Cypher query (default: 0, i.e. no timeout) |
| neo4j.connection.max.lifetime.msecs | Long | false | The maximum Neo4j connection lifetime (default: 1 hour) |
| neo4j.connection.acquisition.timeout.msecs | Long | false | The maximum connection acquisition timeout (default: 1 hour) |
| neo4j.connection.liveness.check.timeout.msecs | Long | false | The maximum liveness check timeout (default: 1 hour) |
| neo4j.connection.max.pool.size | Int | false | The maximum connection pool size (default: 100) |
| neo4j.encryption.ca.certificate.path | your_certificate_path | false | The path of the CA certificate |
| neo4j.encryption.enabled | true/false | false | |
| neo4j.encryption.trust.strategy | TRUST_ALL_CERTIFICATES/TRUST_CUSTOM_CA_SIGNED_CERTIFICATES/TRUST_SYSTEM_CA_SIGNED_CERTIFICATES | false | The Neo4j trust strategy (default: TRUST_ALL_CERTIFICATES) |
| neo4j.retry.backoff.msecs | Long | false | The time in milliseconds to wait after a transient error before a retry attempt is made (default: 30000) |
| neo4j.retry.max.attemps | Long | false | The maximum number of retries on transient errors (except TimeoutException) before failing the task (default: 5) |
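
As an illustration, the connection-related parameters above might be combined as follows. This is a hypothetical sketch: the URI, credentials, and values are placeholders, not recommended settings.

```json
{
  "neo4j.server.uri": "bolt://neo4j:7687",
  "neo4j.database": "neo4j",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "secret",
  "neo4j.batch.size": "1000",
  "neo4j.retry.backoff.msecs": "30000",
  "neo4j.retry.max.attemps": "5"
}
```

These keys would typically appear inside the `config` object of the JSON payload posted to the Kafka Connect REST API, alongside the Source- or Sink-specific parameters below.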

Table 2. Kafka Connect Sink configuration parameters

| Name | Value | Mandatory | Note |
| --- | --- | --- | --- |
| topics | <topicA,topicB> | true | A comma-separated list of topics |
| kafka.bootstrap.servers | <localhost:9092> | false | The broker URI; mandatory only if you have configured a DLQ |
| kafka.<any_other_kafka_property> | | false | |
| errors.tolerance | all/none | false | all: lenient, silently ignore bad messages; none (default): any error results in a connector failure |
| errors.log.enable | false/true | false | Log errors (default: false) |
| errors.log.include.messages | false/true | false | Log bad messages too (default: false) |
| errors.deadletterqueue.topic.name | topic-name | false | Dead letter queue topic name; if not set, no DLQ is used (default: not set) |
| errors.deadletterqueue.context.headers.enable | false/true | false | Enrich messages with metadata headers such as exception, timestamp, original topic, original partition (default: false) |
| errors.deadletterqueue.context.headers.prefix | prefix-text | false | Common prefix for header entries, e.g. "__streams.errors." (default: not set) |
| errors.deadletterqueue.topic.replication.factor | 3/1 | false | Replication factor of the DLQ topic; set to 1 for a single-broker cluster (default: 3) |
| neo4j.batch.parallelize | boolean | false | If enabled, messages are processed concurrently in the sink. Non-concurrent execution preserves in-order processing, e.g. for CDC |
| neo4j.topic.cdc.sourceId | <list of topics separated by semicolon> | false | |
| neo4j.topic.cdc.sourceId.labelName | <the label attached to the node> | false | Default value is SourceEvent |
| neo4j.topic.cdc.sourceId.idName | <the id name given to the CDC id field> | false | Default value is sourceId |
| neo4j.topic.cdc.schema | <list of topics separated by semicolon> | false | |
| neo4j.topic.pattern.node.<TOPIC_NAME> | <node extraction pattern> | false | |
| neo4j.topic.pattern.relationship.<TOPIC_NAME> | <relationship extraction pattern> | false | |
| neo4j.topic.cud | <list of topics separated by semicolon> | false | |
| neo4j.topic.cypher.<topic> | <valid Cypher query> | false | |
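
To make the sink parameters concrete, here is a hypothetical sink configuration that writes each record to Neo4j via a Cypher template and routes bad messages to a DLQ. The topic names, credentials, and Cypher query are illustrative placeholders only.

```json
{
  "name": "neo4j-sink-example",
  "config": {
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "topics": "people",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "secret",
    "neo4j.topic.cypher.people": "MERGE (p:Person {name: event.name})",
    "errors.tolerance": "all",
    "errors.deadletterqueue.topic.name": "people-dlq",
    "errors.deadletterqueue.topic.replication.factor": "1",
    "errors.deadletterqueue.context.headers.enable": "true",
    "kafka.bootstrap.servers": "localhost:9092"
  }
}
```

Note that `kafka.bootstrap.servers` is included only because a DLQ is configured, per the table above.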

Table 3. Kafka Connect Source configuration parameters

| Name | Value | Mandatory | Note |
| --- | --- | --- | --- |
| topic | <topic> | true | The topic where the Source publishes the data |
| partitions | <partition> | false | The number of partitions for the Source (default: 1) |
| neo4j.streaming.from | enum[ALL, NOW, LAST_COMMITTED] | false | Where the Source starts streaming when the connector starts: ALL streams all the data in the graph from the beginning; NOW (default) filters data using the moment the connector was started as the timestamp value; LAST_COMMITTED tries to retrieve the last committed timestamp offset and falls back to NOW if none is found |
| neo4j.source.type | enum[QUERY] | false | The type of Source strategy; with QUERY you must set neo4j.source.query |
| neo4j.source.query | <valid Cypher query> | false | The Cypher query used to extract the data from Neo4j; required if neo4j.source.type=QUERY |
| neo4j.streaming.property | <property> | false | The name of the property used to determine the last queried record; if not defined, an internal value from the last performed check is used. The value is injected into the query defined in neo4j.source.query as the $lastCheck parameter |
| neo4j.streaming.poll.interval.msecs | <millis> | false | The polling interval in ms (default: 10000) |
| neo4j.enforce.schema | true/false | false | Apply a schema to each record (default: false) |
| neo4j.cdc.topic.<topic>.key-strategy | enum[SKIP, ELEMENT_ID, ENTITY_KEYS, WHOLE_VALUE] | false | The strategy that determines how message keys are serialized, based on the corresponding change event (default: WHOLE_VALUE) |
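
The source parameters might be combined as in the sketch below, which polls Neo4j with a query filtered by the streaming property. The connector class name, topic, credentials, and query are assumptions for illustration; the `$lastCheck` parameter in the query corresponds to the neo4j.streaming.property mechanism described above.

```json
{
  "name": "neo4j-source-example",
  "config": {
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "topic": "neo4j-people",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "secret",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.streaming.poll.interval.msecs": "10000",
    "neo4j.streaming.property": "timestamp",
    "neo4j.source.type": "QUERY",
    "neo4j.source.query": "MATCH (p:Person) WHERE p.timestamp > $lastCheck RETURN p.name AS name, p.timestamp AS timestamp"
  }
}
```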

If you need to manage data in JSON format without using the Schema Registry, you can use org.apache.kafka.connect.json.JsonConverter and disable both key.converter.schemas.enable and value.converter.schemas.enable.
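
For example, the schema-less JSON setup described above amounts to these four converter properties in the connector configuration:

```json
{
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false"
}
```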

Other supported converters are:

  • org.apache.kafka.connect.storage.StringConverter

  • org.apache.kafka.connect.converters.ByteArrayConverter

  • io.confluent.connect.avro.AvroConverter

For further information about Kafka Connect properties, please refer to the official Kafka Connect documentation.

For further details about the error handling properties, refer to the "How to deal with bad data" section.

The Kafka Connect Neo4j Connector also supports the secured Neo4j URI schemes. Please see the official Neo4j documentation for detailed information: https://neo4j.com/docs/driver-manual/current/client-applications/#driver-configuration-examples