Sink Configuration Settings

Connector Settings

Name Description

connector.class Mandatory

org.neo4j.connectors.kafka.sink.Neo4jConnector

key.converter Mandatory

A compatible Kafka Connect converter.

One of:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

key.converter.schema.registry.url

Schema Registry URL for message keys. Required when a schema registry is required by the configured key.converter.

key.converter.optional.for.nullables

Should be set to true when key.converter is io.confluent.connect.protobuf.ProtobufConverter.

value.converter Mandatory

A compatible Kafka Connect converter.

One of:

  • io.confluent.connect.avro.AvroConverter

  • io.confluent.connect.json.JsonSchemaConverter

  • io.confluent.connect.protobuf.ProtobufConverter

value.converter.schema.registry.url

Schema Registry URL for message values. Required when a schema registry is required by the configured value.converter.

value.converter.optional.for.nullables

Should be set to true when value.converter is io.confluent.connect.protobuf.ProtobufConverter.
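As an illustration, the converter settings above might be combined as follows for Avro keys and values. This is only a sketch: the Schema Registry URL http://schema-registry:8081 is a placeholder, not a value taken from this reference.

```json
{
  "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
  "key.converter": "io.confluent.connect.avro.AvroConverter",
  "key.converter.schema.registry.url": "http://schema-registry:8081",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "value.converter.schema.registry.url": "http://schema-registry:8081"
}
```

When io.confluent.connect.protobuf.ProtobufConverter is used instead, the matching optional.for.nullables setting would also be set to true, as described above.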

Neo4j Connection Settings

Name Description

neo4j.uri Mandatory

Neo4j URI to connect to. Multiple URIs can be specified, separated by commas (,).

neo4j.database

Neo4j database name to connect to. Recommended to be specified explicitly.

neo4j.authentication.type Mandatory

Authentication type to use. One of NONE, BASIC, KERBEROS, BEARER, CUSTOM.

Default: BASIC

neo4j.authentication.basic.username

Username to authenticate with. Required when neo4j.authentication.type is BASIC.

neo4j.authentication.basic.password

Password to authenticate with. Required when neo4j.authentication.type is BASIC.

neo4j.authentication.basic.realm

Authentication realm to authenticate with, leave empty for default.

neo4j.authentication.kerberos.ticket

Kerberos ticket to establish connection with. Required when neo4j.authentication.type is KERBEROS.

neo4j.authentication.bearer.token

Bearer token to establish connection with. Required when neo4j.authentication.type is BEARER.

neo4j.authentication.custom.scheme

Custom authentication scheme to establish connection with. Required when neo4j.authentication.type is CUSTOM.

neo4j.authentication.custom.principal

Custom principal to establish connection with. Required when neo4j.authentication.type is CUSTOM.

neo4j.authentication.custom.credentials

Custom credential to establish connection with. Required when neo4j.authentication.type is CUSTOM.

neo4j.authentication.custom.realm

Custom authentication realm to authenticate with, set as required by your custom authentication provider.
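A minimal sketch of the connection settings for BASIC authentication is shown here. The host name neo4j.example.com, the database name, and the credentials are placeholders chosen for illustration.

```json
{
  "neo4j.uri": "neo4j://neo4j.example.com:7687",
  "neo4j.database": "neo4j",
  "neo4j.authentication.type": "BASIC",
  "neo4j.authentication.basic.username": "neo4j",
  "neo4j.authentication.basic.password": "<password>"
}
```

For the other authentication types, the basic.* entries would be replaced by the settings listed as required for that type, for example neo4j.authentication.bearer.token for BEARER.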

neo4j.connection-timeout

TCP connection timeout (valid units are: ms, s, m, h and d; default unit is s).

Default: 30s (Driver default)

neo4j.pool.max-connection-pool-size

Maximum number of connections to keep in the connection pool.

Default: 100 (Driver default)

neo4j.pool.connection-acquisition-timeout

Maximum duration to wait for acquiring a connection from the connection pool (valid units are: ms, s, m, h and d; default unit is s).

Default: 60s (Driver default)

neo4j.pool.idle-time-before-connection-test

Duration after which idle connections are tested for liveness (valid units are: ms, s, m, h and d; default unit is s).

Default: no test (Driver default)

neo4j.pool.max-connection-lifetime

Duration after which a connection is dropped from the connection pool (valid units are: ms, s, m, h and d; default unit is s).

Default: 1h (Driver default)
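The duration settings above accept a number followed by one of the listed units. The following sketch shows how the timeout and pool settings might be overridden; every value is illustrative rather than a recommendation.

```json
{
  "neo4j.connection-timeout": "10s",
  "neo4j.pool.max-connection-pool-size": "50",
  "neo4j.pool.connection-acquisition-timeout": "2m",
  "neo4j.pool.idle-time-before-connection-test": "30s",
  "neo4j.pool.max-connection-lifetime": "30m"
}
```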

neo4j.max-retry-time

Maximum duration to retry a transaction.

Default: 30s

neo4j.security.encrypted

Whether encryption is enabled. Only applicable when bolt or neo4j schemes are used inside neo4j.uri. One of true, false.

Default: false

neo4j.security.trust-strategy

Trust strategy to use for TLS connections. One of TRUST_ALL_CERTIFICATES, TRUST_SYSTEM_CA_SIGNED_CERTIFICATES, TRUST_CUSTOM_CA_SIGNED_CERTIFICATES. Required when neo4j.security.encrypted is true.

Default: TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

neo4j.security.cert-files

List of files that contain X509 certificates of CAs to trust. Required when neo4j.security.trust-strategy is TRUST_CUSTOM_CA_SIGNED_CERTIFICATES.

neo4j.security.hostname-verification-enabled

Whether hostname verification is enabled during TLS handshake. One of true, false.

Default: true
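The security settings depend on one another: encryption enables the trust strategy, and the custom CA strategy requires certificate files. A sketch of an encrypted connection that trusts a private CA could look like this, assuming a hypothetical host name and a hypothetical certificate path /etc/ssl/certs/internal-ca.pem.

```json
{
  "neo4j.uri": "neo4j://neo4j.example.com:7687",
  "neo4j.security.encrypted": "true",
  "neo4j.security.trust-strategy": "TRUST_CUSTOM_CA_SIGNED_CERTIFICATES",
  "neo4j.security.cert-files": "/etc/ssl/certs/internal-ca.pem",
  "neo4j.security.hostname-verification-enabled": "true"
}
```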

Common Sink Settings

Name Description

neo4j.batch-size

Maximum number of messages processed per transaction per topic.

Default: 1000

neo4j.batch-timeout

Maximum amount of time a batch is allowed to be processed (valid units are: ms, s, m, h and d; default unit is s).

Default: 0s
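For example, the batching behavior might be tuned with a fragment like the one below. The values 500 and 30s are arbitrary and only illustrate the expected formats.

```json
{
  "neo4j.batch-size": "500",
  "neo4j.batch-timeout": "30s"
}
```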

Cypher Strategy Settings

Name Description

neo4j.cypher.topic.{NAME}

Cypher statement to run for the specified topic.

Example: "neo4j.cypher.topic.my-topic": "MERGE (n:Person {name: __value.name})"

neo4j.cypher.bind-timestamp-as

Name under which the message timestamp is bound in user-provided Cypher statements. Set to an empty string to disable binding of the message timestamp.

Default: __timestamp

neo4j.cypher.bind-header-as

Name under which the message headers are bound in user-provided Cypher statements. Message headers are bound as a map of header names to corresponding values. Set to an empty string to disable binding of message headers.

Default: __header

neo4j.cypher.bind-key-as

Name under which the message key is bound in user-provided Cypher statements. Set to an empty string to disable binding of the message key.

Default: __key

neo4j.cypher.bind-value-as

Name under which the message value is bound in user-provided Cypher statements. Set to an empty string to disable binding of the message value.

Default: __value

neo4j.cypher.bind-value-as-event

Whether the message value is bound as 'event' in user-provided Cypher statements, for backward compatibility.

Default: true
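To illustrate how the binding names interact with a statement, the following sketch renames the value binding to msg, disables header binding with an empty string, and keeps the default __timestamp name. The topic name my-topic comes from the example above; the lastSeen property and the msg name are hypothetical, and topics is the standard Kafka Connect sink property for selecting topics.

```json
{
  "topics": "my-topic",
  "neo4j.cypher.topic.my-topic": "MERGE (n:Person {name: msg.name}) SET n.lastSeen = __timestamp",
  "neo4j.cypher.bind-value-as": "msg",
  "neo4j.cypher.bind-header-as": ""
}
```

Whatever name is configured for a binding is the name the Cypher statement has to use, so the statement and the bind-* settings need to be changed together.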

Pattern Strategy Settings

Name Description

neo4j.pattern.topic.{NAME}

Node or relationship pattern to apply to the messages received from the specified topic.

Example: "neo4j.pattern.topic.user": "(:User{!userId})"

Example: "neo4j.pattern.topic.user": "(:User{!userId})-[:KNOWS]->(:User{!otherUserId})"

neo4j.pattern.merge-node-properties

Whether to merge incoming properties with existing properties of nodes.

Default: false

neo4j.pattern.merge-relationship-properties

Whether to merge incoming properties with existing properties of relationships.

Default: false

neo4j.pattern.bind-timestamp-as

Name under which the message timestamp is bound in patterns. Set to an empty string to disable binding of the message timestamp.

Default: __timestamp

neo4j.pattern.bind-header-as

Name under which the message headers are bound in patterns. Message headers are bound as a map of header names to corresponding values. Set to an empty string to disable binding of message headers.

Default: __header

neo4j.pattern.bind-key-as

Name under which the message key is bound in patterns. Set to an empty string to disable binding of the message key.

Default: __key

neo4j.pattern.bind-value-as

Name under which the message value is bound in patterns. Set to an empty string to disable binding of the message value.

Default: __value
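A pattern-based sink for a single topic might be configured roughly as follows. The topic name user and the node pattern are taken from the example in this section; enabling neo4j.pattern.merge-node-properties is an illustrative choice, and topics is the standard Kafka Connect sink property for selecting topics.

```json
{
  "topics": "user",
  "neo4j.pattern.topic.user": "(:User{!userId})",
  "neo4j.pattern.merge-node-properties": "true"
}
```

With this sketch, incoming properties would be merged with the existing properties of matching User nodes rather than handled with the default setting of false.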

CDC Strategy Settings

Name Description

neo4j.cdc.schema.topics

The topic(s) that contain CDC events generated by a Source instance of this connector using the CDC source strategy, to be processed by the CDC Schema strategy.

neo4j.cdc.source-id.topics

The topic(s) that contain CDC events generated by a Source instance of this connector using the CDC source strategy, to be processed by the CDC Source Id strategy.

neo4j.cdc.source-id.label-name

The label name attached to the nodes managed by the CDC Source Id strategy.

Default: SourceEvent

neo4j.cdc.source-id.property-name

The id property name attached to the nodes managed by the CDC Source Id strategy.

Default: sourceId
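As a sketch, the CDC Source Id settings could be combined as follows. The topic name cdc-events is hypothetical, the label and property names are the documented defaults written out explicitly, and topics is the standard Kafka Connect sink property for selecting topics.

```json
{
  "topics": "cdc-events",
  "neo4j.cdc.source-id.topics": "cdc-events",
  "neo4j.cdc.source-id.label-name": "SourceEvent",
  "neo4j.cdc.source-id.property-name": "sourceId"
}
```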

CUD Strategy Settings

Name Description

neo4j.cud.topics

The topic(s) that contain CUD events.
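A minimal sketch of enabling the CUD strategy for one topic is shown here. The topic name cud-events is hypothetical, and topics is the standard Kafka Connect sink property for selecting topics.

```json
{
  "topics": "cud-events",
  "neo4j.cud.topics": "cud-events"
}
```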