Merge pull request #17 from prrao87/parquet
Use Parquet format instead of CSV for data consumption
prrao87 authored Aug 18, 2023
2 parents 75ecc7c + a89ed93 commit 44d887e
Showing 36 changed files with 544 additions and 29,646 deletions.
67 changes: 66 additions & 1 deletion README.md
@@ -49,6 +49,71 @@ Navigate to the [neo4j](./neo4j) and the [kuzudb](./kuzudb/) directories to see

Some sample queries are run in each DB to verify that the data is ingested correctly, and that the results are consistent with one another.

The following questions are asked of both graphs:

* **Query 1**: Who are the top 3 most-followed persons? (sketched in code after this list)
* **Query 2**: In which city does the most-followed person live?
* **Query 3**: What are the top 5 cities with the lowest average age of persons?
* **Query 4**: How many persons between ages 30-40 are there in each country?
* **Query 5**: How many men in London, United Kingdom have an interest in fine dining?
* **Query 6**: Which city has the maximum number of women that like Tennis?
* **Query 7**: Which U.S. state has the maximum number of persons between the age 23-30 who enjoy photography?
* **Query 8**: How many second-degree connections are reachable in the graph?
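
For illustration, here is a minimal sketch of Query 1 against Kùzu's Python API; the database path and the `Person`/`Follows` table names are assumptions, not necessarily the schema used in this repo:

```python
import kuzu

# Hypothetical sketch: the path and the Person/Follows table names are assumptions
db = kuzu.Database("./social_network")
conn = kuzu.Connection(db)

result = conn.execute(
    """
    MATCH (follower:Person)-[:Follows]->(p:Person)
    RETURN p.name AS name, count(follower.id) AS numFollowers
    ORDER BY numFollowers DESC
    LIMIT 3;
    """
)
while result.has_next():
    # Each row is a list like [name, numFollowers]
    print(result.get_next())
```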

## Performance comparison

🚧 WIP
The run times for both ingestion and queries are compared.

* For ingestion, KùzuDB is consistently faster than Neo4j by a factor of ~18x for a graph size of 100k nodes and ~2.4M edges.
* For OLAP querying, **KùzuDB is significantly faster** than Neo4j for most types of queries, especially those that involve aggregating over many-to-many relationships.

### Testing conditions

* Macbook Pro M2, 16 GB RAM
* All queries are run single-threaded (no parallelism)
* Neo4j version: `5.10.0`
* KùzuDB version: `0.7.0`
* The run times reported are for the 5th run, to allow the cache to warm up before gauging query performance (see the timing sketch below)
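
A hypothetical sketch of that timing methodology (not a script from this repo):

```python
import time


def time_query(conn, query: str, num_runs: int = 5) -> float:
    """Run `query` num_runs times and return the latency of the final run,
    letting the earlier runs warm up the cache."""
    elapsed = 0.0
    for _ in range(num_runs):
        start = time.perf_counter()
        conn.execute(query)  # e.g. a kuzu.Connection; adapt for a Neo4j session
        elapsed = time.perf_counter() - start
    return elapsed
```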


### Ingestion performance

Case | Neo4j (sec) | Kùzu (sec) | Speedup factor
--- | --- | --- | ---
Nodes | 3.6144 | 0.0874 | 41.4
Edges | 37.5801 | 2.1622 | 17.4
Total | 41.1945 | 2.2496 | 18.3

In total, ~100K nodes and ~2.5 million edges are ingested roughly 18x faster in KùzuDB than in Neo4j. Nodes are ingested significantly faster in Kùzu, and Neo4j's node ingestion remains on the order of seconds despite uniqueness constraints being set on the ID fields as per its best practices. The speedup factors shown are expected to grow as the dataset gets larger.
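
For reference, a uniqueness constraint of the kind mentioned above could be created ahead of node ingestion roughly as follows; the URI, credentials and the `Person`/`id` naming are assumptions for illustration (using the 5.x Python driver's `execute_query` helper), not the repo's actual ingestion code.

```python
from neo4j import GraphDatabase

URI = "bolt://localhost:7687"  # assumed local instance
AUTH = ("neo4j", "password")   # placeholder credentials

with GraphDatabase.driver(URI, auth=AUTH) as driver:
    # A uniqueness constraint also backs Person.id with an index,
    # which speeds up node lookups during edge ingestion
    driver.execute_query(
        "CREATE CONSTRAINT person_id IF NOT EXISTS "
        "FOR (p:Person) REQUIRE p.id IS UNIQUE"
    )
```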

### Query performance (Kùzu single-threaded)

Query | Neo4j (sec) | Kùzu (sec) | Speedup factor
--- | --- | --- | ---
1 | 1.617523 | 0.311524 | 5.2
2 | 0.592790 | 0.791726 | 0.7
3 | 0.009398 | 0.012013 | 0.8
4 | 0.047333 | 0.015932 | 3.0
5 | 0.011949 | 0.012567 | 1.0
6 | 0.024780 | 0.033764 | 0.7
7 | 0.160752 | 0.012508 | 12.9
8 | 0.845768 | 0.103470 | 8.2

### Query performance (Kùzu multi-threaded)

Unlike Neo4j, KùzuDB supports multi-threaded execution of queries. The following results are for the same queries as above, but allowing Kùzu to choose the optimal number of threads for each query.
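
As a sketch of how the thread count might be controlled from Python (assuming Kùzu's `Connection.set_max_threads_for_exec` API, with a placeholder database path and query):

```python
import multiprocessing

import kuzu

db = kuzu.Database("./social_network")  # placeholder path
conn = kuzu.Connection(db)

# Force single-threaded execution, as in the previous table
conn.set_max_threads_for_exec(1)
conn.execute("MATCH (p:Person) RETURN count(p.id);")

# Allow up to all available cores for the multi-threaded runs
conn.set_max_threads_for_exec(multiprocessing.cpu_count())
conn.execute("MATCH (p:Person) RETURN count(p.id);")
```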

Query | Neo4j (sec) | Kùzu (sec) | Speedup factor
--- | --- | --- | ---
1 | 1.617523 | 0.230980 | 7.0
2 | 0.592790 | 0.625935 | 0.9
3 | 0.009398 | 0.011896 | 0.8
4 | 0.047333 | 0.014518 | 3.3
5 | 0.011949 | 0.012230 | 1.0
6 | 0.024780 | 0.015304 | 1.6
7 | 0.160752 | 0.010679 | 15.1
8 | 0.845768 | 0.024422 | 34.6

Some queries show as much as a ~34x speedup over Neo4j, and the average speedup across the eight queries is roughly 8x.

It would be interesting to further study the cases where Kùzu's performance is closer to Neo4j's. More to come soon!
51 changes: 38 additions & 13 deletions data/README.md
@@ -23,7 +23,7 @@ $ cd data
$ python create_nodes_person.py -n 100000
```

The CSV file generated contains a header and fake data and looks like the below.
The generated parquet file contains fake person metadata, and looks like the below.


id|name|gender|birthday|age|isMarried
@@ -32,7 +32,7 @@ id|name|gender|birthday|age|isMarried
2|Stephanie Lozano|female|1993-12-31|29|true
3|Thomas Williams|male|1979-02-09|44|true

Each column uses the `|` separator symbol to make it explicit what the column boundaries are (especially when the data itself contains commas, which is common if the data contains unstructured text).
Because the parquet format encodes the data types as inferred from the underlying Arrow schema, we can be assured that a column such as `birthday` is correctly stored with the `date` type. This reduces the verbosity of the code compared to the CSV format, which would require us to explicitly specify the separator and then re-parse the data into the correct types downstream.
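
As a small illustration of the difference, here is a sketch (assuming the `output/nodes` paths used by the scripts in this repo) of reading the same data from CSV versus parquet with polars:

```python
from pathlib import Path

import polars as pl

NODES_PATH = Path("output/nodes")

# CSV: the separator must be spelled out and `birthday` re-parsed into a date
persons_csv = pl.read_csv(NODES_PATH / "persons.csv", separator="|").with_columns(
    pl.col("birthday").str.strptime(pl.Date, "%Y-%m-%d")
)

# Parquet: the dtypes, including the date column, come from the file's schema
persons_parquet = pl.read_parquet(NODES_PATH / "persons.parquet")
print(persons_parquet.schema)
```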

### Nodes: Locations

@@ -43,29 +43,29 @@ To make this dataset simpler and more realistic, we only consider cities from th
```sh
$ python create_nodes_location.py

Wrote 7117 cities to CSV
Wrote 273 states to CSV
Wrote 3 countries to CSV
Wrote 7117 cities to parquet
Wrote 273 states to parquet
Wrote 3 countries to parquet
```

Three CSV files are generated accordingly for cities, states and the specified countries. Latitude, longitude and population are the additional metadata fields for each city.
Three parquet files are generated accordingly for cities, states and the specified countries. Latitude, longitude and population are the additional metadata fields for each city, each stored with the appropriate data type within the file's schema.

#### `cities.csv`
#### `cities.parquet`

id|city|state|country|lat|lng|population
---|---|---|---|---|---|---
1|Airdrie|Alberta|Canada|51.2917|-114.0144|61581
2|Beaumont|Alberta|Canada|53.3572|-113.4147|17396

#### `states.csv`
#### `states.parquet`

id|state|country
---|---|---
1|Alberta|Canada
2|British Columbia|Canada
3|Manitoba|Canada

#### `countries.csv`
#### `countries.parquet`

id|country
---|---
@@ -75,7 +75,7 @@ id|country

### Nodes: Interests

A static list of interests/hobbies that a person could have is included in `raw/interests.csv`. This is cleaned up and formatted as required by the data generator script.
A static list of interests/hobbies that a person could have is included in `raw/interests.parquet`. This is cleaned up and formatted as required by the data generator script.

```sh
$ python create_nodes_interests.py
@@ -143,7 +143,7 @@ A person can have multiple interests, so the `from` column can have multiple row

### Edges: `City` is in `State`

Edges are generated between cities and the states they are in, as per the `cities.csv` file
Edges are generated between cities and the states they are in, as per the `cities.parquet` file

```sh
python create_edges_city_state.py
@@ -159,7 +159,7 @@ from|to

### Edges: `State` is in `Country`

Edges are generated between states and the countries they are in, as per the `states.csv` file
Edges are generated between states and the countries they are in, as per the `states.parquet` file

```sh
python create_edges_state_country.py
@@ -171,4 +171,29 @@ from|to
---|---
1|1
2|1
3|1
3|1

## Dataset files

The following files are generated by the scripts in this directory; a short snippet for sanity-checking them follows the lists.

### Nodes

In the `./output/nodes` directory, the following files are generated.

* `persons.parquet`
* `interests.parquet`
* `cities.parquet`
* `states.parquet`
* `countries.parquet`


### Edges

In the `./output/edges` directory, the following files are generated.

* `follows.parquet`
* `lives_in.parquet`
* `interests.parquet`
* `city_in.parquet`
* `state_in.parquet`
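
A short, hypothetical snippet to sanity-check the generated outputs listed above:

```python
from pathlib import Path

import polars as pl

# Print row/column counts for every generated parquet file
for split in ("nodes", "edges"):
    for path in sorted(Path(f"output/{split}").glob("*.parquet")):
        df = pl.read_parquet(path)
        print(f"{split}/{path.name}: {df.height} rows, {df.width} columns")
```
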
26 changes: 7 additions & 19 deletions data/create_edges_follows.py
@@ -19,14 +19,6 @@ def select_random_ids(df: pl.DataFrame, num: int) -> list[int]:
return connections


def get_persons_df(filepath: Path) -> pl.DataFrame:
# Read in person data
persons_df = pl.read_csv(filepath, separator="|").with_columns(
pl.col("birthday").str.strptime(pl.Date, "%Y-%m-%d")
)
return persons_df


def get_initial_person_edges(persons_df: pl.DataFrame) -> pl.DataFrame:
"""
Produce an initial list of person-person edges.
@@ -53,7 +45,7 @@ def create_super_node_edges(persons_df: pl.DataFrame) -> pl.DataFrame:
- The aim is to have a select few persons act as as concentration points in the graph
- The number of super nodes is set as a fraction of the total number of persons in the graph
"""
NUM_SUPER_NODES = len(persons_df) * 5 // 1000
NUM_SUPER_NODES = len(persons_df) * 5 // 1000 if len(persons_df) > 0 else 1
super_node_ids = np.random.choice(persons_df["id"], size=NUM_SUPER_NODES, replace=False)
# Convert to dataframe
super_nodes_df = pl.DataFrame({"id": super_node_ids}).sort("id")
@@ -81,9 +73,7 @@ def create_super_node_edges(persons_df: pl.DataFrame) -> pl.DataFrame:
)
# Explode the connections column to create a row for each connection
.explode("connections")
.filter(
pl.col("id") != pl.col("connections")
)
.filter(pl.col("id") != pl.col("connections"))
.sort(["id", "connections"])
.select(["id", "connections"])
)
@@ -93,23 +83,21 @@ def create_super_node_edges(persons_df: pl.DataFrame) -> pl.DataFrame:


def main() -> None:
persons_df = get_persons_df(NODES_PATH / "persons.csv")
persons_df = pl.read_parquet(NODES_PATH / "persons.parquet")
np.random.seed(SEED)
edges_df = get_initial_person_edges(persons_df)
# Generate edges from super nodes
super_node_edges_df = create_super_node_edges(persons_df)
# Concatenate edges from original edges_df and super_node_edges_df
edges_df = (
pl.concat([edges_df, super_node_edges_df])
.unique()
.sort(["to", "from"])
).select("from", "to")
edges_df = (pl.concat([edges_df, super_node_edges_df]).unique().sort(["to", "from"])).select(
"from", "to"
)
# Limit the number of edges
if NUM < len(edges_df):
edges_df = edges_df.head(NUM)
print(f"Limiting edges to {NUM} per the `--num` argument")
# Write nodes
edges_df.write_csv(Path("output/edges") / "follows.csv", separator="|")
edges_df.write_parquet(Path("output/edges") / "follows.parquet")
print(f"Wrote {len(edges_df)} edges for {len(persons_df)} persons")


32 changes: 13 additions & 19 deletions data/create_edges_interests.py
@@ -14,28 +14,24 @@ def select_random_ids(df: pl.DataFrame, colname: str, num: int) -> list[int]:
return connections





def main() -> None:
interests_df = (
pl.read_csv(Path(NODES_PATH) / "interests.csv", separator="|")
.rename({"id": "interest_id"})
interests_df = pl.read_parquet(Path(NODES_PATH) / "interests.parquet").rename(
{"id": "interest_id"}
)
# Read in person IDs
persons_df = pl.read_csv(NODES_PATH / "persons.csv", separator="|").select("id")
persons_df = pl.read_parquet(NODES_PATH / "persons.parquet").select("id")
# Set a lower and upper bound on the number of interests per person
lower_bound, upper_bound = 1, 5
# Add a column with a random number of interests per person
persons_df = persons_df.with_columns(
pl.lit(
np.random.randint(
lower_bound,
upper_bound,
len(persons_df),
)
).alias("num_interests")
)
pl.lit(
np.random.randint(
lower_bound,
upper_bound,
len(persons_df),
)
).alias("num_interests")
)
# Add a column of random IDs from the interests_df, and explode it within the DataFrame
edges_df = (
# Take in the column val of num_connections and return a list of IDs from persons_df
@@ -54,10 +50,8 @@ def main() -> None:
edges_df = edges_df.head(NUM)
print(f"Limiting edges to {NUM} per the `--num` argument")
# Write nodes
edges_df = (
edges_df.rename({"id": "from", "interests": "to"})
)
edges_df.write_csv(Path("output/edges") / "interests.csv", separator="|")
edges_df = edges_df.rename({"id": "from", "interests": "to"})
edges_df.write_parquet(Path("output/edges") / "interests.parquet")
print(f"Wrote {len(edges_df)} edges for {len(persons_df)} persons")


28 changes: 13 additions & 15 deletions data/create_edges_location.py
@@ -10,7 +10,7 @@

def get_persons_df(filepath: Path) -> pl.DataFrame:
# Read in persons data
persons_df = pl.read_csv(filepath, separator="|").select("id")
persons_df = pl.read_parquet(filepath).select("id")
return persons_df


@@ -19,42 +19,40 @@ def get_cities_df(filepath: Path) -> pl.DataFrame:
Get only cities with a population of > 1M
"""
# Read in cities data and rename the ID column to avoid conflicts
residence_loc_df = pl.read_csv(filepath, separator="|").filter(
pl.col("population") >= 1_000_000
).rename({"id": "city_id"})
residence_loc_df = (
pl.read_parquet(filepath)
.filter(pl.col("population") >= 1_000_000)
.rename({"id": "city_id"})
)
return residence_loc_df


def main() -> None:
np.random.seed(SEED)
persons_df = get_persons_df(NODES_PATH / "persons.csv")
residence_loc_df = get_cities_df(NODES_PATH / "cities.csv")
persons_df = get_persons_df(NODES_PATH / "persons.parquet")
residence_loc_df = get_cities_df(NODES_PATH / "cities.parquet")
# Randomly pick a city ID from the list of all cities with population > 1M
city_ids = np.random.choice(residence_loc_df["city_id"], size=len(persons_df), replace=True)
# Obtain top 5 most common cities name via a join
city_ids_df = pl.DataFrame(city_ids).rename({"column_0": "city_id"})
# Horizontally stack the person IDs and the residence city IDs to create a list of edges
edges_df = pl.concat([persons_df, city_ids_df], how='horizontal')
edges_df = pl.concat([persons_df, city_ids_df], how="horizontal")
city_counts_df = edges_df.groupby("city_id").count().sort("count", descending=True)
top_cities_df = (
city_counts_df.join(residence_loc_df, on="city_id", how="left")
# List top 5 cities
.sort("count", descending=True)
.head(5)
.sort("count", descending=True).head(5)
)
top_5 = top_cities_df["city"].to_list()
# Limit the number of edges
if NUM < len(edges_df):
edges_df = edges_df.head(NUM)
print(f"Limiting edges to {NUM} per the `--num` argument")
# Write nodes
edges_df = (
edges_df.rename({"city_id": "to", "id": "from"})
.write_csv(Path("output/edges") / "lives_in.csv", separator="|")
)
print(
f"Generated residence cities for persons. Top 5 common cities are: {', '.join(top_5)}"
edges_df = edges_df.rename({"city_id": "to", "id": "from"}).write_parquet(
Path("output/edges") / "lives_in.parquet"
)
print(f"Generated residence cities for persons. Top 5 common cities are: {', '.join(top_5)}")


if __name__ == "__main__":
6 changes: 3 additions & 3 deletions data/create_edges_location_city_state.py
@@ -9,13 +9,13 @@
def main() -> None:
# Read data from cities file
cities_df = (
pl.read_csv(NODES_PATH / "cities.csv", separator="|")
pl.read_parquet(NODES_PATH / "cities.parquet")
.rename({"id": "city_id"})
.select(["city_id", "city", "state"])
)
# Read in states from file
states_df = (
pl.read_csv(NODES_PATH / "states.csv", separator="|")
pl.read_parquet(NODES_PATH / "states.parquet")
.rename({"id": "state_id"})
.select("state_id", "state")
)
@@ -26,7 +26,7 @@ def main() -> None:
.rename({"city_id": "from", "state_id": "to"})
)
# Write nodes
edges_df.write_csv(Path("output/edges") / "city_in.csv", separator="|")
edges_df.write_parquet(Path("output/edges") / "city_in.parquet")
print(f"Wrote {len(edges_df)} edges for {len(cities_df)} cities")

