Task是MaxCompute的基本计算单元,例如SQL Task。本文为您介绍PyODPS中任务实例的基本操作。
基本操作
Task在执行时会被实例化, 以MaxCompute实例(下文简称为实例或Instance)的形式存在。实例常见的操作如下:
-
list_instances()
:获取项目空间下的所有的Instance。 -
exist_instance()
:判断某实例是否存在。 -
get_instance()
:获取实例。 -
stop_instance()
:停止实例。仅支持对运行中的实例进行停止实例操作,否则会报错。说明
通过Instance对象调用
stop
方法也可以用于停止Instance,如o.get_instance('instance id').stop
。
示例
-
获取Instancede的ID
for instance in o.list_instances(): print(instance.id)
-
判断Instance是否存在
print(o.exist_instance('my_instance_id'))
获取Logview地址
-
对于SQL等任务,通过Instance对象调用
get_logview_address
方法即可获取Logview地址。#从已有的instance对象获取Logview地址。 instance = o.run_sql('desc pyodps_iris') print(instance.get_logview_address()) #从instance id获取Logview地址。 instance = o.get_instance('my_instance_id') print(instance.get_logview_address())
-
对于机器学习PAI类型任务,需要先枚举其子任务,再获取子任务的Logview。
instance = o.run_xflow('AppendID', 'algo_public', {'inputTableName': 'input_table', 'outputTableName': 'output_table'}) for sub_inst_name, sub_inst in o.get_xflow_sub_instances(instance).items(): print('%s: %s' % (sub_inst_name, sub_inst.get_logview_address()))
任务实例状态
一个Instance的状态可以是Running
、Suspended
或者 Terminated
。关于任务实例状态的方法有以下几种:
-
status
:属性来获取状态。 -
is_terminated
:返回当前Instance是否已经执行完毕的结果。 -
is_successful
:返回当前Instance是否正确地执行完毕的结果。任务处于运行中或者执行失败都会返回False。
代码示例如下。
-
获取Instance状态。
instance=o.get_instance('my_instance_id') print(instance.status) print(instance.status.value)
返回值示例:
Status.TERMINATED Terminated
-
判断当前示例是否已经执行完毕。
instance=o.get_instance('my_instance_id') from odps.models import Instance print(instance.status == Instance.Status.TERMINATED)
返回值为
True
表示已执行完毕。
说明
调用wait_for_completion
方法会阻塞直到Instance执行完成。wait_for_success
方法同样会阻塞,但如果最终任务执行失败,会抛出相关异常。
子任务操作
一个Instance在运行时,可能包含一个或者多个子任务,这些子任务称为Task(注意:这里的Task不同于MaxCompute的计算单元)。关于Task的方法有以下几种:
-
get_task_names
:通过此方法获取所有的Task任务。该方法返回一个所有子任务的名称列表。instance=o.get_instance('my_instance_id') instance.get_task_names()
返回值示例:
['jdbc_sql_task']
-
get_task_result
:指定Task名称,获取该Task的执行结果。该方法是以字典的形式返回每个Task的执行结果。 代码示例如下:-
获取子任务名称。
instance=o.execute_sql('select*frompyodps_irislimit1') print(instance.get_task_names())
返回值示例:
['AnonymousSQLTask']
-
获取子任务运行结果。
print(instance.get_task_result('AnonymousSQLTask'))
返回值示例:
"sepallength","sepalwidth","petallength","petalwidth","name" 4.9,3.0,1.4,0.2,"Iris-setosa"
-
获取子任务结果。
print(instance.get_task_results())
返回值示例:
OrderedDict([('AnonymousSQLTask', '"sepallength","sepalwidth","petallength","petalwidth","name" 4.9,3.0,1.4,0.2,"Iris-setosa" ')])
-
-
get_task_progress
:该方法获取在任务实例运行时Task当前的运行进度。instance=o.get_instance('20160519101349613gzbzufck2') while not instance.is_terminated(): for task_name in instance.get_task_names(): print(instance.id, instance.get_task_progress(task_name).get_stage_progress_formatted_string()) time.sleep(10)
返回值示例:
20160519101349613gzbzufck2 2016-05-19 18:14:03 M1_Stg1_job0:0/1/1[100%]
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/159698.html