Write relationships
All the examples in this page assume that the SparkSession has been initialized with the appropriate authentication options.
With the `relationship` option, the connector writes a Spark DataFrame to the Neo4j database by specifying source nodes, target nodes, and a relationship.
To avoid deadlocks, always use a single partition (with `coalesce(1)` or `repartition(1)`) before writing relationships to Neo4j.
The connector builds a CREATE or a MERGE Cypher® query (depending on the save mode) that uses the UNWIND clause to write a batch of rows (an `events` list whose size is defined by the `batch.size` option).
The rest of the query is built from a number of data source options, listed in the table below; a minimal write sketch combining the partitioning and batching settings follows the table.
Option | Description | Value | Default |
---|---|---|---|
`relationship.save.strategy` | Defines the save strategy to use. | `native`, `keys` | `native` |
`relationship.source.save.mode` and `relationship.target.save.mode` | Define the node save mode, and can be set independently for source and target nodes. | `Match`, `Append`, `Overwrite` | `Match` |
`relationship.source.labels` and `relationship.target.labels` | Required. Define the labels to assign to source and target nodes. | Colon-separated list of labels. | (empty) |
`relationship.source.node.keys` and `relationship.target.node.keys` | When the node save mode is `Match` or `Overwrite`, define the properties to use as keys to match the source and target nodes. | Comma-separated list of `column:property` pairs. If the column name and the property name are the same, you can omit the property. | (empty) |
`relationship.source.node.properties` and `relationship.target.node.properties` | When the save strategy is `keys`, define which DataFrame columns to write as source and target node properties. | Comma-separated list of `column:property` pairs. If the column name and the property name are the same, you can omit the property. | (empty) |
`relationship.properties` | When the save strategy is `keys`, defines which DataFrame columns to write as relationship properties. | Comma-separated list of `column:property` pairs. If the column name and the property name are the same, you can omit the property. | (empty) |
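As a minimal sketch of the partitioning and batching settings mentioned above (the connection URL and the data below are placeholders, not part of this page's examples), the following Scala snippet writes from a single partition and sets an explicit batch size:

// Minimal sketch: single-partition write with an explicit UNWIND batch size.
// The URL and data are placeholders; adjust them to your environment.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("write-relationships-sketch")
  .getOrCreate()
import spark.implicits._

// Columns use the "source.", "target.", and "rel." prefixes expected by the
// default (native) save strategy described below.
val relDF = Seq(
  ("John", "Doe", 1, "Product 1", 200, "ABC100")
).toDF(
  "source.name", "source.surname", "source.id",
  "target.name", "rel.quantity", "rel.order"
)

relDF
  .coalesce(1) // single partition, as recommended above, to avoid deadlocks
  .write
  .mode("Append")
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687") // placeholder connection URL
  .option("relationship", "BOUGHT")
  .option("relationship.source.save.mode", "Append")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.save.mode", "Append")
  .option("relationship.target.labels", ":Product")
  .option("batch.size", "5000") // rows per `events` list in each UNWIND batch
  .save()

Writing from a single partition serializes the writes, trading some throughput for safety from lock contention; a larger `batch.size` reduces round trips at the cost of larger transactions.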
Save strategies
The save strategy defines the way the connector maps the DataFrame schema to Neo4j nodes and relationships.
native strategy
The `native` strategy requires the DataFrame to conform to the relationship read schema with the `relationship.nodes.map` option set to `false`.
The DataFrame must include at least one of the `rel.[property name]`, `source.[property name]`, or `target.[property name]` columns.
A good use case for this strategy is transferring data from one database to another. When you use the connector to read a relationship, the resulting DataFrame has the correct schema.
If you use the connector to read data from a database and write data to a different database, you need to set the connection options on each DataFrame rather than on the Spark session.
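A minimal sketch of such a database-to-database transfer, setting the connection options on each DataFrame, could look as follows. It assumes an existing SparkSession named `spark`; the URLs and credentials are placeholders.

// Minimal sketch: copy :BOUGHT relationships between two databases by setting
// the connection options on each DataFrame rather than on the Spark session.
// Assumes an existing SparkSession named `spark`; URLs/credentials are placeholders.
val sourceDF = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://source-host:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "source-password")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false") // keep the flat schema the native strategy expects
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.labels", ":Product")
  .load()

// Write the same relationships to a different database.
sourceDF.write
  .mode("Append")
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://target-host:7687")
  .option("authentication.basic.username", "neo4j")
  .option("authentication.basic.password", "target-password")
  .option("relationship", "BOUGHT")
  .option("relationship.source.save.mode", "Append")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.save.mode", "Append")
  .option("relationship.target.labels", ":Product")
  .save()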
The following example shows how to use the `native` strategy with the `Append` save mode both for the relationship and for the source/target nodes.
If run multiple times, it creates duplicate relationships and nodes.
import org.apache.spark.sql.SaveMode
// Assumes an existing SparkSession named `spark`
import spark.implicits._

// Columns representing node/relationship properties
// must use the "rel.", "source.", or "target." prefix
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
"source.name",
"source.surname",
"source.id",
"target.name",
"rel.quantity",
"rel.order"
)
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
// Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
.save()
# Columns representing node/relationship properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
[
{
"source.name": "John",
"source.surname": "Doe",
"source.customerID": 1,
"target.name": "Product 1",
"rel.quantity": 200,
"rel.order": "ABC100",
},
{
"source.name": "Jane",
"source.surname": "Doe",
"source.customerID": 2,
"target.name": "Product 2",
"rel.quantity": 100,
"rel.order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
keys strategy
The `keys` strategy gives more control over how relationships and source/target nodes are written.
It does not require any specific schema for the DataFrame, and you can specify which columns to write as node and relationship properties.
The following example shows how to use the `keys` strategy with the `Append` save mode both for the relationship and for the source/target nodes.
If run multiple times, it creates duplicate relationships and nodes.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Create source nodes and assign them a label
.option("relationship.source.save.mode", "Append")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Create target nodes and assign them a label
.option("relationship.target.save.mode", "Append")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Node save modes
The examples from the previous section use the `Append` mode for both relationships and nodes; this means that new relationships and new nodes are created every time the code is run.
Node save modes other than `Append` behave differently.
Match mode
The `Match` mode requires nodes with the selected labels and keys to already exist.
This mode requires both the `relationship.source.node.keys` and the `relationship.target.node.keys` options.
The following example does not create any relationships if there are no matching nodes.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Match source nodes with the specified label
.option("relationship.source.save.mode", "Match")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
    // Node keys are mandatory for the Match save mode
.option("relationship.source.node.keys", "customerID:id")
// Match target nodes with the specified label
.option("relationship.target.save.mode", "Match")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for the Match save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Match source nodes with the specified label
.option("relationship.source.save.mode", "Match")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
    # Node keys are mandatory for the Match save mode
.option("relationship.source.node.keys", "customerID:id")
# Match target nodes with the specified label
.option("relationship.target.save.mode", "Match")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for the Match save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Overwrite mode
The `Overwrite` mode creates nodes with the selected labels and keys if they do not already exist, and updates their properties if they do.
This mode requires both the `relationship.source.node.keys` and the `relationship.target.node.keys` options.
If run multiple times, the following example creates duplicate relationships but no duplicate nodes.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Create new relationships
.mode(SaveMode.Append)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
// Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Create new relationships
.mode("Append")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
# Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Due to the concurrency of Spark jobs, when using the `Overwrite` mode you must use a single partition (with `coalesce(1)` or `repartition(1)`) to avoid deadlocks.
Overwrite nodes and relationships
If you need to upsert both nodes and relationships, you must set the `relationship.source.save.mode`, `relationship.target.save.mode`, and DataFrame save mode options to `Overwrite` (the `relationship.source.node.keys` and `relationship.target.node.keys` options are still required).
If run multiple times, the following example does not create any duplicate nodes or relationships.
val relDF = Seq(
("John", "Doe", 1, "Product 1", 200, "ABC100"),
("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")
relDF.write
// Overwrite relationships
.mode(SaveMode.Overwrite)
.format("org.neo4j.spark.DataSource")
// Assign a type to the relationships
.option("relationship", "BOUGHT")
// Use `keys` strategy
.option("relationship.save.strategy", "keys")
// Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
// Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties",
"name,surname,customerID:id"
)
// Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
// Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
// Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
// Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
// Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
relDF = spark.createDataFrame(
[
{
"name": "John",
"surname": "Doe",
"customerID": 1,
"product": "Product 1",
"quantity": 200,
"order": "ABC100",
},
{
"name": "Jane",
"surname": "Doe",
"customerID": 2,
"product": "Product 2",
"quantity": 100,
"order": "ABC200",
},
]
)
(
relDF.write
# Overwrite relationships
.mode("Overwrite")
.format("org.neo4j.spark.DataSource")
# Assign a type to the relationships
.option("relationship", "BOUGHT")
# Use `keys` strategy
.option("relationship.save.strategy", "keys")
# Overwrite source nodes and assign them a label
.option("relationship.source.save.mode", "Overwrite")
.option("relationship.source.labels", ":Customer")
# Map the DataFrame columns to node properties
.option(
"relationship.source.node.properties", "name,surname,customerID:id"
)
# Node keys are mandatory for overwrite save mode
.option("relationship.source.node.keys", "customerID:id")
# Overwrite target nodes and assign them a label
.option("relationship.target.save.mode", "Overwrite")
.option("relationship.target.labels", ":Product")
# Map the DataFrame columns to node properties
.option("relationship.target.node.properties", "product:name")
# Node keys are mandatory for overwrite save mode
.option("relationship.target.node.keys", "product:name")
# Map the DataFrame columns to relationship properties
.option("relationship.properties", "quantity,order")
.save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties
Due to the concurrency of Spark jobs, when using the `Overwrite` mode you must use a single partition (with `coalesce(1)` or `repartition(1)`) to avoid deadlocks.
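As a quick sanity check (a sketch, assuming the same SparkSession and connection options used for the writes above), you can read the relationships back and confirm that repeated runs of the upsert do not change the count:

// Minimal sketch: read the :BOUGHT relationships back to verify the upsert.
// Assumes the same SparkSession (`spark`) and connection options used for the write.
val checkDF = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("relationship", "BOUGHT")
  .option("relationship.nodes.map", "false")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.target.labels", ":Product")
  .load()

// Re-running the Overwrite example should leave this count unchanged.
println(checkDF.count())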