"""
``base_optimizer`` 模块包含了父类 ``BaseOptimizer`` ,所有的优化器都将继承于此。
``BaseConvexOptimizer`` 是所有 ``cvxpy`` (和 ``scipy`` )优化的基类。
此外,我们定义了一个通用的效用函数 ``portfolio_performance`` 来评估一组给定的投资组合权重的收益和风险。
"""
import collections
import copy
import json
import warnings
from collections.abc import Iterable
from typing import List
import cvxpy as cp
import numpy as np
import pandas as pd
import scipy.optimize as sco
from . import exceptions, objective_functions
[文档]
class BaseOptimizer:
"""
可用变量:
- ``n_assets`` - int
- ``tickers`` - str list
- ``weights`` - np.ndarray
公共方法:
- ``set_weights()`` 从一个权重数据中创建 self.weights (np.ndarray)。
- ``clean_weights()`` 对权重进行四舍五入,并将接近零的部分剪掉。
- ``save_weights_to_file()`` 将权重保存为 csv、json 或 txt。
"""
[文档]
def __init__(self, n_assets, tickers=None):
"""
:param n_assets: number of assets
:type n_assets: int
:param tickers: name of assets
:type tickers: list
"""
self.n_assets = n_assets
if tickers is None:
self.tickers = list(range(n_assets))
else:
self.tickers = tickers
self._risk_free_rate = None
# Outputs
self.weights = None
def _make_output_weights(self, weights=None):
"""
Utility function to make output weight dict from weight attribute (np.array). If no
arguments passed, use self.tickers and self.weights. If one argument is passed, assume
it is an alternative weight array so use self.tickers and the argument.
"""
if weights is None:
weights = self.weights
return collections.OrderedDict(zip(self.tickers, weights))
[文档]
def set_weights(self, input_weights):
"""
设置用户输入的权重属性(np.array)的实用函数
:param input_weights: {ticker: weight} dict
:type input_weights: dict
"""
self.weights = np.array([input_weights[ticker] for ticker in self.tickers])
[文档]
def clean_weights(self, cutoff=1e-4, rounding=5):
"""
帮助方法来清理原始权重,将绝对值低于分界线的权重设置为零,并对其余部分进行舍入。
:param cutoff: the lower bound, defaults to 1e-4
:type cutoff: float, optional
:param rounding: number of decimal places to round the weights, defaults to 5.
Set to None if rounding is not desired.
:type rounding: int, optional
:return: asset weights
:rtype: OrderedDict
"""
if self.weights is None:
raise AttributeError("Weights not yet computed")
clean_weights = self.weights.copy()
clean_weights[np.abs(clean_weights) < cutoff] = 0
if rounding is not None:
if not isinstance(rounding, int) or rounding < 1:
raise ValueError("rounding must be a positive integer")
clean_weights = np.round(clean_weights, rounding)
return self._make_output_weights(clean_weights)
[文档]
def save_weights_to_file(self, filename="weights.csv"):
"""
保存权重到一个文本文件的实用方法。
:param filename: name of file. Should be csv, json, or txt.
:type filename: str
"""
clean_weights = self.clean_weights()
ext = filename.split(".")[1]
if ext == "csv":
pd.Series(clean_weights).to_csv(filename, header=False)
elif ext == "json":
with open(filename, "w") as fp:
json.dump(clean_weights, fp)
elif ext == "txt":
with open(filename, "w") as f:
f.write(str(dict(clean_weights)))
else:
raise NotImplementedError("Only supports .txt .json .csv")
[文档]
class BaseConvexOptimizer(BaseOptimizer):
"""
BaseConvexOptimizer 包含许多私有变量供 ``cvxpy`` 使用。
例如,权重的不可变的优化变量被存储为 self._w。我们不鼓励直接与这些变量进行交互。
可用变量:
- ``n_assets`` - int
- ``tickers`` - str list
- ``weights`` - np.ndarray
- ``_opt`` - cp.Problem
- ``_solver`` - str
- ``_solver_options`` - {str: str} dict
公共方法:
- ``add_objective()`` 为优化问题添加一个(凸)目标。
- ``add_constraint()`` 在优化问题中加入一个约束条件。
- `convex_objective()` 解决一个带有线性约束的一般凸目标。
- ``nonconvex_objective()`` 使用 scipy 后端求解一个通用的非凸目标。这很容易陷入局部最小值,所以通常不推荐。
- ``set_weights()`` 从一个权重字典中创建 self.weights (np.ndarray)。
- ``clean_weights()`` 对权重进行四舍五入,并剪掉接近零的部分。
- ``save_weights_to_file()`` 将权重保存为 csv、json 或 txt。
"""
[文档]
def __init__(
self,
n_assets,
tickers=None,
weight_bounds=(0, 1),
solver=None,
verbose=False,
solver_options=None,
):
"""
:param weight_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical, defaults to (0, 1). Must be changed to (-1, 1)
for portfolios with shorting.
:type weight_bounds: tuple OR tuple list, optional
:param solver: name of solver. list available solvers with: ``cvxpy.installed_solvers()``
:type solver: str, optional.
:param verbose: whether performance and debugging info should be printed, defaults to False
:type verbose: bool, optional
:param solver_options: parameters for the given solver
:type solver_options: dict, optional
"""
super().__init__(n_assets, tickers)
# Optimization variables
self._w = cp.Variable(n_assets)
self._objective = None
self._additional_objectives = []
self._constraints = []
self._lower_bounds = None
self._upper_bounds = None
self._opt = None
self._solver = solver
self._verbose = verbose
self._solver_options = solver_options if solver_options else {}
self._map_bounds_to_constraints(weight_bounds)
[文档]
def deepcopy(self):
"""
返回优化器的自定义深度拷贝。这是必要的,因为 cvxpy 表达式不支持深度拷贝,但是为了避免意外的副作用,需要拷贝可变参数。
相反,我们创建一个优化器的浅层副本,然后手动复制可变参数。
"""
self_copy = copy.copy(self)
self_copy._additional_objectives = [
copy.copy(obj) for obj in self_copy._additional_objectives
]
self_copy._constraints = [copy.copy(con) for con in self_copy._constraints]
return self_copy
[文档]
def _map_bounds_to_constraints(self, test_bounds):
"""
将输入边界转换为 cvxpy 可以接受的形式,并添加到约束列表中。
:param test_bounds: minimum and maximum weight of each asset OR single min/max pair
if all identical OR pair of arrays corresponding to lower/upper bounds. defaults to (0, 1).
:type test_bounds: tuple OR list/tuple of tuples OR pair of np arrays
:raises TypeError: if ``test_bounds`` is not of the right type
:return: bounds suitable for cvxpy
:rtype: tuple pair of np.ndarray
"""
# If it is a collection with the right length, assume they are all bounds.
if len(test_bounds) == self.n_assets and not isinstance(
test_bounds[0], (float, int)
):
bounds = np.array(test_bounds, dtype=float)
self._lower_bounds = np.nan_to_num(bounds[:, 0], nan=-np.inf)
self._upper_bounds = np.nan_to_num(bounds[:, 1], nan=np.inf)
else:
# Otherwise this must be a pair.
if len(test_bounds) != 2 or not isinstance(test_bounds, (tuple, list)):
raise TypeError(
"test_bounds must be a pair (lower bound, upper bound) OR a collection of bounds for each asset"
)
lower, upper = test_bounds
# Replace None values with the appropriate +/- 1
if np.isscalar(lower) or lower is None:
lower = -1 if lower is None else lower
self._lower_bounds = np.array([lower] * self.n_assets)
upper = 1 if upper is None else upper
self._upper_bounds = np.array([upper] * self.n_assets)
else:
self._lower_bounds = np.nan_to_num(lower, nan=-1)
self._upper_bounds = np.nan_to_num(upper, nan=1)
self.add_constraint(lambda w: w >= self._lower_bounds)
self.add_constraint(lambda w: w <= self._upper_bounds)
def is_parameter_defined(self, parameter_name: str) -> bool:
is_defined = False
objective_and_constraints = (
self._constraints + [self._objective]
if self._objective is not None
else self._constraints
)
for expr in objective_and_constraints:
params = [
arg for arg in _get_all_args(expr) if isinstance(arg, cp.Parameter)
]
for param in params:
if param.name() == parameter_name and not is_defined:
is_defined = True
elif param.name() == parameter_name and is_defined:
raise exceptions.InstantiationError(
"Parameter name defined multiple times"
)
return is_defined
def update_parameter_value(self, parameter_name: str, new_value: float) -> None:
if not self.is_parameter_defined(parameter_name):
raise exceptions.InstantiationError("Parameter has not been defined")
was_updated = False
objective_and_constraints = (
self._constraints + [self._objective]
if self._objective is not None
else self._constraints
)
for expr in objective_and_constraints:
params = [
arg for arg in _get_all_args(expr) if isinstance(arg, cp.Parameter)
]
for param in params:
if param.name() == parameter_name:
param.value = new_value
was_updated = True
if not was_updated:
raise exceptions.InstantiationError("Parameter was not updated")
[文档]
def _solve_cvxpy_opt_problem(self):
"""
定义了目标和约束条件后,用辅助方法来解决 cvxpy 问题并检查输出。
:raises exceptions.OptimizationError: if problem is not solvable by cvxpy
"""
try:
if self._opt is None:
self._opt = cp.Problem(cp.Minimize(self._objective), self._constraints)
self._initial_objective = self._objective.id
self._initial_constraint_ids = {const.id for const in self._constraints}
else:
if not self._objective.id == self._initial_objective:
raise exceptions.InstantiationError(
"The objective function was changed after the initial optimization. "
"Please create a new instance instead."
)
constr_ids = {const.id for const in self._constraints}
if not constr_ids == self._initial_constraint_ids:
raise exceptions.InstantiationError(
"The constraints were changed after the initial optimization. "
"Please create a new instance instead."
)
self._opt.solve(
solver=self._solver, verbose=self._verbose, **self._solver_options
)
except (TypeError, cp.DCPError) as e:
raise exceptions.OptimizationError from e
if self._opt.status not in {"optimal", "optimal_inaccurate"}:
raise exceptions.OptimizationError(
"Solver status: {}".format(self._opt.status)
)
self.weights = self._w.value.round(16) + 0.0 # +0.0 removes signed zero
return self._make_output_weights()
[文档]
def add_objective(self, new_objective, **kwargs):
"""
在目标函数中添加一个新项。这个项必须是凸的,并由 cvxpy 原子函数构建。
例如::
def L1_norm(w, k=1):
return k * cp.norm(w, 1)
ef.add_objective(L1_norm, k=2)
:param new_objective: the objective to be added
:type new_objective: cp.Expression (i.e function of cp.Variable)
"""
if self._opt is not None:
raise exceptions.InstantiationError(
"Adding objectives to an already solved problem might have unintended consequences. "
"A new instance should be created for the new set of objectives."
)
self._additional_objectives.append(new_objective(self._w, **kwargs))
[文档]
def add_constraint(self, new_constraint):
"""
在优化问题中加入一个新的约束条件。这个约束必须满足 DCP 规则,即要么是一个线性平等约束,要么是凸不平等约束。
例如::
ef.add_constraint(lambda x : x[0] == 0.02)
ef.add_constraint(lambda x : x >= 0.01)
ef.add_constraint(lambda x: x <= np.array([0.01, 0.08, ..., 0.5]))
:param new_constraint: the constraint to be added
:type new_constraint: callable (e.g lambda function)
"""
if not callable(new_constraint):
raise TypeError(
"New constraint must be provided as a callable (e.g lambda function)"
)
if self._opt is not None:
raise exceptions.InstantiationError(
"Adding constraints to an already solved problem might have unintended consequences. "
"A new instance should be created for the new set of constraints."
)
self._constraints.append(new_constraint(self._w))
[文档]
def add_sector_constraints(self, sector_mapper, sector_lower, sector_upper):
"""
对不同资产组的权重总和添加限制。最常见的是行业限制,例如,投资组合中的科技风险必须低于 x%::
sector_mapper = {
"GOOG": "tech",
"FB": "tech",,
"XOM": "Oil/Gas",
"RRC": "Oil/Gas",
"MA": "Financials",
"JPM": "Financials",
}
sector_lower = {"tech": 0.1} # at least 10% to tech
sector_upper = {
"tech": 0.4, # less than 40% tech
"Oil/Gas": 0.1 # less than 10% oil and gas
}
:param sector_mapper: dict that maps tickers to sectors
:type sector_mapper: {str: str} dict
:param sector_lower: lower bounds for each sector
:type sector_lower: {str: float} dict
:param sector_upper: upper bounds for each sector
:type sector_upper: {str:float} dict
"""
if np.any(self._lower_bounds < 0):
warnings.warn(
"Sector constraints may not produce reasonable results if shorts are allowed."
)
for sector in sector_upper:
is_sector = [sector_mapper.get(t) == sector for t in self.tickers]
self.add_constraint(lambda w: cp.sum(w[is_sector]) <= sector_upper[sector])
for sector in sector_lower:
is_sector = [sector_mapper.get(t) == sector for t in self.tickers]
self.add_constraint(lambda w: cp.sum(w[is_sector]) >= sector_lower[sector])
[文档]
def convex_objective(self, custom_objective, weights_sum_to_one=True, **kwargs):
"""
优化一个自定义的凸型目标函数。应使用 ``ef.add_constraint()`` 添加约束条件。优化器的参数必须作为 keyword-args 传递。例如::
# Could define as a lambda function instead
def logarithmic_barrier(w, cov_matrix, k=0.1):
# 60 Years of Portfolio Optimization, Kolm et al (2014)
return cp.quad_form(w, cov_matrix) - k * cp.sum(cp.log(w))
w = ef.convex_objective(logarithmic_barrier, cov_matrix=ef.cov_matrix)
:param custom_objective: an objective function to be MINIMISED. This should be written using
cvxpy atoms Should map (w, `**kwargs`) -> float.
:type custom_objective: function with signature (cp.Variable, `**kwargs`) -> cp.Expression
:param weights_sum_to_one: whether to add the default objective, defaults to True
:type weights_sum_to_one: bool, optional
:raises OptimizationError: if the objective is nonconvex or constraints nonlinear.
:return: asset weights for the efficient risk portfolio
:rtype: OrderedDict
"""
# custom_objective must have the right signature (w, **kwargs)
self._objective = custom_objective(self._w, **kwargs)
for obj in self._additional_objectives:
self._objective += obj
if weights_sum_to_one:
self.add_constraint(lambda w: cp.sum(w) == 1)
return self._solve_cvxpy_opt_problem()
[文档]
def nonconvex_objective(
self,
custom_objective,
objective_args=None,
weights_sum_to_one=True,
constraints=None,
solver="SLSQP",
initial_guess=None,
):
"""
使用 scipy 后端优化一些目标函数。这可以支持非凸目标和非线性约束,但可能会卡在局部最小值。例如::
# Market-neutral efficient risk
constraints = [
{"type": "eq", "fun": lambda w: np.sum(w)}, # weights sum to zero
{
"type": "eq",
"fun": lambda w: target_risk ** 2 - np.dot(w.T, np.dot(ef.cov_matrix, w)),
}, # risk = target_risk
]
ef.nonconvex_objective(
lambda w, mu: -w.T.dot(mu), # min negative return (i.e maximise return)
objective_args=(ef.expected_returns,),
weights_sum_to_one=False,
constraints=constraints,
)
:param objective_function: an objective function to be MINIMISED. This function
should map (weight, args) -> cost
:type objective_function: function with signature (np.ndarray, args) -> float
:param objective_args: arguments for the objective function (excluding weight)
:type objective_args: tuple of np.ndarrays
:param weights_sum_to_one: whether to add the default objective, defaults to True
:type weights_sum_to_one: bool, optional
:param constraints: list of constraints in the scipy format (i.e dicts)
:type constraints: dict list
:param solver: which SCIPY solver to use, e.g "SLSQP", "COBYLA", "BFGS".
User beware: different optimizers require different inputs.
:type solver: string
:param initial_guess: the initial guess for the weights, shape (n,) or (n, 1)
:type initial_guess: np.ndarray
:return: asset weights that optimize the custom objective
:rtype: OrderedDict
"""
# Sanitise inputs
if not isinstance(objective_args, tuple):
objective_args = (objective_args,)
# Make scipy bounds
bound_array = np.vstack((self._lower_bounds, self._upper_bounds)).T
bounds = list(map(tuple, bound_array))
if initial_guess is None:
initial_guess = np.array([1 / self.n_assets] * self.n_assets)
# Construct constraints
final_constraints = []
if weights_sum_to_one:
final_constraints.append({"type": "eq", "fun": lambda w: np.sum(w) - 1})
if constraints is not None:
final_constraints += constraints
result = sco.minimize(
custom_objective,
x0=initial_guess,
args=objective_args,
method=solver,
bounds=bounds,
constraints=final_constraints,
)
self.weights = result["x"]
return self._make_output_weights()
def portfolio_performance(
weights, expected_returns, cov_matrix, verbose=False, risk_free_rate=0.02
):
"""
After optimising, calculate (and optionally print) the performance of the optimal
portfolio. Currently calculates expected return, volatility, and the Sharpe ratio.
:param expected_returns: expected returns for each asset. Can be None if
optimising for volatility only (but not recommended).
:type expected_returns: np.ndarray or pd.Series
:param cov_matrix: covariance of returns for each asset
:type cov_matrix: np.array or pd.DataFrame
:param weights: weights or assets
:type weights: list, np.array or dict, optional
:param verbose: whether performance should be printed, defaults to False
:type verbose: bool, optional
:param risk_free_rate: risk-free rate of borrowing/lending, defaults to 0.02
:type risk_free_rate: float, optional
:raises ValueError: if weights have not been calculated yet
:return: expected return, volatility, Sharpe ratio.
:rtype: (float, float, float)
"""
if isinstance(weights, dict):
if isinstance(expected_returns, pd.Series):
tickers = list(expected_returns.index)
elif isinstance(cov_matrix, pd.DataFrame):
tickers = list(cov_matrix.columns)
else:
tickers = list(range(len(expected_returns)))
new_weights = np.zeros(len(tickers))
for i, k in enumerate(tickers):
if k in weights:
new_weights[i] = weights[k]
if new_weights.sum() == 0:
raise ValueError("Weights add to zero, or ticker names don't match")
elif weights is not None:
new_weights = np.asarray(weights)
else:
raise ValueError("Weights is None")
sigma = np.sqrt(objective_functions.portfolio_variance(new_weights, cov_matrix))
if expected_returns is not None:
mu = objective_functions.portfolio_return(
new_weights, expected_returns, negative=False
)
sharpe = objective_functions.sharpe_ratio(
new_weights,
expected_returns,
cov_matrix,
risk_free_rate=risk_free_rate,
negative=False,
)
if verbose:
print("Expected annual return: {:.1f}%".format(100 * mu))
print("Annual volatility: {:.1f}%".format(100 * sigma))
print("Sharpe Ratio: {:.2f}".format(sharpe))
return mu, sigma, sharpe
else:
if verbose:
print("Annual volatility: {:.1f}%".format(100 * sigma))
return None, sigma, None
def _get_all_args(expression: cp.Expression) -> List[cp.Expression]:
"""
Helper function to recursively get all arguments from a cvxpy expression
:param expression: input cvxpy expression
:type expression: cp.Expression
:return: a list of cvxpy arguments
:rtype: List[cp.Expression]
"""
if expression.args == []:
return [expression]
else:
return list(_flatten([_get_all_args(arg) for arg in expression.args]))
def _flatten(alist: Iterable) -> Iterable:
# Helper method to flatten an iterable
for v in alist:
if isinstance(v, Iterable) and not isinstance(v, (str, bytes)):
yield from _flatten(v)
else:
yield v