Spark Structured Streaming
Let’s see how you can leverage the Spark Structured Streaming API with the Neo4j Connector for Apache Spark.
Although the connector is the same, Spark streaming works differently from Spark batching. Here are some links to learn more about the Spark streaming approach:
- Structured Streaming Programming Guide from the Spark website.
- Diving into Apache Spark Streaming's Execution Model by Databricks.
Neo4j streaming options
Setting name | Description | Default value | Required |
---|---|---|---|
Sink | | | |
`checkpointLocation` | Checkpoint file location (see more below). | (none) | Yes |
Source | | | |
`streaming.property.name` | The timestamp property used for batch reading (read more below). | (none) | Yes |
`streaming.from` | This option is used to tell the connector from where to send data to the stream (read more below). | `NOW` | Yes |
`streaming.query.offset` | A valid Cypher® READ_ONLY query that returns a long value; it is used to get the last timestamp in the database for a given query (read more below). | (none) | Yes, only for `query` mode |
Sink
Writing a stream to a Neo4j instance is pretty easy and can be done using any of the three writing strategies.
The same schema concepts also apply here. If you start a streaming read with an empty result set, you need to specify the schema via a user-defined schema, or the read fails.
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# Read a stream from a Kafka topic.
# The broker address is an example; adjust it to your environment.
df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "PeopleTopic") \
    .load()

# Write the stream to Neo4j, creating one Person node per record.
query = df.writeStream \
    .format("org.neo4j.spark.DataSource") \
    .option("url", "neo4j://localhost:7687") \
    .option("save.mode", "ErrorIfExists") \
    .option("checkpointLocation", "/tmp/checkpoint/myCheckPoint") \
    .option("labels", "Person") \
    .option("node.keys", "value") \
    .start()
As mentioned, you can use any writing strategy: Node, Relationship, or Query.
The only difference is that you must set the `checkpointLocation` and `save.mode` options.
With `save.mode`, you can control how the data is written. More information here.
Checkpoint
The checkpoint is a file that allows Spark Structured Streaming to recover from failures. Spark updates this file with the progress information and recovers from that point in case of failure or query restart. This checkpoint location has to be a path in an HDFS compatible file system.
Since the topic is broad and complex, refer to the official Spark documentation for more details.
Source
Reading a stream from Neo4j requires some additional configuration.
Let’s see the code first and then analyze all the options.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://localhost:7687") \
.option("labels", "Person") \
.option("streaming.property.name", "timestamp") \
.option("streaming.from", "NOW") \
.load()
# Memory streaming format writes the streamed data to a SparkSQL table
# NOTE: make sure this code runs in another block, or at least a few seconds
# after the previous one, so that the stream is fully initialized.
# Otherwise, the query may return an empty result.
query = df.writeStream \
.format("memory") \
.queryName("testReadStream") \
.start()
spark \
.sql("select * from testReadStream order by timestamp") \
.show()
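If the streaming read starts against an empty result set, the connector cannot infer the schema, so (as noted above) you need to provide a user-defined schema yourself. Here is a minimal sketch, assuming the Person nodes only carry name and timestamp properties; adjust the fields to your own data model.
from pyspark.sql.types import StructType, StructField, StringType, LongType

# Hypothetical schema for the Person nodes used above.
person_schema = StructType([
    StructField("name", StringType()),
    StructField("timestamp", LongType()),
])

df = spark.readStream \
    .format("org.neo4j.spark.DataSource") \
    .schema(person_schema) \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .option("streaming.property.name", "timestamp") \
    .option("streaming.from", "NOW") \
    .load()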
Streaming property name
For the streaming to work, each record needs a property of type timestamp that the connector can leverage when reading new data from Neo4j to send to the stream.
Behind the scenes, the connector builds a query with a `WHERE` clause that selects the records whose timestamp property falls within a range computed from the checkpoint data and the latest offset available in the database.
Therefore, each node must have the timestamp property as a Neo4j Long, and it must not be null.
A property of type string like "2021-08-11" does not work; it needs to be a Neo4j Long.
The property name can be anything, just remember to set the `streaming.property.name` option accordingly.
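As an illustration, a regular batch write can populate such a property with epoch milliseconds so that the nodes are later picked up by the stream. This is only a sketch: the label, column names, and connection details are assumptions that mirror the examples above.
import time
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .master('local[*]') \
    .getOrCreate()

# The timestamp column holds epoch milliseconds as a Long; it becomes
# the node property referenced by streaming.property.name.
now_ms = int(time.time() * 1000)
people = spark.createDataFrame(
    [("Alice", now_ms), ("Bob", now_ms)],
    ["name", "timestamp"],
)

people.write \
    .format("org.neo4j.spark.DataSource") \
    .mode("Append") \
    .option("url", "neo4j://localhost:7687") \
    .option("labels", "Person") \
    .save()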
Streaming from option
You can decide to stream all the data in the database or only the new data.
To do so, set the `streaming.from` option to one of these two values (a sketch follows the list):
- `NOW`: starts reading from the current timestamp. This is the default value for the `streaming.from` option.
- `ALL`: reads all the data in the database first, and then only the new data.
Reading mode
As with the sink, you can use any of the reading strategies: Node, Relationship, or Query.
Notes on query mode
Handling the `streaming.from` and `streaming.property.name` options is a bit less automatic when using the query mode.
Let's look at the example first and then analyze what's happening.
from pyspark.sql import SparkSession
spark = SparkSession \
.builder() \
.master('local[*]') \
.getOrCreate()
df = spark.readStream \
.format("org.neo4j.spark.DataSource") \
.option("url", "neo4j://localhost:7687") \
.option("streaming.from", "NOW") \
.option("streaming.property.name", "timestamp") \
.option("query", \
"""MATCH (p:Test3_Person)
WHERE p.timestamp > $stream.from AND p.timestamp <= $stream.to
RETURN p.age AS age, p.timestamp AS timestamp""") \
.option("streaming.query.offset", \
"MATCH (p:Test3_Person) RETURN max(p.timestamp)") \
.load()
As you can see, the `streaming.from` and `streaming.property.name` options must still be specified, but you need to take care of the `WHERE` clause yourself.
The connector provides two query parameters, `$stream.from` and `$stream.to`, which describe the range of changes to read.
In this case, the `streaming.query.offset` option is mandatory: the connector uses it to read the last timestamp in the database, and the result is used to compute the ranges to select.
Additional examples
You can find streaming code snippets and many other examples in this repository of Zeppelin notebooks.
A complete example using Spark, Neo4j, and AWS Kinesis is described in the article From Kinesis via Spark to Neo4j.