详情页标题前

阿里云大数据开发治理平台 DataWorksHologres数据源-云淘科技

详情页1

Hologres数据源为您提供读取和写入Hologres双向通道的功能,本文为您介绍DataWorks的Hologres数据同步的能力支持情况。

支持的版本

Hologres支持的版本:0.7、0.8、0.9、0.10、1.1、1.2、1.3。

使用限制

离线读写

  • Hologres数据源仅支持使用独享数据集成资源组。

  • Hologres Writer不支持写入数据至Hologres的外部

  • Hologres数据源连通性获取Hologres端点的逻辑:

    • 当前地域的Hologres实例,Hologres端点获取顺序:any Tunnel > single Tunnel > Public(公网)

    • 跨地域的Hologres实例,Hologres端点获取顺序:Public(公网) > single Tunnel

整库实时写

  • 实时数据同步任务仅支持使用独享数据集成资源组。

  • 实时数据同步任务暂不支持同步没有主键的表。

单表、整库全增量实时写

同步数据至Hologres时,目前仅支持将数据写入分区表子表,暂不支持写入数据至分区表父表。

支持的字段类型

离线读写

不支持UUID类型的字段。

字段类型

离线读(Hologres Reader)

离线写(Hologres Writer)

实时写

UUID

不支持

不支持

不支持

CHAR

支持

支持

支持

NCHAR

支持

支持

支持

VARCHAR

支持

支持

支持

LONGVARCHAR

支持

支持

支持

NVARCHAR

支持

支持

支持

LONGNVARCHAR

支持

支持

支持

CLOB

支持

支持

支持

NCLOB

支持

支持

支持

SMALLINT

支持

支持

支持

TINYINT

支持

支持

支持

INTEGER

支持

支持

支持

BIGINT

支持

支持

支持

NUMERIC

支持

支持

支持

DECIMAL

支持

支持

支持

FLOAT

支持

支持

支持

REAL

支持

支持

支持

DOUBLE

支持

支持

支持

TIME

支持

支持

支持

DATE

支持

支持

支持

TIMESTAMP

支持

支持

支持

BINARY

支持

支持

支持

VARBINARY

支持

支持

支持

BLOB

支持

支持

支持

LONGVARBINARY

支持

支持

支持

BOOLEAN

支持

支持

支持

BIT

支持

支持

支持

JSON

支持

支持

支持

JSONB

支持

支持

支持

单表实时写

不支持UUID类型的字段。

实现原理

离线读写

Hologres Reader通过PSQL读取Hologres表中的数据,根据表的Shard Count发起多个并发,每个Shard对应一个Select并发任务:

  • Hologres在创建表时,在同一个CREATE TABLE事务中,通过CALL set_table_property('table_name', 'shard_count', 'xx')配置表的Shard Count。

    默认情况下,使用数据库默认的Shard Count,具体数值取决于Hologres实例的配置。

  • Select语句通过表的内置列hg_shard_id的Shard筛选数据。

离线写

Hologres Writer通过数据同步框架获取Reader生成的协议数据,根据conflictMode(冲突策略)的配置决定写入数据时的冲突解决策略。

您可以通过配置conflictMode,决定新导入的数据和已有数据的主键发生冲突时,如何处理新导入的数据:

重要

conflictMode仅适用于有主键的表。具体写入原理和性能,详情请参考技术原理。

  • conflictMode为Replace(整行更新)模式时,新数据覆盖旧数据,整行所有列全部覆盖,没有配置列映射的字段会强制写NULL。

  • conflictMode为Update(更新)模式时,新数据覆盖旧数据,只覆盖配置有列映射的字段。

  • conflictMode为Ignore(忽略)模式时,忽略新数据。

数据同步任务开发

Hologres数据同步任务的配置入口和通用配置流程指导可参见下文的配置指导,详细的配置参数解释可在配置界面查看对应参数的文案提示。

创建数据源

在进行数据同步任务开发时,您需要在DataWorks上创建一个对应的数据源,操作流程请参见创建与管理数据源。

单表离线同步任务配置指导

  • 操作流程请参见通过向导模式配置离线同步任务、通过脚本模式配置离线同步任务。

  • 脚本模式配置的全量参数和脚本Demo请参见下文的附录:脚本Demo与参数说明。

单表、整库实时同步任务配置指导

操作流程请参见DataStudio侧实时同步任务配置。

单表、整库全增量实时写任务配置指导

操作流程请参见数据集成侧同步任务配置。

附录:脚本Demo与参数说明

附录:离线任务脚本配置方式

如果您配置离线任务时使用脚本模式的方式进行配置,您需要在任务脚本中按照脚本的统一格式要求编写脚本中的reader参数和writer参数,脚本模式的统一要求请参见通过脚本模式配置离线同步任务,以下为您介绍脚本模式下的数据源的Reader参数和Writer参数的指导详情。

Reader脚本Demo

  • 配置非分区表

    • 配置从Hologres非分区表读取数据至内存,如下所示。

      {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
          {
            "stepType":"holo",//插件名。
            "parameter":{
              "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
              "accessId": "***************", //访问Hologres的accessId。
              "accessKey": "*******************", //访问Hologres的accessKey。
              "database": "postgres",
              "table": "holo_reader_****",
              "column" : [ //字段。
                "tag",
                "id",
                "title"
              ]
            },
            "name":"Reader",
            "category":"reader"
          },
          {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"//错误记录数。
          },
          "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//作业并发数。 
                        "mbps":"12"//限流,此处1mbps = 1MB/s。
          }
        },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。

      begin;
      drop table if exists holo_reader_basic_src;
      create table holo_reader_basic_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id));
        call set_table_property('holo_reader_basic_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_src', 'shard_count', '3');
      commit;
  • 配置分区表

    • 配置从内存产生的数据同步至Hologres分区表的子表。

      说明

      请注意partition的配置。

      {
        "type":"job",
        "version":"2.0",//版本号。
        "steps":[
          {
            "stepType":"holo",//插件名。
            "parameter":{
              "endpoint": "instance-id-region-endpoint.hologres.aliyuncs.com:port",
              "accessId": "***************", //访问Hologres的accessId。
              "accessKey": "*******************", //访问Hologres的accessKey。
              "database": "postgres",
              "table": "holo_reader_basic_****",
              "partition": "tag=foo",
              "column" : [
                "*"
              ],
              "fetchSize": "100"
            },
            "name":"Reader",
            "category":"reader"
          },
          {
            "stepType":"stream",
            "parameter":{},
            "name":"Writer",
            "category":"writer"
          }
        ],
        "setting":{
          "errorLimit":{
            "record":"0"//错误记录数。
          },
          "speed":{
            "throttle":true,//当throttle值为false时,mbps参数不生效,表示不限流;当throttle值为true时,表示限流。
            "concurrent":1,//作业并发数。
                        "mbps":"12"//限流,此处1mbps = 1MB/s。
          }
        },
        "order":{
          "hops":[
            {
              "from":"Reader",
              "to":"Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。

      begin;
      drop table if exists holo_reader_basic_part_src;
      create table holo_reader_basic_part_src(
        tag text not null, 
        id int not null, 
        title text not null, 
        body text, 
        primary key (tag, id))
        partition by list( tag );
        call set_table_property('holo_reader_basic_part_src', 'orientation', 'column');
        call set_table_property('holo_reader_basic_part_src', 'shard_count', '3');
      commit;
      
      create table holo_reader_basic_part_src_1583161774228 partition of holo_reader_basic_part_src for values in ('foo');
      
      # 确保分区表子表已经创建且导入数据。
      postgres=# \d+ holo_reader_basic_part_src
                               Table "public.holo_reader_basic_part_src"
       Column |  Type   | Collation | Nullable | Default | Storage  | Stats target | Description 
      --------+---------+-----------+----------+---------+----------+--------------+-------------
       tag    | text    |           | not null |         | extended |              | 
       id     | integer |           | not null |         | plain    |              | 
       title  | text    |           | not null |         | extended |              | 
       body   | text    |           |          |         | extended |              | 
      Partition key: LIST (tag)
      Indexes:
          "holo_reader_basic_part_src_pkey" PRIMARY KEY, btree (tag, id)
      Partitions: holo_reader_basic_part_src_1583161774228 FOR VALUES IN ('foo')

Reader脚本参数

参数

描述

是否必选

默认值

endpoint

目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。

endpoint包括经典网络、公网和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:

  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port

  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port

  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例配在同一个地域的同一个可用区,以保证网络端口连通,实现最大性能。

accessId

访问Hologres的accessId。

accessKey

访问Hologres的accessKey,请确保该密钥对目标表有写入权限。

database

Hologres实例内部数据库的名称。

table

Hologres的表名称,如果是分区表,请指定父表的名称。

column

定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。

partition

针对分区表,表示分区Column以及对应的Value,格式为column=value

重要

  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。

  • 请确认该参数和表DDL的分区配置匹配。

  • 请确认对应的子表已经创建,且已经导入数据。

空,表示非分区表。

fetchSize

指定使用Select语句一次性读取数据的条数。

1,000

Writer脚本Demo

  • 配置非分区表

    • 配置从内存产生的数据导入至Hologres普通表,示例为通过JDBC模式导入的配置。

      {
          "type": "job",
          "version": "2.0",
          "steps": [
              {
                  "stepType": "mysql",
                  "parameter": {
                      "envType": 0,
                          "datasource": "",
                      "column": [
                          "",
                          "",
                          ......,
                          ""
                      ],
                      "connection": [
                          {
                              "datasource": "",//mysql数据源名
                              "table": [
                                  ""
                              ]
                          }
                      ],
                      "where": "",
                      "splitPk": "",
                      "encoding": "UTF-8"
                  },
                  "name": "Reader",
                  "category": "reader"
              },
              {
                  "stepType": "holo",
                  "parameter": {
                      "maxConnectionCount": 9,
                      "datasource": "",//Hologres数据源名称
                      "truncate":true,//清理规则。
                      "conflictMode": "ignore",
                      "envType": 0,
                      "column": [
                          "",
                          "",
                          ......,
                          ""
                      ],
                      "table": ""
                  },
                  "name": "Writer",
                  "category": "writer"
              }
          ],
          "setting": {
              "executeMode": null,
              "errorLimit": {
                  "record": ""
              },
              "speed": {
                  "concurrent": 2,//作业并发数
                  "throttle": false//限流
              }
          },
          "order": {
              "hops": [
                  {
                      "from": "Reader",
                      "to": "Writer"
                  }
              ]
          }
      }
    • Hologres表的DDL语句,如下所示。

      begin;
      drop table if exists mysql_to_holo_test;
      create table mysql_to_holo_test(
        tag text not null,
        id int not null,
        body text not null,
        brrth date,
        primary key (tag, id));
        call set_table_property('mysql_to_holo_test', 'orientation', 'column');
        call set_table_property('mysql_to_holo_test', 'distribution_key', 'id');
        call set_table_property('mysql_to_holo_test', 'clustering_key', 'birth');
      commit;
  • 配置分区表

    说明

    • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。

    • 请确认该参数和表DDL的分区配置匹配。

    • 配置从内存产生的数据同步至Hologres分区表的子表。

      {
        "type": "job",
        "version": "2.0",
        "steps": [
          {
            "stepType": "mysql",
            "parameter": {
              "envType": 0,
              "datasource": "",
              "column": [
                "",
                "",
                  ......,
                ""
              ],
              "connection": [
                {
                  "datasource": "",
                  "table": [
                    ""
                  ]
                }
              ],
              "where": "",
              "splitPk": "",//mysql的pk字段
              "encoding": "UTF-8"
            },
            "name": "Reader",
            "category": "reader"
          },
          {
            "stepType": "holo",
            "parameter": {
              "maxConnectionCount": 9,
              "partition": "",//Hologres分区键
              "datasource": "",//Hologres数据源名
              "conflictMode": "ignore",
              "envType": 0,
              "column": [
                "",
                "",
                  ......,
                ""
              ],
              "table": ""
            },
            "name": "Writer",
            "category": "writer"
          }
        ],
        "setting": {
          "executeMode": null,
          "errorLimit": {
            "record": ""
          },
          "speed": {
            "concurrent": 2,//作业并发数
            "throttle": false//限流
          }
        },
        "order": {
          "hops": [
            {
              "from": "Reader",
              "to": "Writer"
            }
          ]
        }
      }
    • Hologres表的DDL语句,如下所示。

      BEGIN;
      CREATE TABLE public.hologres_parent_table(
        a text ,
        b int,
        c timestamp,
        d text,
        ds text,
        primary key(ds,b)
        )
        PARTITION BY LIST(ds);
      CALL set_table_property('public.hologres_parent_table', 'orientation', 'column');
      CREATE TABLE public.holo_child_1 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201215');
      CREATE TABLE public.holo_child_2 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201216');
      CREATE TABLE public.holo_child_3 PARTITION OF public.hologres_parent_table FOR VALUES IN('20201217');
      COMMIT;

Writer脚本参数

参数

描述

是否必选

默认值

endpoint

目标交互式分析(Hologres)实例对应的endpoint,格式为instance-id-region-endpoint.hologres.aliyuncs.com:port。您可以从交互式分析实例的管理页面获取。

endpoint包括公网、经典网络和VPC三种网络类型,请根据数据集成资源组和Hologres实例所在的网络环境选择正确的endpoint类型,否则会出现网络不通或者性能受限的情况:

  • 公网示例:instance-id-region-endpoint.hologres.aliyuncs.com:port

  • 经典网络示例:instance-id-region-endpoint-internal.hologres.aliyuncs.com:port

  • VPC示例:instance-id-region-endpoint-vpc.hologres.aliyuncs.com:port

通常建议数据集成资源组和Hologres实例在同一个地域的同一个可用区,以确保网络连通,实现最大性能。

accessId

访问Hologres的accessId。

accessKey

访问Hologres的accessKey,请确保该密钥对目标表有写入权限。

database

Hologres实例内部数据库的名称。

table

Hologres的表名称,目前支持表名称中包含Schema,例如schema_name.table_name

conflictMode

conflictMode包括Replace、Update和Ignore,详情请参见实现原理。

column

定义导入目标表的数据列,必须包含目标表的主键集合。例如["*"]表示全部列。

partition

针对分区表,表示分区Column以及对应的Value,格式为column=value

说明

  • 目前Hologres仅支持LIST分区,分区Column仅支持单个Column分区,且仅支持INT4或TEXT类型。

  • 请确认该参数和表DDL的分区配置匹配。

空,表示非分区表

truncate

写入Holo表之前是否需要清空目标表。

  • true:清空目标表。

    说明

    • 目前仅支持清空非分区表和静态分区表,不支持清空动态分区表,如果您是动态分区表,并且设置了参数值为true,同步任务将会异常退出。

    • 如果您是静态分区表,并设置了参数值为true,则会清空该分区子表数据,不会清空父表数据。

  • false:不清空目标表。

false

内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

阿里云企业补贴进行中: 马上申请

腾讯云限时活动1折起,即将结束: 马上收藏

同尘科技为腾讯云授权服务中心。

购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

转转请注明出处:https://www.yunxiaoer.com/172397.html

(0)
上一篇 2023年12月10日 下午4:17
下一篇 2023年12月10日
详情页2

相关推荐

  • 阿里云日志服务SLS序列分解函数-云淘科技

    序列分解函数提供针对业务曲线的分解功能,突出曲线的趋势信息和周期信息。 ts_decompose 函数格式: select ts_decompose(x, y) 参数说明如下: 参数 说明 取值 x 时间列,从小到大排列。 格式为Unixtime时间戳,单位为秒。 y 数值列,对应某时刻的数据。 – 示例: 查询分析: * | select ts…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云ECS云服务器搭建多个Web站点(Windows)-云淘科技

    本文介绍如何在Windows Server 2012 R2 64位系统的ECS实例上使用IIS服务器搭建多个Web站点。 前提条件 已注册阿里云账号。如还未注册,请先完成账号注册。 已创建ECS实例,并部署了Web环境。具体操作请参见部署Web环境。 背景信息 本教程适用于熟悉Windows操作系统,希望合理利用资源、统一管理站点以提高运维效率的用户。比如,…

    2023年12月9日
  • 阿里云日志服务SLS管理服务日志-云淘科技

    本文介绍了如何开通、关闭服务日志功能及如何修改服务日志的配置。 前提条件 已创建Project。具体操作,请参见创建Project。 使用RAM用户登录时,必须由阿里云账号授予其对应权限。具体操作,请参见RAM用户授权。 开通服务日志 重要 详细日志的计费方式与普通Logstore一致。更多信息,请参见计费项。 重要日志和任务运行日志的接入、存储与查询分析免…

    2023年12月10日
  • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
  • 阿里云日志服务SLS使用Aliyun Log Java Producer写入日志数据-云淘科技

    本文介绍日志服务Aliyun Log Java Producer类库的使用方法。 前提条件 已创建RAM用户并完成授权。具体操作,请参见创建RAM用户并完成授权。 已配置环境变量ALIBABA_CLOUD_ACCESS_KEY_ID和ALIBABA_CLOUD_ACCESS_KEY_SECRET。具体操作,请参见配置环境变量。 重要 阿里云账号的Access…

    阿里云日志服务SLS 2023年12月10日
  • 阿里云RDS数据库主实例规格列表-云淘科技

    本文介绍RDS的主实例规格,帮助您了解RDS主实例的最新规格信息和历史规格信息,您可以查看本文了解各个规格的具体配置。 RDS还支持通过添加只读实例来扩展读性能。关于只读实例规格请参见只读实例规格列表。 选定主实例规格后您就可以创建实例并进行后续操作。 各引擎主实例规格 RDS MySQL主实例规格列表(X86) RDS MySQL主实例规格列表(ARM) …

    阿里云数据库 2023年12月9日

联系我们

400-800-8888

在线咨询: QQ交谈

邮件:admin@example.com

工作时间:周一至周五,9:30-18:30,节假日休息

关注微信
本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。