Source: Neo4j → Kafka

The Neo4j Streams Plugin running inside the Neo4j database is deprecated and will not be supported after version 4.3 of Neo4j. We recommend that users do not adopt this plugin for new implementations and instead consider migrating to the Kafka Connect Neo4j Connector as a replacement.

The Source module is a transaction event handler that sends database change events to a Kafka topic.

Configuration

You can set the following configuration values in your neo4j.conf; the defaults are shown below.

neo4j.conf (with default values)
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=
kafka.topic.discovery.polling.interval=300000
kafka.streams.log.compaction.strategy=delete

streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=<default/all>
streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
streams.source.enabled=<true/false, default=true>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>
streams.procedures.enabled.from.<DB_NAME>=<true/false, default=true>
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>

To use Kafka transactions, please set kafka.transactional.id and kafka.acks properly. Check out this blog post for further details about transactions in Apache Kafka.
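For instance, a minimal, illustrative sketch could look like the following (the transactional id value is hypothetical; any non-empty, unique id works, and acks=all is the setting required by transactional producers):

kafka.transactional.id=neo4j-source-producer
kafka.acks=all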

See the Apache Kafka documentation for details on these settings.

In case your Kafka broker is configured with auto.create.topics.enable set to false, all messages sent to topics that don't exist are discarded; this is because the KafkaProducer.send() method would otherwise block the execution, as explained in KAFKA-3539. You can tune the custom property kafka.topic.discovery.polling.interval in order to periodically check for new topics in the Kafka cluster, so that the plugin can send events to the defined topics as soon as they exist.
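For example, an illustrative setting that polls for newly created topics every 60 seconds could be:

kafka.topic.discovery.polling.interval=60000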

With kafka.streams.log.compaction.strategy=delete, the Neo4j Streams Source generates a sequence of unique keys. With kafka.streams.log.compaction.strategy=compact, the keys are instead adapted to enable Log Compaction on the Kafka side: operations that involve the same nodes or relationships get the same key. Please note that the delete strategy does not actually delete records; it is named this way to match the topic configuration cleanup.policy=delete/compact.

When kafka.streams.log.compaction.strategy=compact, partitioning relies on Kafka's internal mechanism.
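As an illustration (the topic name personTopic is hypothetical), a configuration aimed at log compaction could look like the following; note that the corresponding topic must also be configured with cleanup.policy=compact on the Kafka side:

kafka.streams.log.compaction.strategy=compact
streams.source.topic.nodes.personTopic=Person{*}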

Multi Database Support

Neo4j 4.0 Enterprise supports multi-tenancy. In order to support this feature, for each database instance you can append a configuration suffix with the pattern from.<DB_NAME> to the properties in your neo4j.conf file.

The following new properties allow multi-tenancy support:

streams.source.topic.nodes.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>.from.<DB_NAME>.key_strategy=<default/all>
streams.source.enabled.from.<DB_NAME>=<true/false, default=true>

This means that for each database instance you can specify:

  • whether to use the Source connector

  • the routing patterns

So if you have an instance named foo, you can specify a configuration in this way:

streams.source.topic.nodes.myTopic.from.foo=<PATTERN>
streams.source.topic.relationships.myTopic.from.foo=<PATTERN>
streams.source.enabled.from.foo=<true/false, default=true>

The old properties:

streams.source.enabled=<true/false, default=true>
streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.procedures.enabled=<true/false, default=true>

are still valid and refer to Neo4j’s default database instance.

The default database is controlled by Neo4j’s dbms.default_database configuration property, so check that setting to know which database the un-suffixed properties apply to. Database names are case-insensitive, normalized to lowercase, and must follow Neo4j database naming rules. (Reference: https://neo4j.com/docs/operations-manual/current/manage-databases/configuration/#manage-databases-administration)
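For instance, assuming neo4j.conf contained the illustrative setting below, the un-suffixed streams.* properties would apply to the foo database instead of neo4j:

dbms.default_database=foo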

In particular, the following property is used as the default value for non-default database instances, when the database-specific configuration parameter is not provided:

streams.source.enabled=<true/false, default=true>

This means that if you have Neo4j with 3 db instances:

  • neo4j (default)

  • foo

  • bar

and you want to enable the Source plugin on all instances, you can simply omit any configuration about enabling it; you just need to provide the routing configuration for each instance:

streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}

Otherwise, if you want to enable the Source plugin only on the foo and bar instances, you can do it in this way:

streams.source.enabled=false
streams.source.enabled.from.foo=true
streams.source.enabled.from.bar=true
streams.source.topic.nodes.testTopic=Test{testId}
streams.source.topic.nodes.fooTopic.from.foo=Foo{fooId,fooName}
streams.source.topic.relationships.barTopic.from.bar=Bar{barId,barName}

As you can see, if you want to enable the Source plugin only on one or more specific database instances, you have to first disable the Source plugin globally (streams.source.enabled=false) and then enable it only on the desired instances (e.g. streams.source.enabled.from.foo=true). Furthermore, please note that streams.source.topic.nodes.testTopic=Test{testId} will not be considered, because the Source plugin has been disabled on the default database instance neo4j.

So in general if you have:

streams.source.enabled=true
streams.source.enabled.from.foo=false

Then the Source module is enabled on all databases except foo (the per-database setting overrides the global one).

For example purposes only, imagine a situation like the following:

You have a Neo4j instance, without Neo4j Streams installed, where a database "testdb" was created and populated. You decide to install the Neo4j Streams plugin because you also want to have your graph data in Kafka. So you add the following configuration:

kafka.bootstrap.servers=localhost:9092
streams.source.enabled=true
streams.sink.enabled=false
streams.procedures.enabled=true
streams.source.topic.nodes.topicTest.from.testdb=Test{*}

By doing so, you expect all created/updated nodes with label Test to be reflected in the topic topicTest. What actually happens is:

  • all the nodes and relationships inserted before the Kafka setup are reflected into a topic called "testdb", which is created by default with the database name.

  • all the created/updated nodes with label Test, after the Kafka setup, are reflected into the configured topic topicTest.

The first point happens because, since the database "testdb" was already populated, enabling the Source module (streams.source.enabled=true) makes the plugin create a default topic with the same name as the database, and reflect all the existing "testdb" data into it.

If you want to turn off this default behaviour, you have to disable the "generic" Source module and enable it just for the database you are interested in:

kafka.bootstrap.servers=localhost:9092
streams.source.enabled=false
streams.sink.enabled=false
streams.procedures.enabled=true
streams.source.enabled.from.testdb=true
streams.source.topic.nodes.topicTest.from.testdb=Test{*}

Serializers

To allow insertion of keys in any format (e.g. through the streams.publish procedure), key.serializer is set to org.apache.kafka.common.serialization.ByteArraySerializer, just like value.serializer.
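For example, a payload of any supported type can be published to a topic via the streams.publish procedure (the topic name my-topic is illustrative):

CALL streams.publish('my-topic', 'Hello world from Neo4j!')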

Message structure

The message key structure depends on kafka.streams.log.compaction.strategy.

With delete, the key is a string of the form "${meta.txId + meta.txEventId}-${meta.txEventId}", that is [txId+txEventId]-txEventId,

where:

  • txId identifies the transaction that affected the entity

  • txEventId is a counter that identifies the internal order in which Neo4j handled the specific event

  • [txId+txEventId] is the numeric sum of the two previous values
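For example, an event with meta.txId=16 and meta.txEventId=1 would produce the key "17-1" (the sum 16+1, followed by the event id).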

 

With compact, instead:

In case of a node without a constrained label, the key is the string value of the node id.

In case of a node with a constrained label, the key is a JSON document of the form {ids: <map of constrained properties>, labels: <list of labels>}.

For example, with this configuration:

streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact

this constraint:

CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE

and this query:

CREATE (:Person {name:'Sherlock', surname: 'Holmes'})

We obtain this key:

{"ids": {"name": "Sherlock"}, "labels": ["Person"]}

 

Otherwise, with the same configuration and query as above, but with the constraint:

CREATE CONSTRAINT ON (p:Person) ASSERT (p.name, p.surname) IS NODE KEY

We obtain this key:

{"ids": {"surname": "Holmes", "name":  "Sherlock"}, "labels": ["Person"]}

 

In case of a relationship, the key is a JSON document of the form {start: START_NODE, end: END_NODE, label: <relationship type>}, where START_NODE and END_NODE follow the same rules as above.

For example, with this configuration:

streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact

these constraints:

CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Product) ASSERT p.code IS UNIQUE;

and these queries:

CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);

We obtain this key:

{"start": {"ids": {"name":  "Pippo"}, "labels": ["Person"]}, "end": {"ids": {"code":  "1367"}, "labels": ["Product"]},
 "label": "BUYS"}

 

Otherwise, with this configuration:

streams.source.topic.nodes.<TOPIC_NAME>=Person{*}
streams.source.topic.relationships.<TOPIC_NAME>=Person{*}
kafka.streams.log.compaction.strategy=compact

without constraints, and with these queries:

CREATE (:Person {name:'Pippo'});
CREATE (p:Product {code:'1367', name: 'Notebook'});
MATCH (pe:Person {name:'Pippo'}), (pr:Product {name:'Notebook'}) MERGE (pe)-[:BUYS]->(pr);

We obtain this key:

{"start": "0", "end": "1", "label": "BUYS"}

In case of relationships with multiple constraints on the start or end node, the ids fields depend on the streams.source.topic.relationships.<TOPIC_NAME>.key_strategy configuration. See the 'Relationship key strategy' section below for more details.

Patterns

Nodes

To control which nodes are sent to Kafka, and which of their properties, you can define node patterns in the configuration.

You can choose labels and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons ;.

The basic syntax is:

Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}

Pattern                 Meaning
Label{*}                all nodes with this label, with all their properties, go to the related topic
Label1:Label2           nodes with both labels are sent to the related topic
Label{prop1,prop2}      the prop1 and prop2 properties of all nodes with this label are sent to the related topic
Label{-prop1,-prop2}    in nodes with label Label, the properties prop1 and prop2 are excluded
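As an illustration (the topic name customers and the labels used here are hypothetical), several node patterns can be combined for a single topic:

streams.source.topic.nodes.customers=Customer{*};Person{name,email};Internal{-secret}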

Relationships

To control which relationships are sent to Kafka, and which of their properties, you can define relationship patterns in the configuration.

You can choose relationship types and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons ;.

The basic syntax is:

KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}

Pattern                 Meaning
KNOWS{*}                all relationships with this type, with all their properties, go to the related topic
MEET{prop1,prop2}       the prop1 and prop2 properties of all relationships with this type are sent to the related topic
ANSWER{-prop1,-prop2}   in relationships with type ANSWER, the properties prop1 and prop2 are excluded
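As an illustration (the topic name interactions is hypothetical), relationship patterns can be combined in the same way:

streams.source.topic.relationships.interactions=KNOWS{*};ANSWER{-text}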

Relationship key strategy

With streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=default, when a message is produced for a relationship, only one of the constraints associated with the node will be returned, based on the following rule.

If there are multiple constraints, they are sorted by the number of properties involved in the constraint, then by label name (alphabetically), and finally by property names (alphabetically).

The properties of the first constraint in this ordering are then used.

So, if we have a start node with labels Person and Other, and we create 2 constraints like these:

CREATE CONSTRAINT ON (p:Other) ASSERT p.zxc IS UNIQUE;
CREATE CONSTRAINT ON (p:Person) ASSERT p.name IS UNIQUE;

the start.ids field produced will be:

{"zxc": "FooBar"}

because both constraints involve the same number of properties (one), and Other comes first alphabetically by label name.

Otherwise, with:

CREATE CONSTRAINT ON (p:Other) ASSERT p.zxc IS UNIQUE;
CREATE CONSTRAINT ON (p:Other) ASSERT p.name IS UNIQUE;

the start.ids field produced will be:

{"name": "Sherlock"}

because both constraints are on the same label with one property each, and the property name comes before zxc alphabetically.

Otherwise, with streams.source.topic.relationships.<TOPIC_NAME>.key_strategy=all, every property participating in a unique constraint is produced. So, with a start node with labels Person, Other, and Another, and with these constraints:

CREATE CONSTRAINT ON (p:Another) ASSERT p.zxc IS UNIQUE;
CREATE CONSTRAINT ON (p:Other) ASSERT p.name IS UNIQUE;
CREATE CONSTRAINT ON (p:Person) ASSERT p.surname IS UNIQUE;

the start.ids field produced will be:

{ "name": "Sherlock", "surname": "Holmes", "zxc": "FooBar"}

Transaction Event Handler

The transaction event handler is the core of the Stream Producer and allows database changes to be streamed.

Events

The Producer streams three kinds of events:

  • created: when a node/relationship/property is created

  • updated: when a node/relationship/property is updated

  • deleted: when a node/relationship/property is deleted

Created

The following are examples of node and relationship creation events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

Updated

The following are examples of node and relationship update events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person", "Tmp"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne"
      }
    },
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

Deleted

The following are examples of node and relationship deletion events:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String",
      "geo": "point"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Andrea",
        "first_name": "Santurbano"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Michael",
        "first_name": "Hunger"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}

Meta

The meta field contains the metadata related to the transaction event:

Field            Type                                    Description
timestamp        Number                                  The timestamp related to the transaction event
username         String                                  The username that generated the transaction
tx_id            Number                                  The transaction id provided by the Neo4j transaction manager
tx_events_count  Number                                  The number of events included in the transaction (e.g. 2 node updates, 1 relationship creation)
tx_event_id      Number                                  The id of the event inside the transaction
operation        enum["created", "updated", "deleted"]   The operation type
source           Object                                  Contains the information about the source

Source

Field     Type    Description
hostname  String  The hostname of the Neo4j instance that generated the event

Payload

The payload field contains the information about the data related to the event:

Field   Type                           Description
id      Number                         The id of the graph entity
type    enum["node", "relationship"]   The type of the graph entity
before  Object                         The data before the transaction event
after   Object                         The data after the transaction event

Payload: before and after

We must distinguish two cases:

Nodes

Field       Type        Description
labels      String[]    List of labels attached to the node
properties  Map<K, V>   The properties attached to the node, where K is the property name and V is the value

Relationships

Field       Type        Description
label       String      The relationship type
properties  Map<K, V>   The properties attached to the relationship, where K is the property name and V is the value
start       Object      The start node of the relationship
end         Object      The end node of the relationship

Relationships: startNode and endNode

Field   Type        Description
id      Number      The id of the node
labels  String[]    List of labels attached to the node
ids     Map<K, V>   The ids related to the constraints defined on the node (UNIQUENESS and/or NODE_KEY), where K is the property name and V is the related value

Schema

Field        Type        Description
constraints  Object[]    List of constraints attached to the entity
properties   Map<K, V>   The properties attached to the entity, where K is the property name and V is the class type

Constraints

Nodes and Relationships can have a list of constraints attached to them:

Table 1. Constraints

Field       Type                                                                      Description
label       String                                                                    The label attached to the constraint
type        enum["UNIQUE", "NODE_PROPERTY_EXISTS", "RELATIONSHIP_PROPERTY_EXISTS"]    The constraint type
properties  String[]                                                                  List of properties involved in the constraint