diff --git a/README.md b/README.md
index 1ad42f4..e109152 100644
--- a/README.md
+++ b/README.md
@@ -1,13 +1,17 @@
 # KùzuDB: Benchmark study
 
-[Kùzu](https://kuzudb.com/) is an in-process (embedded) graph database management system (GDBMS) built for query speed and scalability. It is written in C++, optimized for handling complex join-heavy analytical workloads on very large graph databases, and is under active development. The goal of the code shown in this repo is as follows:
+[Kùzu](https://kuzudb.com/) is an in-process (embedded) graph database management system (GDBMS). Because it is written in C++, it is blazing fast, and is optimized for handling complex join-heavy analytical workloads on very large graph databases. The database is under active development, but its philosophy is to become the "DuckDB of graph databases" -- a fast, lightweight, embeddable graph database for analytics use cases, with minimal setup and infrastructure effort.
 
-* Generate an artificial dataset that can be used to build an artificial social network graph
-* Ingest the data into Kùzu
-* Run a set of queries in Cypher on the data to benchmark the performance of Kùzu
-* Study the ingestion and query times in comparison with Neo4j, and optimize where possible
+The goal of the code shown in this repo is as follows:
 
-Python is used as the intermediary between the source data and the DB.
+* Generate an artificial social network dataset, including persons, interests and locations
+* Ingest the data into KùzuDB and Neo4j
+* Run a set of queries in Cypher on either DB to:
+  * (1) Verify that the data is ingested correctly and that the results from either DB are consistent with one another
+  * (2) Benchmark the performance of Kùzu vs. an established vendor like Neo4j
+* Study the ingestion and query times for either DB, and optimize where possible
+
+Python is used as the intermediary language between the source data and the DBs.
 
 ## Setup
 
@@ -26,170 +30,24 @@ An artificial social network dataset is used, generated via the [Faker](https://
 
 ### Generate all data at once
 
-A shell script `generate_data.sh` is provided in the root directory of this repo that sequentially runs the Python scripts, generating the data for the nodes and edges for the social network. This is the recommended way to generate the data. A single positional argument is provided to the shell script: The number of person profiles to generate.
+A shell script `generate_data.sh` is provided in the root directory of this repo that sequentially runs the Python scripts, generating the data for the nodes and edges for the social network. This is the recommended way to generate the data. A single positional argument is provided to the shell script: the number of person profiles to generate, specified as an integer as shown below.
 
 ```sh
-bash generate_data.sh 1000
+bash generate_data.sh 10000
 ```
 
 Running this command generates a series of files in the `output` directory, following which we can proceed to ingest the data into a graph database.
 
-### Nodes: Persons
-
-First, fake male and female profile information is generated for the number of people required to be in the network.
-
-```sh
-$ cd data
-# Create a dataset of 1000 fake profiles for men and women with a 50-50 split by gender
-$ python create_nodes_person.py -n 1000
-```
-
-The CSV file generated contains a header and fake data as shown below.
-
-
-id|name|gender|birthday|age|isMarried
----|---|---|---|---|---
-1|Natasha Evans|female|1985-08-31|37|true
-2|Gregory Smith|male|1985-11-30|37|false
-
-
-The data in each column is separated by the `|` symbol to make it explicit what the column boundaries are (especially when the data itself contains commas).
-
-### Nodes: Locations
-
-To generate a list of cities that people live in, we use the [world cities dataset](https://www.kaggle.com/datasets/juanmah/world-cities?resource=download) from Kaggle. This is an accurate and up-to-date database of the world's cities and towns, including lat/long and population information of ~44k cities all over the world.
-
-To make this dataset simpler and more realistic, we only consider cities from the following three countries: `US`, `UK` and `CA`.
-
-```sh
-$ python create_nodes_location.py
-
-Wrote 7117 cities to CSV
-Wrote 273 states to CSV
-Wrote 3 countries to CSV
-```
-
-Three CSV files are generated accordingly for cities, states and the specified countries. Latitude, longitude and population are the additional metadata fields for each city.
-
-#### `cities.csv`
-
-id|city|state|country|lat|lng|population
----|---|---|---|---|---|---
-1|Airdrie|Alberta|Canada|51.2917|-114.0144|61581
-2|Beaumont|Alberta|Canada|53.3572|-113.4147|17396
-
-#### `states.csv`
-
-id|state|country
----|---|---
-1|Alberta|Canada
-2|British Columbia|Canada
-3|Manitoba|Canada
-
-#### `countries.csv`
-
-id|country
----|---
-1|Canada
-2|United Kingdom
-3|United States
-
-### Nodes: Interests
-
-A static list of interests/hobbies that a person could have is included in `raw/interests.csv`. This is cleaned up and formatted as required by the data generator script.
-
-```sh
-$ python create_nodes_interests.py
-```
-
-This generates data as shown below.
-
-id|interest
---- | ---
-1|Anime
-2|Art & Painting
-3|Biking
+See [./data/README.md](./data/README.md) for more details on each script that is run sequentially to generate the data.
 
-### Edges: `Person` follows `Person`
+## Ingest the data into Neo4j or Kùzu
 
-Edges are generated between people in a similar way to the way we might imagine social networks. A `Person` follows another `Person`, with the direction of the edge signifying something meaningful. Rather than just generating a uniform distribution, to make the data more interesting, during generation, a small fraction of the profiles (~0.5%) is chosen to be highly connected. This resembles the role of "influencers" in real-world graphs, and in graph terminology, the nodes representing these persons can be called "hubs". The rest of the nodes are connected via these hubs in a random fashion.
+Navigate to the [neo4j](./neo4j) and [kuzudb](./kuzudb/) directories to see instructions on how to ingest the data into each database.
 
-```sh
-python create_edges_follows.py
-```
-
-This generates data as shown below, where the `from` column contains the ID of a person who is following someone, and the `to` column contains the ID of the person being followed.
-
-from|to
----|---
-50|1
-152|1
-271|1
-
-The "hub" nodes can be connected to anywhere from 0.5-5% of the number of persons in the graph.
-
-### Edges: `Person` lives in `Location`
+## Run the queries
 
-Edges are generated between people and the cities they live in. This is done by randomly choosing a city for each person from the list of cities generated earlier.
-
-```sh
-$ python create_edges_location.py
-```
-
-The data generated contains the person ID in the `from` column and the city ID in the `to` column.
-
-from|to
----|---
-1|6015
-2|6296
-3|6657
+Some sample queries are run in each DB to verify that the data is ingested correctly, and that the results are consistent with one another.
 
-### Edges: `Person` has `Interest`
+## Performance comparison
 
-Edges are generated between people and the interests they have. This is done by randomly choosing anywhere from 1-5 interests for each person from the list of interests generated earlier for the nodes.
-
-```sh
-python create_edges_interests.py
-```
-
-The data generated contains the person ID in the `from` column and the interest ID in the `to` column.
-
-from|to
----|---
-1|24
-2|4
-2|8
-
-A person can have multiple interests, so the `from` column can have multiple rows with the same ID.
+🚧 WIP
-
-### Edges: `City` is in `State`
-
-Edges are generated between cities and the states they are in, as per the `cities.csv` file
-
-```sh
-python create_edges_city_state.py
-```
-
-The data generated contains the city ID in the `from` column and the state ID in the `to` column.
-
-from|to
----|---
-1|1
-2|1
-3|1
-
-### Edges: `State` is in `Country`
-
-Edges are generated between states and the countries they are in, as per the `states.csv` file
-
-```sh
-python create_edges_state_country.py
-```
-
-The data generated contains the state ID in the `from` column and the country ID in the `to` column.
-
-from|to
----|---
-1|1
-2|1
-3|1
\ No newline at end of file
diff --git a/data/README.md b/data/README.md
new file mode 100644
index 0000000..4ca4784
--- /dev/null
+++ b/data/README.md
@@ -0,0 +1,174 @@
+# Data generation for study
+
+This section describes the individual data generation scripts used to build the nodes and edges of the artificial social network.
+
+## Generate all data at once
+
+As mentioned in the root level README, a shell script `generate_data.sh` is provided that sequentially runs the Python scripts from this directory, generating the data for the nodes and edges for the social network. This is the recommended way to generate the data. A single positional argument is provided to the shell script: the number of person profiles to generate, specified as an integer as shown below.
+
+```sh
+# Generate data for 100K persons
+bash generate_data.sh 100000
+```
+
+Running this command generates a series of files in the `output` directory, following which we can proceed to ingest the data into a graph database.
+
+### Nodes: Persons
+
+First, fake male and female profile information is generated for the number of people required to be in the network.
+
+```sh
+$ cd data
+# Create a dataset of fake profiles for men and women with a 50-50 split by gender
+$ python create_nodes_person.py -n 100000
+```
+
+The generated CSV file contains a header and fake data, as shown below.
+
+
+id|name|gender|birthday|age|isMarried
+---|---|---|---|---|---
+1|Kenneth Scott|male|1984-04-14|39|true
+2|Stephanie Lozano|female|1993-12-31|29|true
+3|Thomas Williams|male|1979-02-09|44|true
+
+Each column uses the `|` separator to make the column boundaries explicit (especially when the data itself contains commas, which is common in unstructured text).
+
+### Nodes: Locations
+
+To generate a list of cities that people live in, we use the [world cities dataset](https://www.kaggle.com/datasets/juanmah/world-cities?resource=download) from Kaggle. This is an accurate and up-to-date database of the world's cities and towns, including lat/long and population information of ~44k cities all over the world.
+
+To make this dataset simpler and more realistic, we only consider cities from the following three countries: `US`, `UK` and `CA`.
+
+```sh
+$ python create_nodes_location.py
+
+Wrote 7117 cities to CSV
+Wrote 273 states to CSV
+Wrote 3 countries to CSV
+```
+
+Three CSV files are generated accordingly for cities, states and the specified countries. Latitude, longitude and population are the additional metadata fields for each city.
+
+#### `cities.csv`
+
+id|city|state|country|lat|lng|population
+---|---|---|---|---|---|---
+1|Airdrie|Alberta|Canada|51.2917|-114.0144|61581
+2|Beaumont|Alberta|Canada|53.3572|-113.4147|17396
+
+#### `states.csv`
+
+id|state|country
+---|---|---
+1|Alberta|Canada
+2|British Columbia|Canada
+3|Manitoba|Canada
+
+#### `countries.csv`
+
+id|country
+---|---
+1|Canada
+2|United Kingdom
+3|United States
+
+### Nodes: Interests
+
+A static list of interests/hobbies that a person could have is included in `raw/interests.csv`. This is cleaned up and formatted as required by the data generator script.
+
+```sh
+$ python create_nodes_interests.py
+```
+
+This generates data as shown below.
+
+id|interest
+--- | ---
+1|Anime
+2|Art & Painting
+3|Biking
+
+### Edges: `Person` follows `Person`
+
+Edges are generated between people in a way that resembles real social networks. A `Person` follows another `Person`, with the direction of the edge signifying something meaningful. Rather than just generating a uniform distribution, to make the data more interesting, during generation, a small fraction of the profiles (~0.5%) is chosen to be highly connected. This resembles the role of "influencers" in real-world graphs, and in graph terminology, the nodes representing these persons can be called "hubs". The rest of the nodes are connected via these hubs in a random fashion.
+
+```sh
+python create_edges_follows.py
+```
+
+This generates data as shown below, where the `from` column contains the ID of a person who is following someone, and the `to` column contains the ID of the person being followed.
+
+from|to
+---|---
+50|1
+152|1
+271|1
+
+The "hub" nodes can be connected to anywhere from 0.5-5% of the number of persons in the graph.
+
+### Edges: `Person` lives in `Location`
+
+Edges are generated between people and the cities they live in. This is done by randomly choosing a city for each person from the list of cities generated earlier.
+
+```sh
+$ python create_edges_location.py
+```
+
+The data generated contains the person ID in the `from` column and the city ID in the `to` column.
+
+from|to
+---|---
+1|6015
+2|6296
+3|6657
+
+### Edges: `Person` has `Interest`
+
+Edges are generated between people and the interests they have. This is done by randomly choosing anywhere from 1-5 interests for each person from the list of interests generated earlier for the nodes, as sketched below.
+
+```sh
+python create_edges_interests.py
+```
+
+The data generated contains the person ID in the `from` column and the interest ID in the `to` column.
+
+from|to
+---|---
+1|24
+2|4
+2|8
+
+A person can have multiple interests, so the `from` column can have multiple rows with the same ID.
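+
+As a rough illustration, the sampling step in `create_edges_interests.py` can be approximated by the minimal sketch below. The counts and file path are assumptions for illustration, not the script's actual implementation:
+
+```python
+import random
+
+NUM_PERSONS = 1000     # assumed number of person nodes
+NUM_INTERESTS = 30     # assumed number of interest nodes
+
+with open("output/edges/interests.csv", "w") as f:
+    f.write("from|to\n")
+    for person_id in range(1, NUM_PERSONS + 1):
+        # Each person gets 1-5 distinct interests, sampled uniformly at random
+        k = random.randint(1, 5)
+        for interest_id in sorted(random.sample(range(1, NUM_INTERESTS + 1), k)):
+            f.write(f"{person_id}|{interest_id}\n")
+```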
+
+### Edges: `City` is in `State`
+
+Edges are generated between cities and the states they are in, as per the `cities.csv` file.
+
+```sh
+python create_edges_city_state.py
+```
+
+The data generated contains the city ID in the `from` column and the state ID in the `to` column.
+
+from|to
+---|---
+1|1
+2|1
+3|1
+
+### Edges: `State` is in `Country`
+
+Edges are generated between states and the countries they are in, as per the `states.csv` file.
+
+```sh
+python create_edges_state_country.py
+```
+
+The data generated contains the state ID in the `from` column and the country ID in the `to` column.
+
+from|to
+---|---
+1|1
+2|1
+3|1
\ No newline at end of file
diff --git a/generate_data.sh b/generate_data.sh
index 620617a..53b3e83 100644
--- a/generate_data.sh
+++ b/generate_data.sh
@@ -10,7 +10,7 @@ echo "Generating $1 samples of data";
 # Nodes
 python create_nodes_person.py -n ${1-1000}
 python create_nodes_location.py
-python create_nodes_interest.py
+python create_nodes_interests.py
 
 # Edges
 python create_edges_follows.py
diff --git a/kuzudb/README.md b/kuzudb/README.md
new file mode 100644
index 0000000..4c7fb98
--- /dev/null
+++ b/kuzudb/README.md
@@ -0,0 +1,85 @@
+# Kùzu graph
+
+This section describes how to build and query a graph of the social network data in KùzuDB. It uses Kùzu's [client API](https://github.com/kuzudb/kuzu) to perform the ingestion and querying.
+
+## Setup
+
+Because Kùzu is an embedded graph database, the database is tightly coupled with the application layer -- there is no server to set up and run. Simply `pip install kuzu` and you're good to go!
+
+## Build graph
+
+The script `build_graph.py` contains the necessary methods to connect to the KùzuDB database and ingest the data from the CSV files via Kùzu's bulk `COPY` command.
+
+```sh
+python build_graph.py
+```
+
+### Ingestion performance
+
+As expected, the nodes load much faster than the edges, since there are many more edges than nodes. The run times for ingesting nodes and edges are output to the console.
+
+```
+Nodes loaded in 0.0872s
+Edges loaded in 2.0920s
+```
+
+> 💡 Ingesting the nodes/edges via the CSV bulk loader in KùzuDB takes under 3 seconds 🔥, as opposed to ~65 seconds for Neo4j. The timing shown is on an M2 Macbook Pro with 16 GB of RAM.
+
+## Query graph
+
+To verify that the graph was built correctly, the script `query.py` contains a few example queries that can be run against the DB, generating some simple statistics.
+
+```sh
+python query.py
+```
+
+The following questions are asked of the graph:
+
+* **Query 1**: Who are the top 3 most-followed persons?
+* **Query 2**: In which city does the most-followed person live?
+* **Query 3**: What are the top 5 cities with the lowest average age of persons?
+* **Query 4**: How many persons between ages 30-40 are there in each country?
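+
+Each of these questions is implemented in `query.py` through Kùzu's Python API. The snippet below is a minimal sketch of that flow for Query 1 only -- the actual script also times each query and converts the results to Polars DataFrames via Arrow instead of printing rows:
+
+```python
+import kuzu
+
+# Open the existing database directory created by build_graph.py
+db = kuzu.Database("./social_network")
+conn = kuzu.Connection(db)
+
+# Query 1: who are the top 3 most-followed persons?
+result = conn.execute(
+    """
+    MATCH (follower:Person)-[:Follows]->(person:Person)
+    RETURN person.name AS name, count(follower) AS numFollowers
+    ORDER BY numFollowers DESC LIMIT 3
+    """
+)
+while result.has_next():
+    print(result.get_next())
+```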
+ +## To do + +- [ ] Fix and optimize queries 2-4 (currently not working) + +#### Output + +``` +Query 1 completed in 0.145473s + +Query 1: + + MATCH (follower:Person)-[:Follows]->(person:Person) + RETURN person.id AS personID, person.name AS name, count(follower) AS numFollowers + ORDER BY numFollowers DESC LIMIT 3 + +Top 3 most-followed persons: +shape: (3, 3) +┌──────────┬────────────────┬──────────────┐ +│ personID ┆ name ┆ numFollowers │ +│ --- ┆ --- ┆ --- │ +│ i64 ┆ str ┆ i64 │ +╞══════════╪════════════════╪══════════════╡ +│ 85723 ┆ Rachel Cooper ┆ 4998 │ +│ 68753 ┆ Claudia Booker ┆ 4985 │ +│ 54696 ┆ Brian Burgess ┆ 4976 │ +└──────────┴────────────────┴──────────────┘ +Queries completed in 0.2517s +``` + +As can be seen, the results are identical to those obtained from Neo4j. + +### Query performance + +Query times for simple aggregation and path finding are relatively low. More advanced queries involving variable length paths will be studied later. + +Summary of run times: + +* Query 1: `0.145473s` +* Query 2: TBD +* Query 3: TBD +* Query 4: TBD + +> 💡 Query 1 takes the longest to run -- around 150 ms. The timing shown is for queries run on an M2 Macbook Pro with 16 GB of RAM. diff --git a/kuzudb/build_graph.py b/kuzudb/build_graph.py new file mode 100644 index 0000000..6c80442 --- /dev/null +++ b/kuzudb/build_graph.py @@ -0,0 +1,133 @@ +import os +import shutil +from pathlib import Path + +import kuzu +from codetiming import Timer +from kuzu import Connection + +DATA_PATH = Path(__file__).resolve().parents[1] / "data" +NODES_PATH = DATA_PATH / "output" / "nodes" +EDGES_PATH = DATA_PATH / "output" / "edges" + + +def create_person_node_table(conn: Connection) -> None: + conn.execute( + """ + CREATE NODE TABLE + Person( + id INT64, + name STRING, + gender STRING, + birthday DATE, + age INT32, + isMarried BOOLEAN, + PRIMARY KEY (id) + ) + """ + ) + + +def create_city_node_table(conn: Connection) -> None: + conn.execute( + """ + CREATE NODE TABLE + City( + id INT64, + city STRING, + state STRING, + country STRING, + lat FLOAT, + lon FLOAT, + population INT64, + PRIMARY KEY (id) + ) + """ + ) + + +def create_state_node_table(conn: Connection) -> None: + conn.execute( + """ + CREATE NODE TABLE + State( + id INT64, + state STRING, + country STRING, + PRIMARY KEY (id) + ) + """ + ) + + +def create_country_node_table(conn: Connection) -> None: + conn.execute( + """ + CREATE NODE TABLE + Country( + id INT64, + country STRING, + PRIMARY KEY (id) + ) + """ + ) + + +def create_interest_node_table(conn: Connection) -> None: + conn.execute( + """ + CREATE NODE TABLE + Interest( + id INT64, + interest STRING, + PRIMARY KEY (id) + ) + """ + ) + + +def create_edge_tables(conn: Connection) -> None: + # Create edge schemas + conn.execute("CREATE REL TABLE Follows(FROM Person TO Person)") + conn.execute("CREATE REL TABLE LivesIn(FROM Person TO City)") + conn.execute("CREATE REL TABLE HasInterest(FROM Person TO Interest)") + conn.execute("CREATE REL TABLE CityIn(FROM City TO State)") + conn.execute("CREATE REL TABLE StateIn(FROM State TO Country)") + + +def main(conn: Connection) -> None: + with Timer(name="nodes", text="Nodes loaded in {:.4f}s"): + # Nodes + create_person_node_table(conn) + create_city_node_table(conn) + create_state_node_table(conn) + create_country_node_table(conn) + create_interest_node_table(conn) + conn.execute(f"COPY Person FROM '{NODES_PATH}/persons.csv' (HEADER=true, DELIM='|');") + conn.execute(f"COPY City FROM '{NODES_PATH}/cities.csv' (HEADER=true, DELIM='|');") + 
conn.execute(f"COPY State FROM '{NODES_PATH}/states.csv' (HEADER=true, DELIM='|');")
+        conn.execute(f"COPY Country FROM '{NODES_PATH}/countries.csv' (HEADER=true, DELIM='|');")
+        conn.execute(f"COPY Interest FROM '{NODES_PATH}/interests.csv' (HEADER=true, DELIM='|');")
+
+    with Timer(name="edges", text="Edges loaded in {:.4f}s"):
+        # Edges
+        create_edge_tables(conn)
+        conn.execute(f"COPY Follows FROM '{EDGES_PATH}/follows.csv' (HEADER=true, DELIM='|');")
+        conn.execute(f"COPY LivesIn FROM '{EDGES_PATH}/lives_in.csv' (HEADER=true, DELIM='|');")
+        conn.execute(f"COPY HasInterest FROM '{EDGES_PATH}/interests.csv' (HEADER=true, DELIM='|');")
+        conn.execute(f"COPY CityIn FROM '{EDGES_PATH}/city_in.csv' (HEADER=true, DELIM='|');")
+        conn.execute(f"COPY StateIn FROM '{EDGES_PATH}/state_in.csv' (HEADER=true, DELIM='|');")
+
+    print("Successfully loaded nodes and edges into KùzuDB!")
+
+
+if __name__ == "__main__":
+    DB_NAME = "social_network"
+    # Delete directory each time till we have MERGE FROM available in kuzu
+    if os.path.exists(DB_NAME):
+        shutil.rmtree(DB_NAME)
+    # Create database
+    db = kuzu.Database(f"./{DB_NAME}")
+    CONNECTION = kuzu.Connection(db)
+
+    main(CONNECTION)
diff --git a/kuzudb/query.py b/kuzudb/query.py
new file mode 100644
index 0000000..2f5181a
--- /dev/null
+++ b/kuzudb/query.py
@@ -0,0 +1,86 @@
+"""
+Run a series of queries on the KùzuDB database
+"""
+from typing import Any
+
+import kuzu
+import polars as pl
+from codetiming import Timer
+from kuzu import Connection
+
+
+def run_query1(conn: Connection) -> None:
+    "Who are the top 3 most-followed persons in the network?"
+    query = """
+    MATCH (follower:Person)-[:Follows]->(person:Person)
+    RETURN person.id AS personID, person.name AS name, count(follower) AS numFollowers
+    ORDER BY numFollowers DESC LIMIT 3
+    """
+    with Timer(name="query1", text="Query 1 completed in {:.6f}s"):
+        response = conn.execute(query)
+        print(f"\nQuery 1:\n {query}")
+        result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
+        print(f"Top 3 most-followed persons:\n{result}")
+
+
+def run_query2(conn: Connection) -> None:
+    "In which city does the most-followed person in the network live?"
+    query = """
+    MATCH (follower:Person)-[:Follows]->(person:Person)
+    WITH person, count(follower) as followers
+    ORDER BY followers DESC LIMIT 1
+    MATCH (person) -[:LivesIn]-> (city:City)
+    RETURN person.name AS name, followers AS numFollowers, city.city AS city
+    """
+    with Timer(name="query2", text="Query 2 completed in {:.6f}s"):
+        response = conn.execute(query)
+        print(f"\nQuery 2:\n {query}")
+        result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
+        print(f"City in which most-followed person lives:\n{result}")
+
+
+def run_query3(conn: Connection, params: list[tuple[str, Any]]) -> None:
+    "Which are the top 5 cities in a particular region of the world with the lowest average age in the network?"
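+    # NOTE: written to mirror the equivalent Neo4j query; whether Kùzu accepts an
+    # unlabelled variable-length path here is still being verified (see the README's To do list).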
+    query = """
+    MATCH (p:Person)-[:LivesIn]->(ci:City)-[*..2]->(co:Country)
+    WHERE co.country = $country
+    RETURN ci.city AS city, avg(p.age) AS averageAge
+    ORDER BY averageAge LIMIT 5
+    """
+    with Timer(name="query3", text="Query 3 completed in {:.6f}s"):
+        response = conn.execute(query, parameters=params)
+        print(f"\nQuery 3:\n {query}")
+        result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
+        print(f"Cities with lowest average age in {params[0][1]}:\n{result}")
+
+
+def run_query4(conn: Connection, params: list[tuple[str, Any]]) -> None:
+    "How many persons between a certain age range are in each country?"
+    query = """
+    MATCH (p:Person)-[:LivesIn]->(ci:City)-[*..2]->(country:Country)
+    WHERE p.age > $age_lower AND p.age < $age_upper
+    RETURN country.country AS countries, count(country) AS personCounts
+    ORDER BY personCounts DESC LIMIT 3
+    """
+    with Timer(name="query4", text="Query 4 completed in {:.6f}s"):
+        response = conn.execute(query, parameters=params)
+        print(f"\nQuery 4:\n {query}")
+        result = pl.from_arrow(response.get_as_arrow(chunk_size=1000))
+        print(f"Persons between ages {params[0][1]}-{params[1][1]} in each country:\n{result}")
+
+
+def main(conn: Connection) -> None:
+    with Timer(name="queries", text="Queries completed in {:.4f}s"):
+        run_query1(conn)
+        # run_query2(conn)
+        # run_query3(conn, params=[("country", "Canada")])
+        # run_query4(conn, params=[("age_lower", 30), ("age_upper", 40)])
+
+
+if __name__ == "__main__":
+    DB_NAME = "social_network"
+    db = kuzu.Database(f"./{DB_NAME}")
+    CONNECTION = kuzu.Connection(db)
+
+    main(CONNECTION)
diff --git a/neo4j/.env.example b/neo4j/.env.example
new file mode 100644
index 0000000..485a2c4
--- /dev/null
+++ b/neo4j/.env.example
@@ -0,0 +1,3 @@
+NEO4J_VERSION = "5.10.0"
+NEO4J_USER = "neo4j"
+NEO4J_PASSWORD =
\ No newline at end of file
diff --git a/neo4j/README.md b/neo4j/README.md
new file mode 100644
index 0000000..1115711
--- /dev/null
+++ b/neo4j/README.md
@@ -0,0 +1,155 @@
+# Neo4j graph
+
+This section describes how to build and query a graph of the social network data in Neo4j. It uses the official `neo4j` [Python client](https://github.com/neo4j/neo4j-python-driver) to perform the ingestion and querying.
+
+## Run Neo4j in a Docker container
+
+Because Neo4j uses a client-server architecture, it makes sense for development purposes to use Docker to orchestrate the setup and teardown of the DB. This is done easily via `docker-compose` as follows.
+
+### Create a `.env` file with DB credentials
+
+The necessary authentication to the database is specified via the variables in `.env.example`. Copy this example file, rename it to `.env` and update the `NEO4J_PASSWORD` field with the desired DB password.
+
+Then, run the Docker container in detached mode as follows.
+
+```sh
+docker compose up -d
+```
+
+Once development and querying are finished, the container can be stopped as follows.
+
+```sh
+docker compose down
+```
+
+## Build graph
+
+The script `build_graph.py` contains the necessary methods to connect to the Neo4j DB and ingest the data from the CSV files, in batches for large amounts of data.
+
+```sh
+python build_graph.py --batch_size 50000
+```
+
+### Ingestion performance
+
+As expected, the nodes load much faster than the edges, since there are many more edges than nodes. In addition, the nodes in Neo4j are indexed (via uniqueness constraints), following which the edges are created based on a match on existing nodes. 
The run times for ingesting nodes and edges are output to the console. + +``` +Nodes loaded in 5.4609s +Edges loaded in 60.7000s +``` + +> 💡 Ingesting the nodes/edges with a batch size of 50K takes just over 1 minute in Neo4j. The timing shown is on an M2 Macbook Pro with 16 GB of RAM. + + +## Query graph + +To verify that the graph was built correctly, the script `query.py` contains a few example queries that can be run against the DB, generating some simple statistics. + +```sh +python query.py +``` +The following questions are asked of the graph: + +* **Query 1**: Who are the top 3 most-followed persons? +* **Query 2**: In which city does the most-followed person live? +* **Query 3**: What are the top 5 cities with the lowest average age of persons? +* **Query 4**: How many persons between ages 30-40 are there in each country? + +#### Output + +``` +Query 1 completed in 0.016359s + +Query 1: + + MATCH (follower:Person)-[:FOLLOWS]->(person:Person) + RETURN person.personID AS personID, person.name AS name, count(follower) AS numFollowers + ORDER BY numFollowers DESC LIMIT 3 + +Top 3 most-followed persons: +shape: (3, 3) +┌──────────┬────────────────┬──────────────┐ +│ personID ┆ name ┆ numFollowers │ +│ --- ┆ --- ┆ --- │ +│ i64 ┆ str ┆ i64 │ +╞══════════╪════════════════╪══════════════╡ +│ 85723 ┆ Rachel Cooper ┆ 4998 │ +│ 68753 ┆ Claudia Booker ┆ 4985 │ +│ 54696 ┆ Brian Burgess ┆ 4976 │ +└──────────┴────────────────┴──────────────┘ +Query 2 completed in 0.002067s + +Query 2: + + MATCH (follower:Person) -[:FOLLOWS]-> (person:Person) + WITH person, count(follower) as followers + ORDER BY followers DESC LIMIT 1 + MATCH (person) -[:LIVES_IN]-> (city:City) + RETURN person.name AS name, followers AS numFollowers, city.city AS city, city.state AS state, city.country AS country + +City in which most-followed person lives: +shape: (1, 5) +┌───────────────┬──────────────┬────────┬───────┬───────────────┐ +│ name ┆ numFollowers ┆ city ┆ state ┆ country │ +│ --- ┆ --- ┆ --- ┆ --- ┆ --- │ +│ str ┆ i64 ┆ str ┆ str ┆ str │ +╞═══════════════╪══════════════╪════════╪═══════╪═══════════════╡ +│ Rachel Cooper ┆ 4998 ┆ Austin ┆ Texas ┆ United States │ +└───────────────┴──────────────┴────────┴───────┴───────────────┘ +Query 3 completed in 0.002484s + +Query 3: + + MATCH (p:Person) -[:LIVES_IN]-> (c:City) -[*..2]-> (co:Country {country: $country}) + RETURN c.city AS city, avg(p.age) AS averageAge + ORDER BY averageAge LIMIT 5 + +Cities with lowest average age in Canada: +shape: (5, 2) +┌───────────┬────────────┐ +│ city ┆ averageAge │ +│ --- ┆ --- │ +│ str ┆ f64 │ +╞═══════════╪════════════╡ +│ Montreal ┆ 37.310934 │ +│ Calgary ┆ 37.592098 │ +│ Toronto ┆ 37.705746 │ +│ Edmonton ┆ 37.931609 │ +│ Vancouver ┆ 38.011002 │ +└───────────┴────────────┘ +Query 4 completed in 0.001473s + +Query 4: + + MATCH (p:Person)-[:LIVES_IN]->(ci:City)-[*..2]->(country:Country) + WHERE p.age > $age_lower AND p.age < $age_upper + RETURN country.country AS countries, count(country) AS personCounts + ORDER BY personCounts DESC LIMIT 3 + +Persons between ages 30-40 in each country: +shape: (3, 2) +┌────────────────┬──────────────┐ +│ countries ┆ personCounts │ +│ --- ┆ --- │ +│ str ┆ i64 │ +╞════════════════╪══════════════╡ +│ United States ┆ 24983 │ +│ Canada ┆ 2514 │ +│ United Kingdom ┆ 1498 │ +└────────────────┴──────────────┘ +Query script completed in 2.822013s +``` + +### Query performance + +Query times for simple aggregation and path finding are relatively low. 
More advanced queries involving variable length paths will be studied later.
+
+Summary of run times:
+
+* Query 1: `0.016359s`
+* Query 2: `0.002067s`
+* Query 3: `0.002484s`
+* Query 4: `0.001473s`
+
+> 💡 Query 1 takes the longest to run -- around 16 ms. Queries 2-4 (excluding data processing in Python after retrieval) take on the order of 2 ms. The timing shown is for queries run on an M2 Macbook Pro with 16 GB of RAM.
diff --git a/neo4j/build_graph.py b/neo4j/build_graph.py
new file mode 100644
index 0000000..60f60bf
--- /dev/null
+++ b/neo4j/build_graph.py
@@ -0,0 +1,251 @@
+import argparse
+import os
+from pathlib import Path
+from typing import Any, Callable
+
+import polars as pl
+from codetiming import Timer
+from dotenv import load_dotenv
+from neo4j import GraphDatabase, ManagedTransaction, Session
+from polars.io.csv.batched_reader import BatchedCsvReader
+
+load_dotenv()
+
+DATA_PATH = Path(__file__).resolve().parents[1] / "data"
+NODES_PATH = DATA_PATH / "output" / "nodes"
+EDGES_PATH = DATA_PATH / "output" / "edges"
+# Config
+URI = "bolt://localhost:7687"
+NEO4J_USER = os.environ.get("NEO4J_USER")
+NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD")
+
+# Custom types
+JsonBlob = dict[str, Any]
+
+# --- Nodes ---
+
+
+def read_nodes_person(batch_size: int) -> BatchedCsvReader:
+    """Process person nodes CSV file in batches"""
+    csv_reader = pl.read_csv_batched(
+        f"{NODES_PATH}/persons.csv", separator="|", try_parse_dates=True, batch_size=batch_size
+    )
+    return csv_reader
+
+
+def merge_nodes_person(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MERGE (p:Person {personID: row.id})
+        SET p += row
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} person nodes")
+
+
+def merge_nodes_interests(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MERGE (i:Interest {interestID: row.id})
+        SET i += row
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} interest nodes")
+
+
+def merge_nodes_cities(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MERGE (ci:City {cityID: row.id})
+        SET ci += row
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} city nodes")
+
+
+def merge_nodes_states(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MERGE (s:State {stateID: row.id})
+        SET s += row
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} state nodes")
+
+
+def merge_nodes_countries(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MERGE (co:Country {countryID: row.id})
+        SET co += row
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} country nodes")
+
+
+# --- Edges ---
+
+
+def read_edges_person(batch_size: int) -> BatchedCsvReader:
+    csv_reader = pl.read_csv_batched(
+        f"{EDGES_PATH}/follows.csv", separator="|", batch_size=batch_size
+    )
+    return csv_reader
+
+
+def read_edges_interests(batch_size: int) -> BatchedCsvReader:
+    csv_reader = pl.read_csv_batched(
+        f"{EDGES_PATH}/interests.csv", separator="|", batch_size=batch_size
+    )
+    return csv_reader
+
+
+def read_edges_lives_in(batch_size: int) -> BatchedCsvReader:
+    csv_reader = pl.read_csv_batched(
+        f"{EDGES_PATH}/lives_in.csv", separator="|", batch_size=batch_size
+    )
+    return csv_reader
+
+
+def read_edges_city_in(batch_size: int) -> BatchedCsvReader:
+    csv_reader = pl.read_csv_batched(
+        f"{EDGES_PATH}/city_in.csv", separator="|", batch_size=batch_size
+    )
+    return csv_reader
+
+
+def read_edges_state_in(batch_size: int) -> BatchedCsvReader:
+    csv_reader = pl.read_csv_batched(
+        f"{EDGES_PATH}/state_in.csv", separator="|", batch_size=batch_size
+    )
+    return csv_reader
+
+
+def merge_edges_person(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MATCH (p1:Person {personID: row.from})
+        MATCH (p2:Person {personID: row.to})
+        MERGE (p1)-[:FOLLOWS]->(p2)
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} person-follower edges")
+
+
+def merge_edges_interests(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MATCH (p:Person {personID: row.from})
+        MATCH (i:Interest {interestID: row.to})
+        MERGE (p)-[:HAS_INTEREST]->(i)
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} person-interest edges")
+
+
+def merge_edges_lives_in(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MATCH (p:Person {personID: row.from})
+        MATCH (ci:City {cityID: row.to})
+        MERGE (p)-[:LIVES_IN]->(ci)
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} person-city edges")
+
+
+def merge_edges_city_in(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MATCH (ci:City {cityID: row.from})
+        MATCH (s:State {stateID: row.to})
+        MERGE (ci)-[:CITY_IN]->(s)
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} city-state edges")
+
+
+def merge_edges_state_in(tx: ManagedTransaction, data: list[JsonBlob]) -> None:
+    query = """
+        UNWIND $data AS row
+        MATCH (s:State {stateID: row.from})
+        MATCH (co:Country {countryID: row.to})
+        MERGE (s)-[:STATE_IN]->(co)
+    """
+    tx.run(query, data=data)
+    print(f"Created {len(data)} state-country edges")
+
+
+# --- Run functions ---
+
+
+def ingest_in_batches(session: Session, read_func: Callable, merge_func: Callable) -> None:
+    reader = read_func(batch_size=BATCH_SIZE)
+    batches = reader.next_batches(BATCH_SIZE)
+    for batch in batches:
+        # Convert each DataFrame batch to a list of dictionaries
+        data = batch.to_dicts()
+        # Merge the batch of records in a single write transaction
+        session.execute_write(merge_func, data=data)
+
+
+def create_indexes_and_constraints(session: Session) -> None:
+    queries = [
+        # constraints
+        "CREATE CONSTRAINT personID IF NOT EXISTS FOR (p:Person) REQUIRE p.personID IS UNIQUE ",
+        "CREATE CONSTRAINT cityID IF NOT EXISTS FOR (ci:City) REQUIRE ci.cityID IS UNIQUE ",
+        "CREATE CONSTRAINT countryID IF NOT EXISTS FOR (co:Country) REQUIRE co.countryID IS UNIQUE ",
+        "CREATE CONSTRAINT stateID IF NOT EXISTS FOR (s:State) REQUIRE s.stateID IS UNIQUE ",
+        "CREATE CONSTRAINT interestID IF NOT EXISTS FOR (i:Interest) REQUIRE i.interestID IS UNIQUE ",
+    ]
+    for query in queries:
+        session.run(query)
+
+
+def write_nodes(session: Session) -> None:
+    # Write person nodes in batches
+    ingest_in_batches(session, read_nodes_person, merge_nodes_person)
+    # Write interest nodes
+    interests = pl.read_csv(f"{NODES_PATH}/interests.csv", separator="|")
+    session.execute_write(merge_nodes_interests, data=interests.to_dicts())
+    # Write city nodes
+    cities = pl.read_csv(f"{NODES_PATH}/cities.csv", separator="|")
+    session.execute_write(merge_nodes_cities, data=cities.to_dicts())
+    # Write state nodes
+    states = pl.read_csv(f"{NODES_PATH}/states.csv", separator="|")
+    session.execute_write(merge_nodes_states, data=states.to_dicts())
+    # Write country nodes
+    countries = pl.read_csv(f"{NODES_PATH}/countries.csv", separator="|")
+    session.execute_write(merge_nodes_countries, data=countries.to_dicts())
+def write_edges(session: Session) -> None: + ingest_in_batches(session, read_edges_person, merge_edges_person) + ingest_in_batches(session, read_edges_interests, merge_edges_interests) + ingest_in_batches(session, read_edges_lives_in, merge_edges_lives_in) + ingest_in_batches(session, read_edges_city_in, merge_edges_city_in) + ingest_in_batches(session, read_edges_state_in, merge_edges_state_in) + + +def main() -> None: + with GraphDatabase.driver(URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver: + with driver.session(database="neo4j") as session: + # Create indexes and constraints + create_indexes_and_constraints(session) + with Timer(name="nodes", text="Nodes loaded in {:.4f}s"): + # Write nodes + write_nodes(session) + with Timer(name="edges", text="Edges loaded in {:.4f}s"): + # Write edges after nodes have been created + write_edges(session) + + +if __name__ == "__main__": + parser = argparse.ArgumentParser("Build Neo4j graph from files") + parser.add_argument( + "--batch_size", "-b", type=int, default=50_000, help="Batch size of CSV reader" + ) + args = parser.parse_args() + + BATCH_SIZE = args.batch_size + main() diff --git a/neo4j/docker-compose.yml b/neo4j/docker-compose.yml new file mode 100644 index 0000000..683bfb1 --- /dev/null +++ b/neo4j/docker-compose.yml @@ -0,0 +1,25 @@ +version: '3.9' + +services: + neo4j: + image: neo4j:${NEO4J_VERSION} + restart: unless-stopped + environment: + - NEO4J_AUTH=${NEO4J_USER}/${NEO4J_PASSWORD} + # DB and server + - NEO4J_server_memory_pagecache_size=1G + - NEO4J_server_memory_heap_initial__size=1G + - NEO4J_server_memory_heap_max__size=2G + ports: + - 7687:7687 + volumes: + - logs:/logs + - data:/data + - plugins:/plugins + - import:/import + +volumes: + logs: + data: + plugins: + import: diff --git a/neo4j/query.py b/neo4j/query.py new file mode 100644 index 0000000..bf7f65f --- /dev/null +++ b/neo4j/query.py @@ -0,0 +1,88 @@ +""" +Run a series of queries on the Neo4j database +""" +import os + +import polars as pl +from codetiming import Timer +from dotenv import load_dotenv +from neo4j import GraphDatabase, Session + +load_dotenv() +# Config +URI = "bolt://localhost:7687" +NEO4J_USER = os.environ.get("NEO4J_USER") +NEO4J_PASSWORD = os.environ.get("NEO4J_PASSWORD") + + +def run_query1(session: Session) -> None: + "Who are the top 3 most-followed persons in the network?" + query = """ + MATCH (follower:Person)-[:FOLLOWS]->(person:Person) + RETURN person.personID AS personID, person.name AS name, count(follower) AS numFollowers + ORDER BY numFollowers DESC LIMIT 3 + """ + with Timer(name="query1", text="Query 1 completed in {:.6f}s"): + response = session.run(query) + print(f"\nQuery 1:\n {query}") + result = pl.from_dicts(response.data()) + print(f"Top 3 most-followed persons:\n{result}") + + +def run_query2(session: Session) -> None: + "In which city does the most-followed person in the network live?" 
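+    # Two-step query: first rank persons by follower count and keep only the top one,
+    # then follow that person's LIVES_IN edge to fetch the city, state and country.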
+ query = """ + MATCH (follower:Person) -[:FOLLOWS]-> (person:Person) + WITH person, count(follower) as followers + ORDER BY followers DESC LIMIT 1 + MATCH (person) -[:LIVES_IN]-> (city:City) + RETURN person.name AS name, followers AS numFollowers, city.city AS city, city.state AS state, city.country AS country + """ + with Timer(name="query2", text="Query 2 completed in {:.6f}s"): + response = session.run(query) + print(f"\nQuery 2:\n {query}") + result = pl.from_dicts(response.data()) + print(f"City in which most-followed person lives:\n{result}") + + +def run_query3(session: Session, country: str) -> None: + "Which are the top 5 cities in a particular region of the world with the lowest average age in the network?" + query = """ + MATCH (p:Person) -[:LIVES_IN]-> (c:City) -[*..2]-> (co:Country {country: $country}) + RETURN c.city AS city, avg(p.age) AS averageAge + ORDER BY averageAge LIMIT 5 + """ + with Timer(name="query3", text="Query 3 completed in {:.6f}s"): + response = session.run(query, country=country) + print(f"\nQuery 3:\n {query}") + result = pl.from_dicts(response.data()) + print(f"Cities with lowest average age in {country}:\n{result}") + + +def run_query4(session: Session, age_lower: int, age_upper: int) -> None: + "How many persons between a certain age range are in each country?" + query = """ + MATCH (p:Person)-[:LIVES_IN]->(ci:City)-[*..2]->(country:Country) + WHERE p.age > $age_lower AND p.age < $age_upper + RETURN country.country AS countries, count(country) AS personCounts + ORDER BY personCounts DESC LIMIT 3 + """ + with Timer(name="query4", text="Query 4 completed in {:.6f}s"): + response = session.run(query, age_lower=age_lower, age_upper=age_upper) + print(f"\nQuery 4:\n {query}") + result = pl.from_dicts(response.data()) + print(f"Persons between ages {age_lower}-{age_upper} in each country:\n{result}") + + +def main() -> None: + with GraphDatabase.driver(URI, auth=(NEO4J_USER, NEO4J_PASSWORD)) as driver: + with driver.session(database="neo4j") as session: + with Timer(name="queries", text="Query script completed in {:.6f}s"): + run_query1(session) + run_query2(session) + run_query3(session, country="Canada") + run_query4(session, age_lower=30, age_upper=40) + + +if __name__ == "__main__": + main() diff --git a/requirements.txt b/requirements.txt index fa5b2f8..9927000 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,2 +1,6 @@ faker~=19.2.0 -polars~=0.18.0 \ No newline at end of file +polars~=0.18.0 +kuzu>=0.0.6 +neo4j~=5.11.0 +python-dotenv>=1.0.0 +codetiming~=1.4.0 \ No newline at end of file