
feat: update merge_insert to add statistics for inserted, updated, deleted rows #2357

Merged May 22, 2024 · 14 commits
3 changes: 2 additions & 1 deletion benchmarks/dbpedia-openai/benchmarks.py
@@ -48,7 +48,8 @@ def ground_truth(

 def compute_recall(gt: np.ndarray, result: np.ndarray) -> float:
     recalls = [
-        np.isin(rst, gt_vector).sum() / rst.shape[0] for (rst, gt_vector) in zip(result, gt)
+        np.isin(rst, gt_vector).sum() / rst.shape[0]
+        for (rst, gt_vector) in zip(result, gt)
     ]
     return np.mean(recalls)

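For orientation, compute_recall above averages per-query recall over the retrieved ids. A minimal sketch of calling it on toy ids (the synthetic values and the import path are assumptions, not part of this diff):

import numpy as np

from benchmarks import compute_recall  # assumes benchmarks/dbpedia-openai is the working directory

# Each row pairs the ids returned for one query with its ground-truth ids.
result = np.array([[1, 2, 3], [4, 5, 6]])
gt = np.array([[1, 2, 9], [4, 5, 6]])

# First query recovers 2 of 3 ids, the second all 3: mean recall = (2/3 + 3/3) / 2 = 5/6.
assert abs(compute_recall(gt, result) - 5 / 6) < 1e-9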
1 change: 0 additions & 1 deletion benchmarks/flat/benchmark.py
@@ -17,7 +17,6 @@
 import time

 import lance
-import matplotlib.pyplot as plt
 import numpy as np
 import pandas as pd
 import pyarrow as pa
25 changes: 13 additions & 12 deletions benchmarks/full_report/_lib.py
@@ -6,9 +6,6 @@
 from typing import List

 import gzip
-import lance
-import numpy as np
-import pyarrow as pa
 import requests


@@ -33,15 +30,15 @@ def cosine(X, Y):
 def knn(
     query: np.ndarray,
     data: np.ndarray,
-    metric: Literal['L2', 'cosine'],
+    metric: Literal["L2", "cosine"],
     k: int,
 ) -> np.ndarray:
-    if metric == 'L2':
+    if metric == "L2":
         dist = l2
-    elif metric == 'cosine':
+    elif metric == "cosine":
         dist = cosine
     else:
-        raise ValueError('Invalid metric')
+        raise ValueError("Invalid metric")
     return np.argpartition(dist(query, data), k, axis=1)[:, 0:k]


@@ -51,10 +48,12 @@ def write_lance(
 ):
     dims = data.shape[1]

-    schema = pa.schema([
-        pa.field("vec", pa.list_(pa.float32(), dims)),
-        pa.field("id", pa.uint32(), False),
-    ])
+    schema = pa.schema(
+        [
+            pa.field("vec", pa.list_(pa.float32(), dims)),
+            pa.field("id", pa.uint32(), False),
+        ]
+    )

     fsl = pa.FixedSizeListArray.from_arrays(
         pa.array(data.reshape(-1).astype(np.float32), type=pa.float32()),
@@ -65,6 +64,7 @@

     lance.write_dataset(t, path)

+
 # NYT

 _DATA_URL = "https://archive.ics.uci.edu/ml/machine-learning-databases/bag-of-words/docword.nytimes.txt.gz"
@@ -112,7 +112,8 @@ def _get_nyt_vectors(
     tfidf = TfidfTransformer().fit_transform(freq)
     print("computing dense projection")
     dense_projection = random_projection.GaussianRandomProjection(
-        n_components=output_dims, random_state=42,
+        n_components=output_dims,
+        random_state=42,
     ).fit_transform(tfidf)
     dense_projection = dense_projection.astype(np.float32)
     np.save(_CACHE_PATH, dense_projection)
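For reference, the knn helper reformatted above returns, for each query, the indices of the k nearest vectors; np.argpartition guarantees membership in the top-k but not ordering within it. A minimal sketch on synthetic data (the shapes and the _lib import path are assumptions, not part of this diff):

import numpy as np

from _lib import knn  # assumes benchmarks/full_report is the working directory

# 4 query vectors against 1000 candidates, 32 dimensions each.
query = np.random.rand(4, 32).astype(np.float32)
data = np.random.rand(1000, 32).astype(np.float32)

# Indices of the 10 nearest candidates per query, unordered within the top-10.
neighbors = knn(query, data, metric="L2", k=10)
assert neighbors.shape == (4, 10)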
1 change: 0 additions & 1 deletion benchmarks/sift/index.py
@@ -20,7 +20,6 @@
 from subprocess import check_output

 import lance
-import pyarrow as pa


 def main():
17 changes: 8 additions & 9 deletions benchmarks/tpch/benchmark.py
@@ -1,7 +1,5 @@
 # Benchmark performance Lance vs Parquet w/ Tpch Q1 and Q6
 import lance
-import pandas as pd
-import pyarrow as pa
 import duckdb

 import sys
@@ -46,10 +44,10 @@
 num_args = len(sys.argv)
 assert num_args == 2

-query = ''
-if sys.argv[1] == 'q1':
+query = ""
+if sys.argv[1] == "q1":
     query = Q1
-elif sys.argv[1] == 'q6':
+elif sys.argv[1] == "q6":
     query = Q6
 else:
     sys.exit("We only support Q1 and Q6 for now")
@@ -62,17 +60,18 @@
 res1 = duckdb.sql(query).df()
 end1 = time.time()

-print("Lance Latency: ",str(round(end1 - start1, 3)) + 's')
+print("Lance Latency: ", str(round(end1 - start1, 3)) + "s")
 print(res1)

 ##### Parquet #####
-lineitem = None
 start2 = time.time()
 # read from parquet and create a view instead of table from it
-duckdb.sql("CREATE VIEW lineitem AS SELECT * FROM read_parquet('./dataset/lineitem_sf1.parquet');")
+duckdb.sql(
+    "CREATE VIEW lineitem AS SELECT * FROM read_parquet('./dataset/lineitem_sf1.parquet');"
+)
 res2 = duckdb.sql(query).df()
 end2 = time.time()

-print("Parquet Latency: ",str(round(end2 - start2, 3)) + 's')
+print("Parquet Latency: ", str(round(end2 - start2, 3)) + "s")
 print(res2)
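Context for the hunk above: DuckDB resolves the table name lineitem either from the CREATE VIEW over Parquet or, on the Lance side, from a Python variable via replacement scan, which is presumably why the lineitem = None placeholder could be dropped. A minimal sketch of the Lance path (the dataset path here is illustrative, not from the benchmark):

import duckdb
import lance

# DuckDB's replacement scan lets the SQL text reference this local variable by name.
lineitem = lance.dataset("./dataset/lineitem_sf1.lance")

res = duckdb.sql("SELECT count(*) FROM lineitem").df()
print(res)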

1 change: 0 additions & 1 deletion docs/conf.py
@@ -1,7 +1,6 @@
 # Configuration file for the Sphinx documentation builder.

 import shutil
-import subprocess


 def run_apidoc(_):
16 changes: 10 additions & 6 deletions docs/examples/gcs_example.py
@@ -1,25 +1,29 @@
-#
+#
 # Lance example loading a dataset from Google Cloud Storage
 #
 # You need to set one of the following environment variables in order to authenticate with GS
 # - GOOGLE_SERVICE_ACCOUNT: location of service account file
 # - GOOGLE_SERVICE_ACCOUNT_KEY: JSON serialized service account key
 #
-# Follow this doc in order to create an service key: https://cloud.google.com/iam/docs/keys-create-delete
+# Follow this doc in order to create an service key: https://cloud.google.com/iam/docs/keys-create-delete
 #

 import lance
+import pandas as pd

 ds = lance.dataset("gs://eto-public/datasets/oxford_pet/oxford_pet.lance")
 count = ds.count_rows()
 print(f"There are {count} pets")

 # You can also write to GCS
-import pandas as pd

 uri = "gs://eto-public/datasets/oxford_pet/example.lance"
-lance.write_dataset(pd.DataFrame({"a": pd.array([10], dtype="Int32")}), uri, mode='create')
+lance.write_dataset(
+    pd.DataFrame({"a": pd.array([10], dtype="Int32")}), uri, mode="create"
+)
 assert lance.dataset(uri).version == 1

-lance.write_dataset(pd.DataFrame({"a": pd.array([5], dtype="Int32")}), uri, mode='append')
+lance.write_dataset(
+    pd.DataFrame({"a": pd.array([5], dtype="Int32")}), uri, mode="append"
+)
 assert lance.dataset(uri).version == 2
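The example relies on environment variables for Google Cloud authentication; a minimal sketch of wiring that up from Python before opening the dataset (the key path is a placeholder, not a real credential):

import os

import lance

# Point credential lookup at a service account file (hypothetical path).
os.environ["GOOGLE_SERVICE_ACCOUNT"] = "/path/to/service-account.json"

ds = lance.dataset("gs://eto-public/datasets/oxford_pet/oxford_pet.lance")
print(ds.count_rows())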

16 changes: 10 additions & 6 deletions python/python/lance/dataset.py
@@ -83,7 +83,9 @@ class MergeInsertBuilder(_MergeInsertBuilder):
     def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
         """Executes the merge insert operation

-        There is no return value but the original dataset will be updated.
+        This function updates the original dataset and returns a dictionary with
+        information about merge statistics - i.e. the number of inserted, updated,
+        and deleted rows.

         Parameters
         ----------
@@ -97,7 +99,8 @@ def execute(self, data_obj: ReaderLike, *, schema: Optional[pa.Schema] = None):
             source is some kind of generator.
         """
         reader = _coerce_reader(data_obj, schema)
-        super(MergeInsertBuilder, self).execute(reader)
+
+        return super(MergeInsertBuilder, self).execute(reader)

     # These next three overrides exist only to document the methods

@@ -945,10 +948,11 @@ def merge_insert(
     >>> dataset = lance.write_dataset(table, "example")
     >>> new_table = pa.table({"a": [2, 3, 4], "b": ["x", "y", "z"]})
     >>> # Perform a "upsert" operation
-    >>> dataset.merge_insert("a") \\
-    ...     .when_matched_update_all() \\
-    ...     .when_not_matched_insert_all() \\
-    ...     .execute(new_table)
+    >>> dataset.merge_insert("a") \\
+    ...     .when_matched_update_all() \\
+    ...     .when_not_matched_insert_all() \\
+    ...     .execute(new_table)
+    {'num_inserted_rows': 1, 'num_updated_rows': 2, 'num_deleted_rows': 0}
     >>> dataset.to_table().sort_by("a").to_pandas()
        a  b
     0  1  b
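To summarize the feature, execute() now returns the merge statistics dictionary shown in the doctest above. A minimal sketch of consuming it outside a doctest (the path and tables are illustrative):

import lance
import pyarrow as pa

# Build a tiny dataset, then upsert a batch that overlaps on key "a".
table = pa.table({"a": [1, 2], "b": ["a", "b"]})
dataset = lance.write_dataset(table, "stats_example")

new_table = pa.table({"a": [2, 3], "b": ["y", "z"]})
stats = (
    dataset.merge_insert("a")
    .when_matched_update_all()
    .when_not_matched_insert_all()
    .execute(new_table)
)

# A plain dict, e.g. {'num_inserted_rows': 1, 'num_updated_rows': 1, 'num_deleted_rows': 0}
print(stats)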