Async API Documentation¶
Added in version 5.0.
Warning
There are known issues with Python 3.8 and the async driver where it gradually slows down. Generally, it’s recommended to use the latest supported version of Python for best performance, stability, and security.
AsyncGraphDatabase¶
Async Driver Construction¶
The neo4j.AsyncDriver
construction is done via a classmethod
on the neo4j.AsyncGraphDatabase
class.
- class neo4j.AsyncGraphDatabase¶
Accessor for neo4j.AsyncDriver construction.
- classmethod driver(uri, *, auth=None, **config)¶
Create a driver.
- Parameters:
uri (str) – the connection URI for the driver, see URI for available URIs.
auth (_TAuth | AsyncAuthManager) – the authentication details, see Auth for available authentication details.
config – driver configuration key-word arguments, see Async Driver Configuration for available key-word arguments.
- Return type:
Driver creation example:
import asyncio
from neo4j import AsyncGraphDatabase

async def main():
    uri = "neo4j://example.com:7687"
    driver = AsyncGraphDatabase.driver(uri, auth=("neo4j", "password"))
    await driver.close()  # close the driver object

asyncio.run(main())
For basic authentication, auth can be a simple tuple, for example:
auth = ("neo4j", "password")
This will implicitly create a neo4j.Auth with a scheme="basic". Other authentication methods are described under Auth.
with block context example:
import asyncio
from neo4j import AsyncGraphDatabase

async def main():
    uri = "neo4j://example.com:7687"
    auth = ("neo4j", "password")
    async with AsyncGraphDatabase.driver(uri, auth=auth) as driver:
        ...  # use the driver

asyncio.run(main())
- classmethod bookmark_manager(initial_bookmarks=None, bookmarks_supplier=None, bookmarks_consumer=None)¶
Create an AsyncBookmarkManager with default implementation.
Basic usage example to configure sessions with the built-in bookmark manager implementation so that all work is automatically causally chained (i.e., all reads can observe all previous writes even in a clustered setup):
import neo4j

# omitting closing the driver for brevity
driver = neo4j.AsyncGraphDatabase.driver(...)
bookmark_manager = neo4j.AsyncGraphDatabase.bookmark_manager(...)

async with driver.session(
    bookmark_manager=bookmark_manager
) as session1:
    async with driver.session(
        bookmark_manager=bookmark_manager,
        access_mode=neo4j.READ_ACCESS
    ) as session2:
        result1 = await session1.run("<WRITE_QUERY>")
        await result1.consume()
        # READ_QUERY is guaranteed to see what WRITE_QUERY wrote.
        result2 = await session2.run("<READ_QUERY>")
        await result2.consume()
This is a very contrived example, and in this particular case, having both queries in the same session has the exact same effect and might even be more performant. However, when dealing with sessions spanning multiple threads, async Tasks, processes, or even hosts, the bookmark manager can come in handy as sessions are not safe to be used concurrently.
- Parameters:
initial_bookmarks (Bookmarks | Iterable[str] | None) – The initial set of bookmarks. The returned bookmark manager will use this to initialize its internal bookmarks.
bookmarks_supplier (Callable[[], Bookmarks | Awaitable[Bookmarks]] | None) – Function which will be called every time the default bookmark manager’s method AsyncBookmarkManager.get_bookmarks() gets called. The function takes no arguments and must return a Bookmarks object. The result of bookmarks_supplier will then be concatenated with the internal set of bookmarks and used to configure the session being created. It will, however, not update the internal set of bookmarks.
bookmarks_consumer (Callable[[Bookmarks], None | Awaitable[None]] | None) – Function which will be called whenever the set of bookmarks handled by the bookmark manager gets updated with the new internal bookmark set. It will receive the new set of bookmarks as a Bookmarks object and return None.
- Returns:
A default implementation of AsyncBookmarkManager.
- Return type:
Added in version 5.0.
Changed in version 5.3: The bookmark manager no longer tracks bookmarks per database. This effectively changes the signature of almost all bookmark manager related methods:
initial_bookmarks is no longer a mapping from database name to bookmarks but plain bookmarks.
bookmarks_supplier no longer receives the database name as an argument.
bookmarks_consumer no longer receives the database name as an argument.
Changed in version 5.8: Stabilized from experimental.
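The interplay of bookmarks_supplier and bookmarks_consumer described above can be pictured with a small toy model. This is only a sketch of the documented semantics, not the driver's implementation: BookmarkManagerSketch, its update_bookmarks signature, and the bookmark strings are hypothetical, and the sketch assumes an update simply adds the new bookmarks to the internal set.

```python
import asyncio
from typing import Awaitable, Callable, Iterable, List, Optional, Set


class BookmarkManagerSketch:
    """Toy model of the documented semantics; NOT the driver's code."""

    def __init__(
        self,
        initial_bookmarks: Iterable[str] = (),
        bookmarks_supplier: Optional[
            Callable[[], Awaitable[Iterable[str]]]
        ] = None,
        bookmarks_consumer: Optional[
            Callable[[Set[str]], Awaitable[None]]
        ] = None,
    ) -> None:
        self._bookmarks: Set[str] = set(initial_bookmarks)
        self._supplier = bookmarks_supplier
        self._consumer = bookmarks_consumer

    async def get_bookmarks(self) -> Set[str]:
        # The supplier's output is merged into the returned set only;
        # the internal set is left untouched.
        extra = await self._supplier() if self._supplier else ()
        return self._bookmarks | set(extra)

    async def update_bookmarks(self, new_bookmarks: Iterable[str]) -> None:
        # Assumption of this sketch: an update just adds the new bookmarks.
        self._bookmarks |= set(new_bookmarks)
        if self._consumer:
            # The consumer receives the new internal set.
            await self._consumer(set(self._bookmarks))


async def demo() -> List[Set[str]]:
    seen: List[Set[str]] = []

    async def supplier() -> List[str]:
        return ["bm:external"]

    async def consumer(bookmarks: Set[str]) -> None:
        seen.append(bookmarks)

    manager = BookmarkManagerSketch(["bm:1"], supplier, consumer)
    merged = await manager.get_bookmarks()
    await manager.update_bookmarks(["bm:2"])
    return [merged, *seen]
```

Running asyncio.run(demo()) shows that the supplier's bookmark appears in what get_bookmarks() returns but never reaches the consumer, because it is not part of the internal set.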
URI¶
On construction, the scheme
of the URI determines the type of neo4j.AsyncDriver
object created.
Available valid URIs:
bolt://host[:port]
bolt+ssc://host[:port]
bolt+s://host[:port]
neo4j://host[:port][?routing_context]
neo4j+ssc://host[:port][?routing_context]
neo4j+s://host[:port][?routing_context]
uri = "bolt://example.com:7687"
uri = "neo4j://example.com:7687"
Each supported scheme maps to a particular neo4j.AsyncDriver
subclass that implements a specific behaviour.
URI Scheme | Driver Object and Setting
---|---
bolt | AsyncBoltDriver with no encryption.
bolt+ssc | AsyncBoltDriver with encryption (accepts self signed certificates).
bolt+s | AsyncBoltDriver with encryption (accepts only certificates signed by a certificate authority), full certificate checks.
neo4j | AsyncNeo4jDriver with no encryption.
neo4j+ssc | AsyncNeo4jDriver with encryption (accepts self signed certificates).
neo4j+s | AsyncNeo4jDriver with encryption (accepts only certificates signed by a certificate authority), full certificate checks.
Note
See https://neo4j.com/docs/operations-manual/current/configuration/ports/ for Neo4j ports.
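As a quick way to reason about the scheme table, the mapping can be transcribed into a lookup. This is a standalone sketch using only the standard library; SchemeInfo, SCHEMES, and describe_uri are hypothetical helper names and not part of the driver, which performs its own URI parsing.

```python
from typing import NamedTuple
from urllib.parse import urlparse


class SchemeInfo(NamedTuple):
    driver_class: str
    encrypted: bool
    accepts_self_signed: bool


# Transcribed from the URI scheme table above.
SCHEMES = {
    "bolt": SchemeInfo("AsyncBoltDriver", False, False),
    "bolt+ssc": SchemeInfo("AsyncBoltDriver", True, True),
    "bolt+s": SchemeInfo("AsyncBoltDriver", True, False),
    "neo4j": SchemeInfo("AsyncNeo4jDriver", False, False),
    "neo4j+ssc": SchemeInfo("AsyncNeo4jDriver", True, True),
    "neo4j+s": SchemeInfo("AsyncNeo4jDriver", True, False),
}


def describe_uri(uri: str) -> SchemeInfo:
    """Look up what the table says about the scheme of ``uri``."""
    scheme = urlparse(uri).scheme
    try:
        return SCHEMES[scheme]
    except KeyError:
        raise ValueError(f"unsupported URI scheme: {scheme!r}") from None
```

For example, describe_uri("neo4j+ssc://example.com:7687") reports an encrypted AsyncNeo4jDriver that accepts self-signed certificates.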
Async Auth¶
Authentication works the same as in the synchronous driver, except that when using AuthManagers, their asynchronous equivalents have to be used.
- class neo4j.auth_management.AsyncAuthManager¶
Async version of AuthManager.
See also
Added in version 5.8.
Changed in version 5.12: on_auth_expired was removed from the interface and replaced by handle_security_exception(). See AuthManager.
Changed in version 5.14: Stabilized from preview.
- abstract async get_auth()¶
Async version of AuthManager.get_auth().
See also
- abstract async handle_security_exception(auth, error)¶
Async version of AuthManager.handle_security_exception().
- Parameters:
auth (_TAuth)
error (Neo4jError)
- Return type:
- class neo4j.auth_management.AsyncAuthManagers¶
A collection of AsyncAuthManager factories.
Added in version 5.8.
Changed in version 5.14: Stabilized from preview.
- static static(auth)¶
Create a static auth manager.
The manager will always return the auth info provided at its creation.
Example:
# NOTE: this example is for illustration purposes only.
#       The driver will automatically wrap static auth info in a
#       static auth manager.
import neo4j
from neo4j.auth_management import AsyncAuthManagers

auth = neo4j.basic_auth("neo4j", "password")

async with neo4j.AsyncGraphDatabase.driver(
    "neo4j://example.com:7687",
    auth=AsyncAuthManagers.static(auth)
    # auth=auth  # this is equivalent
) as driver:
    ...  # do stuff
- Parameters:
- Returns:
An instance of an implementation of AsyncAuthManager that always returns the same auth.
- Return type:
Added in version 5.8.
Changed in version 5.14: Stabilized from preview.
- static basic(provider)¶
Create an auth manager handling basic auth password rotation.
This factory wraps the provider function in an auth manager implementation that caches the provided auth info until the server notifies the driver that the auth info has expired (by returning an error that indicates that the password is invalid).
Note that this implies that the provider function will be called again if it provides wrong auth info, potentially deferring failure due to a wrong password or username.
Warning
The provider function must not interact with the driver in any way as this can cause deadlocks and undefined behaviour.
The provider function must only ever return auth information belonging to the same identity. Switching identities is undefined behavior. You may use session-level authentication for such use-cases.
Example:
import neo4j
from neo4j.auth_management import AsyncAuthManagers

async def auth_provider():
    # some way of getting a token
    user, password = await get_current_auth()
    return (user, password)

async with neo4j.AsyncGraphDatabase.driver(
    "neo4j://example.com:7687",
    auth=AsyncAuthManagers.basic(auth_provider)
) as driver:
    ...  # do stuff
- Parameters:
provider (Callable[[], Awaitable[Tuple[Any, Any] | Auth | None]]) – A callable that provides new auth info whenever the server notifies the driver that the previous auth info is invalid.
- Returns:
An instance of an implementation of AsyncAuthManager that returns auth info from the given provider and refreshes it, calling the provider again, when the auth info was rejected by the server.
- Return type:
Added in version 5.12.
Changed in version 5.14: Stabilized from preview.
- static bearer(provider)¶
Create an auth manager for potentially expiring bearer auth tokens.
This factory wraps the provider function in an auth manager implementation that caches the provided auth info until either the ExpiringAuth.expires_at time is exceeded or the server notifies the driver that the auth info has expired (by returning an error that indicates that the bearer auth token has expired).
Warning
The provider function must not interact with the driver in any way as this can cause deadlocks and undefined behaviour.
The provider function must only ever return auth information belonging to the same identity. Switching identities is undefined behavior. You may use session-level authentication for such use-cases.
Example:
import neo4j
from neo4j.auth_management import (
    AsyncAuthManagers,
    ExpiringAuth,
)

async def auth_provider():
    # some way of getting a token
    sso_token = await get_sso_token()
    # assume we know our tokens expire every 60 seconds
    expires_in = 60
    # Include a little buffer so that we fetch a new token
    # *before* the old one expires
    expires_in -= 10
    auth = neo4j.bearer_auth(sso_token)
    return ExpiringAuth(auth=auth).expires_in(expires_in)

async with neo4j.AsyncGraphDatabase.driver(
    "neo4j://example.com:7687",
    auth=AsyncAuthManagers.bearer(auth_provider)
) as driver:
    ...  # do stuff
- Parameters:
provider (Callable[[], Awaitable[ExpiringAuth]]) – A callable that provides an ExpiringAuth instance.
- Returns:
An instance of an implementation of AsyncAuthManager that returns auth info from the given provider and refreshes it, calling the provider again, when the auth info expires (either because it’s reached its expiry time or because the server flagged it as expired).
- Return type:
Added in version 5.12.
Changed in version 5.14: Stabilized from preview.
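The caching behaviour described for bearer() (reuse the provider's result until it expires or the server rejects it) can be illustrated with a toy model. This is a sketch only and not how the driver is implemented: CachingTokenSource, get_token, mark_expired, and the token strings are hypothetical names, and a plain (token, expiry) tuple stands in for ExpiringAuth.

```python
import asyncio
import time
from typing import Awaitable, Callable, Optional, Tuple

# (token, absolute expiry time measured with time.monotonic())
TokenWithExpiry = Tuple[str, float]


class CachingTokenSource:
    """Toy model of cache-until-expired; NOT the driver's implementation."""

    def __init__(
        self, provider: Callable[[], Awaitable[TokenWithExpiry]]
    ) -> None:
        self._provider = provider
        self._cached: Optional[TokenWithExpiry] = None

    async def get_token(self) -> str:
        # Ask the provider only if nothing is cached or the cache expired.
        if self._cached is None or time.monotonic() >= self._cached[1]:
            self._cached = await self._provider()
        return self._cached[0]

    def mark_expired(self) -> None:
        # Models the server reporting that the token has expired.
        self._cached = None


async def demo() -> Tuple[str, str, str]:
    calls = 0

    async def provider() -> TokenWithExpiry:
        nonlocal calls
        calls += 1
        return f"token-{calls}", time.monotonic() + 60

    source = CachingTokenSource(provider)
    first = await source.get_token()
    second = await source.get_token()  # served from the cache
    source.mark_expired()
    third = await source.get_token()  # the provider is asked again
    return first, second, third
```

In this model the provider runs once for the first two calls and a second time only after the cached token has been marked as expired.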
AsyncDriver¶
Every Neo4j-backed application will require a driver object.
This object holds the details required to establish connections with a Neo4j database, including server URIs, credentials and other configuration.
neo4j.AsyncDriver
objects hold a connection pool from which neo4j.AsyncSession
objects can borrow connections.
Closing a driver will immediately shut down all connections in the pool.
Note
Driver objects only open connections and pool them as needed. To verify that
the driver is able to communicate with the database without executing any
query, use neo4j.AsyncDriver.verify_connectivity()
.
- class neo4j.AsyncDriver¶
Base class for all driver types.
Drivers are used as the primary access point to Neo4j.
- async execute_query(query, parameters_=None, routing_=neo4j.RoutingControl.WRITE, database_=None, impersonated_user_=None, bookmark_manager_=self.execute_query_bookmark_manager, auth_=None, result_transformer_=AsyncResult.to_eager_result, **kwargs)¶
Execute a query in a transaction function and return all results.
This method is a handy wrapper for lower-level driver APIs like sessions, transactions, and transaction functions. It is intended for simple use cases where there is no need for managing all possible options.
The internal usage of transaction functions provides a retry-mechanism for appropriate errors. Furthermore, this means that queries using CALL {} IN TRANSACTIONS or the older USING PERIODIC COMMIT will not work (use AsyncSession.run() for these).
The method is roughly equivalent to:
async def execute_query(
    query_, parameters_, routing_, database_, impersonated_user_,
    bookmark_manager_, auth_, result_transformer_, **kwargs
):
    @unit_of_work(query_.metadata, query_.timeout)
    async def work(tx):
        result = await tx.run(query_.text, parameters_, **kwargs)
        return await result_transformer_(result)

    async with driver.session(
        database=database_,
        impersonated_user=impersonated_user_,
        bookmark_manager=bookmark_manager_,
        auth=auth_,
    ) as session:
        if routing_ == RoutingControl.WRITE:
            return await session.execute_write(work)
        elif routing_ == RoutingControl.READ:
            return await session.execute_read(work)
Usage example:
from typing import List

import neo4j

async def example(driver: neo4j.AsyncDriver) -> List[str]:
    """Get the name of all 42 year-olds."""
    records, summary, keys = await driver.execute_query(
        "MATCH (p:Person {age: $age}) RETURN p.name",
        {"age": 42},
        routing_=neo4j.RoutingControl.READ,  # or just "r"
        database_="neo4j",
    )
    assert keys == ["p.name"]  # not needed, just for illustration
    # log_summary(summary)  # log some metadata
    return [str(record["p.name"]) for record in records]
    # or: return [str(record[0]) for record in records]
    # or even: return list(map(lambda r: str(r[0]), records))
Another example:
import neo4j

async def example(driver: neo4j.AsyncDriver) -> int:
    """Call all young people "My dear" and get their count."""
    record = await driver.execute_query(
        "MATCH (p:Person) WHERE p.age <= $age "
        "SET p.nickname = 'My dear' "
        "RETURN count(*)",
        # optional routing parameter, as write is default
        # routing_=neo4j.RoutingControl.WRITE,  # or just "w",
        database_="neo4j",
        result_transformer_=neo4j.AsyncResult.single,
        age=15,
    )
    assert record is not None  # for typechecking and illustration
    count = record[0]
    assert isinstance(count, int)
    return count
- Parameters:
query (LiteralString | Query) – Cypher query to execute. Use a Query object to pass a query with additional transaction configuration.
parameters_ (Dict[str, Any] | None) – parameters to use in the query
routing_ (RoutingControl) – Whether to route the query to a reader (follower/read replica) or a writer (leader) in the cluster. Default is to route to a writer.
database_ (str | None) –
Database to execute the query against.
None (default) uses the database configured on the server side.
Note
It is recommended to always specify the database explicitly when possible. This allows the driver to work more efficiently, as it will not have to resolve the default database first.
See also the Session config database.
impersonated_user_ (str | None) –
Name of the user to impersonate.
This means that all queries will be executed in the security context of the impersonated user. For this, the user for which the Driver has been created needs to have the appropriate permissions.
See also the Session config impersonated_user.
auth_ (Tuple[Any, Any] | Auth | None) –
Authentication information to use for this query.
By default, the driver configuration is used.
See also the Session config auth.
result_transformer_ (Callable[[AsyncResult], Awaitable[T]]) –
A function that gets passed the neo4j.AsyncResult object resulting from the query and converts it to a different type. The result of the transformer function is returned by this method.
Warning
The transformer function must not return the neo4j.AsyncResult itself.
Warning
N.B. the driver might retry the underlying transaction so the transformer might get invoked more than once (with different neo4j.AsyncResult objects). Therefore, it needs to be idempotent (i.e., have the same effect, regardless of whether it is called once or many times).
Example transformer that checks that exactly one record is in the result stream, then returns the record and the result summary:
from typing import Tuple

import neo4j

async def transformer(
    result: neo4j.AsyncResult
) -> Tuple[neo4j.Record, neo4j.ResultSummary]:
    record = await result.single(strict=True)
    summary = await result.consume()
    return record, summary
Note that methods of neo4j.AsyncResult that don’t take mandatory arguments can be used directly as transformer functions. For example:
import neo4j

async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:
    record = await driver.execute_query(
        "SOME QUERY",
        result_transformer_=neo4j.AsyncResult.single
    )
    return record

# is equivalent to:

async def transformer(result: neo4j.AsyncResult) -> neo4j.Record:
    return await result.single()

async def example(driver: neo4j.AsyncDriver) -> neo4j.Record:
    record = await driver.execute_query(
        "SOME QUERY",
        result_transformer_=transformer
    )
    return record
bookmark_manager_ (AsyncBookmarkManager | BookmarkManager | None) –
Specify a bookmark manager to use.
If present, the bookmark manager is used to keep the query causally consistent with all work executed using the same bookmark manager.
Defaults to the driver’s execute_query_bookmark_manager.
Pass None to disable causal consistency.
kwargs (Any) – additional keyword parameters. None of these can end with a single underscore. This is to avoid collisions with the keyword configuration parameters of this method. If you need to pass such a parameter, use the parameters_ parameter instead. Parameters passed as kwargs take precedence over those passed in parameters_.
- Returns:
the result of the
result_transformer_
- Return type:
T
Added in version 5.5.
Changed in version 5.8:
Added auth_ parameter in preview.
Stabilized from experimental.
Changed in version 5.14: Stabilized auth_ parameter from preview.
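Because the transformer passed as result_transformer_ may be invoked more than once when the transaction is retried, it helps to build its return value from scratch on every call. The sketch below illustrates that idea without a server: fake_result is a hypothetical stand-in for neo4j.AsyncResult (an async iterator of dict-like records), and the names in it are made up.

```python
import asyncio
from typing import AsyncIterator, Dict, List, Tuple


async def names_transformer(result) -> List[str]:
    """Idempotent transformer: builds a fresh list on every invocation
    and keeps no state outside the function."""
    return [record["name"] async for record in result]


async def fake_result() -> AsyncIterator[Dict[str, str]]:
    # Hypothetical stand-in for neo4j.AsyncResult, so that the sketch
    # can run without a database.
    for name in ("Alice", "Bob"):
        yield {"name": name}


async def demo() -> Tuple[List[str], List[str]]:
    # Simulate a retry: the transformer runs twice, each time with a
    # different result object, and yields the same value.
    first = await names_transformer(fake_result())
    second = await names_transformer(fake_result())
    return first, second
```

With the real driver, such a function would be passed as result_transformer_=names_transformer. A transformer that appended to a list defined outside the function would instead accumulate duplicates on every retry.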
- session(**config)¶
Create a session.
See AsyncSession Construction for details.
- Parameters:
config – session configuration key-word arguments, see Session Configuration for available key-word arguments.
- Returns:
new neo4j.AsyncSession object
- Return type:
- async close()¶
Shut down, closing any open connections in the pool.
- Return type:
None
- property execute_query_bookmark_manager: AsyncBookmarkManager¶
The driver’s default query bookmark manager.
This is the default AsyncBookmarkManager used by execute_query(). This can be used to causally chain execute_query() calls and sessions. Example:
async def example(driver: neo4j.AsyncDriver) -> None:
    await driver.execute_query("<QUERY 1>")
    async with driver.session(
        bookmark_manager=driver.execute_query_bookmark_manager
    ) as session:
        # every query inside this session will be causally chained
        # (i.e., can read what was written by <QUERY 1>)
        await session.run("<QUERY 2>")
    # subsequent execute_query calls will be causally chained
    # (i.e., can read what was written by <QUERY 2>)
    await driver.execute_query("<QUERY 3>")
Added in version 5.5.
Changed in version 5.8:
Renamed from query_bookmark_manager to execute_query_bookmark_manager.
Stabilized from experimental.
- async verify_connectivity(**config)¶
Verify that the driver can establish a connection to the server.
This verifies if the driver can establish a reading connection to a remote server or a cluster. Some data will be exchanged.
Note
Even if this method raises an exception, the driver still needs to be closed via close() to free up all resources.
- Parameters:
config – accepts the same configuration key-word arguments as session().
Warning
All configuration key-word arguments are experimental. They might be changed or removed in any future version without prior notice.
- Raises:
Exception – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.
- Return type:
None
Changed in version 5.0: The undocumented return value has been removed. If you need information about the remote server, use
get_server_info()
instead.
- async get_server_info(**config)¶
Get information about the connected Neo4j server.
Try to establish a working read connection to the remote server or a member of a cluster and exchange some data. Then return the contacted server’s information.
In a cluster, there is no guarantee about which server will be contacted.
Note
Even if this method raises an exception, the driver still needs to be closed via close() to free up all resources.
- Parameters:
config – accepts the same configuration key-word arguments as session().
Warning
All configuration key-word arguments are experimental. They might be changed or removed in any future version without prior notice.
- Raises:
Exception – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.
- Return type:
Added in version 5.0.
- async supports_multi_db()¶
Check if the server or cluster supports multi-databases.
- Returns:
Returns true if the server or cluster the driver connects to supports multi-databases, otherwise false.
- Return type:
Note
The feature support check is based solely on the Bolt protocol version. The feature might still be disabled on the server side even if this function returns True. It just guarantees that the driver won’t throw a ConfigurationError when trying to use this driver feature.
- async verify_authentication(auth=None, **config)¶
Verify that the authentication information is valid.
Like verify_connectivity(), but for checking authentication.
Try to establish a working read connection to the remote server or a member of a cluster and exchange some data. In a cluster, there is no guarantee about which server will be contacted. If the data exchange is successful and the authentication information is valid, True is returned. Otherwise, the error will be matched against a list of known authentication errors. If the error is on that list, False is returned indicating that the authentication information is invalid. Otherwise, the error is re-raised.
- Parameters:
auth (Auth | tuple[str, str] | None) – authentication information to verify. Same as the session config Auth.
config – accepts the same configuration key-word arguments as session().
Warning
All configuration key-word arguments (except auth) are experimental. They might be changed or removed in any future version without prior notice.
- Raises:
Exception – if the driver cannot connect to the remote. Use the exception to further understand the cause of the connectivity problem.
- Return type:
Added in version 5.8.
Changed in version 5.14: Stabilized from experimental.
- async supports_session_auth()¶
Check if the remote supports connection re-authentication.
- Returns:
Returns true if the server or cluster the driver connects to supports re-authentication of existing connections, otherwise false.
- Return type:
Note
The feature support check is based solely on the Bolt protocol version. The feature might still be disabled on the server side even if this function returns True. It just guarantees that the driver won’t throw a ConfigurationError when trying to use this driver feature.
Added in version 5.8.
Async Driver Configuration¶
neo4j.AsyncDriver is configured exactly like neo4j.Driver (see Driver Configuration). The only differences are that the async driver accepts
a sync as well as an async custom resolver function (see resolver),
a sync as well as an async auth token manager (see AsyncAuthManager), and
an async client certificate provider (see client_certificate).
resolver
¶
A custom resolver function to resolve any addresses the driver receives ahead of DNS resolution.
This function is called with an Address
and should return an iterable of Address
objects or values that can be used to construct Address
objects.
If no custom resolver function is supplied, the internal resolver moves straight to regular DNS resolution.
The custom resolver function can but does not have to be a coroutine.
For example:
import neo4j

async def custom_resolver(socket_address):
    # assert isinstance(socket_address, neo4j.Address)
    if socket_address != ("example.com", 9999):
        raise OSError(f"Unexpected socket address {socket_address!r}")
    # You can return any neo4j.Address object
    yield neo4j.Address(("localhost", 7687))  # IPv4
    yield neo4j.Address(("::1", 7687, 0, 0))  # IPv6
    yield neo4j.Address.parse("localhost:7687")
    yield neo4j.Address.parse("[::1]:7687")
    # or any tuple that can be passed to neo4j.Address(...).
    # Initially, this will be interpreted as IPv4, but DNS resolution
    # will turn it into IPv6 if appropriate.
    yield "::1", 7687
    # This will be interpreted as IPv6 directly, but DNS resolution will
    # still happen.
    yield "::1", 7687, 0, 0
    yield "127.0.0.1", 7687

# alternatively
def custom_resolver(socket_address):
    ...

driver = neo4j.AsyncGraphDatabase.driver("neo4j://example.com:9999",
                                         auth=("neo4j", "password"),
                                         resolver=custom_resolver)
- Default:
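A synchronous resolver can be as simple as a dictionary lookup that returns tuples from which Address objects can be built. The following is a minimal sketch: STATIC_HOSTS and its entries are hypothetical, and the function relies on the incoming address behaving like a (host, port, ...) tuple, as the comparison in the example above suggests.

```python
from typing import Dict, List, Tuple

HostPort = Tuple[str, int]

# Hypothetical static table: the logical address the driver was
# configured with, mapped to the concrete addresses to try.
STATIC_HOSTS: Dict[HostPort, List[HostPort]] = {
    ("example.com", 9999): [("127.0.0.1", 7687), ("localhost", 7687)],
}


def custom_resolver(socket_address) -> List[HostPort]:
    """Resolve a known logical address to a fixed list of (host, port)."""
    key = (socket_address[0], socket_address[1])
    try:
        # Return a copy so callers cannot mutate the table.
        return list(STATIC_HOSTS[key])
    except KeyError:
        raise OSError(
            f"Unexpected socket address {socket_address!r}"
        ) from None
```

Such a function would be handed to the driver through the resolver configuration option, for example resolver=custom_resolver.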
client_certificate
¶
Specify a client certificate or certificate provider for mutual TLS (mTLS) authentication.
This setting does not have any effect if encrypted
is set to False
(and the URI scheme is bolt://
or neo4j://
) or a custom ssl_context
is configured.
- Type:
- Default:
Added in version 5.19.
Changed in version 5.27: Stabilized from preview.
- class neo4j.auth_management.AsyncClientCertificateProvider¶
Async version of ClientCertificateProvider.
The package provides some default implementations of this class in AsyncClientCertificateProviders for convenience.
Added in version 5.19.
Changed in version 5.27: Stabilized from preview.
- abstract async get_certificate()¶
Return the new certificate (if present) to use for new connections.
- Return type:
ClientCertificate | None
- class neo4j.auth_management.AsyncClientCertificateProviders¶
A collection of AsyncClientCertificateProvider factories.
Added in version 5.19.
Changed in version 5.27: Stabilized from preview.
- static static(cert)¶
Create a static client certificate provider.
The provider simply makes the driver use the given certificate for all connections.
- Parameters:
cert (ClientCertificate)
- Return type:
- static rotating(initial_cert)¶
Create certificate provider that allows for rotating certificates.
- Parameters:
initial_cert (ClientCertificate)
- Return type:
- class neo4j.auth_management.AsyncRotatingClientCertificateProvider¶
Bases:
AsyncClientCertificateProvider
Abstract base class for certificate providers that can rotate certificates.
The provider will make the driver use the initial certificate for all connections until the certificate is updated using the update_certificate() method. From that point on, the new certificate will be used for all new connections until update_certificate() is called again and so on.
Example:
from neo4j import AsyncGraphDatabase
from neo4j.auth_management import (
    ClientCertificate,
    AsyncClientCertificateProviders,
)

provider = AsyncClientCertificateProviders.rotating(
    ClientCertificate(
        certfile="path/to/certfile.pem",
        keyfile="path/to/keyfile.pem",
        password=lambda: "super_secret_password"
    )
)
driver = AsyncGraphDatabase.driver(
    # secure driver must be configured for client certificate
    # to be used: (...+s[sc] scheme or encrypted=True)
    "neo4j+s://example.com:7687",
    # auth still required as before, unless server is configured to not
    # use authentication
    auth=("neo4j", "password"),
    client_certificate=provider
)

# do work with the driver, until the certificate needs to be rotated
...

await provider.update_certificate(
    ClientCertificate(
        certfile="path/to/new/certfile.pem",
        keyfile="path/to/new/keyfile.pem",
        password=lambda: "new_super_secret_password"
    )
)

# do more work with the driver, until the certificate needs to be
# rotated again
...
Added in version 5.19.
Changed in version 5.24: Turned this class into an abstract class to make the actual implementation internal. This entails removing the possibility to directly instantiate this class. Please use the factory method AsyncClientCertificateProviders.rotating() instead.
Changed in version 5.27: Stabilized from preview.
- abstract async update_certificate(cert)¶
Update the certificate to use for new connections.
- Parameters:
cert (ClientCertificate)
- Return type:
None
Driver Object Lifetime¶
For general applications, it is recommended to create one top-level neo4j.AsyncDriver
object that lives for the lifetime of the application.
For example:
from neo4j import AsyncGraphDatabase

class Application:
    def __init__(self, uri, user, password):
        self.driver = AsyncGraphDatabase.driver(uri, auth=(user, password))

    async def close(self):
        await self.driver.close()
Connection details held by the neo4j.AsyncDriver
are immutable.
Therefore if, for example, a password is changed, a replacement neo4j.AsyncDriver
object must be created.
More than one AsyncDriver may be required if connections to multiple remotes, or connections as multiple users, are needed, unless impersonation (impersonated_user) is used.
neo4j.AsyncDriver
objects are safe to be used in concurrent coroutines.
They are not thread-safe.
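Sharing one driver between coroutines could look like the sketch below. It only relies on execute_query() as documented above. StubDriver is a hypothetical stand-in that makes the sketch runnable without a server, and run_concurrently is an illustrative helper name rather than a driver API.

```python
import asyncio
from typing import Any, List, Sequence


async def run_concurrently(driver: Any, queries: Sequence[str]) -> List[Any]:
    """Run each query in its own coroutine against one shared driver."""

    async def run_one(query: str) -> Any:
        # execute_query() manages its own session internally, so the
        # coroutines share the driver but never a session.
        return await driver.execute_query(query, database_="neo4j")

    # gather() returns the results in the order of the given awaitables.
    return await asyncio.gather(*(run_one(q) for q in queries))


class StubDriver:
    """Hypothetical stand-in for neo4j.AsyncDriver (no server needed)."""

    async def execute_query(self, query: str, **kwargs: Any) -> str:
        await asyncio.sleep(0)  # yield control like real network I/O would
        return f"result of {query}"
```

All of this stays within a single event loop and thread. The driver object should not be handed to other threads.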
AsyncBoltDriver¶
- URI schemes: bolt, bolt+ssc, bolt+s
Will result in:
- class neo4j.AsyncBoltDriver(pool, default_workspace_config)¶
AsyncBoltDriver is instantiated for bolt URIs.
It addresses a single database machine. This may be a standalone server or could be a specific member of a cluster.
Connections established by an AsyncBoltDriver are always made to the exact host and port detailed in the URI.
This class is not supposed to be instantiated externally. Use AsyncGraphDatabase.driver() instead.
AsyncNeo4jDriver¶
- URI schemes: neo4j, neo4j+ssc, neo4j+s
Will result in:
- class neo4j.AsyncNeo4jDriver(pool, default_workspace_config)¶
AsyncNeo4jDriver is instantiated for neo4j URIs.
The routing behaviour works in tandem with Neo4j’s Causal Clustering feature by directing read and write behaviour to appropriate cluster members.
This class is not supposed to be instantiated externally. Use AsyncGraphDatabase.driver() instead.
AsyncSessions & AsyncTransactions¶
All database activity is co-ordinated through two mechanisms:
sessions (neo4j.AsyncSession
) and transactions
(neo4j.AsyncTransaction
, neo4j.AsyncManagedTransaction
).
A session is a logical container for any number of causally-related transactional units of work. Sessions automatically provide guarantees of causal consistency within a clustered environment but multiple sessions can also be causally chained if required. Sessions provide the top level of containment for database activity. Session creation is a lightweight operation and sessions are not thread safe.
Connections are drawn from the neo4j.AsyncDriver
connection pool as required.
A transaction is a unit of work that is either committed in its entirety or is rolled back on failure.
AsyncSession Construction¶
To construct a neo4j.AsyncSession
use the neo4j.AsyncDriver.session()
method.
import asyncio

from neo4j import AsyncGraphDatabase

async def main():
    async with AsyncGraphDatabase.driver(uri, auth=(user, password)) as driver:
        session = driver.session()
        try:
            result = await session.run("MATCH (a:Person) RETURN a.name AS name")
            names = [record["name"] async for record in result]
        except asyncio.CancelledError:
            session.cancel()
            raise
        finally:
            await session.close()

asyncio.run(main())
Sessions will often be created and destroyed using a with block context. This is the recommended approach as it takes care of closing the session properly even when an exception is raised.
async with driver.session() as session:
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    ...  # do something with the result
Sessions will often be created with some configuration settings, see Session Configuration.
async with driver.session(database="example_database",
                          fetch_size=100) as session:
    result = await session.run("MATCH (a:Person) RETURN a.name AS name")
    ...  # do something with the result
AsyncSession¶
- class neo4j.AsyncSession¶
Context for executing work.
An AsyncSession is a logical context for transactional units of work. Connections are drawn from the AsyncDriver connection pool as required.
Session creation is a lightweight operation and sessions are not safe to be used in concurrent contexts (multiple threads/coroutines). Therefore, a session should generally be short-lived, and must not span multiple threads/asynchronous Tasks.
In general, sessions will be created and destroyed within an async with context. For example:
async with driver.session(database="neo4j") as session:
    result = await session.run("MATCH (n:Person) RETURN n.name AS name")
    ...  # do something with the result
Note
Some asyncio utility functions (e.g., asyncio.wait_for() and asyncio.shield()) will wrap work in an asyncio.Task. This introduces concurrency and can lead to undefined behavior as AsyncSession is not concurrency-safe.
Consider this wrong example:
async def dont_do_this(driver):
    async with driver.session() as session:
        await asyncio.shield(session.run("RETURN 1"))
If dont_do_this gets cancelled while waiting for session.run, session.run itself won’t get cancelled (it’s shielded), so it will continue to use the session in another Task. Concurrently, the async context manager (async with driver.session()) will clean up the session on exit. That’s two Tasks handling the session concurrently. Therefore, this yields undefined behavior.
In this particular example, the problem could be solved by shielding the whole coroutine dont_do_this instead of only the session.run call, like so:
async def thats_better(driver):
    async def inner():
        async with driver.session() as session:
            await session.run("RETURN 1")

    await asyncio.shield(inner())
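The effect described in this note can be observed without a database. The following is a minimal sketch that uses only asyncio: slow_work is a hypothetical stand-in for session.run, and the events list stands in for the session's state. It shows that the shielded work keeps running after the surrounding coroutine has already been cancelled and has run its cleanup.

```python
import asyncio

events = []


async def slow_work():
    # Hypothetical stand-in for session.run(); not a driver call.
    await asyncio.sleep(0.05)
    events.append("work finished")


async def dont_do_this():
    try:
        await asyncio.shield(slow_work())
    finally:
        # Stand-in for the session cleanup performed by `async with`.
        events.append("cleanup")


async def main():
    task = asyncio.create_task(dont_do_this())
    await asyncio.sleep(0.01)  # let the task start the shielded work
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    await asyncio.sleep(0.1)  # give the shielded work time to finish
    return events


asyncio.run(main())
```

The cleanup is recorded before the shielded work finishes, which is exactly the overlap that makes sharing a session between the two Tasks unsafe.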
- async close()¶
Close the session.
This will release any borrowed resources, such as connections, and will roll back any outstanding transactions.
- Return type:
None
- cancel()¶
Cancel this session.
If the session is already closed, this method does nothing. Otherwise, it will forcefully close the connection the session holds, if there is one. This will violently kill all work in flight.
The primary purpose of this function is to handle asyncio.CancelledError.
session = driver.session()
try:
    ...  # do some work
except asyncio.CancelledError:
    session.cancel()
    raise
- Return type:
None
- closed()¶
Indicate whether the session has been closed.
- async run(query, parameters=None, **kwargs)¶
Run a Cypher query within an auto-commit transaction.
The query is sent and the result header received immediately, but the neo4j.AsyncResult content is fetched lazily as consumed by the client application.
If a query is executed before a previous neo4j.AsyncResult in the same AsyncSession has been fully consumed, the first result will be fully fetched and buffered. Note therefore that the generally recommended pattern of usage is to fully consume one result before executing a subsequent query. If two results need to be consumed in parallel, multiple AsyncSession objects can be used as an alternative to result buffering.
For more usage details, see AsyncTransaction.run().
- Parameters:
- Returns:
a new neo4j.AsyncResult object
- Raises:
SessionError – if the session has been closed.
- Return type:
- async last_bookmarks()¶
Return most recent bookmarks of the session.
Bookmarks can be used to causally chain sessions. For example, if a session (session1) wrote something that another session (session2) needs to read, use session2 = driver.session(bookmarks=await session1.last_bookmarks()) to achieve this.
Combine the bookmarks of multiple sessions like so:
bookmarks1 = await session1.last_bookmarks()
bookmarks2 = await session2.last_bookmarks()
session3 = driver.session(bookmarks=bookmarks1 + bookmarks2)
A session automatically manages bookmarks, so this method is rarely needed. If you need causal consistency, try to run the relevant queries in the same session.
“Most recent bookmarks” are either the bookmarks passed to the session on creation, or the last bookmark the session received after committing a transaction to the server.
Note: For auto-commit transactions (AsyncSession.run()), this will trigger AsyncResult.consume() for the current result.
- Returns:
the session’s last known bookmarks
- Return type:
- async last_bookmark()¶
Get the bookmark received following the last completed transaction.
Note: For auto-commit transactions (AsyncSession.run()), this will trigger AsyncResult.consume() for the current result.
Warning
This method can lead to unexpected behaviour if the session has not yet successfully completed a transaction.
- Returns:
last bookmark
- Return type:
str | None
Deprecated since version 5.0: last_bookmark() will be removed in version 6.0. Use last_bookmarks() instead.
- async begin_transaction(metadata=None, timeout=None)¶
Begin a new unmanaged transaction.
Creates a new AsyncTransaction within this session. At most one transaction may exist in a session at any point in time. To maintain multiple concurrent transactions, use multiple concurrent sessions.
Note: For auto-commit transactions (AsyncSession.run()), this will trigger an AsyncResult.consume() for the current result.
- Parameters:
metadata (dict[str, Any] | None) – a dictionary with metadata. Specified metadata will be attached to the executing transaction and visible in the output of SHOW TRANSACTIONS YIELD *. It will also get logged to the query.log. This functionality makes it easier to tag transactions and is equivalent to the dbms.setTXMetaData procedure, see https://neo4j.com/docs/cypher-manual/current/clauses/transaction-clauses/#query-listing-transactions and https://neo4j.com/docs/operations-manual/current/reference/procedures/ for reference.
timeout (float | None) – the transaction timeout in seconds. Transactions that execute longer than the configured timeout will be terminated by the database. This functionality allows user code to limit query/transaction execution time. The specified timeout overrides the default timeout configured in the database using the db.transaction.timeout setting (dbms.transaction.timeout before Neo4j 5.0). Values higher than db.transaction.timeout will be ignored and will fall back to the default for server versions between 4.2 and 5.2 (inclusive). The value should not represent a negative duration. A 0 duration will make the transaction execute indefinitely. None will use the default timeout configured on the server.
- Returns:
A new transaction instance.
- Raises:
TransactionError – if a transaction is already open.
SessionError – if the session has been closed.
- Return type:
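As a usage illustration, the sketch below opens an unmanaged transaction with both optional arguments and follows the cancellation pattern documented for cancel(). The function name count_people, the metadata key "app", the 5 second timeout, and the Person label are assumptions chosen for the example; session is expected to be an open AsyncSession.

```python
import asyncio


async def count_people(session):
    # Tag the transaction and ask the server to terminate it after 5 seconds.
    tx = await session.begin_transaction(
        metadata={"app": "docs-example"},  # hypothetical tag
        timeout=5.0,
    )
    try:
        result = await tx.run("MATCH (p:Person) RETURN count(p) AS n")
        record = await result.single()
        await tx.commit()
        return record["n"]
    except asyncio.CancelledError:
        tx.cancel()
        raise
    finally:
        await tx.close()  # rolls back if not yet committed
```

Because this transaction is unmanaged, nothing is retried automatically; the caller decides what to do when an exception propagates.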
- async read_transaction(transaction_function, *args, **kwargs)¶
Execute a unit of work in a managed read transaction.
Note
This does not necessarily imply access control, see the session configuration option default_access_mode.
- Parameters:
transaction_function (Callable[[AsyncManagedTransaction, P], Awaitable[R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.
args (P) – additional arguments for the transaction_function
kwargs (P) – keyword arguments for the transaction_function
- Returns:
a result as returned by the given unit of work
- Return type:
R
- Raises:
SessionError – if the session has been closed.
Deprecated since version 5.0: Method was renamed to
execute_read()
.
- async execute_read(transaction_function, *args, **kwargs)¶
Execute a unit of work in a managed read transaction.
Note
This does not necessarily imply access control, see the session configuration option default_access_mode.
This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).
Example:
async def do_cypher_tx(tx, cypher):
    result = await tx.run(cypher)
    values = [record.values() async for record in result]
    return values

async with driver.session() as session:
    values = await session.execute_read(do_cypher_tx, "RETURN 1 AS x")
Example:
async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values

async with driver.session() as session:
    values = await session.execute_read(get_two_tx)
- Parameters:
transaction_function (Callable[[AsyncManagedTransaction, P], Awaitable[R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.
args (P) – additional arguments for the transaction_function
kwargs (P) – keyword arguments for the transaction_function
- Returns:
whatever the given transaction_function returns
- Return type:
R
- Raises:
SessionError – if the session has been closed.
Added in version 5.0.
- async write_transaction(transaction_function, *args, **kwargs)¶
Execute a unit of work in a managed write transaction.
Note
This does not necessarily imply access control, see the session configuration option default_access_mode.
- Parameters:
transaction_function (Callable[[AsyncManagedTransaction, P], Awaitable[R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.
args (P) – additional arguments for the transaction_function
kwargs (P) – keyword arguments for the transaction_function
- Returns:
a result as returned by the given unit of work
- Return type:
R
- Raises:
SessionError – if the session has been closed.
Deprecated since version 5.0: Method was renamed to
execute_write()
.
- async execute_write(transaction_function, *args, **kwargs)¶
Execute a unit of work in a managed write transaction.
Note
This does not necessarily imply access control, see the session configuration option default_access_mode.
This transaction will automatically be committed when the function returns, unless an exception is thrown during query execution or by the user code. Note that this function performs retries and that the supplied transaction_function might get invoked more than once. Therefore, it needs to be idempotent (i.e., have the same effect regardless of whether it is called once or many times).
Example:
async def create_node_tx(tx, name):
    query = ("CREATE (n:NodeExample {name: $name, id: randomUUID()}) "
             "RETURN n.id AS node_id")
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]

async with driver.session() as session:
    node_id = await session.execute_write(create_node_tx, "Bob")
- Parameters:
transaction_function (Callable[[AsyncManagedTransaction, P], Awaitable[R]]) – a function that takes a transaction as an argument and does work with the transaction. It is called as transaction_function(tx, *args, **kwargs), where tx is an AsyncManagedTransaction.
args (P) – additional arguments for the transaction_function
kwargs (P) – keyword arguments for the transaction_function
- Returns:
a result as returned by the given unit of work
- Return type:
R
- Raises:
SessionError – if the session has been closed.
Added in version 5.0.
Session Configuration¶
neo4j.AsyncSession
is configured exactly like neo4j.Session
(see Session Configuration). The only difference is the async session
accepts either a neo4j.api.BookmarkManager
object or a
neo4j.api.AsyncBookmarkManager
as bookmark manager:
bookmark_manager
¶
Specify a bookmark manager for the driver to use. If present, the bookmark manager is used to keep all work on the driver causally consistent.
See BookmarkManager
for more information.
Warning
Enabling the BookmarkManager can have a negative impact on performance since all queries will wait for the latest changes to be propagated across the cluster.
For simpler use-cases, sessions (AsyncSession
) can be used to
group a series of queries together that will be causally chained
automatically.
- Type:
- Default:
Changed in version 5.8: Stabilized from experimental.
AsyncTransaction¶
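Neo4j supports three kinds of async transaction:
- Auto-commit Transactions
- Explicit Transactions (Unmanaged Transactions)
- Managed Transactions (transaction functions)
Each has pros and cons, but if in doubt, use a managed transaction with a transaction function.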
Auto-commit Transactions¶
Auto-commit transactions are the simplest form of transaction, available via neo4j.AsyncSession.run(). These are easy to use but support only one statement per transaction and are not automatically retried on failure.
Auto-commit transactions are also the only way to run PERIODIC COMMIT
(only Neo4j 4.4 and earlier) or CALL {...} IN TRANSACTIONS
(Neo4j 4.4 and
newer) statements, since those Cypher clauses manage their own transactions
internally.
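As an illustration of the last point, the sketch below sends a CALL {...} IN TRANSACTIONS query through an auto-commit transaction. The function name delete_obsolete_nodes, the Obsolete label, and the batch size of 1000 rows are assumptions made up for the example; session is expected to be an open AsyncSession connected to a server that supports this clause.

```python
async def delete_obsolete_nodes(session):
    # CALL { ... } IN TRANSACTIONS manages its own transactions on the
    # server, so it is sent with session.run() (auto-commit) rather than
    # from inside an explicit or managed transaction.
    query = (
        "MATCH (n:Obsolete) "
        "CALL { WITH n DETACH DELETE n } "
        "IN TRANSACTIONS OF 1000 ROWS"
    )
    result = await session.run(query)
    # Consuming the result waits for the query to finish and
    # returns its summary.
    return await result.consume()
```

Since auto-commit transactions are not retried, a failure part-way through leaves the already committed batches in place.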
Write example:
import neo4j


async def create_person(driver, name):
    # default_access_mode defaults to WRITE_ACCESS
    async with driver.session(database="neo4j") as session:
        query = "CREATE (a:Person { name: $name }) RETURN id(a) AS node_id"
        result = await session.run(query, name=name)
        record = await result.single()
        return record["node_id"]
Read example:
import neo4j


async def get_numbers(driver):
    numbers = []
    async with driver.session(
        database="neo4j",
        default_access_mode=neo4j.READ_ACCESS
    ) as session:
        result = await session.run("UNWIND [1, 2, 3] AS x RETURN x")
        async for record in result:
            numbers.append(record["x"])
    return numbers
Explicit Transactions (Unmanaged Transactions)¶
Explicit transactions support multiple statements and must be created with an explicit neo4j.AsyncSession.begin_transaction()
call.
This creates a new neo4j.AsyncTransaction
object that can be used to run Cypher.
It also gives applications the ability to directly control commit
and rollback
activity.
- class neo4j.AsyncTransaction¶
Fully user-managed transaction.
Container for multiple Cypher queries to be executed within a single context.
AsyncTransaction objects can be used as a context manager (async with block) where the transaction is committed or rolled back based on whether an exception is raised:
async with await session.begin_transaction() as tx:
    ...
- async run(query, parameters=None, **kwparameters)¶
Run a Cypher query within the context of this transaction.
Cypher is typically expressed as a query template plus a set of named parameters. In Python, parameters may be expressed through a dictionary of parameters, through individual parameter arguments, or as a mixture of both. For example, the run queries below are all equivalent:
query = "CREATE (a:Person { name: $name, age: $age })"
result = await tx.run(query, {"name": "Alice", "age": 33})
result = await tx.run(query, {"name": "Alice"}, age=33)
result = await tx.run(query, name="Alice", age=33)
Parameter values can be of any type supported by the Neo4j type system. In Python, this includes bool, int, str, list and dict. Note however that list properties must be homogeneous.
- Parameters:
query (LiteralString) – cypher query
parameters (Dict[str, Any] | None) – dictionary of parameters
kwparameters (Any) – additional keyword parameters. These take precedence over parameters passed as parameters.
- Raises:
TransactionError – if the transaction is already closed
- Returns:
a new neo4j.AsyncResult object
- Return type:
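The way the two parameter sources of run() combine can be modeled in plain Python. The helper merge_parameters below is hypothetical and not part of the driver; it only mirrors the documented rule that keyword parameters take precedence over entries in the parameters dictionary.

```python
def merge_parameters(parameters=None, **kwparameters):
    """Model how run() combines its two parameter sources (illustration only)."""
    merged = dict(parameters or {})
    merged.update(kwparameters)  # keyword parameters win on conflict
    return merged


# The three equivalent call styles from the run() description:
a = merge_parameters({"name": "Alice", "age": 33})
b = merge_parameters({"name": "Alice"}, age=33)
c = merge_parameters(name="Alice", age=33)

# A key given in both places takes the keyword value:
conflict = merge_parameters({"age": 30}, age=33)
```

Here a, b, and c hold the same mapping, and conflict keeps the keyword value for age.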
- async commit()¶
Commit the transaction and close it.
Marks this transaction as successful and closes it in order to trigger a COMMIT.
- Raises:
TransactionError – if the transaction is already closed
- Return type:
None
- async rollback()¶
Roll back the transaction and close it.
Marks the transaction as unsuccessful and closes it in order to trigger a ROLLBACK.
- Raises:
TransactionError – if the transaction is already closed
- Return type:
None
- async close()¶
Close this transaction, triggering a ROLLBACK if not closed.
- Return type:
None
- cancel()¶
Cancel this transaction.
If the transaction is already closed, this method does nothing. Otherwise, it will close the connection without ROLLBACK or COMMIT in a non-blocking manner.
The primary purpose of this function is to handle asyncio.CancelledError.
tx = await session.begin_transaction()
try:
    ...  # do some work
except asyncio.CancelledError:
    tx.cancel()
    raise
- Return type:
None
Closing an explicit transaction can either happen automatically at the end of an async with block, or can be explicitly controlled through the neo4j.AsyncTransaction.commit(), neo4j.AsyncTransaction.rollback(), neo4j.AsyncTransaction.close() or neo4j.AsyncTransaction.cancel() methods.
Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction or that need to run multiple queries within a single transaction but without the retries provided by managed transactions.
Example:
import asyncio

import neo4j


async def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
    async with driver.session(
        database="neo4j",
        # optional, defaults to WRITE_ACCESS
        default_access_mode=neo4j.WRITE_ACCESS
    ) as session:
        tx = await session.begin_transaction()
        # or just use an `async with` context instead of try/except/finally
        try:
            if not await customer_balance_check(tx, customer_id, amount):
                # give up
                return
            await other_bank_transfer_api(customer_id, other_bank_id, amount)
            # Now the money has been transferred
            # => we can't retry or rollback anymore
            try:
                await decrease_customer_balance(tx, customer_id, amount)
                await tx.commit()
            except Exception as e:
                request_inspection(customer_id, other_bank_id, amount, e)
                raise
        except asyncio.CancelledError:
            tx.cancel()
            raise
        finally:
            await tx.close()  # rolls back if not yet committed


async def customer_balance_check(tx, customer_id, amount):
    query = ("MATCH (c:Customer {id: $id}) "
             "RETURN c.balance >= $amount AS sufficient")
    result = await tx.run(query, id=customer_id, amount=amount)
    record = await result.single(strict=True)
    return record["sufficient"]


async def other_bank_transfer_api(customer_id, other_bank_id, amount):
    ...  # make some API call to other bank


async def decrease_customer_balance(tx, customer_id, amount):
    query = ("MATCH (c:Customer {id: $id}) "
             "SET c.balance = c.balance - $amount")
    await tx.run(query, id=customer_id, amount=amount)


def request_inspection(customer_id, other_bank_id, amount, e):
    # manual cleanup required; log this or similar
    print("WARNING: transaction rolled back due to exception:", repr(e))
    print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
          "amount:", amount)
Managed Transactions (transaction functions)¶
Transaction functions are the most powerful form of transaction, providing access mode override and retry capabilities.
These allow a function object representing the transactional unit of work to be passed as a parameter. This function is called one or more times, within a configurable time limit, until it succeeds. Results should be fully consumed within the function and only aggregate or status values should be returned. Returning a live result object would prevent the driver from correctly managing connections and would break retry guarantees.
This function will receive a neo4j.AsyncManagedTransaction
object as its first parameter. For more details see neo4j.AsyncSession.execute_write()
and neo4j.AsyncSession.execute_read()
.
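To see why repeated invocation matters, consider the deliberately simplified model below. It is not the driver's implementation: run_with_retries and TransientFailure are hypothetical names, the transaction object is left out, and the real driver additionally opens, commits, and rolls back a transaction around each attempt and enforces a time limit. The sketch only shows that the body of a transaction function can run several times before a value is returned.

```python
import asyncio


class TransientFailure(Exception):
    """Hypothetical stand-in for an error the driver classifies as retryable."""


async def run_with_retries(transaction_function, *args, max_attempts=3, **kwargs):
    # Simplified model: call the function again after a retryable failure.
    for attempt in range(1, max_attempts + 1):
        try:
            return await transaction_function(*args, **kwargs)
        except TransientFailure:
            if attempt == max_attempts:
                raise


calls = []


async def flaky_work(name):
    calls.append(name)  # side effect outside the database
    if len(calls) < 3:
        raise TransientFailure()
    return f"created {name}"


outcome = asyncio.run(run_with_retries(flaky_work, "Bob"))
```

In this run, flaky_work executes three times although only one result is returned. Database changes made by failed attempts are rolled back, but side effects outside the transaction, such as the calls list here, are not.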
- class neo4j.AsyncManagedTransaction¶
Transaction object provided to transaction functions.
Inside a transaction function, the driver is responsible for managing (committing / rolling back) the transaction. Therefore, AsyncManagedTransactions don’t offer such methods. Otherwise, they behave like AsyncTransaction.
To commit the transaction, return anything from the transaction function.
To roll back the transaction, raise any exception.
Note that transaction functions have to be idempotent (i.e., the result of running the function once has to be the same as running it any number of times). This is because the driver will retry the transaction function if the error is classified as retryable.
Added in version 5.0: Prior, transaction functions used AsyncTransaction objects, but would cause hard to interpret errors when managed explicitly (committed or rolled back by user code).
- async run(query, parameters=None, **kwparameters)¶
Run a Cypher query within the context of this transaction.
Cypher is typically expressed as a query template plus a set of named parameters. In Python, parameters may be expressed through a dictionary of parameters, through individual parameter arguments, or as a mixture of both. For example, the run queries below are all equivalent:
query = "CREATE (a:Person { name: $name, age: $age })"
result = await tx.run(query, {"name": "Alice", "age": 33})
result = await tx.run(query, {"name": "Alice"}, age=33)
result = await tx.run(query, name="Alice", age=33)
Parameter values can be of any type supported by the Neo4j type system. In Python, this includes bool, int, str, list and dict. Note however that list properties must be homogeneous.
- Parameters:
query (LiteralString) – cypher query
parameters (Dict[str, Any] | None) – dictionary of parameters
kwparameters (Any) – additional keyword parameters. These take precedence over parameters passed as parameters.
- Raises:
TransactionError – if the transaction is already closed
- Returns:
a new neo4j.AsyncResult object
- Return type:
Example:
async def create_person(driver, name):
    async with driver.session() as session:
        node_id = await session.execute_write(create_person_tx, name)
    return node_id


async def create_person_tx(tx, name):
    query = ("CREATE (a:Person {name: $name, id: randomUUID()}) "
             "RETURN a.id AS node_id")
    result = await tx.run(query, name=name)
    record = await result.single()
    return record["node_id"]
To exert more control over how a transaction function is carried out, the neo4j.unit_of_work()
decorator can be used.
AsyncResult¶
Every time a query is executed, a neo4j.AsyncResult
is returned.
This provides a handle to the result of the query, giving access to the records within it as well as the result metadata.
Results also contain a buffer that automatically stores unconsumed records when results are consumed out of order.
A neo4j.AsyncResult
is attached to an active connection, through a neo4j.AsyncSession
, until all its content has been buffered or consumed.
- class neo4j.AsyncResult¶
Handler for the result of Cypher query execution.
Instances of this class are typically constructed and returned by AsyncSession.run() and AsyncTransaction.run().
- async __aiter__()¶
Create an iterator returning records.
Advancing the iterator advances the underlying result stream. So even when creating multiple iterators from the same result, each Record will only be returned once.
- Returns:
Iterator over the result stream’s records.
- Return type:
- async __anext__()¶
Advance the result stream and return the record.
- Raises:
StopAsyncIteration – if no more records are available.
- Return type:
- async consume()¶
Consume the remainder of this result and return the summary.
Example:
async def create_node_tx(tx, name):
    result = await tx.run(
        "CREATE (n:ExampleNode {name: $name}) RETURN n", name=name
    )
    record = await result.single()
    value = record.value()
    summary = await result.consume()
    return value, summary

async with driver.session() as session:
    node_id, summary = await session.execute_write(
        create_node_tx, "example"
    )
Example:
async def get_two_tx(tx):
    result = await tx.run("UNWIND [1,2,3,4] AS x RETURN x")
    values = []
    async for record in result:
        if len(values) >= 2:
            break
        values.append(record.values())
    # or shorter: values = [record.values()
    #                       for record in await result.fetch(2)]

    # discard the remaining records if there are any
    summary = await result.consume()
    # use the summary for logging etc.
    return values, summary

async with driver.session() as session:
    values, summary = await session.execute_read(get_two_tx)
- Returns:
The neo4j.ResultSummary for this result
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed.
- Return type:
Changed in version 5.0: Can raise
ResultConsumedError
.
- async single(strict: Literal[False] = False) Record | None ¶
- async single(strict: Literal[True]) Record
Obtain the next and only remaining record or None.
Calling this method always exhausts the result.
If strict is True, this method will raise an exception if there is not exactly one record left.
If strict is False, fewer than one record will make this method return None, and more than one record will make this method emit a warning and return the first record.
- Parameters:
strict (bool) – If True, raise a ResultNotSingleError instead of returning None if there is no record, or instead of emitting a warning if there is more than one record. False by default.
- Returns:
the next neo4j.Record or None if none remain
- Warns:
if more than one record is available and strict is False
- Raises:
ResultNotSingleError – If strict=True and not exactly one record is available.
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
Changed in version 5.0:
Added strict parameter.
Can raise ResultConsumedError.
- async fetch(n)¶
Obtain up to n records from this result.
Fetch min(n, records_left) records from this result and return them as a list.
- Parameters:
n (int) – the maximum number of records to fetch.
- Returns:
list of
neo4j.Record
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
Added in version 5.0.
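One way to use fetch() is to process a large result in fixed-size chunks. The helper iter_batches below is a hypothetical convenience written for this illustration, not a driver function; it relies only on the documented behavior that fetch(n) returns a list of at most n records, which is empty once no records are left.

```python
async def iter_batches(result, batch_size):
    """Yield lists of at most batch_size records until the result is exhausted."""
    while True:
        batch = await result.fetch(batch_size)
        if not batch:
            break  # fetch() returned an empty list: nothing left
        yield batch
```

Inside a transaction function this could be used as async for batch in iter_batches(result, 100): ... so that only one batch is held in application code at a time.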
- async peek()¶
Obtain the next record from this result without consuming it.
This leaves the record in the buffer for further processing.
- Returns:
the next neo4j.Record or None if none remain.
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
Record | None
Changed in version 5.0: Can raise
ResultConsumedError
.
- async graph()¶
Turn the result into a Graph.
Return a Graph instance containing all the graph objects in the result. This graph will also contain already consumed records.
After calling this method, the result becomes detached, buffering all remaining records.
- Returns:
a result graph
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
Changed in version 5.0: Can raise
ResultConsumedError
.
- async value(key=0, default=None)¶
Return the remainder of the result as a list of values.
- Parameters:
- Returns:
list of individual values
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
See also
Changed in version 5.0: Can raise
ResultConsumedError
.
- async values(*keys)¶
Return the remainder of the result as a list of values lists.
- Parameters:
keys (int | str) – fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
- Returns:
list of values lists
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
See also
Changed in version 5.0: Can raise
ResultConsumedError
.
- async data(*keys)¶
Return the remainder of the result as a list of dictionaries.
Each dictionary represents a record.
This function provides a convenient but opinionated way to obtain the remainder of the result as mostly JSON serializable data. It is mainly useful for interactive sessions and rapid prototyping.
For details see Record.data().
- Parameters:
keys (int | str) – Fields to return for each remaining record. Optionally filtering to include only certain values by index or key.
- Returns:
list of dictionaries
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
See also
Changed in version 5.0: Can raise
ResultConsumedError
.
- async to_df(expand=False, parse_dates=False)¶
Convert (the rest of) the result to a pandas DataFrame.
This method is only available if the pandas library is installed.
res = await tx.run("UNWIND range(1, 10) AS n RETURN n, n+1 AS m")
df = await res.to_df()
for instance will return a DataFrame with two columns: n and m and 10 rows.
- Parameters:
expand (bool) – If True, some structures in the result will be recursively expanded (flattened out into multiple columns) like so (everything inside <...> is a placeholder):
- Node objects under any variable <n> will be expanded into columns (the recursion stops here):
  - <n>().prop.<property_name> (any) for each property of the node.
  - <n>().element_id (str) the node’s element id. See Node.element_id.
  - <n>().labels (frozenset of str) the node’s labels. See Node.labels.
- Relationship objects under any variable <r> will be expanded into columns (the recursion stops here):
  - <r>->.prop.<property_name> (any) for each property of the relationship.
  - <r>->.element_id (str) the relationship’s element id. See Relationship.element_id.
  - <r>->.start.element_id (str) the relationship’s start node’s element id. See Relationship.start_node.
  - <r>->.end.element_id (str) the relationship’s end node’s element id. See Relationship.end_node.
  - <r>->.type (str) the relationship’s type. See Relationship.type.
- list objects under any variable <l> will be expanded into:
  - <l>[].0 (any) the 1st list element
  - <l>[].1 (any) the 2nd list element
  - …
- dict objects under any variable <d> will be expanded into:
  - <d>{}.<key1> (any) the 1st key of the dict
  - <d>{}.<key2> (any) the 2nd key of the dict
  - …
- list and dict objects are expanded recursively. Example:
  variable x: [{"foo": "bar", "baz": [42, 0]}, "foobar"]
  will be expanded to:
  {
      "x[].0{}.foo": "bar",
      "x[].0{}.baz[].0": 42,
      "x[].0{}.baz[].1": 0,
      "x[].1": "foobar"
  }
- Everything else (including Path objects) will not be flattened.
dict keys and variable names that contain . or \ will be escaped with a backslash (\. and \\ respectively).
parse_dates (bool) – If True, columns that exclusively contain time.DateTime objects, time.Date objects, or None, will be converted to pandas.Timestamp. If False, columns of the above types will be left as driver types (dtype object).
- Raises:
ImportError – if pandas library is not available.
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
pandas.DataFrame
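The column naming scheme that expand=True applies to nested list and dict values can be modeled in a few lines of plain Python. This is a sketch of the documented naming rules only, not the driver's code: the functions escape and flatten are hypothetical, Node and Relationship handling is left out, and how the driver treats empty lists or dicts is not covered.

```python
def escape(name):
    # Escape backslashes first, so the backslash added for "." is not doubled.
    return str(name).replace("\\", "\\\\").replace(".", "\\.")


def flatten(column, value):
    """Return {column_name: leaf_value} for nested lists and dicts."""
    if isinstance(value, list):
        items = ((f"{column}[].{index}", item) for index, item in enumerate(value))
    elif isinstance(value, dict):
        items = ((f"{column}{{}}.{escape(key)}", item) for key, item in value.items())
    else:
        return {column: value}  # leaf: keep the value under the current name
    flat = {}
    for name, item in items:
        flat.update(flatten(name, item))
    return flat


# The example from the `expand` description, for a variable named x:
columns = flatten(escape("x"), [{"foo": "bar", "baz": [42, 0]}, "foobar"])
```

For this input, columns contains the four keys x[].0{}.foo, x[].0{}.baz[].0, x[].0{}.baz[].1 and x[].1.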
- async to_eager_result()¶
Convert this result to an EagerResult.
This method exhausts the result and triggers a consume().
- Returns:
all remaining records in the result stream, the result’s summary, and keys as an EagerResult instance.
- Raises:
ResultConsumedError – if the transaction from which this result was obtained has been closed or the Result has been explicitly consumed.
- Return type:
Added in version 5.5.
Changed in version 5.8: Stabilized from experimental.
- closed()¶
Return True if the result has been closed.
When a result gets consumed (consume()) or the transaction that owns the result gets closed (committed, rolled back, closed), the result cannot be used to acquire further records.
In such a case, all methods that need to access the Result’s records will raise a ResultConsumedError when called.
- Returns:
whether the result is closed.
- Return type:
Added in version 5.0.
See https://neo4j.com/docs/python-manual/current/cypher-workflow/#python-driver-type-mapping for more about type mapping.
AsyncBookmarkManager¶
- class neo4j.api.AsyncBookmarkManager¶
Same as BookmarkManager but with async methods.
The driver comes with a default implementation of the async bookmark manager accessible through AsyncGraphDatabase.bookmark_manager().
Added in version 5.0.
Changed in version 5.3: See BookmarkManager for changes.
Changed in version 5.8: Stabilized from experimental.
- abstract async update_bookmarks(previous_bookmarks, new_bookmarks)¶
Handle bookmark updates.
- Parameters:
previous_bookmarks (Collection[str]) – The bookmarks used at the start of a transaction
new_bookmarks (Collection[str]) – The new bookmarks retrieved at the end of a transaction
- Return type:
None
- abstract async get_bookmarks()¶
Return the bookmarks stored in the bookmark manager.
- Returns:
The bookmarks for the given database
- Return type:
Collection[str]
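As a rough illustration of the two abstract methods above, here is a minimal in-memory sketch. The class name InMemoryBookmarkManager and the replacement strategy (drop the bookmarks a transaction started from, add the ones it produced) are assumptions made for this example, not a description of the driver's built-in implementation. In real code the class would subclass neo4j.api.AsyncBookmarkManager; the base class is omitted here so that the sketch runs without the driver installed.

```python
import asyncio


class InMemoryBookmarkManager:
    """Hypothetical bookmark manager that keeps bookmarks in a set.

    Mirrors the two methods of neo4j.api.AsyncBookmarkManager, but does not
    inherit from it so the sketch has no dependency on the driver.
    """

    def __init__(self, initial_bookmarks=()):
        self._bookmarks = set(initial_bookmarks)

    async def update_bookmarks(self, previous_bookmarks, new_bookmarks):
        # Assumed strategy: nothing to do if the transaction produced no
        # bookmarks; otherwise swap the old bookmarks for the new ones.
        if not new_bookmarks:
            return
        self._bookmarks.difference_update(previous_bookmarks)
        self._bookmarks.update(new_bookmarks)

    async def get_bookmarks(self):
        # Return an immutable snapshot so callers cannot mutate our state.
        return frozenset(self._bookmarks)


async def demo():
    manager = InMemoryBookmarkManager(["bm:1", "bm:2"])
    # A transaction that started from "bm:1" finished and produced "bm:3".
    await manager.update_bookmarks(["bm:1"], ["bm:3"])
    return await manager.get_bookmarks()


if __name__ == "__main__":
    print(sorted(asyncio.run(demo())))
```

The bookmark strings used here are placeholders; real bookmarks are opaque values handed out by the server.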
Async Cancellation¶
Async Python provides a mechanism for cancelling futures (asyncio.Future.cancel()).
The driver and its components can handle this.
However, generally, it’s not advised to rely on cancellation as it forces the
driver to close affected connections to avoid leaving them in an undefined
state. This makes the driver less efficient.
The easiest way to make sure your application code’s interaction with the driver
plays nicely with cancellation is to always use the async context manager
provided by neo4j.AsyncSession, like so:
async with driver.session() as session:
    ...  # do what you need to do with the session
If, for whatever reason, you need to handle the session manually, you can do it like so:
session = driver.session()
try:
    ...  # do what you need to do with the session
except asyncio.CancelledError:
    session.cancel()
    raise
finally:
    # this becomes a no-op if the session has been cancelled before
    await session.close()
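To see the manual pattern react to an actual cancellation without a database, the following sketch swaps the real session for a hypothetical FakeSession that only records which of cancel() and close() were called. Everything except the try/except/finally shape is an assumption made for the demonstration.

```python
import asyncio


class FakeSession:
    """Hypothetical stand-in for a session; it only records method calls."""

    def __init__(self):
        self.calls = []

    def cancel(self):
        self.calls.append("cancel")

    async def close(self):
        self.calls.append("close")


async def work(session, started):
    try:
        started.set()
        await asyncio.sleep(3600)  # stands in for long-running driver I/O
    except asyncio.CancelledError:
        session.cancel()
        raise  # always re-raise so the task is really marked as cancelled
    finally:
        await session.close()


async def demo():
    session = FakeSession()
    started = asyncio.Event()
    task = asyncio.create_task(work(session, started))
    await started.wait()  # make sure the task is inside the try block
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return session.calls


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Running demo() yields the calls in the order cancel, close: the except branch runs first, then the finally branch closes the session while the CancelledError is still propagating.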
As mentioned above, any cancellation of I/O work will cause the driver to close
the affected connection. This will kill any neo4j.AsyncTransaction and
neo4j.AsyncResult objects that are attached to that connection. Hence,
after catching an asyncio.CancelledError, you should not try to use
transactions or results created earlier. They are likely to not be valid
anymore.
Furthermore, when a cancellation happens, there is no guarantee as to whether a
piece of ongoing work got successfully executed on the server side or not:
await transaction.commit() and other methods can raise asyncio.CancelledError
even though the work has completed from the server’s perspective.
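The situation described above can be reproduced in miniature with plain asyncio. In this sketch, FakeServer is a hypothetical stand-in whose commit() applies the change immediately and then waits for an acknowledgement; cancelling the caller during that wait leaves the caller with a CancelledError even though the change has been applied.

```python
import asyncio


class FakeServer:
    """Hypothetical server: applies a commit, then is slow to acknowledge."""

    def __init__(self):
        self.committed = False

    async def commit(self, applied):
        self.committed = True      # the server-side effect happens here
        applied.set()
        await asyncio.sleep(3600)  # the acknowledgement is still in flight


async def demo():
    server = FakeServer()
    applied = asyncio.Event()
    task = asyncio.create_task(server.commit(applied))
    await applied.wait()  # the commit has been applied on the "server"
    task.cancel()         # the client gives up before the acknowledgement
    try:
        await task
        outcome = "completed"
    except asyncio.CancelledError:
        outcome = "cancelled"
    return outcome, server.committed


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Here the client observes "cancelled" while the server state is committed, which is why application code should not assume that a cancelled call had no effect.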
Async Logging¶
For the most part, logging works the same way as in the synchronous driver. See Logging for more information.
However, when following the manual approach to logging, it is recommended to include information about the current async task in the log record, like so:
import asyncio
import logging
import sys
class TaskIdFilter(logging.Filter):
    """Injecting async task id into log records."""
    def filter(self, record):
        try:
            record.taskId = id(asyncio.current_task())
        except RuntimeError:
            record.taskId = None
        return True
# create a handler, e.g. to log to stdout
handler = logging.StreamHandler(sys.stdout)
# configure the handler to your liking
handler.setFormatter(logging.Formatter(
    "[%(levelname)-8s] [Task %(taskId)-15s] %(asctime)s %(message)s"
    # or when using threading AND asyncio
    # "[%(levelname)-8s] [Thread %(thread)d] [Task %(taskId)-15s] "
    # "%(asctime)s %(message)s"
))
# attach the filter injecting the task id to the handler
handler.addFilter(TaskIdFilter())
# add the handler to the driver's logger
logging.getLogger("neo4j").addHandler(handler)
# make sure the logger logs on the desired log level
logging.getLogger("neo4j").setLevel(logging.DEBUG)
# from now on, DEBUG logging to stdout is enabled in the driver
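To check what such a filter produces when several tasks log concurrently, the following sketch attaches the same kind of filter to a throwaway logger and captures the output in memory. The logger name "example.async_logging", the simplified format string, and the worker function are assumptions made for this demonstration; with the driver you would attach the handler to the "neo4j" logger as shown above.

```python
import asyncio
import io
import logging


class TaskIdFilter(logging.Filter):
    """Inject the id of the current async task into each log record."""

    def filter(self, record):
        try:
            record.taskId = id(asyncio.current_task())
        except RuntimeError:  # raised when no event loop is running
            record.taskId = None
        return True


def make_logger(stream):
    logger = logging.getLogger("example.async_logging")  # hypothetical name
    logger.handlers.clear()   # keep repeated runs from stacking handlers
    logger.propagate = False  # keep the demo output out of the root logger
    handler = logging.StreamHandler(stream)
    handler.setFormatter(logging.Formatter("[Task %(taskId)s] %(message)s"))
    handler.addFilter(TaskIdFilter())
    logger.addHandler(handler)
    logger.setLevel(logging.DEBUG)
    return logger


async def worker(logger, name):
    logger.debug("%s: start", name)
    await asyncio.sleep(0)  # yield so the two workers interleave
    logger.debug("%s: done", name)


async def demo():
    stream = io.StringIO()
    logger = make_logger(stream)
    await asyncio.gather(worker(logger, "a"), worker(logger, "b"))
    return stream.getvalue().splitlines()


if __name__ == "__main__":
    print("\n".join(asyncio.run(demo())))
```

Each worker's lines carry the same task id, and the two workers carry different ids, which makes interleaved output attributable to the task that produced it.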