refactor: QdrantVectorHandler
Anush008 committed Apr 28, 2024
1 parent e7f3bfd commit e50330c
Showing 5 changed files with 15 additions and 25 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/release.yml
@@ -49,7 +49,7 @@ jobs:
       - name: Semantic Release
         run: |
           bun install @conveyal/maven-semantic-release semantic-release @semantic-release/git conventional-changelog-conventionalcommits
-          bun x semantic-release --prepare @conveyal/maven-semantic-release,@semantic-release/git --publish @semantic-release/github,@conveyal/maven-semantic-release --verify-conditions @semantic-release/github,@conveyal/maven-semantic-release,@semantic-release/git --verify-release @conveyal/maven-semantic-release
+          bun x semantic-release --prepare @conveyal/maven-semantic-release --publish @semantic-release/github,@conveyal/maven-semantic-release --verify-conditions @semantic-release/github,@conveyal/maven-semantic-release --verify-release @conveyal/maven-semantic-release
         env:
           GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
           GPG_KEY_NAME: ${{ secrets.GPG_KEY_NAME }}
11 changes: 1 addition & 10 deletions .releaserc
@@ -59,15 +59,6 @@
         ]
       }
     ],
-    "@semantic-release/release-notes-generator",
-    [
-      "@semantic-release/git",
-      {
-        "assets": [
-          "pom.xml"
-        ],
-        "message": "chore(release): ${nextRelease.version} [skip ci]\n\n${nextRelease.notes}"
-      }
-    ]
+    "@semantic-release/release-notes-generator"
   ]
 }
3 changes: 2 additions & 1 deletion README.md
@@ -49,6 +49,8 @@ spark = SparkSession.builder.config(
 The connector supports ingesting multiple named/unnamed, dense/sparse vectors.
 
+_Click each to expand._
+
 <details>
 <summary><b>Unnamed/Default vector</b></summary>
 
@@ -194,7 +196,6 @@ You can use the connector as a library in Databricks to ingest data into Qdrant.

 <img width="704" alt="Screenshot 2024-04-28 at 11 34 17 AM" src="https://github.com/qdrant/qdrant-spark/assets/46051506/0c1bd356-3fba-436a-90ce-d8ff39b02d1f">
 
-
 ## Datatype support
 
 The appropriate Spark data types are mapped to the Qdrant payload based on the provided `schema`.
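For orientation, a write through the connector might look like the sketch below (untested Java; `embedding_field` and `vector_name` are option names confirmed by the handler change further down, while `qdrant_url`, `collection_name`, and `schema` are assumptions here):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class QdrantIngestSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("qdrant-ingest")
        .master("local[*]")
        .getOrCreate();

    // Hypothetical source with an array<float> "embedding" column.
    Dataset<Row> df = spark.read().parquet("/tmp/embeddings.parquet");

    df.write()
        .format("io.qdrant.spark")
        .option("qdrant_url", "http://localhost:6334") // assumed option name
        .option("collection_name", "demo")             // assumed option name
        .option("embedding_field", "embedding")        // maps to options.embeddingField below
        .option("schema", df.schema().json())          // assumed: payload mapping uses this schema
        .mode("append")
        .save();

    spark.stop();
  }
}
```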
2 changes: 1 addition & 1 deletion pom.xml
@@ -6,7 +6,7 @@
   <modelVersion>4.0.0</modelVersion>
   <groupId>io.qdrant</groupId>
   <artifactId>spark</artifactId>
-  <version>2.2.0</version>
+  <version>2.2.1</version>
   <name>qdrant-spark</name>
   <url>https://github.com/qdrant/qdrant-spark</url>
   <description>An Apache Spark connector for the Qdrant vector database</description>
22 changes: 10 additions & 12 deletions src/main/java/io/qdrant/spark/QdrantVectorHandler.java
@@ -9,7 +9,6 @@
 import io.qdrant.client.grpc.Points.Vectors;
 import java.util.Collections;
 import java.util.HashMap;
-import java.util.List;
 import java.util.Map;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.types.StructType;
@@ -26,8 +25,7 @@ public static Vectors prepareVectors(

    // Maintaining support for the "embedding_field" and "vector_name" options
    if (!options.embeddingField.isEmpty()) {
-     int embeddingFieldIndex = schema.fieldIndex(options.embeddingField);
-     float[] embeddings = record.getArray(embeddingFieldIndex).toFloatArray();
+     float[] embeddings = extractFloatArray(record, schema, options.embeddingField);
      // 'options.vectorName' defaults to ""
      vectorsBuilder.mergeFrom(
          namedVectors(Collections.singletonMap(options.vectorName, vector(embeddings))));
@@ -42,9 +40,10 @@ private static Vectors prepareSparseVectors(

    for (int i = 0; i < options.sparseVectorNames.length; i++) {
      String name = options.sparseVectorNames[i];
-     List<Float> values = extractFloatArray(record, schema, options.sparseVectorValueFields[i]);
-     List<Integer> indices = extractIntArray(record, schema, options.sparseVectorIndexFields[i]);
-     sparseVectors.put(name, vector(values, indices));
+     float[] values = extractFloatArray(record, schema, options.sparseVectorValueFields[i]);
+     int[] indices = extractIntArray(record, schema, options.sparseVectorIndexFields[i]);
+
+     sparseVectors.put(name, vector(Floats.asList(values), Ints.asList(indices)));
    }

    return namedVectors(sparseVectors);
@@ -56,22 +55,21 @@ private static Vectors prepareDenseVectors(

    for (int i = 0; i < options.vectorNames.length; i++) {
      String name = options.vectorNames[i];
-     List<Float> values = extractFloatArray(record, schema, options.vectorFields[i]);
+     float[] values = extractFloatArray(record, schema, options.vectorFields[i]);
      denseVectors.put(name, vector(values));
    }

    return namedVectors(denseVectors);
  }

- private static List<Float> extractFloatArray(
+ private static float[] extractFloatArray(
      InternalRow record, StructType schema, String fieldName) {
    int fieldIndex = schema.fieldIndex(fieldName);
-   return Floats.asList(record.getArray(fieldIndex).toFloatArray());
+   return record.getArray(fieldIndex).toFloatArray();
  }

- private static List<Integer> extractIntArray(
-     InternalRow record, StructType schema, String fieldName) {
+ private static int[] extractIntArray(InternalRow record, StructType schema, String fieldName) {
    int fieldIndex = schema.fieldIndex(fieldName);
-   return Ints.asList(record.getArray(fieldIndex).toIntArray());
+   return record.getArray(fieldIndex).toIntArray();
  }
}
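The shape of the refactor: the extraction helpers now return primitive arrays straight from Spark's columnar accessors, and boxing to `List<Float>`/`List<Integer>` happens only at the sparse-vector call site that needs it. A minimal, self-contained sketch of that conversion pattern (assumes Guava on the classpath; the sample values are illustrative):

```java
import com.google.common.primitives.Floats;
import com.google.common.primitives.Ints;
import java.util.List;

public class PrimitiveArraySketch {
  // Stand-in for extractFloatArray: the real helper reads a Spark
  // InternalRow column via record.getArray(fieldIndex).toFloatArray().
  static float[] extractFloatArray() {
    return new float[] {0.1f, 0.2f, 0.3f};
  }

  // Stand-in for extractIntArray, mirroring toIntArray().
  static int[] extractIntArray() {
    return new int[] {1, 4, 7};
  }

  public static void main(String[] args) {
    float[] values = extractFloatArray();
    int[] indices = extractIntArray();

    // Floats.asList/Ints.asList wrap the arrays without copying;
    // elements are boxed lazily, one at a time, as the list is read.
    List<Float> boxedValues = Floats.asList(values);
    List<Integer> boxedIndices = Ints.asList(indices);

    System.out.println("values=" + boxedValues + " indices=" + boxedIndices);
  }
}
```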
