"""
``discrete_allocation`` 模块包含 DiscreteAllocation 类,它提供了多种方法来从连续权重生成离散的投资组合分配。
"""
import collections
import cvxpy as cp
import numpy as np
import pandas as pd
from . import exceptions
def get_latest_prices(prices):
"""
A helper tool which retrieves the most recent asset prices from a dataframe of
asset prices, required in order to generate a discrete allocation.
:param prices: historical asset prices
:type prices: pd.DataFrame
:raises TypeError: if prices are not in a dataframe
:return: the most recent price of each asset
:rtype: pd.Series
"""
if not isinstance(prices, pd.DataFrame):
raise TypeError("prices not in a dataframe")
return prices.ffill().iloc[-1]
[文档]
class DiscreteAllocation:
"""
从连续权重生成离散的投资组合配置
可选:
- Inputs:
- ``weights`` - 字典
- ``latest_prices`` - pd.Series 或字典
- ``total_portfolio_value`` - int 或 float
- ``short_ratio``- float
- Output: ``allocation`` - dict
公共方法:
- ``greedy_portfolio()`` - 使用贪婪算法
- ``lp_portfolio()`` - 使用线性规划
"""
[文档]
def __init__(
self, weights, latest_prices, total_portfolio_value=10000, short_ratio=None
):
"""
:param weights: 从 ``efficient_frontier`` 模块生成的连续权重
:type weights: dict
:param latest_prices: 每项资产的最新价格
:type latest_prices: pd.Series
:param total_portfolio_value: 投资组合的预期总价值,默认为 10000
:type total_portfolio_value: int/float, optional
:param short_ratio: 做空比率,例如 0.3 对应于 130/30。如果 None,则默认为输入权重。
:type short_ratio: float, defaults to None.
:raises TypeError: 如果 ``weights`` 非字典
:raises TypeError: 如果 ``latest_prices`` 非 series
:raises ValueError: 如果 ``short_ratio < 0``
"""
if not isinstance(weights, dict):
raise TypeError("weights should be a dictionary of {ticker: weight}")
if any(np.isnan(val) for val in weights.values()):
raise ValueError("weights should have no NaNs")
if (not isinstance(latest_prices, pd.Series)) or any(np.isnan(latest_prices)):
raise TypeError("latest_prices should be a pd.Series with no NaNs")
if total_portfolio_value <= 0:
raise ValueError("total_portfolio_value must be greater than zero")
if short_ratio is not None and short_ratio < 0:
raise ValueError("short_ratio must be non-negative")
# Drop any companies with negligible weights. Use a tuple because order matters.
self.weights = list(weights.items())
self.latest_prices = latest_prices
self.total_portfolio_value = total_portfolio_value
if short_ratio is None:
self.short_ratio = sum((-x[1] for x in self.weights if x[1] < 0))
else:
self.short_ratio = short_ratio
[文档]
@staticmethod
def _remove_zero_positions(allocation):
"""
移除零仓位(即没有买入股票)的效用函数
:type allocation: dict
"""
return {k: v for k, v in allocation.items() if v != 0}
[文档]
def _allocation_rmse_error(self, verbose=True):
"""
计算和打印离散权重和连续权重之间的 RMSE 误差的实用函数。
使用 RMSE 而不是 MAE 是因为我们想对大的变化进行惩罚。
:param verbose: 是否输出权重差异
:type verbose: bool
:return: rmse 误差
:rtype: float
"""
portfolio_val = 0
for ticker, num in self.allocation.items():
portfolio_val += num * self.latest_prices[ticker]
sse = 0 # sum of square errors
for ticker, weight in self.weights:
if ticker in self.allocation:
allocation_weight = (
self.allocation[ticker] * self.latest_prices[ticker] / portfolio_val
)
else:
allocation_weight = 0
sse += (weight - allocation_weight) ** 2
if verbose:
print(
"{}: allocated {:.3f}, desired {:.3f}".format(
ticker, allocation_weight, weight
)
)
rmse = np.sqrt(sse / len(self.weights))
print("Allocation has RMSE: {:.3f}".format(rmse))
return rmse
[文档]
def greedy_portfolio(self, reinvest=False, verbose=False):
"""
使用贪婪的迭代方法将连续权重转换成离散的投资组合分配。
:param reinvest: 是否对做空获得的现金进行再投资
:type reinvest: bool, 默认为 False
:param verbose: 是否详细输出
:type verbose: bool, 默认为 False
:return: 应该购买的每个股票的数量,以及剩余的资金数额。
:rtype: (dict, float)
"""
# Sort in descending order of weight
self.weights.sort(key=lambda x: x[1], reverse=True)
# If portfolio contains shorts
if self.weights[-1][1] < 0:
longs = {t: w for t, w in self.weights if w >= 0}
shorts = {t: -w for t, w in self.weights if w < 0}
# Make them sum to one
long_total_weight = sum(longs.values())
short_total_weight = sum(shorts.values())
longs = {t: w / long_total_weight for t, w in longs.items()}
shorts = {t: w / short_total_weight for t, w in shorts.items()}
# Construct long-only discrete allocations for each
short_val = self.total_portfolio_value * self.short_ratio
long_val = self.total_portfolio_value
if reinvest:
long_val += short_val
if verbose:
print("\nAllocating long sub-portfolio...")
da1 = DiscreteAllocation(
longs, self.latest_prices[longs.keys()], total_portfolio_value=long_val
)
long_alloc, long_leftover = da1.greedy_portfolio()
if verbose:
print("\nAllocating short sub-portfolio...")
da2 = DiscreteAllocation(
shorts,
self.latest_prices[shorts.keys()],
total_portfolio_value=short_val,
)
short_alloc, short_leftover = da2.greedy_portfolio()
short_alloc = {t: -w for t, w in short_alloc.items()}
# Combine and return
self.allocation = long_alloc.copy()
self.allocation.update(short_alloc)
self.allocation = self._remove_zero_positions(self.allocation)
return self.allocation, long_leftover + short_leftover
# Otherwise, portfolio is long only and we proceed with greedy algo
available_funds = self.total_portfolio_value
shares_bought = []
buy_prices = []
# First round
for ticker, weight in self.weights:
price = self.latest_prices[ticker]
# Attempt to buy the lower integer number of shares, which could be zero.
n_shares = int(weight * self.total_portfolio_value / price)
cost = n_shares * price
# As weights are all > 0 (long only) we always round down n_shares
# so the cost is always <= simple weighted share of portfolio value,
# so we can not run out of funds just here.
assert cost <= available_funds, "Unexpectedly insufficient funds."
available_funds -= cost
shares_bought.append(n_shares)
buy_prices.append(price)
# Second round
while available_funds > 0:
# Calculate the equivalent continuous weights of the shares that
# have already been bought
current_weights = np.array(buy_prices) * np.array(shares_bought)
current_weights /= current_weights.sum()
ideal_weights = np.array([i[1] for i in self.weights])
deficit = ideal_weights - current_weights
# Attempt to buy the asset whose current weights deviate the most
idx = np.argmax(deficit)
ticker, weight = self.weights[idx]
price = self.latest_prices[ticker]
# If we can't afford this asset, search for the next highest deficit that we
# can purchase.
counter = 0
while price > available_funds:
deficit[idx] = 0 # we can no longer purchase the asset at idx
idx = np.argmax(deficit) # find the next most deviant asset
# If either of these conditions is met, we break out of both while loops
# hence the repeated statement below
if deficit[idx] < 0 or counter == 10:
break
ticker, weight = self.weights[idx]
price = self.latest_prices[ticker]
counter += 1
if deficit[idx] <= 0 or counter == 10: # pragma: no cover
# Dirty solution to break out of both loops
break
# Buy one share at a time
shares_bought[idx] += 1
available_funds -= price
self.allocation = self._remove_zero_positions(
collections.OrderedDict(zip([i[0] for i in self.weights], shares_bought))
)
if verbose:
print("Funds remaining: {:.2f}".format(available_funds))
self._allocation_rmse_error(verbose)
return self.allocation, available_funds
[文档]
def lp_portfolio(self, reinvest=False, verbose=False, solver="ECOS_BB"):
"""
使用整数规划将连续权重转换为离散的投资组合分配。
:param reinvest: 是否对做空获得的现金进行再投资
:type reinvest: bool, 默认为 False
:param verbose: 是否详细输出
:type verbose: bool
:param solver: 使用的 CVXPY 求解器(必须支持混合整数规划)
:type solver: str, 默认为 "ECOS_BB"
:return: 应该购买的每个股票的数量,以及剩余的资金数额。
:rtype: (dict, float)
"""
if any([w < 0 for _, w in self.weights]):
longs = {t: w for t, w in self.weights if w >= 0}
shorts = {t: -w for t, w in self.weights if w < 0}
# Make them sum to one
long_total_weight = sum(longs.values())
short_total_weight = sum(shorts.values())
longs = {t: w / long_total_weight for t, w in longs.items()}
shorts = {t: w / short_total_weight for t, w in shorts.items()}
# Construct long-only discrete allocations for each
short_val = self.total_portfolio_value * self.short_ratio
long_val = self.total_portfolio_value
if reinvest:
long_val += short_val
if verbose:
print("\nAllocating long sub-portfolio:")
da1 = DiscreteAllocation(
longs, self.latest_prices[longs.keys()], total_portfolio_value=long_val
)
long_alloc, long_leftover = da1.lp_portfolio(solver=solver)
if verbose:
print("\nAllocating short sub-portfolio:")
da2 = DiscreteAllocation(
shorts,
self.latest_prices[shorts.keys()],
total_portfolio_value=short_val,
)
short_alloc, short_leftover = da2.lp_portfolio(solver=solver)
short_alloc = {t: -w for t, w in short_alloc.items()}
# Combine and return
self.allocation = long_alloc.copy()
self.allocation.update(short_alloc)
self.allocation = self._remove_zero_positions(self.allocation)
return self.allocation, long_leftover + short_leftover
p = self.latest_prices.values
n = len(p)
w = np.fromiter([i[1] for i in self.weights], dtype=float)
# Integer allocation
x = cp.Variable(n, integer=True)
# Remaining dollars
r = self.total_portfolio_value - p.T @ x
# Set up linear program
eta = w * self.total_portfolio_value - cp.multiply(x, p)
u = cp.Variable(n)
constraints = [eta <= u, eta >= -u, x >= 0, r >= 0]
objective = cp.sum(u) + r
opt = cp.Problem(cp.Minimize(objective), constraints)
opt.solve(solver=solver)
if opt.status not in {"optimal", "optimal_inaccurate"}: # pragma: no cover
raise exceptions.OptimizationError("Please try greedy_portfolio")
vals = np.rint(x.value).astype(int)
self.allocation = self._remove_zero_positions(
collections.OrderedDict(zip([i[0] for i in self.weights], vals))
)
if verbose:
print("Funds remaining: {:.2f}".format(r.value))
self._allocation_rmse_error()
return self.allocation, r.value