Change Data Capture and Neo4j Connector for Confluent and Apache Kafka Go GA



We are thrilled to announce a major milestone in Neo4j: the general availability of Change Data Capture (CDC) in Aura Virtual Dedicated Cloud (Enterprise) and Neo4j Enterprise subscriptions and the Neo4j Connector for Confluent and Apache Kafka v5.1 with support for CDC. After an Early Access Program (EAP) with a group of select customers and partners, followed by a successful public beta for both products, we are excited to provide procedures and example code in Go, Java, JavaScript, Python, and C#.

Key Features of CDC in Neo4j 5

CDC enables application developers to filter (using selectors) for specific changes in the graph database — when new labels or relationships are created, updated, or deleted, for example. You can query the transaction log for changes using the procedures (more on this later). Turning CDC on ensures that sufficient information is recorded in the log to provide the context of the change along with some metadata, which can be used for decision-making purposes.

  1. Real-time data changes: Neo4j 5’s CDC feature enables developers to easily capture and process changes to the graph database in real time, ensuring that data in other systems reflects the latest updates or analysis results.
  2. Seamless integration: The CDC procedures integrate seamlessly without requiring changes to the data model.
  3. Flexible filtering: Selectors let application developers filter for specific changes, such as the creation, deletion, or update of particular nodes and relationships, and even filter on the metadata. Selectors can also be combined, but every condition must be satisfied for a change to be returned (see the documentation to learn more).
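As a rough illustration of what selectors look like from application code, the sketch below defines two selectors as plain Python dictionaries and passes them to db.cdc.query. The selector keys (select, operation, labels, type) reflect our reading of the CDC documentation and should be checked against it. The fetch_changes helper, the Person label, and the KNOWS relationship type are hypothetical names chosen for this example.

```python
# Hypothetical selectors. Every condition inside one selector must hold
# for a change to match that selector.
SELECTORS = [
    # Nodes labelled Person that were created ("c" = create).
    {"select": "n", "operation": "c", "labels": ["Person"]},
    # KNOWS relationships that were deleted ("d" = delete).
    {"select": "r", "operation": "d", "type": "KNOWS"},
]


def fetch_changes(driver, cursor, selectors, database="neo4j"):
    """Return the change records after `cursor` that match `selectors`.

    `driver` is assumed to be a neo4j.Driver, or any object exposing a
    compatible execute_query method.
    """
    records, _, _ = driver.execute_query(
        "CALL db.cdc.query($cursor, $selectors)",
        cursor=cursor,
        selectors=selectors,
        database_=database,
    )
    return records
```

Keeping the selectors as data rather than hard-coding them into the query string makes it easier to adjust what an application listens for without touching the polling code.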

Great Use Cases

During the EAP and beta, customers created applications where Neo4j is the authoritative data source. Use cases include:

  • Security systems — Mapping access and entitlement in healthcare, financial, and global organizations and integrating into third-party identity and access management (IAM) systems. CDC helps ensure that changes to entitlement are reflected across the network.
  • Application and data governance — Manage security reviews, auditing, and compliance checks for application ownership, software packages, cloud storage, and permission modes. CDC helps alert when reviews and audits are required.
  • Building entity-resolution pipelines with custom matching rules — CDC avoids having to re-query or rebuild the entire database by recomputing the entity groups where dependencies have changed.
  • Law enforcement — Monitoring people of interest, people on probation, or politicians at risk.
  • Active recommendation systems — In a knowledge graph used by sales and partner organizations, CDC enables new recommendations to be pushed out to third parties when price changes occur in product lines.

Great Resource Kit

Developers have three procedures and example code to build applications quickly in Go, Java, JavaScript, Python, and C#.

The following code snippets are in Python.

  • db.cdc.current to identify the current change event identifier:
def current_change_id(self):
    records, _, _ = self.driver.execute_query(
        'CALL db.cdc.current', database_=self.database)
    return records[0]['id']
  • db.cdc.earliest to identify the earliest change event identifier:
def earliest_change_id(self):
    records, _, _ = self.driver.execute_query(
        'CALL db.cdc.earliest', database_=self.database)
    return records[0]['id']
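  • db.cdc.query to return the change events and metadata that have occurred since the given identifier; you may optionally pass in selectors to filter the changes:
def query_changes_query(self, tx):
    # Read the latest change identifier before querying, so the cursor can
    # jump ahead if nothing matches the selectors.
    current = self.current_change_id()
    result = tx.run('CALL db.cdc.query($cursor, $selectors)',
                    cursor=self.cursor, selectors=self.selectors)
    if result.peek() is None:
        # No matching changes: skip the transactions already scanned.
        self.cursor = current
    else:
        for record in result:
            try:
                self.apply_change(record)
            except Exception as e:
                # Stop here so the failed change is retried on the next poll.
                print('Error whilst applying change', e)
                break
            # Advance the cursor only after the change was applied.
            self.cursor = record['id']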

CDC Enrichment Modes

Neo4j chose to implement transaction log-based CDC to provide developers with a consistent, robust, and performant means of querying for changes — the same approach used by other enterprise database systems like Oracle and SQL Server.

Figure: Change Data Capture under the hood

Neo4j’s CDC provides two enrichment modes, which determine what additional information is written to the transaction log to describe each change event returned by db.cdc.query:

  • FULL: Useful in auditing use cases where you need to see all the information present on an entity before and after it changes.
  • DIFF: Useful for event-based applications where you only need to see the information that changed, such as an order changing from in progress to fulfilled.
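To make the difference between the two modes concrete, here is a minimal sketch that models an entity's properties as Python dictionaries. It is only an illustration of the idea and not the actual change event payload; the full_view and diff_view helpers and the order properties are hypothetical.

```python
_MISSING = object()  # sentinel for "property was not present before"


def full_view(before, after):
    """FULL-style view: the complete property maps before and after the change."""
    return {"before": dict(before), "after": dict(after)}


def diff_view(before, after):
    """DIFF-style view: only the properties that were added, changed, or removed."""
    changed = {
        key: {"old": before.get(key), "new": value}
        for key, value in after.items()
        if before.get(key, _MISSING) != value
    }
    removed = {key: before[key] for key in before if key not in after}
    return {"changed": changed, "removed": removed}


order_before = {"id": 42, "status": "in progress", "total": 99.5}
order_after = {"id": 42, "status": "fulfilled", "total": 99.5}

print(diff_view(order_before, order_after))
# {'changed': {'status': {'old': 'in progress', 'new': 'fulfilled'}}, 'removed': {}}
```

An auditing application would want the output of something like full_view, while an event-driven consumer reacting to the status change only needs the smaller diff.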

Enabling CDC

For customers with an Aura Virtual Dedicated Cloud (Enterprise) account, there is a simple toggle for the enrichment mode on your Aura instance.

Figure: Enabling CDC in Aura Enterprise

For Neo4j Enterprise deployments, run the appropriate Cypher command:

ALTER DATABASE neo4j SET OPTION txLogEnrichment "DIFF"

OR

CREATE DATABASE master OPTIONS {txLogEnrichment: "FULL"}
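If you switch enrichment modes from a script, a small helper can guard against typos in the mode name. The sketch below only builds the ALTER DATABASE statement and does not connect to a database. The enrichment_statement function is a hypothetical helper, and the set of accepted modes (OFF, DIFF, FULL) is our understanding of the option's valid values.

```python
VALID_MODES = {"OFF", "DIFF", "FULL"}  # assumed set of valid enrichment modes


def enrichment_statement(database, mode):
    """Build the Cypher command that sets the CDC enrichment mode."""
    mode = mode.upper()
    if mode not in VALID_MODES:
        raise ValueError(f"Unknown enrichment mode: {mode!r}")
    # Backticks quote the database name; a doubled backtick escapes one.
    escaped = database.replace("`", "``")
    return f'ALTER DATABASE `{escaped}` SET OPTION txLogEnrichment "{mode}"'


print(enrichment_statement("neo4j", "diff"))
# ALTER DATABASE `neo4j` SET OPTION txLogEnrichment "DIFF"
```

The resulting string can then be run with whichever driver or shell you normally use for administrative commands.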

Product Improvements Through the EAP and Beta

We collaborated with developers building applications using Neo4j as the authoritative data source during the EAP and beta, which resulted in some great product improvements, including:

  • Selectors are evaluated against the UNION of the before and after states of a change event, so if a change causes an entity to move into the scope of the Selector, it is included in the result set. For example, to create a Selector that watches for nodes with the label “HighAlert,” you can specify {select: "n", labels: ["HighAlert"]}, and the following change will be captured:
MATCH (n:LowAlert) SET n:HighAlert REMOVE n:LowAlert
  • Transaction metadata is included in the change event, enabling the application to take different actions based on the application that makes the change to the database.
  • Improved code examples that update the change ID (cursor) when the query returns no results for a given Selector, which saves the next query from rescanning the same transactions.

Plus, they helped us find a few bugs, too.
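The UNION behaviour for label selectors can be modelled in a few lines. This is a simplified sketch of the idea, not Neo4j's implementation: the matches_label_selector function is hypothetical, and it treats the labels seen before and after the change as one combined set.

```python
def matches_label_selector(selector_labels, before_labels, after_labels):
    """True if every selector label appears on the node before or after the change."""
    seen = set(before_labels) | set(after_labels)
    return set(selector_labels) <= seen


# A node moving from LowAlert to HighAlert enters the selector's scope.
print(matches_label_selector(["HighAlert"], ["LowAlert"], ["HighAlert"]))  # True
# A node leaving HighAlert is still captured, because the before state matched.
print(matches_label_selector(["HighAlert"], ["HighAlert"], ["LowAlert"]))  # True
# A node that never carried the label is ignored.
print(matches_label_selector(["HighAlert"], ["LowAlert"], ["Resolved"]))   # False
```

Without the UNION, an application watching HighAlert would miss one of the two transitions, depending on whether only the before or only the after state was checked.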

Not Available in CDC

CDC does not provide the following:

  • Support for Selector ‘NOT’-based logic. If you need this type of logic, you will need to implement it within your application. You can filter on transaction metadata, so if the transaction carries information that identifies changes you do or do not want to capture (for example, which of several applications made the change), you can filter on it.
  • Schema and index changes. These are not included in the CDC events.
  • RBAC on the data in CDC events. An application querying CDC can see all changes. However, RBAC can be used to restrict access to CDC’s procedures (such as which users can run queries). Please refer to the docs for information on setting this up.
  • Support for DATABASE ALIAS. CDC queries the named transaction log and does not support the concept of an ALIAS, which is applied at the connection level.
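Since ‘NOT’-based logic has to live in the application, one option is to drop unwanted change records after they are returned. The sketch below assumes each record exposes its transaction metadata under metadata.txMetadata, which is our reading of the change event format. The exclude_by_tx_metadata helper and the app metadata key are hypothetical.

```python
def exclude_by_tx_metadata(records, key, unwanted):
    """Yield the records whose transaction metadata `key` is NOT `unwanted`."""
    for record in records:
        # Assumed layout: record["metadata"]["txMetadata"] is a map.
        tx_metadata = (record.get("metadata") or {}).get("txMetadata") or {}
        if tx_metadata.get(key) != unwanted:
            yield record


changes = [
    {"id": "a", "metadata": {"txMetadata": {"app": "etl"}}},
    {"id": "b", "metadata": {"txMetadata": {"app": "web"}}},
    {"id": "c", "metadata": {}},
]

# Keep every change that was NOT written by the hypothetical "etl" application.
kept = [r["id"] for r in exclude_by_tx_metadata(changes, "app", "etl")]
print(kept)  # ['b', 'c']
```

This only works if the writing applications attach the metadata to their transactions in the first place, so it is worth agreeing on the key names across teams.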

GA of CDC Strategies for Neo4j Connector for Confluent and Apache Kafka

The Neo4j Connector for Confluent and Apache Kafka 5.1 has also gone GA, which means you can now use a CDC strategy to send change events to Topics:

  • The Neo4j Connector for Confluent is supported in Confluent Cloud through Custom Connectors. Confluent just extended support from AWS to Azure, and you can find details on supported regions in the Confluent Documentation.
  • The Neo4j Connector for Apache Kafka with the CDC strategy is supported in Apache Kafka-based systems, including Amazon Managed Streaming for Apache Kafka (MSK).

These connectors open up new possibilities for developers building loosely coupled data pipelines that process data through Topics in Kafka-based systems. Key benefits of these connectors using CDC:

  1. Simplified and streamlined data pipelines: The Neo4j Connector for Apache Kafka with strategies for CDC enables developers to easily create loosely coupled data pipelines, ensuring that data flows seamlessly between Neo4j, Apache Kafka, and other systems consuming data from Neo4j.
  2. Event-driven architecture — Use the power of event-driven architecture by using CDC in Neo4j with Apache Kafka, allowing organizations to respond to changes in real time and enhance overall system responsiveness.

These connectors continue to support the traditional Query strategy for developers who aren’t using CDC, which means that users in Aura Free and Pro can continue to benefit from these connectors for their projects.

The Query strategy requires developers to define their own Cypher queries to extract changes in the graph based on a change event schema. This in turn requires changes to the data model to record timestamps on nodes and relationships and to implement soft deletes.

Developers familiar with the Neo4j Streams plugin, which was deprecated in favor of the new Neo4j Connector for Confluent and Apache Kafka, will be pleased to know that they can set node and relationship patterns in the same way:

// Select all changes on nodes with label :User and only include name and surname properties in the change event
"neo4j.cdc.topic.my-topic.patterns": "(:User{name, surname})"

// Select all changes on :BOUGHT relationships with start nodes of label :User and end nodes of label :Product
"neo4j.cdc.topic.my-topic.patterns": "(:User)-[:BOUGHT]->(:Product)"

While implementing the CDC strategy in the connector, we had to improve the change schema. Previously, all the properties that belonged to a key for an entity were returned from CDC in a flat list. This lost information about which property belonged to which key, so the change schema is now nested to preserve the connection between each key and its properties.

The sink strategies have been improved as well. They now preserve message ordering and either make full use of dead letter queues or raise exceptions, which means no more lost messages.

The configuration surface of version 5.1 has changed from 5.0.5, so we will be releasing a 5.0.6 version of the connector, which will dump the migrated configuration into the logs to simplify the transition between connectors.

Next Steps

You can enable CDC in the Aura Console on all Aura Virtual Dedicated Cloud (Enterprise) instances. If you don’t have an account, please contact your Neo4j representative or contact us on Neo4j.com. For Docker enthusiasts, just pull the latest Docker image and set the transaction log mode:

ALTER DATABASE neo4j SET OPTION txLogEnrichment "DIFF"

Get going with example code in Go, Java, JavaScript, Python, and C#.

The Neo4j Connector for Confluent is available on the Confluent Hub, and the Neo4j Connector for Apache Kafka is available on Neo4j.com.

If you are only interested in exploring Neo4j 5’s graph capabilities for now, sign up for AuraDB Free (remember that CDC is not included).