Quickstart

Neo4j Connector for Apache Spark allows you to read from and write to Neo4j databases.

It’s fairly easy to use, although it can be highly customized.

Installation Guide

Where to get the JARs?

You can download the connector JAR from the Neo4j Connector Page or from the GitHub releases page.

Choose the version carefully since the wrong combination of Scala version and Spark version will break your code. Check this page for more info about the proper JAR to use.

Using spark-submit, spark-shell or pyspark

$SPARK_HOME/bin/spark-shell --jars neo4j-connector-apache-spark_2.12-4.0.1_for_spark_3.jar

The connector is also available from Spark Packages:

$SPARK_HOME/bin/spark-shell --packages org.neo4j:neo4j-connector-apache-spark_2.12:4.0.1_for_spark_3

Using sbt

If you use the sbt-spark-package plugin, add the following to your sbt build file:

spDependencies += "org.neo4j/neo4j-connector-apache-spark_${scala.version}:4.0.1_for_spark_${spark.version}"

Otherwise:

libraryDependencies += "org.neo4j" % "neo4j-connector-apache-spark_${scala.version}" % "4.0.1_for_spark_${spark.version}"

Using Maven

pom.xml
<dependencies>
  <!-- list of dependencies -->
  <dependency>
    <groupId>org.neo4j</groupId>
    <artifactId>neo4j-connector-apache-spark_${scala.version}</artifactId>
    <version>4.0.1_for_spark_${spark.version}</version>
  </dependency>
</dependencies>

Gradle

dependencies {
    // list of dependencies
    compile "org.neo4j:neo4j-connector-apache-spark_${scala.version}:4.0.1_for_spark_${spark.version}"
}

Using the connector on Databricks

You first need to get the proper JAR for your Databricks Runtime version; here you can find where to download the JARs.

Go to your cluster page, select the Libraries tab, and install a new library by simply uploading the JAR.

That’s it!

Getting Started

You can read your Neo4j database and have the data available as Spark DataFrame.

Read all the nodes of type Person
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("labels", "Person")
  .load()

Similarly, it is possible to write your own DataFrame to Neo4j:

Write the DataFrame to nodes of type Person
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq(
  ("John Doe"),
  ("Jane Doe")
).toDF("name")

df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("labels", ":Person")
  .save()

Visit the Reading and Writing sections for advanced usage.

General Considerations

Before diving into the advanced features, we need to cover some preliminary considerations on data types, filters, and schema.

Complex Data Types

Spark doesn’t support all the Neo4j data types (e.g. Point, Time, Duration). Such types are transformed into Struct types containing all the useful data; a short usage sketch follows Table 1.

For complete details on type handling, consult the Neo4j-Spark Data Types Reference.

Table 1. Complex data type conversion
Type Struct

Duration

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("months", DataTypes.LongType, false),
    ("days", DataTypes.LongType, false),
    ("seconds", DataTypes.LongType, false),
    ("nanoseconds", DataTypes.IntegerType, false),
    ("value", DataTypes.StringType, false)
  ))

Point

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("srid", DataTypes.IntegerType, false),
    ("x", DataTypes.DoubleType, false),
    ("y", DataTypes.DoubleType, false),
    ("z", DataTypes.DoubleType, true),
  ))

Time

Struct(Array(
    ("type", DataTypes.StringType, false),
    ("value", DataTypes.StringType, false)
  ))
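
Once loaded, these struct columns can be queried with ordinary Spark column expressions. Below is a minimal sketch, assuming Person nodes with a hypothetical birthPlace property of type point (the property name is only an example):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("labels", "Person")
  .load()

// access the fields of the Point struct with dot notation
df.select($"name", $"birthPlace.x", $"birthPlace.y", $"birthPlace.srid").show()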

Filters

The Neo4j Connector for Apache Spark implements the SupportsPushDownFilters interface, which allows you to push Spark filters down to the Neo4j layer. This way, the data that Spark receives has already been filtered by Neo4j, reducing the amount of data transferred from Neo4j to Spark.

You can manually disable push-down filter support by setting the pushdown.filters.enabled option to false (the default is true).
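
For instance, here is a minimal sketch of a read with push-down filters disabled (the connection details are the same placeholders used above); the filter is then evaluated by Spark rather than by Neo4j:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("labels", ":Person")
  // with push-down disabled, the filter below is applied by Spark, not by Neo4j
  .option("pushdown.filters.enabled", "false")
  .load()

df.where("name = 'John Doe'").show()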

If you use the filter function more than once, like in this example:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("labels", ":Person")
  .load()

df.where("name = 'John Doe'").where("age = 32").show()

The conditions will be automatically joined with an AND operator.

When relationship.nodes.map is set to true or the query option is used, push-down filter support is automatically disabled; the filters are then applied by Spark rather than by Neo4j.

Schema

Spark works with data in a fixed tabular schema. To accomplish this, the Neo4j Connector has a schema inference system that creates the schema based on the data retrieved from the database. Each read method has its own strategy to create it, which is explained in its respective section.

In general, we first try to use APOC procedures; if they are not available, we flatten the first schema.flatten.limit results and try to infer the schema from the type of each column.

If you don’t want this process to happen, you can set schema.strategy to string (the default is sample); every column is then treated as a string.

The sample strategy works well when all instances of a property in Neo4j have the same type, while string followed by an explicit cast is better when property types may differ. Remember that Neo4j does not enforce property typing, so person.age could sometimes be a long and sometimes be a string.

Example

Using sample strategy
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
  .load()
  .show()
Table 2. Result of the above code

id  name
0   John Doe
1   Jane Doe

Using string strategy
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
  .option("schema.strategy", "string")
  .load()
  .show()
Table 3. Result of the above code

id    name
"0"   "John Doe"
"1"   "Jane Doe"

As you can see, the values returned by the query are all strings, which you can then cast explicitly on the Spark side (e.g. to a long).
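
For example, the id column read with the string strategy can be cast back to a numeric type in Spark. A minimal sketch (the column names follow the query above):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.col

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
  .option("schema.strategy", "string")
  .load()

// cast the string column back to a long before using it as a number
df.select(col("id").cast("long").as("id"), col("name")).show()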

User Defined Schema

You can skip the automatic schema extraction process by providing a user defined schema using the .schema() method.

Using user defined schema
import org.apache.spark.sql.types.{DataTypes, StructType, StructField}
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

spark.read.format("org.neo4j.spark.DataSource")
  .schema(StructType(Seq(StructField("id", DataTypes.StringType), StructField("name", DataTypes.StringType))))
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.name as name")
  .load()
  .show()
Table 4. Result of the above code

id    name
"0"   "John Doe"
"1"   "Jane Doe"

In this way we have total control over the schema.

Known Problem

Since Neo4j is a schemaless database, the following scenario may occur:

CREATE (p1:Person {age: "32"}), (p2:Person {age: 23})

Here the same property, on the same node label, has two different types.

Spark cannot handle this, because a DataFrame requires a schema, meaning each column of the DataFrame needs a single type.

If you don’t have APOC installed on your Neo4j instance, you’re most likely to be exposed to errors like this:

java.lang.ClassCastException: org.apache.spark.unsafe.types.UTF8String cannot be cast to java.lang.Long

In this case you can either clean up and normalize your data, or install APOC.

With APOC installed, every value of the properties affected by this problem is cast to String.

This solution is not error-proof; you might still get errors. Behind the scenes, the Connector uses apoc.meta.nodeTypeProperties and apoc.meta.relTypeProperties to sample the data.

When the cast happens, the following warning appears in your log to let you know what happened:

The field "age" has different types: [String, Long]
Every value will be casted to string.

The safest solution is to clean your data, but we understand that this is not always possible. This is why we introduced the schema.strategy option, which you can set to string to have all values converted to strings.

Partitioning

When reading data, we offer the possibility to partition the extraction in order to parallelize it.

Please consider the following job:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "bolt://localhost:7687")
        .option("authentication.basic.username", "neo4j")
        .option("authentication.basic.password", "neo4j")
        .option("labels", "Person")
        .option("partitions", "5")
        .load()

This means that if the total count of nodes with label Person in Neo4j is 100, we create 5 partitions and each one handles 20 records (we use SKIP / LIMIT queries).

Partitioning the dataset makes sense only if you’re dealing with a big dataset (>= 10M records).

How we parallelize the query execution

Consider that we have three extraction options:

  1. Node extraction

  2. Relationship extraction

  3. Query extraction

In general, we compute a count of what you’re trying to pull and build a query with the SKIP/LIMIT approach over each partition.

So for a dataset of 100 nodes (Person) with a partition size of 5, we generate these queries (one per partition):

MATCH (p:Person) RETURN p SKIP 0 LIMIT 20
MATCH (p:Person) RETURN p SKIP 20 LIMIT 20
MATCH (p:Person) RETURN p SKIP 40 LIMIT 20
MATCH (p:Person) RETURN p SKIP 60 LIMIT 20
MATCH (p:Person) RETURN p SKIP 80 LIMIT 20

While for (1) and (2) we leverage the Neo4j count store to retrieve the total count of the nodes/relationships we’re pulling, for (3) we have two possible approaches:

  • Compute a count over the query that we’re using

  • Compute a count over a second, optimized query that leverages indexes; in this case, you can pass it via .option("query.count", "<your cypher query>"). The query must always return only one field named count, which is the result of the count, for example (a usage sketch follows the query):

MATCH (p:Person)-[r:BOUGHT]->(pr:Product)
WHERE pr.name = 'An Awesome Product'
RETURN count(p) AS count
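
Here is a minimal sketch of passing both the query and the count query together (the connection details and the Cypher are illustrative):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "neo4j")
  .option("query",
    "MATCH (p:Person)-[r:BOUGHT]->(pr:Product) WHERE pr.name = 'An Awesome Product' RETURN p.name AS name")
  // the count query must return a single field named `count`
  .option("query.count",
    "MATCH (p:Person)-[r:BOUGHT]->(pr:Product) WHERE pr.name = 'An Awesome Product' RETURN count(p) AS count")
  .option("partitions", "5")
  .load()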

Examples

You can find examples on how to use the Neo4j Connector for Apache Spark at this repository. It’s a collection of Zeppelin Notebooks with different usage scenarios, along with a getting started guide.

The repository is in constant development; feel free to submit your own examples.