Beta Release: Java Driver with Async API for Neo4j


Learn more about the most recent beta release of the Java driver 1.5.0 for the Neo4j graph database

Introduction


In this article I would like to introduce the new 1.5.0-beta03 pre-release version of the Bolt Java driver for Neo4j, which is now built on an asynchronous, Netty-based infrastructure.

Previous versions of the driver used blocking I/O, which meant that the number of threads needed to handle N concurrent connections was also N. With the new non-blocking I/O, the number of threads can be significantly reduced because one thread can handle multiple network connections in an async fashion. This functionality is exposed through a new set of asynchronous methods, which allow queries and transactions to be executed without blocking.
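The threading idea can be illustrated without the driver at all. The following is a minimal sketch using only the JDK: a single-threaded scheduler plays the role of an event-loop thread and "responses" are simulated with a timer. The class and method names are made up for this illustration, and this is not how the driver's Netty layer is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SingleThreadManyRequests {

    // Starts `requests` simulated calls and returns how many distinct
    // threads were used to complete their responses.
    static int distinctCompletionThreads(int requests) {
        // One thread stands in for an I/O event loop (an assumption of this sketch).
        ScheduledExecutorService eventLoop = Executors.newSingleThreadScheduledExecutor();
        try {
            Set<String> threads = ConcurrentHashMap.newKeySet();
            List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
            for (int i = 0; i < requests; i++) {
                int id = i;
                CompletableFuture<Integer> response = new CompletableFuture<>();
                // The "response" arrives 50 ms later; no thread is parked waiting for it.
                eventLoop.schedule(() -> {
                    threads.add(Thread.currentThread().getName());
                    response.complete(id);
                }, 50, TimeUnit.MILLISECONDS);
                inFlight.add(response);
            }
            // Block only here, in the demo, to wait for everything to finish.
            CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
            return threads.size();
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(distinctCompletionThreads(100)); // prints 1
    }
}
```

All 100 simulated requests are in flight at the same time, yet only one thread ever touches their results.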

Asynchronous processing of results is especially valuable in environments where code should block as little as possible, like Akka actors or Spring Data reactive repositories.

One important thing to note is that, starting from 1.5, the Neo4j Java driver requires Java 8. The decision to raise the required Java version was made in order to use the existing async programming APIs and interfaces, such as CompletionStage and CompletableFuture, which are available only from Java 8 onward. They are now used in async API calls such as Session#runAsync(), Transaction#runAsync() and Session#readTransactionAsync(). The previous driver version, 1.4, still requires Java 7 and will continue to be maintained.
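For readers new to these Java 8 types, here is a small JDK-only sketch of the chaining style used throughout the rest of this article. The method fetchCountAsync() is a hypothetical stand-in for any async call and is not part of the driver.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class CompletionStageBasics {

    // Hypothetical async call: completes with 42 on another thread, or fails.
    static CompletionStage<Integer> fetchCountAsync(boolean fail) {
        return CompletableFuture.supplyAsync(() -> {
            if (fail) {
                throw new IllegalStateException("simulated failure");
            }
            return 42;
        });
    }

    static CompletionStage<String> describeAsync(boolean fail) {
        return fetchCountAsync(fail)
                // thenApply transforms the value once it is available
                .thenApply(count -> "count=" + count)
                // thenCompose chains a further step that itself returns a stage
                .thenCompose(text -> CompletableFuture.completedFuture(text + "!"))
                // exceptionally supplies a fallback if any earlier step failed
                .exceptionally(error -> "failed");
    }

    public static void main(String[] args) {
        // join() blocks and is used here only to print the demo results.
        System.out.println(describeAsync(false).toCompletableFuture().join()); // prints count=42!
        System.out.println(describeAsync(true).toCompletableFuture().join());  // prints failed
    }
}
```

None of the chained callbacks block a thread while waiting; each runs only once the previous stage has completed.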

Async API


This section describes the new async APIs present in the 1.5.0-beta03 Java driver version. It does not discuss their blocking API counterparts; please refer to the Neo4j Developer Manual for details on those. The blocking API has been re-implemented on top of the async API and so shares the underlying infrastructure.

Driver Initialization


The main entry point of the driver API remains unchanged: it is the GraphDatabase class, which can be used to create a driver like this:

import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;

Driver driver = GraphDatabase.driver("bolt://localhost", 
                                     AuthTokens.basic("neo4j", "test"));

Driver is a thread-safe, application-wide object from which all Neo4j interactions derive.

Sessions


The Driver instance should be used to obtain Session instances which allow for running queries and creating transactions.

Session is a client-side abstraction for logically grouping one or more units of work. It is designed for single-threaded use and may be backed by a TCP connection when executing requested operations. In the routing driver, created for a bolt+routing URI, all transactions within a single session will be implicitly connected with bookmarks.

See the causal chaining section of the Neo4j Developer Manual for more information.

New sessions can be created like this:

import org.neo4j.driver.v1.Session;

Session session = driver.session();

Using the session we can run our first async query:

import java.util.concurrent.CompletionStage;
import org.neo4j.driver.v1.StatementResultCursor;

CompletionStage<StatementResultCursor> cursorStage =
      session.runAsync("UNWIND range(1, 10) AS x RETURN x");

// Fetch all records into a list, print them (or the error), then close the session.
cursorStage.thenCompose(StatementResultCursor::listAsync)
           .whenComplete((records, error) -> {
               if (records != null) System.out.println(records);
               else error.printStackTrace();
               session.closeAsync();
           });

Invocation of Session#runAsync() returns a CompletionStage of StatementResultCursor, which is the main abstraction for consuming query results, received asynchronously from the database. In this example, all results are eagerly fetched in a list which is later printed.

This might require a lot of heap memory, depending on the size of the result. Large ones can benefit from incremental consumption using #forEachAsync() and #nextAsync(). Created session objects should be explicitly closed at the end of the chain.

The Session#runAsync() method has various overloads that accept query parameters, records and org.neo4j.driver.v1.Statement objects for convenience.

It is possible to safely retrieve a single record from the cursor while asserting that exactly one record is returned:

import static java.util.Collections.emptyList;

import java.util.concurrent.CompletionStage;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.types.Node;

CompletionStage<StatementResultCursor> cursorStage = session.runAsync("MATCH (n) RETURN n LIMIT 1");

cursorStage.thenCompose(StatementResultCursor::singleAsync)
           .thenApply(record -> record.get(0).asNode())
           .thenApply(Node::labels)
           // On any failure, log it and fall back to an empty list of labels.
           .exceptionally(error -> {
               error.printStackTrace();
               return emptyList();
           })
           .thenAccept(labels -> System.out.println(labels))
           .thenCompose(ignore -> session.closeAsync());

This code prints all labels of the fetched node. It also explicitly handles errors (database unavailable, network error, no nodes were fetched, …) by printing the stack trace and returning an empty list of labels instead.

Sometimes it might be required to consume result records one by one or as a stream. StatementResultCursor allows this using two methods:
    • CompletionStage<Record> nextAsync() – returns a stage completed with the next record in the result stream, or with null when the end of the stream has been reached. The stage can also be completed exceptionally when the query fails.
    • CompletionStage<ResultSummary> forEachAsync(Consumer<Record> action) – returns a stage completed with the result summary and applies the supplied action to every record of the result stream.
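The null-terminated contract of #nextAsync() lends itself to a simple recursive loop. The sketch below shows the pattern with JDK types only: AsyncCursor is a hypothetical stand-in for StatementResultCursor, backed by an in-memory list, so the names and the helper methods are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

final class NextAsyncLoop {

    // Hypothetical stand-in for StatementResultCursor: yields the next item,
    // or null once the end of the stream has been reached.
    interface AsyncCursor<T> {
        CompletionStage<T> nextAsync();
    }

    // In-memory cursor used only for this demonstration.
    static <T> AsyncCursor<T> cursorOver(List<T> items) {
        Iterator<T> it = items.iterator();
        return () -> CompletableFuture.completedFuture(it.hasNext() ? it.next() : null);
    }

    // Pulls items one at a time until nextAsync() completes with null.
    static <T> CompletionStage<Void> consumeAll(AsyncCursor<T> cursor, Consumer<T> action) {
        return cursor.nextAsync().thenCompose(item -> {
            if (item == null) {
                return CompletableFuture.<Void>completedFuture(null); // end of stream
            }
            action.accept(item);
            return consumeAll(cursor, action); // ask for the next item
        });
    }

    static List<Integer> demo() {
        List<Integer> seen = new ArrayList<>();
        AsyncCursor<Integer> cursor = cursorOver(Arrays.asList(1, 2, 3));
        // join() blocks and is used here only so the demo can return its result.
        consumeAll(cursor, seen::add).toCompletableFuture().join();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [1, 2, 3]
    }
}
```

Because the in-memory cursor completes immediately, each step here runs on the caller's stack, so this form suits small demonstrations; with a source that completes on another thread the recursion unwinds between items.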
The #forEachAsync() method can be used to convert a StatementResultCursor to an rx.Observable from the RxJava 1.x library. A naïve example using rx.subjects.PublishSubject would be:

import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import rx.Observable;
import rx.subjects.PublishSubject;

Observable<Record> fetchRecords(Session session, String query) {
    PublishSubject<Record> subject = PublishSubject.create();
    session.runAsync(query)
                .thenCompose(cursor -> cursor.forEachAsync(subject::onNext))
                .whenComplete((summary, error) -> {
                    if (error != null) {
                        subject.onError( error );
                    } else {
                        System.out.println( summary );
                        subject.onCompleted();
                    }
                });
    return subject;
}

Observable<Record> recordsObservable = fetchRecords(session, "MATCH (n:Person) RETURN n");
recordsObservable.subscribe(
    record -> System.out.println(record),
    error -> error.printStackTrace(),
    () -> System.out.println("Query completed")
);

All incoming records are consumed using #forEachAsync() and pushed to a PublishSubject, so that its subscribers can access every record.

Transactions


Sessions not only allow running standalone queries but also running queries within explicit transactions. Callers have control over beginning transactions, executing Cypher queries and committing or rolling them back.

It is recommended to use the Transaction Function API, as detailed in the Neo4j Developer Manual, over explicit transactions. This is true for both the blocking and async API.

This section looks at Async Transaction Functions. Their two main entry points are:

    • Session#readTransactionAsync(TransactionWork<CompletionStage<T>>)
    • Session#writeTransactionAsync(TransactionWork<CompletionStage<T>>)
These allow the execution of read and write transactions, denoted by the given TransactionWork objects, in an asynchronous fashion.

A simple write transaction that creates a node might look like:

session.writeTransactionAsync(tx ->
  tx.runAsync("CREATE (n:Person) RETURN n")
     .thenCompose(StatementResultCursor::singleAsync)
).whenComplete((record, error) -> {
    if (error != null) error.printStackTrace();
    else System.out.println(record);
    session.closeAsync();
});

It creates a single Person node in a write transaction and prints the resulting record. Transactions allow execution of queries in an async fashion via various overloads of Transaction#runAsync(), which return the same StatementResultCursor as Session#runAsync(), described above. The transaction is automatically committed when the given TransactionWork succeeds and rolled back when it fails.
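The commit-on-success, rollback-on-failure rule can be pictured with a small JDK-only sketch. FakeTx and inTransaction() are hypothetical names invented for this illustration; the sketch models only the rule stated above and makes no claim about how the driver implements transaction functions or what else it does around them.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class TxFunctionSketch {

    // Hypothetical stand-in for a transaction; it only records what happened to it.
    static final class FakeTx {
        final List<String> log = new ArrayList<>();

        CompletionStage<Void> commitAsync() {
            log.add("commit");
            return CompletableFuture.completedFuture(null);
        }

        CompletionStage<Void> rollbackAsync() {
            log.add("rollback");
            return CompletableFuture.completedFuture(null);
        }
    }

    // Runs the work, then commits if its stage succeeded or rolls back if it failed.
    static <T> CompletionStage<T> inTransaction(FakeTx tx, Function<FakeTx, CompletionStage<T>> work) {
        CompletableFuture<T> result = new CompletableFuture<>();
        work.apply(tx).whenComplete((value, error) -> {
            if (error == null) {
                tx.commitAsync().whenComplete((ignore, commitError) -> {
                    if (commitError != null) result.completeExceptionally(commitError);
                    else result.complete(value);
                });
            } else {
                // Propagate the original failure once the rollback has finished.
                tx.rollbackAsync().whenComplete((ignore, rollbackError) ->
                        result.completeExceptionally(error));
            }
        });
        return result;
    }

    static String demo(boolean fail) {
        FakeTx tx = new FakeTx();
        CompletionStage<String> outcome = inTransaction(tx, t -> {
            CompletableFuture<String> work = new CompletableFuture<>();
            if (fail) work.completeExceptionally(new IllegalStateException("simulated failure"));
            else work.complete("done");
            return work;
        });
        String result = outcome.handle((value, error) -> error == null ? value : "error")
                .toCompletableFuture().join();
        return result + "/" + tx.log;
    }

    public static void main(String[] args) {
        System.out.println(demo(false)); // prints done/[commit]
        System.out.println(demo(true));  // prints error/[rollback]
    }
}
```

The caller sees the work's own result or failure, while the commit or rollback happens as a side effect before that outcome is delivered.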

A read transaction consisting of a single statement might look like this:

session.readTransactionAsync(tx ->
    tx.runAsync("MATCH (n:Person) RETURN n")
      .thenCompose(cursor -> cursor.forEachAsync(System.out::println))
).whenComplete((ignore, error) -> {
    if ( error != null ) error.printStackTrace();
    session.closeAsync();
});

In the example above, all records are consumed and printed within the body of the transaction, which is then automatically committed or rolled back.

Driver Termination


Used Driver instances hold on to all established network connections and should be explicitly closed when your application is shutting down or finished interacting with Neo4j. Not doing so might result in file descriptor leaks or prevent an application from exiting. The driver can be closed like this:

driver.close();

The close operation terminates all network connections and I/O threads. It is a blocking operation and returns when all resources are terminated.

Use with Maven


The new Java driver release is available in the Maven Central repository and can be included in a Maven project using this dependency definition:

<dependency>
    <groupId>org.neo4j.driver</groupId>
    <artifactId>neo4j-java-driver</artifactId>
    <version>1.5.0-beta03</version>
</dependency>

The driver has a compile-time dependency on Netty, but it is shaded into the final driver artifact, so there should be no version conflicts with other dependencies.

Other Notable Changes


This Java driver release also adds a couple of new features, apart from the async API. Most prominent are:
    • A new load-balancing strategy for Causal Clustering uses a least-connected strategy instead of round-robin, which might result in better performance and less degradation when some cluster members perform poorly due to network or other similar issues.
    • Improved connection pooling: The Java driver now allows setting a limit on the number of connections in the pool per server address via Config.build().withMaxConnectionPoolSize(25) and a connection acquisition timeout via Config.build().withConnectionAcquisitionTimeout(10, TimeUnit.SECONDS)
    • Maximum connection lifetime: The Java driver allows limiting the lifetime of a connection, which can be configured using Config.build().withMaxConnectionLifetime(1, TimeUnit.HOURS)

Conclusion


The Bolt Java driver 1.5 is a rather large release with a lot of new functionality. The new asynchronous API is the most involved part of it and allows users of the driver to interact with it in a different way. It also exposes newer, richer Java 8 APIs, such as CompletionStage.

At this point, community input about the new async API would be immensely helpful and would allow us to fine tune the API designs and provide as much value to async code bases as possible.

The driver described here is a pre-release version and should not be used in production. For production use, pick the most recent stable version of the Neo4j Java driver.