Neo4j Spark Connector options and configuration

When using the connector, any valid Neo4j driver option can be set using the option method in Spark, like so:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
        .option("url", "bolt://localhost:7687")
        .option("authentication.type", "basic")
        .option("authentication.basic.username", "myuser")
        .option("authentication.basic.password", "neo4jpassword")
        .option("labels", "Person")
        .load()

Alternatively, you can specify a global configuration in the Spark session to avoid retyping the connection options every time. You can set any Neo4j Connector option this way; just add the neo4j. prefix.

For example, to set the option authentication.type in the session, set it as neo4j.authentication.type. Here is a full example:

import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder()
    .config("neo4j.url", "bolt://localhost:7687")
    .config("neo4j.authentication.type", "basic")
    .config("neo4j.authentication.basic.username", "myuser")
    .config("neo4j.authentication.basic.password", "neo4jpassword")
    .getOrCreate()

val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Person")
        .load()

val dfProduct = spark.read.format("org.neo4j.spark.DataSource")
        .option("labels", "Product")
        .load()

Configuration on Databricks

On Databricks you cannot set the Spark session configuration at runtime, but you can set the Spark configuration on the cluster that runs your notebooks. To do this, go to the cluster configuration page, click the Advanced Options toggle, and then select the Spark tab.

Add the Neo4j Connector configuration in the text area like this:

neo4j.url bolt://1.2.3.4
neo4j.authentication.type basic
neo4j.authentication.basic.password mysecret
neo4j.authentication.basic.username neo4j
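
With the cluster configuration in place, notebook code only needs the data-source options; the connection settings are picked up from the cluster. Below is a minimal sketch of a notebook cell (the Person label is only an example):

// The connection options (neo4j.url, neo4j.authentication.*) come from the
// cluster's Spark configuration, so they are not repeated here.
// `spark` is the SparkSession that Databricks provides in every notebook.
val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
  .option("labels", "Person")
  .load()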

Neo4j driver options

Under the covers, the Spark connector uses the official Neo4j Java Driver. In many situations you want control over the driver options, for example to account for your production deployment of Neo4j and how to communicate with it. You can set these options in the same way as shown in the example above.
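
For example, a read that tunes a few of the driver settings described in the table below might look like the following sketch (the timeout and encryption values are arbitrary placeholders, not recommendations):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Driver options are passed with .option(), just like the connection options.
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("authentication.type", "basic")
  .option("authentication.basic.username", "myuser")
  .option("authentication.basic.password", "neo4jpassword")
  .option("connection.timeout.msecs", "10000")
  .option("connection.acquisition.timeout.msecs", "30000")
  .option("encryption.enabled", "false")
  .option("labels", "Person")
  .load()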

The following table captures the most common configuration settings to use with the Neo4j driver. For full documentation on all possible configuration options for Neo4j Drivers, see the Neo4j Java Driver manual.

Table 1. List of available configuration settings
Each entry below lists the setting name, its description, the default value, and whether it is required.

Driver options

url

The URL of the Neo4j instance to connect to.

If you provide a comma-separated list of URIs, the resolver function feature of the driver is activated: the first URI is used as the original host, while the rest are treated as resolver function outputs.

(none)

Yes

authentication.type

The authentication method to be used:

  • none

  • basic

  • kerberos

  • custom

  • bearer (since version 5.1)

See Authentication for more information.

basic

No

authentication.basic.username

Username to use for basic authentication type

(Neo4j Driver default)

No

authentication.basic.password

Password to use for basic authentication type

(Neo4j Driver default)

No

authentication.kerberos.ticket

Kerberos Auth Ticket

(Neo4j Driver default)

No

authentication.custom.principal

This is used to identify who this token represents

(Neo4j Driver default)

No

authentication.custom.credentials

These are the credentials authenticating the principal

(Neo4j Driver default)

No

authentication.custom.realm

This is the "realm" string specifying the authentication provider

(Neo4j Driver default)

No

authentication.bearer.token

This is the token to provide for the bearer authentication scheme

(Neo4j Driver default)

No

encryption.enabled

Specifies whether encryption should be enabled. This setting is ignored if you use a URI scheme with the +s or +ssc suffix

false

No

encryption.trust.strategy

Sets the certificate trust strategy. It is ignored if the connection URI uses +s or +ssc as suffix. Available values are:

  • TRUST_SYSTEM_CA_SIGNED_CERTIFICATES

  • TRUST_CUSTOM_CA_SIGNED_CERTIFICATES

  • TRUST_ALL_CERTIFICATES

(Neo4j Driver default)

No

encryption.ca.certificate.path

Sets the certificate path for the TRUST_CUSTOM_CA_SIGNED_CERTIFICATES trust strategy

(Neo4j Driver default)

No

connection.max.lifetime.msecs

Connection lifetime in milliseconds

(Neo4j Driver default)

No

connection.liveness.timeout.msecs

Liveness check timeout in milliseconds

(Neo4j Driver default)

No

connection.acquisition.timeout.msecs

Connection acquisition timeout in milliseconds

(Neo4j Driver default)

No

connection.timeout.msecs

Connection timeout in milliseconds

(Neo4j Driver default)

No

Session options

database

Database name to connect to. The driver also allows you to define the database in the URL; if you set this option, it takes priority over the database defined in the URL.

(Neo4j Driver default)

No

access.mode

Possible values are:

  • read

  • write

Used only while you’re pulling data from Neo4j. With read, the connector in a cluster environment routes requests to the followers; with write, to the leader.

read

No
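
For example, a read that targets a specific database and explicitly requests read routing might look like the following sketch (the database name mydb is a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()

// Read from the "mydb" database; with access.mode set to "read",
// a cluster routes the query to the followers instead of the leader.
val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://localhost:7687")
  .option("database", "mydb")
  .option("access.mode", "read")
  .option("labels", "Person")
  .load()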

Multiple connections

The Neo4j Connector for Apache Spark allows you to use more than one connection in a single Spark session. For example, you can read data from one database and write it to another database in the same session.

Reading from a database and writing to a different one
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val df = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("labels", "Person")
  .load()

df.write.format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "bolt://second.host.com:7687")
  .option("labels", "Person")
  .save()

Another use case for multiple connections is merging two data sources.

Merge data from two databases
import org.apache.spark.sql.{SaveMode, SparkSession}

val spark = SparkSession.builder().getOrCreate()

val dfOne = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://first.host.com:7687")
  .option("labels", "Person")
  .load()

val dfTwo = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "bolt://second.host.com:7687")
  .option("labels", "Person")
  .load()

val dfJoin = dfOne.join(dfTwo, dfOne("name") === dfTwo("name"))