Source Configuration Settings

Connector Settings

Name Description

connector.class Mandatory

org.neo4j.connectors.kafka.source.Neo4jConnector

key.converter Mandatory

A compatible Kafka Connect converter.

One of:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

key.converter.schema.registry.url

Schema Registry URL for message keys. Required when a schema registry is required by the configured key.converter.

key.converter.optional.for.nullables

Should be set to true when key.converter is io.confluent.connect.protobuf.ProtobufConverter.

value.converter Mandatory

A compatible Kafka Connect converter.

One of:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

value.converter.schema.registry.url

Schema Registry URL for message values. Required when a schema registry is required by the configured value.converter.

value.converter.optional.for.nullables

Should be set to true when value.converter is io.confluent.connect.protobuf.ProtobufConverter.
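As an illustration, the converter settings above might be combined as follows for Avro-encoded keys and values. This is a fragment, not a complete connector configuration, and the Schema Registry URL is a placeholder assumption rather than a value from this reference.

```json
{
  "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry.example.com:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry.example.com:8081"
}
```

If io.confluent.connect.protobuf.ProtobufConverter is used instead, the corresponding optional.for.nullables setting would also be set to true, as described above.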

Neo4j Connection Settings

Name Description

neo4j.uri Mandatory

Neo4j URI to connect to. Multiple URIs can be specified, separated by commas.

neo4j.database

Neo4j database name to connect to. Recommended to be specified explicitly.

neo4j.authentication.type Mandatory

Authentication type to use. One of NONE, BASIC, KERBEROS, BEARER, CUSTOM.

Default: BASIC

neo4j.authentication.basic.username

Username to authenticate with. Required when neo4j.authentication.type is BASIC.

neo4j.authentication.basic.password

Password to authenticate with. Required when neo4j.authentication.type is BASIC.

neo4j.authentication.basic.realm

Authentication realm to authenticate with, leave empty for default.

neo4j.authentication.kerberos.ticket

Kerberos ticket to establish connection with. Required when neo4j.authentication.type is KERBEROS.

neo4j.authentication.bearer.token

Bearer token to establish connection with. Required when neo4j.authentication.type is BEARER.

neo4j.authentication.custom.scheme

Custom authentication scheme to establish connection with. Required when neo4j.authentication.type is CUSTOM.

neo4j.authentication.custom.principal

Custom principal to establish connection with. Required when neo4j.authentication.type is CUSTOM.

neo4j.authentication.custom.credentials

Custom credential to establish connection with. Required when neo4j.authentication.type is CUSTOM.

neo4j.authentication.custom.realm

Custom authentication realm to authenticate with, set as required by your custom authentication provider.

neo4j.connection-timeout

TCP connection timeout (valid units are: ms, s, m, h and d; default unit is s).

Default: 30s (Driver default)

neo4j.pool.max-connection-pool-size

Maximum number of connections to keep in the connection pool.

Default: 100 (Driver default)

neo4j.pool.connection-acquisition-timeout

Maximum duration to wait for acquiring a connection from the connection pool (valid units are: ms, s, m, h and d; default unit is s).

Default: 60s (Driver default)

neo4j.pool.idle-time-before-connection-test

Duration after which idle connections are tested for liveness (valid units are: ms, s, m, h and d; default unit is s).

Default: no test (Driver default)

neo4j.pool.max-connection-lifetime

Duration after which a connection is dropped from the connection pool (valid units are: ms, s, m, h and d; default unit is s).

Default: 1h (Driver default)

neo4j.max-retry-time

Maximum duration to retry a transaction.

Default: 30s

neo4j.security.encrypted

Whether encryption is enabled. Only applicable when bolt or neo4j schemes are used inside neo4j.uri. One of true, false.

Default: false

neo4j.security.trust-strategy

Trust strategy to use for TLS connections. One of TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES. Required when neo4j.security.encrypted is true.

Default: TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

neo4j.security.cert-files

List of files that contain X509 certificates of CAs to trust. Required when neo4j.security.trust-strategy is TRUST_CUSTOM_CA_SIGNED_CERTIFICATES.

neo4j.security.hostname-verification-enabled

Whether hostname verification is enabled during TLS handshake. One of true, false.

Default: true
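The connection settings above could be combined along these lines for a BASIC-authenticated, TLS-encrypted connection that trusts a custom CA. This is only a sketch: the host name, database name, credentials, certificate path, and the chosen timeout and pool values are all placeholder assumptions, not recommendations.

```json
{
  "neo4j.uri": "neo4j://neo4j.example.com:7687",
  "neo4j.database": "neo4j",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "example-user",
  "neo4j.authentication.basic.password": "example-password",
  "neo4j.connection-timeout": "10s",
  "neo4j.pool.max-connection-pool-size": "50",
  "neo4j.pool.connection-acquisition-timeout": "30s",
  "neo4j.security.encrypted": "true",
  "neo4j.security.trust-strategy": "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES",
  "neo4j.security.cert-files": "/path/to/ca.crt",
  "neo4j.security.hostname-verification-enabled": "true"
}
```

Settings left out of the fragment, such as neo4j.pool.max-connection-lifetime, keep the defaults listed above.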

Common Source Settings

Name Description

neo4j.source-strategy Mandatory

Source strategy for this connector. One of CDC, QUERY.

neo4j.start-from Mandatory

A time anchor to start streaming from. One of EARLIEST, NOW, USER_PROVIDED. Only used on initial run of the connector instance, and ignored when there is already a stored offset in Kafka Connect.

Default: NOW

neo4j.start-from.value

Custom value to use as a starting offset. Required when neo4j.start-from is set to USER_PROVIDED.

neo4j.ignore-stored-offset

Whether to ignore any offset value retrieved from the offset storage saved by a previous run. One of true, false.

Default: false

neo4j.batch-size

Maximum number of change events to publish for each poll cycle.

Default: 1000
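For instance, a connector that uses the CDC strategy and starts from the earliest available change on its first run might set the common source settings as shown below. The batch size is an arbitrary illustrative value.

```json
{
  "neo4j.source-strategy": "CDC",
  "neo4j.start-from": "EARLIEST",
  "neo4j.batch-size": "500"
}
```

Because neo4j.start-from is ignored once an offset has been stored, changing it on an existing connector instance would presumably also require setting neo4j.ignore-stored-offset to true.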

CDC Strategy Settings

Name Description

neo4j.cdc.topic.{NAME}.key-strategy

Serialisation strategy for CDC topic key. One of SKIP, ELEMENT_ID, ENTITY_KEYS, WHOLE_VALUE.

Default: WHOLE_VALUE

neo4j.cdc.topic.{NAME}.value-strategy

Serialisation strategy for CDC topic value. One of CHANGE_EVENT, ENTITY_EVENT.

Default: CHANGE_EVENT

Note: If you’re using a sink connector with one of the CDC strategies, this setting must be configured as CHANGE_EVENT.

neo4j.cdc.poll-duration

Maximum amount of time a Kafka Connect poll request will wait for a change to be received from the database (valid units are: ms, s, m, h and d; default unit is s).

Default: 5s

neo4j.cdc.poll-interval

The interval at which the database will be polled for changes during neo4j.cdc.poll-duration (valid units are: ms, s, m, h and d; default unit is s).

Default: 1s

neo4j.cdc.topic.{NAME}.patterns

Comma-separated list of graph patterns, with optional event property filters (prefixed with + or - for inclusion or exclusion respectively).

Example setting: "neo4j.cdc.topic.my-topic.patterns": "(:Person {+name}),(:Person)-[:WORKS_FOR]->(:Company),(:Company {-id})". The change event for Company nodes will not include the id property.

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.pattern

Indexed graph pattern, with optional event property filters (prefixed with + or - for inclusion or exclusion respectively).

Example: "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Company {+name})". The change event for Company nodes will either always include the name property (in CDC FULL enrichment mode) or when the name property has changed (in CDC DIFF enrichment mode).

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.operation

One of CREATE, UPDATE, DELETE. Operation for which we want to receive events for the corresponding indexed graph pattern.

Example: "neo4j.cdc.topic.my-topic.patterns.0.operation": "DELETE"

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.changesTo

Comma-separated list of property names.

Example: "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name, age" (see how selectors are combined)

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.metadata.authenticatedUser

User who was authenticated while performing the change.

Example: "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "userA"

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.metadata.executingUser

User who was executing the transaction while performing the change. This is usually the same as authenticatedUser, but may differ if user impersonation is used.

Example: "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "userB"

neo4j.cdc.topic.{NAME}.patterns.{INDEX}.metadata.txMetadata.{KEY}

Key-value transaction metadata selector. The key-value pair is matched against the actual transaction metadata (with the key stripped of the aforementioned prefix).

Example: "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "neo4j-browser"
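Putting the indexed settings together, a single selector for the topic my-topic might look like the sketch below. It reuses the example values from the rows above and assumes that the pattern, operation, changesTo, and metadata selectors can be set together on the same index; the choice of ENTITY_KEYS as key strategy is purely illustrative.

```json
{
  "neo4j.source-strategy": "CDC",
  "neo4j.cdc.poll-duration": "5s",
  "neo4j.cdc.poll-interval": "1s",
  "neo4j.cdc.topic.my-topic.key-strategy": "ENTITY_KEYS",
  "neo4j.cdc.topic.my-topic.value-strategy": "CHANGE_EVENT",
  "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Company {+name})",
  "neo4j.cdc.topic.my-topic.patterns.0.operation": "UPDATE",
  "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name",
  "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "userB",
  "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "neo4j-browser"
}
```

Additional selectors for the same topic would use the next index, for example neo4j.cdc.topic.my-topic.patterns.1.pattern.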

QUERY Strategy Settings

Name Description

neo4j.query.topic Mandatory

Kafka topic to publish change events gathered through the provided query.

neo4j.query Mandatory

Cypher query to use for gathering changes. The query must return the property named by neo4j.query.streaming-property in its result set, and must use the $lastCheck query parameter to track changes.

neo4j.query.streaming-property

Property name that is both present in the result set of the specified query and used as a filter to query changes from a previous value.

Default: timestamp

neo4j.query.poll-duration

Maximum amount of time a Kafka Connect poll request will wait for a change to be received from the database (valid units are: ms, s, m, h and d; default unit is s).

Default: 5s

neo4j.query.poll-interval

The interval at which the database will be polled for changes during neo4j.query.poll-duration (valid units are: ms, s, m, h and d; default unit is s).

Default: 1s

neo4j.query.timeout

Maximum amount of time the query is allowed to run (valid units are: ms, s, m, h and d; default unit is s).

Default: 0s
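A query-based source could be configured roughly as follows. The topic name, the Person label, and the name and timestamp properties are hypothetical and stand in for whatever the graph actually contains; the query filters on $lastCheck and returns the streaming property, as required above.

```json
{
  "neo4j.source-strategy": "QUERY",
  "neo4j.query.topic": "my-topic",
  "neo4j.query": "MATCH (p:Person) WHERE p.timestamp > $lastCheck RETURN p.name AS name, p.timestamp AS timestamp",
  "neo4j.query.streaming-property": "timestamp",
  "neo4j.query.poll-interval": "1s",
  "neo4j.query.timeout": "30s"
}
```

Here neo4j.query.streaming-property is set explicitly for clarity, although timestamp is already its default value.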