Chapter 3. Producer: Neo4j → Kafka

This chapter describes the Neo4j Streams Producer in the Neo4j Streams Library. Use this section to configure Neo4j to publish CDC-style data to Kafka.

The Producer is a transaction event handler that sends change events to Kafka topics.

3.1. Configuration

You can set the following configuration values in your neo4j.conf. The defaults are shown below.

neo4j.conf:

kafka.zookeeper.connect=localhost:2181
kafka.bootstrap.servers=localhost:9092
kafka.acks=1
kafka.num.partitions=1
kafka.retries=2
kafka.batch.size=16384
kafka.buffer.memory=33554432
kafka.reindex.batch.size=1000
kafka.session.timeout.ms=15000
kafka.connection.timeout.ms=10000
kafka.replication=1
kafka.linger.ms=1
kafka.transactional.id=

streams.source.topic.nodes.<TOPIC_NAME>=<PATTERN>
streams.source.topic.relationships.<TOPIC_NAME>=<PATTERN>
streams.source.enabled=<true/false, default=true>
streams.source.schema.polling.interval=<MILLIS, the polling interval for getting the schema information>
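The topic keys above embed the topic name in the property key itself (streams.source.topic.nodes.<TOPIC_NAME>). A minimal sketch of how such entries can be read back into a topic-to-pattern map, assuming plain key=value lines; the helper parse_source_topics and the topic names people and friendships are hypothetical, not part of Neo4j Streams:

```python
# Hypothetical helper: collect streams.source.topic.* entries from
# neo4j.conf-style text into {"nodes": {...}, "relationships": {...}}.

def parse_source_topics(conf_text: str) -> dict[str, dict[str, str]]:
    topics: dict[str, dict[str, str]] = {"nodes": {}, "relationships": {}}
    for raw in conf_text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#") or "=" not in line:
            continue  # skip blank lines, comments and lines without a value
        key, value = (s.strip() for s in line.split("=", 1))
        for kind in ("nodes", "relationships"):
            prefix = f"streams.source.topic.{kind}."
            if key.startswith(prefix):
                # everything after the prefix is the topic name (may contain dots)
                topics[kind][key[len(prefix):]] = value
    return topics


conf = """
kafka.bootstrap.servers=localhost:9092
streams.source.topic.nodes.people=Person{*}
streams.source.topic.relationships.friendships=KNOWS{since}
"""
print(parse_source_topics(conf))
```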

Note: to use Kafka transactions, set kafka.transactional.id and adjust kafka.acks accordingly; Kafka only accepts transactional producers configured with acks=all.

See the Apache Kafka documentation for details on these settings.
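Because an inconsistent pair of transactional.id and acks is only rejected when the Kafka producer starts, it can help to check the settings up front. A minimal sketch, assuming the settings are already available as a dict of strings; check_transactional_settings is a hypothetical helper, and the rule it enforces (transactional producers need acks=all, also written -1) is Kafka's, not something defined by Neo4j Streams:

```python
# Hypothetical pre-flight check for the Kafka transaction note above.

def check_transactional_settings(settings: dict[str, str]) -> list[str]:
    problems: list[str] = []
    tx_id = settings.get("kafka.transactional.id", "").strip()
    if not tx_id:
        return problems  # transactions disabled: nothing to check
    # default taken from the neo4j.conf defaults listed above (kafka.acks=1)
    acks = settings.get("kafka.acks", "1").strip().lower()
    if acks not in ("all", "-1"):
        problems.append(f"kafka.acks={acks} but transactions require acks=all")
    return problems


print(check_transactional_settings({"kafka.transactional.id": "neo4j-producer", "kafka.acks": "1"}))
```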

3.2. Patterns

3.2.1. Nodes

To control which nodes are sent to Kafka, and which of their properties are included, you can define node patterns in the configuration.

You can choose labels and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons (;).

The basic syntax is:

Label{*};Label1{prop1, prop2};Label3{-prop1,-prop2}
Pattern                 Meaning
Label{*}                all nodes with this label, with all their properties, go to the related topic
Label1:Label2           nodes with these two labels are sent to the related topic
Label{prop1,prop2}      the prop1 and prop2 properties of all nodes with this label are sent to the related topic
Label{-prop1,-prop2}    for nodes with label Label, properties prop1 and prop2 are excluded
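To make the node pattern grammar concrete, here is a minimal parsing sketch that mirrors the table above. It is not the library's own parser; NodePattern and parse_node_patterns are hypothetical names, and treating a pattern without braces (Label1:Label2) as "all properties" is an assumption:

```python
import re
from dataclasses import dataclass, field


@dataclass
class NodePattern:
    labels: list[str]
    include: list[str] = field(default_factory=list)
    exclude: list[str] = field(default_factory=list)
    all_props: bool = False  # True: every property not listed in exclude


# "<Label[:Label...]>" optionally followed by "{<properties>}"
_PATTERN = re.compile(r"^\s*([^{}]+?)\s*(?:\{([^}]*)\})?\s*$")


def parse_node_patterns(text: str) -> list[NodePattern]:
    patterns = []
    for part in text.split(";"):
        if not part.strip():
            continue
        m = _PATTERN.match(part)
        if m is None:
            raise ValueError(f"invalid node pattern: {part!r}")
        p = NodePattern(labels=[l.strip() for l in m.group(1).split(":")])
        body = m.group(2)
        if body is None or body.strip() == "*":
            p.all_props = True  # "Label{*}", or no braces (assumed to mean all)
        else:
            for prop in (s.strip() for s in body.split(",")):
                if prop.startswith("-"):
                    p.exclude.append(prop[1:].strip())
                elif prop:
                    p.include.append(prop)
            # only exclusions listed: keep everything except those
            p.all_props = not p.include
        patterns.append(p)
    return patterns


for p in parse_node_patterns("Label{*};Label1:Label2;Label{prop1,prop2};Label3{-prop1,-prop2}"):
    print(p)
```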

3.2.2. Relationships

To control which relationships are sent to Kafka, and which of their properties are included, you can define relationship patterns in the configuration.

You can choose relationship types and properties for inclusion or exclusion, with * meaning all.

Patterns are separated by semicolons (;).

The basic syntax is:

KNOWS{*};MEET{prop1, prop2};ANSWER{-prop1,-prop2}
Pattern                 Meaning
KNOWS{*}                all relationships of this type, with all their properties, go to the related topic
KNOWS{prop1,prop2}      the prop1 and prop2 properties of all relationships of this type are sent to the related topic
KNOWS{-prop1,-prop2}    for relationships of type KNOWS, properties prop1 and prop2 are excluded
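The effect of a single relationship pattern on one relationship's properties can be sketched as follows. filter_relationship is a hypothetical helper written for illustration; letting inclusions win when a pattern mixes included and excluded properties is an assumption, since the table does not cover that case:

```python
from typing import Optional


def filter_relationship(pattern: str, rel_type: str, props: dict) -> Optional[dict]:
    """Apply e.g. "KNOWS{*}", "KNOWS{p1,p2}" or "KNOWS{-p1,-p2}" to props."""
    name, _, body = pattern.partition("{")
    if name.strip() != rel_type:
        return None  # pattern does not match this relationship type
    body = body.strip().rstrip("}").strip() or "*"  # no braces: all properties
    if body == "*":
        return dict(props)
    fields = [f.strip() for f in body.split(",") if f.strip()]
    excluded = {f[1:].strip() for f in fields if f.startswith("-")}
    included = [f for f in fields if not f.startswith("-")]
    if included:  # assumption: an explicit inclusion list takes precedence
        return {k: v for k, v in props.items() if k in included}
    return {k: v for k, v in props.items() if k not in excluded}


props = {"since": "2018-04-05", "to": "2019-04-05", "weight": 1}
print(filter_relationship("KNOWS{-since,-to}", "KNOWS", props))
```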

3.3. Transaction Event Handler

The transaction event handler is the core of the Streams Producer and is what allows database changes to be streamed.

3.3.1. Events

The Producer streams three kinds of events:

  • created: when a node, relationship or property is created
  • updated: when a node, relationship or property is updated
  • deleted: when a node, relationship or property is deleted
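In the examples that follow, a created event carries only an after state, an updated event carries both before and after, and a deleted event carries only before. A minimal sketch that uses this observation as a consistency check against meta.operation; Operation and infer_operation are hypothetical names, not library types:

```python
from enum import Enum


class Operation(str, Enum):
    CREATED = "created"
    UPDATED = "updated"
    DELETED = "deleted"


def infer_operation(payload: dict) -> Operation:
    # a missing key and an explicit null are treated the same way
    has_before = payload.get("before") is not None
    has_after = payload.get("after") is not None
    if has_after and not has_before:
        return Operation.CREATED
    if has_before and has_after:
        return Operation.UPDATED
    if has_before:
        return Operation.DELETED
    raise ValueError("payload has neither 'before' nor 'after'")


print(infer_operation({"before": {"labels": ["Person"]}}))
```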

3.3.1.1. Created

The following examples show a node creation event and a relationship creation event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "after": {
      "labels": ["Person"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne Marie"
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "created",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Santurbano",
        "first_name": "Andrea"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Hunger",
        "first_name": "Michael"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}
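A consumer typically receives each event as a JSON string in the value of a Kafka record. The sketch below decodes one event and summarises it using only fields shown in the examples above; the raw string stands in for a record value (no Kafka client is used), and describe_event is a hypothetical helper:

```python
import json


def describe_event(raw: str) -> str:
    event = json.loads(raw)
    meta, payload = event["meta"], event["payload"]
    op = meta["operation"]
    if payload["type"] == "node":
        # created/updated events carry "after", deleted events only "before"
        state = payload.get("after") or payload.get("before") or {}
        labels = ":".join(state.get("labels", []))
        return f"{op} node {payload['id']} :{labels}"
    # relationships carry type, start and end directly in the payload
    start, end = payload["start"]["id"], payload["end"]["id"]
    return f"{op} relationship {payload['id']} ({start})-[:{payload['label']}]->({end})"


raw = ('{"meta": {"operation": "created", "tx_id": 3}, '
       '"payload": {"id": "1004", "type": "node", "after": '
       '{"labels": ["Person"], "properties": {"first_name": "Anne Marie"}}}}')
print(describe_event(raw))
```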

3.3.1.2. Updated

The following examples show a node update event and a relationship update event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person", "Tmp"],
      "properties": {
        "email": "annek@noanswer.org",
        "last_name": "Kretchmar",
        "first_name": "Anne"
      }
    },
    "after": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String",
      "geo": "point"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "updated",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Santurbano",
        "first_name": "Andrea"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Hunger",
        "first_name": "Michael"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]"
      }
    },
    "after": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}
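Since an updated event carries both the before and the after state, a consumer can derive exactly what changed. A minimal sketch, assuming the payload shape shown in the examples above; diff_update is a hypothetical helper:

```python
def diff_update(payload: dict) -> dict:
    before = payload.get("before") or {}
    after = payload.get("after") or {}
    old_props = before.get("properties", {})
    new_props = after.get("properties", {})
    diff = {
        "added": {k: v for k, v in new_props.items() if k not in old_props},
        "removed": sorted(k for k in old_props if k not in new_props),
        "changed": {k: (old_props[k], v) for k, v in new_props.items()
                    if k in old_props and old_props[k] != v},
    }
    if payload.get("type") == "node":
        # only node states carry labels; relationship states hold properties only
        old_labels = set(before.get("labels", []))
        new_labels = set(after.get("labels", []))
        diff["labels_added"] = sorted(new_labels - old_labels)
        diff["labels_removed"] = sorted(old_labels - new_labels)
    return diff


node_update = {
    "type": "node",
    "before": {"labels": ["Person", "Tmp"], "properties": {"first_name": "Anne"}},
    "after": {"labels": ["Person"], "properties": {"first_name": "Anne Marie"}},
}
print(diff_update(node_update))
```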

3.3.1.3. Deleted

The following examples show a node deletion event and a relationship deletion event:

{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "1004",
    "type": "node",
    "before": {
      "labels": ["Person"],
      "properties": {
        "last_name": "Kretchmar",
        "email": "annek@noanswer.org",
        "first_name": "Anne Marie",
        "geo": { "crs": "wgs-84-3d", "latitude": 46.2222, "longitude": 32.11111, "height": 0.123 }
      }
    }
  },
  "schema": {
    "properties": {
      "last_name": "String",
      "email": "String",
      "first_name": "String",
      "geo": "point"
    },
    "constraints": [{
      "label": "Person",
      "properties": ["first_name", "last_name"],
      "type": "UNIQUE"
    }]
  }
}
{
  "meta": {
    "timestamp": 1532597182604,
    "username": "neo4j",
    "tx_id": 3,
    "tx_event_id": 0,
    "tx_events_count": 2,
    "operation": "deleted",
    "source": {
      "hostname": "neo4j.mycompany.com"
    }
  },
  "payload": {
    "id": "123",
    "type": "relationship",
    "label": "KNOWS",
    "start": {
      "labels": ["Person"],
      "id": "123",
      "ids": {
        "last_name": "Santurbano",
        "first_name": "Andrea"
      }
    },
    "end": {
      "labels": ["Person"],
      "id": "456",
      "ids": {
        "last_name": "Hunger",
        "first_name": "Michael"
      }
    },
    "before": {
      "properties": {
        "since": "2018-04-05T12:34:00[Europe/Berlin]",
        "to": "2019-04-05T23:00:00[Europe/Berlin]"
      }
    }
  },
  "schema": {
    "properties": {
      "since": "ZonedDateTime",
      "to": "ZonedDateTime"
    },
    "constraints": [{
      "label": "KNOWS",
      "properties": ["since"],
      "type": "RELATIONSHIP_PROPERTY_EXISTS"
    }]
  }
}
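A downstream system that mirrors the graph has to identify which entity a deleted event refers to. One possible approach, sketched here under assumptions: prefer the properties of a UNIQUE constraint from the schema field and fall back to the payload id otherwise, since Neo4j may reuse the internal ids of deleted entities. identity_from_schema is a hypothetical helper:

```python
def identity_from_schema(event: dict) -> dict:
    payload, schema = event["payload"], event.get("schema", {})
    state = payload.get("before") or payload.get("after") or {}
    props = state.get("properties", {})
    labels = set(state.get("labels", []))  # empty for relationships
    for c in schema.get("constraints", []):
        if c.get("type") == "UNIQUE" and c.get("label") in labels:
            keys = c.get("properties", [])
            if all(k in props for k in keys):
                return {k: props[k] for k in keys}
    return {"id": payload["id"]}  # fallback: the internal entity id


deleted = {
    "payload": {"id": "1004", "type": "node", "before": {
        "labels": ["Person"],
        "properties": {"first_name": "Anne Marie", "last_name": "Kretchmar"}}},
    "schema": {"constraints": [{"label": "Person", "type": "UNIQUE",
                                "properties": ["first_name", "last_name"]}]},
}
print(identity_from_schema(deleted))
```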

3.3.2. Meta

The meta field contains the metadata related to the transaction event:

Field              Type                                     Description
timestamp          Number                                   The timestamp of the transaction event (epoch milliseconds in the examples)
username           String                                   The username that generated the transaction
tx_id              Number                                   The transaction id provided by the Neo4j transaction manager
tx_events_count    Number                                   The number of events included in the transaction (e.g. 2 node updates and 1 relationship creation make 3 events)
tx_event_id        Number                                   The id of the event inside the transaction
operation          enum["created", "updated", "deleted"]    The operation type
source             Object                                   Information about the source
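Because each event carries tx_id, tx_event_id and tx_events_count, a consumer can regroup the events of one transaction before applying them. A minimal sketch, assuming tx_event_id values are distinct within a transaction and that all events of the transaction reach the same consumer; TxBuffer is a hypothetical class, and with several Neo4j sources you would likely key on hostname as well as tx_id:

```python
from collections import defaultdict
from typing import Optional


class TxBuffer:
    """Collects events per tx_id until tx_events_count events have arrived."""

    def __init__(self) -> None:
        self._pending: dict[int, dict[int, dict]] = defaultdict(dict)

    def add(self, event: dict) -> Optional[list]:
        meta = event["meta"]
        tx_id, event_id = meta["tx_id"], meta["tx_event_id"]
        events = self._pending[tx_id]
        events[event_id] = event  # keyed by id, so a redelivered event is not counted twice
        if len(events) < meta["tx_events_count"]:
            return None  # transaction still incomplete
        del self._pending[tx_id]
        return [events[i] for i in sorted(events)]  # events in tx_event_id order


buf = TxBuffer()
print(buf.add({"meta": {"tx_id": 3, "tx_event_id": 1, "tx_events_count": 2}}))
```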

3.3.2.1. Source

Field       Type      Description
hostname    String    The hostname of the source

3.3.3. Payload

The payload field contains the data related to the event:

Field     Type                            Description
id        String                          The id of the graph entity, serialized as a string
type      enum["node", "relationship"]    The type of the graph entity
before    Object                          The data before the transaction event
after     Object                          The data after the transaction event

3.3.3.1. Payload: before and after

The content of before and after depends on the type of the entity:

Nodes
Field         Type         Description
labels        String[]     List of labels attached to the node
properties    Map<K, V>    Map of properties attached to the node, where K is the property name

Relationships
Field         Type         Description
label         String       The relationship type
properties    Map<K, V>    Map of properties attached to the relationship, where K is the property name
start         Object       The starting node of the relationship
end           Object       The ending node of the relationship

In the event examples, label, start and end appear directly in the payload, while before and after contain only properties.

Relationships: start and end
Field     Type         Description
id        String       The id of the node, serialized as a string
labels    String[]     List of labels attached to the node
ids       Map<K, V>    The ids related to the constraints defined for the node (uniqueness and/or node key), where K is the property name and V the related value
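The ids map is what lets a consumer locate the start and end nodes in another Neo4j database without relying on internal ids. A minimal sketch that builds a parameterized Cypher MATCH clause from a start or end object; match_clause and the "<var>_<property>" parameter naming are hypothetical, and falling back to the Cypher id() function when ids is empty assumes the target database shares the source's internal ids:

```python
def match_clause(var: str, node_ref: dict) -> tuple[str, dict]:
    """Build ("MATCH (...)", params) for a start/end object of a relationship event."""
    labels = "".join(f":`{label}`" for label in node_ref.get("labels", []))
    ids = node_ref.get("ids") or {}
    if ids:
        keys = sorted(ids)  # deterministic order for the generated query
        props = ", ".join(f"`{k}`: ${var}_{k}" for k in keys)
        params = {f"{var}_{k}": ids[k] for k in keys}
        return f"MATCH ({var}{labels} {{{props}}})", params
    # no constraint-backed ids: fall back to the internal id (see lead-in)
    return f"MATCH ({var}{labels}) WHERE id({var}) = ${var}_id", {f"{var}_id": int(node_ref["id"])}


start = {"labels": ["Person"], "id": "123",
         "ids": {"first_name": "Andrea", "last_name": "Santurbano"}}
print(match_clause("s", start))
```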

3.3.4. Schema

Field          Type         Description
constraints    Object[]     List of constraints attached to the entity
properties     Map<K, V>    Map of properties attached to the entity, where K is the property name and V is the class type
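Since schema.properties is meant to describe the entity's properties, a consumer can check that every property it receives is declared there before mapping types. A minimal sketch; schema_gaps is a hypothetical helper that reports property names found in before or after but missing from the schema:

```python
def schema_gaps(event: dict) -> list[str]:
    payload = event["payload"]
    declared = set(event.get("schema", {}).get("properties", {}))
    seen: set[str] = set()
    for side in ("before", "after"):
        state = payload.get(side) or {}
        seen.update(state.get("properties", {}))  # adds the property names
    return sorted(seen - declared)


event = {
    "payload": {"before": {"properties": {"first_name": "Anne"}},
                "after": {"properties": {"first_name": "Anne Marie", "geo": {}}}},
    "schema": {"properties": {"first_name": "String"}},
}
print(schema_gaps(event))
```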

3.3.4.1. Constraints

Nodes and Relationships can have a list of constraints attached to them:

Table 3.1. Constraints
Field         Type                                                                      Description
label         String                                                                    The label (for relationships, the type) the constraint applies to
type          enum["UNIQUE", "NODE_PROPERTY_EXISTS", "RELATIONSHIP_PROPERTY_EXISTS"]    The constraint type
properties    String[]                                                                  List of properties involved in the constraint
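Decoding constraints into a typed structure makes unexpected constraint types fail early instead of being silently ignored. A minimal sketch based on the table above; ConstraintType and Constraint are hypothetical Python types, not classes from the library:

```python
from dataclasses import dataclass
from enum import Enum


class ConstraintType(str, Enum):
    UNIQUE = "UNIQUE"
    NODE_PROPERTY_EXISTS = "NODE_PROPERTY_EXISTS"
    RELATIONSHIP_PROPERTY_EXISTS = "RELATIONSHIP_PROPERTY_EXISTS"


@dataclass(frozen=True)
class Constraint:
    label: str                   # node label, or relationship type
    type: ConstraintType
    properties: tuple[str, ...]

    @classmethod
    def from_json(cls, obj: dict) -> "Constraint":
        # ConstraintType(...) raises ValueError for values outside the enum
        return cls(obj["label"], ConstraintType(obj["type"]), tuple(obj["properties"]))

    def covers(self, props: dict) -> bool:
        """True if every property named by the constraint is present in props."""
        return all(p in props for p in self.properties)


c = Constraint.from_json({"label": "KNOWS", "properties": ["since"],
                          "type": "RELATIONSHIP_PROPERTY_EXISTS"})
print(c, c.covers({"since": "2018-04-05T12:34:00[Europe/Berlin]"}))
```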