Example CDC usage in Go

This feature has been released as a public beta in AuraDB Enterprise October Release and Neo4j Enterprise Edition 5.13 and breaking changes are likely to be introduced before it is made generally available (GA).

package main

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"os/signal"
	"sync"
	"sync/atomic"
	"time"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
	"github.com/spf13/cobra"
	"github.com/tidwall/pretty"
)

func applyChange(record *neo4j.Record) error { (1)
	jsonOutput, err := json.Marshal(asMap(record))
	if err != nil {
		return fmt.Errorf("unable to jsonify record: %w", err)
	}

	fmt.Println(string(pretty.Color(pretty.Pretty(jsonOutput), pretty.TerminalStyle)))

	return nil
}

func queryChangeID(ctx context.Context, driver neo4j.DriverWithContext, database string, query string) (string, error) {
	result, err := neo4j.ExecuteQuery(ctx, driver, query, nil, neo4j.EagerResultTransformer, neo4j.ExecuteQueryWithDatabase(database), neo4j.ExecuteQueryWithReadersRouting())
	if err != nil {
		return "", fmt.Errorf("unable to query change identifier: %w", err)
	}

	if len(result.Records) != 1 {
		return "", fmt.Errorf("expected one record, but got %d", len(result.Records))
	}

	id, _, err := neo4j.GetRecordValue[string](result.Records[0], "id")
	if err != nil {
		return "", fmt.Errorf("unable to extract id: %w", err)
	}

	return id, nil
}

func asMap(record *neo4j.Record) map[string]any {
	result := make(map[string]any, len(record.Keys))

	for i := 0; i < len(record.Keys); i++ {
		result[record.Keys[i]] = record.Values[i]
	}

	return result
}

type CDCService struct {
	driver    neo4j.DriverWithContext
	database  string
	waitGroup sync.WaitGroup
	cursor    atomic.Pointer[string]
	selectors []any
}

func (s *CDCService) queryChanges(ctx context.Context) error { (2)
	session := s.driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: s.database})

	current, err := s.currentChangeID(ctx)
	if err != nil {
		return err
	}
	_, err = session.ExecuteRead(ctx, func(tx neo4j.ManagedTransaction) (any, error) {
		result, err := tx.Run(ctx, "CALL cdc.query($from, $selectors)", map[string]any{
			"from":      s.from(),
			"selectors": s.selectors,
		})
		if err != nil {
			return "", err
		}

		var record *neo4j.Record
		if !result.Peek(ctx) {
			s.setFrom(current) (3)
		} else {
			for result.NextRecord(ctx, &record) {
				err := applyChange(record) (4)
				if err != nil {
					return "", fmt.Errorf("error processing record: %w", err)
				}

				id, isNil, err := neo4j.GetRecordValue[string](record, "id")
				if err != nil || isNil {
					return "", fmt.Errorf("missing or invalid id value returned")
				}
				s.setFrom(id) (5)
			}
		}

		return s.from(), nil
	})
	if err != nil {
		return fmt.Errorf("unable to query/process changes: %w", err)
	}

	return nil
}

func (s *CDCService) earliestChangeID(ctx context.Context) (string, error) { (6)
	return queryChangeID(ctx, s.driver, s.database, "CALL cdc.earliest()")
}

func (s *CDCService) currentChangeID(ctx context.Context) (string, error) { (7)
	return queryChangeID(ctx, s.driver, s.database, "CALL cdc.current()")
}

func (s *CDCService) from() string {
	return *s.cursor.Load()
}

func (s *CDCService) setFrom(from string) {
	s.cursor.Store(&from)
}

func (s *CDCService) Start(ctx context.Context) error {
	if s.from() == "" {
		current, err := s.currentChangeID(ctx)
		if err != nil {
			return err
		}
		s.setFrom(current)
	}

	s.waitGroup.Add(1)
	go func(ctx context.Context) {
		defer func() {
			s.waitGroup.Done()
		}()

		timer := time.NewTimer(0 * time.Millisecond)
		for {
			select {
			case <-ctx.Done():
				return
			case <-timer.C:
				{
					err := s.queryChanges(ctx)
					if err != nil {
						log.Printf("error querying/processing changes: %v", err)
						return
					}

					timer.Reset(500 * time.Millisecond) (8)
				}
			}
		}
	}(ctx)

	return nil
}

func (s *CDCService) WaitForExit() {
	s.waitGroup.Wait()
}

func NewCDCService(uri string, username string, password string, database string, from string, selectors []any) (*CDCService, error) {
	driver, err := neo4j.NewDriverWithContext(uri, neo4j.BasicAuth(username, password, ""))
	if err != nil {
		return nil, fmt.Errorf("unable to create driver: %w", err)
	}

	cdc := &CDCService{
		driver:    driver,
		database:  database,
		waitGroup: sync.WaitGroup{},
		cursor:    atomic.Pointer[string]{},
		selectors: selectors,
	}
	cdc.setFrom(from)

	return cdc, nil
}

var (
	address  string
	database string
	username string
	password string
	from     string
)

func main() {
	rootCmd := &cobra.Command{
		Run: func(cmd *cobra.Command, args []string) {
			ctx, _ := signal.NotifyContext(context.Background(), os.Interrupt)

			selectors := []any{
				//map[string]any{"select": "n", "labels": []string{"Person", "Employee"}}, (9)
			}

			cdc, err := NewCDCService(address, username, password, database, from, selectors)
			if err != nil {
				log.Fatal(err)
			}

			if err := cdc.Start(ctx); err != nil {
				log.Fatal(err)
			}

			fmt.Printf("starting...\n")
			cdc.WaitForExit()
			fmt.Printf("quitting...\n")
		},
	}

	rootCmd.Flags().StringVarP(&address, "address", "a", "bolt://localhost:7687", "Bolt URI")
	rootCmd.Flags().StringVarP(&database, "database", "d", "", "Database")
	rootCmd.Flags().StringVarP(&username, "username", "u", "neo4j", "Username")
	rootCmd.Flags().StringVarP(&password, "password", "p", "passw0rd", "Password")
	rootCmd.Flags().StringVarP(&from, "from", "f", "", "Change identifier to query changes from")

	cobra.CheckErr(rootCmd.Execute())
}
1 This method is called once for each change.
2 This function fetches the changes from the database.
3 The cursor is moved forward to keep it up-to-date. This may not be necessary in your use case. See Cursor Management for details.
4 A function is called once for each change.
5 Note that ExecuteRead may retry failing queries. To avoid seeing the same change twice, update the cursor as the changes are applied.
6 Use this function to get the earliest available change id.
7 Use this function to get the current change id.
8 The timer is reset so that queryChanges gets called repeatedly.
9 The results may be filtered to return a subset of changes. The out-commented line would select only node changes that have both Person and Employee labels.