"""
``risk_models`` 模块提供了在给定历史收益率的情况下估计协方差矩阵的函数。
数据输入的格式与 :ref:`expected-returns` 中的相同。
目前已经实现:
- 固定非半正定矩阵
- 一般风险矩阵函数,允许调用一个函数来运行风险模型。
- 样本协方差
- 半方差
- 指数加权协方差
- 最小协方差决定数
- 缩减协方差矩阵:
- 手动收缩
- Ledoit Wolf 收缩
- Oracle 近似收缩
- 协方差到相关矩阵
"""
import warnings
import numpy as np
import pandas as pd
from .expected_returns import returns_from_prices
def _is_positive_semidefinite(matrix):
"""
Helper function to check if a given matrix is positive semidefinite.
Any method that requires inverting the covariance matrix will struggle
with a non-positive semidefinite matrix
:param matrix: (covariance) matrix to test
:type matrix: np.ndarray, pd.DataFrame
:return: whether matrix is positive semidefinite
:rtype: bool
"""
try:
# Significantly more efficient than checking eigenvalues (stackoverflow.com/questions/16266720)
np.linalg.cholesky(matrix + 1e-16 * np.eye(len(matrix)))
return True
except np.linalg.LinAlgError:
return False
[文档]
def fix_nonpositive_semidefinite(matrix, fix_method="spectral"):
"""
检查协方差矩阵是否为半正定矩阵,如果不是,就用所选的方法修正它。
``spectral`` 将负的特征值设置为零,然后重建矩阵,而 ``diag`` 法在对角线上增加一个小的正值。
:param matrix: 原始协方差矩阵(不是 PSD)
:type matrix: pd.DataFrame
:param fix_method: {"spectral", "diag"}, 默认为 "spectral"
:type fix_method: str, optional
:raises NotImplementedError: 如果 method 方法未实现
:return: 半正定协方差矩阵
:rtype: pd.DataFrame
"""
if _is_positive_semidefinite(matrix):
return matrix
warnings.warn(
"The covariance matrix is non positive semidefinite. Amending eigenvalues."
)
# Eigendecomposition
q, V = np.linalg.eigh(matrix)
if fix_method == "spectral":
# Remove negative eigenvalues
q = np.where(q > 0, q, 0)
# Reconstruct matrix
fixed_matrix = V @ np.diag(q) @ V.T
elif fix_method == "diag":
min_eig = np.min(q)
fixed_matrix = matrix - 1.1 * min_eig * np.eye(len(matrix))
else:
raise NotImplementedError("Method {} not implemented".format(fix_method))
if not _is_positive_semidefinite(fixed_matrix): # pragma: no cover
warnings.warn(
"Could not fix matrix. Please try a different risk model.", UserWarning
)
# Rebuild labels if provided
if isinstance(matrix, pd.DataFrame):
tickers = matrix.index
return pd.DataFrame(fixed_matrix, index=tickers, columns=tickers)
else:
return fixed_matrix
[文档]
def risk_matrix(prices, method="sample_cov", **kwargs):
"""
计算协方差矩阵,风险模型以 ``method`` 参数指定。
:param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。
:type prices: pd.DataFrame
:param returns_data: 如果是 true, 第一个参数是收益率而不是价格
:type returns_data: bool, 默认为 False.
:param method: 要使用的风险模型。应该是其中一个:
- ``sample_cov``
- ``semicovariance``
- ``exp_cov``
- ``ledoit_wolf``
- ``ledoit_wolf_constant_variance``
- ``ledoit_wolf_single_factor``
- ``ledoit_wolf_constant_correlation``
- ``oracle_approximating``
:type method: str, optional
:raises NotImplementedError: 如果 method 错误
:return: 年化样本协方差矩阵
:rtype: pd.DataFrame
"""
if method == "sample_cov":
return sample_cov(prices, **kwargs)
elif method == "semicovariance" or method == "semivariance":
return semicovariance(prices, **kwargs)
elif method == "exp_cov":
return exp_cov(prices, **kwargs)
elif method == "ledoit_wolf" or method == "ledoit_wolf_constant_variance":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf()
elif method == "ledoit_wolf_single_factor":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="single_factor"
)
elif method == "ledoit_wolf_constant_correlation":
return CovarianceShrinkage(prices, **kwargs).ledoit_wolf(
shrinkage_target="constant_correlation"
)
elif method == "oracle_approximating":
return CovarianceShrinkage(prices, **kwargs).oracle_approximating()
else:
raise NotImplementedError("Risk model {} not implemented".format(method))
[文档]
def sample_cov(prices, returns_data=False, frequency=252, log_returns=False, **kwargs):
"""
计算(日频)资产收益的年化样本协方差矩阵。
:param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。
:type prices: pd.DataFrame
:param returns_data: 如果是 true, 第一个参数是收益率而不是价格
:type returns_data: bool, 默认为 False.
:param frequency: 一年中的时间数,默认为252(一年中的交易日数)。
:type frequency: int, optional
:param log_returns: 是否使用对数收益进行计算
:type log_returns: bool, 默认为 False
:return: 年化样本协方差矩阵
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices, log_returns)
return fix_nonpositive_semidefinite(
returns.cov() * frequency, kwargs.get("fix_method", "spectral")
)
[文档]
def semicovariance(
prices,
returns_data=False,
benchmark=0.000079,
frequency=252,
log_returns=False,
**kwargs
):
"""
估算半方差矩阵,即收益率低于基准的协方差。
.. semicov = E([min(r_i - B, 0)] . [min(r_j - B, 0)])
:param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。
:type prices: pd.DataFrame
:param returns_data: 如果是 true, 第一个参数是收益率而不是价格
:type returns_data: bool, defaults to False.
:param benchmark: 基准收益,默认为每日无风险利率,即 :math:`1.02^{(1/252)} -1` 。
:type benchmark: float
:param frequency: 一年中的时间数,默认为252(一年中的交易日数)。确保你使用适当的基准,例如,如果 ``frequency=12`` ,使用月度无风险利率。
:type frequency: int, optional
:param log_returns: 是否使用对数收益进行计算
:type log_returns: bool, 默认为 False
:return: 半方差矩阵
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices, log_returns)
drops = np.fmin(returns - benchmark, 0)
T = drops.shape[0]
return fix_nonpositive_semidefinite(
(drops.T @ drops) / T * frequency, kwargs.get("fix_method", "spectral")
)
def _pair_exp_cov(X, Y, span=180):
"""
Calculate the exponential covariance between two timeseries of returns.
:param X: first time series of returns
:type X: pd.Series
:param Y: second time series of returns
:type Y: pd.Series
:param span: the span of the exponential weighting function, defaults to 180
:type span: int, optional
:return: the exponential covariance between X and Y
:rtype: float
"""
covariation = (X - X.mean()) * (Y - Y.mean())
# Exponentially weight the covariation and take the mean
if span < 10:
warnings.warn("it is recommended to use a higher span, e.g 30 days")
return covariation.ewm(span=span).mean().iloc[-1]
[文档]
def exp_cov(
prices, returns_data=False, span=180, frequency=252, log_returns=False, **kwargs
):
"""
估算指数加权的协方差矩阵,该矩阵对最近的数据给予更大的权重。
:param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。
:type prices: pd.DataFrame
:param returns_data: 如果是 true,则第一个参数是收益率,而不是价格
:param span: 指数加权函数的跨度,默认为 180
:type span: int, optional
:param frequency: 一年中的时间数,默认为252(一年中的交易日数)。
:type frequency: int, optional
:param log_returns: 是否使用对数收益率进行计算
:type log_returns: bool, 默认为 False
:return: 指数协方差矩阵的年化估计值
:rtype: pd.DataFrame
"""
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
assets = prices.columns
if returns_data:
returns = prices
else:
returns = returns_from_prices(prices, log_returns)
N = len(assets)
# Loop over matrix, filling entries with the pairwise exp cov
S = np.zeros((N, N))
for i in range(N):
for j in range(i, N):
S[i, j] = S[j, i] = _pair_exp_cov(
returns.iloc[:, i], returns.iloc[:, j], span
)
cov = pd.DataFrame(S * frequency, columns=assets, index=assets)
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
def min_cov_determinant(
prices,
returns_data=False,
frequency=252,
random_state=None,
log_returns=False,
**kwargs
): # pragma: no cover
warnings.warn("min_cov_determinant is deprecated and will be removed in v1.5")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
# Extra dependency
try:
import sklearn.covariance
except (ModuleNotFoundError, ImportError):
raise ImportError("Please install scikit-learn via pip or poetry")
assets = prices.columns
if returns_data:
X = prices
else:
X = returns_from_prices(prices, log_returns)
# X = np.nan_to_num(X.values)
X = X.dropna().values
raw_cov_array = sklearn.covariance.fast_mcd(X, random_state=random_state)[1]
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * frequency
return fix_nonpositive_semidefinite(cov, kwargs.get("fix_method", "spectral"))
[文档]
def cov_to_corr(cov_matrix):
"""
将一个协方差矩阵转换为一个相关矩阵。
:param cov_matrix: 协方差矩阵
:type cov_matrix: pd.DataFrame
:return: 相关矩阵
:rtype: pd.DataFrame
"""
if not isinstance(cov_matrix, pd.DataFrame):
warnings.warn("cov_matrix is not a dataframe", RuntimeWarning)
cov_matrix = pd.DataFrame(cov_matrix)
Dinv = np.diag(1 / np.sqrt(np.diag(cov_matrix)))
corr = np.dot(Dinv, np.dot(cov_matrix, Dinv))
return pd.DataFrame(corr, index=cov_matrix.index, columns=cov_matrix.index)
[文档]
def corr_to_cov(corr_matrix, stdevs):
"""
将相关矩阵转换为协方差矩阵
:param corr_matrix: 相关矩阵
:type corr_matrix: pd.DataFrame
:param stdevs: 标准差的向量
:type stdevs: array-like
:return: 协方差矩阵
:rtype: pd.DataFrame
"""
if not isinstance(corr_matrix, pd.DataFrame):
warnings.warn("corr_matrix is not a dataframe", RuntimeWarning)
corr_matrix = pd.DataFrame(corr_matrix)
return corr_matrix * np.outer(stdevs, stdevs)
[文档]
class CovarianceShrinkage:
"""
提供计算协方差矩阵的收缩估计的方法,使用样本协方差矩阵,并选择结构化估计器为单位矩阵乘以平均样本方差。
收缩常数可以手动输入,也存在估计最优值的方法(推荐 Ledoit Wolf)。
Instance variables:
- ``X`` - pd.DataFrame (收益率)
- ``S`` - np.ndarray (样本协方差矩阵)
- ``delta`` - float (收缩常数)
- ``frequency`` - int
"""
[文档]
def __init__(self, prices, returns_data=False, frequency=252, log_returns=False):
"""
:param prices: 调整后的资产收盘价,每一行是一个日期,每一列是一个股票/ID。
:type prices: pd.DataFrame
:param returns_data: 如果是 true, 则第一个参数是收益率而不是价格
:type returns_data: bool, 默认为 False.
:param frequency: 一年中的时间段数,默认为252(一年中的交易日数)
:type frequency: int, optional
:param log_returns: 是否使用对数收益进行计算
:type log_returns: bool, 默认为 False
"""
# Optional import
try:
from sklearn import covariance
self.covariance = covariance
except (ModuleNotFoundError, ImportError): # pragma: no cover
raise ImportError("Please install scikit-learn via pip or poetry")
if not isinstance(prices, pd.DataFrame):
warnings.warn("data is not in a dataframe", RuntimeWarning)
prices = pd.DataFrame(prices)
self.frequency = frequency
if returns_data:
self.X = prices.dropna(how="all")
else:
self.X = returns_from_prices(prices, log_returns).dropna(how="all")
self.S = self.X.cov().values
self.delta = None # shrinkage constant
def _format_and_annualize(self, raw_cov_array):
"""
Helper method which annualises the output of shrinkage calculations,
and formats the result into a dataframe
:param raw_cov_array: raw covariance matrix of daily returns
:type raw_cov_array: np.ndarray
:return: annualised covariance matrix
:rtype: pd.DataFrame
"""
assets = self.X.columns
cov = pd.DataFrame(raw_cov_array, index=assets, columns=assets) * self.frequency
return fix_nonpositive_semidefinite(cov, fix_method="spectral")
[文档]
def shrunk_covariance(self, delta=0.2):
"""
将一个样本协方差矩阵缩减为单位矩阵(按平均样本方差缩放)。
这种方法并不估计最佳收缩参数,它需要手动输入。
:param delta: 收缩参数, 默认为 0.2.
:type delta: float, optional
:return: 收缩样本的协方差矩阵
:rtype: np.ndarray
"""
self.delta = delta
N = self.S.shape[1]
# Shrinkage target
mu = np.trace(self.S) / N
F = np.identity(N) * mu
# Shrinkage
shrunk_cov = delta * F + (1 - delta) * self.S
return self._format_and_annualize(shrunk_cov)
[文档]
def ledoit_wolf(self, shrinkage_target="constant_variance"):
"""
计算特定收缩目标的 Ledoit-Wolf 收缩估计值。
:param shrinkage_target: 收缩目标选项, ``constant_variance``、
``single_factor`` 或 ``constant_correlation``. 默认为
``constant_variance``.
:type shrinkage_target: str, optional
:raises NotImplementedError: 如果 shrinkage_target 错误
:return: 收缩样本协方差矩阵
:rtype: np.ndarray
"""
if shrinkage_target == "constant_variance":
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.ledoit_wolf(X)
elif shrinkage_target == "single_factor":
shrunk_cov, self.delta = self._ledoit_wolf_single_factor()
elif shrinkage_target == "constant_correlation":
shrunk_cov, self.delta = self._ledoit_wolf_constant_correlation()
else:
raise NotImplementedError(
"Shrinkage target {} not recognised".format(shrinkage_target)
)
return self._format_and_annualize(shrunk_cov)
def _ledoit_wolf_single_factor(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the Sharpe single-factor matrix as the shrinkage target.
See Ledoit and Wolf (2001).
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
# De-mean returns
t, n = np.shape(X)
Xm = X - X.mean(axis=0)
xmkt = Xm.mean(axis=1).reshape(t, 1)
# compute sample covariance matrix
sample = np.cov(np.append(Xm, xmkt, axis=1), rowvar=False) * (t - 1) / t
betas = sample[0:n, n].reshape(n, 1)
varmkt = sample[n, n]
sample = sample[:n, :n]
F = np.dot(betas, betas.T) / varmkt
F[np.eye(n) == 1] = np.diag(sample)
# compute shrinkage parameters
c = np.linalg.norm(sample - F, "fro") ** 2
y = Xm**2
p = 1 / t * np.sum(np.dot(y.T, y)) - np.sum(sample**2)
# r is divided into diagonal
# and off-diagonal terms, and the off-diagonal term
# is itself divided into smaller terms
rdiag = 1 / t * np.sum(y**2) - sum(np.diag(sample) ** 2)
z = Xm * np.tile(xmkt, (n,))
v1 = 1 / t * np.dot(y.T, z) - np.tile(betas, (n,)) * sample
roff1 = (
np.sum(v1 * np.tile(betas, (n,)).T) / varmkt
- np.sum(np.diag(v1) * betas.T) / varmkt
)
v3 = 1 / t * np.dot(z.T, z) - varmkt * sample
roff3 = (
np.sum(v3 * np.dot(betas, betas.T)) / varmkt**2
- np.sum(np.diag(v3).reshape(-1, 1) * betas**2) / varmkt**2
)
roff = 2 * roff1 - roff3
r = rdiag + roff
# compute shrinkage constant
k = (p - r) / c
delta = max(0, min(1, k / t))
# compute the estimator
shrunk_cov = delta * F + (1 - delta) * sample
return shrunk_cov, delta
def _ledoit_wolf_constant_correlation(self):
"""
Helper method to calculate the Ledoit-Wolf shrinkage estimate
with the constant correlation matrix as the shrinkage target.
See Ledoit and Wolf (2003)
:return: shrunk sample covariance matrix, shrinkage constant
:rtype: np.ndarray, float
"""
X = np.nan_to_num(self.X.values)
t, n = np.shape(X)
S = self.S # sample cov matrix
# Constant correlation target
var = np.diag(S).reshape(-1, 1)
std = np.sqrt(var)
_var = np.tile(var, (n,))
_std = np.tile(std, (n,))
r_bar = (np.sum(S / (_std * _std.T)) - n) / (n * (n - 1))
F = r_bar * (_std * _std.T)
F[np.eye(n) == 1] = var.reshape(-1)
# Estimate pi
Xm = X - X.mean(axis=0)
y = Xm**2
pi_mat = np.dot(y.T, y) / t - 2 * np.dot(Xm.T, Xm) * S / t + S**2
pi_hat = np.sum(pi_mat)
# Theta matrix, expanded term by term
term1 = np.dot((Xm**3).T, Xm) / t
help_ = np.dot(Xm.T, Xm) / t
help_diag = np.diag(help_)
term2 = np.tile(help_diag, (n, 1)).T * S
term3 = help_ * _var
term4 = _var * S
theta_mat = term1 - term2 - term3 + term4
theta_mat[np.eye(n) == 1] = np.zeros(n)
rho_hat = sum(np.diag(pi_mat)) + r_bar * np.sum(
np.dot((1 / std), std.T) * theta_mat
)
# Estimate gamma
gamma_hat = np.linalg.norm(S - F, "fro") ** 2
# Compute shrinkage constant
kappa_hat = (pi_hat - rho_hat) / gamma_hat
delta = max(0.0, min(1.0, kappa_hat / t))
# Compute shrunk covariance matrix
shrunk_cov = delta * F + (1 - delta) * S
return shrunk_cov, delta
[文档]
def oracle_approximating(self):
"""
计算 Oracle 近似收缩的估算值
:return: 收缩样本协方差矩阵
:rtype: np.ndarray
"""
X = np.nan_to_num(self.X.values)
shrunk_cov, self.delta = self.covariance.oas(X)
return self._format_and_annualize(shrunk_cov)