Write with a Cypher query

All the examples in this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.

If you need more flexibility, you can use the query option to run a custom Cypher® query with CREATE and MERGE clauses.

The connector persists the entire DataFrame using the provided query. The nodes are sent to Neo4j in batches of rows defined by the batch.size property, and the query is wrapped in an UNWIND $events AS event statement.

Example
case class Person(name: String, surname: String, age: Int)

// Create an example DataFrame
val df = Seq(
    Person("John", "Doe", 42),
    Person("Jane", "Doe", 40)
).toDF()

// Define the Cypher query to use in the write
val query = "CREATE (n:Person {fullName: event.name + ' ' + event.surname})"

df.write
  .format("org.neo4j.spark.DataSource")
  .option("query", query)
  .mode(SaveMode.Overwrite)
  .save()
Equivalent Cypher query
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname})

events is one batch created from the dataset.

Considerations

  • You must always specify the save mode.

  • You can use the events list in WITH statements as well. For example, you can replace the query in the previous example with the following:

    WITH event.name + ' ' + toUpper(event.surname) AS fullName
    CREATE (n:Person {fullName: fullName})
  • Subqueries that reference the events list in CALLs are supported:

    CALL {
      WITH event
      RETURN event.name + ' ' + toUpper(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • If APOC is installed, APOC procedures and functions can be used:

    CALL {
      WITH event
      RETURN event.name + ' ' + apoc.text.toUpperCase(event.surname) AS fullName
    }
    CREATE (n:Person {fullName: fullName})
  • Although a RETURN clause is not forbidden, adding one does not have any effect on the query result.

script option

The script option allows to run a sequence of Cypher queries before executing the read operation.

The result of the script can be used in a subsequent query, for example to inject query parameters.

Example
val df = Seq(Person("John", "Doe", 42)).toDF()

df.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("query", "CREATE (n:Person{fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})")
  .option("script",
    """CREATE INDEX person_surname FOR (p:Person) ON (p.surname);
      |CREATE CONSTRAINT product_name_sku FOR (p:Product)
      | REQUIRE (p.name, p.sku)
      | IS NODE KEY;
      |RETURN 36 AS age;
      |""".stripMargin)
  .save()
Equivalent Cypher query
WITH $scriptResult AS scriptResult
UNWIND $events AS event
CREATE (n:Person {fullName: event.name + ' ' + event.surname, age: scriptResult[0].age})

scriptResult is the result of the last Cypher query in the script, which in this case is RETURN 36 AS age.

events is one batch created from the dataset.