Writing to Neo4j

This section covers the DataSource Writer and how to transfer the content of a Spark dataset into Neo4j.

Getting started

Let’s look at the following code sample:

import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, "Santurbano " + i, rand.nextInt(100),
    Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

The above code inserts 10 nodes into Neo4j via Spark, and each of them has:

  • Two labels: Person and Customer.

  • Four properties: name, surname, age, and livesIn.

Save mode

To persist data into Neo4j, the Spark Connector supports two save modes. Note that SaveMode.Overwrite works only if UNIQUE or NODE KEY constraints are defined in Neo4j for the given properties.

The SaveMode examples apply to the Scala class org.apache.spark.sql.SaveMode. For PySpark, use a static string with the name of the SaveMode. So instead of SaveMode.Overwrite, use Overwrite for PySpark.
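
As a side note, the Scala DataFrameWriter also accepts the string form, so the following sketch (reusing the ds Dataset from the example above) is equivalent to passing the SaveMode enum:

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode("Append")  // string name instead of SaveMode.Append
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()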

For Node type:

  • SaveMode.ErrorIfExists builds a CREATE query.

  • SaveMode.Overwrite builds a MERGE query.

For SaveMode.Overwrite mode, you need to have unique constraints on the keys.

For Relationship type:

  • SaveMode.ErrorIfExists builds a CREATE query.

  • SaveMode.Overwrite builds a MERGE query.

If you are using Spark 3, the default save mode ErrorIfExists does not work; use Append instead.

For SaveMode.Overwrite mode, you need to have unique constraints on the keys.

In both cases, the default save mode is ErrorIfExists.
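
For example, to prepare the database for an Overwrite save keyed on name and surname (as in the node examples below), you could create a constraint like the following beforehand (a sketch; the constraint name person_keys is arbitrary, and NODE KEY constraints require Neo4j Enterprise Edition):

CREATE CONSTRAINT person_keys FOR (p:Person)
REQUIRE (p.name, p.surname) IS NODE KEY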

Options

The DataSource Writer has several options to connect and persist data into Neo4j.

Table 1. List of available write options

| Setting name | Description | Default value | Required |
|---|---|---|---|
| labels | Colon-separated list of the labels to attach to the node | (none) | No |
| batch.size | The number of rows sent to Neo4j as a single batch | 5000 | No |
| transaction.codes.fail | Comma-separated list of Neo4j codes that cause the transaction to fail | (none) | No |
| transaction.retries | Number of retries in case of failure | 3 | No |
| transaction.retry.timeout | The time in milliseconds that the connector should wait before retrying | 0 | No |

Node specific options

| Setting name | Description | Default value | Required |
|---|---|---|---|
| node.keys | Comma-separated list of properties considered as node keys when using SaveMode.Overwrite | (none) | No |

Relationship specific options

| Setting name | Description | Default value | Required |
|---|---|---|---|
| relationship.properties | Map used as keys for specifying the relationship properties. Used only if relationship.save.strategy is keys | (empty) | No |
| relationship.save.strategy | Save strategy to be used | native | Yes |
| relationship.source.labels | Colon-separated list of labels that identify the source node | (empty) | Yes |
| relationship.source.node.keys | Map used as keys for matching the source node | (empty) | No |
| relationship.source.save.mode | Source node save mode | Match | No |
| relationship.source.node.properties | Map used as keys for specifying the source node properties. Used only if relationship.save.strategy is keys | (empty) | No |
| relationship.target.labels | Colon-separated list of labels that identify the target node | (empty) | Yes |
| relationship.target.node.keys | Map used as keys for matching the target node | (empty) | No |
| relationship.target.save.mode | Target node save mode | Match | No |
| relationship.target.node.properties | Map used as keys for specifying the target node properties. Used only if relationship.save.strategy is keys | (empty) | No |

Neo4j Connector for Apache Spark provides batch writes to speed up the ingestion process; if the process fails at some point, all the batches committed before the failure remain persisted.
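
As a sketch of how these options combine, the following write tunes the batching and retry behavior (the values and the single name column are illustrative only):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10000).map(i => "Person " + i).toDF("name")

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .option("batch.size", "10000")               // rows per batch instead of the default 5000
  .option("transaction.retries", "5")          // retry a failed batch up to 5 times
  .option("transaction.retry.timeout", "1000") // wait 1000 ms before each retry
  .save()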

Write data

Writing data to a Neo4j database can be done in three ways:

Custom Cypher query

If you use the option query, the Spark Connector persists the entire Dataset by using the provided query. The rows are sent to Neo4j in batches of the size defined by the batch.size option, and your query is wrapped in an UNWIND $events AS event statement.

Let’s look at the following simple Spark program:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
  .save()

This generates the following query:

UNWIND $events AS event
CREATE (n:Person {fullName: event.name + event.surname})

Here events is the batch of rows created from your Dataset.

Node

If you use the option labels, the Spark Connector persists the entire dataset as nodes. Depending on the SaveMode, it is going to CREATE or MERGE nodes (in the latter case, the node.keys properties are used).

The nodes are sent to Neo4j in batches of the size defined by the batch.size option, and an UNWIND operation is performed under the hood.

Let's recall the first example in this chapter:

ErrorIfExists mode
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val df = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, "Santurbano " + i, rand.nextInt(100),
    Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

The above code is converted into a Cypher query similar to the following:

UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties

The following example shows how to take the same DataFrame and save it in Overwrite mode:

Overwrite mode
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()

The code above generates the following Cypher query:

UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties

You must specify which columns of your DataFrame are used as keys to match the nodes. You control this with the option node.keys, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.

If key and value are the same field, you can specify one without the colon. For example, if you have .option("node.keys", "name:name,email:email"), you can also write .option("node.keys", "name,email").
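
For example, a sketch where the DataFrame columns name and surname map to differently named node properties (first_name and last_name are made up for illustration):

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name:first_name,surname:last_name")
  .save()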

If the column value is a Map<String, Value> (where Value can be any supported Neo4j type), the connector automatically tries to flatten it.

Let’s consider the following dataset:

| id | name | lives_in |
|---|---|---|
| 1 | Andrea Santurbano | {address: 'Times Square, 1', city: 'NY', state: 'NY'} |
| 2 | Davide Fantuzzi | {address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'} |

Neo4j Connector for Apache Spark flattens the maps, and each map value ends up in its own property.

| id | name | lives_in.address | lives_in.city | lives_in.state |
|---|---|---|---|---|
| 1 | Andrea Santurbano | Times Square, 1 | NY | NY |
| 2 | Davide Fantuzzi | Statue of Liberty, 10 | NY | NY |
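
A minimal sketch of how such a DataFrame could be built and written (column names match the tables above; the id key and Person label are illustrative):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val peopleDf = Seq(
  (1, "Andrea Santurbano", Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
  (2, "Davide Fantuzzi", Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
).toDF("id", "name", "lives_in")

// Each map entry is flattened into its own property, e.g. `lives_in.address`
peopleDf.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .option("node.keys", "id")
  .save()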

Relationship

You can write a DataFrame to Neo4j by specifying source nodes, target nodes, and the relationships between them.

Overview

Before diving into the actual process, let's clarify the vocabulary first. Since this method of writing data to Neo4j is more complex and several combinations of options can be used, it is worth spending more time explaining it.

Conceptually, the connector takes your dataset, maps its columns onto source and target nodes, and creates the specified relationships between them.

This is a basic example of what would happen:

UNWIND $events AS event
CREATE (source:Person)
SET source = event.source
CREATE (target:Product)
SET target = event.target
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel

The CREATE keyword for the source and target nodes can be replaced by MERGE or MATCH. To control this you can use the Node save modes.

You can set source and target nodes independently by using relationship.source.save.mode or relationship.target.save.mode.

These options accept a case-insensitive string as a value, which can be one of ErrorIfExists, Overwrite, or Append; they work in the same way as the Node save modes.

When using MATCH or MERGE, you need to specify keys that identify the nodes. This is what the options relationship.source.node.keys and relationship.target.node.keys are for (more on this below).

The CREATE keyword for the relationship can be replaced by a MERGE. You can control this with the save mode.

You are also required to specify one of the two save strategies. This identifies which method is used to create the Cypher query, and each strategy can have additional options available.

Save strategies

There are two strategies you can use to write relationships: Native (default strategy) and Keys.

Native strategy

The Native strategy is useful when you have a schema that conforms to the Relationship read schema and relationship.nodes.map is set to false.

If you want to read relationships from a database, filter the data, and write the result to another database, you can refer to the following example:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val originalDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://allprod.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

originalDf
    .where("`target.price` > 2000")
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://expensiveprod.host.com:7687")
    .option("relationship", "SOLD")
    .option("relationship.source.labels", ":Person:Rich")
    .option("relationship.source.save.mode", "ErrorIfExists")
    .option("relationship.target.labels", ":Product:Expensive")
    .option("relationship.target.save.mode", "ErrorIfExists")
    .save()

You just need to specify the source node labels, the target node labels, and the relationship you want between them.

The generated query is the following:

UNWIND $events AS event
CREATE (source:Person:Rich)
SET source = event.source
CREATE (target:Product:Expensive)
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel

event.source, event.target, and event.rel contain the columns described in the Relationship read schema.

The default save mode for source and target nodes is Match, meaning that the relationship can be created only if the nodes are already in your database. See the Node save modes section below for more information.

When using Overwrite or Match node save mode, you should specify which keys should be used to identify the nodes.

Table 2. The DataFrame we are working with

| <rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity |
|---|---|---|---|---|---|---|---|---|---|---|
| 4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240 |
| 5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145 |

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// we read our DF from Neo4j using the relationship method
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "target.id:id")
  .save()

You must specify which columns of your DataFrame are being used as keys to match the nodes. You control this with the options relationship.source.node.keys and relationship.target.node.keys, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name, and the value is the node property name.

The generated query is the following:

UNWIND $events AS event
MERGE (source:Person:Rich {fullName: event.source.fullName})
SET source = event.source
MERGE (target:Product:Expensive {id: event.target.id})
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel

Remember that you can choose to CREATE or MERGE the relationship with the save mode, as in the sketch below.
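
For example, a minimal sketch that MERGEs the relationship instead of creating it, by switching the save mode to Overwrite (all other options as in the previous example):

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)  // builds a MERGE for the relationship
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "target.id:id")
  .save()
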
If the provided DataFrame schema doesn’t conform to the required schema, meaning that none of the required columns is present, the write fails.

Keys strategy

When you want more control over the relationship writing, you can use the Keys strategy.

As with the Native strategy, you can specify node keys to identify nodes. In addition, you can also specify which columns should be written as node properties.

Specify keys
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
        (12, "John Bonham", "Drums"),
        (19, "John Mayer", "Guitar"),
        (32, "John Scofield", "Guitar"),
        (15, "John Butler", "Guitar")
    ).toDF("experience", "name", "instrument")

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.save.mode", "overwrite")
    .save()

This creates a MERGE query that uses the name property as the key for the Musician nodes. The value of the instrument column is used as the value of the Instrument node's name property, generating a statement like:

MERGE (target:Instrument {name: event.target.instrument})

Here you must specify which columns of your DataFrame are written as source node and target node properties. You can do this with the options relationship.source.node.properties and relationship.target.node.properties, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.

The same applies to the relationship.properties option, which specifies which DataFrame columns are written as relationship properties.

If key and value are the same field, you can specify one without the colon. For example, if you have .option("relationship.source.node.properties", "name:name,email:email"), you can also write .option("relationship.source.node.properties", "name,email"). The same applies to relationship.source.node.keys and relationship.target.node.keys.
Specify properties and keys
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
        (12, "John Bonham", "Orange", "Drums"),
        (19, "John Mayer", "White", "Guitar"),
        (32, "John Scofield", "Black", "Guitar"),
        (15, "John Butler", "Wooden", "Guitar")
    ).toDF("experience", "name", "instrument_color", "instrument")

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.node.properties", "instrument_color:color")
    .option("relationship.target.save.mode", "overwrite")
    .save()
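
The relationship.properties option is not shown above; a sketch that also writes the experience column as a relationship property (renamed to the hypothetical property years) could look like this:

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.properties", "experience:years")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.save.mode", "overwrite")
    .save()
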
Node save modes

You can specify four different modes for saving the nodes:

  • Overwrite mode performs a MERGE on that node.

  • ErrorIfExists mode performs a CREATE (not available for Spark 3).

  • Append mode performs a CREATE (not available for Spark 2.4).

  • Match mode performs a MATCH.

For Overwrite mode you must have unique constraints on the keys.
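
For example, a sketch using Match for both source and target nodes, which only creates PLAYS relationships between Musician and Instrument nodes that already exist (reusing the musicDf from the Keys strategy examples):

musicDf.write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.node.keys", "instrument:name")
    .save()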

Schema optimization operations

To speed up the import itself, the Spark Connector supports schema optimization operations, namely indexes and constraints, created through a set of schema queries that are executed before the import process starts.

You can set the optimization via the schema.optimization.type option, which works only if you are merging nodes and takes the following values:

  • INDEX: creates only indexes on the provided node keys.

  • NODE_CONSTRAINTS: creates only unique constraints on the provided node keys.

Index creation

The following example shows how to create indexes while you’re creating nodes.

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "INDEX")
  .save()

Before the import starts, the following schema query is executed:

CREATE INDEX ON :Person(surname)

Take into consideration that only the first label is used for the index creation.

Constraint creation

Below you can see an example of how to create constraints while you're creating nodes.

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "NODE_CONSTRAINTS")
  .save()

Before the import starts, the code above runs the following schema query:

CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.surname) IS UNIQUE

Take into consideration that only the first label is used for the constraint creation.

Script option

The script option allows you to execute a series of preparation scripts before the Spark job execution. The result of the last query can be reused in combination with the query ingestion mode, as follows:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// The SimplePerson case class is implied by the original example;
// defined here so that the snippet is self-contained.
case class SimplePerson(name: String, surname: String)

val ds = Seq(SimplePerson("Andrea", "Santurbano")).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()

Before the import starts, the connector runs the content of the script option, and the result of the last query is injected into the query. In the end, the full query executed by the connector while the data is being ingested is the following:

WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})

scriptResult is the result of the last query contained in the script option, that is RETURN 36 AS age.