Periodic Execution

Cypher is great for querying graphs and importing and updating graph structures. While during imports you can use PERIODIC COMMIT to control transaction sizes in memory, but for other graph refactorings it’s not that easy to commit transactions regularly to free memory for new update state.

Procedure Overview

The table below describes the available procedures:

Qualified Name Type Release

apoc.periodic.iterate

apoc.periodic.iterate('statement returning items', 'statement per item', {batchSize:1000,iterateList:true,parallel:false,params:{},concurrency:50,retries:0}) YIELD batches, total - run the second statement for each item returned by the first statement. Returns number of batches and total processed rows

Procedure

APOC Core

apoc.periodic.commit

apoc.periodic.commit(statement,params) - runs the given statement in separate transactions until it returns 0

Procedure

APOC Core

apoc.periodic.rock_n_roll

apoc.periodic.rock_n_roll('some cypher for iteration', 'some cypher as action on each iteration', 10000) YIELD batches, total - run the action statement in batches over the iterator statement’s results in a separate thread. Returns number of batches and total processed rows

Procedure

APOC Full

Periodic Iterate

The apoc.periodic.iterate procedure is helpful when you need to handle large amounts of data for import, refactoring, and other cases that require large transactions. It provides a way to batch the data by dividing the workload into two parts:

a data-driven statement

This defines how you select what data needs handled. You can provide a Cypher statement to select from existing graph data, read external data from a file or API, or retrieve data from another datastore.

an operation statement

This defines what you want done to the selected data. You can do things like execute Cypher for updating or creating/deleting the data or run other procedures to manipulate and transform values before loading.

The data-driven statement is provided as the first statement that results in a stream of values to be processed. The operation statement is provided as the second statement to process one element at a time or (with batchMode: "BATCH") a batch at a time. The results of the data-driven statement are passed to the operation statement as parameters, so they are automatically made available with their names.

Table 1. Config
name type default description

batchSize

Long

10000

run the specified number of operation statements in a single tx - params: {_count, _batch}

parallel

boolean

false

run operation statements in parallel (note that statements might deadlock if conflicting)
Please note that, in case of parallel: false, APOC is designed to reuse the same java.util.concurrent.ThreadPoolExecutor with a maximum pool size equal 1, in order to prevent parallelism; this means that if you want to execute multiple apoc.periodic.iterate each one will be executed when the previous one has been completed. Instead, with parallel: true, APOC will use a ThreadPoolExecutor with a configurable maximum pool size via the apoc.jobs.pool.num_threads config or as default with the number of available processor * 2. Therefore, if we execute multiple apoc.periodic.iterate each one will be executed in parallel if the queue pool size can accept new tasks. Furthermore, to be noted that running in parallel affects all databases, and not the single database you are using. So with e.g. 2 databases db1 and db2, the apoc.periodic.iterate on db1 will impact on performance if we execute an apoc.periodic.iterate on db2.

retries

Long

0

if the operation statement fails with an error, sleep 100ms and retry until retries-count is reached - param {_retry}

batchMode

String

"BATCH"

how data-driven statements should be processed by operation statement. Valid values are:

  • "BATCH" - execute operation statement once per batchSize. Operation statement is prefixed with the following, which extracts each field returned in the data-driven statement from the $_batch parameter:

UNWIND $_batch AS _batch
WITH _batch.field1 AS field1, _batch.field2 AS field2
  • "SINGLE" - execute operation statement one at a time

  • "BATCH_SINGLE" - execute operation statement once per batchSize, but leaves unpacking of batch to the operation statement. The operation query can access the batched values via the $_batch parameter.

params

Map

{}

externally pass in map of params

concurrency

Long

50

number of concurrent tasks are generated when using parallel:true

failedParams

Long

-1

if set to a non-negative value, each failed batch up to failedParams parameter sets are returned in yield failedParams.

In APOC versions 4.0.0.11 and earlier, the iterateList config key was used to control the batching of values returned by the data-driven statement. This was replaced by batchMode in version 4.0.0.12. These config keys are treated as follows:

  • If batchMode is provided, its value takes precedence over iterateList

  • If batchMode is not provided and iterateList is provided, the value of iterateList will be translated as described in the table below.

  • If neither batchMode nor iterateList are provided, batchMode defaults to BATCH, which is the same as iterateList:true

Table 2. Deprecated Config
param default description

iterateList

true

execute operation statements once per batchSize (whole batchSize list is passed in as parameter {_batch})

  • A value of true is equivalent to batchMode: "BATCH"

  • A value of false is equivalent to batchMode: "SINGLE"

Periodic Iterate Examples

Let’s go through some examples.

If you were to add an :Actor label to several million :Person nodes, you could run the following code:

CALL apoc.periodic.iterate(
  "MATCH (p:Person) WHERE (p)-[:ACTED_IN]->() RETURN p",
  "SET p:Actor",
  {batchSize:10000, parallel:true})

Let’s break down the parameters passed to the procedure:

  • Our first Cypher statement selects all the Person nodes with an ACTED_IN relationship to another node and returns those persons. This is the data-driven portion where we select the data that we want to change.

  • Our second Cypher statement sets the :Actor label on each of the Person nodes selected. This is the operation portion where we apply the change to the data from our first statement.

  • And finally, we specify any configuration we want the procedure to use. We have defined a batchSize of 10,000 and to run the statements in parallel.

Executing this procedure would take all of our Person nodes gathered in the first Cypher statement and update each of them with the second Cypher statement. It divides the work into batches - taking 10,000 Person nodes from the stream and updating them in a single transaction. If we have 30,000 Person nodes in our graph with an ACTED_IN relationship, then it would break this down into 3 batches.

Finally, it runs those in parallel, as updating node labels or properties do not conflict.

For more complex operations like updating or removing relationships, either do not use parallel: true OR make sure that you batch the work in a way that each subgraph of data is updated in one operation, such as by transferring the root objects. If you attempt complex operations, also enable retrying failed operations, e.g. with retries:3.

Now let us look at a more complex example.

CALL apoc.periodic.iterate(
  "MATCH (o:Order) WHERE o.date > '2016-10-13' RETURN o",
  "MATCH (o)-[:HAS_ITEM]->(i) WITH o, sum(i.value) as value SET o.value = value",
  {batchSize:100, parallel:true})

Let’s break down the parameters passed to the procedure:

  • Our first Cypher statement selects all the Order nodes that have an order date greater than October 13, 2016 (first Cypher statement).

  • Our second Cypher statement takes those groups and finds the nodes that have a HAS_ITEM relationship to other nodes, then sums up the value of those items and sets that sum as a property (o.value) for the total order value.

  • Our configuration will batch those nodes into groups of 100 (batchSize:100) and run the batches in parallel for the second statement to process.

Batch mode: BATCH_SINGLE

If our operation statement calls a procedure that takes in a batch of values, we can use batchMode: "BATCH_SINGLE" to get access to a batch of values to pass to that procedure. When we use BATCH_SINGLE, the operation statement will have access to the $_batch parameter, which will contain a list of the fields returned in the data-driven statement.

For example, if the data driven statement is:

RETURN 'mark' AS a, 'michael' AS b
UNION
RETURN 'jennifer' AS a, 'andrea' AS b

The contents of the $_batch variable passed to the operation statement would be:

[
  {a: "mark", b: "michael"},
  {a: "jennifer", b: "andrea"}
]

Let’s see an example of this in action. We’ll start by creating some nodes:

The following query creates 100,000 nodes with the label Person and property id
UNWIND range(1,100000) as id create (:Person {id: id})

We can delete these nodes using the apoc.nodes.delete procedure. See Deleting Data.

This procedure takes in a list of nodes, which we can extract from the $_batch parameter.

The following query streams all the Person nodes and deletes them in batches of 100
CALL apoc.periodic.iterate(
  "MATCH (p:Person) RETURN p",
  // Extract `p` variable using list comprehension
  "CALL apoc.nodes.delete([item in $_batch | item.p], size($_batch))",
  {batchMode: "BATCH_SINGLE", batchSize: 100}
)
YIELD batch, operations;

The contents of the $_batch parameter that is used in the operation statement would be as follows:

[
  {p: Node<1>},
  {p: Node<2>},
  ...
]

We can use a list comprehension to extract the p variable from each item in the list.

If we run this query, we’ll see the following output:

Table 3. Results
batch operations

{total: 1000, committed: 1000, failed: 0, errors: {}}

{total: 100000, committed: 100000, failed: 0, errors: {}}

Periodic Commit

Especially for graph processing it is useful to run a query repeatedly in separate transactions until it doesn’t process and generates any results anymore. So you can iterate in batches over elements that don’t fulfill a condition and update them so that they do afterwards.

as a safety net your statement used inside apoc.periodic.commit must contain a LIMIT clause.

The query is executed repeatedly in separate transactions until it returns 0.

call apoc.periodic.commit(
  "match (user:User) WHERE exists( user.city )
   with user limit $limit
   MERGE (city:City {name:user.city})
   MERGE (user)-[:LIVES_IN]->(city)
   REMOVE user.city
   RETURN count(*)",
  {limit:10000})
Table 4. Results
Updates Executions

2000000

200

Periodic Rock 'n' Roll

copies over the name property of each person to lastname
CALL apoc.periodic.rock_n_roll('match (p:Person) return id(p) as id_p', 'MATCH (p) where id(p)={id_p} SET p.lastname =p.name', 20000)

Progress logs

To visualize verbose progresses' logs of apoc.periodic.iterate, apoc.periodic.commit, apoc.periodic.rock_n_roll, apoc.periodic.rock_n_roll_while, please set dbms.logs.debug.level=DEBUG in neo4j.conf.

As example, with this query:

UNWIND range(1,100) AS x CREATE (:TestLog{bar:'TestLog_'+x});
CALL apoc.periodic.iterate('match (p:TestLog) return p', 'SET p.foo =p.bar REMOVE p.bar', {batchSize:10,parallel:true});

we receive the logs, as follows:

2020-11-27 09:03:44.279+0000 INFO  Starting periodic iterate from `match (p:TestLog) return p` operation using iteration `SET p.foo =p.bar REMOVE p.bar` in separate thread with id: `fc8ff303-bfdd-49f0-a724-603f03b0da45`
2020-11-27 09:03:44.279+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 10 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.280+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 20 total
2020-11-27 09:03:44.280+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.294+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 30 total
2020-11-27 09:03:44.294+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 40 total
2020-11-27 09:03:44.295+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.295+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 50 total
2020-11-27 09:03:44.297+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 60 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 70 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.298+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 80 total
2020-11-27 09:03:44.298+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.299+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 90 total
2020-11-27 09:03:44.299+0000 DEBUG Execute, in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, no 10 batch size
2020-11-27 09:03:44.300+0000 DEBUG Processed in periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45, 10 iterations of 100 total
2020-11-27 09:03:44.512+0000 DEBUG Terminated periodic iterate with id fc8ff303-bfdd-49f0-a724-603f03b0da45 with 100 executions