Quickstart for Confluent Platform

Overview

We will use Docker Compose to create an environment with Confluent Platform components and Neo4j running inside Docker.

The Neo4j Connector for Kafka will first be configured with a source instance. The source will retrieve changes from CREATE, UPDATE and DELETE operations for the node pattern (:TestSource). Received changes will then be published into creates, updates and deletes topics based on the operation.

Next, we will create a sink instance which will listen for messages in creates, updates and deletes topics, and execute a Cypher statement to apply the corresponding change in Neo4j when the messages are received.

The following guide uses the Confluent Platform docker images.

Run with Docker Compose

Copy the following Docker Compose file into a desired directory.

The sample docker-compose.yml file below makes use of recent features of docker compose and requires a recent version of docker compose. Please make sure you have at least v2.20.3 version of the tool.
docker-compose.yml
---
services:
  neo4j:
    image: neo4j:5-enterprise
    hostname: neo4j
    container_name: neo4j
    # this is to ensure you have the latest 5.x version of the database
    pull_policy: always
    ports:
      - "7474:7474"
      - "7687:7687"
    environment:
      NEO4J_AUTH: neo4j/password
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"
      NEO4J_server_memory_heap_max__size: "4G"
    healthcheck:
      test: [ "CMD", "cypher-shell", "-u", "neo4j", "-p", "password", "RETURN 1" ]
      start_period: 2m
      start_interval: 10s
      interval: 30s
      timeout: 10s
      retries: 5


  zookeeper:
    image: confluentinc/cp-zookeeper:7.5.2
    hostname: zookeeper
    container_name: zookeeper
    ports:
      - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "2181" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  broker:
    image: confluentinc/cp-server:7.5.2
    hostname: broker
    container_name: broker
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
      - "9101:9101"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_JMX_PORT: 9101
      KAFKA_JMX_HOSTNAME: localhost
      KAFKA_CONFLUENT_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "9092" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  schema-registry:
    image: confluentinc/cp-schema-registry:7.5.2
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - broker
    ports:
      - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'broker:29092'
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8081" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  connect:
    image: confluentinc/cp-server-connect:7.5.2
    hostname: connect
    container_name: connect
    depends_on:
      - broker
      - schema-registry
    ports:
      - "8083:8083"
    volumes:
      - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:29092'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: org.apache.kafka.connect.storage.StringConverter
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      # CLASSPATH required due to CC-2422
      CLASSPATH: /usr/share/java/monitoring-interceptors/monitoring-interceptors-7.5.2.jar
      CONNECT_PRODUCER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor"
      CONNECT_CONSUMER_INTERCEPTOR_CLASSES: "io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor"
      CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components,/tmp/connect-plugins"
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=ERROR,org.I0Itec.zkclient=ERROR,org.reflections=ERROR
    healthcheck:
      test: [ "CMD", "nc", "-z", "localhost", "8083" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

  control-center:
    image: confluentinc/cp-enterprise-control-center:7.5.2
    hostname: control-center
    container_name: control-center
    depends_on:
      - broker
      - schema-registry
      - connect
    ports:
      - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:29092'
      CONTROL_CENTER_CONNECT_CONNECT-DEFAULT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_SCHEMA_REGISTRY_URL: "http://schema-registry:8081"
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021
    healthcheck:
      test: [ "CMD", "curl", "-f", "http://localhost:9021" ]
      start_period: 5m
      start_interval: 10s
      interval: 1m
      timeout: 10s
      retries: 5

Copy the following Neo4j Connector for Kafka artifacts into a directory named plugins in the same directory as your docker-compose.yml file. The directory structure should look like;

quickstart/
├─ plugins/
│  ├─ neo4j-kafka-connect-5.1.1.jar
├─ docker-compose.yml

Open a terminal, proceed to the Docker Compose file’s directory and run:

docker compose up -d

When the process completes you should have all the modules up and running. You can check the status of all services as follows:

docker compose ps

This should return a table that shows every service is up and running.

NAME                COMMAND                  SERVICE             STATUS              PORTS
broker              "/etc/confluent/dock…"   broker              running             0.0.0.0:9092->9092/tcp, 0.0.0.0:9101->9101/tcp
connect             "bash -c '# confluen…"   connect             running             0.0.0.0:8083->8083/tcp, 9092/tcp
control-center      "/etc/confluent/dock…"   control-center      running             0.0.0.0:9021->9021/tcp
neo4j               "tini -g -- /startup…"   neo4j               running             0.0.0.0:7474->7474/tcp, 7473/tcp, 0.0.0.0:7687->7687/tcp
schema-registry     "/etc/confluent/dock…"   schema-registry     running             0.0.0.0:8081->8081/tcp
zookeeper           "/etc/confluent/dock…"   zookeeper           running             2888/tcp, 0.0.0.0:2181->2181/tcp, 3888/tcp

Now you can access your Neo4j instance under: http://localhost:7474, log in with neo4j as username and password as password (Update NEO4J_AUTH environment variable in the Docker Compose file to change it). Confirm that you can access the Confluent Control Center instance at http://localhost:9021/clusters and that the Cluster reports as healthy (this may take 90-120s). You should have one Broker, several Topics and one Connect cluster in the Control Center.

Enable CDC

Enable Change Data Capture on the source database by executing the following Cypher command. For more information on Change Data Capture and enabling it, please refer to Change Data Capture > Enable CDC > Neo4j DBMS for on-prem installations and Change Data Capture > Enable CDC > Aura for Aura.

ALTER DATABASE neo4j SET OPTION txLogEnrichment 'FULL';

Source with CDC

First up, we need to set up Neo4j as a source database that will provide messages for topics. Pick one of the following message serialization formats, save the content of the provided file into a local directory, named as source.neo4j.json.

{
  "name": "Neo4jSourceConnectorAVRO",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}
{
  "name": "Neo4jSourceConnectorJSONSchema",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}
{
  "name": "Neo4jSourceConnectorProtobuf",
  "config": {
    "connector.class": "org.neo4j.connectors.kafka.source.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.source-strategy": "CDC",
    "neo4j.start-from": "NOW",
    "neo4j.cdc.poll-interval": "1s",
    "neo4j.cdc.poll-duration": "5s",
    "neo4j.cdc.topic.creates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.creates.patterns.0.operation": "CREATE",
    "neo4j.cdc.topic.updates.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.updates.patterns.0.operation": "UPDATE",
    "neo4j.cdc.topic.deletes.patterns.0.pattern": "(:TestSource)",
    "neo4j.cdc.topic.deletes.patterns.0.operation": "DELETE"
  }
}

We will now create the source instance by invoking the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @source.neo4j.json

This will create a Kafka Connect source instance that will send change event messages over to the topics named creates, updates and deletes, using your preferred serialization format. In Control Center, confirm that the Source connector has been created in the Connect tab, under connect-default.

As illustrated, you can configure multiple patterns to read changes for, and have them published to topics of your choice. So given the above configuration, the connector will read changes happening on nodes of label TestSource and the structure of the message will be based on Change Data Capture > Change event schema, serialized according to the configured message format. The expected change events, based on the type of operation, will have the following structure.

{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": null,
      "after": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "c",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<old-name>",
          "surname": "<old-surname>"
        },
        "labels": ["TestSource"]
      },
      "after": {
        "properties": {
          "name": "<new-name>",
          "surname": "<new-surname>"
        },
        "labels": ["TestSource"]
      }
    },
    "operation": "u",
    "labels": ["TestSource"]
  }
}
{
  "id": "<id>",
  "txId": 12,
  "seq": 0,
  "metadata": {
    "executingUser": "neo4j",
    "authenticatedUser": "neo4j",
    "captureMode": "FULL",
    "connectionClient": "127.0.0.1:51320",
    "serverId": "<server-id>",
    "databaseName": "<database-name>",
    "connectionType": "bolt",
    "connectionServer": "127.0.0.1:51316",
    "txStartTime": "2023-11-03T11:58:30.429Z",
    "txCommitTime": "2023-11-03T11:58:30.526Z",
    "txMetadata": {}
  },
  "event": {
    "elementId": "4:b7e35973-0aff-42fa-873b-5de31868cb4a:1",
    "keys": {},
    "eventType": "n",
    "state": {
      "before": {
        "properties": {
          "name": "<name>",
          "surname": "<surname>"
        },
        "labels": ["TestSource"]
      },
      "after": null
    },
    "operation": "d",
    "labels": ["TestSource"]
  }
}

Now that you have a running source instance, you can create the following nodes in Neo4j:

CREATE (:TestSource {name: 'john', surname: 'doe'});
CREATE (:TestSource {name: 'mary', surname: 'doe'});
CREATE (:TestSource {name: 'jack', surname: 'small'});

This will result in new messages being published to the topic named creates.

Sink with Cypher

Having set up the Source connector, the next step is to configure a sink connector that consumes messages published to the creates, updates and deletes topics.

First, save the following JSON file into a local directory named as sink.neo4j.json.

{
  "name": "Neo4jSinkConnectorCypherAVRO",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorCypherJSONSchema",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}
{
  "name": "Neo4jSinkConnectorCypherProtobuf",
  "config": {
    "topics": "creates,updates,deletes",
    "connector.class": "org.neo4j.connectors.kafka.sink.Neo4jConnector",
    "key.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "key.converter.schemas.enable": true,
    "key.converter.schema.registry.url": "http://schema-registry:8081",
    "key.converter.optional.for.nullables": true,
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "value.converter.schemas.enable": true,
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.optional.for.nullables": true,
    "neo4j.uri": "neo4j://neo4j:7687",
    "neo4j.authentication.type": "BASIC",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "password",
    "neo4j.cypher.topic.creates": "WITH __value.event.state.after AS state MERGE (p:Person {name: state.properties.name, surname: state.properties.surname}) MERGE (f:Family {name: state.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.updates": "WITH __value.event.state.before AS before, __value.event.state.after AS after MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) MATCH (fPre:Family {name: before.properties.surname}) OPTIONAL MATCH (p)-[b:BELONGS_TO]->(fPre) DELETE b WITH after, p SET p.name = after.properties.name, p.surname = after.properties.surname MERGE (f:Family {name: after.properties.surname}) MERGE (p)-[:BELONGS_TO]->(f)",
    "neo4j.cypher.topic.deletes": "WITH __value.event.state.before AS before MATCH (p:Person {name: before.properties.name, surname: before.properties.surname}) DETACH DELETE p",
    "neo4j.cypher.bind-header-as": "",
    "neo4j.cypher.bind-key-as": "",
    "neo4j.cypher.bind-value-as": "__value",
    "neo4j.cypher.bind-value-as-event": false
  }
}

We will now create the sink instance by invoking the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H "Content-Type:application/json" \
  -H "Accept:application/json" \
  -d @sink.neo4j.json

This configures the sink instance to consume data in your preferred serialization format. Cypher strategy will build Cypher queries based on Cypher query templates defined by the properties neo4j.cypher.topic.creates, neo4j.cypher.topic.updates and neo4j.cypher.topic.deletes.

Testing It Out

Now you can access your Confluent Control Center instance under: http://localhost:9021/clusters, and verify at least the creates topic is created as specified in the connector configuration, as well as the source and sink connector instances are running under Connect, connect-default.

With both source and sink connectors running, the previously created :TestSource nodes will result in messages being published into the creates topic by the source instance. These messages will then be consumed by the sink instance, and corresponding :Person and :Family nodes to be created inside Neo4j. As you create, update and delete the TestSource labelled nodes, updates and deletes topics will also be created.

Check that this is the case, by executing the following query in the Neo4j Browser at http://localhost:7474/browser/:

MATCH (n:(Person | Family)) RETURN n

You can now create, update or delete Person and Family nodes by executing more statements like:

Create a new person
CREATE (:TestSource {name: 'Ann', surname: 'Bolin'});

Verify that a new Person and a new Family node is created and linked together.

Update an existing person
MATCH (n:TestSource {name: 'mary', surname: 'doe'}) SET n.surname = 'smith';

Verify that the existing Person node is now updated with a surname of smith and linked to a new Family node.

Delete an existing person
MATCH (n:TestSource {name: 'mary', surname: 'smith'}) DELETE n;

Verify that the existing Person node is now deleted.

Summary

In this Quick Start, we have shown how to configure a Neo4j database to act as both the source of messages for Kafka topics and the sink for those same messages to create, update or delete nodes and relationships in the database. Typically, our connector is used as either a sink when pulling data from other data sources via Apache Kafka or Confluent or as source for Apache Kafka or Confluent to push data into other databases.

Troubleshooting

If you don’t see any messages being published into the creates, updates and deletes topics, or any :Family and :Person nodes created, please check Kafka Connect logs by executing the following command and resolve any issues being reported.

docker compose logs connect