Online Serving

Introduction

[Figure: online_serving.png]

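Besides backtesting, an effective way to test a model is to make predictions under real market conditions, or even to trade based on those predictions. Online Serving is a set of modules for online models that use the latest data, including Online Manager, Online Strategy, Online Tool, and Updater.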

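Here are several examples for reference, which demonstrate different features of Online Serving. If you have many models or tasks to manage, please consider Task Management. The examples are based on some components in Task Management, such as TrainerR or Collector.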

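NOTE: Users should keep their data source up to date to support online serving. For example, Qlib provides a set of scripts to help users update Yahoo daily data.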

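Known limitations:
  • Currently, updating the predictions for the next trading day on a daily basis is supported. However, generating orders for the next trading day is not supported due to the limitations of public data.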

Online Manager

OnlineManager can manage a set of Online Strategies and run them dynamically.

As time goes on, the decisive models also change. In this module, we call these contributing models online models. In every routine (such as every day or every minute), the online models may change and their predictions need to be updated. This module therefore provides a series of methods to control this process.

This module also provides a method to simulate Online Strategies on historical data, which means you can verify your strategy or find a better one.

There are four situations in total, combining the two modes (online and simulation) with the two kinds of trainers (Trainer and DelayTrainer):

  • Online + Trainer: When you want to run a REAL routine, the Trainer helps you train the models. It trains models task by task and strategy by strategy.

  • Online + DelayTrainer: DelayTrainer skips concrete training until all tasks have been prepared by the different strategies. This lets users train all tasks in parallel at the end of routine or first_train. Otherwise, these functions would get stuck each time a strategy prepares tasks.

  • Simulation + Trainer: Behaves in the same way as Online + Trainer. The only difference is that it is used for simulation/backtesting instead of online trading.

  • Simulation + DelayTrainer: When your models don’t have any temporal dependence, you can use DelayTrainer to enable multitasking. This means all tasks in all routines can be REALLY trained at the end of the simulation. The signals are then prepared for the different time segments (depending on whether any new model is online).
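The practical difference between the two trainer types is when the real fitting happens. The toy sketch below makes that concrete in plain Python. ToyTrainer and ToyDelayTrainer are hypothetical stand-ins, not Qlib classes; they only log when a task is fitted, assuming (as described above) that a Trainer fits inside train() while a DelayTrainer postpones fitting to end_train().

```python
from typing import List


class ToyTrainer:
    """Hypothetical stand-in for a Trainer: fits immediately in train()."""

    def __init__(self) -> None:
        self.log: List[str] = []

    def train(self, tasks: List[str]) -> List[str]:
        # Real training happens right away, task by task.
        for task in tasks:
            self.log.append(f"fit {task}")
        return [f"model<{task}>" for task in tasks]

    def end_train(self, models: List[str]) -> List[str]:
        # Nothing left to do: the models are already fitted.
        return models


class ToyDelayTrainer(ToyTrainer):
    """Hypothetical stand-in for a DelayTrainer: defers fitting to end_train()."""

    def train(self, tasks: List[str]) -> List[str]:
        # Only placeholders are returned; nothing is fitted yet.
        return [f"model<{task}>" for task in tasks]

    def end_train(self, models: List[str]) -> List[str]:
        # All pending models are fitted together here (this is where
        # parallel training would happen in a real system).
        for model in models:
            self.log.append(f"fit {model[len('model<'):-1]}")
        return models
```

Because the delayed trainer returns placeholders from train(), a caller can collect tasks from every strategy first and pay the training cost once at the end.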

Here is some pseudocode that demonstrates the workflow of each situation.

For simplicity:
  • Only one strategy is used.

  • update_online_pred is only called in online mode, so it is ignored here.

  1. Online + Trainer

tasks = first_train()
models = trainer.train(tasks)
trainer.end_train(models)
for day in online_trading_days:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy

    trainer.end_train(models)
    prepare_signals()  # prepare trading signals daily

Online + DelayTrainer: the workflow is the same as Online + Trainer.
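To show the order of calls in each routine, the Online + Trainer loop can be turned into a runnable toy. Everything in it is hypothetical: ToyStrategy asks for a new task every retrain_every days, train() stands in for trainer.train followed by trainer.end_train, and the "signal" for a day is simply the name of the model that was online. It is a sketch of the control flow only, not Qlib's implementation.

```python
from typing import Dict, List, Optional


class ToyStrategy:
    """Hypothetical single strategy that requests a new model every few days."""

    def __init__(self, retrain_every: int) -> None:
        self.retrain_every = retrain_every
        self.online: Optional[str] = None

    def first_tasks(self) -> List[str]:
        return ["task@0"]

    def prepare_tasks(self, day: int) -> List[str]:
        # A new task is only needed on retraining days.
        return [f"task@{day}"] if day % self.retrain_every == 0 else []

    def prepare_online_models(self, models: List[str]) -> None:
        # Naive policy: the newest trained model goes online; otherwise keep the old one.
        if models:
            self.online = models[-1]


def train(tasks: List[str]) -> List[str]:
    """Stand-in for trainer.train(...) followed by trainer.end_train(...)."""
    return [f"model({task})" for task in tasks]


def run_online(days: List[int], retrain_every: int) -> Dict[int, Optional[str]]:
    strategy = ToyStrategy(retrain_every)
    # first_train: train the initial tasks and put the result online
    strategy.prepare_online_models(train(strategy.first_tasks()))
    signals: Dict[int, Optional[str]] = {}
    for day in days:  # one OnlineManager.routine per trading day
        models = train(strategy.prepare_tasks(day))
        strategy.prepare_online_models(models)
        signals[day] = strategy.online  # prepare_signals: record which model is online
    return signals
```

With retrain_every=3 and days 1 to 6, days 1 and 2 use the model from first_train, day 3 switches to the model trained on day 3, and day 6 switches again.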

  2. Simulation + DelayTrainer

# simulate
tasks = first_train()
models = trainer.train(tasks)
for day in historical_calendars:
    # OnlineManager.routine
    models = trainer.train(strategy.prepare_tasks())  # for each strategy
    strategy.prepare_online_models(models)  # for each strategy
# delay_prepare()
# FIXME: Currently the delay_prepare is not implemented in a proper way.
trainer.end_train(<for all previous models>)
prepare_signals()
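In the Simulation + DelayTrainer case, the routine loop only records which (still untrained) model would be online on each day; all fitting and signal preparation happen once at the end. The toy sketch below illustrates that idea under two assumptions: fit is a hypothetical callable supplied by the caller, and the models have no temporal dependence, which is the condition under which this deferral is safe. simulate_delayed is not a Qlib function.

```python
from typing import Callable, Dict, List


def simulate_delayed(
    days: List[int],
    retrain_every: int,
    fit: Callable[[str], str],
) -> Dict[int, str]:
    """Toy delayed simulation: record the online history first, fit everything at the end."""
    pending: List[str] = ["task@0"]        # tasks from first_train, not fitted yet
    online_history: Dict[int, str] = {}    # day -> task whose model is online
    current = "task@0"
    for day in days:                        # OnlineManager.routine for each day
        if day % retrain_every == 0:
            current = f"task@{day}"
            pending.append(current)         # DelayTrainer: only queue the task
        online_history[day] = current
    # delay_prepare: fit all queued tasks together (parallelism would go here)
    fitted = {task: fit(task) for task in pending}
    # prepare_signals for every day, using whichever model was online that day
    return {day: fitted[task] for day, task in online_history.items()}
```

Because each task is fitted exactly once after the loop, the fitting step could be distributed across workers without changing which model serves which day.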

Can we simplify the current workflow?

  • Can we reduce the number of task states?

    • For each task, there are three phases (i.e. task, partly trained task, final trained task).

class qlib.workflow.online.manager.OnlineManager(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

OnlineManager manages online models with Online Strategies. It also records a history of which models were online at what time.

__init__(strategies: OnlineStrategy | List[OnlineStrategy], trainer: Trainer | None = None, begin_time: str | Timestamp | None = None, freq='day')

Init OnlineManager. One OnlineManager must have at least one OnlineStrategy.

Parameters:
  • strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – an instance of OnlineStrategy or a list of OnlineStrategy

  • begin_time (Union[str,pd.Timestamp], optional) – the OnlineManager will begin at this time. Defaults to None for using the latest date.

  • trainer (qlib.model.trainer.Trainer) – the trainer used to train tasks. None for using TrainerR.

  • freq (str, optional) – data frequency. Defaults to “day”.

first_train(strategies: List[OnlineStrategy] | None = None, model_kwargs: dict = {})

Get tasks from every strategy’s first_tasks method and train them. If using DelayTrainer, it can finish training all together after every strategy’s first_tasks.

Parameters:
  • strategies (List[OnlineStrategy]) – the list of strategies (needed when adding strategies). None for using the default strategies.

  • model_kwargs (dict) – the params for prepare_online_models

routine(cur_time: str | Timestamp | None = None, task_kwargs: dict = {}, model_kwargs: dict = {}, signal_kwargs: dict = {})

Typical update process for every strategy and record the online history.

The typical update process after a routine, such as day by day or month by month. The process is: Update predictions -> Prepare tasks -> Prepare online models -> Prepare signals.

If using DelayTrainer, it can finish training all together after every strategy’s prepare_tasks.

Parameters:
  • cur_time (Union[str,pd.Timestamp], optional) – run the routine method at this time. Defaults to None.

  • task_kwargs (dict) – the params for prepare_tasks

  • model_kwargs (dict) – the params for prepare_online_models

  • signal_kwargs (dict) – the params for prepare_signals

get_collector(**kwargs) → MergeCollector

Get the instance of Collector to collect results from every strategy. This collector can serve as the basis for signal preparation.

Parameters:

**kwargs – the params for get_collector.

Returns:

the collector to merge other collectors.

Return type:

MergeCollector

add_strategy(strategies: OnlineStrategy | List[OnlineStrategy])

Add some new strategies to OnlineManager.

Parameters:

strategies (Union[OnlineStrategy, List[OnlineStrategy]]) – an instance of OnlineStrategy or a list of OnlineStrategy

prepare_signals(prepare_func: ~typing.Callable = <qlib.model.ens.ensemble.AverageEnsemble object>, over_write=False)

After the data of the last routine has been prepared (a box in the box-plot), which marks the end of the routine, we can prepare trading signals for the next routine.

NOTE: Given a set of predictions, all signals before the end times of these predictions will be prepared.

Even if the latest signal already exists, it will be overwritten by the latest calculation result.

Note

Given a prediction for a certain time, all signals before this time will be prepared.

Parameters:
  • prepare_func (Callable, optional) – Get signals from a dict after collecting. Defaults to AverageEnsemble(); the results collected by MergeCollector must be {xxx:pred}.

  • over_write (bool, optional) – If True, the new signals will overwrite the old ones. If False, the new signals will be appended to the end of the signals. Defaults to False.

Returns:

the signals.

Return type:

pd.DataFrame
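As a rough illustration of the two ideas in prepare_signals, the pandas sketch below averages a dict of per-model predictions ({key: pred}) into one signal, then either overwrites the stored signals or appends the new ones while recomputing the latest stored date. average_ensemble and merge_signals are hypothetical helpers written for this example; the default AverageEnsemble may also standardize or flatten the collected results, so treat plain averaging as a simplification.

```python
from typing import Dict, Optional

import pandas as pd


def average_ensemble(preds: Dict[str, pd.Series]) -> pd.Series:
    """Plain row-wise average of same-indexed predictions (a simplification)."""
    return pd.concat(preds, axis=1).mean(axis=1)


def merge_signals(
    old: Optional[pd.Series], new: pd.Series, over_write: bool = False
) -> pd.Series:
    """Overwrite, or append new signals and recompute the latest stored date."""
    if over_write or old is None:
        return new
    old_dt = old.index.get_level_values("datetime")
    last = old_dt.max()
    # Keep old signals strictly before the latest old date, then take the
    # freshly computed ones from that date on, so the latest date is refreshed.
    kept = old[old_dt < last]
    fresh = new[new.index.get_level_values("datetime") >= last]
    return pd.concat([kept, fresh]).sort_index()
```

The signals are assumed to be indexed by a (datetime, instrument) MultiIndex, which is why both helpers look up the "datetime" level.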

get_signals() → Series | DataFrame

Get the prepared online signals.

Returns:

pd.Series if there is only one signal per datetime; pd.DataFrame for multiple signals, for example, when buy and sell operations use different trading signals.

Return type:

Union[pd.Series, pd.DataFrame]

simulate(end_time=None, frequency='day', task_kwargs={}, model_kwargs={}, signal_kwargs={}) → Series | DataFrame

Starting from the current time, this method simulates every routine in OnlineManager until the end time.

To allow parallel training, the models and signals can be prepared after all routines have been simulated.

Delayed training can be done with DelayTrainer, and delayed signal preparation with delay_prepare.

Parameters:
  • end_time – the time at which the simulation ends

  • frequency – the calendar frequency

  • task_kwargs (dict) – the params for prepare_tasks

  • model_kwargs (dict) – the params for prepare_online_models

  • signal_kwargs (dict) – the params for prepare_signals

Returns:

pd.Series if there is only one signal per datetime; pd.DataFrame for multiple signals, for example, when buy and sell operations use different trading signals.

Return type:

Union[pd.Series, pd.DataFrame]

delay_prepare(model_kwargs={}, signal_kwargs={})

Prepare all models and signals if something is waiting for preparation.

Parameters:
  • model_kwargs – the params for end_train

  • signal_kwargs – the params for prepare_signals

Online Strategy

The OnlineStrategy module is a component of Online Serving.

class qlib.workflow.online.strategy.OnlineStrategy(name_id: str)

OnlineStrategy works with Online Manager and determines how tasks are generated, how models are updated, and how signals are prepared.

__init__(name_id: str)

Init OnlineStrategy. This module MUST use a Trainer to finish model training.

Parameters:
  • name_id (str) – a unique name or id.

  • trainer (qlib.model.trainer.Trainer, optional) – an instance of Trainer. Defaults to None.

prepare_tasks(cur_time, **kwargs) → List[dict]

After the end of a routine, check whether we need to prepare and train some new tasks based on cur_time (None for the latest). Return the new tasks waiting for training.

You can find the last online models by OnlineTool.online_models.

prepare_online_models(trained_models, cur_time=None) → List[object]

Select some models from the trained models and set them as online models. This is a typical implementation that brings all trained models online; you can override it to implement a more complex method. You can find the last online models with OnlineTool.online_models if you still need them.

NOTE: Reset all online models to the trained models. If there are no trained models, do nothing.

NOTE:

The current implementation is very naive. Here is a more complex situation that is closer to practical scenarios:
  1. Train new models on the day before test_start (at timestamp T).

  2. Switch models at test_start (typically at timestamp T + 1).

Parameters:
  • trained_models (list) – a list of models.

  • cur_time (pd.Timestamp) – current time from OnlineManager. None for the latest.

Returns:

a list of online models.

Return type:

List[object]
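The naive default policy described above (bring every newly trained model online, or change nothing if there are none) fits in a few lines. In this sketch the online state is a plain dict from model name to tag; the tag strings and the function are hypothetical, not Qlib's storage format.

```python
from typing import Dict, List

ONLINE, OFFLINE = "online", "offline"  # hypothetical tag values


def prepare_online_models(tags: Dict[str, str], trained: List[str]) -> List[str]:
    """Naive policy: all trained models go online, every other model goes offline."""
    if not trained:
        # No new models: leave the current online set untouched.
        return [name for name, tag in tags.items() if tag == ONLINE]
    for name in tags:
        tags[name] = OFFLINE
    for name in trained:
        tags[name] = ONLINE
    return list(trained)
```

The more practical variant mentioned in the NOTE would instead mark models trained at T with a "next online" state and only switch them on at T + 1.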

first_tasks() → List[dict]

Generate a series of initial tasks and return them.

get_collector() → Collector

Get the instance of Collector to collect different results of this strategy.

For example:
  1. collect predictions in Recorder

  2. collect signals in a txt file

Returns:

Collector

class qlib.workflow.online.strategy.RollingStrategy(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

This example strategy always uses the latest rolling models as online models.

__init__(name_id: str, task_template: dict | List[dict], rolling_gen: RollingGen)

Init RollingStrategy.

Assumption: the name_id string, the experiment name, and the trainer’s experiment name are the same.

Parameters:
  • name_id (str) – a unique name or id. It will also be used as the name of the Experiment.

  • task_template (Union[dict, List[dict]]) – a list of task_template or a single template, which will be used to generate many tasks using rolling_gen.

  • rolling_gen (RollingGen) – an instance of RollingGen

get_collector(process_list=[<qlib.model.ens.group.RollingGroup object>], rec_key_func=None, rec_filter_func=None, artifacts_key=None)

Get the instance of Collector to collect results. The returned collector must distinguish results in different models.

Assumption: the models can be distinguished based on the model name and rolling test segments. If you do not want this assumption, please implement your method or use another rec_key_func.

Parameters:
  • rec_key_func (Callable) – a function to get the key of a recorder. If None, use recorder id.

  • rec_filter_func (Callable, optional) – filter the recorder by return True or False. Defaults to None.

  • artifacts_key (List[str], optional) – the artifacts key you want to get. If None, get all artifacts.
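The default assumption, that a result can be identified by its model name plus its rolling test segment, can be expressed as a key function, and a filter can be built the same way. The sketch assumes each result is described by a plain dict holding its task config in a Qlib-like layout (task["model"]["class"] and task["dataset"]["kwargs"]["segments"]["test"]); that layout, rolling_key, and keep_model are illustrative assumptions. Real rec_key_func and rec_filter_func callables receive a recorder, so the task config would first have to be loaded from it.

```python
from typing import Any, Callable, Dict, Tuple

Task = Dict[str, Any]


def rolling_key(task: Task) -> Tuple[str, Tuple[str, str]]:
    """Hypothetical rec_key_func: identify a result by (model class, test segment)."""
    model_name = task["model"]["class"]
    test_start, test_end = task["dataset"]["kwargs"]["segments"]["test"]
    return model_name, (str(test_start), str(test_end))


def keep_model(name: str) -> Callable[[Task], bool]:
    """Hypothetical rec_filter_func factory: keep only tasks of one model class."""
    return lambda task: task["model"]["class"] == name
```

Two rolling runs of the same model get different keys because their test segments differ, which is exactly what lets a collector keep them apart.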

first_tasks() → List[dict]

Use rolling_gen to generate different tasks based on task_template.

Returns:

a list of tasks

Return type:

List[dict]
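Conceptually, generating rolling tasks means copying the template once per window and shifting its segments forward. Below is a simplified, hypothetical stand-in for RollingGen: it works on a plain list of trading dates, slides every segment forward by step dates each time (a sliding window), and stops once a segment would run past the calendar. The real RollingGen has more options (for example expanding windows), and its boundary handling may differ.

```python
import copy
from typing import Dict, List, Tuple

Segments = Dict[str, Tuple[str, str]]


def rolling_tasks(template: dict, calendar: List[str], step: int) -> List[dict]:
    """Hypothetical RollingGen stand-in: slide every segment forward by `step` dates."""
    if step <= 0:
        raise ValueError("step must be positive")
    base: Segments = template["dataset"]["kwargs"]["segments"]
    pos = {d: i for i, d in enumerate(calendar)}  # trading date -> calendar index
    tasks: List[dict] = []
    shift = 0
    while True:
        segments: Segments = {}
        for name, (start, end) in base.items():
            s, e = pos[start] + shift, pos[end] + shift
            if e >= len(calendar):
                return tasks  # this window would run past the calendar: stop
            segments[name] = (calendar[s], calendar[e])
        task = copy.deepcopy(template)  # never mutate the template itself
        task["dataset"]["kwargs"]["segments"] = segments
        tasks.append(task)
        shift += step
```

Copying with deepcopy keeps the template reusable, so the same template can seed later calls to generate further windows.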

prepare_tasks(cur_time) → List[dict]

Prepare new tasks based on cur_time (None for the latest).

You can find the last online models by OnlineToolR.online_models.

Returns:

a list of new tasks.

Return type:

List[dict]
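One simple way to decide whether new tasks are due is to compare cur_time with the test start of the latest online model. The hypothetical rule below rolls a new test segment every step trading dates up to cur_time, with None meaning the latest calendar date. It is an illustrative assumption about the decision logic, not RollingStrategy's actual code, and it requires cur_time to be a date in the calendar.

```python
from typing import List, Optional


def new_test_starts(
    calendar: List[str], latest_test_start: str, cur_time: Optional[str], step: int
) -> List[str]:
    """Hypothetical rule: roll a new test segment every `step` trading dates up to cur_time."""
    if step <= 0:
        raise ValueError("step must be positive")
    pos = {d: i for i, d in enumerate(calendar)}
    end = len(calendar) - 1 if cur_time is None else pos[cur_time]  # None -> latest
    starts: List[str] = []
    i = pos[latest_test_start] + step
    while i <= end:
        starts.append(calendar[i])
        i += step
    return starts
```

Each returned date would become the test start of one new task, so an empty list means no training is needed in this routine.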

Online Tool

OnlineTool is a module to set and unset a series of online models. The online models are the decisive models at certain points in time, and they can change over time. This allows us to use efficient submodels as the market style changes.

class qlib.workflow.online.utils.OnlineTool

OnlineTool will manage online models in an experiment that includes the model recorders.

__init__()

Init OnlineTool.

set_online_tag(tag, recorder: list | object)

Set a tag on the model to mark whether it is online.

Parameters:
  • tag (str) – one of the tags ONLINE_TAG, OFFLINE_TAG

  • recorder (Union[list,object]) – the model’s recorder

get_online_tag(recorder: object) → str

Given a model recorder, return its online tag.

Parameters:

recorder (Object) – the model’s recorder

Returns:

the online tag

Return type:

str

reset_online_tag(recorder: list | object)

Set all models offline and set the given recorders to ‘online’.

Parameters:

recorder (Union[list,object]) – the recorder you want to reset to ‘online’.

online_models() → list

Get the current online models.

Returns:

a list of online models.

Return type:

list
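The tag operations above can be pictured with a small in-memory model. DictOnlineTool is hypothetical: it keys tags by recorder name instead of storing them on real recorders, and the tag string values are assumptions. It mirrors the method names so that the semantics of set, get, reset, and list are easy to follow.

```python
from typing import Dict, List, Union

# Hypothetical tag values; the real constants may use different strings.
ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG = "online", "next_online", "offline"


class DictOnlineTool:
    """Hypothetical in-memory OnlineTool: recorder name -> tag."""

    def __init__(self, recorders: List[str]) -> None:
        self.tags: Dict[str, str] = {r: OFFLINE_TAG for r in recorders}

    def set_online_tag(self, tag: str, recorder: Union[str, List[str]]) -> None:
        if tag not in (ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG):
            raise ValueError(f"unknown tag: {tag}")
        names = [recorder] if isinstance(recorder, str) else list(recorder)
        for name in names:
            self.tags[name] = tag

    def get_online_tag(self, recorder: str) -> str:
        return self.tags[recorder]

    def reset_online_tag(self, recorder: Union[str, List[str]]) -> None:
        # Offline everything first, then bring only the given recorders online.
        self.set_online_tag(OFFLINE_TAG, list(self.tags))
        self.set_online_tag(ONLINE_TAG, recorder)

    def online_models(self) -> List[str]:
        return [name for name, tag in self.tags.items() if tag == ONLINE_TAG]
```

reset_online_tag is the operation a strategy needs when it swaps the whole online set at once, while set_online_tag allows incremental changes.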

update_online_pred(to_date=None)

Update the predictions of online models to to_date.

Parameters:

to_date (pd.Timestamp) – the pred before this date will be updated. None for updating to the latest.

class qlib.workflow.online.utils.OnlineToolR(default_exp_name: str | None = None)

The implementation of OnlineTool based on (R)ecorder.

__init__(default_exp_name: str | None = None)

Init OnlineToolR.

Parameters:

default_exp_name (str) – the default experiment name.

set_online_tag(tag, recorder: Recorder | List)

Set a tag on the model’s recorder to mark whether it is online.

Parameters:
  • tag (str) – one of the tags ONLINE_TAG, NEXT_ONLINE_TAG, OFFLINE_TAG

  • recorder (Union[Recorder, List]) – a list of Recorder or an instance of Recorder

get_online_tag(recorder: Recorder) → str

Given a model recorder, return its online tag.

Parameters:

recorder (Recorder) – an instance of recorder

Returns:

the online tag

Return type:

str

reset_online_tag(recorder: Recorder | List, exp_name: str | None = None)

Set all models offline and set the given recorders to ‘online’.

Parameters:
  • recorder (Union[Recorder, List]) – the recorder you want to reset to ‘online’.

  • exp_name (str) – the experiment name. If None, then use default_exp_name.

online_models(exp_name: str | None = None) → list

Get the current online models.

Parameters:

exp_name (str) – the experiment name. If None, then use default_exp_name.

Returns:

a list of online models.

Return type:

list

update_online_pred(to_date=None, from_date=None, exp_name: str | None = None)

Update the predictions of online models to to_date.

Parameters:
  • to_date (pd.Timestamp) – the predictions before this date will be updated. None for updating to the latest time in the Calendar.

  • exp_name (str) – the experiment name. If None, then use default_exp_name.

Updater

Updater is a module to update artifacts such as predictions when the stock data is updated.

class qlib.workflow.online.update.RMDLoader(rec: Recorder)

Recorder Model Dataset Loader

__init__(rec: Recorder)
get_dataset(start_time, end_time, segments=None, unprepared_dataset: DatasetH | None = None) → DatasetH

Load, configure, and set up the dataset.

This dataset is for inference.

Parameters:
  • start_time – the start_time of the underlying data

  • end_time – the end_time of the underlying data

  • segments – dict: the segments config for the dataset. Due to the time-series dataset (TSDatasetH), the test segments may be different from start_time and end_time.

  • unprepared_dataset – Optional[DatasetH]: if users don’t want to load the dataset from the recorder, they can specify their own dataset.

Returns:

the instance of DatasetH

Return type:

DatasetH

class qlib.workflow.online.update.RecordUpdater(record: Recorder, *args, **kwargs)

Update a specific recorder.

__init__(record: Recorder, *args, **kwargs)
abstract update(*args, **kwargs)

Update info for a specific recorder.

class qlib.workflow.online.update.DSBasedUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

Dataset-Based Updater

  • Provides the feature of updating data based on a Qlib Dataset.

Assumption

  • Based on Qlib dataset

  • The data to be updated is a pd.DataFrame with a multi-level index, for example labels or predictions.

                             LABEL0
    datetime   instrument
    2021-05-10 SH600000    0.006965
               SH600004    0.003407
    ...                         ...
    2021-05-28 SZ300498    0.015748
               SZ300676   -0.001321
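A frame with this layout can be built in pandas with a two-level (datetime, instrument) MultiIndex. The sketch below copies the four visible rows and the LABEL0 column from the example above (the elided "..." rows are left out); selecting one datetime then returns the cross-section of all instruments on that day.

```python
import pandas as pd

# Two-level index (datetime, instrument), matching the example layout.
index = pd.MultiIndex.from_tuples(
    [
        (pd.Timestamp("2021-05-10"), "SH600000"),
        (pd.Timestamp("2021-05-10"), "SH600004"),
        (pd.Timestamp("2021-05-28"), "SZ300498"),
        (pd.Timestamp("2021-05-28"), "SZ300676"),
    ],
    names=["datetime", "instrument"],
)
label = pd.DataFrame(
    {"LABEL0": [0.006965, 0.003407, 0.015748, -0.001321]}, index=index
)

# Cross-section of all instruments on one trading day.
day = label.xs(pd.Timestamp("2021-05-28"), level="datetime")
```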
    
__init__(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

Init DSBasedUpdater.

Expected behavior in the following cases:

  • if to_date is greater than the max date in the calendar, the data will be updated to the latest date

  • if there is data before from_date or after to_date, only the data between from_date and to_date is affected.
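The two expected behaviors above can be mimicked with pandas: clip to_date to the last calendar date, and replace only the rows whose datetime falls in [from_date, to_date]. apply_update is a hypothetical helper for illustration; it is not how DSBasedUpdater is implemented internally, and it assumes a (datetime, instrument) MultiIndex.

```python
from typing import Union

import pandas as pd

DateLike = Union[str, pd.Timestamp]


def apply_update(
    old: pd.DataFrame,
    new: pd.DataFrame,
    calendar: pd.DatetimeIndex,
    from_date: DateLike,
    to_date: DateLike,
) -> pd.DataFrame:
    """Replace rows in [from_date, to_date] with `new`; keep everything else from `old`."""
    to_ts = min(pd.Timestamp(to_date), calendar.max())  # clip to the calendar
    from_ts = pd.Timestamp(from_date)
    old_dt = old.index.get_level_values("datetime")
    outside = old[(old_dt < from_ts) | (old_dt > to_ts)]  # untouched rows
    new_dt = new.index.get_level_values("datetime")
    inside = new[(new_dt >= from_ts) & (new_dt <= to_ts)]  # refreshed rows
    return pd.concat([outside, inside]).sort_index()
```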

Parameters:
  • record – Recorder

  • to_date

    update the prediction up to to_date

    if to_date is None:

    the data will be updated to the latest date.

  • from_date

    the update will start from from_date

    if from_date is None:

    the update will start at the next tick after the latest data in the historical data

  • hist_ref

    int: Sometimes the dataset has historical dependencies. Users are left to set the length of the historical dependency themselves. If this parameter is not specified, the Updater will try to load the dataset to determine hist_ref automatically.

    Note

    the start_time is not included in hist_ref, so hist_ref will be step_len - 1 in most cases

  • loader_cls – type: the class used to load the model and dataset
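To see why hist_ref is usually step_len - 1: assuming, as with TSDatasetH, that a sample for from_date needs step_len consecutive trading dates ending at from_date, from_date itself already provides one of them, so the data must start hist_ref dates earlier. The hypothetical helper below computes that loading window on a plain list of trading dates (consecutive calendar days here only to keep the example short).

```python
from typing import List, Tuple


def load_window(
    calendar: List[str], from_date: str, to_date: str, step_len: int
) -> Tuple[str, str]:
    """Return (start, end) of the underlying data needed to predict [from_date, to_date]."""
    hist_ref = step_len - 1  # from_date itself is already one step of the window
    pos = calendar.index(from_date)
    start = calendar[max(pos - hist_ref, 0)]  # shift back hist_ref trading dates
    return start, to_date
```

With step_len=5 and from_date 2021-03-25, the window starts on 2021-03-21, giving exactly five dates up to and including from_date.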

prepare_data(unprepared_dataset: DatasetH | None = None) → DatasetH

Load the dataset:
  • if unprepared_dataset is specified, prepare the dataset directly
  • Otherwise,

Separating this function makes it easier to reuse the dataset.

Returns:

the instance of DatasetH

Return type:

DatasetH

update(dataset: DatasetH | None = None, write: bool = True, ret_new: bool = False) → object | None

Parameters:
  • dataset (DatasetH) – the instance of DatasetH. None to prepare it again.

  • write (bool) – whether the write action will be executed

  • ret_new (bool) – whether the updated data will be returned

Returns:

the updated dataset

Return type:

Optional[object]

abstract get_update_data(dataset: Dataset) → DataFrame

Return the updated data based on the given dataset.

The difference between get_update_data and update:
  • get_update_data only includes some data-specific features
  • update includes some general routine steps (e.g. preparing the dataset, checking)
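The split between update and get_update_data is a template-method design: the base class owns the generic steps, and each subclass supplies only the data-specific part. The toy sketch below shows that shape with hypothetical classes; a dict stands in for the recorder's artifacts, a fixed list stands in for the prepared dataset, and dividing by 10 stands in for a model's predictions.

```python
from abc import ABC, abstractmethod
from typing import Dict, List, Optional


class ToyUpdater(ABC):
    """Hypothetical updater: update() runs the generic steps, subclasses supply the data."""

    def __init__(self, store: Dict[str, List[float]], key: str) -> None:
        self.store = store  # stands in for the recorder's artifacts
        self.key = key

    def prepare_data(self) -> List[float]:
        # Generic step: build the dataset used for inference.
        return [1.0, 2.0, 3.0]

    @abstractmethod
    def get_update_data(self, dataset: List[float]) -> List[float]:
        """Data-specific step implemented by subclasses."""

    def update(
        self, dataset: Optional[List[float]] = None, write: bool = True, ret_new: bool = False
    ) -> Optional[List[float]]:
        if dataset is None:
            dataset = self.prepare_data()  # None -> prepare it again
        new = self.get_update_data(dataset)
        if not new:  # generic check: nothing to update
            return None
        if write:
            self.store[self.key] = self.store.get(self.key, []) + new
        return new if ret_new else None


class ToyPredUpdater(ToyUpdater):
    def get_update_data(self, dataset: List[float]) -> List[float]:
        return [x / 10 for x in dataset]  # pretend these are model predictions
```

Keeping the checks and the write logic in one place means a new kind of artifact only needs a new get_update_data.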

class qlib.workflow.online.update.PredUpdater(record: ~qlib.workflow.recorder.Recorder, to_date=None, from_date=None, hist_ref: int | None = None, freq='day', fname='pred.pkl', loader_cls: type = <class 'qlib.workflow.online.update.RMDLoader'>)

Update the prediction in the Recorder.

get_update_data(dataset: Dataset) → DataFrame

Return the updated data based on the given dataset.

The difference between get_update_data and update:
  • get_update_data only includes some data-specific features
  • update includes some general routine steps (e.g. preparing the dataset, checking)

class qlib.workflow.online.update.LabelUpdater(record: Recorder, to_date=None, **kwargs)

Update the label in the recorder.

Assumption: the label is generated from record_temp.SignalRecord.

__init__(record: Recorder, to_date=None, **kwargs)

Init LabelUpdater.

Expected behavior in the following cases:

  • if to_date is greater than the max date in the calendar, the data will be updated to the latest date

  • if there is data before from_date or after to_date, only the data between from_date and to_date is affected.

Parameters:
  • record – Recorder

  • to_date

    update the prediction up to to_date

    if to_date is None:

    the data will be updated to the latest date.

  • from_date

    the update will start from from_date

    if from_date is None:

    the update will start at the next tick after the latest data in the historical data

  • hist_ref

    int: Sometimes the dataset has historical dependencies. Users are left to set the length of the historical dependency themselves. If this parameter is not specified, the Updater will try to load the dataset to determine hist_ref automatically.

    Note

    the start_time is not included in hist_ref, so hist_ref will be step_len - 1 in most cases

  • loader_cls – type: the class used to load the model and dataset

get_update_data(dataset: Dataset) → DataFrame

Return the updated data based on the given dataset.

The difference between get_update_data and update:
  • get_update_data only includes some data-specific features
  • update includes some general routine steps (e.g. preparing the dataset, checking)