Example CDC usage in Python
This feature has been released as a public beta in the AuraDB Enterprise October release and Neo4j Enterprise Edition 5.13. Breaking changes are likely to be introduced before it is made generally available (GA).
import getopt
import json
import sys
import time
from threading import Thread
from neo4j import GraphDatabase
class CDCService:
    """Poll a Neo4j database's change data capture (CDC) feed and apply each change.

    The service keeps a cursor (a change ID). Every poll runs ``cdc.query``
    starting from that cursor and advances the cursor past each change that
    was applied successfully.

    Args:
        driver: A Neo4j driver, e.g. one created by ``GraphDatabase.driver``.
        database: Name of the database to read changes from.
        start_cursor: Change ID to start from. When ``None``, the current
            change ID is fetched with ``cdc.current``.
        selectors: Optional list of selector maps passed to ``cdc.query`` to
            limit which changes are returned. ``None`` is sent as an empty
            list.
    """

    def __init__(self, driver, database, start_cursor=None, selectors=None):
        self.driver = driver
        self.database = database
        self.cursor = start_cursor
        if self.cursor is None:
            self.cursor = self.current_change_id()
        # cdc.query takes a *list* of selectors; send [] rather than null
        # when the caller did not supply any.
        self.selectors = selectors if selectors is not None else []

    def apply_change(self, record):  # (1)
        """Handle one change event. Replace this with your own logic.

        This example prints the record's ``id``, ``txId``, ``seq``, ``event``
        and ``metadata`` fields as indented JSON. Values that JSON cannot
        encode natively are rendered with ``repr``.
        """
        record_dict = {
            k: record.get(k)
            for k in ('id', 'txId', 'seq', 'event', 'metadata')
        }
        print(json.dumps(record_dict, indent=2, default=repr))

    def query_changes_query(self, tx):
        """Transaction function: fetch changes after the cursor and apply them.

        Processing stops at the first change whose handler raises. The cursor
        is left on the last successfully applied change, so the failed change
        is fetched again on the next poll.
        """
        result = tx.run('CALL cdc.query($cursor, $selectors)',  # (2)
                        cursor=self.cursor, selectors=self.selectors)
        for record in result:
            try:
                self.apply_change(record)  # (3)
            except Exception as e:
                # Report on stderr so errors are not mixed into the change
                # output that apply_change writes to stdout.
                print('Error whilst applying change', e, file=sys.stderr)
                break
            # (4) Advance the cursor after each applied change. If the driver
            # retries this function, it resumes after the last applied change.
            self.cursor = record['id']

    def query_changes(self):
        """Run one poll: fetch and apply all changes after the cursor."""
        with self.driver.session(database=self.database) as session:
            session.execute_read(self.query_changes_query)

    def earliest_change_id(self):  # (5)
        """Return the earliest available change ID (``CALL cdc.earliest``)."""
        records, _, _ = self.driver.execute_query(
            'CALL cdc.earliest', database_=self.database)
        return records[0]['id']

    def current_change_id(self):  # (6)
        """Return the current change ID (``CALL cdc.current``)."""
        records, _, _ = self.driver.execute_query(
            'CALL cdc.current', database_=self.database)
        return records[0]['id']

    def run(self, poll_interval=0.5):
        """Poll for changes forever, sleeping ``poll_interval`` seconds between polls."""
        while True:  # (8)
            self.query_changes()
            time.sleep(poll_interval)
def main(argv):
    """Parse command-line options, then stream CDC changes from the database.

    Options:
        -a, --address   server URI (default ``neo4j://localhost:7687``)
        -d, --database  database name (default ``neo4j``)
        -u, --username  user name (default ``neo4j``)
        -p, --password  password (default ``passw0rd``)
        -f, --from      change ID to start from (default: the current change ID)

    Prints a usage message to stderr and exits with status 2 when an option
    is unrecognised or missing its argument.
    """
    # Default values
    address = 'neo4j://localhost:7687'
    database = 'neo4j'
    username = 'neo4j'
    password = 'passw0rd'
    cursor = None

    try:
        opts, _ = getopt.getopt(
            argv, 'a:d:u:p:f:',
            ['address=', 'database=', 'username=', 'password=', 'from='])
    except getopt.GetoptError as err:
        # Report bad command-line input cleanly instead of with a traceback.
        print(err, file=sys.stderr)
        print('usage: ' + sys.argv[0] + ' [-a ADDRESS] [-d DATABASE]'
              ' [-u USERNAME] [-p PASSWORD] [-f CHANGE_ID]', file=sys.stderr)
        sys.exit(2)

    for opt, arg in opts:
        if opt in ('-a', '--address'):
            address = arg
        elif opt in ('-d', '--database'):
            database = arg
        elif opt in ('-u', '--username'):
            username = arg
        elif opt in ('-p', '--password'):
            password = arg
        elif opt in ('-f', '--from'):
            cursor = arg

    selectors = [  # (7)
        # {'select': 'n'}
    ]

    with GraphDatabase.driver(address, auth=(username, password)) as driver:
        cdc = CDCService(driver, database, cursor, selectors)
        # The poller runs on a daemon thread; the main thread waits on it.
        cdc_thread = Thread(target=cdc.run, daemon=True)
        cdc_thread.start()
        cdc_thread.join()


if __name__ == '__main__':
    main(sys.argv[1:])
(1) This method is called once for each change event. Replace it with logic appropriate to your use case.
(2) This query fetches the changes from the database.
(3) Here we call a method once for each change.
(4) `session.execute_read` may retry `query_changes_query`, for example when there are network issues, so `query_changes_query` must be idempotent. We keep it idempotent by updating the cursor after each applied change, so a retry resumes after the last change that was applied.
(5) Use this function to get the earliest available change ID.
(6) Use this function to get the current change ID.
(7) Here you can limit which changes are returned. The commented-out line would select only node changes and exclude all relationship changes.
(8) We call `query_changes` repeatedly; each call continues from the cursor left by the previous call.
Was this page helpful?