Skip to content

Commit

Permalink
fix schema and metadata propagation
Browse files Browse the repository at this point in the history
  • Loading branch information
maurerle committed Oct 23, 2024
1 parent 37b59db commit 413dd41
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 22 deletions.
2 changes: 1 addition & 1 deletion crawler/common/base_crawler.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ def set_metadata(self, metadata_info: dict[str, str]) -> None:

def create_schema_only(engine, schema_name: str) -> None:
with engine.begin() as conn:
conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {schema_name}"))
conn.execute(text(f'CREATE SCHEMA IF NOT EXISTS "{schema_name}"'))


def set_metadata_only(engine, metadata_info: dict[str, str]):
Expand Down
32 changes: 11 additions & 21 deletions crawler/vea_industrial_load_profiles.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
from sqlalchemy import create_engine, text
from tqdm import tqdm

from common.base_crawler import create_schema_only, set_metadata_only
from common.config import db_uri


log = logging.getLogger("vea-industrial-load-profiles")


Expand Down Expand Up @@ -181,26 +183,13 @@ def write_to_database(db_conn: str, data: pd.DataFrame, name: str) -> None:
name=name,
con=engine,
if_exists="append",
schema="vea-industrial-load-profiles",
index=False,
)

log.info("Successfully inserted into databse")


def create_schema():
log.info("Trying to create schema")

engine = create_engine(db_uri)

with engine.begin() as conn:
query = text('CREATE SCHEMA IF NOT EXISTS "vea-industrial-load-profiles"')
conn.execute(query)

log.info("Succesfully created schema")


def convert_to_hypertable(relation_name: str):
def convert_to_hypertable(db_conn: str, relation_name: str):
"""
Converts table to hypertable.
Expand Down Expand Up @@ -233,8 +222,9 @@ def main(schema_name):
# extract files from response
master_file, hlt_file, load_file = extract_files(response=response)

# creat schema
create_schema()
# create schema
engine = create_engine(db_uri(schema_name))
create_schema_only(engine, schema_name)

# read load_data
load_data = read_file(load_file, filename="load")
Expand Down Expand Up @@ -264,10 +254,10 @@ def main(schema_name):
del master_data

# convert to hypertable
convert_to_hypertable(db_conn=db_conn, "high_load_times")
convert_to_hypertable(db_conn=db_conn, "load")
convert_to_hypertable("high_load_times")
convert_to_hypertable("load")
convert_to_hypertable(db_conn, "high_load_times")
convert_to_hypertable(db_conn, "load")

set_metadata_only(engine, metadata_info)


if __name__ == "__main__":
Expand All @@ -277,4 +267,4 @@ def main(schema_name):
datefmt="%d-%m-%Y %H:%M:%S",
)

main()
main("vea-industrial-load-profiles")

0 comments on commit 413dd41

Please sign in to comment.