For use in Spark ETLs to populate a Weaviate vector database.
Status: Alpha. Data might not always get written to Weaviate, so verify after each job that your data actually arrived.
You can choose one of the following options to install the Spark Weaviate Connector:
You can download the latest JAR from GitHub releases here.
To use the connector in your own Spark job, first build the fat JAR of the package by running
sbt assembly
which creates the artifact at
./target/scala-2.12/weaviate-spark-connector-assembly-0.1.0-SNAPSHOT.jar
You can configure spark-shell or tools like spark-submit to use the JAR like this:
spark-shell --jars weaviate-spark-connector-assembly-0.1.0-SNAPSHOT.jar
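The same `--jars` flag applies to spark-submit. As a minimal sketch, the helper below assembles such a command from Python, for example to launch it with `subprocess.run`. The function name `spark_submit_command` and the job script `my_etl.py` are hypothetical, and the JAR path assumes the artifact sits in the current directory.

```python
import shlex
from typing import List, Sequence

# Artifact name from the build step; adjust the path for your environment.
CONNECTOR_JAR = "weaviate-spark-connector-assembly-0.1.0-SNAPSHOT.jar"


def spark_submit_command(
    app_script: str,
    jar_path: str = CONNECTOR_JAR,
    app_args: Sequence[str] = (),
) -> List[str]:
    """Build an argv list that runs app_script with the connector JAR attached."""
    # --jars takes a comma-separated list of JARs to ship with the job.
    return ["spark-submit", "--jars", jar_path, app_script, *app_args]


# my_etl.py is a hypothetical job script, used here only for illustration.
cmd = spark_submit_command("my_etl.py")
print(shlex.join(cmd))
```

Keeping the command as a list avoids shell-quoting problems when it is handed to `subprocess.run(cmd, check=True)`.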
To run on Databricks, upload the JAR file to your cluster in the Libraries tab, as shown in the image below.
After installation, your cluster page should look something like this.
You can also use Maven to include the Weaviate Spark Connector as a dependency in your Spark application.
Coming soon.
With this package, loading data from Spark into Weaviate is as easy as this:
(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", weaviate_host)
    .option("className", "MyClass")
    .mode("append")
    .save()
)
If you already have vectors available in your dataframe you can easily supply them with the vector option.
(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", weaviate_host)
    .option("className", "MyClass")
    .option("vector", vector_column_name)
    .mode("append")
    .save()
)
By default, the Weaviate client creates document IDs for new documents. If you already have IDs, you can supply them from a dataframe column with the id option.
(
    my_df
    .write
    .format("io.weaviate.spark.Weaviate")
    .option("scheme", "http")
    .option("host", weaviate_host)
    .option("className", "MyClass")
    .option("id", id_column_name)
    .mode("append")
    .save()
)
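The write examples differ only in which options they set, so the options can be collected in one place. The sketch below is one way to do that. The helper name `weaviate_write_options` and the column name `uuid` are hypothetical, and setting `vector` and `id` together is an assumption that the examples above do not confirm.

```python
from typing import Dict, Optional


def weaviate_write_options(
    host: str,
    class_name: str,
    scheme: str = "http",
    vector_column: Optional[str] = None,
    id_column: Optional[str] = None,
) -> Dict[str, str]:
    """Collect the connector options used in the write examples into one dict."""
    options = {"scheme": scheme, "host": host, "className": class_name}
    if vector_column is not None:
        # Column that already holds the vectors.
        options["vector"] = vector_column
    if id_column is not None:
        # Column that already holds the document IDs.
        options["id"] = id_column
    return options


opts = weaviate_write_options("localhost:8080", "MyClass", id_column="uuid")

# With a real DataFrame, the dict can be handed to DataFrameWriter.options:
# my_df.write.format("io.weaviate.spark.Weaviate").options(**opts).mode("append").save()
```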
Currently only the append write mode is supported. Upsert and error-if-exists write semantics are not yet supported.
Currently only batch operations are supported. We do not yet support streaming writes.
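Because the connector is in alpha and only appends, it can help to compare the object count in Weaviate before and after a batch write against the number of rows written. The sketch below counts objects through Weaviate's GraphQL `Aggregate` query, using only the Python standard library. The function names are hypothetical, and the response shape in `parse_count` is an assumption about the GraphQL reply that should be checked against your Weaviate version.

```python
import json
import urllib.request


def count_query(class_name: str) -> str:
    """GraphQL query asking Weaviate for the number of objects in a class."""
    return "{ Aggregate { %s { meta { count } } } }" % class_name


def parse_count(response: dict, class_name: str) -> int:
    """Pull the count out of a reply (assumed shape, see the note above)."""
    return response["data"]["Aggregate"][class_name][0]["meta"]["count"]


def fetch_count(base_url: str, class_name: str) -> int:
    """POST the query to <base_url>/v1/graphql and return the object count."""
    body = json.dumps({"query": count_query(class_name)}).encode("utf-8")
    request = urllib.request.Request(
        base_url.rstrip("/") + "/v1/graphql",
        data=body,
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as reply:
        return parse_count(json.load(reply), class_name)


# Hypothetical usage around a write (needs a running Weaviate instance):
# before = fetch_count("http://localhost:8080", "MyClass")
# ... run the Spark write ...
# after = fetch_count("http://localhost:8080", "MyClass")
# assert after - before == my_df.count()
```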
The connector is able to automatically infer the correct Spark DataType based on your schema for the class in Weaviate. Your DataFrame column name needs to match the property name of your class in Weaviate. The table below shows how the connector infers the DataType:
Weaviate DataType | Spark DataType | Notes |
---|---|---|
string | StringType | |
string[] | Array[StringType] | |
int | IntegerType | Weaviate only supports int32 for now. More info here. |
int[] | Array[IntegerType] | |
boolean | BooleanType | |
boolean[] | Array[BooleanType] | |
number | DoubleType | |
number[] | Array[DoubleType] | |
date | DateType | |
date[] | Array[DateType] | |
text | StringType | |
text[] | Array[StringType] | |
geoCoordinates | StringType | |
phoneNumber | StringType | |
blob | StringType | Encode your blob as base64 string |
vector | Array[FloatType] | |
cross reference | string | Not supported yet |
Please also take a look at the Weaviate data types docs and the Spark DataType docs.
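Since column names and types must line up with the class properties, a quick check before writing can catch mismatches early. The sketch below encodes a subset of the table using Spark's simple type strings (the form returned by `DataFrame.dtypes`) and reports differences. The names `WEAVIATE_TO_SPARK` and `schema_mismatches` and the sample properties are hypothetical, and the connector's own inference may differ in detail.

```python
from typing import Dict, List

# Subset of the table above, written as Spark simple type strings.
WEAVIATE_TO_SPARK = {
    "string": "string",
    "string[]": "array<string>",
    "int": "int",
    "int[]": "array<int>",
    "boolean": "boolean",
    "boolean[]": "array<boolean>",
    "number": "double",
    "number[]": "array<double>",
    "date": "date",
    "date[]": "array<date>",
    "text": "string",
}


def schema_mismatches(
    df_types: Dict[str, str], class_properties: Dict[str, str]
) -> List[str]:
    """Compare DataFrame column types with the Weaviate class properties."""
    problems = []
    for name, weaviate_type in class_properties.items():
        expected = WEAVIATE_TO_SPARK.get(weaviate_type)
        if expected is None:
            problems.append(f"{name}: no mapping listed for Weaviate type {weaviate_type}")
        elif name not in df_types:
            problems.append(f"{name}: missing from DataFrame")
        elif df_types[name] != expected:
            problems.append(f"{name}: expected {expected}, found {df_types[name]}")
    return problems


# With PySpark, df_types can be built as dict(my_df.dtypes).
issues = schema_mismatches(
    {"title": "string", "wordCount": "bigint"},
    {"title": "string", "wordCount": "int", "tags": "string[]"},
)
print(issues)
```

In this example the `bigint` column would need a cast to `int` before writing, which matches the int32 note in the table.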
This repository uses SBT to compile the code. SBT can be installed on macOS following the instructions here.
You will also need Java 8+ and Scala 2.12 installed. The easiest way to get everything set up is to install IntelliJ.
To compile the package, run sbt compile to ensure that you have everything needed to run the Spark connector.
The unit and integration tests can be run via sbt test.
The integration tests stand up a local Weaviate instance running in Docker and then run the Apache Spark code in a separate Docker container. Docker must be running for all tests to run.
sbt assembly
docker build -t spark-with-weaviate .
docker run -it spark-with-weaviate /opt/spark/bin/spark-shell
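// Paste into the spark-shell started above, where spark.implicits._ is already imported.
case class Article(title: String, content: String)
val articles = Seq(Article("Sam", "Sam")).toDF
articles.write.format("io.weaviate.spark.Weaviate")
  .option("scheme", "http")
  .option("host", "localhost:8080")
  .option("className", "Article") // target class, set as in the earlier write examples
  .mode("append").save()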