Quickstart for Confluent Platform
Overview
We will use Docker Compose to create an environment with Confluent Platform components and Neo4j running inside Docker.
The Neo4j Connector for Kafka will first be configured with a source instance.
The source will retrieve changes from CREATE, UPDATE and DELETE operations for the node pattern (:TestSource).
Received changes will then be published into the creates, updates and deletes topics based on the operation.
Next, we will create a sink instance which will listen for messages in the creates, updates and deletes topics, and execute a Cypher statement to apply the corresponding change in Neo4j when the messages are received.
The following guide uses the Confluent Platform Docker images.
Run with Docker Compose
Copy the following Docker Compose file into a desired directory.
The sample docker-compose.yml file below relies on recent Docker Compose features. Make sure you have Docker Compose v2.20.3 or later installed.
---
services:
  neo4j:
    image: neo4j:5-enterprise
    hostname: neo4j
    container_name: neo4j
    # this is to ensure you have the latest 5.x version of the database
    pull_policy: always
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_server_memory_heap_max__size: "4G"
    healthcheck:
      test: [ "CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1" ]
      start_period: 2m
      start_interval: 10s
      interval: 30s
      timeout: 10s
      retries: 5

  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "2181" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  broker:
    image: confluentinc/cp-server:7.5.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "9092" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8081" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  connect:
    image: confluentinc/cp-server-connect:7.5.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8083" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5
Copy the Neo4j Connector for Kafka artifact into a directory named plugins in the same directory as your docker-compose.yml file.
The directory structure should look like this:
quickstart/
├─ plugins/
│ ├─ neo4j-kafka-connect-5.1.8.jar
├─ docker-compose.yml
Open a terminal, proceed to the Docker Compose file’s directory and run:
docker compose up -d
When the process completes you should have all the modules up and running. You can check the status of all services as follows:
docker compose ps
This should return a table that shows every service is up and running.
NAME              COMMAND                  SERVICE           STATUS    PORTS
broker            "/etc/confluent/dock…"   broker            running   0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect           "bash -c '# confluen…"   connect           running   0.0.0.0:8083->8083/tcp, 9092/tcp
control-center    "/etc/confluent/dock…"   control-center    running   0.0.0.0:9021->9021/tcp
neo4j             "tini -g -- /startup…"   neo4j             running   0.0.0.0:7474->7474/tcp, 7473/tcp, 0.0.0.0:7687->7687/tcp
schema-registry   "/etc/confluent/dock…"   schema-registry   running   0.0.0.0:8081->8081/tcp
zookeeper         "/etc/confluent/dock…"   zookeeper         running   2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp
Now you can access your Neo4j instance at http://localhost:7474. Log in with neo4j as the username and password as the password (update the NEO4J_AUTH environment variable in the Docker Compose file to change it).
Confirm that you can access the Confluent Control Center instance at http://localhost:9021/clusters and that the Cluster reports as healthy (this may take 90-120s).
You should have one Broker, several Topics and one Connect cluster in the Control Center.
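Before creating any connectors, it can also help to confirm that the Connect worker picked up the jar from the plugins directory. The following is a minimal Python sketch, not part of the official quickstart. It assumes the Connect REST API is reachable at http://localhost:8083 (the port mapped in the Compose file) and relies on the standard Kafka Connect GET /connector-plugins endpoint. The helper names fetch_json and neo4j_plugins are hypothetical, and matching on the substring "neo4j" in the class name is an assumption about how the connector classes are named.

```python
import json
from urllib.request import urlopen

# Assumption: the Connect REST port is published on localhost as in the Compose file.
CONNECT_URL = "http://localhost:8083"


def fetch_json(url, timeout=10):
    """GET a URL and decode its JSON body (the Connect worker must be up)."""
    with urlopen(url, timeout=timeout) as response:
        return json.load(response)


def neo4j_plugins(plugins):
    """Return the plugin entries whose class name mentions neo4j.

    `plugins` is the decoded response of GET /connector-plugins:
    a list of objects, each carrying a "class" field.
    """
    return [p for p in plugins if "neo4j" in p.get("class", "").lower()]
```

With the stack running, neo4j_plugins(fetch_json(CONNECT_URL + "/connector-plugins")) should return a non-empty list. An empty list suggests that the jar is missing from ./plugins, or that the connect container was started before the jar was copied and needs a restart.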
Enable CDC
Enable Change Data Capture on the source database by executing the following Cypher command. For more information on Change Data Capture and enabling it, please refer to Change Data Capture > Enable CDC > Neo4j DBMS for on-prem installations and Change Data Capture > Enable CDC > Aura for Aura.
ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';
Source with CDC
First up, we need to set up Neo4j as a source database that will provide messages for topics.
Pick one of the following message serialization formats and save the content of the provided file in a local directory as source.neo4j.json.
We will now create the source instance by invoking the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @source.neo4j.json
This will create a Kafka Connect source instance that will send change event messages over to the topics named creates, updates and deletes, using your preferred serialization format.
In Control Center, confirm that the Source connector has been created in the Connect tab, under connect-default.
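The same check can be scripted against the Kafka Connect REST API instead of Control Center. The sketch below is an illustration under stated assumptions: it uses the standard GET /connectors/{name}/status endpoint on http://localhost:8083, and the connector name must be whatever the name field in your source.neo4j.json declares. The helper names status_url, fetch_status and is_running are hypothetical. Treating a connector with no tasks as not yet running is a choice made here, not something the connector mandates.

```python
import json
from urllib.parse import quote
from urllib.request import urlopen

# Assumption: the Connect REST port is published on localhost as in the Compose file.
CONNECT_URL = "http://localhost:8083"


def status_url(connector_name, base_url=CONNECT_URL):
    """Build the URL of the Kafka Connect status endpoint for one connector."""
    return f"{base_url}/connectors/{quote(connector_name, safe='')}/status"


def fetch_status(connector_name):
    """Fetch and decode the status document (the Connect worker must be up)."""
    with urlopen(status_url(connector_name), timeout=10) as response:
        return json.load(response)


def is_running(status):
    """True when the connector and all of its tasks report RUNNING.

    A connector without any task is reported as not running, because
    no work is being done until at least one task has been started.
    """
    tasks = status.get("tasks", [])
    if status["connector"]["state"] != "RUNNING" or not tasks:
        return False
    return all(task["state"] == "RUNNING" for task in tasks)
```

Calling is_running(fetch_status("<your-connector-name>")) shortly after the POST may return False while tasks are still starting, so retrying a few times with a short pause is reasonable.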
As illustrated, you can configure multiple patterns to capture changes for, and have them published to topics of your choice.
Given the above configuration, the connector will read changes happening on nodes with the label TestSource, and the structure of the message will be based on Change Data Capture > Change event schema, serialized according to the configured message format.
The expected change events for create, update and delete operations, in that order, will have the following structure.
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": null,
      "after": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "c",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<old-name>",
          "surname": "<old-surname>"
        },
        "labels": ["TestSource"]
      },
      "after": {
        "properties": {
          "name": "<new-name>",
          "surname": "<new-surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "u",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      },
      "after": null
    },
    "operation": "d",
    "labels": ["TestSource"]
  }
}
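To make the event structure above more concrete, here is a small Python sketch of how a consumer might read such a payload once it has been decoded into a dictionary. It is only an illustration: the connector itself routes events according to the patterns configured for each topic, not with this code. The names TOPIC_BY_OPERATION, topic_for and changed_properties are hypothetical, and the mapping assumes the creates, updates and deletes topics used in this guide.

```python
# Operation codes as they appear in the "operation" field of a change event,
# mapped to the topic names used in this guide (an assumption of this sketch).
TOPIC_BY_OPERATION = {"c": "creates", "u": "updates", "d": "deletes"}


def topic_for(change_event):
    """Return the quickstart topic a change event is expected to land in."""
    return TOPIC_BY_OPERATION[change_event["event"]["operation"]]


def changed_properties(change_event):
    """Return {property: (old, new)} for properties that differ between states.

    A null state ("before" on create, "after" on delete) is treated
    as a node without properties.
    """
    state = change_event["event"]["state"]
    before = (state.get("before") or {}).get("properties", {})
    after = (state.get("after") or {}).get("properties", {})
    return {
        key: (before.get(key), after.get(key))
        for key in sorted(before.keys() | after.keys())
        if before.get(key) != after.get(key)
    }
```

For an update event, changed_properties returns only the properties whose values differ. For a create or delete event, every property shows up with None on the missing side.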
Now that you have a running source instance, you can create the following nodes in Neo4j:
CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});
This will result in new messages being published to the topic named creates.
Sink with Cypher
Having set up the source connector, the next step is to configure a sink connector that consumes messages published to the creates, updates and deletes topics.
First, save the following JSON file in a local directory as sink.neo4j.json.
We will now create the sink instance by invoking the following REST call:
curl -X POST http://localhost:8083/connectors \
-H "Content-Type:application/json" \
-H "Accept:application/json" \
-d @sink.neo4j.json
This configures the sink instance to consume data in your preferred serialization format.
The Cypher strategy will build Cypher queries based on the query templates defined by the properties neo4j.cypher.topic.creates, neo4j.cypher.topic.updates and neo4j.cypher.topic.deletes.
Testing It Out
Now you can access your Confluent Control Center instance at http://localhost:9021/clusters and verify that at least the creates topic has been created as specified in the connector configuration, and that the source and sink connector instances are running under Connect, connect-default.
With both source and sink connectors running, the previously created :TestSource nodes will result in messages being published into the creates topic by the source instance.
These messages will then be consumed by the sink instance, and corresponding :Person and :Family nodes will be created inside Neo4j.
As you create, update and delete the TestSource labelled nodes, the updates and deletes topics will also be created.
Check that this is the case, by executing the following query in the Neo4j Browser at http://localhost:7474/browser/:
MATCH (n:(Person | Family)) RETURN n
You can now create, update or delete Person and Family nodes by executing more statements like:
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});
Verify that a new Person node and a new Family node are created and linked together.
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';
Verify that the existing Person node is now updated with a surname of smith and linked to a new Family node.
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;
Verify that the existing Person node is now deleted.
Summary
In this quickstart, we have shown how to configure a Neo4j database to act as both the source of messages for Kafka topics and the sink for those same messages, creating, updating or deleting nodes and relationships in the database. Typically, our connector is used either as a sink, when pulling data from other data sources via Apache Kafka or Confluent, or as a source, for Apache Kafka or Confluent to push data into other databases.