6.9. Pregel API

This chapter provides documentation for the Pregel API in the Neo4j Graph Data Science library.

This topic includes:

6.9.1. Introduction

Pregel is a vertex-centric computation model to define your own algorithms via a user-defined compute function. Node values can be updated within the compute function and represent the algorithm result. The input graph contains default node values or node values from a graph projection.

The compute function is executed in multiple iterations, also called supersteps. In each superstep, the compute function runs for each node in the graph. Within that function, a node can receive messages from other nodes, typically its neighbors. Based on the received messages and its currently stored value, a node can compute a new value. A node can also send messages to other nodes, typically its neighbors, which are received in the next superstep. The algorithm terminates after a fixed number of supersteps or if no messages are being sent between nodes.

A Pregel computation is executed in parallel. Each thread executes the compute function for a batch of nodes.

For more information about Pregel, have a look at https://kowshik.github.io/JPregel/pregel_paper.pdf.

To implement your own Pregel algorithm, the Graph Data Science library provides a Java API, which is described below.

The introduction of a new Pregel algorithm can be separated in two main steps. First, we need to implement the algorithm using the Pregel Java API. Second, we need to expose the algorithm via a Cypher procedure to make use of it.

For an example on how to expose a custom Pregel computation via a Neo4j procedure, have a look at the Pregel examples.

6.9.2. Pregel Java API

The Pregel Java API allows us to easily build our own algorithm by implementing several interfaces.

6.9.2.1. Computation

The first step is to implement the org.neo4j.graphalgo.beta.pregel.PregelComputation interface. It is the main interface to express user-defined logic using the Pregel framework.

The Pregel computation. 

public interface PregelComputation<C extends PregelConfig> {
    // The schema describes the node property layout.
    PregelSchema schema();
    // Called in the first superstep and allows initializing node state.
    default void init(PregelContext.InitContext<C> context) {}
    // Called in each superstep for each node and contains the main logic.
    void compute(PregelContext.ComputeContext<C> context, Pregel.Messages messages);
    // Used to apply a relationship weight on a message.
    default double applyRelationshipWeight(double message, double relationshipWeight);
}

Pregel node values are composite values. The schema describes the layout of that composite value. Each element of the schema can represent either a primitive long or double value as well as arrays of those. The element is uniquely identified by a key, which is used to access the value during the computation.

The init method is called in the beginning of the first superstep of the Pregel computation and allows initializing node values. The interface defines an abstract compute method, which is called for each node in every superstep. Algorithm-specific logic is expressed within the compute method. The context parameter provides access to node properties of the in-memory graph and the algorithm configuration.

The compute method is called individually for each node in every superstep as long as the node receives messages or has not voted to halt yet. Since an implementation of PregelComputation is stateless, a node can only communicate with other nodes via messages. In each superstep, a node receives messages and can send new messages via the context parameter. Messages can be sent to neighbor nodes or any node if its identifier is known.

The applyRelationshipWeight method can be used to modify the message based on a relationship property. If the input graph has no relationship properties, i.e. is unweighted, the method is skipped.

6.9.2.2. Init context and compute context

The main purpose of the two context objects is to enable the computation to communicate with the Pregel framework. A context is stateful, and all its methods are subject to the current superstep and the currently processed node. Both context objects share a set of methods, e.g., to access the config and node state. Additionally, each context adds context-specific methods.

The org.neo4j.graphalgo.beta.pregel.PregelContext.InitContext is available in the init method of a Pregel computation. It provides access to node properties stored in the in-memory graph. We can set the initial node state to a fixed value, e.g. the node id, or use graph properties and the user-defined configuration to initialize a context-dependent state.

The InitContext. 

public final class InitContext {
    // The currently processed node id.
    public long nodeId();
    // User-defined Pregel configuration
    public PregelConfig config();
    // Sets a double node value for the given schema key.
    public void setNodeValue(String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(String key, long value);
    // Sets a double array node value for the given schema key.
    public void setNodeValue(String key, double[] value);
    // Sets a long array node value for the given schema key.
    public void setNodeValue(String key, long[] value);
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Number of relationships of the current node.
    public int degree();
    // Available node property keys in the input graph.
    public Set<String> nodePropertyKeys();
    // Node properties stored in the input graph.
    public NodeProperties nodeProperties(String key);
}

In contrast, org.neo4j.graphalgo.beta.pregel.PregelContext.ComputeContext can be accessed inside the compute method. The context provides methods to access the computation state, e.g. the current superstep, and to send messages to other nodes in the graph.

The ComputeContext. 

public final class ComputeContext {
    // The currently processed node id.
    public long nodeId();
    // User-defined Pregel configuration
    public PregelConfig config();
    // Sets a double node value for the given schema key.
    public void setNodeValue(String key, double value);
    // Sets a long node value for the given schema key.
    public void setNodeValue(String key, long value);
    // Number of nodes in the input graph.
    public long nodeCount();
    // Number of relationships in the input graph.
    public long relationshipCount();
    // Number of relationships of the current node.
    public int degree();
    // Double value for the given node schema key.
    public double doubleNodeValue(String key);
    // Double value for the given node schema key.
    public long longNodeValue(String key);
    // Double array value for the given node schema key.
    public double[] doubleArrayNodeValue(String key);
    // Long array value for the given node schema key.
    public long[] longArrayNodeValue(String key);
    // Notify the framework that the node intends to stop its computation.
    public void voteToHalt();
    // Indicates whether this is superstep 0.
    public boolean isInitialSuperstep();
    // 0-based superstep identifier.
    public int superstep();
    // Sends the given message to all neighbors of the node.
    public void sendToNeighbors(double message);
    // Sends the given message to the target node.
    public void sendTo(long targetNodeId, double message);
}

6.9.2.3. Configuration

To configure the execution of a custom Pregel computation, the framework requires a configuration. The org.neo4j.graphalgo.beta.pregel.PregelConfig provides the minimum set of options to execute a computation. The configuration options also map to the parameters that can later be set via a custom procedure. This is equivalent to all the other algorithms within the GDS library.

Table 6.532. Pregel Configuration
Name Type Default Value Description

maxIterations

Integer

-

Maximum number of supersteps after which the computation will terminate.

isAsynchronous

Boolean

false

Flag indicating if messages can be sent and received in the same superstep.

relationshipWeightProperty

String

null

Name of the relationship property that represents a relationship weight.

concurrency

Integer

4

Concurrency used when executing the Pregel computation.

writeConcurrency

Integer

concurrency

Concurrency used when writing computation results to Neo4j.

writeProperty

String

"pregel_"

Prefix string that is prepended to node schema keys in write mode.

mutateProperty

String

"pregel_"

Prefix string that is prepended to node schema keys in mutate mode.

For some algorithms, we want to specify additional configuration options.

Typically, these options are algorithm specific arguments, such as thresholds. Another reason for a custom config relates to the initialization phase of the computation. If we want to init the node state based on a graph property, we need to access that property via its key. Since those keys are dynamic properties of the graph, we need to provide them to the computation. We can achieve that by declaring an option to set that key in a custom configuration.

If a user-defined Pregel computation requires custom options a custom configuration can be created by extending the PregelConfig.

A custom configuration and how it can be used in the init phase. 

@ValueClass
@Configuration
public interface CustomConfig extends PregelConfig {
    // A property key that refers to a seed property.
    String seedProperty();
    // An algorithm specific parameter.
    int minDegree();
}

public class CustomComputation implements PregelComputation<CustomConfig> {

    @Override
    public void init(PregelContext.InitContext<CustomConfig> context) {
        // Use the custom config key to access a graph property.
        var seedProperties = context.nodeProperties(context.config().seedProperty());
        // Init the node state with the graph property for that node.
        context.setNodeValue("state", seedProperties.doubleValue(context.nodeId()));
    }

    @Override
    public void compute(PregelContext.ComputeContext<CustomConfig> context, Pregel.Messages messages) {
        if (context.degree() >= context.config().minDegree()) {
            // ...
        }
    }

    // ...
}

6.9.3. Run Pregel via Cypher

To make a custom Pregel computation accessible via Cypher, it needs to be exposed via the procedure API. The Pregel framework in GDS provides an easy way to generate procedures for all the default modes.

6.9.3.1. Procedure generation

To generate procedures for a computation, it needs to be annotated with the @org.neo4j.graphalgo.beta.pregel.annotation.PregelProcedure annotation.

Using the @PregelProcedure annotation to configure code generation. 

@PregelProcedure(
    name = "custom.pregel.proc",
    modes = {GDSMode.STREAM, GDSMode.WRITE},
    description = "My custom Pregel algorithm"
)
public class CustomComputation implements PregelComputation<PregelConfig> {
    // ...
}

The annotation provides a number of configuration options for the code generation.

Table 6.533. Configuration
Name Type Default Value Description

name

String

-

The prefix of the generated procedure name. It is appended by the mode.

modes

List

[STREAM, WRITE, MUTATE, STATS]

A procedure is generated for each of the specified modes.

description

String

""

Procedure description that is printed in dbms.listProcedures().

For the above Code snippet, we generate four procedures:

  • custom.pregel.proc.stream
  • custom.pregel.proc.stream.estimate
  • custom.pregel.proc.write
  • custom.pregel.proc.write.estimate

6.9.3.2. Building and installing a Neo4j plugin

In order to use a Pregel algorithm in Neo4j via a procedure, we need to package it as Neo4j plugin. The pregel-bootstrap project is a good starting point. The build.gradle file within the project contains all the dependencies necessary to implement a Pregel algorithm and to generate corresponding procedures.

Make sure to change the gdsVersion and neo4jVersion according to your setup. GDS and Neo4j are runtime dependencies. Therefore, GDS needs to be installed as a plugin on the Neo4j server.

To build the project and create a plugin jar, just run:

./gradlew shadowJar

You can find the pregel-bootstrap.jar in build/libs. The jar needs to be placed in the plugins directory within your Neo4j installation alongside a GDS plugin jar. In order to have access to the procedure in Cypher, its namespace potentially needs to be added to the neo4j.conf file.

Enabling an example procedure in neo4j.conf

dbms.security.procedures.unrestricted=custom.pregel.proc.*
dbms.security.procedures.whitelist=custom.pregel.proc.*

6.9.4. Examples

The pregel-examples module contains a set of examples for Pregel algorithms. The algorithm implementations demonstrate the usage of the Pregel API. Along with each example, we provide test classes that can be used as a guideline on how to write tests for custom algorithms. To play around, we recommend copying one of the algorithms into the pregel-bootstrap project, build it and setup the plugin in Neo4j.