Interacting with Data Warehouses

Exchange data between Snowflake and Neo4j

Snowflake is a fully managed SaaS (software as a service) platform for data warehousing, data lakes, data engineering, data science, data application development, and the secure sharing and consumption of real-time/shared data. It offers out-of-the-box features such as separation of storage and compute, on-the-fly scalable compute, data sharing, data cloning, and third-party tool support to handle the demanding needs of growing enterprises.

Prerequisites

You need a Snowflake instance up and running. If you don’t have one, you can create it from here.

Dependencies

Required dependencies are:

  • net.snowflake:spark-snowflake_<scala_version>:<version>

  • net.snowflake:snowflake-jdbc:<version>
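
If your environment does not already bundle these packages, one way to pull them in is to resolve them from Maven via spark.jars.packages when the session is created. The sketch below is a minimal example, not the only option: the coordinates mirror the list above plus the Neo4j Connector for Apache Spark (an extra assumption, needed by the examples that follow), and every <scala_version>/<version> placeholder must be replaced with values matching your setup. The same pattern applies to the BigQuery and Redshift dependency lists later on this page.

# A minimal sketch (PySpark), assuming the dependencies are resolved from Maven
# through spark.jars.packages; replace every placeholder with the versions that
# match your Spark and Scala setup. The Neo4j connector coordinate is an extra
# assumption, added because the examples below also write to Neo4j.
from pyspark.sql import SparkSession

spark = (SparkSession.builder
  .appName("snowflake-neo4j-exchange")
  .config("spark.jars.packages", ",".join([
      "net.snowflake:spark-snowflake_<scala_version>:<version>",
      "net.snowflake:snowflake-jdbc:<version>",
      "org.neo4j:neo4j-connector-apache-spark_<scala_version>:<version>",
  ]))
  .getOrCreate())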

From Snowflake to Neo4j

Scala:

import org.apache.spark.sql.{DataFrame, SaveMode}

// Step (1)
// Load a table into a Spark DataFrame
val snowflakeDF: DataFrame = spark.read
  .format("snowflake")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .load()

// Step (2)
// Save the `snowflakeDF` as nodes with labels `Person` and `Customer` into Neo4j
snowflakeDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save()

Python:

# Step (1)
# Load a table into a Spark DataFrame
snowflakeDF = (spark.read
  .format("snowflake")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .load())

# Step (2)
# Save the `snowflakeDF` as nodes with labels `Person` and `Customer` into Neo4j
(snowflakeDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("ErrorIfExists")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save())

From Neo4j to Snowflake

Scala:

// Step (1)
// Load `:Person:Customer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load()

// Step (2)
// Save the `neo4jDF` as table CUSTOMER into Snowflake
neo4jDF.write
  .format("snowflake")
  .mode("overwrite")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .save()

Python:

# Step (1)
# Load `:Person:Customer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load())

# Step (2)
# Save the `neo4jDF` as table CUSTOMER into Snowflake
(neo4jDF.write
  .format("snowflake")
  .mode("overwrite")
  .option("sfURL", "<account_identifier>.snowflakecomputing.com")
  .option("sfUser", "<user_name>")
  .option("sfPassword", "<password>")
  .option("sfDatabase", "<database>")
  .option("sfSchema", "<schema>")
  .option("dbtable", "CUSTOMER")
  .save())

Exchange data between BigQuery and Neo4j

BigQuery is a fully-managed, serverless data warehouse that enables scalable analysis over petabytes of data.

Prerequisites

You need a Google BigQuery instance up and running. If you don’t have one, you can create it from here.

Dependencies

If you’re in the Databricks Runtime environment, you don’t need to add any external dependencies; otherwise, you may need:

  • com.google.cloud.spark:spark-bigquery-with-dependencies_<scala_version>:<version>
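
Reads against BigQuery are typically authenticated through a Google service account. Below is a minimal sketch rather than an official recipe: the key file path, project ID, and table reference are placeholders, and credentialsFile and parentProject are spark-bigquery connector options for pointing at a key file and a billing project.

# A minimal sketch, assuming service-account key authentication;
# the key file path, project ID, and table reference are placeholders.
bigqueryDF = (spark.read
  .format("bigquery")
  .option("credentialsFile", "/path/to/<service-account-key>.json")  # key file reachable by driver and executors
  .option("parentProject", "<my-project-id>")                        # project billed for the read
  .option("table", "<project>.<dataset>.<table>")
  .load())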

From BigQuery to Neo4j

Scala:

// Step (1)
// Load a table into a Spark DataFrame
val bigqueryDF: DataFrame = spark.read
  .format("bigquery")
  .option("table", "google.com:bigquery-public-data.stackoverflow.post_answers")
  .load()

// Step (2)
// Save the `bigqueryDF` as nodes with label `Answer` into Neo4j
bigqueryDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Answer")
  .save()

Python:

# Step (1)
# Load a table into a Spark DataFrame
bigqueryDF = (spark.read
  .format("bigquery")
  .option("table", "google.com:bigquery-public-data.stackoverflow.post_answers")
  .load())

# Step (2)
# Save the `bigqueryDF` as nodes with label `Answer` into Neo4j
(bigqueryDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("ErrorIfExists")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Answer")
  .save())

From Neo4j to BigQuery

Scala:

// Step (1)
// Load `:Answer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Answer")
  .load()

// Step (2)
// Save the `neo4jDF` as the `stackoverflow.answers` table into BigQuery
neo4jDF.write
  .format("bigquery")
  .mode("overwrite")
  .option("temporaryGcsBucket", "<my-bigquery-temp>")
  .option("table", "<my-project-id>:<my-private-database>.stackoverflow.answers")
  .save()

Python:

# Step (1)
# Load `:Answer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Answer")
  .load())

# Step (2)
# Save the `neo4jDF` as the `stackoverflow.answers` table into BigQuery
(neo4jDF.write
  .format("bigquery")
  .mode("overwrite")
  .option("temporaryGcsBucket", "<my-bigquery-temp>")
  .option("table", "<my-private-database>.stackoverflow.answers")
  .save())

Exchange data between Redshift and Neo4j

Amazon Redshift uses SQL to analyze structured and semi-structured data across data warehouses, operational databases, and data lakes, using AWS-designed hardware and machine learning to deliver the best price performance at any scale.

Prerequisites

You need an Amazon Redshift instance up and running. If you don’t have one, you can create it from here.

Dependencies

If you’re in the Databricks Runtime environment, you don’t need to add any external dependencies; otherwise, you may need:

  • com.amazon.redshift:redshift-jdbc42:<version>

  • org.apache.spark:spark-avro_<scala_version>:<version>

  • io.github.spark-redshift-community:spark-redshift_<scala_version>:<version>

  • com.amazonaws:aws-java-sdk:<version>

From Redshift to Neo4j

In Databricks Runtime

A good starting point in this case is this Databricks Guide.

Scala:

// Step (1)
// Load a table into a Spark DataFrame
val redshiftDF: DataFrame = spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .load()

// Step (2)
// Save the `redshiftDF` as nodes with labels `Person` and `Customer` into Neo4j
redshiftDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save()

Python:

# Step (1)
# Load a table into a Spark DataFrame
redshiftDF = (spark.read
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .load())

# Step (2)
# Save the `redshiftDF` as nodes with labels `Person` and `Customer` into Neo4j
(redshiftDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("ErrorIfExists")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save())

In any other Spark Runtime with Redshift community dependencies

A good starting point in this case is this Redshift Community repository.

Scala:

// Step (1)
// Load a table into a Spark DataFrame
val redshiftDF: DataFrame = spark.read
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .load()

// Step (2)
// Save the `redshiftDF` as nodes with labels `Person` and `Customer` into Neo4j
redshiftDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save()

Python:

# Step (1)
# Load a table into a Spark DataFrame
redshiftDF = (spark.read
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .load())

# Step (2)
# Save the `redshiftDF` as nodes with labels `Person` and `Customer` into Neo4j
(redshiftDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("ErrorIfExists")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save())
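
In both runtimes the connector stages data in the S3 location given by tempdir, so Spark also needs credentials for that bucket. The sketch below is one possible setup, not the only one: it assumes S3A access keys set on the Hadoop configuration and the forward_spark_s3_credentials option to reuse them for the Redshift side of the transfer; all key, bucket, and connection-string values are placeholders.

# A minimal sketch, assuming S3A access keys plus credential forwarding;
# the keys, bucket, and connection string below are placeholders.
hadoop_conf = spark.sparkContext._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.access.key", "<aws-access-key-id>")
hadoop_conf.set("fs.s3a.secret.key", "<aws-secret-access-key>")

redshiftDF = (spark.read
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  # Forward the Spark-side S3 credentials so Redshift can read/write the staging area.
  .option("forward_spark_s3_credentials", "true")
  .load())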

From Neo4j to Redshift

In Databricks Runtime

A good starting point in this case is this Databricks Guide.

Scala:

// Step (1)
// Load `:Person:Customer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load()

// Step (2)
// Save the `neo4jDF` as table CUSTOMER into Redshift
neo4jDF.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .mode("error")
  .save()

Python:

# Step (1)
# Load `:Person:Customer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load())

# Step (2)
# Save the `neo4jDF` as table CUSTOMER into Redshift
(neo4jDF.write
  .format("com.databricks.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .mode("error")
  .save())

In any other Spark Runtime with Redshift community dependencies

A good starting point in this case is this Redshift Community repository.

Scala:

// Step (1)
// Load `:Person:Customer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load()

// Step (2)
// Save the `neo4jDF` as table CUSTOMER into Redshift
neo4jDF.write
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .mode("error")
  .save()

Python:

# Step (1)
# Load `:Person:Customer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load())

# Step (2)
# Save the `neo4jDF` as table CUSTOMER into Redshift
(neo4jDF.write
  .format("io.github.spark_redshift_community.spark.redshift")
  .option("url", "jdbc:redshift://<the-rest-of-the-connection-string>")
  .option("dbtable", "CUSTOMER")
  .option("tempdir", "s3a://<your-bucket>/<your-directory-path>")
  .mode("error")
  .save())

Exchange data between Azure Synapse Analytics and Neo4j

Azure Synapse Analytics (formerly SQL Data Warehouse) is a cloud-based enterprise data warehouse that leverages massively parallel processing (MPP) to quickly run complex queries across petabytes of data.

Prerequisites

You need an Azure Synapse Analytics instance up and running. If you don’t have one, you can create it from here.

Dependencies

Azure Synapse Analytics works with Spark only in the Databricks Runtime, as the required connector is not publicly released.

Authentication

The Azure Synapse Connector uses three types of network connections:

  • Spark driver to Azure Synapse

  • Spark driver and executors to Azure storage account

  • Azure Synapse to Azure storage account

To choose the authentication method that best fits your use case, we suggest checking the official Azure Synapse Docs.
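
For example, a common setup in Databricks is to authenticate to the storage account with an access key and let the connector forward it to Synapse while staging data in an ADLS Gen2 container. The sketch below assumes that pattern only: the storage account, container, access key, and paths are placeholders, and forwardSparkAzureStorageCredentials and tempDir are options of the Databricks Synapse connector.

# A minimal sketch, assuming storage account key forwarding in Databricks;
# <storage-account>, <container>, <access-key>, and the JDBC string are placeholders.
spark.conf.set(
    "fs.azure.account.key.<storage-account>.dfs.core.windows.net",
    "<access-key>")

azureDF = (spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  # Let the connector reuse the storage key above when accessing the staging area.
  .option("forwardSparkAzureStorageCredentials", "true")
  .option("tempDir", "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>")
  .option("dbTable", "CUSTOMER")
  .load())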

From Azure Synapse Analytics to Neo4j

Depending on the authentication method you choose, the following example shows how to ingest data from an Azure Synapse Analytics table into Neo4j as nodes:

Scala:

// Step (1)
// Load a table into a Spark DataFrame
val azureDF: DataFrame = spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .load()

// Step (2)
// Save the `azureDF` as nodes with labels `Person` and `Customer` into Neo4j
azureDF.write
  .format("org.neo4j.spark.DataSource")
  .mode(SaveMode.ErrorIfExists)
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save()

Python:

# Step (1)
# Load a table into a Spark DataFrame
azureDF = (spark.read
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .load())

# Step (2)
# Save the `azureDF` as nodes with labels `Person` and `Customer` into Neo4j
(azureDF.write
  .format("org.neo4j.spark.DataSource")
  .mode("ErrorIfExists")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .save())

From Neo4j to Azure Synapse Analytics

Depending on the authentication method you choose, the following example shows how to ingest data from Neo4j into an Azure Synapse Analytics table:

Scala:

// Step (1)
// Load `:Person:Customer` nodes as DataFrame
val neo4jDF: DataFrame = spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load()

// Step (2)
// Save the `neo4jDF` as table CUSTOMER into Azure Synapse Analytics
neo4jDF.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .save()

Python:

# Step (1)
# Load `:Person:Customer` nodes as DataFrame
neo4jDF = (spark.read.format("org.neo4j.spark.DataSource")
  .option("url", "neo4j://<host>:<port>")
  .option("labels", ":Person:Customer")
  .load())

# Step (2)
# Save the `neo4jDF` as table CUSTOMER into Azure Synapse Analytics
(neo4jDF.write
  .format("com.databricks.spark.sqldw")
  .option("url", "jdbc:sqlserver://<the-rest-of-the-connection-string>")
  .option("dbTable", "CUSTOMER")
  .save())