Example CDC usage in Java

This feature was released as a public beta in the AuraDB Enterprise October release and in Neo4j Enterprise Edition 5.13. Breaking changes are likely to be introduced before it becomes generally available (GA).

package org.neo4j.example;

import org.neo4j.driver.*;
import org.neo4j.driver.Record;
import picocli.CommandLine;
import picocli.CommandLine.Command;

import java.io.Closeable;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

import static picocli.CommandLine.*;

class CDCService implements Closeable {
    private final Driver driver;
    private final String database;
    private final AtomicReference<String> cursor = new AtomicReference<>(null);
    private final ScheduledThreadPoolExecutor scheduler;

    private List<Map<String, Object>> selectors = List.of();

    public CDCService(String uri, String database, String user, String password) {
        driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
        scheduler = new ScheduledThreadPoolExecutor(1);
        this.database = database;
    }

    @Override
    public void close(){
        driver.close();
        scheduler.shutdown();
    }

    public void setSelectors(List<Map<String, Object>> selectors) {
        this.selectors = selectors;
    }

    private void applyChange(Record change) { (1)
        var eventType = change.get("event").asMap().get("eventType");
        var operation = change.get("event").asMap().get("operation");
        System.out.println("Element of type '%s' changed with operation '%s'".formatted(eventType, operation));
        System.out.println(change);
    }

    private synchronized void queryChanges() { (2)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            session.executeRead(tx -> {
                var result = tx.run(new Query("CALL cdc.query($from, $selectors)", Map.of("from", cursor, "selectors", selectors)));
                while (result.hasNext()) {
                    var change = result.next();
                    applyChange(change); (3)
                    cursor.set(change.get("id").asString()); (4)
                }
                return cursor.get();
            });
        } catch (Exception e) {
            throw new RuntimeException("Error querying/processing changes.", e);
        }
    }

    private String earliestChangeID() { (5)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            return session.executeRead(tx -> {
                var result = tx.run(new Query("CALL cdc.earliest"));
                return result.single().get("id").asString();
            });
        }
    }

    private String currentChangeID() { (6)
        try (var session = driver.session(SessionConfig.forDatabase(database))) {
            return session.executeRead(tx -> {
                var result = tx.run(new Query("CALL cdc.current"));
                return result.single().get("id").asString();
            });
        }
    }

    public void start(String from) {
        cursor.set(from == null ? currentChangeID() : from);
        scheduler.scheduleWithFixedDelay(this::queryChanges, 0, 500, TimeUnit.MILLISECONDS); (7)
        Runtime.getRuntime().addShutdownHook(new Thread(scheduler::shutdownNow));
    }


    public void awaitTermination(int timeout, TimeUnit unit) throws InterruptedException {
        scheduler.awaitTermination(timeout, unit);
    }
}

/**
 * Command-line entry point: connects to Neo4j and prints change events obtained through the
 * CDC procedures until polling stops or one day has elapsed.
 */
@Command(name = "Neo4j CDC example usage", mixinStandardHelpOptions = true, version = "1.0", description = "Connects to neo4j and queries for change events through cdc procedures.")
public class Example implements Callable<Integer> {

    @Option(names = {"-a", "--address"}, description = "Where to find the neo4j instance to connect to")
    private String address = "bolt://localhost:7687";
    @Option(names = {"-d", "--database"}, description = "Which database to access")
    private String database = "neo4j";
    @Option(names = {"-u", "--username"}, description = "Username for authenticating against the neo4j server")
    private String username = "neo4j";
    @Option(names = {"-p", "--password"}, description = "Password for authenticating against the neo4j server")
    private String password = "passw0rd";
    @Option(names = {"-f", "--from"}, description = "Cursor value to start streaming from")
    private String from = null;


    /**
     * Starts polling for changes and blocks until polling stops or one day has elapsed.
     *
     * @return the process exit code: {@code 0} once waiting has finished
     * @throws RuntimeException wrapping {@link InterruptedException} if the wait is interrupted
     */
    @Override
    public Integer call() {
        // try-with-resources releases the driver and the scheduler on every exit path.
        try (var cdcService = new CDCService(address, database, username, password)) {
            List<Map<String, Object>> selectors = List.of( // (8)
//                    Map.of("select", "n")
            );
            cdcService.setSelectors(selectors);
            cdcService.start(from);

            System.out.println("started querying changes...");

            cdcService.awaitTermination(1, TimeUnit.DAYS);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can still observe it.
            Thread.currentThread().interrupt();
            throw new RuntimeException(e);
        }

        System.out.println("quitting...");
        System.out.flush();
        return 0;
    }

    public static void main(String... args) {
        int exitCode = new CommandLine(new Example()).execute(args);
        System.exit(exitCode);
    }
}
1 This method is called once for each change event. Replace its body with the processing your use case requires.
2 This method queries the database for the changes that follow the current cursor.
3 Here we call a method once for each change.
4 Note that executeRead may retry a failed transaction function. To avoid processing the same change twice, we advance the cursor as each change is applied.
5 Use this method to get the earliest available change ID.
6 Use this method to get the current change ID.
7 Here we schedule queryChanges to run repeatedly, with a 500 ms delay between the end of one run and the start of the next.
8 Here you can limit which changes are returned. The commented-out line would select only node changes and exclude all relationship changes.