Control results flow with reactive streams

In a reactive flow, consumers dictate the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server.

An example use case is an application that fetches records from a Neo4j server and does some very time-consuming post-processing on each one. If the server were allowed to push records to the client as soon as they become available, the client could be flooded with entries while its processing still lags behind. The Reactive API ensures that the receiving side is not forced to buffer arbitrary amounts of data.
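The demand-driven exchange described above can be sketched with the JDK's own java.util.concurrent.Flow interfaces, without the Neo4j driver or Reactor. This is only an illustration under simplifying assumptions (single-threaded, no error handling); the names BackpressureSketch, RangePublisher, and OneAtATimeSubscriber are hypothetical and are not part of the driver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class: none of these names come from the Neo4j driver.
final class BackpressureSketch {

    /** Emits the integers 1..count, but never more than the subscriber has requested. */
    static final class RangePublisher implements Flow.Publisher<Integer> {
        private final int count;

        RangePublisher(int count) {
            this.count = count;
        }

        @Override
        public void subscribe(Flow.Subscriber<? super Integer> subscriber) {
            subscriber.onSubscribe(new Flow.Subscription() {
                private int next = 1;
                private long demand = 0;
                private boolean emitting = false;
                private boolean done = false;

                @Override
                public void request(long n) {
                    if (done) return;
                    demand += n;
                    // A re-entrant call from onNext() only records the new demand;
                    // the loop that is already running picks it up.
                    if (emitting) return;
                    emitting = true;
                    while (demand > 0 && next <= count && !done) {
                        demand--;
                        subscriber.onNext(next++);
                    }
                    emitting = false;
                    if (next > count && !done) {
                        done = true;
                        subscriber.onComplete();
                    }
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        }
    }

    /** A slow consumer: requests one item, handles it, then requests the next. */
    static final class OneAtATimeSubscriber implements Flow.Subscriber<Integer> {
        final List<String> log = new ArrayList<>();
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription s) {
            subscription = s;
            log.add("request 1");
            s.request(1);
        }

        @Override
        public void onNext(Integer item) {
            log.add("received " + item); // stand-in for slow post-processing
            log.add("request 1");
            subscription.request(1);
        }

        @Override
        public void onError(Throwable t) {
            log.add("error");
        }

        @Override
        public void onComplete() {
            log.add("complete");
        }
    }

    /** Runs the exchange and returns the ordered list of events. */
    static List<String> run(int count) {
        OneAtATimeSubscriber subscriber = new OneAtATimeSubscriber();
        new RangePublisher(count).subscribe(subscriber);
        return subscriber.log;
    }

    public static void main(String... args) {
        run(3).forEach(System.out::println);
    }
}
```

Each "received" entry in the log is preceded by a matching "request 1", so the publisher never gets ahead of the consumer. The driver's reactive API applies the same principle between your application and the server.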

The driver’s reactive implementation lives in the reactivestreams sub-package and relies on the reactor-core package from Project Reactor.

The Reactive API is recommended for applications that already work in a reactive programming style and that have needs only reactive workflows can address. For all other cases, the sync and async APIs are recommended.

Install dependencies

To use reactive features, you need to add the relevant dependencies to your project first (refer to Reactor → Reference → Getting reactor).

  1. Add Reactor’s BOM to your pom.xml in a dependencyManagement section. Notice that this is in addition to the regular dependencies section. If a dependencyManagement section already exists in your pom, add only the contents.

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>io.projectreactor</groupId>
                <artifactId>reactor-bom</artifactId>
                <version>2023.0.2</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

  2. Add the reactor-core dependency to the dependencies section. Notice that the version tag is omitted (it is picked up from Reactor’s BOM).

    <dependency>
        <groupId>io.projectreactor</groupId>
        <artifactId>reactor-core</artifactId>
    </dependency>

Reactive query examples

The driver’s basic concepts are the same as in the synchronous case, but queries are run through a ReactiveSession, and the objects related to querying have reactive counterparts named with a Reactive prefix (for example, ReactiveResult).

Managed transaction with reactive sessions

A managed transaction .executeRead() example
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<URI for Neo4j database>";
        final String dbUser = "<Username>";
        final String dbPassword = "<Password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(  // (1)
                Mono.just(driver.session(  // (2)
                    ReactiveSession.class,  // (3)
                    SessionConfig.builder().withDatabase("neo4j").build()
                )),
                rxSession -> Mono.fromDirect(rxSession.executeRead(  // (4)
                    tx -> Mono
                        .fromDirect(tx.run("UNWIND range (1, 5) AS x RETURN x"))  // (5)
                        .flatMapMany(ReactiveResult::records)  // (6)
                )),
                ReactiveSession::close  // (7)
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();  // (8)
            System.out.println(values);
        }
    }
}
1 Flux.usingWhen(resourceSupplier, workerClosure, cleanupFunction) is used to create a new session, run queries using it, and finally close it. It ensures the resource stays alive for as long as it is needed, and lets you specify the cleanup operation to run at the end.
2 .usingWhen() takes a resource supplier in the form of a Publisher, so session creation is wrapped in a Mono.just() call, which creates a Mono from a given value.
3 The session creation is similar to the async case, and the same configuration methods apply. The difference is that the first argument must be ReactiveSession.class, and the return value is a ReactiveSession object.
4 The method ReactiveSession.executeRead() runs a read transaction and returns a Publisher that emits the callback’s return value, which Mono.fromDirect() converts into a Mono.
5 The method tx.run() returns a Publisher<ReactiveResult>, which Mono.fromDirect() converts into a Mono.
6 Before the final result is returned, Mono.flatMapMany() retrieves the records from the result and returns them as a new Flux.
7 The final cleanup closes the session.
8 To show the result of the reactive workflow, .block() waits for the flow to complete so that values can be printed. In a real application you wouldn’t block but rather forward the records publisher to your framework of choice, which would process them in a meaningful way.

You may run several queries within the same reactive session through several calls to executeRead() or executeWrite() within the workerClosure.

Implicit transaction with reactive sessions

The following example is very similar to the previous one, except it uses an implicit transaction.

An implicit transaction .run() example
package demo;

import java.util.List;

import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

import org.neo4j.driver.AuthTokens;
import org.neo4j.driver.Driver;
import org.neo4j.driver.GraphDatabase;
import org.neo4j.driver.Record;
import org.neo4j.driver.SessionConfig;
import org.neo4j.driver.Value;
import org.neo4j.driver.reactivestreams.ReactiveResult;
import org.neo4j.driver.reactivestreams.ReactiveSession;

public class App {

    public static void main(String... args) {
        final String dbUri = "<URI for Neo4j database>";
        final String dbUser = "<Username>";
        final String dbPassword = "<Password>";

        try (var driver = GraphDatabase.driver(dbUri, AuthTokens.basic(dbUser, dbPassword))) {
            driver.verifyConnectivity();

            Flux<Record> records = Flux.usingWhen(
                Mono.just(driver.session(
                    ReactiveSession.class,
                    SessionConfig.builder().withDatabase("neo4j").build()
                )),
                rxSession -> Mono
                    .fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"))
                    .flatMapMany(ReactiveResult::records),
                ReactiveSession::close
            );

            // block for demonstration purposes
            List<Value> values = records.map(record -> record.get("x")).collectList().block();
            System.out.println(values);
        }
    }
}

Always defer session creation

It’s important to remember that in reactive programming a Publisher doesn’t come to life until a Subscriber attaches to it. A Publisher is just an abstract description of your asynchronous process, but it’s only the act of subscribing that triggers the flow of data in the whole chain.
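This laziness can be observed with a minimal sketch built on the JDK's java.util.concurrent.Flow interfaces rather than on the driver or Reactor. The class LazyPublisherSketch and the method describeQuery() are hypothetical names used only for illustration, and the "query" is a stand-in that emits a single string.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

// Hypothetical demo class: it does not use the Neo4j driver.
final class LazyPublisherSketch {

    /** Describes a one-item "query"; every step appends an entry to the events list. */
    static Flow.Publisher<String> describeQuery(List<String> events) {
        events.add("publisher assembled");
        return subscriber -> {
            events.add("subscribed");
            subscriber.onSubscribe(new Flow.Subscription() {
                private boolean done;

                @Override
                public void request(long n) {
                    if (done) return;
                    done = true;
                    events.add("query executed"); // the work happens only on demand
                    subscriber.onNext("row");
                    subscriber.onComplete();
                }

                @Override
                public void cancel() {
                    done = true;
                }
            });
        };
    }

    /** Assembles the publisher, then subscribes, and returns the ordered events. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        Flow.Publisher<String> publisher = describeQuery(events);
        events.add("about to subscribe");
        publisher.subscribe(new Flow.Subscriber<String>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(1);
            }

            @Override
            public void onNext(String item) {
                events.add("received " + item);
            }

            @Override
            public void onError(Throwable t) {
                events.add("error");
            }

            @Override
            public void onComplete() {
                events.add("complete");
            }
        });
        return events;
    }

    public static void main(String... args) {
        demo().forEach(System.out::println);
    }
}
```

The "query executed" entry appears only after "subscribed": assembling the publisher describes the work, and nothing runs until a subscriber attaches and requests data.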

For this reason, always be mindful to make session creation/destruction part of this chain, and not to create sessions separately from the query Publisher chain. Doing so may result in many open sessions, none doing work and all waiting for a Publisher to use them, potentially exhausting the number of available sessions for your application. The previous examples use Flux.usingWhen() to address this.

Bad practice example — session is created but nobody uses it
ReactiveSession rxSession = driver.session(ReactiveSession.class);
Mono<ReactiveResult> rxResult = Mono.fromDirect(rxSession.run("UNWIND range (1, 5) AS x RETURN x"));
// until somebody subscribes to `rxResult`, the Publisher doesn't materialize, but the session is busy!
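
For contrast, the difference between eager and deferred resource creation can be sketched with the JDK's java.util.concurrent.Flow interfaces alone. Everything here is hypothetical: FakeSession is a stand-in that only counts open sessions, and deferred() is an illustrative helper, not a driver or Reactor API. In Reactor, operators such as Flux.usingWhen() or Flux.defer() fill the role of tying resource creation to subscription.

```java
import java.util.Arrays;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical demo class: FakeSession is not a Neo4j driver type.
final class DeferredSessionSketch {

    /** Stand-in for a session; it only tracks how many sessions are open. */
    static final class FakeSession {
        private final AtomicInteger openSessions;

        FakeSession(AtomicInteger openSessions) {
            this.openSessions = openSessions;
            openSessions.incrementAndGet();
        }

        void close() {
            openSessions.decrementAndGet();
        }
    }

    /** Obtains a session only when data is requested, and closes it after emitting. */
    static Flow.Publisher<String> deferred(Supplier<FakeSession> sessionSupplier) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private boolean done;

            @Override
            public void request(long n) {
                if (done) return;
                done = true;
                FakeSession session = sessionSupplier.get(); // acquired on demand
                try {
                    subscriber.onNext("result");
                } finally {
                    session.close(); // cleanup is part of the same chain
                }
                subscriber.onComplete();
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    /** Subscribes, requests a single item, and hands it to the callback. */
    static <T> void subscribeAndRequestOne(Flow.Publisher<T> publisher, Consumer<T> onItem) {
        publisher.subscribe(new Flow.Subscriber<T>() {
            @Override
            public void onSubscribe(Flow.Subscription s) {
                s.request(1);
            }

            @Override
            public void onNext(T item) {
                onItem.accept(item);
            }

            @Override
            public void onError(Throwable t) {
                throw new IllegalStateException(t);
            }

            @Override
            public void onComplete() {
            }
        });
    }

    /** Eager creation: the session exists although nobody has subscribed. */
    static int openAfterEagerAssembly() {
        AtomicInteger open = new AtomicInteger();
        FakeSession session = new FakeSession(open); // created outside the chain
        Flow.Publisher<String> neverSubscribed = deferred(() -> session);
        return open.get();
    }

    /** Deferred creation: open-session counts before, during, and after the flow. */
    static int[] openCountsWithDeferral() {
        AtomicInteger open = new AtomicInteger();
        Flow.Publisher<String> publisher = deferred(() -> new FakeSession(open));
        int beforeSubscribe = open.get();
        int[] whileEmitting = new int[1];
        subscribeAndRequestOne(publisher, item -> whileEmitting[0] = open.get());
        int afterComplete = open.get();
        return new int[] {beforeSubscribe, whileEmitting[0], afterComplete};
    }

    public static void main(String... args) {
        System.out.println("eager, never subscribed: " + openAfterEagerAssembly());
        System.out.println("deferred (before, during, after): "
                + Arrays.toString(openCountsWithDeferral()));
    }
}
```

With eager creation, one session stays open even though no subscriber ever arrives. With deferral, a session is open only while the flow is emitting.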

Glossary

LTS

A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.

Aura

Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.

Cypher

Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.

APOC

Awesome Procedures On Cypher (APOC) is a library of (many) functions that cannot be easily expressed in Cypher itself.

Bolt

Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.

ACID

Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.

eventual consistency

A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.

causal consistency

A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.

NULL

The null marker is not a type but a placeholder for absence of value. For more information, see Cypher → Working with null.

transaction

A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.

backpressure

Backpressure is a force opposing the flow of data. It ensures that the client does not receive data faster than it can handle it.

transaction function

A transaction function is a callback executed by an executeRead or executeWrite call. The driver automatically re-executes the callback in case of server failure.

Driver

A Driver object holds the details required to establish connections with a Neo4j database.