4.3. Sessions and transactions

This section describes how to create units of work and provide a logical context for that work.

4.3.1. Sessions

A session is a container for a sequence of transactions. Sessions borrow connections from a pool as required and so should be considered lightweight and disposable. In languages where thread safety is an issue, a session should not be considered thread-safe.

In languages that support them, sessions are usually scoped within a context block. This ensures that they are properly closed and that any underlying connections are released and not leaked.

Example 4.17. Session
public void AddPerson(string name)
{
    using (var session = Driver.Session())
    {
        session.Run("CREATE (a:Person {name: $name})", new {name});
    }
}
public void addPerson(String name)
{
    try ( Session session = driver.session() )
    {
        session.run("CREATE (a:Person {name: $name})", parameters( "name", name ) );
    }
}
const session = driver.session();

session.run('CREATE (a:Person {name: $name})', {'name': personName}).then(() => {
  session.close(() => {
    console.log('Person created, session closed');
  });
});
def add_person(self, name):
    with self._driver.session() as session:
        session.run("CREATE (a:Person {name: $name})", name=name)

4.3.2. Transactions

Transactions are atomic units of work consisting of one or more Cypher statement executions. A transaction is executed within a session.

To execute a Cypher statement, two pieces of information are required: the statement template and a keyed set of parameters. The template is a string containing placeholders that are substituted with parameter values at runtime. While it is possible to run non-parameterized Cypher, good programming practice is to use parameters in Cypher statements. This allows for caching of statements within the Cypher engine, which is beneficial for performance. Parameter values should adhere to Section 3.2.1, “Values and types”.

The Neo4j driver API provides for three forms of transaction:

  • Auto-commit transactions
  • Transaction functions
  • Explicit transactions

Of these, only transaction function can be automatically replayed on failure.

4.3.2.1. Auto-commit Transactions

An auto-commit transaction is a simple but limited form of transaction. Such a transaction consists of only one Cypher statement, cannot be automatically replayed on failure, and cannot take part in a causal chain.

An auto-commit transaction is invoked using the session.run method:

Example 4.18. Auto-commit transaction
public void AddPerson(string name)
{
    using (var session = Driver.Session())
    {
        session.Run("CREATE (a:Person {name: $name})", new {name});
    }
}
public void addPerson( String name )
{
    try ( Session session = driver.session() )
    {
        session.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
    }
}
function addPerson(name) {
  const session = driver.session();
  return session.run('CREATE (a:Person {name: $name})', {name: name}).then(result => {
    session.close();
    return result;
  });
}
def add_person(self, name):
    with self._driver.session() as session:
        session.run("CREATE (a:Person {name: $name})", name=name)

Auto-commit transactions are sent to the network and acknowledged immediately. This means that multiple transactions cannot share network packets, thereby exhibiting a lesser network efficiency than other forms of transaction.

Auto-commit transactions are intended to be used for simple use cases such as when learning Cypher or writing one-off scripts. It is not recommended to use auto-commit transactions in production environments or when performance or resilience are a primary concern.

However, Auto-commit transactions are the only way to execute USING PERIODIC COMMIT Cypher statements.

4.3.2.2. Transaction functions

Transaction functions are the recommended form for containing transactional units of work. This form requires minimal boilerplate code and allows for a clear separation of database queries and application logic.

Example 4.19. Transaction function
public void AddPerson(string name)
{
    using (var session = Driver.Session())
    {
        session.WriteTransaction(tx => tx.Run("CREATE (a:Person {name: $name})", new {name}));
    }
}
public void addPerson( final String name )
{
    try ( Session session = driver.session() )
    {
        session.writeTransaction( new TransactionWork<Integer>()
        {
            @Override
            public Integer execute( Transaction tx )
            {
                return createPersonNode( tx, name );
            }
        } );
    }
}

private static int createPersonNode( Transaction tx, String name )
{
    tx.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
    return 1;
}
const session = driver.session();
const writeTxPromise = session.writeTransaction(tx => tx.run('CREATE (a:Person {name: $name})', {'name': personName}));

writeTxPromise.then(result => {
  session.close();

  if (result) {
    console.log('Person created');
  }
});
def add_person(self, name):
    with self._driver.session() as session:
        session.write_transaction(self.create_person_node, name)

@staticmethod
def create_person_node(tx, name):
    tx.run("CREATE (a:Person {name: $name})", name=name)

Transaction functions are also able to handle connection problems and transient errors using an automatic retry mechanism. This retry capability can be configured on Driver construction.

Any query results obtained within a transaction function should be consumed within that function. Transaction functions can return values but these should be derived values rather than raw results.

4.3.2.3. Explicit transactions

Explicit transactions are the longhand form of transaction functions, providing access to explicit BEGIN, COMMIT and ROLLBACK operations. While this form is useful for a handful of use cases, it is recommended to use transaction functions wherever possible.

4.3.2.4. Cypher errors

When executing Cypher, it is possible for an exception to be thrown by the Cypher engine. Each such exception is associated with a status code that describes the nature of the error and a message that provides more detail.

The error classifications are listed in the table below.

Table 4.4. Error classifications
Classification Description

ClientError

The client application has caused an error. The application should amend and retry the operation.

DatabaseError

The server has caused an error. Retrying the operation will generally be unsuccessful.

TransientError

A temporary error has occurred. The application should retry the operation.

4.3.3. Causal chaining

When working with a causal cluster, transactions can be chained to ensure causal consistency. This means that for any two transactions, it is guaranteed that the second transaction will begin only after the first has been successfully committed. This is true even if the transactions are carried out on different physical cluster members.

Causal chaining is carried out by passing bookmarks between transactions. Each bookmark records a point in transactional history and can be used to inform cluster members to carry out units of work in a particular sequence. Internally, a bookmark is passed from server to client on a successful COMMIT and back from client to server on BEGIN. On receipt of one or more bookmarks, the transaction server will block until it has fast forwarded to catch up with the latest of these.

Within a session, bookmark propagation is carried out automatically and does not require any explicit signal or setting from the application. To opt out of this mechanism for unrelated units of work, applications can use multiple sessions. This avoids the small latency overhead of the causal chain. Propagation between sessions can be achieved by extracting the last bookmarks from one or more sessions and passing these into the construction of another. This is generally the only case in which an application will need to work with bookmarks directly.

Figure 4.1. Passing bookmarks
driver passing bookmarks
Example 4.20. Passing bookmarks between sessions

This example illustrates the passing of bookmarks between sessions.

We are using three separate sessions: a, b and c. In session a we run two separate transactions. In the first one we create the person Alice, and in the second one we record that she works at Wayne Enterprises. The bookmark being passed between the two transactions is handled by the session. The bookmark from the last transaction is saved into an array for future use.

In session b we also run two separate transactions. In the first one we create the person Bob, and in the second one we record that he works at LexCorp. Again, the bookmark being passed between the two transactions is handled by the session. The bookmark from the last transaction is saved into an array for future use.

In the last session, session c, we wish to create a friendship between Alice and Bob. This can only be done if both Alice and Bob have been created first. In order to ensure this, we pass the bookmarks from the last transactions in session a and session b, respectively.

// Create a company node
private void AddCompany(ITransaction tx, string name)
{
    tx.Run("CREATE (a:Company {name: $name})", new {name});
}

// Create a person node
private void AddPerson(ITransaction tx, string name)
{
    tx.Run("CREATE (a:Person {name: $name})", new {name});
}

// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
private void Employ(ITransaction tx, string personName, string companyName)
{
    tx.Run(@"MATCH (person:Person {name: $personName})
             MATCH (company:Company {name: $companyName})
             CREATE (person)-[:WORKS_FOR]->(company)", new {personName, companyName});
}

// Create a friendship between two people.
private void MakeFriends(ITransaction tx, string name1, string name2)
{
    tx.Run(@"MATCH (a:Person {name: $name1})
             MATCH (b:Person {name: $name2})
             MERGE (a)-[:KNOWS]->(b)", new {name1, name2});
}

// Match and display all friendships.
private void PrintFriendships(ITransaction tx)
{
    var result = tx.Run("MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name");

    foreach (var record in result)
    {
        Console.WriteLine($"{record["a.name"]} knows {record["b.name"]}");
    }
}

public void AddEmployAndMakeFriends()
{
    // To collect the session bookmarks
    var savedBookmarks = new List<string>();

    // Create the first person and employment relationship.
    using (var session1 = Driver.Session(AccessMode.Write))
    {
        session1.WriteTransaction(tx => AddCompany(tx, "Wayne Enterprises"));
        session1.WriteTransaction(tx => AddPerson(tx, "Alice"));
        session1.WriteTransaction(tx => Employ(tx, "Alice", "Wayne Enterprises"));

        savedBookmarks.Add(session1.LastBookmark);
    }

    // Create the second person and employment relationship.
    using (var session2 = Driver.Session(AccessMode.Write))
    {
        session2.WriteTransaction(tx => AddCompany(tx, "LexCorp"));
        session2.WriteTransaction(tx => AddPerson(tx, "Bob"));
        session2.WriteTransaction(tx => Employ(tx, "Bob", "LexCorp"));

        savedBookmarks.Add(session2.LastBookmark);
    }

    // Create a friendship between the two people created above.
    using (var session3 = Driver.Session(AccessMode.Write, savedBookmarks))
    {
        session3.WriteTransaction(tx => MakeFriends(tx, "Alice", "Bob"));

        session3.ReadTransaction(PrintFriendships);
    }
}
// Create a company node
private StatementResult addCompany( final Transaction tx, final String name )
{
    return tx.run( "CREATE (:Company {name: $name})", parameters( "name", name ) );
}

// Create a person node
private StatementResult addPerson( final Transaction tx, final String name )
{
    return tx.run( "CREATE (:Person {name: $name})", parameters( "name", name ) );
}

// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
private StatementResult employ( final Transaction tx, final String person, final String company )
{
    return tx.run( "MATCH (person:Person {name: $person_name}) " +
                    "MATCH (company:Company {name: $company_name}) " +
                    "CREATE (person)-[:WORKS_FOR]->(company)",
            parameters( "person_name", person, "company_name", company ) );
}

// Create a friendship between two people.
private StatementResult makeFriends( final Transaction tx, final String person1, final String person2 )
{
    return tx.run( "MATCH (a:Person {name: $person_1}) " +
                    "MATCH (b:Person {name: $person_2}) " +
                    "MERGE (a)-[:KNOWS]->(b)",
            parameters( "person_1", person1, "person_2", person2 ) );
}

// Match and display all friendships.
private StatementResult printFriends( final Transaction tx )
{
    StatementResult result = tx.run( "MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name" );
    while ( result.hasNext() )
    {
        Record record = result.next();
        System.out.println( String.format( "%s knows %s", record.get( "a.name" ).asString(), record.get( "b.name" ).toString() ) );
    }
    return result;
}

public void addEmployAndMakeFriends()
{
    // To collect the session bookmarks
    List<String> savedBookmarks = new ArrayList<>();

    // Create the first person and employment relationship.
    try ( Session session1 = driver.session( AccessMode.WRITE ) )
    {
        session1.writeTransaction( tx -> addCompany( tx, "Wayne Enterprises" ) );
        session1.writeTransaction( tx -> addPerson( tx, "Alice" ) );
        session1.writeTransaction( tx -> employ( tx, "Alice", "Wayne Enterprises" ) );

        savedBookmarks.add( session1.lastBookmark() );
    }

    // Create the second person and employment relationship.
    try ( Session session2 = driver.session( AccessMode.WRITE ) )
    {
        session2.writeTransaction( tx -> addCompany( tx, "LexCorp" ) );
        session2.writeTransaction( tx -> addPerson( tx, "Bob" ) );
        session2.writeTransaction( tx -> employ( tx, "Bob", "LexCorp" ) );

        savedBookmarks.add( session2.lastBookmark() );
    }

    // Create a friendship between the two people created above.
    try ( Session session3 = driver.session( AccessMode.WRITE, savedBookmarks ) )
    {
        session3.writeTransaction( tx -> makeFriends( tx, "Alice", "Bob" ) );

        session3.readTransaction( this::printFriends );
    }
}
// Create a company node
function addCompany(tx, name) {
  return tx.run('CREATE (a:Company {name: $name})', {'name': name});
}

// Create a person node
function addPerson(tx, name) {
  return tx.run('CREATE (a:Person {name: $name})', {'name': name});
}

// Create an employment relationship to a pre-existing company node.
// This relies on the person first having been created.
function addEmployee(tx, personName, companyName) {
  return tx.run('MATCH (person:Person {name: $personName}) ' +
    'MATCH (company:Company {name: $companyName}) ' +
    'CREATE (person)-[:WORKS_FOR]->(company)', {'personName': personName, 'companyName': companyName});
}

// Create a friendship between two people.
function makeFriends(tx, name1, name2) {
  return tx.run('MATCH (a:Person {name: $name1}) ' +
    'MATCH (b:Person {name: $name2}) ' +
    'MERGE (a)-[:KNOWS]->(b)', {'name1': name1, 'name2': name2});
}

// To collect friend relationships
const friends = [];

// Match and display all friendships.
function findFriendships(tx) {
  const result = tx.run('MATCH (a)-[:KNOWS]->(b) RETURN a.name, b.name');

  result.subscribe({
    onNext: record => {
      const name1 = record.get(0);
      const name2 = record.get(1);

      friends.push({'name1': name1, 'name2': name2});
    }
  });
}

// To collect the session bookmarks
const savedBookmarks = [];

// Create the first person and employment relationship.
const session1 = driver.session(neo4j.WRITE);
const first = session1.writeTransaction(tx => addCompany(tx, 'Wayne Enterprises')).then(
  () => session1.writeTransaction(tx => addPerson(tx, 'Alice'))).then(
  () => session1.writeTransaction(tx => addEmployee(tx, 'Alice', 'Wayne Enterprises'))).then(
  () => {
    savedBookmarks.push(session1.lastBookmark());

    return session1.close();
  });

// Create the second person and employment relationship.
const session2 = driver.session(neo4j.WRITE);
const second = session2.writeTransaction(tx => addCompany(tx, 'LexCorp')).then(
  () => session2.writeTransaction(tx => addPerson(tx, 'Bob'))).then(
  () => session2.writeTransaction(tx => addEmployee(tx, 'Bob', 'LexCorp'))).then(
  () => {
    savedBookmarks.push(session2.lastBookmark());

    return session2.close();
  });

// Create a friendship between the two people created above.
const last = Promise.all([first, second]).then(ignore => {
  const session3 = driver.session(neo4j.WRITE, savedBookmarks);

  return session3.writeTransaction(tx => makeFriends(tx, 'Alice', 'Bob')).then(
    () => session3.readTransaction(findFriendships).then(
      () => session3.close()
    )
  );
});
Unresolved directive in n/1.5/src/main/asciidoc/sessions-and-transactions.adoc - include::/mnt/teamcity/work/67def7d1a141cd2b/build/driver-sources/python-driver/1.5.3/test/examples/pass_bookmarks_example.py[tags=pass-bookmarks]

If you try to extract a bookmark from a database which is not running in Causal Cluster mode, you will receive a null result.

4.3.4. Access modes

Transactions can be executed in either read or write mode. In a causal cluster, each transaction will be routed to an appropriate server based on the mode. When using a single instance, all transactions will be passed to that one server. Routing Cypher by identifying reads and writes can improve the utilization of available cluster resources: as read servers are typically more plentiful than write servers, it is beneficial to direct as much as possible of read transactions to read servers. Doing so helps keeping write servers available for write transactions.

Note that the driver does not parse Cypher and cannot determine whether a transaction is intended to carry out read or write operations. As a result of this, a write transaction tagged for read will be sent to a read server, but will fail on execution.

Example 4.21. Read-write transaction
public long AddPerson(string name)
{
    using (var session = Driver.Session())
    {
        session.WriteTransaction(tx => CreatePersonNode(tx, name));
        return session.ReadTransaction(tx => MatchPersonNode(tx, name));
    }
}

private static void CreatePersonNode(ITransaction tx, string name)
{
    tx.Run("CREATE (a:Person {name: $name})", new {name});
}

private static long MatchPersonNode(ITransaction tx, string name)
{
    var result = tx.Run("MATCH (a:Person {name: $name}) RETURN id(a)", new {name});
    return result.Single()[0].As<long>();
}
public long addPerson( final String name )
{
    try ( Session session = driver.session() )
    {
        session.writeTransaction( new TransactionWork<Void>()
        {
            @Override
            public Void execute( Transaction tx )
            {
                return createPersonNode( tx, name );
            }
        } );
        return session.readTransaction( new TransactionWork<Long>()
        {
            @Override
            public Long execute( Transaction tx )
            {
                return matchPersonNode( tx, name );
            }
        } );
    }
}

private static Void createPersonNode( Transaction tx, String name )
{
    tx.run( "CREATE (a:Person {name: $name})", parameters( "name", name ) );
    return null;
}

private static long matchPersonNode( Transaction tx, String name )
{
    StatementResult result = tx.run( "MATCH (a:Person {name: $name}) RETURN id(a)", parameters( "name", name ) );
    return result.single().get( 0 ).asLong();
}
const session = driver.session();

const writeTxPromise = session.writeTransaction(tx => tx.run('CREATE (a:Person {name: $name})', {name: personName}));

writeTxPromise.then(() => {
  const readTxPromise = session.readTransaction(tx => tx.run('MATCH (a:Person {name: $name}) RETURN id(a)', {name: personName}));

  readTxPromise.then(result => {
    session.close();

    const singleRecord = result.records[0];
    const createdNodeId = singleRecord.get(0);

    console.log('Matched created node with id: ' + createdNodeId);
  });
});
def add_person(self, name):
    with self._driver.session() as session:
        session.write_transaction(self.create_person_node, name)
        return session.read_transaction(self.match_person_node, name)

@staticmethod
def create_person_node(tx, name):
    tx.run("CREATE (a:Person {name: $name})", name=name)
    return None

@staticmethod
def match_person_node(tx, name):
    result = tx.run("MATCH (a:Person {name: $name}) RETURN count(a)", name=name)
    return result.single()[0]

4.3.5. Asynchronous programming

Java, .NET and JavaScript all support asynchronous programming. The examples here highlight specifically how Java and .NET provide for this programming model alongside their blocking API.

In addition to the methods listed in the previous sections, there also exist several asynchronous methods which allow for better integration with applications written in an asynchronous style. Asynchronous methods are named as their synchronous counterparts but with an additional async prefix.

Example 4.22. Asynchronous programming examples

Auto-commit transactions. 

var records = new List<string>();
var session = Driver.Session();

try
{
    // Send cypher statement to the database.
    // The existing IStatementResult interface implements IEnumerable
    // and does not play well with asynchronous use cases. The replacement
    // IStatementResultCursor interface is returned from the RunAsync
    // family of methods instead and provides async capable methods.
    var reader = await session.RunAsync(
        "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher statement
        new { id = 0 } // Parameters in the statement, if any
    );

    // Loop through the records asynchronously
    while (await reader.FetchAsync())
    {
        // Each current read in buffer can be reached via Current
        records.Add(reader.Current[0].ToString());
    }
}
finally
{
    // asynchronously close session
    await session.CloseAsync();
}

Transaction functions. 

var session = Driver.Session();

try
{
    // Wrap whole operation into an implicit transaction and
    // get the results back.
    result = await session.ReadTransactionAsync(async tx =>
    {
        var records = new List<string>();

        // Send cypher statement to the database
        var reader = await tx.RunAsync(
            "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher statement
            new { id = 0 } // Parameters in the statement, if any
        );

        // Loop through the records asynchronously
        while (await reader.FetchAsync())
        {
            // Each current read in buffer can be reached via Current
            records.Add(reader.Current[0].ToString());
        }

        return records;
    });
}
finally
{
    // asynchronously close session
    await session.CloseAsync();
}

Explicit transactions. 

var records = new List<string>();
var session = Driver.Session();

try
{
    // Start an explicit transaction
    var tx = await session.BeginTransactionAsync();

    // Send cypher statement to the database through the explicit
    // transaction acquired
    var reader = await tx.RunAsync(
        "MATCH (p:Product) WHERE p.id = $id RETURN p.title", // Cypher statement
        new { id = 0 } // Parameters in the statement, if any
    );

    // Loop through the records asynchronously
    while (await reader.FetchAsync())
    {
        // Each current read in buffer can be reached via Current
        records.Add(reader.Current[0].ToString());
    }

    // Commit the transaction
    await tx.CommitAsync();
}
finally
{
    // asynchronously close session
    await session.CloseAsync();
}

It is always important to close the session object and it is suggested to keep session.CloseAsync in a finally block to make sure all resources (such as network connection) obtained by the session will always be cleaned up properly. The session close method also enforces rolling back of the last uncommitted or failed transaction in this session. Thus it is optional to put tx.CommitAsync or tx.RollbackAsync in a finally block as long as the close of session where the transaction is created will be executed in an outer finally block.

Auto-commit transactions. 

String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );

Session session = driver.session();

return session.runAsync( query, parameters )
        .thenCompose( cursor -> cursor.listAsync( record -> record.get( 0 ).asString() ) )
        .exceptionally( error ->
        {
            // query execution failed, print error and fallback to empty list of titles
            error.printStackTrace();
            return Collections.emptyList();
        } )
        .thenCompose( titles -> session.closeAsync().thenApply( ignore -> titles ) );

Transaction functions. 

String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );

Session session = driver.session();

return session.readTransactionAsync( tx ->
        tx.runAsync( query, parameters )
                .thenCompose( cursor -> cursor.forEachAsync( record ->
                        // asynchronously print every record
                        System.out.println( record.get( 0 ).asString() ) ) )
);

Explicit transactions. 

String query = "MATCH (p:Product) WHERE p.id = $id RETURN p.title";
Map<String,Object> parameters = Collections.singletonMap( "id", 0 );

Session session = driver.session();

Function<Transaction,CompletionStage<Void>> printSingleTitle = tx ->
        tx.runAsync( query, parameters )
                .thenCompose( StatementResultCursor::singleAsync )
                .thenApply( record -> record.get( 0 ).asString() )
                .thenApply( title ->
                {
                    // single title fetched successfully
                    System.out.println( title );
                    return true; // signal to commit the transaction
                } )
                .exceptionally( error ->
                {
                    // query execution failed
                    error.printStackTrace();
                    return false; // signal to rollback the transaction
                } )
                .thenCompose( commit -> commit ? tx.commitAsync() : tx.rollbackAsync() );

return session.beginTransactionAsync()
        .thenCompose( printSingleTitle )
        .exceptionally( error ->
        {
            // either commit or rollback failed
            error.printStackTrace();
            return null;
        } )
        .thenCompose( ignore -> session.closeAsync() );

It is important to close the session object to make sure all resources (such as network connections) obtained by the session are cleaned up properly. It is therefore suggested to always do Session#closeAsync() at the end of the whole CompletionStage chain. Sessions should be closed regardless of whether the completes normally or exceptionally. Closing the session also enforces the rollback of the last uncommitted or failed transaction in the session. Thus it is optional to roll back the transaction using Transaction#rollbackAsync(), as long as the session is closed at the end of the full chain.