Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

write trip matrices #311

Merged
merged 1 commit into from
Apr 13, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions activitysim/abm/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,3 +27,4 @@
from . import trip_purpose
from . import trip_purpose_and_destination
from . import trip_scheduling
from . import trip_matrices
147 changes: 147 additions & 0 deletions activitysim/abm/models/trip_matrices.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
# ActivitySim
# See full license in LICENSE.txt.

import logging

import openmatrix as omx
import pandas as pd
import numpy as np

from activitysim.core import config
from activitysim.core import inject
from activitysim.core import pipeline

from .util import expressions
from .util.expressions import skim_time_period_label

logger = logging.getLogger(__name__)


@inject.step()
def write_trip_matrices(trips, skim_dict, skim_stack):
    """
    Write trip matrices step.

    Annotates the local trips table via expression preprocessors (adding
    boolean mode/period columns), optionally saves the annotated table back
    to the pipeline, then aggregates trip counts by origin/destination and
    hands them off to ``write_matrices`` for OMX output.
    """

    model_settings = config.read_model_settings('write_trip_matrices.yaml')
    trips_df = annotate_trips(trips, skim_dict, skim_stack, model_settings)

    if model_settings.get('SAVE_TRIPS_TABLE'):
        pipeline.replace_table('trips', trips_df)

    logger.info('Aggregating trips...')
    od_counts = trips_df.groupby(['origin', 'destination'], sort=False).sum()
    logger.info('Finished.')

    origins = od_counts.index.get_level_values('origin')
    destinations = od_counts.index.get_level_values('destination')

    # every trip end must correspond to a known land_use zone
    zone_index = pipeline.get_table('land_use').index
    assert all(z in zone_index for z in origins)
    assert all(z in zone_index for z in destinations)

    # positional indices of each OD pair within the zone system
    _, orig_index = zone_index.reindex(origins)
    _, dest_index = zone_index.reindex(destinations)

    write_matrices(od_counts, zone_index, orig_index, dest_index, model_settings)


def annotate_trips(trips, skim_dict, skim_stack, model_settings):
    """
    Add columns to the local trips table.

    The annotation expressions have access to the origin/destination skims
    and to everything defined under CONSTANTS in the model settings.
    Pipeline tables can also be accessed by listing them under TABLES in
    the preprocessor settings.

    Returns the annotated trips DataFrame.
    """

    trips_df = trips.to_frame()

    trace_label = 'trip_matrices'

    # derive a 'trip_period' skim key from each trip's departure time
    assert 'trip_period' not in trips_df
    trips_df['trip_period'] = skim_time_period_label(trips_df.depart)

    # skim wrappers keyed on trip origin/destination (and period for 3d skims)
    skims = {
        'od_skims': skim_dict.wrap('origin', 'destination'),
        'odt_skims': skim_stack.wrap(left_key='origin',
                                     right_key='destination',
                                     skim_key='trip_period'),
    }

    locals_dict = {}
    constants = config.get_model_constants(model_settings)
    if constants is not None:
        locals_dict.update(constants)

    expressions.annotate_preprocessors(
        trips_df, locals_dict, skims,
        model_settings, trace_label)

    # Data will be expanded by an expansion weight column from the
    # households pipeline table, if specified in the model settings.
    hh_weight_col = model_settings.get('HH_EXPANSION_WEIGHT_COL')

    if hh_weight_col and hh_weight_col not in trips_df:
        logger.info("adding '%s' from households to trips table" % hh_weight_col)
        household_weights = pipeline.get_table('households')[hh_weight_col]
        trips_df[hh_weight_col] = trips_df.household_id.map(household_weights)

    return trips_df


def write_matrices(aggregate_trips, zone_index, orig_index, dest_index, model_settings):
    """
    Write aggregated trips to OMX format.

    The MATRICES setting lists the new OMX files to write.
    Each file can contain any number of 'tables', each specified by a
    table key ('name') and a trips table column ('data_field') to use
    for aggregated counts.

    If HH_EXPANSION_WEIGHT_COL is set, each table's counts are divided by
    the summed expansion weights so matrices reflect expanded totals.

    Any data type may be used for columns added in the annotation phase,
    but the table 'data_field's must be summable types: ints, floats, bools.
    """

    matrix_settings = model_settings.get('MATRICES')

    if not matrix_settings:
        logger.error('Missing MATRICES setting in write_trip_matrices.yaml')
        # bail out rather than iterating over None below
        return

    hh_weight_col = model_settings.get('HH_EXPANSION_WEIGHT_COL')

    for matrix in matrix_settings:
        filename = matrix.get('file_name')
        filepath = config.output_file_path(filename)
        logger.info('opening %s' % filepath)
        file = omx.open_file(filepath, 'w')  # possibly overwrite existing file
        try:
            for table in matrix.get('tables'):
                table_name = table.get('name')
                col = table.get('data_field')

                if col not in aggregate_trips:
                    # skip just this table; still write remaining tables,
                    # the zone mapping, and any later matrix files
                    logger.error('missing %s column in aggregate trips DataFrame' % col)
                    continue

                # normalize by expansion weight WITHOUT mutating aggregate_trips,
                # so a data_field shared by several tables isn't divided twice
                if hh_weight_col:
                    values = aggregate_trips[col] / aggregate_trips[hh_weight_col]
                else:
                    values = aggregate_trips[col]

                data = np.zeros((len(zone_index), len(zone_index)))
                data[orig_index, dest_index] = values
                logger.info('writing %s' % table_name)
                file[table_name] = data  # write to file

            # include the index-to-zone map in the file
            logger.info('adding %s mapping for %s zones to %s' %
                        (zone_index.name, zone_index.size, filename))
            file.create_mapping(zone_index.name, zone_index.to_numpy())
        finally:
            # always release the OMX file handle, even on error
            logger.info('closing %s' % filepath)
            file.close()
2 changes: 1 addition & 1 deletion activitysim/abm/models/trip_scheduling.py
Original file line number Diff line number Diff line change
Expand Up @@ -319,7 +319,7 @@ def schedule_trips_in_leg(
trips = trips.sort_index()
trips['next_trip_id'] = np.roll(trips.index, -1 if outbound else 1)
is_final = (trips.trip_num == trips.trip_count) if outbound else (trips.trip_num == 1)
trips.next_trip_id = trips.next_trip_id.where(is_final, NO_TRIP_ID)
trips.next_trip_id = trips.next_trip_id.where(~is_final, NO_TRIP_ID)

# iterate over outbound trips in ascending trip_num order, skipping the initial trip
# iterate over inbound trips in descending trip_num order, skipping the final trip
Expand Down
18 changes: 14 additions & 4 deletions activitysim/abm/tables/households.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,11 @@
def households(households_sample_size, override_hh_ids, trace_hh_id):

df_full = read_input_table("households")
households_sliced = False
tot_households = df_full.shape[0]

logger.info("full household list contains %s households" % tot_households)

logger.info("full household list contains %s households" % df_full.shape[0])
households_sliced = False

# only using households listed in override_hh_ids
if override_hh_ids is not None:
Expand All @@ -48,9 +50,9 @@ def households(households_sample_size, override_hh_ids, trace_hh_id):
households_sliced = True

# if we need a subset of full store
elif households_sample_size > 0 and df_full.shape[0] > households_sample_size:
elif tot_households > households_sample_size > 0:

logger.info("sampling %s of %s households" % (households_sample_size, df_full.shape[0]))
logger.info("sampling %s of %s households" % (households_sample_size, tot_households))

"""
Because random seed is set differently for each step, sampling of households using
Expand Down Expand Up @@ -80,6 +82,14 @@ def households(households_sample_size, override_hh_ids, trace_hh_id):
# persons table
inject.add_injectable('households_sliced', households_sliced)

if 'sample_rate' not in df.columns:
if households_sample_size == 0:
sample_rate = 1
else:
sample_rate = round(households_sample_size / tot_households, 3)

df['sample_rate'] = sample_rate

logger.info("loaded households %s" % (df.shape,))

# FIXME - pathological knowledge of name of chunk_id column used by chunked_choosers_by_chunk_id
Expand Down
21 changes: 20 additions & 1 deletion activitysim/abm/test/test_pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,12 @@
import logging
import pkg_resources

import openmatrix as omx
import numpy as np
import numpy.testing as npt

import pandas as pd
import pandas.util.testing as pdt
import pandas.testing as pdt
import pytest
import yaml

Expand All @@ -17,6 +21,7 @@

# set the max households for all tests (this is to limit memory use on travis)
HOUSEHOLDS_SAMPLE_SIZE = 100
HOUSEHOLDS_SAMPLE_RATE = 0.02  # HOUSEHOLDS_SAMPLE_SIZE / 5000 households

# household with mandatory, non mandatory, atwork_subtours, and joint tours
HH_ID = 257341
Expand Down Expand Up @@ -52,6 +57,7 @@ def setup_dirs(configs_dir, data_dir=None):
tracing.delete_output_files('csv')
tracing.delete_output_files('txt')
tracing.delete_output_files('yaml')
tracing.delete_output_files('omx')


def teardown_function(func):
Expand Down Expand Up @@ -433,6 +439,18 @@ def regress():
# should be at least two tours per trip
assert trips_df.shape[0] >= 2*tours_df.shape[0]

# write_trip_matrices
trip_matrices_file = config.output_file_path('trips_md.omx')
assert os.path.exists(trip_matrices_file)
trip_matrices = omx.open_file(trip_matrices_file)
assert trip_matrices.shape() == (25, 25)

assert 'WALK_MD' in trip_matrices.list_matrices()
walk_trips = np.array(trip_matrices['WALK_MD'])
assert walk_trips.dtype == np.dtype('float64')

trip_matrices.close()


def test_full_run1():

Expand Down Expand Up @@ -517,6 +535,7 @@ def test_full_run5_singleton():

if __name__ == "__main__":

from activitysim import abm # register injectables
print("running test_full_run1")
test_full_run1()
# teardown_function(None)
1 change: 1 addition & 0 deletions activitysim/examples/example_mtc/configs/settings.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ models:
- trip_mode_choice
- write_data_dictionary
- track_skim_usage
- write_trip_matrices
- write_tables

# to resume after last successful checkpoint, specify resume_after: _
Expand Down
Loading