Reading from Neo4j

The Neo4j Connector for Apache Spark allows you to read data from Neo4j in three ways: by node labels, by relationship name, and by a custom Cypher query.

Getting Started

Reading all the nodes of type Person from your local Neo4j instance is as simple as this:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
  .show()
Table 1. Result of the above code
<id> | <labels> | name | age
0 | [Person] | John | 32

Neo4j Read Options

Table 2. List of available read options

query
  Cypher query to read the data.
  Default value: (none). Required: Yes*

labels
  List of node labels separated by colon (:). The first label is the primary label.
  Default value: (none). Required: Yes*

relationship
  Name of a relationship.
  Default value: (none). Required: Yes*

schema.flatten.limit
  Number of records used to create the schema (only if APOC is not installed).
  Default value: 10. Required: No

schema.strategy
  Strategy used by the connector to compute the schema of the Dataset. Possible values are string and sample. With string, all properties are coerced to String; with sample, the connector samples the Neo4j dataset.
  Default value: sample. Required: No

pushdown.filters.enabled
  Enable or disable Push Down Filters support.
  Default value: true. Required: No

pushdown.columns.enabled
  Enable or disable Push Down Column support.
  Default value: true. Required: No

partitions
  The parallelization level used while pulling data from Neo4j.
  Note: more parallelization does not necessarily mean better performance, so tune this setting according to your Neo4j installation.
  Default value: 1. Required: No

Query Specific Options

query.count
  Used only in combination with the query option. It can be either a Cypher query that returns a single count field, like the following:

  MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
  WHERE pr.name = 'An Awesome Product'
  RETURN count(p) AS count

  or a plain number representing the amount of records returned by query. Note that this value represents the volume of data pulled from Neo4j, so use it carefully.
  Default value: (empty). Required: No

Relationship Specific Options

relationship.nodes.map
  If true, source and target nodes are returned as Map<String, String>; otherwise the properties are flattened, with every node property returned as a column prefixed by source or target.
  Default value: false. Required: No

relationship.source.labels
  List of source node labels separated by colon (:).
  Default value: (empty). Required: Yes

relationship.target.labels
  List of target node labels separated by colon (:).
  Default value: (empty). Required: Yes

* Only one of the query, labels, and relationship options can be specified at a time.
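
For example, a read that combines several of the non-exclusive options could look like the following sketch. The option values are illustrative and should be tuned to your own Neo4j installation.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Read Person nodes, sampling up to 100 records for the schema (illustrative value)
// and pulling the data with 4 partitions.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .option("schema.flatten.limit", "100")
  .option("partitions", "4")
  .load()
  .show()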

Read Data

Reading data from a Neo4j database can be done in three ways:

Custom Cypher Query

You can specify a Cypher query in this way:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
  .load()
  .show()
Table 3. Result of the above code
id | name
0 | John Doe
1 | Jane Doe

We recommend returning individual property fields rather than graph entity (node, relationship, and path) types. This maps best to Spark’s type system and yields the best results. So instead of writing MATCH (p:Person) RETURN p, write MATCH (p:Person) RETURN id(p) as id, p.name as name. If your query returns a graph entity, use the labels or relationship modes instead.

The structure of the Dataset returned by the query depends on the query itself. In this context it can happen that the connector is not able to sample the schema from the query; in these cases we suggest setting the schema.strategy option to string as described here.
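
As a minimal sketch (the query itself is only illustrative), forcing the string strategy looks like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// With schema.strategy = "string" every column of the resulting Dataset is a String.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("schema.strategy", "string")
  .option("query", "MATCH (p:Person) RETURN p.name AS name, p.age AS age")
  .load()
  .show()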

A read query must always return some data (that is, it must always have a RETURN statement). If you use stored procedures, remember to YIELD and then RETURN the data.
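
For example, a sketch of a read that calls a built-in procedure (db.labels() is used here purely as an illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The procedure output is YIELDed and then RETURNed, so the query produces rows.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CALL db.labels() YIELD label RETURN label")
  .load()
  .show()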

Script Option

The script option allows you to execute a series of preparation scripts before the Spark job execution. The result of the last query can be reused in combination with the query read mode, as follows:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("script", "RETURN 'foo' AS val")
  .option("query", "UNWIND range(1,2) as id RETURN id AS val, scriptResult[0].val AS script")
  .load()
  .show()

Before the extraction from Neo4j starts, the connector runs the content of the script option, and the result of the last query is injected into the query (accessible as scriptResult, as shown above).

Table 4. Result of the above code
val | script
1 | foo
2 | foo

Schema

The first 10 (or any number specified by the schema.flatten.limit option) results will be flattened and the schema will be created from those properties.

If the query returns no data, sampling is not possible. In this case the connector creates a schema from the RETURN statement, and every column will be of type String. This won’t cause any problems, since you won’t have any data in your dataset.

For example, say you have this query:

MATCH (n:NON_EXISTENT_LABEL) RETURN id(n) as id, n.name, n.age

The created schema will be

Column | Type
id | String
n.name | String
n.age | String

The returned column order is not guaranteed to match the RETURN statement for Neo4j 3.* and Neo4j 4.0. Starting from Neo4j 4.1, the order will be the same.

Limit the results

This connector does not permit using SKIP or LIMIT at the end of a Cypher query. Attempts to do this will result in errors, such as the message "SKIP/LIMIT are not allowed at the end of the query".

This is not supported because internally, the connector uses SKIP/LIMIT pagination to break read sets up into multiple partitions, to support partitioned reads. As a result, user-provided SKIP/LIMIT clashes with what the connector itself adds to your query to support parallelism.

There is a workaround, though: you can still accomplish the same result by using SKIP/LIMIT inside the query, rather than after the final RETURN clause of the query.

Here’s a simple example. This first query will be rejected, and will fail:

MATCH (p:Person)
RETURN p.name as name
ORDER BY name
LIMIT 10

The same query, though, can be reformulated so that it works:

MATCH (p:Person)
WITH p.name as name
ORDER BY name
LIMIT 10
RETURN name

The queries return the exact same data, but only the second one is usable with the Spark connector and partition-able, because of the WITH clause and the simple final RETURN clause. If you choose to reformulate queries to use "internal SKIP/LIMIT", pay careful attention to ordering operations to guarantee the same result set.

You may also use the query.count option rather than reformulating your query (more on it here).
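
As a sketch, a partitioned read in query mode can be combined with query.count (the query, count query, and partition count below are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// The connector splits the read into 4 partitions; query.count tells it how many
// records the query returns, so it can compute the internal SKIP/LIMIT ranges
// described above.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "MATCH (p:Person) RETURN id(p) AS id, p.name AS name")
  .option("partitions", "4")
  .option("query.count", "MATCH (p:Person) RETURN count(p) AS count")
  .load()
  .show()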

Node

You can read nodes by specifying a single label or multiple labels, like so:

Single label
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
Multiple labels
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person:Customer:Confirmed")
  .load()
The label list can be specified either with or without a starting colon: Person:Customer and :Person:Customer are considered the same.

Columns

When reading data with this method, the DataFrame will contain all the fields of the nodes, plus two additional columns:

  • <id> the internal Neo4j id

  • <labels> a list of labels for that node
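
These columns can be selected like any other column; a minimal sketch, assuming the Person example above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()

// Backticks are used because the column names contain angle brackets.
df.select("`<id>`", "`<labels>`", "name").show()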

Schema

If APOC is available, the schema will be created with apoc.meta.nodeTypeProperties. Otherwise, the connector executes the following Cypher query:

MATCH (n:<labels>)
RETURN n
ORDER BY rand()
LIMIT <limit>

Where <labels> is the list of labels provided by the labels option and <limit> is the value provided by the schema.flatten.limit option. The results of that query will be flattened, and the schema will be created from those properties.

Example
CREATE (p1:Person {age: 31, name: 'Jane Doe'}),
    (p2:Person {name: 'John Doe', age: 33, location: null}),
    (p3:Person {age: 25, location: point({latitude: -37.659560, longitude: -68.178060})})

Will create this schema

Field | Type
<id> | Int
<labels> | String[]
age | Int
name | String
location | Point
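
You can inspect the inferred schema from Spark with printSchema(); a minimal sketch based on the example above:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Prints the <id>, <labels>, age, name, and location fields with their Spark types.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
  .printSchema()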

Relationship

To read a relationship you must specify the relationship name, the source node labels, and the target node labels.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

This will generate the following Cypher query:

MATCH (source:Person)-[rel:BOUGHT]->(target:Product)
RETURN source, rel, target

Node mapping

The result format can be controlled by the relationship.nodes.map option (default is false).

When set to false, source and target node properties are returned in separate columns, prefixed with source. or target. (for example source.name, target.price).

When set to true, the source and target node properties are returned as Map[String, String] in two columns named <source> and <target>.

Nodes map set to false
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()
  .show()
Table 5. Result of the above code
<rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity
4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240
5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145

Nodes map set to true
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "true")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()
  .show()
Table 6. Result of the above code
<rel.id> | <rel.type> | rel.quantity | <source> | <target>
4 | BOUGHT | 240 | {"fullName": "John Doe", "id": 1, "<labels>": "[Person]", "<id>": 1} | {"name": "Product 1", "id": 52, "<labels>": "[Product]", "<id>": 0}
4 | BOUGHT | 145 | {"fullName": "Jane Doe", "id": 1, "<labels>": "[Person]", "<id>": 3} | {"name": "Product 2", "id": 53, "<labels>": "[Product]", "<id>": 2}

Columns

When reading data with this method, the DataFrame will contain the following columns:

  • <rel.id> the internal Neo4j id of the relationship

  • <rel.type> the relationship type

  • rel.[property name] relationship properties

The remaining columns depend on the value of the relationship.nodes.map option.

If true:

  • <source> the Map<String, String> of the source node

  • <target> the Map<String, String> of the target node

If false:

  • <source.id> the internal Neo4j id of the source node

  • <source.labels> a list of labels for the source node

  • <target.id> the internal Neo4j id of the target node

  • <target.labels> a list of labels for the target node

  • source.[property name] source node properties

  • target.[property name] target node properties
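
With relationship.nodes.map set to false, a few of these columns can be selected as in the following sketch (the column names are taken from the BOUGHT example above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

// Backticks prevent Spark from treating the dots in the column names as struct field access.
df.select("`source.fullName`", "`target.name`", "`rel.quantity`").show()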

Filtering

You can use Spark to filter properties of the relationship, the source node, or the target node. Just use the correct prefix:

If relationship.nodes.map is set to false

  • `source.[property]` for source node properties

  • `rel.[property]` for relationship properties

  • `target.[property]` for target node properties

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.where("`source.id` = 14 AND `target.id` = 16")

If relationship.nodes.map is set to true

  • `<source>`.`[property]` for the source node map properties

  • `rel.[property]` for the relationship property (relationship properties are not mapped)

  • `<target>`.`[property]` for the target node map property

In this case, all the map values are strings, so the filter value must be a string too.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "true")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.where("`<source>`.`id` = '14' AND `<target>`.`id` = '16'")

Schema

When extracting a relationship from Neo4j, the connector first tries to invoke the apoc.meta.relTypeProperties procedure. If the procedure is not available, it executes the following Cypher query:

MATCH (source:<source_labels>)-[rel:<relationship>]->(target:<target_labels>)
RETURN rel
ORDER BY rand()
LIMIT <limit>

Where:

  • <source_labels> is the list of labels provided by the relationship.source.labels option

  • <target_labels> is the list of labels provided by the relationship.target.labels option

  • <relationship> is the relationship name provided by the relationship option

  • <limit> is the value provided by the schema.flatten.limit option
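
If the default sample of 10 relationships is too small to discover all properties when APOC is not available, you can raise the limit; a minimal sketch with an illustrative value:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Sample up to 100 relationships (illustrative value) when building the schema without APOC.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .option("schema.flatten.limit", "100")
  .load()
  .printSchema()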