DataWorks自定义节点中运行任务时,需要调用自定义插件,因此在使用自定义节点前您需要创建好自定义插件包,并上传发布至DataWorks,便于使用自定义节点运行任务时使用。本文为您介绍如何创建自定义插件包。
背景信息
DataWorks的自定义节点运行时,插件开发使用过程中涉及以下2个接口。
submitJob(String codeFilePath, List args)
:提交插件任务的接口。 包含以下入参。- codeFilePath:为页面开发的代码存储绝对路径。
- List args:为页面配置的调度参数,格式为
{"key"="value","key2"="value2"...}
。
返回参数如下。
- 0:表示任务运行成功。
- 2:表示告知调度系统重新运行任务。
- 1、4、或3:表示任务被终止。
- 其他数值:表示任务运行失败。
killJob()
:此接口主要用于监听任务终止(kill)信号,然后会触发该接口调用。此接口没有入参和返回参数。说明 若业务层面在9秒内未处理完毕,整个插件进程会被强制终止,整个任务运行失败。
下载依赖JAR
点击以下链接下载依赖JAR包:alisa-wrapper-face-1.0.0.jar。
打包代码工程包
- 新建插件代码工程。用IDE工具新建一个Maven工程,文件结构和pom.xml文件配置要求如下所示。
- 文件结构要求文件结构要求参考下图,其中lib文件夹下alisa-wrapper-face-1.0.0.jar为上述下载的依赖JAR包,需要用本地依赖引入的方式引入代码工程中。
- pom.xml文件配置要求pom.xml文件示例如下,其中插件的类名(
)为submitJob方法所在的类路径,如
com.alibaba.dw.wrapper
。org.example dw-plugin 1.0-SNAPSHOT com.alibaba.dw alisa-wrapper-face 1.0.0-SNAPSHOT system ${basedir}/lib/alisa-wrapper-face-1.0.0.jar commons-lang commons-lang 2.4 commons-io commons-io 2.6 org.apache.commons commons-lang3 3.8.1 org.apache.maven.plugins maven-assembly-plugin 2.5.5 com.alibaba.dw.wrapper.DemoWrapper jar-with-dependencies make-assembly package assembly
- 文件结构要求文件结构要求参考下图,其中lib文件夹下alisa-wrapper-face-1.0.0.jar为上述下载的依赖JAR包,需要用本地依赖引入的方式引入代码工程中。
- 代码开发。代码示例:基本逻辑
package com.alibaba.dw.alisa.wrapper; import java.io.File; import java.util.List; import com.alibaba.dw.alisawrapper.DwalisaWrapper; import com.alibaba.dw.alisawrapper.constants.Constant; /** **DwalisaWrapper 在alisa-wrapper-face包 **/ public class DemoWrapper extends DwalisaWrapper { /** * codeFilePath: 代码存储的全路径。 * args:传入的参数。注意其中args[0]是作为codeFilePath,如果任务无代码,则args[0]即为第一个参数。 * 该方法实现该插件的主要功能内容,业务逻辑都在此方法内部实现。 * 返回码0:表示任务成功。 * 返回码1、4、3:表示任务被终止。 * 返回码为其他数值:表示任务失败。 */ @SuppressWarnings("deprecation") @Override public Integer submitJob(String codeFilePath, List args) { try { System.err.println("your code->"); //此处实现业务代码,此方法执行完毕后任务执行结束。 } catch (Exception e) { System.err.println(e); } System.out.println("task finished..."); return Constant.SUCCESSED_EXIT_CODE; } /** * 为终止任务方法,一旦发起终止任务时,会调用方法作为一些业务上的处理之后再退出。 * 注意:一旦9秒未退出,则会直接返回kill-9,终止该任务进程。 * */ @Override public void killJob() { System.err.println("收到了终止信号,需要做一些业务操作把任务从服务端终止"); } }
- 设置数据查询结果集被页面展示。如果您希望后续使用自定义节点时,进行数据查询等操作时,结果能在DataWorks的页面中直接展示,您需参考本步骤设置数据查询结果集被页面展示。从submitJob接口业务运行代码后,需将获取到的结果集(比如查询表记录)按照以下规则存储到指定目录中:
- 结果集文件获取方式
String.format("%s/%s.data", System.getenv("TASK_EXEC_PATH"), System.getenv("ALISA_TASK_ID"));
其中
System.getenv
为获取环境变量的方式,详细介绍可参考重点:通过环境变量获取对应的节点信息。 - 存储格式
- 文件格式:CSV格式,字段间以逗号分隔。
- 行分布:首行为列名称,后续各行为具体字段的数据。
样例:
"name","age" "李三","12"
- 结果集文件获取方式
- 代码开发完成后进行Maven构建打包。将本身依赖的包打成JAR包。
- 后续步骤:上传插件包。如果依赖了protobuf、guava的包,或者其他外部依赖JAR包,需将这些依赖的包和上述步骤打成的JAR包合并为ZIP包后,作为插件上传至DataWorks。上传插件的详细操作可参考新增节点插件。
重点:通过环境变量获取对应的节点信息
获取环境变量值的方式:System.getenv("SKYNET_ONDUTY")
。
- SKYNET_ID:节点ID,只有在调度运行才生效,数据开发直接运行是无该参数的。
- SKYNET_BIZDATE:业务日期。
- SKYNET_ONDUTY:
- 临时运行、补数据、测试时:表示操作者ID。
- 日常调度:节点责任人ID(工号/baseid)。
- SKYNET_TASKID:节点实例。
- IDSKYNET_SYSTEM_ENV:对应的项目环境,包括:dev 、prod。
- SKYNET_CYCTIME:节点实例的定时时间。
- SKYNET_CONNECTION:该环境变量可以读取到节点绑定数据源的具体连接串信息(通常为JSON格式)。
- SKYNET_TENANT_ID:租户ID。
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/167375.html