Monitoring the progress of a running algorithm

Follow along with a notebook in Colab Google Colab

Running algorithms on large graphs can be computationally expensive. This example shows how to use the gds.beta.listProgress procedure to monitor the progress of an algorithm, both to get an idea of the processing speed and to determine when the computation is completed.

Setup

For more information on how to get started using Python, refer to the Connecting with Python tutorial.

pip install graphdatascience
# Import the client
from graphdatascience import GraphDataScience

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""

# Configure the client with AuraDS-recommended settings
gds = GraphDataScience(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD),
    aura_ds=True
)

In the following code examples we use the print function to print Pandas DataFrame and Series objects. You can try different ways to print a Pandas object, for instance via the to_string and to_json methods; if you use a JSON representation, in some cases you may need to include a default handler to handle Neo4j DateTime objects. Check the Python connection section for some examples.

For more information on how to get started using the Cypher Shell, refer to the Neo4j Cypher Shell tutorial.

Run the following commands from the directory where the Cypher shell is installed.
export AURA_CONNECTION_URI="neo4j+s://xxxxxxxx.databases.neo4j.io"
export AURA_USERNAME="neo4j"
export AURA_PASSWORD=""

./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD

For more information on how to get started using Python, refer to the Connecting with Python tutorial.

pip install neo4j
# Import the driver
from neo4j import GraphDatabase

# Replace with the actual URI, username, and password
AURA_CONNECTION_URI = "neo4j+s://xxxxxxxx.databases.neo4j.io"
AURA_USERNAME = "neo4j"
AURA_PASSWORD = ""

# Instantiate the driver
driver = GraphDatabase.driver(
    AURA_CONNECTION_URI,
    auth=(AURA_USERNAME, AURA_PASSWORD)
)
# Import to prettify results
import json

# Import for the JSON helper function
from neo4j.time import DateTime

# Helper function for serializing Neo4j DateTime in JSON dumps
def default(o):
    if isinstance(o, (DateTime)):
        return o.isoformat()

Create an example graph

An easy way to create an in-memory graph is through the GDS graph generation algorithm. By specifing the number of nodes, the average number of relationships going out of each node and the relationship distribution function, the algorithm creates a graph having the following shape:

(:1000000_Nodes)-[:REL]→(:1000000_Nodes)

# Run the graph generation algorithm and retrieve the corresponding
# graph object and call result metadata
g, result = gds.beta.graph.generate(
    "example-graph",
    1000000,
    3,
    relationshipDistribution="POWER_LAW"
)

# Print prettified graph stats
print(result)
CALL gds.beta.graph.generate(
  'example-graph',
  1000000,
  3,
  {relationshipDistribution: 'POWER_LAW'}
)
YIELD name,
  nodes,
  relationships,
  generateMillis,
  relationshipSeed,
  averageDegree,
  relationshipDistribution,
  relationshipProperty
RETURN *
# Cypher query
create_example_graph_query = """
    CALL gds.beta.graph.generate(
      'example-graph',
      1000000,
      3,
      {relationshipDistribution: 'POWER_LAW'}
    )
    YIELD name,
      nodes,
      relationships,
      generateMillis,
      relationshipSeed,
      averageDegree,
      relationshipDistribution,
      relationshipProperty
    RETURN *
"""

# Create the driver session
with driver.session() as session:
    # Run query
    result = session.run(create_example_graph_query).data()

    # Prettify the result
    print(json.dumps(result, indent=2, sort_keys=True, default=default))

Run an algorithm and check the progress

We need to run an algorithm that takes some time to converge. In this example we use the Label Propagation algorithm, which we start in a separate thread so that we can check its progress in the same Python process.

# Import to run the long-running algorithm in a thread
import threading
# Import to use the sleep method
import time


# Method to call the label propagation algorithm from a thread
def run_label_prop():
    print("Running label propagation")

    result = gds.labelPropagation.mutate(
        g,
        mutateProperty="communityID"
    )

    print(result)


# Method to get and pretty-print the algorithm progress
def run_list_progress():
    result = gds.beta.listProgress()

    print(result)


# Create a thread for the label propagation algorithm and start it
label_prop_query_thread = threading.Thread(target=run_label_prop)
label_prop_query_thread.start()

# Sleep for a few seconds so the label propagation query has time to get going
print('Sleeping for 5 seconds')
time.sleep(5)

# Check the algorithm progress
run_list_progress()

# Sleep for a few more seconds
print('Sleeping for 10 more seconds')
time.sleep(10)

# Check the algorithm progress again
run_list_progress()

# Block and wait for the algorithm thread to finish
label_prop_query_thread.join()
CALL gds.labelPropagation.mutate(
  'example-graph',
  {mutateProperty: 'communityID'}
)
YIELD preProcessingMillis,
  computeMillis,
  mutateMillis,
  postProcessingMillis,
  nodePropertiesWritten,
  communityCount,
  ranIterations,
  didConverge,
  communityDistribution,
  configuration
RETURN *

// The following query has to be run in another Cypher shell, so run this command
// in a different terminal first:
//
// ./cypher-shell -a $AURA_CONNECTION_URI -u $AURA_USERNAME -p $AURA_PASSWORD

CALL gds.beta.listProgress()
YIELD jobId, taskName, progress, progressBar
RETURN *
# Import to run the long-running algorithm in a thread
import threading
# Import to use the sleep method
import time


# Method to call the label propagation algorithm from a thread
def run_label_prop():
    label_prop_mutate_example_graph_query = """
        CALL gds.labelPropagation.mutate(
          'example-graph',
          {mutateProperty: 'communityID'}
        )
        YIELD preProcessingMillis,
          computeMillis,
          mutateMillis,
          postProcessingMillis,
          nodePropertiesWritten,
          communityCount,
          ranIterations,
          didConverge,
          communityDistribution,
          configuration
        RETURN *
    """

    # Create the driver session
    with driver.session() as session:
        # Run query
        print("Running label propagation")
        results = session.run(label_prop_mutate_example_graph_query).data()
        # Prettify the first result
        print(json.dumps(results[0], indent=2, sort_keys=True))


# Method to get and pretty-print the algorithm progress
def run_list_progress():
    gds_list_progress_query = """
        CALL gds.beta.listProgress()
        YIELD jobId, taskName, progress, progressBar
        RETURN *
    """

    # Create the driver session
    with driver.session() as session:
        # Run query
        print('running list progress')
        results = session.run(gds_list_progress_query).data()
        # Prettify the first result
        print('list progress results: ')
        print(json.dumps(results[0], indent=2, sort_keys=True))


# Create a thread for the label propagation algorithm and start it
label_prop_query_thread = threading.Thread(target=run_label_prop)
label_prop_query_thread.start()

# Sleep for a few seconds so the label propagation query has time to get going
print('Sleeping for 5 seconds')
time.sleep(5)

# Check the algorithm progress
run_list_progress()

# Sleep for a few more seconds
print('Sleeping for 10 more seconds')
time.sleep(10)

# Check the algorithm progress again
run_list_progress()

# Block and wait for the algorithm thread to finish
label_prop_query_thread.join()

Cleanup

The in-memory graph can now be deleted.

result = gds.graph.drop(g)

print(result)
CALL gds.graph.drop('example-graph')
delete_example_in_memory_graph_query = """
CALL gds.graph.drop('example-graph')
"""

with driver.session() as session:
    # Run query
    results = session.run(delete_example_in_memory_graph_query).data()

    # Prettify the results
    print(json.dumps(results, indent=2, sort_keys=True, default=default))

References

Cypher