Writing to Neo4j
The following section covers the DataSource Writer and how to transfer the Spark dataset content into Neo4j.
Getting started
Let’s look at the following code sample:
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val ds = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, surname = "Santurbano " + i, age = rand.nextInt(100),
      livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
The above code inserts 10 nodes into Neo4j via Spark, each of them with:

- Two labels: Person and Customer.
- Four properties: name, surname, age, and livesIn.
Save mode
To persist data into Neo4j, the Spark Connector supports two save modes that work only if UNIQUE or NODE KEY constraints are defined in Neo4j for the given properties.

The SaveMode examples apply to the Scala class org.apache.spark.sql.SaveMode. For PySpark, use a static string with the name of the SaveMode: instead of SaveMode.Overwrite, use Overwrite.

For the Node type:

- SaveMode.ErrorIfExists builds a CREATE query.
- SaveMode.Overwrite builds a MERGE query.

For SaveMode.Overwrite mode, you need to have unique constraints on the keys.

For the Relationship type:

- SaveMode.ErrorIfExists builds a CREATE query.
- SaveMode.Overwrite builds a MERGE query.

If you are using Spark 3, the default save mode ErrorIfExists does not work; use Append instead.

For SaveMode.Overwrite mode, you need to have unique constraints on the keys.

In both cases, the default save mode is ErrorIfExists.
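For example, here is a minimal Scala sketch of the two node save modes side by side. The DataFrame, the URL, and the unique constraint on :Person(name) assumed by the Overwrite write are illustrative, not taken from the examples above:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// Illustrative DataFrame
val people = Seq(("John", 32), ("Jane", 40)).toDF("name", "age")

// SaveMode.ErrorIfExists builds a CREATE query for each row
// (as noted above, on Spark 3 you may need Append instead)
people.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .save()

// SaveMode.Overwrite builds a MERGE query on the node.keys properties;
// it assumes a unique constraint on :Person(name)
people.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .option("node.keys", "name")
  .save()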
Options
The DataSource Writer has several options to connect and persist data into Neo4j.
| Setting name | Description | Default value | Required |
|---|---|---|---|
| labels | Colon-separated list of the labels to attach to the node | (none) | No |
| batch.size | The number of rows sent to Neo4j as a batch | 5000 | No |
| transaction.codes.fail | Comma-separated list of Neo4j codes that cause the transaction to fail | (none) | No |
| transaction.retries | Number of retries in case of failure | 3 | No |
| transaction.retry.timeout | The time in milliseconds that the connector should wait before retrying | 0 | No |
| Node specific options | | | |
| node.keys | Comma-separated list of properties considered as node keys when using SaveMode.Overwrite | (none) | No |
| Relationship specific options | | | |
| relationship.properties | Map used as keys for specifying the relationship properties. Used only if relationship.save.strategy is keys | (empty) | No |
| relationship.save.strategy | Save strategy to be used | native | Yes |
| relationship.source.labels | Colon-separated list of labels that identify the source node | (empty) | Yes |
| relationship.source.node.keys | Map used as keys for matching the source node | (empty) | No |
| relationship.source.save.mode | Source node save mode | Match | No |
| relationship.source.node.properties | Map used as keys for specifying the source properties. Used only if relationship.save.strategy is keys | (empty) | No |
| relationship.target.labels | Colon-separated list of labels that identify the target node | (empty) | Yes |
| relationship.target.node.keys | Map used as keys for matching the target node | (empty) | No |
| relationship.target.save.mode | Target node save mode | Match | No |
| relationship.target.node.properties | Map used as keys for specifying the target properties. Used only if relationship.save.strategy is keys | (empty) | No |
Neo4j Connector for Apache Spark uses batch writes to speed up the ingestion process, so if the process fails at some point, all the previously written batches are already persisted.
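As a hedged sketch of how several of these options fit together in a single write (the DataFrame, URL, labels, and values below are illustrative; the option names are the ones listed in the table above):

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = Seq(("Alice", "Smith"), ("Bob", "Brown")).toDF("name", "surname")

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")                  // labels attached to each node
  .option("node.keys", "name,surname")          // properties used by the MERGE
  .option("batch.size", "2500")                 // rows per batch (default 5000)
  .option("transaction.retries", "5")           // retries in case of failure (default 3)
  .option("transaction.retry.timeout", "1000")  // milliseconds to wait before retrying
  .save()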
Write data
Writing data to a Neo4j database can be done in three ways: with a custom Cypher query (query option), as nodes (labels option), or as relationships (relationship option).
Custom Cypher query
If you use the query option, the Spark Connector persists the entire Dataset by using the provided query. The rows are sent to Neo4j in batches of the size defined by the batch.size property, and your query is wrapped up in an UNWIND $events AS event statement.
Let’s look at the following simple Spark program:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
  .save()
This generates the following query:
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + event.surname})
where events is the batch created from your dataset.
Node
If you use the labels option, the Spark Connector persists the entire dataset as nodes. Depending on the SaveMode, it is going to CREATE or MERGE nodes (in the latter case, the node.keys properties are used). The nodes are sent to Neo4j in batches of the size defined by the batch.size property, and an UNWIND operation is performed under the hood.
Let’s recall the first example in this chapter:

import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random
val df = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, surname = "Santurbano " + i, age = rand.nextInt(100),
      livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
The above code is converted into a Cypher query similar to the following:
UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties
The following example shows how to use the same DataFrame and save it in Overwrite mode:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()
The code above generates the following Cypher query:
UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties
You must specify which columns of your DataFrame are used as keys to match the nodes. You control this with the node.keys option, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.

If key and value are the same field, you can specify one without the colon. For example, if you have .option("node.keys", "name:name,email:email"), you can also write .option("node.keys", "name,email").
If the column value is a Map<String, Value> (where Value can be any supported Neo4j type), the connector automatically tries to flatten it.
Let’s consider the following dataset:

| id | name | lives_in |
|---|---|---|
| 1 | Andrea Santurbano | {address: 'Times Square, 1', city: 'NY', state: 'NY'} |
| 2 | Davide Fantuzzi | {address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'} |
Neo4j Connector for Apache Spark flattens the maps, and each map value ends up in its own property.
| id | name | lives_in.address | lives_in.city | lives_in.state |
|---|---|---|---|---|
| 1 | Andrea Santurbano | Times Square, 1 | NY | NY |
| 2 | Davide Fantuzzi | Statue of Liberty, 10 | NY | NY |
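A minimal sketch of how such a dataset could be built and written from Spark; the data mirrors the table above, while the URL and the node.keys choice are illustrative assumptions:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val peopleDf = Seq(
  (1, "Andrea Santurbano", Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
  (2, "Davide Fantuzzi", Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
).toDF("id", "name", "lives_in")

// Each entry of the lives_in map becomes its own node property,
// e.g. `lives_in.address`, `lives_in.city`, and `lives_in.state`
peopleDf.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .option("node.keys", "id")
  .save()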
Relationship
You can write a DataFrame to Neo4j by specifying source nodes, target nodes, and the relationships between them.
Overview
Before diving into the actual process, let’s clarify the vocabulary first. Since this method of writing data to Neo4j is more complex and a few combinations of options can be used, it is worth spending more time explaining it.

Conceptually, the connector takes your dataset, moves the columns around to create source and target nodes, and finally creates the specified relationships between them.
This is a basic example of what would happen:
UNWIND $events AS event
CREATE (source:Person)
SET source = event.source
CREATE (target:Product)
SET target = event.target
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel
The CREATE keyword for the source and target nodes can be replaced by MERGE or MATCH. To control this you can use the Node save modes.

You can set source and target nodes independently by using relationship.source.save.mode or relationship.target.save.mode. These options accept a case-insensitive string as a value, which can be one of ErrorIfExists, Overwrite, or Append; they work in the same way as the Node save modes.
When using MATCH or MERGE, you need to specify keys that identify the nodes. This is what the options relationship.source.node.keys and relationship.target.node.keys are for. More on this here.
The CREATE keyword for the relationship can be replaced by a MERGE. You can control this with the Save mode.
You are also required to specify one of the two Save Strategies. This identifies which method is to be used to create the Cypher query and can have additional options available.
Save strategies
Native strategy
The Native strategy is useful when you have a schema that conforms to the Relationship read schema, with relationship.nodes.map set to false.
If you want to read a relationship from a database, filter the data, and write the result to another database, you can refer to the following example:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val originalDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://allprod.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

originalDf
  .where("`target.price` > 2000")
  .write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://expensiveprod.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "ErrorIfExists")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "ErrorIfExists")
  .save()
You just need to specify the source node labels, the target node labels, and the relationship you want between them.
The generated query is the following:
UNWIND $events AS event
CREATE (source:Person:Rich)
SET source = event.source
CREATE (target:Product:Expensive)
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
event.source, event.target, and event.rel contain the columns described here.

The default save mode for source and target nodes is Match, which means that the relationship can be created only if the nodes are already in your database. See here for more information on node save modes.

When using the Overwrite or Match node save mode, you should specify which keys should be used to identify the nodes.
Consider the following DataFrame, read with relationship.nodes.map set to false:

| <rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity |
|---|---|---|---|---|---|---|---|---|---|---|
| 4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240 |
| 5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145 |
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// we read our DF from Neo4j using the relationship method
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "target.id:id")
  .save()
You must specify which columns of your DataFrame are being used as keys to match the nodes.
You control this with the options relationship.source.node.keys
and relationship.target.node.keys
, specifying a comma-separated list of key:value
pairs,
where the key is the DataFrame column name, and the value is the node property name.
The generated query is the following:
UNWIND $events AS event
MERGE (source:Person:Rich {fullName: event.source.fullName})
SET source = event.source
MERGE (target:Product:Expensive {id: event.target.id})
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
Remember that you can choose whether to CREATE or MERGE the relationship with the Save mode.

If the provided DataFrame schema doesn’t conform to the required schema, meaning that none of the required columns is present, the write fails.
Keys strategy
When you want more control over how relationships are written, you can use the Keys strategy. As with the Native strategy, you can specify node keys to identify nodes. In addition, you can also specify which columns should be written as node properties.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
  (12, "John Bonham", "Drums"),
  (19, "John Mayer", "Guitar"),
  (32, "John Scofield", "Guitar"),
  (15, "John Butler", "Guitar")
).toDF("experience", "name", "instrument")

musicDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "PLAYS")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Musician")
  .option("relationship.source.save.mode", "overwrite")
  .option("relationship.source.node.keys", "name:name")
  .option("relationship.target.labels", ":Instrument")
  .option("relationship.target.node.keys", "instrument:name")
  .option("relationship.target.save.mode", "overwrite")
  .save()
This creates a MERGE query that uses the name property as the key for the Musician nodes. The value of the instrument column is used as the value of the Instrument property name, generating a statement like:

MERGE (target:Instrument {name: event.target.instrument})
Here you must specify which columns of your DataFrame are written as source node and target node properties. You can do this with the options relationship.source.node.properties and relationship.target.node.properties, specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name and the value is the node property name.
The same applies to the relationship.properties option, used to specify which DataFrame columns are written as relationship properties.

If key and value are the same field, you can specify one without the colon. For example, if you have .option("relationship.source.node.properties", "name:name,email:email"), you can also write .option("relationship.source.node.properties", "name,email"). The same applies to relationship.source.node.keys and relationship.target.node.keys.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
  (12, "John Bonham", "Orange", "Drums"),
  (19, "John Mayer", "White", "Guitar"),
  (32, "John Scofield", "Black", "Guitar"),
  (15, "John Butler", "Wooden", "Guitar")
).toDF("experience", "name", "instrument_color", "instrument")

musicDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "PLAYS")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Musician")
  .option("relationship.source.save.mode", "overwrite")
  .option("relationship.source.node.keys", "name:name")
  .option("relationship.target.labels", ":Instrument")
  .option("relationship.target.node.keys", "instrument:name")
  .option("relationship.target.node.properties", "instrument_color:color")
  .option("relationship.target.save.mode", "overwrite")
  .save()
Node save modes
You can specify four different modes for saving the nodes:

- Overwrite mode performs a MERGE on that node.
- ErrorIfExists mode performs a CREATE (not available for Spark 3).
- Append mode performs a CREATE (not available for Spark 2.4).
- Match mode performs a MATCH.

For Overwrite mode you must have unique constraints on the keys.
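As an illustration, here is a hedged sketch that combines the node save modes with a relationship write: the source node uses Match (it must already exist), the target node uses Overwrite, and the relationship itself is appended. The DataFrame, labels, keys, and URL are assumptions:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val ordersDf = Seq(("John Doe", "Product 1", 240)).toDF("customerName", "productName", "quantity")

ordersDf.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)                                     // relationship save mode (Spark 3)
  .option("url", "bolt://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Person")
  .option("relationship.source.save.mode", "Match")          // MATCH: the Person node must already exist
  .option("relationship.source.node.keys", "customerName:fullName")
  .option("relationship.target.labels", ":Product")
  .option("relationship.target.save.mode", "Overwrite")      // MERGE the Product node on its key
  .option("relationship.target.node.keys", "productName:name")
  .option("relationship.properties", "quantity")
  .save()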
Schema optimization operations
The Spark Connector supports schema optimization operations via indexes, constraints, and a set of schema queries that are executed before the import process starts, to speed up the import itself.

You can set the optimization via the schema.optimization.type option, which works only if you are merging nodes and takes the following values:

- INDEX: creates only indexes on provided nodes.
- NODE_CONSTRAINTS: creates only unique constraints on provided nodes.
Index creation
The following example shows how to create indexes while you’re creating nodes.
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.Overwrite)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "INDEX")
  .save()
Before the import starts, the following schema query is created:
CREATE INDEX ON :Person(surname)
Take into consideration that the first label is used for the index creation.
Constraint creation
Below you can see an example of how to create constraints while you’re creating nodes.
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.Overwrite)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "NODE_CONSTRAINTS")
  .save()
Before the import starts, the code above creates the following schema query:
CREATE CONSTRAINT ON (p:Person) ASSERT (p.surname) IS UNIQUE
Take into consideration that the first label is used for the constraint creation.
Script option
The script option allows you to execute a series of preparation scripts before the Spark job execution. The result of the last query can be reused in combination with the query ingestion mode as follows:
val ds = Seq(SimplePerson("Andrea", "Santurbano")).toDS()

ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.ErrorIfExists)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX ON :Person(surname);
      |CREATE CONSTRAINT ON (p:Product)
      | ASSERT (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
Before the import starts, the connector runs the content of the script option, and the result of the last query is injected into the query. In the end, the full query executed by the connector while the data is being ingested is the following:
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})
scriptResult is the result of the last query contained in the script option, that is RETURN 36 AS age.