Writing to Neo4j
The following section covers the DataSource Writer and how to transfer the Spark dataset content into Neo4j.
Getting started
Let’s look at the following code sample:
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
case class Point3d(`type`: String = "point-3d",
srid: Int,
x: Double,
y: Double,
z: Double)
case class Person(name: String, surname: String, age: Int, livesIn: Point3d)
val total = 10
val rand = Random
val ds = (1 to total)
.map(i => {
Person(name = "Andrea " + i, "Santurbano " + i, rand.nextInt(100),
Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
}).toDS()
ds.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.ErrorIfExists)
.option("url", "bolt://localhost:7687")
.option("labels", ":Person:Customer")
.save()
The above code inserts 10 nodes into Neo4j via Spark, and each of them has:
- Two labels: Person and Customer.
- Four properties: name, surname, age, and livesIn.
Save mode
To persist data into Neo4j, the Spark Connector supports two save modes that work only if UNIQUE or NODE KEY constraints are defined in Neo4j for the given properties.
The SaveMode examples apply to the Scala class org.apache.spark.sql.SaveMode.
For Node type:
- SaveMode.ErrorIfExists builds a CREATE query.
- SaveMode.Overwrite builds a MERGE query.
For SaveMode.Overwrite mode, you need to have unique constraints on the keys.
For Relationship type:
- SaveMode.ErrorIfExists builds a CREATE query.
- SaveMode.Overwrite builds a MERGE query.
If you are using Spark 3, the default save mode ErrorIfExists does not work; use Append instead.
For SaveMode.Overwrite mode, you need to have unique constraints on the keys.
In both cases, the default save mode is ErrorIfExists.
Options
The DataSource Writer has several options to connect and persist data into Neo4j.
Setting name | Description | Default value | Required |
---|---|---|---|
labels | Colon-separated list of the labels to attach to the node | (none) | No |
batch.size | The number of rows sent to Neo4j as a batch | 5000 | No |
 | Comma-separated list of Neo4j codes that cause the transaction to fail | (none) | No |
 | Number of retries in case of failure | 3 | No |
 | The time in milliseconds that the connector should wait before retry | 0 | No |
Node specific options | | | |
node.keys | Comma-separated list of properties considered as node keys if you are using | (none) | No |
Relationship specific options | | | |
relationship.properties | Map used as keys for specifying the relationship properties. Used only if | (empty) | No |
relationship.save.strategy | Save strategy to be used | | Yes |
relationship.source.labels | Colon-separated list of labels that identify the source node | (empty) | Yes |
relationship.source.node.keys | Map used as keys for matching the source node | (empty) | No |
relationship.source.save.mode | Source node save mode | Match | No |
relationship.source.node.properties | Map used as keys for specifying the source properties. Only used if | (empty) | No |
relationship.target.labels | Colon-separated list of labels that identify the target node | (empty) | Yes |
relationship.target.node.keys | Map used as keys for matching the target node | (empty) | No |
relationship.target.save.mode | Target node save mode | Match | No |
relationship.target.node.properties | Map used as keys for specifying the target properties. Only used if | (empty) | No |
Neo4j Connector for Apache Spark writes data in batches to speed up the ingestion process. As a consequence, if the process fails at some point, all the batches written before the failure are already persisted.
Write data
Writing data to a Neo4j database can be done in three ways: with a custom Cypher query, as nodes, or as relationships.
Custom Cypher query
If you use the query option, the Spark Connector persists the entire Dataset by using the provided query.
The rows are sent to Neo4j in batches whose size is defined by the batch.size property, and your query is wrapped in an UNWIND $events AS event statement.
The query option supports both CREATE and MERGE clauses.
Let’s look at the following simple Spark program:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
case class Person(name: String, surname: String, age: Int)
// Create an example DataFrame
val df = Seq(
Person("John", "Doe", 42),
Person("Jane", "Doe", 40)
).toDF()
// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"
df.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("authentication.basic.username", USERNAME)
.option("authentication.basic.password", PASSWORD)
.option("query", query)
.mode(SaveMode.Overwrite)
.save()
This generates the following query:
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})
Thus events is the batch created from your dataset.
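The batching and wrapping described above can be pictured with a small, connector-independent sketch. The object and method names (BatchSketch, wrapQuery, toBatches) are hypothetical and are not part of the connector API; the sketch only mirrors the behavior described in this section, assuming each batch is passed as the $events parameter.

```scala
object BatchSketch {
  // Prepends the UNWIND clause that this section says wraps the user query.
  def wrapQuery(userQuery: String): String =
    "UNWIND $events AS event\n" + userQuery

  // Splits the rows into batches of at most batchSize elements;
  // each batch would be sent as one $events parameter (assumption).
  def toBatches[A](rows: Seq[A], batchSize: Int): List[Seq[A]] =
    rows.grouped(batchSize).toList
}
```

With a batch size of 2, five rows would be sent as three batches, the last one holding a single row.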
Considerations
- You must always specify the Save mode.
- You can use the events list in WITH statements as well. For example, you can replace the query in the previous example with the following:
WITH event.name + ' ' + toUpper(event.surname) AS fullName
CREATE (n:Person {fullName: fullName})
- Subqueries that reference the events list in CALLs are supported:
CALL {
  WITH event
  RETURN event.name + ' ' + toUpper(event.surname) AS fullName
}
CREATE (n:Person {fullName: fullName})
- If APOC is installed, APOC procedures and functions can be used:
CALL {
  WITH event
  RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
}
CREATE (n:Person {fullName: fullName})
- Although a RETURN clause is not forbidden, adding one does not have any effect on the query result.
Node
If you use the labels option, the Spark Connector persists the entire dataset as nodes.
Depending on the SaveMode, it either CREATEs or MERGEs nodes (in the latter case, the node.keys properties are used).
The nodes are sent to Neo4j in batches whose size is defined by the batch.size property, and an UNWIND operation is performed under the hood.
Let’s remember the first example in this chapter:
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
case class Point3d(`type`: String = "point-3d",
srid: Int,
x: Double,
y: Double,
z: Double)
case class Person(name: String, surname: String, age: Int, livesIn: Point3d)
val total = 10
val rand = Random
val df = (1 to total)
.map(i => {
Person(name = "Andrea " + i, "Santurbano " + i, rand.nextInt(100),
Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
}).toDF()
df.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.ErrorIfExists)
.option("url", "bolt://localhost:7687")
.option("labels", ":Person:Customer")
.save()
The code above is converted into a Cypher query similar to the following:
UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties
The following example shows how to use the same DataFrame and save it in Overwrite mode:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val df = (1 to 10)/*...*/.toDF()
df.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.Overwrite)
.option("url", "bolt://localhost:7687")
.option("labels", ":Person:Customer")
.option("node.keys", "name,surname")
.save()
The code above generates the following Cypher query:
UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties
You must specify which columns of your DataFrame are used as keys to match the nodes.
You control this with the node.keys option, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.
If key and value are the same field, you can specify one without the colon.
For example, if you have .option("node.keys", "name:name,email:email"), you can also write .option("node.keys", "name,email").
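To make the key:value shorthand concrete, here is a minimal sketch of how such an option string could be read. KeyMapping and parse are hypothetical names used only for illustration; this is not the connector's own parser, just one reading of the syntax described above.

```scala
object KeyMapping {
  // Parses "column:property,other" into Map(column -> property, other -> other).
  // An entry without a colon maps the column to a property with the same name.
  def parse(option: String): Map[String, String] =
    option.split(",").map(_.trim).filter(_.nonEmpty).map { entry =>
      entry.split(":", 2) match {
        case Array(column, property) => column.trim -> property.trim
        case Array(column)           => column -> column
      }
    }.toMap
}
```

Under this reading, "name,email" and "name:name,email:email" produce the same mapping, which matches the equivalence stated above.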
If the column value is a Map<String, Value> (where Value can be any supported Neo4j Type), the connector automatically tries to flatten it.
Let’s consider the following dataset:
id | name | lives_in |
---|---|---|
1 | Andrea Santurbano | {address: 'Times Square, 1', city: 'NY', state: 'NY'} |
2 | Davide Fantuzzi | {address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'} |
Neo4j Connector for Apache Spark flattens the maps, and each map value is stored in its own property.
id | name | lives_in.address | lives_in.city | lives_in.state |
---|---|---|---|---|
1 | Andrea Santurbano | Times Square, 1 | NY | NY |
2 | Davide Fantuzzi | Statue of Liberty, 10 | NY | NY |
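The flattening shown in the tables above can be approximated with a short recursive function. MapFlattening and flatten are hypothetical names; the sketch assumes that nested keys are joined with a dot, as in the lives_in.address column, and does not claim to reproduce the connector's internal code.

```scala
object MapFlattening {
  // Recursively turns nested maps into a flat map with dot-separated keys.
  // Non-map values are kept as they are under the accumulated key.
  def flatten(prefix: String, value: Any): Map[String, Any] = value match {
    case m: Map[_, _] =>
      m.toSeq.flatMap { case (k, v) => flatten(s"$prefix.$k", v) }.toMap
    case other =>
      Map(prefix -> other)
  }
}
```

For the first row above, flattening the lives_in column would yield the three properties lives_in.address, lives_in.city, and lives_in.state.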
Relationship
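You can write a DataFrame to Neo4j by specifying source nodes, target nodes, and the relationships between them.
To avoid deadlocks, always use a single partition before writing relationships to Neo4j (the Keys strategy examples in this section call coalesce(1) before write).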
Overview
Before diving into the actual process, let’s clarify the vocabulary first. This method of writing data to Neo4j is more complex than the others and allows multiple combinations of options, so it is explained in more detail.
Conceptually, the connector takes your dataset and uses its columns to create source and target nodes, and then creates the specified relationships between them.
This is a basic example of what would happen:
UNWIND $events AS event
CREATE (source:Person)
SET source = event.source
CREATE (target:Product)
SET target = event.target
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel
The CREATE keyword for the source and target nodes can be replaced by MERGE or MATCH.
To control this you can use the Node save modes.
You can set the save mode of source and target nodes independently by using relationship.source.save.mode or relationship.target.save.mode.
These options accept a case-insensitive string as a value, which can be one of ErrorIfExists, Overwrite, or Append; they work in the same way as the Node save modes.
When using MATCH or MERGE, you need to specify keys that identify the nodes.
This is what the options relationship.source.node.keys and relationship.target.node.keys are for.
The Save strategies section describes them in more detail.
The CREATE keyword for the relationship can be replaced by MERGE.
You can control this with the Save mode.
You are also required to specify one of the two Save strategies. The strategy identifies which method is used to create the Cypher query and can make additional options available.
Save strategies
Native strategy
The Native strategy is useful when you have a schema that conforms to the Relationship read schema, with relationship.nodes.map set to false.
If you want to read relationships from a database, filter the data, and write the result to another database, you can refer to the following example:
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val originalDf = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://allprod.host.com:7687")
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "false")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
originalDf
.where("`target.price` > 2000")
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://expensiveprod.host.com:7687")
.option("relationship", "SOLD")
.option("relationship.source.labels", ":Person:Rich")
.option("relationship.source.save.mode", "ErrorIfExists")
.option("relationship.target.labels", ":Product:Expensive")
.option("relationship.target.save.mode", "ErrorIfExists")
.save()
You just need to specify the source node labels, the target node labels, and the relationship you want between them.
The generated query is the following:
UNWIND $events AS event
CREATE (source:Person:Rich)
SET source = event.source
CREATE (target:Product:Expensive)
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
event.source, event.target, and event.rel contain the columns described in the Relationship read schema.
The default save mode for source and target nodes is Match.
That means that the relationship can be created only if the nodes are already in your database.
See the Node save modes section for more information.
When using the Overwrite or Match node save mode, you should specify which keys should be used to identify the nodes.
<rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity |
---|---|---|---|---|---|---|---|---|---|---|
4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240 |
5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145 |
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
// we read our DF from Neo4j using the relationship method
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "bolt://first.host.com:7687")
.option("relationship", "BOUGHT")
.option("relationship.nodes.map", "false")
.option("relationship.source.labels", "Person")
.option("relationship.target.labels", "Product")
.load()
df.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://second.host.com:7687")
.option("relationship", "SOLD")
.option("relationship.source.labels", ":Person:Rich")
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.node.keys", "source.fullName:fullName")
.option("relationship.target.labels", ":Product:Expensive")
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.node.keys", "target.id:id")
.save()
You must specify which columns of your DataFrame are used as keys to match the nodes.
You control this with the options relationship.source.node.keys and relationship.target.node.keys, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.
The generated query is the following:
UNWIND $events AS event
MERGE (source:Person:Rich {fullName: event.source.fullName})
SET source = event.source
MERGE (target:Product:Expensive {id: event.target.id})
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
Remember that you can choose to CREATE or MERGE the relationship with the Save mode.
If the provided DataFrame schema doesn’t conform to the required schema, meaning that none of the required columns is present, the write fails.
Keys strategy
When you want more control over the relationship writing, you can use the Keys strategy.
As with the Native strategy, you can specify node keys to identify nodes. In addition, you can specify which columns should be written as node properties.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val musicDf = Seq(
(12, "John Bonham", "Drums"),
(19, "John Mayer", "Guitar"),
(32, "John Scofield", "Guitar"),
(15, "John Butler", "Guitar")
).toDF("experience", "name", "instrument")
musicDf.coalesce(1)
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "PLAYS")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Musician")
.option("relationship.source.save.mode", "overwrite")
.option("relationship.source.node.keys", "name:name")
.option("relationship.target.labels", ":Instrument")
.option("relationship.target.node.keys", "instrument:name")
.option("relationship.target.save.mode", "overwrite")
.save()
This creates a MERGE query using the name property as key for Musician nodes.
The value of the instrument column is used as the value of the name property of Instrument nodes, generating a statement like:
MERGE (target:Instrument {name: event.target.instrument})
Here you must specify which columns of your DataFrame are written as source node and target node properties.
You can do this with the options relationship.source.node.properties and relationship.target.node.properties, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.
The same applies to the relationship.properties option, used to specify which DataFrame columns are written as relationship properties.
If key and value are the same field, you can specify one without the colon.
For example, if you have .option("relationship.source.node.properties", "name:name,email:email"), you can also write .option("relationship.source.node.properties", "name,email").
The same applies to relationship.source.node.keys and relationship.target.node.keys.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
import spark.implicits._
val musicDf = Seq(
(12, "John Bonham", "Orange", "Drums"),
(19, "John Mayer", "White", "Guitar"),
(32, "John Scofield", "Black", "Guitar"),
(15, "John Butler", "Wooden", "Guitar")
).toDF("experience", "name", "instrument_color", "instrument")
musicDf.coalesce(1)
.write
.format("org.neo4j.spark.DataSource")
.option("url", "bolt://localhost:7687")
.option("relationship", "PLAYS")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":Musician")
.option("relationship.source.save.mode", "overwrite")
.option("relationship.source.node.keys", "name:name")
.option("relationship.target.labels", ":Instrument")
.option("relationship.target.node.keys", "instrument:name")
.option("relationship.target.node.properties", "instrument_color:color")
.option("relationship.target.save.mode", "overwrite")
.save()
Node save modes
You can specify four different modes for saving the nodes:
- Overwrite mode performs a MERGE on that node.
- ErrorIfExists mode performs a CREATE (not available for Spark 3).
- Append mode performs a CREATE (not available for Spark 2.4).
- Match mode performs a MATCH.
For Overwrite mode you must have unique constraints on the keys.
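The list above amounts to a small lookup from save mode to Cypher keyword. The sketch below restates it in code; NodeSaveModes and cypherKeyword are hypothetical names, and the case-insensitive matching follows the description of the relationship.source.save.mode and relationship.target.save.mode values given earlier in this section.

```scala
object NodeSaveModes {
  // Maps a node save mode (case-insensitive) to the Cypher keyword
  // that the list above associates with it.
  def cypherKeyword(mode: String): String = mode.toLowerCase match {
    case "overwrite"     => "MERGE"
    case "errorifexists" => "CREATE"
    case "append"        => "CREATE"
    case "match"         => "MATCH"
    case other =>
      throw new IllegalArgumentException(s"Unsupported node save mode: $other")
  }
}
```

For example, the "overwrite" value used in the Keys strategy examples corresponds to a MERGE on the node.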
Schema optimization operations
The Spark Connector supports schema optimization operations via:
- indexes (deprecated);
- constraints;
- property type enforcement;
- set of schema queries.
These operations are executed before the import process starts, in order to speed up the import itself.
Starting from version |
schema.optimization.type
(Deprecated)
You can set the optimization via the schema.optimization.type option, which works only if you are merging nodes and takes the following values:
- INDEX: it creates only indexes on the provided nodes.
- NODE_CONSTRAINTS: it creates only constraints on the provided nodes.
The schema.optimization.type option cannot be used with the query option.
If you are using a custom Cypher query, you need to create indexes and constraints manually using the script option.
Index creation
The following example shows how to create indexes while you’re creating nodes.
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.Overwrite)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "INDEX")
  .save()
Before the import starts, the following schema query is executed:
CREATE INDEX ON :Person(surname)
The name of the created index is spark_INDEX_<LABEL>_<NODE_KEYS>, where <LABEL> is the first label from the labels option and <NODE_KEYS> is a dash-separated sequence of one or more properties as specified in the node.keys option.
In this example, the name of the created index is spark_INDEX_Person_surname.
If the node.keys option were set to "name,surname" instead, the index name would be spark_INDEX_Person_name-surname.
The index is not recreated if it is already present.
Constraint creation
The following example shows how to create constraints while you’re creating nodes.
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.Overwrite)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "NODE_CONSTRAINTS")
  .save()
Before the import starts, the following schema query is executed:
CREATE CONSTRAINT FOR (p:Person) REQUIRE (p.surname) IS UNIQUE
The name of the created constraint is spark_NODE_CONSTRAINTS_<LABEL>_<NODE_KEYS>, where <LABEL> is the first label from the labels option and <NODE_KEYS> is a dash-separated sequence of one or more properties as specified in the node.keys option.
In this example, the name of the created constraint is spark_NODE_CONSTRAINTS_Person_surname.
If the node.keys option were set to "name,surname" instead, the constraint name would be spark_NODE_CONSTRAINTS_Person_name-surname.
Take into consideration that the first label is used for the index creation.
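The naming scheme used for both the index and the constraint can be expressed as a small function. SchemaNames, firstLabel, and name are hypothetical helpers written for this illustration; the sketch assumes plain property names in node.keys (no column:property pairs), since the section does not say how a mapped key would appear in the name.

```scala
object SchemaNames {
  // Returns the first label of a colon-separated labels option such as ":Person:Customer".
  def firstLabel(labels: String): String =
    labels.split(":").map(_.trim).filter(_.nonEmpty).head

  // Builds spark_<KIND>_<LABEL>_<NODE_KEYS>, joining the node keys with dashes.
  def name(kind: String, labels: String, nodeKeys: String): String = {
    val keys = nodeKeys.split(",").map(_.trim).filter(_.nonEmpty).mkString("-")
    s"spark_${kind}_${firstLabel(labels)}_$keys"
  }
}
```

With the options from the examples above, the kind INDEX gives spark_INDEX_Person_surname and the kind NODE_CONSTRAINTS gives spark_NODE_CONSTRAINTS_Person_surname.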
With constraints
The connector can enforce the following constraints:
config | values | default | description |
---|---|---|---|
schema.optimization.node.keys | UNIQUE/KEY/NONE | NONE | Create the |
schema.optimization.relationship.keys | UNIQUE/KEY/NONE | false | Create the |
schema.optimization | TYPE/EXISTS/NONE | NONE | A comma-separated list of values. Creates the type constraints for nodes/relationships, enforcing the type and non-nullability from the DataFrame schema |
If you define more than one label, the first one is used for creating constraints.
Node constraints
Enforcing unique constraint
For a detailed description of how Neo4j handles unique constraints on nodes, see the Cypher documentation.
Given the following example:
ds.write
.format(classOf[DataSource].getName)
.mode(SaveMode.Overwrite)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":Person:Customer")
.option("node.keys", "surname")
.option("schema.optimization.node.keys", "UNIQUE")
.save()
Under the hood, the Spark connector creates the following constraint:
CREATE CONSTRAINT `spark_NODE_UNIQUE-CONSTRAINT_Person_surname` IF NOT EXISTS FOR (e:Person) REQUIRE (e.surname) IS UNIQUE
Enforcing node key constraint
For a detailed description of how Neo4j handles node key constraints, see the Cypher documentation.
Given the following example:
ds.write
.format(classOf[DataSource].getName)
.mode(SaveMode.Overwrite)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":Person:Customer")
.option("node.keys", "surname")
.option("schema.optimization.node.keys", "KEY")
.save()
Under the hood, the Spark connector creates the following constraint:
CREATE CONSTRAINT `spark_NODE_KEY-CONSTRAINT_Person_surname` IF NOT EXISTS FOR (e:Person) REQUIRE (e.surname) IS NODE KEY
Relationship constraints
Enforcing unique constraint
For a detailed description of how Neo4j handles unique constraints on relationships, see the official Cypher documentation.
Given the following example:
ds
.write
.mode(SaveMode.Overwrite)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("relationship", "MY_REL")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":NodeA")
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.node.keys", "idSource:id")
.option("relationship.target.labels", ":NodeB")
.option("relationship.target.node.keys", "idTarget:id")
.option("relationship.target.save.mode", "Overwrite")
.option("schema.optimization.relationship.keys", "UNIQUE")
.option("relationship.keys", "foo,bar")
.save()
Under the hood, the Spark connector creates the following constraint:
CREATE CONSTRAINT `spark_RELATIONSHIP_UNIQUE-CONSTRAINT_MY_REL_foo-bar` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE (e.foo, e.bar) IS UNIQUE
Enforcing relationship key constraint
For a detailed description of how Neo4j handles relationship key constraints, see the official Cypher documentation.
Given the following example:
ds
.write
.mode(SaveMode.Overwrite)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("relationship", "MY_REL")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":NodeA")
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.node.keys", "idSource:id")
.option("relationship.target.labels", ":NodeB")
.option("relationship.target.node.keys", "idTarget:id")
.option("relationship.target.save.mode", "Overwrite")
.option("schema.optimization.relationship.keys", "KEY")
.option("relationship.keys", "foo,bar")
.save()
Under the hood, the Spark connector creates the following constraint:
CREATE CONSTRAINT `spark_RELATIONSHIP_KEY-CONSTRAINT_MY_REL_foo-bar` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE (e.foo, e.bar) IS RELATIONSHIP KEY
Property type constraints
Since Neo4j 5.11, the database supports type constraints on node and relationship properties.
To leverage this feature, the connector provides the schema.optimization option, which uses the DataFrame schema to enforce the property types.
Internally, the connector uses the following mapping:
Spark type | Neo4j Type |
---|---|
BooleanType | BOOLEAN |
StringType | STRING |
IntegerType | INTEGER |
LongType | INTEGER |
FloatType | FLOAT |
DoubleType | FLOAT |
DateType | DATE |
TimestampType | LOCAL DATETIME |
Custom | POINT |
Custom | DURATION |
DataTypes.createArrayType(BooleanType, false) | LIST<BOOLEAN NOT NULL> |
DataTypes.createArrayType(StringType, false) | LIST<STRING NOT NULL> |
DataTypes.createArrayType(IntegerType, false) | LIST<INTEGER NOT NULL> |
DataTypes.createArrayType(LongType, false) | LIST<INTEGER NOT NULL> |
DataTypes.createArrayType(FloatType, false) | LIST<FLOAT NOT NULL> |
DataTypes.createArrayType(DoubleType, false) | LIST<FLOAT NOT NULL> |
DataTypes.createArrayType(DateType, false) | LIST<DATE NOT NULL> |
DataTypes.createArrayType(TimestampType, false) | LIST<LOCAL DATETIME NOT NULL> |
DataTypes.createArrayType(pointType, false) | LIST<POINT NOT NULL> |
DataTypes.createArrayType(durationType, false) | LIST<DURATION NOT NULL> |
For arrays in particular, the connector uses the variant without null elements, because Neo4j does not allow null elements in arrays.
You can enable this kind of schema enforcement with the value TYPE.
Property existence constraints
Neo4j defines "property existence" as a synonym for the NOT NULL condition.
You can enable this kind of schema enforcement with the value EXISTS. The connector uses the nullability of each DataFrame column to decide whether to apply the NOT NULL condition.
Node Property type and existence constraints
Given the following example:
ds.write
.format(classOf[DataSource].getName)
.mode(SaveMode.Overwrite)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":Person:Customer")
.option("node.keys", "surname")
.option("schema.optimization", "TYPE,EXISTS")
.save()
For each DataFrame column, the connector creates a type constraint for the label Person according to the mapping table provided above.
The constraint query looks like the following:
CREATE CONSTRAINT `spark_NODE-TYPE-CONSTRAINT-Person-surname` IF NOT EXISTS FOR (e:Person) REQUIRE e.surname IS :: STRING
If the DataFrame schema says that the field is also NOT NULL, the connector creates an existence constraint as follows:
CREATE CONSTRAINT `spark_NODE-NOT_NULL-CONSTRAINT-Person-surname` IF NOT EXISTS FOR (e:Person) REQUIRE e.surname IS NOT NULL
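As a rough illustration of how the two node constraint queries relate to a column's type and nullability, the following sketch builds both statements as strings. TypeConstraintSketch, Column, and nodeConstraints are hypothetical names, and the Neo4j type is passed in directly rather than derived from a Spark schema; the connector's actual implementation may differ.

```scala
object TypeConstraintSketch {
  // A simplified column description: name, mapped Neo4j type, and nullability.
  final case class Column(name: String, neo4jType: String, nullable: Boolean)

  // Returns the type constraint, plus the existence constraint
  // when the column is declared as not nullable.
  def nodeConstraints(label: String, column: Column): Seq[String] = {
    val typeConstraint =
      s"CREATE CONSTRAINT `spark_NODE-TYPE-CONSTRAINT-$label-${column.name}` " +
        s"IF NOT EXISTS FOR (e:$label) REQUIRE e.${column.name} IS :: ${column.neo4jType}"
    val notNullConstraint =
      s"CREATE CONSTRAINT `spark_NODE-NOT_NULL-CONSTRAINT-$label-${column.name}` " +
        s"IF NOT EXISTS FOR (e:$label) REQUIRE e.${column.name} IS NOT NULL"
    if (column.nullable) Seq(typeConstraint) else Seq(typeConstraint, notNullConstraint)
  }
}
```

For a non-nullable surname column of type STRING on the label Person, the function returns the two queries shown above.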
Relationship Property type and existence constraints
Given the following example:
ds.write
.mode(SaveMode.Overwrite)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("relationship", "MY_REL")
.option("relationship.save.strategy", "keys")
.option("relationship.source.labels", ":NodeA")
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.node.keys", "idSource:id")
.option("relationship.target.labels", ":NodeB")
.option("relationship.target.node.keys", "idTarget:id")
.option("relationship.target.save.mode", "Overwrite")
.option("schema.optimization", "TYPE,EXISTS")
.save()
The connector creates:
- a type constraint for node NodeA and property id
- a type constraint for node NodeB and property id
- a type constraint on the relationship MY_REL for each of the remaining properties, which are used as relationship properties, by using the following query:
CREATE CONSTRAINT `spark_RELATIONSHIP-TYPE-CONSTRAINT-MY_REL-foo` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE e.foo IS :: STRING
If the DataFrame schema says that the field is also NOT NULL, the connector creates an existence constraint as follows:
CREATE CONSTRAINT `spark_RELATIONSHIP-NOT_NULL-CONSTRAINT-MY_REL-foo` IF NOT EXISTS FOR ()-[e:MY_REL]->() REQUIRE e.foo IS NOT NULL
The constraint is not recreated if it is already present.
Script option
The script option allows you to execute a series of preparation scripts before the Spark job execution. The result of the last query can be reused in combination with the query ingestion mode as follows:
val ds = Seq(SimplePerson("Andrea", "Santurbano")).toDS()
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.ErrorIfExists)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
Before the import starts, the connector runs the content of the script option, and the result of the last query is injected into the query. The full query executed by the connector while the data is being ingested is the following:
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})
scriptResult is the result of the last query contained in the script option, that is RETURN 36 AS age;
Performance considerations
Since writing is typically an expensive operation, make sure you write only the columns you need from the DataFrame.
For example, if the columns from the data source are name, surname, age, and livesIn, but you only need name and surname, you can do the following:
ds.select(ds("name"), ds("surname"))
.write
.format("org.neo4j.spark.DataSource")
.mode(SaveMode.ErrorIfExists)
.option("url", "bolt://localhost:7687")
.option("labels", ":Person:Customer")
.save()
Note about columns with Map type
When a DataFrame column is a map, the connector internally flattens the map, because Neo4j does not support this type for graph entity properties. Consider a Spark job like this:
val data = Seq(
("Foo", 1, Map("inner" -> Map("key" -> "innerValue"))),
("Bar", 2, Map("inner" -> Map("key" -> "innerValue1"))),
).toDF("id", "time", "table")
data.write
.mode(SaveMode.Append)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":MyNodeWithFlattenedMap")
.save()
In Neo4j, the nodes with label MyNodeWithFlattenedMap store the following information:
MyNodeWithFlattenedMap {
  id: 'Foo',
  time: 1,
  `table.inner.key`: 'innerValue'
}
MyNodeWithFlattenedMap {
  id: 'Bar',
  time: 2,
  `table.inner.key`: 'innerValue1'
}
Now you could fall into problematic situations like the following one:
val data = Seq(
("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
.mode(SaveMode.Append)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":MyNodeWithFlattenedMap")
.save()
Since the resulting flattened keys are duplicated, the Neo4j Connector for Apache Spark picks one of the associated values in a non-deterministic way.
The information stored in Neo4j is the following (consider that the order is not guaranteed):
MyNodeWithFlattenedMap {
  id: 'Foo',
  time: 1,
  `table.key.inner.key`: 'innerValue' // but it could be `value` as the order is not guaranteed
}
MyNodeWithFlattenedMap {
  id: 'Bar',
  time: 1,
  `table.key.inner.key`: 'innerValue1' // but it could be `value1` as the order is not guaranteed
}
Group duplicated keys to array of values
You can use the option schema.map.group.duplicate.keys to avoid this problem. When it is enabled, the connector groups all the values with the same key into an array. The default value for the option is false.
In a scenario like this:
val data = Seq(
("Foo", 1, Map("key.inner" -> Map("key" -> "innerValue"), "key" -> Map("inner.key" -> "value"))),
("Bar", 1, Map("key.inner" -> Map("key" -> "innerValue1"), "key" -> Map("inner.key" -> "value1"))),
).toDF("id", "time", "table")
data.write
.mode(SaveMode.Append)
.format(classOf[DataSource].getName)
.option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
.option("labels", ":MyNodeWithFlattenedMap")
.option("schema.map.group.duplicate.keys", true)
.save()
the output would be:
MyNodeWithFlattenedMap {
  id: 'Foo',
  time: 1,
  `table.key.inner.key`: ['innerValue', 'value'] // the order is not guaranteed
}
MyNodeWithFlattenedMap {
  id: 'Bar',
  time: 1,
  `table.key.inner.key`: ['innerValue1', 'value1'] // the order is not guaranteed
}
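The grouping behavior can be sketched without Spark by keeping every flattened (key, value) pair and grouping the pairs by key. GroupedFlattening, pairs, and flattenGrouped are hypothetical names, and leaving non-duplicated keys as single values is an assumption of this sketch, since the output above only shows the duplicated key.

```scala
object GroupedFlattening {
  // Flattens nested maps into (dotted key, value) pairs, keeping duplicates.
  def pairs(prefix: String, value: Any): Seq[(String, Any)] = value match {
    case m: Map[_, _] => m.toSeq.flatMap { case (k, v) => pairs(s"$prefix.$k", v) }
    case other        => Seq(prefix -> other)
  }

  // Groups the values that share a flattened key into a sequence;
  // keys that occur once keep their single value (assumption).
  def flattenGrouped(prefix: String, value: Any): Map[String, Any] =
    pairs(prefix, value).groupBy(_._1).map {
      case (key, Seq((_, single))) => key -> single
      case (key, many)             => key -> many.map(_._2)
    }
}
```

Applied to the table column of the first row above, both "key.inner" -> "key" and "key" -> "inner.key" flatten to table.key.inner.key, so the two values end up in one collection whose order is not guaranteed.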