Write relationships

All the examples in this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.

With the relationship option, the connector writes a Spark DataFrame to the Neo4j database by specifying source, target nodes, and a relationship.

To avoid deadlocks, always use a single partition (with coalesce(1) or repartition(1)) before writing relationships to Neo4j.

The connector builds a CREATE or a MERGE Cypher® query (depending on the save mode) that uses the UNWIND clause to write a batch of rows (an events list with size defined by the batch.size option).

The rest of the query is built depending on a number of data source options.

Table 1. Write options
Option Description Value Default

relationship.save.strategy

Defines the save strategy to use.

  • native requires the DataFrame to use a specific schema.

  • keys is more flexible.

native

relationship.source.save.mode

and

relationship.target.save.mode

Define the node save mode, and can be set independently for source and target nodes.

  • Match mode performs a MATCH.

  • Append mode performs a CREATE.

  • Overwrite mode performs a MERGE.

Match

relationship.source.labels

and

relationship.target.labels

Required.

Define the labels to assign to source and target nodes.

Colon-separated list of labels.

(empty)

relationship.source.node.keys

and

relationship.target.node.keys

When the node save mode is Match or Overwrite, define the node keys that identify the nodes.

Comma-separated list of key:value pairs.

If key and value have the same value (for example "name:name"), you can omit one.

(empty)

relationship.source.node.properties

and

relationship.target.node.properties

When the save strategy is keys, define which DataFrame columns to write as source/target node properties.

Comma-separated list of key:value pairs.

If key and value have the same value (for example "name:name"), you can omit one.

(empty)

relationship.properties

When the save strategy is keys, defines which DataFrame columns to write as relationship properties.

Comma-separated list of key:value pairs.

If key and value have the same value (for example "name:name"), you can omit one.

(empty)

Save strategies

The save strategy defines the way the connector maps the DataFrame schema to Neo4j nodes and relationships.

native strategy

The native strategy requires the DataFrame to conform to the relationship read schema with the relationship.nodes.map option set to false. The DataFrame must include at least one of the rel.[property name], source.[property name], or target.[property name] columns.

A good use case for this mode is transferring data from a database to another one. When you use the connector to read a relationship, the resulting DataFrame has the correct schema.

If you use the connector to read data from a database and write data to a different database, you need to set the connection options on each DataFrame rather than on the Spark session.

The following example shows how to use the native strategy with the Append save mode both for the relationship and for source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example
// Columns representing node/relationships properties
// must use the "rel.", "source.", or "target." prefix
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF(
    "source.name",
    "source.surname",
    "source.id",
    "target.name",
    "rel.quantity",
    "rel.order"
)

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
Example
# Columns representing node/relationships properties
# must use the "rel.", "source.", or "target." prefix
relDF = spark.createDataFrame(
    [
        {
            "source.name": "John",
            "source.surname": "Doe",
            "source.customerID": 1,
            "target.name": "Product 1",
            "rel.quantity": 200,
            "rel.order": "ABC100",
        },
        {
            "source.name": "Jane",
            "source.surname": "Doe",
            "source.customerID": 2,
            "target.name": "Product 2",
            "rel.quantity": 100,
            "rel.order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

keys strategy

The keys strategy gives more control on how relationships and source/target nodes are written. It does not require any specific schema for the DataFrame, and you can specify which columns to write as node and relationship properties.

The following example shows how to use the keys strategy with the Append save mode both for the relationship and for source/target nodes. If run multiple times, it creates duplicate relationships and nodes.

Example
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Create source nodes and assign them a label
    .option("relationship.source.save.mode", "Append")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Create target nodes and assign them a label
    .option("relationship.target.save.mode", "Append")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
CREATE (source:Customer)
SET source += event.source.properties
CREATE (target:Product)
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Node save modes

The examples from the previous section use the Append mode for both relationships and nodes; this means that new relationships and new nodes are created every time the code is run.

Node save modes different from Append have a different behaviour.

Match mode

The Match mode requires nodes with the selected labels and keys to already exist. This mode requires both the relationship.source.node.keys and the relationship.target.node.keys options.

The following example does not create any relationships if there are no matching nodes.

Example
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Match source nodes with the specified label
    .option("relationship.source.save.mode", "Match")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Match target nodes with the specified label
    .option("relationship.target.save.mode", "Match")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MATCH (source:Customer {id: event.source.keys.id})
MATCH (target:Product {name: event.target.keys.name})
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Overwrite mode

The Overwrite mode creates nodes with the selected labels and keys if they do not already exist. This mode requires both the relationship.source.node.keys and the relationship.target.node.keys options.

If run multiple times, the following example creates duplicate relationships but no duplicate nodes.

Example
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Create new relationships
    .mode(SaveMode.Append)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Create new relationships
    .mode("Append")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
CREATE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Due to the concurrency of Spark jobs, when using the Overwrite mode you should use the property uniqueness constraint to guarantee node uniqueness.

Overwrite nodes and relationships

If you need to upsert both nodes and relationships, you must use the Overwrite mode for all the relationship.source.node.keys, relationship.target.node.keys, and mode options.

If run multiple times, the following example does not create any duplicate nodes or relationships.

Example
val relDF = Seq(
    ("John", "Doe", 1, "Product 1", 200, "ABC100"),
    ("Jane", "Doe", 2, "Product 2", 100, "ABC200")
).toDF("name", "surname", "customerID", "product", "quantity", "order")

relDF.write
    // Overwrite relationships
    .mode(SaveMode.Overwrite)
    .format("org.neo4j.spark.DataSource")
    // Assign a type to the relationships
    .option("relationship", "BOUGHT")
    // Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    // Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    // Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties",
        "name,surname,customerID:id"
    )
    // Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    // Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    // Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    // Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    // Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
Example
relDF = spark.createDataFrame(
    [
        {
            "name": "John",
            "surname": "Doe",
            "customerID": 1,
            "product": "Product 1",
            "quantity": 200,
            "order": "ABC100",
        },
        {
            "name": "Jane",
            "surname": "Doe",
            "customerID": 2,
            "product": "Product 2",
            "quantity": 100,
            "order": "ABC200",
        },
    ]
)

(
    relDF.write
    # Overwrite relationships
    .mode("Overwrite")
    .format("org.neo4j.spark.DataSource")
    # Assign a type to the relationships
    .option("relationship", "BOUGHT")
    # Use `keys` strategy
    .option("relationship.save.strategy", "keys")
    # Overwrite source nodes and assign them a label
    .option("relationship.source.save.mode", "Overwrite")
    .option("relationship.source.labels", ":Customer")
    # Map the DataFrame columns to node properties
    .option(
        "relationship.source.node.properties", "name,surname,customerID:id"
    )
    # Node keys are mandatory for overwrite save mode
    .option("relationship.source.node.keys", "customerID:id")
    # Overwrite target nodes and assign them a label
    .option("relationship.target.save.mode", "Overwrite")
    .option("relationship.target.labels", ":Product")
    # Map the DataFrame columns to node properties
    .option("relationship.target.node.properties", "product:name")
    # Node keys are mandatory for overwrite save mode
    .option("relationship.target.node.keys", "product:name")
    # Map the DataFrame columns to relationship properties
    .option("relationship.properties", "quantity,order")
    .save()
)
Equivalent Cypher query
UNWIND $events AS event
MERGE (source:Customer {id: event.source.keys.id})
SET source += event.source.properties
MERGE (target:Product {name: event.target.keys.name})
SET target += event.target.properties
MERGE (source)-[rel:BOUGHT]->(target)
SET rel += event.rel.properties

Due to the concurrency of Spark jobs, when using the Overwrite mode you should use the property uniqueness constraint to guarantee node uniqueness.