Query strategy

The query strategy lets you define your own Cypher query to extract changes. This requires a schema that supports change tracking: for example, a dedicated change-tracking property such as a timestamp on nodes or relationships, or soft deletes so that deleted entities remain visible to the query.
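To illustrate why soft deletes matter for a query-based approach, here is a minimal in-memory sketch in Python. It does not talk to Neo4j; the `Entity` class, the `deleted` flag, and the helper functions are hypothetical names chosen for illustration, and timestamps are assumed to be integer epoch milliseconds.

```python
from dataclasses import dataclass
from typing import Dict, List


@dataclass
class Entity:
    name: str
    timestamp: int         # change-tracking property (assumed: epoch milliseconds)
    deleted: bool = False  # soft-delete flag (hypothetical property name)


def changed_since(store: Dict[str, Entity], last_check: int) -> List[Entity]:
    """Mimics `WHERE e.timestamp > $lastCheck`: only stored entities can match."""
    matches = [e for e in store.values() if e.timestamp > last_check]
    return sorted(matches, key=lambda e: e.timestamp)


def soft_delete(store: Dict[str, Entity], name: str, now: int) -> None:
    """Keep the entity, flag it as deleted, and bump its timestamp."""
    entity = store[name]
    entity.deleted = True
    entity.timestamp = now


def hard_delete(store: Dict[str, Entity], name: str) -> None:
    """Remove the entity entirely; no later query can return it."""
    del store[name]


store = {
    "alice": Entity("alice", timestamp=100),
    "bob": Entity("bob", timestamp=100),
}
hard_delete(store, "alice")         # leaves nothing for a change query to find
soft_delete(store, "bob", now=200)  # still matchable, now flagged as deleted

changes = changed_since(store, last_check=100)
```

In this sketch only the soft-deleted entity shows up in `changes`; the hard-deleted one leaves no trace that a timestamp-based query could pick up.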

Configuration

First, select the QUERY strategy for the connector instance:

"neo4j.source-strategy": "QUERY"

Second, define the query that tracks changes and the topic to publish them to.

"neo4j.query.topic": "my-topic", (1)
"neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp", (2)
"neo4j.query.streaming-property": "timestamp" (3)
1 Name of the topic that receives the messages.
2 A Cypher query that returns the entities changed since the last iteration. The cursor value from the last iteration is passed in as the $lastCheck parameter.
3 The property (field name) used as a cursor to track changes. It must be part of the returned results.
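The interplay between the query, $lastCheck, and the streaming property can be sketched with a small Python simulation. This is an illustration only, not the connector's implementation: it assumes that timestamps are integer epoch milliseconds and that the cursor advances to the largest streaming-property value returned so far. `run_query` and `poll` are hypothetical helpers.

```python
from typing import Any, Dict, List, Tuple

Row = Dict[str, Any]


def run_query(nodes: List[Row], last_check: int) -> List[Row]:
    """Stand-in for: MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ..."""
    matches = [n for n in nodes if n["timestamp"] > last_check]
    return sorted(matches, key=lambda n: n["timestamp"])


def poll(
    nodes: List[Row], last_check: int, streaming_property: str = "timestamp"
) -> Tuple[List[Row], int]:
    """Run one iteration and advance the cursor (assumed behaviour)."""
    rows = run_query(nodes, last_check)
    if rows:
        # The streaming property must be in the returned rows,
        # otherwise there is nothing to advance the cursor with.
        newest = max(row[streaming_property] for row in rows)
        last_check = max(last_check, newest)
    return rows, last_check


nodes = [{"name": "john", "surname": "doe", "timestamp": 1000}]
cursor = 500

first, cursor = poll(nodes, cursor)   # returns john, cursor moves to 1000
second, cursor = poll(nodes, cursor)  # nothing new, cursor stays at 1000

nodes.append({"name": "mary", "surname": "smith", "timestamp": 2000})
third, cursor = poll(nodes, cursor)   # returns only mary, cursor moves to 2000
```

The sketch also shows why the streaming property has to appear in the RETURN clause: without it, each iteration would have no value from which to derive the next $lastCheck.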

Creating Source instance

Based on the above example, you can use one of the following configurations. Pick one of the message serialization format examples and save it as a file named source.query.neo4j.json into a local directory.

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorJSONString",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "key.converter.schemas.enable": false,
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter.schemas.enable": false,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp",
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s"
  }
}
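The four configurations above differ only in their connector name and converter settings. As a convenience, the following Python sketch rebuilds any of them from the shared settings and writes the file for you. The format keys (`AVRO`, `JSONSchema`, `JSONString`, `Protobuf`) and the `build_connector` and `write_config` helpers are hypothetical names; the setting values are copied from the examples above.

```python
import json
from typing import Any, Dict

SCHEMA_REGISTRY = "http://schema-registry:8081"

# Converter settings per format; each suffix is applied to both key and value.
CONVERTERS: Dict[str, Dict[str, Any]] = {
    "AVRO": {
        "": "io.confluent.connect.avro.AvroConverter",
        ".schema.registry.url": SCHEMA_REGISTRY,
    },
    "JSONSchema": {
        "": "io.confluent.connect.json.JsonSchemaConverter",
        ".schemas.enable": True,
        ".schema.registry.url": SCHEMA_REGISTRY,
    },
    "JSONString": {
        "": "org.apache.kafka.connect.storage.StringConverter",
        ".schemas.enable": False,
    },
    "Protobuf": {
        "": "io.confluent.connect.protobuf.ProtobufConverter",
        ".schemas.enable": True,
        ".schema.registry.url": SCHEMA_REGISTRY,
        ".optional.for.nullables": True,
    },
}

# Settings shared by all four examples.
COMMON: Dict[str, Any] = {
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "QUERY",
    "neo4j.start-from": "NOW",
    "neo4j.query": (
        "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck "
        "RETURN ts.name AS name, ts.surname AS surname, ts.timestamp AS timestamp"
    ),
    "neo4j.query.streaming-property": "timestamp",
    "neo4j.query.topic": "test-source",
    "neo4j.query.polling-interval": "1s",
    "neo4j.query.polling-duration": "5s",
}


def build_connector(fmt: str) -> Dict[str, Any]:
    """Assemble the connector definition for one serialization format."""
    config: Dict[str, Any] = {
        "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector"
    }
    for side in ("key", "value"):
        for suffix, value in CONVERTERS[fmt].items():
            config[f"{side}.converter{suffix}"] = value
    config.update(COMMON)
    return {"name": f"Neo4jSourceConnector{fmt}", "config": config}


def write_config(fmt: str, path: str = "source.query.neo4j.json") -> None:
    """Write the connector definition to a local JSON file."""
    with open(path, "w", encoding="utf-8") as fh:
        json.dump(build_connector(fmt), fh, indent=2)
```

Calling `write_config("AVRO")`, for example, produces a source.query.neo4j.json file equivalent to the first configuration above.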

We will now create the source instance by invoking the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.query.neo4j.json

This creates a Kafka Connect source instance that sends change event messages derived from the provided query to the test-source topic, using your preferred serialization format. In Control Center, confirm that the source connector has been created in the Connect tab, under connect-default.
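If Control Center is not available, the Kafka Connect REST API also reports connector state through its `/connectors/{name}/status` endpoint. The following Python sketch assumes the same `http://localhost:8083` address used in the REST call above; `fetch_status` and `is_running` are hypothetical helper names.

```python
import json
from typing import Any, Dict
from urllib.request import urlopen


def fetch_status(base_url: str, connector_name: str) -> Dict[str, Any]:
    """GET /connectors/{name}/status from the Kafka Connect REST API."""
    url = f"{base_url}/connectors/{connector_name}/status"
    with urlopen(url, timeout=10) as response:
        return json.load(response)


def is_running(status: Dict[str, Any]) -> bool:
    """True when the connector and every reported task are in state RUNNING."""
    states = [status["connector"]["state"]]
    states += [task["state"] for task in status.get("tasks", [])]
    return all(state == "RUNNING" for state in states)
```

With the worker running, `is_running(fetch_status("http://localhost:8083", "Neo4jSourceConnectorAVRO"))` would check the AVRO example; substitute the name from the configuration you submitted.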

Generated change event messages in this case will have the following structure:

{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}
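On the consuming side, a message with this structure maps naturally onto a small typed record. The Python sketch below assumes the message value has already been deserialized to JSON text (the actual wire format depends on the converter you chose) and that the timestamp is an integer; `SourceChange` and `parse_change` are hypothetical names.

```python
import json
from dataclasses import dataclass
from typing import Union


@dataclass(frozen=True)
class SourceChange:
    name: str
    surname: str
    timestamp: int  # assumed: integer value of the streaming property


def parse_change(raw: Union[str, bytes]) -> SourceChange:
    """Decode one change event value with the three fields returned by the query."""
    payload = json.loads(raw)
    return SourceChange(
        name=payload["name"],
        surname=payload["surname"],
        timestamp=int(payload["timestamp"]),
    )


change = parse_change(b'{"name": "john", "surname": "doe", "timestamp": 1700000000000}')
```

The field names match the aliases in the RETURN clause of the query, so changing those aliases changes the message structure accordingly.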