Skip to content

Commit

Permalink
Merge pull request #10 from wsp-sag/develop
Browse files Browse the repository at this point in the history
Bring in the latest Develop
  • Loading branch information
i-am-sijia authored Feb 7, 2024
2 parents 42d4e8c + fc06d62 commit a3cb622
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 5 deletions.
7 changes: 7 additions & 0 deletions activitysim/core/configuration/top.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
from pathlib import Path
from typing import Any, Literal

from pydantic import validator

from activitysim.core.configuration.base import PydanticBase, Union


Expand Down Expand Up @@ -119,6 +121,11 @@ class OutputTables(PydanticBase):
h5_store: bool = False
"""Write tables into a single HDF5 store instead of individual CSVs."""

file_type: Literal["csv", "parquet", "h5"] = "csv"
"""
Specifies the file type for output tables. Options are limited to 'csv',
'h5' or 'parquet'. Only applied if h5_store is set to False."""

action: str
"""Whether to 'include' or 'skip' the enumerated tables in `tables`."""

Expand Down
33 changes: 28 additions & 5 deletions activitysim/core/steps/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import pandas as pd
import pyarrow as pa
import pyarrow.csv as csv
import pyarrow.parquet as parquet

from activitysim.core import configuration, workflow
from activitysim.core.workflow.checkpoint import CHECKPOINT_NAME
Expand Down Expand Up @@ -226,8 +227,13 @@ def write_data_dictionary(state: workflow.State) -> None:
@workflow.step
def write_tables(state: workflow.State) -> None:
"""
Write pipeline tables as csv files (in output directory) as specified by output_tables list
in settings file.
Write pipeline tables as csv or parquet files (in output directory) as specified
by output_tables list in settings file. Output to parquet or a single h5 file is
also supported.
'h5_store' defaults to False, which means the output will be written out to csv.
'file_type' defaults to 'csv' but can also be used to specify 'parquet' or 'h5'.
When 'h5_store' is set to True, 'file_type' is ingored and the outputs are written to h5.
'output_tables' can specify either a list of output tables to include or to skip
if no output_tables list is specified, then all checkpointed tables will be written
Expand Down Expand Up @@ -261,6 +267,16 @@ def write_tables(state: workflow.State) -> None:
tables:
- households
To write tables to parquet files, use the file_type setting:
::
output_tables:
file_type: parquet
action: include
tables:
- households
Parameters
----------
output_dir: str
Expand All @@ -277,6 +293,7 @@ def write_tables(state: workflow.State) -> None:
tables = output_tables_settings.tables
prefix = output_tables_settings.prefix
h5_store = output_tables_settings.h5_store
file_type = output_tables_settings.file_type
sort = output_tables_settings.sort

registered_tables = state.registered_tables()
Expand Down Expand Up @@ -388,14 +405,20 @@ def map_func(x):
):
dt = dt.drop([f"_original_{lookup_col}"])

if h5_store:
if h5_store or file_type == "h5":
file_path = state.get_output_file_path("%soutput_tables.h5" % prefix)
dt.to_pandas().to_hdf(
str(file_path), key=table_name, mode="a", format="fixed"
)

else:
file_name = f"{prefix}{table_name}.csv"
file_name = f"{prefix}{table_name}.{file_type}"
file_path = state.get_output_file_path(file_name)

# include the index if it has a name or is a MultiIndex
csv.write_csv(dt, file_path)
if file_type == "csv":
csv.write_csv(dt, file_path)
elif file_type == "parquet":
parquet.write_table(dt, file_path)
else:
raise ValueError(f"unknown file_type {file_type}")

0 comments on commit a3cb622

Please sign in to comment.