Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update Polars and Kùzu for parquet import #32

Merged
merged 6 commits into from
Oct 13, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions data/create_edges_follows.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ def create_super_node_edges(persons_df: pl.DataFrame) -> pl.DataFrame:
# Take in the column val of num_connections and return a list of IDs from persons_df
super_nodes_df.with_columns(
pl.col("num_connections")
.apply(lambda x: select_random_ids(persons_df, x))
.map_elements(lambda x: select_random_ids(persons_df, x))
.alias("connections")
)
# Explode the connections column to create a row for each connection
Expand Down Expand Up @@ -97,7 +97,7 @@ def main() -> None:
edges_df = edges_df.head(NUM)
print(f"Limiting edges to {NUM} per the `--num` argument")
# Write edges
edges_df.write_parquet(Path("output/edges") / "follows.parquet")
edges_df.write_parquet(Path("output/edges") / "follows.parquet", compression="snappy")
print(f"Wrote {len(edges_df)} edges for {len(persons_df)} persons")


Expand Down
4 changes: 2 additions & 2 deletions data/create_edges_interests.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def main() -> None:
# Take in the column val of num_connections and return a list of IDs from persons_df
persons_df.with_columns(
pl.col("num_interests")
.apply(lambda x: select_random_ids(interests_df, "interest_id", x))
.map_elements(lambda x: select_random_ids(interests_df, "interest_id", x))
.alias("interests")
)
# Explode the connections column to create a row for each connection
Expand All @@ -51,7 +51,7 @@ def main() -> None:
print(f"Limiting edges to {NUM} per the `--num` argument")
# Write edges
edges_df = edges_df.rename({"id": "from", "interests": "to"})
edges_df.write_parquet(Path("output/edges") / "interests.parquet")
edges_df.write_parquet(Path("output/edges") / "interests.parquet", compression="snappy")
print(f"Wrote {len(edges_df)} edges for {len(persons_df)} persons")


Expand Down
5 changes: 3 additions & 2 deletions data/create_edges_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def main() -> None:
city_ids_df = pl.DataFrame(city_ids).rename({"column_0": "city_id"})
# Horizontally stack the person IDs and the residence city IDs to create a list of edges
edges_df = pl.concat([persons_df, city_ids_df], how="horizontal")
city_counts_df = edges_df.groupby("city_id").count().sort("count", descending=True)
city_counts_df = edges_df.group_by("city_id").count().sort("count", descending=True)
top_cities_df = (
city_counts_df.join(residence_loc_df, on="city_id", how="left")
# List top 5 cities
Expand All @@ -50,7 +50,8 @@ def main() -> None:
print(f"Limiting edges to {NUM} per the `--num` argument")
# Write edges
edges_df = edges_df.rename({"city_id": "to", "id": "from"}).write_parquet(
Path("output/edges") / "lives_in.parquet"
Path("output/edges") / "lives_in.parquet",
compression="snappy",
)
print(f"Generated residence cities for persons. Top 5 common cities are: {', '.join(top_5)}")

Expand Down
2 changes: 1 addition & 1 deletion data/create_edges_location_city_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def main() -> None:
.rename({"city_id": "from", "state_id": "to"})
)
# Write edges
edges_df.write_parquet(Path("output/edges") / "city_in.parquet")
edges_df.write_parquet(Path("output/edges") / "city_in.parquet", compression="snappy")
print(f"Wrote {len(edges_df)} edges for {len(cities_df)} cities")


Expand Down
2 changes: 1 addition & 1 deletion data/create_edges_location_state_country.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ def main() -> None:
.rename({"state_id": "from", "country_id": "to"})
)
# Write edges
edges_df.write_parquet(Path("output/edges") / "state_in.parquet")
edges_df.write_parquet(Path("output/edges") / "state_in.parquet", compression="snappy")
print(f"Wrote {len(edges_df)} edges for {len(states_df)} states")


Expand Down
3 changes: 2 additions & 1 deletion data/create_nodes_interests.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,8 @@ def main(filename: str) -> pl.DataFrame:
interests_df = interests_df.with_columns(pl.Series(ids).alias("id"))
# Write to parquet
interests_df.select(pl.col("id"), pl.all().exclude("id")).write_parquet(
Path("output/nodes") / "interests.parquet"
Path("output/nodes") / "interests.parquet",
compression="snappy",
)
print(f"Wrote {interests_df.shape[0]} interests nodes to parquet")
return interests
Expand Down
11 changes: 7 additions & 4 deletions data/create_nodes_location.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ def get_cities_df(world_cities: pl.DataFrame) -> pl.DataFrame:
def write_city_nodes(cities_of_interest: pl.DataFrame) -> pl.DataFrame:
# Convert states column to ascii as it has problematic characters
cities_of_interest = cities_of_interest.with_columns(
pl.col("admin_name").apply(remove_accents)
pl.col("admin_name").map_elements(remove_accents)
).drop("city")
# Rename columns
cities_of_interest = cities_of_interest.rename({"city_ascii": "city", "admin_name": "state"})
Expand All @@ -58,7 +58,8 @@ def write_city_nodes(cities_of_interest: pl.DataFrame) -> pl.DataFrame:
city_nodes = city_nodes.with_columns(pl.Series(ids).alias("id"))
# Write to parquet
city_nodes.select(pl.col("id"), pl.all().exclude("id")).write_parquet(
Path("output/nodes") / "cities.parquet"
Path("output/nodes") / "cities.parquet",
compression="snappy",
)
print(f"Wrote {city_nodes.shape[0]} cities to parquet")
return city_nodes
Expand All @@ -72,7 +73,8 @@ def write_state_nodes(city_nodes: pl.DataFrame) -> None:
state_nodes = state_nodes.with_columns(pl.Series(ids).alias("id"))
# Write to parquet
state_nodes.select(pl.col("id"), pl.all().exclude("id")).write_parquet(
Path("output/nodes") / "states.parquet"
Path("output/nodes") / "states.parquet",
compression="snappy",
)
print(f"Wrote {state_nodes.shape[0]} states to parquet")

Expand All @@ -85,7 +87,8 @@ def write_country_nodes(city_nodes: pl.DataFrame) -> None:
country_nodes = country_nodes.with_columns(pl.Series(ids).alias("id"))
# Write to parquet
country_nodes.select(pl.col("id"), pl.all().exclude("id")).write_parquet(
Path("output/nodes") / "countries.parquet"
Path("output/nodes") / "countries.parquet",
compression="snappy",
)
print(f"Wrote {country_nodes.shape[0]} countries to parquet")

Expand Down
3 changes: 2 additions & 1 deletion data/create_nodes_person.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,8 @@ def main() -> None:
persons_df = create_person_df(female_profiles, male_profiles)
# Write nodes
persons_df.select(pl.col("id"), pl.all().exclude("id")).write_parquet(
Path("output/nodes") / "persons.parquet"
Path("output/nodes") / "persons.parquet",
compression="snappy",
)
print(f"Wrote {persons_df.shape[0]} person nodes to parquet")

Expand Down
Binary file modified data/output/edges/city_in.parquet
Binary file not shown.
Binary file modified data/output/edges/follows.parquet
Binary file not shown.
Binary file modified data/output/edges/interests.parquet
Binary file not shown.
Binary file modified data/output/edges/lives_in.parquet
Binary file not shown.
Binary file modified data/output/edges/state_in.parquet
Binary file not shown.
Binary file modified data/output/nodes/cities.parquet
Binary file not shown.
Binary file modified data/output/nodes/countries.parquet
Binary file not shown.
Binary file modified data/output/nodes/interests.parquet
Binary file not shown.
Binary file modified data/output/nodes/persons.parquet
Binary file not shown.
Binary file modified data/output/nodes/states.parquet
Binary file not shown.
44 changes: 22 additions & 22 deletions kuzudb/benchmark_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_benchmark_query2(benchmark, connection):


def test_benchmark_query3(benchmark, connection):
result = benchmark(query.run_query3, connection, [("country", "United States")])
result = benchmark(query.run_query3, connection, {"country": "United States"})
result = result.to_dicts()

assert len(result) == 5
Expand All @@ -58,28 +58,28 @@ def test_benchmark_query3(benchmark, connection):


def test_benchmark_query4(benchmark, connection):
result = benchmark(query.run_query4, connection, [("age_lower", 30), ("age_upper", 40)])
result = benchmark(query.run_query4, connection, {"age_lower": 30, "age_upper": 40})
result = result.to_dicts()

assert len(result) == 3
assert result[0]["countries"] == "United States"
assert result[1]["countries"] == "Canada"
assert result[2]["countries"] == "United Kingdom"
assert result[0]["personCounts"] == 30473
assert result[0]["personCounts"] == 30431
assert result[1]["personCounts"] == 3064
assert result[2]["personCounts"] == 1873
assert result[2]["personCounts"] == 1870


def test_benchmark_query5(benchmark, connection):
result = benchmark(
query.run_query5,
connection,
[
("gender", "male"),
("city", "London"),
("country", "United Kingdom"),
("interest", "fine dining"),
],
{
"gender": "male",
"city": "London",
"country": "United Kingdom",
"interest": "fine dining",
},
)
result = result.to_dicts()

Expand All @@ -91,10 +91,10 @@ def test_benchmark_query6(benchmark, connection):
result = benchmark(
query.run_query6,
connection,
[
("gender", "female"),
("interest", "tennis")
],
{
"gender": "female",
"interest": "tennis"
},
)
result = result.to_dicts()

Expand All @@ -108,12 +108,12 @@ def test_benchmark_query7(benchmark, connection):
result = benchmark(
query.run_query7,
connection,
[
("country", "United States"),
("age_lower", 23),
("age_upper", 30),
("interest", "photography"),
],
{
"country": "United States",
"age_lower": 23,
"age_upper": 30,
"interest": "photography",
},
)
result = result.to_dicts()

Expand All @@ -132,9 +132,9 @@ def test_benchmark_query8(benchmark, connection):


def test_benchmark_query9(benchmark, connection):
result = benchmark(query.run_query9, connection, [("age_1", 50), ("age_2", 25)])
result = benchmark(query.run_query9, connection, {"age_1": 50, "age_2": 25})
result = result.to_dicts()

assert len(result) == 1
assert result[0]["numPaths"] == 45632026
assert result[0]["numPaths"] == 45558131

44 changes: 22 additions & 22 deletions kuzudb/query.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ def run_query3(conn: Connection, params: list[tuple[str, Any]]) -> None:
print(f"\nQuery 3:\n {query}")
response = conn.execute(query, parameters=params)
result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
print(f"Cities with lowest average age in {params[0][1]}:\n{result}")
print(f"Cities with lowest average age in {params['country']}:\n{result}")
return result


Expand All @@ -65,7 +65,7 @@ def run_query4(conn: Connection, params: list[tuple[str, Any]]) -> None:
print(f"\nQuery 4:\n {query}")
response = conn.execute(query, parameters=params)
result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
print(f"Persons between ages {params[0][1]}-{params[1][1]} in each country:\n{result}")
print(f"Persons between ages {params['age_lower']}-{params['age_upper']} in each country:\n{result}")
return result


Expand All @@ -84,7 +84,7 @@ def run_query5(conn: Connection, params: list[tuple[str, Any]]) -> None:
response = conn.execute(query, parameters=params)
result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
print(
f"Number of {params[0][1]} users in {params[1][1]}, {params[2][1]} who have an interest in {params[3][1]}:\n{result}"
f"Number of {params['gender']} users in {params['city']}, {params['country']} who have an interest in {params['interest']}:\n{result}"
)
return result

Expand All @@ -104,7 +104,7 @@ def run_query6(conn: Connection, params: list[tuple[str, Any]]) -> None:
response = conn.execute(query, parameters=params)
result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
print(
f"City with the most {params[0][1]} users who have an interest in {params[1][1]}:\n{result}"
f"City with the most {params['gender']} users who have an interest in {params['interest']}:\n{result}"
)
return result

Expand All @@ -125,7 +125,7 @@ def run_query7(conn: Connection, params: list[tuple[str, Any]]) -> None:
result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
print(
f"""
State in {params[0][1]} with the most users between ages {params[1][1]}-{params[2][1]} who have an interest in {params[3][1]}:\n{result}
State in {params['country']} with the most users between ages {params['age_lower']}-{params['age_upper']} who have an interest in {params['interest']}:\n{result}
"""
)
return result
Expand Down Expand Up @@ -161,7 +161,7 @@ def run_query9(conn: Connection, params: list[tuple[str, Any]]) -> None:
result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
print(
f"""
Number of paths through persons below {params[0][1]} to persons above {params[1][1]}:\n{result}
Number of paths through persons below {params['age_1']} to persons above {params['age_2']}:\n{result}
"""
)
return result
Expand All @@ -171,29 +171,29 @@ def main(conn: Connection) -> None:
with Timer(name="queries", text="Queries completed in {:.4f}s"):
_ = run_query1(conn)
_ = run_query2(conn)
_ = run_query3(conn, params=[("country", "United States")])
_ = run_query4(conn, params=[("age_lower", 30), ("age_upper", 40)])
_ = run_query3(conn, params={"country": "United States"})
_ = run_query4(conn, params={"age_lower": 30, "age_upper": 40})
_ = run_query5(
conn,
params=[
("gender", "male"),
("city", "London"),
("country", "United Kingdom"),
("interest", "fine dining"),
],
params={
"gender": "male",
"city": "London",
"country": "United Kingdom",
"interest": "fine dining",
},
)
_ = run_query6(conn, params=[("gender", "female"), ("interest", "tennis")])
_ = run_query6(conn, params={"gender": "female", "interest": "tennis"})
_ = run_query7(
conn,
params=[
("country", "United States"),
("age_lower", 23),
("age_upper", 30),
("interest", "photography"),
],
params={
"country": "United States",
"age_lower": 23,
"age_upper": 30,
"interest": "photography",
},
)
_ = run_query8(conn)
_ = run_query9(conn, params=[("age_1", 50), ("age_2", 25)])
_ = run_query9(conn, params={"age_1": 50, "age_2": 25})


if __name__ == "__main__":
Expand Down
6 changes: 3 additions & 3 deletions neo4j/benchmark_query.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,9 +68,9 @@ def test_benchmark_query4(benchmark, session):
assert result[0]["countries"] == "United States"
assert result[1]["countries"] == "Canada"
assert result[2]["countries"] == "United Kingdom"
assert result[0]["personCounts"] == 30473
assert result[0]["personCounts"] == 30431
assert result[1]["personCounts"] == 3064
assert result[2]["personCounts"] == 1873
assert result[2]["personCounts"] == 1870


def test_benchmark_query5(benchmark, session):
Expand Down Expand Up @@ -114,4 +114,4 @@ def test_benchmark_query9(benchmark, session):
result = result.to_dicts()

assert len(result) == 1
assert result[0]["numPaths"] == 45632026
assert result[0]["numPaths"] == 45558131
3 changes: 1 addition & 2 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
faker~=19.2.0
polars~=0.18.0
polars~=0.19.0
numpy>=1.25.0
pyarrow~=13.0.0
kuzu==0.0.8
neo4j~=5.11.0
python-dotenv>=1.0.0
Expand Down