Example CDC usage in Java

This feature has been released as a public beta in AuraDB Enterprise October Release and Neo4j Enterprise Edition 5.13 and breaking changes are likely to be introduced before it is made generally available (GA).

package org.neo4j.example;

import org.neo4j.driver.*;
import org.neo4j.driver.Record;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static picocli.CommandLine.*;

class CDCService implements Closeable {
    private final Driver driver;
    private final String database;
    private final AtomicReference<String> cursor = new AtomicReference<>(null);
    private final ScheduledThreadPoolExecutor scheduler;

    private List<Map<String, Object>> selectors = List.of();

    public CDCService(String uri, String database, String user, String password) {
        driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
        scheduler = new ScheduledThreadPoolExecutor(1);
        this.database = database;
    }

    @Override
    public void close(){
        driver.close();
        scheduler.shutdown();
    }

    public void setSelectors(List<Map<String, Object>> selectors) {
        this.selectors = selectors;
    }

    private void applyChange(Record change) { (1)
        var eventType = change.get("event").asMap().get("eventType");
        var operation = change.get("event").asMap().get("operation");
        System.out.println("Element of type '%s' changed with operation '%s'".formatted(eventType, operation));
        System.out.println(change);
    }

    private synchronized void queryChanges() { (2)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            var current = currentChangeID();
            session.executeRead(tx -> {
                var result = tx.run(new Query("CALL db.cdc.query($from, $selectors)", Map.of("from", cursor.get(), "selectors", selectors)));
                if (!result.hasNext()) {
                    cursor.set(current); (3)
                } else {
                    while (result.hasNext()) {
                        var change = result.next();
                        applyChange(change); (4)
                        cursor.set(change.get("id").asString()); (5)
                    }
                }
                return cursor.get();
            });
        } catch (Exception e) {
            System.err.println(e.getMessage());
            throw new RuntimeException("Error querying/processing changes.", e);
        }
    }

    private String earliestChangeID() { (6)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            return session.executeRead(tx -> {
                var result = tx.run(new Query("CALL db.cdc.earliest"));
                return result.single().get("id").asString();
            });
        }
    }

    private String currentChangeID() { (7)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            return session.executeRead(tx -> {
                var result = tx.run(new Query("CALL db.cdc.current"));
                return result.single().get("id").asString();
            });
        }
    }

    public void start(String from) {
        cursor.set(from == null ? currentChangeID() : from);
        scheduler.scheduleWithFixedDelay(this::queryChanges, 0, 500, TimeUnit.MILLISECONDS); (8)
        Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
    }


    public void awaitTermination(int timeout, TimeUnit unit) throws InterruptedException {
        scheduler.awaitTermination(timeout, unit);
    }
}

@Command(name = "Neo4j CDC example usage", mixinStandardHelpOptions = true, version = "1.0", description = "Connects to neo4j and queries for change events through cdc procedures.")
public class Example implements Callable<Integer> {

    @Option(names = {"-a", "--address"}, description = "Where to find the neo4j instance to connect to")
    private String address = "bolt://localhost:7687";
    @Option(names = {"-d", "--database"}, description = "Which database to access")
    private String database = "neo4j";
    @Option(names = {"-u", "--username"}, description = "Username for authenticating against the neo4j server")
    private String username = "neo4j";
    @Option(names = {"-p", "--password"}, description = "Password for authenticating against the neo4j server")
    private String password = "passw0rd";
    @Option(names = {"-f", "--from"}, description = "Cursor value to start streaming from")
    private String from = null;


    @Override
    public Integer call() {

        var cdcService = new CDCService(address, database, username, password);
        List<Map<String, Object>> selectors = List.of( (9)
//                Map.of("select", "n")
        );
        cdcService.setSelectors(selectors);
        cdcService.start(from);

        System.out.println("started querying changes...");

        try {
            cdcService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }

        System.out.println("quitting...");
        System.out.flush();
        return 0;
    }

    public static void main(String... args) {
        int exitCode = new CommandLine(new Example()).execute(args);
        System.exit(exitCode);
    }
}
1 This method is called once for each change.
2 This query fetches the changes from the database.
3 The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details.
4 A function is called once for each change.
5 Note that executeRead may retry failing queries. To avoid seeing the same change twice, update the cursor as the changes are applied.
6 Use this function to get the earliest available change id.
7 Use this function to get the current change id.
8 Schedule such that queryChanges gets called repeatedly.
9 The results may be filtered to return a subset of changes. The out-commented line would select only node changes and exclude all relationship changes.