Architecture Guidance

This chapter provides tips and tricks on how to get the best performance out of the connector.

Overview

No matter which direction you’re going, whether sending data from Spark to Neo4j or pulling data out of Neo4j, the two systems are based on completely different data models:

  • Spark is oriented around tabular DataFrames

  • Neo4j is oriented around graphs (nodes, relationships, paths)

The use of this connector can be thought of as a "Transform" step and a "Load" step, regardless of which direction the data is moving. The "Transform" step is concerned with how to map data between graphs and tables; the "Load" step is concerned with moving data in bulk.

Tables 4 Labels

To move data back and forth between Neo4j and Spark, we use an approach called Tables 4 Labels. Consider the following path:

(:Customer)-[:BOUGHT]->(:Product)

We can decompose it into:

  • 2 nodes: Customer and Product

  • 1 relationship between them: BOUGHT

so in total we have 3 graph entities, and each one will be managed as a simple table.

So given the following Cypher query:

CREATE (c1:Customer{id:1, username: 'foo', city: 'Venezia'}),
(p1:Product{id: 100, name: 'My Awesome Product'}),
(c2:Customer{id:2, username: 'bar', city: 'Pescara'}),
(p2:Product{id: 101, name: 'My Awesome Product 2'}),
(c1)-[:BOUGHT{quantity: 10}]->(p1),
(c2)-[:BOUGHT{quantity: 13}]->(p1),
(c2)-[:BOUGHT{quantity: 4}]->(p2);

The graph structure will be decomposed into 3 tables:

Table 1. Customer Table

id  username  city
1   foo       Venezia
2   bar       Pescara

Table 2. Product Table

id   name
100  My Awesome Product
101  My Awesome Product 2

Table 3. BOUGHT Table

source_id  target_id  quantity
1          100        10
2          100        13
2          101        4

To produce this decomposition, the connector uses two under-the-hood strategies that extract metadata from your graph; both are described in the Schema Considerations section below.

Graph Transforms

When taking any complex set of dataframes and preparing it for load into Neo4j, you have basically two options:

  • Normalized Loading

  • Cypher Destructuring

This section will describe both, and provide information on pros and cons from a performance and complexity perspective.

Where possible, use the normalized loading approach for best performance & maintainability

Normalized Loading

Suppose we want to load a single dataframe called purchases into Neo4j with the following contents:

product_id,product,customer_id,customer,quantity
1,Socks,10,David,2
2,Pants,11,Andrea,1

This data represents a simple (:Customer)-[:BOUGHT]->(:Product) "graph shape".

The normalized loading approach requires that you create a number of different dataframes; one for each node type and relationship type in your desired graph. For example, in this case, we might create 3 dataframes:

  • val products = spark.sql("SELECT product_id, product FROM purchases")

  • val customers = spark.sql("SELECT customer_id, customer FROM purchases")

  • val bought = spark.sql("SELECT product_id, customer_id, quantity FROM purchases")

Once these simple data frames represent a normalized view of "tables for labels" (that is, one DataFrame/table per node label or relationship type), the existing utilities provided by the connector for writing nodes and relationships can be used with no additional Cypher needed. Additionally, if these frames are made unique by identifier, the data is already prepared for maximum parallelism (see the parallelism notes in the sections below).
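
For illustration, the writes of those frames might look like the following sketch. The Bolt URL is a placeholder, and the exact option values (save modes, key mappings, the keys save strategy) are assumptions to adapt to your own schema and connector version:

import org.apache.spark.sql.SaveMode

// Assumed connection details; replace with your own Bolt URL and auth options.
val url = "bolt://localhost:7687"

// Customer nodes: MERGE on the id property, mapped from the customer_id column.
// Remaining columns are written as node properties under their column names.
customers.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", url)
  .option("labels", ":Customer")
  .option("node.keys", "customer_id:id")
  .save()

// Product nodes are written the same way (labels = ":Product", node.keys = "product_id:id").

// BOUGHT relationships: merge source and target nodes by their keys, then merge the relationship.
bought.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", url)
  .option("relationship", "BOUGHT")
  .option("relationship.save.strategy", "keys")
  .option("relationship.source.labels", ":Customer")
  .option("relationship.source.save.mode", "Overwrite")
  .option("relationship.source.node.keys", "customer_id:id")
  .option("relationship.target.labels", ":Product")
  .option("relationship.target.save.mode", "Overwrite")
  .option("relationship.target.node.keys", "product_id:id")
  .option("relationship.properties", "quantity")
  .save()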

Pros

  • Shifts most of the data transform work to Spark itself (in terms of splitting, deduplicating, and partitioning the data). Any data transform/cleanup work should be done in Spark if possible

  • Makes for easy to follow code; in the end, the write of each dataframe to Neo4j is quite simple, and requires mostly just a label and a key

  • Allows for parallelism (discussed in sections below)

Cons

  • More SQL work before data is loaded into Neo4j

  • Requires identifying graph schema before beginning, as opposed to loading data into Neo4j and using cypher to manipulate it afterwards

Cypher Destructuring

Cypher destructuring is the process of using a single cypher statement to process a complex record into a finished graph pattern. Let’s look again at the data example:

product_id,product,customer_id,customer,quantity
1,Socks,10,David,2
2,Pants,11,Andrea,1

To store this in Neo4j, we might use a cypher query like this:

MERGE (p:Product { id: event.product_id })
  ON CREATE SET p.name = event.product
WITH p
MERGE (c:Customer { id: event.customer_id })
  ON CREATE SET c.name = event.customer
MERGE (c)-[:BOUGHT { quantity: event.quantity }]->(p);

Notice that in this case the entire job can be done by a single cypher statement. As data frames get complex, these cypher statements too can get quite complex.
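
With the connector, such a statement can be supplied through the query write option, where each row of the DataFrame is exposed to the Cypher as event. A minimal sketch, assuming the purchases DataFrame from above and a placeholder URL (the save mode may need adjusting for your connector version):

import org.apache.spark.sql.SaveMode

// Sketch: push every row of `purchases` through a single Cypher statement.
// The connector binds each row to the query as `event`.
purchases.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite) // assumption: adjust to the save mode your connector version expects
  .option("url", "bolt://localhost:7687") // placeholder URL
  .option("query",
    """MERGE (p:Product { id: event.product_id })
      |  ON CREATE SET p.name = event.product
      |WITH p
      |MERGE (c:Customer { id: event.customer_id })
      |  ON CREATE SET c.name = event.customer
      |MERGE (c)-[:BOUGHT { quantity: event.quantity }]->(p)""".stripMargin)
  .save()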

Pros

  • Extremely flexible: you can do anything that Cypher provides for

  • Easy for Neo4j pros to get started with.

Cons

  • Tends to shift transform work to Neo4j, which is not a good idea, because Neo4j does not have the same infrastructure to support that kind of work as Spark does

  • Tends to create heavy locking behavior, which will harm parallelism and possibly performance

  • Encourages you to embed schema information in a cypher query rather than use spark utilities

Graph Transforms: General Principles

  • Wherever possible, perform data quality fixes prior to loading into Neo4j; this includes dropping records with missing values, changing datatypes of properties, and so on (see the sketch after this list)

  • Because Spark excels at parallel computation, any non-graph heavy computation should be done in the spark layer, rather than in Cypher on Neo4j

  • Size your Neo4j instance appropriately before using aggressive parallelism or large batch sizes

  • Experiment with larger batch sizes (ensuring that batches stay within Neo4j’s configured heap memory). In general, the larger the batches, the faster the overall throughput to Neo4j.
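
As an example of the first principle, here is a hypothetical Spark-side cleanup of the purchases DataFrame from earlier, dropping rows with missing keys and fixing a property datatype before anything reaches Neo4j:

import org.apache.spark.sql.functions.col

// Hypothetical cleanup: all of this is cheap, parallel work in Spark,
// and none of it needs to be expressed in Cypher.
val cleanedPurchases = purchases
  .na.drop(Seq("product_id", "customer_id"))            // drop records missing their keys
  .withColumn("quantity", col("quantity").cast("int"))  // normalize the property datatype
  .dropDuplicates("product_id", "customer_id")          // one BOUGHT row per customer/product pair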

Transforming from Graphs Back to DataFrames

In general, always have an explicit RETURN statement and destructure your results

A common pattern is to write a complex Cypher statement, perhaps one that traverses many relationships, to return a dataset to Spark. Because Spark does not understand graph primitives, there are not many useful ways that a raw node, relationship, or path can be represented in Spark. As a result, we recommend that you do not return those types from Cypher to Spark; instead, focus on concrete property values and function results, which can be represented as simple types in Spark.

For example, this query would result in an awkward dataframe that would be hard to manipulate:

MATCH path=(p:Person { name: "Andrea" })-[r:KNOWS*]->(o:Person)
RETURN path;

A better query which will result in a cleaner DataFrame is as follows:

MATCH path=(p:Person { name: "Andrea" })-[r:KNOWS*]->(o:Person)
RETURN length(path) as pathLength, p.name as p1Name, o.name as p2Name
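
Such a query can be handed to the connector through the query read option, producing a DataFrame with three simple columns. A minimal sketch, with a placeholder URL:

// Sketch: read the destructured path data straight into a DataFrame.
val knows = spark.read
  .format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687") // placeholder URL
  .option("query",
    """MATCH path=(p:Person { name: "Andrea" })-[r:KNOWS*]->(o:Person)
      |RETURN length(path) AS pathLength, p.name AS p1Name, o.name AS p2Name""".stripMargin)
  .load()

knows.printSchema() // pathLength, p1Name, p2Name: plain columns, no graph primitives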

Improving Performance

To get the best possible performance reading from (and particularly writing to) Neo4j, make sure you’ve gone through this checklist:

  1. Tune your Batch Size

  2. Tune your Neo4j Memory Configuration

  3. Have the right indexes

  4. Tune your parallelism

Each of the following sections describes these in detail.

Tune your Batch Size

Writing data to Neo4j happens transactionally in batches; if we want to write 1 million nodes, we might break that into 40 batches of 25,000. The batch size of the connector is controlled by the batch.size option and is set to a fairly low, conservative level. This is likely too low for many applications and can be improved with better knowledge of your data.

Batch size tradeoff is as follows:

  • The bigger the batch size, the better the overall ingest performance, because it means fewer transactions, and less overall transactional overhead.

  • When batch sizes become so large that Neo4j’s heap memory cannot accommodate them, they can cause out-of-memory errors on the server and failed writes.

Best write throughput comes when you use the largest batch size you can, while staying in the range of memory available on the server.

It’s impossible to pick a single batch size that works for everyone, because how much memory your transactions take up depends on the number of properties and relationships, among other factors. A good, reasonably aggressive value to try is around 20,000, but you can increase this number if your data is small or if you have a lot of memory on the server. Lower the number if the database server is small or the data you’re pushing has many large properties.
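
For example, the option can be raised on a node write as in this sketch (the URL is a placeholder and the value is only a starting point to tune against your data and server):

import org.apache.spark.sql.SaveMode

// Sketch: a more aggressive batch size for a bulk node write.
products.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687") // placeholder URL
  .option("labels", ":Product")
  .option("node.keys", "product_id:id")
  .option("batch.size", "20000") // larger batches = fewer transactions, but more heap per transaction
  .save()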

Tune your Neo4j Memory Configuration

In the Neo4j Performance Tuning Guide, important advice is given on how to size the heap and page cache of the server. What’s important for Spark is this:

  • Heap affects how big transactions can get. The bigger the heap, the larger the batch size you can use.

  • Page cache affects how much of your database stays resident in RAM at any given time. Page caches which are much smaller than your database will cause performance to suffer.

Have the Right Indexes

At the Neo4j Cypher level, it’s very common to use the Spark connector in a way that generates MERGE queries. In Neo4j, this looks up a node by some "key" and then creates it only if it does not already exist.

It is strongly recommended to assert indexes or constraints on any graph property that you use as part of node.keys, relationship.source.node.keys, relationship.target.node.keys or other similar key options

A common source of poor performance is to write Spark code that generates MERGE cypher, or otherwise tries to look data up in Neo4j without the appropriate database indexes. In this case, the Neo4j server ends up looking through much more data than necessary to satisfy the query, and performance suffers.
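
For example, for :Customer writes that MERGE on the id property, a uniqueness constraint could be created before the Spark job runs. This is a sketch assuming Neo4j 4.4+ constraint syntax, a placeholder URL, and the hypothetical customers frame used earlier:

import org.apache.spark.sql.SaveMode

// Run once in cypher-shell or Neo4j Browser before the job (4.4+ syntax assumed):
//
//   CREATE CONSTRAINT customer_id IF NOT EXISTS
//   FOR (c:Customer) REQUIRE c.id IS UNIQUE;
//
// The MERGE generated by this write can then use the constraint's backing index
// instead of scanning every :Customer node.
customers.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Overwrite)
  .option("url", "bolt://localhost:7687") // placeholder URL
  .option("labels", ":Customer")
  .option("node.keys", "customer_id:id")
  .save()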

Tune Your Parallelism

Spark is fundamentally about partitioning and parallelism; the go-to technique is to split a batch of data into partitions for each machine to work on in parallel. In Neo4j, parallelism works very differently, as described in this section.

Write Parallelism in Neo4j

For most writes to Neo4j, it is strongly recommended to repartition your dataframe to 1 partition only

When writing nodes & relationships in Neo4j:

  • Writing a relationship locks both nodes

  • Writing a node locks the node

Additionally, in Neo4j’s causal cluster model, only the cluster leader may write data. Because writes scale vertically in Neo4j, the practical parallelism is limited to the number of cores on the leader.

A single partition is recommended for writes because it eliminates lock contention between them. Suppose one partition is writing:

(:Person { name: "Michael" })-[:KNOWS]->(:Person { name: "Andrea" })

while another partition is writing:

(:Person { name: "Andrea" })-[:KNOWS]->(:Person { name: "Davide" })

The relationship write will lock the "Andrea" node - and these writes cannot continue in parallel in any case. As a result, you may not gain performance by parallelizing more, if threads have to wait for each other’s locks. In extreme cases with too much parallelism, Neo4j may reject the writes with lock contention errors.

Dataset Partitioning

You can use as many partitions as there are cores in the Neo4j server, if you have properly partitioned your data to avoid Neo4j locks

There is an exception to the "1 partition" rule above: if your data writes are partitioned ahead of time to avoid locks, you can generally use as many write threads to Neo4j as there are cores in the server. Suppose we want to write a long list of :Person nodes, and we know they are distinct by person id. We might stream those into Neo4j in 4 different partitions, as there will not be any lock contention; see the sketch below.
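
A sketch of both cases, using hypothetical frames (bought and people) and a placeholder URL: a relationship-heavy frame collapsed to a single partition, and a people frame that is distinct by id and therefore safe to write from several partitions:

import org.apache.spark.sql.SaveMode
import org.apache.spark.sql.functions.col

// Default, safe choice: one partition, so relationship writes never contend for node locks.
// boughtSerial is then written exactly as shown in the Normalized Loading section.
val boughtSerial = bought.coalesce(1)

// Exception: `people` is distinct by id, so no two partitions ever touch the same node.
// Use roughly as many partitions as the Neo4j leader has cores (4 here as an assumption).
people
  .dropDuplicates("id")
  .repartition(4, col("id"))
  .write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.Append) // plain CREATE of new :Person nodes
  .option("url", "bolt://localhost:7687") // placeholder URL
  .option("labels", ":Person")
  .save()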

Schema Considerations

Neo4j does not have a fixed schema; individual properties can contain multiple differently-typed values. Spark on the other hand will tend to expect a fixed schema. For this reason, the connector contains a number of schema inference techniques that help ease this mapping. Paying close attention to how these features work can help explain different scenarios.

The two core techniques are:

APOC

If your Neo4j installation has APOC installed, this approach is used by default. APOC’s metadata procedures allow inspection of the metadata in your graph and provide information such as the types of properties and the universe of possible properties attached to a given node label.

You may try these calls yourself on your Neo4j database if you wish; simply execute:

CALL apoc.meta.nodeTypeProperties();
CALL apoc.meta.relTypeProperties();

And inspect the results. These results are how the Neo4j Connector for Apache Spark represents the metadata of nodes & relationships read into DataFrames.

This approach uses a configurable sampling technique that looks through many (but not all) instances in the database to build a profile of the valid values that exist within properties. If the schema that is produced is not what is expected, take care to inspect the underlying data to ensure it has a consistent property set across all nodes of a label, or investigate tuning the sampling approach.

Tune parameters

You can tune the configuration parameters of the two APOC procedures via the option method, as follows:

spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687") // replace with your Bolt URL
      .option("labels", "Product")
      .option("apoc.meta.nodeTypeProperties", """{"sample": 10}""")
      .load()

or

spark.read
      .format("org.neo4j.spark.DataSource")
      .option("url", "bolt://localhost:7687") // replace with your Bolt URL
      .option("relationship", "BOUGHT")
      .option("relationship.source.labels", "Product")
      .option("relationship.target.labels", "Person")
      .option("apoc.meta.relTypeProperties", """{"sample": 10}""")
      .load()

For both procedures you can pass all the supported parameters except for:

  • includeLabels for apoc.meta.nodeTypeProperties, because we use the labels defined in the labels option;

  • includeRels for apoc.meta.relTypeProperties, because we use the relationship type defined in the relationship option.

Fine tuning

Because these two procedures sample the graph to extract the metadata needed to build the Tables 4 Labels mapping, in most real-world scenarios it is important to tune the sampling parameters carefully: executing these procedures can be expensive and can impact the performance of your extraction job.

Automatic Sampling

In some installations and environments, the key APOC calls above will not be available. In these cases, the connector will automatically sample the first few records and infer the right data type from the examples that it sees.

Automatic sampling may be error-prone and may produce incorrect results, particularly in cases where a single Neo4j property exists with several different data types. Consistent typing of properties is strongly recommended.