Reading from Neo4j
Neo4j Connector for Apache Spark allows you to read data from a Neo4j instance in three different ways:
- By node labels
- By relationship type
- By Cypher® query
Getting started
Reading all the nodes of label Person from your local Neo4j instance is as simple as this:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("labels", "Person")
.load()
.show()
<id> | <labels> | name | age |
---|---|---|---|
0 | [Person] | John | 32 |
Neo4j read options
Setting name | Description | Default value | Required |
---|---|---|---|
query | Cypher query to read the data. | (none) | Yes* |
labels | List of node labels separated by colon. The first label is the primary label. | (none) | Yes* |
relationship | Type of a relationship. | (none) | Yes* |
schema.flatten.limit | Number of records to be used to create the Schema (only if APOC is not installed, or for custom Cypher queries provided via the query option). | 10 | No |
schema.strategy | Strategy used by the connector in order to compute the Schema definition for the Dataset. Possible values are string and sample. | sample | No |
pushdown.filters.enabled | Enable or disable the PushdownFilters support. | true | No |
pushdown.columns.enabled | Enable or disable the PushdownColumn support. | true | No |
pushdown.aggregate.enabled | Enable or disable the PushdownAggregate support. | true | No |
pushdown.limit.enabled | Enable or disable the PushdownLimit support. | true | No |
pushdown.topN.enabled | Enable or disable the PushDownTopN support. | true | No |
partitions | This defines the parallelization level while pulling data from Neo4j. Note: more parallelization does not necessarily mean better query performance, so tune it wisely according to your Neo4j installation. | 1 | No |
Query specific options
Setting name | Description | Default value | Required |
---|---|---|---|
query.count | Used only in combination with the query option. Either a Cypher query that returns the count of all the records, such as MATCH (p:Person)-[r:BOUGHT]->(pr:Product) WHERE pr.name = 'An Awesome Product' RETURN count(p) AS count, or a simple number that represents the number of records returned by query. | (empty) | No |
Relationship specific options
Setting name | Description | Default value | Required |
---|---|---|---|
relationship.nodes.map | If it's set to true, source and target node properties are returned as Map[String, String]; if false, they are returned as separate columns prefixed with source. or target. | false | No |
relationship.source.labels | List of source node labels separated by colon. | (empty) | Yes |
relationship.target.labels | List of target node labels separated by colon. | (empty) | Yes |
* Just one of the query, labels, or relationship options can be specified at a time.
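As an illustration, here is a minimal sketch (assuming a local instance at bolt://localhost:7687 and purely illustrative option values) that combines a few of these settings on a node read:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// Read Person nodes in 4 parallel queries, sample 20 records for schema inference,
// and let the connector push the filter down to Neo4j.
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("labels", "Person")
.option("partitions", "4")
.option("schema.flatten.limit", "20")
.option("pushdown.filters.enabled", "true")
.load()
.where("age > 30")
.show()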
Read data
Reading data from a Neo4j Database can be done in three ways:
Custom Cypher query
You can specify a Cypher query in this way:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) AS id, n.name AS name")
.load()
.show()
id | name |
---|---|
0 | John Doe |
1 | Jane Doe |
We recommend individual property fields to be returned, rather than returning graph entity (node, relationship, and path) types. This best maps to Spark’s type system and yields the best results. So instead of writing:
MATCH (p:Person)
RETURN p
write the following:
MATCH (p:Person)
RETURN id(p) AS id, p.name AS name
If your query returns a graph entity, use the labels or relationship read options instead.
The structure of the Dataset returned by the query is influenced by the query itself. In this particular context, it could happen that the connector isn’t able to sample the Schema from the query; in these cases, we suggest trying with the option schema.strategy set to string, as described here.
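For example, here is a minimal sketch (reusing the local instance from the examples above) that forces the string strategy for a query the connector may not be able to sample:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// With schema.strategy set to string, no sampling of the query result is attempted
// and every column is read as a String.
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("schema.strategy", "string")
.option("query", "MATCH (n:Person) RETURN n.name AS name, n.age AS age")
.load()
.show()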
A read query must always return some data (that is, it must always have a RETURN statement). If you use stored procedures, remember to YIELD and then RETURN data.
Script option
The script option allows you to execute a series of preparation scripts before the Spark job execution. The result of the last query can be reused in combination with the query read mode, as follows:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("script", "RETURN 'foo' AS val")
.option("query", "UNWIND range(1,2) as id RETURN id AS val, scriptResult[0].val AS script")
.load()
.show()
Before the extraction from Neo4j starts, the connector runs the content of the script option, and the result of the last query is injected into the query.
val | script |
---|---|
1 | foo |
2 | foo |
Schema
The first 10 (or any number specified by the schema.flatten.limit option) results are flattened and the schema is created from those properties.
If the query returns no data, the sampling is not possible. In this case, the connector creates a schema from the return statement, and every column is going to be of type String. This does not cause any problems since you have no data in your dataset.
For example, you have this query:
MATCH (n:NON_EXISTENT_LABEL) RETURN id(n) AS id, n.name, n.age
The created schema is the following:
Column | Type |
---|---|
id | String |
n.name | String |
n.age | String |
The returned column order is not guaranteed to match the RETURN statement for Neo4j 3.x and Neo4j 4.0. Starting from Neo4j 4.1, the order is the same.
Limit the results
This connector does not permit using SKIP or LIMIT at the end of a Cypher query.
Attempts to do this result in errors, such as the message:
SKIP/LIMIT are not allowed at the end of the query.
This is not supported, because internally the connector uses SKIP/LIMIT pagination to break read sets up into multiple partitions to support partitioned reads. As a result, user-provided SKIP/LIMIT clashes with what the connector itself adds to your query to support parallelism.
There is a workaround though; you can still accomplish the same result by using SKIP/LIMIT internally, inside the query, rather than after the final RETURN block of the query.
Here’s an example. This first query is rejected and fails:
MATCH (p:Person)
RETURN p.name AS name
ORDER BY name
LIMIT 10
However, you can reformulate this query to make it work:
MATCH (p:Person)
WITH p.name AS name
ORDER BY name
LIMIT 10
RETURN p.name
The queries return the exact same data, but only the second one is usable with the Spark connector and partitionable, because of the WITH clause and the simple final RETURN clause. If you choose to reformulate queries to use "internal SKIP/LIMIT", take careful notice of ordering operations to guarantee the same result set.
You may also use the query.count option rather than reformulating your query (more on it here).
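As a sketch of that alternative (assuming the same Person data used in the examples above), query.count can supply the total either as a counting query or as a plain number:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// The connector uses the supplied count to build its own SKIP/LIMIT pagination.
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("query", "MATCH (p:Person) RETURN p.name AS name ORDER BY name")
.option("query.count", "MATCH (p:Person) RETURN count(p) AS count")
.option("partitions", "4")
.load()
.show()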
Node
You can read nodes by specifying a single label or multiple labels. A single label:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("labels", "Person")
.load()
Multiple labels:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("labels", "Person:Customer:Confirmed")
.load()
The label list can be specified either with or without a starting colon: Person:Customer and :Person:Customer are considered the same.
Columns
When reading data with this method, the DataFrame contains all the fields contained in the nodes, plus two additional columns.
- <id>: the internal Neo4j ID
- <labels>: a list of labels for that node
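For instance, here is a minimal sketch (on the same Person data as above) that selects the two extra columns alongside a regular property; the backticks guard the angle brackets in the column names:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("labels", "Person")
.load()
// <id> and <labels> are added by the connector; name comes from the node properties.
df.select("`<id>`", "`<labels>`", "name").show()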
Schema
If APOC is available, the schema is created with apoc.meta.nodeTypeProperties. Otherwise, we execute the following Cypher query:
MATCH (n:<labels>)
RETURN n
ORDER BY rand()
LIMIT <limit>
Where <labels> is the list of labels provided by the labels option and <limit> is the value provided by the schema.flatten.limit option.
The results of such a query are flattened, and the schema is created from those properties.
Example
CREATE (p1:Person {age: 31, name: 'Jane Doe'}),
(p2:Person {name: 'John Doe', age: 33, location: null}),
(p3:Person {age: 25, location: point({latitude: -37.659560, longitude: -68.178060})})
The following schema is created:
Field | Type |
---|---|
<id> | Int |
<labels> | String[] |
age | Int |
name | String |
location | Point |
Relationship
To read a relationship you must specify the relationship type, the source node labels, and the target node labels.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
This creates the following Cypher query:
MATCH (source:Person)-[rel:BOUGHT]->(target:Product)
RETURN source, rel, target
Node mapping
The result format can be controlled by the relationship.nodes.map option (default is false).
When it is set to false, source and target node properties are returned in separate columns prefixed with source. or target. (i.e., source.name, target.price).
When it is set to true, the source and target node properties are returned as Map[String, String] in two columns named source and target.
With relationship.nodes.map set to false:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "false")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
.show()
<rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity |
---|---|---|---|---|---|---|---|---|---|---|
4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240 |
5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145 |
With relationship.nodes.map set to true:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "true")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
.show()
<rel.id> | <rel.type> | rel.quantity | <source> | <target> |
---|---|---|---|---|
4 | BOUGHT | 240 | { "fullName": "John Doe", "id": 1, "<labels>": "[Person]", "<id>": 1 } | { "name": "Product 1", "id": 52, "<labels>": "[Product]", "<id>": 0 } |
4 | BOUGHT | 145 | { "fullName": "Jane Doe", "id": 1, "<labels>": "[Person]", "<id>": 3 } | { "name": "Product 2", "id": 53, "<labels>": "[Product]", "<id>": 2 } |
Columns
When reading data with this method, the DataFrame contains the following columns:
- <id>: the internal Neo4j ID
- <relationshipType>: the relationship type
- rel.[property name]: relationship properties
The remaining columns depend on the value of the relationship.nodes.map option.
If true:
- source: the Map<String, String> of the source node
- target: the Map<String, String> of the target node
If false:
- <sourceId>: the internal Neo4j ID of the source node
- <sourceLabels>: a list of labels for the source node
- <targetId>: the internal Neo4j ID of the target node
- <targetLabels>: a list of labels for the target node
- source.[property name]: source node properties
- target.[property name]: target node properties
Filtering
You can use Spark to filter properties of the relationship, the source node, or the target node. Use the correct prefix:
If relationship.nodes.map is set to false:
- `source.[property]` for the source node properties.
- `rel.[property]` for the relationship property.
- `target.[property]` for the target node property.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "false")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
df.where("`source.id` = 14 AND `target.id` = 16")
If relationship.nodes.map is set to true:
- `<source>`.`[property]` for the source node map properties.
- `<rel>`.`[property]` for the relationship map property.
- `<target>`.`[property]` for the target node map property.
In this case, all the map values are strings, so the filter value must be a string too.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "true")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
df.where("`<source>`.`id` = '14' AND `<target>`.`id` = '16'")
Schema
If APOC is available, the schema is created with apoc.meta.relTypeProperties. Otherwise, we execute the following Cypher query:
MATCH (source:<source_labels>)-[rel:<relationship>]->(target:<target_labels>)
RETURN rel
ORDER BY rand()
LIMIT <limit>
Where:
- <source_labels> is the list of labels provided by the relationship.source.labels option
- <target_labels> is the list of labels provided by the relationship.target.labels option
- <relationship> is the relationship type provided by the relationship option
- <limit> is the value provided via the schema.flatten.limit option
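As a closing sketch (same local instance and BOUGHT graph as in the examples above), you can raise the sampling limit and inspect the inferred relationship schema:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
// Sample up to 100 relationships when building the schema, then print it.
spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.option("schema.flatten.limit", "100")
.load()
.printSchema()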