The Configuration System

The Kafka Connect Neo4j Connector is the recommended method to integrate Kafka with Neo4j, as Neo4j Streams is no longer under active development and will not be supported after version 4.4 of Neo4j.

The most recent version of the Kafka Connect Neo4j Connector can be found here.

Configuration system overview

Location of Configuration Information

For versions prior to 4.0.7, configuration is managed statically through the properties provided in the neo4j.conf file. Since version 4.0.7, we have introduced a new configuration system based on dynamic reloading that relies on the streams.conf file inside $NEO4J_HOME/conf.

Breaking Changes

We deprecated the neo4j.conf-based configuration, so you need to create a new streams.conf file inside $NEO4J_HOME/conf and put all the required configuration there.

Note about usage with Docker

The official Neo4j Docker image uses a particular naming convention for environment variables in order to transform them into properties inside the neo4j.conf file. To stay compatible with that behaviour, you can keep using those environment variables without changing anything in your configuration; under the hood, from version 4.0.7 the system saves them in the streams.conf file instead.
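As an illustration of that naming convention, here is a minimal Python sketch. It assumes the rules described in the Neo4j Docker documentation: the variable is prefixed with NEO4J_, every underscore in the property name is doubled, and every dot becomes a single underscore. The helper name to_docker_env_name is hypothetical and is not part of Neo4j or of the Streams plugin.

```python
def to_docker_env_name(property_name: str) -> str:
    """Translate a configuration property name into a Docker environment variable name."""
    # Double literal underscores first, so they stay distinguishable from converted dots.
    escaped = property_name.replace("_", "__")
    # Then turn every dot into a single underscore and add the prefix.
    return "NEO4J_" + escaped.replace(".", "_")


for name in (
    "kafka.bootstrap.servers",
    "streams.sink.enabled",
    "dbms.memory.heap.max_size",
):
    print(f"{name} -> {to_docker_env_name(name)}")
```

Under these assumptions, kafka.bootstrap.servers becomes NEO4J_kafka_bootstrap_servers, while a property that contains an underscore, such as dbms.memory.heap.max_size, becomes NEO4J_dbms_memory_heap_max__size.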

How it behaves

You can interact with the new configuration system in two ways:

  • By changing the streams.conf manually

  • By applying new configurations via procedures

Keep in mind that every change applied to the configuration causes the plugins to reload, so use this feature very carefully.

File based changes

Every change made to streams.conf is detected, and the Streams Sink and Source are reloaded with the new configuration.
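To make a file-based change concrete, the following Python sketch edits the text of a properties-style file. It assumes that streams.conf uses the same key=value line format as neo4j.conf; the helper set_property and the sample values are hypothetical and only illustrate the kind of edit that would trigger a reload.

```python
def set_property(conf_text: str, key: str, value: str) -> str:
    """Return conf_text with key set to value, replacing an existing entry or appending one."""
    lines = conf_text.splitlines()
    replaced = False
    for i, line in enumerate(lines):
        stripped = line.strip()
        # Leave blank lines and comments untouched.
        if not stripped or stripped.startswith("#"):
            continue
        name, sep, _ = stripped.partition("=")
        if sep and name.strip() == key:
            lines[i] = f"{key}={value}"
            replaced = True
    if not replaced:
        lines.append(f"{key}={value}")
    return "\n".join(lines) + "\n"


before = "kafka.bootstrap.servers=broker:9093\nstreams.sink.enabled=false\n"
after = set_property(before, "streams.sink.enabled", "true")
print(after)
```

Writing the resulting text back to $NEO4J_HOME/conf/streams.conf is the step that the plugin would pick up; the sketch deliberately stops before touching the file system.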

Procedure based changes

From version 4.0.7 you’ll find three new procedures:

  • streams.configuration.get, which returns the current configuration

  • streams.configuration.set({<plugin_config_map>}, {<procedure_config>}), which applies the new configuration and returns it

  • streams.configuration.remove({<plugin_config_keys_list>}, {<procedure_config>}), which removes the provided configuration keys and returns the new configuration status

N.B. These procedures work per database instance. This means that if you change a property that affects all databases, every database running a Streams module is notified and that module is restarted.

streams.configuration.get

This procedure returns the current configuration applied to both the Sink and Source plugins.

Output Parameters:

Variable Name    Description
name             The configuration name
value            The configuration value

So given the following procedure call:

CALL streams.configuration.get() YIELD name, value
RETURN name, value

You'll get output like the following (it depends on your configuration 😄):

name                                          value
"streams.sink.topic.cypher.test"              "CREATE (p:Person{name: event.name, surname: event.surname})"
"streams.sink.errors.tolerance"               "all"
"kafka.default.api.timeout.ms"                "5000"
"kafka.bootstrap.servers"                     "broker:9093"
"streams.sink.errors.log.include.messages"    "true"
"streams.sink.enabled"                        "true"
"streams.sink.errors.log.enable"              "true"

streams.configuration.set

This procedure applies the map of configuration parameters passed as the first argument.

Input Parameters:

Variable Name        Description
plugin_config_map    The map of configuration properties to apply to the Sink and the Source
procedure_config     A map of options that control how the procedure itself behaves

Output Parameters:

Variable Name    Description
name             The configuration name
value            The configuration value

The procedure_config map accepts the following parameters:

Configuration Name    Description
save                  (Boolean, default true) Whether to persist the configuration to the file, so that it is restored when the database is restarted for whatever reason
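When this procedure is called from application code, both maps can be passed as query parameters rather than written inline, which avoids escaping quotes inside Cypher statements that are themselves configuration values. The Python sketch below only builds the query text and its parameters; the helper name build_set_call and the parameter names $config and $options are hypothetical choices made for illustration.

```python
def build_set_call(plugin_config, save=True):
    """Return (query, parameters) for a parameterised streams.configuration.set call."""
    query = (
        "CALL streams.configuration.set($config, $options) "
        "YIELD name, value RETURN name, value"
    )
    # $config carries the plugin_config_map, $options carries the procedure_config.
    parameters = {"config": dict(plugin_config), "options": {"save": save}}
    return query, parameters


query, parameters = build_set_call(
    {
        "streams.sink.topic.cypher.test": (
            "CREATE (p:Person{name: event.name, surname: event.surname, "
            "fullName: event.name + ' ' + event.surname})"
        )
    },
    save=False,
)
print(query)
print(parameters["options"])
```

With the official Neo4j Python driver, the pair could then be passed to session.run(query, parameters); that call is left out here so that the sketch runs without a database.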

So given the following procedure call:

CALL streams.configuration.set({`streams.sink.topic.cypher.test`: "CREATE (p:Person{name: event.name, surname: event.surname, fullName: event.name + ' ' + event.surname})"}, {save: false}) YIELD name, value
RETURN name, value

You'll get output like the following (it depends on your configuration 😄):

name                                          value
"streams.sink.topic.cypher.test"              "CREATE (p:Person{name: event.name, surname: event.surname})"
"streams.sink.errors.tolerance"               "all"
"kafka.default.api.timeout.ms"                "5000"
"kafka.bootstrap.servers"                     "broker:9093"
"streams.sink.errors.log.include.messages"    "true"
"streams.sink.enabled"                        "true"
"streams.sink.errors.log.enable"              "true"

streams.configuration.remove

This procedure removes the provided list of keys from the configuration.

Input Parameters:

Variable Name              Description
plugin_config_keys_list    The list of property keys to remove from the configuration
procedure_config           A map of options that control how the procedure itself behaves

Output Parameters:

Variable Name    Description
name             The configuration name
value            The configuration value

The procedure_config map accepts the following parameters:

Configuration Name    Description
save                  (Boolean, default true) Whether to persist the configuration to the file, so that it is restored when the database is restarted for whatever reason
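The effect of the save option can be pictured with a toy model in Python. This is only a sketch of the behaviour described above, not the plugin's implementation: the class ToyConfigStore is hypothetical, and it assumes that an unsaved change affects the running configuration until the next restart, at which point the persisted file wins.

```python
class ToyConfigStore:
    """Toy model of the save flag; not the plugin's actual implementation."""

    def __init__(self, persisted):
        self.persisted = dict(persisted)  # stands in for the streams.conf file
        self.running = dict(persisted)    # configuration currently applied

    def set(self, config, save=True):
        self.running.update(config)
        if save:
            self.persisted.update(config)
        return dict(self.running)

    def remove(self, keys, save=True):
        for key in keys:
            self.running.pop(key, None)
            if save:
                self.persisted.pop(key, None)
        return dict(self.running)

    def restart(self):
        # After a database restart only the persisted configuration comes back.
        self.running = dict(self.persisted)
        return dict(self.running)


store = ToyConfigStore({"kafka.acks": "1", "streams.sink.enabled": "true"})
after_remove = store.remove(["kafka.acks"], save=False)
after_restart = store.restart()
print(after_remove)
print(after_restart)
```

In this model, removing kafka.acks with save set to false drops the key from the running configuration only, and the key reappears after the simulated restart.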

So given the following procedure call:

CALL streams.configuration.remove(['kafka.acks'], {save: false}) YIELD name, value
RETURN name, value

You'll get output like the following (it depends on your configuration 😄):

name                                          value
"streams.sink.topic.cypher.test"              "CREATE (p:Person{name: event.name, surname: event.surname})"
"streams.sink.errors.tolerance"               "all"
"kafka.default.api.timeout.ms"                "5000"
"kafka.bootstrap.servers"                     "broker:9093"
"streams.sink.errors.log.include.messages"    "true"
"streams.sink.enabled"                        "true"
"streams.sink.errors.log.enable"              "true"

What happens when we change configuration properties via a procedure

When we change configuration properties through streams.configuration.set or streams.configuration.remove, the Sink and Source modules are reloaded under the hood. Use these procedures carefully, because a reload affects your stream flow.

N.B. A Source or Sink module is restarted only if the changed configuration relates to that module. This means that if both are active and you change only Sink-related properties, only the Sink is restarted.
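The selective restart can be sketched as a small Python function. The mapping used here is an assumption made for illustration, not documented plugin behaviour: keys starting with streams.sink. are treated as Sink-only, keys starting with streams.source. as Source-only, and anything else (for example kafka.* settings) as shared by both modules. The function name modules_to_restart is hypothetical.

```python
SINK_PREFIX = "streams.sink."
SOURCE_PREFIX = "streams.source."


def modules_to_restart(changed_keys):
    """Return the set of modules that a change to the given keys would restart."""
    modules = set()
    for key in changed_keys:
        if key.startswith(SINK_PREFIX):
            modules.add("sink")
        elif key.startswith(SOURCE_PREFIX):
            modules.add("source")
        else:
            # Assumption: shared settings such as kafka.* concern both modules.
            modules.update({"sink", "source"})
    return modules


print(modules_to_restart(["streams.sink.errors.tolerance"]))
print(sorted(modules_to_restart(["kafka.bootstrap.servers"])))
```

Under that assumed mapping, changing only streams.sink.errors.tolerance restarts the Sink alone, while changing a shared Kafka setting restarts both modules.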

What happens in the Source module

During the reload process the transaction event handler is unplugged. This means that any transactions that happen during the reload period are not captured by the Source, so they are lost.

What happens in the Sink module

During the reload process the Sink is stopped. This should not have any impact on your ingestion process, because the Sink restarts from the last committed messages, so there is no data loss.
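The difference between the two modules during a reload can be shown with a toy simulation in Python. It is a simplified model under stated assumptions, not the plugin's code: ToySource stands for a push-based listener that only sees transactions while it is attached, and ToySink stands for a pull-based consumer that resumes reading a log from its last committed offset. Both class names are hypothetical.

```python
class ToySource:
    """Push-based listener: only sees transactions committed while it is attached."""

    def __init__(self):
        self.attached = True
        self.published = []

    def on_commit(self, tx):
        if self.attached:
            self.published.append(tx)


class ToySink:
    """Pull-based consumer: reads a log starting from its last committed offset."""

    def __init__(self, log):
        self.log = log
        self.committed_offset = 0
        self.running = True
        self.ingested = []

    def poll(self):
        if not self.running:
            return
        new_messages = self.log[self.committed_offset:]
        self.ingested.extend(new_messages)
        self.committed_offset += len(new_messages)


source, log = ToySource(), []
sink = ToySink(log)

# Normal operation: both modules are active.
source.on_commit("tx1"); log.append("m1"); sink.poll()

# Reload window: the handler is unplugged and the Sink is stopped.
source.attached, sink.running = False, False
source.on_commit("tx2"); log.append("m2"); sink.poll()

# After the reload: both modules are active again.
source.attached, sink.running = True, True
source.on_commit("tx3"); log.append("m3"); sink.poll()

print(source.published)
print(sink.ingested)
```

In this model the Source never publishes tx2, because nothing replays transactions it did not observe, whereas the Sink picks up m2 after the reload because the message is still in the log past its committed offset.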