Quick Start

Kafka Connect Neo4j Connector

Install the Connector

Download and install the plugin via the Confluent Hub client. See the Kafka Connect Neo4j Connector chapter for more details.
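For example, with a local Confluent Platform installation and the Confluent Hub client on your PATH, the installation typically looks like the following (the version tag below is only an example; pick the release you actually need):

confluent-hub install neo4j/kafka-connect-neo4j:1.0.9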

Run with Docker

Inside the directory /neo4j-kafka-connect-neo4j-<version>/doc/docker you’ll find a compose file that allows you to start the whole testing environment.

docker-compose.yml
---
version: '2'
services:
  neo4j:
    image: neo4j:4.0.3-enterprise
    hostname: neo4j
    container_name: neo4j
    ports:
    - "7474:7474"
    - "7687:7687"
    environment:
      NEO4J_kafka_bootstrap_servers: broker:9093
      NEO4J_AUTH: neo4j/connect
      NEO4J_dbms_memory_heap_max__size: 8G
      NEO4J_ACCEPT_LICENSE_AGREEMENT: "yes"  # must be the literal string "yes"; unquoted, YAML parses it as a boolean

  zookeeper:
    image: confluentinc/cp-zookeeper
    hostname: zookeeper
    container_name: zookeeper
    ports:
    - "2181:2181"
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  broker:
    image: confluentinc/cp-enterprise-kafka
    hostname: broker
    container_name: broker
    depends_on:
    - zookeeper
    ports:
    - "9092:9092"
    expose:
    - "9093"
    environment:
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://broker:9093,OUTSIDE://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,OUTSIDE:PLAINTEXT
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9093,OUTSIDE://0.0.0.0:9092
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: broker:9093

      # workaround: if we change this to a custom listener name, the schema_registry fails to start
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT

      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_METRIC_REPORTERS: io.confluent.metrics.reporter.ConfluentMetricsReporter
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      CONFLUENT_METRICS_REPORTER_ZOOKEEPER_CONNECT: zookeeper:2181
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

  schema_registry:
    image: confluentinc/cp-schema-registry
    hostname: schema_registry
    container_name: schema_registry
    depends_on:
    - zookeeper
    - broker
    ports:
    - "8081:8081"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema_registry
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: 'zookeeper:2181'

  connect:
    image: confluentinc/cp-kafka-connect
    hostname: connect
    container_name: connect
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    ports:
    - "8083:8083"
    volumes:
    - ./plugins:/tmp/connect-plugins
    environment:
      CONNECT_BOOTSTRAP_SERVERS: 'broker:9093'
      CONNECT_REST_ADVERTISED_HOST_NAME: connect
      CONNECT_REST_PORT: 8083
      CONNECT_GROUP_ID: compose-connect-group
      CONNECT_CONFIG_STORAGE_TOPIC: docker-connect-configs
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_FLUSH_INTERVAL_MS: 10000
      CONNECT_OFFSET_STORAGE_TOPIC: docker-connect-offsets
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_TOPIC: docker-connect-status
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: 'http://schema_registry:8081'
      CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONNECT_PLUGIN_PATH: /usr/share/java,/tmp/connect-plugins
      CONNECT_LOG4J_LOGGERS: org.apache.zookeeper=DEBUG,org.I0Itec.zkclient=DEBUG,org.reflections=ERROR

  control-center:
    image: confluentinc/cp-enterprise-control-center
    hostname: control-center
    container_name: control-center
    depends_on:
    - zookeeper
    - broker
    - schema_registry
    - connect
    ports:
    - "9021:9021"
    environment:
      CONTROL_CENTER_BOOTSTRAP_SERVERS: 'broker:9093'
      CONTROL_CENTER_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      CONTROL_CENTER_CONNECT_CLUSTER: 'connect:8083'
      CONTROL_CENTER_REPLICATION_FACTOR: 1
      CONTROL_CENTER_INTERNAL_TOPICS_PARTITIONS: 1
      CONTROL_CENTER_MONITORING_INTERCEPTOR_TOPIC_PARTITIONS: 1
      CONFLUENT_METRICS_TOPIC_REPLICATION: 1
      PORT: 9021

From the terminal, go inside that folder and run the following command:

docker-compose up -d

When the startup process completes, you will have all of the following modules up and running:

  • Neo4j

  • Zookeeper

  • Kafka Broker

  • Schema Registry

  • Kafka Connect

  • Kafka Control Center
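If you want to verify that everything started correctly, you can list the containers and probe the standard Kafka Connect REST API (note that Kafka Connect can take a minute or two to come up):

# check that all the containers are up and running
docker-compose ps

# list the deployed connectors; this returns an empty list ([])
# until the SINK and SOURCE instances are created below
curl http://localhost:8083/connectors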

Now you can access your Neo4j instance at http://localhost:7474. Log in with neo4j as username and connect as password (see the docker-compose file to change it).

Configure SINK instance

On the Kafka Connect side only one thing is missing: creating the SINK instance. Let's do that with the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.sink.avro.neo4j.json

In this case, we are configuring the SINK instance to consume and deliver data in AVRO format. Now you can access your Confluent Control Center instance at http://localhost:9021/clusters and check the my-topic topic created as specified in contrib.sink.avro.neo4j.json.

contrib.sink.avro.neo4j.json
{
  "name": "Neo4jSinkConnector",
  "config": {
    "topics": "my-topic",
    "connector.class": "streams.kafka.connect.sink.Neo4jSinkConnector",
    "errors.retry.timeout": "-1",
    "errors.retry.delay.max.ms": "1000",
    "errors.tolerance": "all",
    "errors.log.enable": true,
    "errors.log.include.messages": true,
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.topic.cypher.my-topic": "MERGE (p:Person{name: event.name, surname: event.surname, from: 'AVRO'}) MERGE (f:Family{name: event.surname}) MERGE (p)-[:BELONGS_TO]->(f)"
  }
}
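To verify that the SINK instance was created and is running, you can query the standard Kafka Connect REST API (this status endpoint belongs to Kafka Connect itself, not to the Neo4j connector):

curl http://localhost:8083/connectors/Neo4jSinkConnector/status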

The property neo4j.topic.cypher.my-topic defines the Cypher query that is executed on the Neo4j side for each message consumed by the SINK from that topic on the Kafka Connect side. Inside the query, event is bound to the payload of the incoming message.
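To see the query in action, you can publish a test message yourself. A minimal sketch, assuming the kafka-avro-console-producer tool shipped inside the schema_registry container (the AVRO schema below is ad hoc, built only to match the fields the query reads):

docker exec -it schema_registry kafka-avro-console-producer \
  --broker-list broker:9093 --topic my-topic \
  --property schema.registry.url=http://schema_registry:8081 \
  --property value.schema='{"type":"record","name":"Person","fields":[{"name":"name","type":"string"},{"name":"surname","type":"string"}]}'

Each line you type is one message; after sending for example {"name": "John", "surname": "Doe"}, the query above should produce a Person node connected to a Family node, which you can check in the Neo4j Browser with MATCH (p:Person)-[:BELONGS_TO]->(f:Family) RETURN p, f.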

Configure SOURCE instance

On the Kafka Connect side the last missing piece is the SOURCE instance. Let's create it with the following REST call:

curl -X POST http://localhost:8083/connectors \
  -H 'Content-Type:application/json' \
  -H 'Accept:application/json' \
  -d @contrib.source.avro.neo4j.json

This will create a Kafka Connect SOURCE instance that sends AVRO messages to the topic named my-topic.

contrib.source.avro.neo4j.json
{
  "name": "Neo4jSourceConnector",
  "config": {
    "topic": "my-topic",
    "connector.class": "streams.kafka.connect.source.Neo4jSourceConnector",
    "key.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "neo4j.server.uri": "bolt://neo4j:7687",
    "neo4j.authentication.basic.username": "neo4j",
    "neo4j.authentication.basic.password": "connect",
    "neo4j.encryption.enabled": false,
    "neo4j.streaming.poll.interval.msecs": 5000,
    "neo4j.streaming.property": "timestamp",
    "neo4j.streaming.from": "LAST_COMMITTED",
    "neo4j.enforce.schema": true,
    "neo4j.source.query": "MATCH (ts:TestSource) WHERE ts.timestamp > $lastCheck RETURN ts.name AS name, ts.timestamp AS timestamp"
  }
}

The property topic defines where each message will be pushed, while the message structure is given by the RETURN clause of the Cypher statement defined in the neo4j.source.query property. So, given the JSON above, the structure of each message will be the following:

{"name": <name>, "timestamp": <timestamp>}