CUD File Format Strategy
The CUD file format is a JSON file that represents operations (Create, Update, Delete) performed on nodes or relationships.
Configuration
You can configure the topic in the following way:
"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"
For instance, if you publish CUD-formatted messages to two topics, topic.1 and topic.2, you can configure the sink instance as follows:
"neo4j.cud.topics": "topic.1,topic.2"
Creating Sink instance
Based on the above example, you can use one of the following configurations.
Pick one of the message serialization format examples and save it as a file named sink.cud.neo4j.json into a local directory.
Load the configuration into Kafka Connect with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.cud.neo4j.json
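The same REST call can also be issued from a script. The following is a minimal sketch using only the Python standard library; `build_connector_request` is a hypothetical helper name, and the payload is deliberately abbreviated to the CUD topics setting, so a real configuration would need the full connector settings from the file you saved.

```python
import json
import urllib.request


def build_connector_request(connector_config, connect_url="http://localhost:8083"):
    """Build (but do not send) the POST request that registers a connector."""
    body = json.dumps(connector_config).encode("utf-8")
    return urllib.request.Request(
        url=f"{connect_url}/connectors",
        data=body,
        headers={
            "Content-Type": "application/json",
            "Accept": "application/json",
        },
        method="POST",
    )


# Abbreviated, hypothetical payload: only the CUD topics setting is shown.
example_config = {
    "name": "Neo4jSinkConnectorCUD",
    "config": {"neo4j.cud.topics": ",".join(["topic.1", "topic.2"])},
}
request = build_connector_request(example_config)

# To actually submit it against a running Kafka Connect instance:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

The request is built separately from sending it so that the payload can be inspected before anything reaches the Kafka Connect instance.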
Now you can access your Confluent Control Center instance under http://localhost:9021/clusters.
Verify that the configured connector instance is running in the Connect tab under connect-default.
Note: The CDC Source connector provides JMX metrics to monitor the status of the data ingestion. See Source Monitoring for more information.
CUD Format Specification
We have two formats:
- Node
- Relationship
Node
The Node format defines a set of fields that describe the operation (create, update, merge or delete), the labels and the properties of a node entity.
| Field | Mandatory | Description |
|---|---|---|
| op | Mandatory | The operation type. One of create, update, merge, delete. |
| properties | Optional when operation is delete | The properties attached to the node. |
| ids | Optional when operation is create | Contains the primary/unique key properties that will be used to look up the entity. |
| labels | Optional | The labels attached to the node. |
| type | Mandatory | The entity type: node. |
| detach | Optional | When operation is delete, specifies whether to perform a detach delete. |
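As an illustration of the field rules above, a producer could check node messages before publishing them. This is only a sketch: `validate_node_message` is a hypothetical helper, and it assumes that ids may be omitted only for create, that properties may be omitted only for delete, and that type and op are matched case-insensitively, as the examples in this section suggest.

```python
VALID_OPS = {"create", "update", "merge", "delete"}


def validate_node_message(message):
    """Return a list of problems found in a node CUD message (empty if none)."""
    problems = []
    # Assumption: type and op are case-insensitive ("node" and "NODE" both occur).
    if str(message.get("type", "")).lower() != "node":
        problems.append("type must be 'node'")
    op = str(message.get("op", "")).lower()
    if op not in VALID_OPS:
        problems.append("op must be one of create, update, merge, delete")
    # Assumption: ids may only be omitted for create.
    if op != "create" and not message.get("ids"):
        problems.append("ids is required unless op is create")
    # Assumption: properties may only be omitted for delete.
    if op != "delete" and "properties" not in message:
        problems.append("properties is required unless op is delete")
    return problems
```

An empty list means the message satisfies these assumed rules; the connector itself may apply different or stricter checks.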
Examples
- An example of a CREATE operation:

      { "type": "node", "op": "create", "labels": ["Foo", "Bar"], "properties": { "id": 1, "foo": "foo-value" } }

  which would be transformed into the following Cypher query:

      CREATE (n:Foo:Bar) SET n = $properties

- An example of an UPDATE operation:

      { "type": "node", "op": "update", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "properties": { "id": 1, "foo": "foo-value" } }

  which would be transformed into the following Cypher query:

      MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties

- An example of a MERGE operation:

      { "type": "node", "op": "merge", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "properties": { "id": 1, "foo": "foo-value" } }

  which would be transformed into the following Cypher query:

      MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties

- An example of a DELETE operation:

      { "type": "NODE", "op": "delete", "labels": ["Foo", "Bar"], "ids": { "id": 0 } }

  which would be transformed into the following Cypher query:

      MATCH (n:Foo:Bar {id: $ids.id}) DELETE n

- An example of a DELETE operation with detach set to true:

      { "type": "NODE", "op": "delete", "labels": ["Foo", "Bar"], "ids": { "id": 0 }, "detach": true }

  which would be transformed into the following Cypher query:

      MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
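The mapping shown in the node examples can be summarised in a few lines of code. The sketch below is not the connector's implementation; `node_cud_to_cypher` is a hypothetical function that reproduces the queries above, assumes that a missing detach means a plain DELETE, and does not escape label or property names.

```python
def node_cud_to_cypher(message):
    """Render the Cypher query that a node CUD message maps to in the examples."""
    op = message["op"].lower()
    labels = "".join(f":{label}" for label in message.get("labels", []))
    # Key lookup such as {id: $ids.id}, built from the keys found in "ids".
    keys = ", ".join(f"{key}: $ids.{key}" for key in message.get("ids", {}))
    if op == "create":
        return f"CREATE (n{labels}) SET n = $properties"
    if op == "update":
        return f"MATCH (n{labels} {{{keys}}}) SET n += $properties"
    if op == "merge":
        return f"MERGE (n{labels} {{{keys}}}) SET n += $properties"
    if op == "delete":
        # Assumption: without "detach", a plain DELETE is issued.
        prefix = "DETACH " if message.get("detach", False) else ""
        return f"MATCH (n{labels} {{{keys}}}) {prefix}DELETE n"
    raise ValueError(f"unsupported op: {message['op']}")
```

Note that create replaces all properties (SET n = ...), while update and merge add to the existing ones (SET n += ...).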
Relationship
The Relationship format defines a set of fields that describe the operation (create, update, merge or delete), the relationship type, the source and target node references, and the properties.
| Field | Mandatory | Description |
|---|---|---|
| op | Mandatory | The operation type. One of create, update, merge, delete. |
| properties | Optional when operation is delete | The properties attached to the relationship. |
| rel_type | Mandatory | The relationship type. |
| ids | Optional | Contains the primary/unique key properties that will be used to look up the relationship. |
| from | Mandatory | Contains information about the source node of the relationship. |
| to | Mandatory | Contains information about the target node of the relationship. |
| type | Mandatory | The entity type: relationship. |
Examples
- An example of a CREATE operation:

      { "type": "relationship", "op": "create", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } }, "properties": { "by": "incident" } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MATCH (end:Bar {id: $to.ids.id})
      WITH start, end
      CREATE (start)-[r:RELATED_TO]->(end)
      SET r = $properties

- An example of a CREATE operation that merges the source node:

      { "type": "relationship", "op": "create", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 }, "op": "merge" }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "match" }, "properties": { "by": "incident" } }

  which would be transformed into the following Cypher query:

      MERGE (start:Foo {id: $from.ids.id})
      WITH start
      MATCH (end:Bar {id: $to.ids.id})
      WITH start, end
      CREATE (start)-[r:RELATED_TO]->(end)
      SET r = $properties

- An example of an UPDATE operation:

      { "type": "relationship", "op": "update", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "properties": { "by": "incident" } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MERGE (end:Bar {id: $to.ids.id})
      WITH start, end
      MATCH (start)-[r:RELATED_TO]->(end)
      SET r += $properties

- An example of an UPDATE operation with relationship ids:

      { "type": "relationship", "op": "update", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "ids": { "id": 5 }, "properties": { "by": "incident" } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MERGE (end:Bar {id: $to.ids.id})
      WITH start, end
      MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
      SET r += $properties

- An example of a MERGE operation:

      { "type": "relationship", "op": "merge", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "properties": { "by": "incident" } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MERGE (end:Bar {id: $to.ids.id})
      WITH start, end
      MERGE (start)-[r:RELATED_TO]->(end)
      SET r += $properties

- An example of a MERGE operation with relationship ids:

      { "type": "relationship", "op": "MERGE", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 }, "op": "merge" }, "ids": { "id": 5 }, "properties": { "by": "incident" } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MERGE (end:Bar {id: $to.ids.id})
      WITH start, end
      MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end)
      SET r += $properties

- An example of a DELETE operation:

      { "type": "relationship", "op": "delete", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MATCH (end:Bar {id: $to.ids.id})
      WITH start, end
      MATCH (start)-[r:RELATED_TO]->(end)
      DELETE r

- An example of a DELETE operation with relationship ids:

      { "type": "relationship", "op": "DELETE", "rel_type": "RELATED_TO", "from": { "labels": ["Foo"], "ids": { "id": 0 } }, "to": { "labels": ["Bar"], "ids": { "id": 1 } }, "ids": { "id": 5 } }

  which would be transformed into the following Cypher query:

      MATCH (start:Foo {id: $from.ids.id})
      WITH start
      MATCH (end:Bar {id: $to.ids.id})
      WITH start, end
      MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
      DELETE r
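The relationship examples all follow one pattern: look up the source node, look up the target node, then act on the relationship. The sketch below reproduces that pattern in single-line form. It is not the connector's implementation; `relationship_cud_to_cypher` and `_node_clause` are hypothetical names, the node-level op is assumed to default to match, and how a create with relationship ids is handled is not covered by the examples.

```python
def _node_clause(alias, param, ref):
    """Render e.g. MATCH (start:Foo {id: $from.ids.id}) for a from/to reference."""
    keyword = ref.get("op", "match").upper()  # assumption: "match" is the default
    labels = "".join(f":{label}" for label in ref.get("labels", []))
    keys = ", ".join(f"{key}: ${param}.ids.{key}" for key in ref["ids"])
    return f"{keyword} ({alias}{labels} {{{keys}}})"


def relationship_cud_to_cypher(message):
    """Render the Cypher query that a relationship CUD message maps to."""
    op = message["op"].lower()
    # Optional relationship key lookup such as {id: $ids.id}.
    rel_keys = ", ".join(f"{key}: $ids.{key}" for key in message.get("ids", {}))
    rel_lookup = f" {{{rel_keys}}}" if rel_keys else ""
    pattern = f"(start)-[r:{message['rel_type']}{rel_lookup}]->(end)"
    actions = {
        "create": f"CREATE {pattern} SET r = $properties",
        "update": f"MATCH {pattern} SET r += $properties",
        "merge": f"MERGE {pattern} SET r += $properties",
        "delete": f"MATCH {pattern} DELETE r",
    }
    if op not in actions:
        raise ValueError(f"unsupported op: {message['op']}")
    return " ".join([
        _node_clause("start", "from", message["from"]),
        "WITH start",
        _node_clause("end", "to", message["to"]),
        "WITH start, end",
        actions[op],
    ])
```

Setting "op": "merge" on from or to only changes how that node is looked up; it is independent of the operation applied to the relationship itself.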
Batching of CUD events
Starting from version 5.4.0, a batched sink handler is available for the CUD strategy to provide better throughput when processing CUD events. This handler processes messages in batches and applies them to the database more efficiently.
There are currently two batching strategies used by the CUD sink handler:
- APOC Core strategy: This strategy uses the apoc.cypher.doIt procedure to execute batches of Cypher statements generated from the CUD events.
- Native strategy: This strategy generates a single Cypher statement containing multiple subqueries for each message in the batch, and executes it using the standard Cypher execution engine. The maximum number of subqueries included in a single batch when using the native strategy can be configured using the neo4j.max-batched-queries setting, which defaults to 50.
The connector defaults to the APOC Core batching strategy when APOC Core is available on the target database, and falls back to the native strategy otherwise.
Batch size, which applies to both batching strategies, can be configured using the neo4j.batch-size setting, which defaults to 1000.
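To give a feel for how the two limits relate, the sketch below splits a list of messages first by batch size and then by the maximum number of subqueries. How the connector actually combines neo4j.batch-size and neo4j.max-batched-queries is an assumption here, and `chunk` and `plan_native_batches` are hypothetical helpers used only for illustration.

```python
def chunk(items, size):
    """Split a list into consecutive chunks of at most `size` items."""
    if size < 1:
        raise ValueError("size must be positive")
    return [items[i:i + size] for i in range(0, len(items), size)]


def plan_native_batches(messages, batch_size=1000, max_batched_queries=50):
    """Group messages into batches, then into groups of subqueries per statement.

    Assumption: each batch of up to `batch_size` messages is executed as
    statements holding at most `max_batched_queries` subqueries each.
    """
    return [chunk(batch, max_batched_queries) for batch in chunk(messages, batch_size)]


# 2500 messages with the default settings.
plan = plan_native_batches(list(range(2500)))
```

Under this assumption, 2500 messages yield three batches (1000, 1000 and 500 messages), and a full batch is split into 20 statements of 50 subqueries each.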
Exactly-once semantics
Starting from version 5.4.0, the CUD sink handler can be configured to achieve exactly-once processing guarantees by keeping track of the last successfully processed message offset in the database. This is achieved by storing a dedicated node representing the offset of the last successfully processed message.
Use the neo4j.eos-offset-label setting to specify a label name to attach to the offset node.
This setting is empty by default, which disables exactly-once semantics: the connector does not keep track of processed message offsets in the target database and provides at-least-once processing guarantees.
|
Make sure that the configured label has a node key constraint defined on the For example, if your
|
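The idea behind offset tracking can be illustrated with an in-memory stand-in for the database. This is a conceptual sketch only, not the connector's implementation: `OffsetTrackingSink` is a hypothetical class, and keying the stored offset by topic and partition is an assumption.

```python
class OffsetTrackingSink:
    """In-memory stand-in for a database that stores the last applied offset."""

    def __init__(self):
        self.offsets = {}  # (topic, partition) -> last applied offset
        self.applied = []  # messages that were actually written

    def process(self, topic, partition, offset, message):
        """Apply a message once; return False if it was already applied."""
        key = (topic, partition)
        if offset <= self.offsets.get(key, -1):
            return False  # redelivered message, skip it
        # In a real database, the write and the offset update would need to
        # happen in the same transaction for the guarantee to hold.
        self.applied.append(message)
        self.offsets[key] = offset
        return True


sink = OffsetTrackingSink()
results = [
    sink.process("topic.1", 0, 0, "a"),
    sink.process("topic.1", 0, 1, "b"),
    sink.process("topic.1", 0, 0, "a"),  # redelivery after a retry
]
```

Because the redelivered message carries an offset that is not greater than the stored one, it is skipped, which turns at-least-once delivery into exactly-once application.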