Sink Configuration
The documentation on the deprecated Neo4j Streams plugin and the previous version of the Kafka Connect Neo4j Connector can be found here.
In this chapter we’ll discuss how the Sink instance is configured.
Create the Sink Instance
To create the Sink instance, we'll define the Sink configuration in several ways:
- by providing a Cypher template
- by ingesting the events emitted from another Neo4j instance via the Change Data Capture module
- by providing a pattern extraction to a JSON or AVRO file
- by managing a CUD file format
Sink ingestion strategies
Cypher Strategy
{
"name": "Neo4jSinkConnectorJSONString",
"config": {
"topics": "my-topic",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"key.converter.schemas.enable": false,
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
}
}
In particular this line:
"neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
defines that all the data that comes from the topic my-topic will be unpacked by the Sink into Neo4j with the following Cypher query:
MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)
Under the hood, the Sink injects the event object in this way:
UNWIND $batch AS event
MERGE (p:Person{name: event.name, surname: event.surname})
MERGE (f:Family{name: event.surname})
MERGE (p)-[:BELONGS_TO]->(f)
Where $batch is a list of event objects.
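For example, assuming the messages published to my-topic look like the following (a hypothetical event, just for illustration):
{"name": "Andrea", "surname": "Santurbano"}
each such event would produce a Person node, a Family node keyed by the surname, and a BELONGS_TO relationship between them, as described by the template above.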
You can change the query, or remove this property and add your own, but you must adhere to the following convention:
"neo4j.topic.cypher.<YOUR_TOPIC>": "<YOUR_CYPHER_QUERY>"
Let’s load the configuration into the Confluent Platform with this REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @sink.string-json.neo4j.json
The file sink.string-json.neo4j.json contains a sample JSON Sink configuration.
Please check that everything is fine by going to http://localhost:9021/management/connect and clicking on the Sink tab. You should see a table like this:
| Status | Active Tasks | Name | Topics |
|---|---|---|---|
| Running | 1 | Neo4jSinkConnector | my-topic |
Note that the Sink instance can also be configured to monitor multiple topics. Just set the topics property to a comma-separated list of topics. For example:
{
"name": "Neo4jSinkConnector",
"config": {
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"topics": "topicA,topicB",
"_comment": "Cypher template example configuration",
"neo4j.topic.cypher.topicA": "<YOUR_CYPHER_QUERY>",
"neo4j.topic.cypher.topicB": "<YOUR_CYPHER_QUERY>",
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.encryption.enabled": false
}
}
Change Data Capture Event Strategy
This method allows you to ingest CDC events coming from another Neo4j instance. You can use two sub-strategies:
- The SourceId sub-strategy, which merges the nodes/relationships by the CDC event id field (it's related to the Neo4j physical ID)
- The Schema sub-strategy, which merges the nodes/relationships by the constraints (UNIQUENESS, NODE_KEY) defined in your graph model
The SourceId sub-strategy
You can configure the topic in the following way:
"neo4j.topic.cdc.sourceId": "<list of topics separated by semicolon>"
"neo4j.topic.cdc.sourceId.labelName": "<the label attached to the node, default=SourceEvent>"
"neo4j.topic.cdc.sourceId.idName": "<the id name given to the CDC id field, default=sourceId>"
"neo4j.topic.cdc.sourceId": "my-topic;my-other.topic"
Each stream event will be projected into the related graph entity; for instance, the following event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
will be persisted as the following node:
Person:SourceEvent{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org", sourceId: "1004"}
As you can notice, the ingested event has been projected with two peculiarities:
- the id field has been transformed into sourceId;
- the node has an additional label SourceEvent.
These two fields will be used to match the node/relationship for future updates/deletes.
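For instance, a later update to the same node would arrive as another CDC event carrying the same payload id. A sketch of such an event follows (abridged; the operation value and the before/after structure are assumed by analogy with the created event above):
{
  "meta": {
    "timestamp": 1532597190000,
    "username": "neo4j",
    "tx_id": 4,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    },
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "anne@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  }
}
With this sub-strategy, the Sink would look up the node labelled SourceEvent with sourceId = "1004" and apply the new properties to it.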
The Schema sub-strategy
You can configure the topic in the following way:
"neo4j.topic.cdc.schema": "<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>"
"neo4j.topic.cdc.schema": "my-topic;my-other.topic"
Each stream event will be projected into the related graph entity; for instance, the following event:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "1004",
"type": "node",
"after": {
"labels": ["Person"],
"properties": {
"email": "annek@noanswer.org",
"last_name": "Kretchmar",
"first_name": "Anne Marie"
}
}
},
"schema": {
"properties": {
"last_name": "String",
"email": "String",
"first_name": "String"
},
"constraints": [{
"label": "Person",
"properties": ["first_name", "last_name"],
"type": "UNIQUE"
}]
}
}
will be persisted as the following node:
Person{first_name: "Anne Marie", last_name: "Kretchmar", email: "annek@noanswer.org"}
The Schema sub-strategy leverages the schema field in order to insert/update the nodes, so no extra fields will be created.
In case of a relationship:
{
"meta": {
"timestamp": 1532597182604,
"username": "neo4j",
"tx_id": 3,
"tx_event_id": 0,
"tx_events_count": 2,
"operation": "created",
"source": {
"hostname": "neo4j.mycompany.com"
}
},
"payload": {
"id": "123",
"type": "relationship",
"label": "KNOWS",
"start": {
"labels": ["Person"],
"id": "123",
"ids": {
"last_name": "Andrea",
"first_name": "Santurbano"
}
},
"end": {
"labels": ["Person"],
"id": "456",
"ids": {
"last_name": "Michael",
"first_name": "Hunger"
}
},
"after": {
"properties": {
"since": "2018-04-05T12:34:00[Europe/Berlin]"
}
}
},
"schema": {
"properties": {
"since": "ZonedDateTime"
},
"constraints": [{
"label": "KNOWS",
"properties": ["since"],
"type": "RELATIONSHIP_PROPERTY_EXISTS"
}]
}
}
The Schema sub-strategy leverages the ids fields in order to insert/update the relationships, so no extra fields will be created.
The Pattern strategy
The Pattern strategy allows you to extract nodes and relationships from a JSON by providing an extraction pattern.
Each property can be prefixed with:
- !: identifies the id field (there can be more than one id property); it's mandatory
- -: excludes the property from the extraction

If no prefix is specified, the property will be included.
You cannot mix inclusion and exclusion, so your pattern must contain either all exclusion or all inclusion properties.
Labels can be chained via :
The pattern strategy comes with support for the Tombstone Record: in order to leverage it, your event should contain the record that you want to delete as the key and null as the value.
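For example, with a topic keyed by the node id properties (such as the user topic described below), a tombstone record deleting a previously ingested entity could be sketched as:
key:   {"userId": 1}
value: null
The key identifies the record to delete, and the null value marks it as a tombstone.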
Currently you can't concatenate multiple patterns (for example in case you use just one topic and produce more than one node/relationship type), so you have to use a different topic for each type of node/relationship and define a pattern for each of them.
The Node Pattern configuration
You can configure the node pattern extraction in the following way:
"neo4j.topic.pattern.node.<TOPIC_NAME>": "<NODE_EXTRACTION_PATTERN>"
So for instance, given the following JSON published via the user topic:
{"userId": 1, "name": "Andrea", "surname": "Santurbano", "address": {"city": "Venice", "cap": "30100"}}
You can transform it into a node by providing the following configuration, either by specifying a simple pattern:
"neo4j.topic.pattern.node.user": "User{!userId}"
or by specifying a Cypher-like node pattern:
"neo4j.topic.pattern.node.user": "(:User{!userId})"
Similarly to the CDC pattern, you can provide:

| pattern | meaning |
|---|---|
| User{!userId} | the userId will be used as ID field and all properties of the json will be attached to the node with the provided labels (User) |
| User{!userId, surname} | the userId will be used as ID field and only the surname property of the json will be attached to the node with the provided labels (User) |
| User{!userId, surname, address.city} | the userId will be used as ID field and only the surname and the address.city properties of the json will be attached to the node with the provided labels (User) |
| User{!userId, -address} | the userId will be used as ID field and the address property will be excluded |
The Relationship Pattern configuration
You can configure the relationship pattern extraction in the following way:
"neo4j.topic.pattern.relationship.<TOPIC_NAME>": "<RELATIONSHIP_EXTRACTION_PATTERN>"
So for instance, given the following JSON published via the user topic:
{"userId": 1, "productId": 100, "price": 10, "currency": "€", "shippingAddress": {"city": "Venice", cap: "30100"}}
You can transform it into a path, like (n)-[r]->(m), by providing the following configuration:
By specifying a simpler pattern:
"neo4j.topic.pattern.relationship.user": "User{!userId} BOUGHT{price, currency} Product{!productId}"
or by specifying a Cypher-like node pattern:
"neo4j.topic.pattern.relationship.user": "(:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId})"
In this last case we assume that User is the source node and Product the target node.
Similarly to the CDC pattern, you can provide:

| pattern | meaning |
|---|---|
| (:User{!userId})-[:BOUGHT]->(:Product{!productId}) | this will merge fetch/create the two nodes by the provided identifier and the BOUGHT relationship between them, attaching all the remaining json properties to the relationship |
| (:User{!userId})-[:BOUGHT{price, currency}]->(:Product{!productId}) | this will merge fetch/create the two nodes by the provided identifier and the BOUGHT relationship between them, attaching only the price and currency properties to the relationship |
| (:User{!userId})-[:BOUGHT{-shippingAddress}]->(:Product{!productId}) | this will merge fetch/create the two nodes by the provided identifier and the BOUGHT relationship between them, attaching all properties except shippingAddress to the relationship |
| (:User{!userId})-[:BOUGHT{price, currency, shippingAddress.city}]->(:Product{!productId}) | this will merge fetch/create the two nodes by the provided identifier and the BOUGHT relationship between them, attaching the price, currency and shippingAddress.city properties to the relationship |
Attach properties to node
By default no properties will be attached to the edge nodes, but you can specify which properties to attach to each node. Given the following JSON published via the user topic:
{
"userId": 1,
"userName": "Andrea",
"userSurname": "Santurbano",
"productId": 100,
"productName": "My Awesome Product!",
"price": 10,
"currency": "€"
}
| pattern | meaning |
|---|---|
| (:User{!userId, userName, userSurname})-[:BOUGHT]->(:Product{!productId, productName}) | this will merge two nodes and the BOUGHT relationship between them; the userName and userSurname properties are attached to the User node, productName to the Product node, and the remaining properties (price, currency) to the relationship |
CUD File Format Strategy
The CUD file format is a JSON file that represents Graph Entities (Nodes/Relationships) and how to manage them in terms of Create/Update/Delete operations.
You can configure the topic in the following way:
"neo4j.topic.cud": "<LIST_OF_TOPICS_SEPARATED_BY_SEMICOLON>"
"neo4j.topic.cud": "my-topic;my-other.topic"
We have two formats:

- One for nodes:

We provide an example of a MERGE operation:

{
  "op": "merge",
  "properties": {
    "foo": "value",
    "key": 1
  },
  "ids": {"key": 1, "otherKey": "foo"},
  "labels": ["Foo","Bar"],
  "type": "node",
  "detach": true
}
which would be transformed into the following Cypher query:
UNWIND [..., {
"op": "merge",
"properties": {
"foo": "value",
"key": 1
},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": true
}, ...] AS event
MERGE (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
SET n += event.properties
Let's describe the fields:

| field | mandatory | Description |
|---|---|---|
| op | yes | The operation type: create/merge/update/delete. N.B. delete messages are for individual nodes; it's not intended as a generic way of doing cypher query building from JSON |
| properties | no, in case the operation is delete | The properties attached to the node |
| ids | no, in case the operation is create | In case the operation is merge/update/delete this field is mandatory and contains the primary/unique keys of the node that will be used to look up the entity. In case you use _id as key, the lookup is done against the internal id of the node. N.B. if you use the _id reference, the merge operation behaves as a simple update and will not create the node if it does not exist |
| labels | no | The labels attached to the node. N.B. Neo4j allows creating nodes without labels, but from a performance perspective it's a bad idea not to provide them |
| type | yes | The entity type: node/relationship ⇒ node in this case |
| detach | no | In case the operation is delete, you can specify whether to perform a "detach" delete, which means deleting any incident relationships when you delete the node. N.B. if no value is provided, the default is true |
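For comparison, an update operation follows the same shape; a sketch with illustrative values could be:
{
  "op": "update",
  "properties": {
    "foo": "new-value"
  },
  "ids": {"key": 1, "otherKey": "foo"},
  "labels": ["Foo","Bar"],
  "type": "node"
}
As the names suggest, update only modifies nodes that already match the given ids, while merge also creates them when they are missing.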
- And one for relationships:

We provide an example of a CREATE operation:
{
"op": "create",
"properties": {
"foo": "rel-value",
"key": 1
},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}
which would be transformed into the following Cypher query:
UNWIND [..., {
"op": "create",
"properties": {
"foo": "rel-value",
"key": 1
},
"rel-type": "MY-REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
CREATE (from)-[r:MY_REL]->(to)
SET r = event.properties
Let's describe the fields:

| field | mandatory | Description |
|---|---|---|
| op | yes | The operation type: create/merge/update/delete |
| properties | no | The properties attached to the relationship |
| rel_type | yes | The relationship type |
| from | yes | Contains the info about the source node of the relationship. For the description of the ids and labels fields, please look at the node fields above |
| to | yes | Contains the info about the target node of the relationship. For the description of the ids and labels fields, please look at the node fields above |
| type | yes | The entity type: node/relationship ⇒ relationship in this case |
Following is another example of a DELETE operation, for both node and relationship.
- For Node, the following JSON:
{
"op": "delete",
"properties": {},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": false
}
will be transformed into the following Cypher query:
UNWIND [..., {
"op": "delete",
"properties": {},
"ids": {"key": 1, "otherKey": "foo"},
"labels": ["Foo","Bar"],
"type": "node",
"detach": false
}, ...] AS event
MATCH (n:Foo:Bar {key: event.ids.key, otherKey: event.ids.otherKey})
DELETE n
Note that if you set "detach": true then the transformation will be:
UNWIND [
...
] AS event
...
DETACH DELETE n
- For Relationship, the following JSON:
{
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}
will be transformed into the following Cypher query:
UNWIND [..., {
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"]
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"]
},
"type":"relationship"
}, ...] AS event
MATCH (from:Foo:Bar {key: event.from.ids.key})
MATCH (to:FooBar {otherKey: event.to.ids.otherKey})
MATCH (from)-[r:MY_REL]->(to)
DELETE r
We can create non-existent nodes at relationship creation/merging by putting "op": "merge" in the "from" and/or "to" field. By default "op" is match, so the node is not created if it doesn't exist. We can write, for example:
We can write, for example:
{
"op": "create",
"properties": {},
"rel_type": "MY_REL",
"from": {
"ids": {"key": 1},
"labels": ["Foo","Bar"],
"op": "merge"
},
"to": {
"ids": {"otherKey":1},
"labels": ["FooBar"],
"op": "merge"
},
"type":"relationship"
}
Multi Database Support
Neo4j Enterprise Edition has multi-tenancy support.
In order to support this feature with the Kafka Connect Neo4j Connector, we have to add the neo4j.database property to the Sink configuration, which tells the Connector which database to use as the default. If you don't specify that property, the home database for the configured user will be used. Following is an example:
{
"name": "Neo4jSinkConnector",
"config": {
"neo4j.database": "<database_name>",
"topics": "topic",
"connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": false,
"errors.retry.timeout": "-1",
"errors.retry.delay.max.ms": "1000",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true,
"neo4j.server.uri": "bolt://neo4j:7687",
"neo4j.authentication.basic.username": "neo4j",
"neo4j.authentication.basic.password": "password",
"neo4j.encryption.enabled": false,
"_comment": "Sink CDC SourceId Strategy",
"neo4j.topic.cdc.sourceId": "topic",
"neo4j.topic.cdc.sourceId.labelName": "<the label attached to the node, default=SourceEvent>",
"neo4j.topic.cdc.sourceId.idName": "<the id name given to the CDC id field, default=sourceId>",
"_comment": "Sink CDC Schema Strategy",
"neo4j.topic.cdc.schema": "<list_of_topics_separated_by_semicolon>",
"_comment": "Sink Node/Relationship Pattern Strategy",
"neo4j.topic.pattern.node.<TOPIC_NAME>": "<node_extraction_pattern>",
"neo4j.topic.pattern.relationship.<TOPIC_NAME>": "<relationship_extraction_pattern>",
"_comment": "Sink CUD File forma Strategy",
"neo4j.topic.cud": "<list_of_topics_separated_by_semicolon>"
}
}
How to deal with bad data
With the Kafka Connect Neo4j Connector, when creating the Sink instance, in addition to the properties described in the Dead Letter Queue configuration parameters table, you need to define the Kafka broker connection properties:
| Name | Value | Note |
|---|---|---|
| errors.tolerance | none | fail fast (default!) abort |
| errors.tolerance | all | all == lenient, silently ignore bad messages |
| errors.log.enable | false/true | log errors (default: false) |
| errors.log.include.messages | false/true | log bad messages too (default: false) |
| errors.deadletterqueue.topic.name | topic-name | dead letter queue topic name, if left off no DLQ, default: not set |
| errors.deadletterqueue.context.headers.enable | false/true | enrich messages with metadata headers like exception, timestamp, org. topic, org.part, default: false |
| errors.deadletterqueue.context.headers.prefix | prefix-text | common prefix for header entries, default: not set |
| errors.deadletterqueue.topic.replication.factor | 3/1 | replication factor, need to set to 1 for single partition, default: 3 |
For the Neo4j extension you prefix them with neo4j in the Neo4j configuration.
| Name | mandatory | Description |
|---|---|---|
| kafka.bootstrap.servers | true | It's the Kafka Broker url. *(please look at the description below) |
| kafka.<any_other_kafka_property> | false | You can also specify any other Kafka Producer setting by adding the kafka. prefix |
As you may have noticed, we're asking you to provide the bootstrap.servers property; this is because the Kafka Connect Framework provides out-of-the-box support only for deserialization errors and message transformations (please look at the following link for further details: https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/).
We want to extend this feature to transient errors in order to cover 100% of failures.
To do that, at the moment, as suggested by Confluent, we need to ask for the broker location again, until this JIRA issue is addressed: https://issues.apache.org/jira/browse/KAFKA-8597.
That said, these properties have to be added only if you also want to redirect Neo4j errors into the DLQ.
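Putting this together, the error-handling part of a Sink configuration might look like the following sketch. The property names come from the tables above; the topic name, replication factor, and broker address are illustrative:
{
  "_comment": "illustrative error-handling fragment, to be merged into the full Sink configuration",
  "errors.tolerance": "all",
  "errors.log.enable": true,
  "errors.log.include.messages": true,
  "errors.deadletterqueue.topic.name": "neo4j-sink-dlq",
  "errors.deadletterqueue.context.headers.enable": true,
  "errors.deadletterqueue.topic.replication.factor": 1,
  "kafka.bootstrap.servers": "broker:9093"
}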