Writing to Neo4j
This section covers the DataSource Writer and how to transfer the content of a Spark Dataset into Neo4j.
Getting Started
Given the following Scala program:
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random

val ds = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, surname = "Santurbano " + i, age = rand.nextInt(100),
      livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDS()

ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
This program inserts 10 nodes into Neo4j via Spark, and each of them has:

- 2 labels: Person and Customer
- 4 properties: name, surname, age, and livesIn
Save Mode
In order to persist data into Neo4j, the Spark Connector supports two save modes that work only if UNIQUE or NODE KEY constraints are defined in Neo4j for the given properties.

These SaveMode examples apply to the Scala class org.apache.spark.sql.SaveMode. For PySpark, simply use a static string with the name of the SaveMode: instead of SaveMode.Overwrite, use "Overwrite".
For Node type:

- SaveMode.ErrorIfExists: builds a CREATE query
- SaveMode.Overwrite: builds a MERGE query

For SaveMode.Overwrite mode you need to have unique constraints on the keys.
For Relationship type:

- SaveMode.ErrorIfExists: builds a CREATE query
- SaveMode.Append: builds a MERGE query

For SaveMode.Append mode you need to have unique constraints on the keys.
In both cases, the default SaveMode is ErrorIfExists.
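As a quick preview (full examples are in the sections below), a node write in Overwrite mode could look like the following sketch; it reuses the ds Dataset from the Getting Started example and assumes that a UNIQUE or NODE KEY constraint exists on the name and surname properties:

import org.apache.spark.sql.SaveMode

// Minimal sketch: reuses the `ds` Dataset from the Getting Started example
// and assumes a UNIQUE or NODE KEY constraint on (name, surname) in Neo4j.
ds.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)               // builds a MERGE query for nodes
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .option("node.keys", "name,surname")    // keys used by the MERGE
  .save()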
Options
The DataSource Writer has several options to connect to Neo4j and persist data.
Setting Name | Description | Default Value | Required |
---|---|---|---|
labels | Colon separated list of the labels to attach to the node | (none) | No |
batch.size | The number of rows sent to Neo4j as a batch | 5000 | No |
transaction.codes.fail | Comma separated list of Neo4j codes that will cause the transaction to fail | (none) | No |
transaction.retries | Number of retries in case of failure | 3 | No |

Node Specific Options

Setting Name | Description | Default Value | Required |
---|---|---|---|
node.keys | Comma separated list of properties considered as node keys, in case you're using SaveMode.Overwrite | (none) | No |

Relationship Specific Options

Setting Name | Description | Default Value | Required |
---|---|---|---|
relationship.properties | Map used as keys for specifying the relationship properties. Used only if relationship.save.strategy is keys | (empty) | No |
relationship.save.strategy | Save strategy to be used | native | Yes |
relationship.source.labels | Colon separated list of labels that identify the source node | (empty) | Yes |
relationship.source.node.keys | Map used as keys for matching the source node | (empty) | No |
relationship.source.save.mode | Source node save mode | Match | No |
relationship.source.node.properties | Map used as keys for specifying the source properties. Used only if relationship.save.strategy is keys | (empty) | No |
relationship.target.labels | Colon separated list of labels that identify the target node | (empty) | Yes |
relationship.target.node.keys | Map used as keys for matching the target node | (empty) | No |
relationship.target.save.mode | Target node save mode | Match | No |
relationship.target.node.properties | Map used as keys for specifying the target properties. Used only if relationship.save.strategy is keys | (empty) | No |
Since the Neo4j Connector for Apache Spark performs batch writes to speed up the ingestion process, if the process fails at some point all the previously written batches are already persisted.
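As a sketch of how these options are passed, the following write sets a custom batch size and retry count; the option names are the ones listed in the table above and the ds Dataset is the one from the Getting Started example:

// Minimal sketch: option names as listed in the table above.
ds.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("batch.size", "10000")          // rows per batch sent to Neo4j (default 5000)
  .option("transaction.retries", "5")     // retries in case of failure (default 3)
  .save()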
Write Data
Writing data to a Neo4j database can be done in three ways: with a custom Cypher query, as nodes, or as relationships.
Custom Cypher Query
If you use the query option, the Spark Connector persists the entire Dataset by using the provided query.
The rows are sent to Neo4j in batches whose size is defined by the batch.size option, and your query is wrapped in an UNWIND $events AS event statement.
So given the following simple Spark program:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("query", "CREATE (n:Person {fullName: event.name + event.surname})")
  .save()
This will be the generated query:
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + event.surname})
Where events is the batch created from your dataset.
Node
If you use the labels option, the Spark Connector persists the entire Dataset as nodes.
Depending on the SaveMode, it either CREATEs or MERGEs nodes (in the latter case using the node.keys properties).
The nodes are sent to Neo4j in batches whose size is defined by the batch.size option, and an UNWIND operation is performed under the hood.
Let’s take our first example:
import org.apache.spark.sql.{SaveMode, SparkSession}

import scala.util.Random

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

case class Point3d(`type`: String = "point-3d",
                   srid: Int,
                   x: Double,
                   y: Double,
                   z: Double)

case class Person(name: String, surname: String, age: Int, livesIn: Point3d)

val total = 10
val rand = Random

val df = (1 to total)
  .map(i => {
    Person(name = "Andrea " + i, surname = "Santurbano " + i, age = rand.nextInt(100),
      livesIn = Point3d(srid = 4979, x = 12.5811776, y = 41.9579492, z = 1.3))
  }).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .save()
This is converted into a query similar to:
UNWIND $events AS event
CREATE (n:`Person`:`Customer`) SET n += event.properties
If we instead use the same DataFrame but save it in Overwrite mode:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val df = (1 to 10)/*...*/.toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person:Customer")
  .option("node.keys", "name,surname")
  .save()
The generated query will be:
UNWIND $events AS event
MERGE (n:`Person`:`Customer` {name: event.keys.name, surname: event.keys.surname})
SET n += event.properties
Here you must specify which columns of your DataFrame will be used as keys to match the nodes.
You control this with the node.keys option, specifying a comma-separated list of key:value pairs,
where the key is the DataFrame column name and the value is the node property name.

If key and value are the same field, you can just specify one without the colon.
For example, if you have .option("node.keys", "name:name,email:email"), you can also write
.option("node.keys", "name,email").
If the column value is a Map<String, Value> (where Value can be any supported Neo4j type), the connector automatically tries to flatten it.
Let's say you have the following Dataset:
id | name | lives_in |
---|---|---|
1 | Andrea Santurbano | {address: 'Times Square, 1', city: 'NY', state: 'NY'} |
2 | Davide Fantuzzi | {address: 'Statue of Liberty, 10', city: 'NY', state: 'NY'} |
The Neo4j Connector for Apache Spark flattens the maps, and each map value ends up in its own property.
id | name | lives_in.address | lives_in.city | lives_in.state |
---|---|---|---|---|
1 | Andrea Santurbano | Times Square, 1 | NY | NY |
2 | Davide Fantuzzi | Statue of Liberty, 10 | NY | NY |
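A sketch of how such a Dataset with a map column might be built and written; the values mirror the table above and the labels option is an assumption:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// The lives_in map column is flattened by the connector into the
// lives_in.address, lives_in.city and lives_in.state node properties.
val peopleDf = Seq(
  (1, "Andrea Santurbano", Map("address" -> "Times Square, 1", "city" -> "NY", "state" -> "NY")),
  (2, "Davide Fantuzzi", Map("address" -> "Statue of Liberty, 10", "city" -> "NY", "state" -> "NY"))
).toDF("id", "name", "lives_in")

peopleDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("labels", ":Person")
  .save()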
Relationship
You can write a DataFrame to Neo4j by specifying the source node, the target node, and the relationship between them.
Overview
This method deserves some explanation, since it is a bit complex and there are quite a few combinations of options, so let's clarify the vocabulary first, before diving into the actual process.
The theory is simple: the connector takes your Dataset and moves the columns around to create source and target nodes, eventually creating the specified relationship between the two.
This is a basic example of what would happen:
UNWIND $events AS event
CREATE (source:Person)
SET source = event.source
CREATE (target:Product)
SET target = event.target
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel
The CREATE keyword for the source and target nodes can be replaced by MERGE or MATCH.
To control this you can use the node save modes: you can set source and target independently by using relationship.source.save.mode and relationship.target.save.mode.
When using MATCH or MERGE you need to specify keys that identify the nodes. This is what the relationship.source.node.keys and relationship.target.node.keys options are for.
More on this here.
The CREATE keyword for the relationship can be replaced by MERGE. You can control this with the save mode.
You are also required to specify one of the two save strategies. This identifies which method is used to create the Cypher query, and each strategy can have additional options available.
Native Strategy
This strategy is useful when you have a schema that conforms to the relationship read schema, with relationship.nodes.map set to false.
Let's say we want to read relationships from a database, filter them, and write the result to another database:
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val originalDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://allprod.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

originalDf
  .where("`target.price` > 2000")
  .write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://expensiveprod.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "ErrorIfExists")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "ErrorIfExists")
  .save()
You just need to specify the source node labels, the target node labels, and the relationship you want between them.
The generated query will be:
UNWIND $events AS event
CREATE (source:Person:Rich)
SET source = event.source
CREATE (target:Product:Expensive)
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
event.source, event.target, and event.rel contain the columns described here.
The default save mode for source and target nodes is Match.
This means that the relationship is created only if the nodes are already in your database.
Look here for more information about node save modes.
When using the Overwrite or Match node save mode, you should specify which keys should be used to identify the nodes. For example, given the following DataFrame:
<rel.id> | <rel.type> | <source.id> | <source.labels> | source.id | source.fullName | <target.id> | <target.labels> | target.name | target.id | rel.quantity |
---|---|---|---|---|---|---|---|---|---|---|
4 | BOUGHT | 1 | [Person] | 1 | John Doe | 0 | [Product] | Product 1 | 52 | 240 |
5 | BOUGHT | 3 | [Person] | 2 | Jane Doe | 2 | [Product] | Product 2 | 53 | 145 |
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

// we read our DF from Neo4j using the relationship method
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", "Person")
  .option("relationship.target.labels", "Product")
  .load()

df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "target.id:id")
  .save()
Here you must specify which columns of your DataFrame will be used as keys to match the nodes.
You control this with the relationship.source.node.keys and relationship.target.node.keys options, specifying a comma-separated list of key:value pairs,
where the key is the DataFrame column name and the value is the node property name.
The generated query will be:
UNWIND $events AS event
MERGE (source:Person:Rich {fullName: event.source.fullName})
SET source = event.source
MERGE (target:Product:Expensive {id: event.target.id})
SET target = event.target
CREATE (source)-[rel:SOLD]->(target)
SET rel += event.rel
Remember that you can choose to CREATE or MERGE the relationship with the save mode.
If the provided DataFrame schema doesn't conform to the required schema, meaning that none of the required columns is present, the write will fail.
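For completeness, here is a sketch of the same write using the Match node save mode, so that the SOLD relationship is created only between nodes that already exist in the target database; the df DataFrame is the one read in the example above:

// Minimal sketch: with the Match node save mode the source and target nodes
// are only matched, never created, so the relationship is created only
// between nodes that already exist.
df.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("relationship", "SOLD")
  .option("relationship.source.labels", ":Person:Rich")
  .option("relationship.source.save.mode", "Match")
  .option("relationship.source.node.keys", "source.fullName:fullName")
  .option("relationship.target.labels", ":Product:Expensive")
  .option("relationship.target.save.mode", "Match")
  .option("relationship.target.node.keys", "target.id:id")
  .save()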
Keys Strategy
When you want more control over how relationships are written, you can use the keys strategy.
As with the native strategy, you can specify node keys to identify nodes. In addition, you can also specify which columns should be written as node properties.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
  (12, "John Bonham", "Drums"),
  (19, "John Mayer", "Guitar"),
  (32, "John Scofield", "Guitar"),
  (15, "John Butler", "Guitar")
).toDF("experience", "name", "instrument")

musicDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "PLAYS")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Musician")
  .option("relationship.source.save.mode", "overwrite")
  .option("relationship.source.node.keys", "name:name")
  .option("relationship.target.labels", ":Instrument")
  .option("relationship.target.node.keys", "instrument:name")
  .option("relationship.target.save.mode", "overwrite")
  .save()
This creates a MERGE query using the name property as the key for Musician nodes.
The value of the instrument column is used as the value of the Instrument node's name property, generating a statement like:
MERGE (target:Instrument {name: event.target.instrument})
Here you must specify which columns of your DataFrame will be written as source node and target node properties.
You can do this with the relationship.source.node.properties and relationship.target.node.properties options,
specifying a comma-separated list of key:value pairs, where the key is the DataFrame column name,
and the value is the node property name.
The same applies to the relationship.properties option, used to specify which DataFrame columns will be written as relationship properties.

If key and value are the same field, you can just specify one without the colon.
For example, if you have .option("relationship.source.node.properties", "name:name,email:email"), you can also write
.option("relationship.source.node.properties", "name,email").
The same applies to relationship.source.node.keys and relationship.target.node.keys.
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

val musicDf = Seq(
  (12, "John Bonham", "Orange", "Drums"),
  (19, "John Mayer", "White", "Guitar"),
  (32, "John Scofield", "Black", "Guitar"),
  (15, "John Butler", "Wooden", "Guitar")
).toDF("experience", "name", "instrument_color", "instrument")

musicDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "PLAYS")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Musician")
  .option("relationship.source.save.mode", "overwrite")
  .option("relationship.source.node.keys", "name:name")
  .option("relationship.target.labels", ":Instrument")
  .option("relationship.target.node.keys", "instrument:name")
  .option("relationship.target.node.properties", "instrument_color:color")
  .option("relationship.target.save.mode", "overwrite")
  .save()
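The relationship.properties option mentioned above follows the same key:value syntax. As a sketch, this variant also writes the experience column as a property of the PLAYS relationship; the yearsOfExperience property name is made up for illustration:

// Same example as above, with the experience column additionally written
// as the yearsOfExperience property of the PLAYS relationship.
musicDf.write
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("relationship", "PLAYS")
  .option("relationship.save.strategy", "keys")
  .option("relationship.properties", "experience:yearsOfExperience")
  .option("relationship.source.labels", ":Musician")
  .option("relationship.source.save.mode", "overwrite")
  .option("relationship.source.node.keys", "name:name")
  .option("relationship.target.labels", ":Instrument")
  .option("relationship.target.node.keys", "instrument:name")
  .option("relationship.target.save.mode", "overwrite")
  .save()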
Schema Optimization Operations
The Spark Connector supports schema optimization operations, executed before the import process starts in order to speed up the import itself, via:

- indexes
- constraints
- a set of schema queries

You can set the optimization via the schema.optimization.type option, which takes three values:

- INDEX: it creates only indexes on the provided nodes
- NODE_CONSTRAINTS: it creates only constraints on the provided nodes
- QUERY: it performs a series of schema queries separated by ; and works only when you're merging nodes
Index Creation
The following is an example of how to create indexes while creating nodes:
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.Overwrite)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "INDEX")
  .save()
This will create, before the import starts, the following schema query:
CREATE INDEX ON :Person(surname)
Take into consideration that the first label is used for the index creation.
Constraint Creation
The following is an example of how to create constraints while creating nodes:
ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.Overwrite)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("labels", ":Person:Customer")
  .option("node.keys", "surname")
  .option("schema.optimization.type", "NODE_CONSTRAINTS")
  .save()
This will create, before the import starts, the following schema query:
CREATE CONSTRAINT ON (p:Person) ASSERT (p.surname) IS UNIQUE
Take into consideration that the first label is used for the constraint creation.
Script Option
The script option allows you to execute a series of preparation scripts before the Spark job execution; the result of the last query can be reused in combination with the query ingestion mode, as follows:
val ds = Seq(SimplePerson("Andrea", "Santurbano")).toDS()

ds.write
  .format(classOf[DataSource].getName)
  .mode(SaveMode.ErrorIfExists)
  .option("url", SparkConnectorScalaSuiteIT.server.getBoltUrl)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX ON :Person(surname);
      |CREATE CONSTRAINT ON (p:Product)
      | ASSERT (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
Before the import starts, the connector runs the content of the script option, and the result of the last query is injected into the query. In the end, the full query executed by the connector while ingesting the data is:
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})
where scriptResult is the result of the last query contained in the script option, that is RETURN 36 AS age.