Example CDC usage in Python

This feature has been released as a public beta in AuraDB Enterprise October Release and Neo4j Enterprise Edition 5.13 and breaking changes are likely to be introduced before it is made generally available (GA).

import getopt
import json
import sys
import time
from threading import Thread

from neo4j import GraphDatabase


class CDCService:
    def __init__(self, driver, database, start_cursor=None, selectors=None):
        self.driver = driver
        self.database = database
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()
        self.selectors = selectors

    def apply_change(self, record): (1)
        record_dict = {
            k: record.get(k)
            for k in ('id', 'txId', 'seq', 'event', 'metadata')
        }
        print(json.dumps(record_dict, indent=2, default=repr))

    def query_changes_query(self, tx):
        result = tx.run('CALL cdc.query($cursor, $selectors)', (2)
                        cursor=self.cursor, selectors=self.selectors)
        for record in result:
            try:
                self.apply_change(record) (3)
            except Exception as e:
                print('Error whilst applying change', e)
                break
            self.cursor = record['id'] (4)

    def query_changes(self):
        with self.driver.session(database=self.database) as session:
            session.execute_read(self.query_changes_query)

    def earliest_change_id(self): (5)
        records, _, _ = self.driver.execute_query(
            'CALL cdc.earliest', database_=self.database)
        return records[0]['id']

    def current_change_id(self): (6)
        records, _, _ = self.driver.execute_query(
            'CALL cdc.current', database_=self.database)
        return records[0]['id']

    def run(self):
        while True: (8)
            self.query_changes()
            time.sleep(0.5)


def main(argv):
    # Default values
    address = 'neo4j://localhost:7687'
    database = 'neo4j'
    username = 'neo4j'
    password = 'passw0rd'
    cursor = None

    opts, _ = getopt.getopt(
        argv, 'a:d:u:p:f:',
        ['address=', 'database=', 'username=', 'password=', 'from='])
    for opt, arg in opts:
        if opt in ('-a', '--address'):
            address = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-u', '--username'):
            username = arg
        elif opt in ('-p', '--password'):
            password = arg
        elif opt in ('-f', '--from'):
            cursor = arg

    selectors = [ (7)
        # {'select': 'n'}
    ]

    with GraphDatabase.driver(address, auth=(username, password)) as driver:
        cdc = CDCService(driver, database, cursor, selectors)
        cdc_thread = Thread(target=cdc.run, daemon=True)
        cdc_thread.start()
        cdc_thread.join()


if __name__ == '__main__':
    main(sys.argv[1:])
1 This method is called once for each change event. It should be replaced depending on your use case.
2 This query fetches the changes from the database.
3 Here we call a method once for each change.
4 session.execute_read may retry query_changes_query, for example when there are network issues. Thus query_changes_query must be idempotent. We keep it idempotent by updating the cursor as we go.
5 Use this function to get the earliest available change id.
6 Use this function to get the current change id.
7 Here you can limit the returned changes. The out-commented line would select only node changes and exclude all relationship changes.
8 We call query_changes repeatedly, using the cursor from the previous call.