Example CDC usage in C#

This feature has been released as a public beta in the AuraDB Enterprise October Release and in Neo4j Enterprise Edition 5.13. Breaking changes are likely to be introduced before it is made generally available (GA).

using System.CommandLine;
using System.Text.Json;
using Neo4j.Driver;

#pragma warning disable CS4014

namespace Neo4j.CDC.Sample;

static class Program
{
    /// <summary>
    /// Polls the change data capture (CDC) procedures of a Neo4j database on a background task
    /// and passes every returned change record to <see cref="ApplyChange"/>.
    /// </summary>
    class CDCSample
    {
        // Cached once: creating a new JsonSerializerOptions per record throws away the
        // serializer's metadata cache on every call.
        private static readonly JsonSerializerOptions IndentedJson = new JsonSerializerOptions
        {
            WriteIndented = true
        };

        // Pause between two consecutive polls of db.cdc.query.
        private static readonly TimeSpan PollInterval = TimeSpan.FromMilliseconds(500);

        private readonly IDriver _driver;
        private readonly string? _database;

        // Change identifier to query from; written by the polling task while changes are applied.
        private volatile string? _from;
        private readonly IEnumerable<object> _selectors;

        // Set when the polling loop has ended (for any reason).
        private readonly ManualResetEventSlim _event;

        // Failure that ended the polling loop, if any; rethrown by WaitForExit.
        private volatile Exception? _failure;

        /// <summary>Creates a new sample service.</summary>
        /// <param name="driver">Driver used for all queries; must not be null.</param>
        /// <param name="database">Database to query; null or empty selects the default database.</param>
        /// <param name="from">Change identifier to start from; null or empty means "current".</param>
        /// <param name="selectors">Selectors passed to db.cdc.query; null means no filtering.</param>
        /// <exception cref="ArgumentNullException"><paramref name="driver"/> is null.</exception>
        public CDCSample(IDriver driver, string? database, string? from, IEnumerable<object>? selectors)
        {
            _driver = driver ?? throw new ArgumentNullException(nameof(driver));
            // Treat "" like null so every query below agrees on "use the default database".
            _database = string.IsNullOrEmpty(database) ? null : database;
            _from = from;
            _selectors = selectors ?? Enumerable.Empty<object>();
            _event = new ManualResetEventSlim();
        }

        /// <summary>Prints the values of a single change record as indented JSON.</summary>
        private static void ApplyChange(IRecord record) // (1)
        {
            var jsonText = JsonSerializer.Serialize(record.Values, IndentedJson);

            Console.WriteLine(jsonText);
        }

        /// <summary>
        /// Runs db.cdc.query once, applies every returned change and advances the cursor.
        /// </summary>
        private async Task QueryChanges(CancellationToken cancellation) // (2)
        {
            await using var session = _driver.AsyncSession(ConfigureSession(_database));

            // Read the current change id BEFORE querying, so the cursor is never moved past
            // changes that this poll has not looked at.
            var current = await CurrentChangeId();
            await session.ExecuteReadAsync(async tx =>
            {
                // Re-read the cursor inside the transaction function: if the function is
                // retried, it resumes after the last change that was already applied.
                var from = _from;
                var result = await tx.RunAsync("CALL db.cdc.query($from, $selectors)", new
                {
                    from,
                    selectors = _selectors,
                });

                var processed = 0;
                await foreach (var record in result.WithCancellation(cancellation))
                {
                    ApplyChange(record); // (3)
                    _from = record["id"].As<string>(); // (4)
                    processed++;
                }

                if (processed == 0)
                {
                    _from = current; // (5)
                }
            });
        }

        /// <summary>Builds a session configuration that selects <paramref name="database"/> when given.</summary>
        private static Action<SessionConfigBuilder> ConfigureSession(string? database)
        {
            return sc =>
            {
                if (!string.IsNullOrEmpty(database))
                {
                    sc.WithDatabase(database);
                }
            };
        }

        /// <summary>
        /// Calls a CDC procedure that yields a single "id" column and returns the first value.
        /// The query targets the same database as <see cref="QueryChanges"/>; without the
        /// explicit configuration it would run against the default database instead.
        /// </summary>
        private async Task<string> FetchChangeId(string procedureCall)
        {
            var response = await _driver
                .ExecutableQuery(procedureCall)
                .WithConfig(new QueryConfig(database: _database))
                .WithMap(record => record["id"].As<string>())
                .ExecuteAsync();

            return response.Result[0];
        }

        /// <summary>Returns the earliest available change identifier.</summary>
        private Task<string> EarliestChangeId() // (6)
        {
            return FetchChangeId("CALL db.cdc.earliest");
        }

        /// <summary>Returns the current change identifier.</summary>
        private Task<string> CurrentChangeId() // (7)
        {
            return FetchChangeId("CALL db.cdc.current");
        }

        /// <summary>
        /// Resolves the starting change identifier if none was given and starts the background
        /// polling loop. The returned task completes once the loop has been started, not when
        /// it ends; use <see cref="WaitForExit"/> for that.
        /// </summary>
        /// <param name="cancellation">Token that stops the polling loop.</param>
        public async Task Start(CancellationToken cancellation)
        {
            if (string.IsNullOrEmpty(_from))
            {
                _from = await CurrentChangeId();
            }

            _failure = null;
            _event.Reset();

            // Task.Run unwraps the async delegate and uses the default scheduler. The token is
            // deliberately NOT passed to Task.Run: a task cancelled before it starts would never
            // execute the finally block, and WaitForExit would then block forever.
            _ = Task.Run(async () =>
            {
                try
                {
                    while (!cancellation.IsCancellationRequested)
                    {
                        await QueryChanges(cancellation);

                        await Task.Delay(PollInterval, cancellation); // (8)
                    }
                }
                catch (Exception) when (cancellation.IsCancellationRequested)
                {
                    // Exceptions raised once cancellation was requested are part of shutdown.
                }
                catch (Exception e)
                {
                    // Keep the failure so that WaitForExit can report it instead of losing it.
                    _failure = e;
                }
                finally
                {
                    _event.Set();
                }
            });
        }

        /// <summary>
        /// Blocks until the polling loop has ended. If the loop ended because of a failure
        /// (rather than cancellation), that exception is rethrown here.
        /// </summary>
        public void WaitForExit()
        {
            _event.Wait();

            var failure = _failure;
            if (failure is not null)
            {
                // Rethrow with the original stack trace preserved.
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(failure).Throw();
            }
        }
    }

    /// <summary>
    /// Entry point: declares the command line options, then runs the sample with the parsed values.
    /// </summary>
    /// <param name="args">Command line arguments.</param>
    /// <returns>The exit code produced by the command line invocation.</returns>
    static async Task<int> Main(string[] args)
    {
        var uriOption = new Option<string>("--address", () => "bolt://localhost:7687", "Bolt URI");
        uriOption.AddAlias("-a");
        var databaseOption = new Option<string?>("--database", () => "", "Database");
        databaseOption.AddAlias("-d");
        var usernameOption = new Option<string>("--username", () => "neo4j", "Username");
        usernameOption.AddAlias("-u");
        var passwordOption = new Option<string>("--password", () => "passw0rd", "Password");
        passwordOption.AddAlias("-p");
        var fromOption = new Option<string?>("--from", () => null, "Change identifier to query changes from");
        fromOption.AddAlias("-f");

        var cmd = new RootCommand("Sample CDC application");
        cmd.AddOption(uriOption);
        cmd.AddOption(databaseOption);
        cmd.AddOption(usernameOption);
        cmd.AddOption(passwordOption);
        cmd.AddOption(fromOption);

        // The handler is asynchronous and awaits the command instead of blocking on
        // Task.Wait(cancellation): a blocking wait throws as soon as the token is cancelled
        // and abandons the command while it is still shutting down.
        cmd.SetHandler(async ctx =>
        {
            var cancellation = ctx.GetCancellationToken();
            var uri = ctx.ParseResult.GetValueForOption(uriOption);
            var database = ctx.ParseResult.GetValueForOption(databaseOption);
            var username = ctx.ParseResult.GetValueForOption(usernameOption);
            var password = ctx.ParseResult.GetValueForOption(passwordOption);
            var from = ctx.ParseResult.GetValueForOption(fromOption);

            await DoRootCommand(cancellation, uri!, username!, password!, database, from);
        });

        return await cmd.InvokeAsync(args);
    }

    /// <summary>
    /// Connects to the server, starts the CDC sample service and blocks until it has exited.
    /// Any exception is written to standard error rather than propagated to the caller.
    /// </summary>
    /// <param name="cancellation">Token handed to the service to stop it.</param>
    /// <param name="uri">Address passed to the driver.</param>
    /// <param name="username">User name for basic authentication.</param>
    /// <param name="password">Password for basic authentication.</param>
    /// <param name="database">Database name handed to the service; may be null.</param>
    /// <param name="from">Change identifier handed to the service; may be null.</param>
    private static async Task DoRootCommand(CancellationToken cancellation, string uri, string username,
        string password,
        string? database, string? from)
    {
        try
        {
            // (9) Selectors are empty by default. To filter the changes, add entries such as:
            //     new { select = "n", labels = new[] { "Person", "Employee" } }
            var changeSelectors = new List<object>();

            var credentials = AuthTokens.Basic(username, password);
            await using (var neo4j = GraphDatabase.Driver(uri, credentials))
            {
                var sample = new CDCSample(neo4j, database, from, changeSelectors);

                await sample.Start(cancellation);

                await Console.Out.WriteLineAsync("starting...");
                sample.WaitForExit();
                await Console.Out.WriteLineAsync("quitting...");
            }
        }
        catch (Exception failure)
        {
            await Console.Error.WriteLineAsync($"Error: {failure}");
        }
    }
}
1 This method is called once for each change event. It should be replaced based on your use case.
2 This method fetches the changes from the database.
3 This method is called once for each change.
4 Note that ExecuteReadAsync may retry failing queries. To avoid seeing the same change twice, update the cursor as the changes are applied.
5 The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details.
6 Use this function to get the earliest available change id.
7 Use this function to get the current change id.
8 Wait for 500 milliseconds between successive calls to QueryChanges, so that the database is polled repeatedly at a fixed interval.
9 The results may be filtered to return a subset of changes. The commented-out selector would select only node changes that have both Person and Employee labels.