Quick Start
Kafka Connect Neo4j Connector
In this quick start guide, we will use Docker Compose to create an environment with Confluent and Neo4j running inside Docker.
The Neo4j Connector will be configured with a source instance that queries a Neo4j database for newly created nodes with a specific label and publishes those changes to a Kafka topic named my-topic.
Next, we will create a sink instance that listens for messages in my-topic and, for each message received, applies a Cypher statement that creates a new set of nodes and relationships in Neo4j.
As a result, the same Neo4j database acts as the source of events for a Kafka topic and as the sink for that topic, so you can see the connector working in both roles.
This guide uses the Kafka Connect Neo4j Connector with Confluent. To use it with Apache Kafka, the Docker Compose file will need to be modified.
Run with Docker
Copy the following Docker Compose file into a desired directory.
---
version: '2'
services:
  neo4j:
    image: neo4j:5-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_server_memory_heap_max__size: "4G"
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  broker:
    image: confluentinc/cp-server:7.3.0
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
  schema-registry:
    image: confluentinc/cp-schema-registry:7.3.0
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
  connect:
    image: cnfldemos/cp-server-connect-datagen:0.6.0-7.3.0
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.3.0.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    command:
      - bash
      - -c
      - |
        confluent-hub install --no-prompt neo4j/kafka-connect-neo4j:latest
        /etc/confluent/docker/run
  control-center:
    image: confluentinc/cp-enterprise-control-center:7.3.0
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_KSQL_KSQLDB1_URL: "http://ksqldb-server:8088"
      CONTROL_CENTER_KSQL_KSQLDB1_ADVERTISED_URL: "http://localhost:8088"
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
Open a terminal in the directory that contains the Docker Compose file and run the following command:
docker compose up -d
When the process completes, all the services should be up and running. You can check their status using the following command:
docker compose ps
The command should return a table like the following, showing that every service is up and running.
NAME              COMMAND                  SERVICE           STATUS    PORTS
broker            "/etc/confluent/dock…"   broker            running   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect           "bash -c '# confluen…"   connect           running   0.0.0.0:8083->8083/tcp, 9092/tcp
control-center    "/etc/confluent/dock…"   control-center    running   0.0.0.0:9021->9021/tcp
neo4j             "tini -g -- /startup…"   neo4j             running   0.0.0.0:7474->7474/tcp, 7473/tcp, 0.0.0.0:7687->7687/tcp
schema-registry   "/etc/confluent/dock…"   schema-registry   running   0.0.0.0:8081->8081/tcp
zookeeper         "/etc/confluent/dock…"   zookeeper         running   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Now you can access your Neo4j instance at http://localhost:7474. Log in with neo4j as the username and password as the password (refer to the Docker Compose file to change it).
Confirm that you can access the Confluent Control Center instance at http://localhost:9021/clusters and that the Cluster reports as healthy (this may take 90-120s).
You should have one Broker, several Topics and one Connect cluster in the Control Center.
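The same readiness check can be done from the terminal. The following is a minimal sketch, assuming the 8083 port mapping from the Docker Compose file above. The function names and the CONNECT_URL variable are hypothetical helpers introduced for illustration; /connectors and /connector-plugins are standard Kafka Connect REST endpoints.

```shell
#!/usr/bin/env bash
# Base URL of the Kafka Connect REST API (assumption: port 8083 mapped to localhost).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Poll the REST API until it answers with HTTP 200.
# Arguments: number of attempts (default 30), delay in seconds (default 5).
wait_for_connect() {
  local attempts="${1:-30}" delay="${2:-5}" code i
  for i in $(seq 1 "$attempts"); do
    code="$(curl -s -o /dev/null -w '%{http_code}' "$CONNECT_URL/connectors" || true)"
    if [ "$code" = "200" ]; then
      return 0
    fi
    sleep "$delay"
  done
  return 1
}

# Keep only the "class" entries that mention neo4j (case-insensitive) from the
# JSON read on stdin.
filter_neo4j_plugins() {
  grep -io '"class" *: *"[^"]*neo4j[^"]*"'
}

# List the Neo4j connector classes that the Connect worker has loaded.
list_neo4j_plugins() {
  curl -s "$CONNECT_URL/connector-plugins" | filter_neo4j_plugins
}
```

Running `wait_for_connect && list_neo4j_plugins` after `docker compose up -d` should print at least one class if the plugin installation step in the connect service succeeded; an empty result suggests checking `docker compose logs connect`.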
Configure Source instance
First up, we need to set up Neo4j as a source database that will provide messages for topics.
Pick one of the message serialization formats given below and save the content of the corresponding file to a local directory as source.neo4j.json.
We will now create the source instance by invoking the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.neo4j.json
This will create a Kafka Connect source instance that will send messages to the topic named my-topic, using your preferred serialization format.
In Control Center, confirm that the source connector has been created in the Connect tab, under connect-default.
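The connector state can also be read from the Kafka Connect REST API. The following is a sketch, assuming the default port mapping; the helper names are hypothetical, and the connector name to pass is whatever the name field of your source.neo4j.json contains. The /connectors/<name>/status endpoint is part of the standard Kafka Connect REST API.

```shell
#!/usr/bin/env bash
# Base URL of the Kafka Connect REST API (assumption: port 8083 mapped to localhost).
CONNECT_URL="${CONNECT_URL:-http://localhost:8083}"

# Build the status endpoint for a connector name.
status_url() {
  printf '%s/connectors/%s/status\n' "$CONNECT_URL" "$1"
}

# Extract every "state" value from a status document read on stdin.
# The first value is the connector state; the remaining ones belong to its tasks.
extract_states() {
  grep -o '"state" *: *"[A-Z_]*"' | sed 's/.*"\([A-Z_]*\)"$/\1/'
}

# Print the states reported for the named connector, one per line.
connector_states() {
  curl -s "$(status_url "$1")" | extract_states
}
```

For example, `connector_states <connector-name>` prints one line per state. A healthy instance is expected to report RUNNING for the connector and for each task; a FAILED state usually comes with a trace field in the full status response.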
The topic property defines where each message will be pushed. The message structure is defined by the RETURN clause of the Cypher statement in the neo4j.source.query property.
So given the above configuration, the structure of the message will be the following (serialized in your preferred serialization format):
{"name": <name>, "surname": <surname>, "timestamp": <timestamp>}
Now that you have a running source instance, you can create the following nodes in Neo4j:
CREATE (:TestSource {name: 'john', surname: 'doe', timestamp: datetime().epochMillis});
CREATE (:TestSource {name: 'mary', surname: 'doe', timestamp: datetime().epochMillis});
CREATE (:TestSource {name: 'jack', surname: 'small', timestamp: datetime().epochMillis});
These statements will result in three new messages being published to the my-topic topic.
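To see the published messages without opening Control Center, you can read the topic with a console consumer. The sketch below assumes that you chose the Avro serialization format and that the container names and listener addresses match the Docker Compose file above; the consume_avro function is a hypothetical helper. For a different format, a different console consumer would be needed.

```shell
#!/usr/bin/env bash
# Read up to N messages from a topic with the Avro console consumer that ships
# in the schema-registry container. Assumes Avro-serialized values.
# Arguments: topic name (default my-topic), maximum messages (default 3).
consume_avro() {
  local topic="${1:-my-topic}" max="${2:-3}"
  docker exec schema-registry kafka-avro-console-consumer \
    --bootstrap-server broker:29092 \
    --topic "$topic" \
    --from-beginning \
    --max-messages "$max" \
    --timeout-ms 30000 \
    --property schema.registry.url=http://schema-registry:8081
}
```

Calling `consume_avro` with no arguments reads from my-topic and stops after three messages or after 30 seconds without new data, whichever comes first.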
Configure Sink instance
To write the messages in my-topic back to the Neo4j database as nodes and relationships, we need to create a sink instance, which will consume messages from the my-topic topic and execute a defined Cypher statement for each consumed message.
First, save the following JSON to a local directory as sink.neo4j.json.
We will now create the sink instance by invoking the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @sink.neo4j.json
This configures the sink instance to consume data in your preferred serialization format.
The neo4j.topic.cypher.my-topic property defines the Cypher query that the sink instance executes, on the Kafka Connect side, for each message it consumes.
Testing It Out
Now you can access your Confluent Control Center instance at http://localhost:9021/clusters. Check that my-topic has been created as specified in the connector configuration, and that the source and sink connector instances are running under Connect, connect-default.
With both source and sink connectors running, the previously created :TestSource nodes will result in messages being published to the my-topic topic by the source instance.
These messages will then be consumed by the sink instance, and corresponding :Person and :Family nodes will be created inside Neo4j.
Check that this is the case by executing the following query in the Neo4j Browser at http://localhost:7474/browser/:
MATCH (n:(Person | Family)) RETURN n
You can create more Person and Family nodes by executing more statements like:
CREATE (:TestSource {name: 'Ann', surname: 'Bolin', timestamp: datetime().epochMillis});
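The same check can be scripted with cypher-shell, which is bundled in the Neo4j Docker image. This is a minimal sketch, assuming the container name and the neo4j/password credentials from the Docker Compose file above; run_cypher and count_sink_nodes are hypothetical helper names.

```shell
#!/usr/bin/env bash
# Run a single Cypher statement inside the neo4j container.
# Assumes the credentials set through NEO4J_AUTH in the Compose file.
run_cypher() {
  docker exec neo4j cypher-shell -u neo4j -p password "$1"
}

# Count the nodes carrying the labels the sink instance is expected to create.
count_sink_nodes() {
  run_cypher 'MATCH (n:(Person | Family)) RETURN labels(n) AS labels, count(*) AS total'
}
```

Running `count_sink_nodes` before and after creating another :TestSource node should show the totals increase once the source has polled for changes and the sink has processed the resulting message.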
Summary
In this Quick Start, we have shown how to configure a Neo4j database to act as both the source of messages for Confluent topics and the sink for those same messages, creating new nodes and relationships in the database. Typically, our connector is used either as a sink, when pulling data from other data sources via Apache Kafka or Confluent, or as a source for Apache Kafka or Confluent, to push data into other databases.