Change Data Capture Strategy

This strategy allows you to ingest CDC events coming from another Neo4j instance, generated either by a Source connector instance configured for the Change Data Capture strategy or by the deprecated Neo4j Streams plugin.

Change Data Capture events must be generated by the corresponding version of the Source connector, which must be configured with a value converter that supports schemas, such as the Avro, JSON Schema, or Protobuf converters used in the configuration examples below.

Two sub-strategies are available:

  • The Schema strategy merges nodes and relationships by the constraints defined in the source database (node key, relationship key, and/or property uniqueness combined with property existence).

  • The Source ID strategy merges nodes and relationships by the CDC event’s elementId or id fields (internal Neo4j entity identifier).
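To make the difference between the two sub-strategies concrete, here is a minimal Python sketch that extracts the value each one would merge on from a Neo4j CDC node event. It is an illustration, not the connector's code: the helper names are hypothetical, and the trimmed event reuses the `keys` and `elementId` fields of the node creation examples in this section.

```python
from __future__ import annotations

from typing import Any


def schema_merge_keys(change: dict[str, Any]) -> dict[str, dict[str, Any]]:
    """Schema sub-strategy (sketch): key properties per label, from event.keys."""
    keys = change["event"]["keys"]
    # Each label maps to a list of key maps; this sketch keeps the first one
    # and skips labels that declare no keys.
    return {label: key_maps[0] for label, key_maps in keys.items() if key_maps}


def source_id_merge_key(change: dict[str, Any]) -> str:
    """Source ID sub-strategy (sketch): the source entity's internal identifier."""
    return change["event"]["elementId"]


# Node creation event, trimmed to the fields used above.
change = {
    "event": {
        "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
        "keys": {"Person": [{"first_name": "John", "last_name": "Doe"}]},
    }
}
print(schema_merge_keys(change))
print(source_id_merge_key(change))
```

The Schema sub-strategy depends on the source constraints being present in the event, while the Source ID sub-strategy only needs the identifier, at the cost of storing it on the target entities.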

Schema sub-strategy

The Schema strategy merges nodes and relationships using the constraints declared in the change event, thus preserving the source schema structure.

Configuring this strategy requires declaring the list of topics to read change events from.

"neo4j.cdc.schema.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

Example

Given that you configure the topics your sink connector subscribes to as follows:

"topics": "topic.1,topic.2"

You need to declare that you want to use the cdc.schema strategy by providing the list of topics you want to consume change events from.

"neo4j.cdc.schema.topics": "topic.1,topic.2"

Each change event will then be projected into a graph entity.
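A sink connector only receives records from the topics it subscribes to, so every topic listed in `neo4j.cdc.schema.topics` should also appear in `topics`. The following Python sketch checks this on a connector configuration map before you submit it. The function is hypothetical and not part of the connector, and it only handles the plain `topics` setting.

```python
def strategy_topics(config: dict, key: str = "neo4j.cdc.schema.topics") -> list:
    """Return the strategy topics, failing if any is not listed in `topics`."""

    def split(value: str) -> list:
        # Comma-separated list; surrounding whitespace and empty items are ignored.
        return [t.strip() for t in value.split(",") if t.strip()]

    subscribed = set(split(config.get("topics", "")))
    requested = split(config.get(key, ""))
    missing = [t for t in requested if t not in subscribed]
    if missing:
        raise ValueError(f"{key} lists topics not in 'topics': {', '.join(missing)}")
    return requested


config = {
    "topics": "topic.1,topic.2",
    "neo4j.cdc.schema.topics": "topic.1,topic.2",
}
print(strategy_topics(config))
```

Passing `key="neo4j.cdc.source-id.topics"` applies the same check to the Source ID sub-strategy.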

Consider this node creation event:

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
  "seq": 0,
  "txId": 12,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T20:51:56.769Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T20:51:56.714Z"
  },
  "event": {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "eventType": "n",
    "operation": "c",
    "keys": {
      "Person": [
        {
          "first_name": "John",
          "last_name": "Doe"
        }
      ]
    },
    "labels": [
      "Person"
    ],
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Person"
        ],
        "properties": {
          "email": "john.doe@example.com",
          "first_name": "John",
          "last_name": "Doe"
        }
      }
    }
  }
}

The corresponding event in the format generated by the deprecated Neo4j Streams plugin:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "email": "john.doe@example.com",
        "last_name": "Doe",
        "first_name": "John"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [
      {
        "label": "Person",
        "properties": [
          "first_name",
          "last_name"
        ],
        "type": "UNIQUE"
      }
    ]
  }
}

The node is persisted as follows, with the Sink connector using the keys field (or, for Neo4j Streams events, the schema constraints) to create or update the node, without a need for extra properties or labels.

(:Person {first_name: "John", last_name: "Doe", email: "john.doe@example.com"})
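As an illustration only, the Python sketch below builds a Cypher `MERGE` of the kind this projection implies: it matches on the key properties from the event's `keys` field and copies the `after` properties onto the node. The function name is hypothetical and the generated query is an assumption; the connector builds its own statements internally.

```python
from __future__ import annotations

from typing import Any


def schema_node_merge(change: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Sketch: MERGE statement and parameters for a node creation event."""
    event = change["event"]
    # First label that declares key properties (StopIteration if none does).
    label, key_maps = next((lbl, km) for lbl, km in event["keys"].items() if km)
    keys = key_maps[0]
    # Property names are assumed to be plain identifiers, so no escaping here.
    key_pattern = ", ".join(f"{name}: $keys.{name}" for name in sorted(keys))
    query = f"MERGE (n:`{label}` {{{key_pattern}}}) SET n += $props"
    params = {"keys": keys, "props": event["state"]["after"]["properties"]}
    return query, params


change = {
    "event": {
        "keys": {"Person": [{"first_name": "John", "last_name": "Doe"}]},
        "state": {
            "before": None,
            "after": {
                "labels": ["Person"],
                "properties": {
                    "email": "john.doe@example.com",
                    "first_name": "John",
                    "last_name": "Doe",
                },
            },
        },
    }
}
query, params = schema_node_merge(change)
print(query)
```

Because the merge pattern only contains the key properties, replaying the same event updates the existing node instead of creating a duplicate.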

Consider this relationship creation event:

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
  "txId": 20,
  "seq": 0,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T21:34:02.965Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T21:34:02.867Z"
  },
  "event": {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "eventType": "r",
    "operation": "c",
    "start": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
      "keys": {
        "Person": [
          {
            "first_name": "John",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "end": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
      "keys": {
        "Person": [
          {
            "first_name": "Mary",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "keys": [],
    "state": {
      "before": null,
      "after": {
        "properties": {
          "since": "2012-01-01"
        }
      }
    }
  }
}

The corresponding event in the format generated by the deprecated Neo4j Streams plugin:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": [
        "Person"
      ],
      "id": "123",
      "ids": {
        "last_name": "Doe",
        "first_name": "John"
      }
    },
    "end": {
      "labels": [
        "Person"
      ],
      "id": "456",
      "ids": {
        "last_name": "Doe",
        "first_name": "Mary"
      }
    },
    "after": {
      "properties": {
        "since": "2012-01-01"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "LocalDateTime"
    },
    "constraints": [
      {
        "label": "KNOWS",
        "properties": [
          "since"
        ],
        "type": "RELATIONSHIP_PROPERTY_EXISTS"
      }
    ]
  }
}

The relationship is persisted as follows, with the Sink connector using the keys fields (or, for Neo4j Streams events, the ids fields) of the start and end nodes from the change event to create or update the relationship, again without a need for extra properties or labels.

(:Person {last_name: "Doe", first_name: "John"})-[:KNOWS {since: "2012-01-01"}]->(:Person {last_name: "Doe", first_name: "Mary"})
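The same idea extends to relationships. This Python sketch, again hypothetical and not the connector's generated query, MERGEs both endpoints by the key properties found in `start.keys` and `end.keys`. Since the example event carries no relationship keys (`"keys": []`), the sketch merges the relationship on its type between the two endpoints and then sets its properties.

```python
from __future__ import annotations

from typing import Any


def endpoint_pattern(var: str, endpoint: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Sketch: node pattern for a relationship endpoint, keyed by its `keys` field."""
    label, key_maps = next((lbl, km) for lbl, km in endpoint["keys"].items() if km)
    keys = key_maps[0]
    props = ", ".join(f"{name}: ${var}_keys.{name}" for name in sorted(keys))
    return f"({var}:`{label}` {{{props}}})", keys


def schema_relationship_merge(change: dict[str, Any]) -> tuple[str, dict[str, Any]]:
    """Sketch: MERGE statement and parameters for a relationship creation event."""
    event = change["event"]
    start, start_keys = endpoint_pattern("s", event["start"])
    end, end_keys = endpoint_pattern("e", event["end"])
    # No relationship keys in this event, so merge on type between the endpoints.
    query = (
        f"MERGE {start} MERGE {end} "
        f"MERGE (s)-[r:`{event['type']}`]->(e) SET r += $props"
    )
    params = {
        "s_keys": start_keys,
        "e_keys": end_keys,
        "props": event["state"]["after"]["properties"],
    }
    return query, params


change = {
    "event": {
        "type": "KNOWS",
        "start": {"keys": {"Person": [{"first_name": "John", "last_name": "Doe"}]}},
        "end": {"keys": {"Person": [{"first_name": "Mary", "last_name": "Doe"}]}},
        "keys": [],
        "state": {"before": None, "after": {"properties": {"since": "2012-01-01"}}},
    }
}
query, params = schema_relationship_merge(change)
print(query)
```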

Creating a Sink instance

Based on the above example, you can use one of the following configurations. Pick one of the message serialization format examples and save it as a file named sink.cdc.schema.neo4j.json into a local directory.

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.schema.topics": "topic.1,topic.2"
  }
}
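The three configurations above differ only in their converter settings. If you prefer to generate them rather than edit them by hand, a small Python helper like the following can produce each variant. The function and its parameters are hypothetical; the property names and values are copied from the examples above.

```python
from __future__ import annotations

import json
from typing import Any

# Converter class per serialization format, as used in the examples above.
CONVERTERS = {
    "AVRO": "io.confluent.connect.avro.AvroConverter",
    "JSONSchema": "io.confluent.connect.json.JsonSchemaConverter",
    "Protobuf": "io.confluent.connect.protobuf.ProtobufConverter",
}


def sink_config(fmt: str, strategy: dict[str, str], topics: str = "topic.1,topic.2") -> dict[str, Any]:
    """Sketch: build one sink connector definition for the given format."""
    converter = CONVERTERS[fmt]
    config: dict[str, Any] = {
        "topics": topics,
        "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    }
    for side in ("key", "value"):
        config[f"{side}.converter"] = converter
        config[f"{side}.converter.schema.registry.url"] = "http://schema-registry:8081"
        if fmt == "Protobuf":
            # Only the Protobuf examples set this option.
            config[f"{side}.converter.optional.for.nullables"] = True
    config.update({
        "neo4j.uri": "neo4j://neo4j:7687",
        "neo4j.authentication.type": "BASIC",
        "neo4j.authentication.basic.username": "neo4j",
        "neo4j.authentication.basic.password": "password",
    })
    # Strategy-specific settings, e.g. neo4j.cdc.schema.topics.
    config.update(strategy)
    return {"name": f"Neo4jSinkConnector{fmt}", "config": config}


connector = sink_config("AVRO", {"neo4j.cdc.schema.topics": "topic.1,topic.2"})
print(json.dumps(connector, indent=2))
```

Redirecting the printed JSON to `sink.cdc.schema.neo4j.json` gives a file you can submit as shown next.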

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cdc.schema.neo4j.json
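If you script your deployment in Python instead of shell, the same REST call can be made with the standard library. This is a minimal sketch assuming the Kafka Connect worker listens on `http://localhost:8083`, as in the curl command above; the function names are hypothetical.

```python
from __future__ import annotations

import json
import urllib.request
from typing import Any

CONNECT_URL = "http://localhost:8083/connectors"


def create_connector_request(connector: dict[str, Any], url: str = CONNECT_URL) -> urllib.request.Request:
    """Build the same POST request as the curl command above."""
    return urllib.request.Request(
        url,
        data=json.dumps(connector).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


def create_connector(path: str, url: str = CONNECT_URL) -> dict[str, Any]:
    """Read a connector JSON file and submit it to Kafka Connect."""
    with open(path, encoding="utf-8") as f:
        connector = json.load(f)
    with urllib.request.urlopen(create_connector_request(connector, url)) as resp:
        return json.load(resp)
```

For example, `create_connector("sink.cdc.schema.neo4j.json")` submits the saved file; `urlopen` raises `urllib.error.HTTPError` if Kafka Connect rejects the configuration.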

You can now access your Confluent Control Center instance at http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.
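Without Control Center, the Kafka Connect REST endpoint `GET /connectors/<name>/status` reports the same state. A minimal Python sketch, assuming the worker is reachable at `http://localhost:8083`; the helper names are hypothetical, and treating a connector with no tasks as not running is a choice made for this sketch.

```python
from __future__ import annotations

import json
import urllib.parse
import urllib.request
from typing import Any


def fetch_status(name: str, base: str = "http://localhost:8083") -> dict[str, Any]:
    """GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{base}/connectors/{urllib.parse.quote(name)}/status"
    with urllib.request.urlopen(url) as resp:
        return json.load(resp)


def is_running(status: dict[str, Any]) -> bool:
    """True when the connector and all of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    return (
        status.get("connector", {}).get("state") == "RUNNING"
        and bool(tasks)  # this sketch treats "no tasks yet" as not running
        and all(task.get("state") == "RUNNING" for task in tasks)
    )
```

For example, `is_running(fetch_status("Neo4jSinkConnectorAVRO"))` checks the Avro variant configured above.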

Source ID sub-strategy

The Source ID strategy merges nodes and relationships by the source entity's elementId (or, for Neo4j Streams events, id) value. It stores this value as an explicit property on the target nodes and relationships, and marks the target nodes with an explicit label.

The configuration of this strategy requires declaring the list of topics to read change events from. You can add an optional label name to use as a marker, and an optional property name to store the elementId or id values of the source entities.

"neo4j.cdc.source-id.topics": "<comma-separated list of topics>"
"neo4j.cdc.source-id.label-name": "<the label attached to the node, default=SourceEvent>"
"neo4j.cdc.source-id.property-name": "<the property name given to the CDC id field, default=sourceId>"

Example

Given that you configure the topics your sink connector subscribes to as follows:

"topics": "topic.1,topic.2"

You need to declare that you want to use the cdc.source-id strategy by providing the list of topics you want to consume change events from.

"neo4j.cdc.source-id.topics": "topic.1,topic.2"

Each change event will then be projected into a graph entity.
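The optional label and property settings described earlier fall back to `SourceEvent` and `sourceId` when omitted. This Python sketch resolves the three Source ID options from a connector configuration map, applying those defaults; the class and function names are hypothetical.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class SourceIdSettings:
    topics: list
    label_name: str
    property_name: str


def source_id_settings(config: dict) -> SourceIdSettings:
    """Resolve the Source ID options, applying the documented defaults."""
    raw = config.get("neo4j.cdc.source-id.topics", "")
    topics = [t.strip() for t in raw.split(",") if t.strip()]
    return SourceIdSettings(
        topics=topics,
        label_name=config.get("neo4j.cdc.source-id.label-name", "SourceEvent"),
        property_name=config.get("neo4j.cdc.source-id.property-name", "sourceId"),
    )


settings = source_id_settings({"neo4j.cdc.source-id.topics": "topic.1,topic.2"})
print(settings)
```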

Consider this node creation event:

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAADAAAAAAAAAAA",
  "seq": 0,
  "txId": 12,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T20:51:56.769Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T20:51:56.714Z"
  },
  "event": {
    "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "eventType": "n",
    "operation": "c",
    "keys": {
      "Person": [
        {
          "first_name": "John",
          "last_name": "Doe"
        }
      ]
    },
    "labels": [
      "Person"
    ],
    "state": {
      "before": null,
      "after": {
        "labels": [
          "Person"
        ],
        "properties": {
          "email": "john.doe@example.com",
          "first_name": "John",
          "last_name": "Doe"
        }
      }
    }
  }
}

The corresponding event in the format generated by the deprecated Neo4j Streams plugin:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": [
        "Person"
      ],
      "properties": {
        "email": "john.doe@example.com",
        "last_name": "Doe",
        "first_name": "John"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [
      {
        "label": "Person",
        "properties": [
          "first_name",
          "last_name"
        ],
        "type": "UNIQUE"
      }
    ]
  }
}

The node is persisted as follows, with the Sink connector using the elementId field (or, for Neo4j Streams events, the id field) of the node change event to create or update the node. The first line shows the result for the Neo4j CDC event and the second for the Neo4j Streams event.

(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})
(:Person:SourceEvent {first_name: "John", last_name: "Doe", email: "john.doe@example.com", sourceId: "1004"})
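For illustration, a Cypher statement with the effect described above could merge on the marker label plus the stored identifier, then copy the properties and re-apply the source labels. The Python sketch below builds such a statement from a Neo4j CDC event; the function name and the exact query shape are assumptions, not the connector's internals. For a Neo4j Streams event the identifier would come from `payload.id` instead of `elementId`.

```python
from __future__ import annotations

from typing import Any


def source_id_node_merge(
    change: dict[str, Any],
    label_name: str = "SourceEvent",
    property_name: str = "sourceId",
) -> tuple[str, dict[str, Any]]:
    """Sketch: MERGE on the marker label plus the stored source identifier."""
    event = change["event"]
    after = event["state"]["after"]
    # Re-apply the source labels, e.g. :`Person`, next to the marker label.
    labels = "".join(f":`{label}`" for label in after["labels"])
    set_clause = "SET n += $props"
    if labels:
        set_clause += f", n{labels}"
    query = f"MERGE (n:`{label_name}` {{`{property_name}`: $id}}) {set_clause}"
    return query, {"id": event["elementId"], "props": after["properties"]}


change = {
    "event": {
        "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
        "state": {
            "before": None,
            "after": {
                "labels": ["Person"],
                "properties": {"email": "john.doe@example.com", "first_name": "John", "last_name": "Doe"},
            },
        },
    }
}
query, params = source_id_node_merge(change)
print(query)
```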

Consider this relationship creation event:

{
  "id": "A3Qc5ZIZ_Eo5v5xsONVo8KUAAAAAAAAAFAAAAAAAAAAA",
  "txId": 20,
  "seq": 0,
  "metadata": {
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "192.168.65.1:46246",
    "connectionServer": "172.17.0.2:7687",
    "connectionType": "bolt",
    "databaseName": "neo4j",
    "executingUser": "neo4j",
    "serverId": "7528cb82",
    "txCommitTime": "2024-03-03T21:34:02.965Z",
    "txMetadata": {
      "app": "cypher-shell_v5.6.0",
      "type": "user-direct"
    },
    "txStartTime": "2024-03-03T21:34:02.867Z"
  },
  "event": {
    "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
    "type": "KNOWS",
    "eventType": "r",
    "operation": "c",
    "start": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
      "keys": {
        "Person": [
          {
            "first_name": "John",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "end": {
      "elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1",
      "keys": {
        "Person": [
          {
            "first_name": "Mary",
            "last_name": "Doe"
          }
        ]
      },
      "labels": [
        "Person"
      ]
    },
    "keys": [],
    "state": {
      "before": null,
      "after": {
        "properties": {
          "since": "2012-01-01"
        }
      }
    }
  }
}

The corresponding event in the format generated by the deprecated Neo4j Streams plugin:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.example.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": [
        "Person"
      ],
      "id": "123",
      "ids": {
        "last_name": "Doe",
        "first_name": "John"
      }
    },
    "end": {
      "labels": [
        "Person"
      ],
      "id": "456",
      "ids": {
        "last_name": "Doe",
        "first_name": "Mary"
      }
    },
    "after": {
      "properties": {
        "since": "2012-01-01"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "LocalDateTime"
    },
    "constraints": [
      {
        "label": "KNOWS",
        "properties": [
          "since"
        ],
        "type": "RELATIONSHIP_PROPERTY_EXISTS"
      }
    ]
  }
}

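The relationship is persisted as follows, with the Sink connector using the elementId fields (or, for Neo4j Streams events, the id fields) of the relationship and of its start and end nodes from the change event to create or update the relationship. The first line shows the result for the Neo4j CDC event and the second for the Neo4j Streams event.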

(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"})-[:KNOWS {since: "2012-01-01", sourceId: "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1"})
(:Person:SourceEvent {last_name: "Doe", first_name: "John", sourceId: "123"})-[:KNOWS {since: "2012-01-01", sourceId: "123"}]->(:Person:SourceEvent {last_name: "Doe", first_name: "Mary", sourceId: "456"})
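Continuing the same hypothetical sketch, a relationship can be merged by looking up both endpoints through the marker label and stored identifier, then merging the relationship on its own stored identifier. The function name and query shape are assumptions for illustration; for Neo4j Streams events the three identifiers would come from `payload.id`, `payload.start.id` and `payload.end.id`.

```python
from __future__ import annotations

from typing import Any


def source_id_relationship_merge(
    change: dict[str, Any],
    label_name: str = "SourceEvent",
    property_name: str = "sourceId",
) -> tuple[str, dict[str, Any]]:
    """Sketch: MERGE endpoints and relationship by their stored source identifiers."""
    event = change["event"]
    marker = f"`{label_name}`"
    key = f"`{property_name}`"
    query = (
        f"MERGE (s:{marker} {{{key}: $start_id}}) "
        f"MERGE (e:{marker} {{{key}: $end_id}}) "
        f"MERGE (s)-[r:`{event['type']}` {{{key}: $id}}]->(e) "
        "SET r += $props"
    )
    params = {
        "start_id": event["start"]["elementId"],
        "end_id": event["end"]["elementId"],
        "id": event["elementId"],
        "props": event["state"]["after"]["properties"],
    }
    return query, params


change = {
    "event": {
        "elementId": "5:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0",
        "type": "KNOWS",
        "start": {"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:0"},
        "end": {"elementId": "4:741ce592-19fc-4a39-bf9c-6c38d568f0a5:1"},
        "state": {"before": None, "after": {"properties": {"since": "2012-01-01"}}},
    }
}
query, params = source_id_relationship_merge(change)
print(query)
```

Merging the endpoints only by `sourceId` means the node properties, such as the names, are filled in when the corresponding node events are processed.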

Creating a Sink instance

Based on the above example, you can use one of the following configurations. Pick one of the message serialization format examples and save it as a file named sink.cdc.source-id.neo4j.json into a local directory.

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cdc.source-id.topics": "topic.1,topic.2",
    "neo4j.cdc.source-id.label-name": "SourceEvent",
    "neo4j.cdc.source-id.property-name": "sourceId"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cdc.source-id.neo4j.json

You can now access your Confluent Control Center instance at http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.