Read with a Cypher query

All the examples in this page assume that the SparkSession has been initialized with the appropriate authentication options. See the Quickstart examples for more details.

If you need more flexibility, you can use the query option to run a custom Cypher® query.

The query must include a RETURN clause. With stored procedures, you need to include a YIELD clause before RETURN.

query option example
val query = """
  MATCH (n:Person)
  WITH n
  LIMIT 2
  RETURN id(n) AS id, n.name AS name
"""

spark.read.format("org.neo4j.spark.DataSource")
  .option("query", query)
  .load()
  .show()
Table 1. Result
id name

0

John Doe

1

Jane Doe

DataFrame columns

The structure of the result DataFrame is defined by the query itself. See the Schema inference page for more details.

This option is best suited when you return individual properties rather than graph entities (nodes, relationships, paths). Returning a graph entity, anyway, does not cause an error.

Recommended Not recommended
MATCH (p:Person)
RETURN id(p) AS id, p.name AS name
MATCH (p:Person)
RETURN p

If you need to return a graph entity, use the labels or relationship read options instead.

Limit the results

The connector uses SKIP and LIMIT internally to support partitioned reads; as a result, SKIP and LIMIT clauses are not allowed in a custom Cypher query. Attempts to do this will cause execution errors.

A possible workaround is to use SKIP and LIMIT before the RETURN clause. For example, the following query fails:

MATCH (p:Person)
RETURN p.name AS name
ORDER BY name
LIMIT 10

The query can be rewritten with LIMIT before the RETURN to complete successfully:

MATCH (p:Person)
WITH p.name AS name
ORDER BY name
LIMIT 10
RETURN p.name

When you rewrite a query, make sure the new query is equivalent to your original query so that the result is the same.

You can also use the query.count option instead of rewriting your query. See the Spark optimizations page for more details.

The script option

The script option allows to run a sequence of Cypher queries before executing the read operation.

The result of the script can be used in a subsequent query, for example to inject query parameters.

Do not use this option to inject large amounts of data in a Cypher query. See the Performance considerations section below for the recommended alternative.

script and query example
val script = "RETURN 'foo' AS val"
val query = """
  UNWIND range(1, 2) as id
  RETURN id AS val, scriptResult[0].val AS script
"""

spark.read.format("org.neo4j.spark.DataSource")
  .option("script", script)
  .option("query", query)
  .load()
  .show()
Table 2. Result
val script

1

foo

2

foo

Performance considerations

The script option is not meant to inject a massive volume of data. Doing so would result in inefficient queries.

The recommended process is to create a dedicated dataframe containing the new data, then join it with another dataframe performing the actual reads against Neo4j. Spark supports multiple pushdown optimizations including filter pushdown, which means the resulting read query will be efficient.

// Suppose there are many nodes that match,
// resulting in a large list of values
val script = "MATCH (f:Filter) RETURN collect(f.id) AS ids"

// Sending a large list to the server would add a lot of memory pressure
val query = """
  MATCH (n:Comment)
  WHERE n.filterId IN scriptResult[0].ids
  RETURN n.id AS id
"""
spark.read.format("org.neo4j.spark.DataSource")
  .option("script", script)
  .option("query", query)
  .load()
  .show()
import org.apache.spark.sql.functions.col

val filterDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("labels", "Filter")
  .load()

val commentDf = spark.read.format("org.neo4j.spark.DataSource")
  .option("labels", "Comment")
  .load()

// Pushdown optimizations filter data at the source (Neo4j) to minimize data transfer
val resultDf = commentDf.join(filterDf, col("filterId") === filterDf("id"))