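Task Management
Introduction
The Workflow section introduces how to run a research workflow in a loosely coupled way. However, qrun can execute only one task at a time.
To generate and execute different tasks automatically, Task Management provides a complete process that includes Task Generating, Task Storing, Task Training, and Task Collecting.
With this module, users can run their tasks automatically over different periods, with different losses, or even with different models. The processes of task generation, model training, and data combining and collecting are shown in the following figure.
The whole process can be used in Online Serving.
An example of the entire process is shown here.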
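Task Generating
A task consists of Model, Dataset, Record, or anything else added by users.
The specific task template can be viewed in the Task Section.
Although the task template is fixed, users can customize their own TaskGen to generate different tasks from the task template.
Here is the base class of TaskGen: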
- class qlib.workflow.task.gen.TaskGen
The base class for generating different tasks
Example 1:
input: a specific task template and rolling steps
output: rolling version of the tasks
Example 2:
input: a specific task template and losses list
output: a set of tasks with different losses
- abstract generate(task: dict) → List[dict]
Generate different tasks based on a task template
- Parameters:
task (dict) – a task template
- Returns:
A list of tasks
- Return type:
List[dict]
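As an illustration of the generate(task) contract, here is a minimal sketch of a generator that produces one task per loss name. The class name LossTaskGen and the assumed template layout task["model"]["kwargs"]["loss"] are hypothetical. In real use the class would subclass qlib.workflow.task.gen.TaskGen; it is kept standalone here so the sketch runs without Qlib installed.

```python
import copy
from typing import List


class LossTaskGen:
    """Hypothetical generator: one task per loss name (not part of Qlib)."""

    def __init__(self, losses: List[str]):
        self.losses = losses

    def generate(self, task: dict) -> List[dict]:
        tasks = []
        for loss in self.losses:
            # Deep-copy so the shared template is never mutated.
            new_task = copy.deepcopy(task)
            # Assumed layout: the loss lives at task["model"]["kwargs"]["loss"].
            new_task.setdefault("model", {}).setdefault("kwargs", {})["loss"] = loss
            tasks.append(new_task)
        return tasks


template = {"model": {"class": "SomeModel", "kwargs": {"loss": "mse"}}}
generated = LossTaskGen(["mse", "mae"]).generate(template)
```

Each generated task is an independent copy, so the resulting list can be stored or trained without affecting the template.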
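Qlib provides a class named RollingGen that generates a list of tasks over different date segments of the dataset.
This class lets users verify, within one experiment, how data from different periods affects the model. More information is here.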
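The idea of rolling can be sketched as shifting every dataset segment forward by a fixed step. The function below is a simplified illustration only: roll_segments, its parameters, and the segment names are hypothetical, and it shifts by calendar days, whereas the real RollingGen may work differently (for example, on a trading calendar).

```python
from datetime import date, timedelta
from typing import Dict, List, Tuple

Segments = Dict[str, Tuple[date, date]]


def roll_segments(segments: Segments, step_days: int, n_rolls: int) -> List[Segments]:
    """Return the original segments plus n_rolls copies, each shifted by step_days."""
    rolled = []
    for i in range(n_rolls + 1):
        shift = timedelta(days=i * step_days)
        rolled.append(
            {name: (start + shift, end + shift) for name, (start, end) in segments.items()}
        )
    return rolled


base = {
    "train": (date(2019, 1, 1), date(2019, 12, 31)),
    "test": (date(2020, 1, 1), date(2020, 1, 7)),
}
rolled = roll_segments(base, step_days=7, n_rolls=2)
```

Each element of rolled could then be written into a copy of the task template to obtain one task per period.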
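Task Storing
To achieve higher efficiency and make cluster operation possible, Task Manager stores all tasks in MongoDB.
TaskManager can fetch unfinished tasks automatically and manage the lifecycle of a set of tasks with error handling.
Users MUST finish the configuration of MongoDB when using this module.
Users need to provide the MongoDB URL and database name during initialization, or make a statement like the following.
from qlib.config import C
C["mongo"] = {
    "task_url": "mongodb://localhost:27017/",  # your MongoDB url
    "task_db_name": "rolling_db",  # database name
}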
- class qlib.workflow.task.manage.TaskManager(task_pool: str)
Here is what a task looks like when it is created by TaskManager
{
    'def': pickle serialized task definition. using pickle will make it easier
    'filter': json-like data. This is for filtering the tasks.
    'status': 'waiting' | 'running' | 'done'
    'res': pickle serialized task result,
}
The task manager assumes that you will only update the tasks you fetched. MongoDB's fetch-one-and-update operation makes data updating secure.
This class can be used as a tool from the command line. Here are several examples. You can view the help of the manage module with the following commands:
python -m qlib.workflow.task.manage -h # show manual of manage module CLI
python -m qlib.workflow.task.manage wait -h # show manual of the wait command of manage
python -m qlib.workflow.task.manage -t <pool_name> wait
python -m qlib.workflow.task.manage -t <pool_name> task_stat
Note
Assumption: the data in MongoDB was encoded and the data out of MongoDB was decoded
There are four statuses:
STATUS_WAITING: waiting for training
STATUS_RUNNING: training
STATUS_PART_DONE: finished some step and waiting for next step
STATUS_DONE: all work done
- __init__(task_pool: str)
Initialize the Task Manager. Remember to declare the MongoDB URL and database name first. A TaskManager instance serves a specific task pool. The static methods of this module serve the whole MongoDB.
- Parameters:
task_pool (str) – the name of Collection in MongoDB
- static list() → list
List all collections (task_pool) of the db.
- Returns:
list
- replace_task(task, new_task)
Use a new task to replace an old one
- Parameters:
task – old task
new_task – new task
- insert_task(task)
Insert a task.
- Parameters:
task – the task waiting to be inserted
- Returns:
pymongo.results.InsertOneResult
- insert_task_def(task_def)
Insert a task to task_pool
- Parameters:
task_def (dict) – the task definition
- Return type:
pymongo.results.InsertOneResult
- create_task(task_def_l, dry_run=False, print_nt=False) → List[str]
If the tasks in task_def_l are new, insert them into the task_pool and record the inserted_id. If a task is not new, just query its _id.
- Parameters:
task_def_l (list) – a list of task definitions
dry_run (bool) – if True, new tasks are detected but not inserted into the task pool
print_nt (bool) – whether to print the new tasks
- Returns:
a list of the _id of task_def_l
- Return type:
List[str]
- fetch_task(query={}, status='waiting') → dict
Use query to fetch tasks.
- Parameters:
query (dict, optional) – query dict. Defaults to {}.
status (str, optional) – the status of the tasks to fetch. Defaults to STATUS_WAITING.
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- safe_fetch_task(query={}, status='waiting')
Fetch a task from task_pool using query, as a context manager
- Parameters:
query (dict) – the dict of query
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- query(query={}, decode=True)
Query tasks in the collection. This function may raise the exception pymongo.errors.CursorNotFound: cursor id not found if it takes too long to iterate the generator
python -m qlib.workflow.task.manage -t <your task pool> query '{"_id": "615498be837d0053acbc5d58"}'
- Parameters:
query (dict) – the dict of query
decode (bool) – whether to decode the queried documents
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- re_query(_id) → dict
Use _id to query task.
- Parameters:
_id (str) – _id of a document
- Returns:
a task (document in collection) after decoding
- Return type:
dict
- commit_task_res(task, res, status='done')
Commit the result to task['res'].
- Parameters:
task (dict) – the task to commit the result to
res (object) – the result you want to save
status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_DONE.
- return_task(task, status='waiting')
Return a task to the given status. Typically used in error handling.
- Parameters:
task (dict) – the task to return
status (str, optional) – STATUS_WAITING, STATUS_RUNNING, STATUS_DONE, STATUS_PART_DONE. Defaults to STATUS_WAITING.
- remove(query={})
Remove the tasks matching query
- Parameters:
query (dict) – the dict of query
- task_stat(query={}) → dict
Count the tasks in every status.
- Parameters:
query (dict, optional) – the query dict. Defaults to {}.
- Returns:
dict
- reset_waiting(query={})
Reset all running tasks to waiting status. Can be used when some running tasks exit unexpectedly.
- Parameters:
query (dict, optional) – the query dict. Defaults to {}.
- prioritize(task, priority: int)
Set priority for task
- Parameters:
task (dict) – the task queried from the database
priority (int) – the target priority
- wait(query={})
When multiprocessing, the main process may fetch nothing from TaskManager because there are still some running tasks. So the main process should wait until all tasks have been trained by other processes or machines.
- Parameters:
query (dict, optional) – the query dict. Defaults to {}.
More information about Task Manager can be found here.
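The task lifecycle described above (fetch, run, then commit on success or return on failure) can be sketched without a database. The InMemoryTaskPool class below is a hypothetical stand-in that only borrows method names from TaskManager; it is not Qlib's implementation, it skips pickle encoding, and it ignores the atomicity that MongoDB provides.

```python
from collections import Counter
from contextlib import contextmanager

STATUS_WAITING, STATUS_RUNNING, STATUS_DONE = "waiting", "running", "done"


class InMemoryTaskPool:
    """Hypothetical stand-in for a MongoDB-backed task pool (not Qlib's class)."""

    def __init__(self):
        self.tasks = []

    def insert_task_def(self, task_def: dict) -> dict:
        task = {"def": task_def, "status": STATUS_WAITING, "res": None}
        self.tasks.append(task)
        return task

    def fetch_task(self, status: str = STATUS_WAITING):
        # Fetch one matching task and mark it as running in the same step.
        for task in self.tasks:
            if task["status"] == status:
                task["status"] = STATUS_RUNNING
                return task
        return None

    def commit_task_res(self, task: dict, res, status: str = STATUS_DONE) -> None:
        task["res"], task["status"] = res, status

    def return_task(self, task: dict, status: str = STATUS_WAITING) -> None:
        task["status"] = status

    @contextmanager
    def safe_fetch_task(self, status: str = STATUS_WAITING):
        task = self.fetch_task(status)
        try:
            yield task
        except Exception:
            if task is not None:
                self.return_task(task)  # hand the task back on failure
            raise

    def task_stat(self) -> dict:
        return dict(Counter(task["status"] for task in self.tasks))


pool = InMemoryTaskPool()
pool.insert_task_def({"name": "ok"})
pool.insert_task_def({"name": "fails"})

with pool.safe_fetch_task() as task:
    pool.commit_task_res(task, res="trained")

try:
    with pool.safe_fetch_task() as task:
        raise RuntimeError("simulated training failure")
except RuntimeError:
    pass

stats = pool.task_stat()
```

The failed task ends up back in the waiting status, so another worker could pick it up later.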
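Task Training
After generating and storing those tasks, it is time to run the tasks that are in the WAITING status.
Qlib provides a method called run_task to run the tasks in the task pool; however, users can also customize how tasks are executed.
An easy way to get the task_func is to use qlib.model.trainer.task_train directly. It runs the whole workflow defined by the task, which includes Model, Dataset, and Record.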
- qlib.workflow.task.manage.run_task(task_func: Callable, task_pool: str, query: dict = {}, force_release: bool = False, before_status: str = 'waiting', after_status: str = 'done', **kwargs)
While the task pool is not empty (has WAITING tasks), use task_func to fetch and run tasks in task_pool
After running this method, there are 4 possible situations (before_status -> after_status):
STATUS_WAITING -> STATUS_DONE: use task["def"] as task_func param, it means that the task has not been started
STATUS_WAITING -> STATUS_PART_DONE: use task["def"] as task_func param
STATUS_PART_DONE -> STATUS_PART_DONE: use task["res"] as task_func param, it means that the task has been started but not completed
STATUS_PART_DONE -> STATUS_DONE: use task["res"] as task_func param
- Parameters:
task_func (Callable) –
def (task_def, **kwargs) -> <res which will be committed>
the function to run the task
task_pool (str) – the name of the task pool (Collection in MongoDB)
query (dict) – will use this dict to query task_pool when fetching task
force_release (bool) – whether the program forcibly releases the resource
before_status (str) – the tasks in before_status will be fetched and trained. Can be STATUS_WAITING, STATUS_PART_DONE.
after_status (str) – the status that tasks take after being trained. Can be STATUS_PART_DONE, STATUS_DONE.
kwargs – the params for task_func
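The four status transitions can be illustrated with a small loop over an in-memory list of tasks. This is a sketch under stated assumptions, not the body of run_task: run_tasks_sketch, prepare, and finish are hypothetical names, and a plain list replaces the MongoDB task pool.

```python
from typing import Callable, List


def run_tasks_sketch(
    task_func: Callable,
    pool: List[dict],
    before_status: str = "waiting",
    after_status: str = "done",
    **kwargs,
) -> int:
    """Run every task in pool whose status equals before_status; return the count."""
    ran = 0
    for task in pool:
        if task["status"] != before_status:
            continue
        task["status"] = "running"
        # Fresh tasks are driven by their definition, partly done tasks by their last result.
        param = task["def"] if before_status == "waiting" else task["res"]
        task["res"] = task_func(param, **kwargs)
        task["status"] = after_status
        ran += 1
    return ran


def prepare(task_def: dict, scale: int = 1) -> int:
    return task_def["n"] * scale


def finish(partial: int) -> int:
    return partial + 1


pool = [
    {"def": {"n": 2}, "status": "waiting", "res": None},
    {"def": {"n": 5}, "status": "waiting", "res": None},
]
first_pass = run_tasks_sketch(prepare, pool, after_status="part_done", scale=10)
second_pass = run_tasks_sketch(finish, pool, before_status="part_done", after_status="done")
results = [task["res"] for task in pool]
```

The first pass moves tasks from waiting to part_done using task["def"]; the second pass finishes them using task["res"], which matches the two-stage situations listed above.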
Meanwhile, Qlib provides a module called Trainer.
- class qlib.model.trainer.Trainer
The trainer can train a list of models. There are Trainer and DelayTrainer, which can be distinguished by when it will finish real training.
- __init__()
- train(tasks: list, *args, **kwargs) → list
Given a list of task definitions, begin training, and return the models.
For Trainer, it finishes real training in this method. For DelayTrainer, it only does some preparation in this method.
- Parameters:
tasks – a list of tasks
- Returns:
a list of models
- Return type:
list
- end_train(models: list, *args, **kwargs) → list
Given a list of models, finish anything that is needed at the end of training. The models may be Recorder, txt file, database, and so on.
For Trainer, it does some finishing touches in this method. For DelayTrainer, it finishes real training in this method.
- Parameters:
models – a list of models
- Returns:
a list of models
- Return type:
list
- is_delay() → bool
Whether the Trainer delays the real training until end_train.
- Returns:
True if this is a DelayTrainer
- Return type:
bool
- has_worker() → bool
Some trainers have a backend worker to support parallel training. This method tells whether the worker is enabled.
- Returns:
True if the worker is enabled
- Return type:
bool
- worker()
Start the worker
- Raises:
NotImplementedError – If the worker is not supported
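The difference between an immediate trainer and a delayed trainer comes down to which method does the real work. The two classes below are a hypothetical, Qlib-free sketch of that split; the class names and the dict-based "models" are illustrative assumptions only.

```python
from typing import List


class EagerTrainerSketch:
    """Finishes the real training inside train()."""

    def train(self, tasks: List[dict]) -> List[dict]:
        return [{"task": task, "trained": True} for task in tasks]

    def end_train(self, models: List[dict]) -> List[dict]:
        for model in models:
            model["finalized"] = True  # finishing touches only
        return models

    def is_delay(self) -> bool:
        return False


class DelayTrainerSketch(EagerTrainerSketch):
    """Only prepares in train(); the real training happens in end_train()."""

    def train(self, tasks: List[dict]) -> List[dict]:
        return [{"task": task, "trained": False} for task in tasks]

    def end_train(self, models: List[dict]) -> List[dict]:
        for model in models:
            model["trained"] = True  # delayed real training
            model["finalized"] = True
        return models

    def is_delay(self) -> bool:
        return True


tasks = [{"id": 1}, {"id": 2}]
delayed = DelayTrainerSketch()
prepared = delayed.train(tasks)
trained_after_train = [model["trained"] for model in prepared]
finished = delayed.end_train(prepared)
eager_models = EagerTrainerSketch().train(tasks)
```

With the delayed variant, train() returns quickly with placeholders, which leaves room for the expensive work to be scheduled later.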
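Trainer will train a list of tasks and return a list of model recorders.
Qlib offers two kinds of Trainer: TrainerR is the simplest way, and TrainerRM is based on TaskManager to help manage the tasks' lifecycle automatically.
If you do not want to use Task Manager to manage tasks, then using TrainerR to train a list of tasks generated by TaskGen is enough.
Details about the different Trainer classes are available here.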
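Task Collecting
Before collecting model training results, you need to use qlib.init to specify the path of mlruns.
To collect the results of tasks after training, Qlib provides Collector, Group, and Ensemble to collect results in a readable, expandable, and loosely coupled way.
Collector can collect objects from anywhere and process them, for example by merging, grouping, or averaging. It works in two steps: collect (collect anything into a dict) and process_collect (process the collected dict).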
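The two-step shape of a collector can be sketched in plain Python. CollectorSketch, its sources argument, and mean_of_values are hypothetical names for illustration; Qlib's own Collector has its own constructor and may differ in detail.

```python
from typing import Callable, Dict, List


class CollectorSketch:
    """Hypothetical two-step collector: collect() gathers, process_collect() transforms."""

    def __init__(self, sources: Dict[str, Callable[[], dict]], process_list: List[Callable]):
        self.sources = sources
        self.process_list = process_list

    def collect(self) -> dict:
        # Step 1: gather one dict of objects per named source.
        return {name: fetch() for name, fetch in self.sources.items()}

    @staticmethod
    def process_collect(collected: dict, process_list: List[Callable]) -> dict:
        # Step 2: apply every processor, in order, to each collected value.
        processed = {}
        for name, value in collected.items():
            for process in process_list:
                value = process(value)
            processed[name] = value
        return processed

    def __call__(self) -> dict:
        return self.process_collect(self.collect(), self.process_list)


def mean_of_values(values: dict) -> float:
    return sum(values.values()) / len(values)


collector = CollectorSketch(
    sources={"pred": lambda: {"model_a": 1.0, "model_b": 3.0}},
    process_list=[mean_of_values],
)
collected_result = collector()
```

Keeping collection and processing separate means the same collected dict can be reprocessed with a different process_list without fetching the objects again.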
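Group also has two steps: group (which groups a set of objects based on group_func and turns them into a dict) and reduce (which turns a dict into an ensemble based on some rule).
For example: {(A,B,C1): object, (A,B,C2): object} --- group ---> {(A,B): {C1: object, C2: object}} --- reduce ---> {(A,B): object}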
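The group and reduce steps in the example above can be reproduced with two small functions. This is a sketch only: group, reduce_groups, and the convention that group_func returns a (group key, inner key) pair are assumptions made for this illustration and need not match the signatures in Qlib.

```python
from collections import defaultdict
from typing import Callable, Dict, Tuple


def group(objects: Dict[tuple, object], group_func: Callable[[tuple], Tuple[tuple, str]]) -> Dict[tuple, dict]:
    """Split each key into (group key, inner key) and nest the objects accordingly."""
    grouped = defaultdict(dict)
    for key, obj in objects.items():
        group_key, inner_key = group_func(key)
        grouped[group_key][inner_key] = obj
    return dict(grouped)


def reduce_groups(grouped: Dict[tuple, dict], ens_func: Callable[[dict], object]) -> Dict[tuple, object]:
    """Collapse every inner dict into a single object with ens_func."""
    return {group_key: ens_func(inner) for group_key, inner in grouped.items()}


objects = {("A", "B", "C1"): 1.0, ("A", "B", "C2"): 3.0}
# Assumed rule: the last key element is the inner key, the rest is the group key.
grouped = group(objects, lambda key: (key[:-1], key[-1]))
reduced = reduce_groups(grouped, lambda inner: sum(inner.values()) / len(inner))
```

Here the objects are plain floats and the reduce rule is an average; in practice the objects would be predictions or metrics and the rule would be an ensemble.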
Ensemble can merge the objects in an ensemble.
For example: {C1: object, C2: object} --- Ensemble ---> object.
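You can set the ensembles you want in the Collector's process_list.
Common ensembles include AverageEnsemble and RollingEnsemble. AverageEnsemble merges the results of different models in the same time period. RollingEnsemble merges results, such as predictions or indicators, from different rolling time periods.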
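To make the two ensembling ideas concrete, the sketch below works on plain dicts that map ISO date strings to values. The function names and data layout are assumptions for illustration; Qlib's AverageEnsemble and RollingEnsemble operate on pandas objects and may apply additional steps.

```python
from typing import Dict

Series = Dict[str, float]  # ISO date string -> value


def average_ensemble(results: Dict[str, Series]) -> Series:
    """Average several models' values date by date (same period, different models)."""
    dates = sorted(set().union(*(series.keys() for series in results.values())))
    averaged = {}
    for day in dates:
        values = [series[day] for series in results.values() if day in series]
        averaged[day] = sum(values) / len(values)
    return averaged


def rolling_ensemble(results: Dict[str, Series]) -> Series:
    """Join rolling windows into one series; later windows win on overlapping dates."""
    merged = {}
    # Process windows in order of their earliest date (assumes non-empty windows).
    for window in sorted(results, key=lambda name: min(results[name])):
        merged.update(results[window])
    return dict(sorted(merged.items()))


avg = average_ensemble({
    "model_a": {"2020-01-01": 1.0, "2020-01-02": 2.0},
    "model_b": {"2020-01-01": 3.0, "2020-01-02": 4.0},
})
rolled = rolling_ensemble({
    "window_2": {"2020-01-02": 20.0, "2020-01-03": 30.0},
    "window_1": {"2020-01-01": 1.0, "2020-01-02": 2.0},
})
```

In the rolling case the overlapping date 2020-01-02 takes its value from the later window, which is one reasonable way to resolve overlaps.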
So the hierarchy is: Collector's second step corresponds to Group, and Group's second step corresponds to Ensemble.