apoc.load.directory.async.add

Procedure APOC Core

apoc.load.directory.async.add(name, cypher, pattern, urlDir, {}) YIELD name, status, pattern, cypher, urlDir, config, error - Adds or replaces a folder listener with the given name, pattern and URL directory. The listener executes the specified Cypher query whenever an event is triggered. The procedure returns the list of listeners.

Signature

apoc.load.directory.async.add(name :: STRING?, cypher :: STRING?, pattern = * :: STRING?, urlDir =  :: STRING?, config = {} :: MAP?) :: (name :: STRING?, status :: STRING?, pattern :: STRING?, cypher :: STRING?, urlDir :: STRING?, config :: MAP?, error :: STRING?)

Input parameters

Name     Type     Default
name     STRING?  null
cypher   STRING?  null
pattern  STRING?  *
urlDir   STRING?  (empty string)
config   MAP?     {}

Config parameters

The procedure supports the following config parameters:

Table 1. Config parameters
name             type        default                                 description
interval         Integer     1000                                    Interval, in milliseconds, after which the directory is re-checked for changes
listenEventType  List<Enum>  List.of("CREATE", "DELETE", "MODIFY")   Types of event that trigger the Cypher query: creation (CREATE), deletion (DELETE) or modification (MODIFY) of a file in the specified folder
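As a minimal sketch of how the two config parameters combine, the call below registers a listener that re-checks the folder every 5 seconds and reacts only to created or modified .csv files. The listener name slowCsvWatcher and the CsvSeen label are hypothetical, chosen only for illustration.

```cypher
// Hypothetical listener: re-check the import folder every 5000 ms and
// run the query only for CREATE and MODIFY events on .csv files.
CALL apoc.load.directory.async.add(
  'slowCsvWatcher',
  'CREATE (n:CsvSeen {fileName: $fileName, event: $listenEventType})',
  '*.csv',
  '',
  {interval: 5000, listenEventType: ['CREATE', 'MODIFY']}
)
```

Omitting a key from the config map leaves its default in place, so {interval: 5000} alone would still listen for all three event types.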

Output parameters

Name     Type
name     STRING?
status   STRING?
pattern  STRING?
cypher   STRING?
urlDir   STRING?
config   MAP?
error    STRING?

Usage Examples

The first parameter is the name of our custom watch listener. If a listener with that name already exists, it is overwritten. The second parameter is the Cypher query to execute. The query can use the following parameters:

  • $fileName: the name of the file which triggered the event

  • $filePath: the absolute path of the file which triggered the event if apoc.import.file.use_neo4j_config=false, otherwise the relative path starting from $IMPORT_DIR

  • $fileDirectory: the absolute path of the directory containing the file which triggered the event if apoc.import.file.use_neo4j_config=false, otherwise the relative path starting from $IMPORT_DIR

  • $listenEventType: the triggered event ("CREATE", "DELETE" or "MODIFY"). "CREATE" is triggered when a file is added to the folder, "DELETE" when a file is removed from the folder and "MODIFY" when a file in the folder is changed. Note that renaming a file triggers two events: first "DELETE" and then "CREATE"

The third parameter is the file name pattern to match. The default is '*', which matches all files. The fourth parameter is the directory to watch. The default is an empty string, which means the import directory itself.
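To make the four query parameters concrete, here is a small sketch that records every event in the import directory as a node. The listener name auditListener and the FileEvent label are assumptions made for this example, not part of the procedure.

```cypher
// Hypothetical audit listener: watches all files ('*') in the import
// directory ('') and stores the four query parameters on a new node.
CALL apoc.load.directory.async.add(
  'auditListener',
  'CREATE (e:FileEvent {fileName: $fileName, filePath: $filePath, fileDirectory: $fileDirectory, eventType: $listenEventType})',
  '*',
  ''
)
```

Because no config map is passed, the listener uses the defaults and therefore reacts to CREATE, DELETE and MODIFY events.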

Examples of interoperability with other procedures

We can call another load procedure, such as apoc.load.csv or apoc.load.json, inside the query parameter.

Here are two examples.

We can execute:

CALL apoc.load.directory.async.add('csvImport',
"CALL apoc.load.csv($filePath) yield list WITH list CREATE (n:CsvToNode {content: list, fileName: $fileName, fileDirectory: $fileDirectory, listenEventType: $listenEventType})",
"*.csv", "csvFolder" ,{listenEventType: ["CREATE", "MODIFY"]})

where $fileName is the name of the file created or modified, $filePath is the path of the file relative to $IMPORT_DIR, that is, the file $IMPORT_DIR/csvFolder/[FILENAME.csv], $fileDirectory is the path of the directory relative to $IMPORT_DIR, that is, the directory $IMPORT_DIR/csvFolder, and $listenEventType is the triggered event, that is, CREATE or MODIFY.

If we upload the following file to the $IMPORT_DIR/csvFolder folder:

test.csv
name,age
Selma,8
Rana,11
Selina,18

and then execute MATCH (n:CsvToNode) RETURN properties(n) as props, we obtain:

Table 2. Results
props

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selma", "8" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Rana", "11" ] }

{ "fileName": "test.csv", "listenEventType": "CREATE", "fileDirectory": "csvFolder", "content": [ "Selina", "18" ] }

If we modify test.csv as follows:

test.csv
name,age
Selma,80
Rana,110
Selina,180

we obtain 3 new nodes with these props:

Table 3. Results
props

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selma", "80" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Rana", "110" ] }

{ "fileName": "test.csv", "listenEventType": "MODIFY", "fileDirectory": "csvFolder", "content": [ "Selina", "180" ] }
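As the results show, a plain CREATE adds new nodes on every MODIFY event. If updating existing nodes is preferred, one possible variation is to use the map column yielded by apoc.load.csv together with MERGE. This is only a sketch: the listener name csvUpsert, the CsvPerson label and the choice of name as the merge key are assumptions for illustration.

```cypher
// Hypothetical variation: merge on the "name" column so that a modified
// file updates existing nodes instead of creating duplicates.
// Values read from the CSV are strings, so p.age is stored as a string.
CALL apoc.load.directory.async.add('csvUpsert',
"CALL apoc.load.csv($filePath) YIELD map MERGE (p:CsvPerson {name: map.name}) SET p.age = map.age, p.fileName = $fileName, p.lastEvent = $listenEventType",
"*.csv", "csvFolder", {listenEventType: ["CREATE", "MODIFY"]})
```

Note that this listener has a different name from csvImport, so if both are registered on the same folder, both queries run for each event.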

Alternatively, we can execute:

CALL apoc.load.directory.async.add('jsonImport',
"CALL apoc.load.json($filePath) yield value WITH value CREATE (n:JsonToNode {content: value['foo'], fileName: $fileName, fileDirectory: $fileDirectory})",
"*.json", "jsonFolder", {listenEventType: ["CREATE"]})

If we upload the following files to the $IMPORT_DIR/jsonFolder folder:

fileOne.json
{ "foo":[1,2,3] }
{ "foo":[4,5,6] }

and

fileTwo.json
{ "foo":[11,22,33] }
{ "foo":[41,51,61] }

Calling MATCH (n:JsonToNode) RETURN properties(n) as props then returns:

Table 4. Results
props

{ "fileName": "fileOne.json", "fileDirectory": "jsonFolder", "content": [ 1,2,3 ] }

{ "fileName": "fileOne.json", "fileDirectory": "jsonFolder", "content": [ 4,5,6 ] }

{ "fileName": "fileTwo.json", "fileDirectory": "jsonFolder", "content": [ 11,22,33 ] }

{ "fileName": "fileTwo.json", "fileDirectory": "jsonFolder", "content": [ 41,51,61 ] }

Error handling

If the listener fails for any reason, its status field changes from RUNNING to ERROR and the associated error is reported in the error field. If we execute CALL apoc.load.directory.async.list, we obtain, for example:

name          status  pattern  cypher             urlDir  config  error
listenerName  ERROR   *.csv    'create (n:Node)'  'path'  {}      "org.neo4j.graphdb.QueryExecutionException: Failed to invoke procedure apoc.load.csv: Caused by: java.io.FileNotFoundException …"
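When many listeners are registered, it can help to filter the list down to the failed ones. The sketch below assumes the sibling procedures apoc.load.directory.async.list and apoc.load.directory.async.remove are available alongside add; listenerName is the placeholder name from the table above.

```cypher
// Show only the listeners that have failed, with their error message.
CALL apoc.load.directory.async.list()
YIELD name, status, error
WHERE status = 'ERROR'
RETURN name, error;

// Remove a failed listener by name once the cause has been investigated.
CALL apoc.load.directory.async.remove('listenerName');
```

Alternatively, calling apoc.load.directory.async.add again with the same name replaces the listener, as described in the usage notes.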

Procedure enabling

Please note that to use the apoc.load.directory.async.* procedures, we must enable the following config option:

apoc.conf
apoc.import.file.enabled=true

Configuration Examples

Let’s suppose we have set the config option:

apoc.import.file.use_neo4j_config=true

We can call the procedure as follows:

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)');

so that every time a file is created, deleted or updated in the import folder, the query CREATE (n:Test) is executed.

We can listen only for specified event kinds, for example for file creation, and create an import report:

CALL apoc.load.directory.async.add('test', 'CREATE (n:Import {fileName: $fileName})', '*', '', {listenEventType: ['CREATE']});

Only file creation is listened for. So, if we import two files, "foo.csv" and "bar.csv", two nodes are created: (n:Import {fileName: 'foo.csv'}) and (n:Import {fileName: 'bar.csv'}).
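To keep such an import report in sync when files are removed, a second listener could react to DELETE events only. This is a sketch under assumptions: the listener name importCleanup is hypothetical, and it must differ from 'test', because reusing a name overwrites the existing listener.

```cypher
// Hypothetical companion listener: when a file is deleted from the import
// folder, remove the matching Import node created by the report listener.
CALL apoc.load.directory.async.add(
  'importCleanup',
  'MATCH (n:Import {fileName: $fileName}) DETACH DELETE n',
  '*',
  '',
  {listenEventType: ['DELETE']}
);
```

Since renaming a file triggers DELETE followed by CREATE, a rename would remove the old Import node and, with the creation listener above also registered, add one for the new file name.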

Moreover, we can set a specific file pattern, for example:

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv');

In this case, every time a .csv file is created, deleted or updated in the import folder, the query CREATE (n:Test) is executed.

Furthermore, we can also set a path, like:

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'subfolderImport');

to listen in a subfolder of the import folder called subfolderImport.

If we instead set apoc.import.file.use_neo4j_config=false, we can specify an absolute path:

CALL apoc.load.directory.async.add('test', 'CREATE (n:Test)', '*.csv', 'file:///Users/username/Downloads');