From 03f3cc7fe987ef6b5a50943e8ab622219252a29d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Daniel=20R=C3=BCdt?= <117752024+daniel-rdt@users.noreply.github.com> Date: Fri, 18 Oct 2024 12:51:41 +0200 Subject: [PATCH] Refactoring of linopy solvers to object oriented architecture (#349) * Refactor solvers.py to object-oriented implementation of Solvers which also changes the call of solvers in model.py * Adjustments to execution via direct API for highs and gurobi solvers * add unit tests for solver classes and solving from lp file and direct * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * fix of path_to_str type of Path * fix of path_to_str type of Path back to before * read sense and io_api from problem file * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * solve typing issues and move util methods to functions in solvers.py * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * Changed architecture of direct execution and execution from problem file. Also renaming of Solver class solve method to 'solve_problem()'. * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * get rid of unused type: ignore comment in solvers.py * refactor #349 * fix bug in maybe_adjust_objective_function and adjust docstrings * [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci * pin coptpy version away from 7.2.1 due to bug * add pytest for not direct solvers NotImplementedError --------- Co-authored-by: daniel.rdt Co-authored-by: pre-commit-ci[bot] <66853113+pre-commit-ci[bot]@users.noreply.github.com> Co-authored-by: lkstrp --- linopy/constants.py | 2 +- linopy/model.py | 46 +- linopy/solvers.py | 2735 ++++++++++++++++++++++++------------- pyproject.toml | 2 +- test/test_optimization.py | 63 +- 5 files changed, 1852 insertions(+), 996 deletions(-) diff --git a/linopy/constants.py b/linopy/constants.py index 9b4c69ea..5941d014 100644 --- a/linopy/constants.py +++ b/linopy/constants.py @@ -208,7 +208,7 @@ class Result: status: Status solution: Union[Solution, None] = None - solver_model: Union[Any, None] = None + solver_model: Any = None def __repr__(self) -> str: solver_model_string = ( diff --git a/linopy/model.py b/linopy/model.py index 3eea3028..728c61bb 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -49,7 +49,7 @@ ) from linopy.matrices import MatrixAccessor from linopy.objective import Objective -from linopy.solvers import available_solvers, quadratic_solvers +from linopy.solvers import IO_APIS, available_solvers, quadratic_solvers from linopy.types import ( ConstantLike, ConstraintLike, @@ -79,6 +79,9 @@ class Model: the optimization process. """ + solver_model: Any + solver_name: str + __slots__ = ( # containers "_variables", @@ -1015,6 +1018,12 @@ def solve( # clear cached matrix properties potentially present from previous solve commands self.matrices.clean_cached_properties() + # check io_api + if io_api is not None and io_api not in IO_APIS: + raise ValueError( + f"Keyword argument `io_api` has to be one of {IO_APIS} or None" + ) + if remote: solved = remote.solve_on_remote( self, @@ -1075,19 +1084,32 @@ def solve( ) try: - func = getattr(solvers, f"run_{solver_name}") - result = func( - self, - io_api=io_api, - problem_fn=to_path(problem_fn), - solution_fn=to_path(solution_fn), - log_fn=to_path(log_fn), - warmstart_fn=to_path(warmstart_fn), - basis_fn=to_path(basis_fn), - keep_files=keep_files, - env=env, + solver_class = getattr(solvers, f"{solvers.SolverName(solver_name).name}") + # initialize the solver as object of solver subclass + solver = solver_class( **solver_options, ) + if io_api == "direct": + # no problem file written and direct model is set for solver + result = solver.solve_problem_from_model( + model=self, + solution_fn=to_path(solution_fn), + log_fn=to_path(log_fn), + warmstart_fn=to_path(warmstart_fn), + basis_fn=to_path(basis_fn), + env=env, + ) + else: + problem_fn = self.to_file(to_path(problem_fn), io_api) + result = solver.solve_problem_from_file( + problem_fn=to_path(problem_fn), + solution_fn=to_path(solution_fn), + log_fn=to_path(log_fn), + warmstart_fn=to_path(warmstart_fn), + basis_fn=to_path(basis_fn), + env=env, + ) + finally: for fn in (problem_fn, solution_fn): if fn is not None and (os.path.exists(fn) and not keep_files): diff --git a/linopy/solvers.py b/linopy/solvers.py index 166fd9eb..5862695e 100644 --- a/linopy/solvers.py +++ b/linopy/solvers.py @@ -6,12 +6,14 @@ from __future__ import annotations import contextlib +import enum import io import logging import os import re import subprocess as sub import sys +from abc import ABC, abstractmethod from pathlib import Path from typing import TYPE_CHECKING, Callable @@ -129,561 +131,913 @@ ) -def safe_get_solution(status: Status, func: Callable) -> Solution: +def set_int_index(series: Series) -> Series: """ - Get solution from function call, if status is unknown still try to run it. + Convert string index to int index. """ - if status.is_ok: - return func() - elif status.status == SolverStatus.unknown: - with contextlib.suppress(Exception): - logger.warning("Solution status unknown. Trying to parse solution.") - return func() - return Solution() + series.index = series.index.str[1:].astype(int) + return series + + +# using enum to match solver subclasses with names +class SolverName(enum.Enum): + CBC = "cbc" + GLPK = "glpk" + Highs = "highs" + Cplex = "cplex" + Gurobi = "gurobi" + SCIP = "scip" + Xpress = "xpress" + Mosek = "mosek" + COPT = "copt" + MindOpt = "mindopt" + PIPS = "pips" + + +def path_to_string(path: Path) -> str: + """ + Convert a pathlib.Path to a string. + """ + return str(path.resolve()) + + +def read_sense_from_problem_file(problem_fn: Path | str): + f = open(problem_fn).read() + if read_io_api_from_problem_file(problem_fn) == "lp": + return "min" if "min" in f.lower() else "max" + elif read_io_api_from_problem_file(problem_fn) == "mps": + return "max" if "OBJSENSE\n MAX\n" in f else "min" + else: + msg = "Unsupported problem file format." + raise ValueError(msg) + + +def read_io_api_from_problem_file(problem_fn: Path | str): + if isinstance(problem_fn, Path): + return problem_fn.suffix[1:] + else: + return problem_fn.split(".")[-1] def maybe_adjust_objective_sign( - solution: Solution, sense: str, io_api: str | None -) -> None: + solution: Solution, io_api: str | None, sense: str | None +) -> Solution: if sense == "min": - return - + return solution if np.isnan(solution.objective): - return - + return solution if io_api == "mps" and not _new_highspy_mps_layout: logger.info( "Adjusting objective sign due to switched coefficients in MPS file." ) solution.objective *= -1 + return solution -def set_int_index(series: Series) -> Series: - """ - Convert string index to int index. +class Solver(ABC): """ - series.index = series.index.str[1:].astype(int) - return series - + Abstract base class for solving a given linear problem. -def path_to_string(path: Path) -> str: + All relevant functions are passed on to the specific solver subclasses. + Subclasses must implement the `solve_problem_from_model()` and + `solve_problem_from_file()` methods. """ - Convert a pathlib.Path to a string. - """ - return str(path.resolve()) + + def __init__( + self, + **solver_options, + ): + self.solver_options = solver_options + + def safe_get_solution(self, status: Status, func: Callable) -> Solution: + """ + Get solution from function call, if status is unknown still try to run it. + """ + if status.is_ok: + return func() + elif status.status == SolverStatus.unknown: + with contextlib.suppress(Exception): + logger.warning("Solution status unknown. Trying to parse solution.") + return func() + return Solution() + + @abstractmethod + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Abstract method to solve a linear problem from a model. + + Needs to be implemented in the specific solver subclass. Even if the solver + does not support solving from a model, this method should be implemented and + raise a NotImplementedError. + """ + pass + + @abstractmethod + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Abstract method to solve a linear problem from a problem file. + + Needs to be implemented in the specific solver subclass. Even if the solver + does not support solving from a file, this method should be implemented and + raise a NotImplementedError. + """ + pass + + def solve_problem( + self, + model: Model | None = None, + problem_fn: Path | None = None, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem either from a model or a problem file. + + Wraps around `self.solve_problem_from_model()` and + `self.solve_problem_from_file()` and calls the appropriate method + based on the input arguments (`model` or `problem_fn`). + """ + if problem_fn is not None and model is not None: + msg = "Both problem file and model are given. Please specify only one." + raise ValueError(msg) + elif model is not None: + return self.solve_problem_from_model( + model=model, + solution_fn=solution_fn, + log_fn=log_fn, + warmstart_fn=warmstart_fn, + basis_fn=basis_fn, + env=env, + ) + elif problem_fn is not None: + return self.solve_problem_from_file( + problem_fn=problem_fn, + solution_fn=solution_fn, + log_fn=log_fn, + warmstart_fn=warmstart_fn, + basis_fn=basis_fn, + env=env, + ) + else: + msg = "No problem file or model specified." + raise ValueError(msg) -def run_cbc( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: None = None, - **solver_options, -) -> Result: +class CBC(Solver): """ - Solve a linear problem using the cbc solver. + Solver subclass for the CBC solver. - The function reads the linear problem file and passes it to the cbc - solver. If the solution is successful it returns variable solutions - and constraint dual values. For more information on the solver - options, run 'cbc' in your shell + Attributes + ---------- + **solver_options + options for the given solver """ - if io_api is not None and io_api not in FILE_IO_APIS: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps' or None" - ) - # CBC does not like the OBJSENSE line in MPS files, which new highspy versions write - if io_api == "mps" and model.sense == "max" and _new_highspy_mps_layout: - raise ValueError( - "CBC does not support maximization in MPS format highspy versions >=1.7.1" - ) + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ): + msg = "Direct API not implemented for CBC" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the CBC solver. + + The function reads the linear problem file and passes it to the solver. + If the solution is successful it returns variable solutions + and constraint dual values. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path + Path to the solution file. This is necessary for solving with CBC. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + sense = read_sense_from_problem_file(problem_fn) + io_api = read_io_api_from_problem_file(problem_fn) + + if solution_fn is None: + msg = "No solution file specified. For solving with CBC this is necessary." + raise ValueError(msg) + + if io_api == "mps" and sense == "max" and _new_highspy_mps_layout: + msg = ( + "CBC does not support maximization in MPS format highspy versions " + " >=1.7.1" + ) + raise ValueError(msg) - problem_fn = model.to_file(problem_fn, io_api) + # printingOptions is about what goes in solution file + command = f"cbc -printingOptions all -import {problem_fn} " - # printingOptions is about what goes in solution file - command = f"cbc -printingOptions all -import {problem_fn} " + if warmstart_fn: + command += f"-basisI {warmstart_fn} " - if warmstart_fn: - command += f"-basisI {warmstart_fn} " + if self.solver_options: + command += ( + " ".join( + "-" + " ".join([k, str(v)]) for k, v in self.solver_options.items() + ) + + " " + ) + command += f"-solve -solu {solution_fn} " - if solver_options: - command += ( - " ".join("-" + " ".join([k, str(v)]) for k, v in solver_options.items()) - + " " - ) - command += f"-solve -solu {solution_fn} " + if basis_fn: + command += f"-basisO {basis_fn} " - if basis_fn: - command += f"-basisO {basis_fn} " + Path(solution_fn).parent.mkdir(exist_ok=True) - if solution_fn is None: - raise ValueError("No solution file specified") + command = command.strip() - Path(solution_fn).parent.mkdir(exist_ok=True) + if log_fn is None: + p = sub.Popen(command.split(" "), stdout=sub.PIPE, stderr=sub.PIPE) - command = command.strip() + if p.stdout is None: + msg = ( + f"Command `{command}` did not run successfully. Check if cbc is " + " installed and in PATH." + ) + raise ValueError(msg) + + output = "" + for line in iter(p.stdout.readline, b""): + output += line.decode() + logger.info(output) + p.stdout.close() + p.wait() + else: + log_f = open(log_fn, "w") + p = sub.Popen(command.split(" "), stdout=log_f, stderr=log_f) + p.wait() - if log_fn is None: - p = sub.Popen(command.split(" "), stdout=sub.PIPE, stderr=sub.PIPE) + with open(solution_fn) as f: + data = f.readline() - if p.stdout is None: - raise ValueError( - f"Command `{command}` did not run successfully. Check if cbc is installed and in PATH." + if data.startswith("Optimal - objective value"): + status = Status.from_termination_condition("optimal") + elif "Infeasible" in data: + status = Status.from_termination_condition("infeasible") + else: + status = Status(SolverStatus.warning, TerminationCondition.unknown) + status.legacy_status = data + + def get_solver_solution(): + objective = float(data[len("Optimal - objective value ") :]) + + with open(solution_fn, "rb") as f: + trimmed_sol_fn = re.sub(rb"\*\*\s+", b"", f.read()) + + df = pd.read_csv( + io.BytesIO(trimmed_sol_fn), + header=None, + skiprows=[0], + sep=r"\s+", + usecols=[1, 2, 3], + index_col=0, ) + variables_b = df.index.str[0] == "x" - output = "" - for line in iter(p.stdout.readline, b""): - output += line.decode() - logger.info(output) - p.stdout.close() - p.wait() - else: - log_f = open(log_fn, "w") - p = sub.Popen(command.split(" "), stdout=log_f, stderr=log_f) - p.wait() - - with open(solution_fn) as f: - data = f.readline() + sol = df[variables_b][2].pipe(set_int_index) + dual = df[~variables_b][3].pipe(set_int_index) + return Solution(sol, dual, objective) - if data.startswith("Optimal - objective value"): - status = Status.from_termination_condition("optimal") - elif "Infeasible" in data: - status = Status.from_termination_condition("infeasible") - else: - status = Status(SolverStatus.warning, TerminationCondition.unknown) - status.legacy_status = data - - def get_solver_solution(): - objective = float(data[len("Optimal - objective value ") :]) - - with open(solution_fn, "rb") as f: - trimmed_sol_fn = re.sub(rb"\*\*\s+", b"", f.read()) - - df = pd.read_csv( - io.BytesIO(trimmed_sol_fn), - header=None, - skiprows=[0], - sep=r"\s+", - usecols=[1, 2, 3], - index_col=0, - ) - variables_b = df.index.str[0] == "x" - - sol = df[variables_b][2].pipe(set_int_index) - dual = df[~variables_b][3].pipe(set_int_index) - return Solution(sol, dual, objective) - - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) - - return Result(status, solution) - - -def run_glpk( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: None = None, - **solver_options, -) -> Result: - """ - Solve a linear problem using the glpk solver. + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) - This function reads the linear problem file and passes it to the - glpk - solver. If the solution is successful it returns variable solutions - and - constraint dual values. + return Result(status, solution) - For more information on the glpk solver options, see - https://kam.mff.cuni.cz/~elias/glpk.pdf +class GLPK(Solver): """ - CONDITION_MAP = { - "integer optimal": "optimal", - "undefined": "infeasible_or_unbounded", - } - - if io_api is not None and io_api not in FILE_IO_APIS: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps` or None" - ) + Solver subclass for the GLPK solver. - # GLPK does not like the OBJSENSE line in MPS files, which new highspy versions write - if io_api == "mps" and model.sense == "max" and _new_highspy_mps_layout: - raise ValueError( - "GLPK does not support maximization in MPS format highspy versions >=1.7.1" - ) + Attributes + ---------- + **solver_options + options for the given solver + """ - problem_fn = model.to_file(problem_fn, io_api) - suffix = problem_fn.suffix[1:] - - if solution_fn is None: - raise ValueError("No solution file specified") - - Path(solution_fn).parent.mkdir(exist_ok=True) - - # TODO use --nopresol argument for non-optimal solution output - command = f"glpsol --{suffix} {problem_fn} --output {solution_fn} " - if log_fn is not None: - command += f"--log {log_fn} " - if warmstart_fn: - command += f"--ini {warmstart_fn} " - if basis_fn: - command += f"-w {basis_fn} " - if solver_options: - command += ( - " ".join("--" + " ".join([k, str(v)]) for k, v in solver_options.items()) - + " " - ) - command = command.strip() + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + msg = "Direct API not implemented for GLPK" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the glpk solver. + + This function reads the linear problem file and passes it to the + glpk solver. If the solution is successful it returns variable solutions + and constraint dual values. + + For more information on the glpk solver options, see + + https://kam.mff.cuni.cz/~elias/glpk.pdf + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path + Path to the solution file. This is necessary for solving with GLPK. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + CONDITION_MAP = { + "integer optimal": "optimal", + "undefined": "infeasible_or_unbounded", + } + sense = read_sense_from_problem_file(problem_fn) + io_api = read_io_api_from_problem_file(problem_fn) + if solution_fn is None: + msg = "No solution file specified. For solving with GLPK this is necessary." + raise ValueError(msg) + + if io_api == "mps" and sense == "max" and _new_highspy_mps_layout: + msg = ( + "GLPK does not support maximization in MPS format highspy versions " + " >=1.7.1" + ) + raise ValueError(msg) - p = sub.Popen(command.split(" "), stdout=sub.PIPE, stderr=sub.PIPE) - if log_fn is None: - output = "" + Path(solution_fn).parent.mkdir(exist_ok=True) - if p.stdout is None: - raise ValueError( - f"Command `{command}` did not run successfully. Check if glpsol is installed and in PATH." + # TODO use --nopresol argument for non-optimal solution output + command = f"glpsol --{io_api} {problem_fn} --output {solution_fn} " + if log_fn is not None: + command += f"--log {log_fn} " + if warmstart_fn: + command += f"--ini {warmstart_fn} " + if basis_fn: + command += f"-w {basis_fn} " + if self.solver_options: + command += ( + " ".join( + "--" + " ".join([k, str(v)]) for k, v in self.solver_options.items() + ) + + " " ) + command = command.strip() - for line in iter(p.stdout.readline, b""): - output += line.decode() - logger.info(output) - p.stdout.close() - p.wait() - else: - p.wait() - - if not os.path.exists(solution_fn): - status = Status(SolverStatus.warning, TerminationCondition.unknown) - return Result(status, Solution()) - - f = open(solution_fn) - - def read_until_break(f): - while True: - line = f.readline() - if line in ["\n", ""]: - break - yield line - - info_io = io.StringIO("".join(read_until_break(f))[:-2]) - info = pd.read_csv(info_io, sep=":", index_col=0, header=None)[1] - condition = info.Status.lower().strip() - objective = float(re.sub(r"[^0-9\.\+\-e]+", "", info.Objective)) - - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition - - def get_solver_solution() -> Solution: - dual_io = io.StringIO("".join(read_until_break(f))[:-2]) - dual_ = pd.read_fwf(dual_io)[1:].set_index("Row name") - if "Marginal" in dual_: - dual = ( - pd.to_numeric(dual_["Marginal"], "coerce").fillna(0).pipe(set_int_index) - ) - else: - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) - - sol_io = io.StringIO("".join(read_until_break(f))[:-2]) - sol = ( - pd.read_fwf(sol_io)[1:] - .set_index("Column name")["Activity"] - .astype(float) - .pipe(set_int_index) - ) - f.close() - return Solution(sol, dual, objective) - - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) - return Result(status, solution) - - -def run_highs( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: None = None, - **solver_options, -) -> Result: - """ - Highs solver function. Reads a linear problem file and passes it to the - highs solver. If the solution is feasible the function returns the - objective, solution and dual constraint variables. Highs must be installed - for usage. Find the documentation at https://www.maths.ed.ac.uk/hall/HiGHS/ + p = sub.Popen(command.split(" "), stdout=sub.PIPE, stderr=sub.PIPE) + if log_fn is None: + output = "" - . The full list of solver options is documented at - https://www.maths.ed.ac.uk/hall/HiGHS/HighsOptions.set . + if p.stdout is None: + msg = ( + f"Command `{command}` did not run successfully. Check if glpsol is " + "installed and in PATH." + ) + raise ValueError(msg) - Some exemplary options are: + for line in iter(p.stdout.readline, b""): + output += line.decode() + logger.info(output) + p.stdout.close() + p.wait() + else: + p.wait() - * presolve : "choose" by default - "on"/"off" are alternatives. - * solver :"choose" by default - "simplex"/"ipm"/"pdlp" are alternatives. Only "choose" solves MIP / QP! - * parallel : "choose" by default - "on"/"off" are alternatives. - * time_limit : inf by default. - - Returns - ------- - status : string, - SolverStatus.ok or SolverStatus.warning - termination_condition : string, - Contains "optimal", "infeasible", - variables_sol : series - constraints_dual : series - objective : float - """ - CONDITION_MAP: dict[str, str] = {} - - if solver_options.get("solver") in ["simplex", "ipm", "pdlp"] and model.type in [ - "QP", - "MILP", - ]: - logger.warning( - "The HiGHS solver ignores quadratic terms / integrality if the solver is set to 'simplex', 'ipm' or 'pdlp'. " - "Drop the solver option or use 'choose' to enable quadratic terms / integrality." - ) + if not os.path.exists(solution_fn): + status = Status(SolverStatus.warning, TerminationCondition.unknown) + return Result(status, Solution()) - if io_api is None or io_api in FILE_IO_APIS: - problem_fn = model.to_file(problem_fn, io_api) - h = highspy.Highs() - h.readModel(path_to_string(problem_fn)) - elif io_api == "direct": - h = model.to_highspy() - else: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps`, `direct` or None" - ) + f = open(solution_fn) - if log_fn is None: - log_fn = model.solver_dir / "highs.log" - solver_options["log_file"] = path_to_string(log_fn) - logger.info(f"Log file at {solver_options['log_file']}") + def read_until_break(f): + while True: + line = f.readline() + if line in ["\n", ""]: + break + yield line - for k, v in solver_options.items(): - h.setOptionValue(k, v) + info_io = io.StringIO("".join(read_until_break(f))[:-2]) + info = pd.read_csv(info_io, sep=":", index_col=0, header=None)[1] + condition = info.Status.lower().strip() + objective = float(re.sub(r"[^0-9\.\+\-e]+", "", info.Objective)) - if warmstart_fn is not None and warmstart_fn.suffix == ".sol": - h.readSolution(path_to_string(warmstart_fn), 0) - elif warmstart_fn: - h.readBasis(path_to_string(warmstart_fn)) + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition - h.run() + def get_solver_solution() -> Solution: + dual_io = io.StringIO("".join(read_until_break(f))[:-2]) + dual_ = pd.read_fwf(dual_io)[1:].set_index("Row name") + if "Marginal" in dual_: + dual = ( + pd.to_numeric(dual_["Marginal"], "coerce") + .fillna(0) + .pipe(set_int_index) + ) + else: + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) - condition = h.modelStatusToString(h.getModelStatus()).lower() - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition + sol_io = io.StringIO("".join(read_until_break(f))[:-2]) + sol = ( + pd.read_fwf(sol_io)[1:] + .set_index("Column name")["Activity"] + .astype(float) + .pipe(set_int_index) + ) + f.close() + return Solution(sol, dual, objective) - if basis_fn: - h.writeBasis(path_to_string(basis_fn)) + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) + return Result(status, solution) - if solution_fn: - h.writeSolution(path_to_string(solution_fn), 0) - def get_solver_solution() -> Solution: - objective = h.getObjectiveValue() - solution = h.getSolution() +class Highs(Solver): + """ + Solver subclass for the Highs solver. Highs must be installed + for usage. Find the documentation at https://www.maths.ed.ac.uk/hall/HiGHS/. - if io_api == "direct": - sol = pd.Series(solution.col_value, model.matrices.vlabels, dtype=float) - dual = pd.Series(solution.row_dual, model.matrices.clabels, dtype=float) - else: - sol = pd.Series(solution.col_value, h.getLp().col_names_, dtype=float).pipe( - set_int_index - ) - dual = pd.Series(solution.row_dual, h.getLp().row_names_, dtype=float).pipe( - set_int_index - ) + The full list of solver options is documented at https://www.maths.ed.ac.uk/hall/HiGHS/HighsOptions.set. - return Solution(sol, dual, objective) + Some exemplary options are: - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) + * presolve : "choose" by default - "on"/"off" are alternatives. + * solver :"choose" by default - "simplex"/"ipm"/"pdlp" are alternatives. Only "choose" solves MIP / QP! + * parallel : "choose" by default - "on"/"off" are alternatives. + * time_limit : inf by default. - return Result(status, solution, h) + Attributes + ---------- + **solver_options + options for the given solver + """ + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem directly from a linopy model using the Highs solver. + Reads a linear problem file and passes it to the highs solver. + If the solution is feasible the function returns the + objective, solution and dual constraint variables. + + Parameters + ---------- + model : linopy.model + Linopy model for the problem. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + # check for Highs solver compatibility + if self.solver_options.get("solver") in [ + "simplex", + "ipm", + "pdlp", + ] and model.type in [ + "QP", + "MILP", + ]: + logger.warning( + "The HiGHS solver ignores quadratic terms / integrality if the solver is set to 'simplex', 'ipm' or 'pdlp'. " + "Drop the solver option or use 'choose' to enable quadratic terms / integrality." + ) -def run_cplex( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: None = None, - **solver_options, -) -> Result: - """ - Solve a linear problem using the cplex solver. + h = model.to_highspy() - This function reads the linear problem file and passes it to the cplex - solver. If the solution is successful it returns variable solutions and - constraint dual values. Cplex must be installed for using this function. + if log_fn is None and model is not None: + log_fn = model.solver_dir / "highs.log" + + return self._solve( + h, + solution_fn, + log_fn, + warmstart_fn, + basis_fn, + model=model, + io_api="direct", + sense=model.sense, + ) - Note if you pass additional solver_options, the key can specify deeper - layered parameters, use a dot as a separator here, - i.e. `**{'aa.bb.cc' : x}`. - """ - CONDITION_MAP = { - "integer optimal solution": "optimal", - "integer optimal, tolerance": "optimal", - } - - if io_api is not None and io_api not in FILE_IO_APIS: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps` or None" + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the Highs solver. + Reads a linear problem file and passes it to the highs solver. + If the solution is feasible the function returns the + objective, solution and dual constraint variables. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + + problem_fn_ = path_to_string(problem_fn) + h = highspy.Highs() + h.readModel(problem_fn_) + + return self._solve( + h, + solution_fn, + log_fn, + warmstart_fn, + basis_fn, + io_api=read_io_api_from_problem_file(problem_fn), + sense=read_sense_from_problem_file(problem_fn), ) - problem_fn = model.to_file(problem_fn, io_api) - m = cplex.Cplex() + def _solve( + self, + h: highspy.Highs, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + model: Model | None = None, + io_api: str | None = None, + sense: str | None = None, + ) -> Result: + """ + Solve a linear problem from a Highs object. + + + Parameters + ---------- + h : highspy.Highs + Highs object. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + model : linopy.model, optional + Linopy model for the problem. + io_api: str + io_api of the problem. For direct API from linopy model this is "direct". + sense: str + "min" or "max" + + Returns + ------- + Result + """ + CONDITION_MAP: dict[str, str] = {} - if log_fn is not None: - log_f = open(path_to_string(log_fn), "w") - m.set_results_stream(log_f) - m.set_warning_stream(log_f) - m.set_error_stream(log_f) - m.set_log_stream(log_f) + if log_fn is not None: + self.solver_options["log_file"] = path_to_string(log_fn) + logger.info(f"Log file at {self.solver_options['log_file']}") - if solver_options is not None: - for key, value in solver_options.items(): - param = m.parameters - for key_layer in key.split("."): - param = getattr(param, key_layer) - param.set(value) + for k, v in self.solver_options.items(): + h.setOptionValue(k, v) - m.read(path_to_string(problem_fn)) + if warmstart_fn is not None and warmstart_fn.suffix == ".sol": + h.readSolution(path_to_string(warmstart_fn), 0) + elif warmstart_fn: + h.readBasis(path_to_string(warmstart_fn)) - if warmstart_fn is not None: - m.start.read_basis(path_to_string(warmstart_fn)) + h.run() - is_lp = m.problem_type[m.get_problem_type()] == "LP" + condition = h.modelStatusToString(h.getModelStatus()).lower() + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition - with contextlib.suppress(cplex.exceptions.errors.CplexSolverError): - m.solve() + if basis_fn: + h.writeBasis(path_to_string(basis_fn)) - if solution_fn is not None: - try: - m.solution.write(path_to_string(solution_fn)) - except cplex.exceptions.errors.CplexSolverError as err: - logger.info("Unable to save solution file. Raised error: %s", err) + if solution_fn: + h.writeSolution(path_to_string(solution_fn), 0) - condition = m.solution.get_status_string() - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition + def get_solver_solution() -> Solution: + objective = h.getObjectiveValue() + solution = h.getSolution() - if log_fn is not None: - log_f.close() + if model is not None: + sol = pd.Series(solution.col_value, model.matrices.vlabels, dtype=float) + dual = pd.Series(solution.row_dual, model.matrices.clabels, dtype=float) + else: + sol = pd.Series( + solution.col_value, h.getLp().col_names_, dtype=float + ).pipe(set_int_index) + dual = pd.Series( + solution.row_dual, h.getLp().row_names_, dtype=float + ).pipe(set_int_index) - def get_solver_solution() -> Solution: - if basis_fn and is_lp: - try: - m.solution.basis.write(path_to_string(basis_fn)) - except cplex.exceptions.errors.CplexSolverError: - logger.info("No model basis stored") + return Solution(sol, dual, objective) - objective = m.solution.get_objective_value() + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) - solution = pd.Series( - m.solution.get_values(), m.variables.get_names(), dtype=float - ) - solution = set_int_index(solution) + return Result(status, solution, h) - if is_lp: - dual = pd.Series( - m.solution.get_dual_values(), - m.linear_constraints.get_names(), - dtype=float, - ) - dual = set_int_index(dual) - else: - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) - return Solution(solution, dual, objective) - - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) - - return Result(status, solution, m) - - -def run_gurobi( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: gurobipy.Env | None = None, - **solver_options, -) -> Result: + +class Gurobi(Solver): """ - Solve a linear problem using the gurobi solver. + Solver subclass for the gurobi solver. - This function communicates with gurobi using the gurubipy package. + Attributes + ---------- + **solver_options + options for the given solver """ - # see https://www.gurobi.com/documentation/10.0/refman/optimization_status_codes.html - CONDITION_MAP = { - 1: "unknown", - 2: "optimal", - 3: "infeasible", - 4: "infeasible_or_unbounded", - 5: "unbounded", - 6: "other", - 7: "iteration_limit", - 8: "terminated_by_limit", - 9: "time_limit", - 10: "optimal", - 11: "user_interrupt", - 12: "other", - 13: "suboptimal", - 14: "unknown", - 15: "terminated_by_limit", - 16: "internal_solver_error", - 17: "internal_solver_error", - } - - with contextlib.ExitStack() as stack: - if env is None: - env = stack.enter_context(gurobipy.Env()) - - if io_api is None or io_api in FILE_IO_APIS: - problem_fn = model.to_file(problem_fn, io_api=io_api) - m = gurobipy.read(path_to_string(problem_fn), env=env) - elif io_api == "direct": - problem_fn = None - m = model.to_gurobipy(env=env) - else: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps`, `direct` or None" + + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem directly from a linopy model using the Gurobi solver. + Reads a problem file and passes it to the Gurobi solver. + This function communicates with gurobi using the gurobipy package. + + Parameters + ---------- + model : linopy.model + Linopy model for the problem. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Gurobi environment for the solver + + Returns + ------- + Result + """ + with contextlib.ExitStack() as stack: + if env is None: + env_ = stack.enter_context(gurobipy.Env()) + else: + env_ = env + + m = model.to_gurobipy(env=env_) + + return self._solve( + m, + solution_fn=solution_fn, + log_fn=log_fn, + warmstart_fn=warmstart_fn, + basis_fn=basis_fn, + io_api="direct", + sense=model.sense, + ) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the Gurobi solver. + Reads a problem file and passes it to the Gurobi solver. + This function communicates with gurobi using the gurobipy package. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Gurobi environment for the solver + + Returns + ------- + Result + """ + sense = read_sense_from_problem_file(problem_fn) + io_api = read_io_api_from_problem_file(problem_fn) + problem_fn_ = path_to_string(problem_fn) + + with contextlib.ExitStack() as stack: + if env is None: + env_ = stack.enter_context(gurobipy.Env()) + else: + env_ = env + + m = gurobipy.read(problem_fn_, env=env_) + + return self._solve( + m, + solution_fn=solution_fn, + log_fn=log_fn, + warmstart_fn=warmstart_fn, + basis_fn=basis_fn, + io_api=io_api, + sense=sense, ) - if solver_options is not None: - for key, value in solver_options.items(): + def _solve( + self, + m, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + io_api: str | None = None, + sense: str | None = None, + ) -> Result: + """ + Solve a linear problem from a Gurobi object. + + + Parameters + ---------- + m + Gurobi object. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + io_api: str + io_api of the problem. For direct API from linopy model this is "direct". + sense: str + "min" or "max" + + Returns + ------- + Result + """ + # see https://www.gurobi.com/documentation/10.0/refman/optimization_status_codes.html + CONDITION_MAP = { + 1: "unknown", + 2: "optimal", + 3: "infeasible", + 4: "infeasible_or_unbounded", + 5: "unbounded", + 6: "other", + 7: "iteration_limit", + 8: "terminated_by_limit", + 9: "time_limit", + 10: "optimal", + 11: "user_interrupt", + 12: "other", + 13: "suboptimal", + 14: "unknown", + 15: "terminated_by_limit", + 16: "internal_solver_error", + 17: "internal_solver_error", + } + + if self.solver_options is not None: + for key, value in self.solver_options.items(): m.setParam(key, value) if log_fn is not None: m.setParam("logfile", path_to_string(log_fn)) @@ -712,7 +1066,7 @@ def run_gurobi( def get_solver_solution() -> Solution: objective = m.ObjVal - sol = pd.Series({v.VarName: v.x for v in m.getVars()}, dtype=float) # type: ignore + sol = pd.Series({v.VarName: v.x for v in m.getVars()}, dtype=float) sol = set_int_index(sol) try: @@ -726,222 +1080,437 @@ def get_solver_solution() -> Solution: return Solution(sol, dual, objective) - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = solution = maybe_adjust_objective_sign(solution, io_api, sense) - return Result(status, solution, m) + return Result(status, solution, m) -def run_scip( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: None = None, - **solver_options, -) -> Result: +class Cplex(Solver): """ - Solve a linear problem using the scip solver. + Solver subclass for the Cplex solver. - This function communicates with scip using the pyscipopt package. + Note if you pass additional solver_options, the key can specify deeper + layered parameters, use a dot as a separator here, + i.e. `**{'aa.bb.cc' : x}`. + + Attributes + ---------- + **solver_options + options for the given solver """ - CONDITION_MAP: dict[str, str] = {} - if io_api is None or io_api in FILE_IO_APIS: - problem_fn = model.to_file(problem_fn, io_api) + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + msg = "Direct API not implemented for Cplex" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the cplex solver. + + This function reads the linear problem file and passes it to the cplex + solver. If the solution is successful it returns variable solutions and + constraint dual values. Cplex must be installed for using this function. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + CONDITION_MAP = { + "integer optimal solution": "optimal", + "integer optimal, tolerance": "optimal", + } + io_api = read_io_api_from_problem_file(problem_fn) + sense = read_sense_from_problem_file(problem_fn) + + m = cplex.Cplex() + + if log_fn is not None: + log_f = open(path_to_string(log_fn), "w") + m.set_results_stream(log_f) + m.set_warning_stream(log_f) + m.set_error_stream(log_f) + m.set_log_stream(log_f) + + if self.solver_options is not None: + for key, value in self.solver_options.items(): + param = m.parameters + for key_layer in key.split("."): + param = getattr(param, key_layer) + param.set(value) + + m.read(path_to_string(problem_fn)) + + if warmstart_fn is not None: + m.start.read_basis(path_to_string(warmstart_fn)) + + is_lp = m.problem_type[m.get_problem_type()] == "LP" + + with contextlib.suppress(cplex.exceptions.errors.CplexSolverError): + m.solve() + + if solution_fn is not None: + try: + m.solution.write(path_to_string(solution_fn)) + except cplex.exceptions.errors.CplexSolverError as err: + logger.info("Unable to save solution file. Raised error: %s", err) + + condition = m.solution.get_status_string() + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition + + if log_fn is not None: + log_f.close() + + def get_solver_solution() -> Solution: + if basis_fn and is_lp: + try: + m.solution.basis.write(path_to_string(basis_fn)) + except cplex.exceptions.errors.CplexSolverError: + logger.info("No model basis stored") + + objective = m.solution.get_objective_value() + + solution = pd.Series( + m.solution.get_values(), m.variables.get_names(), dtype=float + ) + solution = set_int_index(solution) + + if is_lp: + dual = pd.Series( + m.solution.get_dual_values(), + m.linear_constraints.get_names(), + dtype=float, + ) + dual = set_int_index(dual) + else: + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) + return Solution(solution, dual, objective) + + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) + + return Result(status, solution, m) + + +class SCIP(Solver): + """ + Solver subclass for the SCIP solver. + + Attributes + ---------- + **solver_options + options for the given solver + """ + + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + msg = "Direct API not implemented for SCIP" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the scip solver. + + This function communicates with scip using the pyscipopt package. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + CONDITION_MAP: dict[str, str] = {} + + io_api = read_io_api_from_problem_file(problem_fn) + sense = read_sense_from_problem_file(problem_fn) + m = scip.Model() m.readProblem(path_to_string(problem_fn)) - elif io_api == "direct": - raise NotImplementedError("Direct API not implemented for SCIP") - else: - raise ValueError( - f"Keyword argument `io_api` has to be one of {IO_APIS} or None" - ) - if solver_options is not None: - emphasis = solver_options.pop("setEmphasis", None) - if emphasis is not None: - m.setEmphasis(getattr(scip.SCIP_PARAMEMPHASIS, emphasis.upper())) + if self.solver_options is not None: + emphasis = self.solver_options.pop("setEmphasis", None) + if emphasis is not None: + m.setEmphasis(getattr(scip.SCIP_PARAMEMPHASIS, emphasis.upper())) - heuristics = solver_options.pop("setHeuristics", None) - if heuristics is not None: - m.setEmphasis(getattr(scip.SCIP_PARAMSETTING, heuristics.upper())) + heuristics = self.solver_options.pop("setHeuristics", None) + if heuristics is not None: + m.setEmphasis(getattr(scip.SCIP_PARAMSETTING, heuristics.upper())) - presolve = solver_options.pop("setPresolve", None) - if presolve is not None: - m.setEmphasis(getattr(scip.SCIP_PARAMSETTING, presolve.upper())) + presolve = self.solver_options.pop("setPresolve", None) + if presolve is not None: + m.setEmphasis(getattr(scip.SCIP_PARAMSETTING, presolve.upper())) - m.setParams(solver_options) + m.setParams(self.solver_options) - if log_fn is not None: - m.setLogfile(path_to_string(log_fn)) + if log_fn is not None: + m.setLogfile(path_to_string(log_fn)) - if warmstart_fn: - logger.warning("Warmstart not implemented for SCIP") + if warmstart_fn: + logger.warning("Warmstart not implemented for SCIP") - # In order to retrieve the dual values, we need to turn off presolve - m.setPresolve(scip.SCIP_PARAMSETTING.OFF) + # In order to retrieve the dual values, we need to turn off presolve + m.setPresolve(scip.SCIP_PARAMSETTING.OFF) - m.optimize() + m.optimize() - if basis_fn: - logger.warning("Basis not implemented for SCIP") + if basis_fn: + logger.warning("Basis not implemented for SCIP") - if solution_fn: - try: - m.writeSol(m.getBestSol(), filename=path_to_string(solution_fn)) - except FileNotFoundError as err: - logger.warning("Unable to save solution file. Raised error: %s", err) + if solution_fn: + try: + m.writeSol(m.getBestSol(), filename=path_to_string(solution_fn)) + except FileNotFoundError as err: + logger.warning("Unable to save solution file. Raised error: %s", err) - condition = m.getStatus() - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition + condition = m.getStatus() + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition - def get_solver_solution() -> Solution: - objective = m.getObjVal() + def get_solver_solution() -> Solution: + objective = m.getObjVal() - s = m.getSols()[0] - sol = pd.Series({v.name: s[v] for v in m.getVars()}) - sol.drop(["quadobjvar", "qmatrixvar"], errors="ignore", inplace=True, axis=0) - sol = set_int_index(sol) + s = m.getSols()[0] + sol = pd.Series({v.name: s[v] for v in m.getVars()}) + sol.drop( + ["quadobjvar", "qmatrixvar"], errors="ignore", inplace=True, axis=0 + ) + sol = set_int_index(sol) - cons = m.getConss() - if len(cons) != 0: - dual = pd.Series({c.name: m.getDualSolVal(c) for c in cons}) - dual = dual[ - dual.index.str.startswith("c") & ~dual.index.str.startswith("cf") - ] - dual = set_int_index(dual) - else: - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) + cons = m.getConss() + if len(cons) != 0: + dual = pd.Series({c.name: m.getDualSolVal(c) for c in cons}) + dual = dual[ + dual.index.str.startswith("c") & ~dual.index.str.startswith("cf") + ] + dual = set_int_index(dual) + else: + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) - return Solution(sol, dual, objective) + return Solution(sol, dual, objective) - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) - return Result(status, solution, m) + return Result(status, solution, m) -def run_xpress( - model, - io_api=None, - problem_fn=None, - solution_fn=None, - log_fn=None, - warmstart_fn=None, - basis_fn=None, - keep_files=False, - env=None, - **solver_options, -): +class Xpress(Solver): """ - Solve a linear problem using the xpress solver. - - This function reads the linear problem file and passes it to - the Xpress solver. If the solution is successful it returns - variable solutions and constraint dual values. The xpress module - must be installed for using this function. + Solver subclass for the xpress solver. For more information on solver options, see https://www.fico.com/fico-xpress-optimization/docs/latest/solver/GUID-ACD7E60C-7852-36B7-A78A-CED0EA291CDD.html - """ - CONDITION_MAP = { - "lp_optimal": "optimal", - "mip_optimal": "optimal", - "lp_infeasible": "infeasible", - "lp_infeas": "infeasible", - "mip_infeasible": "infeasible", - "lp_unbounded": "unbounded", - "mip_unbounded": "unbounded", - } - - if io_api is not None and io_api not in FILE_IO_APIS: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps` or None" - ) - problem_fn = model.to_file(problem_fn, io_api) - - m = xpress.problem() + Attributes + ---------- + **solver_options + options for the given solver + """ - m.read(path_to_string(problem_fn)) - m.setControl(solver_options) + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + msg = "Direct API not implemented for Xpress" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the Xpress solver. + + This function reads the linear problem file and passes it to + the Xpress solver. If the solution is successful it returns + variable solutions and constraint dual values. The `xpress` module + must be installed for using this function. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Environment for the solver + + Returns + ------- + Result + """ + CONDITION_MAP = { + "lp_optimal": "optimal", + "mip_optimal": "optimal", + "lp_infeasible": "infeasible", + "lp_infeas": "infeasible", + "mip_infeasible": "infeasible", + "lp_unbounded": "unbounded", + "mip_unbounded": "unbounded", + } + + io_api = read_io_api_from_problem_file(problem_fn) + sense = read_sense_from_problem_file(problem_fn) + + m = xpress.problem() + + m.read(path_to_string(problem_fn)) + m.setControl(self.solver_options) - if log_fn is not None: - m.setlogfile(path_to_string(log_fn)) + if log_fn is not None: + m.setlogfile(path_to_string(log_fn)) - if warmstart_fn is not None: - m.readbasis(path_to_string(warmstart_fn)) + if warmstart_fn is not None: + m.readbasis(path_to_string(warmstart_fn)) - m.solve() + m.solve() - if basis_fn is not None: - try: - m.writebasis(path_to_string(basis_fn)) - except Exception as err: - logger.info("No model basis stored. Raised error: %s", err) + if basis_fn is not None: + try: + m.writebasis(path_to_string(basis_fn)) + except Exception as err: + logger.info("No model basis stored. Raised error: %s", err) - if solution_fn is not None: - try: - m.tofile(path_to_string(solution_fn), filetype="sol") - except Exception as err: - logger.info("Unable to save solution file. Raised error: %s", err) + if solution_fn is not None: + try: + # TODO: possibly update saving of solution file + m.tofile(path_to_string(solution_fn), filetype="sol") + except Exception as err: + logger.info("Unable to save solution file. Raised error: %s", err) - condition = m.getProbStatusString() - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition + condition = m.getProbStatusString() + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition - def get_solver_solution() -> Solution: - objective = m.getObjVal() + def get_solver_solution() -> Solution: + objective = m.getObjVal() - var = [str(v) for v in m.getVariable()] + var = [str(v) for v in m.getVariable()] - sol = pd.Series(m.getSolution(var), index=var, dtype=float) - sol = set_int_index(sol) + sol = pd.Series(m.getSolution(var), index=var, dtype=float) + sol = set_int_index(sol) - try: - dual_ = [str(d) for d in m.getConstraint()] - dual = pd.Series(m.getDual(dual_), index=dual_, dtype=float) - dual = set_int_index(dual) - except (xpress.SolverError, SystemError): - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) + try: + dual_ = [str(d) for d in m.getConstraint()] + dual = pd.Series(m.getDual(dual_), index=dual_, dtype=float) + dual = set_int_index(dual) + except (xpress.SolverError, SystemError): + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) - return Solution(sol, dual, objective) + return Solution(sol, dual, objective) - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) - return Result(status, solution, m) + return Result(status, solution, m) mosek_bas_re = re.compile(r" (XL|XU)\s+([^ \t]+)\s+([^ \t]+)| (LL|UL|BS)\s+([^ \t]+)") -def run_mosek( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: mosek.Task | None = None, - **solver_options, -) -> Result: +class Mosek(Solver): """ - Solve a linear problem using the MOSEK solver. Both 'direct' mode, mps and - lp mode are supported; None is interpret as 'direct' mode. MPS mode does - not support quadratic terms. + Solver subclass for the Mosek solver. https://www.mosek.com/ @@ -953,400 +1522,614 @@ def run_mosek( set the following solver_options: {"MSK_SPAR_REMOTE_OPTSERVER_HOST": "http://solve.mosek.com:30080"} + Attributes + ---------- + **solver_options + options for the given solver """ - CONDITION_MAP = { - "solsta.unknown": "unknown", - "solsta.optimal": "optimal", - "solsta.integer_optimal": "optimal", - "solsta.prim_infeas_cer": "infeasible", - "solsta.dual_infeas_cer": "infeasible_or_unbounded", - } - - with contextlib.ExitStack() as stack: - if env is None: - env = stack.enter_context(mosek.Env()) - - with env.Task() as m: - if io_api == "direct": - model.to_mosek(m) - elif io_api is None or io_api in FILE_IO_APIS: - problem_fn = model.to_file(problem_fn, io_api) - m.readdata(path_to_string(problem_fn)) - else: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps`, `direct` or None" + + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem directly from a linopy model using the MOSEK solver. + + Parameters + ---------- + model : linopy.model + Linopy model for the problem. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Gurobi environment for the solver + + Returns + ------- + Result + """ + with contextlib.ExitStack() as stack: + if env is None: + env_ = stack.enter_context(mosek.Env()) + + with env_.Task() as m: + m = model.to_mosek(m) + + return self._solve( + m, + solution_fn=solution_fn, + log_fn=log_fn, + warmstart_fn=warmstart_fn, + basis_fn=basis_fn, + io_api="direct", + sense=model.sense, ) - for k, v in solver_options.items(): - m.putparam(k, str(v)) + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the MOSEK solver. Both mps and + lp files are supported; MPS does not support quadratic terms. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + Mosek environment for the solver + + Returns + ------- + Result + """ + with contextlib.ExitStack() as stack: + if env is None: + env_ = stack.enter_context(mosek.Env()) + + with env_.Task() as m: + # read sense and io_api from problem file + sense = read_sense_from_problem_file(problem_fn) + io_api = read_io_api_from_problem_file(problem_fn) + # for Mosek solver, the path needs to be a string + problem_fn_ = path_to_string(problem_fn) + m.readdata(problem_fn_) + + return self._solve( + m, + solution_fn=solution_fn, + log_fn=log_fn, + warmstart_fn=warmstart_fn, + basis_fn=basis_fn, + io_api=io_api, + sense=sense, + ) - if log_fn is not None: - m.linkfiletostream(mosek.streamtype.log, path_to_string(log_fn), 0) - else: - m.set_Stream(mosek.streamtype.log, sys.stdout.write) - - if warmstart_fn is not None: - m.putintparam(mosek.iparam.sim_hotstart, mosek.simhotstart.status_keys) - skx = [mosek.stakey.low] * m.getnumvar() - skc = [mosek.stakey.bas] * m.getnumcon() - - with open(path_to_string(warmstart_fn)) as f: - for line in f: - if line.startswith("NAME "): - break - - for line in f: - if line.startswith("ENDATA"): - break - - o = mosek_bas_re.match(line) - if o is not None: - if o.group(1) is not None: - key = o.group(1) - try: - skx[m.getvarnameindex(o.group(2))] = ( - mosek.stakey.basis - ) - except: # noqa: E722 - pass - try: - skc[m.getvarnameindex(o.group(3))] = ( - mosek.stakey.low if key == "XL" else "XU" - ) - except: # noqa: E722 - pass - else: - key = o.group(4) - name = o.group(5) - stakey = ( - mosek.stakey.low - if key == "LL" - else ( - mosek.stakey.upr - if key == "UL" - else mosek.stakey.bas - ) + def _solve( + self, + m: mosek.Task, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + io_api: str | None = None, + sense: str | None = None, + ) -> Result: + """ + Solve a linear problem from a Mosek task object. + + Parameters + ---------- + m : mosek.Task + Mosek task object. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + io_api: str + io_api of the problem. For direct API from linopy model this is "direct". + sense: str + "min" or "max" + + Returns + ------- + Result + """ + CONDITION_MAP = { + "solsta.unknown": "unknown", + "solsta.optimal": "optimal", + "solsta.integer_optimal": "optimal", + "solsta.prim_infeas_cer": "infeasible", + "solsta.dual_infeas_cer": "infeasible_or_unbounded", + } + + for k, v in self.solver_options.items(): + m.putparam(k, str(v)) + + if log_fn is not None: + m.linkfiletostream(mosek.streamtype.log, path_to_string(log_fn), 0) + else: + m.set_Stream(mosek.streamtype.log, sys.stdout.write) + + if warmstart_fn is not None: + m.putintparam(mosek.iparam.sim_hotstart, mosek.simhotstart.status_keys) + skx = [mosek.stakey.low] * m.getnumvar() + skc = [mosek.stakey.bas] * m.getnumcon() + + with open(path_to_string(warmstart_fn)) as f: + for line in f: + if line.startswith("NAME "): + break + + for line in f: + if line.startswith("ENDATA"): + break + + o = mosek_bas_re.match(line) + if o is not None: + if o.group(1) is not None: + key = o.group(1) + try: + skx[m.getvarnameindex(o.group(2))] = mosek.stakey.basis + except: # noqa: E722 + pass + try: + skc[m.getvarnameindex(o.group(3))] = ( + mosek.stakey.low if key == "XL" else "XU" ) + except: # noqa: E722 + pass + else: + key = o.group(4) + name = o.group(5) + stakey = ( + mosek.stakey.low + if key == "LL" + else ( + mosek.stakey.upr + if key == "UL" + else mosek.stakey.bas + ) + ) + try: + skx[m.getvarnameindex(name)] = stakey + except: # noqa: E722 try: - skx[m.getvarnameindex(name)] = stakey + skc[m.getvarnameindex(name)] = stakey except: # noqa: E722 - try: - skc[m.getvarnameindex(name)] = stakey - except: # noqa: E722 - pass - m.putskc(mosek.soltype.bas, skc) - m.putskx(mosek.soltype.bas, skx) - m.optimize() - - m.solutionsummary(mosek.streamtype.log) - - if basis_fn is not None: - if m.solutiondef(mosek.soltype.bas): - with open(path_to_string(basis_fn), "w") as f: - f.write(f"NAME {basis_fn}\n") - - skc = [ - (0 if sk != mosek.stakey.bas else 1, i, sk) - for (i, sk) in enumerate(m.getskc(mosek.soltype.bas)) - ] - skx = [ - (0 if sk == mosek.stakey.bas else 1, j, sk) - for (j, sk) in enumerate(m.getskx(mosek.soltype.bas)) - ] - skc.sort() - skc.reverse() - skx.sort() - skx.reverse() - while skx and skc and skx[-1][0] == 0 and skc[-1][0] == 0: - (_, i, kc) = skc.pop() - (_, j, kx) = skx.pop() - - namex = m.getvarname(j) - namec = m.getconname(i) - - if kc in [mosek.stakey.low, mosek.stakey.fix]: - f.write(f" XL {namex} {namec}\n") - else: - f.write(f" XU {namex} {namec}\n") - while skc and skc[-1][0] == 0: - (_, i, kc) = skc.pop() - namec = m.getconname(i) - if kc in [mosek.stakey.low, mosek.stakey.fix]: - f.write(f" LL {namex}\n") - else: - f.write(f" UL {namex}\n") - while skx: - (_, j, kx) = skx.pop() - namex = m.getvarname(j) - if kx == mosek.stakey.bas: - f.write(f" BS {namex}\n") - elif kx in [mosek.stakey.low, mosek.stakey.fix]: - f.write(f" LL {namex}\n") - elif kx == mosek.stakey.upr: - f.write(f" UL {namex}\n") - f.write("ENDATA\n") - - soltype = None - possible_soltypes = [ - mosek.soltype.bas, - mosek.soltype.itr, - mosek.soltype.itg, - ] - for possible_soltype in possible_soltypes: - try: - if m.solutiondef(possible_soltype): - soltype = possible_soltype - except mosek.Error: - pass + pass + m.putskc(mosek.soltype.bas, skc) + m.putskx(mosek.soltype.bas, skx) + m.optimize() - if solution_fn is not None: - try: - m.writesolution(mosek.soltype.bas, path_to_string(solution_fn)) - except mosek.Error as err: - logger.info("Unable to save solution file. Raised error: %s", err) + m.solutionsummary(mosek.streamtype.log) - condition = str(m.getsolsta(soltype)) - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition + if basis_fn is not None: + if m.solutiondef(mosek.soltype.bas): + with open(path_to_string(basis_fn), "w") as f: + f.write(f"NAME {basis_fn}\n") + + skc = [ + (0 if sk != mosek.stakey.bas else 1, i, sk) + for (i, sk) in enumerate(m.getskc(mosek.soltype.bas)) + ] + skx = [ + (0 if sk == mosek.stakey.bas else 1, j, sk) + for (j, sk) in enumerate(m.getskx(mosek.soltype.bas)) + ] + skc.sort() + skc.reverse() + skx.sort() + skx.reverse() + while skx and skc and skx[-1][0] == 0 and skc[-1][0] == 0: + (_, i, kc) = skc.pop() + (_, j, kx) = skx.pop() + + namex = m.getvarname(j) + namec = m.getconname(i) + + if kc in [mosek.stakey.low, mosek.stakey.fix]: + f.write(f" XL {namex} {namec}\n") + else: + f.write(f" XU {namex} {namec}\n") + while skc and skc[-1][0] == 0: + (_, i, kc) = skc.pop() + namec = m.getconname(i) + if kc in [mosek.stakey.low, mosek.stakey.fix]: + f.write(f" LL {namex}\n") + else: + f.write(f" UL {namex}\n") + while skx: + (_, j, kx) = skx.pop() + namex = m.getvarname(j) + if kx == mosek.stakey.bas: + f.write(f" BS {namex}\n") + elif kx in [mosek.stakey.low, mosek.stakey.fix]: + f.write(f" LL {namex}\n") + elif kx == mosek.stakey.upr: + f.write(f" UL {namex}\n") + f.write("ENDATA\n") + + soltype = None + possible_soltypes = [ + mosek.soltype.bas, + mosek.soltype.itr, + mosek.soltype.itg, + ] + for possible_soltype in possible_soltypes: + try: + if m.solutiondef(possible_soltype): + soltype = possible_soltype + except mosek.Error: + pass - def get_solver_solution() -> Solution: - objective = m.getprimalobj(soltype) + if solution_fn is not None: + try: + m.writesolution(mosek.soltype.bas, path_to_string(solution_fn)) + except mosek.Error as err: + logger.info("Unable to save solution file. Raised error: %s", err) - sol = m.getxx(soltype) - sol = {m.getvarname(i): sol[i] for i in range(m.getnumvar())} - sol = pd.Series(sol, dtype=float) - sol = set_int_index(sol) + condition = str(m.getsolsta(soltype)) + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition - try: - dual = m.gety(soltype) - dual = {m.getconname(i): dual[i] for i in range(m.getnumcon())} - dual = pd.Series(dual, dtype=float) - dual = set_int_index(dual) - except (mosek.Error, AttributeError): - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) - - return Solution(sol, dual, objective) - - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) - - return Result(status, solution) - - -def run_copt( - model, - io_api=None, - problem_fn=None, - solution_fn=None, - log_fn=None, - warmstart_fn=None, - basis_fn=None, - keep_files=False, - env=None, - **solver_options, -): + def get_solver_solution() -> Solution: + objective = m.getprimalobj(soltype) + + sol = m.getxx(soltype) + sol = {m.getvarname(i): sol[i] for i in range(m.getnumvar())} + sol = pd.Series(sol, dtype=float) + sol = set_int_index(sol) + + try: + dual = m.gety(soltype) + dual = {m.getconname(i): dual[i] for i in range(m.getnumcon())} + dual = pd.Series(dual, dtype=float) + dual = set_int_index(dual) + except (mosek.Error, AttributeError): + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) + + return Solution(sol, dual, objective) + + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) + + return Result(status, solution) + + +class COPT(Solver): """ - Solve a linear problem using the COPT solver. + Solver subclass for the COPT solver. https://guide.coap.online/copt/en-doc/index.html For more information on solver options, see https://guide.coap.online/copt/en-doc/parameter.html + + Attributes + ---------- + **solver_options + options for the given solver """ - # conditions: https://guide.coap.online/copt/en-doc/constant.html#chapconst-solstatus - CONDITION_MAP = { - 0: "unstarted", - 1: "optimal", - 2: "infeasible", - 3: "unbounded", - 4: "infeasible_or_unbounded", - 5: "numerical", - 6: "node_limit", - 7: "imprecise", - 8: "time_limit", - 9: "unfinished", - 10: "interrupted", - } - - if io_api is not None and io_api not in FILE_IO_APIS: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps` or None" - ) - problem_fn = model.to_file(problem_fn, io_api) + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + msg = "Direct API not implemented for COPT" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the COPT solver. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + COPT environment for the solver + + Returns + ------- + Result + """ + # conditions: https://guide.coap.online/copt/en-doc/constant.html#chapconst-solstatus + CONDITION_MAP = { + 0: "unstarted", + 1: "optimal", + 2: "infeasible", + 3: "unbounded", + 4: "infeasible_or_unbounded", + 5: "numerical", + 6: "node_limit", + 7: "imprecise", + 8: "time_limit", + 9: "unfinished", + 10: "interrupted", + } + + io_api = read_io_api_from_problem_file(problem_fn) + sense = read_sense_from_problem_file(problem_fn) - if env is None: - env = coptpy.Envr() + if env is None: + env_ = coptpy.Envr() - m = env.createModel() + m = env_.createModel() - m.read(path_to_string(problem_fn)) + m.read(path_to_string(problem_fn)) - if log_fn is not None: - m.setLogFile(path_to_string(log_fn)) + if log_fn is not None: + m.setLogFile(path_to_string(log_fn)) - for k, v in solver_options.items(): - m.setParam(k, v) + for k, v in self.solver_options.items(): + m.setParam(k, v) - if warmstart_fn is not None: - m.readBasis(path_to_string(warmstart_fn)) + if warmstart_fn is not None: + m.readBasis(path_to_string(warmstart_fn)) - m.solve() + m.solve() - if basis_fn and m.HasBasis: - try: - m.write(path_to_string(basis_fn)) - except coptpy.CoptError as err: - logger.info("No model basis stored. Raised error: %s", err) + if basis_fn and m.HasBasis: + try: + m.write(path_to_string(basis_fn)) + except coptpy.CoptError as err: + logger.info("No model basis stored. Raised error: %s", err) - if solution_fn: - try: - m.write(path_to_string(solution_fn)) - except coptpy.CoptError as err: - logger.info("No model solution stored. Raised error: %s", err) + if solution_fn: + try: + m.write(path_to_string(solution_fn)) + except coptpy.CoptError as err: + logger.info("No model solution stored. Raised error: %s", err) - condition = m.LpStatus if model.type in ["LP", "QP"] else m.MipStatus - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition + # TODO: check if this suffices + condition = m.MipStatus if m.ismip else m.LpStatus + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition - def get_solver_solution() -> Solution: - objective = m.LpObjval if model.type in ["LP", "QP"] else m.BestObj + def get_solver_solution() -> Solution: + # TODO: check if this suffices + objective = m.BestObj if m.ismip else m.LpObjVal - sol = pd.Series({v.name: v.x for v in m.getVars()}, dtype=float) - sol = set_int_index(sol) + sol = pd.Series({v.name: v.x for v in m.getVars()}, dtype=float) + sol = set_int_index(sol) - try: - dual = pd.Series({v.name: v.pi for v in m.getConstrs()}, dtype=float) - dual = set_int_index(dual) - except (coptpy.CoptError, AttributeError): - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) + try: + dual = pd.Series({v.name: v.pi for v in m.getConstrs()}, dtype=float) + dual = set_int_index(dual) + except (coptpy.CoptError, AttributeError): + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) - return Solution(sol, dual, objective) + return Solution(sol, dual, objective) - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) - env.close() + env_.close() - return Result(status, solution, m) + return Result(status, solution, m) -def run_mindopt( - model: Model, - io_api: str | None = None, - problem_fn: Path | None = None, - solution_fn: Path | None = None, - log_fn: Path | None = None, - warmstart_fn: Path | None = None, - basis_fn: Path | None = None, - keep_files: bool = False, - env: mindoptpy.Env | None = None, - **solver_options, -) -> Result: +class MindOpt(Solver): """ - Solve a linear problem using the MindOpt solver. + Solver subclass for the MindOpt solver. https://solver.damo.alibaba.com/doc/en/html/index.html For more information on solver options, see https://solver.damo.alibaba.com/doc/en/html/API2/param/index.html + + Attributes + ---------- + **solver_options + options for the given solver """ - CONDITION_MAP = { - -1: "error", - 0: "unknown", - 1: "optimal", - 2: "infeasible", - 3: "unbounded", - 4: "infeasible_or_unbounded", - 5: "suboptimal", - } - - if io_api is not None and io_api not in FILE_IO_APIS: - raise ValueError( - "Keyword argument `io_api` has to be one of `lp`, `mps` or None" - ) - if (io_api == "lp" or str(problem_fn).endswith(".lp")) and model.type == "QP": - raise ValueError( - "MindOpt does not support QP problems in LP format. Use `io_api='mps'` instead." - ) - problem_fn = model.to_file(problem_fn, io_api) + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + + def solve_problem_from_model( + self, + model: Model, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + msg = "Direct API not implemented for MindOpt" + raise NotImplementedError(msg) + + def solve_problem_from_file( + self, + problem_fn: Path, + solution_fn: Path | None = None, + log_fn: Path | None = None, + warmstart_fn: Path | None = None, + basis_fn: Path | None = None, + env: None = None, + ) -> Result: + """ + Solve a linear problem from a problem file using the MindOpt solver. + + Parameters + ---------- + problem_fn : Path + Path to the problem file. + solution_fn : Path, optional + Path to the solution file. + log_fn : Path, optional + Path to the log file. + warmstart_fn : Path, optional + Path to the warmstart file. + basis_fn : Path, optional + Path to the basis file. + env : None, optional + MindOpt environment for the solver + + Returns + ------- + Result + + """ + CONDITION_MAP = { + -1: "error", + 0: "unknown", + 1: "optimal", + 2: "infeasible", + 3: "unbounded", + 4: "infeasible_or_unbounded", + 5: "suboptimal", + } + io_api = read_io_api_from_problem_file(problem_fn) + sense = read_sense_from_problem_file(problem_fn) + + if io_api == "lp": + # for model type "QP", lp file with have "[" and "]" in objective function + if "[" in open(problem_fn).read() and "]" in open(problem_fn).read(): + msg = ( + "MindOpt does not support QP problems in LP format. Use MPS file " + "format instead." + ) + raise ValueError(msg) - if env is None: - env = mindoptpy.Env(path_to_string(log_fn) if log_fn else "") - env.start() + if env is None: + env_ = mindoptpy.Env(path_to_string(log_fn) if log_fn else "") - m = mindoptpy.read(path_to_string(problem_fn), env) + env_.start() - for k, v in solver_options.items(): - m.setParam(k, v) + m = mindoptpy.read(path_to_string(problem_fn), env_) - if warmstart_fn: - try: - m.read(path_to_string(warmstart_fn)) - except mindoptpy.MindoptError as err: - logger.info("Model basis could not be read. Raised error: %s", err) - - m.optimize() - - if basis_fn: - try: - m.write(path_to_string(basis_fn)) - except mindoptpy.MindoptError as err: - logger.info("No model basis stored. Raised error: %s", err) - - if solution_fn: - try: - m.write(path_to_string(solution_fn)) - except mindoptpy.MindoptError as err: - logger.info("No model solution stored. Raised error: %s", err) - - condition = m.status - termination_condition = CONDITION_MAP.get(condition, condition) - status = Status.from_termination_condition(termination_condition) - status.legacy_status = condition - - def get_solver_solution() -> Solution: - objective = m.objval - - sol = pd.Series({v.varname: v.X for v in m.getVars()}, dtype=float) - sol = set_int_index(sol) - - try: - dual = pd.Series({c.constrname: c.DualSoln for c in m.getConstrs()}) - dual = set_int_index(dual) - except (mindoptpy.MindoptError, AttributeError): - logger.warning("Dual values of MILP couldn't be parsed") - dual = pd.Series(dtype=float) - - return Solution(sol, dual, objective) - - solution = safe_get_solution(status, get_solver_solution) - maybe_adjust_objective_sign(solution, model.objective.sense, io_api) - - env.dispose() - - return Result(status, solution, m) - - -def run_pips( - model, - io_api=None, - problem_fn=None, - solution_fn=None, - log_fn=None, - warmstart_fn=None, - basis_fn=None, - keep_files=False, - env=None, - **solver_options, -): + for k, v in self.solver_options.items(): + m.setParam(k, v) + + if warmstart_fn: + try: + m.read(path_to_string(warmstart_fn)) + except mindoptpy.MindoptError as err: + logger.info("Model basis could not be read. Raised error: %s", err) + + m.optimize() + + if basis_fn: + try: + m.write(path_to_string(basis_fn)) + except mindoptpy.MindoptError as err: + logger.info("No model basis stored. Raised error: %s", err) + + if solution_fn: + try: + m.write(path_to_string(solution_fn)) + except mindoptpy.MindoptError as err: + logger.info("No model solution stored. Raised error: %s", err) + + condition = m.status + termination_condition = CONDITION_MAP.get(condition, condition) + status = Status.from_termination_condition(termination_condition) + status.legacy_status = condition + + def get_solver_solution() -> Solution: + objective = m.objval + + sol = pd.Series({v.varname: v.X for v in m.getVars()}, dtype=float) + sol = set_int_index(sol) + + try: + dual = pd.Series({c.constrname: c.DualSoln for c in m.getConstrs()}) + dual = set_int_index(dual) + except (mindoptpy.MindoptError, AttributeError): + logger.warning("Dual values of MILP couldn't be parsed") + dual = pd.Series(dtype=float) + + return Solution(sol, dual, objective) + + solution = self.safe_get_solution(status=status, func=get_solver_solution) + solution = maybe_adjust_objective_sign(solution, io_api, sense) + + env_.dispose() + + return Result(status, solution, m) + + +class PIPS(Solver): """ - Solve a linear problem using the PIPS solver. + Solver subclass for the PIPS solver. """ - raise NotImplementedError("The PIPS++ solver interface is not yet implemented.") + + def __init__( + self, + **solver_options, + ): + super().__init__(**solver_options) + msg = "The PIPS solver interface is not yet implemented." + raise NotImplementedError(msg) diff --git a/pyproject.toml b/pyproject.toml index b493ad6d..b0ae3787 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -72,7 +72,7 @@ solvers = [ "cplex; platform_system != 'Darwin' and python_version < '3.12'", "mosek", "mindoptpy; python_version < '3.12'", - "coptpy", + "coptpy!=7.2.1", "xpress; platform_system != 'Darwin' and python_version < '3.11'", "pyscipopt; platform_system != 'Darwin'", ] diff --git a/test/test_optimization.py b/test/test_optimization.py index 6c18893c..2a76efb0 100644 --- a/test/test_optimization.py +++ b/test/test_optimization.py @@ -14,7 +14,8 @@ import xarray as xr from xarray.testing import assert_equal -from linopy import GREATER_EQUAL, LESS_EQUAL, Model +from linopy import GREATER_EQUAL, LESS_EQUAL, Model, solvers +from linopy.common import to_path from linopy.solvers import _new_highspy_mps_layout, available_solvers, quadratic_solvers logger = logging.getLogger(__name__) @@ -27,13 +28,12 @@ params = [(name, io_api) for name in available_solvers for io_api in io_apis] -if "gurobi" in available_solvers: - params.append(("gurobi", "direct")) -if "highs" in available_solvers: - params.append(("highs", "direct")) +direct_solvers = ["gurobi", "highs", "mosek"] +for solver in direct_solvers: + if solver in available_solvers: + params.append((solver, "direct")) if "mosek" in available_solvers: - params.append(("mosek", "direct")) params.append(("mosek", "lp")) @@ -681,6 +681,57 @@ def test_model_resolve(model, solver, io_api): assert np.isclose(model.objective.value, 5.25) +@pytest.mark.parametrize("solver,io_api", [p for p in params if "direct" not in p]) +def test_solver_classes_from_problem_file(model, solver, io_api): + # first test initialization of super class. Should not be possible to initialize + with pytest.raises(TypeError): + solver_super = solvers.Solver() # noqa F841 + + # initialize the solver as object of solver subclass + solver_class = getattr(solvers, f"{solvers.SolverName(solver).name}") + solver_ = solver_class() + # get problem file for testing + problem_fn = model.get_problem_file(io_api=io_api) + model.to_file(to_path(problem_fn), io_api) + solution_fn = model.get_solution_file() if solver in ["glpk", "cbc"] else None + result = solver_.solve_problem(problem_fn=problem_fn, solution_fn=solution_fn) + assert result.status.status.value == "ok" + # x = -0.1, y = 1.7 + assert np.isclose(result.solution.objective, 3.3) + + # test for Value error message if no problem file is given + with pytest.raises(ValueError): + solver_.solve_problem(solution_fn=solution_fn) + + # test for Value error message if no solution file is passed to glpk or cbc + if solver in ["glpk", "cbc"]: + with pytest.raises(ValueError): + solver_.solve_problem(problem_fn=problem_fn) + + # test for Value error message if invalid problem file format is given + with pytest.raises(ValueError): + solver_.solve_problem(problem_fn=solution_fn) + + +@pytest.mark.parametrize("solver,io_api", params) +def test_solver_classes_direct(model, solver, io_api): + # initialize the solver as object of solver subclass + solver_class = getattr(solvers, f"{solvers.SolverName(solver).name}") + solver_ = solver_class() + if io_api == "direct": + result = solver_.solve_problem(model=model) + assert result.status.status.value == "ok" + # x = -0.1, y = 1.7 + assert np.isclose(result.solution.objective, 3.3) + # test for Value error message if direct is tried without giving model + with pytest.raises(ValueError): + solver_.model = None + solver_.solve_problem() + elif solver not in direct_solvers: + with pytest.raises(NotImplementedError): + solver_.solve_problem(model=model) + + # def init_model_large(): # m = Model() # time = pd.Index(range(10), name="time")