CUD File Format Strategy

The CUD file format is a JSON format that represents operations (Create, Update, Delete) to be performed on nodes or relationships.

Configuration

You can configure the topics as follows:

"neo4j.cud.topics": "<COMMA_SEPARATED_LIST_OF_TOPICS>"

For instance, if you have two topics, topic.1 and topic.2, to which you publish CUD-formatted messages, you can configure the sink instance as follows:

"neo4j.cud.topics": "topic.1,topic.2"

Creating a Sink instance

Based on the example above, you can use one of the following configurations, one per message serialization format (Avro, JSON Schema, and Protobuf). Pick one and save it as a file named sink.cud.neo4j.json in a local directory.

Avro:

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

JSON Schema:

{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}

Protobuf:

{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "topic.1,topic.2",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cud.topics": "topic.1,topic.2"
  }
}
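If you generate connector configurations from a script rather than writing them by hand, a helper like the following can produce any of the three variants above. This is a minimal Python sketch with several assumptions. `build_sink_config` and the `fmt` keys (`avro`, `json_schema`, `protobuf`) are hypothetical names. The connection settings are copied from the examples. The check that every CUD topic also appears in `topics` reflects our view of a sensible configuration, not a documented rule.

```python
import json

# Converter class per serialization format, as used in the three examples above.
CONVERTERS = {
    "avro": "io.confluent.connect.avro.AvroConverter",
    "json_schema": "io.confluent.connect.json.JsonSchemaConverter",
    "protobuf": "io.confluent.connect.protobuf.ProtobufConverter",
}

def build_sink_config(name, fmt, topics, cud_topics,
                      registry_url="http://schema-registry:8081"):
    """Hypothetical helper: build a CUD sink definition like the examples above."""
    # Assumption: a CUD topic is only useful if the connector also subscribes to it.
    missing = set(cud_topics) - set(topics)
    if missing:
        raise ValueError(f"CUD topics not listed in 'topics': {sorted(missing)}")
    converter = CONVERTERS[fmt]
    config = {
        "topics": ",".join(topics),
        "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
        "key.converter": converter,
        "key.converter.schema.registry.url": registry_url,
        "value.converter": converter,
        "value.converter.schema.registry.url": registry_url,
        "neo4j.uri": "neo4j://neo4j:7687",
        "neo4j.authentication.type": "BASIC",
        "neo4j.authentication.basic.username": "neo4j",
        "neo4j.authentication.basic.password": "password",
        "neo4j.cud.topics": ",".join(cud_topics),
    }
    if fmt == "protobuf":
        # Only the Protobuf example sets these two converter options.
        config["key.converter.optional.for.nullables"] = True
        config["value.converter.optional.for.nullables"] = True
    return {"name": name, "config": config}

if __name__ == "__main__":
    sink = build_sink_config("Neo4jSinkConnectorAVRO", "avro",
                             ["topic.1", "topic.2"], ["topic.1", "topic.2"])
    # Print the definition; redirect stdout to sink.cud.neo4j.json to save it.
    print(json.dumps(sink, indent=2))
```

Running the script prints the Avro variant. Redirect the output to sink.cud.neo4j.json to use it with the REST call that follows.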

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cud.neo4j.json
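The same REST call can be made from Python using only the standard library. The sketch below is meant as a rough equivalent of the curl command, not an official client. `load_config` and `build_connector_request` are hypothetical helper names, and `http://localhost:8083` is the Kafka Connect address used above.

```python
import json
import urllib.request

def load_config(path):
    """Read a connector definition file and check that it parses as JSON."""
    with open(path, "rb") as f:
        body = f.read()
    json.loads(body)  # raises ValueError early if the file is malformed
    return body

def build_connector_request(body, connect_url="http://localhost:8083"):
    """Mirror the curl call: POST the definition to /connectors with JSON headers."""
    return urllib.request.Request(
        f"{connect_url}/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )
```

For example, `urllib.request.urlopen(build_connector_request(load_config("sink.cud.neo4j.json")))` submits the saved file.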

You can now open your Confluent Control Center instance at http://localhost:9021/clusters and, in the Connect tab under connect-default, verify that the configured connector instance is running.
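If Control Center is not available, the Kafka Connect REST API also reports connector state through `GET /connectors/<name>/status`. Below is a minimal sketch, assuming Kafka Connect runs at `http://localhost:8083`. The helper names are hypothetical. The sketch treats a connector as healthy only when the connector and every one of its tasks report `RUNNING`, which is our own choice of criterion.

```python
import json
import urllib.parse
import urllib.request

def connector_status(name, connect_url="http://localhost:8083"):
    """Fetch GET /connectors/<name>/status from the Kafka Connect REST API."""
    url = f"{connect_url}/connectors/{urllib.parse.quote(name, safe='')}/status"
    with urllib.request.urlopen(url) as resp:
        return json.loads(resp.read())

def is_running(status):
    """True only if the connector and all of its tasks report RUNNING."""
    tasks = status.get("tasks", [])
    # A connector that has not started any tasks yet is not treated as running.
    return (status.get("connector", {}).get("state") == "RUNNING"
            and bool(tasks)
            and all(task.get("state") == "RUNNING" for task in tasks))
```

For example, `is_running(connector_status("Neo4jSinkConnectorAVRO"))` checks the Avro connector defined above.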

CUD Format Specification

There are two formats:

  • Node

  • Relationship

Node

The Node format defines a set of fields that describe the operation (create, update, merge or delete), the labels, and the properties of a node entity.

Table 1. Node format fields

  • op (mandatory): The operation type. One of create, merge, update or delete.

    The delete operation is for individual nodes only. It is not intended for running generic Cypher queries from JSON.

  • properties (mandatory, except when op is delete): The properties attached to the node.

  • ids (mandatory, except when op is create): The primary/unique key properties used to look up the node. If you use _elementId (or _id) as a property name, the lookup uses Neo4j's internal element ID for the node instead.

    If you use _elementId (or _id) with the merge operation, it behaves as a simple update: if no node with the given element ID exists, none is created.

  • labels (optional): The labels attached to the node.

    Although Neo4j allows you to create nodes without labels, this is not advised for performance reasons.

  • type (mandatory): The entity type: node.

  • detach (optional): When op is delete, specifies whether to perform a "detach" delete. Defaults to false.
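Before publishing, you may want to check messages against Table 1 on the producer side. The following Python sketch encodes our reading of the table, for example that properties is required for every operation except delete. `validate_node` is a hypothetical helper and is not part of the connector. It compares type and op case-insensitively because the examples use both node and NODE; this is an assumption about the connector's behavior.

```python
NODE_OPS = {"create", "merge", "update", "delete"}

def validate_node(msg):
    """Return a list of problems found in a CUD node message (empty if none)."""
    errors = []
    # Assumption: type and op are case-insensitive (the examples use "node" and "NODE").
    if str(msg.get("type", "")).lower() != "node":
        errors.append("type must be 'node'")
    op = str(msg.get("op", "")).lower()
    if op not in NODE_OPS:
        errors.append("op must be one of create, merge, update, delete")
    # Our reading of Table 1: properties may only be omitted for delete.
    if op != "delete" and "properties" not in msg:
        errors.append("properties is required unless op is delete")
    # ...and ids may only be omitted for create; an empty object counts as missing.
    if op != "create" and not msg.get("ids"):
        errors.append("ids is required unless op is create")
    labels = msg.get("labels", [])
    if not isinstance(labels, list) or not all(isinstance(label, str) for label in labels):
        errors.append("labels must be a list of strings")
    if not isinstance(msg.get("detach", False), bool):
        errors.append("detach must be a boolean")
    return errors
```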

Examples

  • An example of a CREATE operation:

    {
      "type": "node",
      "op": "create",
      "labels": ["Foo", "Bar"],
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    which would be transformed into the following Cypher query:

    CREATE (n:Foo:Bar) SET n = $properties
  • An example of an UPDATE operation:

    {
      "type": "node",
      "op": "update",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) SET n += $properties
  • An example of a MERGE operation:

    {
      "type": "node",
      "op": "merge",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "properties": {
        "id": 1,
        "foo": "foo-value"
      }
    }

    which would be transformed into the following Cypher query:

    MERGE (n:Foo:Bar {id: $ids.id}) SET n += $properties
  • An example of a DELETE operation:

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) DELETE n
  • An example of a DELETE operation with detach set to true:

    {
      "type": "NODE",
      "op": "delete",
      "labels": ["Foo", "Bar"],
      "ids": {
        "id": 0
      },
      "detach": true
    }

    which would be transformed into the following Cypher query:

    MATCH (n:Foo:Bar {id: $ids.id}) DETACH DELETE n
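The following Python sketch makes the mapping from node message to query concrete by reproducing the Cypher in the examples above. It is an illustration only and has several limits. `node_to_cypher` is hypothetical, and the connector's actual generated queries may differ. The sketch does not escape labels or keys with backticks, and it does not handle _elementId/_id lookups. For create it ignores ids, because the create example has none.

```python
def node_pattern(var, labels, ids, param):
    """Render a pattern such as (n:Foo:Bar {id: $ids.id})."""
    label_str = "".join(f":{label}" for label in labels)
    keys = ", ".join(f"{key}: ${param}.{key}" for key in ids)
    return f"({var}{label_str} {{{keys}}})" if keys else f"({var}{label_str})"

def node_to_cypher(msg):
    """Rebuild the Cypher shown in the node examples (illustrative only)."""
    op = msg["op"].lower()
    labels = msg.get("labels", [])
    pattern = node_pattern("n", labels, msg.get("ids", {}), "ids")
    if op == "create":
        # create replaces all properties (=); the other writes add to them (+=).
        return f"CREATE {node_pattern('n', labels, {}, 'ids')} SET n = $properties"
    if op == "update":
        return f"MATCH {pattern} SET n += $properties"
    if op == "merge":
        return f"MERGE {pattern} SET n += $properties"
    if op == "delete":
        delete = "DETACH DELETE" if msg.get("detach", False) else "DELETE"
        return f"MATCH {pattern} {delete} n"
    raise ValueError(f"unsupported op: {msg['op']}")
```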

Relationship

The Relationship format defines a set of fields that describe the operation (create, update, merge or delete), the relationship type, the source and target node references, and the properties.

Table 2. Relationship format fields

  • op (mandatory): The operation type. One of create, merge, update or delete.

  • properties (mandatory, except when op is delete): The properties attached to the relationship.

  • rel_type (mandatory): The relationship type.

  • ids (optional): The primary/unique key properties used to look up the relationship.

  • from (mandatory): Information about the source node of the relationship. Its op field can be either merge or match; the default is match. The ids and labels fields have the same meaning as the node fields described above.

    If you use _elementId (or _id) in ids, you can omit the labels field.

  • to (mandatory): Information about the target node of the relationship. Its op field can be either merge or match; the default is match. The ids and labels fields have the same meaning as the node fields described above.

    If you use _elementId (or _id) in ids, you can omit the labels field.

  • type (mandatory): The entity type: relationship.
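A producer-side check against Table 2 can follow the same approach as for nodes. This Python sketch is hypothetical and encodes our reading of the table. It also rests on two assumptions of our own: that from and to always need ids to be matched or merged, and that their labels may be left out only when ids uses _elementId or _id. As before, type and op are compared case-insensitively.

```python
REL_OPS = {"create", "merge", "update", "delete"}
NODE_REF_OPS = {"match", "merge"}
ID_KEYS = {"_elementId", "_id"}

def validate_node_ref(ref, side):
    """Check the from/to object of a relationship message."""
    if not isinstance(ref, dict):
        return [f"{side} is required"]
    errors = []
    # Table 2: the endpoint op can only be match or merge, defaulting to match.
    if str(ref.get("op", "match")).lower() not in NODE_REF_OPS:
        errors.append(f"{side}.op must be match or merge")
    ids = ref.get("ids") or {}
    if not ids:
        # Assumption: an endpoint must be identified before it can be matched or merged.
        errors.append(f"{side}.ids is required")
    elif not ref.get("labels") and not (ID_KEYS & set(ids)):
        # Assumption: labels may only be omitted when ids uses _elementId or _id.
        errors.append(f"{side}.labels is required unless ids uses _elementId or _id")
    return errors

def validate_relationship(msg):
    """Return a list of problems found in a CUD relationship message (empty if none)."""
    errors = []
    if str(msg.get("type", "")).lower() != "relationship":
        errors.append("type must be 'relationship'")
    op = str(msg.get("op", "")).lower()
    if op not in REL_OPS:
        errors.append("op must be one of create, merge, update, delete")
    if not msg.get("rel_type"):
        errors.append("rel_type is required")
    if op != "delete" and "properties" not in msg:
        errors.append("properties is required unless op is delete")
    errors += validate_node_ref(msg.get("from"), "from")
    errors += validate_node_ref(msg.get("to"), "to")
    return errors
```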

Examples

  • An example of a CREATE operation:

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties
  • An example of a CREATE operation that merges the source node:

    {
      "type": "relationship",
      "op": "create",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        },
        "op": "merge"
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "match"
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MERGE (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    CREATE (start)-[r:RELATED_TO]->(end)
    SET r = $properties
  • An example of an UPDATE operation:

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    SET r += $properties
  • An example of an UPDATE operation with relationship ids:

    {
      "type": "relationship",
      "op": "update",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties
  • An example of a MERGE operation:

    {
      "type": "relationship",
      "op": "merge",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO]->(end)
    SET r += $properties
  • An example of a MERGE operation with relationship ids:

    {
      "type": "relationship",
      "op": "MERGE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        },
        "op": "merge"
      },
      "ids": {
        "id": 5
      },
      "properties": {
        "by": "incident"
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MERGE (end:Bar {id: $to.ids.id}) WITH start, end
    MERGE (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    SET r += $properties
  • An example of a DELETE operation:

    {
      "type": "relationship",
      "op": "delete",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO]->(end)
    DELETE r
  • An example of a DELETE operation with relationship ids:

    {
      "type": "relationship",
      "op": "DELETE",
      "rel_type": "RELATED_TO",
      "from": {
        "labels": ["Foo"],
        "ids": {
          "id": 0
        }
      },
      "to": {
        "labels": ["Bar"],
        "ids": {
          "id": 1
        }
      },
      "ids": {
        "id": 5
      }
    }

    which would be transformed into the following Cypher query:

    MATCH (start:Foo {id: $from.ids.id}) WITH start
    MATCH (end:Bar {id: $to.ids.id}) WITH start, end
    MATCH (start)-[r:RELATED_TO {id: $ids.id}]->(end)
    DELETE r
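The relationship examples follow a fixed shape. Each endpoint is looked up (MATCH by default, or MERGE when its op is merge). Then the relationship is created, updated, merged or deleted. The Python sketch below reproduces that shape to make the mapping concrete. `relationship_to_cypher` and `pattern` are hypothetical helpers, and the connector's real queries may differ. The sketch assumes both endpoints carry ids, and it does not escape identifiers or handle _elementId/_id lookups.

```python
def pattern(var, labels, ids, param, brackets="()"):
    """Render (var:Label {key: $param.key}), or [var:TYPE {...}] with brackets="[]"."""
    label_str = "".join(f":{label}" for label in labels)
    keys = ", ".join(f"{key}: ${param}.{key}" for key in ids)
    body = f"{var}{label_str} {{{keys}}}" if keys else f"{var}{label_str}"
    return f"{brackets[0]}{body}{brackets[1]}"

def relationship_to_cypher(msg):
    """Rebuild the Cypher shown in the relationship examples (illustrative only)."""
    op = msg["op"].lower()
    start, end = msg["from"], msg["to"]
    # Endpoints default to MATCH; "op": "merge" turns the lookup into a MERGE.
    start_op = start.get("op", "match").upper()
    end_op = end.get("op", "match").upper()
    start_node = pattern("start", start.get("labels", []), start["ids"], "from.ids")
    end_node = pattern("end", end.get("labels", []), end["ids"], "to.ids")
    rel = pattern("r", [msg["rel_type"]], msg.get("ids", {}), "ids", "[]")
    path = f"(start)-{rel}->(end)"
    actions = {
        "create": [f"CREATE {path}", "SET r = $properties"],
        "update": [f"MATCH {path}", "SET r += $properties"],
        "merge": [f"MERGE {path}", "SET r += $properties"],
        "delete": [f"MATCH {path}", "DELETE r"],
    }
    if op not in actions:
        raise ValueError(f"unsupported op: {msg['op']}")
    return "\n".join([f"{start_op} {start_node} WITH start",
                      f"{end_op} {end_node} WITH start, end"] + actions[op])
```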