Reading from Neo4j

Neo4j Connector for Apache Spark allows you to read data from a Neo4j instance in three different ways:

  • By node labels

  • By relationship type

  • By Cypher® query

Getting started

Reading all the nodes of label Person from your local Neo4j instance is as simple as this:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
  .show()
Table 1. Result of the above code

<id> | <labels> | name | age
0    | [Person] | John | 32

Neo4j read options

Table 2. List of available read options (setting name, description, default value, required)

query
  Cypher query to read the data.
  Default: (none). Required: Yes*

labels
  List of node labels separated by colon. The first label is the primary label.
  Default: (none). Required: Yes*

relationship
  Type of the relationship to read.
  Default: (none). Required: Yes*

schema.flatten.limit
  Number of records used to create the schema (only if APOC is not installed, or for custom Cypher queries provided via the query option).
  Default: 10. Required: No

schema.strategy
  Strategy used by the connector to compute the schema definition for the Dataset. Possible values are string and sample. When set to string, all properties are coerced to String; otherwise the connector samples the Neo4j dataset.
  Default: sample. Required: No

pushdown.filters.enabled
  Enable or disable the PushdownFilters support.
  Default: true. Required: No

pushdown.columns.enabled
  Enable or disable the PushdownColumn support.
  Default: true. Required: No

pushdown.aggregate.enabled
  Enable or disable the PushdownAggregate support.
  Default: true. Required: No

pushdown.limit.enabled (since version 5.1)
  Enable or disable the PushdownLimit support.
  Default: true. Required: No

pushdown.topN.enabled (since version 5.2)
  Enable or disable the PushDownTopN support.
  Default: true. Required: No

partitions
  Defines the parallelization level while pulling data from Neo4j.
  Note: more parallelism does not necessarily mean better query performance, so tune this option according to your Neo4j installation.
  Default: 1. Required: No

Query specific options

query.count
  Used only in combination with the query option. Either a query that returns a count field, like the following:

  MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
  WHERE pr.name = 'An Awesome Product'
  RETURN count(p) AS count

  or a plain number that represents the number of records returned by query. Since this value determines the volume of data pulled from Neo4j, use it carefully.
  Default: (empty). Required: No

Relationship specific options

relationship.nodes.map
  If set to true, source and target nodes are returned as Map<String, String>; otherwise the properties are flattened and every node property is returned as a column prefixed with source or target.
  Default: false. Required: No

relationship.source.labels
  List of source node labels separated by colon.
  Default: (empty). Required: Yes

relationship.target.labels
  List of target node labels separated by colon.
  Default: (empty). Required: Yes

* Only one of the query, labels, and relationship options can be specified at a time.
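As a quick illustration of how these options combine, here is a minimal sketch that reads Person nodes with four partitions and filter pushdown disabled (the values are arbitrary examples, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Read Person nodes using 4 partitions and with filter pushdown disabled.
// Tune these options for your own Neo4j installation.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .option("partitions", "4")
  .option("pushdown.filters.enabled", "false")
  .load()
  .show()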

Read data

Reading data from a Neo4j Database can be done in three ways:

Custom Cypher query

You can specify a Cypher query in this way:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) AS id, n.name AS name")
  .load()
  .show()
Table 3. Result of the above code

id | name
0  | John Doe
1  | Jane Doe

We recommend returning individual property fields rather than graph entity types (nodes, relationships, and paths). These map best to Spark's type system and yield the best results. So instead of writing:

MATCH (p:Person) RETURN p

write the following:

MATCH (p:Person) RETURN id(p) AS id, p.name AS name

If your query returns a graph entity, use the labels or relationship modes instead.

The structure of the Dataset returned by the query depends on the query itself. In this particular context, the connector may not be able to sample the schema from the query; in these cases, we suggest setting the schema.strategy option to string, as described in the read options table above.
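For instance, a minimal sketch of forcing the string strategy on a custom query (the query itself is only an example) could look like this:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// With schema.strategy set to string, every returned column is coerced to String
// instead of being typed by sampling the result set.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "MATCH (n:Person) RETURN n.name AS name, n.age AS age")
  .option("schema.strategy", "string")
  .load()
  .show()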

A read query must always return some data (that is, it must contain a RETURN statement). If you call stored procedures, remember to YIELD and then RETURN the data.

Script option

The script option allows you to execute a series of preparation scripts before the Spark job execution; the result of the last query can be reused in combination with the query read mode, as follows:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("script", "RETURN 'foo' AS val")
  .option("query", "UNWIND range(1,2) as id RETURN id AS val, scriptResult[0].val AS script")
  .load()
  .show()

Before the extraction from Neo4j starts, the connector runs the content of the script option, and the result of its last query is injected into the query (available as scriptResult, as in the example above).

Table 4. Result of the above code

val | script
1   | foo
2   | foo

Schema

The first 10 results (or the number specified by the schema.flatten.limit option) are flattened, and the schema is created from those properties.

If the query returns no data, sampling is not possible. In this case, the connector creates a schema from the RETURN statement, and every column is of type String. This does not cause any problems, since the dataset is empty.

For example, suppose you have this query:

MATCH (n:NON_EXISTENT_LABEL) RETURN id(n) AS id, n.name, n.age

The created schema is the following:

Column | Type
id     | String
n.name | String
n.age  | String

The returned column order is not guaranteed to match the RETURN statement for Neo4j 3.x and Neo4j 4.0.

Starting from Neo4j 4.1, the order is the same as in the RETURN statement.

Limit the results

This connector does not permit using SKIP or LIMIT at the end of a Cypher query.
Attempts to do this result in errors, such as the message:
SKIP/LIMIT are not allowed at the end of the query.

This is not supported because, internally, the connector uses SKIP/LIMIT pagination to break read sets into multiple partitions and support partitioned reads. As a result, a user-provided SKIP/LIMIT clashes with what the connector itself adds to your query to support parallelism.

There is a workaround though: you can still accomplish the same result by using SKIP/LIMIT inside the query, rather than after the final RETURN clause of the query.

Here’s an example. This first query is rejected and fails:

MATCH (p:Person)
RETURN p.name AS name
ORDER BY name
LIMIT 10

However, you can reformulate the query to make it work:

MATCH (p:Person)
WITH p.name AS name
ORDER BY name
LIMIT 10
RETURN name

The two queries return the exact same data, but only the second one is usable with the Spark connector and partitionable, because of the WITH clause and the simple final RETURN clause. If you choose to reformulate queries to use "internal SKIP/LIMIT", pay careful attention to ordering operations to guarantee the same result set.

You can also use the query.count option rather than reformulating your query (see the query-specific options above), as in the following sketch.
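Here is a minimal sketch of that approach, assuming the count is used to size the SKIP/LIMIT pages described above (the queries themselves are only examples):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// query.count tells the connector how many records the read query returns,
// so the read can be paginated across the requested partitions.
spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("partitions", "4")
  .option("query", "MATCH (p:Person) RETURN p.name AS name")
  .option("query.count", "MATCH (p:Person) RETURN count(p) AS count")
  .load()
  .show()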

Node

You can read nodes by specifying a single label or multiple labels, like so:

Single label
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
Multiple labels
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person:Customer:Confirmed")
  .load()
The label list can be specified with or without a leading colon:
Person:Customer and :Person:Customer are considered the same.

Columns

When reading data with this method, the DataFrame contains all the properties of the nodes, plus two additional columns (see the sketch after this list):

  • <id> the internal Neo4j ID

  • <labels> a list of labels for that node
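These columns can be referenced like any other DataFrame column; a minimal sketch (label and property names are only examples):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()

// <id> and <labels> are regular columns and can be selected alongside node properties.
df.select("<id>", "<labels>", "name").show()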

Schema

If APOC is available, the schema is created with apoc.meta.nodeTypeProperties. Otherwise, the connector executes the following Cypher query:

MATCH (n:<labels>)
RETURN n
ORDER BY rand()
LIMIT <limit>

Where <labels> is the list of labels provided by the labels option and <limit> is the value of the schema.flatten.limit option. The results of this query are flattened, and the schema is created from those properties.

Example
CREATE (p1:Person {age: 31, name: 'Jane Doe'}),
    (p2:Person {name: 'John Doe', age: 33, location: null}),
    (p3:Person {age: 25, location: point({latitude: -37.659560, longitude: -68.178060})})

The following schema is created:

Field    | Type
<id>     | Int
<labels> | String[]
age      | Int
name     | String
location | Point

Relationship

To read a relationship you must specify the relationship type, the source node labels, and the target node labels.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

This creates the following Cypher query:

MATCH (source:Person)-[rel:BOUGHT]->(target:Product)
RETURN source, rel, target

Node mapping

The result format can be controlled by the relationship.nodes.map option (default is false).

When it is set to false, the source and target node properties are returned in separate columns prefixed with source. or target. (for example, source.name, target.price).

When it is set to true, the source and target node properties are returned as Map[String, String] in two columns named source and target.

Nodes map set to false
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()
  .show()
Table 5. Result of the above code

<rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity
4        | BOUGHT     | 1           | [Person]        | 1         | John Doe        | 0           | [Product]       | Product 1   | 52        | 240
5        | BOUGHT     | 3           | [Person]        | 2         | Jane Doe        | 2           | [Product]       | Product 2   | 53        | 145

Nodes map set to true
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "true")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()
  .show()
Table 6. Result of the above code

<rel.id> | <rel.type> | rel.quantity | <source> | <target>
4 | BOUGHT | 240 | {"fullName": "John Doe", "id": 1, "<labels>": "[Person]", "<id>": 1} | {"name": "Product 1", "id": 52, "<labels>": "[Product]", "<id>": 0}
5 | BOUGHT | 145 | {"fullName": "Jane Doe", "id": 2, "<labels>": "[Person]", "<id>": 3} | {"name": "Product 2", "id": 53, "<labels>": "[Product]", "<id>": 2}

Columns

When reading data with this method, the DataFrame contains the following columns:

  • <id> the internal Neo4j ID.

  • <relationshipType> the relationship type.

  • rel.[property name] relationship properties.

The remaining columns depend on the value of the relationship.nodes.map option (a selection sketch follows the list below).

If true:

  • source the Map<String, String> of the source node

  • target the Map<String, String> of the target node

If false:

  • <sourceId> the internal Neo4j ID of the source node

  • <sourceLabels> a list of labels for the source node

  • <targetId> the internal Neo4j ID of the target node

  • <targetLabels> a list of labels for the target node

  • source.[property name] source node properties

  • target.[property name] target node properties
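Because most of these column names contain dots or angle brackets, wrap them in backticks when referencing them. A minimal sketch with relationship.nodes.map set to false (column names are taken from the example above):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

// Backticks prevent Spark from interpreting the dots as nested field access.
df.select("`<source.id>`", "`source.fullName`", "`rel.quantity`", "`target.name`").show()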

Filtering

You can use Spark to filter properties of the relationship, the source node, or the target node. Use the correct prefix:

If relationship.nodes.map is set to false:

  • `source.[property]` for the source node properties.

  • `rel.[property]` for the relationship properties.

  • `target.[property]` for the target node properties.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.where("`source.id` = 14 AND `target.id` = 16")

If relationship.nodes.map is set to true:

  • `<source>`.`[property]` for the source node map properties.

  • `<rel>`.`[property]` for the relationship map properties.

  • `<target>`.`[property]` for the target node map properties.

In this case, all the map values are strings, so the filter value must be a string too.

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "true")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.where("`<source>`.`id` = '14' AND `<target>`.`id` = '16'")

Schema

When extracting a relationship from Neo4j, the connector first invokes the apoc.meta.relTypeProperties procedure. If APOC is not installed, it executes the following Cypher query:

MATCH (source:<source_labels>)-[rel:<relationship>]->(target:<target_labels>)
RETURN rel
ORDER BY rand()
LIMIT <limit>

Where:

  • <source_labels> is the list of labels provided by the relationship.source.labels option

  • <target_labels> is the list of labels provided by the relationship.target.labels option

  • <relationship> is the relationship type provided by the relationship option

  • <limit> is the value of the schema.flatten.limit option

Performance considerations

If the schema is not specified, the Spark connector uses sampling, as explained in the Schema sections above. Since sampling is potentially an expensive operation, consider supplying your own schema, as in the sketch below.
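Here is a minimal sketch of supplying a schema through Spark's standard DataFrameReader.schema API; the field names and types are assumptions about the example Person nodes, not something the connector defines:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types.{DataTypes, StructField, StructType}

val spark = SparkSession.builder().getOrCreate()

// A user-defined schema lets the connector skip the sampling step.
// The fields below are assumptions about the example Person nodes.
val personSchema = StructType(Seq(
  StructField("name", DataTypes.StringType),
  StructField("age", DataTypes.LongType)
))

spark.read.format("org.neo4j.spark.DataSource")
  .schema(personSchema)
  .option("url", "bolt://localhost:7687")
  .option("labels", "Person")
  .load()
  .show()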