Run non-blocking asynchronous queries

The examples in Query the database use the async/await syntax, which forces the driver to work synchronously. When using await with a query, your application waits for the server to retrieve all query results and transmit them to the driver. This is not a problem for most use cases, but for queries that have a long processing time or a large result set, asynchronous handling may speed up your application.

There are several ways of running asynchronous queries:

  • Asynchronous iteration: the query result is processed iteratively, as quickly as your application can handle it. The driver adjusts the number of records transmitted by the server accordingly.

  • Promise API: the query result is returned as a Promise. The promise is resolved only when the full result set is available to the driver. Best suited for queries that take a long time to process on the server, but whose result you want to process in one go. Your application receives the result in bulk, for eager consumption.

  • Streaming API: the query result is returned as a stream, so that each result record is processed as soon as it is available. Best suited for queries where each record is processed individually. Your application receives the result in bits, for lazy consumption.

  • Reactive API: for reactive applications.

When using await tx.run() in a transaction function, you may return the query result out of the transaction function as-is for further processing. On the other hand, for async queries, you have to process the result inside the transaction function (except for the Promise API).

Asynchronous iteration

The Result object supports asynchronous iteration. This allows your application to process data at its own pace, with the driver accordingly modulating the speed at which records are streamed from the server, applying backpressure. With async iterators, you get the guarantee that your application does not receive data faster than it can process.

const session = driver.session()
try {
  const peopleNames = await session.executeWrite(async tx => {
    const result = tx.run(  // (1)
      'MERGE (p:Person {name: $name}) RETURN p.name AS name',
      { name: 'Alice' }
    )
    const names = []
    for await (const record of result) {  // (2)
      console.log(`Processing ${record.get('name')}`)
      names.push(record.get('name'))
    }
    return names  // (3)
  })
} finally {
  await session.close()
}
1 Run a query
2 Process records with async iteration
3 Return processed results (and not the raw query result)

There are two important points to the usage of the async iterator:

  • you can async-iterate only once per query result. Once the result cursor reaches the end of the stream, it does not rewind, so you cannot iterate over a result more than once. If you need to process the data more than once in your application, you have to manually store it in an auxiliary data structure (like a list, as above).

  • the processing of the result happens inside the transaction function. You should not return the raw result out of the transaction function and then iterate over it. That workflow only works with the Promise API.
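
The single-pass behavior can be tried out without a database. The following is a minimal sketch, assuming a plain async generator as a stand-in for a query result: `fakeResult`, `collectNames`, and `events` are hypothetical names, not part of the driver. It shows that a record is produced only when the consumer asks for the next one, and that a second loop over the same exhausted generator yields nothing. How the driver itself reacts to a second iteration is not shown here.

```javascript
// Hypothetical stand-in for a query result: an async generator that
// produces a record only when the consumer asks for the next one.
const events = []

async function* fakeResult() {
  for (const name of ['Alice', 'Bob']) {
    events.push(`produce ${name}`)
    yield { get: key => (key === 'name' ? name : undefined) }
  }
}

// Consume the records once and keep the values in an auxiliary list.
async function collectNames(result) {
  const names = []
  for await (const record of result) {
    events.push(`consume ${record.get('name')}`)
    names.push(record.get('name'))
  }
  return names
}

async function main() {
  const result = fakeResult()
  const firstPass = await collectNames(result)
  const secondPass = await collectNames(result)  // generator is already exhausted
  return { firstPass, secondPass, events }
}

const outcome = main()
outcome.then(({ firstPass, secondPass }) => {
  console.log(firstPass)   // [ 'Alice', 'Bob' ]
  console.log(secondPass)  // []
})
```

The `events` list alternates between produce and consume entries, which is the pull-based pacing described above. Any later processing should use the stored list rather than the exhausted result.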

Promise API

The Promise API allows you to run a query and receive the result as a Promise. You can think of this query method as letting you specify a Cypher query and a number of callbacks that are executed asynchronously depending on the query outcome.

const session = driver.session({database: 'neo4j'})
const result = session.executeWrite(async tx => {  // (1)
  return tx.run(
    'MERGE (p:Person {name: $name}) RETURN p.name AS name',
    { name: 'Alice' }
  )
})
result.then(result => {  // (2)
  result.records.forEach(record => {
    console.log(record.get('name'))
  })
  return result
})
.catch(error => {  // (3)
  console.log(error)
})
.then(() => session.close())  // (4)
1 Run a query
2 Specify callback for successful runs, taking query result as input
3 Specify callback for failed runs, taking driver error as input
4 Specify callback to run regardless of query outcome

The Promise API applies to Driver.executeQuery() as well. If you prepend await to an .executeQuery() call, as shown in Query the database, your application waits until the result is ready, and you obtain either the result or the error, matching the success and failure callbacks in the example above.
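
The correspondence between the two styles can be sketched without a database. The example below is a minimal illustration, assuming a hypothetical `fakeExecuteQuery` function that stands in for any driver call returning a Promise. `withCallbacks` and `withAwait` are also illustrative names. It uses the standard `Promise.prototype.finally()` for the cleanup step, as one way to run code regardless of the outcome.

```javascript
// Hypothetical stand-in for a driver call that returns a Promise.
// It resolves with a result-like object, or rejects when `fail` is true.
function fakeExecuteQuery(fail) {
  return fail
    ? Promise.reject(new Error('query failed'))
    : Promise.resolve({ records: [{ get: () => 'Alice' }] })
}

// Promise style: callbacks registered with then / catch / finally.
function withCallbacks(fail) {
  const log = []
  return fakeExecuteQuery(fail)
    .then(result => { log.push(`ok ${result.records[0].get('name')}`) })
    .catch(error => { log.push(`error ${error.message}`) })
    .finally(() => { log.push('cleanup') })
    .then(() => log)
}

// await style: the same three outcomes mapped to try / catch / finally.
async function withAwait(fail) {
  const log = []
  try {
    const result = await fakeExecuteQuery(fail)
    log.push(`ok ${result.records[0].get('name')}`)
  } catch (error) {
    log.push(`error ${error.message}`)
  } finally {
    log.push('cleanup')
  }
  return log
}

const comparison = Promise.all([
  withCallbacks(false), withAwait(false),
  withCallbacks(true), withAwait(true)
])
comparison.then(logs => console.log(logs))
```

Both styles record the same sequence of steps for a successful and for a failed call. The difference is only in whether the calling code continues immediately (callbacks) or pauses at the await.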

Combine multiple transactions

To run multiple queries within the same transaction, use Promise.all(). It takes a list of Promises and returns a single Promise that resolves once all of them have resolved, so you can submit multiple queries at the same time and wait for them all to finish.

Retrieve people’s names and assign each of them to the company Neo4j
const companyName = 'Neo4j'
const session = driver.session({database: 'neo4j'})
try {
  const names = await session.executeRead(async tx => {
    const result = await tx.run('MATCH (p:Person) RETURN p.name AS name')
    return result.records.map(record => record.get('name'))
  })

  const relationshipsCreated = await session.executeWrite(tx =>
    Promise.all(  // group together all Promises
      names.map(name =>
        tx.run(`
          MATCH (emp:Person {name: $personName})
          MERGE (com:Company {name: $companyName})
          MERGE (emp)-[:WORKS_FOR]->(com)
          `, { personName: name, companyName: companyName }
        )
        .then(result => result.summary.counters.updates().relationshipsCreated)
      )
    ).then(values => values.reduce((a, b) => a + b, 0))  // aggregate results; 0 covers an empty list
  )

  console.log(`Created ${relationshipsCreated} employee relationships.`)
} finally {
  await session.close()
}
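
The aggregation step can be examined in isolation. The following is a minimal sketch, assuming a hypothetical `fakeRun` function in place of tx.run() that always reports one created relationship. `linkAll` is likewise an illustrative name. It shows Promise.all() collecting the individual counts, and why passing an initial value to reduce() matters: without one, reduce() throws a TypeError on an empty array, which would happen if no Person nodes were found.

```javascript
// Hypothetical stand-in for tx.run(): resolves with the number of
// relationships a query would have created (always 1 in this sketch).
function fakeRun(name) {
  return Promise.resolve({ relationshipsCreated: 1, name })
}

// Submit one "query" per name, wait for all of them, then sum the counts.
function linkAll(names) {
  return Promise.all(names.map(name =>
    fakeRun(name).then(result => result.relationshipsCreated)
  )).then(counts => counts.reduce((a, b) => a + b, 0))  // 0 covers the empty list
}

const totals = Promise.all([linkAll(['Alice', 'Bob', 'Carol']), linkAll([])])
totals.then(([three, none]) => console.log(three, none))  // 3 0
```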

Streaming API

The Streaming API allows you to run a query and receive results individually, as soon as the server has them ready. You can specify a callback to process each record. This API is well suited to cases in which the server takes varying amounts of time to retrieve different records, but you want to process each of them as soon as it is available. The behavior is similar to the async iterator; the programming style is different.

const session = driver.session({database: 'neo4j'})
let peopleNames = []

session
  .run('MERGE (p:Person {name: $name}) RETURN p.name AS name', {  // (1)
    name: 'Alice'
  })
  .subscribe({  // (2)
    onKeys: keys => {  // (3)
      console.log('Result columns are:')
      console.log(keys)
    },
    onNext: record => {  // (4)
      console.log(`Processing ${record.get('name')}`)
      peopleNames.push(record.get('name'))
    },
    onCompleted: () => {  // (5)
      session.close() // returns a Promise
    },
    onError: error => {  // (6)
      console.log(error)
    }
  })
1 Run a query
2 Attach a handler to the result stream
3 The onKeys callback receives the list of result columns
4 The onNext callback is invoked every time a record is received
5 The onCompleted callback is invoked when the transaction is over
6 The onError callback is invoked in case of error
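
In the example above, peopleNames is filled asynchronously, so code that runs right after subscribe() may still see an empty array. One way to wait for the whole stream is to wrap the callbacks in a Promise. The sketch below is a minimal illustration, assuming a hypothetical `fakeStream` object that mimics the subscribe() callback shape shown above. `collect` is also an illustrative helper, not a driver function.

```javascript
// Hypothetical stand-in for a streaming result: subscribe() delivers
// keys, then each record, then a completion signal, asynchronously.
function fakeStream(names) {
  return {
    subscribe(observer) {
      setTimeout(() => {
        observer.onKeys(['name'])
        names.forEach(name => observer.onNext({ get: () => name }))
        observer.onCompleted()
      }, 0)
    }
  }
}

// Wrap the callback-based stream in a Promise that settles once the
// stream completes or fails.
function collect(stream) {
  return new Promise((resolve, reject) => {
    const names = []
    stream.subscribe({
      onKeys: () => {},
      onNext: record => names.push(record.get('name')),
      onCompleted: () => resolve(names),
      onError: reject
    })
  })
}

const collected = collect(fakeStream(['Alice', 'Bob']))
collected.then(names => console.log(names))  // [ 'Alice', 'Bob' ]
```

Records are still handled one by one in onNext. The Promise only signals when it is safe to use the accumulated list.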

Reactive API

In a reactive flow, consumers control the rate at which they consume records from queries, and the driver in turn manages the rate at which records are requested from the server. The reactive API is recommended for applications that already follow the reactive style.

const rxjs = require('rxjs')

const rxSession = driver.rxSession()  // (1)
const rxResult = await rxSession.executeWrite(tx => {
  return tx
    .run('MERGE (p:Person {name: $name}) RETURN p.name AS name', {  // (2)
      name: 'Alice'
    })
    .records()  // (3)
    .pipe(  // (4)
      rxjs.map(record => record.get('name')),
      //rxjs.materialize(),  // optional, turns outputs into Notifications
      rxjs.toArray()
    )
})
// toPromise() is deprecated in RxJS 7; rxjs.lastValueFrom(rxResult) is the replacement
const people = await rxResult.toPromise()
console.log(people)
1 Obtain a reactive session
2 Run a query
3 Obtain an observable for result records
4 Reactive processing
The reactive API requires an RxSession and returns an RxResult.
The reactive API is not available in the lite version of the driver.

Glossary

LTS

A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.

Aura

Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.

Cypher

Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.

APOC

Awesome Procedures On Cypher (APOC) is a library of (many) functions that cannot be easily expressed in Cypher itself.

Bolt

Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.

ACID

Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.

eventual consistency

A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.

causal consistency

A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.

NULL

The null marker is not a type but a placeholder for the absence of a value. For more information, see Cypher → Working with null.

transaction

A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.

backpressure

Backpressure is a force opposing the flow of data. It ensures that the client is not overwhelmed by data arriving faster than it can handle.

transaction function

A transaction function is a callback executed by an executeRead or executeWrite call. The driver automatically re-executes the callback in case of server failure.

Driver

A Driver object holds the details required to establish connections with a Neo4j database.