From 0434b5c0043c3154457028714b642ea84bc21f75 Mon Sep 17 00:00:00 2001 From: Juliette-Gerbaux Date: Thu, 27 Jun 2024 09:49:51 +0200 Subject: [PATCH] Mode functions in tests to src --- src/andromede/hydro_heuristic/data.py | 138 ++++++ .../hydro_heuristic/heuristic_model.py | 156 ++++++ src/andromede/hydro_heuristic/problem.py | 189 ++++++++ .../data_complex_case/hydro/reservoir.txt | 365 ++++++++++++++ .../test_hydro_heuristic_complex_case.py | 406 +--------------- ...test_hydro_heuristic_third_complex_case.py | 452 +----------------- 6 files changed, 879 insertions(+), 827 deletions(-) create mode 100644 src/andromede/hydro_heuristic/data.py create mode 100644 src/andromede/hydro_heuristic/heuristic_model.py create mode 100644 src/andromede/hydro_heuristic/problem.py create mode 100644 tests/functional/data_complex_case/hydro/reservoir.txt diff --git a/src/andromede/hydro_heuristic/data.py b/src/andromede/hydro_heuristic/data.py new file mode 100644 index 00000000..9f5c9498 --- /dev/null +++ b/src/andromede/hydro_heuristic/data.py @@ -0,0 +1,138 @@ +# Copyright (c) 2024, RTE (https://www.rte-france.com) +# +# See AUTHORS.txt +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# SPDX-License-Identifier: MPL-2.0 +# +# This file is part of the Antares project. + +from typing import List + +import numpy as np + + +def get_number_of_days_in_month(month: int) -> int: + number_day_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][month] + return number_day_month + + +def get_all_data( + scenario: int, horizon: str, folder_name: str +) -> tuple[List[float], List[float], List[float], List[float], List[float]]: + if horizon == "monthly": + hours_aggregated_time_steps = [ + 24 * get_number_of_days_in_month(m) for m in range(12) + ] + elif horizon == "daily": + hours_aggregated_time_steps = [24 for d in range(365)] + + demand = get_input_data( + name_file="load", + column=scenario, + hours_aggregated_time_steps=hours_aggregated_time_steps, + hours_input=1, + operator="sum", + folder_name=folder_name, + ) + inflow = get_input_data( + name_file="mod", + column=scenario, + hours_aggregated_time_steps=hours_aggregated_time_steps, + hours_input=24, + operator="sum", + folder_name=folder_name, + ) + lowerrulecruve = get_input_data( + name_file="reservoir", + column=0, + hours_aggregated_time_steps=hours_aggregated_time_steps, + hours_input=24, + operator="lag_first_element", + folder_name=folder_name, + ) + upperrulecruve = get_input_data( + name_file="reservoir", + column=2, + hours_aggregated_time_steps=hours_aggregated_time_steps, + hours_input=24, + operator="lag_first_element", + folder_name=folder_name, + ) + max_generating = get_input_data( + name_file="maxpower", + column=0, + hours_aggregated_time_steps=hours_aggregated_time_steps, + hours_input=24, + operator="sum", + folder_name=folder_name, + ) + max_generating = [x * 24 for x in max_generating] + + return ( + demand, + inflow, + max_generating, + lowerrulecruve, + upperrulecruve, + ) + + +def get_input_data( + name_file: str, + column: int, + hours_aggregated_time_steps: List[int], + hours_input: int, + operator: str, + folder_name: str, +) -> List[float]: + data = np.loadtxt( + "tests/functional/" + folder_name + "/hydro/" + name_file + ".txt" + ) + data = data[:, column] + aggregated_data: List[float] = [] + hour = 0 + for hours_time_step in hours_aggregated_time_steps: + assert hours_time_step % hours_input == 0 + if operator == "sum": + aggregated_data.append( + np.sum(data[hour : hour + hours_time_step // hours_input]) + ) + elif operator == "lag_first_element": + aggregated_data.append( + data[(hour + hours_time_step // hours_input) % len(data)] + ) + hour += hours_time_step // hours_input + return aggregated_data + + +def calculate_weekly_target(all_daily_generation: list[float]) -> list[float]: + weekly_target = np.zeros(52) + week = 0 + day_in_week = 0 + day_in_year = 0 + + while week < 52: + weekly_target[week] += all_daily_generation[day_in_year] + day_in_year += 1 + day_in_week += 1 + if day_in_week >= 7: + week += 1 + day_in_week = 0 + + return list(weekly_target) + + +def get_target( + demand: List[float], total_target: float, inter_breakdown: int = 1 +) -> List[float]: + target = ( + total_target + * np.power(demand, inter_breakdown) + / sum(np.power(demand, inter_breakdown)) + ) + + return list(target) diff --git a/src/andromede/hydro_heuristic/heuristic_model.py b/src/andromede/hydro_heuristic/heuristic_model.py new file mode 100644 index 00000000..f94e37d4 --- /dev/null +++ b/src/andromede/hydro_heuristic/heuristic_model.py @@ -0,0 +1,156 @@ +# Copyright (c) 2024, RTE (https://www.rte-france.com) +# +# See AUTHORS.txt +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# SPDX-License-Identifier: MPL-2.0 +# +# This file is part of the Antares project. + +from typing import List + +from andromede.expression import literal, param, var, ExpressionNode +from andromede.expression.indexing_structure import IndexingStructure +from andromede.model import Model, float_parameter, float_variable, model +from andromede.model.constraint import Constraint +from andromede.model.parameter import float_parameter +from andromede.model.variable import float_variable +from andromede.model.constraint import Constraint +from andromede.model.parameter import Parameter +from andromede.model.variable import Variable + +CONSTANT = IndexingStructure(False, False) +TIME_AND_SCENARIO_FREE = IndexingStructure(True, True) +ANTICIPATIVE_TIME_VARYING = IndexingStructure(True, True) +NON_ANTICIPATIVE_TIME_VARYING = IndexingStructure(True, False) +CONSTANT_PER_SCENARIO = IndexingStructure(False, True) + + +def get_heuristic_hydro_model( + hydro_model: Model, + horizon: str, +) -> Model: + + HYDRO_HEURISTIC = model( + id="H", + parameters=[p for p in hydro_model.parameters.values()] + + get_heuristic_parameters(), + variables=[v for v in hydro_model.variables.values()] + + get_heuristic_variables(), + constraints=[c for c in hydro_model.constraints.values()] + + get_heuristic_constraints(horizon), + objective_operational_contribution=get_heuristic_objective(), + ) + return HYDRO_HEURISTIC + + +def get_heuristic_objective() -> ExpressionNode: + return ( + param("gamma_d") * var("distance_between_target_and_generating") + + param("gamma_v+") * var("violation_upper_rule_curve") + + param("gamma_v-") * var("violation_lower_rule_curve") + + param("gamma_o") * var("overflow") + + param("gamma_s") * var("level") + ).sum().expec() + ( + param("gamma_delta") * var("max_distance_between_target_and_generating") + + param("gamma_y") * var("max_violation_lower_rule_curve") + + param("gamma_w") * var("gap_to_target") + ).expec() + + +def get_heuristic_variables() -> List[Variable]: + return [ + float_variable( + "distance_between_target_and_generating", + lower_bound=literal(0), + structure=TIME_AND_SCENARIO_FREE, + ), + float_variable( + "max_distance_between_target_and_generating", + lower_bound=literal(0), + structure=CONSTANT_PER_SCENARIO, + ), + float_variable( + "violation_lower_rule_curve", + lower_bound=literal(0), + structure=TIME_AND_SCENARIO_FREE, + ), + float_variable( + "violation_upper_rule_curve", + lower_bound=literal(0), + structure=TIME_AND_SCENARIO_FREE, + ), + float_variable( + "max_violation_lower_rule_curve", + lower_bound=literal(0), + structure=CONSTANT_PER_SCENARIO, + ), + float_variable( + "gap_to_target", + lower_bound=literal(0), + structure=CONSTANT_PER_SCENARIO, + ), + ] + + +def get_heuristic_parameters() -> List[Parameter]: + return [ + float_parameter("generating_target", TIME_AND_SCENARIO_FREE), + float_parameter("overall_target", CONSTANT_PER_SCENARIO), + float_parameter("gamma_d", CONSTANT), + float_parameter("gamma_delta", CONSTANT), + float_parameter("gamma_y", CONSTANT), + float_parameter("gamma_w", CONSTANT), + float_parameter("gamma_v+", CONSTANT), + float_parameter("gamma_v-", CONSTANT), + float_parameter("gamma_o", CONSTANT), + float_parameter("gamma_s", CONSTANT), + ] + + +def get_heuristic_constraints(horizon: str) -> List[Constraint]: + list_constraint = [ + Constraint( + "Respect generating target", + var("generating").sum() + var("gap_to_target") == param("overall_target"), + ), + Constraint( + "Definition of distance between target and generating", + var("distance_between_target_and_generating") + >= param("generating_target") - var("generating"), + ), + Constraint( + "Definition of distance between generating and target", + var("distance_between_target_and_generating") + >= var("generating") - param("generating_target"), + ), + Constraint( + "Definition of max distance between generating and target", + var("max_distance_between_target_and_generating") + >= var("distance_between_target_and_generating"), + ), + Constraint( + "Definition of violation of lower rule curve", + var("violation_lower_rule_curve") + var("level") + >= param("lower_rule_curve"), + ), + Constraint( + "Definition of violation of upper rule curve", + var("violation_upper_rule_curve") - var("level") + >= -param("upper_rule_curve"), + ), + Constraint( + "Definition of max violation of lower rule curve", + var("max_violation_lower_rule_curve") >= var("violation_lower_rule_curve"), + ), + ] + if horizon == "monthly": + list_constraint.append(Constraint("No overflow", var("overflow") <= literal(0))) + list_constraint.append( + Constraint("No gap to target", var("gap_to_target") <= literal(0)) + ) + + return list_constraint diff --git a/src/andromede/hydro_heuristic/problem.py b/src/andromede/hydro_heuristic/problem.py new file mode 100644 index 00000000..5f8c98f7 --- /dev/null +++ b/src/andromede/hydro_heuristic/problem.py @@ -0,0 +1,189 @@ +# Copyright (c) 2024, RTE (https://www.rte-france.com) +# +# See AUTHORS.txt +# +# This Source Code Form is subject to the terms of the Mozilla Public +# License, v. 2.0. If a copy of the MPL was not distributed with this +# file, You can obtain one at http://mozilla.org/MPL/2.0/. +# +# SPDX-License-Identifier: MPL-2.0 +# +# This file is part of the Antares project. + +from typing import List + +import ortools.linear_solver.pywraplp as pywraplp +import pandas as pd + +from andromede.simulation import ( + BlockBorderManagement, + OutputValues, + TimeBlock, + build_problem, +) +from andromede.study import ( + ConstantData, + DataBase, + Network, + TimeIndex, + TimeScenarioSeriesData, + TimeSeriesData, + create_component, +) +from andromede.simulation.optimization import OptimizationProblem +from tests.functional.libs.lib_hydro_heuristic import HYDRO_MODEL +from andromede.hydro_heuristic.heuristic_model import get_heuristic_hydro_model + + +def create_hydro_problem( + horizon: str, + target: List[float], + inflow: List[float], + max_generating: List[float], + lower_rule_curve: List[float], + upper_rule_curve: List[float], + initial_level: float, + capacity: float, +) -> OptimizationProblem: + database = generate_database( + target=target, + inflow=inflow, + max_generating=max_generating, + lower_rule_curve=lower_rule_curve, + upper_rule_curve=upper_rule_curve, + initial_level=initial_level, + capacity=capacity, + ) + + database = add_objective_coefficients_to_database(database, horizon) + + time_block = TimeBlock(1, [i for i in range(len(target))]) + scenarios = 1 + + hydro = create_component( + model=get_heuristic_hydro_model(HYDRO_MODEL, horizon), id="H" + ) + + network = Network("test") + network.add_component(hydro) + + problem = build_problem( + network, + database, + time_block, + scenarios, + border_management=(BlockBorderManagement.CYCLE), + solver_id="XPRESS", + ) + + return problem + + +def solve_hydro_problem(problem: OptimizationProblem) -> tuple[int, list[float], float]: + parameters = pywraplp.MPSolverParameters() + parameters.SetIntegerParam(parameters.PRESOLVE, parameters.PRESOLVE_OFF) + parameters.SetIntegerParam(parameters.SCALING, 0) + problem.solver.EnableOutput() + + status = problem.solver.Solve(parameters) + + output = OutputValues(problem) + + return ( + status, + output.component("H").var("generating").value[0], # type:ignore + output.component("H").var("level").value[0][-1], # type:ignore + ) + + +def generate_database( + target: List[float], + inflow: List[float], + max_generating: List[float], + lower_rule_curve: List[float], + upper_rule_curve: List[float], + initial_level: float, + capacity: float, +) -> DataBase: + database = DataBase() + + database.add_data("H", "capacity", ConstantData(capacity)) + database.add_data("H", "initial_level", ConstantData(initial_level)) + + inflow_data = pd.DataFrame( + inflow, + index=[i for i in range(len(inflow))], + columns=[0], + ) + database.add_data("H", "inflow", TimeScenarioSeriesData(inflow_data)) + + target_data = pd.DataFrame( + target, + index=[i for i in range(len(target))], + columns=[0], + ) + database.add_data("H", "generating_target", TimeScenarioSeriesData(target_data)) + database.add_data("H", "overall_target", ConstantData(sum(target))) + + database.add_data( + "H", + "lower_rule_curve", + TimeSeriesData( + { + TimeIndex(i): lower_rule_curve[i] * capacity + for i in range(len(lower_rule_curve)) + } + ), + ) + database.add_data( + "H", + "upper_rule_curve", + TimeSeriesData( + { + TimeIndex(i): upper_rule_curve[i] * capacity + for i in range(len(lower_rule_curve)) + } + ), + ) + database.add_data("H", "min_generating", ConstantData(0)) + + database.add_data( + "H", + "max_generating", + TimeSeriesData( + {TimeIndex(i): max_generating[i] for i in range(len(max_generating))} + ), + ) + + database.add_data( + "H", + "max_epsilon", + TimeSeriesData( + { + TimeIndex(i): capacity if i == 0 else 0 + for i in range(len(max_generating)) + } + ), + ) + + return database + + +def add_objective_coefficients_to_database( + database: DataBase, horizon: str +) -> DataBase: + objective_function_cost = { + "gamma_d": 1, + "gamma_delta": 1 if horizon == "monthly" else 2, + "gamma_y": 100000 if horizon == "monthly" else 68, + "gamma_w": 0 if horizon == "monthly" else 34, + "gamma_v+": 100 if horizon == "monthly" else 0, + "gamma_v-": 100 if horizon == "monthly" else 68, + "gamma_o": 0 if horizon == "monthly" else 23 * 68 + 1, + "gamma_s": 0 if horizon == "monthly" else -1 / 32, + } + + for name, coeff in objective_function_cost.items(): + database.add_data("H", name, ConstantData(coeff)) + + return database diff --git a/tests/functional/data_complex_case/hydro/reservoir.txt b/tests/functional/data_complex_case/hydro/reservoir.txt new file mode 100644 index 00000000..aad2dc3e --- /dev/null +++ b/tests/functional/data_complex_case/hydro/reservoir.txt @@ -0,0 +1,365 @@ +0.500 0.500 0.500 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 +0 0.500 1 diff --git a/tests/functional/test_hydro_heuristic_complex_case.py b/tests/functional/test_hydro_heuristic_complex_case.py index 34b3a4ce..fd865643 100644 --- a/tests/functional/test_hydro_heuristic_complex_case.py +++ b/tests/functional/test_hydro_heuristic_complex_case.py @@ -10,171 +10,15 @@ # # This file is part of the Antares project. -from typing import Dict, List +from typing import List -import numpy as np -import ortools.linear_solver.pywraplp as pywraplp -import pandas as pd - -from andromede.expression import literal, param, var, ExpressionNode -from andromede.expression.indexing_structure import IndexingStructure -from andromede.model import Model, float_parameter, float_variable, model -from andromede.model.constraint import Constraint -from andromede.model.parameter import float_parameter -from andromede.model.variable import float_variable -from andromede.simulation import ( - BlockBorderManagement, - OutputValues, - TimeBlock, - build_problem, -) -from andromede.study import ( - ConstantData, - DataBase, - Network, - TimeIndex, - TimeScenarioSeriesData, - TimeSeriesData, - create_component, +from andromede.hydro_heuristic.data import ( + get_all_data, + get_number_of_days_in_month, + get_target, + calculate_weekly_target, ) -from andromede.model.constraint import Constraint -from andromede.model.parameter import Parameter -from andromede.model.variable import Variable -from andromede.simulation.optimization import OptimizationProblem -from tests.functional.libs.lib_hydro_heuristic import HYDRO_MODEL - -CONSTANT = IndexingStructure(False, False) -TIME_AND_SCENARIO_FREE = IndexingStructure(True, True) -ANTICIPATIVE_TIME_VARYING = IndexingStructure(True, True) -NON_ANTICIPATIVE_TIME_VARYING = IndexingStructure(True, False) -CONSTANT_PER_SCENARIO = IndexingStructure(False, True) - - -def get_heuristic_hydro_model( - hydro_model: Model, - horizon: str, -) -> Model: - - HYDRO_HEURISTIC = model( - id="H", - parameters=[p for p in hydro_model.parameters.values()] - + get_heuristic_parameters(), - variables=[v for v in hydro_model.variables.values()] - + get_heuristic_variables(), - constraints=[c for c in hydro_model.constraints.values()] - + get_heuristic_constraints(horizon), - objective_operational_contribution=get_heuristic_objective(), - ) - return HYDRO_HEURISTIC - - -def get_heuristic_objective() -> ExpressionNode: - return ( - param("gamma_d") * var("distance_between_target_and_generating") - + param("gamma_v+") * var("violation_upper_rule_curve") - + param("gamma_v-") * var("violation_lower_rule_curve") - + param("gamma_o") * var("overflow") - + param("gamma_s") * var("level") - ).sum().expec() + ( - param("gamma_delta") * var("max_distance_between_target_and_generating") - + param("gamma_y") * var("max_violation_lower_rule_curve") - + param("gamma_w") * var("gap_to_target") - ).expec() - - -def get_heuristic_variables() -> List[Variable]: - return [ - float_variable( - "distance_between_target_and_generating", - lower_bound=literal(0), - structure=TIME_AND_SCENARIO_FREE, - ), - float_variable( - "max_distance_between_target_and_generating", - lower_bound=literal(0), - structure=CONSTANT_PER_SCENARIO, - ), - float_variable( - "violation_lower_rule_curve", - lower_bound=literal(0), - structure=TIME_AND_SCENARIO_FREE, - ), - float_variable( - "violation_upper_rule_curve", - lower_bound=literal(0), - structure=TIME_AND_SCENARIO_FREE, - ), - float_variable( - "max_violation_lower_rule_curve", - lower_bound=literal(0), - structure=CONSTANT_PER_SCENARIO, - ), - float_variable( - "gap_to_target", - lower_bound=literal(0), - structure=CONSTANT_PER_SCENARIO, - ), - ] - - -def get_heuristic_parameters() -> List[Parameter]: - return [ - float_parameter("generating_target", TIME_AND_SCENARIO_FREE), - float_parameter("overall_target", CONSTANT_PER_SCENARIO), - float_parameter("gamma_d", CONSTANT), - float_parameter("gamma_delta", CONSTANT), - float_parameter("gamma_y", CONSTANT), - float_parameter("gamma_w", CONSTANT), - float_parameter("gamma_v+", CONSTANT), - float_parameter("gamma_v-", CONSTANT), - float_parameter("gamma_o", CONSTANT), - float_parameter("gamma_s", CONSTANT), - ] - - -def get_heuristic_constraints(horizon: str) -> List[Constraint]: - list_constraint = [ - Constraint( - "Respect generating target", - var("generating").sum() + var("gap_to_target") == param("overall_target"), - ), - Constraint( - "Definition of distance between target and generating", - var("distance_between_target_and_generating") - >= param("generating_target") - var("generating"), - ), - Constraint( - "Definition of distance between generating and target", - var("distance_between_target_and_generating") - >= var("generating") - param("generating_target"), - ), - Constraint( - "Definition of max distance between generating and target", - var("max_distance_between_target_and_generating") - >= var("distance_between_target_and_generating"), - ), - Constraint( - "Definition of violation of lower rule curve", - var("violation_lower_rule_curve") + var("level") - >= param("lower_rule_curve"), - ), - Constraint( - "Definition of violation of upper rule curve", - var("violation_upper_rule_curve") - var("level") - >= -param("upper_rule_curve"), - ), - Constraint( - "Definition of max violation of lower rule curve", - var("max_violation_lower_rule_curve") >= var("violation_lower_rule_curve"), - ), - ] - if horizon == "monthly": - list_constraint.append(Constraint("No overflow", var("overflow") <= literal(0))) - list_constraint.append( - Constraint("No gap to target", var("gap_to_target") <= literal(0)) - ) - - return list_constraint +from andromede.hydro_heuristic.problem import create_hydro_problem, solve_hydro_problem def test_hydro_heuristic() -> None: @@ -192,7 +36,9 @@ def test_hydro_heuristic() -> None: monthly_demand, monthly_inflow, monthly_max_generating, - ) = get_all_data(scenario, "monthly") + monthly_lowerrulecurve, + monthly_upperrulecurve, + ) = get_all_data(scenario, "monthly", folder_name="data_complex_case") monthly_target = get_target(monthly_demand, sum(monthly_inflow)) # Ajustement de la réapartition mensuelle @@ -201,7 +47,10 @@ def test_hydro_heuristic() -> None: target=monthly_target, inflow=monthly_inflow, max_generating=monthly_max_generating, + lower_rule_curve=monthly_lowerrulecurve, + upper_rule_curve=monthly_upperrulecurve, initial_level=initial_level, + capacity=capacity, ) status, monthly_generation, _ = solve_hydro_problem(problem) @@ -214,7 +63,9 @@ def test_hydro_heuristic() -> None: daily_demand, daily_inflow, daily_max_generating, - ) = get_all_data(scenario, "daily") + daily_lowerrulecurve, + daily_upperrulecurve, + ) = get_all_data(scenario, "daily", folder_name="data_complex_case") for month in range(12): number_day_month = get_number_of_days_in_month(month) @@ -233,7 +84,10 @@ def test_hydro_heuristic() -> None: max_generating=daily_max_generating[ day_in_year : day_in_year + number_day_month ], + lower_rule_curve=daily_lowerrulecurve, + upper_rule_curve=daily_upperrulecurve, initial_level=initial_level, + capacity=capacity, ) status, daily_generation, initial_level = solve_hydro_problem(problem) @@ -258,225 +112,3 @@ def test_hydro_heuristic() -> None: assert float(expected_output[week + 7].strip().split("\t")[9]) == round( weekly_target[week] ) - - -def calculate_weekly_target(all_daily_generation: list[float]) -> list[float]: - weekly_target = np.zeros(52) - week = 0 - day_in_week = 0 - day_in_year = 0 - - while week < 52: - weekly_target[week] += all_daily_generation[day_in_year] - day_in_year += 1 - day_in_week += 1 - if day_in_week >= 7: - week += 1 - day_in_week = 0 - - return list(weekly_target) - - -def get_number_of_days_in_month(month: int) -> int: - number_day_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][month] - return number_day_month - - -def get_target( - demand: List[float], total_target: float, inter_breakdown: int = 1 -) -> List[float]: - target = ( - total_target - * np.power(demand, inter_breakdown) - / sum(np.power(demand, inter_breakdown)) - ) - - return list(target) - - -def get_all_data( - scenario: int, horizon: str -) -> tuple[List[float], List[float], List[float]]: - if horizon == "monthly": - hours_aggregated_time_steps = [ - 24 * get_number_of_days_in_month(m) for m in range(12) - ] - elif horizon == "daily": - hours_aggregated_time_steps = [24 for d in range(365)] - - demand = get_input_data( - name_file="load", - column=scenario, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=1, - operator="sum", - ) - inflow = get_input_data( - name_file="mod", - column=scenario, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=24, - operator="sum", - ) - max_generating = get_input_data( - name_file="maxpower", - column=0, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=24, - operator="sum", - ) - max_generating = [x * 24 for x in max_generating] - return ( - demand, - inflow, - max_generating, - ) - - -def create_hydro_problem( - horizon: str, - target: List[float], - inflow: List[float], - max_generating: List[float], - initial_level: float, -) -> OptimizationProblem: - database = generate_database( - target=target, - inflow=inflow, - max_generating=max_generating, - initial_level=initial_level, - ) - - database = add_objective_coefficients_to_database(database, horizon) - - time_block = TimeBlock(1, [i for i in range(len(target))]) - scenarios = 1 - - hydro = create_component( - model=get_heuristic_hydro_model(HYDRO_MODEL, horizon), id="H" - ) - - network = Network("test") - network.add_component(hydro) - - problem = build_problem( - network, - database, - time_block, - scenarios, - border_management=(BlockBorderManagement.CYCLE), - solver_id="XPRESS", - ) - - return problem - - -def solve_hydro_problem(problem: OptimizationProblem) -> tuple[int, list[float], float]: - parameters = pywraplp.MPSolverParameters() - parameters.SetIntegerParam(parameters.PRESOLVE, parameters.PRESOLVE_OFF) - parameters.SetIntegerParam(parameters.SCALING, 0) - problem.solver.EnableOutput() - - status = problem.solver.Solve(parameters) - - output = OutputValues(problem) - - return ( - status, - output.component("H").var("generating").value[0], # type:ignore - output.component("H").var("level").value[0][-1], # type:ignore - ) - - -def generate_database( - target: List[float], - inflow: List[float], - max_generating: List[float], - initial_level: float, -) -> DataBase: - database = DataBase() - - database.add_data("H", "capacity", ConstantData(1711510)) - database.add_data("H", "initial_level", ConstantData(initial_level)) - - inflow_data = pd.DataFrame( - inflow, - index=[i for i in range(len(inflow))], - columns=[0], - ) - database.add_data("H", "inflow", TimeScenarioSeriesData(inflow_data)) - - target_data = pd.DataFrame( - target, - index=[i for i in range(len(target))], - columns=[0], - ) - database.add_data("H", "generating_target", TimeScenarioSeriesData(target_data)) - database.add_data("H", "overall_target", ConstantData(sum(target))) - - database.add_data("H", "lower_rule_curve", ConstantData(0)) - database.add_data("H", "upper_rule_curve", ConstantData(1711510)) - database.add_data("H", "min_generating", ConstantData(0)) - - database.add_data( - "H", - "max_generating", - TimeSeriesData( - {TimeIndex(i): max_generating[i] for i in range(len(max_generating))} - ), - ) - - database.add_data( - "H", - "max_epsilon", - TimeSeriesData( - {TimeIndex(i): 1711510 if i == 0 else 0 for i in range(len(max_generating))} - ), - ) - - return database - - -def add_objective_coefficients_to_database( - database: DataBase, horizon: str -) -> DataBase: - objective_function_cost = { - "gamma_d": 1, - "gamma_delta": 1 if horizon == "monthly" else 2, - "gamma_y": 100000 if horizon == "monthly" else 68, - "gamma_w": 0 if horizon == "monthly" else 34, - "gamma_v+": 100 if horizon == "monthly" else 0, - "gamma_v-": 100 if horizon == "monthly" else 68, - "gamma_o": 0 if horizon == "monthly" else 23 * 68 + 1, - "gamma_s": 0 if horizon == "monthly" else -1 / 32, - } - - for name, coeff in objective_function_cost.items(): - database.add_data("H", name, ConstantData(coeff)) - - return database - - -def get_input_data( - name_file: str, - column: int, - hours_aggregated_time_steps: List[int], - hours_input: int, - operator: str, -) -> List[float]: - data = np.loadtxt("tests/functional/data_complex_case/hydro/" + name_file + ".txt") - data = data[:, column] - aggregated_data: List[float] = [] - hour = 0 - for hours_time_step in hours_aggregated_time_steps: - assert hours_time_step % hours_input == 0 - if operator == "sum": - aggregated_data.append( - np.sum(data[hour : hour + hours_time_step // hours_input]) - ) - elif operator == "lag_first_element": - aggregated_data.append( - data[(hour + hours_time_step // hours_input) % len(data)] - ) - hour += hours_time_step // hours_input - return aggregated_data diff --git a/tests/functional/test_hydro_heuristic_third_complex_case.py b/tests/functional/test_hydro_heuristic_third_complex_case.py index 79d8f982..3135c061 100644 --- a/tests/functional/test_hydro_heuristic_third_complex_case.py +++ b/tests/functional/test_hydro_heuristic_third_complex_case.py @@ -10,185 +10,23 @@ # # This file is part of the Antares project. -from math import ceil, floor -from typing import Dict, List, Optional +from typing import List -import numpy as np -import ortools.linear_solver.pywraplp as pywraplp -import pandas as pd import pytest -from andromede.expression import literal, param, var, ExpressionNode -from andromede.expression.expression import ExpressionRange -from andromede.expression.indexing_structure import IndexingStructure -from andromede.model import Model, ModelPort, float_parameter, float_variable, model -from andromede.model.constraint import Constraint -from andromede.model.parameter import float_parameter, int_parameter -from andromede.model.variable import float_variable, int_variable -from andromede.simulation import ( - BlockBorderManagement, - OutputValues, - TimeBlock, - build_problem, +from andromede.hydro_heuristic.data import ( + get_all_data, + get_number_of_days_in_month, + get_target, + calculate_weekly_target, ) -from andromede.simulation.optimization import OptimizationProblem -from andromede.study import ( - ConstantData, - DataBase, - Network, - Node, - PortRef, - TimeIndex, - TimeScenarioIndex, - TimeScenarioSeriesData, - TimeSeriesData, - create_component, -) -from andromede.study.data import AbstractDataStructure -from andromede.model.constraint import Constraint -from andromede.model.parameter import Parameter -from andromede.model.variable import Variable -from tests.functional.libs.lib_hydro_heuristic import HYDRO_MODEL - -CONSTANT = IndexingStructure(False, False) -TIME_AND_SCENARIO_FREE = IndexingStructure(True, True) -ANTICIPATIVE_TIME_VARYING = IndexingStructure(True, True) -NON_ANTICIPATIVE_TIME_VARYING = IndexingStructure(True, False) -CONSTANT_PER_SCENARIO = IndexingStructure(False, True) - -capacity = 1e07 - - -def get_heuristic_hydro_model( - hydro_model: Model, - horizon: str, -) -> Model: - - HYDRO_HEURISTIC = model( - id="H", - parameters=[p for p in hydro_model.parameters.values()] - + get_heuristic_parameters(), - variables=[v for v in hydro_model.variables.values()] - + get_heuristic_variables(), - constraints=[c for c in hydro_model.constraints.values()] - + get_heuristic_constraints(horizon), - objective_operational_contribution=get_heuristic_objective(), - ) - return HYDRO_HEURISTIC - - -def get_heuristic_objective() -> ExpressionNode: - return ( - param("gamma_d") * var("distance_between_target_and_generating") - + param("gamma_v+") * var("violation_upper_rule_curve") - + param("gamma_v-") * var("violation_lower_rule_curve") - + param("gamma_o") * var("overflow") - + param("gamma_s") * var("level") - ).sum().expec() + ( - param("gamma_delta") * var("max_distance_between_target_and_generating") - + param("gamma_y") * var("max_violation_lower_rule_curve") - + param("gamma_w") * var("gap_to_target") - ).expec() - - -def get_heuristic_variables() -> List[Variable]: - return [ - float_variable( - "distance_between_target_and_generating", - lower_bound=literal(0), - structure=TIME_AND_SCENARIO_FREE, - ), - float_variable( - "max_distance_between_target_and_generating", - lower_bound=literal(0), - structure=CONSTANT_PER_SCENARIO, - ), - float_variable( - "violation_lower_rule_curve", - lower_bound=literal(0), - structure=TIME_AND_SCENARIO_FREE, - ), - float_variable( - "violation_upper_rule_curve", - lower_bound=literal(0), - structure=TIME_AND_SCENARIO_FREE, - ), - float_variable( - "max_violation_lower_rule_curve", - lower_bound=literal(0), - structure=CONSTANT_PER_SCENARIO, - ), - float_variable( - "gap_to_target", - lower_bound=literal(0), - structure=CONSTANT_PER_SCENARIO, - ), - ] - - -def get_heuristic_parameters() -> List[Parameter]: - return [ - float_parameter("generating_target", TIME_AND_SCENARIO_FREE), - float_parameter("overall_target", CONSTANT_PER_SCENARIO), - float_parameter("gamma_d", CONSTANT), - float_parameter("gamma_delta", CONSTANT), - float_parameter("gamma_y", CONSTANT), - float_parameter("gamma_w", CONSTANT), - float_parameter("gamma_v+", CONSTANT), - float_parameter("gamma_v-", CONSTANT), - float_parameter("gamma_o", CONSTANT), - float_parameter("gamma_s", CONSTANT), - ] - - -def get_heuristic_constraints(horizon: str) -> List[Constraint]: - list_constraint = [ - Constraint( - "Respect generating target", - var("generating").sum() + var("gap_to_target") == param("overall_target"), - ), - Constraint( - "Definition of distance between target and generating", - var("distance_between_target_and_generating") - >= param("generating_target") - var("generating"), - ), - Constraint( - "Definition of distance between generating and target", - var("distance_between_target_and_generating") - >= var("generating") - param("generating_target"), - ), - Constraint( - "Definition of max distance between generating and target", - var("max_distance_between_target_and_generating") - >= var("distance_between_target_and_generating"), - ), - Constraint( - "Definition of violation of lower rule curve", - var("violation_lower_rule_curve") + var("level") - >= param("lower_rule_curve"), - ), - Constraint( - "Definition of violation of upper rule curve", - var("violation_upper_rule_curve") - var("level") - >= -param("upper_rule_curve"), - ), - Constraint( - "Definition of max violation of lower rule curve", - var("max_violation_lower_rule_curve") >= var("violation_lower_rule_curve"), - ), - ] - if horizon == "monthly": - list_constraint.append(Constraint("No overflow", var("overflow") <= literal(0))) - list_constraint.append( - Constraint("No gap to target", var("gap_to_target") <= literal(0)) - ) - - return list_constraint +from andromede.hydro_heuristic.problem import create_hydro_problem, solve_hydro_problem def test_hydro_heuristic() -> None: """ """ scenarios = 1 + capacity = 1e07 for scenario in range(scenarios): initial_level = 0.445 * capacity @@ -200,7 +38,7 @@ def test_hydro_heuristic() -> None: monthly_max_generating, monthly_lowerrulecruve, monthly_upperrulecruve, - ) = get_all_data(scenario, "monthly") + ) = get_all_data(scenario, "monthly", folder_name="data_third_complex_case") monthly_target = get_target(monthly_demand, sum(monthly_inflow)) # Ajustement de la réapartition mensuelle @@ -212,6 +50,7 @@ def test_hydro_heuristic() -> None: lower_rule_curve=monthly_lowerrulecruve, upper_rule_curve=monthly_upperrulecruve, initial_level=initial_level, + capacity=capacity, ) status, monthly_generation, _ = solve_hydro_problem(problem) @@ -248,7 +87,7 @@ def test_hydro_heuristic() -> None: daily_max_generating, daily_lowerrulecruve, daily_upperrulecruve, - ) = get_all_data(scenario, "daily") + ) = get_all_data(scenario, "daily", folder_name="data_third_complex_case") for month in range(12): number_day_month = get_number_of_days_in_month(month) @@ -273,6 +112,7 @@ def test_hydro_heuristic() -> None: day_in_year : day_in_year + number_day_month ], initial_level=initial_level, + capacity=capacity, ) status, daily_generation, initial_level = solve_hydro_problem(problem) @@ -315,271 +155,3 @@ def test_hydro_heuristic() -> None: # assert float(expected_output[week + 7].strip().split("\t")[1]) - float( # expected_output[week + 7].strip().split("\t")[2] # ) == round(weekly_target[week]) - - -def calculate_weekly_target(all_daily_generation: list[float]) -> list[float]: - weekly_target = np.zeros(52) - week = 0 - day_in_week = 0 - day_in_year = 0 - - while week < 52: - weekly_target[week] += all_daily_generation[day_in_year] - day_in_year += 1 - day_in_week += 1 - if day_in_week >= 7: - week += 1 - day_in_week = 0 - - return list(weekly_target) - - -def get_number_of_days_in_month(month: int) -> int: - number_day_month = [31, 28, 31, 30, 31, 30, 31, 31, 30, 31, 30, 31][month] - return number_day_month - - -def get_target( - demand: List[float], total_target: float, inter_breakdown: int = 1 -) -> List[float]: - target = ( - total_target - * np.power(demand, inter_breakdown) - / sum(np.power(demand, inter_breakdown)) - ) - - return list(target) - - -def get_all_data( - scenario: int, horizon: str -) -> tuple[List[float], List[float], List[float], List[float], List[float]]: - if horizon == "monthly": - hours_aggregated_time_steps = [ - 24 * get_number_of_days_in_month(m) for m in range(12) - ] - elif horizon == "daily": - hours_aggregated_time_steps = [24 for d in range(365)] - - demand = get_input_data( - name_file="load", - column=scenario, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=1, - operator="sum", - ) - inflow = get_input_data( - name_file="mod", - column=scenario, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=24, - operator="sum", - ) - lowerrulecruve = get_input_data( - name_file="reservoir", - column=0, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=24, - operator="lag_first_element", - ) - upperrulecruve = get_input_data( - name_file="reservoir", - column=2, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=24, - operator="lag_first_element", - ) - max_generating = get_input_data( - name_file="maxpower", - column=0, - hours_aggregated_time_steps=hours_aggregated_time_steps, - hours_input=24, - operator="sum", - ) - max_generating = [x * 24 for x in max_generating] - - return ( - demand, - inflow, - max_generating, - lowerrulecruve, - upperrulecruve, - ) - - -def create_hydro_problem( - horizon: str, - target: List[float], - inflow: List[float], - max_generating: List[float], - lower_rule_curve: List[float], - upper_rule_curve: List[float], - initial_level: float, -) -> OptimizationProblem: - database = generate_database( - target=target, - inflow=inflow, - max_generating=max_generating, - lower_rule_curve=lower_rule_curve, - upper_rule_curve=upper_rule_curve, - initial_level=initial_level, - ) - - database = add_objective_coefficients_to_database(database, horizon) - - time_block = TimeBlock(1, [i for i in range(len(target))]) - scenarios = 1 - - hydro = create_component( - model=get_heuristic_hydro_model(HYDRO_MODEL, horizon), id="H" - ) - - network = Network("test") - network.add_component(hydro) - - problem = build_problem( - network, - database, - time_block, - scenarios, - border_management=(BlockBorderManagement.CYCLE), - solver_id="XPRESS", - ) - - return problem - - -def solve_hydro_problem(problem: OptimizationProblem) -> tuple[int, list[float], float]: - parameters = pywraplp.MPSolverParameters() - parameters.SetIntegerParam(parameters.PRESOLVE, parameters.PRESOLVE_OFF) - parameters.SetIntegerParam(parameters.SCALING, 0) - problem.solver.EnableOutput() - - status = problem.solver.Solve(parameters) - - output = OutputValues(problem) - - return ( - status, - output.component("H").var("generating").value[0], # type:ignore - output.component("H").var("level").value[0][-1], # type:ignore - ) - - -def generate_database( - target: List[float], - inflow: List[float], - max_generating: List[float], - lower_rule_curve: List[float], - upper_rule_curve: List[float], - initial_level: float, -) -> DataBase: - database = DataBase() - - database.add_data("H", "capacity", ConstantData(capacity)) - database.add_data("H", "initial_level", ConstantData(initial_level)) - - inflow_data = pd.DataFrame( - inflow, - index=[i for i in range(len(inflow))], - columns=[0], - ) - database.add_data("H", "inflow", TimeScenarioSeriesData(inflow_data)) - - target_data = pd.DataFrame( - target, - index=[i for i in range(len(target))], - columns=[0], - ) - database.add_data("H", "generating_target", TimeScenarioSeriesData(target_data)) - database.add_data("H", "overall_target", ConstantData(sum(target))) - - database.add_data( - "H", - "lower_rule_curve", - TimeSeriesData( - { - TimeIndex(i): lower_rule_curve[i] * capacity - for i in range(len(lower_rule_curve)) - } - ), - ) - database.add_data( - "H", - "upper_rule_curve", - TimeSeriesData( - { - TimeIndex(i): upper_rule_curve[i] * capacity - for i in range(len(lower_rule_curve)) - } - ), - ) - database.add_data("H", "min_generating", ConstantData(0)) - - database.add_data( - "H", - "max_generating", - TimeSeriesData( - {TimeIndex(i): max_generating[i] for i in range(len(max_generating))} - ), - ) - - database.add_data( - "H", - "max_epsilon", - TimeSeriesData( - { - TimeIndex(i): capacity if i == 0 else 0 - for i in range(len(max_generating)) - } - ), - ) - - return database - - -def add_objective_coefficients_to_database( - database: DataBase, horizon: str -) -> DataBase: - objective_function_cost = { - "gamma_d": 1, - "gamma_delta": 1 if horizon == "monthly" else 2, - "gamma_y": 100000 if horizon == "monthly" else 68, - "gamma_w": 0 if horizon == "monthly" else 34, - "gamma_v+": 100 if horizon == "monthly" else 0, - "gamma_v-": 100 if horizon == "monthly" else 68, - "gamma_o": 0 if horizon == "monthly" else 23 * 68 + 1, - "gamma_s": 0 if horizon == "monthly" else -1 / 32, - } - - for name, coeff in objective_function_cost.items(): - database.add_data("H", name, ConstantData(coeff)) - - return database - - -def get_input_data( - name_file: str, - column: int, - hours_aggregated_time_steps: List[int], - hours_input: int, - operator: str, -) -> List[float]: - data = np.loadtxt( - "tests/functional/data_third_complex_case/hydro/" + name_file + ".txt" - ) - data = data[:, column] - aggregated_data: List[float] = [] - hour = 0 - for hours_time_step in hours_aggregated_time_steps: - assert hours_time_step % hours_input == 0 - if operator == "sum": - aggregated_data.append( - np.sum(data[hour : hour + hours_time_step // hours_input]) - ) - elif operator == "lag_first_element": - aggregated_data.append( - data[(hour + hours_time_step // hours_input) % len(data)] - ) - hour += hours_time_step // hours_input - return aggregated_data