Quickstart
Neo4j Connector for Apache Spark allows you to read from and write to Neo4j databases.
It’s fairly easy to use, although it can be highly customized.
Installation guide
Prerequisites
First, install Spark as documented on the Apache Spark website.
Choose the version carefully since the wrong combination of Scala version and Spark version breaks your code. Check this page for more information about the proper JAR to use.
Where to get the JARs?
You can download the connector JAR from the Neo4j Connector Page or from the GitHub releases page.
As with the Spark installation, choose the JAR carefully: it must match your Scala and Spark versions, since the wrong combination breaks your code. Check this page for more information about the proper JAR to use.
Using spark-submit, spark-shell, or pyspark
$SPARK_HOME/bin/spark-shell --jars neo4j-connector-apache-spark_2.13-5.3.0_for_spark_3.jar
The connector is also available from Spark Packages:
$SPARK_HOME/bin/spark-shell --packages org.neo4j:neo4j-connector-apache-spark_2.13:5.3.0_for_spark_3
Using sbt
If you use the sbt-spark-package plugin, add the following to your build.sbt file:
spDependencies += "org.neo4j/neo4j-connector-apache-spark_${scala.version}:5.3.0_for_spark_${spark.version.major}"
Otherwise:
libraryDependencies += "org.neo4j" % "neo4j-connector-apache-spark_${scala.version}" % "5.3.0_for_spark_${spark.version.major}"
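As a concrete illustration, the placeholders above could be filled in as follows. This is only a sketch: it assumes a Scala 2.13 build targeting Spark 3, mirroring the coordinates used in the spark-shell example, and the exact Scala patch version is an arbitrary choice.

```scala
// build.sbt (sketch): assumes Scala 2.13 and Spark 3
scalaVersion := "2.13.12"

// %% appends the Scala binary version, so this resolves to
// org.neo4j:neo4j-connector-apache-spark_2.13:5.3.0_for_spark_3
libraryDependencies += "org.neo4j" %% "neo4j-connector-apache-spark" % "5.3.0_for_spark_3"
```

Using `%%` keeps the Scala suffix of the artifact in step with `scalaVersion`, which avoids one source of mismatched combinations.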
Using the connector on Databricks
First, get the JAR that matches your Databricks Runtime version; see "Where to get the JARs?" for download locations.
Go to your cluster page, select the Library tab and install a new library simply by uploading the JAR.
That's it!
Getting started
You can create Spark DataFrames and write them to your Neo4j database:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = List(
("John Doe", 32),
("Jane Doe", 42),
).toDF("name", "age")
(df.write.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Append)
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", ":Person")
.save())
Similarly, you can read from your Neo4j database and have the data available as a Spark DataFrame.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", "Person")
.load())
df.show()
General considerations
Before going into details, here are some preliminary considerations on data types, filters, and schema.
Complex data types
Spark doesn’t natively support all Neo4j data types (e.g., Point, Time, Duration). Such types are transformed into Struct types containing all the useful data.
For complete details on type handling, consult the Data type mapping between Neo4j and Spark.
Type | Struct |
---|---|
Duration | Struct(Array( ("type", DataTypes.StringType, false), ("months", DataTypes.LongType, false), ("days", DataTypes.LongType, false), ("seconds", DataTypes.LongType, false), ("nanoseconds", DataTypes.IntegerType, false), ("value", DataTypes.StringType, false) )) |
Point | Struct(Array( ("type", DataTypes.StringType, false), ("srid", DataTypes.IntegerType, false), ("x", DataTypes.DoubleType, false), ("y", DataTypes.DoubleType, false), ("z", DataTypes.DoubleType, true) )) |
Time | Struct(Array( ("type", DataTypes.StringType, false), ("value", DataTypes.StringType, false) )) |
Filters
The Neo4j Connector for Apache Spark implements the SupportsPushDownFilters interface, which allows you to push the Spark filters down to the Neo4j layer.
This way, the data that Spark receives has already been filtered by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.
You can manually disable the PushdownFilters support by setting the pushdown.filters.enabled option to false (default is true).
If you use the filter function more than once, like in this example:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", ":Person")
.load())
df.where("name = 'John Doe'").where("age = 32").show()
The conditions are automatically joined with an AND operator.
When using relationship.node.map = true or the query option, the PushdownFilters support is automatically disabled.
In that case, the filters are applied by Spark and not by Neo4j.
Aggregation
The Neo4j Connector for Apache Spark implements the SupportsPushDownAggregates interface, which allows you to push Spark aggregations down to the Neo4j layer.
This way, the data that Spark receives has already been aggregated by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.
You can manually disable the PushdownAggregate support by setting the pushdown.aggregate.enabled option to false (default is true).
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
(spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.createTempView("BOUGHT"))
val df = spark.sql(
"""SELECT `source.fullName`, MAX(`target.price`) AS max, MIN(`target.price`) AS min
|FROM BOUGHT
|GROUP BY `source.fullName`""".stripMargin)
df.show()
The MAX and MIN operators are applied directly on Neo4j.
Push-down limit
The Neo4j Connector for Apache Spark implements the SupportsPushDownLimit interface, which allows you to push Spark limits down to the Neo4j layer.
This way, the data that Spark receives has already been limited by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.
You can manually disable the PushdownLimit support by setting the pushdown.limit.enabled option to false (default is true).
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.select("`target.name`", "`target.id`")
.limit(10))
df.show()
The limit value will be pushed down to Neo4j.
Push-down top N
The Neo4j Connector for Apache Spark implements the SupportsPushDownTopN interface, which allows you to push top N aggregations down to the Neo4j layer.
This way, the data that Spark receives has already been aggregated and limited by Neo4j, decreasing the amount of data transferred from Neo4j to Spark.
You can manually disable the PushDownTopN support by setting the pushdown.topN.enabled option to false (default is true).
// Given a DB populated with the following query
"""
CREATE (pe:Person {id: 1, fullName: 'Jane Doe'})
WITH pe
UNWIND range(1, 10) as id
CREATE (pr:Product {id: id * rand(), name: 'Product ' + id, price: id})
CREATE (pe)-[:BOUGHT{when: rand(), quantity: rand() * 1000}]->(pr)
RETURN *
"""
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("relationship", "BOUGHT")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load
.select("`target.name`", "`target.id`")
.sort(col("`target.name`").desc)
.limit(10))
df.show()
The sort order and the limit value will be pushed down to Neo4j.
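The four push-down switches described above are ordinary read options, so they can be collected in one place. The following is a minimal sketch, assuming the placeholder connection values used throughout this page; the names connectionOptions, pushdownDisabled, and readOptions are illustrative only.

```scala
// Connection settings reused from the examples on this page (placeholder credentials).
val connectionOptions: Map[String, String] = Map(
  "url" -> "bolt://localhost:7687",
  "authentication.basic.username" -> "neo4j",
  "authentication.basic.password" -> "letmein!"
)

// Each push-down feature defaults to true; false leaves that work to Spark.
val pushdownDisabled: Map[String, String] = Map(
  "pushdown.filters.enabled" -> "false",
  "pushdown.aggregate.enabled" -> "false",
  "pushdown.limit.enabled" -> "false",
  "pushdown.topN.enabled" -> "false"
)

// Merge everything into a single option map for a node read.
val readOptions: Map[String, String] =
  connectionOptions ++ pushdownDisabled + ("labels" -> ":Person")

// With a SparkSession named `spark` in scope, the map can be passed in one call:
//   spark.read.format("org.neo4j.spark.DataSource").options(readOptions).load()
```

Disabling a switch can be useful when comparing results or query plans with and without push-down; in normal use the defaults are usually what you want.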
Schema
Spark works with data in a fixed tabular schema. To accomplish this, the Neo4j Connector has a schema inference system. It creates the schema based on the data retrieved from the database. Each read method has its own strategy for creating the schema, which is explained in the corresponding section.
In general, we first try to use APOC’s nodeTypeProperties and relTypeProperties procedures.
If they are not available, we flatten the first schema.flatten.limit results and try to infer the schema from the type of each column.
If you don’t want this process to happen, set schema.strategy to string (default is sample), and every column is presented as a string.
The sample schema strategy is a good fit when all instances of a property in Neo4j are of the same type; string followed by an ad-hoc cast is better when property types may differ.
Remember that Neo4j does not enforce property typing, so person.age could, for instance, sometimes be a long and sometimes be a string.
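The sampling idea can be illustrated with a small, self-contained sketch. It is not the connector's code: records are modelled as plain Scala maps, the names typeName and inferSchema are made up, and only a handful of value types are recognized. It mirrors two behaviours described on this page: only the first N records are inspected, and a column whose sampled values disagree on type falls back to string.

```scala
// Simplified illustration only: NOT the connector's implementation.
def typeName(value: Any): String = value match {
  case _: Long | _: Int     => "long"
  case _: Double | _: Float => "double"
  case _: Boolean           => "boolean"
  case _                    => "string"
}

// Each sampled record is modelled as a Map from property name to value.
def inferSchema(records: Seq[Map[String, Any]], limit: Int): Map[String, String] = {
  // Collect the set of type names seen per column within the first `limit` records.
  val typesByColumn: Map[String, Set[String]] = records
    .take(limit)
    .flatMap(_.toSeq)
    .groupBy { case (column, _) => column }
    .map { case (column, pairs) => column -> pairs.map { case (_, v) => typeName(v) }.toSet }

  // A single observed type is kept; conflicting types fall back to string.
  typesByColumn.map { case (column, types) =>
    column -> (if (types.size == 1) types.head else "string")
  }
}

val sampled = Seq(
  Map("name" -> "John Doe", "age" -> 32L),
  Map("name" -> "Jane Doe", "age" -> "42")
)
println(inferSchema(sampled, limit = 10))
```

Note how the result depends on the sample size: with a limit of 1 the conflicting second record is never seen, which is why a small sample can produce a schema that later records violate.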
Example
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
.load())
df.printSchema()
df.show()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)
id | name |
---|---|
0 | John Doe |
1 | Jane Doe |
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
.option("schema.strategy", "string")
.load())
df.printSchema()
df.show()
root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)
id | name |
---|---|
"0" | "John Doe" |
"1" | "Jane Doe" |
As you can see, the Struct returned by the query is made of strings. To convert only some of the values, use regular Scala/Python code:
import scala.jdk.CollectionConverters._
val result = df.collectAsList()
for (row <- result.asScala) {
// if <some specific condition> then convert like below
println(s"""Id is: ${row.getString(0).toLong}""")
}
User defined schema
You can skip the automatic schema extraction process by providing a user-defined schema using the .schema() method.
import org.apache.spark.sql.types.{DataTypes, StructType, StructField}
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
(spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.schema(StructType(Array(StructField("id", DataTypes.StringType), StructField("name", DataTypes.StringType))))
.option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
.load()
.show())
id | name |
---|---|
"0" | "John Doe" |
"1" | "Jane Doe" |
In this way you have total control over the schema.
Known problem
Because Neo4j is a schema-free database, the following scenario may occur:
CREATE (p1:Person {age: "32"}), (p2:Person {age: 23})
The same field on the same node label has two different types.
Spark cannot handle this, because a DataFrame requires a schema in which each column has a single type, so the read fails with an error like:
java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long
In this case you can either clean up and normalize your data, or rely on the connector to implicitly cast values to String.
This solution is not error-proof: you might still get errors if the values cannot be coerced to String.
When the casting operation happens, this warning appears in your log, letting you know what has happened:
The field "age" has different types: [String, Long]
Every value will be casted to string.
The safest solution is to clean your data, but that is not always possible.
This is why the schema.strategy option exists: set it to string to get all the values converted to string.
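Once every value arrives as a string, numeric columns have to be converted back by hand. The following minimal sketch shows one defensive way to do that in plain Scala; the sample values are made up, and toLongSafe is an illustrative helper, not part of the connector.

```scala
import scala.util.Try

// Returns None instead of throwing when the text is not a valid long.
def toLongSafe(raw: String): Option[Long] = Try(raw.trim.toLong).toOption

// Made-up values, as they might look after reading with schema.strategy = string.
val rawAges = Seq("32", "23", "unknown")

// Keep track of which values converted cleanly and which did not.
val ages: Seq[Option[Long]] = rawAges.map(toLongSafe)

// Drop the values that could not be converted.
val validAges: Seq[Long] = ages.flatten
```

Wrapping the conversion in an Option makes the unparseable values explicit, so they can be logged or handled separately rather than failing the whole job.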
Partitioning
When reading data, you can partition the extraction in order to parallelize it.
Please consider the following job:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = (spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", "neo4j")
.option("authentication.basic.password", "letmein!")
.option("labels", "Person")
.option("partitions", "5")
.load())
This means that if the total count of nodes with label Person in Neo4j is 100, the connector creates 5 partitions, each managing 20 records (using SKIP / LIMIT queries).
Partitioning the dataset makes sense only if you’re dealing with a big dataset (>= 10M records).
How to parallelize the query execution
Three options are available:
- Node extraction.
- Relationship extraction.
- Query extraction.
The connector first computes a general count of what you’re trying to read, and then builds one query per partition using a SKIP / LIMIT approach.
Therefore, for a dataset of 100 nodes (Person) split into 5 partitions, the following queries are generated (one per partition):
MATCH (p:Person) RETURN p SKIP 0 LIMIT 20
MATCH (p:Person) RETURN p SKIP 20 LIMIT 20
MATCH (p:Person) RETURN p SKIP 40 LIMIT 20
MATCH (p:Person) RETURN p SKIP 60 LIMIT 20
MATCH (p:Person) RETURN p SKIP 80 LIMIT 20
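The arithmetic behind these queries can be sketched in a few lines of Scala. This is an illustration, not the connector's implementation: partitionQueries is a made-up helper, the Cypher text is hard-coded for the Person example, and rounding the page size up when the count is not evenly divisible is an assumption.

```scala
// Illustrative helper: builds one SKIP / LIMIT query per partition.
def partitionQueries(totalCount: Long, partitions: Int): Seq[String] = {
  require(partitions > 0, "partitions must be positive")
  // Round up so that a remainder still lands in some partition (assumption).
  val pageSize = (totalCount + partitions - 1) / partitions
  (0 until partitions).map { i =>
    s"MATCH (p:Person) RETURN p SKIP ${i * pageSize} LIMIT $pageSize"
  }
}

// 100 nodes split into 5 partitions, as in the example above.
partitionQueries(100L, 5).foreach(println)
```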
For node and relationship extraction, the connector leverages the Neo4j count store to retrieve the total count of the nodes/relationships you’re trying to read. For query extraction, you have two possible approaches:
- Compute a count over the query that you’re using.
- Compute a count over a second optimized query that leverages indexes. In this case, you can pass it via .option("query.count", "<your cypher query>"). The query must always return only one field named count, which is the result of the count:
MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count
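Putting this together, a partitioned query read could pass both queries as options. This is a sketch under assumptions: the main query and its returned column (p.fullName AS fullName) are made up for illustration, the connection values are the placeholders used on this page, and the option map is only built here, not executed.

```scala
// Hypothetical main query; only the count query below comes from this page.
val mainQuery =
  """MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
    |WHERE pr.name = 'An Awesome Product'
    |RETURN p.fullName AS fullName""".stripMargin

// Must return exactly one field named `count`.
val countQuery =
  """MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
    |WHERE pr.name = 'An Awesome Product'
    |RETURN count(p) AS count""".stripMargin

val partitionedQueryOptions: Map[String, String] = Map(
  "url" -> "bolt://localhost:7687",
  "authentication.basic.username" -> "neo4j",
  "authentication.basic.password" -> "letmein!",
  "query" -> mainQuery,
  "query.count" -> countQuery,
  "partitions" -> "5"
)

// With a SparkSession named `spark` in scope:
//   spark.read.format("org.neo4j.spark.DataSource").options(partitionedQueryOptions).load()
```

Keeping the MATCH and WHERE clauses identical in both queries helps ensure the count matches the rows the main query actually returns.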