Skip to content

Commit

Permalink
Merge pull request #3 from e-mission/master
Browse files Browse the repository at this point in the history
Merge server changes to figure out what's going on with overpass test
  • Loading branch information
nataliejschultz authored Jul 11, 2024
2 parents 02f8e54 + 99d9c21 commit d9b1ea8
Show file tree
Hide file tree
Showing 40 changed files with 958 additions and 386 deletions.
7 changes: 6 additions & 1 deletion .docker/setup_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ source setup/setup.sh
## 10/02 - Mukul
## - Above comments talk about manually updating cryptography to version 40
## - I have upgraded to 41.0.4 as per latest vulnerability fixes.
conda install -c conda-forge cryptography=41.0.7 wheel=0.40.0
conda install -c conda-forge cryptography=42.0.0 wheel=0.40.0

## Remove the old, unused packages to avoid tripping up the checker
rm -rf /root/miniconda-23.1.0/pkgs/cryptography-38.0.4-py39h9ce1e76_0
Expand All @@ -26,12 +26,17 @@ rm -rf /root/miniconda-23.5.2/pkgs/urllib3-1.26.17-pyhd8ed1ab_0
rm -rf /root/miniconda-23.5.2/envs/emission/lib/python3.9/site-packages/urllib3-1.26.17.dist-info
rm -rf /root/miniconda-23.5.2/lib/python3.9/site-packages/urllib3-1.26.16.dist-info
rm -rf /root/miniconda-23.5.2/lib/python3.9/site-packages/tests
rm -rf /root/miniconda-23.5.2/lib/python3.9/site-packages/cryptography-41.0.7.dist-info

# Clean up the conda install
conda clean -t
find /root/miniconda-*/pkgs -wholename \*info/test\* -type d | xargs rm -rf
find ~/miniconda-23.5.2 -name \*tests\* -path '*/site-packages/*' | grep ".*/site-packages/tests" | xargs rm -rf

# Updating bash package to latest version manually
apt-get update
apt-get install bash=5.1-6ubuntu1.1

if [ -d "webapp/www/" ]; then
cp /index.html webapp/www/index.html
fi
Expand Down
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# python 3
FROM ubuntu:jammy-20231211.1
FROM ubuntu:jammy-20240227

MAINTAINER K. Shankari (shankari@eecs.berkeley.edu)

Expand All @@ -8,7 +8,7 @@ WORKDIR /usr/src/app
RUN apt-get -y -qq update
RUN apt-get install -y -qq curl
RUN apt-get install -y -qq wget
# RUN apt-get install -y git
RUN apt-get install -y -qq git

# install nano and vim for editing
# RUN apt-get -y install nano vim
Expand Down
5 changes: 3 additions & 2 deletions bin/debug/fix_usercache_processing.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@

def fix_usercache_errors():
    """Re-run processing for entries that previously errored out.

    First copies the errored entries from the timeseries error db back into
    the usercache, then re-runs the usercache -> long-term move so they get
    another chance at processing.
    """
    copy_to_usercache()
    # Phase separator between the two steps; routed through logging.debug for
    # consistency with the rest of this script instead of a bare print()
    logging.debug(">" * 30)
    move_to_long_term()

def copy_to_usercache():
Expand All @@ -37,8 +38,8 @@ def copy_to_usercache():
logging.info("Found %d errors in this round" % edb.get_timeseries_error_db().estimated_document_count())
for error in error_it:
logging.debug("Copying entry %s" % error["metadata"])
save_result = uc.save(error)
remove_result = te.remove(error["_id"])
save_result = uc.replace_one({"_id": error['_id']}, error, upsert=True)
remove_result = te.delete_one({"_id": error["_id"]})
logging.debug("save_result = %s, remove_result = %s" % (save_result, remove_result))
logging.info("step copy_to_usercache DONE")

Expand Down
109 changes: 90 additions & 19 deletions bin/push/push_remind.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,59 @@
import arrow
import json
import logging
import os
import requests
import sys

import emission.core.get_database as edb
import emission.storage.decorations.analysis_timeseries_queries as esda
import emission.storage.decorations.user_queries as esdu
import emission.storage.timeseries.timequery as estt
import emission.net.ext_service.push.notify_usage as pnu

STUDY_CONFIG = os.getenv('STUDY_CONFIG', "stage-program")


def users_without_recent_user_input(uuid_list, recent_user_input_threshold=None):
    """Return the subset of uuid_list that has at least one confirmed trip in
    the last `recent_user_input_threshold` days with no user input.

    :param uuid_list: candidate user UUIDs
    :param recent_user_input_threshold: lookback window in days; if None,
        every user in uuid_list is returned unchanged
    :return: list of UUIDs that should be reminded to label trips
    """
    if recent_user_input_threshold is None:
        logging.debug("No recent_user_input_threshold provided, returning all users")
        return uuid_list

    now = arrow.now()
    recent_window = estt.TimeQuery(
        "data.start_ts",
        now.shift(days=-recent_user_input_threshold).int_timestamp,
        now.int_timestamp
    )

    users_to_remind = []
    for user_id in uuid_list:
        confirmed_trips = esda.get_entries(esda.CONFIRMED_TRIP_KEY, user_id, recent_window)
        # An unlabeled trip carries an empty user_input dict ({}), which is falsy.
        # Caveat: a partially labeled trip (e.g. 'Mode' set but not 'Purpose') has
        # a non-empty user_input and therefore counts as labeled here. That seems
        # fine: the user has already seen such a trip, so nagging again is
        # unlikely to help.
        unlabeled_trip = next(
            (trip for trip in confirmed_trips if not trip['data']['user_input']),
            None
        )
        if unlabeled_trip is not None:
            logging.debug(f"User {user_id} has trip with no user input: {unlabeled_trip['_id']}")
            users_to_remind.append(user_id)
    return users_to_remind


def bin_users_by_lang(uuid_list, langs, lang_key='phone_lang'):
    """Group user UUIDs by the language stored in their profile.

    :param uuid_list: user UUIDs to bin
    :param langs: language codes to create buckets for
    :param lang_key: profile field that holds the user's language
    :return: dict mapping language code -> list of UUIDs; users whose profile
        is missing or whose language is not in `langs` are placed in "en"
    """
    uuids_by_lang = {lang: [] for lang in langs}
    for user_id in uuid_list:
        user_profile = edb.get_profile_db().find_one({'user_id': user_id})
        user_lang = user_profile.get(lang_key) if user_profile else None
        logging.debug(f"User {user_id} has phone language {user_lang}")
        if user_lang not in uuids_by_lang:
            logging.debug(f"{user_lang} was not one of the provided langs, defaulting to en")
            user_lang = "en"
        # setdefault guards against a KeyError when "en" itself is not one of
        # the provided langs; previously the fallback append would crash.
        uuids_by_lang.setdefault(user_lang, []).append(user_id)
    return uuids_by_lang


if __name__ == '__main__':
logging.basicConfig(level=logging.DEBUG)
logging.debug(f"STUDY_CONFIG is {STUDY_CONFIG}")
Expand All @@ -20,23 +66,48 @@
if r.status_code != 200:
logging.debug(f"Unable to download study config, status code: {r.status_code}")
sys.exit(1)
else:
dynamic_config = json.loads(r.text)
logging.debug(f"Successfully downloaded config with version {dynamic_config['version']} "\
f"for {dynamic_config['intro']['translated_text']['en']['deployment_name']} "\
f"and data collection URL {dynamic_config['server']['connectUrl']}")

if "reminderSchemes" in dynamic_config:
logging.debug("Found flexible notification configuration, skipping server-side push")
else:
uuid_list = esdu.get_all_uuids()
json_data = {
"title": "Trip labels requested",
"message": "Please label your trips for the day"
}
response = pnu.send_visible_notification_to_users(uuid_list,
json_data["title"],
json_data["message"],
json_data,
dev = False)

dynamic_config = json.loads(r.text)
logging.debug(f"Successfully downloaded config with version {dynamic_config['version']} "\
f"for {dynamic_config['intro']['translated_text']['en']['deployment_name']} "\
f"and data collection URL {dynamic_config['server']['connectUrl']}")

if "reminderSchemes" in dynamic_config:
logging.debug("Found flexible notification configuration, skipping server-side push")
sys.exit(0)

# get push notification config (if not present in dynamic_config, use default)
push_config = dynamic_config.get('push_notifications', {
"title": {
"en": "Trip labels requested",
"es": "Etiquetas de viaje solicitadas",
},
"message": {
"en": "Please label your recent trips",
"es": "Por favor etiquete sus viajes recientes",
},
"recent_user_input_threshold": 7, # past week
})

# filter users based on recent user input and bin by language
filtered_uuids = users_without_recent_user_input(
esdu.get_all_uuids(),
push_config.get('recent_user_input_threshold')
)
filtered_uuids_by_lang = bin_users_by_lang(filtered_uuids, push_config['title'].keys())

# for each language, send a push notification to the selected users in that language
for lang, uuids_to_notify in filtered_uuids_by_lang.items():
if len(uuids_to_notify) == 0:
logging.debug(f"No users to notify in lang {lang}")
continue
logging.debug(f"Sending push notifications to {len(uuids_to_notify)} users in lang {lang}")
json_data = {
"title": push_config["title"][lang],
"message": push_config["message"][lang],
}
response = pnu.send_visible_notification_to_users(uuids_to_notify,
json_data["title"],
json_data["message"],
json_data,
dev = False)
1 change: 1 addition & 0 deletions conf/analysis/trip_model.conf.json.sample
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
"model_type": "greedy",
"model_storage": "document_database",
"minimum_trips": 14,
"maximum_stored_model_count": 3,
"model_parameters": {
"greedy": {
"metric": "od_similarity",
Expand Down
11 changes: 8 additions & 3 deletions emission/analysis/classification/inference/labels/inferrers.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,19 @@ def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first
user_id_list = []
for trip in trip_list:
user_id_list.append(trip['user_id'])
assert user_id_list.count(user_id_list[0]) == len(user_id_list), "Multiple user_ids found for trip_list, expected unique user_id for all trips"
error_message = f"""
Multiple user_ids found for trip_list, expected unique user_id for all trips.
Unique user_ids count = {len(set(user_id_list))}
{set(user_id_list)}
"""
assert user_id_list.count(user_id_list[0]) == len(user_id_list), error_message
# Assertion successful, use unique user_id
user_id = user_id_list[0]

# load model
start_model_load_time = time.process_time()
model = eamur._load_stored_trip_model(user_id, model_type, model_storage)
print(f"{arrow.now()} Inside predict_labels_n: Model load time = {time.process_time() - start_model_load_time}")
logging.debug(f"{arrow.now()} Inside predict_cluster_confidence_discounting: Model load time = {time.process_time() - start_model_load_time}")

labels_n_list = eamur.predict_labels_with_n(trip_list, model)
predictions_list = []
Expand All @@ -192,4 +197,4 @@ def predict_cluster_confidence_discounting(trip_list, max_confidence=None, first
labels = copy.deepcopy(labels)
for l in labels: l["p"] *= confidence_coeff
predictions_list.append(labels)
return predictions_list
return predictions_list
30 changes: 30 additions & 0 deletions emission/analysis/configs/dynamic_config.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
import sys
import os
import logging

import json
import requests

STUDY_CONFIG = os.getenv('STUDY_CONFIG', "stage-program")

# Module-level cache: holds the parsed config after the first successful fetch.
dynamic_config = None
def get_dynamic_config():
    """Download (once) and return this deployment's dynamic config.

    The parsed JSON is cached in the module-level `dynamic_config`, so only
    the first successful call performs a network request. On a failed
    download the function logs the status code and returns None without
    caching, so a subsequent call will retry.
    """
    global dynamic_config
    if dynamic_config is not None:
        logging.debug("Returning cached dynamic config for %s at version %s" %
                      (STUDY_CONFIG, dynamic_config['version']))
        return dynamic_config

    logging.debug("No cached dynamic config for %s, downloading from server" % STUDY_CONFIG)
    download_url = ("https://raw.githubusercontent.com/e-mission/nrel-openpath-deploy-configs/main/configs/"
                    + STUDY_CONFIG + ".nrel-op.json")
    logging.debug("About to download config from %s" % download_url)
    response = requests.get(download_url)
    if response.status_code == 200:
        dynamic_config = json.loads(response.text)
        logging.debug(f"Successfully downloaded config with version {dynamic_config['version']} "\
            f"for {dynamic_config['intro']['translated_text']['en']['deployment_name']} "\
            f"and data collection URL {dynamic_config['server']['connectUrl']}")
    else:
        logging.debug(f"Unable to download study config, status code: {response.status_code}")
        # sys.exit(1)
        # TODO what to do here? What if Github is down or something?
        # If we terminate, will the pipeline just try again later?
    return dynamic_config
17 changes: 15 additions & 2 deletions emission/analysis/intake/segmentation/section_segmentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
import logging

# Our imports
import emission.analysis.configs.dynamic_config as eadc
import emission.storage.pipeline_queries as epq
import emission.storage.decorations.analysis_timeseries_queries as esda

Expand All @@ -22,6 +23,7 @@
import emission.core.wrapper.entry as ecwe

import emission.core.common as ecc
import emcommon.bluetooth.ble_matching as emcble

class SectionSegmentationMethod(object):
def segment_into_sections(self, timeseries, distance_from_place, time_query):
Expand Down Expand Up @@ -64,6 +66,7 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
ts = esta.TimeSeries.get_time_series(user_id)
time_query = esda.get_time_query_for_trip_like(esda.RAW_TRIP_KEY, trip_entry.get_id())
distance_from_place = _get_distance_from_start_place_to_end(trip_entry)
ble_entries_during_trip = ts.find_entries(["background/bluetooth_ble"], time_query)

if (trip_source == "DwellSegmentationTimeFilter"):
import emission.analysis.intake.segmentation.section_segmentation_methods.smoothed_high_confidence_motion as shcm
Expand Down Expand Up @@ -118,7 +121,16 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
# Particularly in this case, if we don't do this, then the trip end may overshoot the section end
end_loc = trip_end_loc

fill_section(section, start_loc, end_loc, sensed_mode)
# ble_sensed_mode represents the vehicle that was sensed via BLE beacon during the section.
# For now, we are going to rely on the current segmentation implementation and then fill in
# ble_sensed_mode by looking at scans within the timestamp range of the section.
# Later, we may want to actually use BLE sensor data as part of the basis for segmentation
dynamic_config = eadc.get_dynamic_config()
ble_sensed_mode = emcble.get_ble_sensed_vehicle_for_section(
ble_entries_during_trip, start_loc.ts, end_loc.ts, dynamic_config
)

fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode)
# We create the entry after filling in the section so that we know
# that the data is included properly
section_entry = ecwe.Entry.create_entry(user_id, esda.RAW_SECTION_KEY,
Expand All @@ -143,7 +155,7 @@ def segment_trip_into_sections(user_id, trip_entry, trip_source):
prev_section_entry = section_entry


def fill_section(section, start_loc, end_loc, sensed_mode):
def fill_section(section, start_loc, end_loc, sensed_mode, ble_sensed_mode=None):
section.start_ts = start_loc.ts
section.start_local_dt = start_loc.local_dt
section.start_fmt_time = start_loc.fmt_time
Expand All @@ -161,6 +173,7 @@ def fill_section(section, start_loc, end_loc, sensed_mode):
section.duration = end_loc.ts - start_loc.ts
section.source = "SmoothedHighConfidenceMotion"
section.sensed_mode = sensed_mode
section.ble_sensed_mode = ble_sensed_mode


def stitch_together(ending_section_entry, stop_entry, starting_section_entry):
Expand Down
6 changes: 6 additions & 0 deletions emission/analysis/modelling/trip_model/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,9 @@ def get_minimum_trips():



def get_maximum_stored_model_count():
    """Read 'maximum_stored_model_count' from the trip model config.

    :return: the configured maximum number of stored models (int)
    :raises TypeError: if the configured value is not an integer
    """
    stored_count = get_config_value_or_raise('maximum_stored_model_count')
    if isinstance(stored_count, int):
        return stored_count
    raise TypeError(
        f"config key 'maximum_stored_model_count' not an integer in config file {config_filename}"
    )
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,11 @@ def _generate_predictions(self):
probability is estimated with label_count / total_labels.
"""
for _, bin_record in self.bins.items():
user_label_df = pd.DataFrame(bin_record['labels'])
# TODO: Revisit after we have unified label and survey inputs (https://github.com/e-mission/e-mission-docs/issues/1045)
logging.debug("Filtering out any nested dictionaries from the list of dictionary labels")
filtered_label_dicts = [label_dict for label_dict in bin_record['labels'] if not any(isinstance(x, dict) for x in label_dict.values())]
logging.debug("Number of entries after filtering changed %s -> %s" % (len(bin_record['labels']), len(filtered_label_dicts)))
user_label_df = pd.DataFrame(filtered_label_dicts)
user_label_df = lp.map_labels(user_label_df).dropna()
# compute the sum of trips in this cluster
sum_trips = len(user_label_df)
Expand Down
5 changes: 3 additions & 2 deletions emission/analysis/modelling/trip_model/run_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ def predict_labels_with_n(
"""

predictions_list = []
print(f"{arrow.now()} Inside predict_labels_n: Predicting...")
logging.debug(f"{arrow.now()} Inside predict_labels_n: Predicting...")
start_predict_time = time.process_time()
for trip in trip_list:
if model is None:
Expand All @@ -118,7 +118,8 @@ def predict_labels_with_n(
else:
predictions, n = model.predict(trip)
predictions_list.append((predictions, n))
print(f"{arrow.now()} Inside predict_labels_n: Predictions complete for trip_list in time = {time.process_time() - start_predict_time}")
logging.debug(f"{arrow.now()} Inside predict_labels_n: Predictions complete for trip_list in time = {time.process_time() - start_predict_time}")
logging.debug(f"{arrow.now()} No. of trips = {len(trip_list)}; No. of predictions = {len(predictions_list)}")
return predictions_list


Expand Down
Loading

0 comments on commit d9b1ea8

Please sign in to comment.