Add crawling of VEA industrial load profiles #20

Merged
merged 35 commits into from
Oct 25, 2024
Commits
1f220fe
Extended gitignore for virtual environment
ChriKo97 Oct 21, 2024
cccec97
Init commit with request and file extraction
ChriKo97 Oct 21, 2024
ee0ec3e
Renamed func
ChriKo97 Oct 22, 2024
2ffef79
Added exception returns and file reading func
ChriKo97 Oct 22, 2024
c82df04
Function for creating timestep datetime dictionary
ChriKo97 Oct 22, 2024
5607b8f
Added conversion to UTC
ChriKo97 Oct 22, 2024
470d92d
Removed exception from reading files
ChriKo97 Oct 22, 2024
0c4a93a
Added transforming function
ChriKo97 Oct 22, 2024
d76f134
Function calls for dict creation and transformation
ChriKo97 Oct 22, 2024
ce2b6a1
Removed unused exceptions
ChriKo97 Oct 22, 2024
037a75a
Function and call for inserting into database
ChriKo97 Oct 22, 2024
2181d94
Added hyper table conversion
ChriKo97 Oct 22, 2024
80abbd2
Fixed function call
ChriKo97 Oct 22, 2024
de40a8e
Added metadata
ChriKo97 Oct 22, 2024
1779265
Better logging
ChriKo97 Oct 22, 2024
6f380fc
Changed order for timestamp creation
ChriKo97 Oct 22, 2024
1576ae1
better logging and docstrings
ChriKo97 Oct 22, 2024
a1ca77e
Bugfix
ChriKo97 Oct 22, 2024
bc6141f
Splitted writing to database
ChriKo97 Oct 22, 2024
4335b58
Better logging
ChriKo97 Oct 22, 2024
bf4a121
Schema creation func
ChriKo97 Oct 22, 2024
5ed589a
Reworked structure to minimize ram usage
ChriKo97 Oct 22, 2024
bc95650
Removed unused import
ChriKo97 Oct 22, 2024
8a7ef5e
Converted to class
ChriKo97 Oct 24, 2024
7bd70cf
setting of metadata
ChriKo97 Oct 24, 2024
1ba98d6
Merged requesting and extracting to one function
ChriKo97 Oct 24, 2024
554e8d7
Changed schema name to snake case
ChriKo97 Oct 24, 2024
4ed046c
Exception block for hyptertable creation
ChriKo97 Oct 24, 2024
57e4a41
Changed license
ChriKo97 Oct 24, 2024
de65e6d
Fixed error message in hypertable creation
ChriKo97 Oct 24, 2024
4e5e362
Fixed df var not associated
ChriKo97 Oct 24, 2024
ad89720
Renamed var to fix AttributeError
ChriKo97 Oct 24, 2024
4a1ca99
Fixed create table error
ChriKo97 Oct 24, 2024
c6c860f
Fixed hardcoded schema name in insertion
ChriKo97 Oct 24, 2024
89a2445
converting master files columns to lowercase
ChriKo97 Oct 25, 2024
1 change: 1 addition & 0 deletions .gitignore
@@ -12,6 +12,7 @@ ninja_*
*.log
*.ipynb
config.py
/env/
.env
.venv
crawler/data/Waermebedarf_NRW.gdb/
237 changes: 237 additions & 0 deletions crawler/vea_industrial_load_profiles.py
@@ -0,0 +1,237 @@
import logging
import io

import zipfile
import requests
import pandas as pd
from sqlalchemy import text
from tqdm import tqdm

from common.base_crawler import BaseCrawler

log = logging.getLogger("vea-industrial-load-profiles")


metadata_info = {
    "schema_name": "vea_industrial_load_profiles",
    "data_date": "2016-01-01",
    "data_source": "https://zenodo.org/records/13910298",
    "license": "CC-BY-4.0",
    "description": """The data consists of 5359 one-year quarter-hourly industrial load profiles (2016, a leap year, 35136 values each).
Each profile describes the electricity consumption of one industrial/commercial site in Germany as used for official accounting.
Local electricity generation was excluded from the data as far as it could be discovered (no guarantee of completeness).
The load profiles come with the master data of the respective industrial sites as well as the information whether each quarter-hour was a high-load time of the connected German grid operator in 2016.
The data was collected by the VEA.
The dataset as a whole was assembled by Paul Hendrik Tiemann in 2017 by selecting complete load profiles without effects of renewable generation from a VEA-internal database.
It is a research dataset and was used for master's theses and publications.""",
    "contact": "komanns@fh-aachen.de",
    "temporal_start": "2016-01-01 00:00:00",
    "temporal_end": "2016-12-31 23:45:00",
    "concave_hull_geometry": None,
}


class IndustrialLoadProfileCrawler(BaseCrawler):

    def __init__(self, schema_name):
        super().__init__(schema_name)

        self.schema_name = schema_name

    def request_extract_zip_archive(self):
        """
        Requests the zip archive with the industrial load profiles from zenodo
        and opens the three contained csv files.
        """

        url = "https://zenodo.org/records/13910298/files/load-profile-data.zip?download=1"

        log.info("Requesting zip archive from zenodo")

        response = requests.get(url)

        response.raise_for_status()

        log.info("Successfully requested zip archive from zenodo")

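        # the archive is handled fully in memory: zipfile reads from a BytesIO
        # buffer, which stays open after the with-block, so the three file
        # handles below remain readable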
        with zipfile.ZipFile(io.BytesIO(response.content)) as thezip:
            self.master_data_file = thezip.open(name="master_data_tabsep.csv")
            self.hlt_profiles_file = thezip.open(name="hlt_profiles_tabsep.csv")
            self.load_profiles_file = thezip.open(name="load_profiles_tabsep.csv")


    def read_file(
            self,
            filename: str | None = None):
        """Reads the given file and stores its contents as a pd.DataFrame in self.df.

        Args:
            filename (str | None, default: None): Which file to read ("master", "load" or "hlt").
        """

        log.info(f"Trying to read file {filename} into pd.DataFrame")

        if filename == "master":
            file = self.master_data_file
        elif filename == "load":
            file = self.load_profiles_file
        elif filename == "hlt":
            file = self.hlt_profiles_file
        else:
            raise ValueError(f"Unknown filename: {filename}")

        self.df = pd.read_csv(file, sep="\t")

        log.info("Successfully read file into pd.DataFrame")


    def create_timestep_datetime_dict(
            self,
            columns: pd.Index):
        """Creates a dictionary mapping the timesteps (time0, time1, ...) to pd.Timestamp objects.

        Args:
            columns (pd.Index): Columns of either the load or hlt profile dataframe (the timesteps).
        """

        log.info("Creating dictionary for timesteps mapping")

        timesteps = list(columns.difference(["id", "Unnamed: 35137"]))

        timestamps = pd.date_range(
            start="2016-01-01 00:00:00",
            end="2016-12-31 23:45:00",
            freq="15min",
            tz="Europe/Berlin")

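        # the raw timesteps refer to local German time (CET/CEST); converting
        # to UTC keeps the 2016 DST transitions unambiguous in the database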
        timestamps = timestamps.tz_convert("UTC")

        self.timestep_datetime_map = {}
        for timestep in timesteps:
            idx = int(timestep.split("time")[1])
            self.timestep_datetime_map[timestep] = timestamps[idx]

        log.info("Successfully created dictionary")


    def transform_load_hlt_data(
            self,
            name: str | None = None):
        """Transforms the dataframe of load or hlt profiles into long format.

        Args:
            name (str | None, default: None): Name of the profile type ("load" or "hlt"), used for logging.
        """

        log.info(f"Trying to convert {name} dataframe")

        # remove unused column
        self.df.drop(columns="Unnamed: 35137", inplace=True)

        # change to long format: one (id, timestamp, value) row per quarter-hour
        self.df = self.df.melt(id_vars="id", var_name="timestamp")

        # map timestamps onto timestamp column
        self.df["timestamp"] = self.df["timestamp"].map(self.timestep_datetime_map)

        log.info("Successfully converted hlt / load profile")


    def write_to_database(
            self,
            name: str) -> None:
        """Writes the dataframe to the database in chunks.

        Args:
            name (str): The name of the table to insert the data into.
        """

        log.info(f"Trying to write {name} to database")

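        # insert in chunks of 200k rows to keep memory usage bounded; in long
        # format the 5359 profiles amount to roughly 188 million rows in total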
        rows = 200000
        list_df = [self.df[i:i + rows] for i in range(0, self.df.shape[0], rows)]

        for df in tqdm(list_df):
            df.to_sql(
                name=name,
                con=self.engine,
                if_exists="append",
                schema=self.schema_name,
                index=False)

        log.info("Successfully inserted into database")


    def lower_column_names(self):
        """Converts all dataframe column names to lowercase."""

        self.df.columns = [x.lower() for x in self.df.columns]


    def convert_to_hypertable(
            self,
            relation_name: str):
        """
        Converts a table to a TimescaleDB hypertable.

        Args:
            relation_name (str): The relation to convert to a hypertable.
        """

        log.info("Trying to create hypertable")

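        # create_hypertable partitions the relation into time-based chunks on
        # the 'timestamp' column; if_not_exists and migrate_data make the call
        # safe to repeat on a table that already contains rows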
        try:
            with self.engine.begin() as conn:
                query = text(
                    f"SELECT public.create_hypertable('{relation_name}', 'timestamp', if_not_exists => TRUE, migrate_data => TRUE);"
                )
                conn.execute(query)

            log.info("Successfully created hypertable")

        except Exception as e:
            log.error(f"Could not create hypertable for {relation_name}: {e}")


def main():

    # create crawler instance
    ilp_crawler = IndustrialLoadProfileCrawler("vea_industrial_load_profiles")

    # request zip archive
    ilp_crawler.request_extract_zip_archive()

    # read load data
    ilp_crawler.read_file(filename="load")

    # create timestamp dictionary to replace "timeX" with datetime objects
    ilp_crawler.create_timestep_datetime_dict(ilp_crawler.df.columns)

    # transform and write load data
    ilp_crawler.transform_load_hlt_data(name="load")
    ilp_crawler.write_to_database(name="load")

    # read, transform and write hlt data
    ilp_crawler.read_file(filename="hlt")
    ilp_crawler.transform_load_hlt_data(name="hlt")
    ilp_crawler.write_to_database(name="high_load_times")

    # read in master data and write to database
    ilp_crawler.read_file(filename="master")
    ilp_crawler.lower_column_names()
    ilp_crawler.write_to_database(name="master")

    # convert to hypertable
    ilp_crawler.convert_to_hypertable("high_load_times")
    ilp_crawler.convert_to_hypertable("load")

    # set metadata
    ilp_crawler.set_metadata(metadata_info)


if __name__ == "__main__":

    logging.basicConfig(
        level=logging.INFO,
        format='%(asctime)s.%(msecs)03d %(levelname)s %(module)s - %(funcName)s: %(message)s',
        datefmt='%d-%m-%Y %H:%M:%S')

    main()

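For context, a minimal sketch of how the crawled tables could be read back once the crawler has run; the connection URL, credentials and the profile id below are placeholder assumptions, not part of this PR:

import pandas as pd
from sqlalchemy import create_engine

# hypothetical connection string; replace user, password, host and database
engine = create_engine("postgresql://user:password@localhost:5432/database")

# fetch one load profile (id = 1 is a placeholder) in long format,
# i.e. one (id, timestamp, value) row per quarter-hour of 2016
query = """
    SELECT timestamp, value
    FROM vea_industrial_load_profiles."load"
    WHERE id = 1
    ORDER BY timestamp
"""
profile = pd.read_sql(query, con=engine)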