Change Data Capture strategy

The Change Data Capture (CDC) strategy uses the Change Data Capture feature provided by Neo4j and Aura Enterprise 5. It is the preferred strategy for source connector instances because it does not require any schema changes and can capture deletions reliably. Before configuring your source instance with this strategy, make sure that you complete the steps described in Change Data Capture > Getting Started.

To configure this strategy, you need to define patterns and selectors that describe the nodes or relationships whose changes you want to track, and assign them to topics.

When a database is restored from a backup in an on-prem installation, or is restored from a snapshot or paused and resumed in Neo4j Aura, existing change identifiers no longer work, and you need to re-configure your source instances from scratch. Refer to Change Data Capture > Restore backups and snapshots for more information.

Configuration

First, select the CDC strategy for the connector instance:

"neo4j.source-strategy": "CDC"

Second, define your patterns and map them to your topics:

"neo4j.cdc.topic.my-topic.patterns": "(:Person),(:Person)-[:KNOWS]->(:Person)"

The configuration above is a convenient shorthand. If you need to define other filters for change events, such as operation, changed property names or metadata fields, use the indexed configuration approach shown below:

"neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)", (1)
"neo4j.cdc.topic.my-topic.patterns.0.operation": "create", (2)
"neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname", (3)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j", (4)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j", (5)
"neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales", (6)
"neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
"neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
"neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
"neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
(1) A single pattern that identifies the graph entities to monitor for changes.
(2) A single operation to filter on. Can be create, update or delete.
(3) A list of properties that need to be updated for a change message to be returned. Addition and deletion of properties also count as updates.
(4) Authenticated user who performed the change.
(5) Executing user who performed the change. This is usually the same as the authenticated user, but can differ if impersonation is used.
(6) A key-value pair that needs to match the transaction metadata of the transaction that performed the change.

Only the pattern settings are mandatory in the above example. The other settings are optional and can be added based on your requirements.
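
As a rough sketch of a selector set with only the mandatory keys, the fragment below tracks the same two patterns on my-topic without any operation, changesTo or metadata filters. It assumes that omitting the optional keys means no filtering on those fields; this page does not spell that out, so treat it as an illustration rather than a reference.

```json
{
  "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
  "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)"
}
```

These entries belong inside the config object of the connector definition. The surrounding braces are only there to keep the fragment valid JSON.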

Creating Source instance

Based on the above example, you can use one of the following configurations. Pick one of the message serialization format examples (AVRO, JSON Schema or Protobuf) and save it as a file named source.cdc.neo4j.json in a local directory.

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.0.operation": "create",
    "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales",
    "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
    "neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
  }
}

{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.0.operation": "create",
    "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales",
    "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
    "neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
  }
}

{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.my-topic.patterns.0.pattern": "(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.0.operation": "create",
    "neo4j.cdc.topic.my-topic.patterns.0.changesTo": "name,surname",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.0.metadata.txMetadata.app": "sales",
    "neo4j.cdc.topic.my-topic.patterns.1.pattern": "(:Person)-[:KNOWS]->(:Person)",
    "neo4j.cdc.topic.my-topic.patterns.1.operation": "update",
    "neo4j.cdc.topic.my-topic.patterns.1.changesTo": "since",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.authenticatedUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.executingUser": "neo4j",
    "neo4j.cdc.topic.my-topic.patterns.1.metadata.txMetadata.app": "sales"
  }
}

We will now create the source instance by invoking the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.cdc.neo4j.json

This creates a Kafka Connect source instance that sends change event messages matching the provided selectors to the my-topic topic, using your preferred serialization format. In Control Center, confirm that the source connector has been created in the Connect tab, under connect-default.

Patterns

Node Patterns

Node patterns are defined similarly to Cypher node patterns.

  1. Start with (.

  2. [Optional] Define a list of labels, separated by :, such as :Person or :Person:Employee.

  3. [Optional] Open property section with {.

    1. [Optional] Define properties to be used as key filters, with their values, in key: value format. Multiple properties can be defined and must be separated by ,. These properties must correspond to the properties of a NODE KEY constraint.

    2. Either:

      1. Nothing or *, meaning that all properties of the node are included in the change event.

      2. List of property names, separated by ,, to be included in the change event.

      3. List of property names to be excluded from the change event, each prepended with - and separated by ,. All other properties of the node are included in the change event.

    3. Close property section with }.

  4. End with ).

You cannot mix inclusion and exclusion properties: a pattern must contain either only inclusion properties or only exclusion properties.

Examples

  • Select all changes on any node.

    ()
  • Select all changes on nodes with label :User.

    (:User)
  • Select all changes on nodes with both labels :User and :Employee.

    (:User:Employee)
  • Select all changes on nodes with label :User and only include name and surname properties in the change event.

    (:User{name, surname})
  • Select all changes on nodes with label :User and exclude address and dob properties from the change event.

    (:User{-address, -dob})
  • Select all changes on the node with label :User whose key property userId equals 1001, and include name and surname properties in the change event.

    (:User{userId: 1001, name, surname})
    This example requires a NODE KEY constraint on the userId property for the :User label.
  • Select all changes on the node with both labels :User and :Employee, whose key properties name and surname equal john and doe.

    (:User:Employee{name: 'john', surname: 'doe'})
    This example requires a NODE KEY constraint on the name and surname properties for the :User label, the :Employee label, or both.
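
To show where such node patterns end up, here is a minimal sketch that routes two of the patterns above to separate topics using the indexed form. The topic names user-changes and employee-changes are hypothetical, chosen only for illustration. The indexed form is used because the patterns contain commas inside the braces, and this page does not say how the comma-separated shorthand treats them.

```json
{
  "neo4j.cdc.topic.user-changes.patterns.0.pattern": "(:User{-address, -dob})",
  "neo4j.cdc.topic.employee-changes.patterns.0.pattern": "(:User:Employee{name: 'john', surname: 'doe'})"
}
```

As noted in the examples, the second pattern only works if a matching NODE KEY constraint on name and surname exists.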

Relationship Patterns

Relationship patterns are defined similarly to Cypher relationship patterns.

  1. Node pattern for start node, without any property inclusion or exclusion list.

  2. -[.

  3. Define relationship type, prepended by :, such as :BOUGHT or :KNOWS.

  4. [Optional] Open property section with {.

    1. [Optional] Define properties to be used as key filters, with their values, in key: value format.

    2. Either:

      1. Nothing or *, meaning that all properties of the relationship are included in the change event.

      2. List of property names, separated by ,, to be included in the change event.

      3. List of property names to be excluded from the change event, each prepended with - and separated by ,. All other properties of the relationship are included in the change event.

    3. Close property section with }.

  5. ]->.

  6. Node pattern for end node, without any property inclusion or exclusion list.

You cannot mix inclusion and exclusion properties: a pattern must contain either only inclusion properties or only exclusion properties.

Examples

  • Select all changes on :BOUGHT relationships.

    ()-[:BOUGHT]->()
  • Select all changes on :BOUGHT relationships with start nodes of label :User and end nodes of label :Product.

    (:User)-[:BOUGHT]->(:Product)
  • Select all changes on :BOUGHT relationships with start nodes of labels :User and :Employee and end nodes of label :Product.

    (:User:Employee)-[:BOUGHT]->(:Product)
  • Select all changes on :BOUGHT relationships with start nodes of label :User and end nodes of label :Product and only include price and currency properties in the change event.

    (:User)-[:BOUGHT{price, currency}]->(:Product)
  • Select all changes on :BOUGHT relationships with start nodes of label :User and end nodes of label :Product and exclude card property from the change event.

    (:User)-[:BOUGHT{-card}]->(:Product)
  • Select all changes on the :WORKS_FOR relationship whose key property contractId equals 5910.

    ()-[:WORKS_FOR{contractId: 5910}]->()
    This example requires a RELATIONSHIP KEY constraint on the contractId property for the :WORKS_FOR relationship type.
  • Select all changes on the :WORKS_FOR relationship whose key property contractId equals 5910, and exclude the salary property from the change event.

    ()-[:WORKS_FOR{contractId: 5910,-salary}]->()
  • Select all changes on relationships starting from the node with label :User whose key property userId equals 1001.

    (:User{userId: 1001})-[]->()
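
Relationship patterns can be combined with the optional selector settings described in the Configuration section. The sketch below is one possible arrangement, with hypothetical topic names purchases and contracts: it captures only newly created :BOUGHT relationships, and only updates to one specific :WORKS_FOR relationship.

```json
{
  "neo4j.cdc.topic.purchases.patterns.0.pattern": "(:User)-[:BOUGHT{price, currency}]->(:Product)",
  "neo4j.cdc.topic.purchases.patterns.0.operation": "create",
  "neo4j.cdc.topic.contracts.patterns.0.pattern": "()-[:WORKS_FOR{contractId: 5910,-salary}]->()",
  "neo4j.cdc.topic.contracts.patterns.0.operation": "update"
}
```

The second selector relies on a RELATIONSHIP KEY constraint on contractId for the :WORKS_FOR relationship type, as stated in the examples above.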