Pattern strategy

This strategy lets you extract nodes and relationships from a Kafka message by using extraction patterns.

Configuration

To configure a pattern strategy for a topic, use the following convention:

"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"

For instance, given the following message published to the user and lives-in topics:

{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}

You can transform it into a node by providing the following configuration:

"neo4j.pattern.topic.user": "(:User{!id: userId})"

You can also provide a relationship pattern to transform it into a path, like (n)-[r]->(m), with the following configuration:

"neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"

Relationship key properties can only be used with Neo4j Enterprise Edition 5.7 and above, and AuraDB 5.
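
As a rough illustration of how the dotted field references in these patterns point into the sample message, the following Python sketch resolves the key properties of the two node patterns. It is not the connector's code; the resolve helper and the dictionaries are hypothetical names used only for this example.

```python
# Illustrative only: this is not the connector's implementation.
message = {
    "userId": 1,
    "name": "John",
    "surname": "Doe",
    "address": {"since": "2012-05", "city": "London", "country": "UK"},
}


def resolve(fields, reference):
    """Follow a dotted reference such as 'address.city' through nested dicts."""
    current = fields
    for part in reference.split("."):
        current = current[part]
    return current


# Key properties of the node patterns above, written as property -> field reference.
user_keys = {"id": "userId"}
city_keys = {"name": "address.city", "country": "address.country"}

user = {prop: resolve(message, ref) for prop, ref in user_keys.items()}
city = {prop: resolve(message, ref) for prop, ref in city_keys.items()}

print(user)  # {'id': 1}
print(city)  # {'name': 'London', 'country': 'UK'}
```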

Creating Sink instance

Based on the above example, you can use one of the following configurations. Pick the example that matches your message serialization format (Avro, JSON Schema or Protobuf) and save it as a file named sink.pattern.neo4j.json in a local directory.

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.pattern.neo4j.json
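
If you prefer to script this step, the same call can be sketched with the Python standard library. The function names below are hypothetical; the sketch only builds the request and the URL of the Kafka Connect status endpoint, and assumes the same host and port as the curl call.

```python
import json
import urllib.request

CONNECT_URL = "http://localhost:8083"  # same host and port as the curl call


def build_create_request(payload):
    """Build (but do not send) the POST request that registers the connector."""
    json.loads(payload)  # fail early if the payload is not valid JSON
    return urllib.request.Request(
        f"{CONNECT_URL}/connectors",
        data=payload,
        method="POST",
        headers={"Content-Type": "application/json", "Accept": "application/json"},
    )


def status_url(connector_name):
    """URL of the Kafka Connect status endpoint for a single connector."""
    return f"{CONNECT_URL}/connectors/{connector_name}/status"
```

To send the request, read sink.pattern.neo4j.json as bytes, pass it to build_create_request and hand the result to urllib.request.urlopen. The status URL for the connector name used in your file can then be fetched the same way.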

Now you can access your Confluent Control Center instance under http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

Patterns

Node Patterns

Node patterns are defined similarly to Cypher node patterns.

  1. Start with (.

  2. Define an optional list of labels, separated by :, such as :Person or :Person:Employee.

  3. Open property section with {.

  4. Define the properties to be treated as keys, each prefixed with !. At least one key property must be provided. Individual message fields can be explicitly referenced and assigned to a user-defined property, in the format userId: __key.user.id. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

  5. Then one of the following:

    1. Nothing or *, meaning that all properties from the message are assigned to the node.

    2. A list of property names to be assigned to the node from the message. Individual message fields can be explicitly referenced and assigned to a user-defined property, in the format userName: __value.user.name. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

    3. A list of property names not to be assigned to the node, each prefixed with -. All other properties from the message are assigned to the node.

  6. Close property section with }.

  7. End with ).

You cannot mix inclusion and exclusion inside a pattern, i.e. the pattern must contain either only inclusion properties or only exclusion properties.
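
The three options for the non-key properties can be sketched as a small Python function. This is a simplified illustration, not the connector's logic: select_properties is a hypothetical name, key properties and nested references are left out, and skipping fields that are absent from the message is an assumption of the sketch.

```python
def select_properties(message, include=None, exclude=None):
    """Return the message fields a pattern would assign to the node.

    include: property names listed in the pattern; None or ["*"] means "all".
    exclude: property names listed with a leading '-' in the pattern.
    """
    if include and exclude:
        raise ValueError("a pattern cannot mix inclusion and exclusion")
    if exclude:
        # Exclusion: everything except the listed names.
        return {k: v for k, v in message.items() if k not in exclude}
    if include and include != ["*"]:
        # Inclusion: only the listed names (absent fields are skipped here).
        return {k: message[k] for k in include if k in message}
    # Nothing or *: all properties.
    return dict(message)


message = {"name": "John", "surname": "Doe", "address": "London"}
print(select_properties(message, include=["surname"]))  # {'surname': 'Doe'}
print(select_properties(message, exclude=["address"]))  # {'name': 'John', 'surname': 'Doe'}
```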

Examples

  • MERGE operation on the User label with userId treated as a key, assigning all properties from the incoming message value to the node

    (:User{!userId})

    or

    (:User{!userId, *})

  • MERGE operation on the User label with userId treated as a key, assigning only the surname property from the incoming message to the node

    (:User{!userId, surname})

  • MERGE operation on the User label with userId treated as a key, assigning only the surname property and the address.city field (stored as city) from the incoming message to the node

    (:User{!userId, surname, city: address.city})

  • MERGE operation on the User label with userId treated as a key, assigning all properties except address from the incoming message to the node

    (:User{!userId, -address})

  • MERGE operation on the User label with userId treated as a key taken from the key part of the message, assigning only the name and surname properties from the value part of the incoming message to the node

    (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})

  • MERGE operation on the User label with userId treated as a key taken from the key part of the incoming message, assigning only the name and surname properties from the value part, createdBy from the header and createdAt from the message timestamp to the node

    (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})
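
To make the explicit references concrete, the sketch below resolves the key, value and timestamp references from the last two examples against a made-up record. The record contents and the resolve helper are assumptions for illustration only and say nothing about how the connector represents messages internally.

```python
# Hypothetical record, laid out under the reference names used in the patterns.
record = {
    "__key": {"id": 42},
    "__value": {"firstName": "John", "lastName": "Doe", "age": 30},
    "__timestamp": 1700000000000,
}


def resolve(fields, reference):
    """Follow a dotted reference such as '__value.firstName' through nested dicts."""
    current = fields
    for part in reference.split("."):
        current = current[part]
    return current


# property name -> message field reference, as written in the pattern
mapping = {
    "userId": "__key.id",
    "name": "__value.firstName",
    "surname": "__value.lastName",
    "createdAt": "__timestamp",
}

node_properties = {prop: resolve(record, ref) for prop, ref in mapping.items()}
print(node_properties)
# {'userId': 42, 'name': 'John', 'surname': 'Doe', 'createdAt': 1700000000000}
```

Fields that are not referenced, such as age in this record, do not appear in the result because the pattern lists its properties explicitly.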

Relationship Patterns

Relationship patterns are defined similarly to Cypher relationship patterns.

  1. Node pattern for start node.

  2. -[

  3. Define relationship type, prepended by :, such as :BOUGHT or :KNOWS.

  4. Open property section with {.

  5. [Optional] Define the properties to be treated as keys, each prefixed with !. Individual message fields can be explicitly referenced and assigned to a user-defined property, in the format relationshipId: __key.relationship.id. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

  6. Then one of the following:

    1. Nothing or *, meaning that all properties from the message are assigned to the relationship.

    2. A list of property names to be assigned to the relationship from the message. Individual message fields can be explicitly referenced and assigned to a user-defined property, in the format relationshipType: __value.relationship.type. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

    3. A list of property names not to be assigned to the relationship, each prefixed with -. All other properties from the message are assigned to the relationship.

  7. Close property section with }.

  8. ]->

  9. Node pattern for end node.

You cannot mix inclusion and exclusion inside a pattern, i.e. the pattern must contain either only inclusion properties or only exclusion properties.
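
The steps above can be sketched as a small Python helper that assembles a relationship pattern string from its parts. The helper name and its checks are hypothetical. It assumes that key properties (prefixed with !) may be combined with either inclusion or exclusion, and it omits the property section entirely when no properties are given.

```python
def relationship_pattern(start, rel_type, end, properties=()):
    """Assemble start-[:TYPE{properties}]->end from already written node patterns."""
    has_exclusion = any(p.startswith("-") for p in properties)
    has_inclusion = any(not p.startswith(("-", "!")) for p in properties)
    if has_exclusion and has_inclusion:
        raise ValueError("a pattern cannot mix inclusion and exclusion")
    # No properties listed: leave out the property section.
    section = "{" + ",".join(properties) + "}" if properties else ""
    return f"{start}-[:{rel_type}{section}]->{end}"


print(relationship_pattern("(:User{!userId})", "BOUGHT", "(:Product{!productId})",
                           ["price", "currency"]))
# (:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})
```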

Examples

  1. MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE of a BOUGHT relationship between them, assigning all other message properties to the relationship

    (:User{!userId})-[:BOUGHT]->(:Product{!productId})

  2. MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE of a BOUGHT relationship between them, assigning only the price and currency properties from the incoming message to the relationship

    (:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})

  3. MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE of a BOUGHT relationship between them, assigning only the price, currency and shippingAddress.city properties from the incoming message to the relationship

    (:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId})

  4. MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE of a BOUGHT relationship between them, assigning all properties except shippingAddress from the incoming message to the relationship

    (:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId})

  5. MERGE operation on the User and Product labels with userId and productId treated as keys respectively, assigning the userFirstName and userLastName properties from the message to the User node, and MERGE of a BOUGHT relationship between them, assigning only the price and currency properties from the message to the relationship

    (:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId})

  6. MERGE operation on the User and Product labels with userId and productId treated as keys respectively, and MERGE of a BOUGHT relationship between them, assigning transactionId as a key property taken from the key part of the message and date taken from the value part of the message to the relationship

    (:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})

Tombstone records

The pattern strategy supports tombstone records. To use them, the message key must contain at least the key properties present in the provided pattern, and the message value must be null.
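
The two conditions can be expressed as a short check. This is only a sketch of the rule as stated above, with a hypothetical function name and a message key modelled as a Python dict; it does not describe what the connector does once it receives such a record.

```python
def is_tombstone_for(key, value, key_properties):
    """True if value is null and the key carries every key property of the pattern."""
    return (
        value is None
        and isinstance(key, dict)
        and all(prop in key for prop in key_properties)
    )


# For a pattern such as (:User{!userId}), the only key property is userId.
print(is_tombstone_for({"userId": 1}, None, ["userId"]))              # True
print(is_tombstone_for({"userId": 1}, {"name": "John"}, ["userId"]))  # False
print(is_tombstone_for({"name": "John"}, None, ["userId"]))           # False
```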

It is not possible to define multiple patterns for a single topic, for example to extract more than one node or relationship type from a single message. To achieve this, use a different topic for each pattern.
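
One way to keep the one-pattern-per-topic rule explicit is to generate the relevant settings from a topic-to-pattern mapping, as in the Python sketch below. The pattern_settings helper is hypothetical; the setting names follow the convention and the example configurations shown earlier, and the result would still need to be merged with the remaining connector settings.

```python
import json


def pattern_settings(patterns):
    """Build the topics list and one neo4j.pattern.topic.<topic> entry per topic."""
    settings = {"topics": ",".join(patterns)}
    for topic, pattern in patterns.items():
        settings[f"neo4j.pattern.topic.{topic}"] = pattern
    return settings


# A dict holds a single pattern per topic, which mirrors the restriction above.
settings = pattern_settings({
    "user": "(:User{!id: userId})",
    "lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->"
                "(:City{!name: address.city, !country: address.country})",
})
print(json.dumps(settings, indent=2))
```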