Writing to Neo4j

This section covers the DataSource Writer and how to transfer the content of a Spark Dataset into Neo4j.

Getting Started

Given the following Scala program:

import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => {
    Person("Andrea " + i, "Santurbano " + i, rand.nextInt(100),
    Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

This program inserts 10 nodes into Neo4j via Spark, and each of them will have:

  • 2 labels: Person and Customer

  • 4 properties: name, surname, age and livesIn

Save Mode

In order to persist data into Neo4j, the Spark Connector supports two save modes that work only if UNIQUE or NODE KEY constraints are defined in Neo4j for the given properties.

These SaveMode examples apply to the Scala class org.apache.spark.sql.SaveMode. For PySpark, simply use a string with the name of the SaveMode: instead of SaveMode.Overwrite, use "Overwrite".

For Node type

  • SaveMode.ErrorIfExists: this will build a CREATE query

  • SaveMode.Overwrite: this will build a MERGE query

For SaveMode.Overwrite mode you need to have unique constraints on the keys.

For Relationship type

  • SaveMode.ErrorIfExists: this will build a CREATE query

  • SaveMode.Append: this will build a MERGE query

ATTENTION: If you are using Spark 3, the default save mode ErrorIfExists won't work.
For SaveMode.Append mode you need to have unique constraints on the keys.

In both cases the default SaveMode is ErrorIfExists.
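
For instance, a minimal sketch of the two node save modes (assuming a df DataFrame with name and surname columns and, for the Overwrite case, a unique constraint on those properties):

import org.apache.spark.sql.SaveMode

// CREATE-based write: fails if the data conflicts with existing constraints
df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .save()

// MERGE-based write: requires node.keys and unique constraints on those keys
df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .option("node.keys", "name,surname")
  .save()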

Using Save Mode with PySpark

As noted above, with PySpark you pass the save mode name as a string: for example, df.write.mode("Overwrite") instead of df.write.mode(SaveMode.Overwrite).

Options

The DataSource Writer has several options for connecting to Neo4j and persisting data into it.

Table 1. List of available write options

Setting Name | Description | Default Value | Required
labels | Colon-separated list of the labels to attach to the node | (none) | No
batch.size | The number of rows sent to Neo4j in a single batch | 5000 | No
transaction.codes.fail | Comma-separated list of Neo4j codes that cause the transaction to fail | (none) | No
transaction.retries | Number of retries in case of failure | 3 | No
transaction.retry.timeout | The time in ms that the connector waits before retrying | 0 | No

Node Specific Options

Setting Name | Description | Default Value | Required
node.keys | Comma-separated list of properties used as node keys when using SaveMode.Overwrite | (none) | No

Relationship Specific Options

Setting Name | Description | Default Value | Required
relationship.properties | Map used as keys for specifying the relationship properties. Used only if relationship.save.strategy is keys | (empty) | No
relationship.save.strategy | Save strategy to be used | native | Yes
relationship.source.labels | Colon-separated list of labels that identify the source node | (empty) | Yes
relationship.source.node.keys | Map used as keys for matching the source node | (empty) | No
relationship.source.save.mode | Source node save mode | Match | No
relationship.source.node.properties | Map used as keys for specifying the source node properties. Used only if relationship.save.strategy is keys | (empty) | No
relationship.target.labels | Colon-separated list of labels that identify the target node | (empty) | Yes
relationship.target.node.keys | Map used as keys for matching the target node | (empty) | No
relationship.target.save.mode | Target node save mode | Match | No
relationship.target.node.properties | Map used as keys for specifying the target node properties. Used only if relationship.save.strategy is keys | (empty) | No

The Neo4j Connector for Apache Spark uses batch writes to speed up the ingestion process, so if the process fails at some point, all the batches committed before the failure remain persisted.
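
For example, a minimal sketch of tuning the batching and retry options listed above (assuming a df DataFrame that matches your node schema; the values are purely illustrative):

import org.apache.spark.sql.SaveMode

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  // rows per batch sent to Neo4j
  .option("batch.size", "2500")
  // retry a failed transaction up to 5 times, waiting 1000 ms between attempts
  .option("transaction.retries", "5")
  .option("transaction.retry.timeout", "1000")
  // illustrative Neo4j status code that should make the transaction fail immediately
  .option("transaction.codes.fail", "Neo.ClientError.Schema.ConstraintValidationFailed")
  .save()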

Write Data

Writing data to a Neo4j database can be done in three ways:

  • Custom Cypher query, via the query option

  • Node, via the labels option

  • Relationship, via the relationship option

Custom Cypher Query

If you use the query option, the Spark Connector persists the entire Dataset by using the provided query. The rows are sent to Neo4j in batches, as defined by the batch.size option, and your query is wrapped in an UNWIND $events AS event statement.

So given the following simple Spark program:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Build a DataFrame with name and surname columns, as used by the query below
val df = (1 to 10)
  .map(i => ("Andrea " + i, "Santurbano " + i))
  .toDF("name", "surname")
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
  .save()

This will be the generated query:

UNWIND $events AS event
CREATE (n:Person {fullName: event.name + event.surname})

Where events is the batch created from your dataset.

Node

If you use the labels option, the Spark Connector persists the entire Dataset as nodes. Depending on the SaveMode, it either CREATEs or MERGEs nodes (in the latter case using the node.keys properties).

The nodes are sent to Neo4j in batches of rows, as defined by the batch.size option, and an UNWIND operation is performed under the hood.

Let’s take our first example:

ErrorIfExists mode
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val df = (1 to total)
  .map(i => {
    Person("Andrea " + i, "Santurbano " + i, rand.nextInt(100),
    Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

This is converted into a query similar to:

UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties

If we instead use the same DataFrame but save it in Overwrite mode:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()

The generated query will be:

UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties

Here you must specify which columns of your Dataframe will be used as keys to match the nodes. You control this with the option node.keys, specifying a comma-separated list of key:value pairs, where the key is the dataframe column name, and the value is the node property name.

If key and value are the same field, you can specify just one without the colon: for example, .option("node.keys", "name:name,email:email") can also be written as .option("node.keys", "name,email").
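
For instance, a minimal sketch where a hypothetical df DataFrame has name and surname columns, and the name column is mapped to the fullName node property while surname keeps its own name:

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name:fullName,surname")
  .save()

Here the MERGE matches nodes on the fullName and surname properties, taking their values from the name and surname columns.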

If the column value is a Map<String, Value> (where Value can be any supported Neo4j type), the connector automatically tries to flatten it.

Let’s say you have the following Dataset:

id | name | lives_in
1 | Andrea Santurbano | {address: 'Times Square, 1', city: 'NY', state: 'NY'}
2 | Davide Fantuzzi | {address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}

The Neo4j Connector for Apache Spark flattens the maps, and each map value ends up in its own property, as shown below (a minimal write sketch follows the second table).

id | name | lives_in.address | lives_in.city | lives_in.state
1 | Andrea Santurbano | Times Square, 1 | NY | NY
2 | Davide Fantuzzi | Statue of Liberty, 10 | NY | NY
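
A minimal sketch of how such a DataFrame could be built and written (column and label names are just for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// DataFrame with a map column, mirroring the table above
val peopleDf = Seq(
  (1, "Andrea Santurbano", Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
  (2, "Davide Fantuzzi", Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
).toDF("id", "name", "lives_in")

// The lives_in map is flattened into lives_in.address, lives_in.city and lives_in.state properties
peopleDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .save()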

Relationship

You can write a DataFrame to Neo4j by specifying the source node, the target node, and the relationship between them.

Overview

We need to spend a few words on this method since it is a bit complex and the option combinations are quite a few, so let's clarify the vocabulary first, before diving into the actual process.

The theory is simple: we take your Dataset and move the columns around to create source and target nodes, eventually creating the specified relationship between the two.

This is a basic example of what would happen.

UNWIND $events AS event
CREATE (source:Person)
SET source = event.source
CREATE (target:Product)
SET target = event.target
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel

The CREATE keyword for the source and target nodes can be replaced by a MERGE or a MATCH. To control this you can use the node save modes.

You can set source and target independently by using relationship.source.save.mode or relationship.target.save.mode.

These options accept a case-insensitive string value, which can be one of ErrorIfExists, Overwrite, Append, or Match; they work in exactly the same way as the node save modes described below.

When using MATCH or MERGE, you need to specify keys that identify the nodes. This is what the options relationship.source.node.keys and relationship.target.node.keys are for. More on this below.

The CREATE keyword for the relationship can be replaced by a MERGE. You can control this with the save mode.

You are also required to specify one of the two save strategies. This identifies which method is used to create the Cypher query, and each strategy can have additional options available.

Save Strategies

There are two strategies you can use to write relationships: Native (default strategy) and Keys.

Native Strategy

This strategy is useful when you have a schema that conforms to the Relationship Read Schema, with the relationship.nodes.map option set to false.

Let's say we want to read relationships from a database, filter them, and write the result to another database:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val originalDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://allprod.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

originalDf
    .where("`target.price` > 2000")
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://expensiveprod.host.com:7687")
    .option("relationship", "SOLD")
    .option("relationship.source.labels", ":Person:Rich")
    .option("relationship.source.save.mode", "ErrorIfExists")
    .option("relationship.target.labels", ":Product:Expensive")
    .option("relationship.target.save.mode", "ErrorIfExists")
    .save()

You just need to specify the source node labels, the target node labels, and the relationship you want between them.

The generated query will be:

UNWIND $events AS event
CREATE (source:Person:Rich)
SET source = event.source
CREATE (target:Product:Expensive)
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel

event.source, event.target, and event.rel will contain the columns described in the Relationship Read Schema.

The default save mode for source and target nodes is Match. This means that the relationship is created only if the nodes are already in your database. See the Node Save Modes section below for more info about node save modes.

When using the Overwrite or Match node save mode, you should specify which keys should be used to identify the nodes.

Table 2. The Dataframe we are working with

<rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity
4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240
5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// we read our DF from Neo4j using the relationship method
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "target.id:id")
  .save()

Here you must specify which columns of your Dataframe will be used as keys to match the nodes. You control this with the options relationship.source.node.keys and relationship.target.node.keys, specifying a comma-separated list of key:value pairs, where the key is the dataframe column name, and the value is the node property name.

The generated query will be:

UNWIND $events AS event
MERGE (source:Person:Rich {fullName: event.source.fullName})
SET source = event.source
MERGE (target:Product:Expensive {id: event.target.id})
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel

Remember that you can choose to CREATE or MERGE the relationship with the save mode.

If the provided dataframe schema doesn't conform to the required schema, meaning that none of the required columns is present, the write will fail.

Keys Strategy

When you want more control over how relationships are written, you can use the keys strategy.

As with the native strategy, you can specify node keys to identify nodes. In addition, you can also specify which columns should be written as node properties.

Specify keys
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
        (12, "John Bonham", "Drums"),
        (19, "John Mayer", "Guitar"),
        (32, "John Scofield", "Guitar"),
        (15, "John Butler", "Guitar")
    ).toDF("experience", "name", "instrument")

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.save.mode", "overwrite")
    .save()

This creates a MERGE query using the name property as key for the Musician nodes. The value of the instrument column is used as the value of the Instrument node's name property, generating a statement like: MERGE (target:Instrument {name: event.target.instrument})

Here you must specify which columns of your Dataframe will be written as source node and target node properties. You do this with the options relationship.source.node.properties and relationship.target.node.properties, specifying a comma-separated list of key:value pairs, where the key is the dataframe column name, and the value is the node property name.

The same applies to the relationship.properties option, used to specify which dataframe columns will be written as relationship properties.

If key and value are the same field, you can specify just one without the colon: for example, .option("relationship.source.node.properties", "name:name,email:email") can also be written as .option("relationship.source.node.properties", "name,email"). The same applies to relationship.source.node.keys and relationship.target.node.keys.

Specify properties and keys
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
        (12, "John Bonham", "Orange", "Drums"),
        (19, "John Mayer", "White", "Guitar"),
        (32, "John Scofield", "Black", "Guitar"),
        (15, "John Butler", "Wooden", "Guitar")
    ).toDF("experience", "name", "instrument_color", "instrument")

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.node.properties", "instrument_color:color")
    .option("relationship.target.save.mode", "overwrite")
    .save()
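
The relationship.properties option works the same way. For instance, a possible variant of the example above (a sketch, with the years property name chosen just for illustration) could also store the experience column as a property of the PLAYS relationship:

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.properties", "experience:years")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.node.properties", "instrument_color:color")
    .option("relationship.target.save.mode", "overwrite")
    .save()
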
Node Save Modes

You can specify the following modes to use for saving the nodes:

  • Overwrite: will perform a MERGE on that node

  • ErrorIfExists: will perform a CREATE (not available for Spark 3)

  • Append: will perform a CREATE (not available for Spark 2.4)

  • Match: will perform a MATCH

For Overwrite mode you must have unique constraints on the keys.
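
For instance, a minimal sketch of the Match mode (assuming a df DataFrame with name and product columns, and that the matching :Person and :Product nodes already exist in the database): the relationship is created only between nodes that are already there.

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Person")
  .option("relationship.source.save.mode", "Match")
  .option("relationship.source.node.keys", "name")
  .option("relationship.target.labels", ":Product")
  .option("relationship.target.save.mode", "Match")
  .option("relationship.target.node.keys", "product:name")
  .save()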

Schema Optimization Operations

The Spark connector supports schema optimization operations via:

  • index

  • constraints

  • set of schema queries

that are executed before the import process starts, in order to speed up the import itself.

You can set the optimization via the schema.optimization.type option, which takes three values:

  • INDEX: creates only indexes on the provided nodes

  • NODE_CONSTRAINTS: creates only constraints on the provided nodes

  • QUERY: performs a series of schema queries separated by ;

These optimizations work only when you're merging nodes.

Index Creation

The following is an example of how to create indexes while creating nodes:

ds.write
      .format("org.neo4j.spark.DataSource")
      .mode(SaveMode.Overwrite)
      .option("url", "bolt://localhost:7687")
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization.type", "INDEX")
      .save()

This will create, before the import starts, the following schema query:

CREATE INDEX ON :Person(surname)

So please take into consideration that the first label is used for the index creation.

Constraint Creation

The following is an example of how to create constraints while creating nodes:

ds.write
      .format("org.neo4j.spark.DataSource")
      .mode(SaveMode.Overwrite)
      .option("url", "bolt://localhost:7687")
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization.type", "NODE_CONSTRAINTS")
      .save()

This will create, before the import starts, the following schema query:

CREATE CONSTRAINT ON (p:Person) ASSERT (p.surname) IS UNIQUE

So please take into consideration that the first label is used for the constraint creation.

Script Option

The script option allows you to execute a series of preparation scripts before the Spark job execution; the result of the last query can be reused in combination with the query ingestion mode, as follows:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class SimplePerson(name: String, surname: String)

val ds = Seq(SimplePerson("Andrea", "Santurbano")).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX ON :Person(surname);
      |CREATE CONSTRAINT ON (p:Product)
      | ASSERT (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()

Before the import starts, the connector runs the content of the script option, and the result of the last query is injected into the query. In the end, the full query executed by the connector while ingesting the data will be:

WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})

where scriptResult is the result of the last query contained in the script option, that is RETURN 36 AS age.