过滤器#
这个功能是 backtrader 中相对较晚添加的,并且必须适应已有的内部结构。这使得它不像希望的那样灵活和功能完善,但在许多情况下仍然可以达到目的。
尽管实现尝试允许插入和播放过滤器链,但是已有的内部结构使得难以确保始终可以实现。因此,有些过滤器可以链接,而其他一些则不能。
目的 *** ** **
将 数据源 提供的值转换为不同的 数据源
这个实现的目的是简化通过 cerebro API直接使用的两个显而易见的过滤器的实现。它们分别是:
重采样 (
cerebro.resampledata
)在这里,过滤器会转换 数据源 的
时间框架
和压缩率
。例如:(秒, 1) -> (天, 1)这意味着原始的数据源是以 1秒*为分辨率提供的条形图。 重采样 过滤器截取数据并将其缓冲,直到它可以提供 1天 的条形图。当看到来自第二天的 1秒 条形图时,就会发生这种情况。 - * 回放 (
cerebro.replaydata
)对于上述的相同时间段,该过滤器将使用 1秒*分辨率的柱状图重新构建 1天*柱状图。
这意味着 1天*柱状图会根据看到的 1秒*柱状图的次数进行更新,以包含最新的信息。
这样可以模拟实际交易日的发展过程。
备注
只要 day 没有改变,数据的长度
len(data)
和策略的长度都不会发生变化。
工作中的过滤器 *** ** ** ** ** ** **
在使用现有的数据源进行数据提供时,您可以使用数据提供的 addfilter
方法:
data = MyDataFeed(dataname=myname)
data.addfilter(filter, *args, **kwargs)
cerebro.addata(data)
即使它与 重新采样/回放 过滤器是兼容的,也可以进行以下操作:
data = MyDataFeed(dataname=myname)
data.addfilter(filter, *args, **kwargs)
cerebro.replaydata(data)过滤器接口
*** ** ** ** ** ** **
一个 “filter” 必须符合以下给定的接口:
一个可调用对象,接受以下签名函数:
callable(data, *args, **kwargs)或者
一个能够被 实例化 和 调用 的类
实例化过程中,
__init__
方法必须支持以下签名函数:def __init__(self, data, *args, **kwargs)
__call__
方法具有以下签名函数:def __call__(self, data, *args, **kwargs)每当有新的输入值来自* 数据喂养 *,该实例将被调用。 ``* args`` 和 ``*kwargs`` 是传递给 ``__init__`` 的参数。
返回值 :
True
: 数据喂养的内部数据获取循环必须重试从喂养中获取数据,因为流的长度被篡改了。False
: 即使数据可能已被编辑(例如:更改close
价格),流的长度仍保持不变。
对于基于类的过滤器,可以实现两个附加方法
last
,具有以下签名:def last(self, data, *args, **kwargs)
当 数据喂养 结束时,将调用此方法,允许过滤器传递其可能缓冲的数据。典型情况是 重新取样 ,因为在看到下一个时间段的数据之前,柱形图将被缓冲。当数据喂养结束时,就没有新的数据来推动缓冲区的数据。
last
提供了将缓冲区的数据推出的机会。
注意
如果 过滤器 根本不支持任何参数,并且将被添加而没有参数,则签名可以简化为:一个示例过滤器
一个非常简单的过滤器实现:
class SessionFilter(object):
def __init__(self, data):
pass
def __call__(self, data):
if data.p.sessionstart <= data.datetime.time() <= data.p.sessionend:
# bar在会话时间内
return False # 告诉外部数据循环可以处理该bar
# bar在常规会话时间之外
data.backwards() # 从数据堆栈中移除bar
return True # 告诉外部数据循环需要获取新的bar
该过滤器:
使用
data.p.sessionstart
和data.p.sessionend
(标准数据源参数)来决定bar是否在会话时间内。如果 在会话时间内 ,返回值为
False
表示没有进行任何操作,当前的bar可以继续处理。如果 不在会话时间内 ,将bar从流中移除,并返回
True
表示需要获取新的bar。.. 注意::data.backwards()
方法使用了LineBuffer
接口。这深入了解了 backtrader 的内部机制。
对于这个过滤器的使用:
一些数据源包含了 非正常交易时间 的数据,这些数据对交易者可能没有兴趣。使用这个过滤器将只考虑 交易时间内 的数据。
过滤器的数据伪-API *** ** ** ** ** ** ** ** ** ** ** ** **
在上面的示例中,展示了如何通过调用 data.backwards()
方法从流中移除当前的条。数据源对象提供了一组用于过滤器的 伪-API :
data.backwards(size=1, force=False)
:将数据流中的 size 个条移除(默认为1
),通过将逻辑指针向后移动。如果force=True
,则也会删除物理存储。删除物理存储是一项非常敏感的操作,仅用于内部操作的hack。
data.forward(value=float('NaN'), size=1)
:将存储向前移动 size 个条,如有必要则增加物理存储,并使用value
进行填充。
data._addtostack(bar, stash=False)
:将bar
添加到一个堆栈中,以供稍后处理。bar
是一个可迭代对象,包含与数据源的lines
相同数量的值。如果
stash=False
,则添加到堆栈中的条将立即被系统在下一次迭代的开始时消耗。如果stash=True
,则 bar 将经历整个循环处理过程,包括可能被过滤器重新解析。
data._save2stack(erase=False, force=False)
:将当前数据 bar 保存到堆栈中以供以后处理。如果erase=True
,那么将调用data.backwards
,并且将接收参数force
。
data._updatebar(bar, forward=False, ago=0)
:使用可迭代对象bar
中的值覆盖数据流ago
位置上的值。默认情况下,使用ago=0
来更新当前 bar 。使用-1
,则是更新上一个 bar 。
另一个示例:Pinkfish 过滤器#
这是一个可以链接的过滤器的示例,主要用于另一个过滤器,即“replay filter”。 Pinkfish 这个名字来自于描述这个概念的库的主页:使用每日数据执行只有使用分钟级数据才可能实现的操作。
为了达到该效果:
将每日 bar 分成两个部分:
OHL
和C
。这两个部分通过 replay 进行链接,以在流中发生以下情况:
对于长度为 X -> OHL 对于长度为 X -> OHLC 对于长度为 X + 1 -> OHL 对于长度为 X + 1 -> OHLC 对于长度为 X + 2 -> OHL 对于长度为 X + 2 -> OHLC …
逻辑:- 当接收到一个 OHLC
条时,它会被复制到一个可迭代对象,并进行分解,得到以下内容:
一个
OHL
条。因为这个概念实际上并不存在,所以 收盘*价格被替换为 开盘*价格,真正形成了一个OHLO
条。一个
C
条也并不存在。现实情况是它将被传送如一个CCCC
。成交量被分配到这两部分之间。
当前条从流中移除。
OHLO
部分被放入堆栈中以供立即处理。
CCCC
部分被放入存储区以供下一轮处理。因为堆栈中有内容要进行立即处理,过滤器可以返回
False
表示完成处理。
此过滤器与以下内容一起工作:
replay 过滤器将
OHLO
和CCCC
部分合并以最终提供一个OHLC
条。使用案例:查看今天的最高价格是否是过去20个交易日中的最高价格,如果是,则发出一个以第二个tick执行的“Close”订单。
代码:
- class DaySplitter_Close(bt.with_metaclass(bt.MetaParams, object)):
‘’’ 将每日的K线分为两部分,模拟2个tick,用于回放数据:
第一个tick:
OHLX
将“Close”价格替换为“Open”,“High”和“Low”的 平均值
此tick使用的是开盘时间
和
第二个tick:
CCCC
使用“Close”价格作为价格的四个组成部分的值这个滤波器用于将交易量分配给2个ticks,使用以下参数:
closevol
(默认值:0.5
)该值表示百分比,绝对数值从0.0到1.0,给予 closing tick。其余部分将给予OHLX
tick。
此滤波器旨在与 cerebro.replaydata
一起使用
‘’’(代码注释:回放中为True)
(‘closevol’, 0.5), # 在close中保留的交易量的百分比(0至1)
)
# replaying = True
- def __init__(self, data):
self.lastdt = None
- def __call__(self, data):
# 复制新的K线并从流中移除它 datadt = data.datetime.date() # 保留日期信息
- if self.lastdt == datadt:
return False # 跳过已经在滤波器中出现过的K线
self.lastdt = datadt # 保留对最后一个K线的引用
closebar = ohlbar[:] # 复制 close
# 用开盘、最高、最低价格平均值替换收盘价 ohlprice = ohlbar[data.Open] + ohlbar[data.High] + ohlbar[data.Low] ohlbar[data.Close] = ohlprice / 3.0
vol = ohlbar[data.Volume] # 调整交易量 ohlbar[data.Volume] = vohl = int(vol * (1.0 - self.p.closevol))
oi = ohlbar[data.OpenInterest] # 调整未平仓合约 ohlbar[data.OpenInterest] = 0
# 调整时间 dt = datetime.datetime.combine(datadt, data.p.sessionstart) ohlbar[data.DateTime] = data.date2num(dt)
# 调整 closebar,生成单个 tick -> 收盘价 closebar[data.Open] = cprice = closebar[data.Close] closebar[data.High] = cprice closebar[data.Low] = cprice closebar[data.Volume] = vol - vohl ohlbar[data.OpenInterest] = oi
# 调整时间 dt = datetime.datetime.combine(datadt, data.p.sessionend) closebar[data.DateTime] = data.date2num(dt)
# 更新流 data.backwards(force=True) # 从流中删除复制的条 data._add2stack(ohlbar) # 将 ohlbar 添加到堆栈 # 将第二部分添加到stash,延迟到下一轮处理 data._add2stack(closebar, stash=True)