4.3. Reactive Sessions

This section describes reactive sessions, transactions executed within a reactive session and the results thereof.

Available in .NET, Java and JavaScript

Starting with Neo4j 4.0, the reactive processing of queries is supported. This can be achieved through reactive sessions. Reactive sessions allow for dynamic management of the data that is being exchanged between the driver and the server.

Typical of reactive programming, consumers control the rate at which they consume records from queries and the driver in turn manages the rate at which records are requested from the server. Flow control is supported throughout the entire Neo4j stack, meaning that the query engine responds correctly to the flow control signals. This results in far more efficient resource handling and ensures that the receiving side is not forced to buffer arbitrary amounts of data.

For more information about reactive stream, please see the following:

Reactive sessions will typically be used in a client application that is already oriented towards the reactive style; it is expected that a reactive dependency or framework is in place.

Refer to Chapter 1, Get started for more information on recommended dependencies.

4.3.1. Lifecycle

Session lifetime begins with session construction. A session then exists until it is closed, which is typically set to occur after its contained query results have been consumed.

4.3.2. Transaction functions

This form of transaction requires minimal boilerplate code and allows for a clear separation of database queries and application logic. Transaction functions are also desirable since they encapsulate retry logic and allow for the greatest degree of flexibility when swapping out a single instance of server for a cluster.

Functions can be called as either read or write operations. This choice will route the transaction to an appropriate server within a clustered environment. If you are in a single instance environment, this routing has no impact but it does give you the flexibility should you choose to later adopt a clustered environment.

Before writing a transaction function it is important to ensure that any side-effects carried out by a transaction function should be designed to be idempotent. This is because a function may be executed multiple times if initial runs fail.

Any query results obtained within a transaction function should be consumed within that function, as connection-bound resources cannot be managed correctly when out of scope. To that end, transaction functions can return values but these should be derived values rather than raw results.

When a transaction fails, the driver retry logic is invoked. For several failure cases, the transaction can be immediately retried against a different server. These cases include connection issues, server role changes (e.g. leadership elections) and transient errors. Retry logic can be configured when creating a session.

Example 4.9. Reactive transaction functions

Reactive transaction function. 

public IObservable<string> PrintAllProducts()
{
    var session = Driver.RxSession();

    return session.ReadTransaction(tx =>
    {
        return tx.Run(
                "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
                new {id = 0} // Parameters in the query, if any
            )
            .Records()
            .Select(record => record[0].ToString());
    }).OnErrorResumeNext(session.Close<string>());
}

Reactive transaction function. 

public Flux<ResultSummary> printAllProducts()
{
    String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
    Map<String,Object> parameters = Collections.singletonMap( "id", 0 );

    return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
            session -> session.readTransaction( tx -> {
                RxResult result = tx.run( query, parameters );
                return Flux.from( result.records() )
                        .doOnNext( record -> System.out.println( record.get( 0 ).asString() ) ).then( Mono.from( result.consume() ) );
            }
         ), RxSession::close );
}

Reactive transaction function. 

const session = driver.rxSession()
const result = session.readTransaction(tx =>
  tx
    .run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', { id: 0 })
    .records()
    .pipe(
      map(r => r.get(0)),
      materialize(),
      toArray()
    )
)

Sessions can be configured in a number of different ways. This is carried out by supplying configuration inside the session constructor. See Section 4.4, “Session configuration” for more details.

4.3.3. Auto-commit transactions

An auto-commit transaction is a basic but limited form of transaction. Such a transaction consists of only one Cypher query and is not automatically replayed on failure. Therefore any error scenarios will need to be handled by the client application itself.

Auto-commit transactions are intended to be used for simple use cases such as when learning Cypher or writing one-off scripts. It is not recommended to use auto-commit transactions in production environments.

The only way to execute PERIODIC COMMIT Cypher queries is to auto-commit the transaction. Unlike other kinds of Cypher query, PERIODIC COMMIT queries do not participate in the causal chain. Please refer to the Cypher Manual → PERIODIC COMMIT query hint.

Example 4.10. Auto-commit transactions

Auto-commit transactions. 

public IObservable<string> ReadProductTitles()
{
    var session = Driver.RxSession();

    return session.Run(
            "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher query
            new {id = 0} // Parameters in the query, if any
        )
        .Records()
        .Select(record => record[0].ToString())
        .OnErrorResumeNext(session.Close<string>());
}

Auto-commit transactions. 

public Flux<String> readProductTitles()
{
    String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
    Map<String,Object> parameters = Collections.singletonMap( "id", 0 );

    return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
            session -> Flux.from( session.run( query, parameters ).records() ).map( record -> record.get( 0 ).asString() ),
            RxSession::close );
}

Auto-commit transactions. 

function readProductTitles () {
  const session = driver.rxSession()
  return session
    .run('MATCH (p:Product) WHERE p.id = $id RETURN p.title', {
      id: 0
    })
    .records()
    .pipe(
      map(r => r.get(0)),
      materialize(),
      toArray()
    )
}

4.3.4. Consuming results

To consume data from a query in a reactive session, a subscriber is required to handle the results that are being returned by the publisher.

Each transaction corresponds to a data flow which supplies the data from the server. Result processing begins when records are pulled from this flow. Only one subscriber may pull data from a given flow.

Example 4.11. Consuming results

Consuming results. 

public IObservable<string> GetPeople()
{
    var session = Driver.RxSession();
    return session.ReadTransaction(tx =>
    {
        return tx.Run("MATCH (a:Person) RETURN a.name ORDER BY a.name")
            .Records()
            .Select(record => record[0].As<string>());
    }).OnErrorResumeNext(session.Close<string>());
}

Consuming results. 

public Flux<String> getPeople()
{
    String query = "MATCH (a:Person) RETURN a.name ORDER BY a.name";

    return Flux.usingWhen( Mono.fromSupplier( driver::rxSession ),
            session -> session.readTransaction( tx -> {
                        RxResult result = tx.run( query );
                        return Flux.from( result.records() )
                                .map( record -> record.get( 0 ).asString() );
                    }
            ), RxSession::close );
}

Consuming results. 

const session = driver.rxSession()
const result = session
  .run('MATCH (a:Person) RETURN a.name ORDER BY a.name')
  .records()
  .pipe(
    map(r => r.get(0)),
    materialize(),
    toArray()
  )

4.3.5. Cancellation

As per the reactive stream specification, a reactive data flow can be cancelled part way through. This prematurely commits or rolls back the transaction and stops the query engine from producing any more records.