apoc.periodic.iterate
Procedure
apoc.periodic.iterate(cypherIterate STRING, cypherAction STRING, config MAP<STRING, ANY>)
- runs the second statement for each item returned by the first statement.
This procedure returns the number of batches and the total number of processed rows.
As of Neo4j 5.21, transactions can be processed in parallel using the Cypher command CALL {…} IN CONCURRENT TRANSACTIONS .
For more information, see CALL subqueries in transactions → Concurrent transactions.
|
Signature
apoc.periodic.iterate(cypherIterate :: STRING, cypherAction :: STRING, config :: MAP) :: (batches :: INTEGER, total :: INTEGER, timeTaken :: INTEGER, committedOperations :: INTEGER, failedOperations :: INTEGER, failedBatches :: INTEGER, retries :: INTEGER, errorMessages :: MAP, batch :: MAP, operations :: MAP, wasTerminated :: BOOLEAN, failedParams :: MAP, updateStatistics :: MAP)
Input parameters
Name | Type | Default |
---|---|---|
cypherIterate |
STRING |
null |
cypherAction |
STRING |
null |
config |
MAP |
null |
Config parameters
The procedure support the following config parameters:
name | type | default | description |
---|---|---|---|
batchSize |
INTEGER |
10000 |
run the specified number of operation statements in a single tx - params: {_count, _batch} |
parallel |
BOOLEAN |
false |
run operation statements in parallel (note that statements might deadlock if conflicting). |
retries |
INTEGER |
0 |
if the operation statement fails with an error, sleep 100ms and retry until retries-count is reached - param {_retry} |
batchMode |
STRING |
"BATCH" |
how data-driven statements should be processed by operation statement. Valid values are:
UNWIND $_batch AS _batch WITH _batch.field1 AS field1, _batch.field2 AS field2
|
params |
MAP |
{} |
externally pass in map of params |
concurrency |
INTEGER |
Number of processors available |
number of concurrent tasks are generated when using |
failedParams |
INTEGER |
-1 |
if set to a non-negative value, each failed batch up to |
planner |
Enum[DEFAULT, COST, IDP, DP] |
DEFAULT |
Any planner other than |
In APOC versions 4.0.0.11 and earlier, the
|
param | default | description |
---|---|---|
iterateList |
true |
execute operation statements once per batchSize (whole batchSize list is passed in as parameter {_batch})
|
Output parameters
Name | Type | What |
---|---|---|
batches |
INTEGER |
Number of batches |
total |
INTEGER |
Number of processed input rows |
timeTaken |
INTEGER |
Duration in seconds |
committedOperations |
INTEGER |
Number of successful inner queries (actions) |
failedOperations |
INTEGER |
Number of failed inner queries (actions) |
failedBatches |
INTEGER |
Number of failed batches |
retries |
INTEGER |
Number of retries |
errorMessages |
MAP<STRING,INTEGER> |
Map of batch error messages to error count |
batch |
MAP |
Batch statistics
|
operations |
MAP |
Inner queries (actions) statistics
|
wasTerminated |
BOOLEAN |
True if transaction was terminated before completion |
failedParams |
MAP<STRING, LIST<MAP>> |
Parameters of failed batches. Key is batch number as a string and value is list of batch parameters. |
updateStatistics |
MAP |
|
Usage Examples
Let’s go through some examples.
If you were to add an :Actor
label to several million :Person
nodes, you could run the following code:
CALL apoc.periodic.iterate(
"MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
"SET p:Actor",
{batchSize:10000, parallel:true})
Let’s break down the parameters passed to the procedure:
-
Our first Cypher statement selects all the
Person
nodes with anACTED_IN
relationship to another node and returns those persons. This is the data-driven portion where we select the data that we want to change. -
Our second Cypher statement sets the
:Actor
label on each of thePerson
nodes selected. This is the operation portion where we apply the change to the data from our first statement. -
And finally, we specify any configuration we want the procedure to use. We have defined a
batchSize
of 10,000 and to run the statements in parallel.
Executing this procedure would take all of our Person
nodes gathered in the first Cypher statement and update each of them with the second Cypher statement.
It divides the work into batches - taking 10,000 Person
nodes from the stream and updating them in a single transaction.
If we have 30,000 Person
nodes in our graph with an ACTED_IN
relationship, then it would break this down into 3 batches.
Finally, it runs those in parallel, as updating node labels or properties do not conflict.
For more complex operations like updating or removing relationships, either do not use parallel: true OR make sure that you batch the work in a way that each subgraph of data is updated in one operation, such as by transferring the root objects.
If you attempt complex operations, also enable retrying failed operations, e.g. with |
Now let us look at a more complex example.
CALL apoc.periodic.iterate(
"MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o.id as orderId",
"MATCH (o:Order)-[:HAS_ITEM]->(i) WHERE o.id = orderId WITH o, sum(i.value) as value SET o.value = value",
{batchSize:100, parallel:true})
Let’s break down the parameters passed to the procedure:
-
Our first Cypher statement selects all the
Order
nodes that have an order date greater thanOctober 13, 2016
(first Cypher statement). -
Our second Cypher statement takes those groups and finds the nodes that have a
HAS_ITEM
relationship to other nodes, then sums up the value of those items and sets that sum as a property (o.value
) for the total order value. -
Our configuration will batch those nodes into groups of 100 (
batchSize:100
) and run the batches in parallel for the second statement to process.
Batch mode: BATCH_SINGLE
If our operation statement calls a procedure that takes in a batch of values, we can use batchMode: "BATCH_SINGLE"
to get access to a batch of values to pass to that procedure.
When we use BATCH_SINGLE
, the operation statement will have access to the $_batch
parameter, which will contain a list of the fields returned in the data-driven statement.
For example, if the data driven statement is:
RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b
The contents of the $_batch
variable passed to the operation statement would be:
[
{a: "mark", b: "michael"},
{a: "jennifer", b: "andrea"}
]
Let’s see an example of this in action. We’ll start by creating some nodes:
Person
and property id
UNWIND range(1,100000) as id create (:Person {id: id})
We can delete these nodes using the apoc.nodes.delete
procedure.
See Deleting data.
This procedure takes in a list of nodes, which we can extract from the $_batch
parameter.
The following query streams all the Person
nodes and deletes them in batches of 100.
Note that using a node instead of a node id for the first parameter, such as MATCH (p:Person) RETURN p
, will result
in the parent transaction tracking all deleted nodes, which leads to overall higher memory usage.
If you are using Neo4j 5.6 or later consider using the elementId
function to pass node information between transactions.
CALL apoc.periodic.iterate(
"MATCH (p:Person) RETURN id(p) as personId",
// Extract `p` variable using list comprehension
"CALL apoc.nodes.delete([item in $_batch | item.personId], size($_batch))",
{batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;
The contents of the $_batch
parameter that is used in the operation statement would be as follows:
[
{p: Node<1>},
{p: Node<2>},
...
]
We can use a list comprehension to extract the p
variable from each item in the list.
If we run this query, we’ll see the following output:
batch | operations |
---|---|
{total: 1000, committed: 1000, failed: 0, errors: {}} |
{total: 100000, committed: 100000, failed: 0, errors: {}} |