DataHub数据源作为数据中枢,为您提供读取和写入DataHub数据库的双向通道,能够快速解决海量数据的计算问题。本文为您介绍DataWorks的DataHub数据同步的能力支持情况。
支持的版本
-
DataHub Reader通过DataHub的Java SDK读取DataHub中的数据,具体使用的Java SDK版本,如下所示。
com.aliyun.DataHub aliyun-sdk-DataHub 2.9.1
-
DataHub Writer通过DataHub服务的Java SDK向DataHub写入数据,使用的日志服务Java SDK版本如下。
com.aliyun.datahub aliyun-sdk-datahub 2.5.1
使用限制
离线读写
STRING字符串仅支持UTF-8编码,单个STRING列最长允许1MB。
实时读写
实时数据同步任务仅支持使用独享数据集成资源组。
全增量实时写
运行同步任务后,生成的离线同步任务将全量数据写入DataHub,待全量数据执行完成后,启动实时同步任务,将源端增量数据实时同步至目标端。数据写入格式如下:
-
仅支持将数据写入DataHub Tuple类型的Topic中。关于DataHub TUPLE数据类型说明,详情请参见:数据类型介绍。
-
实时同步至DataHub会在源表字段基础上,新增5个附加字段,并支持您在配任务配置时,自行添加额外的字段。最终发送给DataHub的消息格式,详情请参见:附录:DataHub消息格式。
支持的字段类型
DataHub同步数据时,会根据DataHub Field的数据类型同步到对应的数据类型中,DataHub仅支持BIGINT、STRING、BOOLEAN、DOUBLE、TIMESTAMP、DECIMAL数据类型。
数据同步任务开发
DataHub数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。
创建数据源
在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源。
单表离线同步任务配置指导
-
操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。
-
脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。
单表、整库实时同步任务配置指导
操作流程请参见DataStudio侧实时同步任务配置。
说明
DataHub不同数据类型对应操作的支持情况,不同数据类型的分片策略、数据格式及相关消息示例。详情请参见:附录:DataHub消息格式。
单表、整库全增量(实时)同步配置指导
操作流程请参见数据集成侧同步任务配置。
常见问题
写入DataHub时,一次性写入数据超限导致写入失败如何处理?
附录:脚本Demo与参数说明
附录:离线任务脚本配置方式
如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。
Reader脚本Demo
{
"type":"job",
"version":"2.0",//版本号
"steps":[
{
"job": {
"content": [
{
"reader": {
"name": "DataHubreader",
"parameter": {
"endpoint": "xxx" //DataHub的endpoint。
"accessId": "xxx", //访问DataHub的用户accessId。
"accessKey": "xxx", //访问DataHub的用户accessKey。
"project": "xxx", //目标DataHub的项目名称。
"topic": "xxx" //目标DataHub的topic名称。
"batchSize": 1000, //一次读取的数据量。
"beginDateTime": "20180910111214", //数据消费的开始时间位点。
"endDateTime": "20180910111614", //数据消费的结束时间位点。
"column": [
"col0",
"col1",
"col2",
"col3",
"col4"
]
}
},
"writer": {
"name": "streamwriter",
"parameter": {
"print": false
}
}
}
]
}
}
],
"setting":{
"errorLimit":{
"record":"0"//错误记录数
},
"speed":{
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":1,//并发数
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order":{
"hops":[
{
"from":"Reader",
"to":"Writer"
}
]
}
}
Reader脚本参数
参数 |
描述 |
是否必选 |
endpoint |
DataHub的endpoint。 |
是 |
accessId |
访问DataHub的用户accessId。 |
是 |
accessKey |
访问DataHub的用户accessKey。 |
是 |
project |
目标DataHub的项目名称。project是DataHub中的资源管理单元,用于资源隔离和控制。 |
是 |
topic |
目标DataHub的topic名称。 |
是 |
batchSize |
一次读取的数据量,默认为1,024条。 |
否 |
beginDateTime |
数据消费的开始时间位点。该参数是时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。 说明 beginDateTime和endDateTime需要互相组合配套使用。 |
是 |
endDateTime |
数据消费的结束时间位点。该参数是时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。 说明 beginDateTime和endDateTime需要互相组合配套使用。 |
是 |
Writer脚本Demo
{
"type": "job",
"version": "2.0",//版本号。
"steps": [
{
"stepType": "stream",
"parameter": {},
"name": "Reader",
"category": "reader"
},
{
"stepType": "datahub",//插件名。
"parameter": {
"datasource": "",//数据源。
"topic": "",//Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。
"maxRetryCount": 500,//任务失败的重试的最多次数。
"maxCommitSize": 1048576//待积累的数据Buffer大小达到maxCommitSize大小(单位Byte)时,批量提交至目的端。
//datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条数据的数据总量来从侧方面进行单次写入datahub的数据条数控制。比如每条数据10 k,那么此参数的设置值要低于10*10000 k。
},
"name": "Writer",
"category": "writer"
}
],
"setting": {
"errorLimit": {
"record": ""//错误记录数。
},
"speed": {
"throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
"concurrent":20, //作业并发数。
"mbps":"12"//限流,此处1mbps = 1MB/s。
}
},
"order": {
"hops": [
{
"from": "Reader",
"to": "Writer"
}
]
}
}
Writer脚本参数
参数 |
描述 |
是否必选 |
默认值 |
accessId |
DataHub的accessId。 |
是 |
无 |
accessKey |
DataHub的accessKey。 |
是 |
无 |
endPoint |
对DataHub资源的访问请求,需要根据资源所属服务,选择正确的域名。 详情请参见域名列表。 |
是 |
无 |
maxRetryCount |
任务失败的最多重试次数。 |
否 |
无 |
mode |
Value是STRING类型时,写入的模式。 |
是 |
无 |
parseContent |
解析内容。 |
是 |
无 |
project |
项目(Project)是DataHub数据的基本组织单元,一个Project下包含多个Topic。 说明 DataHub的项目空间与MaxCompute的项目相互独立,您在MaxCompute中创建的项目不能复用于DataHub,需要单独创建。 |
是 |
无 |
topic |
Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。 详情请参见Project及Topic的数量限制。 |
是 |
无 |
maxCommitSize |
为提高写出效率,DataX会积累Buffer数据,待积累的数据大小达到maxCommitSize 大小(单位Byte)时,批量提交到目的端。默认是1,048,576,即1 MB数据。另外datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条的数据总量来从侧方面进行写入datahub的数据条数控制。 |
否 |
1MB |
内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家
阿里云企业补贴进行中: 马上申请
腾讯云限时活动1折起,即将结束: 马上收藏
同尘科技为腾讯云授权服务中心。
购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠
转转请注明出处:https://www.yunxiaoer.com/171447.html