Scaling out jobs

Neo4j Graph Analytics for Snowflake is in Public Preview and is not intended for production use.

Neo4j Graph Analytics for Snowflake is designed to run on Snowpark Container Services, which allows you to scale out jobs across multiple compute nodes. This is particularly useful for two scenarios:

  1. Multiple users using the application at the same time.

  2. A single user running multiple jobs at the same time.

Both scenarios are handled by the application in the same way. An algorithm is executed on a compute node within a compute pool. The compute pool is selected based on the selector passed to the algorithm. Snowflake automatically scales the compute pool up or down based on the number of jobs running in the pool.
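
One way to observe this scaling is to list the compute pools and their current node counts while jobs are running. The sketch below assumes a Snowpark Python worksheet with the return type set to Table and a role that is allowed to list compute pools; the actual pool names are managed by the application and may differ in your account.

import snowflake.snowpark as snowpark

def main(session: snowpark.Session):
    # List the compute pools visible to the current role, including their
    # configured minimum/maximum and currently provisioned node counts.
    # Running this while jobs are active shows how Snowflake scales a pool.
    return session.sql("SHOW COMPUTE POOLS")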

Considerations

The smallest unit of work is a job, i.e., a project-compute-write execution. The application does not automatically split jobs into smaller units of work and distribute them across multiple compute nodes. Choose a compute pool selector that suits the size of your graph and the resource requirements of the algorithm.
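
For illustration, the sketch below shows what such a job looks like when submitted from an existing Snowpark Python session: a single procedure call that bundles the project, compute, and write phases. The application name matches the default used in the example at the end of this page; the schema, table, and output names are placeholders.

# A single job: one project-compute-write execution, submitted as one call.
# Schema, table, and output names are placeholders for your own data.
job_config = {
    'project': {
        'defaultTablePrefix': 'my_db.my_schema',
        'nodeTables': ['Nodes'],
        'relationshipTables': {
            'Relationships': {
                'sourceTable': 'Nodes',
                'targetTable': 'Nodes'
            }
        }
    },
    'compute': {},
    'write': [{
        'nodeLabel': 'Nodes',
        'outputTable': 'my_db.my_schema.Nodes_components'
    }]
}

# The whole configuration runs as one job on a single node of the pool
# addressed by the selector, so pick a selector whose nodes are large enough.
session.sql(
    f"CALL Neo4j_Graph_Analytics.graph.wcc('CPU_X64_M', {job_config})"
).collect()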

As described in Managing compute pools, compute pools are configured with a minimum and maximum number of compute nodes. When algorithms are executed, Snowflake automatically places jobs on running nodes or starts new nodes if necessary. Consider adapting the minimum and maximum number of nodes in the compute pool to your needs. Please consult the Snowflake documentation for more information on working with compute pools.
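
For example, the node limits behind a selector can be adjusted through the application's admin procedures, which are also used in the Python example at the end of this page. This sketch assumes an existing Snowpark session and the `app_admin` role; the limits themselves are only illustrative.

# Allow the pool behind the CPU_X64_M selector to scale out to three nodes
# while keeping at least one node provisioned. Requires the `app_admin` role.
app_name = 'Neo4j_Graph_Analytics'
session.sql(f"CALL {app_name}.admin.set_max_nodes('CPU_X64_M', 3)").collect()
session.sql(f"CALL {app_name}.admin.set_min_nodes('CPU_X64_M', 1)").collect()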

Multiple users using the application at the same time

Multiple users can run jobs in parallel on the same compute pool or on different compute pools. The application automatically handles the distribution of jobs across compute nodes.

The following image shows an example execution of multiple jobs on different compute pools. Compute pools CPU_X64_M and GPU_NV_S are configured to use at most two compute nodes, while CPU_X64_L is configured to use at most one compute node.

Visualization of the example job execution across compute pools

Given the above image, a possible multi-user scenario could look like this:

  Alice> CALL graph.wcc('CPU_X64_M', <configuration>);            -- Job 1
    Bob> CALL graph.page_rank('CPU_X64_M', <configuration>);      -- Job 2
Charlie> CALL graph.triangle_count('CPU_X64_L', <configuration>); -- Job 3
  David> CALL graph.louvain('CPU_X64_L', <configuration>);        -- Job 4
    Eve> CALL graph.gs_nc_predict('GPU_NV_S', <configuration>);   -- Job 5
  Frank> CALL graph.gs_nc_train('GPU_NV_S', <configuration>);     -- Job 6

Jobs 1 and 2 are both scheduled on the CPU_X64_M compute pool. They can run at the same time because this pool has two compute nodes. Jobs 3 and 4 are scheduled on the CPU_X64_L compute pool. Since this pool is limited to one compute node, job 4 must wait for job 3 to finish. Jobs 5 and 6 are scheduled on the GPU_NV_S compute pool and can run in parallel.

Single user running multiple jobs at the same time

Similar to the multi-user scenario, a single user can run multiple jobs in parallel on the same compute pool or on different compute pools.

Generally speaking, the above example is valid for a single user as well:

Alice> CALL graph.wcc('CPU_X64_M', <configuration>);            -- Job 1
Alice> CALL graph.page_rank('CPU_X64_M', <configuration>);      -- Job 2
Alice> CALL graph.triangle_count('CPU_X64_L', <configuration>); -- Job 3
Alice> CALL graph.louvain('CPU_X64_L', <configuration>);        -- Job 4
Alice> CALL graph.gs_nc_predict('GPU_NV_S', <configuration>);   -- Job 5
Alice> CALL graph.gs_nc_train('GPU_NV_S', <configuration>);     -- Job 6

Executing multiple jobs in parallel as a single user can be achieved in several ways, for example with a Snowpark Python worksheet, the Snowflake CLI, or the Snowflake Python connector. The example in the next section demonstrates this for a Snowpark Python worksheet.
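
As a sketch of one of these alternatives, the Snowflake Python connector can submit the same procedure calls asynchronously with execute_async(). The connection parameters and the <configuration> payloads below are placeholders to fill in.

import snowflake.connector

# Sketch: submit two Neo4j Graph Analytics jobs in parallel via the
# Snowflake Python connector. Replace the connection parameters and the
# <configuration> placeholders with your own values.
conn = snowflake.connector.connect(
    account='<account>', user='<user>', password='<password>'
)
wcc_cur = conn.cursor()
page_rank_cur = conn.cursor()

# execute_async() submits a statement and returns immediately.
wcc_cur.execute_async(
    "CALL Neo4j_Graph_Analytics.graph.wcc('CPU_X64_M', <configuration>)")
page_rank_cur.execute_async(
    "CALL Neo4j_Graph_Analytics.graph.page_rank('CPU_X64_M', <configuration>)")

# Attach each cursor to its query and fetch the results; fetching waits
# for a job that is still running to finish.
wcc_cur.get_results_from_sfqid(wcc_cur.sfqid)
page_rank_cur.get_results_from_sfqid(page_rank_cur.sfqid)
print(wcc_cur.fetchall())
print(page_rank_cur.fetchall())
conn.close()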

Example

In the following example, we will run multiple jobs in parallel using the Snowpark Python API. The key concept is to use the collect_nowait() call, which submits a job and immediately returns an asynchronous job handle instead of blocking. The example can be adapted to custom consumer tables and executed in a Snowpark Python worksheet. We will run three jobs in parallel on the CPU_X64_M compute pool. Each job will write its result to a separate output table.

import snowflake.snowpark as snowpark
from json import loads

def main(session: snowpark.Session):
    """
    This example shows how to run multiple Neo4j Graph Analytics jobs in parallel.
    The code can be run in a Snowflake worksheet.
    The return type of the worksheet must be set to `VARIANT`.
    """
    # The name of the application.
    app_name = 'Neo4j_Graph_Analytics'
    # The qualified name of the schema where the consumer data is stored.
    consumer_schema = 'consumer_db.data_schema'
    # The compute pool to execute the jobs.
    compute_pool_selector = 'CPU_X64_M'
    # The minimum and maximum number of compute nodes in the compute pool.
    min_nodes = max_nodes = 3

    # Make sure the compute pool is able to scale out compute jobs.
    # Note that these calls require the `app_admin` role.
    session.sql(f"CALL {app_name}.admin.set_max_nodes('{compute_pool_selector}', {max_nodes})").collect()
    session.sql(f"CALL {app_name}.admin.set_min_nodes('{compute_pool_selector}', {min_nodes})").collect()

    # Configure table to graph projection.
    # We assume two node tables and one relationship table.
    # * consumer_db.data_schema.Nodes_A
    # * consumer_db.data_schema.Nodes_B
    # * consumer_db.data_schema.Relationships
    project_cfg = {
        'defaultTablePrefix': consumer_schema,
        'nodeTables': ['Nodes_A', 'Nodes_B'],
        'relationshipTables': {
            'Relationships': {
                'sourceTable': 'Nodes_A',
                'targetTable': 'Nodes_B'
            }
        }
    }

    # Prepare procedure calls for three Neo4j Graph Analytics algorithms.
    wcc = f"""
        CALL {app_name}.graph.wcc('{compute_pool_selector}',
        {{
            'project': {project_cfg},
            'compute': {{}},
            'write': [{{
                'nodeLabel': 'Nodes_A',
                'outputTable': '{consumer_schema}.nodes_A_components'
            }}]
        }})"""
    page_rank = f"""
        CALL {app_name}.graph.page_rank('{compute_pool_selector}',
        {{
            'project': {project_cfg},
            'compute': {{ 'dampingFactor': 0.87 }},
            'write': [{{
                'nodeLabel': 'Nodes_B',
                'outputTable': '{consumer_schema}.nodes_B_ranks'
            }}]
        }})"""
    node_similarity = f"""
        CALL {app_name}.graph.node_similarity('{compute_pool_selector}',
        {{
            'project': {project_cfg},
            'compute': {{ 'topK': 5 }},
            'write': [{{
                'sourceLabel': 'Nodes_A',
                'targetLabel': 'Nodes_A',
                'outputTable': '{consumer_schema}.nodes_A_similarities'
            }}]
        }})"""

    # Start three jobs in parallel.
    # `collect_nowait` will return immediately, but the jobs will continue to run.
    wcc_result = session.sql(wcc).collect_nowait()
    page_rank_result = session.sql(page_rank).collect_nowait()
    node_similarity_result = session.sql(node_similarity).collect_nowait()

    # Wait for the jobs to finish and collect the results.
    wcc_result = loads(wcc_result.result()[0]["JOB_RESULT"])["wcc_1"]
    page_rank_result = loads(page_rank_result.result()[0]["JOB_RESULT"])["page_rank_1"]
    node_similarity_result = loads(node_similarity_result.result()[0]["JOB_RESULT"])["node_similarity_1"]

    # Return metadata about the three jobs.
    # Note that each job also writes its results to its configured output table.
    return {
        'wcc_components': wcc_result["componentCount"],
        'page_rank_iterations': page_rank_result["ranIterations"],
        'node_sim_comparisons': node_similarity_result["nodesCompared"]
    }