diff --git a/building_energy_storage_simulation/environment.py b/building_energy_storage_simulation/environment.py index 038ff9d..bd3b9e2 100644 --- a/building_energy_storage_simulation/environment.py +++ b/building_energy_storage_simulation/environment.py @@ -1,4 +1,6 @@ from typing import Tuple, Optional, Union, List +from collections.abc import MutableSequence + import gymnasium as gym import numpy as np from gymnasium.core import ActType, ObsType, RenderFrame @@ -26,13 +28,15 @@ def __init__(self, building_simulation: BuildingSimulation, max_timesteps: int = 2000, num_forecasting_steps: int = 4, - randomize_start_time_step: bool = False + randomize_start_time_step: bool = False, + randomize_forecasts_in_observation: bool = False, ): self.building_simulation = building_simulation self.max_timesteps = max_timesteps self.num_forecasting_steps = num_forecasting_steps self.randomize_start_time_step = randomize_start_time_step + self.randomize_forecasts_in_observation = randomize_forecasts_in_observation self.data_profile_length = len(self.building_simulation.solar_generation_profile) assert self.max_timesteps + self.num_forecasting_steps <= self.data_profile_length, \ @@ -118,12 +122,28 @@ def get_observation(self): solar_gen_forecast = sim.solar_generation_profile[current_index: current_index + self.num_forecasting_steps] energy_price_forecast = sim.electricity_price[current_index: current_index + self.num_forecasting_steps] + if self.randomize_forecasts_in_observation: + electric_load_forecast = self._randomize_forecast(electric_load_forecast) + solar_gen_forecast = self._randomize_forecast(solar_gen_forecast) + energy_price_forecast = self._randomize_forecast(energy_price_forecast) + return np.concatenate(([self.building_simulation.battery.state_of_charge], electric_load_forecast, solar_gen_forecast, energy_price_forecast), axis=0) + @staticmethod + def _randomize_forecast(forecast: MutableSequence, + standard_deviation_start: float = 0.2, + standard_deviation_end: float = 1.0) -> MutableSequence: + # gamma can be interpreted as the quantification of the increase of uncertainty per time step. + gamma = standard_deviation_end - standard_deviation_start + for i in range(len(forecast)): + std = standard_deviation_end - gamma ** i + forecast[i] = forecast[i] + np.random.normal(0, std) + return forecast + @staticmethod def calc_reward(electricity_consumption, electricity_price): return -1 * electricity_consumption * electricity_price diff --git a/tests/test_environment.py b/tests/test_environment.py index b1c89c1..bcb878d 100644 --- a/tests/test_environment.py +++ b/tests/test_environment.py @@ -101,3 +101,31 @@ def test_set_random_first_time_step(): env.reset() # This test is very unlikely to fail ;) assert env.building_simulation.start_index != 0 + + +@pytest.mark.parametrize( + "reset", [True, False] +) +def test_forecasts_are_randomized_in_observation(reset): + dummy_profile = np.zeros(10) + building_sim = BuildingSimulation(electricity_price=dummy_profile, + solar_generation_profile=dummy_profile, + electricity_load_profile=dummy_profile) + env = Environment(building_simulation=building_sim, + num_forecasting_steps=4, + max_timesteps=6, + randomize_forecasts_in_observation=True) + + if reset: + obs, _ = env.reset() + else: + env.reset() + obs, _, _, _, _ = env.step(0) + + load_forecast = np.array(obs[1:5]) + generation_forecast = obs[5:9] + price_forecast = obs[9:14] + assert not np.array_equal(generation_forecast, load_forecast) + assert not np.array_equal(price_forecast, load_forecast) + assert not np.array_equal(price_forecast, dummy_profile) +