CUD File Format Strategy

The CUD file format is a JSON file that represents operations (Create, Update, Delete) performed on nodes or relationships.

Configuration

You can configure the topic in the following way:

"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

For instance, if you publish CUD-formatted messages to two topics, topic.1 and topic.2, you can configure the sink instance as follows:

"neo4j.cud.topics": "topic.1,topic.2"

Creating Sink instance

Based on the above example, you can use one of the following configurations. Pick the example that matches your message serialization format and save it as a file named sink.cud.neo4j.json in a local directory.

AVRO:

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

JSON Schema:

{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

Protobuf:

{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cud.neo4j.json

Now you can access your Confluent Control Center instance under http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

The CDC Source connector provides JMX metrics to monitor the status of the data ingestion. See Source Monitoring for more information.

CUD Format Specification

The CUD format covers two entity types:

  • Node

  • Relationship

Node

The Node format defines a set of fields that describe the operation (create, update, merge or delete), the labels and the properties of a node entity.

Table 1. Node format fields

Field | Mandatory | Description
op | Mandatory | The operation type: one of create, merge, update or delete. The delete operation is for individual nodes only; it is not intended to run generic Cypher queries from JSON.
properties | Optional when the operation is delete | The properties attached to the node.
ids | Optional when the operation is create | The primary or unique key properties used to look up the entity. If you use _elementId (or _id) as a property name, the lookup uses Neo4j’s internal element id for the node. With merge operations, _elementId (or _id) behaves as a simple update: if no node with the given element id exists, none is created.
labels | Optional | The labels attached to the node. Although Neo4j allows creating nodes without labels, doing so is not advised for performance reasons.
type | Mandatory | The entity type: node.
detach | Optional | When the operation is delete, specifies whether to perform a "detach" delete. Defaults to false.
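
The field rules in Table 1 can be checked on the producer side before a message is published. The following Python sketch is illustrative only and is not part of the connector: the function name validate_node_event and its error strings are hypothetical, and the connector itself may apply different or additional checks.

```python
from typing import Any

NODE_OPS = {"create", "merge", "update", "delete"}


def validate_node_event(event: dict[str, Any]) -> list[str]:
    """Return the problems found in a node CUD event (empty list if none)."""
    problems: list[str] = []

    # "type" and "op" are mandatory. They are compared case-insensitively
    # here; that leniency is an assumption of this sketch.
    if str(event.get("type", "")).lower() != "node":
        problems.append("type must be node")
    op = str(event.get("op", "")).lower()
    if op not in NODE_OPS:
        problems.append("op must be one of create, merge, update, delete")

    # "properties" is optional only for delete.
    if op != "delete" and "properties" not in event:
        problems.append("properties is required unless op is delete")

    # "ids" is optional only for create.
    if op != "create" and not event.get("ids"):
        problems.append("ids is required unless op is create")

    # "detach" is described for delete only; flag it elsewhere as a likely mistake.
    if "detach" in event and op != "delete":
        problems.append("detach only applies when op is delete")

    return problems


print(validate_node_event({"type": "node", "op": "update", "properties": {"foo": "foo-value"}}))
# ['ids is required unless op is create']
```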

Examples

  • An example of a CREATE operation:

    {
      "type": "node",
      "op": "create",
      "labels": ["Foo", "Bar"],
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    which would be transformed into the following Cypher query:

    CREATE (n:Foo:Bar) SET n = $properties

  • An example of an UPDATE operation:

    {
      "type": "node",
      "op": "update",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties

  • An example of a MERGE operation:

    {
      "type": "node",
      "op": "merge",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    which would be transformed into the following Cypher query:

    MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties

  • An example of a DELETE operation:

    {
      "type": "node",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) DELETE n

  • An example of a DELETE operation with detach set to true:

    {
      "type": "node",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "detach": true
    }

    which would be transformed into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
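
To make the mapping in the examples above easier to follow, here is a minimal Python sketch that reproduces the same query shapes from a node event. It is an illustration, not the connector's implementation: the function name node_event_to_cypher is hypothetical, labels and key names are not escaped, and the special handling of _elementId (or _id) is left out.

```python
def node_event_to_cypher(event: dict) -> str:
    """Build the Cypher text shown in the node examples for a CUD event."""
    op = event["op"].lower()
    # ["Foo", "Bar"] becomes ":Foo:Bar"; no labels yields an empty string.
    labels = "".join(f":{label}" for label in event.get("labels", []))

    if op == "create":
        # create does not look anything up, so "ids" is not needed.
        return f"CREATE (n{labels}) SET n = $properties"

    # Every key in "ids" becomes a lookup property bound to the $ids parameter.
    id_map = ", ".join(f"{key}: $ids.{key}" for key in event["ids"])
    pattern = f"(n{labels} {{{id_map}}})"

    if op == "update":
        return f"MATCH {pattern} SET n += $properties"
    if op == "merge":
        return f"MERGE {pattern} SET n += $properties"
    if op == "delete":
        # "detach" defaults to false.
        keyword = "DETACH DELETE" if event.get("detach", False) else "DELETE"
        return f"MATCH {pattern} {keyword} n"
    raise ValueError(f"unsupported op: {event['op']}")


event = {"type": "node", "op": "delete", "labels": ["Foo", "Bar"], "ids": {"id": 0}, "detach": True}
print(node_event_to_cypher(event))
# MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
```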

Relationship

The Relationship format defines a set of fields that describe the operation (create, update, merge or delete), the relationship type, the source and target node references, and the properties of a relationship.

Table 2. Relationship format fields

Field | Mandatory | Description
op | Mandatory | The operation type: one of create, merge, update or delete.
properties | Optional when the operation is delete | The properties attached to the relationship.
rel_type | Mandatory | The relationship type.
ids | Optional | The primary or unique key properties used to look up the relationship.
from | Mandatory | Information about the source node of the relationship. Its op field can only be merge or match, and defaults to match. The ids and labels fields are as described for the node format above. If ids references _elementId (or _id), you can omit the labels field.
to | Mandatory | Information about the target node of the relationship. Its op field can only be merge or match, and defaults to match. The ids and labels fields are as described for the node format above. If ids references _elementId (or _id), you can omit the labels field.
type | Mandatory | The entity type: relationship.

Examples

  • An example of a CREATE operation:

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties

  • An example of a CREATE operation that merges the source node:

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        },
        "op": "merge"
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "match"
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MERGE (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties

  • An example of an UPDATE operation:

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    SET r += $properties

  • An example of an UPDATE operation with relationship ids:

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties

  • An example of a MERGE operation:

    {
      "type": "relationship",
      "op": "merge",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO]->(end)
    SET r += $properties

  • An example of a MERGE operation with relationship ids:

    {
      "type": "relationship",
      "op": "merge",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties

  • An example of a DELETE operation:

    {
      "type": "relationship",
      "op": "delete",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    DELETE r

  • An example of a DELETE operation with relationship ids:

    {
      "type": "relationship",
      "op": "delete",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "ids": {
        "id": 5
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    DELETE r
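
The relationship examples above follow one pattern: a clause for the source node, a clause for the target node, then the relationship operation. The Python sketch below reproduces those query shapes under stated assumptions. The function names are hypothetical, identifiers are not escaped, _elementId (or _id) lookups are not handled, and relationship ids are ignored for create because the examples only show create without them. It is not the connector's implementation.

```python
def _node_clause(alias: str, ref: dict, param: str) -> str:
    """Build the MATCH or MERGE clause for the "from" or "to" node reference."""
    # The "op" of a node reference defaults to match.
    keyword = "MERGE" if ref.get("op", "match").lower() == "merge" else "MATCH"
    labels = "".join(f":{label}" for label in ref.get("labels", []))
    ids = ", ".join(f"{key}: ${param}.ids.{key}" for key in ref["ids"])
    return f"{keyword} ({alias}{labels} {{{ids}}})"


def relationship_event_to_cypher(event: dict) -> str:
    """Build the Cypher text shown in the relationship examples."""
    op = event["op"].lower()
    rel_ids = event.get("ids") or {}
    # Relationship ids, when given, become a property filter on the pattern.
    id_filter = ""
    if rel_ids and op != "create":
        id_filter = " {" + ", ".join(f"{key}: $ids.{key}" for key in rel_ids) + "}"
    pattern = f"(start)-[r:{event['rel_type']}{id_filter}]->(end)"

    tails = {
        "create": [f"CREATE {pattern}", "SET r = $properties"],
        "update": [f"MATCH {pattern}", "SET r += $properties"],
        "merge": [f"MERGE {pattern}", "SET r += $properties"],
        "delete": [f"MATCH {pattern}", "DELETE r"],
    }
    if op not in tails:
        raise ValueError(f"unsupported op: {event['op']}")

    lines = [
        _node_clause("start", event["from"], "from") + " WITH start",
        _node_clause("end", event["to"], "to") + " WITH start, end",
        *tails[op],
    ]
    return "\n".join(lines)


event = {
    "type": "relationship",
    "op": "merge",
    "rel_type": "RELATED_TO",
    "from": {"labels": ["Foo"], "ids": {"id": 0}},
    "to": {"labels": ["Bar"], "ids": {"id": 1}, "op": "merge"},
    "ids": {"id": 5},
    "properties": {"by": "incident"},
}
print(relationship_event_to_cypher(event))
```

Run against the MERGE example with relationship ids, the sketch prints the same four-line query shown for that example.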

Batching of CUD events

Starting from version 5.4.0, a batched sink handler is available for the CUD strategy to provide better throughput when processing CUD events. This handler processes messages in batches and applies them to the database more efficiently.

There are currently two batching strategies used by the CUD sink handler:

  1. APOC Core strategy: This strategy uses the apoc.cypher.doIt procedure to execute batches of Cypher statements generated from the CUD events.

  2. Native strategy: This strategy generates a single Cypher statement containing multiple subqueries for each message in the batch, and executes it using the standard Cypher execution engine. The maximum number of subqueries included in a single batch when using the native strategy can be configured using the neo4j.max-batched-queries setting, which defaults to 50.

The connector defaults to the APOC Core batching strategy when APOC Core is available on the target database, and falls back to the native strategy otherwise.

Batch size, which applies to both batching strategies, can be configured using the neo4j.batch-size setting, which defaults to 1000.
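
To illustrate how the two settings relate numerically, the Python sketch below splits one batch of events into groups no larger than the neo4j.max-batched-queries value. It shows the arithmetic only. How the connector partitions a batch internally is an assumption here, and the chunk helper and the placeholder event names are hypothetical.

```python
from typing import Iterator, Sequence, TypeVar

T = TypeVar("T")

BATCH_SIZE = 1000          # default of neo4j.batch-size
MAX_BATCHED_QUERIES = 50   # default of neo4j.max-batched-queries


def chunk(items: Sequence[T], size: int) -> Iterator[Sequence[T]]:
    """Yield consecutive slices of at most `size` items."""
    if size <= 0:
        raise ValueError("size must be positive")
    for start in range(0, len(items), size):
        yield items[start:start + size]


# Placeholder events standing in for one full batch of CUD messages.
events = [f"event-{i}" for i in range(BATCH_SIZE)]
groups = list(chunk(events, MAX_BATCHED_QUERIES))
print(len(groups))  # 20
```

With the defaults, a full batch of 1000 events would be covered by 20 groups of 50 subqueries each.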

Exactly-once semantics

Starting from version 5.4.0, the CUD sink handler can be configured to achieve exactly-once processing guarantees by keeping track of the last successfully processed message offset in the database. This is achieved by storing a dedicated node representing the offset of the last successfully processed message.

Use the neo4j.eos-offset-label setting to specify a label name to attach to the offset node. This setting is empty by default, which disables exactly-once semantics: the connector does not keep track of processed message offsets in the target database and provides at-least-once processing guarantees.

Make sure that the configured label has a node key constraint defined on the strategy, topic and partition node properties.

For example, if your neo4j.eos-offset-label is set to __KafkaOffset, the following constraint must appear on the target database:

CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS FOR (n:__KafkaOffset) REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY
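
The idea behind offset tracking can be pictured with a small in-memory model. The Python sketch below keeps the last processed offset per (strategy, topic, partition), the same properties that the node key constraint covers, and skips any message whose offset was already recorded. This is a conceptual sketch, not the connector's implementation: the OffsetTracker class, the "cud" strategy value and the comparison rule are assumptions, and in a real database the data change and the offset update would presumably have to be committed together for the guarantee to hold.

```python
from dataclasses import dataclass, field

OffsetKey = tuple[str, str, int]  # (strategy, topic, partition)


@dataclass
class OffsetTracker:
    """In-memory stand-in for the offset nodes kept in the database."""

    last_offsets: dict[OffsetKey, int] = field(default_factory=dict)

    def apply(self, strategy: str, topic: str, partition: int, offset: int) -> bool:
        """Record the offset and return True if the message is new."""
        key = (strategy, topic, partition)
        if offset <= self.last_offsets.get(key, -1):
            return False  # already processed: skip the redelivered message
        self.last_offsets[key] = offset
        return True


tracker = OffsetTracker()
# Offsets 1 and 2 are delivered twice, as can happen after a restart.
deliveries = [0, 1, 2, 1, 2, 3]
applied = [o for o in deliveries if tracker.apply("cud", "topic.1", 0, o)]
print(applied)  # [0, 1, 2, 3]
```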