Learn more about the most recent beta release of the Java driver 1.5.0 for the Neo4j graph database

Introduction


In this article I would like to introduce the new 1.5.0-beta03 pre-release version of the Bolt Java driver for Neo4j, which is now built on an asynchronous, Netty-based infrastructure.

Previous versions of the driver used blocking I/O, which meant that the number of threads needed to handle N concurrent connections was also N. With the new non-blocking I/O, the number of threads can be significantly reduced, because one thread can handle multiple network connections asynchronously. This functionality is exposed through a new set of asynchronous methods that allow queries and transactions to be executed without blocking.
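To make the thread-count argument concrete, here is a small analogy that uses only JDK classes, not the driver or Netty. A single scheduled thread stands in for an event loop and completes many outstanding CompletableFutures, so a thousand in-flight operations do not need a thousand threads. The class and method names are illustrative, the fixed delay merely simulates network latency, and this is not how the driver is implemented internally.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class SingleThreadManyOps {

    // Starts `count` simulated operations whose "responses" arrive later and are all
    // delivered by ONE thread. Returns {completed operations, distinct completing threads}.
    static int[] run(int count) {
        ScheduledExecutorService eventLoop = Executors.newSingleThreadScheduledExecutor();
        Set<String> completingThreads = ConcurrentHashMap.newKeySet();
        List<CompletableFuture<Integer>> pending = new ArrayList<>();
        try {
            for (int i = 0; i < count; i++) {
                final int id = i;
                CompletableFuture<Integer> response = new CompletableFuture<>();
                // No thread sits blocked waiting for this response; the single
                // event-loop thread completes it when the (simulated) data arrives.
                eventLoop.schedule(() -> {
                    completingThreads.add(Thread.currentThread().getName());
                    response.complete(id);
                }, 10, TimeUnit.MILLISECONDS);
                pending.add(response);
            }
            // Only this demo's caller blocks, once, to wait for every response.
            CompletableFuture.allOf(pending.toArray(new CompletableFuture<?>[0])).join();
            return new int[] {pending.size(), completingThreads.size()};
        } finally {
            eventLoop.shutdown();
        }
    }

    public static void main(String[] args) {
        int[] result = run(1_000);
        System.out.println(result[0] + " operations completed by " + result[1] + " thread(s)");
    }
}
```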

Asynchronous processing of results is especially valuable in environments where code should block as little as possible, like Akka actors or Spring Data reactive repositories.

One important thing to note is that starting from 1.5, the Neo4j Java driver will require Java 8. The minimum Java version was raised in order to use the async programming APIs and interfaces that are only available from Java 8 onwards, such as CompletionStage and CompletableFuture. They are now used in async API calls like Session#runAsync(), Transaction#runAsync(), Session#readTransactionAsync(), etc. The previous driver version, 1.4, still supports Java 7 and will continue to be maintained.

Async API


This section describes the new async APIs present in the 1.5.0-beta03 Java driver version. It does not discuss their blocking counterparts; please refer to the Neo4j Developer Manual for more details. The blocking API has been re-implemented on top of the async API and therefore shares the same underlying infrastructure.

Driver Initialization


The main entry point of the driver API remains unchanged: the GraphDatabase class, which can be used to create a driver like this:

import org.neo4j.driver.v1.AuthTokens;
import org.neo4j.driver.v1.Driver;
import org.neo4j.driver.v1.GraphDatabase;

Driver driver = GraphDatabase.driver("bolt://localhost", 
                                     AuthTokens.basic("neo4j", "test"));

Driver is a thread-safe, application-wide object from which all Neo4j interactions derive.

Sessions


The Driver instance should be used to obtain Session instances which allow for running queries and creating transactions.

Session is a client-side abstraction for logically grouping one or more units of work. It is designed for single-threaded use and may be backed by a TCP connection when executing requested operations. In the routing driver, created for a bolt+routing URI, all transactions within a single session will be implicitly connected with bookmarks.

See the causal chaining section of the Neo4j Developer Manual for more information.

New sessions can be created like this:

import org.neo4j.driver.v1.Session;

Session session = driver.session();

Using the session we can run our first async query:

import java.util.concurrent.CompletionStage;
import org.neo4j.driver.v1.StatementResultCursor;

CompletionStage<StatementResultCursor> cursorStage =
        session.runAsync("UNWIND range(1, 10) AS x RETURN x");

cursorStage.thenCompose(StatementResultCursor::listAsync)
           .whenComplete((records, error) -> {
               if (records != null) System.out.println(records);
               else error.printStackTrace();
               // close the session once the result has been consumed or the query failed
               session.closeAsync();
           });

Session#runAsync() returns a CompletionStage of StatementResultCursor, the main abstraction for consuming query results received asynchronously from the database. In this example, all results are eagerly fetched into a list, which is then printed.

This might require a lot of heap memory, depending on the size of the result. Large results can benefit from incremental consumption using #forEachAsync() and #nextAsync(). Session objects should be explicitly closed at the end of the chain.
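The first async query uses thenCompose rather than thenApply because listAsync() itself returns a CompletionStage. The following JDK-only sketch shows the difference using hypothetical stand-ins (FakeCursor, runAsync and fetchAll are illustrative names, not driver API): thenCompose flattens the nested stage into a single CompletionStage<List<Integer>>, whereas thenApply would leave a stage of a stage.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

final class ComposeDemo {

    // Hypothetical stand-in for StatementResultCursor: fetching all rows is itself async.
    interface FakeCursor {
        CompletionStage<List<Integer>> listAsync();
    }

    // Hypothetical stand-in for Session#runAsync(): yields a cursor over the rows 1..n.
    static CompletionStage<FakeCursor> runAsync(int n) {
        List<Integer> rows = IntStream.rangeClosed(1, n).boxed().collect(Collectors.toList());
        FakeCursor cursor = () -> CompletableFuture.supplyAsync(() -> rows);
        return CompletableFuture.supplyAsync(() -> cursor);
    }

    static List<Integer> fetchAll(int n) {
        // thenApply(FakeCursor::listAsync) would produce a nested
        // CompletionStage<CompletionStage<List<Integer>>>; thenCompose flattens it.
        CompletionStage<List<Integer>> records = runAsync(n).thenCompose(FakeCursor::listAsync);
        return records.toCompletableFuture().join(); // blocks only to return a value from the demo
    }

    public static void main(String[] args) {
        System.out.println(fetchAll(10)); // prints [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
    }
}
```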

The Session#runAsync() method has various overloads that accept query parameters, records and org.neo4j.driver.v1.Statement objects for convenience.

It is possible to safely retrieve a single record from a cursor while asserting that exactly one record is returned:

import static java.util.Collections.emptyList;

import java.util.concurrent.CompletionStage;
import org.neo4j.driver.v1.StatementResultCursor;
import org.neo4j.driver.v1.types.Node;

CompletionStage<StatementResultCursor> cursorStage =
        session.runAsync("MATCH (n) RETURN n LIMIT 1");

cursorStage.thenCompose(StatementResultCursor::singleAsync)
           .thenApply(record -> record.get(0).asNode())
           .thenApply(Node::labels)
           .exceptionally(error -> {
               // any upstream failure (no record, network error, ...) ends up here
               error.printStackTrace();
               return emptyList();
           })
           .thenAccept(labels -> System.out.println(labels))
           .thenCompose(ignore -> session.closeAsync());

This code prints all labels of the fetched node. It also explicitly handles errors (database unavailable, network error, no node fetched, …) by printing the stack trace and returning an empty list of labels instead.
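The recovery step relies on standard CompletionStage#exceptionally semantics: once a failure is replaced by a fallback value, later stages such as thenAccept run normally. The sketch below models this without a database. Here singleAsync is a hypothetical stand-in that, like the driver method, fails unless exactly one row is present, and each row is simplified to a plain list of label strings instead of a Record holding a Node.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

final class SingleWithFallback {

    // Hypothetical stand-in for singleAsync(): succeeds only if there is exactly one row.
    static <T> CompletionStage<T> singleAsync(List<T> rows) {
        CompletableFuture<T> stage = new CompletableFuture<>();
        if (rows.size() == 1) {
            stage.complete(rows.get(0));
        } else {
            stage.completeExceptionally(new NoSuchElementException(
                    "Expected exactly one record, got " + rows.size()));
        }
        return stage;
    }

    // Each row is a hypothetical node, modelled here as just its list of labels.
    static List<String> labelsOrEmpty(List<List<String>> rows) {
        return singleAsync(rows)
                .thenApply(Collections::unmodifiableList) // stands in for record -> node -> labels
                .exceptionally(error -> {
                    System.err.println("Query failed: " + error);
                    return Collections.emptyList(); // fallback value: later stages still run
                })
                .toCompletableFuture()
                .join(); // blocks only to return a value from the demo
    }

    public static void main(String[] args) {
        System.out.println(labelsOrEmpty(Arrays.asList(Arrays.asList("Person", "Admin")))); // [Person, Admin]
        System.out.println(labelsOrEmpty(Collections.emptyList())); // [] after logging the error
    }
}
```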

Sometimes it might be required to consume result records one by one or as a stream. StatementResultCursor allows this using two methods:
    • CompletionStage<Record> nextAsync(): returns a stage completed with the next record in the result stream, or with null when the end of the stream has been reached. The stage can also be completed exceptionally when the query fails.
    • CompletionStage<ResultSummary> forEachAsync(Consumer<Record> action): applies the supplied action to every record of the result stream and returns a stage completed with the result summary.
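Incremental consumption with nextAsync() is usually written as a recursive chain: process one record, request the next one, and stop when the stage completes with null. The following sketch assumes a hypothetical AsyncCursor interface with the same nextAsync() contract plus an in-memory implementation, so it runs without the driver; with the real StatementResultCursor, the (also hypothetical) consumeAll helper should have the same shape.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Consumer;

final class NextAsyncLoop {

    // Hypothetical cursor following the nextAsync() contract:
    // the stage completes with the next record, or with null at the end of the stream.
    interface AsyncCursor<R> {
        CompletionStage<R> nextAsync();
    }

    // Hypothetical in-memory cursor that hands out rows asynchronously.
    static <R> AsyncCursor<R> inMemoryCursor(List<R> rows) {
        Iterator<R> it = rows.iterator();
        return () -> CompletableFuture.supplyAsync(() -> it.hasNext() ? it.next() : null);
    }

    // Requests the next record only after the previous one has been processed,
    // so records are handled one at a time instead of being collected into a list.
    static <R> CompletionStage<Void> consumeAll(AsyncCursor<R> cursor, Consumer<R> action) {
        return cursor.nextAsync().thenCompose(record -> {
            if (record == null) {
                return CompletableFuture.<Void>completedFuture(null); // end of stream
            }
            action.accept(record);
            return consumeAll(cursor, action);
        });
    }

    public static void main(String[] args) {
        AsyncCursor<String> cursor = inMemoryCursor(Arrays.asList("a", "b", "c"));
        consumeAll(cursor, record -> System.out.println("record: " + record))
                .toCompletableFuture()
                .join(); // wait only so the demo does not exit early
    }
}
```

Because each record is requested only after the previous one has been handed to the action, this loop holds one record at a time, which is the trade-off against listAsync() mentioned earlier.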
Method #forEachAsync() can be used to convert a StatementResultCursor into an rx.Observable from the RxJava 1.x library. A naïve example using rx.subjects.PublishSubject would be:

import org.neo4j.driver.v1.Record;
import org.neo4j.driver.v1.Session;
import rx.Observable;
import rx.subjects.PublishSubject;

Observable<Record> fetchRecords(Session session, String query) {
    PublishSubject<Record> subject = PublishSubject.create();
    session.runAsync(query)
                .thenCompose(cursor -> cursor.forEachAsync(subject::onNext))
                .whenComplete((summary, error) -> {
                    if (error != null) {
                        subject.onError( error );
                    } else {
                        System.out.println( summary );
                        subject.onCompleted();
                    }
                });
    return subject;
}

Observable<Record> recordsObservable = fetchRecords(session, "MATCH (n:Person) RETURN n");
recordsObservable.subscribe(
    record -> System.out.println(record),
    error -> error.printStackTrace(),
    () -> System.out.println("Query completed")
);

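All incoming records are consumed using #forEachAsync() and pushed to the PublishSubject, so that its subscribers can access every record. One reason the example is naïve is that a PublishSubject only emits items to observers that are already subscribed: records arriving before subscribe() is called would be lost.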

Transactions


Sessions not only allow running standalone queries but also running queries within explicit transactions. Callers have control over beginning transactions, executing Cypher queries and committing or rolling them back.

It is recommended to use the Transaction Function API, as detailed in the Neo4j Developer Manual, over explicit transactions. This is true for both the blocking and async API.

In this section we’ll take a look at Async Transaction Functions. Their two main entry points are:

    • Session#readTransactionAsync(TransactionWork<CompletionStage<T>>)
    • Session#writeTransactionAsync(TransactionWork<CompletionStage<T>>)
These execute read and write transactions, described by the given TransactionWork objects, asynchronously.

A simple write transaction that creates a node might look like:

session.writeTransactionAsync(tx ->
  tx.runAsync("CREATE (n:Person) RETURN n")
     .thenCompose(StatementResultCursor::singleAsync)
).whenComplete((record, error) -> {
    if (error != null) error.printStackTrace();
    else System.out.println(record);
    session.closeAsync();
});

It creates a single Person node in a write transaction and prints the resulting record. Within a transaction, queries are executed asynchronously via the various overloads of Transaction#runAsync(), which return the same StatementResultCursor as Session#runAsync() described above. The transaction is automatically committed when the given TransactionWork succeeds and rolled back when it fails.
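The commit-or-rollback behaviour described above can be modelled with plain CompletionStage callbacks. The following is a simplified sketch, not the driver's implementation: FakeTx, runInTx and failedStage are hypothetical names, and the fake transaction merely logs whether it was committed or rolled back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Function;

final class TxFunctionSketch {

    // Hypothetical transaction that only records whether it was committed or rolled back.
    static final class FakeTx {
        final List<String> log = new ArrayList<>();

        CompletionStage<Void> commitAsync() {
            log.add("commit");
            return CompletableFuture.completedFuture(null);
        }

        CompletionStage<Void> rollbackAsync() {
            log.add("rollback");
            return CompletableFuture.completedFuture(null);
        }
    }

    // Java 8 has no CompletableFuture.failedFuture, so build a failed stage by hand.
    static <T> CompletionStage<T> failedStage(Throwable error) {
        CompletableFuture<T> stage = new CompletableFuture<>();
        stage.completeExceptionally(error);
        return stage;
    }

    // Commits when the work's stage succeeds, rolls back when it fails, and completes
    // the returned stage with the work's result or its original error.
    static <T> CompletionStage<T> runInTx(FakeTx tx, Function<FakeTx, CompletionStage<T>> work) {
        CompletionStage<T> workStage;
        try {
            workStage = work.apply(tx);
        } catch (RuntimeException e) {
            workStage = failedStage(e); // the work threw before returning a stage
        }
        CompletableFuture<T> outcome = new CompletableFuture<>();
        workStage.whenComplete((result, error) -> {
            if (error == null) {
                tx.commitAsync().whenComplete((ignore, commitError) -> {
                    if (commitError == null) outcome.complete(result);
                    else outcome.completeExceptionally(commitError);
                });
            } else {
                // Surface the original failure after rolling back.
                tx.rollbackAsync().whenComplete((ignore, rollbackError) ->
                        outcome.completeExceptionally(error));
            }
        });
        return outcome;
    }

    public static void main(String[] args) {
        FakeTx tx = new FakeTx();
        String result = runInTx(tx, t -> CompletableFuture.completedFuture("node created"))
                .toCompletableFuture()
                .join();
        System.out.println(result + " " + tx.log); // prints "node created [commit]"
    }
}
```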

A read transaction consisting of a single statement might look like this:

session.readTransactionAsync(tx ->
    tx.runAsync("MATCH (n:Person) RETURN n")
      .thenCompose(cursor -> cursor.forEachAsync(System.out::println))
).whenComplete((ignore, error) -> {
    if ( error != null ) error.printStackTrace();
    session.closeAsync();
});

In the example above, all records are consumed and printed within the body of the transaction, which is automatically committed or rolled back afterwards.

Driver Termination


Driver instances hold on to all established network connections and should be explicitly closed when your application is shutting down or has finished interacting with Neo4j. Not doing so might result in file descriptor leaks or prevent the application from exiting. The driver can be closed like this:

driver.close();

The close operation terminates all network connections and I/O threads. It is a blocking operation and returns when all resources are terminated.

Use with Maven


The new Java driver release is available from Maven Central and can be included in a Maven project using this dependency definition:

<dependency>
    <groupId>org.neo4j.driver</groupId>
    <artifactId>neo4j-java-driver</artifactId>
    <version>1.5.0-beta03</version>
</dependency>

The driver has a compile-time dependency on Netty, but Netty is shaded into the final driver artifact, so there should be no dependency version conflicts.

Other Notable Changes


Apart from the async API, this Java driver release also adds a couple of new features. The most prominent are:
    • A new load-balancing strategy for Causal Clustering: a least-connected strategy replaces round-robin, which might result in better performance and less degradation when some cluster members perform poorly due to network or similar issues.
    • Improved connection pooling: the Java driver now allows limiting the number of connections in the pool per server address via Config.build().withMaxConnectionPoolSize(25), and setting a connection acquisition timeout via Config.build().withConnectionAcquisitionTimeout(10, TimeUnit.SECONDS).
    • Maximum connection lifetime: the Java driver allows limiting the lifetime of a connection, which can be configured using Config.build().withMaxConnectionLifetime(1, TimeUnit.HOURS).

Conclusion


The Bolt Java driver 1.5 is a rather large release with a lot of new functionality. The new asynchronous API is its most involved part and allows users to interact with the driver in a different, non-blocking way. It also exposes richer Java 8 APIs, such as CompletionStage.

At this point, community input about the new async API would be immensely helpful and would allow us to fine-tune the API design and provide as much value to async code bases as possible.

The driver described here is a pre-release version and should not be used in production; use the latest stable version of the Neo4j Java driver instead.



About the Author

Konstantin Lutovich, Neo4j Developer


Konstantin is a developer in the Neo4j engineering organization. He holds a master’s degree in informatics from the Taras Shevchenko National University.

While a full-time student there, he also worked full time in the Kiev office of the Copenhagen startup Agillic, primarily on the data management layers of the platform. He has since been working in the Kiev office of Yandex. Konstantin is moving to Sweden and will be working in the Malmö office.


2 Comments

Alexander Semenov says:

This is very exciting news! I have some questions.

1. Why no reactive-streams API exposed, i.e. Publisher? This is def the way to go nowadays.
2. Does GraphDatabase.driver(…) still throw when the DB is unavailable? This was recently changed to throw at creation time instead of when actually using the driver, and that seems to be a very bad design choice, because one needs to handle the same exceptions both when creating a driver and when using it. I ended up writing an ad-hoc DriverFactory to prevent this and make it all lazy.
3. You say that “session is designed for single-threaded use”, but what if one needs to reuse the same transaction from a completion stage that can happen on another thread?

Konstantin Lutovich says:

Hi Alexander,

Sorry for the delayed response!

1. Reactive streams are definitely great but are a much bigger change than the async API with `CompletionStage`. They also require more features from the underlying protocol, like back pressure. Rx is definitely on our radar for future work.

2. Yeah, that factory method still throws. It is actually the only place where we block on purpose, to behave the same way as the previous version. The behaviour of the direct driver (created for a ‘bolt’ URI) used to differ from that of the routing driver (created for a ‘bolt+routing’ URI); the former has been aligned with the latter for consistency. We could probably expose a method on the driver to explicitly verify connectivity, but that’s probably a feature for the next major release. The factory method can only throw when the driver is created while the database is unavailable. Could you please explain your use case and how this happens?

3. Session objects are not designed for concurrent use. So there should be no threads accessing Session concurrently to execute queries or begin transactions. It is fine to pass Session through the `CompletionStage` chain because it will be accessed by a single thread at a time. It will also be safely published from one element of the chain to another.

Hope this helps!
