Periodic Execution
Cypher is great for querying graphs and for importing and updating graph structures.
During imports you can use PERIODIC COMMIT to control transaction sizes in memory, but for other graph refactorings it is not as easy to commit transactions regularly and free memory for new update state.
Procedure Overview
The table below describes the available procedures:
Qualified Name | Type | Release |
---|---|---|
Periodic Iterate
The apoc.periodic.iterate
procedure is helpful when you need to handle large amounts of data for import, refactoring, and other cases that require large transactions.
It provides a way to batch the data by dividing the workload into two parts:
- a data-driven statement: This defines how you select the data that needs to be handled. You can provide a Cypher statement to select from existing graph data, read external data from a file or API, or retrieve data from another datastore.
- an operation statement: This defines what you want done to the selected data. You can execute Cypher to update, create, or delete data, or run other procedures to manipulate and transform values before loading.
The data-driven statement is provided as the first statement that results in a stream of values to be processed.
The operation statement is provided as the second statement to process one element at a time or (with batchMode: "BATCH") a batch at a time.
The results of the data-driven statement are passed to the operation statement as parameters, so they are automatically made available with their names.
name | type | default | description |
---|---|---|---|
batchSize | Long | 10000 | run the specified number of operation statements in a single tx - params: {_count, _batch} |
parallel | boolean | false | run operation statements in parallel (note that statements might deadlock if conflicting) |
retries | Long | 0 | if the operation statement fails with an error, sleep 100ms and retry until retries-count is reached - param {_retry} |
batchMode | String | "BATCH" | how data-driven statements should be processed by operation statement. Valid values are: UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2 |
params | Map | {} | externally pass in map of params |
concurrency | Long | 50 | number of concurrent tasks are generated when using |
failedParams | Long | -1 | if set to a non-negative value, each failed batch up to |
In APOC versions 4.0.0.11 and earlier, the
|
param | default | description |
---|---|---|
iterateList | true | execute operation statements once per batchSize (whole batchSize list is passed in as parameter {_batch}) |
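As a minimal sketch of how these options can be combined, the call below passes an external value through params and reads it as $year in the data-driven statement. The born property, the Veteran label, and the year value are hypothetical and only serve the illustration; batches, total, and errorMessages are columns returned by the procedure.

```cypher
// Sketch only: `born`, `Veteran` and the year 1970 are illustrative assumptions.
CALL apoc.periodic.iterate(
  // data-driven statement: $year comes from the `params` map below
  "MATCH (p:Person) WHERE p.born < $year RETURN p",
  // operation statement: `p` is made available by name
  "SET p:Veteran",
  {batchSize: 5000, params: {year: 1970}})
YIELD batches, total, errorMessages
RETURN batches, total, errorMessages
```

Keeping the value in params rather than concatenating it into the statement string avoids rebuilding the query text for every run.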
Periodic Iterate Examples
Let’s go through some examples.
If you were to add an :Actor label to several million :Person nodes, you could run the following code:
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
Let’s break down the parameters passed to the procedure:
- Our first Cypher statement selects all the Person nodes with an ACTED_IN relationship to another node and returns those persons. This is the data-driven portion where we select the data that we want to change.
- Our second Cypher statement sets the :Actor label on each of the Person nodes selected. This is the operation portion where we apply the change to the data from our first statement.
- Finally, we specify any configuration we want the procedure to use. We have set a batchSize of 10,000 and asked for the statements to run in parallel.
Executing this procedure would take all of our Person nodes gathered in the first Cypher statement and update each of them with the second Cypher statement.
It divides the work into batches, taking 10,000 Person nodes from the stream and updating them in a single transaction.
If we have 30,000 Person nodes in our graph with an ACTED_IN relationship, then it would break this down into 3 batches.
Finally, it runs those in parallel, as updating node labels or properties does not conflict.
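The batch count follows from dividing the number of selected rows by batchSize and rounding up. A small Cypher sketch of that arithmetic, using the figures from the example (the variable names are only illustrative):

```cypher
// 30,000 selected rows in batches of 10,000
WITH 30000 AS selected, 10000 AS batchSize
RETURN toInteger(ceil(toFloat(selected) / batchSize)) AS batches
// batches = 3
```

With a selection that is not an exact multiple of batchSize, the last batch is simply smaller than the others.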
For more complex operations like updating or removing relationships, either do not use parallel: true OR make sure that you batch the work in a way that each subgraph of data is updated in one operation, such as by transferring the root objects.
If you attempt complex operations, also enable retrying failed operations, e.g. with |
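As a hedged sketch of that advice, the call below removes relationships sequentially and allows each failed batch to be retried. The retries value of 3 and the batch size are arbitrary choices for the illustration, not recommendations from the procedure's documentation.

```cypher
// Sketch only: relationship changes take locks on both end nodes,
// so this runs with parallel: false and lets failed batches retry.
CALL apoc.periodic.iterate(
  "MATCH (:Person)-[r:ACTED_IN]->() RETURN r",
  "DELETE r",
  {batchSize: 1000, parallel: false, retries: 3})
YIELD batches, total, failedBatches, retries
RETURN batches, total, failedBatches, retries
```

Checking failedBatches in the result shows whether any batch still failed after the retries were used up.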
Now let us look at a more complex example.
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
"MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
Let’s break down the parameters passed to the procedure:
- Our first Cypher statement selects all the Order nodes that have an order date later than October 13, 2016, and returns their ids.
- Our second Cypher statement takes each of those order ids, finds the nodes the order is connected to through a HAS_ITEM relationship, then sums up the value of those items and sets that sum as a property (o.value) for the total order value.
- Our configuration will batch those orders into groups of 100 (batchSize:100) and run the batches in parallel for the second statement to process.
Batch mode: BATCH_SINGLE
If our operation statement calls a procedure that takes in a batch of values, we can use batchMode: "BATCH_SINGLE" to get access to a batch of values to pass to that procedure.
When we use BATCH_SINGLE, the operation statement will have access to the $_batch parameter, which will contain a list of the fields returned in the data-driven statement.
For example, if the data-driven statement is:
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
The contents of the $_batch parameter passed to the operation statement would be:
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
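To make the shape of $_batch concrete, here is a minimal sketch that feeds the two rows above into an operation statement and unwinds the list by hand. The Pair label is a hypothetical name used only for this illustration.

```cypher
// Sketch only: `Pair` is an illustrative label.
CALL apoc.periodic.iterate(
  "RETURN 'mark' AS a, 'michael' AS b UNION RETURN 'jennifer' AS a, 'andrea' AS b",
  // $_batch is a list of maps, one map per row of the data-driven statement
  "UNWIND $_batch AS row CREATE (:Pair {a: row.a, b: row.b})",
  {batchMode: "BATCH_SINGLE", batchSize: 2})
YIELD batches, total
RETURN batches, total
```

Because the operation statement receives the whole list at once, it decides for itself whether to unwind it or to hand it to a procedure that accepts a list.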
Let’s see an example of this in action. We’ll start by creating some nodes:
The following query creates 100,000 nodes with the label Person and property id:
UNWIND range(1,100000) as id create (:Person {id: id})
We can delete these nodes using the apoc.nodes.delete procedure.
See Deleting Data.
This procedure takes in a list of nodes, which we can extract from the $_batch parameter.
The following query streams all the Person nodes and deletes them in batches of 100.
Note that returning a node instead of a node id from the data-driven statement, such as MATCH (p:Person) RETURN p, will result in the parent transaction tracking all deleted nodes, which leads to overall higher memory usage.
CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN id(p) as personId",
// Extract the `personId` value from each item using a list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
The contents of the $_batch parameter that is used in the operation statement would be as follows:
[
{personId: 1},
{personId: 2},
...
]
We can use a list comprehension to extract the personId value from each item in the list.
If we run this query, we’ll see the following output:
batch | operations |
---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} | {total: 100000, committed: 100000, failed: 0, errors: {}} |
Periodic Commit
Especially for graph processing, it is useful to run a query repeatedly in separate transactions until it no longer processes or generates any results. This lets you iterate in batches over elements that don't fulfill a condition and update them so that they do afterwards.
As a safety net, the statement used inside apoc.periodic.commit must contain a LIMIT clause.
The query is executed repeatedly in separate transactions until it returns 0.
CALL apoc.periodic.commit(
"MATCH (user:User) WHERE exists( user.city )
WITH user LIMIT $limit
MERGE (city:City {name:user.city})
MERGE (user)-[:LIVES_IN]->(city)
REMOVE user.city
RETURN count(*)",
{limit:10000})
Updates | Executions |
---|---|
2000000 | 200 |
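The same pattern applies to any update that makes the processed elements stop matching the selection. A minimal sketch, assuming a hypothetical processed flag on User nodes:

```cypher
// Sketch only: the `processed` property is an illustrative assumption.
CALL apoc.periodic.commit(
  "MATCH (user:User) WHERE user.processed IS NULL
   WITH user LIMIT $limit
   SET user.processed = true
   RETURN count(*)",
  {limit: 10000})
YIELD updates, executions
RETURN updates, executions
```

Each run marks at most $limit users, so they drop out of the next run's MATCH. Once no unprocessed users remain, count(*) returns 0 and the loop ends.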
Periodic Rock 'n' Roll
The following query copies the name property of each person to lastname:
CALL apoc.periodic.rock_n_roll('match (p:Person) return id(p) as id_p', 'MATCH (p) where id(p)={id_p} SET p.lastname =p.name', 20000)
Progress logs
To see verbose progress logs for apoc.periodic.iterate, apoc.periodic.commit, apoc.periodic.rock_n_roll, and apoc.periodic.rock_n_roll_while, set dbms.logs.debug.level=DEBUG in neo4j.conf.
For example, with this query:
UNWIND range(1,100) AS x CREATE (:TestLog{bar:'TestLog_'+x});
CALL apoc.periodic.iterate('match (p:TestLog) return p', 'SET p.foo =p.bar REMOVE p.bar', {batchSize:10,parallel:true});
we receive the following logs:
2020-11-27 09:03:44.279+0000 INFO Starting periodic iterate from `match (p:TestLog) return p` operation using iteration `SET p.foo =p.bar REMOVE p.bar` in separate thread with id: `fc8ff303-bfdd-49f0-a724-603f03b0da45`
2020-11-27 09:03:44.279+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 10 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 20 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.294+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 30 total
2020-11-27 09:03:44.294+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 40 total
2020-11-27 09:03:44.295+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 50 total
2020-11-27 09:03:44.297+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 60 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 70 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 80 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.299+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 90 total
2020-11-27 09:03:44.299+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.300+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 100 total
2020-11-27 09:03:44.512+0000 DEBUG Terminated periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45 with 100 executions