Portfolio 源代码

""""""  #
"""
Copyright (c) 2020-2023, Dany Cajas
All rights reserved.
This work is licensed under BSD 3-Clause "New" or "Revised" License.
License available at https://github.com/dcajasn/Riskfolio-Lib/blob/master/LICENSE.txt
"""

import numpy as np
import pandas as pd
import cvxpy as cp
import scipy.stats as st
from scipy.linalg import sqrtm
import riskfolio.src.RiskFunctions as rk
import riskfolio.src.ParamsEstimation as pe
import riskfolio.src.AuxFunctions as af
import riskfolio.src.OwaWeights as owa
import riskfolio.external.cppfunctions as cf


__all__ = [
    "Portfolio",
]


[文档] class Portfolio(object): r""" Class that creates a portfolio object with all properties needed to calculate optimal portfolios. Parameters ---------- returns : DataFrame, optional A dataframe that containts the returns of the assets. The default is None. sht : bool, optional Indicate if the portfolio consider short positions (negative weights). The default is False. uppersht : float, optional Indicate the maximum value of the sum of absolute values of short positions (negative weights). The default is 0.2. upperlng : float, optional Indicate the maximum value of the sum of long positions (positive weights). When sht=True, the difference between upperlng and uppersht must be equal to the budget (upperlng - uppersht = budget) The default is 1. budget : float, optional Indicate the maximum value of the sum of long positions (positive weights) and short positions (negative weights). The default is 1. nea : int, optional Indicate the minimum number of effective assets (NEA) used in portfolio. This value is the inverse of Herfindahl-Hirschman index of portfolio's weights. The default is None. card : int, optional Indicate the maximum number of assets used in portfolio. It requires a solver that supports Mixed Integer Programs (MIP), see `Solvers <https://www.cvxpy.org/tutorial/advanced/index.html#solve-method-options>`_ for more details. This constraint is based on :cite:`a-YUE2014949`. The default is None. factors : DataFrame, optional A dataframe that containts the returns of the factors. The default is None. B : DataFrame, optional A dataframe that containts the loadings matrix. The default is None. alpha : float, optional Significance level of CVaR, EVaR, CDaR, EDaR and Tail Gini of losses. The default is 0.05. a_sim : float, optional Number of CVaRs used to approximate Tail Gini of losses. The default is 100. beta : float, optional Significance level of CVaR and Tail Gini of gains. If None it duplicates alpha value. The default is None. b_sim : float, optional Number of CVaRs used to approximate Tail Gini of gains. If None it duplicates a_sim value. The default is None. kappa : float, optional Deformation parameter of RLVaR and RLDaR, must be between 0 and 1. The default is 0.30. n_max_kurt : int, optional Maximum number of assets to use Kurtosis model based on semidefinte formulation. If number of assets is higher than n_max_kurt, it uses relaxed kurtosis model based on second order cone. The default is 50. kindbench : bool, optional True if the benchmark is a portfolio with detailed weights and False if the benchmark is an index. The default is True. allowTO : bool, optional Indicate if there is turnover constraints. The default is False. turnover : float, optional The maximum limit of turnover deviations. The default is 0.05. allowTE : bool, optional Indicate if there is tracking error constraints.. The default is False. TE : float, optional The maximum limit of tracking error deviations. The default is 0.05. benchindex : DataFrame, optional A dataframe that containts the returns of an index. If kindbench is False the tracking error constraints are calculated respect to this index. The default is None. benchweights : DataFrame, optional A dataframe that containts the weights of an index. The default is the equally weighted portfolio 1/N. ainequality : nd-array, optional The matrix :math:`A` of the linear constraint :math:`A \geq B`. The default is None. binequality : 1d-array, optional The matrix :math:`B` of the linear constraint :math:`A \geq B`. The default is None. b : 1d-array, optional The risk budgeting constraint vector. The default is None. lowerret : float, optional Constraint on min level of expected return. The default is None. upperdev : float, optional Constraint on max level of standard deviation. The default is None. upperkt : float, optional Constraint on max level of square root kurtosis. The default is None. uppermad : float, optional Constraint on max level of MAD. The default is None. uppergmd : float, optional Constraint on max level of GMD. The default is None. uppersdev : float, optional Constraint on max level of semi standard deviation. The default is None. upperskt : float, optional Constraint on max level of square root semi kurtosis. The default is None. upperflpm : float, optional Constraint on max level of first lower partial moment. The default is None. upperslpm : float, optional Constraint on max level of second lower partial moment. The default is None. upperCVaR : float, optional Constraint on max level of conditional value at risk (CVaR). The default is None. uppertg : float, optional Constraint on max level of Tail Gini. The default is None. upperEVaR : float, optional Constraint on max level of entropic value at risk (EVaR). The default is None. upperRLVaR : float, optional Constraint on max level of relativistic value at risk (RLVaR). The default is None. upperwr : float, optional Constraint on max level of worst realization. The default is None. upperrg : float, optional Constraint on max level of range. The default is None. uppercvrg : float, optional Constraint on max level of CVaR range. The default is None. uppertgrg : float, optional Constraint on max level of Tail Gini range. The default is None. uppermdd : float, optional Constraint on max level of maximum drawdown of uncompounded cumulative returns. The default is None. upperadd : float, optional Constraint on max level of average drawdown of uncompounded cumulative returns. The default is None. upperCDaR : float, optional Constraint on max level of conditional drawdown at risk (CDaR) of uncompounded cumulative returns. The default is None. upperEDaR : float, optional Constraint on max level of entropic drawdown at risk (EDaR) of uncompounded cumulative returns. The default is None. upperRLDaR : float, optional Constraint on max level of relativistic drawdown at risk (RLDaR) of uncompounded cumulative returns. The default is None. upperuci : float, optional Constraint on max level of ulcer index (UCI) of uncompounded cumulative returns. The default is None. """ def __init__( self, returns=None, sht=False, uppersht=0.2, upperlng=1, budget=1, nea=None, card=None, factors=None, B=None, alpha=0.05, a_sim=100, beta=None, b_sim=None, kappa=0.30, n_max_kurt=50, kindbench=True, allowTO=False, turnover=0.05, allowTE=False, TE=0.05, benchindex=None, benchweights=None, ainequality=None, binequality=None, b=None, lowerret=None, upperdev=None, upperkt=None, uppermad=None, uppergmd=None, uppersdev=None, upperskt=None, upperflpm=None, upperslpm=None, upperCVaR=None, uppertg=None, upperEVaR=None, upperRLVaR=None, upperwr=None, uppercvrg=None, uppertgrg=None, upperrg=None, uppermdd=None, upperadd=None, upperCDaR=None, upperEDaR=None, upperRLDaR=None, upperuci=None, ): # Optimization Models Options self._returns = returns self.sht = sht self.uppersht = uppersht self.upperlng = upperlng self.budget = budget self.nea = nea self.card = card self._factors = factors self.alpha = alpha self.a_sim = a_sim self.beta = beta self.b_sim = b_sim self._kappa = kappa self.n_max_kurt = n_max_kurt self.kindbench = kindbench self.benchindex = benchindex self._benchweights = benchweights self._ainequality = ainequality self._binequality = binequality self._b = b self.lowerret = lowerret self.upperdev = upperdev self.upperkt = upperkt self.uppermad = uppermad self.uppergmd = uppergmd self.uppersdev = uppersdev self.upperskt = upperskt self.upperflpm = upperflpm self.upperslpm = upperslpm self.upperCVaR = upperCVaR self.uppertg = uppertg self.upperEVaR = upperEVaR self.upperRLVaR = upperRLVaR self.upperwr = upperwr self.uppercvrg = uppercvrg self.uppertgrg = uppertgrg self.upperrg = upperrg self.uppermdd = uppermdd self.upperadd = upperadd self.upperCDaR = upperCDaR self.upperEDaR = upperEDaR self.upperRLDaR = upperRLDaR self.upperuci = upperuci self.allowTO = allowTO self.turnover = turnover self.allowTE = allowTE self.TE = TE # Inputs of Optimization Models self.mu = None self.cov = None self.kurt = None self.skurt = None self.L_2 = None self.S_2 = None self.mu_f = None self.cov_f = None self._B = None self.mu_fm = None self.cov_fm = None self.mu_bl = None self.cov_bl = None self.mu_bl_fm = None self.cov_bl_fm = None self.returns_fm = None self.z_EVaR = None self.z_EDaR = None self.z_RLVaR = None self.z_RLDaR = None # Inputs of Worst Case Optimization Models self.cov_l = None self.cov_u = None self.cov_mu = None self.cov_sigma = None self.d_mu = None self.k_mu = None self.k_sigma = None # Optimal portfolios self.optimal = None self.rp_optimal = None self.rrp_optimal = None self.wc_optimal = None self.limits = None self.frontier = None # Solver params self.solvers = ["ECOS", "SCS", "OSQP", "CVXOPT"] self.sol_params = { # 'ECOS': {"max_iters": 500, "abstol": 1e-8}, # 'SCS': {"max_iters": 2500, "eps": 1e-5}, # 'OSQP': {"max_iter": 10000, "eps_abs": 1e-8}, # 'CVXOPT': {"max_iters": 500, "abstol": 1e-8}, } @property def returns(self): if self._returns is not None and isinstance(self._returns, pd.DataFrame): return self._returns else: raise NameError("returns must be a DataFrame") @returns.setter def returns(self, value): if value is not None and isinstance(value, pd.DataFrame): self._returns = value else: raise NameError("returns must be a DataFrame") @property def assetslist(self): if self._returns is not None and isinstance(self._returns, pd.DataFrame): return self._returns.columns.tolist() elif self._returns is None: return None @property def numassets(self): if self._returns is not None and isinstance(self._returns, pd.DataFrame): return self._returns.shape[1] @property def factors(self): return self._factors @factors.setter def factors(self, value): a = value if a is not None and isinstance(a, pd.DataFrame): if self.returns.index.equals(a.index): self._factors = a else: raise NameError("factors must be a DataFrame") @property def factorslist(self): if self._factors is not None and isinstance(self._factors, pd.DataFrame): return self._factors.columns.tolist() elif self._factors is None: return None @property def B(self): return self._B @B.setter def B(self, value): a = value if a is not None and isinstance(a, pd.DataFrame): self._B = a else: raise NameError("loadings matrix must be a DataFrame") @property def benchweights(self): n = self.numassets if self._benchweights is not None: if self._benchweights.shape[0] == n and self._benchweights.shape[1] == 1: a = self._benchweights else: raise NameError("Weights must have a size of shape (n_assets,1)") else: a = np.array(np.ones((n, 1)) / n) return a @benchweights.setter def benchweights(self, value): a = value n = self.numassets if a is not None: if a.shape[0] == n and a.shape[1] == 1: a = a else: raise NameError("Weights must have a size of shape (n_assets,1)") else: a = np.array(np.ones((n, 1)) / n) self._benchweights = a @property def ainequality(self): a = self._ainequality if a is not None: if a.shape[1] == self.numassets: a = a else: raise NameError( "The array ainequality must have the same number of columns than assets' number" ) return a @ainequality.setter def ainequality(self, value): a = value if a is not None: if a.shape[1] == self.numassets: a = a else: raise NameError( "The matrix ainequality must have the same number of columns than assets' number" ) self._ainequality = a @property def binequality(self): a = self._binequality if a is not None: if a.shape[1] == 1: a = a else: raise NameError("The matrix binequality must have one column") return a @binequality.setter def binequality(self, value): a = value if a is not None: if a.shape[1] == 1: a = a else: raise NameError("The matrix binequality must have one column") self._binequality = a @property def b(self): a = self._b if a is not None: if a.shape[0] == self.numassets and a.shape[1] == 1: pass elif a.shape[0] == 1 and a.shape[1] == self.numassets: a = a.T else: raise NameError( "The vector of risk contribution constraints must have a size equal than the assets' number" ) return a @b.setter def b(self, value): a = value if a is not None: if a.shape[0] == self.numassets and a.shape[1] == 1: pass elif a.shape[0] == 1 and a.shape[1] == self.numassets: a = a.T else: raise NameError( "The vector of risk contribution constraints must have a size equal than the assets' number" ) self._b = a @property def kappa(self): return self._kappa @kappa.setter def kappa(self, value): a = value if a >= 1: print( "kappa must be between 0 and 1, values higher or equal to 1 are setting to 0.99" ) self._kappa = 0.99 elif a <= 0: print( "kappa must be between 0 and 1, values lower or equal to 0 are setting to 0.01" ) self._kappa = 0.01 else: self._kappa = a
[文档] def assets_stats( self, method_mu="hist", method_cov="hist", method_kurt=None, d=0.94, **kwargs ): r""" Calculate the inputs that will be used by the optimization method when we select the input model='Classic'. Parameters ---------- method_mu : str, optional The method used to estimate the expected returns. The default value is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'JS': James-Stein estimator. For more information see :cite:`a-Meucci2005` and :cite:`a-Feng2016`. - 'BS': Bayes-Stein estimator. For more information see :cite:`a-Jorion1986`. - 'BOP': BOP estimator. For more information see :cite:`a-Bodnar2019`. method_cov : str, optional The method used to estimate the covariance matrix. The default is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ledoit': use the Ledoit and Wolf Shrinkage method. - 'oas': use the Oracle Approximation Shrinkage method. - 'shrunk': use the basic Shrunk Covariance method. - 'gl': use the basic Graphical Lasso Covariance method. - 'jlogo': use the j-LoGo Covariance method. For more information see: :cite:`a-jLogo`. - 'fixed': denoise using fixed method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'spectral': denoise using spectral method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'shrink': denoise using shrink method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'gerber1': use the Gerber statistic 1. For more information see: :cite:`a-Gerber2021`. - 'gerber2': use the Gerber statistic 2. For more information see: :cite:`a-Gerber2021`. method_kurt : str, optional The method used to estimate the kurtosis square matrix: The default is None. Possible values are: - None: do not calculate kurtosis square matrix. - 'hist': use historical estimates. For more information see :cite:`a-Cajas4`. - 'semi': use semi cokurtosis square matrix. For more information see :cite:`a-Cajas4`. - 'fixed': denoise using fixed method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'spectral': denoise using spectral method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'shrink': denoise using shrink method. For more information see chapter 2 of :cite:`a-MLforAM`. **kwargs : dict All additional parameters of mean_vector and covar_matrix functions. See Also -------- riskfolio.src.ParamsEstimation.mean_vector riskfolio.src.ParamsEstimation.covar_matrix riskfolio.src.ParamsEstimation.cokurt_matrix """ self.mu = pe.mean_vector(self.returns, method=method_mu, d=d) self.cov = pe.covar_matrix(self.returns, method=method_cov, **kwargs) value = af.is_pos_def(self.cov, threshold=1e-8) for i in range(5): if value == False: try: self.cov = af.cov_fix(self.cov, method="clipped", threshold=1e-5) value = af.is_pos_def(self.cov, threshold=1e-8) except: break else: break if value == False: print("You must convert self.cov to a positive definite matrix") if method_kurt not in [None, "semi"]: T, N = self.returns.shape self.L_2 = cf.duplication_elimination_matrix(N) self.S_2 = cf.duplication_summation_matrix(N) self.kurt = pe.cokurt_matrix(self.returns, method=method_kurt, **kwargs) value = af.is_pos_def(self.kurt, threshold=1e-8) for i in range(5): if value == False: try: self.kurt = af.cov_fix( self.kurt, method="clipped", threshold=1e-5 ) value = af.is_pos_def(self.kurt, threshold=1e-8) except: break else: break if value == False: print("You must convert self.kurt to a positive definite matrix") self.skurt = pe.cokurt_matrix(self.returns, method="semi") value = af.is_pos_def(self.skurt, threshold=1e-8) for i in range(5): if value == False: try: self.skurt = af.cov_fix( self.skurt, method="clipped", threshold=1e-5 ) value = af.is_pos_def(self.skurt, threshold=1e-8) except: break else: break if value == False: print("You must convert self.skurt to a positive definite matrix") else: self.kurt = None self.skurt = None self.L_2 = None self.S_2 = None
[文档] def blacklitterman_stats( self, P, Q, rf=0, w=None, delta=None, eq=True, method_mu="hist", method_cov="hist", **kwargs, ): r""" Calculate the inputs that will be used by the optimization method when we select the input model='BL'. Parameters ---------- P : DataFrame of shape (n_views, n_assets) Analyst's views matrix, can be relative or absolute. Q: DataFrame of shape (n_views, 1) Expected returns of analyst's views. delta: float Risk aversion factor. The default value is 1. rf: scalar, optional Risk free rate. The default is 0. w : DataFrame of shape (n_assets, 1) Weights matrix, where n_assets is the number of assets. The default is None. eq: bool, optional Indicates if use equilibrium or historical excess returns. The default is True. method_mu : str, optional The method used to estimate the expected returns. The default value is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'JS': James-Stein estimator. For more information see :cite:`a-Meucci2005` and :cite:`a-Feng2016`. - 'BS': Bayes-Stein estimator. For more information see :cite:`a-Jorion1986`. - 'BOP': BOP estimator. For more information see :cite:`a-Bodnar2019`. method_cov : str, optional The method used to estimate the covariance matrix. The default is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ledoit': use the Ledoit and Wolf Shrinkage method. - 'oas': use the Oracle Approximation Shrinkage method. - 'shrunk': use the basic Shrunk Covariance method. - 'gl': use the basic Graphical Lasso Covariance method. - 'jlogo': use the j-LoGo Covariance method. For more information see: :cite:`a-jLogo`. - 'fixed': denoise using fixed method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'spectral': denoise using spectral method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'shrink': denoise using shrink method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'gerber1': use the Gerber statistic 1. For more information see: :cite:`a-Gerber2021`. - 'gerber2': use the Gerber statistic 2. For more information see: :cite:`a-Gerber2021`. **kwargs : dict Other variables related to the covariance estimation. See Also -------- riskfolio.src.ParamsEstimation.black_litterman """ X = self.returns if w is None: w = np.array(self.benchweights, ndmin=2) if delta is None: a = np.array(self.mu, ndmin=2) @ np.array(w, ndmin=2) delta = (a - rf) / ( np.array(w, ndmin=2).T
[文档] @ np.array(self.cov, ndmin=2) @ np.array(w, ndmin=2) ) delta = delta.item() mu, cov, w = pe.black_litterman( X=X, w=w, P=P, Q=Q, delta=delta, rf=rf, eq=eq, method_mu=method_mu, method_cov=method_cov, **kwargs, ) self.mu_bl = mu self.cov_bl = cov value = af.is_pos_def(self.cov_bl, threshold=1e-8) for i in range(5): if value == False: try: self.cov_bl = af.cov_fix( self.cov_bl, method="clipped", threshold=1e-5 ) value = af.is_pos_def(self.cov_bl, threshold=1e-8) except: break else: break if value == False: print("You must convert self.cov_bl to a positive definite matrix")
def factors_stats( self, method_mu="hist", method_cov="hist", d=0.94, B=None, dict_cov={}, dict_risk={}, ): r""" Calculate the inputs that will be used by the optimization method when we select the input model='FM'. Parameters ---------- method_mu : str, optional The method used to estimate the expected returns. The default value is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'JS': James-Stein estimator. For more information see :cite:`a-Meucci2005` and :cite:`a-Feng2016`. - 'BS': Bayes-Stein estimator. For more information see :cite:`a-Jorion1986`. - 'BOP': BOP estimator. For more information see :cite:`a-Bodnar2019`. method_cov : str, optional The method used to estimate the covariance matrix. The default is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ledoit': use the Ledoit and Wolf Shrinkage method. - 'oas': use the Oracle Approximation Shrinkage method. - 'shrunk': use the basic Shrunk Covariance method. - 'gl': use the basic Graphical Lasso Covariance method. - 'jlogo': use the j-LoGo Covariance method. For more information see: :cite:`a-jLogo`. - 'fixed': denoise using fixed method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'spectral': denoise using spectral method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'shrink': denoise using shrink method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'gerber1': use the Gerber statistic 1. For more information see: :cite:`a-Gerber2021`. - 'gerber2': use the Gerber statistic 2. For more information see: :cite:`a-Gerber2021`. dict_cov : dict Other variables related to the covariance estimation. dict_risk : dict Other variables related of risk_factors function. See Also -------- riskfolio.src.ParamsEstimation.forward_regression riskfolio.src.ParamsEstimation.backward_regression riskfolio.src.ParamsEstimation.loadings_matrix riskfolio.src.ParamsEstimation.risk_factors """ X = self.factors Y = self.returns mu_f = pe.mean_vector(self.returns, method=method_mu, d=d) cov_f = pe.covar_matrix(self.returns, method=method_cov, d=d, **dict_cov) self.mu_f = mu_f self.cov_f = cov_f value = af.is_pos_def(self.cov_f, threshold=1e-8) if value == False: try: self.cov = af.cov_fix(self.cov, method="clipped", threshold=1e-5) value = af.is_pos_def(self.cov, threshold=1e-8) if value == False: print("You must convert self.cov to a positive definite matrix") except: print("You must convert self.cov to a positive definite matrix") mu, cov, returns = pe.risk_factors( X, Y, B=B, method_mu=method_mu, method_cov=method_cov, **dict_risk ) self.mu_fm = mu self.cov_fm = cov self.returns_fm = returns value = af.is_pos_def(self.cov_fm, threshold=1e-8) for i in range(5): if value == False: try: self.cov_fm = af.cov_fix( self.cov_fm, method="clipped", threshold=1e-5 ) value = af.is_pos_def(self.cov_fm, threshold=1e-8) except: break else: break if value == False: print("You must convert self.cov_fm to a positive definite matrix")
[文档] def blfactors_stats( self, flavor="BLB", B=None, P=None, Q=None, P_f=None, Q_f=None, rf=0, w=None, delta=None, eq=True, const=False, diag=False, method_mu="hist", method_cov="hist", kwargs_1=None, kwargs_2=None, ): r""" Calculate the inputs that will be used by the optimization method when we select the input model='BL'. Parameters ---------- flavor : str Model used, can be 'BLB' for Black Litterman Bayesian or 'ABL' for Augmented Black Litterman. The default value is 'BLB'. B : DataFrame of shape (n_assets, n_features) Loadings matrix. The default value is None. P : DataFrame of shape (n_views, n_assets) Analyst's views matrix, can be relative or absolute. Q: DataFrame of shape (n_views, 1) Expected returns of analyst's views. P_f : DataFrame of shape (n_views, n_assets) Analyst's factors views matrix, can be relative or absolute. Q_f: DataFrame of shape (n_views, 1) Expected returns of analyst's factors views. delta: float Risk aversion factor. The default value is 1. rf: scalar, optional Risk free rate. The default is 0. w : DataFrame of shape (n_assets, 1) Weights matrix, where n_assets is the number of assets. The default is None. eq: bool, optional Indicates if use equilibrium or historical excess returns. The default is True. const : bool, optional Indicate if the loadings matrix has a constant. The default is False. diag : bool, optional Indicate if we use the diagonal matrix to calculate covariance matrix of factor model, only useful when we work with a factor model based on a regresion model (only equity portfolio). The default is False. method_mu : str, optional The method used to estimate the expected returns. The default value is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'JS': James-Stein estimator. For more information see :cite:`a-Meucci2005` and :cite:`a-Feng2016`. - 'BS': Bayes-Stein estimator. For more information see :cite:`a-Jorion1986`. - 'BOP': BOP estimator. For more information see :cite:`a-Bodnar2019`. method_cov : str, optional The method used to estimate the covariance matrix: The default is 'hist'. Possible values are: - 'hist': use historical estimates. - 'ewma1'': use ewma with adjust=True, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ewma2': use ewma with adjust=False, see `EWM <https://pandas.pydata.org/pandas-docs/stable/user_guide/computation.html#exponentially-weighted-windows>`_ for more details. - 'ledoit': use the Ledoit and Wolf Shrinkage method. - 'oas': use the Oracle Approximation Shrinkage method. - 'shrunk': use the basic Shrunk Covariance method. - 'gl': use the basic Graphical Lasso Covariance method. - 'jlogo': use the j-LoGo Covariance method. For more information see: :cite:`a-jLogo`. - 'fixed': denoise using fixed method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'spectral': denoise using spectral method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'shrink': denoise using shrink method. For more information see chapter 2 of :cite:`a-MLforAM`. - 'gerber1': use the Gerber statistic 1. For more information see: :cite:`a-Gerber2021`. - 'gerber2': use the Gerber statistic 2. For more information see: :cite:`a-Gerber2021`. kwargs_1 : dict Other variables related to the loadings matrix estimation. kwargs_2 : dict Other variables related to the factors Black Litterman model selected. See Also -------- riskfolio.src.ParamsEstimation.augmented_black_litterman riskfolio.src.ParamsEstimation.black_litterman_bayesian """ X = self.returns F = self.factors if w is None: w = np.array(self.benchweights, ndmin=2) if delta is None: a = np.array(self.mu, ndmin=2) @ np.array(w, ndmin=2) delta = (a - rf) / ( np.array(w, ndmin=2).T
[文档] @ np.array(self.cov, ndmin=2) @ np.array(w, ndmin=2) ) delta = delta.item() if B is None: if self.B is None: self.B = pe.loadings_matrix(X=F, Y=X, **kwargs_1) const = True elif self.B is not None: pass elif B is not None: self.B = B if flavor == "BLB": if isinstance(kwargs_1, dict): mu, cov, w = pe.black_litterman_bayesian( X=X, F=F, B=self.B, P_f=P_f, Q_f=Q_f, delta=delta, rf=rf, eq=eq, const=const, diag=diag, method_mu=method_mu, method_cov=method_cov, **kwargs_2, ) else: mu, cov, w = pe.black_litterman_bayesian( X=X, F=F, B=self.B, P_f=P_f, Q_f=Q_f, delta=delta, rf=rf, eq=eq, const=const, diag=diag, method_mu=method_mu, method_cov=method_cov, ) elif flavor == "ABL": if isinstance(kwargs_1, dict): mu, cov, w = pe.augmented_black_litterman( X=X, w=w, F=F, B=self.B, P=P, Q=Q, P_f=P_f, Q_f=Q_f, delta=delta, rf=rf, eq=eq, const=const, method_mu=method_mu, method_cov=method_cov, **kwargs_2, ) else: mu, cov, w = pe.augmented_black_litterman( X=X, w=w, F=F, B=self.B, P=P, Q=Q, P_f=P_f, Q_f=Q_f, delta=delta, rf=rf, eq=eq, const=const, method_mu=method_mu, method_cov=method_cov, ) self.mu_bl_fm = mu self.cov_bl_fm = cov value = af.is_pos_def(self.cov_bl_fm, threshold=1e-8) for i in range(5): if value == False: try: self.cov_bl_fm = af.cov_fix( self.cov_bl_fm, method="clipped", threshold=1e-5 ) value = af.is_pos_def(self.cov_bl_fm, threshold=1e-8) except: break else: break if value == False: print("You must convert self.cov_bl_fm to a positive definite matrix")
def wc_stats( self, box="s", ellip="s", q=0.05, n_sim=3000, window=3, dmu=0.1, dcov=0.1, seed=0, ): r""" Calculate the inputs that will be used by the wc_optimization method. Parameters ---------- box : string The method used to estimate the box uncertainty sets. The default is 's'. Possible values are: - 's': stationary bootstrapping method. - 'c': circular bootstrapping method. - 'm': moving bootstrapping method. - 'n': assuming normal returns to calculate confidence levels. - 'd': delta method, this method increase and decrease by a percentage. ellip : string The method used to estimate the elliptical uncertainty sets. The default is 's'. Possible values are: - 's': stationary bootstrapping method. - 'c': circular bootstrapping method. - 'm': moving bootstrapping method. - 'n': assuming normal returns to calculate confidence levels. q : scalar Significance level of the selected bootstrapping method. The default is 0.05. n_sim : scalar Number of simulations of the bootstrapping method. The default is 3000. window: Block size of the bootstrapping method. Must be greather than 1 and lower than the n_samples - n_features + 1 The default is 3. dmu : scalar Percentage used by delta method to increase and decrease the mean vector in box constraints. The default is 0.1. dcov : scalar Percentage used by delta method to increase and decrease the covariance matrix in box constraints. The default is 0.1. See Also -------- riskfolio.src.ParamsEstimation.bootstrapping """ if box not in list("scmdn"): raise ValueError("box only can be 's', 'c', 'm', 'd' or 'n'") if ellip not in list("scmn"): raise ValueError("box only can be 's', 'c', 'm' or 'n'") X = self.returns cols = X.columns.tolist() cols_2 = [i + "-" + j for i in cols for j in cols] (T, N) = X.shape mu = X.mean().to_frame().T cov = X.cov() if box == "s": mu_l, mu_u, cov_l, cov_u, _, _ = pe.bootstrapping( X, kind="stationary", q=q, n_sim=n_sim, window=window, seed=seed ) d_mu = (mu_u - mu_l) / 2 elif box == "c": mu_l, mu_u, cov_l, cov_u, _, _ = pe.bootstrapping( X, kind="circular", q=q, n_sim=n_sim, window=window, seed=seed ) d_mu = (mu_u - mu_l) / 2 elif box == "m": mu_l, mu_u, cov_l, cov_u, _, _ = pe.bootstrapping( X, kind="moving", q=q, n_sim=n_sim, window=window, seed=seed ) d_mu = (mu_u - mu_l) / 2 elif box == "n": # Defining confidence level of mean vector assuming normal returns d_mu = st.norm.ppf(1 - q / 2) * np.sqrt(np.diag(cov) / T).reshape(1, -1) d_mu = pd.DataFrame(d_mu, index=[0], columns=cols) # Defining confidence level of covariance matrix assuming normal returns rs = np.random.RandomState(seed=seed) A = st.wishart.rvs(T, cov / T, size=10000, random_state=rs) cov_l = np.percentile(A, q=q / 2, axis=0) cov_u = np.percentile(A, q=1 - q / 2, axis=0) cov_l = pd.DataFrame(cov_l, index=cols, columns=cols) cov_u = pd.DataFrame(cov_u, index=cols, columns=cols) if af.is_pos_def(cov_l) == False: cov_l = af.cov_fix(cov_l, method="clipped", threshold=1e-3) if af.is_pos_def(cov_u) == False: cov_u = af.cov_fix(cov_u, method="clipped", threshold=1e-3) elif box == "d": d_mu = dmu * np.abs(mu) cov_l = cov - dcov * np.abs(cov) cov_u = cov + dcov * np.abs(cov) d_mu = pd.DataFrame(d_mu, index=[0], columns=cols) cov_l = pd.DataFrame(cov_l, index=cols, columns=cols) cov_u = pd.DataFrame(cov_u, index=cols, columns=cols) if ellip == "s": _, _, _, _, cov_mu, cov_sigma = pe.bootstrapping( X, kind="stationary", q=q, n_sim=n_sim, window=window, seed=seed ) elif ellip == "c": _, _, _, _, cov_mu, cov_sigma = pe.bootstrapping( X, kind="circular", q=q, n_sim=n_sim, window=window, seed=seed ) elif ellip == "m": _, _, _, _, cov_mu, cov_sigma = pe.bootstrapping( X, kind="moving", q=q, n_sim=n_sim, window=window, seed=seed ) elif ellip == "n": # Covariance of mean returns cov_mu = cov / T cov_mu = np.diag(np.diag(cov_mu)) cov_mu = pd.DataFrame(cov_mu, index=cols, columns=cols) # Covariance of covariance matrix K = af.commutation_matrix(cov) I = np.identity(N * N) cov_sigma = T * (I + K) @ np.kron(cov_mu, cov_mu) cov_sigma = np.diag(np.diag(cov_sigma)) cov_sigma = pd.DataFrame(cov_sigma, index=cols_2, columns=cols_2) k_mu = st.chi2.ppf(1 - q, df=N) ** 0.5 k_sigma = st.chi2.ppf(1 - q, df=N * N) ** 0.5 self.cov_l = cov_l self.cov_u = cov_u self.cov_mu = cov_mu self.cov_sigma = cov_sigma self.d_mu = d_mu self.k_mu = k_mu self.k_sigma = k_sigma
[文档] def optimization( self, model="Classic", rm="MV", obj="Sharpe", kelly=False, rf=0, l=2, hist=True ): r""" This method that calculates the optimal portfolio according to the optimization model selected by the user. The general problem that solves is: .. math:: \begin{align} &\underset{w}{\text{optimize}} & & F(w)\\ &\text{s. t.} & & Aw \geq B\\ & & & \phi_{i}(w) \leq c_{i}\\ \end{align} Where: :math:`F(w)` is the objective function. :math:`Aw \geq B` is a set of linear constraints. :math:`\phi_{i}(w) \leq c_{i}` are constraints on maximum values of several risk measures. Parameters ---------- model : str can be {'Classic', 'BL', 'FM' or 'BLFM'} The model used for optimize the portfolio. The default is 'Classic'. Possible values are: - 'Classic': use estimates of expected return vector and covariance matrix that depends on historical data. - 'BL': use estimates of expected return vector and covariance matrix based on the Black Litterman model. - 'FM': use estimates of expected return vector and covariance matrix based on a Risk Factor model specified by the user. - 'BLFM': use estimates of expected return vector and covariance matrix based on Black Litterman applied to a Risk Factor model specified by the user. rm : str, optional The risk measure used to optimize the portfolio. The default is 'MV'. Possible values are: - 'MV': Standard Deviation. - 'KT': Square Root of Kurtosis. - 'MAD': Mean Absolute Deviation. - 'GMD': Gini Mean Difference. - 'MSV': Semi Standard Deviation. - 'SKT': Square Root of Semi Kurtosis. - 'FLPM': First Lower Partial Moment (Omega Ratio). - 'SLPM': Second Lower Partial Moment (Sortino Ratio). - 'CVaR': Conditional Value at Risk. - 'TG': Tail Gini. - 'EVaR': Entropic Value at Risk. - 'RLVaR': Relativistic Value at Risk. - 'WR': Worst Realization (Minimax). - 'RG': Range of returns. - 'CVRG': CVaR range of returns. - 'TGRG': Tail Gini range of returns. - 'MDD': Maximum Drawdown of uncompounded cumulative returns (Calmar Ratio). - 'ADD': Average Drawdown of uncompounded cumulative returns. - 'CDaR': Conditional Drawdown at Risk of uncompounded cumulative returns. - 'EDaR': Entropic Drawdown at Risk of uncompounded cumulative returns. - 'RLDaR': Relativistic Drawdown at Risk of uncompounded cumulative returns. - 'UCI': Ulcer Index of uncompounded cumulative returns. obj : str can be {'MinRisk', 'Utility', 'Sharpe' or 'MaxRet'}. Objective function of the optimization model. The default is 'Sharpe'. Possible values are: - 'MinRisk': Minimize the selected risk measure. - 'Utility': Maximize the Utility function :math:`\mu w - l \phi_{i}(w)`. - 'Sharpe': Maximize the risk adjusted return ratio based on the selected risk measure. - 'MaxRet': Maximize the expected return of the portfolio. kelly : str, optional Method used to calculate mean return. Possible values are False for arithmetic mean return, "approx" for approximate mean logarithmic return using first and second moment and "exact" for mean logarithmic return. The default is False. rf : float, optional Risk free rate, must be in the same period of assets returns. The default is 0. l : scalar, optional Risk aversion factor of the 'Utility' objective function. The default is 2. hist : bool, optional Indicate what kind of returns are used to calculate risk measures that depends on scenarios (All except 'MV' risk measure). If model = 'BL', True means historical covariance and returns and False Black Litterman covariance and historical returns. If model = 'FM', True means historical covariance and returns and False Risk Factor model for covariance and returns. If model = 'BL_FM', True means historical covariance and returns, False Black Litterman with Risk Factor model for covariance and Risk Factor model for returns, and '2' Risk Factor model for covariance and returns. The default is True. Returns ------- w : DataFrame The weights of optimal portfolio. """ # General model Variables mu = None sigma = None returns = None if model == "Classic": mu = np.array(self.mu, ndmin=2) sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "FM": mu = np.array(self.mu_fm, ndmin=2) if hist == False: sigma = np.array(self.cov_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "BL": mu = np.array(self.mu_bl, ndmin=2) if hist == False: sigma = np.array(self.cov_bl, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "BL_FM": mu = np.array(self.mu_bl_fm, ndmin=2) if hist == False: sigma = np.array(self.cov_bl_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif hist == 2: sigma = np.array(self.cov_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) # General Model Variables returns = np.array(returns, ndmin=2) w = cp.Variable((mu.shape[1], 1)) k = cp.Variable((1, 1)) rf0 = rf T, N = returns.shape gr = cp.Variable((T, 1)) # MV Model Variables g = cp.Variable(nonneg=True) G = sqrtm(sigma) risk1 = g**2 devconstraints = [cp.SOC(g, G.T @ w)] # Return Variables if model == "Classic": if kelly == "exact": if obj == "Sharpe": ret = 1 / T * cp.sum(gr) - rf0 * k else: ret = 1 / T * cp.sum(cp.log(1 + returns @ w)) elif kelly == "approx": if obj == "Sharpe": ret = mu @ w - 0.5 * cp.quad_over_lin(g, k) else: ret = mu @ w - 0.5 * g**2 elif kelly == False: ret = mu @ w else: ret = mu @ w # MAD Model Variables madmodel = False Y = cp.Variable((T, 1)) u = np.repeat(mu, T, axis=0) a = returns - u risk2 = cp.sum(Y) / T # madconstraints=[a @ w >= -Y, a @ w <= Y, Y >= 0] madconstraints = [a @ w * 1000 >= -Y * 1000, Y * 1000 >= 0] # Semi Variance Model Variables risk3 = cp.norm(Y, "fro") / cp.sqrt(T - 1) # CVaR Model Variables VaR = cp.Variable((1, 1)) alpha = self.alpha X = returns @ w Z1 = cp.Variable((T, 1)) risk4 = VaR + 1 / (alpha * T) * cp.sum(Z1) cvarconstraints = [Z1 * 1000 >= 0, Z1 * 1000 >= -X * 1000 - VaR * 1000] # Worst Realization (Minimax) Model Variables M = cp.Variable((1, 1)) risk5 = M wrconstraints = [-X <= M] # Lower Partial Moment Variables lpmmodel = False lpm = cp.Variable((T, 1)) lpmconstraints = [lpm * 1000 >= 0] if obj == "Sharpe": lpmconstraints += [lpm * 1000 >= rf0 * k * 1000 - X * 1000] else: lpmconstraints += [lpm * 1000 >= rf0 * 1000 - X * 1000] # First Lower Partial Moment (Omega) Model Variables risk6 = cp.sum(lpm) / T # Second Lower Partial Moment (Sortino) Model Variables risk7 = cp.norm(lpm, "fro") / cp.sqrt(T - 1) # Drawdown Model Variables drawdown = False U = cp.Variable((T + 1, 1)) ddconstraints = [ U[1:] * 1000 >= U[:-1] * 1000 - X * 1000, U[1:] * 1000 >= 0, U[0] * 1000 == 0, ] # Maximum Drawdown Model Variables MDD = cp.Variable((1, 1)) risk8 = MDD mddconstraints = [MDD >= U[1:]] # Average Drawdown Model Variables risk9 = 1 / T * cp.sum(U[1:]) # Conditional Drawdown Model Variables DaR = cp.Variable((1, 1)) Zd1 = cp.Variable((T, 1)) risk10 = DaR + 1 / (alpha * T) * cp.sum(Zd1) cdarconstraints = [ Zd1 * 1000 >= U[1:] * 1000 - DaR * 1000, Zd1 * 1000 >= 0, ] # Ulcer Index Model Variables risk11 = cp.norm(U[1:] * 1000, "fro") / np.sqrt(T) # Entropic Value at Risk Model Variables t1 = cp.Variable((1, 1)) s1 = cp.Variable((1, 1), nonneg=True) ui = cp.Variable((T, 1)) risk12 = t1 + s1 * np.log(1 / (alpha * T)) if obj == "Sharpe": evarconstraints = [cp.sum(ui) * 1000 <= s1 * 1000] evarconstraints += [ cp.constraints.ExpCone( -X * 1000 - t1 * 1000, np.ones((T, 1)) @ s1 * 1000, ui * 1000 ) ] else: evarconstraints = [cp.sum(ui) <= s1] evarconstraints += [ cp.constraints.ExpCone(-X - t1, np.ones((T, 1)) @ s1, ui) ] # Entropic Drawdown at Risk Model Variables t2 = cp.Variable((1, 1)) s2 = cp.Variable((1, 1), nonneg=True) uj = cp.Variable((T, 1)) risk13 = t2 + s2 * np.log(1 / (alpha * T)) if obj == "Sharpe": edarconstraints = [cp.sum(uj) * 1000 <= s2 * 1000] edarconstraints += [ cp.constraints.ExpCone( U[1:] * 1000 - t2 * 1000, np.ones((T, 1)) @ s2 * 1000, uj * 1000, ) ] else: edarconstraints = [cp.sum(uj) <= s2] edarconstraints += [ cp.constraints.ExpCone(U[1:] - t2, np.ones((T, 1)) @ s2, uj) ] # Gini Mean Difference Model Variables owamodel = False a1 = cp.Variable((T, 1)) b1 = cp.Variable((T, 1)) y = cp.Variable((T, 1)) risk14 = cp.sum(a1 + b1) owaconstraints = [returns @ w == y] gmd_w = owa.owa_gmd(T) / 2 onesvec = np.ones((T, 1)) gmdconstraints = [y @ gmd_w.T <= onesvec @ a1.T + b1 @ onesvec.T] # Tail Gini Model Variables a2 = cp.Variable((T, 1)) b2 = cp.Variable((T, 1)) risk15 = cp.sum(a2 + b2) a_sim = self.a_sim tg_w = owa.owa_tg(T, alpha=alpha) tgconstraints = [y @ tg_w.T <= onesvec @ a2.T + b2 @ onesvec.T] # Range Model Variables a3 = cp.Variable((T, 1)) b3 = cp.Variable((T, 1)) risk16 = cp.sum(a3 + b3) rg_w = owa.owa_rg(T) rgconstraints = [y @ rg_w.T <= onesvec @ a3.T + b3 @ onesvec.T] # CVaR Range Model Variables a4 = cp.Variable((T, 1)) b4 = cp.Variable((T, 1)) risk17 = cp.sum(a4 + b4) if self.beta is None: beta = alpha else: beta = self.beta cvrg_w = owa.owa_cvrg(T, alpha=alpha, beta=beta) cvrgconstraints = [y @ cvrg_w.T <= onesvec @ a4.T + b4 @ onesvec.T] # Tail Gini Range Model Variables a5 = cp.Variable((T, 1)) b5 = cp.Variable((T, 1)) risk18 = cp.sum(a5 + b5) if self.b_sim is None: b_sim = a_sim else: b_sim = self.b_sim tgrg_w = owa.owa_tgrg(T, alpha=alpha, a_sim=a_sim, beta=beta, b_sim=b_sim) tgrgconstraints = [y @ tgrg_w.T <= onesvec @ a5.T + b5 @ onesvec.T] # Kurtosis Model Variables if self.kurt is not None: W = cp.Variable((N, N), symmetric=True) M1 = cp.vstack([W, w.T]) if obj == "Sharpe": M2 = cp.vstack([w, k]) else: M2 = cp.vstack([w, np.ones((1, 1))]) M3 = cp.hstack([M1, M2]) ktconstraints = [M3 >> 0] if N > self.n_max_kurt: K = 2 * N g2 = cp.Variable((K, 1)) risk19 = cp.pnorm(g2, p=2) A = af.block_vec_pq(self.kurt, N, N) s_A, V_A = np.linalg.eig(A) s_A = np.clip(s_A, 0, np.inf) Bi = [] for i in range(K): B = s_A[i] ** 0.5 * V_A[:, i] B = B.reshape((N, N), order="F").real Bi.append(B) for i in range(K): ktconstraints += [g2[i, 0] == cp.trace(Bi[i] @ W)] else: L_2 = self.L_2 S_2 = self.S_2 Sqrt_Sigma_4 = S_2 @ self.kurt @ S_2.T Sqrt_Sigma_4 = sqrtm(Sqrt_Sigma_4) g2 = cp.Variable(nonneg=True) risk19 = g2 z = L_2 @ cp.reshape(cp.vec(W), (N * N, 1)) ktconstraints += [cp.SOC(g2, Sqrt_Sigma_4 @ z)] # Semi Kurtosis Model Variables if self.skurt is not None: SW = cp.Variable((N, N), symmetric=True) SM1 = cp.vstack([SW, w.T]) if obj == "Sharpe": SM2 = cp.vstack([w, k]) else: SM2 = cp.vstack([w, np.ones((1, 1))]) SM3 = cp.hstack([SM1, SM2]) sktconstraints = [SM3 >> 0] if N > self.n_max_kurt: K = 2 * N sg2 = cp.Variable((K, 1)) risk20 = cp.pnorm(sg2, p=2) SA = af.block_vec_pq(self.skurt, N, N) s_SA, V_SA = np.linalg.eig(SA) s_SA = np.clip(s_SA, 0, np.inf) SBi = [] for i in range(K): SB = s_SA[i] ** 0.5 * V_SA[:, i] SB = SB.reshape((N, N), order="F").real SBi.append(SB) for i in range(K): sktconstraints += [sg2[i, 0] == cp.trace(SBi[i] @ SW)] else: Sqrt_SSigma_4 = S_2 @ self.skurt @ S_2.T Sqrt_SSigma_4 = sqrtm(Sqrt_SSigma_4) sg2 = cp.Variable(nonneg=True) risk20 = sg2 sz = L_2 @ cp.reshape(cp.vec(SW), (N * N, 1)) sktconstraints += [cp.SOC(sg2, Sqrt_SSigma_4 @ sz)] # Relativistic Value at Risk Variables kappa = self.kappa t3 = cp.Variable((1, 1)) s3 = cp.Variable((1, 1), nonneg=True) omega3 = cp.Variable((T, 1)) psi3 = cp.Variable((T, 1)) theta3 = cp.Variable((T, 1)) epsilon3 = cp.Variable((T, 1)) rlvarconstraints = [ cp.constraints.power.PowCone3D( s3 * (1 + kappa) / (2 * kappa) * onesvec, psi3 * (1 + kappa) / kappa, epsilon3, 1 / (1 + kappa), ), cp.constraints.power.PowCone3D( omega3 / (1 - kappa), theta3 / kappa, -s3 / (2 * kappa) * onesvec, (1 - kappa), ), -X * 1000 - t3 * 1000 + epsilon3 * 1000 + omega3 * 1000 <= 0, ] ln_k = ((1 / (alpha * T)) ** kappa - (1 / (alpha * T)) ** (-kappa)) / ( 2 * kappa ) risk21 = t3 + ln_k * s3 + cp.sum(psi3 + theta3) # Relativistic Drawdown at Risk Variables t4 = cp.Variable((1, 1)) s4 = cp.Variable((1, 1), nonneg=True) omega4 = cp.Variable((T, 1)) psi4 = cp.Variable((T, 1)) theta4 = cp.Variable((T, 1)) epsilon4 = cp.Variable((T, 1)) rldarconstraints = [ cp.constraints.power.PowCone3D( s4 * (1 + kappa) / (2 * kappa) * onesvec, psi4 * (1 + kappa) / kappa, epsilon4, 1 / (1 + kappa), ), cp.constraints.power.PowCone3D( omega4 / (1 - kappa), theta4 / kappa, -s4 / (2 * kappa) * onesvec, (1 - kappa), ), U[1:] * 1000 - t4 * 1000 + epsilon4 * 1000 + omega4 * 1000 <= 0, ] risk22 = t4 + ln_k * s4 + cp.sum(psi4 + theta4) # Cardinal Boolean Variables if self.card is not None: if obj == "Sharpe": e = cp.Variable((mu.shape[1], 1), boolean=True) e1 = cp.Variable((mu.shape[1], 1)) else: e = cp.Variable((mu.shape[1], 1), boolean=True) # Problem Weight Constraints if obj == "Sharpe": constraints = [cp.sum(w) == self.budget * k, k * 1000 >= 0] if self.sht == False: constraints += [w <= self.upperlng * k, w * 1000 >= 0] if self.card is not None: constraints += [ cp.sum(e) <= self.card, e1 <= k, e1 >= 0, e1 <= 100000 * e, e1 >= k - 100000 * (1 - e), w <= self.upperlng * e1, ] elif self.sht == True: constraints += [ cp.sum(cp.pos(w)) * 1000 <= self.upperlng * k * 1000, cp.sum(cp.neg(w)) * 1000 <= self.uppersht * k * 1000, ] if self.card is not None: constraints += [ cp.sum(e) <= self.card, e1 <= k, e1 >= 0, e1 <= 100000 * e, e1 >= k - 100000 * (1 - e), w >= -self.uppersht * e1, w <= self.upperlng * e1, ] else: constraints = [cp.sum(w) == self.budget] if self.sht == False: constraints += [w <= self.upperlng, w * 1000 >= 0] if self.card is not None: constraints += [ cp.sum(e) <= self.card, w <= self.upperlng * e, ] elif self.sht == True: constraints += [ cp.sum(cp.pos(w)) * 1000 <= self.upperlng * 1000, cp.sum(cp.neg(w)) * 1000 <= self.uppersht * 1000, ] if self.card is not None: constraints += [ cp.sum(e) <= self.card, w >= -self.uppersht * e, w <= self.upperlng * e, ] # Problem Linear Constraints if self.ainequality is not None and self.binequality is not None: A = np.array(self.ainequality, ndmin=2) * 1000 B = np.array(self.binequality, ndmin=2) * 1000 if obj == "Sharpe": constraints += [A @ w - B @ k >= 0] else: constraints += [A @ w - B >= 0] # Number of Effective Assets Constraints if self.nea is not None: if obj == "Sharpe": constraints += [cp.norm(w, "fro") <= 1 / self.nea**0.5 * k] else: constraints += [cp.norm(w, "fro") <= 1 / self.nea**0.5] # Tracking Error Model Variables c = np.array(self.benchweights, ndmin=2) if self.kindbench == True: bench = returns @ c elif self.kindbench == False: bench = np.array(self.benchindex, ndmin=2) # Tracking error Constraints if obj == "Sharpe": if self.allowTE == True: TE_1 = cp.norm(returns @ w - bench @ k, "fro") / cp.sqrt(T - 1) constraints += [TE_1 * 1000 <= self.TE * k * 1000] else: if self.allowTE == True: TE_1 = cp.norm(returns @ w - bench, "fro") / cp.sqrt(T - 1) constraints += [TE_1 * 1000 <= self.TE * 1000] # Turnover Constraints if obj == "Sharpe": if self.allowTO == True: TO_1 = cp.abs(w - c @ k) * 1000 constraints += [TO_1 <= self.turnover * k * 1000] else: if self.allowTO == True: TO_1 = cp.abs(w - c) * 1000 constraints += [TO_1 <= self.turnover * 1000] # Problem return Constraints if self.lowerret is not None: if obj == "Sharpe": constraints += [ret >= self.lowerret * k] else: constraints += [ret >= self.lowerret] # Problem risk Constraints if self.upperdev is not None: if obj == "Sharpe": constraints += [g <= self.upperdev * k] else: constraints += [g <= self.upperdev] constraints += devconstraints if self.uppermad is not None: if obj == "Sharpe": constraints += [risk2 <= self.uppermad * k / 2] else: constraints += [risk2 <= self.uppermad / 2] madmodel = True if self.uppersdev is not None: if obj == "Sharpe": constraints += [risk3 <= self.uppersdev * k] else: constraints += [risk3 <= self.uppersdev] madmodel = True if self.upperCVaR is not None: if obj == "Sharpe": constraints += [risk4 <= self.upperCVaR * k] else: constraints += [risk4 <= self.upperCVaR] constraints += cvarconstraints if self.upperwr is not None: if obj == "Sharpe": constraints += [-X <= self.upperwr * k] else: constraints += [-X <= self.upperwr] constraints += wrconstraints if self.upperflpm is not None: if obj == "Sharpe": constraints += [risk6 <= self.upperflpm * k] else: constraints += [risk6 <= self.upperflpm] lpmmodel = True if self.upperslpm is not None: if obj == "Sharpe": constraints += [risk7 <= self.upperslpm * k] else: constraints += [risk7 <= self.upperslpm] lpmmodel = True if self.uppermdd is not None: if obj == "Sharpe": constraints += [U[1:] <= self.uppermdd * k] else: constraints += [U[1:] <= self.uppermdd] constraints += mddconstraints drawdown = True if self.upperadd is not None: if obj == "Sharpe": constraints += [risk9 <= self.upperadd * k] else: constraints += [risk9 <= self.upperadd] drawdown = True if self.upperCDaR is not None: if obj == "Sharpe": constraints += [risk10 <= self.upperCDaR * k] else: constraints += [risk10 <= self.upperCDaR] constraints += cdarconstraints drawdown = True if self.upperuci is not None: if obj == "Sharpe": constraints += [risk11 <= self.upperuci * 1000 * k] else: constraints += [risk11 <= self.upperuci * 1000] drawdown = True if self.upperEVaR is not None: if obj == "Sharpe": constraints += [risk12 <= self.upperEVaR * k] else: constraints += [risk12 <= self.upperEVaR] constraints += evarconstraints if self.upperEDaR is not None: if obj == "Sharpe": constraints += [risk13 <= self.upperEDaR * k] else: constraints += [risk13 <= self.upperEDaR] constraints += edarconstraints if self.uppergmd is not None: if obj == "Sharpe": constraints += [risk14 <= self.uppergmd * k / 2] else: constraints += [risk14 <= self.uppergmd / 2] constraints += gmdconstraints owamodel = True if self.uppertg is not None: if obj == "Sharpe": constraints += [risk15 <= self.uppertg * k] else: constraints += [risk15 <= self.uppertg] constraints += tgconstraints owamodel = True if self.upperrg is not None: if obj == "Sharpe": constraints += [risk16 <= self.upperrg * k] else: constraints += [risk16 <= self.upperrg] constraints += rgconstraints owamodel = True if self.uppercvrg is not None: if obj == "Sharpe": constraints += [risk17 <= self.uppercvrg * k] else: constraints += [risk17 <= self.uppercvrg] constraints += cvrgconstraints owamodel = True if self.uppertgrg is not None: if obj == "Sharpe": constraints += [risk18 <= self.uppertgrg * k] else: constraints += [risk18 <= self.uppertgrg] constraints += tgrgconstraints owamodel = True if self.kurt is not None: if self.upperkt is not None: if obj == "Sharpe": constraints += [risk19 <= self.upperkt * k] else: constraints += [risk19 <= self.upperkt] constraints += ktconstraints if self.skurt is not None: if self.upperskt is not None: if obj == "Sharpe": constraints += [risk20 <= self.upperskt * k] else: constraints += [risk20 <= self.upperskt] constraints += sktconstraints if self.upperRLVaR is not None: if obj == "Sharpe": constraints += [risk21 <= self.upperRLVaR * k] else: constraints += [risk21 <= self.upperRLVaR] constraints += rlvarconstraints if self.upperRLDaR is not None: if obj == "Sharpe": constraints += [risk22 <= self.upperRLDaR * k] else: constraints += [risk22 <= self.upperRLDaR] constraints += rldarconstraints # Defining risk function if rm == "MV": risk = risk1 if self.upperdev is None: constraints += devconstraints elif rm == "MAD": risk = risk2 madmodel = True elif rm == "MSV": risk = risk3 madmodel = True elif rm == "CVaR": risk = risk4 if self.upperCVaR is None: constraints += cvarconstraints elif rm == "WR": risk = risk5 if self.upperwr is None: constraints += wrconstraints elif rm == "FLPM": risk = risk6 lpmmodel = True elif rm == "SLPM": risk = risk7 lpmmodel = True elif rm == "MDD": risk = risk8 drawdown = True if self.uppermdd is None: constraints += mddconstraints elif rm == "ADD": risk = risk9 drawdown = True elif rm == "CDaR": risk = risk10 drawdown = True if self.upperCDaR is None: constraints += cdarconstraints elif rm == "UCI": risk = risk11 drawdown = True l = l / 1000 elif rm == "EVaR": risk = risk12 if self.upperEVaR is None: constraints += evarconstraints elif rm == "EDaR": risk = risk13 drawdown = True if self.upperEDaR is None: constraints += edarconstraints elif rm == "GMD": risk = risk14 owamodel = True if self.uppergmd is None: constraints += gmdconstraints elif rm == "TG": risk = risk15 owamodel = True if self.uppertg is None: constraints += tgconstraints elif rm == "RG": risk = risk16 owamodel = True if self.upperrg is None: constraints += rgconstraints elif rm == "CVRG": risk = risk17 owamodel = True if self.uppertgrg is None: constraints += cvrgconstraints elif rm == "TGRG": risk = risk18 owamodel = True if self.uppertg is None: constraints += tgrgconstraints elif rm == "KT": if self.kurt is not None: risk = risk19 if self.upperkt is None: constraints += ktconstraints else: raise ValueError( "First you need to calculate Cokurtosis Square Matrix." ) elif rm == "SKT": if self.skurt is not None: risk = risk20 if self.upperskt is None: constraints += sktconstraints else: raise ValueError( "First you need to calculate Semi Cokurtosis Square Matrix." ) elif rm == "RLVaR": risk = risk21 if self.upperRLVaR is None: constraints += rlvarconstraints elif rm == "RLDaR": risk = risk22 drawdown = True if self.upperRLDaR is None: constraints += rldarconstraints if madmodel == True: constraints += madconstraints if lpmmodel == True: constraints += lpmconstraints if drawdown == True: constraints += ddconstraints if owamodel == True: constraints += owaconstraints # Frontier Variables portafolio = {} for i in self.assetslist: portafolio.update({i: []}) # Optimization Process # Defining objective function if obj == "Sharpe": if model == "Classic": if kelly == "exact": constraints += [risk <= 1] constraints += [ cp.constraints.ExpCone(gr, np.ones((T, 1)) @ k, k + returns @ w) ] objective = cp.Maximize(ret * 1000) elif kelly == "approx": constraints += [risk <= 1] if rm != "MV": constraints += devconstraints objective = cp.Maximize(ret) elif kelly == False: constraints += [mu @ w - rf0 * k == 1] objective = cp.Minimize(risk * 1000) else: constraints += [mu @ w - rf0 * k == 1] objective = cp.Minimize(risk * 1000) elif obj == "MinRisk": objective = cp.Minimize(risk * 1000) elif obj == "Utility": objective = cp.Maximize(ret - l * risk) elif obj == "MaxRet": objective = cp.Maximize(ret * 1000) try: prob = cp.Problem(objective, constraints) for solver in self.solvers: try: if len(self.sol_params) == 0: prob.solve(solver=solver) else: prob.solve(solver=solver, **self.sol_params[solver]) except: pass if w.value is not None: break if obj == "Sharpe": weights = np.array(w.value / k.value, ndmin=2).T if rm == "EVaR" or self.upperEVaR is not None: self.z_EVaR = s1.value / k.value if rm == "EDaR" or self.upperEDaR is not None: self.z_EDaR = s2.value / k.value if rm == "RLVaR" or self.upperRLVaR is not None: self.z_RLVaR = s3.value / k.value if rm == "RLDaR" or self.upperRLDaR is not None: self.z_RLDaR = s4.value / k.value else: weights = np.array(w.value, ndmin=2).T if rm == "EVaR" or self.upperEVaR is not None: self.z_EVaR = s1.value if rm == "EDaR" or self.upperEDaR is not None: self.z_EDaR = s2.value if rm == "RLVaR" or self.upperRLVaR is not None: self.z_RLVaR = s3.value if rm == "RLDaR" or self.upperRLDaR is not None: self.z_RLDaR = s4.value if self.sht == False: weights = np.abs(weights) / np.sum(np.abs(weights)) * self.budget for j in self.assetslist: portafolio[j].append(weights[0, self.assetslist.index(j)]) except: pass try: self.optimal = pd.DataFrame( portafolio, index=["weights"], dtype=np.float64 ).T except: self.optimal = None print("The problem doesn't have a solution with actual input parameters") return self.optimal
[文档] def rp_optimization(self, model="Classic", rm="MV", rf=0, b=None, hist=True): r""" This method that calculates the risk parity portfolio using the risk budgeting approach :cite:`a-Roncalli` :cite:`a-RichardRoncalli`, according to the optimization model selected by the user. The general problem that solves is: .. math:: \begin{aligned} &\underset{w}{\min} & & \phi(w)\\ &\text{s.t.} & & b \log(w) \geq c\\ & & & \mu w \geq \overline{\mu} \\ & & & Aw \geq B \\ & & & w \geq 0 \\ \end{aligned} Where: :math:`w` are the weights of the portfolio. :math:`\mu`: is the vector of expected returns. :math:`b` is a vector of risk constraints. :math:`Aw \geq B`: is a set of linear constraints. :math:`\phi(w)`: is a risk measure. :math:`c`: is an arbitrary constant. Parameters ---------- model : str can be 'Classic' or 'FM' The model used for optimize the portfolio. The default is 'Classic'. Possible values are: - 'Classic': use estimates of expected return vector and covariance matrix that depends on historical data. - 'FM': use estimates of expected return vector and covariance matrix based on a Risk Factor model specified by the user. rm : str, optional The risk measure used to optimize the portfolio. The default is 'MV'. Possible values are: - 'MV': Standard Deviation. - 'KT': Square Root of Kurtosis. - 'MAD': Mean Absolute Deviation. - 'GMD': Gini Mean Difference. - 'MSV': Semi Standard Deviation. - 'SKT': Square Root of Semi Kurtosis. - 'FLPM': First Lower Partial Moment (Omega Ratio). - 'SLPM': Second Lower Partial Moment (Sortino Ratio). - 'CVaR': Conditional Value at Risk. - 'TG': Tail Gini. - 'EVaR': Entropic Value at Risk. - 'RLVaR': Relativistic Value at Risk. - 'CVRG': CVaR range of returns. - 'TGRG': Tail Gini range of returns. - 'CDaR': Conditional Drawdown at Risk of uncompounded cumulative returns. - 'EDaR': Entropic Drawdown at Risk of uncompounded cumulative returns. - 'RLDaR': Relativistic Drawdown at Risk of uncompounded cumulative returns. - 'UCI': Ulcer Index of uncompounded cumulative returns. rf : float, optional Risk free rate, must be in the same period of assets returns. Used for 'FLPM' and 'SLPM'. The default is 0. b : float, optional The vector of risk constraints per asset. The default is 1/n (number of assets). hist : bool, optional Indicate what kind of returns are used to calculate risk measures that depends on scenarios (All except 'MV' risk measure). If model = 'FM', True means historical covariance and returns and False means Risk Factor model for covariance and returns. The default is True. Returns ------- w : DataFrame The weights of optimal portfolio. """ # General model Variables mu = None sigma = None returns = None if model == "Classic": mu = np.array(self.mu, ndmin=2) sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "FM": mu = np.array(self.mu_fm, ndmin=2) if hist == False: sigma = np.array(self.cov_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) # General Model Variables T, N = returns.shape if b is None: rb = np.ones((N, 1)) rb = rb / N else: self.b = b.copy() rb = self.b returns = np.array(returns, ndmin=2) w = cp.Variable((N, 1)) k = cp.Variable((1, 1)) rf0 = rf ret = mu @ w constraints = [] # MV Model Variables g = cp.Variable(nonneg=True) G = sqrtm(sigma) risk1 = g**2 devconstraints = [cp.SOC(g, G.T @ w)] # MAD Model Variables Y = cp.Variable((T, 1)) u = np.repeat(mu, T, axis=0) a = returns - u risk2 = cp.sum(Y) / T # madconstraints=[a @ w >= -Y, a @ w <= Y, Y >= 0] madconstraints = [a @ w * 1000 >= -Y * 1000, Y * 1000 >= 0] # Semi Variance Model Variables risk3 = cp.norm(Y, "fro") / cp.sqrt(T - 1) # CVaR Model Variables VaR = cp.Variable((1, 1)) alpha = self.alpha X = returns @ w Z1 = cp.Variable((T, 1)) risk4 = VaR + 1 / (alpha * T) * cp.sum(Z1) cvarconstraints = [Z1 * 1000 >= 0, Z1 * 1000 >= -X * 1000 - VaR * 1000] # Lower Partial Moment Variables lpm = cp.Variable((T, 1)) lpmconstraints = [lpm * 1000 >= 0, lpm * 1000 >= rf0 * k * 1000 - X * 1000] # First Lower Partial Moment (Omega) Model Variables risk6 = cp.sum(lpm) / T # Second Lower Partial Moment (Sortino) Model Variables risk7 = cp.norm(lpm, "fro") / cp.sqrt(T - 1) # Drawdown Model Variables # X1 = k + np.cumsum(returns, axis=0) @ w U = cp.Variable((T + 1, 1)) ddconstraints = [ U[1:] * 1000 >= U[:-1] * 1000 - X * 1000, U[1:] * 1000 >= 0, U[0] * 1000 == 0, ] # Conditional Drawdown Model Variables DaR = cp.Variable((1, 1)) Zd = cp.Variable((T, 1)) risk10 = DaR + 1 / (alpha * T) * cp.sum(Zd) cdarconstraints = [ Zd * 1000 >= U[1:] * 1000 - DaR * 1000, Zd * 1000 >= 0, ] # Ulcer Index Model Variables risk11 = cp.norm(U[1:], "fro") / np.sqrt(T) # Entropic Value at Risk Model Variables t1 = cp.Variable((1, 1)) s1 = cp.Variable((1, 1), nonneg=True) ui = cp.Variable((T, 1)) risk12 = t1 + s1 * np.log(1 / (alpha * T)) evarconstraints = [cp.sum(ui) * 1000 <= s1 * 1000] evarconstraints += [ cp.constraints.ExpCone( -X * 1000 - t1 * 1000, np.ones((T, 1)) @ s1 * 1000, ui * 1000 ) ] # Entropic Drawdown at Risk Model Variables t2 = cp.Variable((1, 1)) s2 = cp.Variable((1, 1), nonneg=True) uj = cp.Variable((T, 1)) risk13 = t2 + s2 * np.log(1 / (alpha * T)) edarconstraints = [cp.sum(uj) * 1000 <= s2 * 1000] edarconstraints += [ cp.constraints.ExpCone( U[1:] * 1000 - t2 * 1000, np.ones((T, 1)) @ s2 * 1000, uj * 1000, ) ] # Gini Mean Difference Model Variables a1 = cp.Variable((T, 1)) b1 = cp.Variable((T, 1)) y = cp.Variable((T, 1)) risk14 = cp.sum(a1 + b1) owaconstraints = [returns @ w == y] gmd_w = owa.owa_gmd(T) / 2 onesvec = np.ones((T, 1)) gmdconstraints = [ y @ gmd_w.T * 1000 <= (onesvec @ a1.T + b1 @ onesvec.T) * 1000 ] # Tail Gini Model Variables a2 = cp.Variable((T, 1)) b2 = cp.Variable((T, 1)) risk15 = cp.sum(a2 + b2) a_sim = self.a_sim tg_w = owa.owa_tg(T, alpha=alpha, a_sim=a_sim) tgconstraints = [y @ tg_w.T * 1000 <= (onesvec @ a2.T + b2 @ onesvec.T) * 1000] # CVaR Range Model Variables a4 = cp.Variable((T, 1)) b4 = cp.Variable((T, 1)) risk17 = cp.sum(a4 + b4) if self.beta is None: beta = alpha else: beta = self.beta cvrg_w = owa.owa_cvrg(T, alpha=alpha, beta=beta) cvrgconstraints = [ y @ cvrg_w.T * 1000 <= (onesvec @ a4.T + b4 @ onesvec.T) * 1000 ] # Tail Gini Range Model Variables a5 = cp.Variable((T, 1)) b5 = cp.Variable((T, 1)) risk18 = cp.sum(a5 + b5) if self.b_sim is None: b_sim = a_sim else: b_sim = self.b_sim tgrg_w = owa.owa_tgrg(T, alpha=alpha, a_sim=a_sim, beta=beta, b_sim=b_sim) tgrgconstraints = [ y @ tgrg_w.T * 1000 <= (onesvec @ a5.T + b5 @ onesvec.T) * 1000 ] # Kurtosis Model Variables if self.kurt is not None: W = cp.Variable((N, N), symmetric=True) M1 = cp.vstack([W, w.T]) M2 = cp.vstack([w, np.ones((1, 1))]) M3 = cp.hstack([M1, M2]) ktconstraints = [M3 >> 0] if N > self.n_max_kurt: K = 2 * N g2 = cp.Variable((K, 1)) risk19 = cp.pnorm(g2, p=2) A = af.block_vec_pq(self.kurt, N, N) s_A, V_A = np.linalg.eig(A) s_A = np.clip(s_A, 0, np.inf) Bi = [] for i in range(K): B = s_A[i] ** 0.5 * V_A[:, i] B = B.reshape((N, N), order="F").real Bi.append(B) for i in range(K): ktconstraints += [g2[i, 0] == cp.trace(Bi[i] @ W)] else: L_2 = self.L_2 S_2 = self.S_2 Sqrt_Sigma_4 = S_2 @ self.kurt @ S_2.T Sqrt_Sigma_4 = sqrtm(Sqrt_Sigma_4) g2 = cp.Variable(nonneg=True) risk19 = g2 z = L_2 @ cp.reshape(cp.vec(W), (N * N, 1)) ktconstraints += [cp.SOC(g2, Sqrt_Sigma_4 @ z)] # Semi Kurtosis Model Variables if self.skurt is not None: SW = cp.Variable((N, N), symmetric=True) SM1 = cp.vstack([SW, w.T]) SM2 = cp.vstack([w, np.ones((1, 1))]) SM3 = cp.hstack([SM1, SM2]) sktconstraints = [SM3 >> 0] if N > self.n_max_kurt: K = 2 * N sg2 = cp.Variable((K, 1)) risk20 = cp.pnorm(sg2, p=2) SA = af.block_vec_pq(self.skurt, N, N) s_SA, V_SA = np.linalg.eig(SA) s_SA = np.clip(s_SA, 0, np.inf) SBi = [] for i in range(K): SB = s_SA[i] ** 0.5 * V_SA[:, i] SB = SB.reshape((N, N), order="F").real SBi.append(SB) for i in range(K): sktconstraints += [sg2[i, 0] == cp.trace(SBi[i] @ SW)] else: Sqrt_SSigma_4 = S_2 @ self.skurt @ S_2.T Sqrt_SSigma_4 = sqrtm(Sqrt_SSigma_4) sg2 = cp.Variable(nonneg=True) risk20 = sg2 sz = L_2 @ cp.reshape(cp.vec(SW), (N * N, 1)) sktconstraints += [cp.SOC(sg2, Sqrt_SSigma_4 @ sz)] # Relativistic Value at Risk Variables kappa = self.kappa t3 = cp.Variable((1, 1)) s3 = cp.Variable((1, 1), nonneg=True) omega3 = cp.Variable((T, 1)) psi3 = cp.Variable((T, 1)) theta3 = cp.Variable((T, 1)) epsilon3 = cp.Variable((T, 1)) rlvarconstraints = [ cp.constraints.power.PowCone3D( s3 * (1 + kappa) / (2 * kappa) * onesvec, psi3 * (1 + kappa) / kappa, epsilon3, 1 / (1 + kappa), ), cp.constraints.power.PowCone3D( omega3 / (1 - kappa), theta3 / kappa, -s3 / (2 * kappa) * onesvec, (1 - kappa), ), -X * 1000 - t3 * 1000 + epsilon3 * 1000 + omega3 * 1000 <= 0, ] ln_k = ((1 / (alpha * T)) ** kappa - (1 / (alpha * T)) ** (-kappa)) / ( 2 * kappa ) risk21 = t3 + ln_k * s3 + onesvec.T @ psi3 + onesvec.T @ theta3 # Relativistic Drawdown at Risk Variables t4 = cp.Variable((1, 1)) s4 = cp.Variable((1, 1), nonneg=True) omega4 = cp.Variable((T, 1)) psi4 = cp.Variable((T, 1)) theta4 = cp.Variable((T, 1)) epsilon4 = cp.Variable((T, 1)) rldarconstraints = [ cp.constraints.power.PowCone3D( s4 * (1 + kappa) / (2 * kappa) * onesvec, psi4 * (1 + kappa) / kappa, epsilon4, 1 / (1 + kappa), ), cp.constraints.power.PowCone3D( omega4 / (1 - kappa), theta4 / kappa, -s4 / (2 * kappa) * onesvec, (1 - kappa), ), U[1:] * 1000 - t4 * 1000 + epsilon4 * 1000 + omega4 * 1000 <= 0, ] risk22 = t4 + ln_k * s4 + onesvec.T @ psi4 + onesvec.T @ theta4 # Problem Linear Constraints if self.ainequality is not None and self.binequality is not None: A = np.array(self.ainequality, ndmin=2) * 1000 B = np.array(self.binequality, ndmin=2) * 1000 constraints += [A @ w - B @ k >= 0] # Problem Return Constraint if self.lowerret is not None: constraints += [ret >= self.lowerret * k] # Defining risk function if rm == "MV": risk = risk1 constraints += devconstraints elif rm == "MAD": risk = risk2 constraints += madconstraints elif rm == "MSV": risk = risk3 constraints += madconstraints elif rm == "CVaR": risk = risk4 constraints += cvarconstraints elif rm == "FLPM": risk = risk6 constraints += lpmconstraints elif rm == "SLPM": risk = risk7 constraints += lpmconstraints elif rm == "CDaR": risk = risk10 constraints += ddconstraints constraints += cdarconstraints elif rm == "UCI": risk = risk11 constraints += ddconstraints elif rm == "EVaR": risk = risk12 constraints += evarconstraints elif rm == "EDaR": risk = risk13 constraints += ddconstraints constraints += edarconstraints elif rm == "GMD": risk = risk14 constraints += owaconstraints constraints += gmdconstraints elif rm == "TG": risk = risk15 constraints += owaconstraints constraints += tgconstraints elif rm == "CVRG": risk = risk17 constraints += owaconstraints constraints += cvrgconstraints elif rm == "TGRG": risk = risk18 constraints += owaconstraints constraints += tgrgconstraints elif rm == "KT": if self.kurt is not None: risk = risk19 constraints += ktconstraints else: raise ValueError( "First you need to calculate Cokurtosis Square Matrix." ) elif rm == "SKT": if self.skurt is not None: risk = risk20 constraints += sktconstraints else: raise ValueError( "First you need to calculate Semi Cokurtosis Square Matrix." ) elif rm == "RLVaR": risk = risk21 constraints += rlvarconstraints elif rm == "RLDaR": risk = risk22 constraints += ddconstraints constraints += rldarconstraints # Risk budgeting constraint log_w = cp.Variable((N, 1)) constraints += [ # rb.T @ cp.log(w) >= 1, rb.T @ log_w >= 1, cp.ExpCone(log_w * 1000, np.ones((N, 1)) * 1000, w * 1000), w * 1000 >= 0, cp.sum(w) * 1000 == k * 1000, ] # Frontier Variables portafolio = {} for i in self.assetslist: portafolio.update({i: []}) # Optimization Process # Defining objective function objective = cp.Minimize(risk * 1000) try: prob = cp.Problem(objective, constraints) for solver in self.solvers: try: if len(self.sol_params) == 0: prob.solve(solver=solver) else: prob.solve(solver=solver, **self.sol_params[solver]) except: pass if w.value is not None: break if rm == "EVaR": self.z_EVaR = s1.value if rm == "EDaR": self.z_EDaR = s2.value if rm == "RLVaR": self.z_RLVaR = s3.value if rm == "RLDaR": self.z_RLDaR = s4.value weights = np.array(w.value, ndmin=2).T weights = np.abs(weights) / np.sum(np.abs(weights)) for j in self.assetslist: portafolio[j].append(weights[0, self.assetslist.index(j)]) except: pass try: self.rp_optimal = pd.DataFrame( portafolio, index=["weights"], dtype=np.float64 ).T except: self.rp_optimal = None print("The problem doesn't have a solution with actual input parameters") return self.rp_optimal
[文档] def rrp_optimization(self, model="Classic", version="A", l=1, b=None, hist=True): r""" This method that calculates the relaxed risk parity portfolio according to the optimization model and version selected by the user :cite:`a-GambetaKwon`. The general problem that solves is: .. math:: \begin{aligned} &\underset{w}{\min} & & \psi - \gamma & \\ &\text{s.t.} & & \zeta = \Sigma w \\ & & & w^{T} \Sigma w \leq \left ( \psi^{2} - \rho^{2} \right ) & \\ & & & w_{i} \zeta_{i} \geq \gamma^{2} b_{i} & \forall i=1 , \ldots , N \\ & & & \lambda x^{T} \Theta x \leq \rho^{2} & \\ & & & \mu w \geq \overline{\mu} & \\ & & & Aw \geq B & \\ & & & \sum^{N}_{i=1} w_{i} = 1 & \\ & & & \psi, \gamma, \rho, w \geq 0 & \\ \end{aligned} Where: :math:`w`: is the vector of weights of the optimum portfolio. :math:`\mu`: is the vector of expected returns. :math:`\Sigma`: is the covariance matrix of assets returns. :math:`\psi`: is the average risk of the portfolio. :math:`\gamma`: is the lower bound of each asset risk constribution. :math:`b`: is the risk constribution constraint vector. :math:`\zeta_{i}`: is the marginal risk of asset :math:`i`. :math:`\rho`: is a regularization variable. :math:`\lambda`: is a penalty parameter of :math:`\rho`. :math:`\Theta = \text{diag}(\Sigma)` :math:`Aw \geq B`: is a set of linear constraints. Parameters ---------- model : str can be 'Classic' or 'FM' The model used for optimize the portfolio. The default is 'Classic'. Possible values are: - 'Classic': use estimates of expected return vector and covariance matrix that depends on historical data. - 'FM': use estimates of expected return vector and covariance matrix based on a Risk Factor model specified by the user. version : str can be 'A', 'B' or 'C' Relaxed risk parity model version proposed in :cite:`a-RichardRoncalli`. The default is 'A'. Possible values are: - 'A': without regularization and penalization constraints. - 'B': with regularization constraint but without penalization constraint. - 'C': with regularization and penalization constraints. l : float, optional The penalization factor of penalization constraints. Only used with version 'C'. The default is 1. b : float, optional The vector of risk constraints per asset. The default is 1/n (number of assets). hist : bool, optional Indicate what kind of covariance matrix is used. If model = 'FM', True means historical covariance and False means Risk Factor model for covariance. The default is True. Returns ------- w : DataFrame The weights of optimal portfolio. """ # General model Variables mu = None sigma = None returns = None if model == "Classic": mu = np.array(self.mu, ndmin=2) sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "FM": mu = np.array(self.mu_fm, ndmin=2) if hist == False: sigma = np.array(self.cov_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) # General Model Variables T, N = returns.shape if b is None: rb = np.ones((N, 1)) rb = rb / N else: self.b = b.copy() rb = self.b returns = np.array(returns, ndmin=2) w = cp.Variable((mu.shape[1], 1)) ret = mu @ w # RRP Model Variables G = sqrtm(sigma) Theta = np.diag(np.sqrt(np.diag(sigma))) psi = cp.Variable(nonneg=True) rho = cp.Variable(nonneg=True) gamma = cp.Variable(nonneg=True) zeta = cp.Variable((mu.shape[1], 1)) risk = psi - gamma # General Model Constraints constraints = [] constraints += [ zeta == sigma @ w, cp.sum(w) == 1, gamma >= 0, psi >= 0, zeta >= 0, w >= 0, ] for i in range(mu.shape[1]): constraints += [ cp.SOC( w[i, 0] + zeta[i, 0], cp.vstack([2 * gamma * rb[i] ** 0.5, w[i, 0] - zeta[i, 0]]), ) ] # Specific Model Constraints if version == "A": constraints += [cp.SOC(psi, G.T @ w)] elif version == "B": constraints += [ cp.SOC( 2 * psi, cp.vstack([2 * G.T @ w, -2 * rho * np.ones((1, 1))]), ) ] constraints += [cp.SOC(rho, G.T @ w)] elif version == "C": constraints += [ cp.SOC( 2 * psi, cp.vstack([2 * G.T @ w, -2 * rho * np.ones((1, 1))]), ) ] constraints += [cp.SOC(rho, l**0.5 * Theta.T @ w)] # Problem Linear Constraints if self.ainequality is not None and self.binequality is not None: A = np.array(self.ainequality, ndmin=2) * 1000 B = np.array(self.binequality, ndmin=2) * 1000 constraints += [A @ w - B >= 0] # Problem Return Constraint if self.lowerret is not None: constraints += [ret >= self.lowerret] # Frontier Variables portafolio = {} for i in self.assetslist: portafolio.update({i: []}) # Optimization Process # Defining objective function objective = cp.Minimize(risk * 1000) try: prob = cp.Problem(objective, constraints) for solver in self.solvers: try: if len(self.sol_params) == 0: prob.solve(solver=solver) else: prob.solve(solver=solver, **self.sol_params[solver]) except: pass if w.value is not None: break weights = np.array(w.value, ndmin=2).T weights = np.abs(weights) / np.sum(np.abs(weights)) for j in self.assetslist: portafolio[j].append(weights[0, self.assetslist.index(j)]) except: pass try: self.rrp_optimal = pd.DataFrame( portafolio, index=["weights"], dtype=np.float64 ).T except: self.rrp_optimal = None print("The problem doesn't have a solution with actual input parameters") return self.rrp_optimal
[文档] def wc_optimization(self, obj="Sharpe", rf=0, l=2, Umu="box", Ucov="box"): r""" This method that calculates the worst case mean variance portfolio according to the objective function and uncertainty sets selected by the user. Parameters ---------- obj : str can be {'MinRisk', 'Utility', 'Sharpe' or 'MaxRet'}. Objective function of the optimization model. The default is 'Sharpe'. Possible values are: - 'MinRisk': Minimize the worst case formulation of the selected risk measure. - 'Utility': Maximize the worst case formulation of the Utility function :math:`\mu w - l \phi_{i}(w)`. - 'Sharpe': Maximize the worst case formulation of the risk adjusted return ratio based on the selected risk measure. - 'MaxRet': Maximize the worst case formulation of the expected return of the portfolio. rf : float, optional Risk free rate, must be in the same period of assets returns. The default is 0. l : scalar, optional Risk aversion factor of the 'Utility' objective function. The default is 2. Umu : str, optional The type of uncertainty set for the mean vector used in the model. The default is 'box'. Possible values are: - 'box': Use a box uncertainty set for the mean vector. - 'ellip': Use a elliptical uncertainty set for the mean vector. - None: Don't use an uncertainty set for mean vector. Ucov : str, optional The type of uncertainty set for the covariance matrix used in the model. The default is 'box'. Possible values are: - 'box': Use a box uncertainty set for the covariance matrix. - 'ellip': Use a elliptical uncertainty set for the covariance matrix. - None: Don't use an uncertainty set for covariance matrix. Returns ------- w : DataFrame The weights of optimal portfolio. """ # General model Variables mu = self.mu.to_numpy() sigma = self.cov.to_numpy() returns = self.returns.to_numpy() cov_l = self.cov_l.to_numpy() cov_u = self.cov_u.to_numpy() cov_mu = self.cov_mu.to_numpy() cov_sigma = self.cov_sigma.to_numpy() d_mu = self.d_mu.to_numpy() k_mu = self.k_mu k_sigma = self.k_sigma T, N = returns.shape w = cp.Variable((N, 1)) Au = cp.Variable((N, N), symmetric=True) Al = cp.Variable((N, N), symmetric=True) X = cp.Variable((N, N), symmetric=True) Z = cp.Variable((N, N), symmetric=True) k = cp.Variable((1, 1)) rf0 = rf g = cp.Variable(nonneg=True) constraints = [] # Uncertainty Sets for Mean Vector if Umu == "box": if obj == "Sharpe": constraints += [mu @ w - d_mu @ cp.abs(w) - rf0 * k >= 1] else: ret = mu @ w - d_mu @ cp.abs(w) elif Umu == "ellip": if obj == "Sharpe": constraints += [ mu @ w - k_mu * cp.pnorm(sqrtm(cov_mu) @ w, 2) - rf0 * k >= 1 ] else: ret = mu @ w - k_mu * cp.pnorm(sqrtm(cov_mu) @ w, 2) else: if obj == "Sharpe": constraints += [mu @ w - rf0 * k >= 1] else: ret = mu @ w # Uncertainty Sets for Covariance Matrix if Ucov == "box": M1 = cp.vstack([Au - Al, w.T]) if obj == "Sharpe": M2 = cp.vstack([w, k]) else: M2 = cp.vstack([w, np.ones((1, 1))]) M = cp.hstack([M1, M2]) risk = cp.trace(Au @ cov_u) - cp.trace(Al @ cov_l) constraints += [M >> 0, Au >= 0, Al >= 0] elif Ucov == "ellip": M1 = cp.vstack([X, w.T]) if obj == "Sharpe": M2 = cp.vstack([w, k]) else: M2 = cp.vstack([w, np.ones((1, 1))]) M = cp.hstack([M1, M2]) risk = cp.trace(sigma @ (X + Z)) risk += k_sigma * cp.pnorm(sqrtm(cov_sigma) @ (cp.vec(X) + cp.vec(Z)), 2) constraints += [M >> 0, Z >> 0] else: G = sqrtm(sigma) risk = g**2 constraints += [cp.SOC(g, G.T @ w)] # Cardinal Boolean Variables if self.card is not None: if obj == "Sharpe": e = cp.Variable((mu.shape[1], 1), boolean=True) e1 = cp.Variable((mu.shape[1], 1)) else: e = cp.Variable((mu.shape[1], 1), boolean=True) # Problem Weight Constraints if obj == "Sharpe": constraints += [cp.sum(w) == self.budget * k, k >= 0] if self.sht == False: constraints += [w <= self.upperlng * k, w >= 0] if self.card is not None: constraints += [ cp.sum(e) <= self.card, e1 <= k, e1 >= 0, e1 <= 100000 * e, e1 >= k - 100000 * (1 - e), w <= self.upperlng * e1, ] elif self.sht == True: constraints += [ cp.sum(cp.pos(w)) * 1000 <= self.upperlng * k * 1000, cp.sum(cp.neg(w)) * 1000 <= self.uppersht * k * 1000, ] if self.card is not None: constraints += [ cp.sum(e) <= self.card, e1 <= k, e1 >= 0, e1 <= 100000 * e, e1 >= k - 100000 * (1 - e), w >= -self.uppersht * e1, w <= self.upperlng * e1, ] else: constraints += [cp.sum(w) == self.budget] if self.sht == False: constraints += [w <= self.upperlng, w >= 0] if self.card is not None: constraints += [ cp.sum(e) <= self.card, w <= self.upperlng * e, ] elif self.sht == True: constraints += [ cp.sum(cp.pos(w)) * 1000 <= self.upperlng * 1000, cp.sum(cp.neg(w)) * 1000 <= self.uppersht * 1000, ] if self.card is not None: constraints += [ cp.sum(e) <= self.card, w >= -self.uppersht * e, w <= self.upperlng * e, ] # Number of effective assets constraints if self.nea is not None: if obj == "Sharpe": constraints += [cp.sum_squares(w) * 1000 <= 1 / self.nea * k * 1000] else: constraints += [cp.sum_squares(w) * 1000 <= 1 / self.nea * 1000] # Tracking Error Model Variables c = np.array(self.benchweights, ndmin=2) if self.kindbench == True: bench = returns @ c elif self.kindbench == False: bench = np.array(self.benchindex, ndmin=2) # Problem additional linear constraints if self.ainequality is not None and self.binequality is not None: A = np.array(self.ainequality, ndmin=2) * 1000 B = np.array(self.binequality, ndmin=2) * 1000 if obj == "Sharpe": constraints += [A @ w - B @ k >= 0] else: constraints += [A @ w - B >= 0] # Tracking error Constraints if obj == "Sharpe": if self.allowTE == True: TE_1 = cp.norm(returns @ w - bench @ k, "fro") / np.sqrt(T - 1) constraints += [TE_1 <= self.TE * k] else: if self.allowTE == True: TE_1 = cp.norm(returns @ w - bench, "fro") / np.sqrt(T - 1) constraints += [TE_1 <= self.TE] # Turnover Constraints if obj == "Sharpe": if self.allowTO == True: TO_1 = cp.abs(w - c @ k) * 1000 constraints += [TO_1 <= self.turnover * k * 1000] else: if self.allowTO == True: TO_1 = cp.abs(w - c) * 1000 constraints += [TO_1 <= self.turnover * 1000] # Frontier Variables portafolio = {} for i in self.assetslist: portafolio.update({i: []}) # Optimization Process # Defining objective function if obj == "Sharpe": objective = cp.Minimize(risk) elif obj == "MinRisk": objective = cp.Minimize(risk) elif obj == "Utility": objective = cp.Maximize(ret - l * risk) elif obj == "MaxRet": objective = cp.Maximize(ret) try: prob = cp.Problem(objective, constraints) for solver in self.solvers: try: if len(self.sol_params) == 0: prob.solve(solver=solver) else: prob.solve(solver=solver, **self.sol_params[solver]) except: pass if w.value is not None: break if obj == "Sharpe": weights = np.array(w.value / k.value, ndmin=2).T else: weights = np.array(w.value, ndmin=2).T if self.sht == False: weights = np.abs(weights) / np.sum(np.abs(weights)) * self.budget for j in self.assetslist: portafolio[j].append(weights[0, self.assetslist.index(j)]) except: pass try: self.wc_optimal = pd.DataFrame( portafolio, index=["weights"], dtype=np.float64 ).T except: self.wc_optimal = None print("The problem doesn't have a solution with actual input parameters") return self.wc_optimal
[文档] def owa_optimization(self, obj="Sharpe", owa_w=None, kelly=False, rf=0, l=2): r""" This method that calculates the owa optimal portfolio according to the weight vector given by the user. The general problem that solves is: .. math:: \begin{align} &\underset{w}{\text{optimize}} & & F(w)\\ &\text{s. t.} & & Aw \geq B\\ \end{align} Where: :math:`F(w)` is the objective function based on an owa risk measure. :math:`Aw \geq B` is a set of linear constraints. Parameters ---------- obj : str can be {'MinRisk', 'Utility', 'Sharpe' or 'MaxRet'}. Objective function of the optimization model. The default is 'Sharpe'. Possible values are: - 'MinRisk': Minimize the selected risk measure. - 'Utility': Maximize the Utility function :math:`\mu w - l \phi_{i}(w)`. - 'Sharpe': Maximize the risk adjusted return ratio based on the selected risk measure. owa_w : 1darray, optional The owa weight used to define the owa risk measure. The default is 'MV'. Possible values are: kelly : str, optional Method used to calculate mean return. Possible values are False for arithmetic mean return, "approx" for approximate mean logarithmic return using first and second moment and "exact" for mean logarithmic return. The default is False. rf : float, optional Risk free rate, must be in the same period of assets returns. The default is 0. l : scalar, optional Risk aversion factor of the 'Utility' objective function. The default is 2. Returns ------- w : DataFrame The weights of optimal portfolio. """ # General model Variables mu = self.mu.to_numpy() sigma = self.cov.to_numpy() returns = self.returns.to_numpy() w = cp.Variable((mu.shape[1], 1)) k = cp.Variable((1, 1)) rf0 = rf T, N = returns.shape gr = cp.Variable((T, 1)) # MV Model Variables (for approx log returns) g = cp.Variable(nonneg=True) G = sqrtm(sigma) devconstraints = [cp.SOC(g, G.T @ w)] # Return Variables if kelly == "exact": if obj == "Sharpe": ret = 1 / T * cp.sum(gr) - rf0 * k else: ret = 1 / T * cp.sum(cp.log(1 + returns @ w)) elif kelly == "approx": if obj == "Sharpe": ret = mu @ w - 0.5 * cp.quad_over_lin(g, k) else: ret = mu @ w - 0.5 * g**2 elif kelly == False: ret = mu @ w # OWA Model Variables a = cp.Variable((T, 1)) b = cp.Variable((T, 1)) y = cp.Variable((T, 1)) risk = cp.sum(a + b) constraints = [] constraints += [returns @ w == y] if owa_w is None: owa_w = owa.owa_gmd(T) / 2 onesvec = np.ones((T, 1)) constraints += [y @ owa_w.T <= onesvec @ a.T + b @ onesvec.T] # Cardinal Boolean Variables if self.card is not None: if obj == "Sharpe": e = cp.Variable((mu.shape[1], 1), boolean=True) e1 = cp.Variable((mu.shape[1], 1)) else: e = cp.Variable((mu.shape[1], 1), boolean=True) # Problem Weight Constraints if obj == "Sharpe": constraints += [cp.sum(w) == self.budget * k, k * 1000 >= 0] if self.sht == False: constraints += [w <= self.upperlng * k, w * 1000 >= 0] if self.card is not None: constraints += [ cp.sum(e) <= self.card, e1 <= k, e1 >= 0, e1 <= 100000 * e, e1 >= k - 100000 * (1 - e), w <= self.upperlng * e1, ] elif self.sht == True: constraints += [ cp.sum(cp.pos(w)) * 1000 <= self.upperlng * k * 1000, cp.sum(cp.neg(w)) * 1000 <= self.uppersht * k * 1000, ] if self.card is not None: constraints += [ cp.sum(e) <= self.card, e1 <= k, e1 >= 0, e1 <= 100000 * e, e1 >= k - 100000 * (1 - e), w >= -self.uppersht * e1, w <= self.upperlng * e1, ] else: constraints += [cp.sum(w) == self.budget] if self.sht == False: constraints += [w <= self.upperlng, w * 1000 >= 0] if self.card is not None: constraints += [ cp.sum(e) <= self.card, w <= self.upperlng * e, ] elif self.sht == True: constraints += [ cp.sum(cp.pos(w)) * 1000 <= self.upperlng * 1000, cp.sum(cp.neg(w)) * 1000 <= self.uppersht * 1000, ] if self.card is not None: constraints += [ cp.sum(e) <= self.card, w >= -self.uppersht * e, w <= self.upperlng * e, ] # Problem Linear Constraints if self.ainequality is not None and self.binequality is not None: A = np.array(self.ainequality, ndmin=2) * 1000 B = np.array(self.binequality, ndmin=2) * 1000 if obj == "Sharpe": constraints += [A @ w - B @ k >= 0] else: constraints += [A @ w - B >= 0] # Number of Effective Assets Constraints if self.nea is not None: if obj == "Sharpe": constraints += [cp.norm(w, "fro") <= 1 / self.nea**0.5 * k] else: constraints += [cp.norm(w, "fro") <= 1 / self.nea**0.5] # Tracking Error Model Variables c = np.array(self.benchweights, ndmin=2) if self.kindbench == True: bench = returns @ c elif self.kindbench == False: bench = np.array(self.benchindex, ndmin=2) # Tracking error Constraints if obj == "Sharpe": if self.allowTE == True: TE_1 = cp.norm(returns @ w - bench @ k, "fro") / cp.sqrt(T - 1) constraints += [TE_1 * 1000 <= self.TE * k * 1000] else: if self.allowTE == True: TE_1 = cp.norm(returns @ w - bench, "fro") / cp.sqrt(T - 1) constraints += [TE_1 * 1000 <= self.TE * 1000] # Turnover Constraints if obj == "Sharpe": if self.allowTO == True: TO_1 = cp.abs(w - c @ k) * 1000 constraints += [TO_1 <= self.turnover * k * 1000] else: if self.allowTO == True: TO_1 = cp.abs(w - c) * 1000 constraints += [TO_1 <= self.turnover * 1000] # Problem return Constraints if self.lowerret is not None: if obj == "Sharpe": constraints += [ret >= self.lowerret * k] else: constraints += [ret >= self.lowerret] # Frontier Variables portafolio = {} for i in self.assetslist: portafolio.update({i: []}) # Optimization Process # Defining objective function if obj == "Sharpe": if kelly == "exact": constraints += [risk <= 1] constraints += [ cp.constraints.ExpCone(gr, np.ones((T, 1)) @ k, k + returns @ w) ] objective = cp.Maximize(ret * 1000) elif kelly == "approx": constraints += [risk <= 1] constraints += devconstraints objective = cp.Maximize(ret) elif kelly == False: constraints += [mu @ w - rf0 * k == 1] objective = cp.Minimize(risk * 1000) elif obj == "MinRisk": objective = cp.Minimize(risk * 1000) elif obj == "Utility": objective = cp.Maximize(ret - l * risk) elif obj == "MaxRet": objective = cp.Maximize(ret * 1000) try: prob = cp.Problem(objective, constraints) for solver in self.solvers: try: if len(self.sol_params) == 0: prob.solve(solver=solver) else: prob.solve(solver=solver, **self.sol_params[solver]) except: pass if w.value is not None: break if obj == "Sharpe": weights = np.array(w.value / k.value, ndmin=2).T else: weights = np.array(w.value, ndmin=2).T if self.sht == False: weights = np.abs(weights) / np.sum(np.abs(weights)) * self.budget for j in self.assetslist: portafolio[j].append(weights[0, self.assetslist.index(j)]) except: pass try: self.owa_optimal = pd.DataFrame( portafolio, index=["weights"], dtype=np.float64 ).T except: self.owa_optimal = None print("The problem doesn't have a solution with actual input parameters") return self.owa_optimal
[文档] def frontier_limits(self, model="Classic", rm="MV", kelly=False, rf=0, hist=True): r""" Method that calculates the minimum risk and maximum return portfolios available with current assets and constraints. Parameters ---------- model : str, optional Methodology used to estimate input parameters. The default is 'Classic'. rm : str, optional The risk measure used to optimize the portfolio. The default is 'MV'. Possible values are: - 'MV': Standard Deviation. - 'KT': Square Root of Kurtosis. - 'MAD': Mean Absolute Deviation. - 'GMD': Gini Mean Difference. - 'MSV': Semi Standard Deviation. - 'SKT': Square Root of Semi Kurtosis. - 'FLPM': First Lower Partial Moment (Omega Ratio). - 'SLPM': Second Lower Partial Moment (Sortino Ratio). - 'CVaR': Conditional Value at Risk. - 'TG': Tail Gini. - 'EVaR': Entropic Value at Risk. - 'RLVaR': Relativistic Value at Risk. - 'WR': Worst Realization (Minimax). - 'RG': Range of returns. - 'CVRG': CVaR range of returns. - 'TGRG': Tail Gini range of returns. - 'MDD': Maximum Drawdown of uncompounded cumulative returns (Calmar Ratio). - 'ADD': Average Drawdown of uncompounded cumulative returns. - 'CDaR': Conditional Drawdown at Risk of uncompounded cumulative returns. - 'EDaR': Entropic Drawdown at Risk of uncompounded cumulative returns. - 'RLDaR': Relativistic Drawdown at Risk of uncompounded cumulative returns. - 'UCI': Ulcer Index of uncompounded cumulative returns. kelly : str, optional Method used to calculate mean return. Possible values are False for arithmetic mean return, "approx" for approximate mean logarithmic return using first and second moment and "exact" for mean logarithmic return. The default is False. rf : scalar, optional Risk free rate. The default is 0. hist : bool, optional Indicate what kind of returns are used to calculate risk measures that depends on scenarios (All except 'MV' risk measure). If model = 'BL', True means historical covariance and returns and False Black Litterman covariance and historical returns. If model = 'FM', True means historical covariance and returns and False Risk Factor model for covariance and returns. If model = 'BL_FM', True means historical covariance and returns, False Black Litterman with Risk Factor model for covariance and Risk Factor model for returns, and '2' Risk Factor model for covariance and returns. The default is True. Returns ------- limits : DataFrame A dataframe that containts the weights of the portfolios. Notes ----- This method is preferable (faster) to use instead of efficient_frontier method to know the range of expected return and expected risk. """ w_min = self.optimization( model=model, rm=rm, obj="MinRisk", kelly=kelly, rf=rf, l=0, hist=hist ) w_max = self.optimization( model=model, rm=rm, obj="MaxRet", kelly=kelly, rf=rf, l=0, hist=hist ) if w_min is not None and w_max is not None: self.limits = pd.concat([w_min, w_max], axis=1) self.limits.columns = ["w_min", "w_max"] return self.limits else: raise NameError("The limits of the frontier can't be found")
[文档] def efficient_frontier( self, model="Classic", rm="MV", kelly=False, points=20, rf=0, solver=None, hist=True, ): r""" Method that calculates several portfolios in the efficient frontier of the selected risk measure, available with current assets and constraints. Parameters ---------- model : str, optional Methodology used to estimate input parameters. The default is 'Classic'. rm : str, optional The risk measure used to optimize the portfolio. The default is 'MV'. Possible values are: - 'MV': Standard Deviation. - 'KT': Square Root of Kurtosis. - 'MAD': Mean Absolute Deviation. - 'GMD': Gini Mean Difference. - 'MSV': Semi Standard Deviation. - 'SKT': Square Root of Semi Kurtosis. - 'FLPM': First Lower Partial Moment (Omega Ratio). - 'SLPM': Second Lower Partial Moment (Sortino Ratio). - 'CVaR': Conditional Value at Risk. - 'TG': Tail Gini. - 'EVaR': Entropic Value at Risk. - 'RLVaR': Relativistic Value at Risk. - 'WR': Worst Realization (Minimax). - 'RG': Range of returns. - 'CVRG': CVaR range of returns. - 'TGRG': Tail Gini range of returns. - 'MDD': Maximum Drawdown of uncompounded cumulative returns (Calmar Ratio). - 'ADD': Average Drawdown of uncompounded cumulative returns. - 'CDaR': Conditional Drawdown at Risk of uncompounded cumulative returns. - 'EDaR': Entropic Drawdown at Risk of uncompounded cumulative returns. - 'RLDaR': Relativistic Drawdown at Risk of uncompounded cumulative returns. - 'UCI': Ulcer Index of uncompounded cumulative returns. kelly : str, optional Method used to calculate mean return. Possible values are False for arithmetic mean return, "approx" for approximate mean logarithmic return using first and second moment and "exact" for mean logarithmic return. The default is False. points : scalar, optional Number of point calculated from the efficient frontier. The default is 50. rf : scalar, optional Risk free rate. The default is 0. solver: str, optional Solver available for CVXPY that supports power cone programming. Used to calculate RLVaR and RLDaR. The default value is None. hist : bool, optional Indicate what kind of returns are used to calculate risk measures that depends on scenarios (All except 'MV' risk measure). If model = 'BL', True means historical covariance and returns and False Black Litterman covariance and historical returns. If model = 'FM', True means historical covariance and returns and False Risk Factor model for covariance and returns. If model = 'BL_FM', True means historical covariance and returns, False Black Litterman with Risk Factor model for covariance and Risk Factor model for returns, and '2' Risk Factor model for covariance and returns. The default is True. Returns ------- frontier : DataFrame A dataframe that containts the weights of the portfolios. Notes ----- It's recommendable that don't use this method when there are too many assets (more than 100) and you are using a scenario based risk measure (all except standard deviation). It's preferable to use frontier_limits method (faster) to know the range of expected return and expected risk. """ mu = None sigma = None returns = None if model == "Classic": mu = np.array(self.mu, ndmin=2) sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "FM": mu = np.array(self.mu_fm, ndmin=2) if hist == False: sigma = np.array(self.cov_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "BL": mu = np.array(self.mu_bl, ndmin=2) if hist == False: sigma = np.array(self.cov_bl, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif model == "BL_FM": mu = np.array(self.mu_bl_fm, ndmin=2) if hist == False: sigma = np.array(self.cov_bl_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) elif hist == True: sigma = np.array(self.cov, ndmin=2) returns = np.array(self.returns, ndmin=2) elif hist == 2: sigma = np.array(self.cov_fm, ndmin=2) returns = np.array(self.returns_fm, ndmin=2) alpha = self.alpha a_sim = self.a_sim if self.beta is None: beta = alpha else: beta = self.beta if self.b_sim is None: b_sim = a_sim else: b_sim = self.b_sim kappa = self.kappa limits = self.frontier_limits(model=model, rm=rm, kelly=kelly, rf=rf, hist=hist) w_min = np.array(limits.iloc[:, 0], ndmin=2).T w_max = np.array(limits.iloc[:, 1], ndmin=2).T ret_min = (mu @ w_min).item() ret_max = (mu @ w_max).item() if rm == "MV": risk_min = np.sqrt(w_min.T @ sigma @ w_min).item() risk_max = np.sqrt(w_max.T @ sigma @ w_max).item() elif rm == "KT": risk_min = rk.Kurtosis(returns @ w_min) risk_max = rk.Kurtosis(returns @ w_max) elif rm == "MAD": risk_min = rk.MAD(returns @ w_min) risk_max = rk.MAD(returns @ w_max) elif rm == "MSV": risk_min = rk.SemiDeviation(returns @ w_min) risk_max = rk.SemiDeviation(returns @ w_max) elif rm == "SKT": risk_min = rk.SemiKurtosis(returns @ w_min) risk_max = rk.SemiKurtosis(returns @ w_max) elif rm == "CVaR": risk_min = rk.CVaR_Hist(returns @ w_min, alpha) risk_max = rk.CVaR_Hist(returns @ w_max, alpha) elif rm == "WR": risk_min = rk.WR(returns @ w_min) risk_max = rk.WR(returns @ w_max) elif rm == "FLPM": risk_min = rk.LPM(returns @ w_min, rf, 1) risk_max = rk.LPM(returns @ w_max, rf, 1) elif rm == "SLPM": risk_min = rk.LPM(returns @ w_min, rf, 2) risk_max = rk.LPM(returns @ w_max, rf, 2) elif rm == "MDD": risk_min = rk.MDD_Abs(returns @ w_min) risk_max = rk.MDD_Abs(returns @ w_max) elif rm == "ADD": risk_min = rk.ADD_Abs(returns @ w_min) risk_max = rk.ADD_Abs(returns @ w_max) elif rm == "CDaR": risk_min = rk.CDaR_Abs(returns @ w_min, alpha) risk_max = rk.CDaR_Abs(returns @ w_max, alpha) elif rm == "UCI": risk_min = rk.UCI_Abs(returns @ w_min) risk_max = rk.UCI_Abs(returns @ w_max) elif rm == "EVaR": risk_min = rk.EVaR_Hist(returns @ w_min, alpha)[0] risk_max = rk.EVaR_Hist(returns @ w_max, alpha)[0] elif rm == "EDaR": risk_min = rk.EDaR_Abs(returns @ w_min, alpha)[0] risk_max = rk.EDaR_Abs(returns @ w_max, alpha)[0] elif rm == "GMD": risk_min = rk.GMD(returns @ w_min) risk_max = rk.GMD(returns @ w_max) elif rm == "TG": risk_min = rk.TG(returns @ w_min, alpha, a_sim) risk_max = rk.TG(returns @ w_max, alpha, a_sim) elif rm == "RG": risk_min = rk.RG(returns @ w_min) risk_max = rk.RG(returns @ w_max) elif rm == "CVRG": risk_min = rk.CVRG(returns @ w_min, alpha, beta) risk_max = rk.CVRG(returns @ w_max, alpha, beta) elif rm == "TGRG": risk_min = rk.TGRG(returns @ w_min, alpha, a_sim, beta, b_sim) risk_max = rk.TGRG(returns @ w_max, alpha, a_sim, beta, b_sim) elif rm == "RLVaR": risk_min = rk.RLVaR_Hist(returns @ w_min, alpha, kappa, solver) risk_max = rk.RLVaR_Hist(returns @ w_max, alpha, kappa, solver) elif rm == "RLDaR": risk_min = rk.RLDaR_Abs(returns @ w_min, alpha, kappa, solver) risk_max = rk.RLDaR_Abs(returns @ w_max, alpha, kappa, solver) mus = np.linspace(ret_min, ret_max, points) risks = np.linspace(risk_min, risk_max, points) risk_lims = [ "upperdev", "upperkt", "uppermad", "uppergmd", "uppersdev", "upperskt", "upperCVaR", "uppertg", "upperEVaR", "upperRLVaR", "upperwr", "upperrg", "uppercvrg", "uppertgrg", "upperflpm", "upperslpm", "uppermdd", "upperadd", "upperCDaR", "upperEDaR", "upperRLDaR", "upperuci", ] risk_names = [ "MV", "KT", "MAD", "GMD", "MSV", "SKT", "CVaR", "TG", "EVaR", "RLVaR", "WR", "RG", "CVRG", "TGRG", "FLPM", "SLPM", "MDD", "ADD", "CDaR", "EDaR", "RLDaR", "UCI", ] item = risk_names.index(rm) frontier = [] n = 0 for i in range(len(risks)): try: if n == 0: w = self.optimization( model=model, rm=rm, obj="MinRisk", kelly=kelly, rf=rf, l=0, hist=hist, ) else: setattr(self, risk_lims[item], risks[i]) w = self.optimization( model=model, rm=rm, obj="MaxRet", kelly=kelly, rf=rf, l=0, hist=hist, ) if w is not None: n += 1 frontier.append(w) except: pass setattr(self, risk_lims[item], None) self.frontier = pd.concat(frontier, axis=1) self.frontier.columns = list(range(n)) return self.frontier
[文档] def reset_risk_constraints(self): r""" Reset all risk constraints. """ cons = [ "lowerret", "upperdev", "upperkt", "uppermad", "uppersdev", "upperskt", "upperCVaR", "upperEVaR", "upperRLVaR", "upperwr", "upperflpm", "upperslpm", "uppermdd", "upperadd", "upperCDaR", "upperEDaR", "upperRLDaR", "upperuci", "uppergmd", "uppertg", "upperrg", "uppercvrg", "uppertgrg", ] for i in cons: setattr(self, i, None)
[文档] def reset_linear_constraints(self): r""" Reset all linear constraints. """ self.ainequality = None self.binequality = None
[文档] def reset_inputs(self): r""" Reset all inputs parameters of optimization models. """ cons = [ "mu", "cov", "kurt", "skurt", "L_2", "S_2", "mu_fm", "cov_fm", "mu_bl", "cov_bl", "mu_bl_fm", "cov_bl_fm", "returns_fm", "cov_l", "cov_u", "cov_mu", "cov_sigma", "d_mu", "k_mu", "k_sigma", "z_EVaR", "z_EDaR", "z_RLVaR", "z_RLDaR", ] for i in cons: setattr(self, i, None)
[文档] def reset_all(self): r""" Reset portfolio object to defatult values. """ self.sht = False self.uppersht = 0.2 self.upperlng = 1 self.budget = 1 self.nea = None self.card = None self._factors = None self.B = None self.alpha = 0.05 self.a_sim = 100 self.beta = None self.b_sim = None self.kappa = 0.30 self.kindbench = True self.benchindex = None self._benchweights = None self.allowTO = False self.turnover = 0.05 self.allowTE = False self.TE = 0.05 self.reset_risk_constraints() self.reset_linear_constraints() self.reset_inputs()