详情页标题前

阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技

详情页1

哈希聚簇(Hash Clustering)通过设置表的Shuffle和Sort属性,进而MaxCompute根据数据已有的存储特性,优化执行计划,提高效率,节省资源消耗。本文为您介绍在MaxCompute中如何使用Hash Clustering表。

背景信息

在MaxCompute查询中,连接(Join)表是很常见的场景。例如以下一个简单的Inner Join表Query示例:将t1表和t2表通过id列连接起来。

SELECT t1.a, t2.b FROM t1 JOIN t2 ON t1.id = t2.id;

Join在MaxCompute内部主要有三种实现方法:

  • Broadcast Hash Join

    当Join表中存在一个很小的表时,MaxCompute采用此方式,即把小表广播传递到所有的Join Task Instance上面,然后直接和表做Hash Join。

  • Shuffle Hash Join

    如果Join表比较大,就不能直接广播了。这时候把两个表按照Join Key做Hash Shuffle,由于相同的键值Hash结果也是一样的,这就保证了相同的Key的记录会收集到同一个Join Task Instance上面。然后每个Instance对数据量小的一路建Hash表,数据量大的顺序读取Join。

  • Sort Merge Join如果Join的表数据更大一些,Shuffle Hash Join方法也用不了,因为内存已经不足以容纳建立一个Hash Table。这时的实现方法是:先按照Join Key做Hash Shuffle,然后再按照Join Key做排序(Sort),最后对Join双方做一个归并,具体流程如下图所示:阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技实际上对于MaxCompute目前数据量和规模,绝大多数情况下都是使用的Sort Merge Join,但这其实是非常昂贵的操作。从上图可以看到,Shuffle的时候需要一次计算,并且中间结果需要落盘,后续Reducer读取的时候,又需要读取和排序的过程。对于M个Mapper和R个Reducer的场景,将产生M x R次的IO读取。对应的Fuxi物理执行计划如下所示,需要两个Mapper Stage,一个Join Stage,其中红色部分为Shuffle和Sort操作:阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技与此同时,有些Join是可能反复发生的,比如将Query改为:
    SELECT t1.c, t2.d FROM t1 JOIN t2 ON t1.id = t2.id;

    虽然选择的列不一样了,但是Join是完全一样的,整个Shuffle和Sort的过程也是完全一样的。又或者将Query改为:

    SELECT t1.c, t3.d FROM t1 JOIN t3 ON t1.id = t3.id;

    这个时候对表t1和t3来Join,但实际上对于t1而言,整个Shuffle和Sort过程还是完全一样。于是,考虑如果初始表数据生成时,按照Hash Shuffle和Sort的方式存储,那么后续查询中将避免对数据的再次Shuffle和Sort。这样做的好处是,虽然建表时付出了一次性的代价,却节省了将来可能产生的反复的Shuffle和Join。这时Join的Fuxi物理执行计划变成了如下,不仅节省了Shuffle和Sort的操作,并且查询从3个Stage变成了1个Stage完成:阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技

使用说明

创建Hash Clustering表

您可以使用以下语句创建Hash Clustering表。您需要指定Cluster Key(即Hash Key),以及Hash分片(Bucket)的数目。Sort是可选项,但在大多数情况下,建议和Cluster Key一致,以便取得最佳的优化效果。

  • 命令语法
    CREATE TABLE [IF NOT EXISTS] 
                 [([comment ], ...)]
                 [comment 
    ] [PARTITIONED BY ([comment ], ...)] [CLUSTERED BY ([, , ...]) [SORTED BY ([ASC | DESC] [, [ASC | DESC] ...])] INTO BUCKETS] [AS ]
  • 使用示例。
    • 非分区表。
      CREATE TABLE T1 (a string, b string, c bigint)
                   CLUSTERED BY (c)
                   SORTED by (c) INTO 1024 BUCKETS;
    • 分区表。
      CREATE TABLE T1 (a string, b string, c bigint)
             PARTITIONED BY (dt string)
             CLUSTERED BY (c)
             SORTED by (c) INTO 1024 BUCKETS;
  • 属性说明
    • CLUSTERED BY

      指定Hash Key,MaxCompute将对指定列进行Hash运算,按照Hash值分散到各个Bucket里面。为避免数据倾斜,避免热点,取得较好的并行执行效果,CLUSTERED BY列适宜选择取值范围大、重复键值少的列。此外为了达到Join优化的目的,也应该考虑选取常用的Join或Aggregation Key,即类似于传统数据库中的主键。

    • SORTED BY

      指定在Bucket内字段的排序方式,建议Sorted By和Clustered By一致,以取得较好的性能。此外当SORTED BY子句指定之后,MaxCompute将自动生成索引,并且在查询的时候利用索引来加速执行。

    • INTO number_of_buckets BUCKETS

      指定哈希桶的数目,这个数字必须提供,由数据量大小来决定。Bucket越多并发度越大,Job整体运行时间越短,但同时如果Bucket太多的话,可能导致小文件太多,另外并发度过高也会造成CPU时间的增加。目前推荐设置让每个Bucket数据大小在500MB ~ 1GB之间,如果是特别大的表,这个数值可以适当增加。对于Join优化的场景,两个表的Join要去掉Shuffle和Sort步骤,要求哈希桶数目成倍数关系,比如256512。目前建议桶的数目统一使用2的N次幂,比如512、1024、2048、4096,这样系统可以自动进行哈希桶的分裂和合并时,也可以去除Shuffle和Sort的步骤。

  • 更改表的Hash Clustering属性

    对于分区表支持通过ALTER TABLE语句,来增加或者去除Hash Clustering属性。

    • 命令语句
      --更改表为Hash Clustering表
      ALTER TABLE 
    [CLUSTERED BY ([, , ...]) [SORTED BY ([ASC | DESC] [, [ASC | DESC] ...])] INTO BUCKETS]; --更改Hash Clustering表为非Hash Clustering表 ALTER TABLE
    NOT CLUSTERED;
  • 注意事项
    • ALTER TABLE语句改变聚集属性,只对于分区表有效,非分区表一旦聚集属性建立就无法改变。
    • ALTER TABLE语句只会影响分区表的新建分区(包括insert overwrite生成的),新分区将按新的聚集属性存储,老的数据分区保持不变。
    • 由于ALTER TABLE语句只影响新分区,所以该语句不可以再指定PARTITION。
  • ALTER TABLE语句适用于存量表,在增加了新的聚簇属性之后,新的分区将做Hash Clustering存储。

    表属性显式验证

    在创建Hash Clustering Table之后,可以通过如下命令来查看表属性,Hash Clustering属性将显示在Extended Info里面。

    DESC EXTENDED 
    ;

    返回结果示例如下图所示。阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技对于分区表,除了使用以上命令查看表属性之后,还需要通过以下命令查看分区的属性。

    DESC EXTENDED 
    partition();

    返回结果示例如下图所示。阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技

    Hash Clustering优势

    Bucket Pruning和Index优化

    考虑以下查询:

    CREATE TABLE t1 (id bigint, 
                     a string, 
                     b string)
                 CLUSTERED BY (id)
                 SORTED BY (id) into 1000 BUCKETS; 
    ... 
    SELECT t1.a, t1.b FROM t1 WHERE t1.id=12345;

    对于普通表,这个通常意味着全表扫描操作,如果表非常大的情况下,资源消耗量是非常大的。但是因为我们已经对id做Hash Shuffle,并且对id做排序,查询可以极大简化:

    1. 通过查询值12345找到对应的Hash Bucket,这时候我们只需要在1个Bucket里面扫描,而不是全部1000个Bucket里面扫描。称之为Bucket Pruning。
    2. 因为Bucket内数据按id排序存放,MaxCompute会自动创建Index,利用Index lookup直接定位到相关记录。

    可以看出来,查询不仅大大减少了Mapper的个数,并且由于利用了Index,Mapper可以直接定位到数据所在Page,加载读入的数据量也大大减少。

    例如一个大数据任务,一共起了1111个Mapper,读取了427亿条记录,最后找符合条件记录26条,总共耗时1分48秒。同样的数据、同样的查询,使用Hash Clustering表来做,可以直接定位到单个Bucket,并利用Index只读取包含查询数据的Page,只用4个Mapper,读取10000条记录,总共耗时只需要6秒。

    Aggregation优化

    对于以下查询:

    SELECT department, SUM(salary) FROM employee GROUP BY (department);

    通常情况下会对department列数据进行Shuffle和Sort,然后做Stream Aggregate,统计每一个departmentGroup。但是如果表数据已经CLUSTERED BY (department) SORTED BY (department),那么这个Shuffle和Sort的操作,也就相应节省掉了。

    存储优化

    即便不考虑以上所述的各种计算上的优化,单单是把表Shuffle并排序存储,都会对于存储空间节省上有很大帮助。因为MaxCompute底层使用列存储,通过排序,将键值相同或相近的记录存放到一起,对于压缩、编码都会更加友好,从而使得压缩效率更高。在实际测试中,某些极端情况下,排序存储的表可以比无序表的存储空间节省50%。对于生命周期很长的表,使用Hash Clustering存储,是一个很值得的优化。

    以下是一个简单的实验,使用TPC-H数据集中100GB的lineitem表,包含了intdoublestring等多种数据类型,在数据和压缩方式等完全一样的情况下,对比使用Hash Clustering和未使用Hash Clustering的表存储大小,使用Hash Clustering的表存储节省了约10%,如下图所示。

    • 未使用Hash Clustering。阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技
    • 使用Hash Clustering。阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技

    测试数据及分析

    对于Hash Clustering整体带来的性能收益,通过标准的TPC-H测试集进行衡量。测试使用1TB数据,统一使用500 Buckets,除了nationregion两个极小的表以外,其余所有表均按照第一个列作为Cluster和Sort Key。整体测试结果表明,在使用了Hash Clustering之后,总CPU时间减少了约17.3%,总的Job运行时间减少了约12.8%

    需要注意到是TPC-H里并不是所有的Query都可以利用到Clustering属性,特别是两个耗时最长的Query没有办法利用上,所以从总体上的效率提升并不是非常惊人。但如果单看可以利用上Clustering属性的Query,收益还是非常明显的,比如Q4快了约68%、Q12快了约62%、Q10快了约47%等。

    以下是TPC-H Q4在普通表的Fuxi执行计划:阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技而下面则是使用Hash Clustering之后的执行计划,可以看到,此DAG被大大简化,这也是性能得到大幅提升的关键原因。阿里云云原生大数据计算服务 MaxComputeHash Clustering-云淘科技

    内容没看懂? 不太想学习?想快速解决? 有偿解决: 联系专家

    阿里云企业补贴进行中: 马上申请

    腾讯云限时活动1折起,即将结束: 马上收藏

    同尘科技为腾讯云授权服务中心。

    购买腾讯云产品享受折上折,更有现金返利:同意关联,立享优惠

    转转请注明出处:https://www.yunxiaoer.com/159199.html

    (0)
    上一篇 2023年12月10日
    下一篇 2023年12月10日
    详情页2

    相关推荐

    • 阿里云对象存储OSS常见问题-云淘科技

      本文介绍在使用ossutil时可能出现的问题及处理方法。 说明 本文各命令行示例均基于Linux 64位系统,其他系统请将命令开头的./ossutil64替换成对应的Binary名称。详情请参见命令行工具ossutil快速入门。 低频存储或标准存储的文件是否支持同步文件操作? 低频存储或标准存储的文件支持同步文件操作。关于同步文件的更多信息,请参见简介。 如…

      阿里云对象存储 2023年12月10日
    • 阿里云日志服务SLS投递CDN实时日志到SLS来分析用户访问数据-云淘科技

      本文介绍如何使用实时日志功能对用户访问日志进行分析。 概述 阿里云CDN产品的实时日志功能是CDN产品与SLS产品联合推出的一项功能,是一种时效性非常高(延迟在3分钟左右)的日志数据处理服务,能够将CDN节点上采集到的用户访问日志实时推送至SLS日志服务,然后可以通过SLS来存储和分析用户访问数据。在使用阿里云CDN产品来加速网站域名访问速度的情况下,用户访…

      阿里云日志服务SLS 2023年12月10日
    • 阿里云大数据开发治理平台 DataWorksMaxCompute数据源-云淘科技

      MaxCompute数据源作为数据中枢,为您提供读取和写入MaxCompute双向通道的功能。 使用限制 离线读 MaxCompute Reader支持读取分区表、非分区表,不支持读取虚拟视图、不支持同步外部表。 离线读MaxCompute分区表时,不支持直接对分区字段进行字段映射配置,需要在配置数据来源时指定待同步数据的分区信息。 例如,分区表t0其字段包…

    • 阿里云日志服务SLS日志采集Agent对比-云淘科技

      日志采集场景下客户端测评 DT时代,数以亿万计的服务器、移动终端、网络设备每天产生海量的日志。中心化的日志处理方案有效地解决了在完整生命周期内对日志的消费需求,而日志从设备采集上云是始于足下的第一步。 三款日志采集工具Logstash开源界ELK stack中的”L”,社区活跃,生态圈提供大量插件支持。Logstash基于JRuby实现,可以跨平台运行在JV…

      2023年12月10日
    • 信息流广告,信息流部分建议宽度830px,只针对默认列表样式,顺序随机
    • 阿里云云原生大数据计算服务 MaxCompute多类目检索-云淘科技

      Proxima CE支持多类目检索方式检索任务,本文为您介绍多类目检索功能的使用方法及示例。 前提条件 已安装Proxima CE包并准备输入表,详情请参见安装Proxima CE包。 按类目查询 当您有多个类目的向量数据,并且需要在每个类目下单独进行批量查询时,就需要使用按类目查询的功能。 建表命令 create table doc_table_float…

    联系我们

    400-800-8888

    在线咨询: QQ交谈

    邮件:admin@example.com

    工作时间:周一至周五,9:30-18:30,节假日休息

    关注微信
    本站为广大会员提供阿里云、腾讯云、华为云、百度云等一线大厂的购买,续费优惠,保证底价,买贵退差。