Projecting graphs using Apache Arrow

This chapter explains how to import data into the Graph Data Science library using Apache Arrow™.

This feature is in the alpha tier.

This feature is not available in AuraDS.

Projecting graphs via Apache Arrow allows importing graph data that is stored outside of Neo4j. Apache Arrow is a language-agnostic specification for in-memory, columnar data structures. With Arrow Flight, it also contains a protocol for serialization and generic data transport.

GDS exposes an Arrow Flight Server which accepts graph data from an Arrow Flight Client. The data is sent in the Arrow columnar format. Projecting graphs via Arrow Flight follows a specific client-server protocol. In this chapter, we explain that protocol, its message formats and its schema constraints.

In this chapter, we assume that a Flight server has been set up and configured. To learn more about the installation, please refer to the installation chapter.

1. Client-Server protocol

The protocol describes the projection of a single in-memory graph into GDS. Each projection is represented as an import process on the server side. The protocol divides the import process into three phases.

Client-server protocol for Arrow import in GDS
  1. Initialize the import process

    To initialize the import process, the client needs to execute a Flight action on the server. The action type is called CREATE_GRAPH and the action body configures the import process. The server receives the action, creates the import process and acknowledges success.

    See Initializing the Import Process for more details.

  2. Send node records via an Arrow Flight stream

    In the second phase, the client sends record batches of nodes via PUT as a Flight stream. Once all record batches are sent, the client needs to indicate that all nodes have been sent. This is done by sending another Flight action with type NODE_LOAD_DONE.

  3. Send relationship records via an Arrow Flight stream

    In the third and last phase, the client sends record batches of relationships via PUT as a Flight stream. Once all record batches are sent, the client needs to indicate that the import process is complete. This is done by sending another Flight action with type RELATIONSHIP_LOAD_DONE. The server finalizes the construction of the in-memory graph and stores the graph in the graph catalog.
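
The three phases can be summarized as an ordered list of Flight calls. The following Python sketch is illustrative only: the function name `import_plan` and the tuple layout are assumptions made for this example and are not part of GDS. Only the action types, the entity types and the JSON keys are taken from this chapter.

```python
import json


def import_plan(graph_name, database_name="neo4j"):
    """Return the ordered Flight calls for projecting one graph.

    Each entry is (call kind, action type or entity type, JSON payload).
    """
    done = json.dumps({"name": graph_name})

    def descriptor(entity_type):
        return json.dumps({"name": graph_name, "entity_type": entity_type})

    return [
        # Phase 1: initialize the import process.
        ("action", "CREATE_GRAPH",
         json.dumps({"name": graph_name, "database_name": database_name})),
        # Phase 2: stream node record batches, then signal completion.
        ("put", "node", descriptor("node")),
        ("action", "NODE_LOAD_DONE", done),
        # Phase 3: stream relationship record batches, then finalize.
        ("put", "relationship", descriptor("relationship")),
        ("action", "RELATIONSHIP_LOAD_DONE", done),
    ]
```

The order matters: nodes must be completed with NODE_LOAD_DONE before relationships are streamed, and RELATIONSHIP_LOAD_DONE ends the import.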

2. Initializing the Import Process

An import process is initialized by sending a Flight action using the action type CREATE_GRAPH. The action body is a JSON document containing metadata for the import process:

{
    "name": "my_graph",
    "database_name": "neo4j",
    "concurrency": 4
}

The name identifies the import process and is also the name of the resulting in-memory graph in the graph catalog. The database_name tells the server on which database the projected graph will be available. The concurrency key is optional. It is used when the server finalizes the in-memory graph after all data has been received.

The server acknowledges creating the import process by sending a result JSON document which contains the name of the import process. If an error occurs, e.g., if the graph already exists or if the server is not started, the client is informed accordingly.
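
As a minimal sketch, the action could be sent from Python with the `pyarrow` package. The helper names `create_graph_body` and `create_graph` are hypothetical, and we assume that the server returns the acknowledgement as a single UTF-8 encoded JSON result. `pyarrow` is imported inside `create_graph` so that the body helper can be used on its own.

```python
import json


def create_graph_body(name, database_name="neo4j", concurrency=None):
    """Encode the CREATE_GRAPH action body as UTF-8 JSON bytes."""
    body = {"name": name, "database_name": database_name}
    if concurrency is not None:
        # concurrency is optional, so it is only sent when given.
        body["concurrency"] = concurrency
    return json.dumps(body).encode("utf-8")


def create_graph(client, name, database_name="neo4j", concurrency=None):
    """Send CREATE_GRAPH via a pyarrow.flight.FlightClient.

    Returns the decoded acknowledgement. Assumes the first result
    holds the JSON document described above.
    """
    import pyarrow.flight as flight  # needs the pyarrow package

    action = flight.Action(
        "CREATE_GRAPH", create_graph_body(name, database_name, concurrency)
    )
    results = list(client.do_action(action))
    return json.loads(results[0].body.to_pybytes().decode("utf-8"))
```

A client could be created with `pyarrow.flight.FlightClient("grpc://localhost:8491")`. The address is an assumption and depends on how the Flight server is configured.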

3. Sending node records via PUT as a Flight stream

Nodes need to be turned into Arrow record batches and sent to the server via a Flight stream. Each stream needs to target an import process on the server. That information is encoded in the Flight descriptor body as a JSON document:

{
    "name": "my_graph",
    "entity_type": "node"
}

The server expects the node records to adhere to a specific schema. Given an example node such as (:Pokemon { weight: 8.5, height: 0.6, hp: 39 }), its record must be represented as follows:

| nodeId | labels    | weight | height | hp |
|--------|-----------|--------|--------|----|
| 0      | "Pokemon" | 8.5    | 0.6    | 39 |

The following table describes the node columns with reserved names.

| Name   | Type                                | Optional | Nullable | Description |
|--------|-------------------------------------|----------|----------|-------------|
| nodeId | Integer                             | No       | No       | Unique 64-bit node identifiers for the in-memory graph. Must be non-negative values. |
| labels | String or Integer or List of String | Yes      | No       | Node labels, either a single string node label, a single dictionary encoded node label or a list of node label strings. |

Any additional column is interpreted as a node property. The supported data types are equivalent to the GDS node property types, i.e., long, double, long[], double[] and float[].
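
The node schema above can be produced by pivoting row-wise node data into columns. The sketch below is one possible way to do this in Python. The input layout (dictionaries with `id`, `labels` and `properties` keys) and the helper names are assumptions made for this example. It also requires every node to carry every property, because this chapter does not describe how missing property values are handled.

```python
def node_columns(nodes):
    """Pivot node dicts into the columnar layout with reserved names."""
    property_keys = sorted({key for node in nodes for key in node["properties"]})
    columns = {"nodeId": [], "labels": []}
    for key in property_keys:
        columns[key] = []

    for node in nodes:
        if node["id"] < 0:
            raise ValueError(f"negative node id: {node['id']}")
        columns["nodeId"].append(node["id"])
        columns["labels"].append(node["labels"])
        for key in property_keys:
            # Raises KeyError if a node lacks one of the properties.
            columns[key].append(node["properties"][key])
    return columns


def to_arrow_table(columns):
    """Convert the column dict into a pyarrow.Table (needs pyarrow)."""
    import pyarrow as pa

    return pa.table(columns)
```

For the Pokemon example, `node_columns` yields one-element lists for `nodeId`, `labels`, `weight`, `height` and `hp`, which `to_arrow_table` turns into a table whose record batches can be streamed to the server.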

To increase the throughput, multiple Flight streams can be sent in parallel. The server manages multiple incoming streams for the same import process. In addition to the number of parallel streams, the size of a single record batch can also affect the overall throughput. The client has to make sure that node ids are unique across all streams.

Sending duplicate node ids results in undefined behaviour.
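
A client could check id uniqueness and split the data before opening several streams. The following sketch shows one way to do that. All function names are hypothetical, `put_stream` stands for any callable that sends one partition as one Flight stream, and `put_node_table` assumes that the descriptor JSON is passed as a command descriptor. The batch size of 10,000 rows is an arbitrary starting point, not a recommendation from GDS.

```python
import json
from concurrent.futures import ThreadPoolExecutor


def assert_unique_ids(node_ids):
    """Fail early instead of relying on undefined server behaviour."""
    seen = set()
    for node_id in node_ids:
        if node_id in seen:
            raise ValueError(f"duplicate node id: {node_id}")
        seen.add(node_id)


def partition(rows, stream_count):
    """Round-robin split so that each stream gets a disjoint share."""
    return [rows[i::stream_count] for i in range(stream_count)]


def put_node_table(client, graph_name, table, batch_rows=10_000):
    """Send one pyarrow.Table as one Flight stream (needs pyarrow)."""
    import pyarrow.flight as flight

    descriptor = flight.FlightDescriptor.for_command(
        json.dumps({"name": graph_name, "entity_type": "node"}).encode("utf-8")
    )
    writer, _ = client.do_put(descriptor, table.schema)
    try:
        # max_chunksize caps the number of rows per record batch.
        writer.write_table(table, max_chunksize=batch_rows)
    finally:
        writer.close()


def send_in_parallel(parts, put_stream):
    """Run put_stream once per partition, each in its own thread."""
    with ThreadPoolExecutor(max_workers=len(parts)) as pool:
        return list(pool.map(put_stream, parts))
```

Both the number of partitions and `batch_rows` are tuning knobs, so it is worth measuring a few combinations against the target server.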

Once all node record batches are sent to the server, the client needs to indicate that node loading is done. This is achieved by sending another Flight action with the action type NODE_LOAD_DONE and the following JSON document as action body:

{
    "name": "my_graph"
}

The server acknowledges the action by returning a JSON document including the name of the import process and the number of nodes that have been imported:

{
    "name": "my_graph",
    "node_count": 42
}
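
The returned node count can serve as a sanity check on the client side. The sketch below assumes a hypothetical callable `send_action(action_type, body)` that sends a Flight action and returns the raw JSON bytes of the server's reply. With `pyarrow`, such a callable would wrap `FlightClient.do_action`.

```python
import json


def finish_node_loading(send_action, graph_name, expected_nodes=None):
    """Send NODE_LOAD_DONE and return the node count reported by the server."""
    body = json.dumps({"name": graph_name}).encode("utf-8")
    ack = json.loads(send_action("NODE_LOAD_DONE", body))

    if ack["name"] != graph_name:
        raise ValueError(f"acknowledgement for unexpected import: {ack['name']}")
    if expected_nodes is not None and ack["node_count"] != expected_nodes:
        raise ValueError(
            f"server imported {ack['node_count']} nodes, expected {expected_nodes}"
        )
    return ack["node_count"]
```

Comparing the count against the number of rows that were sent catches streams that failed silently before relationships are uploaded.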

Node identifiers are represented as long values in the range from 0 (inclusive) to 2^63 (exclusive). If the input node id space is sparse and contains very large node id values, the projected graph can have a high memory footprint. In these situations, the memory footprint of the graph could be reduced by switching to another id map implementation.

4. Sending relationship records via PUT as a Flight stream

Similar to nodes, relationships need to be turned into record batches in order to send them to the server via a Flight stream. The Flight descriptor is a JSON document containing the name of the import process as well as the entity type:

{
    "name": "my_graph",
    "entity_type": "relationship"
}

As for nodes, the server expects a specific schema for relationship records. For example, given the relationship (a)-[:EVOLVES_TO { at_level: 16 }]->(b) and assuming node id 0 for a and node id 1 for b, the record must be represented as follows:

| sourceNodeId | targetNodeId | relationshipType | at_level |
|--------------|--------------|------------------|----------|
| 0            | 1            | "EVOLVES_TO"     | 16       |

The following table describes the relationship columns with reserved names.

| Name             | Type              | Optional | Nullable | Description |
|------------------|-------------------|----------|----------|-------------|
| sourceNodeId     | Integer           | No       | No       | 64-bit source node identifier. Must be a non-negative value and present in the imported nodes. |
| targetNodeId     | Integer           | No       | No       | 64-bit target node identifier. Must be a non-negative value and present in the imported nodes. |
| relationshipType | String or Integer | Yes      | No       | Single relationship type. Either a string literal or a dictionary encoded number. |

Any additional column is interpreted as a relationship property. GDS only supports relationship properties of type double.
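
Because relationship properties must be doubles, a client may need to convert integer values such as `at_level: 16` before sending them. The following sketch builds the relationship columns with the reserved names from the table above. The input layout (dictionaries with `source`, `target`, `type` and `properties` keys) and the function name are assumptions made for this example.

```python
def relationship_columns(relationships, known_node_ids):
    """Pivot relationship dicts into columns and coerce properties to float."""
    property_keys = sorted(
        {key for rel in relationships for key in rel["properties"]}
    )
    columns = {"sourceNodeId": [], "targetNodeId": [], "relationshipType": []}
    for key in property_keys:
        columns[key] = []

    for rel in relationships:
        # Both endpoints must refer to nodes that were imported earlier.
        for endpoint in (rel["source"], rel["target"]):
            if endpoint not in known_node_ids:
                raise ValueError(f"unknown node id: {endpoint}")
        columns["sourceNodeId"].append(rel["source"])
        columns["targetNodeId"].append(rel["target"])
        columns["relationshipType"].append(rel["type"])
        for key in property_keys:
            # Only double-typed relationship properties are supported.
            columns[key].append(float(rel["properties"][key]))
    return columns
```

The resulting dictionary can be converted to an Arrow table, for example with `pyarrow.table(columns)`, and streamed in the same way as node data.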

Similar to sending nodes, the overall throughput depends on the number of parallel Flight streams and the record batch size.

Once all relationship record batches are sent to the server, the client needs to indicate that the import process is done. This is achieved by sending a final Flight action with the action type RELATIONSHIP_LOAD_DONE and the following JSON document as action body:

{
    "name": "my_graph"
}

The server finalizes the graph projection and stores the in-memory graph in the graph catalog. Once completed, the server acknowledges the action by returning a JSON document including the name of the import process and the number of relationships that have been imported:

{
    "name": "my_graph",
    "relationship_count": 1337
}

5. Creating a Neo4j database

The client-server protocol can also be used to create a new Neo4j database instead of an in-memory graph. To initialize a database import process, the initial action type must be changed to CREATE_DATABASE. The action body is a JSON document containing the configuration for the import process:

{
    "name": "my_database",
    "concurrency": 4
}

The following table contains all settings for the database import.

| Name              | Type    | Optional | Default value      | Description |
|-------------------|---------|----------|--------------------|-------------|
| name              | String  | No       | None               | The name of the import process and the resulting database. |
| concurrency       | Integer | Yes      | Available cores    | Number of threads to use for the database creation process. |
| debug_log         | Boolean | Yes      | False              | Write the database creation process into the log. |
| id_property       | String  | Yes      | originalId         | The node property key which stores the node id of the input data. |
| record_format     | String  | Yes      | dbms.record_format | Database record format. Valid values are blank (no value, default), standard, aligned, or high_limit. |
| force             | Boolean | Yes      | False              | Force deletes any existing database files prior to the import. |
| high_io           | Boolean | Yes      | False              | Ignore environment-based heuristics, and specify whether the target storage subsystem can support parallel IO with high throughput. |
| use_bad_collector | Boolean | Yes      | False              | Collects bad node and relationship records during import and writes them into the log. |

After sending the action to initialize the import process, the subsequent protocol is the same as for creating an in-memory graph. See Sending node records via PUT as a Flight stream and Sending relationship records via PUT as a Flight stream for further details.
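
The settings table can be turned into a small client-side check that catches misspelled or mistyped settings before the action is sent. This is only a sketch: the names `DATABASE_SETTINGS` and `create_database_body` are hypothetical, and the mapping of the documented types to Python types is an assumption.

```python
import json

# Setting names and expected Python types, taken from the settings table.
DATABASE_SETTINGS = {
    "name": str,
    "concurrency": int,
    "debug_log": bool,
    "id_property": str,
    "record_format": str,
    "force": bool,
    "high_io": bool,
    "use_bad_collector": bool,
}


def create_database_body(name, **settings):
    """Validate the settings and encode the CREATE_DATABASE action body."""
    body = {"name": name, **settings}
    for key, value in body.items():
        expected = DATABASE_SETTINGS.get(key)
        if expected is None:
            raise KeyError(f"unknown setting: {key}")
        # bool is a subclass of int in Python, so reject it for Integer settings.
        wrong_bool = expected is int and isinstance(value, bool)
        if wrong_bool or not isinstance(value, expected):
            raise TypeError(f"{key} must be of type {expected.__name__}")
    return json.dumps(body).encode("utf-8")
```

For example, `create_database_body("my_database", concurrency=4)` produces the body shown at the start of this section, while a misspelled key such as `forse=True` raises a `KeyError`.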