Task Management

Introduction

The Workflow part introduces how to run a research workflow in a loosely-coupled way, but it can only execute one task when you use qrun. To automatically generate and execute different tasks, Task Management provides a whole process including Task Generating, Task Storing, Task Training and Task Collecting. With this module, users can run their tasks automatically over different time periods, with different losses, or even with different models. The processes of task generation, model training, and combining and collecting data are shown in the following figure.

../_images/Task-Gen-Recorder-Collector.svg

This whole process can be used in Online Serving.

An example of the entire process can be found here.

Task Generating

A task consists of Model, Dataset, Record, or anything else added by users. The specific task template can be viewed in the Task Section. Even though the task template is fixed, users can customize their own TaskGen based on the task template to generate different tasks.
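
For orientation, here is a minimal sketch of such a task template as a Python dict. The concrete classes, dates and kwargs below are illustrative assumptions, not a required configuration; see the Task Section for the real templates.

task_template = {
    "model": {
        "class": "LGBModel",                       # model part: class name and its module
        "module_path": "qlib.contrib.model.gbdt",
        "kwargs": {"loss": "mse"},                 # hyper-parameters are assumptions
    },
    "dataset": {
        "class": "DatasetH",
        "module_path": "qlib.data.dataset",
        "kwargs": {
            "handler": {},                         # data handler config, omitted for brevity
            "segments": {                          # train/valid/test date segments (assumed dates)
                "train": ("2008-01-01", "2014-12-31"),
                "valid": ("2015-01-01", "2016-12-31"),
                "test": ("2017-01-01", "2020-08-01"),
            },
        },
    },
    "record": [                                    # record part: what to record after training
        {"class": "SignalRecord", "module_path": "qlib.workflow.record_temp"},
    ],
}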

Here is the base class of TaskGen:

class qlib.workflow.task.gen.TaskGen

The base class for generating different tasks

Example 1:

input: a specific task template and rolling steps

output: rolling version of the tasks

Example 2:

input: a specific task template and losses list

output: a set of tasks with different losses

abstract generate(task: dict) List[dict]

Generate different tasks based on a task template

Parameters:

task (dict) – a task template

Returns:

A list of tasks

Return type:

List[dict]

Qlib provides a class RollingGen to generate a list of tasks for the dataset in different date segments. This class allows users to verify, in one experiment, the effect that data from different periods has on the model. More information can be found here.
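
As a minimal sketch (assuming the illustrative task_template above, and that the default rolling behavior fits your data), rolling tasks can be generated roughly like this:

from qlib.workflow.task.gen import RollingGen

# Each generated task shares the template but uses a different rolling date segment.
# step is the number of trading days each segment rolls forward (the value here is an assumption).
rolling_gen = RollingGen(step=550, rtype=RollingGen.ROLL_SD)
tasks = rolling_gen.generate(task_template)  # -> List[dict], one task per rolling segment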

Task Storing

To achieve higher efficiency and make cluster operation possible, Task Manager stores all tasks in MongoDB. TaskManager can fetch undone tasks automatically and manage the lifecycle of a set of tasks with error handling. Users MUST finish the configuration of MongoDB when using this module.

Users need to provide the MongoDB URL and database name during initialization, or make a statement like this:

from qlib.config import C
C["mongo"] = {
    "task_url" : "mongodb://localhost:27017/", # your MongoDB url
    "task_db_name" : "rolling_db" # database name
}
class qlib.workflow.task.manage.TaskManager(task_pool: str)

Here is what a task will look like when it is created by TaskManager:

{
    'def': pickle-serialized task definition (using pickle makes it easier),
    'filter': json-like data, used for filtering the tasks,
    'status': 'waiting' | 'running' | 'done',
    'res': pickle-serialized task result,
}

The task manager assumes that you will only update the tasks you fetched. MongoDB's fetch-one-and-update operation makes the data updating secure.

This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:

python -m qlib.workflow.task.manage -h # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h # show manual of the wait command of manage

python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat

Note

Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded

Here are the four statuses:

STATUS_WAITING: waiting for training

STATUS_RUNNING: training

STATUS_PART_DONE: finished some step and waiting for next step

STATUS_DONE: all work done

__init__(task_pool: str)

Initialize the Task Manager; remember to declare the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.

Parameters:

task_pool (str) – the name of Collection in MongoDB

static list() list

List all the collections (task pools) of the database.

Returns:

list

replace_task(task, new_task)

Use a new task to replace an old one

Parameters:
  • task – old task

  • new_task – new task

insert_task(task)

Insert a task.

Parameters:

task – the task waiting to be inserted

Returns:

pymongo.results.InsertOneResult

insert_task_def(task_def)

Insert a task to task_pool

Parameters:

task_def (dict) – the task definition

Return type:

pymongo.results.InsertOneResult

create_task(task_def_l, dry_run=False, print_nt=False) List[str]

If the tasks in task_def_l are new, then insert new tasks into the task_pool, and record inserted_id. If a task is not new, then just query its _id.

Parameters:
  • task_def_l (list) – a list of tasks

  • dry_run (bool) – whether to insert those new tasks into the task pool

  • print_nt (bool) – whether to print the new tasks

Returns:

a list of the _id of task_def_l

Return type:

List[str]

fetch_task(query={}, status='waiting') dict

Use query to fetch tasks.

Parameters:
  • query (dict, optional) – query dict. Defaults to {}.

  • status (str, optional) – the status of the tasks to fetch. Defaults to STATUS_WAITING.

Returns:

a task (document in collection) after decoding

Return type:

dict

safe_fetch_task(query={}, status='waiting')

Fetch a task from the task_pool using query, with a contextmanager

Parameters:

query (dict) – the dict of query

Returns:

a task (document in collection) after decoding

Return type:

dict

query(query={}, decode=True)

Query tasks in the collection. This function may raise the exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator

python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'

Parameters:
  • query (dict) – the dict of query

  • decode (bool) – whether to decode the fetched documents

Returns:

a task (document in collection) after decoding

Return type:

dict

re_query(_id) dict

Use _id to query task.

Parameters:

_id (str) – the _id of a document

Returns:

a task (document in collection) after decoding

Return type:

dict

commit_task_res(task, res, status='done')

Commit the result to task[‘res’].

Parameters:
  • task (dict) – the task whose result will be committed

  • res (object) – the result you want to save

  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_DONE.

return_task(task, status='waiting')

Return a task to a given status. Usually used in error handling.

Parameters:
  • task (dict) – the task to be returned

  • status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.

remove(query={})

Remove the task using query

Parameters:

query (dict) – the dict of query

task_stat(query={}) dict

Count the tasks in every status.

Parameters:

query (dict, optional) – the query dict. Defaults to {}.

Returns:

dict

reset_waiting(query={})

Reset all running tasks to waiting status. Can be used when some running tasks exit unexpectedly.

Parameters:

query (dict, optional) – the query dict. Defaults to {}.

prioritize(task, priority: int)

Set priority for task

Parameters:
  • task (dict) – the task queried from the database

  • priority (int) – the target priority

wait(query={})

When multiprocessing, the main process may fetch nothing from TaskManager because there are still some running tasks. So the main process should wait until all tasks are trained well by other processes or machines.

Parameters:

query (dict, optional) – the query dict. Defaults to {}.

More information about Task Manager can be found here.
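
As a quick illustration, here is a minimal sketch of the typical round trip. The pool name is an assumption, tasks stands for a list of task definitions (e.g. produced by a TaskGen), and MongoDB is assumed to be configured as shown above.

from qlib.workflow.task.manage import TaskManager

tm = TaskManager(task_pool="rolling_tasks")   # one Collection in MongoDB
tm.create_task(tasks)                         # insert the new tasks with 'waiting' status

# Safely fetch one waiting task; on error the context manager returns it to 'waiting'.
with tm.safe_fetch_task() as task:
    if task is not None:
        res = ...                             # train / evaluate the task definition here
        tm.commit_task_res(task, res)         # save the result and mark the task as 'done'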

Task Training

After generating and storing those tasks, it's time to run the tasks that are in the WAITING status. Qlib provides a method called run_task to run the tasks in the task pool; however, users can also customize how tasks are executed. An easy way to get a task_func is to use qlib.model.trainer.task_train directly. It runs the whole workflow defined by the task, which includes Model, Dataset, and Record.

qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)

While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool

After running this method, here are 4 situations (before_status -> after_status):

STATUS_WAITING -> STATUS_DONE: use task[“def”] as task_func param, it means that the task has not been started

STATUS_WAITING -> STATUS_PART_DONE: use task[“def”] as task_func param

STATUS_PART_DONE -> STATUS_PART_DONE: use task[“res”] as task_func param, it means that the task has been started but not completed

STATUS_PART_DONE -> STATUS_DONE: use task[“res”] as task_func param

Parameters:
  • task_func (Callable) –

    def (task_def, **kwargs) -> <res which will be committed>

    the function to run the task

  • task_pool (str) – the name of the task pool (Collection in MongoDB)

  • query (dict) – will use this dict to query task_pool when fetching task

  • force_release (bool) – whether the program will forcibly release the resource

  • before_status (str:) – the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.

  • after_status (str:) – the tasks after trained will become after_status. Can be STATUS_WAITING, STATUS_PART_DONE.

  • kwargs – the params for task_func
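
As a minimal sketch of the default usage (the pool and experiment names below are assumptions), run_task can be combined with qlib.model.trainer.task_train like this:

from qlib.model.trainer import task_train
from qlib.workflow.task.manage import run_task

# Keep fetching 'waiting' tasks from the pool and run the whole workflow
# (Model, Dataset, Record) for each; extra kwargs are forwarded to task_func.
run_task(task_train, task_pool="rolling_tasks", experiment_name="rolling_exp")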

Meanwhile, Qlib also provides a module called Trainer.

class qlib.model.trainer.Trainer

The trainer can train a list of models. There are Trainer and DelayTrainer, which can be distinguished by when they finish the real training.

__init__()
train(tasks: list, *args, **kwargs) list

Given a list of task definitions, begin training, and return the models.

For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.

Parameters:

tasks – a list of tasks

Returns:

a list of models

Return type:

list

end_train(models: list, *args, **kwargs) list

Given a list of models, finish something at the end of training if needed. The models may be Recorder, txt file, database, and so on.

For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.

Parameters:

models – a list of models

Returns:

a list of models

Return type:

list

is_delay() bool

Whether the Trainer will delay finishing end_train.

Returns:

whether this is a DelayTrainer

Return type:

bool

has_worker() bool

Some trainers have a backend worker to support parallel training. This method can tell whether the worker is enabled.

Returns:

whether the worker is enabled

Return type:

bool

worker()

Start the worker.

Raises:

NotImplementedError: – If the worker is not supported

Trainer will train a list of tasks and return a list of model recorders. Qlib offers two kinds of Trainer: TrainerR is the simplest way, while TrainerRM is based on TaskManager and helps to manage the lifecycle of tasks automatically. If you do not want to use Task Manager to manage tasks, then using TrainerR to train a list of tasks generated by TaskGen is enough. Details about the different Trainers can be found here.
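
A minimal sketch of both options (the experiment and pool names are assumptions; tasks is a list of task definitions generated by a TaskGen):

from qlib.model.trainer import TrainerR, TrainerRM

# Option 1: train directly, without Task Manager.
trainer = TrainerR(experiment_name="rolling_exp")
recorders = trainer.train(tasks)              # returns a list of model recorders

# Option 2: store tasks in MongoDB via TaskManager and let it manage their lifecycle.
trainer_rm = TrainerRM(experiment_name="rolling_exp", task_pool="rolling_tasks")
recorders = trainer_rm.train(tasks)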

Task Collecting

Before collecting model training results, you need to use qlib.init to specify the path of mlruns.

When collecting the results of task training, Qlib provides Collector, Group and Ensemble to collect the results in a readable, expandable and loosely-coupled way.

Collector can collect objects from everywhere and process them, such as merging, grouping, averaging and so on. It has two steps of action: collect (collect anything into a dict) and process_collect (process the collected dict).

Group also has two steps: group (which can group a set of objects based on group_func and transform them into a dict) and reduce (which can make a dict become an ensemble based on some rules). For example: {(A,B,C1): object, (A,B,C2): object} ---group---> {(A,B): {C1: object, C2: object}} ---reduce---> {(A,B): object}

Ensemble can merge the objects in an ensemble. For example: {C1: object, C2: object} ---Ensemble---> object. You can set the ensembles you want in the process_list of a Collector. Common ensembles include AverageEnsemble and RollingEnsemble. An average ensemble is used to merge the results of different models in the same time period, while a rolling ensemble is used to concatenate the results of the same models over different time periods.

So the second step of Collector corresponds to Group, and the second step of Group corresponds to Ensemble.
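
To make the group/reduce/ensemble flow above concrete, here is a small illustrative sketch in plain Python. It only mimics the dictionary transformations described above with string stand-ins; it is not the actual Collector/Group/Ensemble API.

# Stand-ins for prediction objects collected from several rolling segments.
collected = {
    ("LGB", "rolling", "seg1"): "pred_1",
    ("LGB", "rolling", "seg2"): "pred_2",
}

# group: {(A, B, C1): obj, (A, B, C2): obj} -> {(A, B): {C1: obj, C2: obj}}
grouped = {}
for (*prefix, seg), obj in collected.items():
    grouped.setdefault(tuple(prefix), {})[seg] = obj

# reduce / ensemble: {(A, B): {C1: obj, C2: obj}} -> {(A, B): obj}
# Here "ensembling" is just concatenating the stand-in strings, analogous to a rolling
# ensemble concatenating predictions from consecutive segments.
reduced = {key: "+".join(sub[s] for s in sorted(sub)) for key, sub in grouped.items()}
print(reduced)  # {('LGB', 'rolling'): 'pred_1+pred_2'}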

For more information, please see Collector, Group and Ensemble, or check this example.