Options and configuration
When using the connector, you can set any valid Neo4j driver option with the option method in Spark, like so:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://localhost:7687")
.option("authentication.type", "basic")
.option("authentication.basic.username", "myuser")
.option("authentication.basic.password", "neo4jpassword")
.option("labels", "Person")
.load()
Alternatively, you can specify a global configuration in the Spark session to avoid retyping connection options every time. You can set any Neo4j Connector option this way by adding the "neo4j." prefix to its name. For example, to set the authentication.type option in the session, use neo4j.authentication.type.
Here is a full example:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder()
.config("neo4j.url", "neo4j://localhost:7687")
.config("neo4j.authentication.type", "basic")
.config("neo4j.authentication.basic.username", "myuser")
.config("neo4j.authentication.basic.password", "neo4jpassword")
.getOrCreate()
val dfPerson = spark.read.format("org.neo4j.spark.DataSource")
.option("labels", "Person")
.load()
val dfProduct = spark.read.format("org.neo4j.spark.DataSource")
.option("labels", "Product")
.load()
Custom authentication supplier
Starting from version 5.3.9, the connector provides an option to use a custom authentication supplier in addition to the already supported authentication types: none, basic, kerberos, custom, and bearer.
The authentication supplier must implement the org.neo4j.connectors.authn.AuthenticationTokenSupplierFactory interface from the org.neo4j.connectors:commons-authn-spi library.
The interface requires implementing two methods:
- String getName(): returns the name of the supplier. The name must be unique and must not clash with the existing authentication types.
- Supplier<AuthenticationToken> create(String, String, Map<String, String>): accepts a username, a password, and additional parameters, and returns a supplier of AuthenticationToken.
An AuthenticationToken instance is an abstract representation of arbitrary authentication data to be presented to the Neo4j server. It can contain a principal and its credentials (username and password), a token, or other means of confirming an identity.
The AuthenticationToken interface itself offers the following factory methods:
- AuthenticationToken#bearer
- AuthenticationToken#kerberos
- AuthenticationToken#none
- AuthenticationToken#usernameAndPassword
- AuthenticationToken#custom
The typical use case for an authentication supplier is supporting expiring tokens issued by OAuth 2.0 or OIDC providers.
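Suppliers for expiring tokens usually cache the current token and only contact the identity provider again when the token is missing or about to expire. The following plain-Java sketch shows that refresh-on-expiry pattern. ExpiringToken and RefreshingTokenSupplier are hypothetical names that are not part of the connector or of commons-authn-spi. A real supplier would return AuthenticationToken instances instead of the simplified token type used here.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

// Hypothetical token type: a bearer token value and the instant it stops being valid.
final class ExpiringToken {
    final String value;
    final Instant expiresAt;

    ExpiringToken(String value, Instant expiresAt) {
        this.value = value;
        this.expiresAt = expiresAt;
    }
}

// Caches the last token and asks the identity provider for a new one only when the cached
// token is missing or expires within the given safety margin.
final class RefreshingTokenSupplier implements Supplier<ExpiringToken> {
    private final Supplier<ExpiringToken> fetcher; // e.g. a call to an OAuth 2.0 token endpoint
    private final Supplier<Instant> clock;         // injectable "now", keeps the sketch testable
    private final Duration margin;                 // refresh this long before the actual expiry
    private ExpiringToken cached;

    RefreshingTokenSupplier(Supplier<ExpiringToken> fetcher, Supplier<Instant> clock, Duration margin) {
        this.fetcher = fetcher;
        this.clock = clock;
        this.margin = margin;
    }

    @Override
    public synchronized ExpiringToken get() {
        Instant refreshDeadline = clock.get().plus(margin);
        // Fetch on first use, or when the cached token would expire before the deadline
        if (cached == null || !cached.expiresAt.isAfter(refreshDeadline)) {
            cached = fetcher.get();
        }
        return cached;
    }
}
```

In production you would typically pass Instant::now as the clock. The margin avoids handing out a token that expires while a request is still in flight.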
To use a custom authentication supplier, make it discoverable through the Java service loader mechanism and set the authentication.type option to the name of the supplier. Additional parameters can be specified as configuration options prefixed with authentication.$name, where $name is the name of the supplier (for example, authentication.keycloak.realm).
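The service loader mechanism is the standard java.util.ServiceLoader. You register an implementation by listing its fully qualified class name in a provider-configuration file named META-INF/services/org.neo4j.connectors.authn.AuthenticationTokenSupplierFactory on the classpath. The sketch below shows how factories discovered this way can be matched by getName() against the authentication.type value. It illustrates the mechanism only and is not the connector's own code. NamedSupplierFactory is a hypothetical, simplified stand-in for AuthenticationTokenSupplierFactory.

```java
import java.util.Map;
import java.util.Optional;
import java.util.ServiceLoader;
import java.util.function.Supplier;

// Hypothetical stand-in for AuthenticationTokenSupplierFactory (same two methods, simplified token type).
interface NamedSupplierFactory {
    String getName();

    Supplier<String> create(String username, String password, Map<String, String> parameters);
}

final class SupplierFactoryLookup {
    // Returns the first factory whose name equals the configured authentication.type value.
    static <F extends NamedSupplierFactory> Optional<F> findByName(Iterable<F> factories, String authenticationType) {
        for (F factory : factories) {
            if (factory.getName().equals(authenticationType)) {
                return Optional.of(factory);
            }
        }
        return Optional.empty();
    }

    // Discovers every implementation listed in META-INF/services/<interface name> on the classpath.
    static Optional<NamedSupplierFactory> discover(String authenticationType) {
        return findByName(ServiceLoader.load(NamedSupplierFactory.class), authenticationType);
    }
}
```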
Keycloak authentication supplier example
This supplier is based on the Keycloak client, which requires Java 11 or later.
An authentication supplier for Keycloak is provided as part of the org.neo4j.connectors:commons-authn-keycloak library. It includes:
- A factory class that creates an instance of an authentication token supplier:
package org.neo4j.connectors.authn.keycloak;

import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;
import org.apache.http.impl.client.HttpClients;
import org.keycloak.authorization.client.Configuration;
import org.neo4j.connectors.authn.AuthenticationToken;
import org.neo4j.connectors.authn.AuthenticationTokenSupplierFactory;

public class KeycloakOIDCAuthenticationSupplierFactory implements AuthenticationTokenSupplierFactory {

    @Override
    public String getName() {
        // Value to use for the authentication.type option
        return "keycloak";
    }

    @Override
    public Supplier<AuthenticationToken> create(String username, String password, Map<String, String> parameters) {
        // Required Keycloak settings, passed as authentication.keycloak.* options
        var url = Objects.requireNonNull(parameters.get("authServerUrl"));
        var realm = Objects.requireNonNull(parameters.get("realm"));
        var clientId = Objects.requireNonNull(parameters.get("clientId"));
        var clientSecret = Objects.requireNonNull(parameters.get("clientSecret"));
        return new KeycloakOIDCAuthenticationSupplier(
                username,
                password,
                new Configuration(url, realm, clientId, Map.of("secret", clientSecret), HttpClients.createMinimal()));
    }
}
- A supplier that obtains an access token from a Keycloak server and refreshes it on expiration:
package org.neo4j.connectors.authn.keycloak;

import com.fasterxml.jackson.jr.ob.JSON;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.time.Instant;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;
import org.keycloak.authorization.client.AuthzClient;
import org.keycloak.authorization.client.Configuration;
import org.keycloak.authorization.client.util.Http;
import org.keycloak.representations.AccessTokenResponse;
import org.neo4j.connectors.authn.AuthenticationToken;

public class KeycloakOIDCAuthenticationSupplier implements Supplier<AuthenticationToken> {

    private final String username;
    private final String password;
    private final Configuration config;
    private final AuthzClient client;
    private final Http http;
    private final String url;
    private final AtomicReference<AuthenticationTokenAndTime> token = new AtomicReference<>();

    KeycloakOIDCAuthenticationSupplier(String username, String password, Configuration config) {
        this.username = username;
        this.password = password;
        this.config = config;
        this.client = AuthzClient.create(config);
        this.url = constructUrl(config);
        this.http = new Http(config, config.getClientCredentialsProvider());
    }

    public static Supplier<AuthenticationToken> of(String username, String password, Configuration config) {
        return new KeycloakOIDCAuthenticationSupplier(username, password, config);
    }

    // Token endpoint of the realm, used for the refresh_token grant
    private String constructUrl(Configuration config) {
        return config.getAuthServerUrl() + "/realms/" + config.getRealm() + "/protocol/openid-connect/token";
    }

    public boolean currentTokenIsExpired() {
        return token.get() == null || token.get().expireAt.isBefore(Instant.now());
    }

    @Override
    public AuthenticationToken get() {
        // First call: password grant; later calls: refresh the previous token
        AuthenticationTokenAndTime freshToken = this.token.updateAndGet(this::get0);
        return freshToken.toAuthenticationToken();
    }

    private AuthenticationTokenAndTime get0(AuthenticationTokenAndTime previous) {
        if (previous == null) {
            return fetch();
        } else {
            return refresh(previous.refreshToken);
        }
    }

    private AuthenticationTokenAndTime fetch() {
        try {
            AccessTokenResponse response = this.client.obtainAccessToken(this.username, this.password);
            return AuthenticationTokenAndTime.of(response);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private AuthenticationTokenAndTime refresh(String refreshToken) {
        try {
            AccessTokenResponse response = this.http
                    .<AccessTokenResponse>post(this.url)
                    .authentication()
                    .client()
                    .form()
                    .param("grant_type", "refresh_token")
                    .param("refresh_token", refreshToken)
                    .param("client_id", this.config.getResource())
                    .param("client_secret", (String) this.config.getCredentials().get("secret"))
                    .response()
                    .json(AccessTokenResponse.class)
                    .execute();
            return AuthenticationTokenAndTime.of(response);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private static final class AuthenticationTokenAndTime {
        // JWT segments are base64url-encoded (RFC 7519), so the URL-safe decoder is required
        static final Base64.Decoder DECODER = Base64.getUrlDecoder();

        private final String token;
        private final Instant expireAt;
        private final String refreshToken;

        public AuthenticationTokenAndTime(String token, Instant expireAt, String refreshToken) {
            this.token = token;
            this.expireAt = expireAt;
            this.refreshToken = refreshToken;
        }

        static AuthenticationTokenAndTime of(AccessTokenResponse accessTokenResponse) throws IOException {
            String token = accessTokenResponse.getToken();
            // Read the exp claim (seconds since the epoch) from the JWT payload
            String[] chunks = token.split("\\.");
            Map<String, Object> payload = JSON.std.mapFrom(DECODER.decode(chunks[1]));
            long epoch = ((Number) payload.get("exp")).longValue();
            Instant expireAt = Instant.ofEpochSecond(epoch);
            return new AuthenticationTokenAndTime(token, expireAt, accessTokenResponse.getRefreshToken());
        }

        AuthenticationToken toAuthenticationToken() {
            return AuthenticationToken.bearer(token, expireAt);
        }
    }
}
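The supplier derives the token's expiry time from the exp claim in the payload of the JWT access token. The self-contained sketch below isolates that one step using only JDK classes. JWT segments are base64url-encoded without padding (RFC 7519), so it decodes with Base64.getUrlDecoder(). For brevity it pulls exp out with a regular expression rather than a JSON parser such as Jackson jr. That simplification assumes a plain numeric exp claim. It also does not verify the signature. JwtExpiry and expiryOf are hypothetical names used only for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Base64;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

final class JwtExpiry {
    // Matches a numeric "exp" claim; a real implementation would use a JSON parser
    private static final Pattern EXP = Pattern.compile("\"exp\"\\s*:\\s*(\\d+)");

    // Returns the expiry of a JWT of the form header.payload.signature (signature is not checked)
    static Instant expiryOf(String jwt) {
        String[] chunks = jwt.split("\\.");
        if (chunks.length < 2) {
            throw new IllegalArgumentException("not a JWT: expected header.payload.signature");
        }
        // base64url decoding; padding is optional for this decoder
        String payload = new String(Base64.getUrlDecoder().decode(chunks[1]), StandardCharsets.UTF_8);
        Matcher matcher = EXP.matcher(payload);
        if (!matcher.find()) {
            throw new IllegalArgumentException("JWT payload has no numeric exp claim");
        }
        // exp is expressed in seconds since the Unix epoch
        return Instant.ofEpochSecond(Long.parseLong(matcher.group(1)));
    }
}
```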
You can configure Spark to use Keycloak authentication as follows:
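// Placeholder settings, assumed here to come from environment variables of the same name
val NEO4J_URL = sys.env("NEO4J_URL")
val KEYCLOAK_USERNAME = sys.env("KEYCLOAK_USERNAME")
val KEYCLOAK_PASSWORD = sys.env("KEYCLOAK_PASSWORD")
val KEYCLOAK_URL = sys.env("KEYCLOAK_URL")
val KEYCLOAK_REALM = sys.env("KEYCLOAK_REALM")
val KEYCLOAK_CLIENT_ID = sys.env("KEYCLOAK_CLIENT_ID")
val KEYCLOAK_CLIENT_SECRET = sys.env("KEYCLOAK_CLIENT_SECRET")
val df = spark.read
.format("org.neo4j.spark.DataSource")
.option("url", NEO4J_URL)
.option("authentication.type", "keycloak")
.option("authentication.keycloak.username", KEYCLOAK_USERNAME)
.option("authentication.keycloak.password", KEYCLOAK_PASSWORD)
.option("authentication.keycloak.authServerUrl", KEYCLOAK_URL)
.option("authentication.keycloak.realm", KEYCLOAK_REALM)
.option("authentication.keycloak.clientId", KEYCLOAK_CLIENT_ID)
.option("authentication.keycloak.clientSecret", KEYCLOAK_CLIENT_SECRET)
.option("query", "MATCH (n:Person) WITH n LIMIT 2 RETURN id(n) as id, n.age as age")
.load()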
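The option names above follow the authentication.$name.parameter convention for custom suppliers. The sketch below is a hypothetical illustration of that convention, not the connector's actual implementation. It groups the options that belong to one supplier into the username, password, and parameters arguments that create receives. It assumes that username and password are passed only as separate arguments and are left out of the parameters map. SupplierArguments is an invented name.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical holder for the arguments of create(username, password, parameters).
final class SupplierArguments {
    final String username;
    final String password;
    final Map<String, String> parameters;

    SupplierArguments(String username, String password, Map<String, String> parameters) {
        this.username = username;
        this.password = password;
        this.parameters = parameters;
    }

    // Collects options named authentication.<name>.<key>: username and password are pulled out,
    // every other <key> goes into the parameters map; unrelated options are ignored.
    static SupplierArguments from(Map<String, String> options, String name) {
        String prefix = "authentication." + name + ".";
        Map<String, String> parameters = new HashMap<>();
        String username = null;
        String password = null;
        for (Map.Entry<String, String> option : options.entrySet()) {
            if (!option.getKey().startsWith(prefix)) {
                continue;
            }
            String key = option.getKey().substring(prefix.length());
            switch (key) {
                case "username":
                    username = option.getValue();
                    break;
                case "password":
                    password = option.getValue();
                    break;
                default:
                    parameters.put(key, option.getValue());
            }
        }
        return new SupplierArguments(username, password, parameters);
    }
}
```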
Neo4j driver options
Under the covers, the Spark connector uses the official Neo4j Java Driver. In many situations, you want control over the driver options to account for your production deployment of Neo4j and how to communicate with it. You can set them with the option method, as in the examples above.
The following table captures the most common configuration settings to use with the Neo4j driver. For full documentation on all possible configuration options for Neo4j Drivers, see the Neo4j Java Driver manual.
| Setting name | Description | Default value | Required |
|---|---|---|---|
| Driver options | | | |
| url | The URL of the Neo4j instance to connect to. When provided with a comma-separated list of URIs, the resolver function feature of the driver is activated: the first URI is used as the original host, while the rest are treated as resolver function outputs. | (none) | Yes |
| authentication.type | The authentication method to use: none, basic, kerberos, custom, bearer, or the name of a custom authentication supplier. See Authentication for more information. | | No |
| authentication.basic.username | Username to use for the basic authentication type | (Neo4j Driver default) | No |
| authentication.basic.password | Password to use for the basic authentication type | (Neo4j Driver default) | No |
| | Kerberos auth ticket | (Neo4j Driver default) | No |
| | Identifies who this token represents | (Neo4j Driver default) | No |
| | The credentials authenticating the principal | (Neo4j Driver default) | No |
| | The "realm" string specifying the authentication provider | (Neo4j Driver default) | No |
| | The token to provide for the bearer authentication scheme | (Neo4j Driver default) | No |
| | Specifies whether encryption should be enabled. This setting is ignored if the connection URI scheme already configures encryption. | | No |
| | Certificate trust strategy. This setting is ignored if the connection URI scheme already configures encryption. | (Neo4j Driver default) | No |
| | Certificate path used by the certificate trust strategy | (Neo4j Driver default) | No |
| | Connection lifetime in milliseconds | (Neo4j Driver default) | No |
| | Liveness check timeout in milliseconds | (Neo4j Driver default) | No |
| | Connection acquisition timeout in milliseconds | (Neo4j Driver default) | No |
| | Connection timeout in milliseconds | (Neo4j Driver default) | No |
| | Transaction timeout in milliseconds | (Neo4j Driver default) | No |
| Session options | | | |
| | Database name to connect to. The driver allows you to define the database in the URL; if you set this option, it takes priority over the database defined in the URL. | (Neo4j Driver default) | No |
| | Session access mode. Used only while you're pulling data from Neo4j. | | No |
Multiple connections
Neo4j Connector for Apache Spark allows you to use more than one connection in a single Spark session. For example, you can read data from one database and write it to another database in the same session.
import org.apache.spark.sql.{SaveMode, SparkSession}
val spark = SparkSession.builder().getOrCreate()
val df = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://first.host.com:7687")
.option("labels", "Person")
.load()
df.write.format("org.neo4j.spark.DataSource")
.mode(SaveMode.ErrorIfExists)
.option("url", "neo4j://second.host.com:7687")
.option("labels", "Person")
.save()
Another use case for multiple connections is combining data from two data sources, for example by joining them:
import org.apache.spark.sql.SparkSession
val spark = SparkSession.builder().getOrCreate()
val dfOne = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://first.host.com:7687")
.option("labels", "Person")
.load()
val dfTwo = spark.read.format("org.neo4j.spark.DataSource")
.option("url", "neo4j://second.host.com:7687")
.option("labels", "Person")
.load()
val dfJoin = dfOne.join(dfTwo, dfOne("name") === dfTwo("name"))