Example CDC usage in Python

This feature has been released as a public beta in the AuraDB Enterprise October Release and in Neo4j Enterprise Edition 5.13. Breaking changes are likely to be introduced before it is made generally available (GA).

import getopt
import json
import sys
import time
from threading import Thread

from neo4j import GraphDatabase


class CDCService:
    """Poll a Neo4j database for change data capture (CDC) events.

    The service holds a cursor (a change identifier) and passes it to
    ``db.cdc.query`` on every poll; the cursor is advanced as changes are
    applied.

    Args:
        driver: Neo4j driver used to run the CDC procedures.
        database: Name of the database the procedures are run against.
        start_cursor: Change identifier to start from. When ``None``, the
            identifier returned by ``db.cdc.current`` is used.
        selectors: Selectors passed unchanged to ``db.cdc.query``.
    """

    def __init__(self, driver, database, start_cursor=None, selectors=None):
        self.driver = driver
        self.database = database
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()
        self.selectors = selectors

    def apply_change(self, record):  # (1)
        """Handle a single change record by printing it as indented JSON.

        Only the ``id``, ``txId``, ``seq``, ``event`` and ``metadata`` fields
        are printed; values that are not JSON serializable fall back to
        their ``repr``.
        """
        record_dict = {
            k: record.get(k)
            for k in ('id', 'txId', 'seq', 'event', 'metadata')
        }
        print(json.dumps(record_dict, indent=2, default=repr))

    def query_changes_query(self, tx):
        """Fetch the changes for the stored cursor and apply them in order.

        Args:
            tx: Transaction object whose ``run`` method executes the query.
        """
        # Read the current change id before querying; presumably so that
        # changes committed while the query runs are not skipped when the
        # cursor is moved forward below.
        current = self.current_change_id()
        result = tx.run('CALL db.cdc.query($cursor, $selectors)',  # (2)
                        cursor=self.cursor, selectors=self.selectors)
        if result.peek() is None:
            # No changes were returned: move the cursor forward anyway.
            self.cursor = current  # (3)
        else:
            for record in result:
                try:
                    self.apply_change(record)  # (4)
                except Exception as e:
                    # Stop at the first failing change; the cursor still
                    # points at the last change that was applied.
                    print('Error whilst applying change', e)
                    break
                # Advance only after the change has been applied.
                self.cursor = record['id']  # (5)

    def query_changes(self):
        """Run one poll inside a managed read transaction."""
        with self.driver.session(database=self.database) as session:
            session.execute_read(self.query_changes_query)

    def earliest_change_id(self):  # (6)
        """Return the ``id`` of the first record of ``db.cdc.earliest``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.earliest', database_=self.database)
        return records[0]['id']

    def current_change_id(self):  # (7)
        """Return the ``id`` of the first record of ``db.cdc.current``."""
        records, _, _ = self.driver.execute_query(
            'CALL db.cdc.current', database_=self.database)
        return records[0]['id']

    def run(self):
        """Poll for changes forever, sleeping 0.5 seconds between polls."""
        while True:  # (9)
            self.query_changes()
            time.sleep(0.5)


def main(argv):
    """Parse command-line options and run the CDC polling service.

    Recognised options, each taking a value:
    ``-a/--address``, ``-d/--database``, ``-u/--username``,
    ``-p/--password`` and ``-f/--from`` (change identifier to start from).

    Args:
        argv: Command-line arguments without the program name.

    Raises:
        getopt.GetoptError: If an option is unknown or lacks its value.
    """
    # Default values
    address = 'neo4j://localhost:7687'
    database = 'neo4j'
    username = 'neo4j'
    password = 'passw0rd'
    cursor = None

    opts, _ = getopt.getopt(
        argv, 'a:d:u:p:f:',
        ['address=', 'database=', 'username=', 'password=', 'from='])
    for opt, arg in opts:
        if opt in ('-a', '--address'):
            address = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-u', '--username'):
            username = arg
        elif opt in ('-p', '--password'):
            password = arg
        elif opt in ('-f', '--from'):
            cursor = arg

    # (8) Empty by default; add selector maps here to filter the changes.
    selectors = [
        # {'select': 'n'}
    ]

    with GraphDatabase.driver(address, auth=(username, password)) as driver:
        cdc = CDCService(driver, database, cursor, selectors)
        # Polling runs on a daemon thread; join() keeps the process (and the
        # driver) alive for as long as the polling loop runs.
        cdc_thread = Thread(target=cdc.run, daemon=True)
        cdc_thread.start()
        cdc_thread.join()


# Script entry point: pass the command-line arguments (without the program
# name) to main() when this file is executed directly.
if __name__ == '__main__':
    main(sys.argv[1:])
1 This method is called once for each change event. It should be replaced depending on your use case.
2 This query fetches the changes from the database.
3 The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details.
4 This method is called once for each change.
5 session.execute_read may retry query_changes_query when there are network issues, for example. To avoid seeing the same change twice, update the cursor as the changes are applied.
6 Use this function to get the earliest available change id.
7 Use this function to get the current change id.
8 The results may be filtered to return a subset of changes. The commented-out line would select only node changes and exclude all relationship changes.
9 Call query_changes repeatedly, using the cursor from the previous call.