Writing to Neo4j

This section covers the DataSource Writer and how to transfer Spark dataset content into Neo4j.

Getting started

Let’s look at the following code sample:

import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, surname = "Santurbano " + i, age = rand.nextInt(100),
      livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

The above code inserts 10 nodes into Neo4j via Spark, and each of them has:

  • Two labels: Person and Customer.

  • Four properties: name, surname, age, and livesIn.

Save mode

To persist data into Neo4j, the Spark Connector supports two save modes that work only if UNIQUE or NODE KEY constraints are defined in Neo4j for the given properties.

The SaveMode examples apply to the Scala class org.apache.spark.sql.SaveMode. For PySpark, use a plain string with the name of the SaveMode: instead of SaveMode.Overwrite, use "Overwrite".
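For example, the following calls are equivalent, since DataFrameWriter.mode also accepts the mode name as a plain string (a minimal sketch assuming an existing DataFrame df):

// Scala: the enum and string forms are equivalent
df.write.mode(SaveMode.Overwrite)
df.write.mode("Overwrite")  // the string form is the one to use from PySpark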

For Node type:

  • SaveMode.ErrorIfExists builds a CREATE query.

  • SaveMode.Overwrite builds a MERGE query.

For SaveMode.Overwrite mode, you need to have unique constraints on the keys.

For Relationship type:

  • SaveMode.ErrorIfExists builds a CREATE query.

  • SaveMode.Overwrite builds a MERGE query.

If you are using Spark 3, the default save mode ErrorIfExists does not work; use Append instead.
For SaveMode.Overwrite mode, you need to have unique constraints on the keys.

In both cases, the default save mode is ErrorIfExists.
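For example, on Spark 3 the first example of this chapter can be rewritten with Append mode (a minimal sketch assuming the ds Dataset defined above):

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)  // instead of the default ErrorIfExists, which does not work on Spark 3
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()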

Options

The DataSource Writer has several options to connect and persist data into Neo4j.

Table 1. List of available write options
Setting name                          | Description                                                                                          | Default value | Required

labels                                | Colon-separated list of labels to attach to the node                                                | (none)        | No
batch.size                            | Number of rows sent to Neo4j in a single batch                                                      | 5000          | No
transaction.codes.fail                | Comma-separated list of Neo4j codes that cause the transaction to fail                              | (none)        | No
transaction.retries                   | Number of retries in case of failure                                                                | 3             | No
transaction.retry.timeout             | Time in milliseconds that the connector waits before retrying                                       | 0             | No

Node specific options

node.keys                             | Comma-separated list of properties used as node keys when using SaveMode.Overwrite                  | (none)        | No

Relationship specific options

relationship.properties              | Map of keys specifying the relationship properties. Used only if relationship.save.strategy is keys | (empty)       | No
relationship.save.strategy           | Save strategy to be used                                                                            | native        | Yes
relationship.source.labels           | Colon-separated list of labels that identify the source node                                        | (empty)       | Yes
relationship.source.node.keys        | Map of keys used for matching the source node                                                       | (empty)       | No
relationship.source.save.mode        | Source node save mode                                                                               | Match         | No
relationship.source.node.properties  | Map of keys specifying the source node properties. Used only if relationship.save.strategy is keys  | (empty)       | No
relationship.target.labels           | Colon-separated list of labels that identify the target node                                        | (empty)       | Yes
relationship.target.node.keys        | Map of keys used for matching the target node                                                       | (empty)       | No
relationship.target.save.mode        | Target node save mode                                                                               | Match         | No
relationship.target.node.properties  | Map of keys specifying the target node properties. Used only if relationship.save.strategy is keys  | (empty)       | No

The Neo4j Connector for Apache Spark uses batch writes to speed up the ingestion process; if the process fails at some point, all previously committed batches remain persisted.
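For example, the batching and transaction options from Table 1 can be tuned together; the following is a sketch (the values are illustrative, not recommendations), assuming the ds Dataset from the first example:

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("batch.size", "10000")               // rows per batch (default 5000)
  .option("transaction.retries", "5")          // retries in case of failure (default 3)
  .option("transaction.retry.timeout", "1000") // milliseconds to wait before retrying (default 0)
  .save()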

Write data

Writing data to a Neo4j database can be done in three ways: with a custom Cypher query (the query option), as nodes (the labels option), or as relationships (the relationship option).

Custom Cypher query

If you use the query option, the Spark Connector persists the entire Dataset by using the provided query. The rows are sent to Neo4j in batches of the size defined by the batch.size option, and your query is wrapped in an UNWIND $events AS event statement. The query option supports both CREATE and MERGE clauses.

Let’s look at the following simple Spark program:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val df = Seq(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.basic.username", USERNAME)
  .option("authentication.basic.password", PASSWORD)
  .option("query", query)
  .mode(SaveMode.Overwrite)
  .save()

This generates the following query:

UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})

Here events is the batch of rows created from your dataset.

Considerations

  • You must always specify the Save mode.

  • You can use the events list in WITH statements as well. For example, you can replace the query in the previous example with the following:

    WITH event.name + ' ' + toUpper(event.surname) AS fullName
    CREATE (n:Person {fullName: fullName})
  • Subqueries that reference the events list in CALLs are supported:

    CALL {
      WITH event
      RETURN event.name + ' ' + toUpper(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • If APOC is installed, APOC procedures and functions can be used:

    CALL {
      WITH event
      RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • Although a RETURN clause is not forbidden, adding one does not have any effect on the query result.

Node

If you use the labels option, the Spark Connector persists the entire dataset as nodes. Depending on the SaveMode, it is going to CREATE or MERGE nodes (in the latter case, the node.keys properties are used).

The nodes are sent to Neo4j in batches of the size defined by the batch.size option, and an UNWIND operation is performed under the hood.

Let’s remember the first example in this chapter:

ErrorIfExists mode
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val df = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, surname = "Santurbano " + i, age = rand.nextInt(100),
      livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

The above code is translated into a Cypher query similar to the following:

UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties

The following example shows how to use the same DataFrame and save it in Overwrite mode:

Overwrite mode
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()

The code above generates the following Cypher query:

UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties

You must specify which columns of your DataFrame are used as keys to match the nodes. You control this with the node.keys option, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.

If key and value are the same field, you can specify one without the colon. For example, if you have .option("node.keys", "name:name,email:email"), you can also write .option("node.keys", "name,email").

If the column value is a Map<String, Value> (where Value can be any supported Neo4j type), the connector automatically tries to flatten it.

Let’s consider the following dataset:

id | name              | lives_in
1  | Andrea Santurbano | {address: 'Times Square, 1', city: 'NY', state: 'NY'}
2  | Davide Fantuzzi   | {address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'}

The Neo4j Connector for Apache Spark flattens the maps, and each map value ends up in its own property.

id | name              | lives_in.address      | lives_in.city | lives_in.state
1  | Andrea Santurbano | Times Square, 1       | NY            | NY
2  | Davide Fantuzzi   | Statue of Liberty, 10 | NY            | NY
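As a minimal sketch, a DataFrame like the original one above (with a map column) could be built as follows; the names match the tables above, and import spark.implicits._ is assumed:

val peopleDf = Seq(
  (1, "Andrea Santurbano", Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
  (2, "Davide Fantuzzi", Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
).toDF("id", "name", "lives_in")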

Relationship

You can write a DataFrame to Neo4j by specifying source nodes, target nodes, and the relationships between them.

To avoid deadlocks, always use a single partition (for example with coalesce(1)) before writing relationships to Neo4j.
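For example (a minimal sketch, where relDf stands for any DataFrame of relationships to be written):

// Reduce to a single partition before the relationship write to avoid deadlocks
val singlePartitionDf = relDf.coalesce(1)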

Overview

Before diving into the actual process, let's clarify the vocabulary. Since this method of writing data to Neo4j is more complex and several combinations of options can be used, it is worth spending more time explaining it.

Conceptually, you take your dataset and move the columns around to create source and target nodes, finally creating the specified relationships between them.

This is a basic example of what would happen:

UNWIND $events AS event
CREATE (source:Person)
SET source = event.source
CREATE (target:Product)
SET target = event.target
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel

The CREATE keyword for the source and target nodes can be replaced by MERGE or MATCH. To control this you can use the Node save modes.

You can set the save mode for source and target nodes independently by using relationship.source.save.mode or relationship.target.save.mode.

These options accept a case-insensitive string value, which can be one of ErrorIfExists, Overwrite, or Append; they work in the same way as the Node save modes.

When using MATCH or MERGE, you need to specify keys that identify the nodes; this is what the relationship.source.node.keys and relationship.target.node.keys options are for.

The CREATE keyword for the relationship can be replaced by a MERGE. You can control this with Save mode.

You are also required to specify one of the two save strategies. The strategy identifies the method used to create the Cypher query, and each strategy has additional options available.

Save strategies

There are two strategies you can use to write relationships: Native (default strategy) and Keys.

Native strategy

The Native strategy is useful when you have a DataFrame whose schema conforms to the relationship read schema, with relationship.nodes.map set to false.

If you want to read relationships from a database, filter the data, and write the result to another database, refer to the following example:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val originalDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://allprod.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

originalDf
    .where("`target.price` > 2000")
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://expensiveprod.host.com:7687")
    .option("relationship", "SOLD")
    .option("relationship.source.labels", ":Person:Rich")
    .option("relationship.source.save.mode", "ErrorIfExists")
    .option("relationship.target.labels", ":Product:Expensive")
    .option("relationship.target.save.mode", "ErrorIfExists")
    .save()

You just need to specify the source node labels, the target node labels, and the relationship you want between them.

The generated query is the following:

UNWIND $events AS event
CREATE (source:Person:Rich)
SET source = event.source
CREATE (target:Product:Expensive)
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel

event.source, event.target, and event.rel contain the columns described in the relationship read schema.

The default save mode for source and target nodes is Match. That means the relationship can be created only if the nodes already exist in your database. See the Node save modes section below for more information.

When using Overwrite or Match node save mode, you should specify which keys should be used to identify the nodes.

Table 2. The DataFrame we are working with
<rel.id> <rel.type> <source.id> <source.labels> source.id source.fullName <target.id> <target.labels> target.name target.id rel.quantity

4

BOUGHT

1

[Person]

1

John Doe

0

[Product]

Product 1

52

240

5

BOUGHT

3

[Person]

2

Jane Doe

2

[Product]

Product 2

53

145

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// we read our DF from Neo4j using the relationship method
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "target.id:id")
  .save()

You must specify which columns of your DataFrame are being used as keys to match the nodes. You control this with the options relationship.source.node.keys and relationship.target.node.keys, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name, and the value is the node property name.

The generated query is the following:

UNWIND $events AS event
MERGE (source:Person:Rich {fullName: event.source.fullName})
SET source = event.source
MERGE (target:Product:Expensive {id: event.target.id})
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
Remember that you can choose to CREATE or MERGE the relationship with the Save mode.

If the provided DataFrame schema doesn't conform to the required schema, meaning that none of the required columns is present, the write fails.

Keys strategy

When you want more control over the relationship writing, you can use the Keys strategy.

As in the case of the Native strategy, you can specify node keys to identify nodes. In addition, you can also specify which columns should be written as node properties.

Specify keys
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
        (12, "John Bonham", "Drums"),
        (19, "John Mayer", "Guitar"),
        (32, "John Scofield", "Guitar"),
        (15, "John Butler", "Guitar")
    ).toDF("experience", "name", "instrument")

musicDf.coalesce(1)
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.save.mode", "overwrite")
    .save()

This creates a MERGE query using the name property as the key for the Musician nodes. The value of the instrument column is used as the value of the Instrument node's name property, generating a statement like:

MERGE (target:Instrument {name: event.target.instrument})

Here you must specify which columns of your DataFrame are written as source node and target node properties. You can do this with the options relationship.source.node.properties and relationship.target.node.properties, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name, and the value is the node property name.

The same applies to the relationship.properties option, which specifies which DataFrame columns are written as relationship properties, as shown in the sketch below.
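For example, extending the previous write to also store a relationship property; a sketch assuming the musicDf defined above, where the experience column is written to the relationship as a property named years:

musicDf.coalesce(1)
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.save.mode", "overwrite")
    .option("relationship.properties", "experience:years") // column "experience" -> relationship property "years"
    .save()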

If key and value are the same field, you can specify one without the colon. For example, if you have .option("relationship.source.node.properties", "name:name,email:email"), you can also write .option("relationship.source.node.properties", "name,email"). The same applies to relationship.source.node.keys and relationship.target.node.keys.

Specify properties and keys
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
        (12, "John Bonham", "Orange", "Drums"),
        (19, "John Mayer", "White", "Guitar"),
        (32, "John Scofield", "Black", "Guitar"),
        (15, "John Butler", "Wooden", "Guitar")
    ).toDF("experience", "name", "instrument_color", "instrument")

musicDf.coalesce(1)
    .write
    .format("org.neo4j.spark.DataSource")
    .option("url", "bolt://localhost:7687")
    .option("relationship", "PLAYS")
    .option("relationship.save.strategy", "keys")
    .option("relationship.source.labels", ":Musician")
    .option("relationship.source.save.mode", "overwrite")
    .option("relationship.source.node.keys", "name:name")
    .option("relationship.target.labels", ":Instrument")
    .option("relationship.target.node.keys", "instrument:name")
    .option("relationship.target.node.properties", "instrument_color:color")
    .option("relationship.target.save.mode", "overwrite")
    .save()

Node save modes

You can specify four different modes for saving the nodes:

  • Overwrite mode performs a MERGE on that node.

  • ErrorIfExists mode performs a CREATE (not available for Spark 3).

  • Append mode performs a CREATE (not available for Spark 2.4).

  • Match mode performs a MATCH.

For Overwrite mode you must have unique constraints on the keys.
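For example, a sketch of a relationship write where both source and target nodes must already exist (Match mode); the relDf DataFrame and its name and sku key columns are illustrative assumptions:

relDf.coalesce(1)
  .write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Person")
  .option("relationship.source.save.mode", "Match")  // never creates Person nodes
  .option("relationship.source.node.keys", "name")
  .option("relationship.target.labels", ":Product")
  .option("relationship.target.save.mode", "Match")  // never creates Product nodes
  .option("relationship.target.node.keys", "sku")
  .save()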

Schema optimization operations

The Spark Connector supports schema optimization operations via:

  • indexes (deprecated);

  • constraints;

  • property type enforcement;

  • set of schema queries.

These operations are executed before the import process starts, in order to speed up the import itself.

Starting from version 5.3.0, the schema.optimization.type option is deprecated in favor of schema.optimization.node.keys, schema.optimization.relationship.keys, and schema.type.constraint.

schema.optimization.type (Deprecated)

You can set the optimization via the schema.optimization.type option, which works only if you are merging nodes and takes three values:

  • INDEX: creates only indexes on provided nodes.

  • NODE_CONSTRAINTS: creates only unique constraints on provided nodes.

  • NONE: the default; no schema optimization is performed.

The schema.optimization.type option cannot be used with the query option. If you are using a custom Cypher query, you need to create indexes and constraints manually using the script option.

Index creation

The following example shows how to create indexes while you’re creating nodes.

// classOf[DataSource] refers to org.neo4j.spark.DataSource
ds.write
      .format(classOf[DataSource].getName)
      .mode(SaveMode.Overwrite)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization.type", "INDEX")
      .save()

Before the import starts, the following schema query is executed:

CREATE INDEX ON :Person(surname)

The name of the created index is spark_INDEX_<LABEL>_<NODE_KEYS>, where <LABEL> is the first label from the labels option and <NODE_KEYS> is a dash-separated sequence of one or more properties as specified in the node.keys options. In this example, the name of the created index is spark_INDEX_Person_surname. If the node.keys option were set to "name,surname" instead, the index name would be spark_INDEX_Person_name-surname.

The index is not recreated if it is already present.

Constraint creation

Below you can see an example of how to create constraints while you’re creating nodes.

ds.write
      .format(classOf[DataSource].getName)
      .mode(SaveMode.Overwrite)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization.type", "NODE_CONSTRAINTS")
      .save()

Before the import starts, the code above executes the following schema query:

CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.surname) IS UNIQUE

The name of the created constraint is spark_NODE_CONSTRAINTS_<LABEL>_<NODE_KEYS>, where <LABEL> is the first label from the labels option and <NODE_KEYS> is a dash-separated sequence of one or more properties as specified in the node.keys options. In this example, the name of the created constraint is spark_NODE_CONSTRAINTS_Person_surname. If the node.keys option were set to "name,surname" instead, the constraint name would be spark_NODE_CONSTRAINTS_Person_name-surname. Take into consideration that the first label is used for the constraint creation.

With constraints

The connector allows you to enforce the following constraints:

Table 3. Allowed configuration
config                                | values           | default | description

schema.optimization.node.keys         | UNIQUE/KEY/NONE  | NONE    | Creates the UNIQUE or NODE KEY constraint for the properties defined in the node.keys option
schema.optimization.relationship.keys | UNIQUE/KEY/NONE  | NONE    | Creates the UNIQUE or RELATIONSHIP KEY constraint for the properties defined in the relationship.keys option
schema.optimization                   | TYPE/EXISTS/NONE | NONE    | Comma-separated list of values. Creates the type constraints for nodes/relationships enforcing the type and non-nullability from the DataFrame schema

Please consider that if you define more than one label, the first one is used for creating constraints.

Node constraints

Enforcing unique constraint

For a detailed description of how Neo4j handles unique constraints on nodes, see the Cypher documentation.

Given the following example:

    ds.write
      .format(classOf[DataSource].getName)
      .mode(SaveMode.Overwrite)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization.node.keys", "UNIQUE")
      .save()

Under the hood, the Spark connector creates the following constraint:

CREATE CONSTRAINT `spark_NODE_UNIQUE-CONSTRAINT_Person_surname` IF NOT EXISTS FOR (e:Person) REQUIRE (e.surname) IS UNIQUE

Enforcing node key constraint

For a detailed description of how Neo4j handles node key constraints, see the Cypher documentation.

Given the following example:

    ds.write
      .format(classOf[DataSource].getName)
      .mode(SaveMode.Overwrite)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization.node.keys", "KEY")
      .save()

Under the hood, the Spark connector creates the following constraint:

CREATE CONSTRAINT `spark_NODE_KEY-CONSTRAINT_Person_surname` IF NOT EXISTS FOR (e:Person) REQUIRE (e.surname) IS NODE KEY

Relationship constraints

Enforcing unique constraint

For a detailed description of how Neo4j handles unique constraints on relationships, see the official Cypher documentation.

Given the following example:

    ds
      .write
      .mode(SaveMode.Overwrite)
      .format(classOf[DataSource].getName)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("relationship", "MY_REL")
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":NodeA")
      .option("relationship.source.save.mode", "Overwrite")
      .option("relationship.source.node.keys", "idSource:id")
      .option("relationship.target.labels", ":NodeB")
      .option("relationship.target.node.keys", "idTarget:id")
      .option("relationship.target.save.mode", "Overwrite")
      .option("schema.optimization.relationship.keys", "UNIQUE")
      .option("relationship.keys", "foo,bar")
      .save()

Under the hood, the Spark connector creates the following constraint:

CREATE CONSTRAINT `spark_RELATIONSHIP_UNIQUE-CONSTRAINT_MY_REL_foo-bar` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE (e.foo, e.bar) IS UNIQUE

Enforcing relationship key constraint

For a detailed description of how Neo4j handles relationship key constraints, see the official Cypher documentation.

Given the following example:

    ds
      .write
      .mode(SaveMode.Overwrite)
      .format(classOf[DataSource].getName)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("relationship", "MY_REL")
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":NodeA")
      .option("relationship.source.save.mode", "Overwrite")
      .option("relationship.source.node.keys", "idSource:id")
      .option("relationship.target.labels", ":NodeB")
      .option("relationship.target.node.keys", "idTarget:id")
      .option("relationship.target.save.mode", "Overwrite")
      .option("schema.optimization.relationship.keys", "KEY")
      .option("relationship.keys", "foo,bar")
      .save()

Under the hood, the Spark connector creates the following constraint:

CREATE CONSTRAINT `spark_RELATIONSHIP_KEY-CONSTRAINT_MY_REL_foo-bar` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE (e.foo, e.bar) IS RELATIONSHIP KEY

Property type constraints

Since Neo4j 5.11, the database allows creating type constraints for node and relationship properties. To leverage this feature, the connector provides the schema.optimization option, which uses the DataFrame schema to enforce the type. Internally, the connector uses the following mapping:

Table 4. Spark to Cypher constraint type mapping

Spark type                                      | Neo4j type

BooleanType                                     | BOOLEAN
StringType                                      | STRING
IntegerType                                     | INTEGER
LongType                                        | INTEGER
FloatType                                       | FLOAT
DoubleType                                      | FLOAT
DateType                                        | DATE
TimestampType                                   | LOCAL DATETIME
Custom pointType as Struct { type: string, srid: integer, x: double, y: double, z: double } | POINT
Custom durationType as Struct { type: string, months: long, days: long, seconds: long, nanoseconds: integer, value: string } | DURATION
DataTypes.createArrayType(BooleanType, false)   | LIST<BOOLEAN NOT NULL>
DataTypes.createArrayType(StringType, false)    | LIST<STRING NOT NULL>
DataTypes.createArrayType(IntegerType, false)   | LIST<INTEGER NOT NULL>
DataTypes.createArrayType(LongType, false)      | LIST<INTEGER NOT NULL>
DataTypes.createArrayType(FloatType, false)     | LIST<FLOAT NOT NULL>
DataTypes.createArrayType(DoubleType, false)    | LIST<FLOAT NOT NULL>
DataTypes.createArrayType(DateType, false)      | LIST<DATE NOT NULL>
DataTypes.createArrayType(TimestampType, false) | LIST<LOCAL DATETIME NOT NULL>
DataTypes.createArrayType(pointType, false)     | LIST<POINT NOT NULL>
DataTypes.createArrayType(durationType, false)  | LIST<DURATION NOT NULL>

For arrays in particular, the connector uses the version without null elements, as Neo4j does not allow null elements in arrays.

You can leverage this kind of schema enforcement with the value TYPE.
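For example, a minimal sketch of a DataFrame schema whose array column has non-nullable elements, which maps to LIST<STRING NOT NULL> according to the table above:

import org.apache.spark.sql.types._

// containsNull = false: the array elements cannot be null
val schema = StructType(Seq(
  StructField("name", StringType, nullable = false),
  StructField("tags", DataTypes.createArrayType(StringType, false), nullable = true)
))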

Property existence constraints

Neo4j defines "property existence" as a synonym for NOT NULL condition. You can leverage this kind of schema enforcement with the value EXISTS, the connector will use the nullability of the DataFrame column to choose whether to apply or not the NOT NULL condition.

Node Property type and existence constraints

Given the following example:

    ds.write
      .format(classOf[DataSource].getName)
      .mode(SaveMode.Overwrite)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("labels", ":Person:Customer")
      .option("node.keys", "surname")
      .option("schema.optimization", "TYPE,EXISTS")
      .save()

For each DataFrame column, the connector creates a type constraint for the label Person according to the mapping table provided above.

The constraint query looks like the following:

CREATE CONSTRAINT `spark_NODE-TYPE-CONSTRAINT-Person-surname` IF NOT EXISTS FOR (e:Person) REQUIRE e.surname IS :: STRING

If the DataFrame schema says that the field is also NOT NULL, the connector creates an existence constraint as follows:

CREATE CONSTRAINT `spark_NODE-NOT_NULL-CONSTRAINT-Person-surname` IF NOT EXISTS FOR (e:Person) REQUIRE e.surname IS NOT NULL

Relationship Property type and existence constraints

Given the following example:

    ds.write
      .mode(SaveMode.Overwrite)
      .format(classOf[DataSource].getName)
      .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
      .option("relationship", "MY_REL")
      .option("relationship.save.strategy", "keys")
      .option("relationship.source.labels", ":NodeA")
      .option("relationship.source.save.mode", "Overwrite")
      .option("relationship.source.node.keys", "idSource:id")
      .option("relationship.target.labels", ":NodeB")
      .option("relationship.target.node.keys", "idTarget:id")
      .option("relationship.target.save.mode", "Overwrite")
      .option("schema.optimization", "TYPE,EXISTS")
      .save()

The connector creates:

  • a type constraint for node NodeA and property id

  • a type constraint for node NodeB and property id

  • all the remaining properties are used as relationship properties; for each of them, a type constraint is created for the relationship MY_REL using the following query:

CREATE CONSTRAINT `spark_RELATIONSHIP-TYPE-CONSTRAINT-MY_REL-foo` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE e.foo IS :: STRING

If the DataFrame schema says that the field is also NOT NULL, the connector creates an existence constraint as follows:

CREATE CONSTRAINT `spark_RELATIONSHIP-NOT_NULL-CONSTRAINT-MY_REL-foo` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE e.foo IS NOT NULL

The constraint is not recreated if it is already present.

Script option

The script option allows you to execute a series of preparation scripts before the Spark job execution. The result of the last query can be reused in combination with the query ingestion mode, as follows:

// Assuming a simple case class: case class SimplePerson(name: String, surname: String)
val ds = Seq(SimplePerson("Andrea", "Santurbano")).toDS()

ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.ErrorIfExists)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()

Before the import starts, the connector runs the content of the script option, and the result of the last query is injected into the query. In the end, the full query executed by the connector while the data is being ingested is the following:

WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})

scriptResult is the result of the last query contained in the script option, that is RETURN 36 AS age;

Performance considerations

Since writing is typically an expensive operation, make sure you write only the columns you need from the DataFrame. For example, if the columns from the data source are name, surname, age, and livesIn, but you only need name and surname, you can do the following:

ds.select(ds("name"), ds("surname"))
  .write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()

Note about columns with Map type

When a DataFrame column is a map, the connector internally flattens the map, as Neo4j does not support this type for graph entity properties; so for a Spark job like this:

val data = Seq(
  ("Foo", 1, Map("inner" -> Map("key" -> "innerValue"))),
  ("Bar", 2, Map("inner" -> Map("key" -> "innerValue1"))),
).toDF("id", "time", "table")

data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

In Neo4j, the nodes with label MyNodeWithFlattenedMap store the following information:

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.inner.key`: 'innerValue'
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 2,
    `table.inner.key`: 'innerValue1'
}

Now you could fall into problematic situations like the following one:

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .save()

Since the resulting flattened keys are duplicated, the Neo4j Spark connector picks one of the associated values in a non-deterministic way.

The information stored in Neo4j will be the following (note that the order is not guaranteed):

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: 'innerValue' // but it could be `value` as the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: 'innerValue1' // but it could be `value1` as the order is not guaranteed
}

Group duplicated keys into an array of values

You can use the schema.map.group.duplicate.keys option to avoid this problem. The connector groups all the values having the same key into an array. The default value of the option is false. In a scenario like this:

val data = Seq(
  ("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
  ("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
  .mode(SaveMode.Append)
  .format(classOf[DataSource].getName)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":MyNodeWithFlattenedMap")
  .option("schema.map.group.duplicate.keys", true)
  .save()

the output would be:

MyNodeWithFlattenedMap {
    id: 'Foo',
    time: 1,
    `table.key.inner.key`: ['innerValue', 'value'] // the order is not guaranteed
}
MyNodeWithFlattenedMap {
    id: 'Bar',
    time: 1,
    `table.key.inner.key`: ['innerValue1', 'value1'] // the order is not guaranteed
}