Source Configuration

The documentation on the deprecated Neo4j Streams plugin can be found here.

In this chapter we’ll discuss how the Source instance is configured.

Create the Source Instance

You can create a new Source instance with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @source.avro.neo4j.json

Let’s look at the source.avro.neo4j.json file:

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp"
  }
}

This creates a Kafka Connect Source instance that sends AVRO messages to the topic named my-topic. Every message in the topic will have the following structure:

{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}

Please check the Configuration Summary for a detailed guide about the supported configuration parameters.
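
Once the connector has been created, you can check that it is up and running through the Kafka Connect REST API (a quick sanity check, assuming Connect is reachable at localhost:8083 as in the call above and using the connector name from the configuration):

curl -s http://localhost:8083/connectors/Neo4jSourceConnectorAVRO/status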

How the Source module pushes the data to the defined Kafka topic

Timestamps aren’t necessarily unique. As a result:

  • this mode can’t guarantee that all updated data will be delivered: for example, if two rows share the same timestamp and only one of them has been pushed to the Kafka topic before a failure, the second update will be missed when the system recovers.

  • you may find duplicated events in the topic, because an event is sent every time the condition ts.timestamp > $lastCheck is satisfied. To manage this duplication you can implement some form of record filtering in your consumer application (see the sketch after this list) or leverage the Kafka SMT (Single Message Transformation) functions; see https://docs.confluent.io/platform/current/connect/transforms/overview.html for further details. These functions allow you to transform the records in your topic and redirect them to a new topic (the one your consumer application reads from).
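
As a minimal sketch of consumer-side filtering, the snippet below deduplicates records by their (name, timestamp) pair. It assumes a broker at localhost:9092 and that the connector produces plain JSON values (neo4j.enforce.schema=false, see the end of this section); with AVRO you would deserialize through the Schema Registry instead. The group id is a hypothetical example.

from confluent_kafka import Consumer
import json

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",      # assumption: local Kafka broker
    "group.id": "neo4j-source-dedup-example",   # hypothetical consumer group
    "auto.offset.reset": "earliest",
})
consumer.subscribe(["my-topic"])

seen = set()  # (name, timestamp) pairs already processed
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        record = json.loads(msg.value())
        key = (record.get("name"), record.get("timestamp"))
        if key in seen:
            continue  # duplicate event produced by a later poll, skip it
        seen.add(key)
        # ... hand the record to the rest of the application ...
finally:
    consumer.close()

In a real application the seen cache should be bounded, for example by evicting keys older than the most recently processed timestamp.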

We run the query provided in the neo4j.source.query field by polling the database at the interval, in milliseconds, defined by the neo4j.streaming.poll.interval.msecs field.

So, given the JSON configuration above, we will run:

MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp

every 5000 milliseconds, publishing events like:

{"name":{"string":"John Doe"},"timestamp":{"long":1624551349362}}

The AVRO output shown above was produced with neo4j.enforce.schema=true, which attaches a schema to each record. If you want to stream plain JSON strings instead, use the corresponding converter together with neo4j.enforce.schema=false, which produces the following output:

{"name": "John Doe", "timestamp": 1624549598834}