详情页标题前

阿里云大数据开发治理平台 DataWorksDataHub数据源-云淘科技

详情页1

DataHub数据源作为数据中枢,为您提供读取和写入DataHub数据库的双向通道,能够快速解决海量数据的计算问题。本文为您介绍DataWorks的DataHub数据同步的能力支持情况。

支持的版本

  • DataHub Reader通过DataHub的Java SDK读取DataHub中的数据,具体使用的Java SDK版本,如下所示。

    
        com.aliyun.DataHub
        aliyun-sdk-DataHub
        2.9.1
    
  • DataHub Writer通过DataHub服务的Java SDK向DataHub写入数据,使用的日志服务Java SDK版本如下。

    
        com.aliyun.datahub
        aliyun-sdk-datahub
        2.5.1
    

使用限制

离线读写

STRING字符串仅支持UTF-8编码,单个STRING列最长允许1MB。

实时读写

实时数据同步任务仅支持使用独享数据集成资源组。

全增量实时写

运行同步任务后,生成的离线同步任务将全量数据写入DataHub,待全量数据执行完成后,启动实时同步任务,将源端增量数据实时同步至目标端。数据写入格式如下:

  • 仅支持将数据写入DataHub Tuple类型的Topic中。关于DataHub TUPLE数据类型说明,详情请参见:数据类型介绍。

  • 实时同步至DataHub会在源表字段基础上,新增5个附加字段,并支持您在配任务配置时,自行添加额外的字段。最终发送给DataHub的消息格式,详情请参见:附录:DataHub消息格式。

支持的字段类型

DataHub同步数据时,会根据DataHub Field的数据类型同步到对应的数据类型中,DataHub仅支持BIGINTSTRINGBOOLEANDOUBLETIMESTAMPDECIMAL数据类型。

数据同步任务开发

DataHub数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源。

单表离线同步任务配置指导

  • 操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。

  • 脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。

单表、整库实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置。

说明

DataHub不同数据类型对应操作的支持情况,不同数据类型的分片策略、数据格式及相关消息示例。详情请参见:附录:DataHub消息格式。

单表、整库全增量(实时)同步配置指导

操作流程请参见数据集成侧同步任务配置。

常见问题

写入DataHub时,一次性写入数据超限导致写入失败如何处理?

附录:脚本Demo与参数说明

附录:离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。

Reader脚本Demo

{
    "type":"job",
    "version":"2.0",//版本号
    "steps":[
        {
         "job": {
           "content": [
            {
                "reader": {
                    "name": "DataHubreader",
                    "parameter": {
                        "endpoint": "xxx" //DataHub的endpoint。
                        "accessId": "xxx", //访问DataHub的用户accessId。
                        "accessKey": "xxx", //访问DataHub的用户accessKey。
                        "project": "xxx", //目标DataHub的项目名称。
                        "topic": "xxx" //目标DataHub的topic名称。
                        "batchSize": 1000, //一次读取的数据量。
                        "beginDateTime": "20180910111214", //数据消费的开始时间位点。
                        "endDateTime": "20180910111614", //数据消费的结束时间位点。
                        "column": [
                            "col0",
                            "col1",
                            "col2",
                            "col3",
                            "col4"
                                  ]
                                }
                           },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "print": false
                                 }
                            }
             }
           ]
         }
       }
     ],
    "setting":{
        "errorLimit":{
            "record":"0"//错误记录数
        },
        "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//并发数
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order":{
        "hops":[
            {
                "from":"Reader",
                "to":"Writer"
            }
        ]
    }
}

Reader脚本参数

参数

描述

是否必选

endpoint

DataHub的endpoint。

accessId

访问DataHub的用户accessId。

accessKey

访问DataHub的用户accessKey。

project

目标DataHub的项目名称。project是DataHub中的资源管理单元,用于资源隔离和控制。

topic

目标DataHub的topic名称。

batchSize

一次读取的数据量,默认为1,024条。

beginDateTime

数据消费的开始时间位点。该参数是时间范围(左闭右开)的左边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。

说明

beginDateTime和endDateTime需要互相组合配套使用。

endDateTime

数据消费的结束时间位点。该参数是时间范围(左闭右开)的右边界,yyyyMMddHHmmss格式的时间字符串,可以和DataWorks的调度时间参数配合使用。

说明

beginDateTime和endDateTime需要互相组合配套使用。

Writer脚本Demo

{
    "type": "job",
    "version": "2.0",//版本号。
    "steps": [
        { 
            "stepType": "stream",
            "parameter": {},
            "name": "Reader",
            "category": "reader"
        },
        {
            "stepType": "datahub",//插件名。
            "parameter": {
                "datasource": "",//数据源。
                "topic": "",//Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。
                "maxRetryCount": 500,//任务失败的重试的最多次数。
                "maxCommitSize": 1048576//待积累的数据Buffer大小达到maxCommitSize大小(单位Byte)时,批量提交至目的端。
                 //datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条数据的数据总量来从侧方面进行单次写入datahub的数据条数控制。比如每条数据10 k,那么此参数的设置值要低于10*10000 k。
            },
            "name": "Writer",
            "category": "writer"
        }
    ],
    "setting": {
        "errorLimit": {
            "record": ""//错误记录数。
        },
        "speed": {
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":20, //作业并发数。
            "mbps":"12"//限流,此处1mbps = 1MB/s。
        }
    },
    "order": {
        "hops": [
            {
                "from": "Reader",
                "to": "Writer"
            }
        ]
    }
}

Writer脚本参数

参数

描述

是否必选

默认值

accessId

DataHub的accessId。

accessKey

DataHub的accessKey。

endPoint

对DataHub资源的访问请求,需要根据资源所属服务,选择正确的域名。

详情请参见域名列表。

maxRetryCount

任务失败的最多重试次数。

mode

Value是STRING类型时,写入的模式。

parseContent

解析内容。

project

项目(Project)是DataHub数据的基本组织单元,一个Project下包含多个Topic。

说明

DataHub的项目空间与MaxCompute的项目相互独立,您在MaxCompute中创建的项目不能复用于DataHub,需要单独创建。

topic

Topic是DataHub订阅和发布的最小单位,您可以用Topic来表示一类或者一种流数据。

详情请参见Project及Topic的数量限制。

maxCommitSize

为提高写出效率,DataX会积累Buffer数据,待积累的数据大小达到maxCommitSize 大小(单位Byte)时,批量提交到目的端。默认是1,048,576,即1 MB数据。另外datahub侧对于一次request请求写入的数据条数限制是10000条,超出10000条数据会超出限制导致任务出错,请根据您单条数据平均数据量*10000条的数据总量来从侧方面进行写入datahub的数据条数控制。

1MB

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/171447.html

(0)
上一篇 2023年12月10日
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云ECS云服务器搭建PHPWind论坛系统-云淘科技

    PHPWind论坛系统是一款基于PHP语言开发的开源论坛系统,支持多种数据库(如MySQL、SQL Server等),具有高度的可扩展性和可定制性。它提供了用户管理、帖子管理、板块管理、权限管理等功能,可以轻松搭建一个功能完善、易于管理的论坛网站。本文介绍如何使用阿里云市场镜像快速搭建PHPWind论坛系统。 操作步骤 前往实例购买页,使用云市场镜像快速搭建…

    阿里云服务器 2023年12月9日
  • 阿里云云原生大数据计算服务 MaxCompute运行SQL命令并导出结果数据-云淘科技

    本文介绍如何在%ignore_a_1%客户端上运行SQL命令并通过Tunnel Download导出结果数据。 前提条件 已向MaxCompute的表中导入数据。更多导入数据操作,请参见导入数据。 背景信息 MaxCompute客户端支持DDL、DML、DQL等操作,您可以结合相应语法运行SQL命令。 MaxCompute客户端的常用SQL命令,请参见常用命…

  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云日志服务SLS产品优势-云淘科技

    本文介绍日志服务的优势。 统一接入 支持多种来源的多种类型数据接入。 智能 提供完整AIOps能力,支持智能异常检测与根因分析能力。 高效 提供千亿级数据实时采集和查询与分析能力。 一站式 提供一站式数据功能,包括数据采集、加工、查询与分析、可视化、告警等。 弹性 提供PB级别数据弹性伸缩能力。 低成本 支持按量付费。您仅需为实际用量付费,总拥有成本(TCO…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云日志服务SLS时序库(MetricStore)-云淘科技

    时序库(MetricStore)是日志服务中时序数据的采集、存储和查询单元。 每个MetricStore隶属于一个Project,每个Project中可创建多个MetricStore。您可以根据实际需求为某个项目创建多个MetricStore,一般是为不同类型的时序数据创建不同的MetricStore。例如您需要采集基础主机监控数据、云服务监控数据、业务应用…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云云原生大数据计算服务 MaxCompute压缩数据从OSS迁移至MaxCompute-云淘科技

    本文为您介绍如何通过MaxCompute外部表能力,将SNAPPY压缩文件数据(以半角逗号分隔)从OSS迁移至MaxCompute。 前提条件 已开通MaxCompute并创建项目,详情请参见创建MaxCompute项目。 已开通OSS并创建存储空间,且Bucket中有压缩数据SNAPPY文件,开通OSS创建存储空间详情请参见创建存储空间。本文使用的示例文件…

    2023年12月10日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。