Apache Arrow projection

Projecting graphs via Apache Arrow allows importing graph data which is stored outside of Neo4j. Apache Arrow is a language-agnostic in-memory, columnar data structure specification. With Arrow Flight, it also contains a protocol for serialization and generic data transport.

GDS exposes an Arrow Flight Server which accepts graph data from an Arrow Flight Client. The data that is being sent is represented using the Arrow columnar format. Projecting graphs via Arrow Flight follows a specific client-server protocol. In this chapter, we explain that protocol, message formats and schema constraints.

In this chapter, we assume that a Flight server has been set up and configured. To learn more about the installation, please refer to the installation chapter.

Graph projection features are versioned to allow for future changes. Please refer to the corresponding section in the Configure Apache Arrow server documentation for more details on versioned commands.

Client-Server protocol

The protocol describes the projection of a single in-memory graph into GDS. Each projection is represented as an import process on the server side. The protocol divides the import process into three phases.

Client-server protocol for Arrow import in GDS
  1. Initialize the import process

    To initialize the import process, the client needs to execute a Flight action on the server. The action type is called CREATE_GRAPH and the action body configures the import process. The server receives the action, creates the import process and acknowledges success.

    See Initializing the Import Process for more details.

  2. Send node records via an Arrow Flight stream

    In the second phase, the client sends record batches of nodes via PUT as a Flight stream. Once all record batches are sent, the client needs to indicate that all nodes have been sent. This is done via sending another Flight action with type NODE_LOAD_DONE.

  3. Send relationship records via an Arrow Flight stream

    In the third and last phase, the client sends record batches of relationships via PUT as a Flight stream. Once all record batches are sent, the client needs to indicate that the import process is complete. This is done via sending another Flight action with type RELATIONSHIP_LOAD_DONE. The server finalizes the construction of the in-memory graph and stores the graph in the graph catalog.

Initializing the import process

An import process is initialized by sending a Flight action using the action type v1/CREATE_GRAPH. The action type contains the server version, which is currently version 1 (v1). The action body is a JSON document containing metadata for the import process:

{
    name: "my_graph", (1)
    database_name: "neo4j", (2)
    concurrency: 4, (3)
    undirected_relationship_types: [] (4)
    inverse_indexed_relationship_types: [] (5)
    skip_dangling_relationships: false (6)
}
1 Used to identify the import process. It is also the name of the resulting in-memory graph in the graph catalog.
2 The name of the database on which the projected graph will be available.
3 (optional) The level of concurrency that will be set on the in-memory graph after all data has been received.
4 (optional) A list of relationship types that must be imported as undirected. A wildcard (*) can be used to include all the types.
5 (optional) A list of relationship types that must be indexed in inverse direction. A wildcard (*) can be used to include all the types.
6 (optional) If set to true, dangling relationships will be skipped during the import process. Otherwise, the import process will fail if dangling relationships are detected.
Relationships declared as undirected should only be provided once, i.e. in a single direction.

The server acknowledges creating the import process by sending a result JSON document which contains the name of the import process. If an error occurs, e.g., if the graph already exists or if the server is not started, the client is informed accordingly.

Sending node records via PUT as a Flight stream

Nodes need to be turned into Arrow record batches and sent to the server via a Flight stream. Each stream needs to target an import process on the server. That information is encoded in the Flight descriptor body as a JSON document:

{
    name: "PUT_COMMAND",
    version: "v1",
    body: {
        name: "my_graph",
        entity_type: "node",
    }
}

The server expects the node records to adhere to a specific schema. Given an example node such as (:Pokemon { weight: 8.5, height: 0.6, hp: 39 }), it’s record must be represented as follows:

nodeId labels weight height hp

0

"Pokemon"

8.5

0.6

39

The following table describes the node columns with reserved names.

Name Type Optional Nullable Description

nodeId

Integer

No

No

Unique 64-bit node identifiers for the in-memory graph. Must be positive values.

labels

String or Integer or List of String

Yes

No

Node labels, either a single string node label, a single dictionary encoded node label or a list of node label strings.

Any additional column is interpreted as a node property. The supported data types are equivalent to the GDS node property types, i.e., long, double, long[], double[] and float[].

For floating point values, null will be converted to NaN.

To increase the throughput, multiple Flight streams can be sent in parallel. The server manages multiple incoming streams for the same import process. In addition to the number of parallel streams, the size of a single record batch can also affect the overall throughput. The client has to make sure that node ids are unique across all streams.

Sending duplicate node ids will result in an undefined behaviour.

Once all node record batches are sent to the server, the client needs to indicate that node loading is done. This is achieved by sending another Flight action with the action type v1/NODE_LOAD_DONE and the following JSON document as action body:

{
    name: "my_graph"
}

The server acknowledges the action by returning a JSON document including the name of the import process and the number of nodes that have been imported:

{
    name: "my_graph",
    node_count: 42
}

Importing nodes with common labels

In case all nodes of a single call to the PUT endpoint share the same labels, the labels can be specified as part of the import process metadata via the common_labels property:

{
    name: "PUT_COMMAND",
    version: "v1",
    body: {
        name: "my_graph",
        entity_type: "node",
        common_labels: ["Pokemon"]
    }
}

If the common labels are the only labels present in a stream, the labels column can be omitted from the node record batches. In case that there are nodes with more labels than the common ones, it is still possible to include a labels column which contains the additional labels.

Sending relationship records via PUT as a Flight stream

Similar to nodes, relationships need to be turned into record batches in order to send them to the server via a Flight stream. The Flight descriptor is a JSON document containing the name of the import process as well as the entity type:

{
    name: "PUT_COMMAND",
    version: "v1",
    body: {
        name: "my_graph",
        entity_type: "relationship",
    }
}

As for nodes, the server expects a specific schema for relationship records. For example, given the relationship (a)-[:EVOLVES_TO { at_level: 16 }]→(b) an assuming node id 0 for a and node id 1 for b, the record must be represented as follow:

sourceNodeId targetNodeId type at_level

0

1

"EVOLVES_TO"

16

The following table describes the node columns with reserved names.

Name Type Optional Nullable Description

sourceNodeId

Integer

No

No

Unique 64-bit source node identifiers. Must be positive values and present in the imported nodes.

targetNodeId

Integer

No

No

Unique 64-bit target node identifiers. Must be positive values and present in the imported nodes.

relationshipType

String or Integer

Yes

No

Single relationship type. Either a string literal or a dictionary encoded number.

Any additional column is interpreted as a relationship property. GDS only supports relationship properties of type double.

Similar to sending nodes, the overall throughput depends on the number of parallel Flight streams and the record batch size.

Once all relationship record batches are sent to the server, the client needs to indicate that the import process is done. This is achieved by sending a final Flight action with the action type v1/RELATIONSHIP_LOAD_DONE and the following JSON document as action body:

{
    name: "my_graph"
}

The server finalizes the graph projection and stores the in-memory graph in the graph catalog. Once completed, the server acknowledges the action by returning a JSON document including the name of the import process and the number of relationships that have been imported:

{
    name: "my_graph",
    relationship_count: 1337
}

Aborting an import process

A started import process can be aborted by sending a Flight action using the action type v1/ABORT. This will immediately cancel the running graph or database import process and remove all temporary data.

The action body is a JSON document containing the name of the graph or database that is being imported:

{
    name: "my_graph",
}

Arrow import processes will be aborted automatically if no data or instructions were received within a configurable timeout. The timeout can be configured via the gds.arrow.abortion_timeout setting, for more information see the installation chapter.

Creating a Neo4j database

This feature is not available in AuraDS.

The Client-Server protocol can also be used to create a new Neo4j database instead of an in-memory graph. To initialize a database import process, we need to change the initial action type to v1/CREATE_DATABASE. The action body is a JSON document containing the configuration for the import process:

{
    name: "my_database",
    concurrency: 4
}

The following table contains all settings for the database import.

Name Type Optional Default value Description

name

String

No

None

The name of the import process and the resulting database.

id_type

String

Yes

INTEGER

Sets the node id type used in the input data. Can be either INTEGER or STRING.

concurrency

Integer

Yes

Available cores

Number of threads to use for the database creation process.

id_property

String

Yes

originalId

The node property key which stores the node id of the input data.

record_format

String

Yes

dbms.record_format

Database record format. Valid values are blank (no value, default), standard, aligned, or high_limit.

force

Boolean

Yes

False

Force deletes any existing database files prior to the import.

high_io

Boolean

Yes

False

Ignore environment-based heuristics, and specify whether the target storage subsystem can support parallel IO with high throughput.

use_bad_collector

Boolean

Yes

False

Collects bad node and relationship records during import and writes them into the log.

After sending the action to initialize the import process, the subsequent protocol is the same as for creating an in-memory graph. See Sending node records via PUT as a Flight stream and Sending relationship records via PUT as a Flight stream for further details.

Supported node identifier types

For the CREATE_DATABASE action, one can set the id_type configuration parameter. The two possible options are INTEGER and STRING, with INTEGER being the default. If set to INTEGER, the node id columns for both node (nodeId) and relationship records (sourceNodeId and targetNodeId), are expected to be represented as BigIntVector. For the STRING id type, the server expects the identifiers to be represented as VarCharVector. In both cases, the original id is being stored as a property on the imported nodes. The property key can be changed by the id_property config option.