Quick Start
Neo4j Streams Plugin
Any configuration option that starts with streams. controls how the plugin itself behaves. For a full list of available options, see the documentation subsections on the source and sink.
Install the Plugin
- Download the latest release jar from https://github.com/neo4j-contrib/neo4j-streams/releases/latest
- Copy it into $NEO4J_HOME/plugins and configure the relevant connections
Kafka settings
Any configuration option that starts with kafka. will be passed to the underlying Kafka driver. Neo4j Streams uses the official Confluent Kafka producer and consumer Java clients, so configuration settings that are valid for those clients will also work for Neo4j Streams.
For example, in the Kafka documentation linked below, the configuration setting named batch.size should be stated as kafka.batch.size in Neo4j Streams.
The following are common configuration settings you may wish to use. This is not a complete list. The full list of configuration options and reference material is available from Confluent’s site for sink configurations and source configurations.
| Setting Name | Description | Default Value |
|---|---|---|
| kafka.max.poll.records | The maximum number of records to pull per batch from Kafka. Increasing this number means larger transactions in Neo4j memory and may improve throughput. | 500 |
| kafka.buffer.memory | The total bytes of memory the producer can use to buffer records waiting to be sent. Use this to adjust how much memory the plugin may require to hold messages not yet delivered to Neo4j. | 33554432 |
| kafka.batch.size | (Producer only) The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. | 16384 |
| kafka.max.partition.fetch.bytes | (Consumer only) The maximum amount of data per partition the server will return. Records are fetched in batches by the consumer. If the first record batch in the first non-empty partition of the fetch is larger than this limit, the batch will still be returned to ensure that the consumer can make progress. | 1048576 |
| kafka.group.id | A unique string that identifies the consumer group this consumer belongs to. | N/A |
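For illustration, here is how a few of these settings might appear in neo4j.conf. The values below are examples chosen to show the kafka. prefix convention, not tuning recommendations:
kafka.group.id=neo4j-sink
kafka.max.poll.records=1000
kafka.batch.size=16384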
Configure Kafka Connection
If you are running locally or against a standalone machine, configure neo4j.conf
to point to that server:
kafka.bootstrap.servers=localhost:9092
If you are using Confluent Cloud (managed Kafka), you can connect to Kafka as described in the Confluent Cloud section.
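As a rough sketch only (the Confluent Cloud section remains the authoritative reference), a Confluent Cloud connection typically relies on the standard Kafka client security settings, passed through with the kafka. prefix described above. The endpoint and credentials below are placeholders:
kafka.bootstrap.servers=<YOUR_BOOTSTRAP_SERVER>
kafka.security.protocol=SASL_SSL
kafka.sasl.mechanism=PLAIN
kafka.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="<API_KEY>" password="<API_SECRET>";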
Decide: Sink, Source, or Both
Configuring neo4j-streams comes in three different parts, depending on your need:
- Required: Configuring a connection to Kafka

kafka.bootstrap.servers=localhost:9092

Follow one or both of the following subsections, according to your use case:
Sink
Take data from Kafka and store it in Neo4j (Neo4j as a data consumer) by adding configuration such as:
streams.sink.enabled=true
streams.sink.topic.cypher.my-ingest-topic=MERGE (n:Label {id: event.id}) ON CREATE SET n += event.properties
This will process every message that comes in on my-ingest-topic with the given Cypher statement. When that Cypher statement executes, the event variable it references will be set to the message received, so this sample Cypher will create a (:Label) node in the graph with the given id, copying all of the properties from the source message.
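To make this concrete, a hypothetical JSON message published to my-ingest-topic such as:
{"id": 1, "properties": {"name": "Alice", "age": 42}}
would result in a (:Label {id: 1, name: "Alice", age: 42}) node, because event.id binds to 1 and event.properties binds to the nested map. The field names here are illustrative; the only requirement is that the payload matches what your Cypher template dereferences.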
For full details on what you can do here, see the Sink section of the documentation.
Source
Produce data from Neo4j and send it to a Kafka topic (Neo4j as a data producer) by adding configuration such as:
streams.source.topic.nodes.my-nodes-topic=Person{*}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{*}
streams.source.enabled=true
streams.source.schema.polling.interval=10000
This will produce all graph nodes labeled (:Person) onto the topic my-nodes-topic and all relationships of type -[:BELONGS-TO]-> onto the topic named my-rels-topic. Further, schema changes will be polled every 10,000 ms, which affects how quickly the database picks up new indexes/schema changes.
Please note that if no value is specified for the streams.source.schema.polling.interval property, the Streams plugin defaults to 300,000 ms.
The expressions Person{*} and BELONGS-TO{*} are patterns. You can find documentation on how to change these in the Patterns section.
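As a sketch only (the Patterns section is the authoritative reference), a pattern can also restrict which properties are published. The property names below are hypothetical:
streams.source.topic.nodes.my-nodes-topic=Person{name,surname}
streams.source.topic.relationships.my-rels-topic=BELONGS-TO{-internalNotes}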
For full details on what you can do here, see the Source section of the documentation.
Restart Neo4j
Once the plugin is installed and configured, restarting the database will make it active. If you have configured Neo4j to consume from Kafka, it will immediately begin processing messages.
When installing the latest version of the Neo4j Streams plugin into Neo4j 4.x, you may see warnings in the logs about unrecognized configuration settings. These are not errors. They come from the new Neo4j 4 configuration system, which warns that it does not recognize the streams.* and kafka.* properties. Despite these warnings, the plugin will work properly.
Kafka Connect Plugin
Install the Plugin
Download and install the plugin via the Confluent Hub client. See the chapter Kafka Connect Plugin for more details.
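If the connector is published on Confluent Hub under the neo4j/kafka-connect-neo4j coordinates (check the Kafka Connect Plugin chapter for the exact identifier and version), the install is typically a single command:
confluent-hub install neo4j/kafka-connect-neo4j:<version>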
Run with Docker
Inside the directory /neo4j-kafka-connect-neo4j-<version>/doc/docker
you’ll find a compose file that allows you to start the whole testing environment.
---
version: '2'
services:
  neo4j:
    image: neo4j:4.0.3-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    expose:
      - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093
      # workaround: if we change this to a custom name the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
      - zookeeper
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'
  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
      - zookeeper
      - broker
      - schema_registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR
  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
      - zookeeper
      - broker
      - schema_registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
Just go inside that folder from the terminal and run the following command:
docker-compose up -d
When the process completes, you will have all of the following modules up and running:
- Neo4j
- Zookeeper
- Kafka Broker
- Schema Registry
- Kafka Connect
- Kafka Control Center
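If you want to confirm that every container started, a plain Docker status check is enough:
docker-compose ps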
Now you can access your Neo4j instance at http://localhost:7474; log in with neo4j as the username and connect as the password (see the docker-compose file to change them).
Configure SINK instance
On the Kafka Connect side, only one thing is missing: creating the SINK instance. Let's do that with the following REST call:
curl -X POST http://localhost:8083/connectors \
-H 'Content-Type:application/json' \
-H 'Accept:application/json' \
-d @contrib.sink.avro.neo4j.json
In this case, we are configuring the SINK instance to consume and deliver data in AVRO format.
Now you can access your Confluent Control Center instance at http://localhost:9021/clusters and check that the topic my-topic specified in contrib.sink.avro.neo4j.json has been created.
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'AVRO'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}
The property neo4j.topic.cypher.my-topic defines the Cypher query that will be executed on the Neo4j side for each message consumed by the SINK on the Kafka Connect side.
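To verify that the connector was registered and is running, you can ask the Kafka Connect REST API for its status (the connector name matches the "name" field in the JSON above):
curl http://localhost:8083/connectors/Neo4jSinkConnector/status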