Run your own transactions

When querying the database with transaction functions, the driver automatically creates a transaction. You can easily include multiple queries in a single transaction function, but to design more involved use cases, it is important to understand how transactions work within the driver.

Obtain a session

Before running a transaction, you need to obtain a session. Sessions act as concrete query channels between the driver and the server. In particular, sessions borrow connections from the connection pool as needed, and ensure causal consistency is enforced.

Sessions are created with the method Driver.session(); the keyword argument database specifies the target database. For further parameters, see Session configuration.

with driver.session(database="neo4j") as session:
    ...

Session creation is a lightweight operation, so sessions can be created and destroyed without significant cost. Always close sessions when you are done with them.

Sessions are not thread-safe: share the main Driver object across threads, but make sure each thread creates its own sessions.
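As a minimal sketch of that pattern, the following assumes a driver object created elsewhere; the helper names count_by_prefix_tx, count_in_own_session, and count_concurrently are hypothetical. The single Driver is shared by all worker threads, while every worker opens and closes a session of its own.

```python
from concurrent.futures import ThreadPoolExecutor


def count_by_prefix_tx(tx, prefix):
    # Transaction function: count Person nodes whose name starts with `prefix`
    result = tx.run(
        "MATCH (p:Person) WHERE p.name STARTS WITH $prefix "
        "RETURN count(p) AS n",
        prefix=prefix,
    )
    return result.single()["n"]


def count_in_own_session(driver, prefix):
    # Each call opens and closes its own session, so no session is ever
    # shared between threads.
    with driver.session(database="neo4j") as session:
        return session.execute_read(count_by_prefix_tx, prefix)


def count_concurrently(driver, prefixes, max_workers=4):
    # The Driver object itself is shared by all worker threads.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        counts = pool.map(lambda p: count_in_own_session(driver, p), prefixes)
        return dict(zip(prefixes, counts))
```

Calling count_concurrently(driver, ["Al", "Bo"]) would then return a dictionary mapping each prefix to its count.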

Run a managed transaction

A transaction can contain any number of queries. As Neo4j is ACID compliant, queries within a transaction will either be executed as a whole or not at all: you cannot get a part of the transaction succeeding and another failing. Use transactions to group together related queries which work together to achieve a single logical database operation.

A managed transaction is created with the methods Session.execute_read() and Session.execute_write(), depending on whether you want to retrieve data from the database or alter it. Both methods take a transaction function callback (1) and an arbitrary number of positional and keyword arguments (2), which are handed down to the transaction function. The transaction function (3) is responsible for actually carrying out the queries and processing the result. Queries are specified with the Transaction.run() method (4), which returns a Result object. You can then process the result (5) using any of the Result methods, or by simply casting it to a list.

Retrieve people whose name starts with Al.
def match_person_nodes(tx, name_filter):  # (3)
    # (4)
    result = tx.run("""
        MATCH (p:Person) WHERE p.name STARTS WITH $filter
        RETURN p.name AS name ORDER BY name
        """, filter=name_filter)
    return list(result)  # (5) return a list of Record objects

with driver.session(database="neo4j") as session:
    people = session.execute_read(
        match_person_nodes,  # (1)
        "Al",  # (2)
    )
    for person in people:
        print(person.data())  # obtain dict representation

Do not hardcode or concatenate parameters directly into the query. Use query parameters instead, both for performance and security reasons.
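To illustrate, here is a small sketch (the function name find_person_tx is hypothetical). The value is handed to Transaction.run() as a keyword argument and referenced as $name in the query text, so it is never spliced into the Cypher string.

```python
def find_person_tx(tx, name):
    # Avoid building the query from user input, e.g.
    #   tx.run("MATCH (p:Person {name: '" + name + "'}) RETURN p.name AS name")
    # Instead, keep the query text constant and pass the value separately.
    result = tx.run(
        "MATCH (p:Person {name: $name}) RETURN p.name AS name",
        name=name,
    )
    return [record["name"] for record in result]
```

Because the query text stays the same from call to call, only the parameter values change between executions.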

Transaction functions should never return the Result object directly. Instead, always process the result in some way; at minimum, cast it to a list. Within a transaction function, a return statement results in the transaction being committed, while the transaction is automatically rolled back if an exception is raised.
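One way to follow this advice, sketched with a hypothetical function name list_people_tx, is to turn the records into plain dictionaries before returning, so nothing tied to the open transaction leaves the function.

```python
def list_people_tx(tx):
    result = tx.run("MATCH (p:Person) RETURN p.name AS name ORDER BY name")
    # Materialize plain dictionaries while the transaction is still open;
    # the Result object itself stays inside the function.
    return [record.data() for record in result]
```

The caller then receives ordinary Python data, for example through session.execute_read(list_people_tx).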

A transaction with multiple queries, client logic, and potential roll backs.
from neo4j import GraphDatabase


URI = "neo4j://localhost"
AUTH = ("neo4j", "secret")
employee_threshold = 10


def main():
    with GraphDatabase.driver(URI, auth=AUTH) as driver:
        with driver.session(database="neo4j") as session:
            for i in range(100):
                name = "Thor"+str(i)
                org_id = session.execute_write(employ_person_tx, name)
                print(f"User {name} added to organization {org_id}")


def employ_person_tx(tx, name):
    # Create new Person node with given name, if not existing already
    result = tx.run("""
        MERGE (p:Person {name: $name})
        RETURN p.name AS name
        """, name=name
    )

    # Obtain most recent organization ID and the number of people linked to it
    result = tx.run("""
        MATCH (o:Organization)
        RETURN o.id AS id, COUNT{(p:Person)-[r:WORKS_FOR]->(o)} AS employees_n
        ORDER BY o.created_date DESC
        LIMIT 1
    """)
    org = result.single()

    if org is not None and org["employees_n"] == 0:
        raise Exception("Most recent organization is empty.")
        # Transaction will roll back -> not even Person is created!

    # If org does not have too many employees, add this Person to that
    if org is not None and org.get("employees_n") < employee_threshold:
        result = tx.run("""
            MATCH (o:Organization {id: $org_id})
            MATCH (p:Person {name: $name})
            MERGE (p)-[r:WORKS_FOR]->(o)
            RETURN $org_id AS id
            """, org_id=org["id"], name=name
        )

    # Otherwise, create a new Organization and link Person to it
    else:
        result = tx.run("""
            MATCH (p:Person {name: $name})
            CREATE (o:Organization {id: randomuuid(), created_date: datetime()})
            MERGE (p)-[r:WORKS_FOR]->(o)
            RETURN o.id AS id
            """, name=name
        )

    # Return the Organization ID to which the new Person ends up in
    return result.single()["id"]


if __name__ == "__main__":
    main()

Should a transaction fail for a reason that the driver deems transient, it automatically retries running the transaction function (with an exponentially increasing delay). For this reason, transaction functions must be idempotent (i.e., they should produce the same effect when run several times), because you do not know upfront how many times they are going to be executed. In practice, this means that you should neither modify nor rely on globals, for example. Note that although a transaction function might be executed multiple times, the queries inside it take effect only once: a failed attempt is rolled back before the next one starts.
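The following toy simulation shows why touching globals is a problem. It does not use the driver at all: TransientFailure and run_with_retries are stand-ins invented for this sketch, and the loop is a simplified illustration of retrying with a growing delay, not the driver's actual retry logic.

```python
import time


class TransientFailure(Exception):
    """Stand-in for an error that a driver would consider retryable."""


def run_with_retries(work, max_attempts=5, initial_delay=0.01):
    """Simplified retry loop for illustration only."""
    delay = initial_delay
    for attempt in range(1, max_attempts + 1):
        try:
            return work()
        except TransientFailure:
            if attempt == max_attempts:
                raise
            time.sleep(delay)
            delay *= 2  # exponentially increasing delay


calls = []  # global state touched by the function: this is the anti-pattern


def flaky_work():
    calls.append("ran")  # this side effect repeats on every attempt
    if len(calls) < 3:
        raise TransientFailure("simulated transient error")
    return "committed"


outcome = run_with_retries(flaky_work)
print(outcome, len(calls))  # prints: committed 3
```

The work succeeds once, yet the global list records three runs. Any client-side side effect inside a transaction function can be repeated in the same way.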

A session can chain multiple transactions, but only one transaction can be active within a session at any given time. To maintain multiple concurrent transactions, use multiple concurrent sessions.

Further transaction function configuration

The decorator unit_of_work() gives you further control over transaction functions. It lets you specify:

  • a transaction timeout (in seconds). Transactions that run longer will be terminated by the server. The default value is set on the server side.

  • a dictionary of metadata that gets attached to the transaction. These metadata get logged in the server query.log, and are visible in the output of the SHOW TRANSACTIONS Cypher command. Use this to tag transactions.

from neo4j import unit_of_work

@unit_of_work(timeout=5, metadata={"app_name": "people"})
def count_people(tx):
    result = tx.run("MATCH (a:Person) RETURN count(a) AS people")
    record = result.single()
    return record["people"]


with driver.session(database="neo4j") as session:
    people_n = session.execute_read(count_people)

Run an explicit transaction

You can achieve full control over transactions by manually beginning one with the method Session.begin_transaction(). You run queries inside an explicit transaction with the method Transaction.run().

with driver.session(database="neo4j") as session:
    with session.begin_transaction() as tx:
        # use tx.run() to run queries
        ...

Closing an explicit transaction can either happen automatically at the end of a with block, or can be explicitly controlled through the methods Transaction.commit(), Transaction.rollback(), or Transaction.close().
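The three closing methods can be sketched together as follows. The function apply_changes and its dry_run flag are hypothetical; the session is assumed to come from Driver.session(), and queries is assumed to be a list of (query, parameters) pairs.

```python
def apply_changes(session, queries, dry_run=False):
    """Run (query, parameters) pairs inside one explicit transaction."""
    tx = session.begin_transaction()
    try:
        for query, parameters in queries:
            tx.run(query, parameters).consume()
        if dry_run:
            tx.rollback()  # discard all changes on purpose
            return False
        tx.commit()  # make all changes permanent at once
        return True
    finally:
        # Rolls back if neither commit() nor rollback() was reached,
        # for example because a query raised an exception.
        tx.close()
```

Leaving the cleanup to close() in the finally clause means an exception raised by a query or by commit() propagates unchanged to the caller.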

Explicit transactions are most useful for applications that need to distribute Cypher execution across multiple functions for the same transaction, or for applications that need to run multiple queries within a single transaction but without the automatic retries provided by managed transactions.

import neo4j


def transfer_to_other_bank(driver, customer_id, other_bank_id, amount):
    with driver.session(database="neo4j") as session:
        tx = session.begin_transaction()
        # or just use a `with` context on `tx` instead of try/finally
        try:
            if not customer_balance_check(tx, customer_id, amount):
                # give up
                return

            other_bank_transfer_api(customer_id, other_bank_id, amount)
            # Now the money has been transferred => can't rollback anymore
            # (cannot rollback external services interactions)

            try:
                decrease_customer_balance(tx, customer_id, amount)
                tx.commit()
            except Exception as e:
                request_inspection(customer_id, other_bank_id, amount, e)
                raise  # roll back
        finally:
            tx.close()  # rolls back if not yet committed


def customer_balance_check(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        RETURN c.balance >= $amount AS sufficient
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    record = result.single(strict=True)
    return record["sufficient"]


def other_bank_transfer_api(customer_id, other_bank_id, amount):
    ...  # make some API call to other bank


def decrease_customer_balance(tx, customer_id, amount):
    query = ("""
        MATCH (c:Customer {id: $id})
        SET c.balance = c.balance - $amount
    """)
    result = tx.run(query, id=customer_id, amount=amount)
    result.consume()


def request_inspection(customer_id, other_bank_id, amount, e):
    # manual cleanup required; log this or similar
    print("WARNING: transaction rolled back due to exception:", repr(e))
    print("customer_id:", customer_id, "other_bank_id:", other_bank_id,
          "amount:", amount)

Session configuration

Database selection

It is recommended to always specify the database explicitly with the database parameter, even on single-database instances. This allows the driver to work more efficiently, as it does not have to resolve the home database first. If no database is given, the default database set in the Neo4j instance settings is used.

import neo4j

with driver.session(
    database="neo4j"
) as session:
    ...

Request routing

In a cluster environment, all sessions are opened in write mode by default, routing them to the leader. You can change this by explicitly setting the default_access_mode parameter to either neo4j.READ_ACCESS or neo4j.WRITE_ACCESS. Note that .execute_read() and .execute_write() automatically override the session’s default access mode.

import neo4j

with driver.session(
    database="neo4j",
    default_access_mode=neo4j.READ_ACCESS
) as session:
    ...

Although executing a write query in read mode likely results in a runtime error, you should not rely on this for access control. The difference between the two modes is that read transactions will be routed to any node of a cluster, whereas write ones will be directed to the leader. Still, depending on the server version and settings, the server might allow none, some, or all write statements to be executed even in read transactions.

Similar remarks hold for the .execute_read() and .execute_write() methods.

Run queries as a different user (impersonation)

You can execute a query under the security context of a different user with the parameter impersonated_user, specifying the name of the user to impersonate. For this to work, the user under which the Driver was created needs to have the appropriate permissions. Impersonating a user is cheaper than creating a new Driver object.

with driver.session(
    database="neo4j",
    impersonated_user="somebody_else"
) as session:
    ...

When impersonating a user, the query is run within the complete security context of the impersonated user and not the authenticated user (i.e., home database, permissions, etc.).

Close sessions

Each connection pool has a finite number of connections, and sessions borrow them, so if you open sessions without ever closing them, your application could run out of connections. It is thus recommended to create sessions using the with statement, which automatically closes them when the block exits. When a session is closed, its connection is returned to the pool to be reused later.

If you do not use with, remember to call the .close() method when you have finished using a session.

session = driver.session(database="neo4j")
# ...
# session usage
# ...
session.close()
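If the code between creation and .close() can raise an exception, a try/finally block guarantees that the session is still closed. This is a sketch with hypothetical function names, assuming a driver object created elsewhere.

```python
def count_people_tx(tx):
    result = tx.run("MATCH (p:Person) RETURN count(p) AS n")
    return result.single()["n"]


def count_people_with_manual_close(driver):
    session = driver.session(database="neo4j")
    try:
        return session.execute_read(count_people_tx)
    finally:
        # Runs whether the query succeeded or raised an exception
        session.close()
```

This is what the with statement does for you, spelled out by hand.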

Glossary

LTS

A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.

Aura

Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.

Driver

A Driver object holds the details required to establish connections with a Neo4j database. Every Neo4j-backed application requires a Driver object.

Cypher

Cypher is Neo4j’s graph query language that lets you retrieve data from the graph. It is like SQL, but for graphs.

APOC

Awesome Procedures On Cypher (APOC) is a library of (many) functions that cannot be easily expressed in Cypher itself.

Bolt

Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.

ACID

Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.

eventual consistency

A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.

causal consistency

A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.

null

The null marker is not a type but a placeholder for absence of value. For more information, see Cypher Manual — Working with null.

transaction

A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.