pypfopt.risk_models 源代码

"""
``risk_models`` 模块提供了在给定历史收益率的情况下估计协方差矩阵的函数。

数据输入的格式与 :ref:`expected-returns` 中的相同。

目前已经实现:

- 固定非半正定矩阵
- 一般风险矩阵函数,允许调用一个函数来运行风险模型。
- 样本协方差
- 半方差
- 指数加权协方差
- 最小协方差决定数
- 缩减协方差矩阵:

    - 手动收缩
    - Ledoit Wolf 收缩
    - Oracle 近似收缩

- 协方差到相关矩阵
"""

import warnings

import numpy as np
import pandas as pd

from .expected_returns import returns_from_prices


def _is_positive_semidefinite(matrix):
    """
    Helper function to check if a given matrix is positive semidefinite.
    Any method that requires inverting the covariance matrix will struggle
    with a non-positive semidefinite matrix

    :param matrix: (covariance) matrix to test
    :type matrix: np.ndarray, pd.DataFrame
    :return: whether matrix is positive semidefinite
    :rtype: bool
    """
    try:
        # Significantly more efficient than checking eigenvalues (stackoverflow.com/questions/16266720)
        np.linalg.cholesky(matrix + 1e-16 * np.eye(len(matrix)))
        return True
    except np.linalg.LinAlgError:
        return False


[文档] def fix_nonpositive_semidefinite(matrix, fix_method="spectral"): """ 检查协方差矩阵是否为半正定矩阵,如果不是,就用所选的方法修正它。 ``spectral`` 将负的特征值设置为零,然后重建矩阵,而 ``diag`` 法在对角线上增加一个小的正值。 :param matrix: 原始协方差矩阵(不是 PSD) :type matrix: pd.DataFrame :param fix_method: {"spectral", "diag"}, 默认为 "spectral" :type fix_method: str, optional :raises NotImplementedError: 如果 method 方法未实现 :return: 半正定协方差矩阵 :rtype: pd.DataFrame """ if _is_positive_semidefinite(matrix): return matrix warnings.warn( "The covariance matrix is non positive semidefinite. Amending eigenvalues." ) # Eigendecomposition q, V = np.linalg.eigh(matrix) if fix_method == "spectral": # Remove negative eigenvalues q = np.where(q > 0, q, 0) # Reconstruct matrix fixed_matrix = V @ np.diag(q) @ V.T elif fix_method == "diag": min_eig = np.min(q) fixed_matrix = matrix - 1.1 * min_eig * np.eye(len(matrix)) else: raise NotImplementedError("Method {} not implemented".format(fix_method)) if not _is_positive_semidefinite(fixed_matrix): # pragma: no cover warnings.warn( "Could not fix matrix. Please try a different risk model.", UserWarning ) # Rebuild labels if provided if isinstance(matrix, pd.DataFrame): tickers = matrix.index return pd.DataFrame(fixed_matrix, index=tickers, columns=tickers) else: return fixed_matrix
[文档] def risk_matrix(prices, method="sample_cov", **kwargs): """ 计算协方差矩阵,风险模型以 ``method`` 参数指定。 :param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。 :type prices: pd.DataFrame :param returns_data: 如果是 true, 第一个参数是收益率而不是价格 :type returns_data: bool, 默认为 False. :param method: 要使用的风险模型。应该是其中一个: - ``sample_cov`` - ``semicovariance`` - ``exp_cov`` - ``ledoit_wolf`` - ``ledoit_wolf_constant_variance`` - ``ledoit_wolf_single_factor`` - ``ledoit_wolf_constant_correlation`` - ``oracle_approximating`` :type method: str, optional :raises NotImplementedError: 如果 method 错误 :return: 年化样本协方差矩阵 :rtype: pd.DataFrame """ if method == "sample_cov": return sample_cov(prices, **kwargs) elif method == "semicovariance" or method == "semivariance": return semicovariance(prices, **kwargs) elif method == "exp_cov": return exp_cov(prices, **kwargs) elif method == "ledoit_wolf" or method == "ledoit_wolf_constant_variance": return CovarianceShrinkage(prices, **kwargs).ledoit_wolf() elif method == "ledoit_wolf_single_factor": return CovarianceShrinkage(prices, **kwargs).ledoit_wolf( shrinkage_target="single_factor" ) elif method == "ledoit_wolf_constant_correlation": return CovarianceShrinkage(prices, **kwargs).ledoit_wolf( shrinkage_target="constant_correlation" ) elif method == "oracle_approximating": return CovarianceShrinkage(prices, **kwargs).oracle_approximating() else: raise NotImplementedError("Risk model {} not implemented".format(method))
[文档] def sample_cov(prices, returns_data=False, frequency=252, log_returns=False, **kwargs): """ 计算(日频)资产收益的年化样本协方差矩阵。 :param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。 :type prices: pd.DataFrame :param returns_data: 如果是 true, 第一个参数是收益率而不是价格 :type returns_data: bool, 默认为 False. :param frequency: 一年中的时间数,默认为252(一年中的交易日数)。 :type frequency: int, optional :param log_returns: 是否使用对数收益进行计算 :type log_returns: bool, 默认为 False :return: 年化样本协方差矩阵 :rtype: pd.DataFrame """ if not isinstance(prices, pd.DataFrame): warnings.warn("data is not in a dataframe", RuntimeWarning) prices = pd.DataFrame(prices) if returns_data: returns = prices else: returns = returns_from_prices(prices, log_returns) return fix_nonpositive_semidefinite( returns.cov() * frequency, kwargs.get("fix_method", "spectral") )
[文档] def semicovariance( prices, returns_data=False, benchmark=0.000079, frequency=252, log_returns=False, **kwargs ): """ 估算半方差矩阵,即收益率低于基准的协方差。 .. semicov = E([min(r_i - B, 0)] . [min(r_j - B, 0)]) :param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。 :type prices: pd.DataFrame :param returns_data: 如果是 true, 第一个参数是收益率而不是价格 :type returns_data: bool, defaults to False. :param benchmark: 基准收益,默认为每日无风险利率,即 :math:`1.02^{(1/252)} -1` 。 :type benchmark: float :param frequency: 一年中的时间数,默认为252(一年中的交易日数)。确保你使用适当的基准,例如,如果 ``frequency=12`` ,使用月度无风险利率。 :type frequency: int, optional :param log_returns: 是否使用对数收益进行计算 :type log_returns: bool, 默认为 False :return: 半方差矩阵 :rtype: pd.DataFrame """ if not isinstance(prices, pd.DataFrame): warnings.warn("data is not in a dataframe", RuntimeWarning) prices = pd.DataFrame(prices) if returns_data: returns = prices else: returns = returns_from_prices(prices, log_returns) drops = np.fmin(returns - benchmark, 0) T = drops.shape[0] return fix_nonpositive_semidefinite( (drops.T @ drops) / T * frequency, kwargs.get("fix_method", "spectral") )
def _pair_exp_cov(X, Y, span=180): """ Calculate the exponential covariance between two timeseries of returns. :param X: first time series of returns :type X: pd.Series :param Y: second time series of returns :type Y: pd.Series :param span: the span of the exponential weighting function, defaults to 180 :type span: int, optional :return: the exponential covariance between X and Y :rtype: float """ covariation = (X - X.mean()) * (Y - Y.mean()) # Exponentially weight the covariation and take the mean if span < 10: warnings.warn("it is recommended to use a higher span, e.g 30 days") return covariation.ewm(span=span).mean().iloc[-1]
[文档] def exp_cov( prices, returns_data=False, span=180, frequency=252, log_returns=False, **kwargs ): """ 估算指数加权的协方差矩阵,该矩阵对最近的数据给予更大的权重。 :param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。 :type prices: pd.DataFrame :param returns_data: 如果是 true,则第一个参数是收益率,而不是价格 :param span: 指数加权函数的跨度,默认为 180 :type span: int, optional :param frequency: 一年中的时间数,默认为252(一年中的交易日数)。 :type frequency: int, optional :param log_returns: 是否使用对数收益率进行计算 :type log_returns: bool, 默认为 False :return: 指数协方差矩阵的年化估计值 :rtype: pd.DataFrame """ if not isinstance(prices, pd.DataFrame): warnings.warn("data is not in a dataframe", RuntimeWarning) prices = pd.DataFrame(prices) assets = prices.columns if returns_data: returns = prices else: returns = returns_from_prices(prices, log_returns) N = len(assets) # Loop over matrix, filling entries with the pairwise exp cov S = np.zeros((N, N)) for i in range(N): for j in range(i, N): S[i, j] = S[j, i] = _pair_exp_cov( returns.iloc[:, i], returns.iloc[:, j], span ) cov = pd.DataFrame(S * frequency, columns=assets, index=assets) return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def min_cov_determinant( prices, returns_data=False, frequency=252, random_state=None, log_returns=False, **kwargs ): # pragma: no cover warnings.warn("min_cov_determinant is deprecated and will be removed in v1.5") if not isinstance(prices, pd.DataFrame): warnings.warn("data is not in a dataframe", RuntimeWarning) prices = pd.DataFrame(prices) # Extra dependency try: import sklearn.covariance except (ModuleNotFoundError, ImportError): raise ImportError("Please install scikit-learn via pip or poetry") assets = prices.columns if returns_data: X = prices else: X = returns_from_prices(prices, log_returns) # X = np.nan_to_num(X.values) X = X.dropna().values raw_cov_array = sklearn.covariance.fast_mcd(X, random_state=random_state)[1] cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * frequency return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
[文档] def cov_to_corr(cov_matrix): """ 将一个协方差矩阵转换为一个相关矩阵。 :param cov_matrix: 协方差矩阵 :type cov_matrix: pd.DataFrame :return: 相关矩阵 :rtype: pd.DataFrame """ if not isinstance(cov_matrix, pd.DataFrame): warnings.warn("cov_matrix is not a dataframe", RuntimeWarning) cov_matrix = pd.DataFrame(cov_matrix) Dinv = np.diag(1 / np.sqrt(np.diag(cov_matrix))) corr = np.dot(Dinv, np.dot(cov_matrix, Dinv)) return pd.DataFrame(corr, index=cov_matrix.index, columns=cov_matrix.index)
[文档] def corr_to_cov(corr_matrix, stdevs): """ 将相关矩阵转换为协方差矩阵 :param corr_matrix: 相关矩阵 :type corr_matrix: pd.DataFrame :param stdevs: 标准差的向量 :type stdevs: array-like :return: 协方差矩阵 :rtype: pd.DataFrame """ if not isinstance(corr_matrix, pd.DataFrame): warnings.warn("corr_matrix is not a dataframe", RuntimeWarning) corr_matrix = pd.DataFrame(corr_matrix) return corr_matrix * np.outer(stdevs, stdevs)
[文档] class CovarianceShrinkage: """ 提供计算协方差矩阵的收缩估计的方法,使用样本协方差矩阵,并选择结构化估计器为单位矩阵乘以平均样本方差。 收缩常数可以手动输入,也存在估计最优值的方法(推荐 Ledoit Wolf)。 Instance variables: - ``X`` - pd.DataFrame (收益率) - ``S`` - np.ndarray (样本协方差矩阵) - ``delta`` - float (收缩常数) - ``frequency`` - int """
[文档] def __init__(self, prices, returns_data=False, frequency=252, log_returns=False): """ :param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。 :type prices: pd.DataFrame :param returns_data: 如果是 true, 则第一个参数是收益率而不是价格 :type returns_data: bool, 默认为 False. :param frequency: 一年中的时间段数,默认为252(一年中的交易日数) :type frequency: int, optional :param log_returns: 是否使用对数收益进行计算 :type log_returns: bool, 默认为 False """ # Optional import try: from sklearn import covariance self.covariance = covariance except (ModuleNotFoundError, ImportError): # pragma: no cover raise ImportError("Please install scikit-learn via pip or poetry") if not isinstance(prices, pd.DataFrame): warnings.warn("data is not in a dataframe", RuntimeWarning) prices = pd.DataFrame(prices) self.frequency = frequency if returns_data: self.X = prices.dropna(how="all") else: self.X = returns_from_prices(prices, log_returns).dropna(how="all") self.S = self.X.cov().values self.delta = None # shrinkage constant
def _format_and_annualize(self, raw_cov_array): """ Helper method which annualises the output of shrinkage calculations, and formats the result into a dataframe :param raw_cov_array: raw covariance matrix of daily returns :type raw_cov_array: np.ndarray :return: annualised covariance matrix :rtype: pd.DataFrame """ assets = self.X.columns cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * self.frequency return fix_nonpositive_semidefinite(cov, fix_method="spectral")
[文档] def shrunk_covariance(self, delta=0.2): """ 将一个样本协方差矩阵缩减为单位矩阵(按平均样本方差缩放)。 这种方法并不估计最佳收缩参数,它需要手动输入。 :param delta: 收缩参数, 默认为 0.2. :type delta: float, optional :return: 收缩样本的协方差矩阵 :rtype: np.ndarray """ self.delta = delta N = self.S.shape[1] # Shrinkage target mu = np.trace(self.S) / N F = np.identity(N) * mu # Shrinkage shrunk_cov = delta * F + (1 - delta) * self.S return self._format_and_annualize(shrunk_cov)
[文档] def ledoit_wolf(self, shrinkage_target="constant_variance"): """ 计算特定收缩目标的 Ledoit-Wolf 收缩估计值。 :param shrinkage_target: 收缩目标选项, ``constant_variance``、 ``single_factor`` 或 ``constant_correlation``. 默认为 ``constant_variance``. :type shrinkage_target: str, optional :raises NotImplementedError: 如果 shrinkage_target 错误 :return: 收缩样本协方差矩阵 :rtype: np.ndarray """ if shrinkage_target == "constant_variance": X = np.nan_to_num(self.X.values) shrunk_cov, self.delta = self.covariance.ledoit_wolf(X) elif shrinkage_target == "single_factor": shrunk_cov, self.delta = self._ledoit_wolf_single_factor() elif shrinkage_target == "constant_correlation": shrunk_cov, self.delta = self._ledoit_wolf_constant_correlation() else: raise NotImplementedError( "Shrinkage target {} not recognised".format(shrinkage_target) ) return self._format_and_annualize(shrunk_cov)
def _ledoit_wolf_single_factor(self): """ Helper method to calculate the Ledoit-Wolf shrinkage estimate with the Sharpe single-factor matrix as the shrinkage target. See Ledoit and Wolf (2001). :return: shrunk sample covariance matrix, shrinkage constant :rtype: np.ndarray, float """ X = np.nan_to_num(self.X.values) # De-mean returns t, n = np.shape(X) Xm = X - X.mean(axis=0) xmkt = Xm.mean(axis=1).reshape(t, 1) # compute sample covariance matrix sample = np.cov(np.append(Xm, xmkt, axis=1), rowvar=False) * (t - 1) / t betas = sample[0:n, n].reshape(n, 1) varmkt = sample[n, n] sample = sample[:n, :n] F = np.dot(betas, betas.T) / varmkt F[np.eye(n) == 1] = np.diag(sample) # compute shrinkage parameters c = np.linalg.norm(sample - F, "fro") ** 2 y = Xm**2 p = 1 / t * np.sum(np.dot(y.T, y)) - np.sum(sample**2) # r is divided into diagonal # and off-diagonal terms, and the off-diagonal term # is itself divided into smaller terms rdiag = 1 / t * np.sum(y**2) - sum(np.diag(sample) ** 2) z = Xm * np.tile(xmkt, (n,)) v1 = 1 / t * np.dot(y.T, z) - np.tile(betas, (n,)) * sample roff1 = ( np.sum(v1 * np.tile(betas, (n,)).T) / varmkt - np.sum(np.diag(v1) * betas.T) / varmkt ) v3 = 1 / t * np.dot(z.T, z) - varmkt * sample roff3 = ( np.sum(v3 * np.dot(betas, betas.T)) / varmkt**2 - np.sum(np.diag(v3).reshape(-1, 1) * betas**2) / varmkt**2 ) roff = 2 * roff1 - roff3 r = rdiag + roff # compute shrinkage constant k = (p - r) / c delta = max(0, min(1, k / t)) # compute the estimator shrunk_cov = delta * F + (1 - delta) * sample return shrunk_cov, delta def _ledoit_wolf_constant_correlation(self): """ Helper method to calculate the Ledoit-Wolf shrinkage estimate with the constant correlation matrix as the shrinkage target. See Ledoit and Wolf (2003) :return: shrunk sample covariance matrix, shrinkage constant :rtype: np.ndarray, float """ X = np.nan_to_num(self.X.values) t, n = np.shape(X) S = self.S # sample cov matrix # Constant correlation target var = np.diag(S).reshape(-1, 1) std = np.sqrt(var) _var = np.tile(var, (n,)) _std = np.tile(std, (n,)) r_bar = (np.sum(S / (_std * _std.T)) - n) / (n * (n - 1)) F = r_bar * (_std * _std.T) F[np.eye(n) == 1] = var.reshape(-1) # Estimate pi Xm = X - X.mean(axis=0) y = Xm**2 pi_mat = np.dot(y.T, y) / t - 2 * np.dot(Xm.T, Xm) * S / t + S**2 pi_hat = np.sum(pi_mat) # Theta matrix, expanded term by term term1 = np.dot((Xm**3).T, Xm) / t help_ = np.dot(Xm.T, Xm) / t help_diag = np.diag(help_) term2 = np.tile(help_diag, (n, 1)).T * S term3 = help_ * _var term4 = _var * S theta_mat = term1 - term2 - term3 + term4 theta_mat[np.eye(n) == 1] = np.zeros(n) rho_hat = sum(np.diag(pi_mat)) + r_bar * np.sum( np.dot((1 / std), std.T) * theta_mat ) # Estimate gamma gamma_hat = np.linalg.norm(S - F, "fro") ** 2 # Compute shrinkage constant kappa_hat = (pi_hat - rho_hat) / gamma_hat delta = max(0.0, min(1.0, kappa_hat / t)) # Compute shrunk covariance matrix shrunk_cov = delta * F + (1 - delta) * S return shrunk_cov, delta
[文档] def oracle_approximating(self): """ 计算 Oracle 近似收缩的估算值 :return: 收缩样本协方差矩阵 :rtype: np.ndarray """ X = np.nan_to_num(self.X.values) shrunk_cov, self.delta = self.covariance.oas(X) return self._format_and_annualize(shrunk_cov)