Goals There are various ways to beneficially use Neo4j with Apache Spark, here we will list some approaches and point to solutions that enable you to leverage your Spark infrastructure with Neo4j. Prerequisites You should have a sound understanding of… Learn More →

Goals
There are various ways to beneficially use Neo4j with Apache Spark, here we will list some approaches and point to solutions that enable you to leverage your Spark infrastructure with Neo4j.
Prerequisites
You should have a sound understanding of both Apache Spark and Neo4j, each data model, data processing paradigm and APIs to leverage them effectively together.
Intermediate


General Observations

Apache Spark is a clustered, in-memory data processing solution that scales processing of large datasets easily across many machines. It also comes with GraphX and GraphFrames two frameworks for running graph compute operations on your data.

You can integrate with Spark in a variety of ways. Either to pre-process (aggregate, filter, convert) your raw data to be imported into Neo4j.

Spark can also serve as external Graph Compute solution, where you

  1. export data of selected subgraphs from Neo4j to Spark,
  2. compute the analytic aspects, and
  3. write the results back to Neo4j
  4. to be used in your Neo4j operations and Cypher queries.
Neo4j itself is capable of running graph processing on medium to large graphs quickly. For instance the graph-processing project demonstrates that we can run PageRank (5 iterations) on the dbpedia dataset (10M nodes, 125M relationships) in 20 seconds as a Neo4j server extension or user defined procedure. Spark might be better suited for larger datasets or more intensive compute operations.

Neo4j-Spark-Connector

The Neo4j Spark Connector uses the binary Bolt protocol to transfer data from and to a Neo4j server.

It offers Spark-2.0 APIs for RDD, DataFrame, GraphX and GraphFrames, so you’re free to chose how you want to use and process your Neo4j graph data in Apache Spark.

Configure Neo4j-URL, -user and -password via spark.neo4j.bolt.* Spark Config options.

The general usage is:

  1. create org.neo4j.spark.Neo4j(sc)
  2. set cypher(query,[params]),nodes(query,[params]),rels(query,[params]) as direct query, or
    pattern("Label1",Seq("REL"),"Label2") or pattern( ("Label1","prop1"),("REL","prop"),("Label2","prop2") )
  3. optionally define partitions(n), batch(size), rows(count) for parallelism
  4. choose which datatype to return
    • loadRowRdd, loadNodeRdds, loadRelRdd, loadRdd[T]
    • loadDataFrame,loadDataFrame(schema)
    • loadGraph[VD,ED]
    • loadGraphFrame[VD,ED]

Here is a basic example for loading a RDD[Row].

org.neo4j.spark.Neo4j(sc).cypher("MATCH (n:Person) RETURN n.name").partitions(5).batch(10000).loadRowRdd
$SPARK_HOME/bin/spark-shell --conf spark.neo4j.bolt.password=<password> \
--packages neo4j-contrib:neo4j-spark-connector:2.0.0-M2,graphframes:graphframes:0.2.0-spark2.0-s_2.11
import org.neo4j.spark._

val neo = Neo4j(sc)

val rdd = neo.cypher("MATCH (n:Person) RETURN id(n) as id ").loadRowRdd
rdd.count

// inferred schema
rdd.first.schema.fieldNames
//   => ["id"]
rdd.first.schema("id")
//   => StructField(id,LongType,true)

neo.cypher("MATCH (n:Person) RETURN id(n)").loadRdd[Long].mean
//   => res30: Double = 236696.5

neo.cypher("MATCH (n:Person) WHERE n.id <= {maxId} RETURN n.id").param("maxId", 10).loadRowRdd.count
//   => res34: Long = 10

Similar operations are available for DataFrames and GraphX. The GraphX integration also allows to write data back to Neo4j with a save operation.

To use GraphFrames you have to declare it as package. Then you can load a GraphFrame with graph data from Neo4j and run graph algorithms or pattern matchin on it (the latter will be slower than in Neo4j).

import org.neo4j.spark._

val neo = Neo4j(sc)

import org.graphframes._

val graphFrame = neo.pattern(("Person","id"),("KNOWS",null), ("Person","id")).partitions(3).rows(1000).loadGraphFrame

graphFrame.vertices.count
//     => 100
graphFrame.edges.count
//     => 1000

val pageRankFrame = graphFrame.pageRank.maxIter(5).run()
val ranked = pageRankFrame.vertices
ranked.printSchema()

val top3 = ranked.orderBy(ranked.col("pagerank").desc).take(3)
//     => top3: Array[org.apache.spark.sql.Row]
//     => Array([236716,70,0.62285...], [236653,7,0.62285...], [236658,12,0.62285])

More examples and details can be found in the docs of the GitHub repository.

Neo4j-Mazerunner

An interest in analytical graph processing led Kenny Bastani to work on an integration solution. It allows to export dedicated datasets, e.g. node or relationship-lists to Spark.

It supports these algorithms:

  • PageRank
  • Closeness Centrality
  • Betweenness Centrality
  • Triangle Counting
  • Connected Components
  • Strongly Connected Components

After running graph processing algorithms the results are written back concurrently and transactionally to Neo4j.

One focus of this approach is on data safety, that’s why it uses a persistent queue (RabbitMQ) to communicate data between Neo4j and Spark.

The infrastructure is set up using Docker containers, there are dedicated containers for Spark, RabbitMQ, HDFS and Neo4j with the Mazerunner Extension.

More details can be found on the project’s GitHub page.

Spark for Data Preprocessing

One example of pre-processing raw data (Chicago Crime dataset) into a format that’s well suited for import into Neo4j, was demonstrated by Mark Needham. He combined a number of functions into a Spark-job that takes the existing data, cleans and aggregates it and outputs fragments which are recombined later to larger files.

The approach is detailed in his blog post: Spark: Generating CSV Files to import into Neo4j.