"""
Copyright 2013 Steven Diamond, 2017 Akshay Agrawal
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from __future__ import annotations
import time
import warnings
from collections import namedtuple
from dataclasses import dataclass
from typing import Dict, List, Optional, Union
import numpy as np
import cvxpy.utilities as u
import cvxpy.utilities.performance_utils as perf
from cvxpy import Constant, error
from cvxpy import settings as s
from cvxpy.atoms.atom import Atom
from cvxpy.constraints import Equality, Inequality, NonNeg, NonPos, Zero
from cvxpy.constraints.constraint import Constraint
from cvxpy.error import DPPError
from cvxpy.expressions import cvxtypes
from cvxpy.expressions.variable import Variable
from cvxpy.interface.matrix_utilities import scalar_value
from cvxpy.problems.objective import Maximize, Minimize
from cvxpy.reductions import InverseData
from cvxpy.reductions.chain import Chain
from cvxpy.reductions.dgp2dcp.dgp2dcp import Dgp2Dcp
from cvxpy.reductions.dqcp2dcp import dqcp2dcp
from cvxpy.reductions.eval_params import EvalParams
from cvxpy.reductions.flip_objective import FlipObjective
from cvxpy.reductions.solution import INF_OR_UNB_MESSAGE
from cvxpy.reductions.solvers import bisection
from cvxpy.reductions.solvers import defines as slv_def
from cvxpy.reductions.solvers.conic_solvers.conic_solver import ConicSolver
from cvxpy.reductions.solvers.defines import SOLVER_MAP_CONIC, SOLVER_MAP_QP
from cvxpy.reductions.solvers.qp_solvers.qp_solver import QpSolver
from cvxpy.reductions.solvers.solver import Solver
from cvxpy.reductions.solvers.solving_chain import (
SolvingChain,
construct_solving_chain,
)
from cvxpy.settings import SOLVERS
from cvxpy.utilities import debug_tools
from cvxpy.utilities.deterministic import unique_list
# Plain record describing the outcome of a solve.
SolveResult = namedtuple(
    'SolveResult',
    ['opt_value', 'status', 'primal_values', 'dual_values'],
)

# Width, in characters, of the banners printed in verbose mode.
_COL_WIDTH = 79

# Banner printed once at the very start of a verbose solve.
_HEADER = '\n'.join([
    '=' * _COL_WIDTH,
    'CVXPY'.center(_COL_WIDTH),
    ('v' + cvxtypes.version()).center(_COL_WIDTH),
    '=' * _COL_WIDTH,
])

# Banner printed before compilation messages.
_COMPILATION_STR = '\n'.join([
    '-' * _COL_WIDTH,
    'Compilation'.center(_COL_WIDTH),
    '-' * _COL_WIDTH,
])

# Banner printed before the numerical solver is invoked.
_NUM_SOLVER_STR = '\n'.join([
    '-' * _COL_WIDTH,
    'Numerical solver'.center(_COL_WIDTH),
    '-' * _COL_WIDTH,
])

# Banner printed before the final summary.
_FOOTER = '\n'.join([
    '-' * _COL_WIDTH,
    'Summary'.center(_COL_WIDTH),
    '-' * _COL_WIDTH,
])
class Cache:
    """Compilation artifacts that a problem keeps between solves.

    Four fields are stored, all initially None: ``key`` (a tuple built by
    :meth:`make_key`), ``solving_chain``, ``param_prog`` and ``inverse_data``.
    """

    def __init__(self) -> None:
        self.key = None
        self.solving_chain: Optional[SolvingChain] = None
        self.param_prog = None
        self.inverse_data: Optional[InverseData] = None

    def invalidate(self) -> None:
        """Reset every cached field back to None."""
        self.key = self.solving_chain = self.param_prog = self.inverse_data = None

    def make_key(self, solver, gp, ignore_dpp):
        """Return the cache key, the tuple ``(solver, gp, ignore_dpp)``."""
        return (solver, gp, ignore_dpp)

    def gp(self):
        """Return the ``gp`` entry of the stored key, or False if no key is set."""
        if self.key is None:
            return False
        return self.key[1]
def _validate_constraint(constraint):
    """Return ``constraint`` as a Constraint object, or raise.

    Constraint instances are returned unchanged. A plain ``bool`` is turned
    into an equivalent constant inequality: ``0 <= 1`` for True and
    ``1 <= 0`` for False.

    Raises
    ------
    ValueError
        If ``constraint`` is neither a Constraint nor a bool.
    """
    if isinstance(constraint, Constraint):
        return constraint
    if not isinstance(constraint, bool):
        raise ValueError("Problem has an invalid constraint of type %s" %
                         type(constraint))
    # Replace `True` or `False` values with equivalent Expressions.
    if constraint:
        return Constant(0) <= Constant(1)
    return Constant(1) <= Constant(0)
[docs]
class Problem(u.Canonical):
"""A convex optimization problem.
Problems are immutable, save for modification through the specification
of :class:`~cvxpy.expressions.constants.parameter.Parameter` values.
Arguments
---------
objective : Minimize or Maximize
The problem's objective.
constraints : list
The constraints on the problem variables.
"""
# The solve methods available.
REGISTERED_SOLVE_METHODS = {}
def __init__(
    self, objective: Union[Minimize, Maximize], constraints: Optional[List[Constraint]] = None
) -> None:
    """Create a problem from an objective and an optional constraint list.

    Arguments
    ---------
    objective : Minimize or Maximize
        The problem's objective.
    constraints : list, optional
        The constraints on the problem variables. Plain booleans are
        converted by ``_validate_constraint``; ``None`` means no constraints.

    Raises
    ------
    cvxpy.error.DCPError
        If ``objective`` is neither a Minimize nor a Maximize.
    ValueError
        If an entry of ``constraints`` is neither a Constraint nor a bool.
    """
    constraints = [] if constraints is None else constraints

    # The objective must be a Minimize or a Maximize.
    if not isinstance(objective, (Minimize, Maximize)):
        raise error.DCPError("Problem objective must be Minimize or Maximize.")

    # Constraints and objective are immutable.
    self._objective = objective
    # Warn when the objective's expression tree is very large.
    if debug_tools.node_count(self._objective) >= debug_tools.MAX_NODES:
        warnings.warn("Objective contains too many subexpressions. "
                      "Consider vectorizing your CVXPY code to speed up compilation.")

    self._constraints = [_validate_constraint(entry) for entry in constraints]
    # Warn about each constraint whose expression tree is very large.
    for idx, constr in enumerate(self._constraints):
        if debug_tools.node_count(constr) >= debug_tools.MAX_NODES:
            warnings.warn(f"Constraint #{idx} contains too many subexpressions. "
                          "Consider vectorizing your CVXPY code to speed up compilation.")

    # Results of the most recent solve; nothing has been solved yet.
    self._value = None
    self._status: Optional[str] = None
    self._solution = None

    # Compilation cache and solver cache.
    self._cache = Cache()
    self._solver_cache = {}

    # Information about the shape of the problem and its constituent parts.
    self._size_metrics: Optional["SizeMetrics"] = None
    # Benchmarks reported by the solver.
    self._solver_stats: Optional["SolverStats"] = None
    self._compilation_time: Optional[float] = None
    self._solve_time: Optional[float] = None

    self.args = [self._objective, self._constraints]
@property
def value(self):
    """float : The value from the last solve, or None if not solved.

    A stored value is passed through ``scalar_value`` before being returned.
    """
    return None if self._value is None else scalar_value(self._value)
@property
def status(self) -> str:
    """str : The status from the most recent solve.

    One of optimal, infeasible, or unbounded (with or without the suffix
    inaccurate). The stored status is None until it is first set.
    """
    return self._status
@property
def solution(self):
    """Solution : The solution stored by the most recent solve.

    The stored solution is None until it is first set.
    """
    return self._solution
@property
def objective(self) -> Union[Minimize, Maximize]:
    """Minimize or Maximize : The problem's objective.

    The objective cannot be reassigned after the problem is created;
    modifying it after creation results in undefined behavior.
    """
    return self._objective
@property
def constraints(self) -> List[Constraint]:
    """A shallow copy of the problem's constraints.

    A new list is returned on every access, so changing the returned list
    does not change the problem. Constraints cannot be reassigned, appended
    to, or otherwise modified after creation, except through parameters.
    """
    return list(self._constraints)
@property
def param_dict(self):
    """Expose all parameters as a dictionary keyed by parameter name.

    If several parameters share a name, the last one wins.
    """
    return {param.name(): param for param in self.parameters()}
@property
def var_dict(self) -> Dict[str, Variable]:
    """Expose all variables as a dictionary keyed by variable name.

    If several variables share a name, the last one wins.
    """
    return {var.name(): var for var in self.variables()}
[docs]
@perf.compute_once
def is_dcp(self, dpp: bool = False) -> bool:
    """Does the problem satisfy DCP rules?

    Arguments
    ---------
    dpp : bool, optional
        If True, enforce the disciplined parametrized programming (DPP)
        ruleset; only relevant when the problem involves Parameters.
        DPP is a mild restriction of DCP. When a problem involving
        Parameters is DPP, subsequent solves can be much faster than
        the first one. For more information, consult the documentation at
        https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming

    Returns
    -------
    bool
        True if every constraint and the objective are DCP, False otherwise.
    """
    expressions = self.constraints + [self.objective]
    return all(expr.is_dcp(dpp) for expr in expressions)
[docs]
@perf.compute_once
def is_dgp(self, dpp: bool = False) -> bool:
    """Does the problem satisfy DGP rules?

    Arguments
    ---------
    dpp : bool, optional
        If True, enforce the disciplined parametrized programming (DPP)
        ruleset; only relevant when the problem involves Parameters.
        DPP is a mild restriction of DGP. When a problem involving
        Parameters is DPP, subsequent solves can be much faster than
        the first one. For more information, consult the documentation at
        https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming

    Returns
    -------
    bool
        True if every constraint and the objective are DGP, False otherwise.
    """
    expressions = self.constraints + [self.objective]
    return all(expr.is_dgp(dpp) for expr in expressions)
[docs]
@perf.compute_once
def is_dqcp(self) -> bool:
    """Does the problem satisfy the DQCP rules?

    Returns
    -------
    bool
        True if every constraint and the objective are DQCP, False otherwise.
    """
    expressions = self.constraints + [self.objective]
    return all(expr.is_dqcp() for expr in expressions)
[docs]
@perf.compute_once
def is_dpp(self, context: str = 'dcp') -> bool:
    """Does the problem satisfy DPP rules?

    DPP is a mild restriction of DCP (or of DGP, depending on ``context``).
    When a problem involving Parameters is DPP, subsequent solves can be
    much faster than the first one. For more information, consult the
    documentation at
    https://www.cvxpy.org/tutorial/advanced/index.html#disciplined-parametrized-programming

    Arguments
    ---------
    context : str
        Whether to check DPP-compliance for DCP or DGP; ``context`` should
        be either ``'dcp'`` or ``'dgp'`` (case-insensitive). Calling
        ``problem.is_dpp('dcp')`` is equivalent to
        ``problem.is_dcp(dpp=True)``, and ``problem.is_dpp('dgp')`` is
        equivalent to ``problem.is_dgp(dpp=True)``.

    Returns
    -------
    bool
        Whether the problem satisfies the DPP rules.

    Raises
    ------
    ValueError
        If ``context`` is neither ``'dcp'`` nor ``'dgp'``.
    """
    ctx = context.lower()
    if ctx == 'dcp':
        return self.is_dcp(dpp=True)
    if ctx == 'dgp':
        return self.is_dgp(dpp=True)
    # Format the offending value into one message; passing it as a second
    # exception argument would render the error as a tuple.
    raise ValueError(
        "Unsupported context %r; expected 'dcp' or 'dgp'." % (context,))
[docs]
@perf.compute_once
def is_qp(self) -> bool:
    """Is problem a quadratic program?

    Returns False as soon as a constraint is neither an Equality/Zero
    constraint nor has a piecewise-linear first argument, or as soon as a
    variable is PSD or NSD. Otherwise the problem must be DCP and the
    objective expression must satisfy ``is_qpwa()``.
    """
    if any(not isinstance(constr, (Equality, Zero)) and not constr.args[0].is_pwl()
           for constr in self.constraints):
        return False
    if any(variable.is_psd() or variable.is_nsd()
           for variable in self.variables()):
        return False
    return (self.is_dcp() and self.objective.args[0].is_qpwa())
@perf.compute_once
def is_mixed_integer(self) -> bool:
    """Return True if any variable has a truthy 'boolean' or 'integer' attribute."""
    for variable in self.variables():
        attrs = variable.attributes
        if attrs['boolean'] or attrs['integer']:
            return True
    return False
[docs]
@perf.compute_once
def variables(self) -> List[Variable]:
    """Accessor method for variables.

    Collects the variables of the objective first, then those of each
    constraint in order, and removes duplicates with ``unique_list``.

    Returns
    -------
    list of :class:`~cvxpy.expressions.variable.Variable`
        A list of the variables in the problem.
    """
    collected = self.objective.variables()
    for constraint in self.constraints:
        collected += constraint.variables()
    return unique_list(collected)
[docs]
@perf.compute_once
def parameters(self):
    """Accessor method for parameters.

    Collects the parameters of the objective first, then those of each
    constraint in order, and removes duplicates with ``unique_list``.

    Returns
    -------
    list of :class:`~cvxpy.expressions.constants.parameter.Parameter`
        A list of the parameters in the problem.
    """
    collected = self.objective.parameters()
    for constraint in self.constraints:
        collected += constraint.parameters()
    return unique_list(collected)
[docs]
@perf.compute_once
def constants(self) -> List[Constant]:
    """Accessor method for constants.

    Collects the constants of the objective first, then those of each
    constraint in order, and keeps one entry per distinct object.

    Returns
    -------
    list of :class:`~cvxpy.expressions.constants.constant.Constant`
        A list of the constants in the problem.
    """
    collected = self.objective.constants()
    for constraint in self.constraints:
        collected += constraint.constants()
    # Note that numpy matrices are not hashable, so deduplicate on the
    # built-in function "id" rather than on the constants themselves.
    by_identity = {}
    for const in collected:
        by_identity[id(const)] = const
    return list(by_identity.values())
[docs]
def atoms(self) -> List[Atom]:
    """Accessor method for atoms.

    Collects the atoms of the objective first, then those of each
    constraint in order, and removes duplicates with ``unique_list``.

    Returns
    -------
    list of :class:`~cvxpy.atoms.Atom`
        A list of the atom types in the problem; note that this list
        contains classes, not instances.
    """
    collected = self.objective.atoms()
    for constraint in self.constraints:
        collected += constraint.atoms()
    return unique_list(collected)
@property
def size_metrics(self) -> "SizeMetrics":
    """:class:`~cvxpy.problems.problem.SizeMetrics` : Information about the problem's size.

    The metrics object is built on first access and reused afterwards.
    """
    metrics = self._size_metrics
    if metrics is None:
        metrics = self._size_metrics = SizeMetrics(self)
    return metrics
@property
def solver_stats(self) -> "SolverStats":
    """:class:`~cvxpy.problems.problem.SolverStats` : Information returned by the solver.

    The stored statistics are None until they are first set.
    """
    return self._solver_stats
@property
def compilation_time(self) -> float | None:
    """float : Seconds spent compiling the problem the last time it was
    compiled, or None if it has not been compiled yet.
    """
    return self._compilation_time
[docs]
def solve(self, *args, **kwargs):
    """Compiles and solves the problem using the specified method.

    Populates the :code:`status` and :code:`value` attributes on the
    problem object as a side-effect.

    The ``method`` keyword, if given, selects a solve function previously
    added with :meth:`register_solve`; otherwise the default solve routine
    is used. All remaining arguments are forwarded to that function.

    Arguments
    ---------
    solver : str, optional
        The solver to use. For example, 'ECOS', 'SCS', or 'OSQP'.
    verbose : bool, optional
        Overrides the default of hiding solver output, and prints
        logging information describing CVXPY's compilation process.
    gp : bool, optional
        If True, parses the problem as a disciplined geometric program
        instead of a disciplined convex program.
    qcp : bool, optional
        If True, parses the problem as a disciplined quasiconvex program
        instead of a disciplined convex program.
    requires_grad : bool, optional
        Makes it possible to compute gradients of a solution with respect to
        Parameters by calling ``problem.backward()`` after solving, or to
        compute perturbations to the variables given perturbations to
        Parameters by calling ``problem.derivative()``.

        Gradients are only supported for DCP and DGP problems, not
        quasiconvex problems. When computing gradients (i.e., when
        this argument is True), the problem must satisfy the DPP rules.
    enforce_dpp : bool, optional
        When True, a DPPError will be thrown when trying to solve a non-DPP
        problem (instead of just a warning). Only relevant for problems
        involving Parameters. Defaults to False.
    ignore_dpp : bool, optional
        When True, DPP problems will be treated as non-DPP,
        which may speed up compilation. Defaults to False.
    method : function, optional
        A custom solve method to use.
    kwargs : keywords, optional
        Additional solver specific arguments. See Notes below.

    Notes
    ------
    CVXPY interfaces with a wide range of solvers; the algorithms used by
    these solvers have arguments relating to stopping criteria, and
    strategies to improve solution quality.

    There is no one choice of arguments which is perfect for every problem.
    If you are not getting satisfactory results from a solver, you can try
    changing its arguments. The exact way this is done depends on the
    specific solver. Here are some examples:

    ::

        prob.solve(solver='ECOS', abstol=1e-6)
        prob.solve(solver='OSQP', max_iter=10000)
        mydict = {"MSK_DPAR_INTPNT_CO_TOL_NEAR_REL": 10}
        prob.solve(solver='MOSEK', mosek_params=mydict)

    You should refer to CVXPY's web documentation for details on how to
    pass solver arguments, available at
    https://www.cvxpy.org/tutorial/advanced/index.html#setting-solver-options

    Returns
    -------
    float
        The optimal value for the problem, or a string indicating
        why the problem could not be solved.

    Raises
    ------
    cvxpy.error.DCPError
        Raised if the problem is not DCP and `gp` is False.
    cvxpy.error.DGPError
        Raised if the problem is not DGP and `gp` is True.
    cvxpy.error.DPPError
        Raised if DPP settings are invalid.
    cvxpy.error.SolverError
        Raised if no suitable solver exists among the installed solvers,
        or if an unanticipated error is encountered.
    """
    method_name = kwargs.pop("method", None)
    solve_func = (
        Problem._solve if method_name is None
        else Problem.REGISTERED_SOLVE_METHODS[method_name]
    )
    return solve_func(self, *args, **kwargs)
[docs]
@classmethod
def register_solve(cls, name: str, func) -> None:
    """Adds a solve method to the Problem class.

    The function is stored in ``REGISTERED_SOLVE_METHODS`` under ``name``,
    replacing any function already stored under that name. It can then be
    selected with ``problem.solve(method=name)``.

    Arguments
    ---------
    name : str
        The keyword for the method.
    func : function
        The function that executes the solve method. This function must
        take as its first argument the problem instance to solve.
    """
    registry = cls.REGISTERED_SOLVE_METHODS
    registry[name] = func
[docs]
def get_problem_data(
self, solver,
gp: bool = False,
enforce_dpp: bool = False,
ignore_dpp: bool = False,
verbose: bool = False,
canon_backend: str | None = None,
solver_opts: Optional[dict] = None
):
"""Returns the problem data used in the call to the solver.
When a problem is solved, CVXPY creates a chain of reductions enclosed
in a :class:`~cvxpy.reductions.solvers.solving_chain.SolvingChain`,
and compiles it to some low-level representation that is
compatible with the targeted solver. This method returns that low-level
representation.
For some solving chains, this low-level representation is a dictionary
that contains exactly those arguments that were supplied to the solver;
however, for other solving chains, the data is an intermediate
representation that is compiled even further by the solver interfaces.
A solution to the equivalent low-level problem can be obtained via the
data by invoking the `solve_via_data` method of the returned solving
chain, a thin wrapper around the code external to CVXPY that further
processes and solves the problem. Invoke the unpack_results method
to recover a solution to the original problem.
For example:
::
objective = ...
constraints = ...
problem = cp.Problem(objective, constraints)
data, chain, inverse_data = problem.get_problem_data(cp.SCS)
# calls SCS using `data`
soln = chain.solve_via_data(problem, data)
# unpacks the solution returned by SCS into `problem`
problem.unpack_results(soln, chain, inverse_data)
Alternatively, the `data` dictionary returned by this method
contains enough information to bypass CVXPY and call the solver
directly.
For example:
::
problem = cp.Problem(objective, constraints)
data, _, _ = problem.get_problem_data(cp.SCS)
import scs
probdata = {
'A': data['A'],
'b': data['b'],
'c': data['c'],
}
cone_dims = data['dims']
cones = {
"f": cone_dims.zero,
"l": cone_dims.nonneg,
"q": cone_dims.soc,
"ep": cone_dims.exp,
"s": cone_dims.psd,
}
soln = scs.solve(data, cones)
The structure of the data dict that CVXPY returns depends on the
solver. For details, consult the solver interfaces in
`cvxpy/reductions/solvers`.
Arguments
---------
solver : str
The solver the problem data is for.
gp : bool, optional
If True, then parses the problem as a disciplined geometric program
instead of a disciplined convex program.
enforce_dpp : bool, optional
When True, a DPPError will be thrown when trying to parse a non-DPP
problem (instead of just a warning). Defaults to False.
ignore_dpp : bool, optional
When True, DPP problems will be treated as non-DPP,
which may speed up compilation. Defaults to False.
canon_backend : str, optional
'CPP' (default) | 'SCIPY'
Specifies which backend to use for canonicalization, which can affect
compilation time. Defaults to None, i.e., selecting the default
backend.
verbose : bool, optional
If True, print verbose output related to problem compilation.
solver_opts : dict, optional
A dict of options that will be passed to the specific solver.
In general, these options will override any default settings
imposed by cvxpy.
Returns
-------
dict or object
lowest level representation of problem
SolvingChain
The solving chain that created the data.
list
The inverse data generated by the chain.
Raises
------
cvxpy.error.DPPError
Raised if DPP settings are invalid.
"""
# Invalid DPP setting.
# Must be checked here to avoid cache issues.
if enforce_dpp and ignore_dpp:
raise DPPError("Cannot set enforce_dpp = True and ignore_dpp = True.")
# Everything from here until `_compilation_time` is assigned is timed as
# compilation, on both the cached and the uncached path.
start = time.time()
# Cache includes ignore_dpp because it alters compilation.
# NOTE(review): enforce_dpp, canon_backend and solver_opts are not part of
# the key, so changing only those between calls reuses the cached chain.
key = self._cache.make_key(solver, gp, ignore_dpp)
if key != self._cache.key:
# Different (solver, gp, ignore_dpp): discard all cached artifacts and
# build a new solving chain.
self._cache.invalidate()
solving_chain = self._construct_chain(
solver=solver, gp=gp,
enforce_dpp=enforce_dpp,
ignore_dpp=ignore_dpp,
canon_backend=canon_backend,
solver_opts=solver_opts)
self._cache.key = key
self._cache.solving_chain = solving_chain
# The solver cache is reset together with the chain (its consumers are
# not visible in this method).
self._solver_cache = {}
else:
solving_chain = self._cache.solving_chain
if verbose:
print(_COMPILATION_STR)
# A cached parametrized program exists only if an earlier call with the same
# key reached the `safe_to_cache` branch below.
if self._cache.param_prog is not None:
# fast path, bypasses application of reductions
if verbose:
s.LOGGER.info(
'Using cached ASA map, for faster compilation '
'(bypassing reduction chain).')
if gp:
dgp2dcp = self._cache.solving_chain.get(Dgp2Dcp)
# Parameters in the param cone prog are the logs
# of parameters in the original problem (with one exception:
# parameters appearing as exponents (in power and gmatmul
# atoms) are unchanged.
old_params_to_new_params = dgp2dcp.canon_methods._parameters
# Refresh the log-space parameters from the current parameter values.
for param in self.parameters():
if param in old_params_to_new_params:
old_params_to_new_params[param].value = np.log(
param.value)
# Only the solver's own `apply` step runs on the cached program; its
# inverse data is appended to the cached inverse data of the earlier steps.
data, solver_inverse_data = solving_chain.solver.apply(
self._cache.param_prog)
inverse_data = self._cache.inverse_data + [solver_inverse_data]
self._compilation_time = time.time() - start
if verbose:
s.LOGGER.info(
'Finished problem compilation '
'(took %.3e seconds).', self._compilation_time)
else:
# Slow path: apply the full solving chain to this problem.
if verbose:
solver_name = solving_chain.reductions[-1].name()
reduction_chain_str = ' -> '.join(
type(r).__name__ for r in solving_chain.reductions)
s.LOGGER.info(
'Compiling problem (target solver=%s).', solver_name)
s.LOGGER.info('Reduction chain: %s', reduction_chain_str)
data, inverse_data = solving_chain.apply(self, verbose)
# Caching requires dict data carrying a parametrized program and a chain
# that contains no EvalParams reduction.
safe_to_cache = (
isinstance(data, dict)
and s.PARAM_PROB in data
and not any(isinstance(reduction, EvalParams)
for reduction in solving_chain.reductions)
)
self._compilation_time = time.time() - start
if verbose:
s.LOGGER.info(
'Finished problem compilation '
'(took %.3e seconds).', self._compilation_time)
if safe_to_cache:
if verbose and self.parameters():
s.LOGGER.info(
'(Subsequent compilations of this problem, using the '
'same arguments, should ' 'take less time.)')
self._cache.param_prog = data[s.PARAM_PROB]
# the last datum in inverse_data corresponds to the solver,
# so we shouldn't cache it
self._cache.inverse_data = inverse_data[:-1]
return data, solving_chain, inverse_data
def _find_candidate_solvers(self,
                            solver=None,
                            gp: bool = False):
    """
    Find candidate solvers for the current problem. If solver
    is not None, it checks if the specified solver is compatible
    with the problem passed.

    Arguments
    ---------
    solver : Union[string, Solver, None]
        The name of the solver with which to solve the problem or an
        instance of a custom solver. If no solver is supplied
        (i.e., if solver is None), then the targeted solver may be any
        of those that are installed. If the problem is variable-free,
        then this parameter is ignored.
    gp : bool
        If True, the problem is parsed as a Disciplined Geometric Program
        instead of as a Disciplined Convex Program.

    Returns
    -------
    dict
        A dictionary of compatible solvers divided in `qp_solvers`
        and `conic_solvers`.

    Raises
    ------
    cvxpy.error.SolverError
        Raised if the named solver is not installed, if `gp` is True and
        the named solver is not a conic solver, or if the problem is
        mixed-integer and no candidate solver is MIP-capable.
    """
    candidates = {'qp_solvers': [],
                  'conic_solvers': []}
    # A custom solver instance is the only candidate; handled separately.
    if isinstance(solver, Solver):
        return self._add_custom_solver_candidates(solver)

    if solver is not None:
        if solver not in slv_def.INSTALLED_SOLVERS:
            raise error.SolverError("The solver %s is not installed." % solver)
        if solver in slv_def.CONIC_SOLVERS:
            candidates['conic_solvers'] += [solver]
        if solver in slv_def.QP_SOLVERS:
            candidates['qp_solvers'] += [solver]
    else:
        candidates['qp_solvers'] = [slv for slv in slv_def.INSTALLED_SOLVERS
                                    if slv in slv_def.QP_SOLVERS]
        # ECOS_BB can only be called explicitly.
        candidates['conic_solvers'] = [
            slv for slv in slv_def.INSTALLED_SOLVERS
            if slv in slv_def.CONIC_SOLVERS and slv != s.ECOS_BB]

    # If gp we must have only conic solvers
    if gp:
        if solver is not None and solver not in slv_def.CONIC_SOLVERS:
            raise error.SolverError(
                "When `gp=True`, `solver` must be a conic solver "
                "(received '%s'); try calling " % solver +
                " `solve()` with `solver=cvxpy.ECOS`."
            )
        elif solver is None:
            candidates['qp_solvers'] = []  # No QP solvers allowed

    if self.is_mixed_integer():
        # ECOS_BB must be called explicitly.
        if slv_def.INSTALLED_MI_SOLVERS == [s.ECOS_BB] and solver != s.ECOS_BB:
            msg = """
You need a mixed-integer solver for this model. Refer to the documentation
https://www.cvxpy.org/tutorial/advanced/index.html#mixed-integer-programs
for discussion on this topic.
Quick fix 1: if you install the python package CVXOPT (pip install cvxopt),
then CVXPY can use the open-source mixed-integer linear programming
solver `GLPK`. If your problem is nonlinear then you can install SCIP
(pip install pyscipopt).
Quick fix 2: you can explicitly specify solver='ECOS_BB'. This may result
in incorrect solutions and is not recommended.
"""
            raise error.SolverError(msg)
        # TODO: provide a useful error message when the problem is nonlinear but
        # the only installed mixed-integer solvers are MILP solvers (e.g., GLPK_MI).

        # Remember what was considered before filtering, so the error below
        # can name the rejected solvers; after filtering both lists are
        # empty whenever that error is raised.
        considered = candidates['qp_solvers'] + candidates['conic_solvers']
        candidates['qp_solvers'] = [
            slv for slv in candidates['qp_solvers']
            if slv_def.SOLVER_MAP_QP[slv].MIP_CAPABLE]
        candidates['conic_solvers'] = [
            slv for slv in candidates['conic_solvers']
            if slv_def.SOLVER_MAP_CONIC[slv].MIP_CAPABLE]
        if not candidates['conic_solvers'] and \
                not candidates['qp_solvers']:
            raise error.SolverError(
                "Problem is mixed-integer, but candidate "
                "QP/Conic solvers (%s) are not MIP-capable." %
                (considered,))

    return candidates
def _add_custom_solver_candidates(self, custom_solver: Solver):
    """
    Returns the candidate solvers when custom_solver is the only potential option.

    As a side effect, a usable QP or conic custom solver is stored under
    its name in ``SOLVER_MAP_QP`` or ``SOLVER_MAP_CONIC`` respectively.

    Arguments
    ---------
    custom_solver : Solver

    Returns
    -------
    dict
        A dictionary of compatible solvers divided in `qp_solvers`
        and `conic_solvers`. Both lists are empty when the problem is
        mixed-integer and the custom solver is not MIP-capable, or when
        the solver is neither a QpSolver nor a ConicSolver.

    Raises
    ------
    cvxpy.error.SolverError
        Raised if the name of the custom solver conflicts with the name of
        some officially supported solver
    """
    if custom_solver.name() in SOLVERS:
        message = "Custom solvers must have a different name than the officially supported ones"
        raise error.SolverError(message)

    candidates = {'qp_solvers': [], 'conic_solvers': []}

    # A mixed-integer problem needs a MIP-capable solver.
    if self.is_mixed_integer() and not custom_solver.MIP_CAPABLE:
        return candidates

    if isinstance(custom_solver, QpSolver):
        SOLVER_MAP_QP[custom_solver.name()] = custom_solver
        candidates['qp_solvers'] = [custom_solver.name()]
    elif isinstance(custom_solver, ConicSolver):
        SOLVER_MAP_CONIC[custom_solver.name()] = custom_solver
        candidates['conic_solvers'] = [custom_solver.name()]
    return candidates
def _construct_chain(
    self,
    solver: Optional[str] = None,
    gp: bool = False,
    enforce_dpp: bool = False,
    ignore_dpp: bool = False,
    canon_backend: str | None = None,
    solver_opts: Optional[dict] = None
) -> SolvingChain:
    """
    Construct the chains required to reformulate and solve the problem.

    In particular, this function

    # finds the candidate solvers and sorts them
    # constructs the solving chain that performs the
       numeric reductions and solves the problem.

    Arguments
    ---------
    solver : str, optional
        The solver to use. If None, the candidates are drawn from the
        installed solvers.
    gp : bool, optional
        If True, the problem is parsed as a Disciplined Geometric Program
        instead of as a Disciplined Convex Program.
    enforce_dpp : bool, optional
        Whether to error on DPP violations.
    ignore_dpp : bool, optional
        When True, DPP problems will be treated as non-DPP,
        which may speed up compilation. Defaults to False.
    canon_backend : str, optional
        'CPP' (default) | 'SCIPY'
        Specifies which backend to use for canonicalization, which can affect
        compilation time. Defaults to None, i.e., selecting the default
        backend.
    solver_opts: dict, optional
        Additional arguments to pass to the solver.

    Returns
    -------
    A solving chain
    """
    candidates = self._find_candidate_solvers(solver=solver, gp=gp)
    # Sorting rewrites the entries of `candidates` in place.
    self._sort_candidate_solvers(candidates)
    return construct_solving_chain(
        self,
        candidates,
        gp=gp,
        enforce_dpp=enforce_dpp,
        ignore_dpp=ignore_dpp,
        canon_backend=canon_backend,
        solver_opts=solver_opts,
        specified_solver=solver,
    )
@staticmethod
def _sort_candidate_solvers(solvers) -> None:
    """Sorts candidate solvers lists according to slv_def.CONIC_SOLVERS/QP_SOLVERS

    Each list holding more than one entry is replaced, in the dictionary,
    by a copy ordered by position in the corresponding reference list.

    Arguments
    ---------
    solvers : dict
        Dictionary of candidate solvers divided in qp_solvers
        and conic_solvers

    Returns
    -------
    None
    """
    conic = solvers['conic_solvers']
    if len(conic) > 1:
        solvers['conic_solvers'] = sorted(
            conic, key=lambda name: slv_def.CONIC_SOLVERS.index(name))

    qp = solvers['qp_solvers']
    if len(qp) > 1:
        solvers['qp_solvers'] = sorted(
            qp, key=lambda name: slv_def.QP_SOLVERS.index(name))
def _invalidate_cache(self) -> None:
    """Discard the cached compilation artifacts of this problem.

    The artifacts live on ``self._cache``, so that object is what has to
    be invalidated. The four attributes assigned afterwards are not read
    anywhere in the visible part of this class; they are still reset to
    None in case other code reads them.
    """
    self._cache.invalidate()
    # Legacy attributes, kept for backward compatibility.
    self._cache_key = None
    self._solving_chain = None
    self._param_prog = None
    self._inverse_data = None
def _solve(self,
           solver: str = None,
           warm_start: bool = True,
           verbose: bool = False,
           gp: bool = False,
           qcp: bool = False,
           requires_grad: bool = False,
           enforce_dpp: bool = False,
           ignore_dpp: bool = False,
           canon_backend: str | None = None,
           **kwargs):
    """Solves a DCP compliant optimization problem.

    Saves the values of primal and dual variables in the variable
    and constraint objects, respectively.

    Arguments
    ---------
    solver : str, optional
        The solver to use. If None, one is chosen among the candidate
        solvers (or DIFFCP is used when ``requires_grad`` is True).
    warm_start : bool, optional
        Should the previous solver result be used to warm start?
    verbose : bool, optional
        Overrides the default of hiding solver output.
    gp : bool, optional
        If True, parses the problem as a disciplined geometric program.
    qcp : bool, optional
        If True, parses the problem as a disciplined quasiconvex program.
        A problem that is already DCP is solved as a DCP problem; otherwise
        it is solved by bisection, and the ``low`` and ``high`` entries of
        ``kwargs`` are forwarded to the bisection routine (negated and
        swapped when the objective is a Maximize).
    requires_grad : bool, optional
        Makes it possible to compute gradients with respect to
        parameters by calling `backward()` after solving, or to compute
        perturbations to the variables by calling `derivative()`. When
        True, the solver must be SCS (or DIFFCP), and qcp must be False.
        A DPPError is thrown when problem is not DPP.
    enforce_dpp : bool, optional
        When True, a DPPError will be thrown when trying to solve a non-DPP
        problem (instead of just a warning). Defaults to False.
    ignore_dpp : bool, optional
        When True, DPP problems will be treated as non-DPP,
        which may speed up compilation. Defaults to False.
    canon_backend : str, optional
        'CPP' (default) | 'SCIPY'
        Specifies which backend to use for canonicalization, which can affect
        compilation time. Defaults to None, i.e., selecting the default
        backend.
    kwargs : dict, optional
        A dict of options that will be passed to the specific solver.
        In general, these options will override any default settings
        imposed by cvxpy.

    Returns
    -------
    float
        The optimal value for the problem, or a string indicating
        why the problem could not be solved.

    Raises
    ------
    cvxpy.error.ParameterError
        If a Parameter of the problem has no value.
    cvxpy.error.DPPError
        If ``requires_grad`` is True and the problem is not DPP.
    cvxpy.error.DQCPError
        If ``qcp`` is True and the problem is neither DCP nor DQCP.
    ValueError
        If ``requires_grad`` is combined with ``qcp`` or with an
        unsupported solver, or if both ``gp`` and ``qcp`` are True.
    ModuleNotFoundError
        If ``requires_grad`` is True and diffcp is not installed.
    """
    if verbose:
        print(_HEADER)

    for parameter in self.parameters():
        if parameter.value is None:
            raise error.ParameterError(
                "A Parameter (whose name is '%s') does not have a value "
                "associated with it; all Parameter objects must have "
                "values before solving a problem." % parameter.name())

    if verbose:
        n_variables = sum(np.prod(v.shape) for v in self.variables())
        n_parameters = sum(np.prod(p.shape) for p in self.parameters())
        s.LOGGER.info(
            'Your problem has %d variables, '
            '%d constraints, and ' '%d parameters.',
            n_variables, len(self.constraints), n_parameters)
        curvatures = []
        if self.is_dcp():
            curvatures.append('DCP')
        if self.is_dgp():
            curvatures.append('DGP')
        if self.is_dqcp():
            curvatures.append('DQCP')
        s.LOGGER.info(
            'It is compliant with the following grammars: %s',
            ', '.join(curvatures))
        if n_parameters == 0:
            s.LOGGER.info(
                '(If you need to solve this problem multiple times, '
                'but with different data, consider using parameters.)')
        s.LOGGER.info(
            'CVXPY will first compile your problem; then, it will '
            'invoke a numerical solver to obtain a solution.')
        s.LOGGER.info(
            "Your problem is compiled with the %s canonicalization backend.",
            s.DEFAULT_CANON_BACKEND if canon_backend is None else canon_backend)

    if requires_grad:
        dpp_context = 'dgp' if gp else 'dcp'
        if qcp:
            raise ValueError("Cannot compute gradients of DQCP problems.")
        elif not self.is_dpp(dpp_context):
            raise error.DPPError("Problem is not DPP (when requires_grad "
                                 "is True, problem must be DPP).")
        elif solver is not None and solver not in [s.SCS, s.DIFFCP]:
            raise ValueError("When requires_grad is True, the only "
                             "supported solver is SCS "
                             "(received %s)." % solver)
        elif s.DIFFCP not in slv_def.INSTALLED_SOLVERS:
            raise ModuleNotFoundError(
                "The Python package diffcp must be installed to "
                "differentiate through problems. Please follow the "
                "installation instructions at "
                "https://github.com/cvxgrp/diffcp")
        else:
            solver = s.DIFFCP
    else:
        if gp and qcp:
            raise ValueError("At most one of `gp` and `qcp` can be True.")
        if qcp and not self.is_dcp():
            if not self.is_dqcp():
                raise error.DQCPError("The problem is not DQCP.")
            if verbose:
                s.LOGGER.info(
                    'Reducing DQCP problem to a one-parameter '
                    'family of DCP problems, for bisection.')
            reductions = [dqcp2dcp.Dqcp2Dcp()]
            if isinstance(self.objective, Maximize):
                reductions = [FlipObjective()] + reductions
                # FlipObjective flips the sign of the objective, so the
                # bisection interval [low, high] becomes [-high, -low].
                # Both ends are rewritten together: a bound that was not
                # supplied must become "no bound" on the opposite side,
                # not stay as a stale, un-negated value.
                low, high = kwargs.get("low"), kwargs.get("high")
                if low is not None or high is not None:
                    kwargs["low"] = None if high is None else high * -1
                    kwargs["high"] = None if low is None else low * -1
            chain = Chain(problem=self, reductions=reductions)
            soln = bisection.bisect(
                chain.reduce(), solver=solver, verbose=verbose, **kwargs)
            self.unpack(chain.retrieve(soln))
            return self.value

    data, solving_chain, inverse_data = self.get_problem_data(
        solver, gp, enforce_dpp, ignore_dpp, verbose, canon_backend, kwargs
    )

    if verbose:
        print(_NUM_SOLVER_STR)
        s.LOGGER.info(
            'Invoking solver %s to obtain a solution.',
            solving_chain.reductions[-1].name())
    # Time only the call into the solver interface.
    start = time.time()
    solution = solving_chain.solve_via_data(
        self, data, warm_start, verbose, kwargs)
    end = time.time()
    self._solve_time = end - start
    self.unpack_results(solution, solving_chain, inverse_data)
    if verbose:
        print(_FOOTER)
        s.LOGGER.info('Problem status: %s', self.status)
        # `np.nan` rather than the `np.NaN` alias, which NumPy 2.0 removed.
        val = self.value if self.value is not None else np.nan
        s.LOGGER.info('Optimal value: %.3e', val)
        s.LOGGER.info('Compilation took %.3e seconds', self._compilation_time)
        s.LOGGER.info(
            'Solver (including time spent in interface) took '
            '%.3e seconds', self._solve_time)
    return self.value
[docs]
def backward(self) -> None:
    """Compute the gradient of a solution with respect to Parameters.

    This method differentiates through the solution map of the problem,
    obtaining the gradient of a solution with respect to the Parameters.
    In other words, it calculates the sensitivities of the Parameters
    with respect to perturbations in the optimal Variable values. This
    can be useful for integrating CVXPY into automatic differentiation
    toolkits.

    ``backward()`` populates the ``gradient`` attribute of each Parameter
    in the problem as a side-effect. It can only be called after calling
    ``solve()`` with ``requires_grad=True``.

    Below is a simple example:

    ::

        import cvxpy as cp
        import numpy as np

        p = cp.Parameter()
        x = cp.Variable()
        quadratic = cp.square(x - 2 * p)
        problem = cp.Problem(cp.Minimize(quadratic), [x >= 0])
        p.value = 3.0
        problem.solve(requires_grad=True, eps=1e-10)
        # backward() populates the gradient attribute of the parameters
        problem.backward()
        # Because x* = 2 * p, dx*/dp = 2
        np.testing.assert_allclose(p.gradient, 2.0)

    In the above example, the gradient could easily be computed by hand.
    ``backward()`` is useful because for almost all problems, the
    gradient cannot be computed analytically.

    This method can be used to differentiate through any DCP or DGP
    problem, as long as the problem is DPP compliant (i.e.,
    ``problem.is_dcp(dpp=True)`` or ``problem.is_dgp(dpp=True)`` evaluates to
    ``True``).

    This method uses the chain rule to evaluate the gradients of a
    scalar-valued function of the Variables with respect to the Parameters.
    For example, let x be a variable and p a Parameter; x and p might be
    scalars, vectors, or matrices. Let f be a scalar-valued function, with
    z = f(x). Then this method computes dz/dp = (dz/dx) (dx/dp). dz/dx
    is chosen as the all-ones vector by default, corresponding to
    choosing f to be the sum function. You can specify a custom value for
    dz/dx by setting the ``gradient`` attribute on your variables. For example,

    ::

        import cvxpy as cp
        import numpy as np

        b = cp.Parameter()
        x = cp.Variable()
        quadratic = cp.square(x - 2 * b)
        problem = cp.Problem(cp.Minimize(quadratic), [x >= 0])
        b.value = 3.
        problem.solve(requires_grad=True, eps=1e-10)
        x.gradient = 4.
        problem.backward()
        # dz/dp = dz/dx dx/dp = 4. * 2. == 8.
        np.testing.assert_allclose(b.gradient, 8.)

    The ``gradient`` attribute on a variable can also be interpreted as a
    perturbation to its optimal value.

    Raises
    ------
    ValueError
        if solve was not called with ``requires_grad=True``
    SolverError
        if the problem is infeasible or unbounded
    """
    # The DIFFCP entry of the solver cache is the marker that a
    # differentiable solve has been performed on this problem.
    if s.DIFFCP not in self._solver_cache:
        raise ValueError("backward can only be called after calling "
                         "solve with `requires_grad=True`")
    elif self.status not in s.SOLUTION_PRESENT:
        raise error.SolverError("Backpropagating through "
                                "infeasible/unbounded problems is not "
                                "yet supported. Please file an issue on "
                                "Github if you need this feature.")

    # TODO(akshayka): Backpropagate through dual variables as well.
    backward_cache = self._solver_cache[s.DIFFCP]
    # "DT" is the cached callable applied below to (dx, zeros, zeros);
    # presumably the adjoint of the solution-map derivative from diffcp
    # -- TODO confirm.
    DT = backward_cache["DT"]
    # Zero array shaped like the cached "s" entry; passed for the last
    # two arguments of DT, since only variable perturbations are supplied.
    zeros = np.zeros(backward_cache["s"].shape)
    # Maps variable id -> dz/dx for that variable.
    del_vars = {}

    gp = self._cache.gp()
    for variable in self.variables():
        if variable.gradient is None:
            # Default dz/dx: all ones (f is the sum function).
            del_vars[variable.id] = np.ones(variable.shape)
        else:
            del_vars[variable.id] = np.asarray(variable.gradient,
                                               dtype=np.float64)
        if gp:
            # x_gp = exp(x_cone_program),
            # dx_gp/d x_cone_program = exp(x_cone_program) = x_gp
            # NOTE(review): np.asarray may return its input without a
            # copy, so this in-place multiply could also scale
            # variable.gradient -- confirm whether the stored gradient
            # is a private copy.
            del_vars[variable.id] *= variable.value

    dx = self._cache.param_prog.split_adjoint(del_vars)
    # Time only the DT call; the duration is stored in the cache.
    start = time.time()
    dA, db, dc = DT(dx, zeros, zeros)
    end = time.time()
    backward_cache['DT_TIME'] = end - start
    # Note the sign flip on dA when mapping back to the parameters.
    dparams = self._cache.param_prog.apply_param_jac(dc, -dA, db)

    if not gp:
        for param in self.parameters():
            param.gradient = dparams[param.id]
    else:
        dgp2dcp = self._cache.solving_chain.get(Dgp2Dcp)
        old_params_to_new_params = dgp2dcp.canon_methods._parameters
        for param in self.parameters():
            # Note: if param is an exponent in a power or gmatmul atom,
            # then the parameter passes through unchanged to the DCP
            # program; if the param is also used elsewhere (not as an
            # exponent), then param will also be in
            # old_params_to_new_params. Therefore, param.gradient =
            # dparams[param.id] (or 0) + 1/param*dparams[new_param.id]
            #
            # Note that param.id is in dparams if and only if
            # param was used as an exponent (because this means that
            # the parameter entered the DCP problem unchanged.)
            grad = 0.0 if param.id not in dparams else dparams[param.id]
            if param in old_params_to_new_params:
                new_param = old_params_to_new_params[param]
                # new_param.value == log(param), apply chain rule
                grad += (1.0 / param.value) * dparams[new_param.id]
            param.gradient = grad
[docs]
def derivative(self) -> None:
    """Apply the derivative of the solution map to perturbations in the Parameters.

    This method applies the derivative of the solution map to perturbations
    in the Parameters to obtain perturbations in the optimal values of the
    Variables. In other words, it tells you how the optimal values of the
    Variables would be changed by small changes to the Parameters.

    You can specify perturbations in a Parameter by setting its ``delta``
    attribute (if unspecified, the perturbation defaults to 0).

    This method populates the ``delta`` attribute of the Variables as a
    side-effect.

    This method can only be called after calling ``solve()`` with
    ``requires_grad=True``. It is compatible with both DCP and DGP
    problems (that are also DPP-compliant).

    Below is a simple example:

    ::

        import cvxpy as cp
        import numpy as np

        p = cp.Parameter()
        x = cp.Variable()
        quadratic = cp.square(x - 2 * p)
        problem = cp.Problem(cp.Minimize(quadratic), [x >= 0])
        p.value = 3.0
        problem.solve(requires_grad=True, eps=1e-10)
        # derivative() populates the delta attribute of the variables
        p.delta = 1e-3
        problem.derivative()
        # Because x* = 2 * p, dx*/dp = 2, so (dx*/dp)(p.delta) == 2e-3
        np.testing.assert_allclose(x.delta, 2e-3)

    Raises
    ------
    ValueError
        if solve was not called with ``requires_grad=True``, or if the
        problem status is not one for which a solution is present
        (e.g., the problem is infeasible or unbounded)
    """
    # The DIFFCP entry of the solver cache is the marker that a
    # differentiable solve has been performed on this problem.
    if s.DIFFCP not in self._solver_cache:
        raise ValueError("derivative can only be called after calling "
                         "solve with `requires_grad=True`")
    elif self.status not in s.SOLUTION_PRESENT:
        raise ValueError("Differentiating through infeasible/unbounded "
                         "problems is not yet supported. Please file an "
                         "issue on Github if you need this feature.")
    # TODO(akshayka): Forward differentiate dual variables as well
    backward_cache = self._solver_cache[s.DIFFCP]
    param_prog = self._cache.param_prog
    # "D" is the cached callable applied below to (-dA, db, dc);
    # presumably the derivative of the solution map from diffcp
    # -- TODO confirm.
    D = backward_cache["D"]
    # Maps parameter id (in the canonicalized program) -> perturbation.
    param_deltas = {}

    gp = self._cache.gp()
    if gp:
        dgp2dcp = self._cache.solving_chain.get(Dgp2Dcp)

    # With no parameters there is nothing to perturb: every variable
    # delta is set to zero and the method returns early.
    if not self.parameters():
        for variable in self.variables():
            variable.delta = np.zeros(variable.shape)
        return

    for param in self.parameters():
        # An unset delta is treated as a zero perturbation.
        delta = param.delta if param.delta is not None else np.zeros(param.shape)
        if gp:
            if param in dgp2dcp.canon_methods._parameters:
                new_param_id = dgp2dcp.canon_methods._parameters[param].id
            else:
                new_param_id = param.id
            # Scale the perturbation by 1/param.value (chain rule for
            # the log transform applied to DGP parameters).
            param_deltas[new_param_id] = (
                1.0/param.value * np.asarray(delta, dtype=np.float64))
            if param.id in param_prog.param_id_to_col:
                # here, param generated a new parameter and also
                # passed through to the param cone prog unchanged
                # (because it was an exponent of a power)
                param_deltas[param.id] = np.asarray(delta,
                                                    dtype=np.float64)
        else:
            param_deltas[param.id] = np.asarray(delta, dtype=np.float64)
    dc, _, dA, db = param_prog.apply_parameters(param_deltas,
                                                zero_offset=True)
    # Time only the D call; the duration is stored in the cache.
    start = time.time()
    dx, _, _ = D(-dA, db, dc)
    end = time.time()
    backward_cache['D_TIME'] = end - start
    dvars = param_prog.split_solution(
        dx, [v.id for v in self.variables()])
    for variable in self.variables():
        variable.delta = dvars[variable.id]
        if gp:
            # x_gp = exp(x_cone_program),
            # dx_gp/d x_cone_program = exp(x_cone_program) = x_gp
            variable.delta *= variable.value
def _clear_solution(self) -> None:
    """Reset every piece of solution state held by the problem.

    Saves ``None`` as the value of each variable and of each constraint's
    dual variables, then clears the stored value, status and solution.
    """
    for primal in self.variables():
        primal.save_value(None)
    for constr in self.constraints:
        for dual in constr.dual_variables:
            dual.save_value(None)
    # Forget the cached problem-level results as well.
    self._value, self._status, self._solution = None, None, None
def unpack(self, solution) -> None:
    """Update the problem state from a Solution.

    Stores the solution and its status on the problem, sets the problem
    value, and saves values on the primal and dual variables.

    Arguments
    ---------
    solution : cvxpy.Solution
        A Solution object.

    Raises
    ------
    ValueError
        If the status of ``solution`` is in neither
        ``cvxpy.settings.SOLUTION_PRESENT`` nor ``cvxpy.settings.INF_OR_UNB``.
    """
    status = solution.status
    has_solution = status in s.SOLUTION_PRESENT
    if not has_solution and status not in s.INF_OR_UNB:
        raise ValueError("Cannot unpack invalid solution: %s" % solution)

    if has_solution:
        primal_vars = solution.primal_vars
        for var in self.variables():
            var.save_value(primal_vars[var.id])
        for constr in self.constraints:
            # Only constraints that have an entry in dual_vars are updated.
            if constr.id in solution.dual_vars:
                constr.save_dual_value(solution.dual_vars[constr.id])
        # Eliminate confusion of problem.value versus objective.value.
        self._value = self.objective.value
    else:
        # Infeasible or unbounded: no primal/dual values are available.
        for var in self.variables():
            var.save_value(None)
        for constr in self.constraints:
            for dual in constr.dual_variables:
                dual.save_value(None)
        self._value = solution.opt_val

    self._status = solution.status
    self._solution = solution
[docs]
def unpack_results(self, solution, chain: SolvingChain, inverse_data) -> None:
    """Update the problem state from raw solver results.

    Inverts the solver output through the solving chain, warns about
    inaccurate or infeasible-or-unbounded statuses, and then updates
    problem.status, problem.value and the values of the primal and
    dual variables. Solver statistics are recorded afterwards.

    Arguments
    ---------
    solution : object
        The solution returned by applying the chain to the problem
        and invoking the solver on the resulting data.
    chain : SolvingChain
        A solving chain that was used to solve the problem.
    inverse_data : list
        The inverse data returned by applying the chain to the problem.

    Raises
    ------
    cvxpy.error.SolverError
        If the solver failed
    """
    inverted = chain.invert(solution, inverse_data)
    status = inverted.status

    if status in s.INACCURATE:
        inaccurate_msg = (
            "Solution may be inaccurate. Try another solver, "
            "adjusting the solver settings, or solve with "
            "verbose=True for more information."
        )
        warnings.warn(inaccurate_msg)
    if status == s.INFEASIBLE_OR_UNBOUNDED:
        warnings.warn(INF_OR_UNB_MESSAGE)
    if status in s.ERROR:
        failure_msg = (
            "Solver '%s' failed. " % chain.solver.name() +
            "Try another solver, or solve with verbose=True for more "
            "information.")
        raise error.SolverError(failure_msg)

    self.unpack(inverted)
    # Solver statistics are read from the solution that unpack() stored.
    solver_name = chain.solver.name()
    self._solver_stats = SolverStats.from_dict(self._solution.attr,
                                               solver_name)
def __str__(self) -> str:
    """Return the objective followed by an aligned list of constraints.

    The first constraint is prefixed with "subject to "; later
    constraints are padded so they line up underneath it. With no
    constraints, only the objective is rendered.
    """
    rendered = [str(self.objective)]
    if len(self.constraints) > 0:
        prefix = "subject to "
        padding = len(prefix) * " "
        first, *remaining = self.constraints
        rendered.append(prefix + str(first))
        rendered.extend(padding + str(constr) for constr in remaining)
    return '\n'.join(rendered)
def __repr__(self) -> str:
    """Return ``Problem(<objective repr>, <constraints repr>)``."""
    objective_repr = repr(self.objective)
    constraints_repr = repr(self.constraints)
    return "Problem(%s, %s)" % (objective_repr, constraints_repr)
def __neg__(self) -> "Problem":
    """Return a new Problem with the negated objective and the same constraints."""
    negated_objective = -self.objective
    return Problem(negated_objective, self.constraints)
def __add__(self, other) -> "Problem":
    """Add two problems: sum the objectives and merge the constraints.

    Adding ``0`` returns this problem unchanged. Any other operand that
    is not a Problem raises NotImplementedError.
    """
    if other == 0:
        return self
    if isinstance(other, Problem):
        combined_objective = self.objective + other.objective
        combined_constraints = unique_list(self.constraints + other.constraints)
        return Problem(combined_objective, combined_constraints)
    raise NotImplementedError()
def __radd__(self, other) -> "Problem":
    """Handle ``other + problem``; only ``0 + problem`` is supported.

    Returns this problem when ``other`` equals 0; raises
    NotImplementedError for any other left operand.
    """
    if not other == 0:
        raise NotImplementedError()
    return self
def __sub__(self, other) -> "Problem":
    """Subtract two problems: difference of objectives, merged constraints.

    Raises NotImplementedError when ``other`` is not a Problem.
    """
    if isinstance(other, Problem):
        objective_difference = self.objective - other.objective
        merged_constraints = unique_list(self.constraints + other.constraints)
        return Problem(objective_difference, merged_constraints)
    raise NotImplementedError()
def __rsub__(self, other) -> "Problem":
    """Handle ``other - problem``; only ``0 - problem`` is supported.

    Returns the negated problem when ``other`` equals 0; raises
    NotImplementedError for any other left operand.
    """
    if not other == 0:
        raise NotImplementedError()
    return -self
def __mul__(self, other) -> "Problem":
    """Scale the objective by a number, keeping the constraints.

    Raises NotImplementedError unless ``other`` is an ``int`` or ``float``.
    """
    if isinstance(other, (int, float)):
        scaled_objective = self.objective * other
        return Problem(scaled_objective, self.constraints)
    raise NotImplementedError()

# Scalar multiplication uses the same implementation from either side.
__rmul__ = __mul__
def __div__(self, other) -> "Problem":
    """Divide the objective by a number, keeping the constraints.

    The objective is multiplied by ``1.0 / other``. Raises
    NotImplementedError unless ``other`` is an ``int`` or ``float``.
    """
    if isinstance(other, (int, float)):
        reciprocal = 1.0 / other
        return Problem(self.objective * reciprocal, self.constraints)
    raise NotImplementedError()
def is_constant(self) -> bool:
    """Report whether this object is constant; a problem never is."""
    constant = False
    return constant
# Python 3 dispatches the `/` operator to __truediv__; reuse __div__ for it.
__truediv__ = __div__
[docs]
@dataclass
class SolverStats:
    """Reports some of the miscellaneous information that is returned
    by the solver after solving but that is not captured directly by
    the Problem instance.

    Every field except ``solver_name`` defaults to ``None``, which is
    also what :meth:`from_dict` stores for a statistic that is missing
    from the attribute dictionary.

    Attributes
    ----------
    solver_name : str
        The name of the solver.
    solve_time : double
        The time (in seconds) it took for the solver to solve the problem.
    setup_time : double
        The time (in seconds) it took for the solver to set up the problem.
    num_iters : int
        The number of iterations the solver had to go through to find a solution.
    extra_stats : object
        Extra statistics specific to the solver; these statistics are typically
        returned directly from the solver, without modification by CVXPY.
        This object may be a dict, or a custom Python object.
    """
    solver_name: str
    solve_time: Optional[float] = None
    setup_time: Optional[float] = None
    num_iters: Optional[int] = None
    extra_stats: Optional[dict] = None

    @classmethod
    def from_dict(cls, attr: dict, solver_name: str) -> "SolverStats":
        """Construct a SolverStats object from a dictionary of attributes.

        Statistics are looked up with ``dict.get`` under the keys
        ``SOLVE_TIME``, ``SETUP_TIME``, ``NUM_ITERS`` and ``EXTRA_STATS``
        of the settings module; absent keys yield ``None``.

        Parameters
        ----------
        attr : dict
            A dictionary of attributes returned by the solver.
        solver_name : str
            The name of the solver.

        Returns
        -------
        SolverStats
            A SolverStats object.
        """
        return cls(
            solver_name,
            solve_time=attr.get(s.SOLVE_TIME),
            setup_time=attr.get(s.SETUP_TIME),
            num_iters=attr.get(s.NUM_ITERS),
            extra_stats=attr.get(s.EXTRA_STATS),
        )
[docs]
class SizeMetrics:
    """Reports various metrics regarding the problem.

    Attributes
    ----------
    num_scalar_variables : integer
        The number of scalar variables in the problem.
    num_scalar_data : integer
        The number of scalar constants and parameters in the problem, i.e.
        the number of constants used across all matrices and vectors in the
        problem. Some constants are not apparent when the problem is
        constructed: for example, the sum_squares expression is a wrapper
        for a quad_over_lin expression with a constant 1 in the denominator.
    num_scalar_eq_constr : integer
        The number of scalar equality constraints in the problem.
    num_scalar_leq_constr : integer
        The number of scalar inequality constraints in the problem.
    max_data_dimension : integer
        The longest dimension of any data block (constant or parameter).
    max_big_small_squared : integer
        The maximum value of (big)(small)^2 over all data blocks of the
        problem, where (big) is the larger dimension and (small) is the
        smaller dimension for each data block. The product is computed in
        floating point.
    """

    def __init__(self, problem: Problem) -> None:
        # Total number of scalar entries over all variables.
        self.num_scalar_variables = sum(
            variable.size for variable in problem.variables())

        # Data statistics over constants and parameters alike.
        self.max_data_dimension = 0
        self.num_scalar_data = 0
        self.max_big_small_squared = 0
        for datum in problem.constants() + problem.parameters():
            self.num_scalar_data += datum.size
            # A scalar (empty shape) counts as a single dimension of 1.
            dims = datum.shape if len(datum.shape) > 0 else (1,)
            big, small = max(dims), min(dims)
            self.max_data_dimension = max(self.max_data_dimension, big)
            weight = float(big) * (float(small) ** 2)
            self.max_big_small_squared = max(
                self.max_big_small_squared, weight)

        # Scalar equality constraints.
        self.num_scalar_eq_constr = sum(
            constr.expr.size for constr in problem.constraints
            if isinstance(constr, (Equality, Zero)))

        # Scalar inequality constraints.
        self.num_scalar_leq_constr = sum(
            constr.expr.size for constr in problem.constraints
            if isinstance(constr, (Inequality, NonPos, NonNeg)))