Async API Documentation

New in version 5.0.

Warning

There is a known issue with Python 3.8 and the async driver where it gradually slows down. In general, use the latest supported version of Python for best performance, stability, and security.

AsyncGraphDatabase

Async Driver Construction

The neo4j.AsyncDriver construction is done via a classmethod on the neo4j.AsyncGraphDatabase class.

class neo4j.AsyncGraphDatabase

Accessor for neo4j.AsyncDriver construction.

classmethod driver(uri, *, auth=None, **config)

Create a driver.

Parameters:
  • uri (str) – the connection URI for the driver, see URI for available URIs.

  • auth (Tuple[Any, Any] | Auth | None) – the authentication details, see Auth for available authentication details.

  • config – driver configuration key-word arguments, see Async Driver Configuration for available key-word arguments.

Return type:

AsyncDriver

Driver creation example:

import asyncio

from neo4j import AsyncGraphDatabase


async def main():
    uri = "neo4j://example.com:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "password"))

    await driver.close()  # close the driver object

asyncio.run(main())

For basic authentication, auth can be a simple tuple, for example:

auth = ("neo4j", "password")

This will implicitly create a neo4j.Auth with a scheme="basic". Other authentication methods are described under Auth.

async with block context example:

import asyncio

from neo4j import AsyncGraphDatabase


async def main():
    uri = "neo4j://example.com:7687"
    auth = ("neo4j", "password")
    async with AsyncGraphDatabase.driver(uri, auth=auth) as driver:
        ...  # use the driver

asyncio.run(main())

classmethod bookmark_manager(initial_bookmarks=None, bookmarks_supplier=None, bookmarks_consumer=None)

Create an AsyncBookmarkManager with the default implementation.

Basic usage example to configure sessions with the built-in bookmark manager implementation so that all work is automatically causally chained (i.e., all reads can observe all previous writes even in a clustered setup):

import neo4j


# omitting closing the driver for brevity
driver = neo4j.AsyncGraphDatabase.driver(...)
bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager(...)

async with driver.session(
    bookmark_manager=bookmark_manager
) as session1:
    async with driver.session(
        bookmark_manager=bookmark_manager,
        access_mode=neo4j.READ_ACCESS
    ) as session2:
        result1 = await session1.run("<WRITE_QUERY>")
        await result1.consume()
        # READ_QUERY is guaranteed to see what WRITE_QUERY wrote.
        result2 = await session2.run("<READ_QUERY>")
        await result2.consume()

This is a very contrived example, and in this particular case, having both queries in the same session has the exact same effect and might even be more performant. However, when dealing with sessions spanning multiple threads, async Tasks, processes, or even hosts, the bookmark manager can come in handy as sessions are not safe to be used concurrently.

Parameters:
  • initial_bookmarks (None | Bookmarks | Iterable[str]) – The initial set of bookmarks. The returned bookmark manager will use this to initialize its internal bookmarks.

  • bookmarks_supplier (Callable[[], Bookmarks | Awaitable[Bookmarks]] | None) – Function which will be called every time the default bookmark manager’s method AsyncBookmarkManager.get_bookmarks() gets called. The function takes no arguments and must return a Bookmarks object. The result of bookmarks_supplier will then be concatenated with the internal set of bookmarks and used to configure the session in creation. It will, however, not update the internal set of bookmarks.

  • bookmarks_consumer (Callable[[Bookmarks], None | Awaitable[None]] | None) – Function which will be called whenever the set of bookmarks handled by the bookmark manager gets updated with the new internal bookmark set. It will receive the new set of bookmarks as a Bookmarks object and return None.

Returns:

A default implementation of AsyncBookmarkManager.

Return type:

AsyncBookmarkManager

This is experimental. (See Filter Warnings) It might be changed or removed any time even without prior notice.

New in version 5.0.

Changed in version 5.3: The bookmark manager no longer tracks bookmarks per database. This effectively changes the signature of almost all bookmark manager related methods:

  • initial_bookmarks is no longer a mapping from database name to bookmarks but plain bookmarks.

  • bookmarks_supplier no longer receives the database name as an argument.

  • bookmarks_consumer no longer receives the database name as an argument.
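The interplay of initial_bookmarks, bookmarks_supplier, and bookmarks_consumer described above can be pictured with a small stand-in class. This is a minimal sketch and not the driver's implementation: SketchBookmarkManager, its update_bookmarks(previous, new) method, and the use of plain strings as bookmarks are assumptions made purely for illustration.

```python
import asyncio
import inspect


class SketchBookmarkManager:
    """Hypothetical stand-in; NOT the driver's AsyncBookmarkManager."""

    def __init__(self, initial_bookmarks=None, bookmarks_supplier=None,
                 bookmarks_consumer=None):
        self._bookmarks = set(initial_bookmarks or ())
        self._supplier = bookmarks_supplier
        self._consumer = bookmarks_consumer

    async def get_bookmarks(self):
        bookmarks = set(self._bookmarks)
        if self._supplier is not None:
            extra = self._supplier()
            if inspect.isawaitable(extra):  # supplier may be sync or async
                extra = await extra
            # Supplied bookmarks are added to the returned copy only;
            # the internal set is left untouched.
            bookmarks |= set(extra)
        return bookmarks

    async def update_bookmarks(self, previous, new):
        # Assumed update rule: drop the bookmarks that were used,
        # keep the ones that were received.
        self._bookmarks = (self._bookmarks - set(previous)) | set(new)
        if self._consumer is not None:
            outcome = self._consumer(set(self._bookmarks))
            if inspect.isawaitable(outcome):  # consumer may be sync or async
                await outcome


async def demo():
    seen = []
    manager = SketchBookmarkManager(
        initial_bookmarks=["bm:1"],
        bookmarks_supplier=lambda: ["bm:external"],
        bookmarks_consumer=seen.append,
    )
    before = await manager.get_bookmarks()
    await manager.update_bookmarks(["bm:1"], ["bm:2"])
    after = await manager.get_bookmarks()
    return before, after, seen
```

In this sketch the supplier's bookmark shows up in every get_bookmarks() result, but the consumer only ever sees the internal set.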

URI

On construction, the scheme of the URI determines the type of neo4j.AsyncDriver object created.

Available valid URIs:

  • bolt://host[:port]

  • bolt+ssc://host[:port]

  • bolt+s://host[:port]

  • neo4j://host[:port][?routing_context]

  • neo4j+ssc://host[:port][?routing_context]

  • neo4j+s://host[:port][?routing_context]

uri = "bolt://example.com:7687"
uri = "neo4j://example.com:7687"

Each supported scheme maps to a particular neo4j.AsyncDriver subclass that implements a specific behaviour.

URI Scheme    Driver Object and Setting
----------    -------------------------
bolt          AsyncBoltDriver with no encryption.
bolt+ssc      AsyncBoltDriver with encryption (accepts self-signed certificates).
bolt+s        AsyncBoltDriver with encryption (accepts only certificates signed by a certificate authority), full certificate checks.
neo4j         AsyncNeo4jDriver with no encryption.
neo4j+ssc     AsyncNeo4jDriver with encryption (accepts self-signed certificates).
neo4j+s       AsyncNeo4jDriver with encryption (accepts only certificates signed by a certificate authority), full certificate checks.
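The scheme-to-driver mapping above can be expressed as a simple lookup. The following sketch uses only the standard library to split a URI and report which driver type and encryption setting the scheme implies. The helper describe_uri and the SCHEMES dictionary are hypothetical names and are not part of the driver.

```python
from urllib.parse import urlparse

# scheme -> (driver class name, encrypted, accepts self-signed certificates),
# transcribed from the scheme list above
SCHEMES = {
    "bolt": ("AsyncBoltDriver", False, False),
    "bolt+ssc": ("AsyncBoltDriver", True, True),
    "bolt+s": ("AsyncBoltDriver", True, False),
    "neo4j": ("AsyncNeo4jDriver", False, False),
    "neo4j+ssc": ("AsyncNeo4jDriver", True, True),
    "neo4j+s": ("AsyncNeo4jDriver", True, False),
}


def describe_uri(uri):
    """Return what the URI scheme implies (hypothetical helper)."""
    parsed = urlparse(uri)
    try:
        driver_class, encrypted, self_signed_ok = SCHEMES[parsed.scheme]
    except KeyError:
        raise ValueError(
            f"Unsupported URI scheme: {parsed.scheme!r}"
        ) from None
    return {
        "driver": driver_class,
        "encrypted": encrypted,
        "self_signed_ok": self_signed_ok,
        "host": parsed.hostname,
        "port": parsed.port,  # None when the URI does not name a port
    }


info = describe_uri("neo4j+s://example.com:7687")
print(info["driver"], info["encrypted"])
```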

AsyncDriver

Every Neo4j-backed application will require a driver object.

This object holds the details required to establish connections with a Neo4j database, including server URIs, credentials and other configuration. neo4j.AsyncDriver objects hold a connection pool from which neo4j.AsyncSession objects can borrow connections. Closing a driver will immediately shut down all connections in the pool.

Note

Driver objects only open connections and pool them as needed. To verify that the driver is able to communicate with the database without executing any query, use neo4j.AsyncDriver.verify_connectivity().

class neo4j.AsyncDriver

Base class for all types of neo4j.AsyncDriver, instances of which are used as the primary access point to Neo4j.

async execute_query(query_, parameters_=None, routing_=neo4j.RoutingControl.WRITERS, database_=None, impersonated_user_=None, bookmark_manager_=self.query_bookmark_manager, result_transformer_=AsyncResult.to_eager_result, **kwargs)

Execute a query in a transaction function and return all results.

This method is a handy wrapper for lower-level driver APIs like sessions, transactions, and transaction functions. It is intended for simple use cases where there is no need for managing all possible options.

The internal usage of transaction functions provides a retry-mechanism for appropriate errors. Furthermore, this means that queries using CALL {} IN TRANSACTIONS or the older USING PERIODIC COMMIT will not work (use AsyncSession.run() for these).

The method is roughly equivalent to:

async def execute_query(
    query_, parameters_, routing_, database_, impersonated_user_,
    bookmark_manager_, result_transformer_, **kwargs
):
    async def work(tx):
        result = await tx.run(query_, parameters_, **kwargs)
        return await result_transformer_(result)

    async with driver.session(
        database=database_,
        impersonated_user=impersonated_user_,
        bookmark_manager=bookmark_manager_,
    ) as session:
        if routing_ == RoutingControl.WRITERS:
            return await session.execute_write(work)
        elif routing_ == RoutingControl.READERS:
            return await session.execute_read(work)

Usage example:

from typing import List

import neo4j


async def example(driver: neo4j.AsyncDriver) -> List[str]:
    """Get the name of all 42 year-olds."""
    records, summary, keys = await driver.execute_query(
        "MATCH (p:Person {age: $age}) RETURN p.name",
        {"age": 42},
        routing_=neo4j.RoutingControl.READERS,  # or just "r"
        database_="neo4j",
    )
    assert keys == ["p.name"]  # not needed, just for illustration
    # log_summary(summary)  # log some metadata
    return [str(record["p.name"]) for record in records]
    # or: return [str(record[0]) for record in records]
    # or even: return list(map(lambda r: str(r[0]), records))

Another example:

import neo4j


async def example(driver: neo4j.AsyncDriver) -> int:
    """Call all young people "My dear" and get their count."""
    record = await driver.execute_query(
        "MATCH (p:Person) WHERE p.age <= 15 "
        "SET p.nickname = 'My dear' "
        "RETURN count(*)",
        # optional routing parameter, as write is default
        # routing_=neo4j.RoutingControl.WRITERS,  # or just "w",
        database_="neo4j",
        result_transformer_=neo4j.AsyncResult.single,
    )
    assert record is not None  # for typechecking and illustration
    count = record[0]
    assert isinstance(count, int)
    return count
Parameters:
  • query_ (Optional[str]) – Cypher query to execute

  • parameters_ (Optional[Dict[str, Any]]) – parameters to use in the query

  • routing_ (neo4j.RoutingControl) – whether to route the query to a reader (follower/read replica) or a writer (leader) in the cluster. Default is to route to a writer.

  • database_ (Optional[str]) –

    database to execute the query against.

    None (default) uses the database configured on the server side.

    Note

    It is recommended to always specify the database explicitly when possible. This allows the driver to work more efficiently, as it will not have to resolve the default database first.

    See also the Session config database.

  • impersonated_user_ (Optional[str]) –

    Name of the user to impersonate.

    This means that all queries will be executed in the security context of the impersonated user. For this, the user for which the Driver has been created needs to have the appropriate permissions.

    See also the Session config impersonated_user.

  • result_transformer_ (Callable[[neo4j.AsyncResult], Awaitable[T]]) –

    A function that gets passed the neo4j.AsyncResult object resulting from the query and converts it to a different type. The result of the transformer function is returned by this method.

    Warning

    The transformer function must not return the neo4j.AsyncResult itself.

    Example transformer that checks that exactly one record is in the result stream, then returns the record and the result summary:

    from typing import Tuple
    
    import neo4j
    
    
    async def transformer(
        result: neo4j.AsyncResult
    ) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
        record = await result.single(strict=True)
        summary = await result.consume()
        return record, summary
    

    Note that methods of neo4j.AsyncResult that don’t take mandatory arguments can be used directly as transformer functions. For example:

    import neo4j
    
    
    async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:
        record = await driver.execute_query(
            "SOME QUERY",
            result_transformer_=neo4j.AsyncResult.single
        )
    
    
    # is equivalent to:
    
    
    async def transformer(result: neo4j.AsyncResult) -> neo4j.Record:
        return await result.single()
    
    
    async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:
        record = await driver.execute_query(
            "SOME QUERY",
            result_transformer_=transformer
        )
    

  • bookmark_manager_

    Specify a bookmark manager to use.

    If present, the bookmark manager is used to keep the query causally consistent with all work executed using the same bookmark manager.

    Defaults to the driver’s query_bookmark_manager.

    Pass None to disable causal consistency.

  • kwargs (Any) – additional keyword parameters. None of these can end with a single underscore. This is to avoid collisions with the keyword configuration parameters of this method. If you need to pass such a parameter, use the parameters_ parameter instead. These take precedence over parameters passed as parameters_.
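The precedence rule for kwargs can be illustrated with a few lines of plain Python. This is a sketch under assumptions: merge_query_parameters is a hypothetical helper, and raising ValueError for names ending in an underscore is an assumption chosen for the illustration, not a statement about the driver's exact error.

```python
def merge_query_parameters(parameters_=None, **kwargs):
    """Combine parameters_ and keyword parameters (hypothetical helper)."""
    for name in kwargs:
        if name.endswith("_"):
            # Such names are reserved for the method's own configuration.
            raise ValueError(
                f"keyword parameter {name!r} must not end with '_'; "
                "pass it via parameters_ instead"
            )
    merged = dict(parameters_ or {})
    merged.update(kwargs)  # keyword parameters win over parameters_
    return merged


merged = merge_query_parameters({"age": 42, "name": "Alice"}, age=43)
print(merged)
```

A query parameter whose name really does end in an underscore would go into the parameters_ dictionary, for example merge_query_parameters({"limit_": 10}).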

Returns:

the result of the result_transformer

Return type:

T

This is experimental. (See Filter Warnings) It might be changed or removed any time even without prior notice.

New in version 5.5.

property encrypted: bool

Indicate whether the driver was configured to use encryption.

session(**config)

Create a session; see AsyncSession Construction.

Parameters:

config – session configuration key-word arguments, see Session Configuration for available key-word arguments.

Returns:

new neo4j.AsyncSession object

Return type:

AsyncSession

async close()

Shut down, closing any open connections in the pool.

Return type:

None

property query_bookmark_manager: AsyncBookmarkManager

The driver’s default query bookmark manager.

This is the default AsyncBookmarkManager used by execute_query(). This can be used to causally chain execute_query() calls and sessions. Example:

async def example(driver: neo4j.AsyncDriver) -> None:
    await driver.execute_query("<QUERY 1>")
    async with driver.session(
        bookmark_manager=driver.query_bookmark_manager
    ) as session:
        # every query inside this session will be causally chained
        # (i.e., can read what was written by <QUERY 1>)
        await session.run("<QUERY 2>")
    # subsequent execute_query calls will be causally chained
    # (i.e., can read what was written by <QUERY 2>)
    await driver.execute_query("<QUERY 3>")

This is experimental. (See Filter Warnings) It might be changed or removed any time even without prior notice.

New in version 5.5.

async verify_connectivity(**config)

Verify that the driver can establish a connection to the server.

This verifies if the driver can establish a reading connection to a remote server or a cluster. Some data will be exchanged.

Note

Even if this method raises an exception, the driver still needs to be closed via close() to free up all resources.

Parameters:

config

accepts the same configuration key-word arguments as session().

Warning

All configuration key-word arguments are experimental. They might be changed or removed in any future version without prior notice.

Raises:

DriverError – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.

Return type:

None

Changed in version 5.0: The undocumented return value has been removed. If you need information about the remote server, use get_server_info() instead.
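The note above says the driver must still be closed when the connectivity check fails. One way to arrange this is sketched below. verified_or_closed is a hypothetical helper, and StubDriver is a stand-in that only mimics the two method names verify_connectivity() and close(). The stub raises ConnectionError, whereas the real driver is documented to raise DriverError.

```python
import asyncio


class StubDriver:
    """Hypothetical stand-in exposing the two methods used below."""

    def __init__(self, reachable):
        self.reachable = reachable
        self.closed = False

    async def verify_connectivity(self):
        if not self.reachable:
            raise ConnectionError("stub: server unreachable")

    async def close(self):
        self.closed = True


async def verified_or_closed(driver):
    """Return the driver if it can connect; otherwise close it and re-raise."""
    try:
        await driver.verify_connectivity()
    except BaseException:
        # Also covers cancellation: free the driver's resources either way.
        await driver.close()
        raise
    return driver


async def demo():
    good = await verified_or_closed(StubDriver(reachable=True))
    bad = StubDriver(reachable=False)
    try:
        await verified_or_closed(bad)
    except ConnectionError:
        pass
    return good.closed, bad.closed
```

On success the caller keeps ownership of the open driver and remains responsible for closing it later.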

async get_server_info(**config)

Get information about the connected Neo4j server.

Try to establish a working read connection to the remote server or a member of a cluster and exchange some data. Then return the contacted server’s information.

In a cluster, there is no guarantee about which server will be contacted.

Note

Even if this method raises an exception, the driver still needs to be closed via close() to free up all resources.

Parameters:

config

accepts the same configuration key-word arguments as session().

Warning

All configuration key-word arguments are experimental. They might be changed or removed in any future version without prior notice.

Raises:

DriverError – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.

Return type:

ServerInfo

New in version 5.0.

Async Driver Configuration

neo4j.AsyncDriver is configured exactly like neo4j.Driver (see Driver Configuration). The only difference is that the async driver accepts an async custom resolver function:

resolver

A custom resolver function to resolve any addresses the driver receives ahead of DNS resolution. This function is called with an Address and should return an iterable of Address objects or values that can be used to construct Address objects.

If no custom resolver function is supplied, the internal resolver moves straight to regular DNS resolution.

The custom resolver function can but does not have to be a coroutine.

For example:

import neo4j


async def custom_resolver(socket_address):
    # assert isinstance(socket_address, neo4j.Address)
    if socket_address != ("example.com", 9999):
        raise OSError(f"Unexpected socket address {socket_address!r}")

    # You can return any neo4j.Address object
    yield neo4j.Address(("localhost", 7687))  # IPv4
    yield neo4j.Address(("::1", 7687, 0, 0))  # IPv6
    yield neo4j.Address.parse("localhost:7687")
    yield neo4j.Address.parse("[::1]:7687")

    # or any tuple that can be passed to neo4j.Address(...).
    # Initially, this will be interpreted as IPv4, but DNS resolution
    # will turn it into IPv6 if appropriate.
    yield "::1", 7687
    # This will be interpreted as IPv6 directly, but DNS resolution will
    # still happen.
    yield "::1", 7687, 0, 0
    yield "127.0.0.1", 7687


# alternatively
def custom_resolver(socket_address):
    ...


driver = neo4j.AsyncGraphDatabase.driver("neo4j://example.com:9999",
                                         auth=("neo4j", "password"),
                                         resolver=custom_resolver)

Default:

None
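The statement that the resolver can, but does not have to, be a coroutine can be pictured with a caller that accepts both kinds of function. This is a sketch of the general technique only: call_resolver is a hypothetical helper, not the driver's internal code, and it assumes the async variant is a coroutine function that returns an iterable of addresses.

```python
import asyncio
import inspect


async def call_resolver(resolver, address):
    """Call a sync or async resolver and return its addresses as a list."""
    result = resolver(address)
    if inspect.isawaitable(result):
        # The resolver was a coroutine function: wait for its result.
        result = await result
    return list(result)


def sync_resolver(address):
    return [("localhost", 7687)]


async def async_resolver(address):
    await asyncio.sleep(0)  # placeholder for an awaitable lookup
    return [("127.0.0.1", 7687), ("::1", 7687, 0, 0)]


async def demo():
    address = ("example.com", 9999)
    return (
        await call_resolver(sync_resolver, address),
        await call_resolver(async_resolver, address),
    )
```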

Driver Object Lifetime

For general applications, it is recommended to create one top-level neo4j.AsyncDriver object that lives for the lifetime of the application.

For example:

from neo4j import AsyncGraphDatabase


class Application:

    def __init__(self, uri, user, password):
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def close(self):
        await self.driver.close()

Connection details held by the neo4j.AsyncDriver are immutable. Therefore, if a password is changed, for example, a replacement neo4j.AsyncDriver object must be created. More than one AsyncDriver may be required if connections to multiple remotes, or connections as multiple users, are needed, unless impersonation (impersonated_user) is used.

neo4j.AsyncDriver objects are safe to be used in concurrent coroutines. They are not thread-safe.
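The recommended shape, one long-lived driver shared by many coroutines that each open their own short-lived session, might look like the sketch below. StubDriver and StubSession are hypothetical stand-ins that only mimic the method names session(), run(), and close(), so the pattern can be run without a database. The stub's run() simply echoes the query.

```python
import asyncio


class StubSession:
    """Hypothetical stand-in for a session used via ``async with``."""

    def __init__(self, log):
        self._log = log

    async def __aenter__(self):
        return self

    async def __aexit__(self, *exc_info):
        self._log.append("session closed")

    async def run(self, query):
        await asyncio.sleep(0)  # pretend to talk to the server
        return query  # the stub just echoes the query


class StubDriver:
    """Hypothetical stand-in for the shared driver."""

    def __init__(self):
        self.log = []

    def session(self):
        return StubSession(self.log)

    async def close(self):
        self.log.append("driver closed")


async def fetch(driver, query):
    # Each coroutine opens its own session on the shared driver.
    async with driver.session() as session:
        return await session.run(query)


async def main():
    driver = StubDriver()  # one driver for the whole application
    try:
        results = await asyncio.gather(
            fetch(driver, "RETURN 1"),
            fetch(driver, "RETURN 2"),
        )
    finally:
        await driver.close()  # closed once, when the application ends
    return results, driver.log
```

Sessions are never shared between the coroutines here; only the driver is.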

AsyncBoltDriver

URI schemes:

bolt, bolt+ssc, bolt+s

Will result in:

class neo4j.AsyncBoltDriver(pool, default_workspace_config)

AsyncBoltDriver is instantiated for bolt URIs and addresses a single database machine. This may be a standalone server or could be a specific member of a cluster.

Connections established by an AsyncBoltDriver are always made to the exact host and port detailed in the URI.

This class is not supposed to be instantiated externally. Use AsyncGraphDatabase.driver() instead.

AsyncNeo4jDriver

URI schemes:

neo4j, neo4j+ssc, neo4j+s

Will result in:

class neo4j.AsyncNeo4jDriver(pool, default_workspace_config)

AsyncNeo4jDriver is instantiated for neo4j URIs. The routing behaviour works in tandem with Neo4j’s Causal Clustering feature by directing read and write behaviour to appropriate cluster members.

This class is not supposed to be instantiated externally. Use AsyncGraphDatabase.driver() instead.

AsyncSessions & AsyncTransactions

All database activity is co-ordinated through two mechanisms: sessions (neo4j.AsyncSession) and transactions (neo4j.AsyncTransaction, neo4j.AsyncManagedTransaction).

A session is a logical container for any number of causally-related transactional units of work. Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required. Sessions provide the top level of containment for database activity. Session creation is a lightweight operation and sessions are not thread safe.

Connections are drawn from the neo4j.AsyncDriver connection pool as required.

A transaction is a unit of work that is either committed in its entirety or is rolled back on failure.

AsyncSession Construction

To construct a neo4j.AsyncSession use the neo4j.AsyncDriver.session() method.

import asyncio

from neo4j import AsyncGraphDatabase


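async def main():
    # placeholder connection details, as in the examples above
    uri = "neo4j://example.com:7687"
    user, password = "neo4j", "password"
    async with AsyncGraphDatabase.driver(uri, auth=(user, password)) as driver: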
        session = driver.session()
        try:
            result = await session.run("MATCH (a:Person) RETURN a.name AS name")
            names = [record["name"] async for record in result]
        except asyncio.CancelledError:
            session.cancel()
            raise
        finally:
            await session.close()

asyncio.run(main())

Sessions will often be created and destroyed using a with block context. This is the recommended approach as it takes care of closing the session properly even when an exception is raised.

async with driver.session() as session:
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    ...  # do something with the result

Sessions will often be created with some configuration settings, see Session Configuration.

async with driver.session(database="example_database",
                          fetch_size=100) as session:
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    ...  # do something with the result

AsyncSession

class neo4j.AsyncSession

Context for executing work

An AsyncSession is a logical context for transactional units of work. Connections are drawn from the AsyncDriver connection pool as required.

Session creation is a lightweight operation and sessions are not safe to be used in concurrent contexts (multiple threads/coroutines). Therefore, a session should generally be short-lived, and must not span multiple threads/asynchronous Tasks.

In general, sessions will be created and destroyed within a with context. For example:

async with driver.session(database="neo4j") as session:
    result = await session.run("MATCH (n:Person) RETURN n.name AS name")
    ...  # do something with the result

Note

Some asyncio utility functions (e.g., asyncio.wait_for() and asyncio.shield()) will wrap work in an asyncio.Task. This introduces concurrency and can lead to undefined behavior as AsyncSession is not concurrency-safe.

Consider this incorrect example:

async def dont_do_this(driver):
    async with driver.session() as session:
        await asyncio.shield(session.run("RETURN 1"))

If dont_do_this gets cancelled while waiting for session.run, session.run itself won’t get cancelled (it’s shielded), so it will continue to use the session in another Task. Concurrently, the async context manager (async with driver.session()) will clean up the session on exit. That’s two Tasks handling the session concurrently. Therefore, this yields undefined behavior.

In this particular example, the problem could be solved by shielding the whole coroutine dont_do_this instead of only the session.run call, like so:

async def thats_better(driver):
    async def inner():
        async with driver.session() as session:
            await session.run("RETURN 1")

    await asyncio.shield(inner())
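The ordering problem described above can be observed with the standard library alone. In the sketch below, StubSession is a hypothetical stand-in that records events instead of talking to a database. The recorded events show the shielded run() call finishing only after the context manager has already closed the session.

```python
import asyncio


class StubSession:
    """Hypothetical stand-in that records what happens and when."""

    def __init__(self, events, release, done):
        self.events = events
        self.release = release
        self.done = done

    async def __aenter__(self):
        self.events.append("open")
        return self

    async def __aexit__(self, *exc_info):
        self.events.append("close")

    async def run(self, query):
        await self.release.wait()  # simulates waiting for the server
        self.events.append("run finished")
        self.done.set()


async def dont_do_this(session):
    async with session:
        await asyncio.shield(session.run("RETURN 1"))


async def main():
    events = []
    release, done = asyncio.Event(), asyncio.Event()
    session = StubSession(events, release, done)
    task = asyncio.create_task(dont_do_this(session))
    await asyncio.sleep(0)  # let the task reach the shielded call
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        events.append("caller cancelled")
    release.set()  # the shielded work is still alive and now continues
    await done.wait()
    return events
```

With a real session, the work that is still running would be using a session that has already been cleaned up.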
async close()

Close the session.

This will release any borrowed resources, such as connections, and will roll back any outstanding transactions.

Return type:

None

cancel()

Cancel this session.

If the session is already closed, this method does nothing. Otherwise, it forcefully closes the connection the session holds, if there is one. This abruptly terminates all work in flight.

The primary purpose of this function is to handle asyncio.CancelledError.

session = driver.session()
try:
    ...  # do some work
except asyncio.CancelledError:
    session.cancel()
    raise
Return type:

None
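The cancellation pattern above can be exercised end to end with a stand-in session. In this sketch, StubSession is hypothetical and only records which of its methods were called. guarded_work mirrors the try/except structure shown above and adds a finally clause that closes the session.

```python
import asyncio


class StubSession:
    """Hypothetical stand-in that records method calls."""

    def __init__(self):
        self.calls = []

    async def run(self, query):
        self.calls.append("run")
        await asyncio.sleep(3600)  # simulates a long-running query

    def cancel(self):
        self.calls.append("cancel")

    async def close(self):
        self.calls.append("close")


async def guarded_work(session):
    try:
        await session.run("RETURN 1")
    except asyncio.CancelledError:
        session.cancel()  # drop in-flight work right away
        raise  # always propagate the cancellation
    finally:
        await session.close()


async def demo():
    session = StubSession()
    task = asyncio.create_task(guarded_work(session))
    await asyncio.sleep(0)  # let the task start the query
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return session.calls
```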

closed()

Indicate whether the session has been closed.

Returns:

True if closed, False otherwise.

Return type:

bool

async run(query, parameters=None, **kwargs)

Run a Cypher query within an auto-commit transaction.

The query is sent and the result header is received immediately, but the neo4j.AsyncResult content is fetched lazily as it is consumed by the client application.

If a query is executed before a previous neo4j.AsyncResult in the same AsyncSession has been fully consumed, the first result will be fully fetched and buffered. Note therefore that the generally recommended pattern of usage is to fully consume one result before executing a subsequent query. If two results need to be consumed in parallel, multiple AsyncSession objects can be used as an alternative to result buffering.

For more usage details, see AsyncTransaction.run().

Parameters:
  • query (t.Union[te.LiteralString, Query]) – cypher query

  • parameters (t.Optional[t.Dict[str, t.Any]]) – dictionary of parameters

  • kwargs (t.Any) – additional keyword parameters. These take precedence over parameters passed as parameters.

Returns:

a new neo4j.AsyncResult object

Raises:

SessionError – if the session has been closed.

Return type:

AsyncResult

async last_bookmarks()

Return most recent bookmarks of the session.

Bookmarks can be used to causally chain sessions. For example, if a session (session1) wrote something that another session (session2) needs to read, use session2 = driver.session(bookmarks=await session1.last_bookmarks()) to achieve this.

Combine the bookmarks of multiple sessions like so:

bookmarks1 = await session1.last_bookmarks()
bookmarks2 = await session2.last_bookmarks()
session3 = driver.session(bookmarks=bookmarks1 + bookmarks2)

A session automatically manages bookmarks, so this method is rarely needed. If you need causal consistency, try to run the relevant queries in the same session.

“Most recent bookmarks” are either the bookmarks passed to the session on creation, or the last bookmark the session received after committing a transaction to the server.

Note: For auto-commit transactions (AsyncSession.run()), this will trigger AsyncResult.consume() for the current result.

Returns:

the session’s last known bookmarks

Return type:

Bookmarks

async last_bookmark()

Get the bookmark received following the last completed transaction.

Note: For auto-commit transactions (AsyncSession.run()), this will trigger AsyncResult.consume() for the current result.

Warning

This method can lead to unexpected behaviour if the session has not yet successfully completed a transaction.

Returns:

last bookmark

Return type:

str | None

Deprecated since version 5.0: last_bookmark() will be removed in version 6.0. Use last_bookmarks() instead.

async begin_transaction(metadata=None, timeout=None)

Begin a new unmanaged transaction.

Creates a new AsyncTransaction within this session. At most one transaction may exist in a session at any point in time. To maintain multiple concurrent transactions, use multiple concurrent sessions.

Note: For auto-commit transactions (AsyncSession.run()), this will trigger an AsyncResult.consume() for the current result.

Parameters:
  • metadata (Dict[str, Any] | None) – a dictionary with metadata. Specified metadata will be attached to the executing transaction and visible in the output of SHOW TRANSACTIONS YIELD *. It will also get logged to the query.log. This functionality makes it easier to tag transactions and is equivalent to the dbms.setTXMetaData procedure, see https://neo4j.com/docs/cypher-manual/current/clauses/transaction-clauses/#query-listing-transactions and https://neo4j.com/docs/operations-manual/current/reference/procedures/ for reference.

  • timeout (float | None) – the transaction timeout in seconds. Transactions that execute longer than the configured timeout will be terminated by the database. This functionality allows limiting query/transaction execution time. The specified timeout overrides the default timeout configured in the database using the dbms.transaction.timeout setting. The value should not represent a zero or negative duration.

Returns:

A new transaction instance.

Return type:

AsyncTransaction

async read_transaction(transaction_function, *args, **kwargs)

Execute a unit of work in a managed read transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – keyword arguments for the transaction_function

Returns:

a result as returned by the given unit of work

Raises:

SessionError – if the session has been closed.

Return type:

_R

Deprecated since version 5.0: Method was renamed to execute_read().

async execute_read(transaction_function, *args, **kwargs)

Execute a unit of work in a managed read transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).
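Why idempotence matters becomes clearer with a toy retry loop. The sketch below is not the driver's retry logic: run_with_retries, TransientError, and the fixed attempt limit are hypothetical. It only shows that a transaction function, including any side effects it has outside the transaction, can run several times before one attempt succeeds.

```python
import asyncio


class TransientError(Exception):
    """Hypothetical marker for an error that is worth retrying."""


async def run_with_retries(transaction_function, *args,
                           max_attempts=3, **kwargs):
    """Invoke transaction_function until it succeeds or attempts run out."""
    for attempt in range(1, max_attempts + 1):
        tx = object()  # placeholder for a fresh managed transaction
        try:
            return await transaction_function(tx, *args, **kwargs)
        except TransientError:
            if attempt == max_attempts:
                raise
            # A real implementation would roll back and wait here.


async def demo():
    calls = []

    async def flaky_work(tx, name):
        calls.append(name)  # this side effect happens on every attempt
        if len(calls) < 3:
            raise TransientError("simulated transient failure")
        return f"hello {name}"

    result = await run_with_retries(flaky_work, "Bob")
    return result, calls
```

Here the function body runs three times for a single logical unit of work, so anything it does besides issuing queries on tx is repeated as well.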

Example:

async def do_cypher_tx(tx, cypher):
    result = await tx.run(cypher)
    values = [record.values() async for record in result]
    return values

async with driver.session() as session:
    values = await session.execute_read(do_cypher_tx, "RETURN 1 AS x")

Example:

async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values

async with driver.session() as session:
    values = await session.execute_read(get_two_tx)
Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – keyword arguments for the transaction_function

Returns:

whatever the given transaction_function returns

Raises:

SessionError – if the session has been closed.

Return type:

_R

New in version 5.0.

async write_transaction(transaction_function, *args, **kwargs)

Execute a unit of work in a managed write transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – keyword arguments for the transaction_function

Returns:

a result as returned by the given unit of work

Raises:

SessionError – if the session has been closed.

Return type:

_R

Deprecated since version 5.0: Method was renamed to execute_write().

async execute_write(transaction_function, *args, **kwargs)

Execute a unit of work in a managed write transaction.

Note

This does not necessarily imply access control, see the session configuration option default_access_mode.

This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).

Example:

async def create_node_tx(tx, name):
    query = ("CREATE (n:NodeExample {name: $name, id: randomUUID()}) "
             "RETURN n.id AS node_id")
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

async with driver.session() as session:
    node_id = await session.execute_write(create_node_tx, "Bob")
Parameters:
  • transaction_function (t.Callable[te.Concatenate[AsyncManagedTransaction, _P], t.Awaitable[_R]]) – a function that takes a transaction as an argument and does work with the transaction. transaction_function(tx, *args, **kwargs) where tx is a AsyncManagedTransaction.

  • args (_P.args) – additional arguments for the transaction_function

  • kwargs (_P.kwargs) – keyword arguments for the transaction_function

Returns:

a result as returned by the given unit of work

Raises:

SessionError – if the session has been closed.

Return type:

_R

New in version 5.0.

Session Configuration

neo4j.AsyncSession is configured exactly like neo4j.Session (see Session Configuration). The only difference is that the async session accepts either a neo4j.api.BookmarkManager object or a neo4j.api.AsyncBookmarkManager as the bookmark manager:

bookmark_manager

Specify a bookmark manager for the driver to use. If present, the bookmark manager is used to keep all work on the driver causally consistent.

See BookmarkManager for more information.

Warning

Enabling the BookmarkManager can have a negative impact on performance since all queries will wait for the latest changes to be propagated across the cluster.

For simpler use-cases, sessions (AsyncSession) can be used to group a series of queries together that will be causally chained automatically.

Type:

None, BookmarkManager, or AsyncBookmarkManager

Default:

None

This is experimental. (See Filter Warnings) It might be changed or removed any time even without prior notice.

AsyncTransaction

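Neo4j supports three kinds of async transaction:

  • Auto-commit Transactions

  • Explicit Transactions (Unmanaged Transactions)

  • Managed Transactions (transaction functions)

Each has pros and cons, but if in doubt, use a managed transaction with a transaction function.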

Auto-commit Transactions

Auto-commit transactions are the simplest form of transaction, available via neo4j.AsyncSession.run(). These are easy to use but support only one statement per transaction and are not automatically retried on failure.

Auto-commit transactions are also the only way to run PERIODIC COMMIT (only Neo4j 4.4 and earlier) or CALL {...} IN TRANSACTIONS (Neo4j 4.4 and newer) statements, since those Cypher clauses manage their own transactions internally.

Write example:

import neo4j


async def create_person(driver, name):
    # default_access_mode defaults to WRITE_ACCESS
    async with driver.session(database="neo4j") as session:
        query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
        result = await session.run(query, name=name)
        record = await result.single()
        return record["node_id"]

Read example:

import neo4j


async def get_numbers(driver):
    numbers = []
    async with driver.session(
        database="neo4j",
        default_access_mode=neo4j.READ_ACCESS
    ) as session:
        result = await session.run("UNWIND [1, 2, 3] AS x RETURN x")
        async for record in result:
            numbers.append(record["x"])
    return numbers

Explicit Transactions (Unmanaged Transactions)

Explicit transactions support multiple statements and must be created with an explicit neo4j.AsyncSession.begin_transaction() call.

This creates a new neo4j.AsyncTransaction object that can be used to run Cypher.

It also gives applications the ability to directly control commit and rollback activity.

class neo4j.AsyncTransaction

Container for multiple Cypher queries to be executed within a single context. AsyncTransaction objects can be used as context managers (async with block), where the transaction is committed or rolled back depending on whether an exception is raised:

async with await session.begin_transaction() as tx:
    ...
async run(query, parameters=None, **kwparameters)

Run a Cypher query within the context of this transaction.

Cypher is typically expressed as a query template plus a set of named parameters. In Python, parameters may be expressed through a dictionary of parameters, through individual parameter arguments, or as a mixture of both. For example, the run queries below are all equivalent:

query = "CREATE (a:Person { name: $name, age: $age })"
result = await tx.run(query, {"name": "Alice", "age": 33})
result = await tx.run(query, {"name": "Alice"}, age=33)
result = await tx.run(query, name="Alice", age=33)

Parameter values can be of any type supported by the Neo4j type system. In Python, this includes bool, int, str, list and dict. Note, however, that list properties must be homogeneous.

Parameters:
  • query (te.LiteralString) – cypher query

  • parameters (t.Optional[t.Dict[str, t.Any]]) – dictionary of parameters

  • kwparameters (t.Any) – additional keyword parameters. These take precedence over entries of the parameters dictionary.

Raises:

TransactionError – if the transaction is already closed

Returns:

a new neo4j.AsyncResult object

Return type:

AsyncResult
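The precedence rule for kwparameters can be pictured as a plain dictionary merge. The helper merge_parameters below is a hypothetical stand-in written for illustration, not the driver's internal code; it only assumes the documented behaviour that keyword parameters override entries of the parameters dictionary.

```python
def merge_parameters(parameters=None, **kwparameters):
    """Hypothetical helper: combine a parameter dict with keyword parameters."""
    merged = dict(parameters or {})
    merged.update(kwparameters)  # keyword parameters win on key clashes
    return merged


# The three equivalent call styles yield the same parameter set.
a = merge_parameters({"name": "Alice", "age": 33})
b = merge_parameters({"name": "Alice"}, age=33)
c = merge_parameters(name="Alice", age=33)

# On a clash, the keyword value overrides the dictionary value.
d = merge_parameters({"name": "Alice", "age": 33}, age=34)
```

Passing each value in only one place avoids relying on this override rule at all.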

async commit()

Mark this transaction as successful and close in order to trigger a COMMIT.

Raises:

TransactionError – if the transaction is already closed

async rollback()

Mark this transaction as unsuccessful and close in order to trigger a ROLLBACK.

Raises:

TransactionError – if the transaction is already closed

async close()

Close this transaction, triggering a ROLLBACK if not closed.

cancel()

Cancel this transaction.

If the transaction is already closed, this method does nothing. Else, it will close the connection without ROLLBACK or COMMIT in a non-blocking manner.

The primary purpose of this function is to handle asyncio.CancelledError.

tx = await session.begin_transaction()
try:
    ...  # do some work
except asyncio.CancelledError:
    tx.cancel()
    raise
Return type:

None

closed()

Indicate whether the transaction has been closed or cancelled.

Returns:

True if closed or cancelled, False otherwise.

Return type:

bool

Closing an explicit transaction can either happen automatically at the end of an async with block, or can be explicitly controlled through the neo4j.AsyncTransaction.commit(), neo4j.AsyncTransaction.rollback(), neo4j.AsyncTransaction.close() or neo4j.AsyncTransaction.cancel() methods.

Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction or that need to run multiple queries within a single transaction but without the retries provided by managed transactions.

Example:

import asyncio

import neo4j


async def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
    async with driver.session(
        database="neo4j",
        # optional, defaults to WRITE_ACCESS
        default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        tx = await session.begin_transaction()
        # or just use an `async with` context instead of try/except/finally
        try:
            if not await customer_balance_check(tx, customer_id, amount):
                # give up
                return
            await other_bank_transfer_api(customer_id, other_bank_id, amount)
            # Now the money has been transferred
            # => we can't retry or rollback anymore
            try:
                await decrease_customer_balance(tx, customer_id, amount)
                await tx.commit()
            except Exception as e:
                request_inspection(customer_id, other_bank_id, amount, e)
                raise
        except asyncio.CancelledError:
            tx.cancel()
            raise
        finally:
            await tx.close()  # rolls back if not yet committed


async def customer_balance_check(tx, customer_id, amount):
    query = ("MATCH (c:Customer {id: $id}) "
             "RETURN c.balance >= $amount AS sufficient")
    result = await tx.run(query, id=customer_id, amount=amount)
    record = await result.single(strict=True)
    return record["sufficient"]


async def other_bank_transfer_api(customer_id, other_bank_id, amount):
    ...  # make some API call to other bank


async def decrease_customer_balance(tx, customer_id, amount):
    query = ("MATCH (c:Customer {id: $id}) "
             "SET c.balance = c.balance - $amount")
    await tx.run(query, id=customer_id, amount=amount)


def request_inspection(customer_id, other_bank_id, amount, e):
    # manual cleanup required; log this or similar
    print("WARNING: transaction rolled back due to exception:", repr(e))
    print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
          "amount:", amount)

Managed Transactions (transaction functions)

Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.

These allow a function object representing the transactional unit of work to be passed as a parameter. This function is called one or more times, within a configurable time limit, until it succeeds. Results should be fully consumed within the function and only aggregate or status values should be returned. Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.

This function will receive a neo4j.AsyncManagedTransaction object as its first parameter. For more details see neo4j.AsyncSession.execute_write() and neo4j.AsyncSession.execute_read().
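The "called one or more times, within a configurable time limit" behaviour can be sketched with a small retry loop. Everything below is a hypothetical stand-in (TransientFailure, run_with_retries, and the max_retry_time and delay parameters are invented for this sketch); it is not the driver's retry logic, and the unit of work takes no transaction object. It only illustrates why a transaction function may run several times and therefore has to be idempotent.

```python
import asyncio
import time


class TransientFailure(Exception):
    """Hypothetical stand-in for an error classified as retryable."""


async def run_with_retries(work, *args, max_retry_time=1.0, delay=0.01):
    """Call `work` until it succeeds or the time limit is exceeded."""
    deadline = time.monotonic() + max_retry_time
    while True:
        try:
            return await work(*args)
        except TransientFailure:
            if time.monotonic() >= deadline:
                raise  # give up once the time budget is used up
            await asyncio.sleep(delay)


attempts = []


async def flaky_unit_of_work(name):
    # Fails twice, then succeeds, so it is invoked three times in total.
    attempts.append(name)
    if len(attempts) < 3:
        raise TransientFailure("simulated transient error")
    return f"created {name}"


outcome = asyncio.run(run_with_retries(flaky_unit_of_work, "Bob"))
```

Because the unit of work ran three times here, any side effect outside the transaction (such as the attempts list) happened three times as well.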

class neo4j.AsyncManagedTransaction

Transaction object provided to transaction functions.

Inside a transaction function, the driver is responsible for managing (committing / rolling back) the transaction. Therefore, AsyncManagedTransactions don’t offer such methods. Otherwise, they behave like AsyncTransaction.

  • To commit the transaction, return anything from the transaction function.

  • To rollback the transaction, raise any exception.

Note that transaction functions have to be idempotent (i.e., the result of running the function once has to be the same as running it any number of times). This is because the driver will retry the transaction function if the error is classified as retryable.

New in version 5.0: Previously, transaction functions used AsyncTransaction objects, which would cause hard-to-interpret errors when managed explicitly (committed or rolled back by user code).

async run(query, parameters=None, **kwparameters)

Run a Cypher query within the context of this transaction.

Cypher is typically expressed as a query template plus a set of named parameters. In Python, parameters may be expressed through a dictionary of parameters, through individual parameter arguments, or as a mixture of both. For example, the run queries below are all equivalent:

query = "CREATE (a:Person { name: $name, age: $age })"
result = await tx.run(query, {"name": "Alice", "age": 33})
result = await tx.run(query, {"name": "Alice"}, age=33)
result = await tx.run(query, name="Alice", age=33)

Parameter values can be of any type supported by the Neo4j type system. In Python, this includes bool, int, str, list and dict. Note, however, that list properties must be homogeneous.

Parameters:
  • query (te.LiteralString) – cypher query

  • parameters (t.Optional[t.Dict[str, t.Any]]) – dictionary of parameters

  • kwparameters (t.Any) – additional keyword parameters. These take precedence over entries of the parameters dictionary.

Raises:

TransactionError – if the transaction is already closed

Returns:

a new neo4j.AsyncResult object

Return type:

AsyncResult

Example:

async def create_person(driver, name):
    async with driver.session() as session:
        node_id = await session.execute_write(create_person_tx, name)
    return node_id


async def create_person_tx(tx, name):
    query = ("CREATE (a:Person {name: $name, id: randomUUID()}) "
             "RETURN a.id AS node_id")
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

To exert more control over how a transaction function is carried out, the neo4j.unit_of_work() decorator can be used.

AsyncResult

Every time a query is executed, a neo4j.AsyncResult is returned.

This provides a handle to the result of the query, giving access to the records within it as well as the result metadata.

Results also contain a buffer that automatically stores unconsumed records when results are consumed out of order.

A neo4j.AsyncResult is attached to an active connection, through a neo4j.AsyncSession, until all its content has been buffered or consumed.

class neo4j.AsyncResult

Handler for the result of Cypher query execution.

Instances of this class are typically constructed and returned by AsyncSession.run() and AsyncTransaction.run().

async result.__aiter__()
async result.__anext__()
keys()

The keys for the records in this result.

Returns:

tuple of key names

Return type:

tuple

async consume()

Consume the remainder of this result and return a neo4j.ResultSummary.

Example:

async def create_node_tx(tx, name):
    result = await tx.run(
        "CREATE (n:ExampleNode {name: $name}) RETURN n", name=name
    )
    record = await result.single()
    value = record.value()
    summary = await result.consume()
    return value, summary

async with driver.session() as session:
    node_id, summary = await session.execute_write(
        create_node_tx, "example"
    )

Example:

async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values, summary

async with driver.session() as session:
    values, summary = await session.execute_read(get_two_tx)
Returns:

The neo4j.ResultSummary for this result

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed.

Return type:

ResultSummary

Changed in version 5.0: Can raise ResultConsumedError.

async single(strict: te.Literal[False] = False) Record | None
async single(strict: te.Literal[True]) Record

Obtain the next and only remaining record or None.

Calling this method always exhausts the result.

If strict is True, this method will raise an exception if there is not exactly one record left.

If strict is False, this method returns None if no record is left, and emits a warning and returns the first record if more than one record is left.

Parameters:

strict – If True, raise a neo4j.ResultNotSingleError instead of returning None when no record is left, or instead of emitting a warning when more than one record is left. False by default.

Returns:

the next neo4j.Record or None if none remain

Warns:

if more than one record is available and strict is False

Raises:
  • ResultNotSingleError – If strict=True and not exactly one record is available.

  • ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Changed in version 5.0: Added strict parameter.

Changed in version 5.0: Can raise ResultConsumedError.
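The four outcomes of single() can be summarised with a small stand-in that operates on a plain list. The function single and the exception NotSingleError below are hypothetical and written only for this sketch (the driver raises neo4j.ResultNotSingleError and works on a live result); the branching follows the behaviour described for strict.

```python
import warnings


class NotSingleError(Exception):
    """Hypothetical stand-in for neo4j.ResultNotSingleError."""


def single(records, strict=False):
    """Mimic the documented outcomes of single() on a plain list."""
    if strict and len(records) != 1:
        raise NotSingleError(f"expected exactly 1 record, got {len(records)}")
    if not records:
        return None
    if len(records) > 1:
        warnings.warn("expected a single record, found more than one")
    return records[0]


none_left = single([])                      # no record, non-strict
exactly_one = single(["r1"], strict=True)   # exactly one record

with warnings.catch_warnings(record=True) as caught:
    warnings.simplefilter("always")
    first_of_many = single(["r1", "r2"])    # warns, returns first record

try:
    single(["r1", "r2"], strict=True)       # strict: not exactly one record
    strict_failed = False
except NotSingleError:
    strict_failed = True
```

Using strict=True turns a silently missing or ambiguous record into an explicit error, which is usually easier to debug than a later failure on None.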

async fetch(n)

Obtain up to n records from this result.

Fetch min(n, records_left) records from this result and return them as a list.

Parameters:

n (int) – the maximum number of records to fetch.

Returns:

list of neo4j.Record

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

List[Record]

New in version 5.0.
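The min(n, records_left) rule means fetch() never fails for lack of records; it simply returns a shorter (possibly empty) list. A minimal sketch, using a hypothetical RecordStream class as a stand-in for a result (it is not part of the driver):

```python
import asyncio


class RecordStream:
    """Hypothetical stand-in for a result holding a queue of records."""

    def __init__(self, records):
        self._records = list(records)

    async def fetch(self, n):
        # Hand out min(n, records_left) records and drop them from the queue.
        batch, self._records = self._records[:n], self._records[n:]
        return batch


async def main():
    stream = RecordStream([1, 2, 3, 4, 5])
    first = await stream.fetch(2)
    second = await stream.fetch(10)  # only three records are left
    third = await stream.fetch(1)    # nothing left: empty list
    return first, second, third


first, second, third = asyncio.run(main())
```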

async peek()

Obtain the next record from this result without consuming it.

This leaves the record in the buffer for further processing.

Returns:

the next neo4j.Record or None if none remain.

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

Record | None

Changed in version 5.0: Can raise ResultConsumedError.

async graph()

Turn the result into a neo4j.Graph.

Return a neo4j.graph.Graph instance containing all the graph objects in the result. This graph will also contain already consumed records.

After calling this method, the result becomes detached, buffering all remaining records.

Returns:

a result graph

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

Graph

Changed in version 5.0: Can raise ResultConsumedError.

async value(key=0, default=None)

Return the remainder of the result as a list of values.

Parameters:
  • key (int | str) – field to return for each remaining record. Obtain a single value from the record by index or key.

  • default (object | None) – default value, used if the index or key is unavailable

Returns:

list of individual values

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

List[Any]

Changed in version 5.0: Can raise ResultConsumedError.

See also

Record.value()

async values(*keys)

Return the remainder of the result as a list of values lists.

Parameters:

keys (int | str) – fields to return for each remaining record. Optionally filtering to include only certain values by index or key.

Returns:

list of values lists

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

List[List[Any]]

Changed in version 5.0: Can raise ResultConsumedError.

See also

Record.values()

async data(*keys)

Return the remainder of the result as a list of dictionaries.

This function provides a convenient but opinionated way to obtain the remainder of the result as mostly JSON serializable data. It is mainly useful for interactive sessions and rapid prototyping.

For instance, node and relationship labels are not included. You will have to implement a custom serializer should you need more control over the output format.

Parameters:

keys (int | str) – fields to return for each remaining record. Optionally filtering to include only certain values by index or key.

Returns:

list of dictionaries

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

List[Dict[str, Any]]

Changed in version 5.0: Can raise ResultConsumedError.

See also

Record.data()
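The return shapes of value(), values() and data() differ only in how each remaining record is projected. The sketch below models records as plain dictionaries and uses hypothetical module-level functions of the same names; it is an illustration of the documented shapes, not the driver's implementation, and it ignores graph types entirely.

```python
records = [
    {"name": "Alice", "age": 33},
    {"name": "Bob", "age": 44},
]


def lookup(record, key, default=None):
    """Resolve a field by position (int) or by name (str)."""
    if isinstance(key, int):
        fields = list(record.values())
        return fields[key] if 0 <= key < len(fields) else default
    return record.get(key, default)


def value(records, key=0, default=None):
    # One value per record: a flat list.
    return [lookup(r, key, default) for r in records]


def values(records, *keys):
    # One list of values per record; all fields if no keys are given.
    return [[lookup(r, k) for k in (keys or range(len(r)))] for r in records]


def data(records, *keys):
    # One dictionary per record; all fields if no keys are given.
    out = []
    for r in records:
        names = list(r)
        selected = [names[k] if isinstance(k, int) else k for k in keys] or names
        out.append({name: r.get(name) for name in selected})
    return out


names = value(records)                        # first field of each record
ages = value(records, "age")
missing = value(records, "email", default="n/a")
rows = values(records)
age_rows = values(records, "age")
name_dicts = data(records, "name")
```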

async to_df(expand=False, parse_dates=False)

Convert (the rest of) the result to a pandas DataFrame.

This method is only available if the pandas library is installed.

res = await tx.run("UNWIND range(1, 10) AS n RETURN n, n+1 AS m")
df = await res.to_df()

For instance, this returns a DataFrame with two columns (n and m) and 10 rows.

Parameters:
  • expand (bool) –

    If True, some structures in the result will be recursively expanded (flattened out into multiple columns) like so (everything inside <...> is a placeholder):

    • Node objects under any variable <n> will be expanded into columns (the recursion stops here)

      • <n>().prop.<property_name> (any) for each property of the node.

      • <n>().element_id (str) the node’s element id. See Node.element_id.

      • <n>().labels (frozenset of str) the node’s labels. See Node.labels.

    • Relationship objects under any variable <r> will be expanded into columns (the recursion stops here)

      • <r>->.prop.<property_name> (any) for each property of the relationship.

      • <r>->.element_id (str) the relationship’s element id. See Relationship.element_id.

      • <r>->.start.element_id (str) the relationship’s start node’s element id. See Relationship.start_node.

      • <r>->.end.element_id (str) the relationship’s end node’s element id. See Relationship.end_node.

      • <r>->.type (str) the relationship’s type. See Relationship.type.

    • list objects under any variable <l> will be expanded into

      • <l>[].0 (any) the 1st list element

      • <l>[].1 (any) the 2nd list element

    • dict objects under any variable <d> will be expanded into

      • <d>{}.<key1> (any) the 1st key of the dict

      • <d>{}.<key2> (any) the 2nd key of the dict

    • list and dict objects are expanded recursively. Example:

      variable x: [{"foo": "bar", "baz": [42, 0]}, "foobar"]
      

      will be expanded to:

      {
          "x[].0{}.foo": "bar",
          "x[].0{}.baz[].0": 42,
          "x[].0{}.baz[].1": 0,
          "x[].1": "foobar"
      }
      
    • Everything else (including Path objects) will not be flattened.

    dict keys and variable names that contain . or \ will be escaped with a backslash (\. and \\ respectively).

  • parse_dates (bool) – If True, columns that exclusively contain time.DateTime objects, time.Date objects, or None, will be converted to pandas.Timestamp.

Raises:
  • ImportError – if pandas library is not available.

  • ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

pandas.DataFrame
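The column naming scheme used for lists and dicts with expand=True can be reproduced in a few lines of plain Python. The functions escape and flatten below are hypothetical helpers written for this sketch, not the driver's code; they cover only list and dict values (nodes, relationships and everything else are left as-is) and assume the escaping rule stated for dict keys and variable names.

```python
def escape(name):
    """Escape backslashes first, then dots, each with a backslash."""
    return name.replace("\\", "\\\\").replace(".", "\\.")


def flatten(column, value):
    """Recursively expand lists and dicts into flat column names."""
    if isinstance(value, list):
        flat = {}
        for index, item in enumerate(value):
            # List elements are addressed as <column>[].<index>
            flat.update(flatten(f"{column}[].{index}", item))
        return flat
    if isinstance(value, dict):
        flat = {}
        for key, item in value.items():
            # Dict entries are addressed as <column>{}.<escaped key>
            flat.update(flatten(f"{column}{{}}.{escape(key)}", item))
        return flat
    return {column: value}  # anything else is kept as a single column


columns = flatten(escape("x"), [{"foo": "bar", "baz": [42, 0]}, "foobar"])
escaped_key = escape("a.b")
```

Escaping keeps the generated names unambiguous: a dot that belongs to a key is written as \. and can be told apart from the dots that separate nesting levels.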

async to_eager_result()

Convert this result to an EagerResult.

This method exhausts the result and triggers a consume().

Returns:

all remaining records in the result stream, the result’s summary, and keys as an EagerResult instance.

Raises:

ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.

Return type:

EagerResult

This is experimental. (See Filter Warnings) It might be changed or removed any time even without prior notice.

New in version 5.5.

closed()

Return True if the result has been closed.

When a result gets consumed (consume()) or the transaction that owns the result gets closed (committed, rolled back, closed), the result cannot be used to acquire further records.

In such a case, all methods that need to access the Result’s records will raise a ResultConsumedError when called.

Returns:

whether the result is closed.

Return type:

bool

New in version 5.0.

See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.

AsyncBookmarkManager

class neo4j.api.AsyncBookmarkManager

Same as BookmarkManager but with async methods.

The driver comes with a default implementation of the async bookmark manager accessible through AsyncGraphDatabase.bookmark_manager().

This is experimental. It might be changed or removed any time even without prior notice.

New in version 5.0.

Changed in version 5.3: See BookmarkManager for changes.

abstract async update_bookmarks(previous_bookmarks, new_bookmarks)

Handle bookmark updates.

Parameters:
  • previous_bookmarks (Collection[str]) – The bookmarks used at the start of a transaction

  • new_bookmarks (Collection[str]) – The new bookmarks retrieved at the end of a transaction

Return type:

None

abstract async get_bookmarks()

Return the bookmarks stored in the bookmark manager.

Returns:

The bookmarks currently stored in the bookmark manager

Return type:

Collection[str]
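A custom manager only needs to provide these two coroutines. The class below is a minimal in-memory sketch; InMemoryBookmarkManager is a hypothetical name, and in real use it would subclass neo4j.api.AsyncBookmarkManager (omitted here so the snippet runs without the driver installed). It assumes that the bookmarks a transaction started from are superseded by the ones it produced.

```python
import asyncio


class InMemoryBookmarkManager:
    """Sketch only; a real manager would subclass AsyncBookmarkManager."""

    def __init__(self, initial_bookmarks=()):
        self._bookmarks = set(initial_bookmarks)

    async def update_bookmarks(self, previous_bookmarks, new_bookmarks):
        # Drop the bookmarks the transaction started from, keep the new ones.
        self._bookmarks.difference_update(previous_bookmarks)
        self._bookmarks.update(new_bookmarks)

    async def get_bookmarks(self):
        # Return a copy so callers cannot mutate the internal state.
        return frozenset(self._bookmarks)


async def main():
    manager = InMemoryBookmarkManager(["bm1", "bm2"])
    await manager.update_bookmarks(["bm1"], ["bm3"])
    return await manager.get_bookmarks()


bookmarks = asyncio.run(main())
```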

Async Cancellation

Async Python provides a mechanism for cancelling futures (asyncio.Future.cancel()). The driver and its components can handle this. However, generally, it’s not advised to rely on cancellation as it forces the driver to close affected connections to avoid leaving them in an undefined state. This makes the driver less efficient.

The easiest way to make sure your application code’s interaction with the driver is playing nicely with cancellation is to always use the async context manager provided by neo4j.AsyncSession like so:

async with driver.session() as session:
    ...  # do what you need to do with the session

If, for whatever reason, you need to handle the session manually, you can do it like so:

session = driver.session()
try:
    ...  # do what you need to do with the session
except asyncio.CancelledError:
    session.cancel()
    raise
finally:
    # this becomes a no-op if the session has been cancelled before
    await session.close()

As mentioned above, any cancellation of I/O work will cause the driver to close the affected connection. This will kill any neo4j.AsyncTransaction and neo4j.AsyncResult objects that are attached to that connection. Hence, after catching an asyncio.CancelledError, you should not try to use transactions or results created earlier. They are likely no longer valid.

Furthermore, there is no guarantee as to whether a piece of ongoing work got successfully executed on the server side or not, when a cancellation happens: await transaction.commit() and other methods can throw asyncio.CancelledError but still have managed to complete from the server’s perspective.
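The order of calls in the manual pattern can be checked without a database. The sketch below uses a hypothetical FakeSession that merely records calls to cancel() and close(); it is a stand-in for neo4j.AsyncSession, and asyncio.sleep() stands in for a long-running query.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in that records cancel() and close() calls."""

    def __init__(self):
        self.events = []

    def cancel(self):
        self.events.append("cancel")

    async def close(self):
        self.events.append("close")


async def do_work(session):
    try:
        await asyncio.sleep(10)  # stands in for a long-running query
    except asyncio.CancelledError:
        session.cancel()  # non-blocking, safe to call while being cancelled
        raise             # always re-raise so the task ends up cancelled
    finally:
        await session.close()


async def main():
    session = FakeSession()
    task = asyncio.create_task(do_work(session))
    await asyncio.sleep(0)  # give the task a chance to start
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return session.events, task.cancelled()


events, was_cancelled = asyncio.run(main())
```

Re-raising the asyncio.CancelledError matters: swallowing it would leave the task looking as if it had finished normally.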

Async Logging

For the most part, logging works the same way as in the synchronous driver. See Logging for more information.

However, when following the manual approach to logging, it is recommended to include information about the current async task in the log record. Like so:

import asyncio
import logging
import sys

class TaskIdFilter(logging.Filter):
    """Injecting async task id into log records."""

    def filter(self, record):
        try:
            record.taskId = id(asyncio.current_task())
        except RuntimeError:
            record.taskId = None
        return True


# create a handler, e.g. to log to stdout
handler = logging.StreamHandler(sys.stdout)
# configure the handler to your liking
handler.setFormatter(logging.Formatter(
    "[%(levelname)-8s] [Task %(taskId)-15s] %(asctime)s  %(message)s"
    # or when using threading AND asyncio
    # "[%(levelname)-8s] [Thread %(thread)d] [Task %(taskId)-15s] "
    # "%(asctime)s  %(message)s"
))
# attach the filter injecting the task id to the handler
handler.addFilter(TaskIdFilter())
# add the handler to the driver's logger
logging.getLogger("neo4j").addHandler(handler)
# make sure the logger logs on the desired log level
logging.getLogger("neo4j").setLevel(logging.DEBUG)
# from now on, DEBUG logging to stdout is enabled in the driver