feat: update merge_insert to add statistics for inserted, updated, deleted rows (#2357)

Addresses #2019
raunaks13 committed May 22, 2024
1 parent e310ab4 commit 83ecc01
Showing 12 changed files with 249 additions and 104 deletions.
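In short: `MergeInsertBuilder.execute` now returns a dictionary of merge statistics instead of nothing. A minimal sketch of the new behavior, based on the doctest added in this commit (the dataset path and row values are illustrative, not from the diff):

import lance
import pyarrow as pa

table = pa.table({"a": [1, 2], "b": ["a", "b"]})
dataset = lance.write_dataset(table, "example")  # illustrative path

# a=2 matches an existing row (update); a=3 is new (insert)
new_table = pa.table({"a": [2, 3], "b": ["x", "y"]})
stats = (
    dataset.merge_insert("a")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute(new_table)
)
print(stats)  # {'num_inserted_rows': 1, 'num_updated_rows': 1, 'num_deleted_rows': 0}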
3 changes: 2 additions & 1 deletion benchmarks/dbpedia-openai/benchmarks.py
@@ -48,7 +48,8 @@ def ground_truth(

 def compute_recall(gt: np.ndarray, result: np.ndarray) -> float:
     recalls = [
-        np.isin(rst, gt_vector).sum() / rst.shape[0] for (rst, gt_vector) in zip(result, gt)
+        np.isin(rst, gt_vector).sum() / rst.shape[0]
+        for (rst, gt_vector) in zip(result, gt)
     ]
     return np.mean(recalls)

1 change: 0 additions & 1 deletion benchmarks/flat/benchmark.py
@@ -17,7 +17,6 @@
 import time
 
 import lance
-import matplotlib.pyplot as plt
 import numpy as np
 import pandas as pd
 import pyarrow as pa
25 changes: 13 additions & 12 deletions benchmarks/full_report/_lib.py
@@ -6,9 +6,6 @@
 from typing import List
 
 import gzip
-import lance
-import numpy as np
-import pyarrow as pa
 import requests


@@ -33,15 +30,15 @@ def cosine(X, Y):
 def knn(
     query: np.ndarray,
     data: np.ndarray,
-    metric: Literal['L2', 'cosine'],
+    metric: Literal["L2", "cosine"],
     k: int,
 ) -> np.ndarray:
-    if metric == 'L2':
+    if metric == "L2":
         dist = l2
-    elif metric == 'cosine':
+    elif metric == "cosine":
         dist = cosine
     else:
-        raise ValueError('Invalid metric')
+        raise ValueError("Invalid metric")
     return np.argpartition(dist(query, data), k, axis=1)[:, 0:k]
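For orientation, `knn` above is a brute-force top-k search. A small usage sketch, assuming it runs in this module's namespace (so the `l2` and `cosine` helpers are in scope); the shapes and seed are illustrative:

import numpy as np

rng = np.random.default_rng(0)
data = rng.random((1000, 64)).astype(np.float32)  # database vectors
query = rng.random((5, 64)).astype(np.float32)    # query vectors

# np.argpartition puts the k smallest distances in the first k columns,
# but does not sort them within the top-k.
neighbors = knn(query, data, metric="L2", k=10)
assert neighbors.shape == (5, 10)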


@@ -51,10 +48,12 @@ def write_lance(
 ):
     dims = data.shape[1]
 
-    schema = pa.schema([
-        pa.field("vec", pa.list_(pa.float32(), dims)),
-        pa.field("id", pa.uint32(), False),
-    ])
+    schema = pa.schema(
+        [
+            pa.field("vec", pa.list_(pa.float32(), dims)),
+            pa.field("id", pa.uint32(), False),
+        ]
+    )
 
     fsl = pa.FixedSizeListArray.from_arrays(
         pa.array(data.reshape(-1).astype(np.float32), type=pa.float32()),
@@ -65,6 +64,7 @@
 
     lance.write_dataset(t, path)
 
+
 # NYT
 
 _DATA_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/docword.nytimes.txt.gz"
@@ -112,7 +112,8 @@ def _get_nyt_vectors(
     tfidf = TfidfTransformer().fit_transform(freq)
     print("computing dense projection")
     dense_projection = random_projection.GaussianRandomProjection(
-        n_components=output_dims, random_state=42,
+        n_components=output_dims,
+        random_state=42,
     ).fit_transform(tfidf)
     dense_projection = dense_projection.astype(np.float32)
     np.save(_CACHE_PATH, dense_projection)
1 change: 0 additions & 1 deletion benchmarks/sift/index.py
@@ -20,7 +20,6 @@
 from subprocess import check_output
 
 import lance
-import pyarrow as pa
 
 
 def main():
17 changes: 8 additions & 9 deletions benchmarks/tpch/benchmark.py
@@ -1,7 +1,5 @@
 # Benchmark performance Lance vs Parquet w/ Tpch Q1 and Q6
 import lance
-import pandas as pd
-import pyarrow as pa
 import duckdb
 
 import sys
@@ -46,10 +44,10 @@
 num_args = len(sys.argv)
 assert num_args == 2
 
-query = ''
-if sys.argv[1] == 'q1':
+query = ""
+if sys.argv[1] == "q1":
     query = Q1
-elif sys.argv[1] == 'q6':
+elif sys.argv[1] == "q6":
     query = Q6
 else:
     sys.exit("We only support Q1 and Q6 for now")
@@ -62,17 +60,18 @@
 res1 = duckdb.sql(query).df()
 end1 = time.time()
 
-print("Lance Latency: ",str(round(end1 - start1, 3)) + 's')
+print("Lance Latency: ", str(round(end1 - start1, 3)) + "s")
 print(res1)
 
 ##### Parquet #####
 lineitem = None
 start2 = time.time()
 # read from parquet and create a view instead of table from it
-duckdb.sql("CREATE VIEW lineitem AS SELECT * FROM read_parquet('./dataset/lineitem_sf1.parquet');")
+duckdb.sql(
+    "CREATE VIEW lineitem AS SELECT * FROM read_parquet('./dataset/lineitem_sf1.parquet');"
+)
 res2 = duckdb.sql(query).df()
 end2 = time.time()
 
-print("Parquet Latency: ",str(round(end2 - start2, 3)) + 's')
+print("Parquet Latency: ", str(round(end2 - start2, 3)) + "s")
 print(res2)

1 change: 0 additions & 1 deletion docs/conf.py
@@ -1,7 +1,6 @@
 # Configuration file for the Sphinx documentation builder.
 
-import shutil
 import subprocess
 
 
 def run_apidoc(_):
16 changes: 10 additions & 6 deletions docs/examples/gcs_example.py
@@ -1,25 +1,29 @@
-#
+#
 # Lance example loading a dataset from Google Cloud Storage
 #
 # You need to set one of the following environment variables in order to authenticate with GS
 # - GOOGLE_SERVICE_ACCOUNT: location of service account file
 # - GOOGLE_SERVICE_ACCOUNT_KEY: JSON serialized service account key
 #
-# Follow this doc in order to create an service key: https://cloud.google.com/iam/docs/keys-create-delete
+# Follow this doc in order to create an service key: https://cloud.google.com/iam/docs/keys-create-delete
 #
 
 import lance
+import pandas as pd
 
 ds = lance.dataset("gs://eto-public/datasets/oxford_pet/oxford_pet.lance")
 count = ds.count_rows()
 print(f"There are {count} pets")
 
 # You can also write to GCS
-import pandas as pd
 
 uri = "gs://eto-public/datasets/oxford_pet/example.lance"
-lance.write_dataset(pd.DataFrame({"a": pd.array([10], dtype="Int32")}), uri, mode='create')
+lance.write_dataset(
+    pd.DataFrame({"a": pd.array([10], dtype="Int32")}), uri, mode="create"
+)
 assert lance.dataset(uri).version == 1
 
-lance.write_dataset(pd.DataFrame({"a": pd.array([5], dtype="Int32")}), uri, mode='append')
+lance.write_dataset(
+    pd.DataFrame({"a": pd.array([5], dtype="Int32")}), uri, mode="append"
+)
 assert lance.dataset(uri).version == 2
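As a companion to the header comment in this example, a hedged sketch of the first authentication option; the path is a placeholder, not a real credential:

import os

# Placeholder path to a service account file you control
os.environ["GOOGLE_SERVICE_ACCOUNT"] = "/path/to/service-account.json"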

16 changes: 10 additions & 6 deletions python/python/lance/dataset.py
@@ -83,7 +83,9 @@ class MergeInsertBuilder(_MergeInsertBuilder):
     def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
         """Executes the merge insert operation
 
-        There is no return value but the original dataset will be updated.
+        This function updates the original dataset and returns a dictionary with
+        information about merge statistics - i.e. the number of inserted, updated,
+        and deleted rows.
 
         Parameters
         ----------
@@ -97,7 +99,8 @@ def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
             source is some kind of generator.
         """
         reader = _coerce_reader(data_obj, schema)
-        super(MergeInsertBuilder, self).execute(reader)
+
+        return super(MergeInsertBuilder, self).execute(reader)
 
     # These next three overrides exist only to document the methods

@@ -945,10 +948,11 @@ def merge_insert(
         >>> dataset = lance.write_dataset(table, "example")
         >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
         >>> # Perform a "upsert" operation
-        >>> dataset.merge_insert("a") \\
-        ...     .when_matched_update_all() \\
-        ...     .when_not_matched_insert_all() \\
-        ...     .execute(new_table)
+        >>> dataset.merge_insert("a") \\
+        ...     .when_matched_update_all() \\
+        ...     .when_not_matched_insert_all() \\
+        ...     .execute(new_table)
+        {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
         >>> dataset.to_table().sort_by("a").to_pandas()
            a  b
         0  1  b
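Because `execute` now surfaces these counts, calling code can branch on the result. A brief hedged sketch reusing the doctest's variables, with key names as shown above:

stats = (
    dataset.merge_insert("a")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute(new_table)
)
if stats["num_inserted_rows"] == 0 and stats["num_updated_rows"] == 0:
    print("merge was a no-op")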
