Spark Structured Streaming

Let’s see how you can leverage the Spark Structured Streaming API with the Neo4j Connector for Apache Spark.

Although the connector is the same, Spark streaming works differently from Spark batching. Refer to the official Spark Structured Streaming documentation to learn more about the streaming approach.

Neo4j streaming options

Table 1. List of available streaming options
| Mode | Setting name | Description | Default value | Required |
|------|--------------|-------------|---------------|----------|
| Sink | checkpointLocation | Checkpoint file location (see the Checkpoint section below). | (none) | Yes |
| Source | streaming.property.name | The timestamp property used for batch reading (see the Streaming property name section below). | (none) | Yes |
| Source | streaming.from | Tells the connector from where to start sending data to the stream (see the Streaming from option section below). NOW starts streaming from the moment the stream starts; ALL sends all the data in the database to the stream before reading new data. | NOW | Yes |
| Source | streaming.query.offset | A valid Cypher® READ_ONLY query that returns a long value (e.g., MATCH (p:MyLabel) RETURN max(p.timestamp)). It is used to get the last timestamp in the database for a given query (see the notes on query mode below). | (none) | Yes, only for query mode |

Sink

Writing a stream to a Neo4j instance is straightforward and can be done with any of the three writing strategies.

The same schema concepts apply here as well. If you start a streaming read on an empty result set, you need to provide a user-defined schema, otherwise the read fails (an example is shown after the source code example below).
The following code example reads from a Kafka topic and writes to Neo4j.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Read the stream from a Kafka topic (the broker address is an example)
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "PeopleTopic") \
    .load()

# Write the stream to Neo4j as Person nodes, keyed on the Kafka value column
query = df.writeStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("save.mode", "ErrorIfExists") \
    .option("checkpointLocation", "/tmp/checkpoint/myCheckPoint") \
    .option("labels", "Person") \
    .option("node.keys", "value") \
    .start()

As mentioned, you can use any writing strategy: Node, Relationship, or Query.

The only difference from batch writing is that you must also set the checkpointLocation and save.mode options.

The save.mode option controls how the data is written; see the connector's documentation on writing for more information.
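
For example, here is a minimal sketch of a streaming write that uses the Query strategy instead of node labels (the input df, the checkpoint path, and the name/age columns are assumptions; the Query strategy binds each incoming row to the event parameter):

# Sketch: write each streamed row with a custom Cypher query.
# `df` is assumed to be a streaming DataFrame with `name` and `age` columns;
# each incoming row is available to the query as the `event` parameter.
query = df.writeStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("save.mode", "ErrorIfExists") \
    .option("checkpointLocation", "/tmp/checkpoint/myQueryCheckPoint") \
    .option("query", "MERGE (p:Person {name: event.name}) SET p.age = event.age") \
    .start()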

Checkpoint

The checkpoint is a file that allows Spark Structured Streaming to recover from failures. Spark updates this file with the progress information and, in case of failure or query restart, recovers from that point. The checkpoint location has to be a path on an HDFS-compatible file system.

Since the topic is broad and complex, refer to the official Spark documentation for details.

Source

Reading a stream from Neo4j requires some additional configuration.

Let’s see the code first and then analyze all the options.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Read the stream of Person nodes, tracking changes through the timestamp property
df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .option("streaming.property.name", "timestamp") \
    .option("streaming.from", "NOW") \
    .load()

# The memory streaming format writes the streamed data to an in-memory SparkSQL table.
# NOTE: make sure this code is executed in another block, or at least a few seconds
# after the previous one, so that the stream has time to fully initialize.
# Otherwise the query may return an empty result.
query = df.writeStream \
    .format("memory") \
    .queryName("testReadStream") \
    .start()

spark \
  .sql("select * from testReadStream order by timestamp") \
  .show()
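
If the stream read starts on an empty result set, the connector cannot infer a schema and, as noted above, you need to provide a user-defined schema yourself. A minimal sketch (the property names are assumptions) could look like this:

from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema for Person nodes with a name and a Long timestamp property;
# reuses the SparkSession created above.
user_schema = StructType([
    StructField("name", StringType()),
    StructField("timestamp", LongType()),
])

df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .schema(user_schema) \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .option("streaming.property.name", "timestamp") \
    .option("streaming.from", "NOW") \
    .load()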

Streaming property name

For streaming to work, each record needs a timestamp property that the connector can use when reading new data from Neo4j to send to the stream.

Behind the scenes, the connector builds a query with a WHERE clause that selects the records whose timestamp property falls within a range computed from the checkpoint data and the latest offset available in the database.

Therefore each node must have the timestamp property as a Neo4j Long, and it must not be null.

A string property such as "2021-08-11" does not work; the value needs to be a Neo4j Long.

The property name can be anything; just remember to set streaming.property.name accordingly.
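
For instance, the following sketch (the names and values are assumptions) seeds Person nodes with an epoch-millisecond timestamp property through a batch write, so that the stream read above has valid offsets to work with:

import time

# Hypothetical seed data: each node gets a Long epoch-millisecond timestamp.
# Reuses the SparkSession created above.
people = spark.createDataFrame(
    [("Alice", int(time.time() * 1000)), ("Bob", int(time.time() * 1000))],
    ["name", "timestamp"],
)

people.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Append") \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .save()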

Streaming from option

You can decide to stream all the data in the database, or only the new data. To do so, set the streaming.from option to one of the following two values (an example follows the list):

  • NOW: starts reading from the current timestamp. This is the default value for the streaming.from option.

  • ALL: reads all the data in the database first, and then only the new data.
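
For example, to replay everything already in the database before switching to new data, the read above can be configured as follows (connection details are the same assumptions as before):

# Same read as above, but replays all existing data before streaming new data
df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .option("streaming.property.name", "timestamp") \
    .option("streaming.from", "ALL") \
    .load()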

Reading mode

As with the sink, you can use any of the reading strategies: Node, Relationship, or Query.
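
For instance, here is a minimal sketch of a streaming read that uses the Relationship strategy (the relationship type, the labels, and the assumption that the timestamp property lives on the relationship are all illustrative):

# Sketch: stream BOUGHT relationships between Person and Product nodes,
# assuming the relationship carries a Long timestamp property.
rels = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("relationship", "BOUGHT") \
    .option("relationship.source.labels", "Person") \
    .option("relationship.target.labels", "Product") \
    .option("streaming.property.name", "timestamp") \
    .option("streaming.from", "NOW") \
    .load()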

Notes on query mode

Handling streaming.from and streaming.property.name is a bit less automatic when using query mode.

Let’s look at the example and then explain what’s happening.

from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("streaming.from", "NOW") \
    .option("streaming.property.name", "timestamp") \
    .option("query", \
        """MATCH (p:Test3_Person)
           WHERE p.timestamp > $stream.from AND p.timestamp <= $stream.to
           RETURN p.age AS age, p.timestamp AS timestamp""") \
    .option("streaming.query.offset", \
        "MATCH (p:Test3_Person) RETURN max(p.timestamp)") \
    .load()

As you can see, the streaming.from and streaming.property.name options must still be specified, but you have to take care of the WHERE clause yourself. The query is provided with two parameters, $stream.from and $stream.to, which describe the range of changes to read.

Although the query parameter $stream.offset, which is now deprecated, is still supported in the query, all queries are rewritten to also filter on the $stream.from and $stream.to parameters. To allow the database to query changes efficiently, update your queries to use these new parameters.

In this case, the streaming.query.offset option is mandatory; this option is used by the connector to read the last timestamp in the database, and the result is used to compute the ranges to be selected.
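
For example, a query written against the deprecated parameter can be updated to the new range parameters as follows (the old filter shape is illustrative; the label and returned properties match the example above):

# Deprecated: filtering only on $stream.offset
old_query = """MATCH (p:Test3_Person)
               WHERE p.timestamp > $stream.offset
               RETURN p.age AS age, p.timestamp AS timestamp"""

# Preferred: filtering on the $stream.from / $stream.to range
new_query = """MATCH (p:Test3_Person)
               WHERE p.timestamp > $stream.from AND p.timestamp <= $stream.to
               RETURN p.age AS age, p.timestamp AS timestamp"""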

Additional examples

You can find streaming code snippets and many other examples in this repository of Zeppelin notebooks.

A complete example using Spark, Neo4j and AWS Kinesis is described in the article From Kinesis via Spark to Neo4j.