Architecture guidance

Overview

Whether you are sending data from Spark to Neo4j or pulling data out of Neo4j, the two systems are organized around fundamentally different data models:

  • Spark is oriented around tabular DataFrames.

  • Neo4j is oriented around graphs (nodes, relationships, paths).

The use of this connector can be thought of as a "Transform" step and a "Load" step, regardless of which direction the data is moving. The "Transform" step is concerned with how to move data between graphs and tables; the "Load" step is concerned with moving data in bulk.

Tables for labels

To move data back and forth between Neo4j and Spark, the connector uses an approach called Tables for Labels. Consider the following path:

(:Customer)-[:BOUGHT]->(:Product)

You can decompose it into:

  • two nodes: Customer and Product.

  • one relationship between them: BOUGHT.

So in total, you have three graph entities, and each one is managed as a simple table.

So given the following Cypher® query:

CREATE (c1:Customer{id:1, username: 'foo', city: 'Venezia'}),
(p1:Product{id: 100, name: 'My Awesome Product'}),
(c2:Customer{id:2, username: 'bar', city: 'Pescara'}),
(p2:Product{id: 101, name: 'My Awesome Product 2'}),
(c1)-[:BOUGHT{quantity: 10}]->(p1),
(c2)-[:BOUGHT{quantity: 13}]->(p1),
(c2)-[:BOUGHT{quantity: 4}]->(p2);

The graph structure is decomposed into three tables.

Table 1. Customer table

id | username | city
1  | foo      | Venezia
2  | bar      | Pescara

Table 2. Product table

id  | name
100 | My Awesome Product
101 | My Awesome Product 2

Table 3. BOUGHT table

source_id | target_id | quantity
1         | 100       | 10
2         | 100       | 13
2         | 101       | 4
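
To make the mapping concrete, the Customer table can be read back into Spark as a regular DataFrame. The following is a minimal sketch; the URL and connection settings are placeholders for your own environment.

// Minimal sketch: the Customer "table" surfaces as a DataFrame with one row
// per node and one column per property. The url value is a placeholder.
val customerDf = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("labels", "Customer")
  .load()

customerDf.show()
// Typically includes the property columns (id, username, city) plus
// connector-managed columns such as <id> and <labels>.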

To obtain this decomposition, you can use two strategies that allow you to extract metadata from your graph: the APOC-based approach and automatic sampling, both described in the Schema considerations section below.

Graph transforms

General principles

  • Wherever possible, perform data quality fixes before loading into Neo4j; this includes dropping missing records, changing data types of properties, and so on.

  • Since Spark excels at parallel computation, any heavy non-graph computation should be done in the Spark layer rather than in Cypher on Neo4j.

  • Size your Neo4j instance appropriately before using aggressive parallelism or large batch sizes.

  • Experiment with larger batch sizes, ensuring that batches stay within Neo4j’s configured heap memory. In general, the larger the batches, the faster the overall throughput to Neo4j.

Converting data from DataFrames to graphs

When taking any complex set of DataFrames and preparing it for load into Neo4j, you have two options:

  • Normalized loading

  • Cypher destructuring

This section describes both and provides information on the benefits and potential drawbacks from a performance and complexity perspective.

Where possible, use the normalized loading approach for best performance and maintainability.

Normalized loading

Suppose you want to load a single DataFrame called purchases into Neo4j with the following content:

product_id,product,customer_id,customer,quantity
1,Socks,10,David,2
2,Pants,11,Andrea,1

This data represents a simple (:Customer)-[:BOUGHT]->(:Product) graph model.

The normalized loading approach requires that you create a number of different DataFrames: one for each node label and relationship type in your desired graph. For example, in this case, you might create three DataFrames:

  • val products = spark.sql("SELECT product_id, product FROM purchases")

  • val customers = spark.sql("SELECT customer_id, customer FROM purchases")

  • val bought = spark.sql("SELECT product_id, customer_id, quantity FROM purchases")

Once these simple DataFrames represent a normalized view of 'tables for labels' (that is, one DataFrame/table per node label or relationship type), the existing utilities provided by the connector for writing nodes and relationships can be used with no additional Cypher needed. Additionally, if these DataFrames are made unique by identifier, the data is already prepared for maximum parallelism. (See the parallelism notes in the sections below.)
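
As a minimal sketch of what those writes might look like, the node and relationship DataFrames can be pushed to Neo4j with the connector's node and relationship write options. The URL, save modes, and exact option values here are placeholders; check option names such as labels, node.keys, relationship.save.strategy, and the relationship source/target keys against your connector version.

import org.apache.spark.sql.SaveMode

// Sketch: write the node DataFrames, merging on their identifier columns.
products.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "neo4j://localhost:7687")
  .option("labels", ":Product")
  .option("node.keys", "product_id")
  .save()

// customers is written the same way, with ":Customer" and "customer_id".

// Sketch: write the relationship DataFrame, keyed on the node identifiers.
bought.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "neo4j://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.source.node.keys", "customer_id")
  .option("relationship.target.labels", ":Product")
  .option("relationship.target.node.keys", "product_id")
  .save()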

Advantages
  • The normalized loading approach shifts most of the data transform work (splitting, deduplicating, and partitioning the data) to Spark itself. Any data transform/cleanup work should be done in Spark where possible.

  • This approach makes for easy-to-follow code; ultimately, writing each DataFrame to Neo4j is quite simple and requires little more than a label and a key.

  • This allows for parallelism (discussed in sections below).

Disadvantages
  • You need to do more SQL work before data is loaded into Neo4j.

  • This approach requires identifying graph schema before beginning, as opposed to loading data into Neo4j and using Cypher to manipulate it afterwards.

Cypher destructuring

Cypher destructuring is the process of using a single Cypher statement to process a complex record into a finished graph pattern. Let’s look again at the data example:

product_id,product,customer_id,customer,quantity
1,Socks,10,David,2
2,Pants,11,Andrea,1

To store this in Neo4j, you might use a Cypher query like this:

MERGE (p:Product { id: event.product_id })
  ON CREATE SET p.name = event.product
WITH p
MERGE (c:Customer { id: event.customer_id })
  ON CREATE SET c.name = event.customer
MERGE (c)-[:BOUGHT { quantity: event.quantity }]->(p);

In this case, the entire job can be done with a single Cypher statement. However, as DataFrames get more complex, the corresponding Cypher statements can become quite complicated as well.
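
If you take this route, the statement is typically handed to the connector through its query write option. The following is a rough sketch with placeholder connection settings; the connector exposes each DataFrame row to the query as event, and save-mode handling may vary by connector version.

import org.apache.spark.sql.SaveMode

// Sketch: push the destructuring Cypher through the "query" write option.
purchases.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append)
  .option("url", "neo4j://localhost:7687")
  .option("query",
    """MERGE (p:Product { id: event.product_id })
      |  ON CREATE SET p.name = event.product
      |WITH p
      |MERGE (c:Customer { id: event.customer_id })
      |  ON CREATE SET c.name = event.customer
      |MERGE (c)-[:BOUGHT { quantity: event.quantity }]->(p)""".stripMargin)
  .save()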

Advantages
  • Extremely flexible: you can do anything that Cypher provides for.

  • If you are a Neo4j expert, it is easy for you to get started with.

Disadvantages
  • The Cypher destructuring approach tends to shift transform work to Neo4j, which is not a good idea because Neo4j does not have the same infrastructure for that work as Spark.

  • This approach tends to create heavy locking behavior, which limits parallelism and can hurt performance.

  • It encourages you to embed schema information in a Cypher query rather than use Spark utilities.

Converting data from graphs back to DataFrames

In general, always have an explicit RETURN statement and destructure your results.

A common pattern is to write a complex Cypher statement, perhaps one that traverses many relationships, to return a dataset to Spark. Since Spark does not understand graph primitives, there are not many useful ways to represent a raw node, relationship, or a path in Spark. As a result, it is highly recommended not to return those types from Cypher to Spark, focusing instead on concrete property values and function results which you can represent as simple types in Spark.

For example, the following Cypher query results in an awkward DataFrame that would be hard to manipulate:

MATCH path=(p:Person { name: "Andrea" })-[r:KNOWS*]->(o:Person)
RETURN path;

A better Cypher query which results in a cleaner DataFrame is as follows:

MATCH path=(p:Person { name: "Andrea" })-[r:KNOWS*]->(o:Person)
RETURN length(path) as pathLength, p.name as p1Name, o.name as p2Name
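
As a rough sketch (again with placeholder connection settings), such a query can be read straight into a DataFrame via the connector's query read option, with each returned column becoming a DataFrame column:

// Sketch: read the destructured result back as a tabular DataFrame.
val knows = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://localhost:7687")
  .option("query",
    """MATCH path=(p:Person { name: "Andrea" })-[r:KNOWS*]->(o:Person)
      |RETURN length(path) AS pathLength, p.name AS p1Name, o.name AS p2Name""".stripMargin)
  .load()

knows.show()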

Improving performance

To get the best possible performance reading from (and particularly writing to) Neo4j, make sure you’ve gone through this checklist:

  1. Tune your batch size.

  2. Tune your Neo4j memory configuration.

  3. Have the correct indexes.

  4. Tune your parallelism.

Each of the following sections describes these in detail.

Tune your batch size

Writing data to Neo4j happens transactionally in batches; if you want to write 1 million nodes, you might break that into 40 batches of 25,000. The batch size of the connector is controlled by the batch.size option and is set to a fairly low, conservative level. This is likely too low for many applications and can be improved with better knowledge of your data.

Batch size trade-off is as follows:

  • The bigger the batch size, the better the overall ingest performance, because it means fewer transactions, and less overall transactional overhead.

  • When batch sizes become so large that Neo4j’s heap memory cannot accommodate them, they can cause out-of-memory errors on the server and lead to failed writes.

Best write throughput comes when you use the largest batch size you can, while staying in the range of memory available on the server.

It’s impossible to pick a single batch size that works for everyone, because how much memory your transactions take up depends on the number of properties and relationships, among other factors. A good, fairly aggressive value to try is around 20,000, but you can increase this number if your data is small or if you have a lot of memory on the server. Lower the number if it’s a small database server or if the data you’re pushing has many large properties.
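
For example, a write can raise the batch size explicitly. This is a sketch reusing the customers DataFrame from the normalized loading example, with placeholder connection settings; batch.size is the connector option discussed above.

import org.apache.spark.sql.SaveMode

// Sketch: raise the connector's batch size for a node write.
customers.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "neo4j://localhost:7687")
  .option("labels", ":Customer")
  .option("node.keys", "customer_id")
  .option("batch.size", "20000")
  .save()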

Tune your Neo4j memory configuration

In the Neo4j Operations Manual, important advice is given on how to size the heap and page cache of the server. What’s important for Spark is this:

  • Heap affects how big transactions can get. The bigger the heap, the larger the batch size you can use.

  • Page cache affects how much of your database stays resident in RAM at any given time. Page caches which are much smaller than your database cause performance to suffer.

Have the correct indexes

At the Neo4j Cypher level, it’s very common to use the Spark connector in a way that generates MERGE queries. In Neo4j, this looks up a node by some 'key' and then creates it only if it does not already exist.

It is strongly recommended to assert indexes or constraints on any graph property that you use as part of node.keys, relationship.source.node.keys, relationship.target.node.keys or other similar key options.

A common source of poor performance is to write Spark code that generates MERGE Cypher, or otherwise tries to look data up in Neo4j without the appropriate database indexes. In this case, the Neo4j server ends up looking through much more data than necessary to satisfy the query, and performance suffers.
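
As a minimal sketch of preparing such an index, assuming the official Neo4j Java driver on the classpath, placeholder connection details, and Neo4j 4.4+ constraint syntax, you can create a uniqueness constraint (which is backed by an index) on the key property before the Spark job writes:

import org.neo4j.driver.{AuthTokens, GraphDatabase}

// Sketch: ensure a uniqueness constraint exists on the property used as the
// node key (here Customer.id). URL, credentials, and names are placeholders.
val driver = GraphDatabase.driver("neo4j://localhost:7687",
  AuthTokens.basic("neo4j", "password"))
val session = driver.session()
try {
  session.run(
    "CREATE CONSTRAINT customer_id_unique IF NOT EXISTS " +
    "FOR (c:Customer) REQUIRE c.id IS UNIQUE")
} finally {
  session.close()
  driver.close()
}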

Tune your parallelism

Spark is fundamentally about partitioning and parallelism; the go-to technique is to split a batch of data into partitions for each machine to work on in parallel. In Neo4j, parallelism works very differently, as this section describes.

Write parallelism in Neo4j

For most writes to Neo4j, it is strongly recommended to repartition your DataFrame to one partition only.

When writing nodes and relationships in Neo4j:

  • writing a relationship locks both nodes.

  • writing a node locks the node.

Additionally, in the Neo4j Causal Cluster model, only the cluster leader may write data. Since writes scale vertically in Neo4j, the practical parallelism is limited to the number of cores on the leader.

The reason a single partition for writes is recommended is that it eliminates lock contention between writes. Suppose one partition is writing:

(:Person { name: "Michael" })-[:KNOWS]->(:Person { name: "Andrea" })

While another partition is writing:

(:Person { name: "Andrea" })-[:KNOWS]->(:Person { name: "Davide" })

Both relationship writes lock the "Andrea" node, so these writes cannot proceed in parallel in any case. As a result, you may not gain performance by parallelizing more if threads have to wait for each other’s locks. In extreme cases with too much parallelism, Neo4j may reject the writes with lock contention errors.
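
A small sketch of the single-partition write, reusing the placeholder relationship options from the normalized loading example above:

import org.apache.spark.sql.SaveMode

// Sketch: collapse to one partition so relationship writes do not contend
// for node locks.
bought
  .repartition(1)
  .write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "neo4j://localhost:7687")
  .option("relationship", "BOUGHT")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.source.node.keys", "customer_id")
  .option("relationship.target.labels", ":Product")
  .option("relationship.target.node.keys", "product_id")
  .save()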

Dataset partitioning

You can use as many partitions as there are cores in the Neo4j server, if you have properly partitioned your data to avoid Neo4j locks.

There is an exception to the "one partition" rule above: if your data writes are partitioned ahead of time to avoid locks, you can generally do as many write threads to Neo4j as there are cores in the server. Suppose we want to write a long list of :Person nodes, and we know they are distinct by the person id. We might stream those into Neo4j in four different partitions, as there will not be any lock contention.
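
A rough sketch of that case, assuming a people DataFrame with a person_id column that uniquely identifies each person (all names and connection settings here are placeholders):

import org.apache.spark.sql.SaveMode

// Sketch: nodes that are unique by key can be written from several partitions
// without lock contention.
people
  .dropDuplicates("person_id")
  .repartition(4)
  .write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "neo4j://localhost:7687")
  .option("labels", ":Person")
  .option("node.keys", "person_id")
  .save()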

Schema considerations

Neo4j does not have a fixed schema; individual properties can contain multiple differently typed values. Spark, on the other hand, tends to expect a fixed schema. For this reason, the connector contains a number of schema inference techniques to ease this mapping. Paying close attention to how these features work helps explain the schemas you get in different scenarios.

The two core techniques are:

APOC

If your Neo4j installation has APOC installed, this approach is used by default. The stored procedures within APOC allow inspection of the metadata in your graph and provide information such as the type of relationship properties and the universe of possible properties attached to a given node label.

If you wish, you can try these calls yourself on your Neo4j database; simply execute:

CALL apoc.meta.nodeTypeProperties();
CALL apoc.meta.relTypeProperties();

Inspect the results: they show the metadata that the Neo4j Connector for Apache Spark uses to represent nodes and relationships read into DataFrames.

This approach uses a configurable sampling technique that looks through many (but not all) instances in the database to build a profile of the valid values that exist within properties. If the schema that is produced is not what is expected, take care to inspect the underlying data to ensure it has a consistent property set across all nodes of a label, or investigate tuning the sampling approach.

Tune parameters

You can tune the configuration parameters of the two APOC procedures via the option method, as follows:

// Requires: import org.neo4j.spark.DataSource
// The URL is a placeholder for your own connection settings.
spark.read
      .format(classOf[DataSource].getName)
      .option("url", "neo4j://localhost:7687")
      .option("labels", "Product")
      .option("apoc.meta.nodeTypeProperties", """{"sample": 10}""")
      .load()

or

spark.read
      .format(classOf[DataSource].getName)
      .option("url", "neo4j://localhost:7687")
      .option("relationship", "BOUGHT")
      .option("relationship.source.labels", "Customer")
      .option("relationship.target.labels", "Product")
      .option("apoc.meta.relTypeProperties", """{"sample": 10}""")
      .load()

For both procedures you can pass all the supported parameters except for:

  • includeLabels for apoc.meta.nodeTypeProperties, because the connector uses the labels defined in the labels option.

  • includeRels for apoc.meta.relTypeProperties, because the connector uses the relationship type defined in the relationship option.

Fine tuning

Because these two procedures sample the graph to extract the metadata needed to build the Tables for Labels, in most real-world scenarios it is crucial to tune the sampling parameters properly: sampling can be expensive and can impact the performance of your extraction job.

Automatic sampling

In some installations and environments, the key APOC calls above are not available. In these cases, the connector automatically samples the first few records and infers the correct data type from the examples that it sees.

Automatic sampling may be error prone and may produce incorrect results, particularly in cases where a single Neo4j property exists with several different data types. Consistent typing of properties is strongly recommended.