Cypher Strategy

This strategy executes corresponding Cypher statements for each message received.

To configure a cypher strategy for a desired topic, you must follow the following convention:

"neo4j.cypher.topic.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"

Starting with version 5.1.0 of the Neo4j Connector for Kafka, the Cypher strategy binds header, key, and value of the messages as __header, __key, and __value respectively, which are passed to the user-provided Cypher query as predefined variables. See Cypher strategy settings for more information on how to customize the name of the variables.

For backward compatibility, event is still available for use but will only correspond to the value of the message.

Example

Given that you configure the topics your sink connector subscribes to within the sink configuration settings as follows;

  "topics": "creates,updates,deletes"

You need to declare that you want to use cypher strategy and provide the corresponding Cypher statement for each topic, similar to the following;

  "topics": "creates,updates,deletes",
  "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
  "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
  "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p"

The above configuration excerpt defines that;

  • messages received from creates topic will be unpacked by the Sink connector into Neo4j with the following Cypher query:

    WITH __value.event.state.after AS state
    MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
    MERGE (f:Family {name: state.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
  • messages received from updates topic will be unpacked by the Sink connector into Neo4j with the following Cypher query:

    WITH __value.event.state.before AS before, __value.event.state.after AS after
    MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
    MATCH (fPre:Family {name: before.properties.surname})
    OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre)
    DELETE b
    WITH after, p
    SET p.name = after.properties.name, p.surname = after.properties.surname
    MERGE (f:Family {name: after.properties.surname})
    MERGE (p)-[:BELONGS_TO]->(f)
  • messages received from deletes topic will be unpacked by the Sink connector into Neo4j with the following Cypher query:

    WITH __value.event.state.before AS before
    MATCH (p:Person {name: before.properties.name, surname: before.properties.surname})
    DETACH DELETE p

Creating the Sink instance

Based on the above example, you can use one of the following configurations. Pick one of the message serialization format examples and save it as a file named sink.cypher.neo4j.json into a local directory.

{
  "name": "Neo4jSinkConnectorAVRO",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorProtobuf",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

Load the configuration into the Kafka Connect with this REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @sink.cypher.neo4j.json

Now you can access your Confluent Control Center instance under http://localhost:9021/clusters. Verify that the configured connector instance is running in the Connect tab under connect-default.

Under the hood the connector will create a batch of changes for each topic, and will execute the query with a prepended UNWIND clause. For the above example, the executed query for messages received from creates topic would look like;

UNWIND $events AS message
WITH message.value AS event, message.header AS __header, message.key AS __key, message.value AS __value
WITH __value.event.state.after AS state
MERGE (p:Person {name: state.properties.name, surname: state.properties.surname})
MERGE (f:Family {name: state.properties.surname})
MERGE (p)-[:BELONGS_TO]->(f)

where $events is a batch of change events.