Write with a Cypher query
All the examples in this page assume that the |
If you need more flexibility, you can use the query option to run a custom Cypher® query with CREATE and MERGE clauses.
The connector persists the entire DataFrame using the provided query.
The rows are sent to Neo4j in batches whose size is set by the batch.size property, and the query is wrapped in an UNWIND $events AS event statement.
import org.apache.spark.sql.SaveMode
// Assumes `spark` is the active SparkSession; the import is needed for toDF()
import spark.implicits._

case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val df = Seq(
  Person("John", "Doe", 42),
  Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("query", query)
  .mode(SaveMode.Overwrite)
  .save()
Equivalent Cypher query
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})
events is one batch created from the dataset.
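The batching and wrapping described above can be sketched in plain Scala, without Spark or Neo4j. This is an illustration only, not the connector's implementation: the object name UnwindBatchSketch, the helpers wrap and batches, the Row type alias, and the third sample row are hypothetical, and a batch size of 2 is chosen only to keep the output small.

```scala
object UnwindBatchSketch {
  // A row is modelled as a simple column-name -> value map (assumption)
  type Row = Map[String, Any]

  // Prefix the user query with the UNWIND statement.
  // "$$" is an escaped "$" inside an s-interpolated string.
  def wrap(query: String): String =
    s"UNWIND $$events AS event\n$query"

  // Split the rows into groups of at most batchSize rows; each group
  // plays the role of the $events parameter for one query execution.
  def batches(rows: Seq[Row], batchSize: Int): Seq[Seq[Row]] =
    rows.grouped(batchSize).toSeq

  def main(args: Array[String]): Unit = {
    val rows: Seq[Row] = Seq(
      Map("name" -> "John", "surname" -> "Doe", "age" -> 42),
      Map("name" -> "Jane", "surname" -> "Doe", "age" -> 40),
      Map("name" -> "Jim", "surname" -> "Roe", "age" -> 35) // hypothetical extra row
    )
    val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

    // Print the wrapped query once, then the size of each batch
    println(wrap(query))
    batches(rows, batchSize = 2).zipWithIndex.foreach { case (events, i) =>
      println(s"batch $i: ${events.size} row(s)")
    }
  }
}
```

Each group returned by batches stands in for one value of the $events parameter, so the user query itself only ever refers to a single event.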
Considerations
- You must always specify the save mode.
- You can use the events list in WITH statements as well. For example, you can replace the query in the previous example with the following:
  WITH event.name + ' ' + toUpper(event.surname) AS fullName
  CREATE (n:Person {fullName: fullName})
- Subqueries that reference the events list in CALLs are supported:
  CALL { WITH event RETURN event.name + ' ' + toUpper(event.surname) AS fullName }
  CREATE (n:Person {fullName: fullName})
- If APOC is installed, APOC procedures and functions can be used:
  CALL { WITH event RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName }
  CREATE (n:Person {fullName: fullName})
- Although a RETURN clause is not forbidden, adding one does not have any effect on the query result.
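As a minimal sketch of the MERGE clause mentioned at the top of this page, the following job matches each person on name and surname (creating the node if none exists) and then sets the age. It needs Spark and the connector on the classpath plus a reachable Neo4j instance. The object name MergeWriteSketch, the connection URL, the credentials, the local master, and the batch size of 1000 are placeholder assumptions, not recommended values.

```scala
import org.apache.spark.sql.{SaveMode, SparkSession}

// Same shape as the Person rows used in the examples on this page
case class Person(name: String, surname: String, age: Int)

object MergeWriteSketch {
  // MERGE matches an existing node on name and surname, or creates one;
  // SET then writes the age from the current event.
  val query: String =
    "MERGE (n:Person {name: event.name, surname: event.surname}) SET n.age = event.age"

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("merge-write-sketch")
      .master("local[*]") // assumption: local run for illustration
      .getOrCreate()
    import spark.implicits._

    val df = Seq(
      Person("John", "Doe", 42),
      Person("Jane", "Doe", 40)
    ).toDF()

    df.write
      .format("org.neo4j.spark.DataSource")
      // Assumed connection settings; replace with your own
      .option("url", "neo4j://localhost:7687")
      .option("authentication.basic.username", "neo4j")
      .option("authentication.basic.password", "password")
      .option("query", query)
      // Rows per UNWIND batch; 1000 is an arbitrary illustrative value
      .option("batch.size", "1000")
      // The save mode must always be specified
      .mode(SaveMode.Overwrite)
      .save()

    spark.stop()
  }
}
```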
script option
The script option lets you run a sequence of Cypher queries before executing the write operation.
The result of the script can be used in a subsequent query, for example to inject query parameters.
val df = Seq(Person("John", "Doe", 42)).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
Equivalent Cypher query
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})
scriptResult is the result of the last Cypher query in the script, which in this case is RETURN 36 AS age.
events is one batch created from the dataset.
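To make the role of scriptResult concrete, the following plain-Scala sketch mimics how scriptResult[0].age resolves for each event. It is a model under stated assumptions rather than connector code: ScriptResultSketch, personProperties, and the Row alias are hypothetical names, and the script result is hard-coded to the single row produced by RETURN 36 AS age.

```scala
object ScriptResultSketch {
  // A row is modelled as a simple column-name -> value map (assumption)
  type Row = Map[String, Any]

  // Stand-in for the rows returned by the last script query (RETURN 36 AS age)
  val scriptResult: Seq[Row] = Seq(Map("age" -> 36))

  // Mirrors the property map built by the CREATE clause for one event:
  // fullName comes from the event, age comes from the first script result row.
  def personProperties(event: Row, scriptResult: Seq[Row]): Row =
    Map(
      "fullName" -> s"${event("name")} ${event("surname")}",
      "age" -> scriptResult(0)("age")
    )

  def main(args: Array[String]): Unit = {
    val event: Row = Map("name" -> "John", "surname" -> "Doe", "age" -> 42)
    println(personProperties(event, scriptResult))
  }
}
```

In this model the event's own age column (42) is not used, because the query reads the age from scriptResult instead, so the resulting property map carries the age returned by the script.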