Pattern strategy

This strategy allows you to extract nodes and relationships from a Kafka message through extraction patterns.

Configuration

To configure a pattern strategy for a topic, use the following convention:

"neo4j.pattern.topic.<YOUR_TOPIC>": "<PATTERN>"

For instance, given the following message published to the user and lives-in topics:

{"userId": 1, "name": "John", "surname": "Doe", "address": {"since": "2012-05", "city": "London", "country": "UK"}}

You can transform it into a node by providing the following configuration:

"neo4j.pattern.topic.user": "(:User{!id: userId})"

You can also provide relationship patterns to transform it into a path, like (n)-[r]->(m), by providing the following configuration:

"neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"

Relationship key properties can only be used with Neo4j Enterprise Edition 5.7 and above, and AuraDB 5.

Creating Sink instance

Based on the above example, you can use one of the following configurations, which cover the AVRO, JSON Schema and Protobuf serialization formats in that order. Pick the one that matches your message serialization format and save it as a file named sink.pattern.neo4j.json in a local directory.

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "user,lives-in",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.pattern.topic.user": "(:User{!id: userId})",
    "neo4j.pattern.topic.lives-in": "(:User{!id: userId})-[:LIVES_IN{since}]->(:City{!name: address.city, !country: address.country})"
  }
}

Load the configuration into Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.pattern.neo4j.json

Now you can access your Confluent Control Center instance under http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

The CDC Source connector provides JMX metrics to monitor the status of the data ingestion. See Source Monitoring for more information.

Patterns

Node Patterns

Node patterns are defined similarly to Cypher node patterns.

  1. Start with (.

  2. Define an optional list of labels, separated by :, such as :Person or :Person:Employee.

  3. Open property section with {.

  4. Define the properties to be treated as keys, each prepended with !. At least one key property must be provided. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format userId: __key.user.id. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

  5. Either:

    1. Nothing or *, meaning that all properties from the message are assigned to the node.

    2. A list of property names to be assigned to the node from the message. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format userName: __value.user.name. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

    3. A list of property names not to be assigned to the node, each prepended with -. All other properties from the message are assigned to the node.

  6. Close property section with }.

  7. End with ).

You cannot mix inclusion and exclusion inside a pattern, i.e. your pattern must contain either only inclusion properties or only exclusion properties.

Examples

  • MERGE operation on User label with userId treated as a key and assigning all properties from the incoming message value to the node

    (:User{!userId})

    or

    (:User{!userId, *})

  • MERGE operation on User label with userId treated as a key and assigning only the surname property from the incoming message to the node

    (:User{!userId, surname})

  • MERGE operation on User label with userId treated as a key and assigning only the surname and address.city properties from the incoming message to the node

    (:User{!userId, surname, city: address.city})

  • MERGE operation on User label with userId treated as a key and assigning all properties except the address property from the incoming message to the node

    (:User{!userId, -address})

  • MERGE operation on User label with userId treated as a key, taken from the key part of the message, and assigning only the name and surname properties from the value part of the incoming message to the node

    (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName})

  • MERGE operation on User label with userId treated as a key, taken from the key part of the incoming message, and assigning only the name and surname properties from the value part, createdBy from the header and createdAt from the message timestamp to the node

    (:User{!userId: __key.id, name: __value.firstName, surname: __value.lastName, createdBy: __header.username, createdAt: __timestamp})
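As a worked illustration of the inclusion and exclusion rules, the sketch below lists the node properties one would expect for two of the patterns above when they are applied to the sample message from the Configuration section. The outer object keyed by pattern is only a presentation device for this example, not a connector format, and the results are derived from the rules described here rather than captured from a running connector.

```json
{
  "(:User{!userId, surname, city: address.city})": {
    "userId": 1,
    "surname": "Doe",
    "city": "London"
  },
  "(:User{!userId, -address})": {
    "userId": 1,
    "name": "John",
    "surname": "Doe"
  }
}
```

In the first case only the key and the explicitly listed properties are kept, with address.city stored under the user-defined name city. In the second case everything except the excluded address field is kept.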

Relationship Patterns

Relationship patterns are defined similarly to Cypher relationship patterns.

  1. Node pattern for start node.

  2. -[

  3. Define relationship type, prepended by :, such as :BOUGHT or :KNOWS.

  4. Open property section with {.

  5. [Optional] Define the properties to be treated as keys, each prepended with !. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format relationshipId: __key.relationship.id. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

  6. Either:

    1. Nothing or *, meaning that all properties from the message are assigned to the relationship.

    2. A list of property names to be assigned to the relationship from the message. Individual message fields can be explicitly referenced and assigned to a user-defined property in the format relationshipType: __value.relationship.type. By default, message fields can be referenced as __timestamp, __headers, __key and __value.

    3. A list of property names not to be assigned to the relationship, each prepended with -. All other properties from the message are assigned to the relationship.

  7. Close property section with }.

  8. ]->

  9. Node pattern for end node.

You cannot mix inclusion and exclusion inside a pattern, i.e. your pattern must contain either only inclusion properties or only exclusion properties.

Examples

  1. MERGE operation on User and Product labels with userId and productId treated as keys respectively and MERGE a BOUGHT relationship between them, assigning all other message properties to the relationship

    (:User{!userId})-[:BOUGHT]->(:Product{!productId})

  2. MERGE operation on User and Product labels with userId and productId treated as keys respectively and MERGE a BOUGHT relationship between them, assigning only the price and currency properties from the incoming message to the relationship

    (:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})

  3. MERGE operation on User and Product labels with userId and productId treated as keys respectively and MERGE a BOUGHT relationship between them, assigning only the price, currency and shippingAddress.city properties from the incoming message to the relationship

    (:User{!userId})-[:BOUGHT{price,currency,shippingAddress.city}]->(:Product{!productId})

  4. MERGE operation on User and Product labels with userId and productId treated as keys respectively and MERGE a BOUGHT relationship between them, assigning all properties except shippingAddress from the incoming message to the relationship

    (:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId})

  5. MERGE operation on User and Product labels with userId and productId treated as keys respectively, assigning the userFirstName and userLastName properties from the message to the User node, and MERGE a BOUGHT relationship between them, assigning only the price and currency properties from the message to the relationship

    (:User{!userId, userFirstName, userLastName})-[:BOUGHT{price, currency}]->(:Product{!productId})

  6. MERGE operation on User and Product labels with userId and productId treated as keys respectively and MERGE a BOUGHT relationship between them, with transactionId taken from the key part of the message as a key property and date taken from the value part of the message as a regular property

    (:User{!userId})-[:BOUGHT{!transactionId: __key.transaction.id, date: __value.transaction.date}]->(:Product{!productId})
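To make the second example concrete, the sketch below pairs a hypothetical purchase message with the node keys and relationship properties one would expect from that pattern. The message content and the field names message, startNodeKey, endNodeKey and relationshipProperties are invented for this illustration and are not part of any connector format. Only the node keys are shown; any other properties assigned to the nodes are left out.

```json
{
  "message": {
    "userId": 1,
    "productId": 42,
    "price": 9.99,
    "currency": "EUR",
    "shippingAddress": {"city": "London", "country": "UK"}
  },
  "(:User{!userId})-[:BOUGHT{price,currency}]->(:Product{!productId})": {
    "startNodeKey": {"userId": 1},
    "endNodeKey": {"productId": 42},
    "relationshipProperties": {"price": 9.99, "currency": "EUR"}
  }
}
```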

Tombstone records

The pattern strategy supports tombstone records. To use them, the message key must contain at least the key properties present in the provided pattern, and the message value must be null.
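As an illustration, for a topic configured with the pattern (:User{!userId}), a tombstone record could be pictured as follows. The key/value wrapper is just a readable rendering of a Kafka record for this example, not a wire format. The actual encoding depends on the configured key and value converters.

```json
{
  "key": {"userId": 1},
  "value": null
}
```

By Kafka convention, a record like this signals that the entity identified by the key should be removed.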

It is not possible to define multiple patterns for a single topic, such as extracting more than one node or relationship type from a single message. In order to achieve this, you have to use a different topic for each pattern.

Batching of pattern events

Starting from version 5.4.0, batched sink handlers are available for the Pattern strategy to provide better throughput when processing pattern events. These handlers process messages in batches and apply them to the database more efficiently.

There are currently two batching strategies used by the pattern sink handlers:

  1. APOC Core strategy: This strategy uses the apoc.cypher.doIt procedure to execute batches of Cypher statements generated from the pattern events.

  2. Native strategy: This strategy generates a single Cypher statement containing multiple subqueries for each message in the batch, and executes it using the standard Cypher execution engine. The maximum number of subqueries included in a single batch when using the native strategy can be configured using the neo4j.max-batched-queries setting, which defaults to 50.

The connector defaults to the APOC Core batching strategy when it is available, and falls back to the native strategy when APOC Core is not available on the target database.

Batch size, which applies to both batching strategies, can be configured using the neo4j.batch-size setting, which defaults to 1000.
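A minimal sketch of how these two settings might appear in a connector configuration. The values are arbitrary illustrations, not tuning recommendations, and the entries are assumed to sit inside the "config" object alongside the other connector settings.

```json
{
  "neo4j.batch-size": "500",
  "neo4j.max-batched-queries": "25"
}
```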

Exactly-once semantics

Starting from version 5.4.0, pattern sink handlers can be configured to achieve exactly-once processing guarantees by keeping track of the last successfully processed message offset in the database. This is achieved by storing a dedicated node representing the offset of the last successfully processed message.

Use the neo4j.eos-offset-label setting to specify the label to attach to the offset node. This setting is empty by default, which disables exactly-once semantics: the connector does not track processed message offsets in the target database and provides at-least-once processing guarantees.

Make sure that the configured label has a node key constraint defined on the strategy, topic and partition node properties.

For example, if your neo4j.eos-offset-label is set to __KafkaOffset, the following constraint must appear on the target database:

CREATE CONSTRAINT kafka_offset_key IF NOT EXISTS FOR (n:__KafkaOffset) REQUIRE (n.strategy, n.topic, n.partition) IS NODE KEY
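The matching connector setting for this constraint could look like the fragment below. It is assumed to be added to the "config" object of the sink connector configuration. The label value is the one used in the constraint above.

```json
{
  "neo4j.eos-offset-label": "__KafkaOffset"
}
```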