Create a job specification file

The job configuration file consists of a JSON object with four sections:

  • config — global flags affecting how the import is performed

  • sources — data source definitions (relational)

  • targets — data target definitions (graph: nodes/relationships)

  • actions — pre/post-load actions

Job specification JSON skeleton
{
  "config": {},
  "sources": [
    { ... }
  ],
  "targets": [
    { ... }
  ],
  "actions": [
    { ... }
  ]
}

At a high level, the job will fetch data from sources and transform/import them into the targets.

Here below you can find an example job specification file that works out of the box to import the publicly-available movies dataset. In the next sections, we break it down and provide in-context information for each part. We recommend reading this guide side by side with the job specification example.

{
  "config": {
    "reset_db": true,
    "index_all_properties": false
  },
  "sources": [
    {
      "type": "bigquery",
      "name": "persons",
      "query": "SELECT person_tmdbId, name, bornIn, born, died FROM team-connectors-dev.movies.persons"
    },
    {
      "type": "bigquery",
      "name": "movies",
      "query": "SELECT movieId, title, imdbRating, year FROM team-connectors-dev.movies.movies"
    },
    {
      "type": "bigquery",
      "name": "directed",
      "query": "SELECT movieId, person_tmdbId FROM team-connectors-dev.movies.directed"
    },
    {
      "type": "bigquery",
      "name": "acted_in",
      "query": "SELECT movieId, person_tmdbId, role FROM team-connectors-dev.movies.acted_in"
    }
  ],
  "targets": [
    {
      "node": {
        "source": "persons",
        "name": "Person",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "labels": [
            "\"Person\""
          ],
          "properties": {
            "keys": [
              {"person_tmdbId": "id"}
            ],
            "unique": [],
            "indexed": [
              {"name": "name"}
            ],
            "strings": [
              {"bornIn": "born_in"}
            ],
            "dates": [
              {"born": "born"},
              {"died": "died"}
            ]
          }
        }
      }
    },
    {
      "node": {
        "source": "movies",
        "name": "Movies",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "labels": [
            "\"Movie\""
          ],
          "properties": {
            "keys": [
              {"movieId": "id"}
            ],
            "unique": [],
            "indexed": [
              {"title": "title"}
            ],
            "floats": [
              {"imdbRating": "rating"},
              {"year": "year"}
            ]
          }
        }
      }
    },
    {
      "edge": {
        "source": "directed",
        "name": "Directed",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "type": "\"DIRECTED\"",
          "source": {
            "label": "\"Person\"",
            "key": {"person_tmdbId": "id"}
          },
          "target": {
            "label": "\"Movie\"",
            "key": {"movieId": "id"}
          },
          "properties": {}
        }
      }
    },
    {
      "edge": {
        "source": "acted_in",
        "name": "Acted_in",
        "mode": "merge",
        "transform": {
          "group": true
        },
        "mappings": {
          "type": "\"ACTED_IN\"",
          "source": {
            "label": "\"Person\"",
            "key": {"person_tmdbId": "id"}
          },
          "target": {
            "label": "\"Movie\"",
            "key": {"movieId": "id"}
          },
          "properties": {
            "strings": [
              {"role": "role"}
            ]
          }
        }
      }
    }
  ]
}

Configuration

The config object contains global configuration for the import job. The flags it supports are:

  • reset_db (bool) — whether to clear the target database before importing. Deletes data, indexes, and constraints.

  • index_all_properties (bool) — whether to create indexes for all properties. See Cypher® → Indexes for search performance.

Configuration settings and their defaults
"config": {
  "reset_db": false,
  "index_all_properties": false
}

Sources

The sources section contains the definitions of the data sources, as a list. As a rough guideline, you can think one table <=> one source. The importer will leverage the data surfaced by the sources and make it available to the targets, which eventually map it into Neo4j.

To import a BigQuery dataset, three attributes are compulsory.

{
  "type": "bigquery",
  "name": "movies",
  "query": "SELECT movieId, title FROM team-connectors-dev.movies.movies"
}
  • type (string) — bigquery.

  • name (string) — a human-friendly label for the source (unique among all sources). You will use this to reference the source in the targets section.

  • query (string) — the dataset to extract from BigQuery, as an SQL query. Notice that:

    1. the source BigQuery table can have more columns than what you select in the query;

    2. multiple targets can use the same source, even filtering it for a subset of columns.

Columns of type BIGNUMERIC, GEOGRAPHY, JSON, INTERVAL and STRUCT are not supported.

Targets

The targets section contains the definitions of the graph entities that will result from the import. Neo4j represents objects with nodes (ex. movies, people) and connects them with relationships (ex. ACTED_IN, DIRECTED). Each object in the targets section is keyed as either node or edge (synonym for relationship) and will generate a corresponding entity in Neo4j drawing data from a source. It is also possible to run custom Cypher queries via targets keyed as custom_query.

By default, you do not have to think about dependencies between nodes and relationships, as the job imports all node targets before any edge target, and processes targets of type custom_query last. It is possible to alter this behavior and customize the ordering of targets.

Node objects

Compulsory attributes for node objects are source, mappings.labels, and mappings.keys.

{
  "node": {
    "source": "persons",
    "name": "Person",
    "mode": "merge",
    "transform": {
      "group": true
    },
    "mappings": {
      "labels": [
        "\"Person\""
      ],
      "properties": {
        "keys": [
          {"person_tmdbId": "id"}
        ],
        "mandatory": [],
        "unique": [],
        "indexed": [
          {"name": "name"}
        ],
        "strings": [
          {"person_tmdbId": "id"},
          {"name": "name"},
          {"bornIn": "born_in"}
        ],
        "dates": [
          {"born": "born"},
          {"died": "died"}
        ]
      }
    }
  }
}
  • source (string) — the name of the source this target should draw data from. Should match one of the names from the sources objects.

  • name (string) — a human-friendly name for the target (unique among all targets).

  • mode (string) — the creation mode in Neo4j. Either merge or append (default). See Cypher → MERGE and Cypher → CREATE for info.

  • mappings (object) — details on how the source columns should be mapped into node details.

    • labels (list of strings) — labels to mark the nodes with. Note that they should be quoted and escaped.

    • properties (object) — mapping of source columns into node properties.

      • keys (list of objects) — source columns that should be mapped into node properties and get a node key constraint.

      • mandatory (list of objects) — source columns that should be mapped into node properties and get a node property existence constraint.

      • unique (list of objects) — source columns that should be mapped into node properties and get a unique node property constraint.

      • indexed (list of objects) — source columns that should be mapped into node properties and get an index on the corresponding node property.

      • strings, floats, integers, dates, points, booleans (list of objects) — source columns to be mapped into node properties of the given type. The data type affects how the data is represented into Neo4j, but does not create type constraints.

  • transform (object) — if "group": true, the import will SQL GROUP BY on all fields specified in keys and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraints errors or making insertion less efficient. The object can also contain aggregation functions, see Transformations.

  • execute_after (string) — target object after which the current target should run. Either node, edge, or custom_query. To be used in conjunction with execute_after_name.

  • execute_after_name (string) — the name of the target after which the current one should run.

For more information on indexes and constraints in Neo4j, see Cypher → Constraints and Cypher → Indexes for search performance.

The objects in keys, mandatory, unique, indexed, and all the type properties (strings, floats, etc) have the format

{"<column-name-in-source>": "<wished-node-property-name>"}

For example, {"person_tmbdId": "id"} will map the source column person_tmbdId to the property id in the new nodes.

Things to pay attention to:

  • make sure to quote and escape labels.

  • names in keys should not also be listed in unique or mandatory, or the constraints will conflict.

  • source data must not have null values for keys columns, or they will clash with the node key constraint. If the source is not clean in this respect, think of cleaning it upfront in the related source.query field by excluding all rows that wouldn’t fulfill the constraints (ex. WHERE person_tmbdId IS NOT NULL).

  • if index_all_properties: true in config, it is pointless to specify any columns in properties.indexed.

  • when a property list has multiple items, specify each property in a separate object within the list, and not as a single object containing them all.

    // Good
    "dates": [
      {"born": "born"},
      {"died": "died"}
    ]
    
    // Bad
    "dates": [
      {
        "born": "born",
        "died": "died"
      }
    ]

Edge objects

Compulsory attributes for edge objects are source, mappings.type, mappings.source, and mappings.target.

{
  "edge": {
    "source": "acted_in",
    "name": "Acted_in",
    "mode": "merge",
    "mappings": {
      "type": "\"ACTED_IN\"",
      "source": {
        "label": "\"Person\"",
        "key": "person_tmdbId"
      },
      "target": {
        "label": "\"Movie\"",
        "key": "movieId"
      },
      "properties": {
        "keys": [],
        "mandatory": [],
        "unique": [],
        "indexed": [],
        "strings": [
          {"role": "role"}
        ]
      }
    },
    "transform": {
      "group": true
    }
  }
}
  • source (string) — the name of the source this target should draw data from. Should match one of the names from the sources objects.

  • name (string) — a human-friendly name for the target (unique among all targets).

  • mode (string) — the creation mode in Neo4j. Either merge or append (default). See Cypher → MERGE and Cypher → CREATE for info.

  • mappings (object) — details on how the source columns should be mapped into node details.

    • type (string) — type to assign to the relationship. Note that it should be quoted and escaped.

    • source (object) — starting node for the relationship (identified by node label and key).

    • target (object) — ending node for the relationship (identified by node label and key).

    • properties (object) — mapping of source columns into relationship properties.

      • keys (list of objects) — source columns that should be mapped into relationship properties and get a relationship key constraint.

      • mandatory (list of objects) — source columns that should be mapped into relationship properties and get a relationship property existence constraint.

      • unique (list of objects) — source columns that should be mapped into relationship properties and get a relationship uniqueness constraint.

      • indexed (list of objects) — source columns that should be mapped into relationship properties and get an index on the corresponding relationship property.

      • strings, floats, integers, dates, points, booleans (list of objects) — source columns to be mapped into node properties of the given type. The data type affects how the data is represented into Neo4j, but does not create type constraints.

  • transform (object) — if "group": true, the import will SQL GROUP BY on all fields specified in mappings.source, mappings.target, and properties. If set to false, any duplicate data in the source will be pushed into Neo4j, potentially raising constraints errors or making insertion less efficient. The object can also contain aggregation functions, see Transformations.

  • execute_after (string) — target object after which the current target should run. Either node, edge, or custom_query. To be used in conjunction with execute_after_name.

  • execute_after_name (string) — the name of the target after which the current one should run.

For more information on indexes and constraints in Neo4j, see Cypher → Constraints and Cypher → Indexes for search performance.

The objects in unique, indexed, and all the type properties (strings, floats, etc) have the format

{"<column-name-in-source>": "<wished-relationship-property-name>"}

For example, {"role": "role"} will map the source column role to the property role in the new relationships.

Things to pay attention to:

  • make sure to quote and escape relationship types and node labels.

  • source.key and target.key take names from the source columns, not from the mapped graph properties. In the snippet above, notice how the key names are person_tmdbId and movieId even if the mapped property names in the related node objects are person_id and movie_id.

  • if index_all_properties: true in config, it is pointless to specify any columns in properties.indexed.

  • when a property list has multiple items, specify each property in a separate object within the list, and not as a single object containing them all.

Custom queries

Custom query targets are useful when the import requires a complex query that does not easily fit into the node/edge targets format.

Compulsory attributes for custom_query objects are source and query.

{
  "custom_query": {
    "name": "Person nodes",
    "source": "persons",
    "query": "UNWIND $rows AS row WHERE row.person_tmdbId IS NOT NULL MERGE (p:Person {id: row.person_tmdbId, name: row.name, born_in: row.bornIn, born: date(row.born), died: date(row.died)}) ON CREATE SET p.created_time=datetime()"
  }
}
  • source (string) — the name of the source this target should draw data from. Should match one of the names from the sources objects. The rows extracted from the source are passed as a list query parameter to query.

  • name (string) — a human-friendly name for the target (unique among all targets).

  • query (string) — a Cypher query. Data from the source is available as a list in the parameter $rows.

  • execute_after (string) -- target object after which the current target should run. Either node, edge, or custom_query. To be used in conjunction with execute_after_name.

  • execute_after_name (string) -- the name of the target after which the current one should run.

Do not use custom queries to run Cypher that does not directly depend on a source; use actions instead. One-off queries, especially if not idempotent, are not fit to use in custom query targets. The reason for this is that queries from targets are run in batches, so a custom query may be run several times depending on the number of $rows batches extracted from the source.

Transformations

Each target can optionally have a transform attribute containing aggregation functions. This can be useful to extract higher-level dimensions from a more granular source. Aggregations result in extra fields that become available for import into Neo4j.

The following example shows how the aggregations would work on a fictitious dataset (not the movies one).

"transform": {
  "group": true,
  "aggregations": [
    {
      "expr": "SUM(unit_price*quantity)",
      "field": "total_amount_sold"
    },
    {
      "expr": "SUM(quantity)",
      "field": "total_quantity_sold"
    }
  ],
  "limit": 50,
  "where": "person_tmbdId IS NOT NULL"
}
  • group (bool) — must be true for aggregations/where to work.

  • aggregations (list of objects) — aggregation functions are specified as SQL queries in the expr attribute, and the result is available under the name specified in field.

  • limit (int) — caps the number of source rows that are considered for import (defaults to no limit, encoded as -1).

  • where (string) — filters out source data prior to import (with an SQL WHERE clause format).

Pre/Post load actions

The actions section contains commands that can be run before or after specific steps of the import process. You may for example submit HTTP requests when steps complete, or execute SQL queries on the source, or Cypher statements on the Neo4j target.

{
  "name": "Post load POST request",
  "execute_after": "edge",
  "execute_after_name": "Acted_in",
  "type": "http_post",
  "options": [
    {"url": "https://httpbin.org/post"},
    {"param1": "value1"}
  ],
  "headers": [
    {"header1": "value1"},
    {"header2": "value2"}
  ]
}
  • name (string) — a human friendly name for the action.

  • execute_after (string) — after what import step the action should run. Valid values are:

    • preloads — before any source is parsed

    • sources — after sources have been parsed

    • nodes — after all node objects have been processed

    • edges — after all edge objects have been processed

    • custom_queries — after all custom_query objects have been processed

    • loads — after all entities (nodes+edges) have been processed

    • source, node, edge, custom_queries, action — after a specific source or node or edge or custom query or action object has been run, to be used in conjunction with execute_after_name

  • execute_after_name (string) — after which source/node/edge/custom_query/action object the step should run.

  • type (string) — what action to run. Valid values are:

    • http_post — HTTP POST request (requires a url option)

    • http_get — HTTP GET request (requires a url option)

    • bigquery — query to a BigQuery database (requires an sql option)

    • cypher — query to the target Neo4j database (requires a cypher option)

  • options (list of objects) — action options, such as url, sql, cypher.

  • headers (list of objects) — headers to send with the request.

Variables

For production use cases it is common to supply date ranges or parameters based on dimensions, tenants, or tokens. Key-values can be supplied to replace $ delimited tokens in SQL queries, URLs, custom queries, or action options/headers. You can provide parameters in the Options JSON field when creating the Dataflow job, as a JSON object.

Variables must be escaped with the $ symbol (ex. $limit). Replaceable tokens can appear in job specification files and in readQuery or inputFilePattern (source URI) command-line parameters.