Run concurrent transactions
You may leverage Goroutines and channels to run concurrent queries, or to delegate the processing of a query’s result to several Goroutines.
The examples below also use the Go sync package to coordinate different routines.
If you are not familiar with concurrency in Go, check out The Go Programming Language → Go Concurrency Patterns: Pipelines and cancellation.
If you need causal consistency across different transactions, use bookmarks.
Concurrent processing of a query result set (using sessions)
The following example shows how you can stream a query result to a channel, and have its records concurrently processed by several consumers.
package main

import (
	"context"
	"fmt"
	"sync"
	"time"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
	ctx := context.Background()
	// Connection to database
	dbUri := "<URI for Neo4j database>"
	dbUser := "<Username>"
	dbPassword := "<Password>"
	driver, err := neo4j.NewDriverWithContext(
		dbUri,
		neo4j.BasicAuth(dbUser, dbPassword, ""))
	if err != nil {
		panic(err)
	}
	defer driver.Close(ctx)
	err = driver.VerifyConnectivity(ctx)
	if err != nil {
		panic(err)
	}

	// Run a query and get results in a channel
	recordsC := queryToChannel(ctx, driver) // (1)

	// Spawn some consumers that will process records
	// They communicate back on the log channel
	// WaitGroup allows to keep track of progress and close channel when all are done
	log := make(chan string) // (4)
	wg := &sync.WaitGroup{}  // (5)
	for i := 1; i < 10; i++ { // i starts from 1 because consumer 0 would sleep 0 seconds and process too fast
		wg.Add(1)
		go consumer(wg, recordsC, log, i) // (6)
	}

	// When all consumers are done, close log channel
	go func() {
		wg.Wait()
		close(log)
	}()

	// Print log as it comes
	for v := range log {
		fmt.Println(v)
	}
}

func queryToChannel(ctx context.Context, driver neo4j.DriverWithContext) chan *neo4j.Record {
	recordsC := make(chan *neo4j.Record, 10) // (2)
	go func() {
		// Sessions are not thread-safe: create and close the session inside the Goroutine that uses it
		session := driver.NewSession(ctx, neo4j.SessionConfig{DatabaseName: "neo4j"})
		defer session.Close(ctx)
		// Close the channel once the transaction is over, so that consumers stop ranging over it
		defer close(recordsC)
		_, err := session.ExecuteWrite(ctx,
			func(tx neo4j.ManagedTransaction) (any, error) {
				// Neo4j query to create and retrieve some nodes
				result, err := tx.Run(ctx, `
					UNWIND range(1,25) AS id
					MERGE (p:Person {id: id})
					RETURN p
					`, nil)
				if err != nil {
					return nil, err
				}
				// Stream results to channel as they come from the server
				for result.Next(ctx) { // (3)
					recordsC <- result.Record()
				}
				return nil, result.Err()
			})
		if err != nil {
			fmt.Println("ERROR:", err)
		}
	}()
	return recordsC
}

func consumer(wg *sync.WaitGroup, records <-chan *neo4j.Record, log chan<- string, n int) {
	defer wg.Done() // will communicate that routine is done
	for record := range records {
		log <- fmt.Sprintf("Receiver %v processed %v", n, record)
		time.Sleep(time.Duration(n) * time.Second) // proxy for a time-consuming processing
	}
}
(1) A Goroutine runs the query to the Neo4j server with a managed transaction. Notice that the driver session is created inside the routine, as sessions are not thread-safe.
(2) The channel recordsC is where the query result records get streamed to. The transaction function from .ExecuteWrite() writes to it, and the various consumers read from it. It is buffered (capacity 10), so the transaction function blocks whenever 10 records are waiting and does not pull records from the result faster than the consumers can handle.
(3) Each result record coming from the server is sent over the recordsC channel. The streaming continues as long as there are records to be processed, after which the channel gets closed and the routine exits.
(4) The log channel is where the consumers report back.
(5) A sync.WaitGroup is needed to know when all consumers are done, and thus when the log channel can be closed.
(6) A number of consumers are started in separate Goroutines. Each consumer reads and processes records from the recordsC channel, and simulates a lengthy operation with a sleep timer.
Concurrent run of multiple queries (using ExecuteQuery())
The following example shows how you can run multiple queries concurrently.
package main

import (
	"context"
	"fmt"
	"sync"

	"github.com/neo4j/neo4j-go-driver/v5/neo4j"
)

func main() {
	ctx := context.Background()
	// Connection to database
	dbUri := "<URI for Neo4j database>"
	dbUser := "<Username>"
	dbPassword := "<Password>"
	driver, err := neo4j.NewDriverWithContext(
		dbUri,
		neo4j.BasicAuth(dbUser, dbPassword, ""))
	if err != nil {
		panic(err)
	}
	defer driver.Close(ctx)
	err = driver.VerifyConnectivity(ctx)
	if err != nil {
		panic(err)
	}

	log := make(chan string) // (1)
	wg := &sync.WaitGroup{}  // (2)
	// Spawn 10 concurrent queries
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go runQuery(ctx, wg, driver, log) // (3)
	}
	// Wait for all runner routines to be done before closing log
	go func() {
		wg.Wait()
		close(log)
	}()
	// Print log
	for msg := range log {
		fmt.Println(msg)
	}
}

// Run a Neo4j query that sleeps for a random time, and report the sleep time in ms
// The query calls the APOC procedure apoc.util.sleep, so APOC must be installed on the server
func runQuery(ctx context.Context, wg *sync.WaitGroup, driver neo4j.DriverWithContext, log chan<- string) {
	defer wg.Done() // will communicate that routine is done
	result, err := neo4j.ExecuteQuery(ctx, driver, `
		WITH round(rand()*2000) AS waitTime
		CALL apoc.util.sleep(toInteger(waitTime)) RETURN waitTime AS time
		`, nil, neo4j.EagerResultTransformer,
		neo4j.ExecuteQueryWithDatabase("neo4j"))
	if err != nil {
		log <- fmt.Sprintf("ERROR: %v", err)
		return
	}
	waitTime, _ := result.Records[0].Get("time")
	log <- fmt.Sprintf("Query returned %v", waitTime)
}
(1) The log channel is where all query routines report to.
(2) A sync.WaitGroup is needed to know when all query routines are done, and thus when the log channel can be closed.
(3) Ten different queries are run, each in its own Goroutine. They run independently and concurrently, reporting to the shared log channel.
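The example above starts one Goroutine per query and funnels every outcome through a shared log channel. A common variation caps how many queries are in flight at once and stores each result by its index, so the outcomes come back in submission order. The sketch below is stdlib-only and under stated assumptions: runAll is a hypothetical helper, and fakeQuery is a hypothetical stand-in for a real call such as neo4j.ExecuteQuery.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runAll (hypothetical helper) calls query(i) for i in [0, n) from separate
// Goroutines, letting at most limit calls run at the same time. Results and
// errors are stored by index, so each Goroutine writes only its own slot.
func runAll(n, limit int, query func(i int) (int, error)) ([]int, []error) {
	results := make([]int, n)
	errs := make([]error, n)
	sem := make(chan struct{}, limit) // counting semaphore
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
			sem <- struct{}{}        // acquire a slot; blocks while limit calls are running
			defer func() { <-sem }() // release the slot
			results[i], errs[i] = query(i)
		}(i)
	}
	wg.Wait()
	return results, errs
}

func main() {
	var running, peak atomic.Int64
	// fakeQuery is a hypothetical stand-in for a database call
	fakeQuery := func(i int) (int, error) {
		now := running.Add(1)
		for { // record the highest number of concurrent calls observed
			p := peak.Load()
			if now <= p || peak.CompareAndSwap(p, now) {
				break
			}
		}
		time.Sleep(5 * time.Millisecond)
		running.Add(-1)
		return i * i, nil
	}
	results, errs := runAll(10, 3, fakeQuery)
	fmt.Println(results, errs[0] == nil, peak.Load() <= 3)
}
```

Writing into pre-sized slices avoids both a shared channel and a mutex, because no two Goroutines touch the same element. The trade-off is that results become available only after every query has finished, whereas the log channel reports each one as it arrives.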
Glossary
- LTS: A Long Term Support release is one guaranteed to be supported for a number of years. Neo4j 4.4 is LTS, and Neo4j 5 will also have an LTS version.
- Aura: Aura is Neo4j’s fully managed cloud service. It comes with both free and paid plans.
- Cypher: Cypher is Neo4j’s graph query language that lets you retrieve data from the database. It is like SQL, but for graphs.
- APOC: Awesome Procedures On Cypher (APOC) is a library of (many) functions that cannot be easily expressed in Cypher itself.
- Bolt: Bolt is the protocol used for interaction between Neo4j instances and drivers. It listens on port 7687 by default.
- ACID: Atomicity, Consistency, Isolation, Durability (ACID) are properties guaranteeing that database transactions are processed reliably. An ACID-compliant DBMS ensures that the data in the database remains accurate and consistent despite failures.
- eventual consistency: A database is eventually consistent if it provides the guarantee that all cluster members will, at some point in time, store the latest version of the data.
- causal consistency: A database is causally consistent if read and write queries are seen by every member of the cluster in the same order. This is stronger than eventual consistency.
- NULL: The null marker is not a type but a placeholder for absence of value. For more information, see Cypher → Working with null.
- transaction: A transaction is a unit of work that is either committed in its entirety or rolled back on failure. An example is a bank transfer: it involves multiple steps, but they must all succeed or be reverted, to avoid money being subtracted from one account but not added to the other.
- backpressure: Backpressure is a force opposing the flow of data. It ensures that the client is not overwhelmed by data arriving faster than it can handle.
- transaction function: A transaction function is a callback executed by an ExecuteRead or ExecuteWrite call. The driver automatically re-executes the callback in case of server failure.
- DriverWithContext: A DriverWithContext object holds the details required to establish connections with a Neo4j database.
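The transaction function entry notes that the driver may re-execute the callback. This matters for the streaming example earlier on this page. Anything the callback has already sent to a channel cannot be taken back if that attempt fails, so a retried attempt sends it again. The sketch below illustrates this with runWithRetry, a hypothetical and deliberately simplified model of a re-executed callback. It is not the driver's retry logic, and errTransient is likewise a hypothetical stand-in for a retryable failure. The sketch compares sending from inside the callback with collecting rows locally and sending only the successful attempt's rows.

```go
package main

import (
	"errors"
	"fmt"
)

// errTransient is a hypothetical stand-in for a retryable server failure.
var errTransient = errors.New("transient failure")

// runWithRetry is a hypothetical, simplified model of a re-executed transaction
// function: work is called again while it fails with errTransient, up to
// maxAttempts times. It is not the Neo4j driver's implementation.
func runWithRetry(maxAttempts int, work func() (any, error)) (any, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		var res any
		res, err = work()
		if !errors.Is(err, errTransient) {
			return res, err
		}
	}
	return nil, err
}

// streamInside sends rows from inside the callback. The first attempt fails
// after sending, so its rows reach out a second time on the retry.
func streamInside(out chan<- int) error {
	attempts := 0
	_, err := runWithRetry(3, func() (any, error) {
		attempts++
		for id := 1; id <= 3; id++ {
			out <- id // side effect that a failed attempt cannot undo
		}
		if attempts == 1 {
			return nil, errTransient // simulate a failure after the rows went out
		}
		return nil, nil
	})
	return err
}

// collectThenSend builds rows in a slice that is discarded on failure, and sends
// only the successful attempt's rows once the callback has returned.
func collectThenSend(out chan<- int) error {
	attempts := 0
	res, err := runWithRetry(3, func() (any, error) {
		attempts++
		var rows []int // fresh slice on every attempt
		for id := 1; id <= 3; id++ {
			rows = append(rows, id)
		}
		if attempts == 1 {
			return nil, errTransient
		}
		return rows, nil
	})
	if err != nil {
		return err
	}
	for _, id := range res.([]int) {
		out <- id
	}
	return nil
}

func main() {
	a := make(chan int, 10)
	_ = streamInside(a)
	b := make(chan int, 10)
	_ = collectThenSend(b)
	fmt.Println(len(a), len(b)) // 6 3
}
```

Collecting first keeps retries invisible to the consumers. The cost is that the whole result is held in memory and the streaming backpressure is lost, so which approach fits depends on result size and on whether consumers can tolerate duplicates.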