Using with Neo4j Graph Data Science

Neo4j Graph Data Science (GDS) lets data scientists benefit from powerful graph algorithms. It provides unsupervised machine learning (ML) methods and heuristics that learn and describe the topology of your graph. GDS includes hardened graph algorithms with enterprise features, like deterministic seeding for consistent results and reproducible ML workflows.

GDS algorithms are bucketed into five groups:

  • Community detection which detects group clusters and partition options.

  • Centrality which helps to compute the importance of a node in a graph.

  • Topological link prediction which estimates the likelihood of nodes forming a relationship.

  • Similarity which evaluates the similarity of pairs of nodes.

  • Path finding & search which finds optimal paths, evaluates route availability, and so on.

GDS operates via Cypher

All of the functionality of GDS is used by issuing Cypher® queries. As such, it is easily accessible via Spark, because the Neo4j Connector for Apache Spark can issue Cypher queries and read their results back. This combination means that you can use Neo4j and GDS as a graph co-processor in an existing ML workflow that you may implement in Apache Spark.

GDS first-class support (introduced in 5.1)

The Neo4j Spark Connector provides first-class support for the Graph Data Science library. The following sections show how it works.

Limitations

The connector does not support the mutate or write procedure modes, because they do not return any usable information in the DataFrame. You can achieve the same result by joining DataFrames and then using the Neo4j Spark Connector to write the data back to whichever Neo4j instance you want.

Example

Imagine that we want to replicate the example for the PageRank algorithm detailed in the GDS manual. The equivalent Spark example looks like the following (we assume that the data is already in Neo4j).

  1. Create the projection graph: The following Python code will project a graph using a native projection and store it in the graph catalog under the name 'myGraph'.

    # we'll assume that `spark` variable is already present
    spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("gds", "gds.graph.project") \
      .option("gds.graphName", "myGraph") \
      .option("gds.nodeProjection", "Page") \
      .option("gds.relationshipProjection", "LINKS") \
      .option("gds.configuration.relationshipProperties", "weight") \
      .load() \
      .show(truncate=False)

which will show a result like this:

+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|nodeProjection                            |relationshipProjection                                                                                                                                                                    |graphName|nodeCount|relationshipCount|projectMillis|
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+
|{Page -> {"properties":{},"label":"Page"}}|{LINKS -> {"orientation":"NATURAL","aggregation":"DEFAULT","type":"LINKS","properties":{"weight":{"property":"weight","aggregation":"DEFAULT","defaultValue":null}},"indexInverse":false}}|myGraph  |8        |14               |503          |
+------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------+---------+-----------------+-------------+

  2. Estimate the cost of the algorithm: The following Python code estimates the cost of running the algorithm using the estimate procedure.

    # we'll assume that `spark` variable is already present
    spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("gds", "gds.pageRank.stream.estimate") \
      .option("gds.graphName", "myGraph") \
      .option("gds.configuration.concurrency", "2") \
      .load() \
      .show(truncate=False)

which will show a result like this:

+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|requiredMemory|treeView                                                                                                                                                                                                                                                                                                                                                                                                                 |mapView                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                 |bytesMin|bytesMax|nodeCount|relationshipCount|heapPercentageMin|heapPercentageMax|
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+
|816 Bytes     |Memory Estimation: 816 Bytes\n|-- algorithm: 816 Bytes\n    |-- this.instance: 88 Bytes\n    |-- vote bits: 104 Bytes\n    |-- compute steps: 208 Bytes\n        |-- this.instance: 104 Bytes\n    |-- node value: 120 Bytes\n        |-- pagerank (DOUBLE): 120 Bytes\n    |-- message arrays: 296 Bytes\n        |-- this.instance: 56 Bytes\n        |-- send array: 120 Bytes\n        |-- receive array: 120 Bytes\n|{name -> Memory Estimation, components -> [{"name":"algorithm","components":[{"name":"this.instance","memoryUsage":"88 Bytes"},{"name":"vote bits","memoryUsage":"104 Bytes"},{"name":"compute steps","components":[{"name":"this.instance","memoryUsage":"104 Bytes"}],"memoryUsage":"208 Bytes"},{"name":"node value","components":[{"name":"pagerank (DOUBLE)","memoryUsage":"120 Bytes"}],"memoryUsage":"120 Bytes"},{"name":"message arrays","components":[{"name":"this.instance","memoryUsage":"56 Bytes"},{"name":"send array","memoryUsage":"120 Bytes"},{"name":"receive array","memoryUsage":"120 Bytes"}],"memoryUsage":"296 Bytes"}],"memoryUsage":"816 Bytes"}], memoryUsage -> 816 Bytes}|816     |816     |8        |14               |0.1              |0.1              |
+--------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+--------+---------+-----------------+-----------------+-----------------+

  3. Compute the algorithm: The following Python code returns the PageRank computation without modifying the graph.

    # we'll assume that `spark` variable is already present
    # we save the dataframe in the `pr_df` variable as we'll reuse it later
    pr_df = spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("gds", "gds.pageRank.stream") \
      .option("gds.graphName", "myGraph") \
      .option("gds.configuration.concurrency", "2") \
      .load()

    pr_df.show(truncate=False)

which will show a result like this:

+------+------------------+
|nodeId|score             |
+------+------------------+
|0     |3.215681999884452 |
|1     |1.0542700552146722|
|2     |1.0542700552146722|
|3     |1.0542700552146722|
|4     |0.3278578964488539|
|5     |0.3278578964488539|
|6     |0.3278578964488539|
|7     |0.3278578964488539|
+------+------------------+

As you can see, the result contains only two columns, nodeId and score. Let’s see how you can enrich your nodes with the score.

  4. Enrich nodes with the score: The following Python code enriches the nodes with the score.

    # we'll assume that `spark` variable is already present
    # we create the `nodes_df`
    nodes_df = spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("labels", "Page") \
      .load()

    # we join `nodes_df` with `pr_df` created in the step before
    new_df = nodes_df.join(pr_df, nodes_df["<id>"] == pr_df["nodeId"])
    new_df.show(truncate=False)

which will show a result like this:

+----+--------+-------+------+------------------+
|<id>|<labels>|   name|nodeId|             score|
+----+--------+-------+------+------------------+
|   0|  [Page]|   Home|     0| 3.215681999884452|
|   1|  [Page]|  About|     1|1.0542700552146722|
|   2|  [Page]|Product|     2|1.0542700552146722|
|   3|  [Page]|  Links|     3|1.0542700552146722|
|   4|  [Page]| Site A|     4|0.3278578964488539|
|   5|  [Page]| Site B|     5|0.3278578964488539|
|   6|  [Page]| Site C|     6|0.3278578964488539|
|   7|  [Page]| Site D|     7|0.3278578964488539|
+----+--------+-------+------+------------------+

Now you can persist this dataset to whatever Neo4j instance you want.
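
The last step could be sketched as follows. This is a minimal sketch, not a definitive implementation: it assumes that the name property uniquely identifies each Page node and that the target instance is the same one used above. The helper names node_write_options and write_scores are hypothetical. The technical columns <id>, <labels>, and nodeId are dropped so that only name and score are written.

```python
FORMAT = "org.neo4j.spark.DataSource"


def node_write_options(url, label, key):
    """Build the connector options for writing nodes matched by a key property."""
    return {
        "url": url,        # target Neo4j instance
        "labels": label,   # label of the nodes to write
        "node.keys": key,  # property used to match existing nodes
    }


def write_scores(enriched_df, url):
    """Write the name and score columns back to Page nodes.

    Assumption: `name` uniquely identifies a Page node.
    """
    (
        enriched_df.select("name", "score")
        .write.format(FORMAT)
        .mode("Overwrite")  # with node.keys set, existing nodes are matched and updated
        .options(**node_write_options(url, ":Page", "name"))
        .save()
    )


# Usage, with `new_df` from the previous step:
# write_scores(new_df, "neo4j://localhost:7687")
```

To write to a different instance, pass that instance's URL (and any authentication options it needs) instead.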

Options

As the examples above show, you pass every option that the procedure requires by using the gds. prefix. Nested maps are supported through dot notation.

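Table 1. List of available configuration settings

GDS options

+------------+----------------------------------------------------------------------------------------+-------------+------------------------------------------+
|Setting name|Description                                                                             |Default value|Required                                  |
+------------+----------------------------------------------------------------------------------------+-------------+------------------------------------------+
|gds         |The procedure name. Pick the algorithm best suited to your use case from the GDS manual.|(none)       |Yes                                       |
|gds.        |A prefix to complete with the name of an input option of the procedure you choose.      |(none)       |Yes, depending on the procedure you choose|
+------------+----------------------------------------------------------------------------------------+-------------+------------------------------------------+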

How to manage the gds. prefix in your Spark Job

Consider, for instance, that you want to project a graph as follows:

    CALL gds.graph.project(
      'myGraph',
      'Page',
      'LINKS',
      {
        relationshipProperties: 'weight'
      }
    )

So we need to:

  • invoke the gds.graph.project procedure, which means adding .option("gds", "gds.graph.project") to our Spark job. As described in the GDS manual (https://neo4j.com/docs/graph-data-science/current/management-ops/projections/graph-project/), the project procedure has four input parameters:

    • graphName: we want to name the graph myGraph, so we add .option("gds.graphName", "myGraph")

    • nodeProjection: we want to project Page nodes, so we add .option("gds.nodeProjection", "Page")

    • relationshipProjection: we want to project LINKS relationships, so we add .option("gds.relationshipProjection", "LINKS")

    • configuration: we want to use weight as the property that defines the importance of each relationship. configuration is a map, so we need to add a relationshipProperties key with the value weight to it. Dot notation lets us do this with .option("gds.configuration.relationshipProperties", "weight")

The final Spark job looks like this:

    # we'll assume that `spark` variable is already present
    spark.read.format("org.neo4j.spark.DataSource") \
      .option("url", "neo4j://localhost:7687") \
      .option("gds", "gds.graph.project") \
      .option("gds.graphName", "myGraph") \
      .option("gds.nodeProjection", "Page") \
      .option("gds.relationshipProjection", "LINKS") \
      .option("gds.configuration.relationshipProperties", "weight") \
      .load() \
      .show(truncate=False)
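
The mapping from procedure parameters to gds.-prefixed options can also be generated programmatically. The following is a minimal sketch, assuming that every value is either a scalar or a nested map. The helper name gds_options is hypothetical and is not part of the connector. It only builds a plain dictionary of option names and string values, which you could then pass to the reader.

```python
def gds_options(procedure, **params):
    """Flatten procedure parameters into connector options using dot notation.

    Hypothetical helper: handles scalars and nested dicts only.
    """
    options = {"gds": procedure}

    def flatten(prefix, value):
        if isinstance(value, dict):
            # nested map: extend the prefix with each key
            for key, nested in value.items():
                flatten(f"{prefix}.{key}", nested)
        else:
            # option values are passed as strings, as in the examples above
            options[prefix] = str(value)

    for name, value in params.items():
        flatten(f"gds.{name}", value)
    return options


project_options = gds_options(
    "gds.graph.project",
    graphName="myGraph",
    nodeProjection="Page",
    relationshipProjection="LINKS",
    configuration={"relationshipProperties": "weight"},
)

# Usage, assuming the `spark` variable is already present:
# spark.read.format("org.neo4j.spark.DataSource") \
#   .option("url", "neo4j://localhost:7687") \
#   .options(**project_options) \
#   .load()
```

Here project_options contains the same five gds options as the Spark job above.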

GDS support via Cypher queries

With the query option, you can run complex custom Cypher queries to analyze your data with GDS.

Example

In the sample Zeppelin Notebook repository, there is a GDS example that can be run against a Neo4j Sandbox, showing how to use the two together.

Create a virtual graph in GDS using Spark

This is very simple, straightforward code; it constructs the right Cypher statement to create a virtual graph in GDS and returns the results.

%pyspark
query = """
    CALL gds.graph.project('got-interactions', 'Person', {
      INTERACTS: {
        orientation: 'UNDIRECTED'
      }
    })
    YIELD graphName, nodeCount, relationshipCount, projectMillis
    RETURN graphName, nodeCount, relationshipCount, projectMillis
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()

If you get an "A graph with name [name] already exists" error, take a look at the FAQ.

Ensure that the partitions option is set to 1. You do not want to execute this query in parallel; it should be executed only once.

When you use stored procedures, you must include a RETURN clause.
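
The two rules above (a single partition and a mandatory RETURN clause) can be enforced in one place. The following is a minimal sketch: the helper name gds_query_options, the URL, and the credentials are hypothetical placeholders, and the RETURN check is a rough textual test, not a Cypher parser.

```python
import re


def gds_query_options(url, user, password, query):
    """Build read options for a GDS Cypher query that must run exactly once."""
    # rough textual check: procedure calls need a RETURN clause
    if not re.search(r"\bRETURN\b", query, flags=re.IGNORECASE):
        raise ValueError("the query must include a RETURN clause")
    return {
        "url": url,
        "authentication.type": "basic",
        "authentication.basic.username": user,
        "authentication.basic.password": password,
        "query": query,
        "partitions": "1",  # never run the procedure in parallel
    }


page_rank_query = """
    CALL gds.pageRank.stream('got-interactions')
    YIELD nodeId, score
    RETURN gds.util.asNode(nodeId).name AS name, score
"""

# placeholder connection details, replace with your own
options = gds_query_options(
    "neo4j://localhost:7687", "neo4j", "password", page_rank_query
)

# Usage, assuming the `spark` variable is already present:
# df = spark.read.format("org.neo4j.spark.DataSource").options(**options).load()
```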

Run a GDS analysis and stream the results back

The following example shows how to run an analysis and get the result as just another Cypher query, executed as a Spark read from Neo4j.

%pyspark

query = """
    CALL gds.pageRank.stream('got-interactions')
    YIELD nodeId, score
    RETURN gds.util.asNode(nodeId).name AS name, score
"""

df = spark.read.format("org.neo4j.spark.DataSource") \
    .option("url", host) \
    .option("authentication.type", "basic") \
    .option("authentication.basic.username", user) \
    .option("authentication.basic.password", password) \
    .option("query", query) \
    .option("partitions", "1") \
    .load()

df.show()

Ensure that the partitions option is set to 1. The algorithm should be executed only once.

Streaming versus persisting GDS results

When running GDS algorithms, the library gives you the choice of either streaming the algorithm results back to the caller, or mutating the underlying graph. Using GDS together with Spark provides an additional option of transforming or otherwise using a GDS result. Ultimately, either modality works with the Neo4j Connector for Apache Spark, and you choose what’s best for your use case.

If you run the GDS algorithm on a Read Replica or a separate standalone instance, it may be convenient to stream the results back, as you cannot write them to a Read Replica. You can then use the connector’s write functionality to write that stream of results to a different Neo4j connection, that is, to a regular Causal Cluster.