Create a job specification file
The job configuration file consists of a JSON object with four sections:
- config — global flags affecting how the import is performed
- sources — data source definitions (relational)
- targets — data target definitions (graph: nodes/relationships)
- actions — pre/post-load actions
{
"config": {},
"sources": [
{ ... }
],
"targets": [
{ ... }
],
"actions": [
{ ... }
]
}
At a high level, the job will fetch data from the sources and transform/import it into the targets.
Below is an example job specification file that works out of the box to import the publicly available movies dataset.
In the next sections, we break it down and provide in-context information for each part. We recommend reading this guide side by side with the job specification example.
{
"config": {
"reset_db": true,
"index_all_properties": false
},
"sources": [
{
"type": "text",
"name": "persons",
"uri": "gs://neo4j-examples/persons.csv",
"format": "EXCEL",
"delimiter": ",",
"ordered_field_names": "person_tmdbId,bio,born,bornIn,died,person_imdbId,name,person_poster,person_url"
},
{
"type": "text",
"name": "movies",
"uri": "gs://neo4j-examples/movies.csv",
"format": "EXCEL",
"delimiter": ",",
"ordered_field_names": "movieId,title,budget,countries,movie_imdbId,imdbRating,imdbVotes,languages,plot,movie_poster,released,revenue,runtime,movie_tmdbId,movie_url,year,genres"
},
{
"type": "text",
"name": "directed",
"uri": "gs://neo4j-examples/directed.csv",
"format": "EXCEL",
"delimiter": ",",
"ordered_field_names": "movieId,person_tmdbId"
},
{
"type": "text",
"name": "acted_in",
"uri": "gs://neo4j-examples/acted_in.csv",
"format": "EXCEL",
"delimiter": ",",
"ordered_field_names": "movieId,person_tmdbId,role"
}
],
"targets": [
{
"node": {
"source": "persons",
"name": "Person",
"mode": "merge",
"transform": {
"group": true
},
"mappings": {
"labels": [
"\"Person\""
],
"properties": {
"keys": [
{"person_tmdbId": "id"}
],
"unique": [],
"indexed": [
{"name": "name"}
],
"strings": [
{"bornIn": "born_in"}
]
}
}
}
},
{
"node": {
"source": "movies",
"name": "Movies",
"mode": "merge",
"transform": {
"group": true
},
"mappings": {
"labels": [
"\"Movie\""
],
"properties": {
"keys": [
{"movieId": "id"}
],
"unique": [],
"indexed": [
{"title": "title"}
],
"floats": [
{"imdbRating": "rating"}
],
"integers": [
{"year": "year"}
]
}
}
}
},
{
"edge": {
"source": "directed",
"name": "Directed",
"mode": "merge",
"transform": {
"group": true
},
"mappings": {
"type": "\"DIRECTED\"",
"source": {
"label": "\"Person\"",
"key": {"person_tmdbId": "id"}
},
"target": {
"label": "\"Movie\"",
"key": {"movieId": "id"}
},
"properties": {}
}
}
},
{
"edge": {
"source": "acted_in",
"name": "Acted_in",
"mode": "merge",
"transform": {
"group": true
},
"mappings": {
"type": "\"ACTED_IN\"",
"source": {
"label": "\"Person\"",
"key": {"person_tmdbId": "id"}
},
"target": {
"label": "\"Movie\"",
"key": {"movieId": "id"}
},
"properties": {
"strings": [
{"role": "role"}
]
}
}
}
}
]
}
Configuration
The config object contains global configuration for the import job. The flags it supports are:
- reset_db (bool) — whether to clear the target database before importing. Deletes data, indexes, and constraints.
- index_all_properties (bool) — whether to create indexes for all properties. See Cypher® → Indexes for search performance.
"config": {
"reset_db": false,
"index_all_properties": false
}
Sources
The sources section contains the definitions of the data sources, as a list. As a rough guideline, you can think one table <=> one source. The importer will leverage the data surfaced by the sources and make it available to the targets, which eventually map it into Neo4j.
To import data from a CSV file, six attributes are compulsory.
{
"type": "text",
"name": "movies",
"uri": "<path-to-csv>",
"format": "EXCEL",
"delimiter": ",",
"ordered_field_names": "movieId,title"
}
- type (string) — text.
- name (string) — a human-friendly label for the source (unique among all sources). You will use this to reference the source in the targets section.
- uri (string) — the Google Storage location of the CSV file (ex. gs://neo4j-datasets/movies.csv).
- format (string) — any of Apache’s CSVFormat predefined formats.
- delimiter (string) — CSV field delimiter.
- ordered_field_names (string) — list of field names the CSV file contains, in order.
How to retrieve the Google Storage location of a file?
To retrieve the Google Storage location of a file in a Cloud bucket, expand the file options through the three dots on the right, and choose Copy gsutil URI.
CSV files must fulfill some constraints:
- they should not contain headers. Column names should be specified in the ordered_field_names attribute, and files should contain data rows only (see the example after this list).
- they should not contain empty rows.
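For instance, with ordered_field_names set to movieId,title, the CSV file would contain only data rows such as the following (illustrative values):
1,Toy Story
2,Jumanji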
Targets
The targets section contains the definitions of the graph entities that will result from the import.
Neo4j represents objects with nodes (ex. movies, people) and connects them with relationships (ex. ACTED_IN, DIRECTED).
Each object in the targets section is keyed as either node or edge (synonym for relationship) and will generate a corresponding entity in Neo4j, drawing data from a source.
It is also possible to run custom Cypher queries via targets keyed as custom_query.
By default, you do not have to think about dependencies between nodes and relationships, as the job imports all node targets before any edge target, and processes targets of type custom_query last.
It is possible to alter this behavior and customize the ordering of targets.
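For example (a sketch based on the execute_after and execute_after_name attributes documented below), the Acted_in edge target could be made to run only after the Directed edge target has completed:
{
  "edge": {
    "source": "acted_in",
    "name": "Acted_in",
    "mode": "merge",
    "execute_after": "edge",
    "execute_after_name": "Directed",
    "mappings": { ... }
  }
}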
Node objects
Compulsory attributes for node objects are source, mappings.labels, and mappings.keys.
{
"node": {
"source": "persons",
"name": "Person",
"mode": "merge",
"transform": {
"group": true
},
"mappings": {
"labels": [
"\"Person\""
],
"properties": {
"keys": [
{"person_tmdbId": "id"}
],
"mandatory": [],
"unique": [],
"indexed": [
{"name": "name"}
],
"strings": [
{"person_tmdbId": "id"},
{"name": "name"},
{"bornIn": "born_in"}
],
"dates": [
{"born": "born"},
{"died": "died"}
]
}
}
}
}
- source (string) — the name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string) — a human-friendly name for the target (unique among all targets).
- mode (string) — the creation mode in Neo4j. Either merge or append (default). See Cypher → MERGE and Cypher → CREATE for info.
- mappings (object) — details on how the source columns should be mapped into node details.
  - labels (list of strings) — labels to mark the nodes with. Note that they should be quoted and escaped.
  - properties (object) — mapping of source columns into node properties.
    - keys (list of objects) — source columns that should be mapped into node properties and get a node key constraint.
    - mandatory (list of objects) — source columns that should be mapped into node properties and get a node property existence constraint.
    - unique (list of objects) — source columns that should be mapped into node properties and get a unique node property constraint.
    - indexed (list of objects) — source columns that should be mapped into node properties and get an index on the corresponding node property.
    - strings, floats, integers, dates, points, booleans (list of objects) — source columns to be mapped into node properties of the given type. The data type affects how the data is represented in Neo4j, but does not create type constraints.
- transform (object) — if "group": true, the import will run a SQL GROUP BY on all fields specified in keys and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraint errors or making insertion less efficient. The object can also contain aggregation functions, see Transformations.
- execute_after (string) — target object after which the current target should run. Either node, edge, or custom_query. To be used in conjunction with execute_after_name.
- execute_after_name (string) — the name of the target after which the current one should run.
For more information on indexes and constraints in Neo4j, see Cypher → Constraints and Cypher → Indexes for search performance.
The objects in keys, mandatory, unique, indexed, and all the type properties (strings, floats, etc.) have the format
{"<column-name-in-source>": "<desired-node-property-name>"}
For example, {"person_tmdbId": "id"} will map the source column person_tmdbId to the property id in the new nodes.
Things to pay attention to:
- make sure to quote and escape labels.
- names in keys should not also be listed in unique or mandatory, or the constraints will conflict.
- source data must not have null values for keys columns, or they will clash with the node key constraint. If the source is not clean in this respect, consider cleaning it upfront in the related source.query field by excluding all rows that wouldn’t fulfill the constraints (ex. WHERE person_tmdbId IS NOT NULL).
- if index_all_properties: true in config, it is pointless to specify any columns in properties.indexed.
- when a property list has multiple items, specify each property in a separate object within the list, and not as a single object containing them all.
// Good
"dates": [
  {"born": "born"},
  {"died": "died"}
]

// Bad
"dates": [
  {
    "born": "born",
    "died": "died"
  }
]
Edge objects
Compulsory attributes for edge objects are source, mappings.type, mappings.source, and mappings.target.
{
"edge": {
"source": "acted_in",
"name": "Acted_in",
"mode": "merge",
"mappings": {
"type": "\"ACTED_IN\"",
"source": {
"label": "\"Person\"",
"key": "person_tmdbId"
},
"target": {
"label": "\"Movie\"",
"key": "movieId"
},
"properties": {
"keys": [],
"mandatory": [],
"unique": [],
"indexed": [],
"strings": [
{"role": "role"}
]
}
},
"transform": {
"group": true
}
}
}
- source (string) — the name of the source this target should draw data from. Should match one of the names from the sources objects.
- name (string) — a human-friendly name for the target (unique among all targets).
- mode (string) — the creation mode in Neo4j. Either merge or append (default). See Cypher → MERGE and Cypher → CREATE for info.
- mappings (object) — details on how the source columns should be mapped into relationship details.
  - type (string) — type to assign to the relationship. Note that it should be quoted and escaped.
  - source (object) — starting node for the relationship (identified by node label and key).
  - target (object) — ending node for the relationship (identified by node label and key).
  - properties (object) — mapping of source columns into relationship properties.
    - keys (list of objects) — source columns that should be mapped into relationship properties and get a relationship key constraint.
    - mandatory (list of objects) — source columns that should be mapped into relationship properties and get a relationship property existence constraint.
    - unique (list of objects) — source columns that should be mapped into relationship properties and get a relationship uniqueness constraint.
    - indexed (list of objects) — source columns that should be mapped into relationship properties and get an index on the corresponding relationship property.
    - strings, floats, integers, dates, points, booleans (list of objects) — source columns to be mapped into relationship properties of the given type. The data type affects how the data is represented in Neo4j, but does not create type constraints.
- transform (object) — if "group": true, the import will run a SQL GROUP BY on all fields specified in mappings.source, mappings.target, and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraint errors or making insertion less efficient. The object can also contain aggregation functions, see Transformations.
- execute_after (string) — target object after which the current target should run. Either node, edge, or custom_query. To be used in conjunction with execute_after_name.
- execute_after_name (string) — the name of the target after which the current one should run.
For more information on indexes and constraints in Neo4j, see Cypher → Constraints and Cypher → Indexes for search performance.
The objects in unique, indexed, and all the type properties (strings, floats, etc.) have the format
{"<column-name-in-source>": "<desired-relationship-property-name>"}
For example, {"role": "role"} will map the source column role to the property role in the new relationships.
Things to pay attention to:
- make sure to quote and escape relationship types and node labels.
- source.key and target.key take names from the source columns, not from the mapped graph properties. In the snippet above, notice how the key names are the source columns person_tmdbId and movieId, even though the related node objects map those columns to the property id.
- if index_all_properties: true in config, it is pointless to specify any columns in properties.indexed.
- when a property list has multiple items, specify each property in a separate object within the list, and not as a single object containing them all.
Custom queries
Custom query targets are useful when the import requires a complex query that does not easily fit into the node/edge targets format.
Compulsory attributes for custom_query objects are source and query.
{
"custom_query": {
"name": "Person nodes",
"source": "persons",
"query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
}
}
- source (string) — the name of the source this target should draw data from. Should match one of the names from the sources objects. The rows extracted from the source are passed as a list query parameter to query.
- name (string) — a human-friendly name for the target (unique among all targets).
- query (string) — a Cypher query. Data from the source is available as a list in the parameter $rows.
- execute_after (string) — target object after which the current target should run. Either node, edge, or custom_query. To be used in conjunction with execute_after_name.
- execute_after_name (string) — the name of the target after which the current one should run.
Do not use custom queries to run Cypher that does not directly depend on a source; use actions instead.
One-off queries, especially if not idempotent, are not a good fit for custom query targets.
The reason for this is that queries from targets are run in batches, so a custom query may be run several times depending on the number of $rows batches extracted from the source.
Transformations
Each target can optionally have a transform attribute containing aggregation functions. This can be useful to extract higher-level dimensions from a more granular source. Aggregations result in extra fields that become available for import into Neo4j.
The following example shows how the aggregations would work on a fictitious dataset (not the movies one).
"transform": {
"group": true,
"aggregations": [
{
"expr": "SUM(unit_price*quantity)",
"field": "total_amount_sold"
},
{
"expr": "SUM(quantity)",
"field": "total_quantity_sold"
}
],
"limit": 50,
"where": "person_tmbdId IS NOT NULL"
}
- group (bool) — must be true for aggregations/where to work.
- aggregations (list of objects) — aggregation functions are specified as SQL expressions in the expr attribute, and the result is available under the name specified in field.
- limit (int) — caps the number of source rows that are considered for import (defaults to no limit, encoded as -1).
- where (string) — filters out source data prior to import (with an SQL WHERE clause format).
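The aggregated fields can then be mapped like any other source column. For example (a sketch on the same fictitious dataset), the related node target could expose them as numeric properties:
"properties": {
  "floats": [
    {"total_amount_sold": "total_amount_sold"}
  ],
  "integers": [
    {"total_quantity_sold": "total_quantity_sold"}
  ]
}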
Pre/Post load actions
The actions section contains commands that can be run before or after specific steps of the import process. For example, you may submit HTTP requests when steps complete, execute SQL queries on the source, or run Cypher statements on the Neo4j target.
{
"name": "Post load POST request",
"execute_after": "edge",
"execute_after_name": "Acted_in",
"type": "http_post",
"options": [
{"url": "https://httpbin.org/post"},
{"param1": "value1"}
],
"headers": [
{"header1": "value1"},
{"header2": "value2"}
]
}
- name (string) — a human-friendly name for the action.
- execute_after (string) — after what import step the action should run. Valid values are:
  - preloads — before any source is parsed
  - sources — after sources have been parsed
  - nodes — after all node objects have been processed
  - edges — after all edge objects have been processed
  - custom_queries — after all custom_query objects have been processed
  - loads — after all entities (nodes+edges) have been processed
  - source, node, edge, custom_query, action — after a specific source, node, edge, custom query, or action object has been run, to be used in conjunction with execute_after_name
- execute_after_name (string) — after which source/node/edge/custom_query/action object the step should run.
- type (string) — what action to run. Valid values are:
  - http_post — HTTP POST request (requires a url option)
  - http_get — HTTP GET request (requires a url option)
  - bigquery — query to a BigQuery database (requires an sql option)
  - cypher — query to the target Neo4j database (requires a cypher option; see the sketch after this list)
- options (list of objects) — action options, such as url, sql, cypher.
- headers (list of objects) — headers to send with the request.
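For instance (a sketch based on the attributes above; the action name and Cypher statement are illustrative), a cypher action could run a statement against the target database once all entities have been loaded:
{
  "name": "Count imported nodes",
  "execute_after": "loads",
  "type": "cypher",
  "options": [
    {"cypher": "MATCH (n) RETURN count(n)"}
  ]
}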
Variables
For production use cases it is common to supply date ranges or parameters based on dimensions, tenants, or tokens.
Key-value pairs can be supplied to replace $-delimited tokens in SQL queries, URLs, custom queries, or action options/headers.
You can provide parameters in the Options JSON field when creating the Dataflow job, as a JSON object.
Variables must be prefixed with the $ symbol (ex. $limit). Replaceable tokens can appear in job specification files and in readQuery or inputFilePattern (source URI) command-line parameters.
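For example (the token name notify_url is illustrative), an action option in the job specification could reference a token:
"options": [
  {"url": "$notify_url"}
]
and the Options JSON object supplied when creating the Dataflow job would provide its value:
{
  "notify_url": "https://httpbin.org/post"
}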